1use std::io::{BufRead, BufReader, Stdin, Write};
19use std::panic::AssertUnwindSafe;
20
21use tokio::sync::Mutex as AsyncMutex;
22
23use crate::application::entity::{CreateRowInput, CreateRowsBatchInput};
24use crate::application::ports::RuntimeEntityPort;
25use crate::json::{self as json, Value};
26use crate::runtime::{RedDBRuntime, RuntimeQueryResult};
27use crate::storage::query::unified::UnifiedRecord;
28use crate::storage::schema::Value as SchemaValue;
29use reddb_client_connector::RedDBClient;
30
/// Where an incoming JSON-RPC request gets executed.
///
/// `handle_line` matches on this to route a request either to
/// `dispatch_method` (local) or `dispatch_method_remote` (remote).
enum Backend<'a> {
    /// Borrowed in-process runtime.
    Local(&'a RedDBRuntime),
    /// Remote client state (boxed).
    Remote(Box<RemoteBackend<'a>>),
}
47
/// State needed to serve requests through a remote connection.
struct RemoteBackend<'a> {
    /// Connected client, kept behind an async mutex.
    client: AsyncMutex<RedDBClient>,
    /// Borrowed tokio runtime; `run_remote` builds a current-thread runtime
    /// and lends it here.
    tokio_rt: &'a tokio::runtime::Runtime,
}
52
/// Upper bound on the number of rows handed to a single `create_rows_batch`
/// call when `bulk_insert` runs outside a transaction.
const STDIO_BULK_INSERT_CHUNK_ROWS: usize = 500;

/// Protocol version string reported by the `version` method.
pub const PROTOCOL_VERSION: &str = "1.0";
56
/// String error codes placed in the `code` field of error responses.
pub mod error_code {
    // Request-level failures.
    /// The input line was not valid JSON.
    pub const PARSE_ERROR: &str = "PARSE_ERROR";
    /// The request is malformed or names an unknown/unavailable method.
    pub const INVALID_REQUEST: &str = "INVALID_REQUEST";
    /// A required parameter is missing or has the wrong type.
    pub const INVALID_PARAMS: &str = "INVALID_PARAMS";

    // Execution failures.
    /// The runtime rejected or failed to execute a query.
    pub const QUERY_ERROR: &str = "QUERY_ERROR";
    pub const NOT_FOUND: &str = "NOT_FOUND";
    /// Catch-all, also used when a handler panics or stdin cannot be read.
    pub const INTERNAL_ERROR: &str = "INTERNAL_ERROR";

    // Transaction failures.
    /// `tx.begin` was called while a transaction is already open.
    pub const TX_ALREADY_OPEN: &str = "TX_ALREADY_OPEN";
    /// `tx.commit` / `tx.rollback` was called with no open transaction.
    pub const NO_TX_OPEN: &str = "NO_TX_OPEN";
    /// Replaying the buffered write set at commit time failed.
    pub const TX_REPLAY_FAILED: &str = "TX_REPLAY_FAILED";
    /// Transaction and cursor methods are rejected on the remote backend.
    pub const TX_NOT_SUPPORTED_REMOTE: &str = "TX_NOT_SUPPORTED_REMOTE";

    // Cursor failures.
    /// The referenced cursor id is not held by the session.
    pub const CURSOR_NOT_FOUND: &str = "CURSOR_NOT_FOUND";
    /// The session already holds the maximum number of cursors.
    pub const CURSOR_LIMIT_EXCEEDED: &str = "CURSOR_LIMIT_EXCEEDED";
}
84
/// Batch size used by `query.next` when the request carries no `batch_size`.
pub(crate) const DEFAULT_CURSOR_BATCH_SIZE: usize = 100;

/// Largest batch `query.next` will return; bigger requests are clamped down.
pub(crate) const MAX_CURSOR_BATCH_SIZE: usize = 10_000;

/// Number of cursors a session may hold before `insert_cursor` refuses more.
pub(crate) const MAX_CURSORS_PER_SESSION: usize = 64;
96
/// A statement stored by the `prepare` method and replayed by
/// `execute_prepared`.
struct StdioPreparedStatement {
    /// Parsed query; either the parameterized shape or, when
    /// parameterization yields nothing, the parsed query itself.
    shape: crate::storage::query::ast::QueryExpr,
    /// Number of bind values `execute_prepared` requires (0 when the query
    /// was stored unparameterized).
    parameter_count: usize,
}
123
/// Per-connection state: at most one open transaction, the open cursors and
/// the prepared statements, each with its own id counter.
pub(crate) struct Session {
    /// Id that the next successful `open_tx` will hand out.
    next_tx_id: u64,
    /// The currently open transaction, if any.
    current_tx: Option<OpenTx>,
    /// Id that the next successful `insert_cursor` will hand out.
    next_cursor_id: u64,
    /// Open cursors keyed by cursor id.
    cursors: std::collections::HashMap<u64, Cursor>,
    /// Id that the next `prepare` call will hand out.
    next_prepared_id: u64,
    /// Prepared statements keyed by prepared id.
    prepared: std::collections::HashMap<u64, StdioPreparedStatement>,
}
134
135impl Session {
136 pub(crate) fn new() -> Self {
137 Self {
138 next_tx_id: 1,
139 current_tx: None,
140 next_cursor_id: 1,
141 cursors: std::collections::HashMap::new(),
142 next_prepared_id: 1,
143 prepared: std::collections::HashMap::new(),
144 }
145 }
146
147 fn open_tx(&mut self) -> Result<u64, (&'static str, String)> {
148 if let Some(tx) = &self.current_tx {
149 return Err((
150 error_code::TX_ALREADY_OPEN,
151 format!("transaction {} already open in this session", tx.tx_id),
152 ));
153 }
154 let tx_id = self.next_tx_id;
155 self.next_tx_id = self.next_tx_id.saturating_add(1);
156 self.current_tx = Some(OpenTx {
157 tx_id,
158 write_set: Vec::new(),
159 });
160 Ok(tx_id)
161 }
162
163 fn take_tx(&mut self) -> Option<OpenTx> {
164 self.current_tx.take()
165 }
166
167 fn current_tx_mut(&mut self) -> Option<&mut OpenTx> {
168 self.current_tx.as_mut()
169 }
170
171 #[allow(dead_code)]
172 fn has_tx(&self) -> bool {
173 self.current_tx.is_some()
174 }
175
176 fn insert_cursor(&mut self, cursor: Cursor) -> Result<u64, (&'static str, String)> {
179 if self.cursors.len() >= MAX_CURSORS_PER_SESSION {
180 return Err((
181 error_code::CURSOR_LIMIT_EXCEEDED,
182 format!(
183 "session already holds {} cursors (max {}) — close some before opening new ones",
184 self.cursors.len(),
185 MAX_CURSORS_PER_SESSION
186 ),
187 ));
188 }
189 let id = self.next_cursor_id;
190 self.next_cursor_id = self.next_cursor_id.saturating_add(1);
191 let mut cursor = cursor;
192 cursor.cursor_id = id;
193 self.cursors.insert(id, cursor);
194 Ok(id)
195 }
196
197 fn cursor_mut(&mut self, id: u64) -> Option<&mut Cursor> {
198 self.cursors.get_mut(&id)
199 }
200
201 fn drop_cursor(&mut self, id: u64) -> Option<Cursor> {
202 self.cursors.remove(&id)
203 }
204
205 fn clear_cursors(&mut self) {
206 self.cursors.clear();
207 }
208}
209
210impl Default for Session {
211 fn default() -> Self {
212 Self::new()
213 }
214}
215
/// An open session transaction: writes are buffered here as SQL and replayed
/// by `tx.commit`, or thrown away by `tx.rollback`.
struct OpenTx {
    /// Id handed out by `Session::open_tx`.
    tx_id: u64,
    /// Buffered statements, in the order they were issued.
    write_set: Vec<PendingSql>,
}
221
/// One buffered write, stored as SQL text and tagged with the kind of
/// operation that produced it.
enum PendingSql {
    /// Buffered `INSERT` statement.
    Insert(String),
    /// Buffered `DELETE` statement.
    Delete(String),
    /// Buffered update statement; marked `dead_code` because nothing in the
    /// visible code constructs it.
    #[allow(dead_code)]
    Update(String),
}
231
232impl PendingSql {
233 fn sql(&self) -> &str {
234 match self {
235 PendingSql::Insert(s) | PendingSql::Delete(s) | PendingSql::Update(s) => s,
236 }
237 }
238}
239
/// A fully materialized query result that `query.next` hands out in batches.
pub(crate) struct Cursor {
    /// Id under which the session stores the cursor; `Cursor::new` sets it
    /// to 0 and `Session::insert_cursor` overwrites it.
    cursor_id: u64,
    /// Column names captured when the cursor was opened.
    columns: Vec<String>,
    /// All result rows.
    rows: Vec<UnifiedRecord>,
    /// Index of the first row that has not been returned yet.
    position: usize,
}
257
258impl Cursor {
259 fn new(columns: Vec<String>, rows: Vec<UnifiedRecord>) -> Self {
260 Self {
261 cursor_id: 0, columns,
263 rows,
264 position: 0,
265 }
266 }
267
268 fn total(&self) -> usize {
269 self.rows.len()
270 }
271
272 fn remaining(&self) -> usize {
273 self.rows.len().saturating_sub(self.position)
274 }
275
276 fn is_exhausted(&self) -> bool {
277 self.position >= self.rows.len()
278 }
279
280 fn take_batch(&mut self, batch_size: usize) -> &[UnifiedRecord] {
283 let end = (self.position + batch_size).min(self.rows.len());
284 let slice = &self.rows[self.position..end];
285 self.position = end;
286 slice
287 }
288}
289
290pub fn run(runtime: &RedDBRuntime) -> i32 {
296 run_with_io(runtime, std::io::stdin(), &mut std::io::stdout())
297}
298
299pub fn run_remote(endpoint: &str, token: Option<String>) -> i32 {
306 let tokio_rt = match tokio::runtime::Builder::new_current_thread()
307 .enable_all()
308 .build()
309 {
310 Ok(rt) => rt,
311 Err(e) => {
312 tracing::error!(err = %e, "rpc: failed to build tokio runtime");
313 return 1;
314 }
315 };
316 let client = match tokio_rt.block_on(RedDBClient::connect(endpoint, token)) {
317 Ok(c) => c,
318 Err(e) => {
319 tracing::error!(endpoint, err = %e, "rpc: failed to connect");
320 return 1;
321 }
322 };
323 let backend = Backend::Remote(Box::new(RemoteBackend {
324 client: AsyncMutex::new(client),
325 tokio_rt: &tokio_rt,
326 }));
327 run_backend(&backend, std::io::stdin(), &mut std::io::stdout())
328}
329
330pub fn run_with_io<W: Write>(runtime: &RedDBRuntime, stdin: Stdin, stdout: &mut W) -> i32 {
332 run_backend(&Backend::Local(runtime), stdin, stdout)
333}
334
/// Process-wide counter for the connection ids given to stdio sessions;
/// the first id handed out is 1_000_000.
static STDIO_SESSION_CONN_ID: std::sync::atomic::AtomicU64 =
    std::sync::atomic::AtomicU64::new(1_000_000);

