1use std::io::{BufRead, BufReader, Stdin, Write};
19use std::panic::AssertUnwindSafe;
20
21use tokio::sync::Mutex as AsyncMutex;
22
23use crate::application::entity::{CreateRowInput, CreateRowsBatchInput};
24use crate::application::ports::RuntimeEntityPort;
25use crate::json::{self as json, Value};
26use crate::runtime::{RedDBRuntime, RuntimeQueryResult};
27use crate::storage::query::unified::UnifiedRecord;
28use crate::storage::schema::Value as SchemaValue;
29use reddb_client_connector::RedDBClient;
30
31enum Backend<'a> {
44 Local(&'a RedDBRuntime),
45 Remote(Box<RemoteBackend<'a>>),
46}
47
48struct RemoteBackend<'a> {
49 client: AsyncMutex<RedDBClient>,
50 tokio_rt: &'a tokio::runtime::Runtime,
51}
52
/// Protocol version string reported by the `version` method.
pub const PROTOCOL_VERSION: &str = "1.0";
/// Maximum number of rows handed to the runtime per batch by `bulk_insert`.
const STDIO_BULK_INSERT_CHUNK_ROWS: usize = 500;
56
/// Stable string codes placed in the `code` field of error responses.
pub mod error_code {
    /// The request line was not valid JSON.
    pub const PARSE_ERROR: &str = "PARSE_ERROR";
    /// The request was malformed or named an unknown / unavailable method.
    pub const INVALID_REQUEST: &str = "INVALID_REQUEST";
    /// A required parameter was missing or had the wrong type.
    pub const INVALID_PARAMS: &str = "INVALID_PARAMS";
    /// The runtime rejected or failed to execute a statement.
    pub const QUERY_ERROR: &str = "QUERY_ERROR";
    /// A requested item does not exist.
    pub const NOT_FOUND: &str = "NOT_FOUND";
    /// Unexpected failure (I/O error on stdin, or a handler panic).
    pub const INTERNAL_ERROR: &str = "INTERNAL_ERROR";
    /// `tx.begin` was called while a transaction is already open.
    pub const TX_ALREADY_OPEN: &str = "TX_ALREADY_OPEN";
    /// `tx.commit` / `tx.rollback` was called with no open transaction.
    pub const NO_TX_OPEN: &str = "NO_TX_OPEN";
    /// Replaying the buffered write set at commit time failed.
    pub const TX_REPLAY_FAILED: &str = "TX_REPLAY_FAILED";
    /// The method is rejected when the session uses the remote backend.
    pub const TX_NOT_SUPPORTED_REMOTE: &str = "TX_NOT_SUPPORTED_REMOTE";
    /// The referenced cursor id is not held by this session.
    pub const CURSOR_NOT_FOUND: &str = "CURSOR_NOT_FOUND";
    /// The session already holds the maximum number of cursors.
    pub const CURSOR_LIMIT_EXCEEDED: &str = "CURSOR_LIMIT_EXCEEDED";
}
84
/// Upper bound on the number of cursors one session may hold at once.
pub(crate) const MAX_CURSORS_PER_SESSION: usize = 64;
/// Batch size used by `query.next` when the request gives no `batch_size`.
pub(crate) const DEFAULT_CURSOR_BATCH_SIZE: usize = 100;
/// Largest batch size `query.next` will honor; larger requests are clamped.
pub(crate) const MAX_CURSOR_BATCH_SIZE: usize = 10_000;
96
97struct StdioPreparedStatement {
120 shape: crate::storage::query::ast::QueryExpr,
121 parameter_count: usize,
122}
123
124pub(crate) struct Session {
125 next_tx_id: u64,
126 current_tx: Option<OpenTx>,
127 next_cursor_id: u64,
128 cursors: std::collections::HashMap<u64, Cursor>,
129 next_prepared_id: u64,
131 prepared: std::collections::HashMap<u64, StdioPreparedStatement>,
133}
134
135impl Session {
136 pub(crate) fn new() -> Self {
137 Self {
138 next_tx_id: 1,
139 current_tx: None,
140 next_cursor_id: 1,
141 cursors: std::collections::HashMap::new(),
142 next_prepared_id: 1,
143 prepared: std::collections::HashMap::new(),
144 }
145 }
146
147 fn open_tx(&mut self) -> Result<u64, (&'static str, String)> {
148 if let Some(tx) = &self.current_tx {
149 return Err((
150 error_code::TX_ALREADY_OPEN,
151 format!("transaction {} already open in this session", tx.tx_id),
152 ));
153 }
154 let tx_id = self.next_tx_id;
155 self.next_tx_id = self.next_tx_id.saturating_add(1);
156 self.current_tx = Some(OpenTx {
157 tx_id,
158 write_set: Vec::new(),
159 });
160 Ok(tx_id)
161 }
162
163 fn take_tx(&mut self) -> Option<OpenTx> {
164 self.current_tx.take()
165 }
166
167 fn current_tx_mut(&mut self) -> Option<&mut OpenTx> {
168 self.current_tx.as_mut()
169 }
170
171 #[allow(dead_code)]
172 fn has_tx(&self) -> bool {
173 self.current_tx.is_some()
174 }
175
176 fn insert_cursor(&mut self, cursor: Cursor) -> Result<u64, (&'static str, String)> {
179 if self.cursors.len() >= MAX_CURSORS_PER_SESSION {
180 return Err((
181 error_code::CURSOR_LIMIT_EXCEEDED,
182 format!(
183 "session already holds {} cursors (max {}) — close some before opening new ones",
184 self.cursors.len(),
185 MAX_CURSORS_PER_SESSION
186 ),
187 ));
188 }
189 let id = self.next_cursor_id;
190 self.next_cursor_id = self.next_cursor_id.saturating_add(1);
191 let mut cursor = cursor;
192 cursor.cursor_id = id;
193 self.cursors.insert(id, cursor);
194 Ok(id)
195 }
196
197 fn cursor_mut(&mut self, id: u64) -> Option<&mut Cursor> {
198 self.cursors.get_mut(&id)
199 }
200
201 fn drop_cursor(&mut self, id: u64) -> Option<Cursor> {
202 self.cursors.remove(&id)
203 }
204
205 fn clear_cursors(&mut self) {
206 self.cursors.clear();
207 }
208}
209
210impl Default for Session {
211 fn default() -> Self {
212 Self::new()
213 }
214}
215
216struct OpenTx {
218 tx_id: u64,
219 write_set: Vec<PendingSql>,
220}
221
/// A buffered write, stored as the SQL text to run when the transaction commits.
enum PendingSql {
    /// An `INSERT` statement.
    Insert(String),
    /// A `DELETE` statement.
    Delete(String),
    /// An `UPDATE` statement.
    #[allow(dead_code)]
    Update(String),
}

impl PendingSql {
    /// Returns the SQL text carried by this operation, whatever its kind.
    fn sql(&self) -> &str {
        let text = match self {
            Self::Insert(text) => text,
            Self::Delete(text) => text,
            Self::Update(text) => text,
        };
        text.as_str()
    }
}
239
240pub(crate) struct Cursor {
252 cursor_id: u64,
253 columns: Vec<String>,
254 rows: Vec<UnifiedRecord>,
255 position: usize,
256}
257
258impl Cursor {
259 fn new(columns: Vec<String>, rows: Vec<UnifiedRecord>) -> Self {
260 Self {
261 cursor_id: 0, columns,
263 rows,
264 position: 0,
265 }
266 }
267
268 fn total(&self) -> usize {
269 self.rows.len()
270 }
271
272 fn remaining(&self) -> usize {
273 self.rows.len().saturating_sub(self.position)
274 }
275
276 fn is_exhausted(&self) -> bool {
277 self.position >= self.rows.len()
278 }
279
280 fn take_batch(&mut self, batch_size: usize) -> &[UnifiedRecord] {
283 let end = (self.position + batch_size).min(self.rows.len());
284 let slice = &self.rows[self.position..end];
285 self.position = end;
286 slice
287 }
288}
289
290pub fn run(runtime: &RedDBRuntime) -> i32 {
296 run_with_io(runtime, std::io::stdin(), &mut std::io::stdout())
297}
298
299pub fn run_remote(endpoint: &str, token: Option<String>) -> i32 {
306 let tokio_rt = match tokio::runtime::Builder::new_current_thread()
307 .enable_all()
308 .build()
309 {
310 Ok(rt) => rt,
311 Err(e) => {
312 tracing::error!(err = %e, "rpc: failed to build tokio runtime");
313 return 1;
314 }
315 };
316 let client = match tokio_rt.block_on(RedDBClient::connect(endpoint, token)) {
317 Ok(c) => c,
318 Err(e) => {
319 tracing::error!(endpoint, err = %e, "rpc: failed to connect");
320 return 1;
321 }
322 };
323 let backend = Backend::Remote(Box::new(RemoteBackend {
324 client: AsyncMutex::new(client),
325 tokio_rt: &tokio_rt,
326 }));
327 run_backend(&backend, std::io::stdin(), &mut std::io::stdout())
328}
329
330pub fn run_with_io<W: Write>(runtime: &RedDBRuntime, stdin: Stdin, stdout: &mut W) -> i32 {
332 run_backend(&Backend::Local(runtime), stdin, stdout)
333}
334
/// Process-wide counter for stdio connection ids; the first id is 1_000_000.
static STDIO_SESSION_CONN_ID: std::sync::atomic::AtomicU64 =
    std::sync::atomic::AtomicU64::new(1_000_000);