/// Returns the counter's current value and advances it by one.
fn next_stdio_conn_id() -> u64 {
    use std::sync::atomic::Ordering::Relaxed;

    STDIO_SESSION_CONN_ID.fetch_add(1, Relaxed)
}
347
348fn run_backend<W: Write>(backend: &Backend<'_>, stdin: Stdin, stdout: &mut W) -> i32 {
349 let reader = BufReader::new(stdin.lock());
350 let mut session = Session::new();
351 let conn_id = next_stdio_conn_id();
355 crate::runtime::impl_core::set_current_connection_id(conn_id);
356 for line_result in reader.lines() {
357 let line = match line_result {
358 Ok(l) => l,
359 Err(e) => {
360 let _ = writeln!(
361 stdout,
362 "{}",
363 error_response(&Value::Null, error_code::INTERNAL_ERROR, &e.to_string())
364 );
365 let _ = stdout.flush();
366 return 1;
367 }
368 };
369 if line.trim().is_empty() {
370 continue;
371 }
372 let response = handle_line(backend, &mut session, &line);
373 if writeln!(stdout, "{}", response).is_err() || stdout.flush().is_err() {
374 return 1;
375 }
376 if response.contains("\"__close__\":true") {
377 return 0;
378 }
379 }
380 let _ = session.take_tx();
384 crate::runtime::impl_core::clear_current_connection_id();
385 0
386}
387
388fn handle_line(backend: &Backend<'_>, session: &mut Session, line: &str) -> String {
392 let parsed: Value = match json::from_str(line) {
393 Ok(v) => v,
394 Err(err) => {
395 return error_response(
396 &Value::Null,
397 error_code::PARSE_ERROR,
398 &format!("invalid JSON: {err}"),
399 );
400 }
401 };
402
403 let id = parsed.get("id").cloned().unwrap_or(Value::Null);
404
405 let method = match parsed.get("method").and_then(Value::as_str) {
406 Some(m) => m.to_string(),
407 None => {
408 return error_response(&id, error_code::INVALID_REQUEST, "missing 'method' field");
409 }
410 };
411
412 let params = parsed.get("params").cloned().unwrap_or(Value::Null);
413
414 let dispatch = std::panic::catch_unwind(AssertUnwindSafe(|| match backend {
415 Backend::Local(rt) => dispatch_method(rt, session, &method, ¶ms),
416 Backend::Remote(remote) => {
417 if matches!(
422 method.as_str(),
423 "tx.begin"
424 | "tx.commit"
425 | "tx.rollback"
426 | "query.open"
427 | "query.next"
428 | "query.close"
429 ) {
430 Err((
431 error_code::TX_NOT_SUPPORTED_REMOTE,
432 format!("{method} is not supported over remote gRPC yet"),
433 ))
434 } else {
435 dispatch_method_remote(&remote.client, remote.tokio_rt, &method, ¶ms)
436 }
437 }
438 }));
439
440 match dispatch {
441 Ok(Ok(result)) => success_response(&id, &result, method == "close"),
442 Ok(Err((code, msg))) => error_response(&id, code, &msg),
443 Err(_) => error_response(&id, error_code::INTERNAL_ERROR, "handler panicked (caught)"),
444 }
445}
446
447fn dispatch_method(
450 runtime: &RedDBRuntime,
451 session: &mut Session,
452 method: &str,
453 params: &Value,
454) -> Result<Value, (&'static str, String)> {
455 match method {
456 "tx.begin" => {
457 let tx_id = session.open_tx()?;
458 Ok(Value::Object(
459 [
460 ("tx_id".to_string(), Value::Number(tx_id as f64)),
461 (
462 "isolation".to_string(),
463 Value::String("read_committed_deferred".to_string()),
464 ),
465 ]
466 .into_iter()
467 .collect(),
468 ))
469 }
470
471 "tx.commit" => {
472 let tx = session.take_tx().ok_or((
473 error_code::NO_TX_OPEN,
474 "no transaction is open in this session".to_string(),
475 ))?;
476 let tx_id = tx.tx_id;
477 let op_count = tx.write_set.len();
478
479 let replay: Result<(u64, usize), (usize, String)> = (|| {
486 runtime
487 .execute_query("BEGIN")
488 .map_err(|e| (0usize, format!("BEGIN: {e}")))?;
489 let mut total_affected: u64 = 0;
490 for (idx, op) in tx.write_set.iter().enumerate() {
491 match runtime.execute_query(op.sql()) {
492 Ok(qr) => total_affected += qr.affected_rows,
493 Err(e) => {
494 let _ = runtime.execute_query("ROLLBACK");
495 return Err((idx, e.to_string()));
496 }
497 }
498 }
499 runtime
500 .execute_query("COMMIT")
501 .map_err(|e| (op_count, format!("COMMIT: {e}")))?;
502 Ok((total_affected, op_count))
503 })();
504
505 match replay {
506 Ok((affected, replayed)) => Ok(Value::Object(
507 [
508 ("tx_id".to_string(), Value::Number(tx_id as f64)),
509 ("ops_replayed".to_string(), Value::Number(replayed as f64)),
510 ("affected".to_string(), Value::Number(affected as f64)),
511 ]
512 .into_iter()
513 .collect(),
514 )),
515 Err((failed_idx, msg)) => Err((
516 error_code::TX_REPLAY_FAILED,
517 format!(
518 "tx {tx_id} replay failed at op {failed_idx}/{op_count}: {msg} \
519 (ops 0..{failed_idx} already applied and are NOT rolled back)"
520 ),
521 )),
522 }
523 }
524
525 "query.open" => {
526 let sql = params.get("sql").and_then(Value::as_str).ok_or((
527 error_code::INVALID_PARAMS,
528 "missing 'sql' string".to_string(),
529 ))?;
530 let qr = runtime
531 .execute_query(sql)
532 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
533
534 let columns: Vec<String> = qr
538 .result
539 .records
540 .first()
541 .map(|first| {
542 let mut keys: Vec<String> = first
543 .column_names()
544 .into_iter()
545 .map(|k| k.to_string())
546 .collect();
547 keys.sort();
548 keys
549 })
550 .unwrap_or_default();
551
552 let cursor = Cursor::new(columns.clone(), qr.result.records);
553 let total = cursor.total();
554 let cursor_id = session.insert_cursor(cursor)?;
555
556 Ok(Value::Object(
557 [
558 ("cursor_id".to_string(), Value::Number(cursor_id as f64)),
559 (
560 "columns".to_string(),
561 Value::Array(columns.into_iter().map(Value::String).collect()),
562 ),
563 ("total_rows".to_string(), Value::Number(total as f64)),
564 ]
565 .into_iter()
566 .collect(),
567 ))
568 }
569
570 "query.next" => {
571 let cursor_id = params
572 .get("cursor_id")
573 .and_then(|v| v.as_f64())
574 .map(|n| n as u64)
575 .ok_or((
576 error_code::INVALID_PARAMS,
577 "missing 'cursor_id' number".to_string(),
578 ))?;
579 let batch_size = params
580 .get("batch_size")
581 .and_then(|v| v.as_f64())
582 .map(|n| n as usize)
583 .unwrap_or(DEFAULT_CURSOR_BATCH_SIZE)
584 .clamp(1, MAX_CURSOR_BATCH_SIZE);
585
586 let (rows, done, remaining) = {
589 let cursor = session.cursor_mut(cursor_id).ok_or((
590 error_code::CURSOR_NOT_FOUND,
591 format!("cursor {cursor_id} not found"),
592 ))?;
593 let batch = cursor.take_batch(batch_size);
594 let rows_json: Vec<Value> = batch.iter().map(record_to_json_object).collect();
595 (rows_json, cursor.is_exhausted(), cursor.remaining())
596 };
597
598 if done {
599 let _ = session.drop_cursor(cursor_id);
602 }
603
604 Ok(Value::Object(
605 [
606 ("cursor_id".to_string(), Value::Number(cursor_id as f64)),
607 ("rows".to_string(), Value::Array(rows)),
608 ("done".to_string(), Value::Bool(done)),
609 ("remaining".to_string(), Value::Number(remaining as f64)),
610 ]
611 .into_iter()
612 .collect(),
613 ))
614 }
615
616 "query.close" => {
617 let cursor_id = params
618 .get("cursor_id")
619 .and_then(|v| v.as_f64())
620 .map(|n| n as u64)
621 .ok_or((
622 error_code::INVALID_PARAMS,
623 "missing 'cursor_id' number".to_string(),
624 ))?;
625 let existed = session.drop_cursor(cursor_id).is_some();
626 if !existed {
627 return Err((
628 error_code::CURSOR_NOT_FOUND,
629 format!("cursor {cursor_id} not found"),
630 ));
631 }
632 Ok(Value::Object(
633 [
634 ("cursor_id".to_string(), Value::Number(cursor_id as f64)),
635 ("closed".to_string(), Value::Bool(true)),
636 ]
637 .into_iter()
638 .collect(),
639 ))
640 }
641
642 "tx.rollback" => {
643 let tx = session.take_tx().ok_or((
644 error_code::NO_TX_OPEN,
645 "no transaction is open in this session".to_string(),
646 ))?;
647 let ops_discarded = tx.write_set.len();
648 Ok(Value::Object(
649 [
650 ("tx_id".to_string(), Value::Number(tx.tx_id as f64)),
651 (
652 "ops_discarded".to_string(),
653 Value::Number(ops_discarded as f64),
654 ),
655 ]
656 .into_iter()
657 .collect(),
658 ))
659 }
660
661 "version" => Ok(Value::Object(
662 [
663 (
664 "version".to_string(),
665 Value::String(env!("CARGO_PKG_VERSION").to_string()),
666 ),
667 (
668 "protocol".to_string(),
669 Value::String(PROTOCOL_VERSION.to_string()),
670 ),
671 ]
672 .into_iter()
673 .collect(),
674 )),
675
676 "health" => Ok(Value::Object(
677 [
678 ("ok".to_string(), Value::Bool(true)),
679 (
680 "version".to_string(),
681 Value::String(env!("CARGO_PKG_VERSION").to_string()),
682 ),
683 ]
684 .into_iter()
685 .collect(),
686 )),
687
688 "query" => {
689 let sql = params.get("sql").and_then(Value::as_str).ok_or((
690 error_code::INVALID_PARAMS,
691 "missing 'sql' string".to_string(),
692 ))?;
693
694 let bind_values: Option<Vec<SchemaValue>> = params
697 .get("params")
698 .map(|v| {
699 v.as_array()
700 .ok_or((
701 error_code::INVALID_PARAMS,
702 "'params' must be an array".to_string(),
703 ))
704 .map(|arr| arr.iter().map(json_value_to_schema_value).collect())
705 })
706 .transpose()?;
707
708 if let Some(binds) = bind_values {
709 use crate::storage::query::modes::parse_multi;
710 use crate::storage::query::user_params;
711 let parsed =
712 parse_multi(sql).map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
713 let bound = user_params::bind(&parsed, &binds)
714 .map_err(|e| (error_code::INVALID_PARAMS, e.to_string()))?;
715 let qr = runtime
716 .execute_query_expr(bound)
717 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
718 return Ok(query_result_to_json(&qr));
719 }
720
721 let qr = runtime
722 .execute_query(sql)
723 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
724 Ok(query_result_to_json(&qr))
725 }
726
727 "prepare" => {
736 use crate::storage::query::modes::parse_multi;
737 use crate::storage::query::planner::shape::parameterize_query_expr;
738
739 let sql = params.get("sql").and_then(Value::as_str).ok_or((
740 error_code::INVALID_PARAMS,
741 "missing 'sql' string".to_string(),
742 ))?;
743 let parsed = parse_multi(sql).map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
744 let (shape, parameter_count) = if let Some(prepared) = parameterize_query_expr(&parsed)
745 {
746 (prepared.shape, prepared.parameter_count)
747 } else {
748 (parsed, 0)
749 };
750 let id = session.next_prepared_id;
751 session.next_prepared_id = session.next_prepared_id.saturating_add(1);
752 session.prepared.insert(
753 id,
754 StdioPreparedStatement {
755 shape,
756 parameter_count,
757 },
758 );
759 Ok(Value::Object(
760 [
761 ("prepared_id".to_string(), Value::Number(id as f64)),
762 (
763 "parameter_count".to_string(),
764 Value::Number(parameter_count as f64),
765 ),
766 ]
767 .into_iter()
768 .collect(),
769 ))
770 }
771
772 "execute_prepared" => {
773 use crate::storage::query::planner::shape::bind_parameterized_query;
774 use crate::storage::schema::Value as SV;
775
776 let id = params
777 .get("prepared_id")
778 .and_then(Value::as_f64)
779 .map(|n| n as u64)
780 .ok_or((
781 error_code::INVALID_PARAMS,
782 "missing 'prepared_id'".to_string(),
783 ))?;
784
785 let stmt = session.prepared.get(&id).ok_or((
786 error_code::QUERY_ERROR,
787 format!("no prepared statement with id {id}"),
788 ))?;
789
790 let binds_json: Vec<Value> = params
792 .get("binds")
793 .and_then(Value::as_array)
794 .map(|a| a.to_vec())
795 .unwrap_or_default();
796 if binds_json.len() != stmt.parameter_count {
797 return Err((
798 error_code::INVALID_PARAMS,
799 format!(
800 "expected {} bind values, got {}",
801 stmt.parameter_count,
802 binds_json.len()
803 ),
804 ));
805 }
806
807 let binds: Vec<SV> = binds_json.iter().map(json_value_to_schema_value).collect();
809
810 let expr = if stmt.parameter_count == 0 {
812 stmt.shape.clone()
813 } else {
814 bind_parameterized_query(&stmt.shape, &binds, stmt.parameter_count)
815 .ok_or((error_code::QUERY_ERROR, "bind failed".to_string()))?
816 };
817
818 let qr = runtime
819 .execute_query_expr(expr)
820 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
821 Ok(query_result_to_json(&qr))
822 }
823
824 "insert" => {
825 let collection = params.get("collection").and_then(Value::as_str).ok_or((
826 error_code::INVALID_PARAMS,
827 "missing 'collection' string".to_string(),
828 ))?;
829 let payload = params.get("payload").ok_or((
830 error_code::INVALID_PARAMS,
831 "missing 'payload' object".to_string(),
832 ))?;
833 let payload_obj = payload.as_object().ok_or((
834 error_code::INVALID_PARAMS,
835 "'payload' must be a JSON object".to_string(),
836 ))?;
837 if let Some(tx) = session.current_tx_mut() {
838 let sql = build_insert_sql(collection, payload_obj.iter());
839 tx.write_set.push(PendingSql::Insert(sql));
840 return Ok(pending_tx_response(tx.tx_id));
841 }
842
843 let output = runtime
844 .create_row(flat_payload_to_row_input(collection, payload_obj))
845 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
846 let mut out = json::Map::new();
847 out.insert("affected".to_string(), Value::Number(1.0));
848 out.insert("id".to_string(), Value::String(output.id.raw().to_string()));
849 Ok(Value::Object(out))
850 }
851
852 "bulk_insert" => {
853 let collection = params.get("collection").and_then(Value::as_str).ok_or((
854 error_code::INVALID_PARAMS,
855 "missing 'collection' string".to_string(),
856 ))?;
857 let payloads = params.get("payloads").and_then(Value::as_array).ok_or((
858 error_code::INVALID_PARAMS,
859 "missing 'payloads' array".to_string(),
860 ))?;
861
862 let mut objects = Vec::with_capacity(payloads.len());
863 for entry in payloads {
864 objects.push(entry.as_object().ok_or((
865 error_code::INVALID_PARAMS,
866 "each payload must be a JSON object".to_string(),
867 ))?);
868 }
869
870 if let Some(tx) = session.current_tx_mut() {
871 let mut buffered: u64 = 0;
872 for obj in &objects {
873 let sql = build_insert_sql(collection, obj.iter());
874 tx.write_set.push(PendingSql::Insert(sql));
875 buffered += 1;
876 }
877 let tx_id = tx.tx_id;
878 return Ok(Value::Object(
879 [
880 ("affected".to_string(), Value::Number(0.0)),
881 ("buffered".to_string(), Value::Number(buffered as f64)),
882 ("pending".to_string(), Value::Bool(true)),
883 ("tx_id".to_string(), Value::Number(tx_id as f64)),
884 ]
885 .into_iter()
886 .collect(),
887 ));
888 }
889
890 if should_bulk_insert_graph(runtime, collection, &objects) {
891 return bulk_insert_graph(runtime, collection, &objects)
892 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()));
893 }
894
895 let mut total_affected: u64 = 0;
896 let mut ids = Vec::with_capacity(objects.len());
897 for chunk in objects.chunks(STDIO_BULK_INSERT_CHUNK_ROWS) {
898 let rows = chunk
899 .iter()
900 .map(|obj| flat_payload_to_row_input(collection, obj))
901 .collect();
902 let outputs = runtime
903 .create_rows_batch(CreateRowsBatchInput {
904 collection: collection.to_string(),
905 rows,
906 suppress_events: false,
907 })
908 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
909 total_affected += outputs.len() as u64;
910 ids.extend(
911 outputs
912 .into_iter()
913 .map(|output| Value::String(output.id.raw().to_string())),
914 );
915 }
916 let mut out = json::Map::new();
917 out.insert("affected".to_string(), Value::Number(total_affected as f64));
918 out.insert("ids".to_string(), Value::Array(ids));
919 Ok(Value::Object(out))
920 }
921
922 "get" => {
923 let collection = params.get("collection").and_then(Value::as_str).ok_or((
924 error_code::INVALID_PARAMS,
925 "missing 'collection' string".to_string(),
926 ))?;
927 let id = params.get("id").and_then(Value::as_str).ok_or((
928 error_code::INVALID_PARAMS,
929 "missing 'id' string".to_string(),
930 ))?;
931 let sql = format!("SELECT * FROM {collection} WHERE red_entity_id = {id} LIMIT 1");
932 let qr = runtime
933 .execute_query(&sql)
934 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
935 let entity = qr
936 .result
937 .records
938 .first()
939 .map(record_to_json_object)
940 .unwrap_or(Value::Null);
941 Ok(Value::Object(
942 [("entity".to_string(), entity)].into_iter().collect(),
943 ))
944 }
945
946 "delete" => {
947 let collection = params.get("collection").and_then(Value::as_str).ok_or((
948 error_code::INVALID_PARAMS,
949 "missing 'collection' string".to_string(),
950 ))?;
951 let id = params.get("id").and_then(Value::as_str).ok_or((
952 error_code::INVALID_PARAMS,
953 "missing 'id' string".to_string(),
954 ))?;
955 let sql = format!("DELETE FROM {collection} WHERE red_entity_id = {id}");
956
957 if let Some(tx) = session.current_tx_mut() {
958 tx.write_set.push(PendingSql::Delete(sql));
959 return Ok(pending_tx_response(tx.tx_id));
960 }
961
962 let qr = runtime
963 .execute_query(&sql)
964 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
965 Ok(Value::Object(
966 [(
967 "affected".to_string(),
968 Value::Number(qr.affected_rows as f64),
969 )]
970 .into_iter()
971 .collect(),
972 ))
973 }
974
975 "close" => {
976 let _ = session.take_tx();
981 session.clear_cursors();
982 let _ = runtime.checkpoint();
983 Ok(Value::Null)
984 }
985
986 "auth.login"
991 | "auth.whoami"
992 | "auth.change_password"
993 | "auth.create_api_key"
994 | "auth.revoke_api_key" => {
995 let _ = (session, params);
996 Err((
997 error_code::INVALID_REQUEST,
998 format!(
999 "{method}: auth methods are only available on grpc:// connections; \
1000 embedded modes (memory://, file://) inherit caller privileges"
1001 ),
1002 ))
1003 }
1004
1005 other => Err((
1006 error_code::INVALID_REQUEST,
1007 format!("unknown method: {other}"),
1008 )),
1009 }
1010}
1011
1012fn success_response(id: &Value, result: &Value, is_close: bool) -> String {
1017 let mut envelope = json::Map::new();
1022 envelope.insert("jsonrpc".to_string(), Value::String("2.0".to_string()));
1023 envelope.insert("id".to_string(), id.clone());
1024 envelope.insert("result".to_string(), result.clone());
1025 if is_close {
1026 envelope.insert("__close__".to_string(), Value::Bool(true));
1027 }
1028 Value::Object(envelope).to_string_compact()
1029}
1030
1031fn error_response(id: &Value, code: &str, message: &str) -> String {
1032 let mut err = json::Map::new();
1033 err.insert("code".to_string(), Value::String(code.to_string()));
1034 err.insert("message".to_string(), Value::String(message.to_string()));
1035 err.insert("data".to_string(), Value::Null);
1036
1037 let mut envelope = json::Map::new();
1038 envelope.insert("jsonrpc".to_string(), Value::String("2.0".to_string()));
1039 envelope.insert("id".to_string(), id.clone());
1040 envelope.insert("error".to_string(), Value::Object(err));
1041 Value::Object(envelope).to_string_compact()
1042}
1043
1044fn pending_tx_response(tx_id: u64) -> Value {
1051 Value::Object(
1052 [
1053 ("affected".to_string(), Value::Number(0.0)),
1054 ("pending".to_string(), Value::Bool(true)),
1055 ("tx_id".to_string(), Value::Number(tx_id as f64)),
1056 ]
1057 .into_iter()
1058 .collect(),
1059 )
1060}
1061
1062pub(crate) fn build_insert_sql<'a, I>(collection: &str, fields: I) -> String
1063where
1064 I: Iterator<Item = (&'a String, &'a Value)>,
1065{
1066 let mut cols = Vec::new();
1067 let mut vals = Vec::new();
1068 for (k, v) in fields {
1069 cols.push(k.clone());
1070 vals.push(value_to_sql_literal(v));
1071 }
1072 format!(
1073 "INSERT INTO {collection} ({}) VALUES ({})",
1074 cols.join(", "),
1075 vals.join(", "),
1076 )
1077}
1078
1079fn flat_payload_to_row_input(
1080 collection: &str,
1081 payload: &json::Map<String, Value>,
1082) -> CreateRowInput {
1083 CreateRowInput {
1084 collection: collection.to_string(),
1085 fields: payload
1086 .iter()
1087 .map(|(key, value)| (key.clone(), json_value_to_schema_value(value)))
1088 .collect(),
1089 metadata: Vec::new(),
1090 node_links: Vec::new(),
1091 vector_links: Vec::new(),
1092 }
1093}
1094
1095fn bulk_insert_chunk_count(row_count: usize) -> usize {
1096 if row_count == 0 {
1097 0
1098 } else {
1099 ((row_count - 1) / STDIO_BULK_INSERT_CHUNK_ROWS) + 1
1100 }
1101}
1102
1103pub(crate) fn should_bulk_insert_graph(
1104 runtime: &RedDBRuntime,
1105 collection: &str,
1106 payloads: &[&json::Map<String, Value>],
1107) -> bool {
1108 let graph_shaped = payloads
1109 .iter()
1110 .all(|payload| payload.get("label").and_then(Value::as_str).is_some());
1111 if !graph_shaped {
1112 return false;
1113 }
1114
1115 matches!(
1116 runtime
1117 .db()
1118 .catalog_model_snapshot()
1119 .collections
1120 .iter()
1121 .find(|descriptor| descriptor.name == collection)
1122 .map(|descriptor| descriptor.declared_model.unwrap_or(descriptor.model)),
1123 Some(crate::catalog::CollectionModel::Graph | crate::catalog::CollectionModel::Mixed)
1124 )
1125}
1126
1127pub(crate) fn bulk_insert_graph(
1128 runtime: &RedDBRuntime,
1129 collection: &str,
1130 payloads: &[&json::Map<String, Value>],
1131) -> crate::RedDBResult<Value> {
1132 use crate::application::entity_payload::{parse_create_edge_input, parse_create_node_input};
1133 use crate::application::ports::RuntimeEntityPort;
1134
1135 let mut ids = Vec::with_capacity(payloads.len());
1136 for payload in payloads {
1137 let input_payload = normalize_flat_graph_payload(payload);
1138 let id = if payload.contains_key("from") || payload.contains_key("to") {
1139 runtime
1140 .create_edge(parse_create_edge_input(
1141 collection.to_string(),
1142 &input_payload,
1143 )?)?
1144 .id
1145 } else {
1146 runtime
1147 .create_node(parse_create_node_input(
1148 collection.to_string(),
1149 &input_payload,
1150 )?)?
1151 .id
1152 };
1153 ids.push(Value::Number(id.raw() as f64));
1154 }
1155
1156 let mut out = json::Map::new();
1157 out.insert("affected".to_string(), Value::Number(ids.len() as f64));
1158 out.insert("ids".to_string(), Value::Array(ids));
1159 Ok(Value::Object(out))
1160}
1161
1162fn normalize_flat_graph_payload(payload: &json::Map<String, Value>) -> Value {
1163 if payload.contains_key("properties") || payload.contains_key("fields") {
1164 return Value::Object(payload.clone());
1165 }
1166
1167 let is_edge = payload.contains_key("from") || payload.contains_key("to");
1168 let mut normalized = payload.clone();
1169 let mut properties = json::Map::new();
1170 for (key, value) in payload {
1171 let reserved = if is_edge {
1172 matches!(
1173 key.as_str(),
1174 "label"
1175 | "from"
1176 | "to"
1177 | "weight"
1178 | "metadata"
1179 | "properties"
1180 | "fields"
1181 | "_ttl_ms"
1182 | "_expires_at"
1183 )
1184 } else {
1185 matches!(
1186 key.as_str(),
1187 "label"
1188 | "node_type"
1189 | "metadata"
1190 | "links"
1191 | "embeddings"
1192 | "properties"
1193 | "fields"
1194 | "_ttl_ms"
1195 | "_expires_at"
1196 )
1197 };
1198 if !reserved {
1199 properties.insert(key.clone(), value.clone());
1200 }
1201 }
1202 if !properties.is_empty() {
1203 normalized.insert("properties".to_string(), Value::Object(properties));
1204 }
1205 Value::Object(normalized)
1206}
1207
1208pub(crate) fn value_to_sql_literal(v: &Value) -> String {
1209 match v {
1210 Value::Null => "NULL".to_string(),
1211 Value::Bool(b) => b.to_string(),
1212 Value::Number(n) => {
1213 if n.fract() == 0.0 {
1214 format!("{}", *n as i64)
1215 } else {
1216 n.to_string()
1217 }
1218 }
1219 Value::String(s) => format!("'{}'", s.replace('\'', "''")),
1220 other => format!("'{}'", other.to_string_compact().replace('\'', "''")),
1221 }
1222}
1223
1224pub(crate) fn query_result_to_json(qr: &RuntimeQueryResult) -> Value {
1225 if let Some(ask) = ask_query_result_to_json(qr) {
1226 return ask;
1227 }
1228
1229 let mut envelope = json::Map::new();
1230 envelope.insert(
1231 "statement".to_string(),
1232 Value::String(qr.statement_type.to_string()),
1233 );
1234 envelope.insert(
1235 "affected".to_string(),
1236 Value::Number(qr.affected_rows as f64),
1237 );
1238
1239 let mut columns = Vec::new();
1240 if let Some(first) = qr.result.records.first() {
1241 let mut keys: Vec<String> = first
1242 .column_names()
1243 .into_iter()
1244 .map(|k| k.to_string())
1245 .collect();
1246 keys.sort();
1247 columns = keys.into_iter().map(Value::String).collect();
1248 }
1249 envelope.insert("columns".to_string(), Value::Array(columns));
1250
1251 let rows: Vec<Value> = qr
1252 .result
1253 .records
1254 .iter()
1255 .map(record_to_json_object)
1256 .collect();
1257 envelope.insert("rows".to_string(), Value::Array(rows));
1258
1259 Value::Object(envelope)
1260}
1261
1262fn ask_query_result_to_json(qr: &RuntimeQueryResult) -> Option<Value> {
1263 if qr.statement != "ask" {
1264 return None;
1265 }
1266 let row = qr.result.records.first()?;
1267 let answer = text_field(row, "answer")?;
1268 let provider = text_field(row, "provider").unwrap_or_default();
1269 let model = text_field(row, "model").unwrap_or_default();
1270 let sources_flat_json = json_field(row, "sources_flat").unwrap_or(Value::Array(Vec::new()));
1271 let citations_json = json_field(row, "citations").unwrap_or(Value::Array(Vec::new()));
1272 let validation_json = json_field(row, "validation").unwrap_or(Value::Object(json::Map::new()));
1273
1274 let effective_mode = match text_field(row, "mode").as_deref() {
1275 Some("lenient") => crate::runtime::ai::ask_response_envelope::Mode::Lenient,
1276 _ => crate::runtime::ai::ask_response_envelope::Mode::Strict,
1277 };
1278
1279 let result = crate::runtime::ai::ask_response_envelope::AskResult {
1280 answer,
1281 sources_flat: envelope_sources_flat(&sources_flat_json),
1282 citations: envelope_citations(&citations_json),
1283 validation: envelope_validation(&validation_json),
1284 cache_hit: bool_field(row, "cache_hit").unwrap_or(false),
1285 provider,
1286 model,
1287 prompt_tokens: u32_field(row, "prompt_tokens").unwrap_or(0),
1288 completion_tokens: u32_field(row, "completion_tokens").unwrap_or(0),
1289 cost_usd: f64_field(row, "cost_usd").unwrap_or(0.0),
1290 effective_mode,
1291 retry_count: u32_field(row, "retry_count").unwrap_or(0),
1292 };
1293 Some(crate::runtime::ai::ask_response_envelope::build(&result))
1294}
1295
1296fn record_field<'a>(record: &'a UnifiedRecord, name: &str) -> Option<&'a SchemaValue> {
1297 record
1298 .iter_fields()
1299 .find_map(|(key, value)| (key.as_ref() == name).then_some(value))
1300}
1301
1302fn text_field(record: &UnifiedRecord, name: &str) -> Option<String> {
1303 match record_field(record, name)? {
1304 SchemaValue::Text(s) => Some(s.to_string()),
1305 SchemaValue::Email(s)
1306 | SchemaValue::Url(s)
1307 | SchemaValue::NodeRef(s)
1308 | SchemaValue::EdgeRef(s) => Some(s.clone()),
1309 other => Some(format!("{other}")),
1310 }
1311}
1312
1313fn u32_field(record: &UnifiedRecord, name: &str) -> Option<u32> {
1314 match record_field(record, name)? {
1315 SchemaValue::Integer(n) => (*n >= 0).then_some((*n).min(u32::MAX as i64) as u32),
1316 SchemaValue::UnsignedInteger(n) => Some((*n).min(u32::MAX as u64) as u32),
1317 SchemaValue::BigInt(n)
1318 | SchemaValue::TimestampMs(n)
1319 | SchemaValue::Timestamp(n)
1320 | SchemaValue::Duration(n)
1321 | SchemaValue::Decimal(n) => (*n >= 0).then_some((*n).min(u32::MAX as i64) as u32),
1322 SchemaValue::Float(n) => (*n >= 0.0).then_some((*n).min(u32::MAX as f64) as u32),
1323 _ => None,
1324 }
1325}
1326
1327fn f64_field(record: &UnifiedRecord, name: &str) -> Option<f64> {
1328 match record_field(record, name)? {
1329 SchemaValue::Integer(n) => Some(*n as f64),
1330 SchemaValue::UnsignedInteger(n) => Some(*n as f64),
1331 SchemaValue::BigInt(n)
1332 | SchemaValue::TimestampMs(n)
1333 | SchemaValue::Timestamp(n)
1334 | SchemaValue::Duration(n)
1335 | SchemaValue::Decimal(n) => Some(*n as f64),
1336 SchemaValue::Float(n) => Some(*n),
1337 _ => None,
1338 }
1339}
1340
1341fn bool_field(record: &UnifiedRecord, name: &str) -> Option<bool> {
1342 match record_field(record, name)? {
1343 SchemaValue::Boolean(value) => Some(*value),
1344 _ => None,
1345 }
1346}
1347
1348fn json_field(record: &UnifiedRecord, name: &str) -> Option<Value> {
1349 match record_field(record, name)? {
1350 SchemaValue::Json(bytes) => json::from_slice(bytes).ok(),
1351 SchemaValue::Text(text) => json::from_str(text).ok(),
1352 _ => None,
1353 }
1354}
1355
1356fn envelope_sources_flat(
1357 value: &Value,
1358) -> Vec<crate::runtime::ai::ask_response_envelope::SourceRow> {
1359 value
1360 .as_array()
1361 .unwrap_or(&[])
1362 .iter()
1363 .filter_map(|source| {
1364 let urn = source.get("urn").and_then(Value::as_str)?.to_string();
1365 let payload = source
1366 .get("payload")
1367 .and_then(Value::as_str)
1368 .map(ToString::to_string)
1369 .unwrap_or_else(|| source.to_string_compact());
1370 Some(crate::runtime::ai::ask_response_envelope::SourceRow { urn, payload })
1371 })
1372 .collect()
1373}
1374
1375fn envelope_citations(value: &Value) -> Vec<crate::runtime::ai::ask_response_envelope::Citation> {
1376 value
1377 .as_array()
1378 .unwrap_or(&[])
1379 .iter()
1380 .filter_map(|citation| {
1381 let marker = citation.get("marker").and_then(Value::as_u64)?;
1382 let urn = citation.get("urn").and_then(Value::as_str)?.to_string();
1383 Some(crate::runtime::ai::ask_response_envelope::Citation {
1384 marker: marker.min(u32::MAX as u64) as u32,
1385 urn,
1386 })
1387 })
1388 .collect()
1389}
1390
1391fn envelope_validation(value: &Value) -> crate::runtime::ai::ask_response_envelope::Validation {
1392 crate::runtime::ai::ask_response_envelope::Validation {
1393 ok: value.get("ok").and_then(Value::as_bool).unwrap_or(true),
1394 warnings: validation_items(value, "warnings")
1395 .into_iter()
1396 .map(
1397 |(kind, detail)| crate::runtime::ai::ask_response_envelope::ValidationWarning {
1398 kind,
1399 detail,
1400 },
1401 )
1402 .collect(),
1403 errors: validation_items(value, "errors")
1404 .into_iter()
1405 .map(
1406 |(kind, detail)| crate::runtime::ai::ask_response_envelope::ValidationError {
1407 kind,
1408 detail,
1409 },
1410 )
1411 .collect(),
1412 }
1413}
1414
1415fn validation_items(value: &Value, key: &str) -> Vec<(String, String)> {
1416 value
1417 .get(key)
1418 .and_then(Value::as_array)
1419 .unwrap_or(&[])
1420 .iter()
1421 .filter_map(|item| {
1422 Some((
1423 item.get("kind").and_then(Value::as_str)?.to_string(),
1424 item.get("detail")
1425 .and_then(Value::as_str)
1426 .unwrap_or("")
1427 .to_string(),
1428 ))
1429 })
1430 .collect()
1431}
1432
1433pub(crate) fn insert_result_to_json(qr: &RuntimeQueryResult) -> Value {
1434 let mut envelope = json::Map::new();
1435 envelope.insert(
1436 "affected".to_string(),
1437 Value::Number(qr.affected_rows as f64),
1438 );
1439 if let Some(first) = qr.result.records.first() {
1441 if let Some(id_val) = first
1442 .iter_fields()
1443 .find(|(k, _)| {
1444 let s: &str = k;
1445 s == "_entity_id"
1446 })
1447 .map(|(_, v)| schema_value_to_json(v))
1448 {
1449 envelope.insert("id".to_string(), id_val);
1450 }
1451 }
1452 Value::Object(envelope)
1453}
1454
1455fn record_to_json_object(record: &UnifiedRecord) -> Value {
1456 let mut map = json::Map::new();
1457 let mut entries: Vec<(&str, &SchemaValue)> =
1460 record.iter_fields().map(|(k, v)| (k.as_ref(), v)).collect();
1461 entries.sort_by(|a, b| a.0.cmp(b.0));
1462 for (k, v) in entries {
1463 map.insert(k.to_string(), schema_value_to_json(v));
1464 }
1465 Value::Object(map)
1466}
1467
1468fn schema_value_to_json(v: &SchemaValue) -> Value {
1469 match v {
1470 SchemaValue::Null => Value::Null,
1471 SchemaValue::Boolean(b) => Value::Bool(*b),
1472 SchemaValue::Integer(n) => Value::Number(*n as f64),
1473 SchemaValue::UnsignedInteger(n) => Value::Number(*n as f64),
1474 SchemaValue::Float(n) if n.is_finite() => Value::Number(*n),
1475 SchemaValue::Float(n) => {
1476 let token = if n.is_nan() {
1477 "NaN"
1478 } else if n.is_sign_positive() {
1479 "Infinity"
1480 } else {
1481 "-Infinity"
1482 };
1483 single_key_object("$float", Value::String(token.to_string()))
1484 }
1485 SchemaValue::BigInt(n) => Value::Number(*n as f64),
1486 SchemaValue::TimestampMs(n) | SchemaValue::Duration(n) | SchemaValue::Decimal(n) => {
1487 Value::Number(*n as f64)
1488 }
1489 SchemaValue::Timestamp(n) => single_key_object("$ts", Value::String(n.to_string())),
1490 SchemaValue::Password(_) | SchemaValue::Secret(_) => Value::String("***".to_string()),
1491 SchemaValue::Text(s) => Value::String(s.to_string()),
1492 SchemaValue::Blob(bytes) => {
1493 single_key_object("$bytes", Value::String(base64_encode(bytes)))
1494 }
1495 SchemaValue::Json(bytes) => {
1496 crate::presentation::entity_json::storage_json_bytes_to_json(bytes)
1497 }
1498 SchemaValue::Uuid(bytes) => single_key_object("$uuid", Value::String(format_uuid(bytes))),
1499 SchemaValue::Email(s)
1500 | SchemaValue::Url(s)
1501 | SchemaValue::NodeRef(s)
1502 | SchemaValue::EdgeRef(s) => Value::String(s.clone()),
1503 other => Value::String(format!("{other}")),
1504 }
1505}
1506
1507fn single_key_object(key: &str, value: Value) -> Value {
1508 Value::Object([(key.to_string(), value)].into_iter().collect())
1509}
1510
1511pub(crate) fn json_value_to_schema_value(v: &Value) -> SchemaValue {
1515 match v {
1516 Value::Null => SchemaValue::Null,
1517 Value::Bool(b) => SchemaValue::Boolean(*b),
1518 Value::Number(n) => {
1519 if n.is_finite() && n.fract() == 0.0 && *n >= i64::MIN as f64 && *n <= i64::MAX as f64 {
1520 SchemaValue::Integer(*n as i64)
1521 } else {
1522 SchemaValue::Float(*n)
1523 }
1524 }
1525 Value::String(s) => SchemaValue::text(s.clone()),
1526 Value::Array(items) => {
1527 if items.iter().all(|v| matches!(v, Value::Number(_))) {
1531 let floats: Vec<f32> = items
1532 .iter()
1533 .map(|v| v.as_f64().unwrap_or(0.0) as f32)
1534 .collect();
1535 SchemaValue::Vector(floats)
1536 } else {
1537 SchemaValue::Json(crate::json::to_vec(v).unwrap_or_default())
1538 }
1539 }
1540 Value::Object(map) => {
1541 if map.len() == 1 {
1542 if let Some(Value::String(encoded)) = map.get("$bytes") {
1543 if let Ok(bytes) = base64_decode(encoded) {
1544 return SchemaValue::Blob(bytes);
1545 }
1546 }
1547 if let Some(value) = map.get("$ts") {
1548 if let Some(ts) = json_i64(value) {
1549 return SchemaValue::Timestamp(ts);
1550 }
1551 }
1552 if let Some(Value::String(value)) = map.get("$uuid") {
1553 if let Ok(uuid) = crate::crypto::Uuid::parse_str(value) {
1554 return SchemaValue::Uuid(*uuid.as_bytes());
1555 }
1556 }
1557 if let Some(Value::String(value)) = map.get("$float") {
1558 return match value.as_str() {
1559 "NaN" => SchemaValue::Float(f64::NAN),
1560 "Infinity" | "+Infinity" | "inf" | "+inf" => {
1561 SchemaValue::Float(f64::INFINITY)
1562 }
1563 "-Infinity" | "-inf" => SchemaValue::Float(f64::NEG_INFINITY),
1564 _ => SchemaValue::Json(crate::json::to_vec(v).unwrap_or_default()),
1565 };
1566 }
1567 }
1568 SchemaValue::Json(crate::json::to_vec(v).unwrap_or_default())
1569 }
1570 }
1571}
1572
1573fn json_i64(value: &Value) -> Option<i64> {
1574 match value {
1575 Value::Number(n) => {
1576 if n.is_finite() && n.fract() == 0.0 && *n >= i64::MIN as f64 && *n <= i64::MAX as f64 {
1577 Some(*n as i64)
1578 } else {
1579 None
1580 }
1581 }
1582 Value::String(s) => s.parse::<i64>().ok(),
1583 _ => None,
1584 }
1585}
1586
/// Formats 16 raw bytes as a lowercase, hyphenated UUID string
/// (`xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx`).
fn format_uuid(bytes: &[u8; 16]) -> String {
    const HEX_DIGITS: &[u8; 16] = b"0123456789abcdef";

    // 32 hex digits plus 4 hyphens.
    let mut text = String::with_capacity(36);
    for (index, byte) in bytes.iter().enumerate() {
        // Hyphens separate the 4-2-2-2-6 byte groups.
        if matches!(index, 4 | 6 | 8 | 10) {
            text.push('-');
        }
        text.push(HEX_DIGITS[usize::from(*byte >> 4)] as char);
        text.push(HEX_DIGITS[usize::from(*byte & 0x0f)] as char);
    }
    text
}
1608
/// Encodes `bytes` with the standard base64 alphabet (`A-Z a-z 0-9 + /`),
/// padding the output with `=` to a multiple of four characters.
fn base64_encode(bytes: &[u8]) -> String {
    const ALPHABET: &[u8; 64] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

    // Picks the 6-bit group starting at bit `shift` of a 24-bit quantum.
    let sextet = |quantum: u32, shift: u32| ALPHABET[((quantum >> shift) & 0x3f) as usize] as char;

    let mut encoded = String::with_capacity((bytes.len() + 2) / 3 * 4);
    for chunk in bytes.chunks(3) {
        // Left-align up to three input bytes in a 24-bit quantum; missing
        // bytes in the final chunk stay zero.
        let mut quantum = 0u32;
        for (index, byte) in chunk.iter().enumerate() {
            quantum |= u32::from(*byte) << (16 - 8 * index as u32);
        }

        encoded.push(sextet(quantum, 18));
        encoded.push(sextet(quantum, 12));
        encoded.push(if chunk.len() > 1 { sextet(quantum, 6) } else { '=' });
        encoded.push(if chunk.len() > 2 { sextet(quantum, 0) } else { '=' });
    }
    encoded
}
1640
/// Decodes standard, padded base64 (`A-Z a-z 0-9 + /`, `=` padding).
///
/// The input length must be a multiple of four. Padding is accepted only as
/// the last one or two characters of the final four-character group; an `=`
/// anywhere else is rejected instead of being silently decoded as zero bits.
/// Unused low bits in a padded final group are ignored.
///
/// # Errors
///
/// Returns a human-readable message when the length is not a multiple of
/// four, when a character is outside the alphabet, or when padding is
/// misplaced.
fn base64_decode(input: &str) -> Result<Vec<u8>, String> {
    let bytes = input.as_bytes();
    if bytes.len() % 4 != 0 {
        return Err("base64 length must be a multiple of 4".to_string());
    }

    let group_count = bytes.len() / 4;
    let mut out = Vec::with_capacity(group_count * 3);
    for (index, chunk) in bytes.chunks_exact(4).enumerate() {
        // Only "xx==" and "xxx=" are valid padded shapes.
        let pad = match (chunk[2], chunk[3]) {
            (b'=', b'=') => 2,
            (_, b'=') => 1,
            _ => 0,
        };
        if pad > 0 && index + 1 != group_count {
            return Err("base64 padding is only allowed at the end of the input".to_string());
        }

        // Accumulate the data characters; '=' is never a data character.
        let mut n = 0u32;
        for &byte in &chunk[..4 - pad] {
            if byte == b'=' {
                return Err("misplaced base64 padding character".to_string());
            }
            n = (n << 6) | u32::from(base64_value(byte)?);
        }
        // Left-align a short final group into the full 24-bit quantum.
        n <<= 6 * pad as u32;

        out.push(((n >> 16) & 0xff) as u8);
        if pad < 2 {
            out.push(((n >> 8) & 0xff) as u8);
        }
        if pad < 1 {
            out.push((n & 0xff) as u8);
        }
    }
    Ok(out)
}