/// Returns the next connection id and advances the counter by one.
fn next_stdio_conn_id() -> u64 {
    use std::sync::atomic::Ordering;

    STDIO_SESSION_CONN_ID.fetch_add(1, Ordering::Relaxed)
}
347
348fn run_backend<W: Write>(backend: &Backend<'_>, stdin: Stdin, stdout: &mut W) -> i32 {
349 let reader = BufReader::new(stdin.lock());
350 let mut session = Session::new();
351 let conn_id = next_stdio_conn_id();
355 crate::runtime::impl_core::set_current_connection_id(conn_id);
356 for line_result in reader.lines() {
357 let line = match line_result {
358 Ok(l) => l,
359 Err(e) => {
360 let _ = writeln!(
361 stdout,
362 "{}",
363 error_response(&Value::Null, error_code::INTERNAL_ERROR, &e.to_string())
364 );
365 let _ = stdout.flush();
366 return 1;
367 }
368 };
369 if line.trim().is_empty() {
370 continue;
371 }
372 let response = handle_line(backend, &mut session, &line);
373 if writeln!(stdout, "{}", response).is_err() || stdout.flush().is_err() {
374 return 1;
375 }
376 if response.contains("\"__close__\":true") {
377 return 0;
378 }
379 }
380 let _ = session.take_tx();
384 crate::runtime::impl_core::clear_current_connection_id();
385 0
386}
387
388fn handle_line(backend: &Backend<'_>, session: &mut Session, line: &str) -> String {
392 let parsed: Value = match json::from_str(line) {
393 Ok(v) => v,
394 Err(err) => {
395 return error_response(
396 &Value::Null,
397 error_code::PARSE_ERROR,
398 &format!("invalid JSON: {err}"),
399 );
400 }
401 };
402
403 let id = parsed.get("id").cloned().unwrap_or(Value::Null);
404
405 let method = match parsed.get("method").and_then(Value::as_str) {
406 Some(m) => m.to_string(),
407 None => {
408 return error_response(&id, error_code::INVALID_REQUEST, "missing 'method' field");
409 }
410 };
411
412 let params = parsed.get("params").cloned().unwrap_or(Value::Null);
413
414 let dispatch = std::panic::catch_unwind(AssertUnwindSafe(|| match backend {
415 Backend::Local(rt) => dispatch_method(rt, session, &method, ¶ms),
416 Backend::Remote(remote) => {
417 if matches!(
422 method.as_str(),
423 "tx.begin"
424 | "tx.commit"
425 | "tx.rollback"
426 | "query.open"
427 | "query.next"
428 | "query.close"
429 ) {
430 Err((
431 error_code::TX_NOT_SUPPORTED_REMOTE,
432 format!("{method} is not supported over remote gRPC yet"),
433 ))
434 } else {
435 dispatch_method_remote(&remote.client, remote.tokio_rt, &method, ¶ms)
436 }
437 }
438 }));
439
440 match dispatch {
441 Ok(Ok(result)) => success_response(&id, &result, method == "close"),
442 Ok(Err((code, msg))) => error_response(&id, code, &msg),
443 Err(_) => error_response(&id, error_code::INTERNAL_ERROR, "handler panicked (caught)"),
444 }
445}
446
447fn dispatch_method(
450 runtime: &RedDBRuntime,
451 session: &mut Session,
452 method: &str,
453 params: &Value,
454) -> Result<Value, (&'static str, String)> {
455 match method {
456 "tx.begin" => {
457 let tx_id = session.open_tx()?;
458 Ok(Value::Object(
459 [
460 ("tx_id".to_string(), Value::Number(tx_id as f64)),
461 (
462 "isolation".to_string(),
463 Value::String("read_committed_deferred".to_string()),
464 ),
465 ]
466 .into_iter()
467 .collect(),
468 ))
469 }
470
471 "tx.commit" => {
472 let tx = session.take_tx().ok_or((
473 error_code::NO_TX_OPEN,
474 "no transaction is open in this session".to_string(),
475 ))?;
476 let tx_id = tx.tx_id;
477 let op_count = tx.write_set.len();
478
479 let replay: Result<(u64, usize), (usize, String)> = (|| {
486 runtime
487 .execute_query("BEGIN")
488 .map_err(|e| (0usize, format!("BEGIN: {e}")))?;
489 let mut total_affected: u64 = 0;
490 for (idx, op) in tx.write_set.iter().enumerate() {
491 match runtime.execute_query(op.sql()) {
492 Ok(qr) => total_affected += qr.affected_rows,
493 Err(e) => {
494 let _ = runtime.execute_query("ROLLBACK");
495 return Err((idx, e.to_string()));
496 }
497 }
498 }
499 runtime
500 .execute_query("COMMIT")
501 .map_err(|e| (op_count, format!("COMMIT: {e}")))?;
502 Ok((total_affected, op_count))
503 })();
504
505 match replay {
506 Ok((affected, replayed)) => Ok(Value::Object(
507 [
508 ("tx_id".to_string(), Value::Number(tx_id as f64)),
509 ("ops_replayed".to_string(), Value::Number(replayed as f64)),
510 ("affected".to_string(), Value::Number(affected as f64)),
511 ]
512 .into_iter()
513 .collect(),
514 )),
515 Err((failed_idx, msg)) => Err((
516 error_code::TX_REPLAY_FAILED,
517 format!(
518 "tx {tx_id} replay failed at op {failed_idx}/{op_count}: {msg} \
519 (ops 0..{failed_idx} already applied and are NOT rolled back)"
520 ),
521 )),
522 }
523 }
524
525 "query.open" => {
526 let sql = params.get("sql").and_then(Value::as_str).ok_or((
527 error_code::INVALID_PARAMS,
528 "missing 'sql' string".to_string(),
529 ))?;
530 let qr = runtime
531 .execute_query(sql)
532 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
533
534 let columns: Vec<String> = qr
538 .result
539 .records
540 .first()
541 .map(|first| {
542 let mut keys: Vec<String> = first
543 .column_names()
544 .into_iter()
545 .map(|k| k.to_string())
546 .collect();
547 keys.sort();
548 keys
549 })
550 .unwrap_or_default();
551
552 let cursor = Cursor::new(columns.clone(), qr.result.records);
553 let total = cursor.total();
554 let cursor_id = session.insert_cursor(cursor)?;
555
556 Ok(Value::Object(
557 [
558 ("cursor_id".to_string(), Value::Number(cursor_id as f64)),
559 (
560 "columns".to_string(),
561 Value::Array(columns.into_iter().map(Value::String).collect()),
562 ),
563 ("total_rows".to_string(), Value::Number(total as f64)),
564 ]
565 .into_iter()
566 .collect(),
567 ))
568 }
569
570 "query.next" => {
571 let cursor_id = params
572 .get("cursor_id")
573 .and_then(|v| v.as_f64())
574 .map(|n| n as u64)
575 .ok_or((
576 error_code::INVALID_PARAMS,
577 "missing 'cursor_id' number".to_string(),
578 ))?;
579 let batch_size = params
580 .get("batch_size")
581 .and_then(|v| v.as_f64())
582 .map(|n| n as usize)
583 .unwrap_or(DEFAULT_CURSOR_BATCH_SIZE)
584 .clamp(1, MAX_CURSOR_BATCH_SIZE);
585
586 let (rows, done, remaining) = {
589 let cursor = session.cursor_mut(cursor_id).ok_or((
590 error_code::CURSOR_NOT_FOUND,
591 format!("cursor {cursor_id} not found"),
592 ))?;
593 let batch = cursor.take_batch(batch_size);
594 let rows_json: Vec<Value> = batch.iter().map(record_to_json_object).collect();
595 (rows_json, cursor.is_exhausted(), cursor.remaining())
596 };
597
598 if done {
599 let _ = session.drop_cursor(cursor_id);
602 }
603
604 Ok(Value::Object(
605 [
606 ("cursor_id".to_string(), Value::Number(cursor_id as f64)),
607 ("rows".to_string(), Value::Array(rows)),
608 ("done".to_string(), Value::Bool(done)),
609 ("remaining".to_string(), Value::Number(remaining as f64)),
610 ]
611 .into_iter()
612 .collect(),
613 ))
614 }
615
616 "query.close" => {
617 let cursor_id = params
618 .get("cursor_id")
619 .and_then(|v| v.as_f64())
620 .map(|n| n as u64)
621 .ok_or((
622 error_code::INVALID_PARAMS,
623 "missing 'cursor_id' number".to_string(),
624 ))?;
625 let existed = session.drop_cursor(cursor_id).is_some();
626 if !existed {
627 return Err((
628 error_code::CURSOR_NOT_FOUND,
629 format!("cursor {cursor_id} not found"),
630 ));
631 }
632 Ok(Value::Object(
633 [
634 ("cursor_id".to_string(), Value::Number(cursor_id as f64)),
635 ("closed".to_string(), Value::Bool(true)),
636 ]
637 .into_iter()
638 .collect(),
639 ))
640 }
641
642 "tx.rollback" => {
643 let tx = session.take_tx().ok_or((
644 error_code::NO_TX_OPEN,
645 "no transaction is open in this session".to_string(),
646 ))?;
647 let ops_discarded = tx.write_set.len();
648 Ok(Value::Object(
649 [
650 ("tx_id".to_string(), Value::Number(tx.tx_id as f64)),
651 (
652 "ops_discarded".to_string(),
653 Value::Number(ops_discarded as f64),
654 ),
655 ]
656 .into_iter()
657 .collect(),
658 ))
659 }
660
661 "version" => Ok(Value::Object(
662 [
663 (
664 "version".to_string(),
665 Value::String(env!("CARGO_PKG_VERSION").to_string()),
666 ),
667 (
668 "protocol".to_string(),
669 Value::String(PROTOCOL_VERSION.to_string()),
670 ),
671 ]
672 .into_iter()
673 .collect(),
674 )),
675
676 "health" => Ok(Value::Object(
677 [
678 ("ok".to_string(), Value::Bool(true)),
679 (
680 "version".to_string(),
681 Value::String(env!("CARGO_PKG_VERSION").to_string()),
682 ),
683 ]
684 .into_iter()
685 .collect(),
686 )),
687
688 "query" => {
689 let sql = params.get("sql").and_then(Value::as_str).ok_or((
690 error_code::INVALID_PARAMS,
691 "missing 'sql' string".to_string(),
692 ))?;
693
694 let bind_values: Option<Vec<SchemaValue>> = params
697 .get("params")
698 .map(|v| {
699 v.as_array()
700 .ok_or((
701 error_code::INVALID_PARAMS,
702 "'params' must be an array".to_string(),
703 ))
704 .map(|arr| arr.iter().map(json_value_to_schema_value).collect())
705 })
706 .transpose()?;
707
708 if let Some(binds) = bind_values {
709 let qr = runtime
710 .execute_query_with_params(sql, &binds)
711 .map_err(|e| {
712 if matches!(e, crate::api::RedDBError::Validation { .. }) {
713 (error_code::INVALID_PARAMS, e.to_string())
714 } else {
715 (error_code::QUERY_ERROR, e.to_string())
716 }
717 })?;
718 return Ok(query_result_to_json(&qr));
719 }
720
721 let qr = runtime
722 .execute_query(sql)
723 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
724 Ok(query_result_to_json(&qr))
725 }
726
727 "prepare" => {
736 use crate::storage::query::modes::parse_multi;
737 use crate::storage::query::planner::shape::parameterize_query_expr;
738
739 let sql = params.get("sql").and_then(Value::as_str).ok_or((
740 error_code::INVALID_PARAMS,
741 "missing 'sql' string".to_string(),
742 ))?;
743 let parsed = parse_multi(sql).map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
744 let (shape, parameter_count) = if let Some(prepared) = parameterize_query_expr(&parsed)
745 {
746 (prepared.shape, prepared.parameter_count)
747 } else {
748 (parsed, 0)
749 };
750 let id = session.next_prepared_id;
751 session.next_prepared_id = session.next_prepared_id.saturating_add(1);
752 session.prepared.insert(
753 id,
754 StdioPreparedStatement {
755 shape,
756 parameter_count,
757 },
758 );
759 Ok(Value::Object(
760 [
761 ("prepared_id".to_string(), Value::Number(id as f64)),
762 (
763 "parameter_count".to_string(),
764 Value::Number(parameter_count as f64),
765 ),
766 ]
767 .into_iter()
768 .collect(),
769 ))
770 }
771
772 "execute_prepared" => {
773 use crate::storage::query::planner::shape::bind_parameterized_query;
774 use crate::storage::schema::Value as SV;
775
776 let id = params
777 .get("prepared_id")
778 .and_then(Value::as_f64)
779 .map(|n| n as u64)
780 .ok_or((
781 error_code::INVALID_PARAMS,
782 "missing 'prepared_id'".to_string(),
783 ))?;
784
785 let stmt = session.prepared.get(&id).ok_or((
786 error_code::QUERY_ERROR,
787 format!("no prepared statement with id {id}"),
788 ))?;
789
790 let binds_json: Vec<Value> = params
792 .get("binds")
793 .and_then(Value::as_array)
794 .map(|a| a.to_vec())
795 .unwrap_or_default();
796 if binds_json.len() != stmt.parameter_count {
797 return Err((
798 error_code::INVALID_PARAMS,
799 format!(
800 "expected {} bind values, got {}",
801 stmt.parameter_count,
802 binds_json.len()
803 ),
804 ));
805 }
806
807 let binds: Vec<SV> = binds_json.iter().map(json_value_to_schema_value).collect();
809
810 let expr = if stmt.parameter_count == 0 {
812 stmt.shape.clone()
813 } else {
814 bind_parameterized_query(&stmt.shape, &binds, stmt.parameter_count)
815 .ok_or((error_code::QUERY_ERROR, "bind failed".to_string()))?
816 };
817
818 let qr = runtime
819 .execute_query_expr(expr)
820 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
821 Ok(query_result_to_json(&qr))
822 }
823
824 "insert" => {
825 let collection = params.get("collection").and_then(Value::as_str).ok_or((
826 error_code::INVALID_PARAMS,
827 "missing 'collection' string".to_string(),
828 ))?;
829 let payload = params.get("payload").ok_or((
830 error_code::INVALID_PARAMS,
831 "missing 'payload' object".to_string(),
832 ))?;
833 let payload_obj = payload.as_object().ok_or((
834 error_code::INVALID_PARAMS,
835 "'payload' must be a JSON object".to_string(),
836 ))?;
837 if let Some(tx) = session.current_tx_mut() {
838 let sql = build_insert_sql(collection, payload_obj.iter());
839 tx.write_set.push(PendingSql::Insert(sql));
840 return Ok(pending_tx_response(tx.tx_id));
841 }
842
843 let output = runtime
844 .create_row(flat_payload_to_row_input(collection, payload_obj))
845 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
846 let mut out = json::Map::new();
847 out.insert("affected".to_string(), Value::Number(1.0));
848 out.insert("id".to_string(), Value::String(output.id.raw().to_string()));
849 Ok(Value::Object(out))
850 }
851
852 "bulk_insert" => {
853 let collection = params.get("collection").and_then(Value::as_str).ok_or((
854 error_code::INVALID_PARAMS,
855 "missing 'collection' string".to_string(),
856 ))?;
857 let payloads = params.get("payloads").and_then(Value::as_array).ok_or((
858 error_code::INVALID_PARAMS,
859 "missing 'payloads' array".to_string(),
860 ))?;
861
862 let mut objects = Vec::with_capacity(payloads.len());
863 for entry in payloads {
864 objects.push(entry.as_object().ok_or((
865 error_code::INVALID_PARAMS,
866 "each payload must be a JSON object".to_string(),
867 ))?);
868 }
869
870 if let Some(tx) = session.current_tx_mut() {
871 let mut buffered: u64 = 0;
872 for obj in &objects {
873 let sql = build_insert_sql(collection, obj.iter());
874 tx.write_set.push(PendingSql::Insert(sql));
875 buffered += 1;
876 }
877 let tx_id = tx.tx_id;
878 return Ok(Value::Object(
879 [
880 ("affected".to_string(), Value::Number(0.0)),
881 ("buffered".to_string(), Value::Number(buffered as f64)),
882 ("pending".to_string(), Value::Bool(true)),
883 ("tx_id".to_string(), Value::Number(tx_id as f64)),
884 ]
885 .into_iter()
886 .collect(),
887 ));
888 }
889
890 if should_bulk_insert_graph(runtime, collection, &objects) {
891 return bulk_insert_graph(runtime, collection, &objects)
892 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()));
893 }
894
895 let mut total_affected: u64 = 0;
896 let mut ids = Vec::with_capacity(objects.len());
897 for chunk in objects.chunks(STDIO_BULK_INSERT_CHUNK_ROWS) {
898 let rows = chunk
899 .iter()
900 .map(|obj| flat_payload_to_row_input(collection, obj))
901 .collect();
902 let outputs = runtime
903 .create_rows_batch(CreateRowsBatchInput {
904 collection: collection.to_string(),
905 rows,
906 suppress_events: false,
907 })
908 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
909 total_affected += outputs.len() as u64;
910 ids.extend(
911 outputs
912 .into_iter()
913 .map(|output| Value::String(output.id.raw().to_string())),
914 );
915 }
916 let mut out = json::Map::new();
917 out.insert("affected".to_string(), Value::Number(total_affected as f64));
918 out.insert("ids".to_string(), Value::Array(ids));
919 Ok(Value::Object(out))
920 }
921
922 "get" => {
923 let collection = params.get("collection").and_then(Value::as_str).ok_or((
924 error_code::INVALID_PARAMS,
925 "missing 'collection' string".to_string(),
926 ))?;
927 let id = params.get("id").and_then(Value::as_str).ok_or((
928 error_code::INVALID_PARAMS,
929 "missing 'id' string".to_string(),
930 ))?;
931 let sql = format!("SELECT * FROM {collection} WHERE red_entity_id = {id} LIMIT 1");
932 let qr = runtime
933 .execute_query(&sql)
934 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
935 let entity = qr
936 .result
937 .records
938 .first()
939 .map(record_to_json_object)
940 .unwrap_or(Value::Null);
941 Ok(Value::Object(
942 [("entity".to_string(), entity)].into_iter().collect(),
943 ))
944 }
945
946 "delete" => {
947 let collection = params.get("collection").and_then(Value::as_str).ok_or((
948 error_code::INVALID_PARAMS,
949 "missing 'collection' string".to_string(),
950 ))?;
951 let id = params.get("id").and_then(Value::as_str).ok_or((
952 error_code::INVALID_PARAMS,
953 "missing 'id' string".to_string(),
954 ))?;
955 let sql = format!("DELETE FROM {collection} WHERE red_entity_id = {id}");
956
957 if let Some(tx) = session.current_tx_mut() {
958 tx.write_set.push(PendingSql::Delete(sql));
959 return Ok(pending_tx_response(tx.tx_id));
960 }
961
962 let qr = runtime
963 .execute_query(&sql)
964 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
965 Ok(Value::Object(
966 [(
967 "affected".to_string(),
968 Value::Number(qr.affected_rows as f64),
969 )]
970 .into_iter()
971 .collect(),
972 ))
973 }
974
975 "close" => {
976 let _ = session.take_tx();
981 session.clear_cursors();
982 let _ = runtime.checkpoint();
983 Ok(Value::Null)
984 }
985
986 "auth.login"
991 | "auth.whoami"
992 | "auth.change_password"
993 | "auth.create_api_key"
994 | "auth.revoke_api_key" => {
995 let _ = (session, params);
996 Err((
997 error_code::INVALID_REQUEST,
998 format!(
999 "{method}: auth methods are only available on grpc:// connections; \
1000 embedded modes (memory://, file://) inherit caller privileges"
1001 ),
1002 ))
1003 }
1004
1005 other => Err((
1006 error_code::INVALID_REQUEST,
1007 format!("unknown method: {other}"),
1008 )),
1009 }
1010}
1011
1012fn success_response(id: &Value, result: &Value, is_close: bool) -> String {
1017 let mut envelope = json::Map::new();
1022 envelope.insert("jsonrpc".to_string(), Value::String("2.0".to_string()));
1023 envelope.insert("id".to_string(), id.clone());
1024 envelope.insert("result".to_string(), result.clone());
1025 if is_close {
1026 envelope.insert("__close__".to_string(), Value::Bool(true));
1027 }
1028 Value::Object(envelope).to_string_compact()
1029}
1030
1031fn error_response(id: &Value, code: &str, message: &str) -> String {
1032 let mut err = json::Map::new();
1033 err.insert("code".to_string(), Value::String(code.to_string()));
1034 err.insert("message".to_string(), Value::String(message.to_string()));
1035 err.insert("data".to_string(), Value::Null);
1036
1037 let mut envelope = json::Map::new();
1038 envelope.insert("jsonrpc".to_string(), Value::String("2.0".to_string()));
1039 envelope.insert("id".to_string(), id.clone());
1040 envelope.insert("error".to_string(), Value::Object(err));
1041 Value::Object(envelope).to_string_compact()
1042}
1043
1044fn pending_tx_response(tx_id: u64) -> Value {
1051 Value::Object(
1052 [
1053 ("affected".to_string(), Value::Number(0.0)),
1054 ("pending".to_string(), Value::Bool(true)),
1055 ("tx_id".to_string(), Value::Number(tx_id as f64)),
1056 ]
1057 .into_iter()
1058 .collect(),
1059 )
1060}
1061
1062pub(crate) fn build_insert_sql<'a, I>(collection: &str, fields: I) -> String
1063where
1064 I: Iterator<Item = (&'a String, &'a Value)>,
1065{
1066 let mut cols = Vec::new();
1067 let mut vals = Vec::new();
1068 for (k, v) in fields {
1069 cols.push(k.clone());
1070 vals.push(value_to_sql_literal(v));
1071 }
1072 format!(
1073 "INSERT INTO {collection} ({}) VALUES ({})",
1074 cols.join(", "),
1075 vals.join(", "),
1076 )
1077}
1078
1079fn flat_payload_to_row_input(
1080 collection: &str,
1081 payload: &json::Map<String, Value>,
1082) -> CreateRowInput {
1083 CreateRowInput {
1084 collection: collection.to_string(),
1085 fields: payload
1086 .iter()
1087 .map(|(key, value)| (key.clone(), json_value_to_schema_value(value)))
1088 .collect(),
1089 metadata: Vec::new(),
1090 node_links: Vec::new(),
1091 vector_links: Vec::new(),
1092 }
1093}
1094
1095fn bulk_insert_chunk_count(row_count: usize) -> usize {
1096 if row_count == 0 {
1097 0
1098 } else {
1099 ((row_count - 1) / STDIO_BULK_INSERT_CHUNK_ROWS) + 1
1100 }
1101}
1102
1103pub(crate) fn should_bulk_insert_graph(
1104 runtime: &RedDBRuntime,
1105 collection: &str,
1106 payloads: &[&json::Map<String, Value>],
1107) -> bool {
1108 let graph_shaped = payloads
1109 .iter()
1110 .all(|payload| payload.get("label").and_then(Value::as_str).is_some());
1111 if !graph_shaped {
1112 return false;
1113 }
1114
1115 matches!(
1116 runtime
1117 .db()
1118 .catalog_model_snapshot()
1119 .collections
1120 .iter()
1121 .find(|descriptor| descriptor.name == collection)
1122 .map(|descriptor| descriptor.declared_model.unwrap_or(descriptor.model)),
1123 Some(crate::catalog::CollectionModel::Graph | crate::catalog::CollectionModel::Mixed)
1124 )
1125}
1126
1127pub(crate) fn bulk_insert_graph(
1128 runtime: &RedDBRuntime,
1129 collection: &str,
1130 payloads: &[&json::Map<String, Value>],
1131) -> crate::RedDBResult<Value> {
1132 use crate::application::entity_payload::{parse_create_edge_input, parse_create_node_input};
1133 use crate::application::ports::RuntimeEntityPort;
1134
1135 let mut ids = Vec::with_capacity(payloads.len());
1136 for payload in payloads {
1137 let input_payload = normalize_flat_graph_payload(payload);
1138 let id = if payload.contains_key("from") || payload.contains_key("to") {
1139 runtime
1140 .create_edge(parse_create_edge_input(
1141 collection.to_string(),
1142 &input_payload,
1143 )?)?
1144 .id
1145 } else {
1146 runtime
1147 .create_node(parse_create_node_input(
1148 collection.to_string(),
1149 &input_payload,
1150 )?)?
1151 .id
1152 };
1153 ids.push(Value::Number(id.raw() as f64));
1154 }
1155
1156 let mut out = json::Map::new();
1157 out.insert("affected".to_string(), Value::Number(ids.len() as f64));
1158 out.insert("ids".to_string(), Value::Array(ids));
1159 Ok(Value::Object(out))
1160}
1161
1162fn normalize_flat_graph_payload(payload: &json::Map<String, Value>) -> Value {
1163 if payload.contains_key("properties") || payload.contains_key("fields") {
1164 return Value::Object(payload.clone());
1165 }
1166
1167 let is_edge = payload.contains_key("from") || payload.contains_key("to");
1168 let mut normalized = payload.clone();
1169 let mut properties = json::Map::new();
1170 for (key, value) in payload {
1171 let reserved = if is_edge {
1172 matches!(
1173 key.as_str(),
1174 "label"
1175 | "from"
1176 | "to"
1177 | "weight"
1178 | "metadata"
1179 | "properties"
1180 | "fields"
1181 | "_ttl_ms"
1182 | "_expires_at"
1183 )
1184 } else {
1185 matches!(
1186 key.as_str(),
1187 "label"
1188 | "node_type"
1189 | "metadata"
1190 | "links"
1191 | "embeddings"
1192 | "properties"
1193 | "fields"
1194 | "_ttl_ms"
1195 | "_expires_at"
1196 )
1197 };
1198 if !reserved {
1199 properties.insert(key.clone(), value.clone());
1200 }
1201 }
1202 if !properties.is_empty() {
1203 normalized.insert("properties".to_string(), Value::Object(properties));
1204 }
1205 Value::Object(normalized)
1206}
1207
1208pub(crate) fn value_to_sql_literal(v: &Value) -> String {
1209 match v {
1210 Value::Null => "NULL".to_string(),
1211 Value::Bool(b) => b.to_string(),
1212 Value::Number(n) => {
1213 if n.fract() == 0.0 {
1214 format!("{}", *n as i64)
1215 } else {
1216 n.to_string()
1217 }
1218 }
1219 Value::String(s) => format!("'{}'", s.replace('\'', "''")),
1220 other => format!("'{}'", other.to_string_compact().replace('\'', "''")),
1221 }
1222}
1223
1224pub(crate) fn query_result_to_json(qr: &RuntimeQueryResult) -> Value {
1225 if let Some(ask) = ask_query_result_to_json(qr) {
1226 return ask;
1227 }
1228
1229 let mut envelope = json::Map::new();
1230 envelope.insert(
1231 "statement".to_string(),
1232 Value::String(qr.statement_type.to_string()),
1233 );
1234 envelope.insert(
1235 "affected".to_string(),
1236 Value::Number(qr.affected_rows as f64),
1237 );
1238
1239 let mut columns = Vec::new();
1240 if let Some(first) = qr.result.records.first() {
1241 let mut keys: Vec<String> = first
1242 .column_names()
1243 .into_iter()
1244 .map(|k| k.to_string())
1245 .collect();
1246 keys.sort();
1247 columns = keys.into_iter().map(Value::String).collect();
1248 }
1249 envelope.insert("columns".to_string(), Value::Array(columns));
1250
1251 let rows: Vec<Value> = qr
1252 .result
1253 .records
1254 .iter()
1255 .map(record_to_json_object)
1256 .collect();
1257 envelope.insert("rows".to_string(), Value::Array(rows));
1258
1259 Value::Object(envelope)
1260}
1261
1262fn ask_query_result_to_json(qr: &RuntimeQueryResult) -> Option<Value> {
1263 if qr.statement != "ask" {
1264 return None;
1265 }
1266 let row = qr.result.records.first()?;
1267 let answer = text_field(row, "answer")?;
1268 let provider = text_field(row, "provider").unwrap_or_default();
1269 let model = text_field(row, "model").unwrap_or_default();
1270 let sources_flat_json = json_field(row, "sources_flat").unwrap_or(Value::Array(Vec::new()));
1271 let citations_json = json_field(row, "citations").unwrap_or(Value::Array(Vec::new()));
1272 let validation_json = json_field(row, "validation").unwrap_or(Value::Object(json::Map::new()));
1273
1274 let effective_mode = match text_field(row, "mode").as_deref() {
1275 Some("lenient") => crate::runtime::ai::ask_response_envelope::Mode::Lenient,
1276 _ => crate::runtime::ai::ask_response_envelope::Mode::Strict,
1277 };
1278
1279 let result = crate::runtime::ai::ask_response_envelope::AskResult {
1280 answer,
1281 sources_flat: envelope_sources_flat(&sources_flat_json),
1282 citations: envelope_citations(&citations_json),
1283 validation: envelope_validation(&validation_json),
1284 cache_hit: bool_field(row, "cache_hit").unwrap_or(false),
1285 provider,
1286 model,
1287 prompt_tokens: u32_field(row, "prompt_tokens").unwrap_or(0),
1288 completion_tokens: u32_field(row, "completion_tokens").unwrap_or(0),
1289 cost_usd: f64_field(row, "cost_usd").unwrap_or(0.0),
1290 effective_mode,
1291 retry_count: u32_field(row, "retry_count").unwrap_or(0),
1292 };
1293 Some(crate::runtime::ai::ask_response_envelope::build(&result))
1294}
1295
1296fn record_field<'a>(record: &'a UnifiedRecord, name: &str) -> Option<&'a SchemaValue> {
1297 record
1298 .iter_fields()
1299 .find_map(|(key, value)| (key.as_ref() == name).then_some(value))
1300}
1301
1302fn text_field(record: &UnifiedRecord, name: &str) -> Option<String> {
1303 match record_field(record, name)? {
1304 SchemaValue::Text(s) => Some(s.to_string()),
1305 SchemaValue::Email(s)
1306 | SchemaValue::Url(s)
1307 | SchemaValue::NodeRef(s)
1308 | SchemaValue::EdgeRef(s) => Some(s.clone()),
1309 other => Some(format!("{other}")),
1310 }
1311}
1312
1313fn u32_field(record: &UnifiedRecord, name: &str) -> Option<u32> {
1314 match record_field(record, name)? {
1315 SchemaValue::Integer(n) => (*n >= 0).then_some((*n).min(u32::MAX as i64) as u32),
1316 SchemaValue::UnsignedInteger(n) => Some((*n).min(u32::MAX as u64) as u32),
1317 SchemaValue::BigInt(n)
1318 | SchemaValue::TimestampMs(n)
1319 | SchemaValue::Timestamp(n)
1320 | SchemaValue::Duration(n)
1321 | SchemaValue::Decimal(n) => (*n >= 0).then_some((*n).min(u32::MAX as i64) as u32),
1322 SchemaValue::Float(n) => (*n >= 0.0).then_some((*n).min(u32::MAX as f64) as u32),
1323 _ => None,
1324 }
1325}
1326
1327fn f64_field(record: &UnifiedRecord, name: &str) -> Option<f64> {
1328 match record_field(record, name)? {
1329 SchemaValue::Integer(n) => Some(*n as f64),
1330 SchemaValue::UnsignedInteger(n) => Some(*n as f64),
1331 SchemaValue::BigInt(n)
1332 | SchemaValue::TimestampMs(n)
1333 | SchemaValue::Timestamp(n)
1334 | SchemaValue::Duration(n)
1335 | SchemaValue::Decimal(n) => Some(*n as f64),
1336 SchemaValue::Float(n) => Some(*n),
1337 _ => None,
1338 }
1339}
1340
1341fn bool_field(record: &UnifiedRecord, name: &str) -> Option<bool> {
1342 match record_field(record, name)? {
1343 SchemaValue::Boolean(value) => Some(*value),
1344 _ => None,
1345 }
1346}
1347
1348fn json_field(record: &UnifiedRecord, name: &str) -> Option<Value> {
1349 match record_field(record, name)? {
1350 SchemaValue::Json(bytes) => json::from_slice(bytes).ok(),
1351 SchemaValue::Text(text) => json::from_str(text).ok(),
1352 _ => None,
1353 }
1354}
1355
1356fn envelope_sources_flat(
1357 value: &Value,
1358) -> Vec<crate::runtime::ai::ask_response_envelope::SourceRow> {
1359 value
1360 .as_array()
1361 .unwrap_or(&[])
1362 .iter()
1363 .filter_map(|source| {
1364 let urn = source.get("urn").and_then(Value::as_str)?.to_string();
1365 let payload = source
1366 .get("payload")
1367 .and_then(Value::as_str)
1368 .map(ToString::to_string)
1369 .unwrap_or_else(|| source.to_string_compact());
1370 Some(crate::runtime::ai::ask_response_envelope::SourceRow { urn, payload })
1371 })
1372 .collect()
1373}
1374
1375fn envelope_citations(value: &Value) -> Vec<crate::runtime::ai::ask_response_envelope::Citation> {
1376 value
1377 .as_array()
1378 .unwrap_or(&[])
1379 .iter()
1380 .filter_map(|citation| {
1381 let marker = citation.get("marker").and_then(Value::as_u64)?;
1382 let urn = citation.get("urn").and_then(Value::as_str)?.to_string();
1383 Some(crate::runtime::ai::ask_response_envelope::Citation {
1384 marker: marker.min(u32::MAX as u64) as u32,
1385 urn,
1386 })
1387 })
1388 .collect()
1389}
1390
1391fn envelope_validation(value: &Value) -> crate::runtime::ai::ask_response_envelope::Validation {
1392 crate::runtime::ai::ask_response_envelope::Validation {
1393 ok: value.get("ok").and_then(Value::as_bool).unwrap_or(true),
1394 warnings: validation_items(value, "warnings")
1395 .into_iter()
1396 .map(
1397 |(kind, detail)| crate::runtime::ai::ask_response_envelope::ValidationWarning {
1398 kind,
1399 detail,
1400 },
1401 )
1402 .collect(),
1403 errors: validation_items(value, "errors")
1404 .into_iter()
1405 .map(
1406 |(kind, detail)| crate::runtime::ai::ask_response_envelope::ValidationError {
1407 kind,
1408 detail,
1409 },
1410 )
1411 .collect(),
1412 }
1413}
1414
1415fn validation_items(value: &Value, key: &str) -> Vec<(String, String)> {
1416 value
1417 .get(key)
1418 .and_then(Value::as_array)
1419 .unwrap_or(&[])
1420 .iter()
1421 .filter_map(|item| {
1422 Some((
1423 item.get("kind").and_then(Value::as_str)?.to_string(),
1424 item.get("detail")
1425 .and_then(Value::as_str)
1426 .unwrap_or("")
1427 .to_string(),
1428 ))
1429 })
1430 .collect()
1431}
1432
1433pub(crate) fn insert_result_to_json(qr: &RuntimeQueryResult) -> Value {
1434 let mut envelope = json::Map::new();
1435 envelope.insert(
1436 "affected".to_string(),
1437 Value::Number(qr.affected_rows as f64),
1438 );
1439 if let Some(first) = qr.result.records.first() {
1441 if let Some(id_val) = first
1442 .iter_fields()
1443 .find(|(k, _)| {
1444 let s: &str = k;
1445 s == "_entity_id"
1446 })
1447 .map(|(_, v)| schema_value_to_json(v))
1448 {
1449 envelope.insert("id".to_string(), id_val);
1450 }
1451 }
1452 Value::Object(envelope)
1453}
1454
1455fn record_to_json_object(record: &UnifiedRecord) -> Value {
1456 let mut map = json::Map::new();
1457 let mut entries: Vec<(&str, &SchemaValue)> =
1460 record.iter_fields().map(|(k, v)| (k.as_ref(), v)).collect();
1461 entries.sort_by(|a, b| a.0.cmp(b.0));
1462 for (k, v) in entries {
1463 map.insert(k.to_string(), schema_value_to_json(v));
1464 }
1465 Value::Object(map)
1466}
1467
1468fn schema_value_to_json(v: &SchemaValue) -> Value {
1469 match v {
1470 SchemaValue::Null => Value::Null,
1471 SchemaValue::Boolean(b) => Value::Bool(*b),
1472 SchemaValue::Integer(n) => Value::Number(*n as f64),
1473 SchemaValue::UnsignedInteger(n) => Value::Number(*n as f64),
1474 SchemaValue::Float(n) if n.is_finite() => Value::Number(*n),
1475 SchemaValue::Float(n) => {
1476 let token = if n.is_nan() {
1477 "NaN"
1478 } else if n.is_sign_positive() {
1479 "Infinity"
1480 } else {
1481 "-Infinity"
1482 };
1483 single_key_object("$float", Value::String(token.to_string()))
1484 }
1485 SchemaValue::BigInt(n) => Value::Number(*n as f64),
1486 SchemaValue::TimestampMs(n) | SchemaValue::Duration(n) | SchemaValue::Decimal(n) => {
1487 Value::Number(*n as f64)
1488 }
1489 SchemaValue::Timestamp(n) => single_key_object("$ts", Value::String(n.to_string())),
1490 SchemaValue::Password(_) | SchemaValue::Secret(_) => Value::String("***".to_string()),
1491 SchemaValue::Text(s) => Value::String(s.to_string()),
1492 SchemaValue::Blob(bytes) => {
1493 single_key_object("$bytes", Value::String(base64_encode(bytes)))
1494 }
1495 SchemaValue::Json(bytes) => {
1496 crate::presentation::entity_json::storage_json_bytes_to_json(bytes)
1497 }
1498 SchemaValue::Uuid(bytes) => single_key_object("$uuid", Value::String(format_uuid(bytes))),
1499 SchemaValue::Email(s)
1500 | SchemaValue::Url(s)
1501 | SchemaValue::NodeRef(s)
1502 | SchemaValue::EdgeRef(s) => Value::String(s.clone()),
1503 other => Value::String(format!("{other}")),
1504 }
1505}
1506
1507fn single_key_object(key: &str, value: Value) -> Value {
1508 Value::Object([(key.to_string(), value)].into_iter().collect())
1509}
1510
1511pub(crate) fn json_value_to_schema_value(v: &Value) -> SchemaValue {
1515 match v {
1516 Value::Null => SchemaValue::Null,
1517 Value::Bool(b) => SchemaValue::Boolean(*b),
1518 Value::Number(n) => {
1519 if n.is_finite() && n.fract() == 0.0 && *n >= i64::MIN as f64 && *n <= i64::MAX as f64 {
1520 SchemaValue::Integer(*n as i64)
1521 } else {
1522 SchemaValue::Float(*n)
1523 }
1524 }
1525 Value::String(s) => SchemaValue::text(s.clone()),
1526 Value::Array(items) => {
1527 if items.iter().all(|v| matches!(v, Value::Number(_))) {
1531 let floats: Vec<f32> = items
1532 .iter()
1533 .map(|v| v.as_f64().unwrap_or(0.0) as f32)
1534 .collect();
1535 SchemaValue::Vector(floats)
1536 } else {
1537 SchemaValue::Json(crate::json::to_vec(v).unwrap_or_default())
1538 }
1539 }
1540 Value::Object(map) => {
1541 if map.len() == 1 {
1542 if let Some(Value::String(encoded)) = map.get("$bytes") {
1543 if let Ok(bytes) = base64_decode(encoded) {
1544 return SchemaValue::Blob(bytes);
1545 }
1546 }
1547 if let Some(value) = map.get("$ts") {
1548 if let Some(ts) = json_i64(value) {
1549 return SchemaValue::Timestamp(ts);
1550 }
1551 }
1552 if let Some(Value::String(value)) = map.get("$uuid") {
1553 if let Ok(uuid) = crate::crypto::Uuid::parse_str(value) {
1554 return SchemaValue::Uuid(*uuid.as_bytes());
1555 }
1556 }
1557 if let Some(Value::String(value)) = map.get("$float") {
1558 return match value.as_str() {
1559 "NaN" => SchemaValue::Float(f64::NAN),
1560 "Infinity" | "+Infinity" | "inf" | "+inf" => {
1561 SchemaValue::Float(f64::INFINITY)
1562 }
1563 "-Infinity" | "-inf" => SchemaValue::Float(f64::NEG_INFINITY),
1564 _ => SchemaValue::Json(crate::json::to_vec(v).unwrap_or_default()),
1565 };
1566 }
1567 }
1568 SchemaValue::Json(crate::json::to_vec(v).unwrap_or_default())
1569 }
1570 }
1571}
1572
1573fn json_i64(value: &Value) -> Option<i64> {
1574 match value {
1575 Value::Number(n) => {
1576 if n.is_finite() && n.fract() == 0.0 && *n >= i64::MIN as f64 && *n <= i64::MAX as f64 {
1577 Some(*n as i64)
1578 } else {
1579 None
1580 }
1581 }
1582 Value::String(s) => s.parse::<i64>().ok(),
1583 _ => None,
1584 }
1585}
1586
/// Formats 16 raw bytes as a lowercase, hyphenated UUID string
/// (`xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx`).
fn format_uuid(bytes: &[u8; 16]) -> String {
    const HEX: &[u8; 16] = b"0123456789abcdef";

    // 32 hex digits plus 4 hyphens.
    let mut text = String::with_capacity(36);
    for (index, byte) in bytes.iter().enumerate() {
        // Hyphens precede bytes 4, 6, 8 and 10 (the 8-4-4-4-12 grouping).
        if matches!(index, 4 | 6 | 8 | 10) {
            text.push('-');
        }
        text.push(HEX[usize::from(*byte >> 4)] as char);
        text.push(HEX[usize::from(*byte & 0x0f)] as char);
    }
    text
}
1608
/// Encodes `bytes` as standard-alphabet base64 with `=` padding.
fn base64_encode(bytes: &[u8]) -> String {
    const TABLE: &[u8; 64] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

    // Picks the 6-bit symbol found `shift` bits into a 24-bit group.
    let symbol = |group: u32, shift: u32| TABLE[((group >> shift) & 0x3f) as usize] as char;

    let mut out = String::with_capacity(bytes.len().div_ceil(3) * 4);
    for chunk in bytes.chunks(3) {
        // Missing trailing bytes of the final chunk contribute zero bits.
        let first = u32::from(chunk[0]);
        let second = chunk.get(1).copied().map_or(0, u32::from);
        let third = chunk.get(2).copied().map_or(0, u32::from);
        let group = (first << 16) | (second << 8) | third;

        out.push(symbol(group, 18));
        out.push(symbol(group, 12));
        out.push(if chunk.len() > 1 { symbol(group, 6) } else { '=' });
        out.push(if chunk.len() > 2 { symbol(group, 0) } else { '=' });
    }
    out
}
1640
/// Decodes standard-alphabet base64 text with `=` padding.
///
/// # Errors
///
/// Returns an error message when:
/// - the input length is not a multiple of 4,
/// - a character is outside the base64 alphabet,
/// - padding is malformed: more than two `=` in a quartet, `=` anywhere
///   other than the tail of the final quartet, or `=` followed by data.
fn base64_decode(input: &str) -> Result<Vec<u8>, String> {
    let bytes = input.as_bytes();
    if !bytes.len().is_multiple_of(4) {
        return Err("base64 length must be a multiple of 4".to_string());
    }

    let quartet_count = bytes.len() / 4;
    let mut out = Vec::with_capacity(quartet_count * 3);
    for (index, chunk) in bytes.chunks_exact(4).enumerate() {
        let pad = chunk.iter().rev().take_while(|&&b| b == b'=').count();
        let is_last = index + 1 == quartet_count;
        // Padding may only close the input, and at most two symbols of it.
        if pad > 2 || (pad > 0 && !is_last) {
            return Err("invalid base64 padding".to_string());
        }
        let data = &chunk[..4 - pad];
        // `base64_value` maps '=' to 0, so a stray '=' in a data position
        // must be rejected here or it would silently decode as zero bits.
        if data.contains(&b'=') {
            return Err("invalid base64 padding".to_string());
        }

        let mut n = 0u32;
        for &symbol in data {
            n = (n << 6) | u32::from(base64_value(symbol)?);
        }
        // Left-align a short final group inside the 24-bit window.
        n <<= 6 * pad;

        out.push(((n >> 16) & 0xff) as u8);
        if pad < 2 {
            out.push(((n >> 8) & 0xff) as u8);
        }
        if pad < 1 {
            out.push((n & 0xff) as u8);
        }
    }
    Ok(out)
}