/// Maps one base64 alphabet character to its 6-bit value.
///
/// `=` maps to `0` for compatibility with existing callers; `base64_decode`
/// validates padding positions itself and never passes `=` here.
///
/// # Errors
///
/// Returns a message naming the offending character when `byte` is outside
/// the standard alphabet.
fn base64_value(byte: u8) -> Result<u8, String> {
    match byte {
        b'A'..=b'Z' => Ok(byte - b'A'),
        b'a'..=b'z' => Ok(byte - b'a' + 26),
        b'0'..=b'9' => Ok(byte - b'0' + 52),
        b'+' => Ok(62),
        b'/' => Ok(63),
        b'=' => Ok(0),
        _ => Err(format!("invalid base64 character: {}", byte as char)),
    }
}
1684
1685fn dispatch_method_remote(
1695 client: &AsyncMutex<RedDBClient>,
1696 tokio_rt: &tokio::runtime::Runtime,
1697 method: &str,
1698 params: &Value,
1699) -> Result<Value, (&'static str, String)> {
1700 match method {
1701 "version" => Ok(Value::Object(
1702 [
1703 (
1704 "version".to_string(),
1705 Value::String(env!("CARGO_PKG_VERSION").to_string()),
1706 ),
1707 (
1708 "protocol".to_string(),
1709 Value::String(PROTOCOL_VERSION.to_string()),
1710 ),
1711 ]
1712 .into_iter()
1713 .collect(),
1714 )),
1715
1716 "health" => {
1717 let result = tokio_rt.block_on(async {
1718 let mut guard = client.lock().await;
1719 guard.health_status().await
1720 });
1721 match result {
1722 Ok(status) => Ok(Value::Object(
1723 [
1724 ("ok".to_string(), Value::Bool(status.healthy)),
1725 ("state".to_string(), Value::String(status.state)),
1726 (
1727 "checked_at_unix_ms".to_string(),
1728 Value::Number(status.checked_at_unix_ms as f64),
1729 ),
1730 (
1731 "version".to_string(),
1732 Value::String(env!("CARGO_PKG_VERSION").to_string()),
1733 ),
1734 ]
1735 .into_iter()
1736 .collect(),
1737 )),
1738 Err(e) => Err((error_code::INTERNAL_ERROR, e.to_string())),
1739 }
1740 }
1741
1742 "query" => {
1743 let sql = params.get("sql").and_then(Value::as_str).ok_or((
1744 error_code::INVALID_PARAMS,
1745 "missing 'sql' string".to_string(),
1746 ))?;
1747 let json_str = tokio_rt
1748 .block_on(async {
1749 let mut guard = client.lock().await;
1750 guard.query(sql).await
1751 })
1752 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
1753 let parsed = json::from_str::<Value>(&json_str)
1758 .map_err(|e| (error_code::INTERNAL_ERROR, format!("bad server JSON: {e}")))?;
1759 Ok(parsed)
1760 }
1761
1762 "insert" => {
1763 let collection = params.get("collection").and_then(Value::as_str).ok_or((
1764 error_code::INVALID_PARAMS,
1765 "missing 'collection' string".to_string(),
1766 ))?;
1767 let payload = params.get("payload").ok_or((
1768 error_code::INVALID_PARAMS,
1769 "missing 'payload' object".to_string(),
1770 ))?;
1771 if payload.as_object().is_none() {
1772 return Err((
1773 error_code::INVALID_PARAMS,
1774 "'payload' must be a JSON object".to_string(),
1775 ));
1776 }
1777 let payload_json = payload.to_string_compact();
1778 let reply = tokio_rt
1779 .block_on(async {
1780 let mut guard = client.lock().await;
1781 guard.create_row_entity(collection, &payload_json).await
1782 })
1783 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
1784 let mut out = json::Map::new();
1785 out.insert("affected".to_string(), Value::Number(1.0));
1786 out.insert("id".to_string(), Value::String(reply.id.to_string()));
1787 Ok(Value::Object(out))
1788 }
1789
1790 "bulk_insert" => {
1791 let collection = params.get("collection").and_then(Value::as_str).ok_or((
1792 error_code::INVALID_PARAMS,
1793 "missing 'collection' string".to_string(),
1794 ))?;
1795 let payloads = params.get("payloads").and_then(Value::as_array).ok_or((
1796 error_code::INVALID_PARAMS,
1797 "missing 'payloads' array".to_string(),
1798 ))?;
1799 let mut encoded = Vec::with_capacity(payloads.len());
1800 for entry in payloads {
1801 if entry.as_object().is_none() {
1802 return Err((
1803 error_code::INVALID_PARAMS,
1804 "each payload must be a JSON object".to_string(),
1805 ));
1806 }
1807 encoded.push(entry.to_string_compact());
1808 }
1809 let status = tokio_rt
1810 .block_on(async {
1811 let mut guard = client.lock().await;
1812 guard.bulk_create_rows(collection, encoded).await
1813 })
1814 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
1815 Ok(Value::Object(
1816 [
1817 ("affected".to_string(), Value::Number(status.count as f64)),
1818 (
1819 "ids".to_string(),
1820 Value::Array(
1821 status
1822 .ids
1823 .into_iter()
1824 .map(|id| Value::Number(id as f64))
1825 .collect(),
1826 ),
1827 ),
1828 ]
1829 .into_iter()
1830 .collect(),
1831 ))
1832 }
1833
1834 "get" => {
1835 let collection = params.get("collection").and_then(Value::as_str).ok_or((
1836 error_code::INVALID_PARAMS,
1837 "missing 'collection' string".to_string(),
1838 ))?;
1839 let id = params.get("id").and_then(Value::as_str).ok_or((
1840 error_code::INVALID_PARAMS,
1841 "missing 'id' string".to_string(),
1842 ))?;
1843 let sql = format!("SELECT * FROM {collection} WHERE red_entity_id = {id} LIMIT 1");
1844 let json_str = tokio_rt
1845 .block_on(async {
1846 let mut guard = client.lock().await;
1847 guard.query(&sql).await
1848 })
1849 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
1850 let parsed = json::from_str::<Value>(&json_str)
1851 .map_err(|e| (error_code::INTERNAL_ERROR, format!("bad server JSON: {e}")))?;
1852 let entity = parsed
1855 .get("rows")
1856 .and_then(Value::as_array)
1857 .and_then(|rows| rows.first().cloned())
1858 .unwrap_or(Value::Null);
1859 Ok(Value::Object(
1860 [("entity".to_string(), entity)].into_iter().collect(),
1861 ))
1862 }
1863
1864 "delete" => {
1865 let collection = params.get("collection").and_then(Value::as_str).ok_or((
1866 error_code::INVALID_PARAMS,
1867 "missing 'collection' string".to_string(),
1868 ))?;
1869 let id = params.get("id").and_then(Value::as_str).ok_or((
1870 error_code::INVALID_PARAMS,
1871 "missing 'id' string".to_string(),
1872 ))?;
1873 let id = id.parse::<u64>().map_err(|_| {
1874 (
1875 error_code::INVALID_PARAMS,
1876 "id must be a numeric string".to_string(),
1877 )
1878 })?;
1879 let _reply = tokio_rt
1880 .block_on(async {
1881 let mut guard = client.lock().await;
1882 guard.delete_entity(collection, id).await
1883 })
1884 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
1885 Ok(Value::Object(
1886 [("affected".to_string(), Value::Number(1.0))]
1887 .into_iter()
1888 .collect(),
1889 ))
1890 }
1891
1892 "close" => Ok(Value::Null),
1893
1894 other => Err((
1895 error_code::INVALID_REQUEST,
1896 format!("unknown method: {other}"),
1897 )),
1898 }
1899}
1900
1901#[cfg(test)]
1902mod tests {
1903 use super::*;
1904 use crate::json::json;
1905 use proptest::prelude::*;
1906
1907 fn make_runtime() -> RedDBRuntime {
1908 RedDBRuntime::in_memory().expect("in-memory runtime")
1909 }
1910
1911 fn create_graph_collection(rt: &RedDBRuntime, name: &str) {
1912 let db = rt.db();
1913 db.store()
1914 .create_collection(name)
1915 .expect("create collection");
1916 let now = std::time::SystemTime::now()
1917 .duration_since(std::time::UNIX_EPOCH)
1918 .unwrap_or_default()
1919 .as_millis();
1920 db.save_collection_contract(crate::physical::CollectionContract {
1921 name: name.to_string(),
1922 declared_model: crate::catalog::CollectionModel::Graph,
1923 schema_mode: crate::catalog::SchemaMode::Dynamic,
1924 origin: crate::physical::ContractOrigin::Explicit,
1925 version: 1,
1926 created_at_unix_ms: now,
1927 updated_at_unix_ms: now,
1928 default_ttl_ms: None,
1929 vector_dimension: None,
1930 vector_metric: None,
1931 context_index_fields: Vec::new(),
1932 declared_columns: Vec::new(),
1933 table_def: None,
1934 timestamps_enabled: false,
1935 context_index_enabled: false,
1936 append_only: false,
1937 subscriptions: Vec::new(),
1938 })
1939 .expect("save graph contract");
1940 }
1941
1942 fn handle(rt: &RedDBRuntime, line: &str) -> String {
1943 let mut session = Session::new();
1944 handle_line(&Backend::Local(rt), &mut session, line)
1945 }
1946
1947 fn query_request(id: u64, sql: &str) -> String {
1948 let mut params = json::Map::new();
1949 params.insert("sql".to_string(), Value::String(sql.to_string()));
1950
1951 let mut request = json::Map::new();
1952 request.insert("jsonrpc".to_string(), Value::String("2.0".to_string()));
1953 request.insert("id".to_string(), Value::Number(id as f64));
1954 request.insert("method".to_string(), Value::String("query".to_string()));
1955 request.insert("params".to_string(), Value::Object(params));
1956 Value::Object(request).to_string_compact()
1957 }
1958
1959 fn query_request_with_params(id: u64, sql: &str, binds: Vec<Value>) -> String {
1960 let mut params = json::Map::new();
1961 params.insert("sql".to_string(), Value::String(sql.to_string()));
1962 params.insert("params".to_string(), Value::Array(binds));
1963
1964 let mut request = json::Map::new();
1965 request.insert("jsonrpc".to_string(), Value::String("2.0".to_string()));
1966 request.insert("id".to_string(), Value::Number(id as f64));
1967 request.insert("method".to_string(), Value::String("query".to_string()));
1968 request.insert("params".to_string(), Value::Object(params));
1969 Value::Object(request).to_string_compact()
1970 }
1971
1972 fn with_session<F>(rt: &RedDBRuntime, f: F)
1975 where
1976 F: FnOnce(&dyn Fn(&str) -> String, &RedDBRuntime),
1977 {
1978 let session = std::cell::RefCell::new(Session::new());
1979 let call = |line: &str| -> String {
1980 let mut s = session.borrow_mut();
1981 handle_line(&Backend::Local(rt), &mut s, line)
1982 };
1983 f(&call, rt);
1984 }
1985
1986 fn result_rows(response: &str) -> Vec<Value> {
1987 json::from_str::<Value>(response)
1988 .expect("json response")
1989 .get("result")
1990 .and_then(|result| result.get("rows"))
1991 .and_then(Value::as_array)
1992 .map(|rows| rows.to_vec())
1993 .unwrap_or_default()
1994 }
1995
1996 fn result_name_kind(response: &str) -> Vec<(String, String)> {
1997 result_rows(response)
1998 .into_iter()
1999 .map(|row| {
2000 let object = row.as_object().expect("row object");
2001 let name = object
2002 .get("name")
2003 .and_then(Value::as_str)
2004 .expect("row name")
2005 .to_string();
2006 let kind = object
2007 .get("kind")
2008 .and_then(Value::as_str)
2009 .expect("row kind")
2010 .to_string();
2011 (name, kind)
2012 })
2013 .collect()
2014 }
2015
2016 fn json_scalar_param() -> impl Strategy<Value = Value> {
2017 prop_oneof![
2018 Just(Value::Null),
2019 any::<bool>().prop_map(Value::Bool),
2020 (-1000_i64..1000_i64).prop_map(|n| Value::Number(n as f64)),
2021 "[a-z']{0,8}".prop_map(Value::String),
2022 ]
2023 }
2024
2025 fn sql_literal_for_json(value: &Value) -> String {
2026 match value {
2027 Value::Null => "NULL".to_string(),
2028 Value::Bool(true) => "TRUE".to_string(),
2029 Value::Bool(false) => "FALSE".to_string(),
2030 Value::Number(n) => format!("{n:.0}"),
2031 Value::String(s) => format!("'{}'", s.replace('\'', "''")),
2032 _ => panic!("unsupported scalar param: {value:?}"),
2033 }
2034 }
2035
2036 #[test]
2037 fn version_method_returns_version_and_protocol() {
2038 let rt = make_runtime();
2039 let line = r#"{"jsonrpc":"2.0","id":1,"method":"version","params":{}}"#;
2040 let resp = handle(&rt, line);
2041 assert!(resp.contains("\"id\":1"));
2042 assert!(resp.contains("\"protocol\":\"1.0\""));
2043 assert!(resp.contains("\"version\""));
2044 }
2045
2046 #[test]
2047 fn health_method_returns_ok_true() {
2048 let rt = make_runtime();
2049 let resp = handle(
2050 &rt,
2051 r#"{"jsonrpc":"2.0","id":"abc","method":"health","params":{}}"#,
2052 );
2053 assert!(resp.contains("\"ok\":true"));
2054 assert!(resp.contains("\"id\":\"abc\""));
2055 }
2056
2057 #[test]
2058 fn parse_error_for_invalid_json() {
2059 let rt = make_runtime();
2060 let resp = handle(&rt, "not json {");
2061 assert!(resp.contains("\"code\":\"PARSE_ERROR\""));
2062 assert!(resp.contains("\"id\":null"));
2063 }
2064
2065 #[test]
2066 fn invalid_request_when_method_missing() {
2067 let rt = make_runtime();
2068 let resp = handle(&rt, r#"{"jsonrpc":"2.0","id":1,"params":{}}"#);
2069 assert!(resp.contains("\"code\":\"INVALID_REQUEST\""));
2070 }
2071
2072 #[test]
2073 fn unknown_method_is_invalid_request() {
2074 let rt = make_runtime();
2075 let resp = handle(
2076 &rt,
2077 r#"{"jsonrpc":"2.0","id":1,"method":"frobnicate","params":{}}"#,
2078 );
2079 assert!(resp.contains("\"code\":\"INVALID_REQUEST\""));
2080 assert!(resp.contains("frobnicate"));
2081 }
2082
2083 #[test]
2084 fn invalid_params_when_query_sql_missing() {
2085 let rt = make_runtime();
2086 let resp = handle(
2087 &rt,
2088 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{}}"#,
2089 );
2090 assert!(resp.contains("\"code\":\"INVALID_PARAMS\""));
2091 }
2092
2093 #[test]
2094 fn close_method_marks_response_for_shutdown() {
2095 let rt = make_runtime();
2096 let resp = handle(
2097 &rt,
2098 r#"{"jsonrpc":"2.0","id":1,"method":"close","params":{}}"#,
2099 );
2100 assert!(resp.contains("\"__close__\":true"));
2101 }
2102
2103 #[test]
2104 fn query_with_int_text_params_round_trips() {
2105 let rt = make_runtime();
2106 let _ = handle(
2107 &rt,
2108 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE p (id INTEGER, name TEXT)"}}"#,
2109 );
2110 let _ = handle(
2111 &rt,
2112 r#"{"jsonrpc":"2.0","id":2,"method":"query","params":{"sql":"INSERT INTO p (id, name) VALUES (1, 'Alice')"}}"#,
2113 );
2114 let _ = handle(
2115 &rt,
2116 r#"{"jsonrpc":"2.0","id":3,"method":"query","params":{"sql":"INSERT INTO p (id, name) VALUES (2, 'Bob')"}}"#,
2117 );
2118 let resp = handle(
2119 &rt,
2120 r#"{"jsonrpc":"2.0","id":4,"method":"query","params":{"sql":"SELECT * FROM p WHERE id = $1 AND name = $2","params":[1,"Alice"]}}"#,
2121 );
2122 assert!(resp.contains("\"Alice\""), "got: {resp}");
2123 assert!(!resp.contains("\"Bob\""), "got: {resp}");
2124 }
2125
2126 #[test]
2127 fn query_with_question_params_covers_select_insert_update_delete() {
2128 let rt = make_runtime();
2129 let create = handle(
2130 &rt,
2131 &query_request(1, "CREATE TABLE qp (id INTEGER, name TEXT)"),
2132 );
2133 assert!(!create.contains("\"error\""), "got: {create}");
2134
2135 let inserted = handle(
2136 &rt,
2137 &query_request_with_params(
2138 2,
2139 "INSERT INTO qp (id, name) VALUES (?, ?)",
2140 vec![json!(1), json!("O'Reilly")],
2141 ),
2142 );
2143 assert!(inserted.contains("\"affected\":1"), "got: {inserted}");
2144
2145 let selected = handle(
2146 &rt,
2147 &query_request_with_params(3, "SELECT name FROM qp WHERE id = ?", vec![json!(1)]),
2148 );
2149 let rows = result_rows(&selected);
2150 assert_eq!(rows.len(), 1, "got: {selected}");
2151 assert_eq!(
2152 rows[0].get("name").and_then(Value::as_str),
2153 Some("O'Reilly")
2154 );
2155
2156 let selected_numbered = handle(
2157 &rt,
2158 &query_request_with_params(
2159 4,
2160 "SELECT name FROM qp WHERE name = ?1 AND id = ?2",
2161 vec![json!("O'Reilly"), json!(1)],
2162 ),
2163 );
2164 assert_eq!(
2165 result_rows(&selected_numbered).len(),
2166 1,
2167 "got: {selected_numbered}"
2168 );
2169
2170 let updated = handle(
2171 &rt,
2172 &query_request_with_params(
2173 5,
2174 "UPDATE qp SET name = ? WHERE id = ?",
2175 vec![json!("Alice"), json!(1)],
2176 ),
2177 );
2178 assert!(updated.contains("\"affected\":1"), "got: {updated}");
2179
2180 let deleted = handle(
2181 &rt,
2182 &query_request_with_params(6, "DELETE FROM qp WHERE name = ?", vec![json!("Alice")]),
2183 );
2184 assert!(deleted.contains("\"affected\":1"), "got: {deleted}");
2185
2186 let remaining = handle(&rt, &query_request(7, "SELECT * FROM qp"));
2187 assert!(result_rows(&remaining).is_empty(), "got: {remaining}");
2188 }
2189
2190 #[test]
2191 fn query_with_params_insert_and_search_round_trip() {
2192 let rt = make_runtime();
2193 let insert = handle(
2194 &rt,
2195 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"INSERT INTO bun_embeddings VECTOR (dense, content) VALUES ($1, $2)","params":[[1.0,0.0],"bun vector"]}}"#,
2196 );
2197 assert!(insert.contains("\"affected\":1"), "got: {insert}");
2198
2199 let search = handle(
2200 &rt,
2201 r#"{"jsonrpc":"2.0","id":2,"method":"query","params":{"sql":"SEARCH SIMILAR $1 COLLECTION bun_embeddings LIMIT 1","params":[[1.0,0.0]]}}"#,
2202 );
2203 assert!(search.contains("\"rows\""), "got: {search}");
2204 assert!(search.contains("\"score\":1"), "got: {search}");
2205 assert!(!search.contains("\"error\""), "got: {search}");
2206 }
2207
2208 #[test]
2209 fn query_with_question_vector_param_round_trips() {
2210 let rt = make_runtime();
2211 let insert = handle(
2212 &rt,
2213 &query_request_with_params(
2214 1,
2215 "INSERT INTO question_embeddings VECTOR (dense, content) VALUES (?, ?)",
2216 vec![json!([1.0, 0.0]), json!("question vector")],
2217 ),
2218 );
2219 assert!(insert.contains("\"affected\":1"), "got: {insert}");
2220
2221 let search = handle(
2222 &rt,
2223 &query_request_with_params(
2224 2,
2225 "SEARCH SIMILAR ? COLLECTION question_embeddings LIMIT 1",
2226 vec![json!([1.0, 0.0])],
2227 ),
2228 );
2229 assert!(search.contains("\"rows\""), "got: {search}");
2230 assert!(search.contains("\"score\":1"), "got: {search}");
2231 assert!(!search.contains("\"error\""), "got: {search}");
2232 }
2233
2234 #[test]
2235 fn query_with_typed_json_rpc_params_round_trips() {
2236 let rt = make_runtime();
2237 let create = handle(
2238 &rt,
2239 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE value_params (ok BOOLEAN, score FLOAT, payload BLOB, body JSON, seen_at TIMESTAMP, ident UUID)"}}"#,
2240 );
2241 assert!(!create.contains("\"error\""), "got: {create}");
2242
2243 let insert = handle(
2244 &rt,
2245 r#"{"jsonrpc":"2.0","id":2,"method":"query","params":{"sql":"INSERT INTO value_params (ok, score, payload, body, seen_at, ident) VALUES ($1, $2, $3, $4, $5, $6)","params":[true,{"$float":"NaN"},{"$bytes":"3q2+7w=="},{"z":[1,{"a":true}],"a":null},{"$ts":"1700000000123456789"},{"$uuid":"00112233-4455-6677-8899-aabbccddeeff"}]}}"#,
2246 );
2247 assert!(insert.contains("\"affected\":1"), "got: {insert}");
2248
2249 let selected = handle(
2250 &rt,
2251 r#"{"jsonrpc":"2.0","id":3,"method":"query","params":{"sql":"SELECT * FROM value_params"}}"#,
2252 );
2253 assert!(selected.contains("\"ok\":true"), "got: {selected}");
2254 assert!(selected.contains("\"$float\":\"NaN\""), "got: {selected}");
2255 assert!(
2256 selected.contains("\"$bytes\":\"3q2+7w==\""),
2257 "got: {selected}"
2258 );
2259 assert!(
2260 selected.contains("\"body\":{\"a\":null,\"z\":[1,{\"a\":true}]}"),
2261 "got: {selected}"
2262 );
2263 assert!(
2264 selected.contains("\"$ts\":\"1700000000123456789\""),
2265 "got: {selected}"
2266 );
2267 assert!(
2268 selected.contains("\"$uuid\":\"00112233-4455-6677-8899-aabbccddeeff\""),
2269 "got: {selected}"
2270 );
2271 }
2272
2273 #[test]
2274 fn select_timeseries_tags_decodes_json_payload() {
2275 let rt = make_runtime();
2276 let create = handle(&rt, &query_request(1, "CREATE TIMESERIES ts1"));
2277 assert!(!create.contains("\"error\""), "got: {create}");
2278
2279 let insert = handle(
2280 &rt,
2281 &query_request(
2282 2,
2283 r#"INSERT INTO ts1 (metric, value, tags, timestamp) VALUES ('cpu', 85, '{"host":"a"}', 1000)"#,
2284 ),
2285 );
2286 assert!(insert.contains("\"affected\":1"), "got: {insert}");
2287
2288 let selected = handle(&rt, &query_request(3, "SELECT tags FROM ts1"));
2289 assert!(!selected.contains("<json"), "got: {selected}");
2290 let response = json::from_str::<Value>(&selected).expect("response json");
2291 let tags = response
2292 .get("result")
2293 .and_then(|result| result.get("rows"))
2294 .and_then(Value::as_array)
2295 .and_then(|rows| rows.first())
2296 .and_then(|row| row.get("tags"))
2297 .expect("tags field");
2298 assert_eq!(tags, &json!({"host": "a"}));
2299 }
2300
2301 #[test]
2302 fn select_table_json_column_round_trips_after_single_parse() {
2303 let rt = make_runtime();
2304 let create = handle(&rt, &query_request(1, "CREATE TABLE docs (payload JSON)"));
2305 assert!(!create.contains("\"error\""), "got: {create}");
2306
2307 let original = r#"{"nested":{"items":[1,true,"x"],"object":{"k":"v"}}}"#;
2308 let insert_sql = format!("INSERT INTO docs (payload) VALUES ({original})");
2309 let insert = handle(&rt, &query_request(2, &insert_sql));
2310 assert!(insert.contains("\"affected\":1"), "got: {insert}");
2311
2312 let selected = handle(&rt, &query_request(3, "SELECT payload FROM docs"));
2313 assert!(!selected.contains("<json"), "got: {selected}");
2314 let response = json::from_str::<Value>(&selected).expect("response json");
2315 let payload = response
2316 .get("result")
2317 .and_then(|result| result.get("rows"))
2318 .and_then(Value::as_array)
2319 .and_then(|rows| rows.first())
2320 .and_then(|row| row.get("payload"))
2321 .expect("payload field");
2322 let expected = json::from_str::<Value>(original).expect("expected json");
2323 assert_eq!(payload, &expected);
2324
2325 let payload_text = payload.to_string_compact();
2326 assert_eq!(
2327 json::from_str::<Value>(&payload_text).expect("single parse"),
2328 expected
2329 );
2330 }
2331
/// Feeds `query_result_to_json` a row whose `Json` cell holds bytes that are
/// not valid JSON and checks that the cell is rendered as an object carrying
/// `code = "INVALID_JSON"` and the hex encoding of the raw bytes.
#[test]
fn select_json_corruption_falls_back_to_code_and_hex() {
    use crate::storage::query::unified::UnifiedResult;

    let mut corrupt_row = UnifiedRecord::new();
    corrupt_row.set("payload", SchemaValue::Json(b"{not json".to_vec()));

    let mut rows = UnifiedResult::with_columns(vec!["payload".into()]);
    rows.push(corrupt_row);

    let query_result = RuntimeQueryResult {
        query: "SELECT payload FROM docs".to_string(),
        mode: crate::storage::query::modes::QueryMode::Sql,
        statement: "select",
        engine: "runtime-table",
        result: rows,
        affected_rows: 0,
        statement_type: "select",
    };
    let rendered = query_result_to_json(&query_result);

    let first_row = rendered
        .get("rows")
        .and_then(Value::as_array)
        .and_then(|rows| rows.first());
    let payload = first_row
        .and_then(|row| row.get("payload"))
        .expect("payload field");

    let code = payload.get("code").and_then(Value::as_str);
    assert_eq!(code, Some("INVALID_JSON"));

    // "7b6e6f74206a736f6e" is the hex encoding of the bytes `{not json`.
    let hex = payload.get("hex").and_then(Value::as_str);
    assert_eq!(hex, Some("7b6e6f74206a736f6e"));
}
2366
/// Checks that `json_value_to_schema_value` decodes the typed envelopes
/// `$bytes`, `$ts`, `$uuid` and `$float` into the matching `SchemaValue`
/// variants.
#[test]
fn json_value_to_schema_value_decodes_typed_envelopes() {
    // `$bytes`: base64 text becomes a blob.
    match json_value_to_schema_value(&json!({ "$bytes": "AAECAw==" })) {
        SchemaValue::Blob(bytes) => assert_eq!(bytes, vec![0, 1, 2, 3]),
        _ => panic!("expected blob"),
    }

    // `$ts`: a decimal string becomes a timestamp, including `i64::MAX`.
    let timestamp = json_value_to_schema_value(&json!({ "$ts": "9223372036854775807" }));
    assert_eq!(timestamp, SchemaValue::Timestamp(i64::MAX));

    // `$uuid`: hyphenated text becomes the 16 raw bytes.
    match json_value_to_schema_value(&json!({
        "$uuid": "00112233-4455-6677-8899-aabbccddeeff"
    })) {
        SchemaValue::Uuid(bytes) => assert_eq!(
            bytes,
            [
                0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88, 0x99, 0xaa, 0xbb, 0xcc, 0xdd,
                0xee, 0xff
            ]
        ),
        _ => panic!("expected uuid"),
    }

    // `$float`: the non-finite spelling "-Infinity" is accepted.
    match json_value_to_schema_value(&json!({ "$float": "-Infinity" })) {
        SchemaValue::Float(value) => {
            assert!(value.is_infinite() && value.is_sign_negative())
        }
        _ => panic!("expected float"),
    }
}
2400
/// Two bind values for a statement with a single `$1` placeholder must be
/// answered with `INVALID_PARAMS`.
#[test]
fn query_with_params_arity_mismatch_rejected() {
    let create_request = r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE pa (id INTEGER)"}}"#;
    let select_request = r#"{"jsonrpc":"2.0","id":2,"method":"query","params":{"sql":"SELECT * FROM pa WHERE id = $1","params":[1,2]}}"#;

    let runtime = make_runtime();
    let _ = handle(&runtime, create_request);

    let reply = handle(&runtime, select_request);
    assert!(reply.contains("\"INVALID_PARAMS\""), "got: {reply}");
}
2414
/// Two bind values for a statement with a single `?` placeholder must be
/// answered with `INVALID_PARAMS` and a message stating the expected and
/// supplied counts.
#[test]
fn query_with_question_params_arity_mismatch_rejected() {
    let runtime = make_runtime();
    let _ = handle(&runtime, &query_request(1, "CREATE TABLE qpa (id INTEGER)"));

    let binds = vec![json!(1), json!(2)];
    let request = query_request_with_params(2, "SELECT * FROM qpa WHERE id = ?", binds);
    let reply = handle(&runtime, &request);

    assert!(reply.contains("\"INVALID_PARAMS\""), "got: {reply}");
    assert!(reply.contains("SQL expects 1, got 2"), "got: {reply}");
}
2430
/// A statement that uses `$1` and `$3` but skips `$2` must be answered with
/// `INVALID_PARAMS`, even when three bind values are supplied.
#[test]
fn query_with_params_gap_rejected() {
    let create_request = r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE pg (a INTEGER, b INTEGER)"}}"#;
    let gapped_request = r#"{"jsonrpc":"2.0","id":2,"method":"query","params":{"sql":"SELECT * FROM pg WHERE a = $1 AND b = $3","params":[1,2,3]}}"#;

    let runtime = make_runtime();
    let _ = handle(&runtime, create_request);

    let reply = handle(&runtime, gapped_request);
    assert!(reply.contains("\"INVALID_PARAMS\""), "got: {reply}");
}
2444
/// A statement that only references `?2` leaves slot 1 unused; the request
/// must be answered with `INVALID_PARAMS` and a message naming the missing
/// parameter.
#[test]
fn query_with_question_numbered_gap_rejected() {
    let runtime = make_runtime();
    let _ = handle(&runtime, &query_request(1, "CREATE TABLE qpg (id INTEGER)"));

    let binds = vec![json!(1), json!(2)];
    let request = query_request_with_params(2, "SELECT * FROM qpg WHERE id = ?2", binds);
    let reply = handle(&runtime, &request);

    assert!(reply.contains("\"INVALID_PARAMS\""), "got: {reply}");
    assert!(reply.contains("parameter $`1` is missing"), "got: {reply}");
}
2460
/// Binding a string to an `INTEGER` column through `?` must be answered with
/// `QUERY_ERROR`, and the response must mention both `id` and `integer`.
#[test]
fn query_with_question_params_type_mismatch_names_slot() {
    let runtime = make_runtime();
    let _ = handle(&runtime, &query_request(1, "CREATE TABLE qpt (id INTEGER)"));

    let binds = vec![json!("not-an-integer")];
    let request = query_request_with_params(2, "INSERT INTO qpt (id) VALUES (?)", binds);
    let reply = handle(&runtime, &request);

    assert!(reply.contains("\"QUERY_ERROR\""), "got: {reply}");
    for needle in ["id", "integer"] {
        assert!(reply.contains(needle), "got: {reply}");
    }
}
2477
/// A plain `SELECT 1 AS one` must produce a `result` envelope and no `error`.
#[test]
fn query_select_one_returns_rows() {
    let request =
        r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"SELECT 1 AS one"}}"#;

    let runtime = make_runtime();
    let reply = handle(&runtime, request);

    assert!(reply.contains("\"result\""));
    assert!(!reply.contains("\"error\""));
}
2488
/// Builds a single-row `ask` result by hand and checks that
/// `query_result_to_json` renders it as a flat envelope (no `rows` wrapper)
/// with scalar fields, default metadata, and the JSON columns parsed into
/// structured values.
#[test]
fn ask_query_result_uses_canonical_envelope() {
    use crate::storage::query::unified::UnifiedResult;

    let columns = vec![
        "answer".into(),
        "provider".into(),
        "model".into(),
        "prompt_tokens".into(),
        "completion_tokens".into(),
        "sources_count".into(),
        "sources_flat".into(),
        "citations".into(),
        "validation".into(),
    ];
    let mut rows = UnifiedResult::with_columns(columns);

    let mut row = UnifiedRecord::new();
    // Text columns.
    for (name, text) in [
        ("answer", "Deploy failed [^1]."),
        ("provider", "openai"),
        ("model", "gpt-4o-mini"),
    ] {
        row.set(name, SchemaValue::text(text));
    }
    // Token counters.
    for (name, count) in [("prompt_tokens", 11), ("completion_tokens", 7)] {
        row.set(name, SchemaValue::Integer(count));
    }
    // JSON columns are stored as raw bytes.
    let sources_flat = br#"[{"urn":"urn:reddb:row:deployments:1","kind":"row","collection":"deployments","id":"1"}]"#;
    row.set("sources_flat", SchemaValue::Json(sources_flat.to_vec()));
    let citations = br#"[{"marker":1,"urn":"urn:reddb:row:deployments:1"}]"#;
    row.set("citations", SchemaValue::Json(citations.to_vec()));
    let validation = br#"{"ok":true,"warnings":[],"errors":[]}"#;
    row.set("validation", SchemaValue::Json(validation.to_vec()));
    rows.push(row);

    let query_result = RuntimeQueryResult {
        query: "ASK 'why did deploy fail?'".to_string(),
        mode: crate::storage::query::modes::QueryMode::Sql,
        statement: "ask",
        engine: "runtime-ai",
        result: rows,
        affected_rows: 0,
        statement_type: "select",
    };
    let envelope = query_result_to_json(&query_result);

    let answer = envelope.get("answer").and_then(Value::as_str);
    assert_eq!(answer, Some("Deploy failed [^1]."));

    // Metadata that the fixture row never sets.
    let cache_hit = envelope.get("cache_hit").and_then(Value::as_bool);
    assert_eq!(cache_hit, Some(false));
    let cost_usd = envelope.get("cost_usd").and_then(Value::as_f64);
    assert_eq!(cost_usd, Some(0.0));
    let mode = envelope.get("mode").and_then(Value::as_str);
    assert_eq!(mode, Some("strict"));
    let retry_count = envelope.get("retry_count").and_then(Value::as_u64);
    assert_eq!(retry_count, Some(0));

    assert!(
        envelope.get("rows").is_none(),
        "ASK envelope must not be row-wrapped: {envelope}"
    );

    let sources_ok = envelope
        .get("sources_flat")
        .and_then(Value::as_array)
        .is_some_and(|sources| {
            sources.len() == 1 && sources[0].get("payload").and_then(Value::as_str).is_some()
        });
    assert!(sources_ok, "sources_flat must be a parsed array: {envelope}");

    let citations_ok = envelope
        .get("citations")
        .and_then(Value::as_array)
        .is_some_and(|citations| citations.len() == 1);
    assert!(citations_ok, "citations must be a parsed array: {envelope}");

    let validation_ok = envelope
        .get("validation")
        .and_then(|v| v.get("ok"))
        .and_then(Value::as_bool);
    assert_eq!(validation_ok, Some(true));
}
2568
/// The first `tx.begin` on a fresh session must report `tx_id` 1 and the
/// `read_committed_deferred` isolation label, without an error.
#[test]
fn tx_begin_returns_tx_id_and_isolation() {
    let begin = r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#;

    let runtime = make_runtime();
    with_session(&runtime, |send, _| {
        let reply = send(begin);
        assert!(reply.contains("\"tx_id\":1"));
        assert!(reply.contains("\"isolation\":\"read_committed_deferred\""));
        assert!(!reply.contains("\"error\""));
    });
}
2583
/// A second `tx.begin` while a transaction is open must be rejected with
/// `TX_ALREADY_OPEN`.
#[test]
fn tx_begin_twice_returns_already_open() {
    let first_begin = r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#;
    let second_begin = r#"{"jsonrpc":"2.0","id":2,"method":"tx.begin","params":null}"#;

    let runtime = make_runtime();
    with_session(&runtime, |send, _| {
        let _ = send(first_begin);
        let reply = send(second_begin);
        assert!(reply.contains("\"code\":\"TX_ALREADY_OPEN\""));
    });
}
2593
/// `tx.commit` on a session with no open transaction must be rejected with
/// `NO_TX_OPEN`.
#[test]
fn tx_commit_without_begin_returns_no_tx_open() {
    let commit = r#"{"jsonrpc":"2.0","id":1,"method":"tx.commit","params":null}"#;

    let runtime = make_runtime();
    with_session(&runtime, |send, _| {
        let reply = send(commit);
        assert!(reply.contains("\"code\":\"NO_TX_OPEN\""));
    });
}
2602
/// `tx.rollback` on a session with no open transaction must be rejected with
/// `NO_TX_OPEN`.
#[test]
fn tx_rollback_without_begin_returns_no_tx_open() {
    let rollback = r#"{"jsonrpc":"2.0","id":1,"method":"tx.rollback","params":null}"#;

    let runtime = make_runtime();
    with_session(&runtime, |send, _| {
        let reply = send(rollback);
        assert!(reply.contains("\"code\":\"NO_TX_OPEN\""));
    });
}
2611
/// An `insert` issued inside an open transaction must answer with a pending
/// envelope: `pending` true, the owning `tx_id`, and zero affected rows.
#[test]
fn insert_inside_tx_returns_pending_envelope() {
    let create_table = r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE users (name TEXT)"}}"#;
    let begin = r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#;
    let insert = r#"{"jsonrpc":"2.0","id":2,"method":"insert","params":{"collection":"users","payload":{"name":"alice"}}}"#;

    let runtime = make_runtime();
    let _ = handle(&runtime, create_table);

    with_session(&runtime, |send, _| {
        let _ = send(begin);
        let reply = send(insert);
        assert!(reply.contains("\"pending\":true"));
        assert!(reply.contains("\"tx_id\":1"));
        assert!(reply.contains("\"affected\":0"));
    });
}
2630
/// An insert buffered in a transaction that is then rolled back must be
/// reported as one discarded op and must not show up in a later `SELECT`.
#[test]
fn begin_insert_rollback_does_not_persist() {
    let create_table = r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE u (name TEXT)"}}"#;
    let begin = r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#;
    let insert_ghost = r#"{"jsonrpc":"2.0","id":2,"method":"insert","params":{"collection":"u","payload":{"name":"ghost"}}}"#;
    let rollback = r#"{"jsonrpc":"2.0","id":3,"method":"tx.rollback","params":null}"#;
    let select_all =
        r#"{"jsonrpc":"2.0","id":9,"method":"query","params":{"sql":"SELECT * FROM u"}}"#;

    let runtime = make_runtime();
    let _ = handle(&runtime, create_table);

    with_session(&runtime, |send, _| {
        let _ = send(begin);
        let _ = send(insert_ghost);
        let rolled_back = send(rollback);
        assert!(rolled_back.contains("\"ops_discarded\":1"));
        assert!(rolled_back.contains("\"tx_id\":1"));
    });

    // Read back outside the session: the buffered row must be gone.
    let table_contents = handle(&runtime, select_all);
    assert!(!table_contents.contains("\"ghost\""));
}
2654
/// Two inserts buffered in a transaction must be replayed on `tx.commit`
/// (reported as `ops_replayed` 2) and be visible to a later `SELECT`.
#[test]
fn begin_insert_commit_persists() {
    let create_table = r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE u2 (name TEXT)"}}"#;
    let begin = r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#;
    let insert_alice = r#"{"jsonrpc":"2.0","id":2,"method":"insert","params":{"collection":"u2","payload":{"name":"alice"}}}"#;
    let insert_bob = r#"{"jsonrpc":"2.0","id":3,"method":"insert","params":{"collection":"u2","payload":{"name":"bob"}}}"#;
    let commit = r#"{"jsonrpc":"2.0","id":4,"method":"tx.commit","params":null}"#;
    let select_all =
        r#"{"jsonrpc":"2.0","id":9,"method":"query","params":{"sql":"SELECT * FROM u2"}}"#;

    let runtime = make_runtime();
    let _ = handle(&runtime, create_table);

    with_session(&runtime, |send, _| {
        let _ = send(begin);
        let _ = send(insert_alice);
        let _ = send(insert_bob);
        let committed = send(commit);
        assert!(committed.contains("\"ops_replayed\":2"));
        assert!(!committed.contains("\"error\""));
    });

    // Both rows must be readable once the session has committed.
    let table_contents = handle(&runtime, select_all);
    assert!(table_contents.contains("\"alice\""));
    assert!(table_contents.contains("\"bob\""));
}
2681
/// A three-row `bulk_insert` inside a transaction must be buffered in full
/// (`buffered` 3, `pending` true, `affected` 0) and replayed as three ops on
/// commit.
#[test]
fn bulk_insert_inside_tx_buffers_everything() {
    let create_table = r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE u3 (name TEXT)"}}"#;
    let begin = r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#;
    let bulk_insert = r#"{"jsonrpc":"2.0","id":2,"method":"bulk_insert","params":{"collection":"u3","payloads":[{"name":"a"},{"name":"b"},{"name":"c"}]}}"#;
    let commit = r#"{"jsonrpc":"2.0","id":3,"method":"tx.commit","params":null}"#;

    let runtime = make_runtime();
    let _ = handle(&runtime, create_table);

    with_session(&runtime, |send, _| {
        let _ = send(begin);

        let buffered = send(bulk_insert);
        assert!(buffered.contains("\"buffered\":3"));
        assert!(buffered.contains("\"pending\":true"));
        assert!(buffered.contains("\"affected\":0"));

        let committed = send(commit);
        assert!(committed.contains("\"ops_replayed\":3"));
    });
}
2702
/// Pins `bulk_insert_chunk_count` at the boundaries of the 500-row chunk
/// size: empty input, a single row, and both sides of each multiple of 500.
#[test]
fn bulk_insert_chunks_at_internal_500_row_limit() {
    // (row count, expected number of chunks)
    let expectations = [(0, 0), (1, 1), (500, 1), (501, 2), (1000, 2), (1001, 3)];
    for (row_count, expected_chunks) in expectations {
        assert_eq!(bulk_insert_chunk_count(row_count), expected_chunks);
    }
}
2712
// Property tests for the stdio handlers. The config below limits every
// property in this block to 12 generated cases.
proptest! {
    #![proptest_config(ProptestConfig {
        cases: 12,
        ..ProptestConfig::default()
    })]

    // Property: one `bulk_insert` of N payloads leaves a collection in the
    // same observable state (name/kind pairs ordered by `red_entity_id`) as
    // N individual `insert` calls of the same payloads into another
    // collection. `names` is 1..20 lowercase strings of 1..=8 characters.
    #[test]
    fn bulk_insert_matches_sequential_insert_state(
        names in proptest::collection::vec("[a-z]{1,8}", 1usize..20)
    ) {
        // Fresh runtime per generated case.
        let rt = make_runtime();
        // One JSON object literal per name; names are [a-z] only, so no
        // escaping is needed when splicing them into the JSON text.
        let payloads = names
            .iter()
            .map(|name| format!(r#"{{"name":"{name}","kind":"bulk"}}"#))
            .collect::<Vec<_>>();
        let payload_array = payloads.join(",");

        // Bulk path: a single request carrying every payload.
        // NOTE(review): `bulk_prop` is never created explicitly — presumably
        // the insert path creates the collection on first use; confirm.
        let bulk = handle(
            &rt,
            &format!(
                r#"{{"jsonrpc":"2.0","id":1,"method":"bulk_insert","params":{{"collection":"bulk_prop","payloads":[{payload_array}]}}}}"#
            ),
        );
        let bulk_result = json::from_str::<Value>(&bulk).expect("bulk json");
        let bulk_ids = bulk_result
            .get("result")
            .and_then(|result| result.get("ids"))
            .and_then(Value::as_array)
            .expect("bulk ids");
        // The response must report one id per submitted payload.
        prop_assert_eq!(bulk_ids.len(), names.len());

        // Sequential path: the same payloads, one `insert` request each,
        // into a separate collection. Request ids start at 10.
        for (index, payload) in payloads.iter().enumerate() {
            let insert = handle(
                &rt,
                &format!(
                    r#"{{"jsonrpc":"2.0","id":{},"method":"insert","params":{{"collection":"seq_prop","payload":{payload}}}}}"#,
                    index + 10
                ),
            );
            let insert_result = json::from_str::<Value>(&insert).expect("insert json");
            prop_assert!(
                insert_result
                    .get("result")
                    .and_then(|result| result.get("id"))
                    .is_some(),
                "insert response missing id: {insert}"
            );
        }

        // Compare the two collections through `result_name_kind`, reading
        // both in `red_entity_id` order.
        let bulk_rows = result_name_kind(&handle(
            &rt,
            r#"{"jsonrpc":"2.0","id":99,"method":"query","params":{"sql":"SELECT name, kind FROM bulk_prop ORDER BY red_entity_id"}}"#,
        ));
        let seq_rows = result_name_kind(&handle(
            &rt,
            r#"{"jsonrpc":"2.0","id":100,"method":"query","params":{"sql":"SELECT name, kind FROM seq_prop ORDER BY red_entity_id"}}"#,
        ));
        prop_assert_eq!(bulk_rows, seq_rows);
    }

    // Property: binding a scalar through `?` yields the same rows as
    // writing the equivalent SQL literal directly into the statement.
    #[test]
    fn question_param_select_matches_inlined_literal(value in json_scalar_param()) {
        let rt = make_runtime();
        // Bound form: the value travels in the request's params array.
        let bound = handle(
            &rt,
            &query_request_with_params(1, "SELECT ? AS v", vec![value.clone()]),
        );
        // Inlined form: the same value rendered by `sql_literal_for_json`.
        let inline_sql = format!("SELECT {} AS v", sql_literal_for_json(&value));
        let inlined = handle(&rt, &query_request(2, &inline_sql));
        prop_assert_eq!(
            result_rows(&bound),
            result_rows(&inlined),
            "bound={}, inlined={}",
            bound,
            inlined
        );
    }
}
2791
/// A `bulk_insert` of two flat node rows into a graph collection must report
/// two affected rows and two ids, and both nodes must be returned by a
/// `MATCH` query on their label.
#[test]
fn bulk_insert_graph_nodes_accepts_flat_rows_and_returns_ids() {
    let insert_nodes = r#"{"jsonrpc":"2.0","id":2,"method":"bulk_insert","params":{"collection":"social","payloads":[{"label":"User","name":"alice"},{"label":"User","name":"bob"}]}}"#;
    let match_users = r#"{"jsonrpc":"2.0","id":3,"method":"query","params":{"sql":"MATCH (n:User) RETURN n.name"}}"#;

    let runtime = make_runtime();
    create_graph_collection(&runtime, "social");

    let reply = handle(&runtime, insert_nodes);
    let envelope: Value = json::from_str(&reply).expect("json response");
    let result = envelope.get("result").expect("result");

    let affected = result.get("affected").and_then(Value::as_u64);
    assert_eq!(affected, Some(2));
    let id_count = result
        .get("ids")
        .and_then(Value::as_array)
        .map(|ids| ids.len());
    assert_eq!(id_count, Some(2));

    let matched = handle(&runtime, match_users);
    for name in ["\"alice\"", "\"bob\""] {
        assert!(matched.contains(name), "got: {matched}");
    }
}
2819
/// Inserts two nodes, then bulk-inserts one flat edge row that references
/// their ids; the edge insert must report one affected row and one id.
#[test]
fn bulk_insert_graph_edges_accepts_flat_rows_and_returns_ids() {
    let insert_nodes = r#"{"jsonrpc":"2.0","id":2,"method":"bulk_insert","params":{"collection":"network","payloads":[{"label":"Host","name":"app"},{"label":"Host","name":"db"}]}}"#;

    let runtime = make_runtime();
    create_graph_collection(&runtime, "network");

    // Endpoints first: their ids feed the edge's `from` and `to` fields.
    let node_reply = handle(&runtime, insert_nodes);
    let node_envelope: Value = json::from_str(&node_reply).expect("node response");
    let node_ids = node_envelope
        .get("result")
        .and_then(|r| r.get("ids"))
        .and_then(Value::as_array)
        .expect("node ids");
    let source_id = node_ids[0].as_u64().expect("from id");
    let target_id = node_ids[1].as_u64().expect("to id");

    let insert_edge = format!(
        r#"{{"jsonrpc":"2.0","id":3,"method":"bulk_insert","params":{{"collection":"network","payloads":[{{"label":"connects","from":{source_id},"to":{target_id},"weight":0.5,"role":"primary"}}]}}}}"#
    );
    let edge_reply = handle(&runtime, &insert_edge);
    let edge_envelope: Value = json::from_str(&edge_reply).expect("edge response");
    let result = edge_envelope.get("result").expect("result");

    let affected = result.get("affected").and_then(Value::as_u64);
    assert_eq!(affected, Some(1));
    let id_count = result
        .get("ids")
        .and_then(Value::as_array)
        .map(|ids| ids.len());
    assert_eq!(id_count, Some(1));
}
2854
/// A `delete` issued inside a transaction must answer as pending, and after
/// the transaction is rolled back the targeted row must still be selectable.
#[test]
fn delete_inside_tx_is_buffered() {
    let create_table = r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE u4 (name TEXT)"}}"#;
    let insert_keep = r#"{"jsonrpc":"2.0","id":2,"method":"query","params":{"sql":"INSERT INTO u4 (name) VALUES ('keep')"}}"#;
    let begin = r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#;
    let delete_row = r#"{"jsonrpc":"2.0","id":2,"method":"delete","params":{"collection":"u4","id":"1"}}"#;
    let rollback = r#"{"jsonrpc":"2.0","id":3,"method":"tx.rollback","params":null}"#;
    let select_all =
        r#"{"jsonrpc":"2.0","id":9,"method":"query","params":{"sql":"SELECT * FROM u4"}}"#;

    let runtime = make_runtime();
    let _ = handle(&runtime, create_table);
    let _ = handle(&runtime, insert_keep);

    with_session(&runtime, |send, _| {
        let _ = send(begin);
        let reply = send(delete_row);
        assert!(reply.contains("\"pending\":true"));
        let _ = send(rollback);
    });

    // The delete was rolled back, so the seeded row must still be present.
    let table_contents = handle(&runtime, select_all);
    assert!(table_contents.contains("\"keep\""));
}
2882
/// `close` while a transaction is still open must succeed with the
/// `__close__` marker, and the insert buffered in that transaction must not
/// be visible afterwards.
#[test]
fn close_with_open_tx_auto_rollbacks() {
    let create_table = r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE u5 (name TEXT)"}}"#;
    let begin = r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#;
    let insert_ghost = r#"{"jsonrpc":"2.0","id":2,"method":"insert","params":{"collection":"u5","payload":{"name":"ghost"}}}"#;
    let close = r#"{"jsonrpc":"2.0","id":3,"method":"close","params":null}"#;
    let select_all =
        r#"{"jsonrpc":"2.0","id":9,"method":"query","params":{"sql":"SELECT * FROM u5"}}"#;

    let runtime = make_runtime();
    let _ = handle(&runtime, create_table);

    with_session(&runtime, |send, _| {
        let _ = send(begin);
        let _ = send(insert_ghost);
        let closed = send(close);
        assert!(closed.contains("\"__close__\":true"));
        assert!(!closed.contains("\"error\""));
    });

    let table_contents = handle(&runtime, select_all);
    assert!(!table_contents.contains("\"ghost\""));
}
2905
2906 fn seed_numbers_table(rt: &RedDBRuntime, table: &str, count: u32) {
2911 let _ = handle(
2912 rt,
2913 &format!(
2914 r#"{{"jsonrpc":"2.0","id":1,"method":"query","params":{{"sql":"CREATE TABLE {table} (n INTEGER)"}}}}"#,
2915 ),
2916 );
2917 for i in 0..count {
2918 let _ = handle(
2919 rt,
2920 &format!(
2921 r#"{{"jsonrpc":"2.0","id":2,"method":"query","params":{{"sql":"INSERT INTO {table} (n) VALUES ({i})"}}}}"#,
2922 ),
2923 );
2924 }
2925 }
2926
/// `query.open` over a three-row table must report `cursor_id` 1,
/// `total_rows` 3 and a `columns` field, without an error.
#[test]
fn cursor_open_returns_id_columns_and_total() {
    let open = r#"{"jsonrpc":"2.0","id":1,"method":"query.open","params":{"sql":"SELECT n FROM nums1"}}"#;

    let runtime = make_runtime();
    seed_numbers_table(&runtime, "nums1", 3);

    with_session(&runtime, |send, _| {
        let reply = send(open);
        assert!(reply.contains("\"cursor_id\":1"));
        assert!(reply.contains("\"total_rows\":3"));
        assert!(reply.contains("\"columns\""));
        assert!(!reply.contains("\"error\""));
    });
}
2941
/// Pages through a five-row cursor two rows at a time and checks the
/// `done` / `remaining` markers of every page: 3 left, 1 left, then done.
#[test]
fn cursor_next_chunks_rows_and_signals_done() {
    let open = r#"{"jsonrpc":"2.0","id":1,"method":"query.open","params":{"sql":"SELECT n FROM nums2"}}"#;
    // (request, expected `done` marker, expected `remaining` marker)
    let pages = [
        (
            r#"{"jsonrpc":"2.0","id":2,"method":"query.next","params":{"cursor_id":1,"batch_size":2}}"#,
            "\"done\":false",
            "\"remaining\":3",
        ),
        (
            r#"{"jsonrpc":"2.0","id":3,"method":"query.next","params":{"cursor_id":1,"batch_size":2}}"#,
            "\"done\":false",
            "\"remaining\":1",
        ),
        (
            r#"{"jsonrpc":"2.0","id":4,"method":"query.next","params":{"cursor_id":1,"batch_size":2}}"#,
            "\"done\":true",
            "\"remaining\":0",
        ),
    ];

    let runtime = make_runtime();
    seed_numbers_table(&runtime, "nums2", 5);

    with_session(&runtime, |send, _| {
        let _ = send(open);
        for (request, done_marker, remaining_marker) in pages {
            let page = send(request);
            assert!(page.contains(done_marker));
            assert!(page.contains(remaining_marker));
        }
    });
}
2969
/// After one `query.next` has drained a two-row cursor, a further
/// `query.next` on the same id must answer `CURSOR_NOT_FOUND`.
#[test]
fn cursor_auto_drops_when_exhausted() {
    let open = r#"{"jsonrpc":"2.0","id":1,"method":"query.open","params":{"sql":"SELECT n FROM nums3"}}"#;
    let drain = r#"{"jsonrpc":"2.0","id":2,"method":"query.next","params":{"cursor_id":1,"batch_size":100}}"#;
    let next_after_drain = r#"{"jsonrpc":"2.0","id":3,"method":"query.next","params":{"cursor_id":1,"batch_size":100}}"#;

    let runtime = make_runtime();
    seed_numbers_table(&runtime, "nums3", 2);

    with_session(&runtime, |send, _| {
        let _ = send(open);
        let _ = send(drain);
        let reply = send(next_after_drain);
        assert!(reply.contains("\"code\":\"CURSOR_NOT_FOUND\""));
    });
}
2989
/// `query.close` must acknowledge with `closed` true, and a subsequent
/// `query.next` on the same id must answer `CURSOR_NOT_FOUND`.
#[test]
fn cursor_close_removes_it() {
    let open = r#"{"jsonrpc":"2.0","id":1,"method":"query.open","params":{"sql":"SELECT n FROM nums4"}}"#;
    let close = r#"{"jsonrpc":"2.0","id":2,"method":"query.close","params":{"cursor_id":1}}"#;
    let next_after_close = r#"{"jsonrpc":"2.0","id":3,"method":"query.next","params":{"cursor_id":1,"batch_size":10}}"#;

    let runtime = make_runtime();
    seed_numbers_table(&runtime, "nums4", 3);

    with_session(&runtime, |send, _| {
        let _ = send(open);

        let closed = send(close);
        assert!(closed.contains("\"closed\":true"));

        let reply = send(next_after_close);
        assert!(reply.contains("\"code\":\"CURSOR_NOT_FOUND\""));
    });
}
3007
/// `query.close` for a cursor id that was never opened must answer
/// `CURSOR_NOT_FOUND`.
#[test]
fn cursor_close_unknown_errors() {
    let close_unknown =
        r#"{"jsonrpc":"2.0","id":1,"method":"query.close","params":{"cursor_id":9999}}"#;

    let runtime = make_runtime();
    with_session(&runtime, |send, _| {
        let reply = send(close_unknown);
        assert!(reply.contains("\"code\":\"CURSOR_NOT_FOUND\""));
    });
}
3018
/// `query.next` with an empty params object (no `cursor_id`) must answer
/// `INVALID_PARAMS`.
#[test]
fn cursor_next_without_cursor_id_errors() {
    let next_without_id = r#"{"jsonrpc":"2.0","id":1,"method":"query.next","params":{}}"#;

    let runtime = make_runtime();
    with_session(&runtime, |send, _| {
        let reply = send(next_without_id);
        assert!(reply.contains("\"code\":\"INVALID_PARAMS\""));
    });
}
3027
/// `query.next` without a `batch_size` on a seven-row cursor must return
/// everything in one page: `done` true and `remaining` 0.
#[test]
fn cursor_default_batch_size_returns_all_when_smaller_than_default() {
    let open = r#"{"jsonrpc":"2.0","id":1,"method":"query.open","params":{"sql":"SELECT n FROM nums5"}}"#;
    let next_default = r#"{"jsonrpc":"2.0","id":2,"method":"query.next","params":{"cursor_id":1}}"#;

    let runtime = make_runtime();
    seed_numbers_table(&runtime, "nums5", 7);

    with_session(&runtime, |send, _| {
        let _ = send(open);
        let reply = send(next_default);
        assert!(reply.contains("\"done\":true"));
        assert!(reply.contains("\"remaining\":0"));
    });
}
3043
/// The session-level `close` method must answer with the `__close__` marker,
/// and a cursor opened before it must afterwards be reported as
/// `CURSOR_NOT_FOUND`.
#[test]
fn close_method_drops_open_cursors() {
    let open = r#"{"jsonrpc":"2.0","id":1,"method":"query.open","params":{"sql":"SELECT n FROM nums6"}}"#;
    let close = r#"{"jsonrpc":"2.0","id":2,"method":"close","params":null}"#;
    let next_after_close = r#"{"jsonrpc":"2.0","id":3,"method":"query.next","params":{"cursor_id":1,"batch_size":10}}"#;

    let runtime = make_runtime();
    seed_numbers_table(&runtime, "nums6", 3);

    with_session(&runtime, |send, _| {
        let _ = send(open);

        let closed = send(close);
        assert!(closed.contains("\"__close__\":true"));

        let reply = send(next_after_close);
        assert!(reply.contains("\"code\":\"CURSOR_NOT_FOUND\""));
    });
}
3063
/// A cursor opened before a `tx.begin` / `tx.commit` pair must still be
/// readable afterwards: `query.next` reports `done` true and no error.
#[test]
fn cursor_independent_of_transaction_state() {
    let open = r#"{"jsonrpc":"2.0","id":1,"method":"query.open","params":{"sql":"SELECT n FROM nums7"}}"#;
    let begin = r#"{"jsonrpc":"2.0","id":2,"method":"tx.begin","params":null}"#;
    let commit = r#"{"jsonrpc":"2.0","id":3,"method":"tx.commit","params":null}"#;
    let next = r#"{"jsonrpc":"2.0","id":4,"method":"query.next","params":{"cursor_id":1,"batch_size":10}}"#;

    let runtime = make_runtime();
    seed_numbers_table(&runtime, "nums7", 4);

    with_session(&runtime, |send, _| {
        let _ = send(open);
        let _ = send(begin);
        let _ = send(commit);

        let reply = send(next);
        assert!(reply.contains("\"done\":true"));
        assert!(!reply.contains("\"error\""));
    });
}
3082
/// Transaction ids are not reused within a session: the first `tx.begin`
/// reports `tx_id` 1 and, after that transaction commits, the next one
/// reports `tx_id` 2.
#[test]
fn second_tx_after_commit_gets_fresh_id() {
    let create_table = r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE u6 (name TEXT)"}}"#;
    let first_begin = r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#;
    let insert = r#"{"jsonrpc":"2.0","id":2,"method":"insert","params":{"collection":"u6","payload":{"name":"x"}}}"#;
    let commit = r#"{"jsonrpc":"2.0","id":3,"method":"tx.commit","params":null}"#;
    let second_begin = r#"{"jsonrpc":"2.0","id":4,"method":"tx.begin","params":null}"#;
    let rollback = r#"{"jsonrpc":"2.0","id":5,"method":"tx.rollback","params":null}"#;

    let runtime = make_runtime();
    let _ = handle(&runtime, create_table);

    with_session(&runtime, |send, _| {
        let first = send(first_begin);
        assert!(first.contains("\"tx_id\":1"));
        let _ = send(insert);
        let _ = send(commit);

        let second = send(second_begin);
        assert!(second.contains("\"tx_id\":2"));
        let _ = send(rollback);
    });
}
3103
/// Prepares a `SELECT` against a one-row table, extracts the returned
/// `prepared_id`, and runs it through `execute_prepared` with a single bind,
/// expecting the seeded row (`n = 42`) in the response.
///
/// The id is read with `Value::as_u64` rather than `as_f64() as u64`: the
/// float-to-integer `as` cast saturates or truncates silently, so a negative
/// or fractional `prepared_id` would previously have been turned into some
/// other id instead of failing the test at the point of extraction.
#[test]
fn prepare_and_execute_prepared_statement() {
    let rt = make_runtime();
    // Fixture: a table holding the single value 42.
    let _ = handle(
        &rt,
        r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE ps_test (n INTEGER)"}}"#,
    );
    let _ = handle(
        &rt,
        r#"{"jsonrpc":"2.0","id":2,"method":"query","params":{"sql":"INSERT INTO ps_test (n) VALUES (42)"}}"#,
    );

    with_session(&rt, |call, _| {
        let prep = call(
            r#"{"jsonrpc":"2.0","id":3,"method":"prepare","params":{"sql":"SELECT n FROM ps_test WHERE n = 42"}}"#,
        );
        assert!(prep.contains("\"prepared_id\""), "prepare response: {prep}");

        // Pull the id out of the parsed envelope as an exact unsigned integer.
        let id: u64 = {
            let v: Value = json::from_str(&prep).expect("json");
            v.get("result")
                .expect("result")
                .get("prepared_id")
                .and_then(Value::as_u64)
                .expect("prepared_id")
        };

        // NOTE(review): the prepared SQL contains the literal 42 and no
        // placeholder, yet one bind is supplied — presumably `prepare`
        // turns literals into parameter slots; confirm against the handler.
        let exec = call(&format!(
            r#"{{"jsonrpc":"2.0","id":4,"method":"execute_prepared","params":{{"prepared_id":{id},"binds":[42]}}}}"#
        ));
        assert!(
            exec.contains("\"rows\""),
            "execute_prepared response: {exec}"
        );
        assert!(exec.contains("42"), "expected row with n=42 in: {exec}");
    });
}
3146}