/// Maps one base64 alphabet byte to its 6-bit value.
///
/// The padding byte `=` maps to 0; callers are responsible for checking that
/// padding only appears where it is allowed.
///
/// # Errors
///
/// Returns an error message for any byte outside the alphabet.
fn base64_value(byte: u8) -> Result<u8, String> {
    match byte {
        b'A'..=b'Z' => Ok(byte - b'A'),
        b'a'..=b'z' => Ok(byte - b'a' + 26),
        b'0'..=b'9' => Ok(byte - b'0' + 52),
        b'+' => Ok(62),
        b'/' => Ok(63),
        b'=' => Ok(0),
        _ => Err(format!("invalid base64 character: {}", byte as char)),
    }
}
1684
1685fn dispatch_method_remote(
1695 client: &AsyncMutex<RedDBClient>,
1696 tokio_rt: &tokio::runtime::Runtime,
1697 method: &str,
1698 params: &Value,
1699) -> Result<Value, (&'static str, String)> {
1700 match method {
1701 "version" => Ok(Value::Object(
1702 [
1703 (
1704 "version".to_string(),
1705 Value::String(env!("CARGO_PKG_VERSION").to_string()),
1706 ),
1707 (
1708 "protocol".to_string(),
1709 Value::String(PROTOCOL_VERSION.to_string()),
1710 ),
1711 ]
1712 .into_iter()
1713 .collect(),
1714 )),
1715
1716 "health" => {
1717 let result = tokio_rt.block_on(async {
1718 let mut guard = client.lock().await;
1719 guard.health_status().await
1720 });
1721 match result {
1722 Ok(status) => Ok(Value::Object(
1723 [
1724 ("ok".to_string(), Value::Bool(status.healthy)),
1725 ("state".to_string(), Value::String(status.state)),
1726 (
1727 "checked_at_unix_ms".to_string(),
1728 Value::Number(status.checked_at_unix_ms as f64),
1729 ),
1730 (
1731 "version".to_string(),
1732 Value::String(env!("CARGO_PKG_VERSION").to_string()),
1733 ),
1734 ]
1735 .into_iter()
1736 .collect(),
1737 )),
1738 Err(e) => Err((error_code::INTERNAL_ERROR, e.to_string())),
1739 }
1740 }
1741
1742 "query" => {
1743 let sql = params.get("sql").and_then(Value::as_str).ok_or((
1744 error_code::INVALID_PARAMS,
1745 "missing 'sql' string".to_string(),
1746 ))?;
1747 let json_str = tokio_rt
1748 .block_on(async {
1749 let mut guard = client.lock().await;
1750 guard.query(sql).await
1751 })
1752 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
1753 let parsed = json::from_str::<Value>(&json_str)
1758 .map_err(|e| (error_code::INTERNAL_ERROR, format!("bad server JSON: {e}")))?;
1759 Ok(parsed)
1760 }
1761
1762 "insert" => {
1763 let collection = params.get("collection").and_then(Value::as_str).ok_or((
1764 error_code::INVALID_PARAMS,
1765 "missing 'collection' string".to_string(),
1766 ))?;
1767 let payload = params.get("payload").ok_or((
1768 error_code::INVALID_PARAMS,
1769 "missing 'payload' object".to_string(),
1770 ))?;
1771 if payload.as_object().is_none() {
1772 return Err((
1773 error_code::INVALID_PARAMS,
1774 "'payload' must be a JSON object".to_string(),
1775 ));
1776 }
1777 let payload_json = payload.to_string_compact();
1778 let reply = tokio_rt
1779 .block_on(async {
1780 let mut guard = client.lock().await;
1781 guard.create_row_entity(collection, &payload_json).await
1782 })
1783 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
1784 let mut out = json::Map::new();
1785 out.insert("affected".to_string(), Value::Number(1.0));
1786 out.insert("id".to_string(), Value::String(reply.id.to_string()));
1787 Ok(Value::Object(out))
1788 }
1789
1790 "bulk_insert" => {
1791 let collection = params.get("collection").and_then(Value::as_str).ok_or((
1792 error_code::INVALID_PARAMS,
1793 "missing 'collection' string".to_string(),
1794 ))?;
1795 let payloads = params.get("payloads").and_then(Value::as_array).ok_or((
1796 error_code::INVALID_PARAMS,
1797 "missing 'payloads' array".to_string(),
1798 ))?;
1799 let mut encoded = Vec::with_capacity(payloads.len());
1800 for entry in payloads {
1801 if entry.as_object().is_none() {
1802 return Err((
1803 error_code::INVALID_PARAMS,
1804 "each payload must be a JSON object".to_string(),
1805 ));
1806 }
1807 encoded.push(entry.to_string_compact());
1808 }
1809 let status = tokio_rt
1810 .block_on(async {
1811 let mut guard = client.lock().await;
1812 guard.bulk_create_rows(collection, encoded).await
1813 })
1814 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
1815 Ok(Value::Object(
1816 [
1817 ("affected".to_string(), Value::Number(status.count as f64)),
1818 (
1819 "ids".to_string(),
1820 Value::Array(
1821 status
1822 .ids
1823 .into_iter()
1824 .map(|id| Value::Number(id as f64))
1825 .collect(),
1826 ),
1827 ),
1828 ]
1829 .into_iter()
1830 .collect(),
1831 ))
1832 }
1833
1834 "get" => {
1835 let collection = params.get("collection").and_then(Value::as_str).ok_or((
1836 error_code::INVALID_PARAMS,
1837 "missing 'collection' string".to_string(),
1838 ))?;
1839 let id = params.get("id").and_then(Value::as_str).ok_or((
1840 error_code::INVALID_PARAMS,
1841 "missing 'id' string".to_string(),
1842 ))?;
1843 let sql = format!("SELECT * FROM {collection} WHERE red_entity_id = {id} LIMIT 1");
1844 let json_str = tokio_rt
1845 .block_on(async {
1846 let mut guard = client.lock().await;
1847 guard.query(&sql).await
1848 })
1849 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
1850 let parsed = json::from_str::<Value>(&json_str)
1851 .map_err(|e| (error_code::INTERNAL_ERROR, format!("bad server JSON: {e}")))?;
1852 let entity = parsed
1855 .get("rows")
1856 .and_then(Value::as_array)
1857 .and_then(|rows| rows.first().cloned())
1858 .unwrap_or(Value::Null);
1859 Ok(Value::Object(
1860 [("entity".to_string(), entity)].into_iter().collect(),
1861 ))
1862 }
1863
1864 "delete" => {
1865 let collection = params.get("collection").and_then(Value::as_str).ok_or((
1866 error_code::INVALID_PARAMS,
1867 "missing 'collection' string".to_string(),
1868 ))?;
1869 let id = params.get("id").and_then(Value::as_str).ok_or((
1870 error_code::INVALID_PARAMS,
1871 "missing 'id' string".to_string(),
1872 ))?;
1873 let id = id.parse::<u64>().map_err(|_| {
1874 (
1875 error_code::INVALID_PARAMS,
1876 "id must be a numeric string".to_string(),
1877 )
1878 })?;
1879 let _reply = tokio_rt
1880 .block_on(async {
1881 let mut guard = client.lock().await;
1882 guard.delete_entity(collection, id).await
1883 })
1884 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
1885 Ok(Value::Object(
1886 [("affected".to_string(), Value::Number(1.0))]
1887 .into_iter()
1888 .collect(),
1889 ))
1890 }
1891
1892 "close" => Ok(Value::Null),
1893
1894 other => Err((
1895 error_code::INVALID_REQUEST,
1896 format!("unknown method: {other}"),
1897 )),
1898 }
1899}
1900
1901#[cfg(test)]
1902mod tests {
1903 use super::*;
1904 use crate::json::json;
1905 use proptest::prelude::*;
1906
1907 fn make_runtime() -> RedDBRuntime {
1908 RedDBRuntime::in_memory().expect("in-memory runtime")
1909 }
1910
1911 fn create_graph_collection(rt: &RedDBRuntime, name: &str) {
1912 let db = rt.db();
1913 db.store()
1914 .create_collection(name)
1915 .expect("create collection");
1916 let now = std::time::SystemTime::now()
1917 .duration_since(std::time::UNIX_EPOCH)
1918 .unwrap_or_default()
1919 .as_millis();
1920 db.save_collection_contract(crate::physical::CollectionContract {
1921 name: name.to_string(),
1922 declared_model: crate::catalog::CollectionModel::Graph,
1923 schema_mode: crate::catalog::SchemaMode::Dynamic,
1924 origin: crate::physical::ContractOrigin::Explicit,
1925 version: 1,
1926 created_at_unix_ms: now,
1927 updated_at_unix_ms: now,
1928 default_ttl_ms: None,
1929 vector_dimension: None,
1930 vector_metric: None,
1931 context_index_fields: Vec::new(),
1932 declared_columns: Vec::new(),
1933 table_def: None,
1934 timestamps_enabled: false,
1935 context_index_enabled: false,
1936 metrics_raw_retention_ms: None,
1937 metrics_rollup_policies: Vec::new(),
1938 metrics_tenant_identity: None,
1939 metrics_namespace: None,
1940 append_only: false,
1941 subscriptions: Vec::new(),
1942 analytics_config: Vec::new(),
1943 session_key: None,
1944 session_gap_ms: None,
1945 retention_duration_ms: None,
1946 analytical_storage: None,
1947
1948 ai_policy: None,
1949 })
1950 .expect("save graph contract");
1951 }
1952
1953 fn handle(rt: &RedDBRuntime, line: &str) -> String {
1954 let mut session = Session::new();
1955 handle_line(&Backend::Local(rt), &mut session, line)
1956 }
1957
1958 fn query_request(id: u64, sql: &str) -> String {
1959 let mut params = json::Map::new();
1960 params.insert("sql".to_string(), Value::String(sql.to_string()));
1961
1962 let mut request = json::Map::new();
1963 request.insert("jsonrpc".to_string(), Value::String("2.0".to_string()));
1964 request.insert("id".to_string(), Value::Number(id as f64));
1965 request.insert("method".to_string(), Value::String("query".to_string()));
1966 request.insert("params".to_string(), Value::Object(params));
1967 Value::Object(request).to_string_compact()
1968 }
1969
1970 fn query_request_with_params(id: u64, sql: &str, binds: Vec<Value>) -> String {
1971 let mut params = json::Map::new();
1972 params.insert("sql".to_string(), Value::String(sql.to_string()));
1973 params.insert("params".to_string(), Value::Array(binds));
1974
1975 let mut request = json::Map::new();
1976 request.insert("jsonrpc".to_string(), Value::String("2.0".to_string()));
1977 request.insert("id".to_string(), Value::Number(id as f64));
1978 request.insert("method".to_string(), Value::String("query".to_string()));
1979 request.insert("params".to_string(), Value::Object(params));
1980 Value::Object(request).to_string_compact()
1981 }
1982
1983 fn with_session<F>(rt: &RedDBRuntime, f: F)
1986 where
1987 F: FnOnce(&dyn Fn(&str) -> String, &RedDBRuntime),
1988 {
1989 let session = std::cell::RefCell::new(Session::new());
1990 let call = |line: &str| -> String {
1991 let mut s = session.borrow_mut();
1992 handle_line(&Backend::Local(rt), &mut s, line)
1993 };
1994 f(&call, rt);
1995 }
1996
1997 fn result_rows(response: &str) -> Vec<Value> {
1998 json::from_str::<Value>(response)
1999 .expect("json response")
2000 .get("result")
2001 .and_then(|result| result.get("rows"))
2002 .and_then(Value::as_array)
2003 .map(|rows| rows.to_vec())
2004 .unwrap_or_default()
2005 }
2006
2007 fn result_name_kind(response: &str) -> Vec<(String, String)> {
2008 result_rows(response)
2009 .into_iter()
2010 .map(|row| {
2011 let object = row.as_object().expect("row object");
2012 let name = object
2013 .get("name")
2014 .and_then(Value::as_str)
2015 .expect("row name")
2016 .to_string();
2017 let kind = object
2018 .get("kind")
2019 .and_then(Value::as_str)
2020 .expect("row kind")
2021 .to_string();
2022 (name, kind)
2023 })
2024 .collect()
2025 }
2026
2027 fn json_scalar_param() -> impl Strategy<Value = Value> {
2028 prop_oneof![
2029 Just(Value::Null),
2030 any::<bool>().prop_map(Value::Bool),
2031 (-1000_i64..1000_i64).prop_map(|n| Value::Number(n as f64)),
2032 "[a-z']{0,8}".prop_map(Value::String),
2033 ]
2034 }
2035
2036 fn sql_literal_for_json(value: &Value) -> String {
2037 match value {
2038 Value::Null => "NULL".to_string(),
2039 Value::Bool(true) => "TRUE".to_string(),
2040 Value::Bool(false) => "FALSE".to_string(),
2041 Value::Number(n) => format!("{n:.0}"),
2042 Value::String(s) => format!("'{}'", s.replace('\'', "''")),
2043 _ => panic!("unsupported scalar param: {value:?}"),
2044 }
2045 }
2046
2047 #[test]
2048 fn version_method_returns_version_and_protocol() {
2049 let rt = make_runtime();
2050 let line = r#"{"jsonrpc":"2.0","id":1,"method":"version","params":{}}"#;
2051 let resp = handle(&rt, line);
2052 assert!(resp.contains("\"id\":1"));
2053 assert!(resp.contains("\"protocol\":\"1.0\""));
2054 assert!(resp.contains("\"version\""));
2055 }
2056
2057 #[test]
2058 fn health_method_returns_ok_true() {
2059 let rt = make_runtime();
2060 let resp = handle(
2061 &rt,
2062 r#"{"jsonrpc":"2.0","id":"abc","method":"health","params":{}}"#,
2063 );
2064 assert!(resp.contains("\"ok\":true"));
2065 assert!(resp.contains("\"id\":\"abc\""));
2066 }
2067
2068 #[test]
2069 fn parse_error_for_invalid_json() {
2070 let rt = make_runtime();
2071 let resp = handle(&rt, "not json {");
2072 assert!(resp.contains("\"code\":\"PARSE_ERROR\""));
2073 assert!(resp.contains("\"id\":null"));
2074 }
2075
2076 #[test]
2077 fn invalid_request_when_method_missing() {
2078 let rt = make_runtime();
2079 let resp = handle(&rt, r#"{"jsonrpc":"2.0","id":1,"params":{}}"#);
2080 assert!(resp.contains("\"code\":\"INVALID_REQUEST\""));
2081 }
2082
2083 #[test]
2084 fn unknown_method_is_invalid_request() {
2085 let rt = make_runtime();
2086 let resp = handle(
2087 &rt,
2088 r#"{"jsonrpc":"2.0","id":1,"method":"frobnicate","params":{}}"#,
2089 );
2090 assert!(resp.contains("\"code\":\"INVALID_REQUEST\""));
2091 assert!(resp.contains("frobnicate"));
2092 }
2093
2094 #[test]
2095 fn invalid_params_when_query_sql_missing() {
2096 let rt = make_runtime();
2097 let resp = handle(
2098 &rt,
2099 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{}}"#,
2100 );
2101 assert!(resp.contains("\"code\":\"INVALID_PARAMS\""));
2102 }
2103
2104 #[test]
2105 fn close_method_marks_response_for_shutdown() {
2106 let rt = make_runtime();
2107 let resp = handle(
2108 &rt,
2109 r#"{"jsonrpc":"2.0","id":1,"method":"close","params":{}}"#,
2110 );
2111 assert!(resp.contains("\"__close__\":true"));
2112 }
2113
2114 #[test]
2115 fn query_with_int_text_params_round_trips() {
2116 let rt = make_runtime();
2117 let _ = handle(
2118 &rt,
2119 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE p (id INTEGER, name TEXT)"}}"#,
2120 );
2121 let _ = handle(
2122 &rt,
2123 r#"{"jsonrpc":"2.0","id":2,"method":"query","params":{"sql":"INSERT INTO p (id, name) VALUES (1, 'Alice')"}}"#,
2124 );
2125 let _ = handle(
2126 &rt,
2127 r#"{"jsonrpc":"2.0","id":3,"method":"query","params":{"sql":"INSERT INTO p (id, name) VALUES (2, 'Bob')"}}"#,
2128 );
2129 let resp = handle(
2130 &rt,
2131 r#"{"jsonrpc":"2.0","id":4,"method":"query","params":{"sql":"SELECT * FROM p WHERE id = $1 AND name = $2","params":[1,"Alice"]}}"#,
2132 );
2133 assert!(resp.contains("\"Alice\""), "got: {resp}");
2134 assert!(!resp.contains("\"Bob\""), "got: {resp}");
2135 }
2136
2137 #[test]
2138 fn query_with_question_params_covers_select_insert_update_delete() {
2139 let rt = make_runtime();
2140 let create = handle(
2141 &rt,
2142 &query_request(1, "CREATE TABLE qp (id INTEGER, name TEXT)"),
2143 );
2144 assert!(!create.contains("\"error\""), "got: {create}");
2145
2146 let inserted = handle(
2147 &rt,
2148 &query_request_with_params(
2149 2,
2150 "INSERT INTO qp (id, name) VALUES (?, ?)",
2151 vec![json!(1), json!("O'Reilly")],
2152 ),
2153 );
2154 assert!(inserted.contains("\"affected\":1"), "got: {inserted}");
2155
2156 let selected = handle(
2157 &rt,
2158 &query_request_with_params(3, "SELECT name FROM qp WHERE id = ?", vec![json!(1)]),
2159 );
2160 let rows = result_rows(&selected);
2161 assert_eq!(rows.len(), 1, "got: {selected}");
2162 assert_eq!(
2163 rows[0].get("name").and_then(Value::as_str),
2164 Some("O'Reilly")
2165 );
2166
2167 let selected_numbered = handle(
2168 &rt,
2169 &query_request_with_params(
2170 4,
2171 "SELECT name FROM qp WHERE name = ?1 AND id = ?2",
2172 vec![json!("O'Reilly"), json!(1)],
2173 ),
2174 );
2175 assert_eq!(
2176 result_rows(&selected_numbered).len(),
2177 1,
2178 "got: {selected_numbered}"
2179 );
2180
2181 let updated = handle(
2182 &rt,
2183 &query_request_with_params(
2184 5,
2185 "UPDATE qp SET name = ? WHERE id = ?",
2186 vec![json!("Alice"), json!(1)],
2187 ),
2188 );
2189 assert!(updated.contains("\"affected\":1"), "got: {updated}");
2190
2191 let deleted = handle(
2192 &rt,
2193 &query_request_with_params(6, "DELETE FROM qp WHERE name = ?", vec![json!("Alice")]),
2194 );
2195 assert!(deleted.contains("\"affected\":1"), "got: {deleted}");
2196
2197 let remaining = handle(&rt, &query_request(7, "SELECT * FROM qp"));
2198 assert!(result_rows(&remaining).is_empty(), "got: {remaining}");
2199 }
2200
2201 #[test]
2202 fn query_with_params_insert_and_search_round_trip() {
2203 let rt = make_runtime();
2204 let insert = handle(
2205 &rt,
2206 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"INSERT INTO bun_embeddings VECTOR (dense, content) VALUES ($1, $2)","params":[[1.0,0.0],"bun vector"]}}"#,
2207 );
2208 assert!(insert.contains("\"affected\":1"), "got: {insert}");
2209
2210 let search = handle(
2211 &rt,
2212 r#"{"jsonrpc":"2.0","id":2,"method":"query","params":{"sql":"SEARCH SIMILAR $1 COLLECTION bun_embeddings LIMIT 1","params":[[1.0,0.0]]}}"#,
2213 );
2214 assert!(search.contains("\"rows\""), "got: {search}");
2215 assert!(search.contains("\"score\":1"), "got: {search}");
2216 assert!(!search.contains("\"error\""), "got: {search}");
2217 }
2218
2219 #[test]
2220 fn query_with_question_vector_param_round_trips() {
2221 let rt = make_runtime();
2222 let insert = handle(
2223 &rt,
2224 &query_request_with_params(
2225 1,
2226 "INSERT INTO question_embeddings VECTOR (dense, content) VALUES (?, ?)",
2227 vec![json!([1.0, 0.0]), json!("question vector")],
2228 ),
2229 );
2230 assert!(insert.contains("\"affected\":1"), "got: {insert}");
2231
2232 let search = handle(
2233 &rt,
2234 &query_request_with_params(
2235 2,
2236 "SEARCH SIMILAR ? COLLECTION question_embeddings LIMIT 1",
2237 vec![json!([1.0, 0.0])],
2238 ),
2239 );
2240 assert!(search.contains("\"rows\""), "got: {search}");
2241 assert!(search.contains("\"score\":1"), "got: {search}");
2242 assert!(!search.contains("\"error\""), "got: {search}");
2243 }
2244
2245 #[test]
2246 fn query_with_typed_json_rpc_params_round_trips() {
2247 let rt = make_runtime();
2248 let create = handle(
2249 &rt,
2250 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE value_params (ok BOOLEAN, score FLOAT, payload BLOB, body JSON, seen_at TIMESTAMP, ident UUID)"}}"#,
2251 );
2252 assert!(!create.contains("\"error\""), "got: {create}");
2253
2254 let insert = handle(
2255 &rt,
2256 r#"{"jsonrpc":"2.0","id":2,"method":"query","params":{"sql":"INSERT INTO value_params (ok, score, payload, body, seen_at, ident) VALUES ($1, $2, $3, $4, $5, $6)","params":[true,{"$float":"NaN"},{"$bytes":"3q2+7w=="},{"z":[1,{"a":true}],"a":null},{"$ts":"1700000000123456789"},{"$uuid":"00112233-4455-6677-8899-aabbccddeeff"}]}}"#,
2257 );
2258 assert!(insert.contains("\"affected\":1"), "got: {insert}");
2259
2260 let selected = handle(
2261 &rt,
2262 r#"{"jsonrpc":"2.0","id":3,"method":"query","params":{"sql":"SELECT * FROM value_params"}}"#,
2263 );
2264 assert!(selected.contains("\"ok\":true"), "got: {selected}");
2265 assert!(selected.contains("\"$float\":\"NaN\""), "got: {selected}");
2266 assert!(
2267 selected.contains("\"$bytes\":\"3q2+7w==\""),
2268 "got: {selected}"
2269 );
2270 assert!(
2271 selected.contains("\"body\":{\"a\":null,\"z\":[1,{\"a\":true}]}"),
2272 "got: {selected}"
2273 );
2274 assert!(
2275 selected.contains("\"$ts\":\"1700000000123456789\""),
2276 "got: {selected}"
2277 );
2278 assert!(
2279 selected.contains("\"$uuid\":\"00112233-4455-6677-8899-aabbccddeeff\""),
2280 "got: {selected}"
2281 );
2282 }
2283
2284 #[test]
2285 fn select_timeseries_tags_decodes_json_payload() {
2286 let rt = make_runtime();
2287 let create = handle(&rt, &query_request(1, "CREATE TIMESERIES ts1"));
2288 assert!(!create.contains("\"error\""), "got: {create}");
2289
2290 let insert = handle(
2291 &rt,
2292 &query_request(
2293 2,
2294 r#"INSERT INTO ts1 (metric, value, tags, timestamp) VALUES ('cpu', 85, '{"host":"a"}', 1000)"#,
2295 ),
2296 );
2297 assert!(insert.contains("\"affected\":1"), "got: {insert}");
2298
2299 let selected = handle(&rt, &query_request(3, "SELECT tags FROM ts1"));
2300 assert!(!selected.contains("<json"), "got: {selected}");
2301 let response = json::from_str::<Value>(&selected).expect("response json");
2302 let tags = response
2303 .get("result")
2304 .and_then(|result| result.get("rows"))
2305 .and_then(Value::as_array)
2306 .and_then(|rows| rows.first())
2307 .and_then(|row| row.get("tags"))
2308 .expect("tags field");
2309 assert_eq!(tags, &json!({"host": "a"}));
2310 }
2311
2312 #[test]
2313 fn select_table_json_column_round_trips_after_single_parse() {
2314 let rt = make_runtime();
2315 let create = handle(&rt, &query_request(1, "CREATE TABLE docs (payload JSON)"));
2316 assert!(!create.contains("\"error\""), "got: {create}");
2317
2318 let original = r#"{"nested":{"items":[1,true,"x"],"object":{"k":"v"}}}"#;
2319 let insert_sql = format!("INSERT INTO docs (payload) VALUES ({original})");
2320 let insert = handle(&rt, &query_request(2, &insert_sql));
2321 assert!(insert.contains("\"affected\":1"), "got: {insert}");
2322
2323 let selected = handle(&rt, &query_request(3, "SELECT payload FROM docs"));
2324 assert!(!selected.contains("<json"), "got: {selected}");
2325 let response = json::from_str::<Value>(&selected).expect("response json");
2326 let payload = response
2327 .get("result")
2328 .and_then(|result| result.get("rows"))
2329 .and_then(Value::as_array)
2330 .and_then(|rows| rows.first())
2331 .and_then(|row| row.get("payload"))
2332 .expect("payload field");
2333 let expected = json::from_str::<Value>(original).expect("expected json");
2334 assert_eq!(payload, &expected);
2335
2336 let payload_text = payload.to_string_compact();
2337 assert_eq!(
2338 json::from_str::<Value>(&payload_text).expect("single parse"),
2339 expected
2340 );
2341 }
2342
2343 #[test]
2344 fn select_json_corruption_falls_back_to_code_and_hex() {
2345 use crate::storage::query::unified::UnifiedResult;
2346
2347 let mut result = UnifiedResult::with_columns(vec!["payload".into()]);
2348 let mut record = UnifiedRecord::new();
2349 record.set("payload", SchemaValue::Json(b"{not json".to_vec()));
2350 result.push(record);
2351
2352 let json = query_result_to_json(&RuntimeQueryResult {
2353 query: "SELECT payload FROM docs".to_string(),
2354 mode: crate::storage::query::modes::QueryMode::Sql,
2355 statement: "select",
2356 engine: "runtime-table",
2357 result,
2358 affected_rows: 0,
2359 statement_type: "select",
2360 bookmark: None,
2361 });
2362
2363 let payload = json
2364 .get("rows")
2365 .and_then(Value::as_array)
2366 .and_then(|rows| rows.first())
2367 .and_then(|row| row.get("payload"))
2368 .expect("payload field");
2369 assert_eq!(
2370 payload.get("code").and_then(Value::as_str),
2371 Some("INVALID_JSON")
2372 );
2373 assert_eq!(
2374 payload.get("hex").and_then(Value::as_str),
2375 Some("7b6e6f74206a736f6e")
2376 );
2377 }
2378
2379 #[test]
2380 fn json_value_to_schema_value_decodes_typed_envelopes() {
2381 let SchemaValue::Blob(bytes) = json_value_to_schema_value(&json!({ "$bytes": "AAECAw==" }))
2382 else {
2383 panic!("expected blob");
2384 };
2385 assert_eq!(bytes, vec![0, 1, 2, 3]);
2386
2387 assert_eq!(
2388 json_value_to_schema_value(&json!({ "$ts": "9223372036854775807" })),
2389 SchemaValue::Timestamp(i64::MAX)
2390 );
2391
2392 let SchemaValue::Uuid(bytes) = json_value_to_schema_value(&json!({
2393 "$uuid": "00112233-4455-6677-8899-aabbccddeeff"
2394 })) else {
2395 panic!("expected uuid");
2396 };
2397 assert_eq!(
2398 bytes,
2399 [
2400 0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88, 0x99, 0xaa, 0xbb, 0xcc, 0xdd,
2401 0xee, 0xff
2402 ]
2403 );
2404
2405 let SchemaValue::Float(value) =
2406 json_value_to_schema_value(&json!({ "$float": "-Infinity" }))
2407 else {
2408 panic!("expected float");
2409 };
2410 assert!(value.is_infinite() && value.is_sign_negative());
2411 }
2412
2413 #[test]
2414 fn query_with_params_arity_mismatch_rejected() {
2415 let rt = make_runtime();
2416 let _ = handle(
2417 &rt,
2418 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE pa (id INTEGER)"}}"#,
2419 );
2420 let resp = handle(
2421 &rt,
2422 r#"{"jsonrpc":"2.0","id":2,"method":"query","params":{"sql":"SELECT * FROM pa WHERE id = $1","params":[1,2]}}"#,
2423 );
2424 assert!(resp.contains("\"INVALID_PARAMS\""), "got: {resp}");
2425 }
2426
2427 #[test]
2428 fn query_with_question_params_arity_mismatch_rejected() {
2429 let rt = make_runtime();
2430 let _ = handle(&rt, &query_request(1, "CREATE TABLE qpa (id INTEGER)"));
2431 let resp = handle(
2432 &rt,
2433 &query_request_with_params(
2434 2,
2435 "SELECT * FROM qpa WHERE id = ?",
2436 vec![json!(1), json!(2)],
2437 ),
2438 );
2439 assert!(resp.contains("\"INVALID_PARAMS\""), "got: {resp}");
2440 assert!(resp.contains("SQL expects 1, got 2"), "got: {resp}");
2441 }
2442
2443 #[test]
2444 fn query_with_params_gap_rejected() {
2445 let rt = make_runtime();
2446 let _ = handle(
2447 &rt,
2448 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE pg (a INTEGER, b INTEGER)"}}"#,
2449 );
2450 let resp = handle(
2451 &rt,
2452 r#"{"jsonrpc":"2.0","id":2,"method":"query","params":{"sql":"SELECT * FROM pg WHERE a = $1 AND b = $3","params":[1,2,3]}}"#,
2453 );
2454 assert!(resp.contains("\"INVALID_PARAMS\""), "got: {resp}");
2455 }
2456
2457 #[test]
2458 fn query_with_question_numbered_gap_rejected() {
2459 let rt = make_runtime();
2460 let _ = handle(&rt, &query_request(1, "CREATE TABLE qpg (id INTEGER)"));
2461 let resp = handle(
2462 &rt,
2463 &query_request_with_params(
2464 2,
2465 "SELECT * FROM qpg WHERE id = ?2",
2466 vec![json!(1), json!(2)],
2467 ),
2468 );
2469 assert!(resp.contains("\"INVALID_PARAMS\""), "got: {resp}");
2470 assert!(resp.contains("parameter $`1` is missing"), "got: {resp}");
2471 }
2472
2473 #[test]
2474 fn query_with_question_params_type_mismatch_names_slot() {
2475 let rt = make_runtime();
2476 let _ = handle(&rt, &query_request(1, "CREATE TABLE qpt (id INTEGER)"));
2477 let resp = handle(
2478 &rt,
2479 &query_request_with_params(
2480 2,
2481 "INSERT INTO qpt (id) VALUES (?)",
2482 vec![json!("not-an-integer")],
2483 ),
2484 );
2485 assert!(resp.contains("\"QUERY_ERROR\""), "got: {resp}");
2486 assert!(resp.contains("id"), "got: {resp}");
2487 assert!(resp.contains("integer"), "got: {resp}");
2488 }
2489
2490 #[test]
2491 fn query_select_one_returns_rows() {
2492 let rt = make_runtime();
2493 let resp = handle(
2494 &rt,
2495 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"SELECT 1 AS one"}}"#,
2496 );
2497 assert!(resp.contains("\"result\""));
2498 assert!(!resp.contains("\"error\""));
2499 }
2500
2501 #[test]
2502 fn ask_query_result_uses_canonical_envelope() {
2503 use crate::storage::query::unified::UnifiedResult;
2504
2505 let mut result = UnifiedResult::with_columns(vec![
2506 "answer".into(),
2507 "provider".into(),
2508 "model".into(),
2509 "prompt_tokens".into(),
2510 "completion_tokens".into(),
2511 "sources_count".into(),
2512 "sources_flat".into(),
2513 "citations".into(),
2514 "validation".into(),
2515 ]);
2516 let mut record = UnifiedRecord::new();
2517 record.set("answer", SchemaValue::text("Deploy failed [^1]."));
2518 record.set("provider", SchemaValue::text("openai"));
2519 record.set("model", SchemaValue::text("gpt-4o-mini"));
2520 record.set("prompt_tokens", SchemaValue::Integer(11));
2521 record.set("completion_tokens", SchemaValue::Integer(7));
2522 record.set(
2523 "sources_flat",
2524 SchemaValue::Json(
2525 br#"[{"urn":"urn:reddb:row:deployments:1","kind":"row","collection":"deployments","id":"1"}]"#.to_vec(),
2526 ),
2527 );
2528 record.set(
2529 "citations",
2530 SchemaValue::Json(br#"[{"marker":1,"urn":"urn:reddb:row:deployments:1"}]"#.to_vec()),
2531 );
2532 record.set(
2533 "validation",
2534 SchemaValue::Json(br#"{"ok":true,"warnings":[],"errors":[]}"#.to_vec()),
2535 );
2536 result.push(record);
2537
2538 let json = query_result_to_json(&RuntimeQueryResult {
2539 query: "ASK 'why did deploy fail?'".to_string(),
2540 mode: crate::storage::query::modes::QueryMode::Sql,
2541 statement: "ask",
2542 engine: "runtime-ai",
2543 result,
2544 affected_rows: 0,
2545 statement_type: "select",
2546 bookmark: None,
2547 });
2548
2549 assert_eq!(
2550 json.get("answer").and_then(Value::as_str),
2551 Some("Deploy failed [^1].")
2552 );
2553 assert_eq!(json.get("cache_hit").and_then(Value::as_bool), Some(false));
2554 assert_eq!(json.get("cost_usd").and_then(Value::as_f64), Some(0.0));
2555 assert_eq!(json.get("mode").and_then(Value::as_str), Some("strict"));
2556 assert_eq!(json.get("retry_count").and_then(Value::as_u64), Some(0));
2557 assert!(
2558 json.get("rows").is_none(),
2559 "ASK envelope must not be row-wrapped: {json}"
2560 );
2561 assert!(
2562 json.get("sources_flat")
2563 .and_then(Value::as_array)
2564 .is_some_and(|sources| sources.len() == 1
2565 && sources[0].get("payload").and_then(Value::as_str).is_some()),
2566 "sources_flat must be a parsed array: {json}"
2567 );
2568 assert!(
2569 json.get("citations")
2570 .and_then(Value::as_array)
2571 .is_some_and(|citations| citations.len() == 1),
2572 "citations must be a parsed array: {json}"
2573 );
2574 assert_eq!(
2575 json.get("validation")
2576 .and_then(|v| v.get("ok"))
2577 .and_then(Value::as_bool),
2578 Some(true)
2579 );
2580 }
2581
2582 #[test]
2587 fn tx_begin_returns_tx_id_and_isolation() {
2588 let rt = make_runtime();
2589 with_session(&rt, |call, _| {
2590 let resp = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
2591 assert!(resp.contains("\"tx_id\":1"));
2592 assert!(resp.contains("\"isolation\":\"read_committed_deferred\""));
2593 assert!(!resp.contains("\"error\""));
2594 });
2595 }
2596
2597 #[test]
2598 fn tx_begin_twice_returns_already_open() {
2599 let rt = make_runtime();
2600 with_session(&rt, |call, _| {
2601 let _ = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
2602 let resp = call(r#"{"jsonrpc":"2.0","id":2,"method":"tx.begin","params":null}"#);
2603 assert!(resp.contains("\"code\":\"TX_ALREADY_OPEN\""));
2604 });
2605 }
2606
2607 #[test]
2608 fn tx_commit_without_begin_returns_no_tx_open() {
2609 let rt = make_runtime();
2610 with_session(&rt, |call, _| {
2611 let resp = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.commit","params":null}"#);
2612 assert!(resp.contains("\"code\":\"NO_TX_OPEN\""));
2613 });
2614 }
2615
2616 #[test]
2617 fn tx_rollback_without_begin_returns_no_tx_open() {
2618 let rt = make_runtime();
2619 with_session(&rt, |call, _| {
2620 let resp = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.rollback","params":null}"#);
2621 assert!(resp.contains("\"code\":\"NO_TX_OPEN\""));
2622 });
2623 }
2624
2625 #[test]
2626 fn insert_inside_tx_returns_pending_envelope() {
2627 let rt = make_runtime();
2628 let _ = handle(
2630 &rt,
2631 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE users (name TEXT)"}}"#,
2632 );
2633 with_session(&rt, |call, _| {
2634 let _ = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
2635 let resp = call(
2636 r#"{"jsonrpc":"2.0","id":2,"method":"insert","params":{"collection":"users","payload":{"name":"alice"}}}"#,
2637 );
2638 assert!(resp.contains("\"pending\":true"));
2639 assert!(resp.contains("\"tx_id\":1"));
2640 assert!(resp.contains("\"affected\":0"));
2641 });
2642 }
2643
2644 #[test]
2645 fn begin_insert_rollback_does_not_persist() {
2646 let rt = make_runtime();
2647 let _ = handle(
2648 &rt,
2649 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE u (name TEXT)"}}"#,
2650 );
2651 with_session(&rt, |call, _| {
2652 let _ = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
2653 let _ = call(
2654 r#"{"jsonrpc":"2.0","id":2,"method":"insert","params":{"collection":"u","payload":{"name":"ghost"}}}"#,
2655 );
2656 let rollback = call(r#"{"jsonrpc":"2.0","id":3,"method":"tx.rollback","params":null}"#);
2657 assert!(rollback.contains("\"ops_discarded\":1"));
2658 assert!(rollback.contains("\"tx_id\":1"));
2659 });
2660 let resp = handle(
2662 &rt,
2663 r#"{"jsonrpc":"2.0","id":9,"method":"query","params":{"sql":"SELECT * FROM u"}}"#,
2664 );
2665 assert!(!resp.contains("\"ghost\""));
2666 }
2667
2668 #[test]
2669 fn begin_insert_commit_persists() {
2670 let rt = make_runtime();
2671 let _ = handle(
2672 &rt,
2673 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE u2 (name TEXT)"}}"#,
2674 );
2675 with_session(&rt, |call, _| {
2676 let _ = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
2677 let _ = call(
2678 r#"{"jsonrpc":"2.0","id":2,"method":"insert","params":{"collection":"u2","payload":{"name":"alice"}}}"#,
2679 );
2680 let _ = call(
2681 r#"{"jsonrpc":"2.0","id":3,"method":"insert","params":{"collection":"u2","payload":{"name":"bob"}}}"#,
2682 );
2683 let commit = call(r#"{"jsonrpc":"2.0","id":4,"method":"tx.commit","params":null}"#);
2684 assert!(commit.contains("\"ops_replayed\":2"));
2685 assert!(!commit.contains("\"error\""));
2686 });
2687 let resp = handle(
2688 &rt,
2689 r#"{"jsonrpc":"2.0","id":9,"method":"query","params":{"sql":"SELECT * FROM u2"}}"#,
2690 );
2691 assert!(resp.contains("\"alice\""));
2692 assert!(resp.contains("\"bob\""));
2693 }
2694
2695 #[test]
2696 fn bulk_insert_inside_tx_buffers_everything() {
2697 let rt = make_runtime();
2698 let _ = handle(
2699 &rt,
2700 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE u3 (name TEXT)"}}"#,
2701 );
2702 with_session(&rt, |call, _| {
2703 let _ = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
2704 let resp = call(
2705 r#"{"jsonrpc":"2.0","id":2,"method":"bulk_insert","params":{"collection":"u3","payloads":[{"name":"a"},{"name":"b"},{"name":"c"}]}}"#,
2706 );
2707 assert!(resp.contains("\"buffered\":3"));
2708 assert!(resp.contains("\"pending\":true"));
2709 assert!(resp.contains("\"affected\":0"));
2710
2711 let commit = call(r#"{"jsonrpc":"2.0","id":3,"method":"tx.commit","params":null}"#);
2712 assert!(commit.contains("\"ops_replayed\":3"));
2713 });
2714 }
2715
2716 #[test]
2717 fn bulk_insert_chunks_at_internal_500_row_limit() {
2718 assert_eq!(bulk_insert_chunk_count(0), 0);
2719 assert_eq!(bulk_insert_chunk_count(1), 1);
2720 assert_eq!(bulk_insert_chunk_count(500), 1);
2721 assert_eq!(bulk_insert_chunk_count(501), 2);
2722 assert_eq!(bulk_insert_chunk_count(1000), 2);
2723 assert_eq!(bulk_insert_chunk_count(1001), 3);
2724 }
2725
    // Property-based tests. Each property below runs 12 generated cases
    // (see the config), with every other proptest setting left at its default.
    proptest! {
        #![proptest_config(ProptestConfig {
            cases: 12,
            ..ProptestConfig::default()
        })]

        // Property: inserting N payloads through one `bulk_insert` call leaves
        // the same (name, kind) rows — in `red_entity_id` order — as inserting
        // the same payloads one by one through `insert`.
        #[test]
        fn bulk_insert_matches_sequential_insert_state(
            names in proptest::collection::vec("[a-z]{1,8}", 1usize..20)
        ) {
            let rt = make_runtime();
            // One JSON object literal per generated name; `kind` is constant.
            let payloads = names
                .iter()
                .map(|name| format!(r#"{{"name":"{name}","kind":"bulk"}}"#))
                .collect::<Vec<_>>();
            let payload_array = payloads.join(",");

            // Bulk path: all payloads in a single request against `bulk_prop`.
            let bulk = handle(
                &rt,
                &format!(
                    r#"{{"jsonrpc":"2.0","id":1,"method":"bulk_insert","params":{{"collection":"bulk_prop","payloads":[{payload_array}]}}}}"#
                ),
            );
            let bulk_result = json::from_str::<Value>(&bulk).expect("bulk json");
            let bulk_ids = bulk_result
                .get("result")
                .and_then(|result| result.get("ids"))
                .and_then(Value::as_array)
                .expect("bulk ids");
            // The response must report exactly one id per payload.
            prop_assert_eq!(bulk_ids.len(), names.len());

            // Sequential path: one `insert` request per payload against
            // `seq_prop`; request ids start at 10.
            for (index, payload) in payloads.iter().enumerate() {
                let insert = handle(
                    &rt,
                    &format!(
                        r#"{{"jsonrpc":"2.0","id":{},"method":"insert","params":{{"collection":"seq_prop","payload":{payload}}}}}"#,
                        index + 10
                    ),
                );
                let insert_result = json::from_str::<Value>(&insert).expect("insert json");
                prop_assert!(
                    insert_result
                        .get("result")
                        .and_then(|result| result.get("id"))
                        .is_some(),
                    "insert response missing id: {insert}"
                );
            }

            // Compare the two collections through the same projection.
            // `result_name_kind` is defined elsewhere in this module —
            // presumably it extracts the (name, kind) pairs; TODO confirm.
            let bulk_rows = result_name_kind(&handle(
                &rt,
                r#"{"jsonrpc":"2.0","id":99,"method":"query","params":{"sql":"SELECT name, kind FROM bulk_prop ORDER BY red_entity_id"}}"#,
            ));
            let seq_rows = result_name_kind(&handle(
                &rt,
                r#"{"jsonrpc":"2.0","id":100,"method":"query","params":{"sql":"SELECT name, kind FROM seq_prop ORDER BY red_entity_id"}}"#,
            ));
            prop_assert_eq!(bulk_rows, seq_rows);
        }

        // Property: binding a generated scalar through a `?` placeholder gives
        // the same rows as inlining the literal that `sql_literal_for_json`
        // builds for the same value.
        #[test]
        fn question_param_select_matches_inlined_literal(value in json_scalar_param()) {
            let rt = make_runtime();
            let bound = handle(
                &rt,
                &query_request_with_params(1, "SELECT ? AS v", vec![value.clone()]),
            );
            let inline_sql = format!("SELECT {} AS v", sql_literal_for_json(&value));
            let inlined = handle(&rt, &query_request(2, &inline_sql));
            prop_assert_eq!(
                result_rows(&bound),
                result_rows(&inlined),
                "bound={}, inlined={}",
                bound,
                inlined
            );
        }
    }
2804
2805 #[test]
2806 fn bulk_insert_graph_nodes_accepts_flat_rows_and_returns_ids() {
2807 let rt = make_runtime();
2808 create_graph_collection(&rt, "social");
2809
2810 let resp = handle(
2811 &rt,
2812 r#"{"jsonrpc":"2.0","id":2,"method":"bulk_insert","params":{"collection":"social","payloads":[{"label":"User","name":"alice"},{"label":"User","name":"bob"}]}}"#,
2813 );
2814 let envelope: Value = json::from_str(&resp).expect("json response");
2815 let result = envelope.get("result").expect("result");
2816 assert_eq!(result.get("affected").and_then(Value::as_u64), Some(2));
2817 assert_eq!(
2818 result
2819 .get("ids")
2820 .and_then(Value::as_array)
2821 .map(|ids| ids.len()),
2822 Some(2)
2823 );
2824
2825 let query = handle(
2826 &rt,
2827 r#"{"jsonrpc":"2.0","id":3,"method":"query","params":{"sql":"MATCH (n:User) RETURN n.name"}}"#,
2828 );
2829 assert!(query.contains("\"alice\""), "got: {query}");
2830 assert!(query.contains("\"bob\""), "got: {query}");
2831 }
2832
2833 #[test]
2834 fn bulk_insert_graph_edges_accepts_flat_rows_and_returns_ids() {
2835 let rt = make_runtime();
2836 create_graph_collection(&rt, "network");
2837 let nodes = handle(
2838 &rt,
2839 r#"{"jsonrpc":"2.0","id":2,"method":"bulk_insert","params":{"collection":"network","payloads":[{"label":"Host","name":"app"},{"label":"Host","name":"db"}]}}"#,
2840 );
2841 let envelope: Value = json::from_str(&nodes).expect("node response");
2842 let ids = envelope
2843 .get("result")
2844 .and_then(|r| r.get("ids"))
2845 .and_then(Value::as_array)
2846 .expect("node ids");
2847 let from = ids[0].as_u64().expect("from id");
2848 let to = ids[1].as_u64().expect("to id");
2849
2850 let resp = handle(
2851 &rt,
2852 &format!(
2853 r#"{{"jsonrpc":"2.0","id":3,"method":"bulk_insert","params":{{"collection":"network","payloads":[{{"label":"connects","from":{from},"to":{to},"weight":0.5,"role":"primary"}}]}}}}"#
2854 ),
2855 );
2856 let envelope: Value = json::from_str(&resp).expect("edge response");
2857 let result = envelope.get("result").expect("result");
2858 assert_eq!(result.get("affected").and_then(Value::as_u64), Some(1));
2859 assert_eq!(
2860 result
2861 .get("ids")
2862 .and_then(Value::as_array)
2863 .map(|ids| ids.len()),
2864 Some(1)
2865 );
2866 }
2867
2868 #[test]
2869 fn delete_inside_tx_is_buffered() {
2870 let rt = make_runtime();
2871 let _ = handle(
2873 &rt,
2874 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE u4 (name TEXT)"}}"#,
2875 );
2876 let _ = handle(
2877 &rt,
2878 r#"{"jsonrpc":"2.0","id":2,"method":"query","params":{"sql":"INSERT INTO u4 (name) VALUES ('keep')"}}"#,
2879 );
2880 with_session(&rt, |call, _| {
2881 let _ = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
2882 let resp = call(
2883 r#"{"jsonrpc":"2.0","id":2,"method":"delete","params":{"collection":"u4","id":"1"}}"#,
2884 );
2885 assert!(resp.contains("\"pending\":true"));
2886 let _ = call(r#"{"jsonrpc":"2.0","id":3,"method":"tx.rollback","params":null}"#);
2887 });
2888 let resp = handle(
2890 &rt,
2891 r#"{"jsonrpc":"2.0","id":9,"method":"query","params":{"sql":"SELECT * FROM u4"}}"#,
2892 );
2893 assert!(resp.contains("\"keep\""));
2894 }
2895
2896 #[test]
2897 fn close_with_open_tx_auto_rollbacks() {
2898 let rt = make_runtime();
2899 let _ = handle(
2900 &rt,
2901 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE u5 (name TEXT)"}}"#,
2902 );
2903 with_session(&rt, |call, _| {
2904 let _ = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
2905 let _ = call(
2906 r#"{"jsonrpc":"2.0","id":2,"method":"insert","params":{"collection":"u5","payload":{"name":"ghost"}}}"#,
2907 );
2908 let close = call(r#"{"jsonrpc":"2.0","id":3,"method":"close","params":null}"#);
2909 assert!(close.contains("\"__close__\":true"));
2910 assert!(!close.contains("\"error\""));
2911 });
2912 let resp = handle(
2913 &rt,
2914 r#"{"jsonrpc":"2.0","id":9,"method":"query","params":{"sql":"SELECT * FROM u5"}}"#,
2915 );
2916 assert!(!resp.contains("\"ghost\""));
2917 }
2918
2919 fn seed_numbers_table(rt: &RedDBRuntime, table: &str, count: u32) {
2924 let _ = handle(
2925 rt,
2926 &format!(
2927 r#"{{"jsonrpc":"2.0","id":1,"method":"query","params":{{"sql":"CREATE TABLE {table} (n INTEGER)"}}}}"#,
2928 ),
2929 );
2930 for i in 0..count {
2931 let _ = handle(
2932 rt,
2933 &format!(
2934 r#"{{"jsonrpc":"2.0","id":2,"method":"query","params":{{"sql":"INSERT INTO {table} (n) VALUES ({i})"}}}}"#,
2935 ),
2936 );
2937 }
2938 }
2939
2940 #[test]
2941 fn cursor_open_returns_id_columns_and_total() {
2942 let rt = make_runtime();
2943 seed_numbers_table(&rt, "nums1", 3);
2944 with_session(&rt, |call, _| {
2945 let resp = call(
2946 r#"{"jsonrpc":"2.0","id":1,"method":"query.open","params":{"sql":"SELECT n FROM nums1"}}"#,
2947 );
2948 assert!(resp.contains("\"cursor_id\":1"));
2949 assert!(resp.contains("\"total_rows\":3"));
2950 assert!(resp.contains("\"columns\""));
2951 assert!(!resp.contains("\"error\""));
2952 });
2953 }
2954
2955 #[test]
2956 fn cursor_next_chunks_rows_and_signals_done() {
2957 let rt = make_runtime();
2958 seed_numbers_table(&rt, "nums2", 5);
2959 with_session(&rt, |call, _| {
2960 let _ = call(
2961 r#"{"jsonrpc":"2.0","id":1,"method":"query.open","params":{"sql":"SELECT n FROM nums2"}}"#,
2962 );
2963 let first = call(
2964 r#"{"jsonrpc":"2.0","id":2,"method":"query.next","params":{"cursor_id":1,"batch_size":2}}"#,
2965 );
2966 assert!(first.contains("\"done\":false"));
2967 assert!(first.contains("\"remaining\":3"));
2968
2969 let second = call(
2970 r#"{"jsonrpc":"2.0","id":3,"method":"query.next","params":{"cursor_id":1,"batch_size":2}}"#,
2971 );
2972 assert!(second.contains("\"done\":false"));
2973 assert!(second.contains("\"remaining\":1"));
2974
2975 let third = call(
2976 r#"{"jsonrpc":"2.0","id":4,"method":"query.next","params":{"cursor_id":1,"batch_size":2}}"#,
2977 );
2978 assert!(third.contains("\"done\":true"));
2979 assert!(third.contains("\"remaining\":0"));
2980 });
2981 }
2982
2983 #[test]
2984 fn cursor_auto_drops_when_exhausted() {
2985 let rt = make_runtime();
2986 seed_numbers_table(&rt, "nums3", 2);
2987 with_session(&rt, |call, _| {
2988 let _ = call(
2989 r#"{"jsonrpc":"2.0","id":1,"method":"query.open","params":{"sql":"SELECT n FROM nums3"}}"#,
2990 );
2991 let _ = call(
2992 r#"{"jsonrpc":"2.0","id":2,"method":"query.next","params":{"cursor_id":1,"batch_size":100}}"#,
2993 );
2994 let resp = call(
2997 r#"{"jsonrpc":"2.0","id":3,"method":"query.next","params":{"cursor_id":1,"batch_size":100}}"#,
2998 );
2999 assert!(resp.contains("\"code\":\"CURSOR_NOT_FOUND\""));
3000 });
3001 }
3002
3003 #[test]
3004 fn cursor_close_removes_it() {
3005 let rt = make_runtime();
3006 seed_numbers_table(&rt, "nums4", 3);
3007 with_session(&rt, |call, _| {
3008 let _ = call(
3009 r#"{"jsonrpc":"2.0","id":1,"method":"query.open","params":{"sql":"SELECT n FROM nums4"}}"#,
3010 );
3011 let close =
3012 call(r#"{"jsonrpc":"2.0","id":2,"method":"query.close","params":{"cursor_id":1}}"#);
3013 assert!(close.contains("\"closed\":true"));
3014 let after = call(
3015 r#"{"jsonrpc":"2.0","id":3,"method":"query.next","params":{"cursor_id":1,"batch_size":10}}"#,
3016 );
3017 assert!(after.contains("\"code\":\"CURSOR_NOT_FOUND\""));
3018 });
3019 }
3020
3021 #[test]
3022 fn cursor_close_unknown_errors() {
3023 let rt = make_runtime();
3024 with_session(&rt, |call, _| {
3025 let resp = call(
3026 r#"{"jsonrpc":"2.0","id":1,"method":"query.close","params":{"cursor_id":9999}}"#,
3027 );
3028 assert!(resp.contains("\"code\":\"CURSOR_NOT_FOUND\""));
3029 });
3030 }
3031
3032 #[test]
3033 fn cursor_next_without_cursor_id_errors() {
3034 let rt = make_runtime();
3035 with_session(&rt, |call, _| {
3036 let resp = call(r#"{"jsonrpc":"2.0","id":1,"method":"query.next","params":{}}"#);
3037 assert!(resp.contains("\"code\":\"INVALID_PARAMS\""));
3038 });
3039 }
3040
3041 #[test]
3042 fn cursor_default_batch_size_returns_all_when_smaller_than_default() {
3043 let rt = make_runtime();
3044 seed_numbers_table(&rt, "nums5", 7);
3045 with_session(&rt, |call, _| {
3046 let _ = call(
3047 r#"{"jsonrpc":"2.0","id":1,"method":"query.open","params":{"sql":"SELECT n FROM nums5"}}"#,
3048 );
3049 let resp =
3051 call(r#"{"jsonrpc":"2.0","id":2,"method":"query.next","params":{"cursor_id":1}}"#);
3052 assert!(resp.contains("\"done\":true"));
3053 assert!(resp.contains("\"remaining\":0"));
3054 });
3055 }
3056
3057 #[test]
3058 fn close_method_drops_open_cursors() {
3059 let rt = make_runtime();
3060 seed_numbers_table(&rt, "nums6", 3);
3061 with_session(&rt, |call, _| {
3064 let _ = call(
3065 r#"{"jsonrpc":"2.0","id":1,"method":"query.open","params":{"sql":"SELECT n FROM nums6"}}"#,
3066 );
3067 let close = call(r#"{"jsonrpc":"2.0","id":2,"method":"close","params":null}"#);
3068 assert!(close.contains("\"__close__\":true"));
3069 let after = call(
3071 r#"{"jsonrpc":"2.0","id":3,"method":"query.next","params":{"cursor_id":1,"batch_size":10}}"#,
3072 );
3073 assert!(after.contains("\"code\":\"CURSOR_NOT_FOUND\""));
3074 });
3075 }
3076
3077 #[test]
3078 fn cursor_independent_of_transaction_state() {
3079 let rt = make_runtime();
3080 seed_numbers_table(&rt, "nums7", 4);
3081 with_session(&rt, |call, _| {
3082 let _ = call(
3084 r#"{"jsonrpc":"2.0","id":1,"method":"query.open","params":{"sql":"SELECT n FROM nums7"}}"#,
3085 );
3086 let _ = call(r#"{"jsonrpc":"2.0","id":2,"method":"tx.begin","params":null}"#);
3087 let _ = call(r#"{"jsonrpc":"2.0","id":3,"method":"tx.commit","params":null}"#);
3088 let resp = call(
3089 r#"{"jsonrpc":"2.0","id":4,"method":"query.next","params":{"cursor_id":1,"batch_size":10}}"#,
3090 );
3091 assert!(resp.contains("\"done\":true"));
3092 assert!(!resp.contains("\"error\""));
3093 });
3094 }
3095
3096 #[test]
3097 fn second_tx_after_commit_gets_fresh_id() {
3098 let rt = make_runtime();
3099 let _ = handle(
3100 &rt,
3101 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE u6 (name TEXT)"}}"#,
3102 );
3103 with_session(&rt, |call, _| {
3104 let first = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
3105 assert!(first.contains("\"tx_id\":1"));
3106 let _ = call(
3107 r#"{"jsonrpc":"2.0","id":2,"method":"insert","params":{"collection":"u6","payload":{"name":"x"}}}"#,
3108 );
3109 let _ = call(r#"{"jsonrpc":"2.0","id":3,"method":"tx.commit","params":null}"#);
3110
3111 let second = call(r#"{"jsonrpc":"2.0","id":4,"method":"tx.begin","params":null}"#);
3112 assert!(second.contains("\"tx_id\":2"));
3113 let _ = call(r#"{"jsonrpc":"2.0","id":5,"method":"tx.rollback","params":null}"#);
3114 });
3115 }
3116
3117 #[test]
3118 fn prepare_and_execute_prepared_statement() {
3119 let rt = make_runtime();
3120 let _ = handle(
3122 &rt,
3123 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE ps_test (n INTEGER)"}}"#,
3124 );
3125 let _ = handle(
3126 &rt,
3127 r#"{"jsonrpc":"2.0","id":2,"method":"query","params":{"sql":"INSERT INTO ps_test (n) VALUES (42)"}}"#,
3128 );
3129
3130 with_session(&rt, |call, _| {
3131 let prep = call(
3133 r#"{"jsonrpc":"2.0","id":3,"method":"prepare","params":{"sql":"SELECT n FROM ps_test WHERE n = 42"}}"#,
3134 );
3135 assert!(prep.contains("\"prepared_id\""), "prepare response: {prep}");
3136
3137 let id: u64 = {
3139 let v: crate::json::Value = crate::json::from_str(&prep).expect("json");
3140 let result = v.get("result").expect("result");
3141 result
3142 .get("prepared_id")
3143 .and_then(|n| n.as_f64())
3144 .expect("prepared_id") as u64
3145 };
3146
3147 let exec = call(&format!(
3149 r#"{{"jsonrpc":"2.0","id":4,"method":"execute_prepared","params":{{"prepared_id":{id},"binds":[42]}}}}"#
3150 ));
3151 assert!(
3153 exec.contains("\"rows\""),
3154 "execute_prepared response: {exec}"
3155 );
3156 assert!(exec.contains("42"), "expected row with n=42 in: {exec}");
3157 });
3158 }
3159}