1use std::io::{BufRead, BufReader, Stdin, Write};
19use std::panic::AssertUnwindSafe;
20
21use tokio::sync::Mutex as AsyncMutex;
22
23use crate::application::entity::{CreateRowInput, CreateRowsBatchInput};
24use crate::application::ports::RuntimeEntityPort;
25use crate::json::{self as json, Value};
26use crate::runtime::{RedDBRuntime, RuntimeQueryResult};
27use crate::storage::query::unified::UnifiedRecord;
28use crate::storage::schema::Value as SchemaValue;
29use reddb_client_connector::RedDBClient;
30
/// Execution target behind a stdio RPC session.
enum Backend<'a> {
    /// Requests are dispatched in-process against a borrowed runtime.
    Local(&'a RedDBRuntime),
    /// Requests are forwarded through a connected remote client.
    Remote(Box<RemoteBackend<'a>>),
}
47
/// State required to serve requests through a remote `RedDBClient`.
struct RemoteBackend<'a> {
    /// Connected client, guarded by an async mutex.
    client: AsyncMutex<RedDBClient>,
    /// Tokio runtime borrowed from the caller; `run_remote` builds it and
    /// also uses it to establish the connection.
    tokio_rt: &'a tokio::runtime::Runtime,
}
52
/// Protocol version string reported by the `version` method.
pub const PROTOCOL_VERSION: &str = "1.0";
/// Maximum number of rows handed to a single `create_rows_batch` call by the
/// `bulk_insert` method.
const STDIO_BULK_INSERT_CHUNK_ROWS: usize = 500;
56
/// String error codes placed in the `code` field of error responses.
pub mod error_code {
    /// The request line could not be parsed as JSON.
    pub const PARSE_ERROR: &str = "PARSE_ERROR";
    /// The request is structurally invalid: missing `method`, unknown method,
    /// or a method that is unavailable in the current mode.
    pub const INVALID_REQUEST: &str = "INVALID_REQUEST";
    /// A required parameter is missing or has the wrong type.
    pub const INVALID_PARAMS: &str = "INVALID_PARAMS";
    /// The runtime rejected or failed to execute a query.
    pub const QUERY_ERROR: &str = "QUERY_ERROR";
    /// NOTE(review): not emitted by the handlers visible in this part of the
    /// file; presumably signals a missing resource — confirm against callers.
    pub const NOT_FOUND: &str = "NOT_FOUND";
    /// Reading stdin failed or a handler panicked.
    pub const INTERNAL_ERROR: &str = "INTERNAL_ERROR";
    /// `tx.begin` was called while the session already holds a transaction.
    pub const TX_ALREADY_OPEN: &str = "TX_ALREADY_OPEN";
    /// `tx.commit` / `tx.rollback` was called with no open transaction.
    pub const NO_TX_OPEN: &str = "NO_TX_OPEN";
    /// Replaying the buffered write set during `tx.commit` failed.
    pub const TX_REPLAY_FAILED: &str = "TX_REPLAY_FAILED";
    /// Returned for `tx.*` and `query.open` / `query.next` / `query.close`
    /// when the session is backed by a remote connection.
    pub const TX_NOT_SUPPORTED_REMOTE: &str = "TX_NOT_SUPPORTED_REMOTE";
    /// The referenced cursor id is not registered in the session.
    pub const CURSOR_NOT_FOUND: &str = "CURSOR_NOT_FOUND";
    /// The session already holds `MAX_CURSORS_PER_SESSION` cursors.
    pub const CURSOR_LIMIT_EXCEEDED: &str = "CURSOR_LIMIT_EXCEEDED";
}
84
/// Upper bound on simultaneously open cursors per session; enforced by
/// `Session::insert_cursor`.
pub(crate) const MAX_CURSORS_PER_SESSION: usize = 64;
/// Batch size used by `query.next` when the request omits `batch_size`.
pub(crate) const DEFAULT_CURSOR_BATCH_SIZE: usize = 100;
/// Largest batch size `query.next` will honor; larger requests are clamped.
pub(crate) const MAX_CURSOR_BATCH_SIZE: usize = 10_000;
96
/// A statement registered by the `prepare` method and looked up again by
/// `execute_prepared`.
struct StdioPreparedStatement {
    /// Query expression stored at prepare time: the parameterized shape when
    /// parameterization succeeded, otherwise the parsed expression as-is.
    shape: crate::storage::query::ast::QueryExpr,
    /// Number of bind values `execute_prepared` must receive (0 when the
    /// statement was stored unparameterized).
    parameter_count: usize,
}
123
/// Per-connection state for the stdio protocol: the optional open
/// transaction, open cursors, and prepared statements.
pub(crate) struct Session {
    /// Id that the next `open_tx` call will hand out.
    next_tx_id: u64,
    /// The transaction currently open in this session, if any.
    current_tx: Option<OpenTx>,
    /// Id that the next `insert_cursor` call will hand out.
    next_cursor_id: u64,
    /// Open cursors keyed by cursor id.
    cursors: std::collections::HashMap<u64, Cursor>,
    /// Id that the next `prepare` request will hand out.
    next_prepared_id: u64,
    /// Prepared statements keyed by prepared id.
    prepared: std::collections::HashMap<u64, StdioPreparedStatement>,
}
134
135impl Session {
136 pub(crate) fn new() -> Self {
137 Self {
138 next_tx_id: 1,
139 current_tx: None,
140 next_cursor_id: 1,
141 cursors: std::collections::HashMap::new(),
142 next_prepared_id: 1,
143 prepared: std::collections::HashMap::new(),
144 }
145 }
146
147 fn open_tx(&mut self) -> Result<u64, (&'static str, String)> {
148 if let Some(tx) = &self.current_tx {
149 return Err((
150 error_code::TX_ALREADY_OPEN,
151 format!("transaction {} already open in this session", tx.tx_id),
152 ));
153 }
154 let tx_id = self.next_tx_id;
155 self.next_tx_id = self.next_tx_id.saturating_add(1);
156 self.current_tx = Some(OpenTx {
157 tx_id,
158 write_set: Vec::new(),
159 });
160 Ok(tx_id)
161 }
162
163 fn take_tx(&mut self) -> Option<OpenTx> {
164 self.current_tx.take()
165 }
166
167 fn current_tx_mut(&mut self) -> Option<&mut OpenTx> {
168 self.current_tx.as_mut()
169 }
170
171 #[allow(dead_code)]
172 fn has_tx(&self) -> bool {
173 self.current_tx.is_some()
174 }
175
176 fn insert_cursor(&mut self, cursor: Cursor) -> Result<u64, (&'static str, String)> {
179 if self.cursors.len() >= MAX_CURSORS_PER_SESSION {
180 return Err((
181 error_code::CURSOR_LIMIT_EXCEEDED,
182 format!(
183 "session already holds {} cursors (max {}) — close some before opening new ones",
184 self.cursors.len(),
185 MAX_CURSORS_PER_SESSION
186 ),
187 ));
188 }
189 let id = self.next_cursor_id;
190 self.next_cursor_id = self.next_cursor_id.saturating_add(1);
191 let mut cursor = cursor;
192 cursor.cursor_id = id;
193 self.cursors.insert(id, cursor);
194 Ok(id)
195 }
196
197 fn cursor_mut(&mut self, id: u64) -> Option<&mut Cursor> {
198 self.cursors.get_mut(&id)
199 }
200
201 fn drop_cursor(&mut self, id: u64) -> Option<Cursor> {
202 self.cursors.remove(&id)
203 }
204
205 fn clear_cursors(&mut self) {
206 self.cursors.clear();
207 }
208}
209
210impl Default for Session {
211 fn default() -> Self {
212 Self::new()
213 }
214}
215
/// A transaction opened by `tx.begin` and not yet committed or rolled back.
struct OpenTx {
    /// Session-local identifier returned to the client by `tx.begin`.
    tx_id: u64,
    /// Statements buffered while the transaction is open; `tx.commit` replays
    /// them in order, `tx.rollback` discards them.
    write_set: Vec<PendingSql>,
}
221
/// One buffered write inside an open transaction, stored as SQL text.
enum PendingSql {
    /// SQL produced for a buffered `insert` / `bulk_insert` row.
    Insert(String),
    /// SQL produced for a buffered `delete`.
    Delete(String),
    /// SQL for a buffered update; no visible handler constructs it yet.
    #[allow(dead_code)]
    Update(String),
}
231
232impl PendingSql {
233 fn sql(&self) -> &str {
234 match self {
235 PendingSql::Insert(s) | PendingSql::Delete(s) | PendingSql::Update(s) => s,
236 }
237 }
238}
239
/// A fully materialized result set that the client drains in batches.
pub(crate) struct Cursor {
    /// Session-assigned id; `Cursor::new` sets 0 and `Session::insert_cursor`
    /// overwrites it.
    cursor_id: u64,
    /// Column names captured when the cursor was opened.
    columns: Vec<String>,
    /// All result rows, held in memory.
    rows: Vec<UnifiedRecord>,
    /// Index of the next row `take_batch` will return.
    position: usize,
}
257
258impl Cursor {
259 fn new(columns: Vec<String>, rows: Vec<UnifiedRecord>) -> Self {
260 Self {
261 cursor_id: 0, columns,
263 rows,
264 position: 0,
265 }
266 }
267
268 fn total(&self) -> usize {
269 self.rows.len()
270 }
271
272 fn remaining(&self) -> usize {
273 self.rows.len().saturating_sub(self.position)
274 }
275
276 fn is_exhausted(&self) -> bool {
277 self.position >= self.rows.len()
278 }
279
280 fn take_batch(&mut self, batch_size: usize) -> &[UnifiedRecord] {
283 let end = (self.position + batch_size).min(self.rows.len());
284 let slice = &self.rows[self.position..end];
285 self.position = end;
286 slice
287 }
288}
289
290pub fn run(runtime: &RedDBRuntime) -> i32 {
296 run_with_io(runtime, std::io::stdin(), &mut std::io::stdout())
297}
298
299pub fn run_remote(endpoint: &str, token: Option<String>) -> i32 {
306 let tokio_rt = match tokio::runtime::Builder::new_current_thread()
307 .enable_all()
308 .build()
309 {
310 Ok(rt) => rt,
311 Err(e) => {
312 tracing::error!(err = %e, "rpc: failed to build tokio runtime");
313 return 1;
314 }
315 };
316 let client = match tokio_rt.block_on(RedDBClient::connect(endpoint, token)) {
317 Ok(c) => c,
318 Err(e) => {
319 tracing::error!(endpoint, err = %e, "rpc: failed to connect");
320 return 1;
321 }
322 };
323 let backend = Backend::Remote(Box::new(RemoteBackend {
324 client: AsyncMutex::new(client),
325 tokio_rt: &tokio_rt,
326 }));
327 run_backend(&backend, std::io::stdin(), &mut std::io::stdout())
328}
329
330pub fn run_with_io<W: Write>(runtime: &RedDBRuntime, stdin: Stdin, stdout: &mut W) -> i32 {
332 run_backend(&Backend::Local(runtime), stdin, stdout)
333}
334
/// Process-wide counter backing `next_stdio_conn_id`; the first id handed
/// out is 1_000_000.
static STDIO_SESSION_CONN_ID: std::sync::atomic::AtomicU64 =
    std::sync::atomic::AtomicU64::new(1_000_000);

/// Hands out the next stdio connection id; each call returns a value one
/// greater than the previous call's.
fn next_stdio_conn_id() -> u64 {
    use std::sync::atomic::Ordering;

    let counter = &STDIO_SESSION_CONN_ID;
    counter.fetch_add(1, Ordering::Relaxed)
}
347
348fn run_backend<W: Write>(backend: &Backend<'_>, stdin: Stdin, stdout: &mut W) -> i32 {
349 let reader = BufReader::new(stdin.lock());
350 let mut session = Session::new();
351 let conn_id = next_stdio_conn_id();
355 crate::runtime::impl_core::set_current_connection_id(conn_id);
356 for line_result in reader.lines() {
357 let line = match line_result {
358 Ok(l) => l,
359 Err(e) => {
360 let _ = writeln!(
361 stdout,
362 "{}",
363 error_response(&Value::Null, error_code::INTERNAL_ERROR, &e.to_string())
364 );
365 let _ = stdout.flush();
366 return 1;
367 }
368 };
369 if line.trim().is_empty() {
370 continue;
371 }
372 let response = handle_line(backend, &mut session, &line);
373 if writeln!(stdout, "{}", response).is_err() || stdout.flush().is_err() {
374 return 1;
375 }
376 if response.contains("\"__close__\":true") {
377 return 0;
378 }
379 }
380 let _ = session.take_tx();
384 crate::runtime::impl_core::clear_current_connection_id();
385 0
386}
387
388fn handle_line(backend: &Backend<'_>, session: &mut Session, line: &str) -> String {
392 let parsed: Value = match json::from_str(line) {
393 Ok(v) => v,
394 Err(err) => {
395 return error_response(
396 &Value::Null,
397 error_code::PARSE_ERROR,
398 &format!("invalid JSON: {err}"),
399 );
400 }
401 };
402
403 let id = parsed.get("id").cloned().unwrap_or(Value::Null);
404
405 let method = match parsed.get("method").and_then(Value::as_str) {
406 Some(m) => m.to_string(),
407 None => {
408 return error_response(&id, error_code::INVALID_REQUEST, "missing 'method' field");
409 }
410 };
411
412 let params = parsed.get("params").cloned().unwrap_or(Value::Null);
413
414 let dispatch = std::panic::catch_unwind(AssertUnwindSafe(|| match backend {
415 Backend::Local(rt) => dispatch_method(rt, session, &method, ¶ms),
416 Backend::Remote(remote) => {
417 if matches!(
422 method.as_str(),
423 "tx.begin"
424 | "tx.commit"
425 | "tx.rollback"
426 | "query.open"
427 | "query.next"
428 | "query.close"
429 ) {
430 Err((
431 error_code::TX_NOT_SUPPORTED_REMOTE,
432 format!("{method} is not supported over remote gRPC yet"),
433 ))
434 } else {
435 dispatch_method_remote(&remote.client, remote.tokio_rt, &method, ¶ms)
436 }
437 }
438 }));
439
440 match dispatch {
441 Ok(Ok(result)) => success_response(&id, &result, method == "close"),
442 Ok(Err((code, msg))) => error_response(&id, code, &msg),
443 Err(_) => error_response(&id, error_code::INTERNAL_ERROR, "handler panicked (caught)"),
444 }
445}
446
447fn dispatch_method(
450 runtime: &RedDBRuntime,
451 session: &mut Session,
452 method: &str,
453 params: &Value,
454) -> Result<Value, (&'static str, String)> {
455 match method {
456 "tx.begin" => {
457 let tx_id = session.open_tx()?;
458 Ok(Value::Object(
459 [
460 ("tx_id".to_string(), Value::Number(tx_id as f64)),
461 (
462 "isolation".to_string(),
463 Value::String("read_committed_deferred".to_string()),
464 ),
465 ]
466 .into_iter()
467 .collect(),
468 ))
469 }
470
471 "tx.commit" => {
472 let tx = session.take_tx().ok_or((
473 error_code::NO_TX_OPEN,
474 "no transaction is open in this session".to_string(),
475 ))?;
476 let tx_id = tx.tx_id;
477 let op_count = tx.write_set.len();
478
479 let replay: Result<(u64, usize), (usize, String)> = (|| {
486 runtime
487 .execute_query("BEGIN")
488 .map_err(|e| (0usize, format!("BEGIN: {e}")))?;
489 let mut total_affected: u64 = 0;
490 for (idx, op) in tx.write_set.iter().enumerate() {
491 match runtime.execute_query(op.sql()) {
492 Ok(qr) => total_affected += qr.affected_rows,
493 Err(e) => {
494 let _ = runtime.execute_query("ROLLBACK");
495 return Err((idx, e.to_string()));
496 }
497 }
498 }
499 runtime
500 .execute_query("COMMIT")
501 .map_err(|e| (op_count, format!("COMMIT: {e}")))?;
502 Ok((total_affected, op_count))
503 })();
504
505 match replay {
506 Ok((affected, replayed)) => Ok(Value::Object(
507 [
508 ("tx_id".to_string(), Value::Number(tx_id as f64)),
509 ("ops_replayed".to_string(), Value::Number(replayed as f64)),
510 ("affected".to_string(), Value::Number(affected as f64)),
511 ]
512 .into_iter()
513 .collect(),
514 )),
515 Err((failed_idx, msg)) => Err((
516 error_code::TX_REPLAY_FAILED,
517 format!(
518 "tx {tx_id} replay failed at op {failed_idx}/{op_count}: {msg} \
519 (ops 0..{failed_idx} already applied and are NOT rolled back)"
520 ),
521 )),
522 }
523 }
524
525 "query.open" => {
526 let sql = params.get("sql").and_then(Value::as_str).ok_or((
527 error_code::INVALID_PARAMS,
528 "missing 'sql' string".to_string(),
529 ))?;
530 let qr = runtime
531 .execute_query(sql)
532 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
533
534 let columns: Vec<String> = qr
538 .result
539 .records
540 .first()
541 .map(|first| {
542 let mut keys: Vec<String> = first
543 .column_names()
544 .into_iter()
545 .map(|k| k.to_string())
546 .collect();
547 keys.sort();
548 keys
549 })
550 .unwrap_or_default();
551
552 let cursor = Cursor::new(columns.clone(), qr.result.records);
553 let total = cursor.total();
554 let cursor_id = session.insert_cursor(cursor)?;
555
556 Ok(Value::Object(
557 [
558 ("cursor_id".to_string(), Value::Number(cursor_id as f64)),
559 (
560 "columns".to_string(),
561 Value::Array(columns.into_iter().map(Value::String).collect()),
562 ),
563 ("total_rows".to_string(), Value::Number(total as f64)),
564 ]
565 .into_iter()
566 .collect(),
567 ))
568 }
569
570 "query.next" => {
571 let cursor_id = params
572 .get("cursor_id")
573 .and_then(|v| v.as_f64())
574 .map(|n| n as u64)
575 .ok_or((
576 error_code::INVALID_PARAMS,
577 "missing 'cursor_id' number".to_string(),
578 ))?;
579 let batch_size = params
580 .get("batch_size")
581 .and_then(|v| v.as_f64())
582 .map(|n| n as usize)
583 .unwrap_or(DEFAULT_CURSOR_BATCH_SIZE)
584 .clamp(1, MAX_CURSOR_BATCH_SIZE);
585
586 let (rows, done, remaining) = {
589 let cursor = session.cursor_mut(cursor_id).ok_or((
590 error_code::CURSOR_NOT_FOUND,
591 format!("cursor {cursor_id} not found"),
592 ))?;
593 let batch = cursor.take_batch(batch_size);
594 let rows_json: Vec<Value> = batch.iter().map(record_to_json_object).collect();
595 (rows_json, cursor.is_exhausted(), cursor.remaining())
596 };
597
598 if done {
599 let _ = session.drop_cursor(cursor_id);
602 }
603
604 Ok(Value::Object(
605 [
606 ("cursor_id".to_string(), Value::Number(cursor_id as f64)),
607 ("rows".to_string(), Value::Array(rows)),
608 ("done".to_string(), Value::Bool(done)),
609 ("remaining".to_string(), Value::Number(remaining as f64)),
610 ]
611 .into_iter()
612 .collect(),
613 ))
614 }
615
616 "query.close" => {
617 let cursor_id = params
618 .get("cursor_id")
619 .and_then(|v| v.as_f64())
620 .map(|n| n as u64)
621 .ok_or((
622 error_code::INVALID_PARAMS,
623 "missing 'cursor_id' number".to_string(),
624 ))?;
625 let existed = session.drop_cursor(cursor_id).is_some();
626 if !existed {
627 return Err((
628 error_code::CURSOR_NOT_FOUND,
629 format!("cursor {cursor_id} not found"),
630 ));
631 }
632 Ok(Value::Object(
633 [
634 ("cursor_id".to_string(), Value::Number(cursor_id as f64)),
635 ("closed".to_string(), Value::Bool(true)),
636 ]
637 .into_iter()
638 .collect(),
639 ))
640 }
641
642 "tx.rollback" => {
643 let tx = session.take_tx().ok_or((
644 error_code::NO_TX_OPEN,
645 "no transaction is open in this session".to_string(),
646 ))?;
647 let ops_discarded = tx.write_set.len();
648 Ok(Value::Object(
649 [
650 ("tx_id".to_string(), Value::Number(tx.tx_id as f64)),
651 (
652 "ops_discarded".to_string(),
653 Value::Number(ops_discarded as f64),
654 ),
655 ]
656 .into_iter()
657 .collect(),
658 ))
659 }
660
661 "version" => Ok(Value::Object(
662 [
663 (
664 "version".to_string(),
665 Value::String(env!("CARGO_PKG_VERSION").to_string()),
666 ),
667 (
668 "protocol".to_string(),
669 Value::String(PROTOCOL_VERSION.to_string()),
670 ),
671 ]
672 .into_iter()
673 .collect(),
674 )),
675
676 "health" => Ok(Value::Object(
677 [
678 ("ok".to_string(), Value::Bool(true)),
679 (
680 "version".to_string(),
681 Value::String(env!("CARGO_PKG_VERSION").to_string()),
682 ),
683 ]
684 .into_iter()
685 .collect(),
686 )),
687
688 "query" => {
689 let sql = params.get("sql").and_then(Value::as_str).ok_or((
690 error_code::INVALID_PARAMS,
691 "missing 'sql' string".to_string(),
692 ))?;
693
694 let bind_values: Option<Vec<SchemaValue>> = params
697 .get("params")
698 .map(|v| {
699 v.as_array()
700 .ok_or((
701 error_code::INVALID_PARAMS,
702 "'params' must be an array".to_string(),
703 ))
704 .map(|arr| arr.iter().map(json_value_to_schema_value).collect())
705 })
706 .transpose()?;
707
708 if let Some(binds) = bind_values {
709 use crate::storage::query::modes::parse_multi;
710 use crate::storage::query::user_params;
711 let parsed =
712 parse_multi(sql).map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
713 let bound = user_params::bind(&parsed, &binds)
714 .map_err(|e| (error_code::INVALID_PARAMS, e.to_string()))?;
715 let qr = runtime
716 .execute_query_expr(bound)
717 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
718 return Ok(query_result_to_json(&qr));
719 }
720
721 let qr = runtime
722 .execute_query(sql)
723 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
724 Ok(query_result_to_json(&qr))
725 }
726
727 "prepare" => {
736 use crate::storage::query::modes::parse_multi;
737 use crate::storage::query::planner::shape::parameterize_query_expr;
738
739 let sql = params.get("sql").and_then(Value::as_str).ok_or((
740 error_code::INVALID_PARAMS,
741 "missing 'sql' string".to_string(),
742 ))?;
743 let parsed = parse_multi(sql).map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
744 let (shape, parameter_count) = if let Some(prepared) = parameterize_query_expr(&parsed)
745 {
746 (prepared.shape, prepared.parameter_count)
747 } else {
748 (parsed, 0)
749 };
750 let id = session.next_prepared_id;
751 session.next_prepared_id = session.next_prepared_id.saturating_add(1);
752 session.prepared.insert(
753 id,
754 StdioPreparedStatement {
755 shape,
756 parameter_count,
757 },
758 );
759 Ok(Value::Object(
760 [
761 ("prepared_id".to_string(), Value::Number(id as f64)),
762 (
763 "parameter_count".to_string(),
764 Value::Number(parameter_count as f64),
765 ),
766 ]
767 .into_iter()
768 .collect(),
769 ))
770 }
771
772 "execute_prepared" => {
773 use crate::storage::query::planner::shape::bind_parameterized_query;
774 use crate::storage::schema::Value as SV;
775
776 let id = params
777 .get("prepared_id")
778 .and_then(Value::as_f64)
779 .map(|n| n as u64)
780 .ok_or((
781 error_code::INVALID_PARAMS,
782 "missing 'prepared_id'".to_string(),
783 ))?;
784
785 let stmt = session.prepared.get(&id).ok_or((
786 error_code::QUERY_ERROR,
787 format!("no prepared statement with id {id}"),
788 ))?;
789
790 let binds_json: Vec<Value> = params
792 .get("binds")
793 .and_then(Value::as_array)
794 .map(|a| a.to_vec())
795 .unwrap_or_default();
796 if binds_json.len() != stmt.parameter_count {
797 return Err((
798 error_code::INVALID_PARAMS,
799 format!(
800 "expected {} bind values, got {}",
801 stmt.parameter_count,
802 binds_json.len()
803 ),
804 ));
805 }
806
807 let binds: Vec<SV> = binds_json.iter().map(json_value_to_schema_value).collect();
809
810 let expr = if stmt.parameter_count == 0 {
812 stmt.shape.clone()
813 } else {
814 bind_parameterized_query(&stmt.shape, &binds, stmt.parameter_count)
815 .ok_or((error_code::QUERY_ERROR, "bind failed".to_string()))?
816 };
817
818 let qr = runtime
819 .execute_query_expr(expr)
820 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
821 Ok(query_result_to_json(&qr))
822 }
823
824 "insert" => {
825 let collection = params.get("collection").and_then(Value::as_str).ok_or((
826 error_code::INVALID_PARAMS,
827 "missing 'collection' string".to_string(),
828 ))?;
829 let payload = params.get("payload").ok_or((
830 error_code::INVALID_PARAMS,
831 "missing 'payload' object".to_string(),
832 ))?;
833 let payload_obj = payload.as_object().ok_or((
834 error_code::INVALID_PARAMS,
835 "'payload' must be a JSON object".to_string(),
836 ))?;
837 if let Some(tx) = session.current_tx_mut() {
838 let sql = build_insert_sql(collection, payload_obj.iter());
839 tx.write_set.push(PendingSql::Insert(sql));
840 return Ok(pending_tx_response(tx.tx_id));
841 }
842
843 let output = runtime
844 .create_row(flat_payload_to_row_input(collection, payload_obj))
845 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
846 let mut out = json::Map::new();
847 out.insert("affected".to_string(), Value::Number(1.0));
848 out.insert("id".to_string(), Value::String(output.id.raw().to_string()));
849 Ok(Value::Object(out))
850 }
851
852 "bulk_insert" => {
853 let collection = params.get("collection").and_then(Value::as_str).ok_or((
854 error_code::INVALID_PARAMS,
855 "missing 'collection' string".to_string(),
856 ))?;
857 let payloads = params.get("payloads").and_then(Value::as_array).ok_or((
858 error_code::INVALID_PARAMS,
859 "missing 'payloads' array".to_string(),
860 ))?;
861
862 let mut objects = Vec::with_capacity(payloads.len());
863 for entry in payloads {
864 objects.push(entry.as_object().ok_or((
865 error_code::INVALID_PARAMS,
866 "each payload must be a JSON object".to_string(),
867 ))?);
868 }
869
870 if let Some(tx) = session.current_tx_mut() {
871 let mut buffered: u64 = 0;
872 for obj in &objects {
873 let sql = build_insert_sql(collection, obj.iter());
874 tx.write_set.push(PendingSql::Insert(sql));
875 buffered += 1;
876 }
877 let tx_id = tx.tx_id;
878 return Ok(Value::Object(
879 [
880 ("affected".to_string(), Value::Number(0.0)),
881 ("buffered".to_string(), Value::Number(buffered as f64)),
882 ("pending".to_string(), Value::Bool(true)),
883 ("tx_id".to_string(), Value::Number(tx_id as f64)),
884 ]
885 .into_iter()
886 .collect(),
887 ));
888 }
889
890 if should_bulk_insert_graph(runtime, collection, &objects) {
891 return bulk_insert_graph(runtime, collection, &objects)
892 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()));
893 }
894
895 let mut total_affected: u64 = 0;
896 let mut ids = Vec::with_capacity(objects.len());
897 for chunk in objects.chunks(STDIO_BULK_INSERT_CHUNK_ROWS) {
898 let rows = chunk
899 .iter()
900 .map(|obj| flat_payload_to_row_input(collection, obj))
901 .collect();
902 let outputs = runtime
903 .create_rows_batch(CreateRowsBatchInput {
904 collection: collection.to_string(),
905 rows,
906 suppress_events: false,
907 })
908 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
909 total_affected += outputs.len() as u64;
910 ids.extend(
911 outputs
912 .into_iter()
913 .map(|output| Value::String(output.id.raw().to_string())),
914 );
915 }
916 let mut out = json::Map::new();
917 out.insert("affected".to_string(), Value::Number(total_affected as f64));
918 out.insert("ids".to_string(), Value::Array(ids));
919 Ok(Value::Object(out))
920 }
921
922 "get" => {
923 let collection = params.get("collection").and_then(Value::as_str).ok_or((
924 error_code::INVALID_PARAMS,
925 "missing 'collection' string".to_string(),
926 ))?;
927 let id = params.get("id").and_then(Value::as_str).ok_or((
928 error_code::INVALID_PARAMS,
929 "missing 'id' string".to_string(),
930 ))?;
931 let sql = format!("SELECT * FROM {collection} WHERE red_entity_id = {id} LIMIT 1");
932 let qr = runtime
933 .execute_query(&sql)
934 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
935 let entity = qr
936 .result
937 .records
938 .first()
939 .map(record_to_json_object)
940 .unwrap_or(Value::Null);
941 Ok(Value::Object(
942 [("entity".to_string(), entity)].into_iter().collect(),
943 ))
944 }
945
946 "delete" => {
947 let collection = params.get("collection").and_then(Value::as_str).ok_or((
948 error_code::INVALID_PARAMS,
949 "missing 'collection' string".to_string(),
950 ))?;
951 let id = params.get("id").and_then(Value::as_str).ok_or((
952 error_code::INVALID_PARAMS,
953 "missing 'id' string".to_string(),
954 ))?;
955 let sql = format!("DELETE FROM {collection} WHERE red_entity_id = {id}");
956
957 if let Some(tx) = session.current_tx_mut() {
958 tx.write_set.push(PendingSql::Delete(sql));
959 return Ok(pending_tx_response(tx.tx_id));
960 }
961
962 let qr = runtime
963 .execute_query(&sql)
964 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
965 Ok(Value::Object(
966 [(
967 "affected".to_string(),
968 Value::Number(qr.affected_rows as f64),
969 )]
970 .into_iter()
971 .collect(),
972 ))
973 }
974
975 "close" => {
976 let _ = session.take_tx();
981 session.clear_cursors();
982 let _ = runtime.checkpoint();
983 Ok(Value::Null)
984 }
985
986 "auth.login"
991 | "auth.whoami"
992 | "auth.change_password"
993 | "auth.create_api_key"
994 | "auth.revoke_api_key" => {
995 let _ = (session, params);
996 Err((
997 error_code::INVALID_REQUEST,
998 format!(
999 "{method}: auth methods are only available on grpc:// connections; \
1000 embedded modes (memory://, file://) inherit caller privileges"
1001 ),
1002 ))
1003 }
1004
1005 other => Err((
1006 error_code::INVALID_REQUEST,
1007 format!("unknown method: {other}"),
1008 )),
1009 }
1010}
1011
1012fn success_response(id: &Value, result: &Value, is_close: bool) -> String {
1017 let mut envelope = json::Map::new();
1022 envelope.insert("jsonrpc".to_string(), Value::String("2.0".to_string()));
1023 envelope.insert("id".to_string(), id.clone());
1024 envelope.insert("result".to_string(), result.clone());
1025 if is_close {
1026 envelope.insert("__close__".to_string(), Value::Bool(true));
1027 }
1028 Value::Object(envelope).to_string_compact()
1029}
1030
1031fn error_response(id: &Value, code: &str, message: &str) -> String {
1032 let mut err = json::Map::new();
1033 err.insert("code".to_string(), Value::String(code.to_string()));
1034 err.insert("message".to_string(), Value::String(message.to_string()));
1035 err.insert("data".to_string(), Value::Null);
1036
1037 let mut envelope = json::Map::new();
1038 envelope.insert("jsonrpc".to_string(), Value::String("2.0".to_string()));
1039 envelope.insert("id".to_string(), id.clone());
1040 envelope.insert("error".to_string(), Value::Object(err));
1041 Value::Object(envelope).to_string_compact()
1042}
1043
1044fn pending_tx_response(tx_id: u64) -> Value {
1051 Value::Object(
1052 [
1053 ("affected".to_string(), Value::Number(0.0)),
1054 ("pending".to_string(), Value::Bool(true)),
1055 ("tx_id".to_string(), Value::Number(tx_id as f64)),
1056 ]
1057 .into_iter()
1058 .collect(),
1059 )
1060}
1061
1062pub(crate) fn build_insert_sql<'a, I>(collection: &str, fields: I) -> String
1063where
1064 I: Iterator<Item = (&'a String, &'a Value)>,
1065{
1066 let mut cols = Vec::new();
1067 let mut vals = Vec::new();
1068 for (k, v) in fields {
1069 cols.push(k.clone());
1070 vals.push(value_to_sql_literal(v));
1071 }
1072 format!(
1073 "INSERT INTO {collection} ({}) VALUES ({})",
1074 cols.join(", "),
1075 vals.join(", "),
1076 )
1077}
1078
1079fn flat_payload_to_row_input(
1080 collection: &str,
1081 payload: &json::Map<String, Value>,
1082) -> CreateRowInput {
1083 CreateRowInput {
1084 collection: collection.to_string(),
1085 fields: payload
1086 .iter()
1087 .map(|(key, value)| (key.clone(), json_value_to_schema_value(value)))
1088 .collect(),
1089 metadata: Vec::new(),
1090 node_links: Vec::new(),
1091 vector_links: Vec::new(),
1092 }
1093}
1094
1095fn bulk_insert_chunk_count(row_count: usize) -> usize {
1096 if row_count == 0 {
1097 0
1098 } else {
1099 ((row_count - 1) / STDIO_BULK_INSERT_CHUNK_ROWS) + 1
1100 }
1101}
1102
1103pub(crate) fn should_bulk_insert_graph(
1104 runtime: &RedDBRuntime,
1105 collection: &str,
1106 payloads: &[&json::Map<String, Value>],
1107) -> bool {
1108 let graph_shaped = payloads
1109 .iter()
1110 .all(|payload| payload.get("label").and_then(Value::as_str).is_some());
1111 if !graph_shaped {
1112 return false;
1113 }
1114
1115 matches!(
1116 runtime
1117 .db()
1118 .catalog_model_snapshot()
1119 .collections
1120 .iter()
1121 .find(|descriptor| descriptor.name == collection)
1122 .map(|descriptor| descriptor.declared_model.unwrap_or(descriptor.model)),
1123 Some(crate::catalog::CollectionModel::Graph | crate::catalog::CollectionModel::Mixed)
1124 )
1125}
1126
1127pub(crate) fn bulk_insert_graph(
1128 runtime: &RedDBRuntime,
1129 collection: &str,
1130 payloads: &[&json::Map<String, Value>],
1131) -> crate::RedDBResult<Value> {
1132 use crate::application::entity_payload::{parse_create_edge_input, parse_create_node_input};
1133 use crate::application::ports::RuntimeEntityPort;
1134
1135 let mut ids = Vec::with_capacity(payloads.len());
1136 for payload in payloads {
1137 let input_payload = normalize_flat_graph_payload(payload);
1138 let id = if payload.contains_key("from") || payload.contains_key("to") {
1139 runtime
1140 .create_edge(parse_create_edge_input(
1141 collection.to_string(),
1142 &input_payload,
1143 )?)?
1144 .id
1145 } else {
1146 runtime
1147 .create_node(parse_create_node_input(
1148 collection.to_string(),
1149 &input_payload,
1150 )?)?
1151 .id
1152 };
1153 ids.push(Value::Number(id.raw() as f64));
1154 }
1155
1156 let mut out = json::Map::new();
1157 out.insert("affected".to_string(), Value::Number(ids.len() as f64));
1158 out.insert("ids".to_string(), Value::Array(ids));
1159 Ok(Value::Object(out))
1160}
1161
1162fn normalize_flat_graph_payload(payload: &json::Map<String, Value>) -> Value {
1163 if payload.contains_key("properties") || payload.contains_key("fields") {
1164 return Value::Object(payload.clone());
1165 }
1166
1167 let is_edge = payload.contains_key("from") || payload.contains_key("to");
1168 let mut normalized = payload.clone();
1169 let mut properties = json::Map::new();
1170 for (key, value) in payload {
1171 let reserved = if is_edge {
1172 matches!(
1173 key.as_str(),
1174 "label"
1175 | "from"
1176 | "to"
1177 | "weight"
1178 | "metadata"
1179 | "properties"
1180 | "fields"
1181 | "_ttl_ms"
1182 | "_expires_at"
1183 )
1184 } else {
1185 matches!(
1186 key.as_str(),
1187 "label"
1188 | "node_type"
1189 | "metadata"
1190 | "links"
1191 | "embeddings"
1192 | "properties"
1193 | "fields"
1194 | "_ttl_ms"
1195 | "_expires_at"
1196 )
1197 };
1198 if !reserved {
1199 properties.insert(key.clone(), value.clone());
1200 }
1201 }
1202 if !properties.is_empty() {
1203 normalized.insert("properties".to_string(), Value::Object(properties));
1204 }
1205 Value::Object(normalized)
1206}
1207
1208pub(crate) fn value_to_sql_literal(v: &Value) -> String {
1209 match v {
1210 Value::Null => "NULL".to_string(),
1211 Value::Bool(b) => b.to_string(),
1212 Value::Number(n) => {
1213 if n.fract() == 0.0 {
1214 format!("{}", *n as i64)
1215 } else {
1216 n.to_string()
1217 }
1218 }
1219 Value::String(s) => format!("'{}'", s.replace('\'', "''")),
1220 other => format!("'{}'", other.to_string_compact().replace('\'', "''")),
1221 }
1222}
1223
1224pub(crate) fn query_result_to_json(qr: &RuntimeQueryResult) -> Value {
1225 if let Some(ask) = ask_query_result_to_json(qr) {
1226 return ask;
1227 }
1228
1229 let mut envelope = json::Map::new();
1230 envelope.insert(
1231 "statement".to_string(),
1232 Value::String(qr.statement_type.to_string()),
1233 );
1234 envelope.insert(
1235 "affected".to_string(),
1236 Value::Number(qr.affected_rows as f64),
1237 );
1238
1239 let mut columns = Vec::new();
1240 if let Some(first) = qr.result.records.first() {
1241 let mut keys: Vec<String> = first
1242 .column_names()
1243 .into_iter()
1244 .map(|k| k.to_string())
1245 .collect();
1246 keys.sort();
1247 columns = keys.into_iter().map(Value::String).collect();
1248 }
1249 envelope.insert("columns".to_string(), Value::Array(columns));
1250
1251 let rows: Vec<Value> = qr
1252 .result
1253 .records
1254 .iter()
1255 .map(record_to_json_object)
1256 .collect();
1257 envelope.insert("rows".to_string(), Value::Array(rows));
1258
1259 Value::Object(envelope)
1260}
1261
1262fn ask_query_result_to_json(qr: &RuntimeQueryResult) -> Option<Value> {
1263 if qr.statement != "ask" {
1264 return None;
1265 }
1266 let row = qr.result.records.first()?;
1267 let answer = text_field(row, "answer")?;
1268 let provider = text_field(row, "provider").unwrap_or_default();
1269 let model = text_field(row, "model").unwrap_or_default();
1270 let sources_flat_json = json_field(row, "sources_flat").unwrap_or(Value::Array(Vec::new()));
1271 let citations_json = json_field(row, "citations").unwrap_or(Value::Array(Vec::new()));
1272 let validation_json = json_field(row, "validation").unwrap_or(Value::Object(json::Map::new()));
1273
1274 let effective_mode = match text_field(row, "mode").as_deref() {
1275 Some("lenient") => crate::runtime::ai::ask_response_envelope::Mode::Lenient,
1276 _ => crate::runtime::ai::ask_response_envelope::Mode::Strict,
1277 };
1278
1279 let result = crate::runtime::ai::ask_response_envelope::AskResult {
1280 answer,
1281 sources_flat: envelope_sources_flat(&sources_flat_json),
1282 citations: envelope_citations(&citations_json),
1283 validation: envelope_validation(&validation_json),
1284 cache_hit: bool_field(row, "cache_hit").unwrap_or(false),
1285 provider,
1286 model,
1287 prompt_tokens: u32_field(row, "prompt_tokens").unwrap_or(0),
1288 completion_tokens: u32_field(row, "completion_tokens").unwrap_or(0),
1289 cost_usd: f64_field(row, "cost_usd").unwrap_or(0.0),
1290 effective_mode,
1291 retry_count: u32_field(row, "retry_count").unwrap_or(0),
1292 };
1293 Some(crate::runtime::ai::ask_response_envelope::build(&result))
1294}
1295
1296fn record_field<'a>(record: &'a UnifiedRecord, name: &str) -> Option<&'a SchemaValue> {
1297 record
1298 .iter_fields()
1299 .find_map(|(key, value)| (key.as_ref() == name).then_some(value))
1300}
1301
1302fn text_field(record: &UnifiedRecord, name: &str) -> Option<String> {
1303 match record_field(record, name)? {
1304 SchemaValue::Text(s) => Some(s.to_string()),
1305 SchemaValue::Email(s)
1306 | SchemaValue::Url(s)
1307 | SchemaValue::NodeRef(s)
1308 | SchemaValue::EdgeRef(s) => Some(s.clone()),
1309 other => Some(format!("{other}")),
1310 }
1311}
1312
1313fn u32_field(record: &UnifiedRecord, name: &str) -> Option<u32> {
1314 match record_field(record, name)? {
1315 SchemaValue::Integer(n) => (*n >= 0).then_some((*n).min(u32::MAX as i64) as u32),
1316 SchemaValue::UnsignedInteger(n) => Some((*n).min(u32::MAX as u64) as u32),
1317 SchemaValue::BigInt(n)
1318 | SchemaValue::TimestampMs(n)
1319 | SchemaValue::Timestamp(n)
1320 | SchemaValue::Duration(n)
1321 | SchemaValue::Decimal(n) => (*n >= 0).then_some((*n).min(u32::MAX as i64) as u32),
1322 SchemaValue::Float(n) => (*n >= 0.0).then_some((*n).min(u32::MAX as f64) as u32),
1323 _ => None,
1324 }
1325}
1326
1327fn f64_field(record: &UnifiedRecord, name: &str) -> Option<f64> {
1328 match record_field(record, name)? {
1329 SchemaValue::Integer(n) => Some(*n as f64),
1330 SchemaValue::UnsignedInteger(n) => Some(*n as f64),
1331 SchemaValue::BigInt(n)
1332 | SchemaValue::TimestampMs(n)
1333 | SchemaValue::Timestamp(n)
1334 | SchemaValue::Duration(n)
1335 | SchemaValue::Decimal(n) => Some(*n as f64),
1336 SchemaValue::Float(n) => Some(*n),
1337 _ => None,
1338 }
1339}
1340
1341fn bool_field(record: &UnifiedRecord, name: &str) -> Option<bool> {
1342 match record_field(record, name)? {
1343 SchemaValue::Boolean(value) => Some(*value),
1344 _ => None,
1345 }
1346}
1347
1348fn json_field(record: &UnifiedRecord, name: &str) -> Option<Value> {
1349 match record_field(record, name)? {
1350 SchemaValue::Json(bytes) => json::from_slice(bytes).ok(),
1351 SchemaValue::Text(text) => json::from_str(text).ok(),
1352 _ => None,
1353 }
1354}
1355
1356fn envelope_sources_flat(
1357 value: &Value,
1358) -> Vec<crate::runtime::ai::ask_response_envelope::SourceRow> {
1359 value
1360 .as_array()
1361 .unwrap_or(&[])
1362 .iter()
1363 .filter_map(|source| {
1364 let urn = source.get("urn").and_then(Value::as_str)?.to_string();
1365 let payload = source
1366 .get("payload")
1367 .and_then(Value::as_str)
1368 .map(ToString::to_string)
1369 .unwrap_or_else(|| source.to_string_compact());
1370 Some(crate::runtime::ai::ask_response_envelope::SourceRow { urn, payload })
1371 })
1372 .collect()
1373}
1374
1375fn envelope_citations(value: &Value) -> Vec<crate::runtime::ai::ask_response_envelope::Citation> {
1376 value
1377 .as_array()
1378 .unwrap_or(&[])
1379 .iter()
1380 .filter_map(|citation| {
1381 let marker = citation.get("marker").and_then(Value::as_u64)?;
1382 let urn = citation.get("urn").and_then(Value::as_str)?.to_string();
1383 Some(crate::runtime::ai::ask_response_envelope::Citation {
1384 marker: marker.min(u32::MAX as u64) as u32,
1385 urn,
1386 })
1387 })
1388 .collect()
1389}
1390
1391fn envelope_validation(value: &Value) -> crate::runtime::ai::ask_response_envelope::Validation {
1392 crate::runtime::ai::ask_response_envelope::Validation {
1393 ok: value.get("ok").and_then(Value::as_bool).unwrap_or(true),
1394 warnings: validation_items(value, "warnings")
1395 .into_iter()
1396 .map(
1397 |(kind, detail)| crate::runtime::ai::ask_response_envelope::ValidationWarning {
1398 kind,
1399 detail,
1400 },
1401 )
1402 .collect(),
1403 errors: validation_items(value, "errors")
1404 .into_iter()
1405 .map(
1406 |(kind, detail)| crate::runtime::ai::ask_response_envelope::ValidationError {
1407 kind,
1408 detail,
1409 },
1410 )
1411 .collect(),
1412 }
1413}
1414
1415fn validation_items(value: &Value, key: &str) -> Vec<(String, String)> {
1416 value
1417 .get(key)
1418 .and_then(Value::as_array)
1419 .unwrap_or(&[])
1420 .iter()
1421 .filter_map(|item| {
1422 Some((
1423 item.get("kind").and_then(Value::as_str)?.to_string(),
1424 item.get("detail")
1425 .and_then(Value::as_str)
1426 .unwrap_or("")
1427 .to_string(),
1428 ))
1429 })
1430 .collect()
1431}
1432
1433pub(crate) fn insert_result_to_json(qr: &RuntimeQueryResult) -> Value {
1434 let mut envelope = json::Map::new();
1435 envelope.insert(
1436 "affected".to_string(),
1437 Value::Number(qr.affected_rows as f64),
1438 );
1439 if let Some(first) = qr.result.records.first() {
1441 if let Some(id_val) = first
1442 .iter_fields()
1443 .find(|(k, _)| {
1444 let s: &str = k;
1445 s == "_entity_id"
1446 })
1447 .map(|(_, v)| schema_value_to_json(v))
1448 {
1449 envelope.insert("id".to_string(), id_val);
1450 }
1451 }
1452 Value::Object(envelope)
1453}
1454
1455fn record_to_json_object(record: &UnifiedRecord) -> Value {
1456 let mut map = json::Map::new();
1457 let mut entries: Vec<(&str, &SchemaValue)> =
1460 record.iter_fields().map(|(k, v)| (k.as_ref(), v)).collect();
1461 entries.sort_by(|a, b| a.0.cmp(b.0));
1462 for (k, v) in entries {
1463 map.insert(k.to_string(), schema_value_to_json(v));
1464 }
1465 Value::Object(map)
1466}
1467
1468fn schema_value_to_json(v: &SchemaValue) -> Value {
1469 match v {
1470 SchemaValue::Null => Value::Null,
1471 SchemaValue::Boolean(b) => Value::Bool(*b),
1472 SchemaValue::Integer(n) => Value::Number(*n as f64),
1473 SchemaValue::UnsignedInteger(n) => Value::Number(*n as f64),
1474 SchemaValue::Float(n) if n.is_finite() => Value::Number(*n),
1475 SchemaValue::Float(n) => {
1476 let token = if n.is_nan() {
1477 "NaN"
1478 } else if n.is_sign_positive() {
1479 "Infinity"
1480 } else {
1481 "-Infinity"
1482 };
1483 single_key_object("$float", Value::String(token.to_string()))
1484 }
1485 SchemaValue::BigInt(n) => Value::Number(*n as f64),
1486 SchemaValue::TimestampMs(n) | SchemaValue::Duration(n) | SchemaValue::Decimal(n) => {
1487 Value::Number(*n as f64)
1488 }
1489 SchemaValue::Timestamp(n) => single_key_object("$ts", Value::String(n.to_string())),
1490 SchemaValue::Password(_) | SchemaValue::Secret(_) => Value::String("***".to_string()),
1491 SchemaValue::Text(s) => Value::String(s.to_string()),
1492 SchemaValue::Blob(bytes) => {
1493 single_key_object("$bytes", Value::String(base64_encode(bytes)))
1494 }
1495 SchemaValue::Json(bytes) => {
1496 crate::presentation::entity_json::storage_json_bytes_to_json(bytes)
1497 }
1498 SchemaValue::Uuid(bytes) => single_key_object("$uuid", Value::String(format_uuid(bytes))),
1499 SchemaValue::Email(s)
1500 | SchemaValue::Url(s)
1501 | SchemaValue::NodeRef(s)
1502 | SchemaValue::EdgeRef(s) => Value::String(s.clone()),
1503 other => Value::String(format!("{other}")),
1504 }
1505}
1506
1507fn single_key_object(key: &str, value: Value) -> Value {
1508 Value::Object([(key.to_string(), value)].into_iter().collect())
1509}
1510
1511pub(crate) fn json_value_to_schema_value(v: &Value) -> SchemaValue {
1515 match v {
1516 Value::Null => SchemaValue::Null,
1517 Value::Bool(b) => SchemaValue::Boolean(*b),
1518 Value::Number(n) => {
1519 if n.is_finite() && n.fract() == 0.0 && *n >= i64::MIN as f64 && *n <= i64::MAX as f64 {
1520 SchemaValue::Integer(*n as i64)
1521 } else {
1522 SchemaValue::Float(*n)
1523 }
1524 }
1525 Value::String(s) => SchemaValue::text(s.clone()),
1526 Value::Array(items) => {
1527 if items.iter().all(|v| matches!(v, Value::Number(_))) {
1531 let floats: Vec<f32> = items
1532 .iter()
1533 .map(|v| v.as_f64().unwrap_or(0.0) as f32)
1534 .collect();
1535 SchemaValue::Vector(floats)
1536 } else {
1537 SchemaValue::Json(crate::json::to_vec(v).unwrap_or_default())
1538 }
1539 }
1540 Value::Object(map) => {
1541 if map.len() == 1 {
1542 if let Some(Value::String(encoded)) = map.get("$bytes") {
1543 if let Ok(bytes) = base64_decode(encoded) {
1544 return SchemaValue::Blob(bytes);
1545 }
1546 }
1547 if let Some(value) = map.get("$ts") {
1548 if let Some(ts) = json_i64(value) {
1549 return SchemaValue::Timestamp(ts);
1550 }
1551 }
1552 if let Some(Value::String(value)) = map.get("$uuid") {
1553 if let Ok(uuid) = crate::crypto::Uuid::parse_str(value) {
1554 return SchemaValue::Uuid(*uuid.as_bytes());
1555 }
1556 }
1557 if let Some(Value::String(value)) = map.get("$float") {
1558 return match value.as_str() {
1559 "NaN" => SchemaValue::Float(f64::NAN),
1560 "Infinity" | "+Infinity" | "inf" | "+inf" => {
1561 SchemaValue::Float(f64::INFINITY)
1562 }
1563 "-Infinity" | "-inf" => SchemaValue::Float(f64::NEG_INFINITY),
1564 _ => SchemaValue::Json(crate::json::to_vec(v).unwrap_or_default()),
1565 };
1566 }
1567 }
1568 SchemaValue::Json(crate::json::to_vec(v).unwrap_or_default())
1569 }
1570 }
1571}
1572
1573fn json_i64(value: &Value) -> Option<i64> {
1574 match value {
1575 Value::Number(n) => {
1576 if n.is_finite() && n.fract() == 0.0 && *n >= i64::MIN as f64 && *n <= i64::MAX as f64 {
1577 Some(*n as i64)
1578 } else {
1579 None
1580 }
1581 }
1582 Value::String(s) => s.parse::<i64>().ok(),
1583 _ => None,
1584 }
1585}
1586
/// Formats 16 raw bytes as a lowercase, hyphenated UUID string
/// (`8-4-4-4-12` hex digits).
fn format_uuid(bytes: &[u8; 16]) -> String {
    const HEX: &[u8; 16] = b"0123456789abcdef";

    let mut text = String::with_capacity(36);
    for (index, &byte) in bytes.iter().enumerate() {
        // Hyphens precede bytes 4, 6, 8 and 10.
        if matches!(index, 4 | 6 | 8 | 10) {
            text.push('-');
        }
        text.push(HEX[usize::from(byte >> 4)] as char);
        text.push(HEX[usize::from(byte & 0x0f)] as char);
    }
    text
}
1608
/// Encodes `bytes` with the standard base64 alphabet (`+`, `/`) and `=`
/// padding.
fn base64_encode(bytes: &[u8]) -> String {
    const ALPHABET: &[u8; 64] =
        b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

    // Picks the 6-bit group of `word` that starts at bit `shift`.
    let sextet = |word: u32, shift: u32| ALPHABET[((word >> shift) & 0x3f) as usize] as char;

    let mut encoded = String::with_capacity((bytes.len() + 2) / 3 * 4);
    for group in bytes.chunks(3) {
        // Pack up to three bytes into the top 24 bits; missing bytes stay zero.
        let word = group
            .iter()
            .enumerate()
            .fold(0u32, |acc, (i, &b)| acc | (u32::from(b) << (16 - 8 * i)));

        encoded.push(sextet(word, 18));
        encoded.push(sextet(word, 12));
        encoded.push(if group.len() > 1 { sextet(word, 6) } else { '=' });
        encoded.push(if group.len() > 2 { sextet(word, 0) } else { '=' });
    }
    encoded
}
1640
/// Decodes standard, `=`-padded base64.
///
/// # Errors
///
/// Returns a message when the input length is not a multiple of four, when a
/// character is outside the base64 alphabet, or when padding is malformed:
/// `=` may only appear as the last one or two characters of the final
/// four-character group. (Previously `=` was accepted in any position and
/// decoded as zero bits, so inputs such as `"===="` or `"AA==AAAA"` produced
/// bytes instead of an error.)
fn base64_decode(input: &str) -> Result<Vec<u8>, String> {
    const MISPLACED_PADDING: &str = "base64 padding is only allowed at the end of the input";

    let bytes = input.as_bytes();
    if bytes.len() % 4 != 0 {
        return Err("base64 length must be a multiple of 4".to_string());
    }

    let group_count = bytes.len() / 4;
    let mut out = Vec::with_capacity(group_count * 3);
    for (index, chunk) in bytes.chunks_exact(4).enumerate() {
        let pad = chunk.iter().rev().take_while(|&&b| b == b'=').count();
        if pad > 2 {
            return Err("base64 padding must be at most two '=' characters".to_string());
        }
        if pad > 0 && index + 1 != group_count {
            return Err(MISPLACED_PADDING.to_string());
        }

        // Accumulate the data characters, six bits each.
        let mut n = 0u32;
        for &byte in &chunk[..4 - pad] {
            if byte == b'=' {
                // '=' in front of a data character is not trailing padding.
                return Err(MISPLACED_PADDING.to_string());
            }
            n = (n << 6) | u32::from(base64_value(byte)?);
        }
        // Left-align to 24 bits so the byte extraction below is uniform.
        n <<= 6 * pad as u32;

        out.push(((n >> 16) & 0xff) as u8);
        if pad < 2 {
            out.push(((n >> 8) & 0xff) as u8);
        }
        if pad < 1 {
            out.push((n & 0xff) as u8);
        }
    }
    Ok(out)
}

/// Maps one base64 character to its 6-bit value.
///
/// `=` maps to zero, so padding validation is the caller's responsibility.
///
/// # Errors
///
/// Returns a message naming the byte when it is not in the base64 alphabet.
fn base64_value(byte: u8) -> Result<u8, String> {
    match byte {
        b'A'..=b'Z' => Ok(byte - b'A'),
        b'a'..=b'z' => Ok(byte - b'a' + 26),
        b'0'..=b'9' => Ok(byte - b'0' + 52),
        b'+' => Ok(62),
        b'/' => Ok(63),
        b'=' => Ok(0),
        _ => Err(format!("invalid base64 character: {}", byte as char)),
    }
}
1684
1685fn dispatch_method_remote(
1695 client: &AsyncMutex<RedDBClient>,
1696 tokio_rt: &tokio::runtime::Runtime,
1697 method: &str,
1698 params: &Value,
1699) -> Result<Value, (&'static str, String)> {
1700 match method {
1701 "version" => Ok(Value::Object(
1702 [
1703 (
1704 "version".to_string(),
1705 Value::String(env!("CARGO_PKG_VERSION").to_string()),
1706 ),
1707 (
1708 "protocol".to_string(),
1709 Value::String(PROTOCOL_VERSION.to_string()),
1710 ),
1711 ]
1712 .into_iter()
1713 .collect(),
1714 )),
1715
1716 "health" => {
1717 let result = tokio_rt.block_on(async {
1718 let mut guard = client.lock().await;
1719 guard.health_status().await
1720 });
1721 match result {
1722 Ok(status) => Ok(Value::Object(
1723 [
1724 ("ok".to_string(), Value::Bool(status.healthy)),
1725 ("state".to_string(), Value::String(status.state)),
1726 (
1727 "checked_at_unix_ms".to_string(),
1728 Value::Number(status.checked_at_unix_ms as f64),
1729 ),
1730 (
1731 "version".to_string(),
1732 Value::String(env!("CARGO_PKG_VERSION").to_string()),
1733 ),
1734 ]
1735 .into_iter()
1736 .collect(),
1737 )),
1738 Err(e) => Err((error_code::INTERNAL_ERROR, e.to_string())),
1739 }
1740 }
1741
1742 "query" => {
1743 let sql = params.get("sql").and_then(Value::as_str).ok_or((
1744 error_code::INVALID_PARAMS,
1745 "missing 'sql' string".to_string(),
1746 ))?;
1747 let json_str = tokio_rt
1748 .block_on(async {
1749 let mut guard = client.lock().await;
1750 guard.query(sql).await
1751 })
1752 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
1753 let parsed = json::from_str::<Value>(&json_str)
1758 .map_err(|e| (error_code::INTERNAL_ERROR, format!("bad server JSON: {e}")))?;
1759 Ok(parsed)
1760 }
1761
1762 "insert" => {
1763 let collection = params.get("collection").and_then(Value::as_str).ok_or((
1764 error_code::INVALID_PARAMS,
1765 "missing 'collection' string".to_string(),
1766 ))?;
1767 let payload = params.get("payload").ok_or((
1768 error_code::INVALID_PARAMS,
1769 "missing 'payload' object".to_string(),
1770 ))?;
1771 if payload.as_object().is_none() {
1772 return Err((
1773 error_code::INVALID_PARAMS,
1774 "'payload' must be a JSON object".to_string(),
1775 ));
1776 }
1777 let payload_json = payload.to_string_compact();
1778 let reply = tokio_rt
1779 .block_on(async {
1780 let mut guard = client.lock().await;
1781 guard.create_row_entity(collection, &payload_json).await
1782 })
1783 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
1784 let mut out = json::Map::new();
1785 out.insert("affected".to_string(), Value::Number(1.0));
1786 out.insert("id".to_string(), Value::String(reply.id.to_string()));
1787 Ok(Value::Object(out))
1788 }
1789
1790 "bulk_insert" => {
1791 let collection = params.get("collection").and_then(Value::as_str).ok_or((
1792 error_code::INVALID_PARAMS,
1793 "missing 'collection' string".to_string(),
1794 ))?;
1795 let payloads = params.get("payloads").and_then(Value::as_array).ok_or((
1796 error_code::INVALID_PARAMS,
1797 "missing 'payloads' array".to_string(),
1798 ))?;
1799 let mut encoded = Vec::with_capacity(payloads.len());
1800 for entry in payloads {
1801 if entry.as_object().is_none() {
1802 return Err((
1803 error_code::INVALID_PARAMS,
1804 "each payload must be a JSON object".to_string(),
1805 ));
1806 }
1807 encoded.push(entry.to_string_compact());
1808 }
1809 let status = tokio_rt
1810 .block_on(async {
1811 let mut guard = client.lock().await;
1812 guard.bulk_create_rows(collection, encoded).await
1813 })
1814 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
1815 Ok(Value::Object(
1816 [
1817 ("affected".to_string(), Value::Number(status.count as f64)),
1818 (
1819 "ids".to_string(),
1820 Value::Array(
1821 status
1822 .ids
1823 .into_iter()
1824 .map(|id| Value::Number(id as f64))
1825 .collect(),
1826 ),
1827 ),
1828 ]
1829 .into_iter()
1830 .collect(),
1831 ))
1832 }
1833
1834 "get" => {
1835 let collection = params.get("collection").and_then(Value::as_str).ok_or((
1836 error_code::INVALID_PARAMS,
1837 "missing 'collection' string".to_string(),
1838 ))?;
1839 let id = params.get("id").and_then(Value::as_str).ok_or((
1840 error_code::INVALID_PARAMS,
1841 "missing 'id' string".to_string(),
1842 ))?;
1843 let sql = format!("SELECT * FROM {collection} WHERE red_entity_id = {id} LIMIT 1");
1844 let json_str = tokio_rt
1845 .block_on(async {
1846 let mut guard = client.lock().await;
1847 guard.query(&sql).await
1848 })
1849 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
1850 let parsed = json::from_str::<Value>(&json_str)
1851 .map_err(|e| (error_code::INTERNAL_ERROR, format!("bad server JSON: {e}")))?;
1852 let entity = parsed
1855 .get("rows")
1856 .and_then(Value::as_array)
1857 .and_then(|rows| rows.first().cloned())
1858 .unwrap_or(Value::Null);
1859 Ok(Value::Object(
1860 [("entity".to_string(), entity)].into_iter().collect(),
1861 ))
1862 }
1863
1864 "delete" => {
1865 let collection = params.get("collection").and_then(Value::as_str).ok_or((
1866 error_code::INVALID_PARAMS,
1867 "missing 'collection' string".to_string(),
1868 ))?;
1869 let id = params.get("id").and_then(Value::as_str).ok_or((
1870 error_code::INVALID_PARAMS,
1871 "missing 'id' string".to_string(),
1872 ))?;
1873 let id = id.parse::<u64>().map_err(|_| {
1874 (
1875 error_code::INVALID_PARAMS,
1876 "id must be a numeric string".to_string(),
1877 )
1878 })?;
1879 let _reply = tokio_rt
1880 .block_on(async {
1881 let mut guard = client.lock().await;
1882 guard.delete_entity(collection, id).await
1883 })
1884 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
1885 Ok(Value::Object(
1886 [("affected".to_string(), Value::Number(1.0))]
1887 .into_iter()
1888 .collect(),
1889 ))
1890 }
1891
1892 "close" => Ok(Value::Null),
1893
1894 other => Err((
1895 error_code::INVALID_REQUEST,
1896 format!("unknown method: {other}"),
1897 )),
1898 }
1899}
1900
1901#[cfg(test)]
1902mod tests {
1903 use super::*;
1904 use crate::json::json;
1905 use proptest::prelude::*;
1906
1907 fn make_runtime() -> RedDBRuntime {
1908 RedDBRuntime::in_memory().expect("in-memory runtime")
1909 }
1910
1911 fn create_graph_collection(rt: &RedDBRuntime, name: &str) {
1912 let db = rt.db();
1913 db.store()
1914 .create_collection(name)
1915 .expect("create collection");
1916 let now = std::time::SystemTime::now()
1917 .duration_since(std::time::UNIX_EPOCH)
1918 .unwrap_or_default()
1919 .as_millis();
1920 db.save_collection_contract(crate::physical::CollectionContract {
1921 name: name.to_string(),
1922 declared_model: crate::catalog::CollectionModel::Graph,
1923 schema_mode: crate::catalog::SchemaMode::Dynamic,
1924 origin: crate::physical::ContractOrigin::Explicit,
1925 version: 1,
1926 created_at_unix_ms: now,
1927 updated_at_unix_ms: now,
1928 default_ttl_ms: None,
1929 vector_dimension: None,
1930 vector_metric: None,
1931 context_index_fields: Vec::new(),
1932 declared_columns: Vec::new(),
1933 table_def: None,
1934 timestamps_enabled: false,
1935 context_index_enabled: false,
1936 metrics_raw_retention_ms: None,
1937 metrics_rollup_policies: Vec::new(),
1938 metrics_tenant_identity: None,
1939 metrics_namespace: None,
1940 append_only: false,
1941 subscriptions: Vec::new(),
1942 session_key: None,
1943 session_gap_ms: None,
1944 retention_duration_ms: None,
1945 })
1946 .expect("save graph contract");
1947 }
1948
1949 fn handle(rt: &RedDBRuntime, line: &str) -> String {
1950 let mut session = Session::new();
1951 handle_line(&Backend::Local(rt), &mut session, line)
1952 }
1953
1954 fn query_request(id: u64, sql: &str) -> String {
1955 let mut params = json::Map::new();
1956 params.insert("sql".to_string(), Value::String(sql.to_string()));
1957
1958 let mut request = json::Map::new();
1959 request.insert("jsonrpc".to_string(), Value::String("2.0".to_string()));
1960 request.insert("id".to_string(), Value::Number(id as f64));
1961 request.insert("method".to_string(), Value::String("query".to_string()));
1962 request.insert("params".to_string(), Value::Object(params));
1963 Value::Object(request).to_string_compact()
1964 }
1965
1966 fn query_request_with_params(id: u64, sql: &str, binds: Vec<Value>) -> String {
1967 let mut params = json::Map::new();
1968 params.insert("sql".to_string(), Value::String(sql.to_string()));
1969 params.insert("params".to_string(), Value::Array(binds));
1970
1971 let mut request = json::Map::new();
1972 request.insert("jsonrpc".to_string(), Value::String("2.0".to_string()));
1973 request.insert("id".to_string(), Value::Number(id as f64));
1974 request.insert("method".to_string(), Value::String("query".to_string()));
1975 request.insert("params".to_string(), Value::Object(params));
1976 Value::Object(request).to_string_compact()
1977 }
1978
1979 fn with_session<F>(rt: &RedDBRuntime, f: F)
1982 where
1983 F: FnOnce(&dyn Fn(&str) -> String, &RedDBRuntime),
1984 {
1985 let session = std::cell::RefCell::new(Session::new());
1986 let call = |line: &str| -> String {
1987 let mut s = session.borrow_mut();
1988 handle_line(&Backend::Local(rt), &mut s, line)
1989 };
1990 f(&call, rt);
1991 }
1992
1993 fn result_rows(response: &str) -> Vec<Value> {
1994 json::from_str::<Value>(response)
1995 .expect("json response")
1996 .get("result")
1997 .and_then(|result| result.get("rows"))
1998 .and_then(Value::as_array)
1999 .map(|rows| rows.to_vec())
2000 .unwrap_or_default()
2001 }
2002
2003 fn result_name_kind(response: &str) -> Vec<(String, String)> {
2004 result_rows(response)
2005 .into_iter()
2006 .map(|row| {
2007 let object = row.as_object().expect("row object");
2008 let name = object
2009 .get("name")
2010 .and_then(Value::as_str)
2011 .expect("row name")
2012 .to_string();
2013 let kind = object
2014 .get("kind")
2015 .and_then(Value::as_str)
2016 .expect("row kind")
2017 .to_string();
2018 (name, kind)
2019 })
2020 .collect()
2021 }
2022
2023 fn json_scalar_param() -> impl Strategy<Value = Value> {
2024 prop_oneof![
2025 Just(Value::Null),
2026 any::<bool>().prop_map(Value::Bool),
2027 (-1000_i64..1000_i64).prop_map(|n| Value::Number(n as f64)),
2028 "[a-z']{0,8}".prop_map(Value::String),
2029 ]
2030 }
2031
2032 fn sql_literal_for_json(value: &Value) -> String {
2033 match value {
2034 Value::Null => "NULL".to_string(),
2035 Value::Bool(true) => "TRUE".to_string(),
2036 Value::Bool(false) => "FALSE".to_string(),
2037 Value::Number(n) => format!("{n:.0}"),
2038 Value::String(s) => format!("'{}'", s.replace('\'', "''")),
2039 _ => panic!("unsupported scalar param: {value:?}"),
2040 }
2041 }
2042
2043 #[test]
2044 fn version_method_returns_version_and_protocol() {
2045 let rt = make_runtime();
2046 let line = r#"{"jsonrpc":"2.0","id":1,"method":"version","params":{}}"#;
2047 let resp = handle(&rt, line);
2048 assert!(resp.contains("\"id\":1"));
2049 assert!(resp.contains("\"protocol\":\"1.0\""));
2050 assert!(resp.contains("\"version\""));
2051 }
2052
2053 #[test]
2054 fn health_method_returns_ok_true() {
2055 let rt = make_runtime();
2056 let resp = handle(
2057 &rt,
2058 r#"{"jsonrpc":"2.0","id":"abc","method":"health","params":{}}"#,
2059 );
2060 assert!(resp.contains("\"ok\":true"));
2061 assert!(resp.contains("\"id\":\"abc\""));
2062 }
2063
2064 #[test]
2065 fn parse_error_for_invalid_json() {
2066 let rt = make_runtime();
2067 let resp = handle(&rt, "not json {");
2068 assert!(resp.contains("\"code\":\"PARSE_ERROR\""));
2069 assert!(resp.contains("\"id\":null"));
2070 }
2071
2072 #[test]
2073 fn invalid_request_when_method_missing() {
2074 let rt = make_runtime();
2075 let resp = handle(&rt, r#"{"jsonrpc":"2.0","id":1,"params":{}}"#);
2076 assert!(resp.contains("\"code\":\"INVALID_REQUEST\""));
2077 }
2078
2079 #[test]
2080 fn unknown_method_is_invalid_request() {
2081 let rt = make_runtime();
2082 let resp = handle(
2083 &rt,
2084 r#"{"jsonrpc":"2.0","id":1,"method":"frobnicate","params":{}}"#,
2085 );
2086 assert!(resp.contains("\"code\":\"INVALID_REQUEST\""));
2087 assert!(resp.contains("frobnicate"));
2088 }
2089
2090 #[test]
2091 fn invalid_params_when_query_sql_missing() {
2092 let rt = make_runtime();
2093 let resp = handle(
2094 &rt,
2095 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{}}"#,
2096 );
2097 assert!(resp.contains("\"code\":\"INVALID_PARAMS\""));
2098 }
2099
2100 #[test]
2101 fn close_method_marks_response_for_shutdown() {
2102 let rt = make_runtime();
2103 let resp = handle(
2104 &rt,
2105 r#"{"jsonrpc":"2.0","id":1,"method":"close","params":{}}"#,
2106 );
2107 assert!(resp.contains("\"__close__\":true"));
2108 }
2109
2110 #[test]
2111 fn query_with_int_text_params_round_trips() {
2112 let rt = make_runtime();
2113 let _ = handle(
2114 &rt,
2115 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE p (id INTEGER, name TEXT)"}}"#,
2116 );
2117 let _ = handle(
2118 &rt,
2119 r#"{"jsonrpc":"2.0","id":2,"method":"query","params":{"sql":"INSERT INTO p (id, name) VALUES (1, 'Alice')"}}"#,
2120 );
2121 let _ = handle(
2122 &rt,
2123 r#"{"jsonrpc":"2.0","id":3,"method":"query","params":{"sql":"INSERT INTO p (id, name) VALUES (2, 'Bob')"}}"#,
2124 );
2125 let resp = handle(
2126 &rt,
2127 r#"{"jsonrpc":"2.0","id":4,"method":"query","params":{"sql":"SELECT * FROM p WHERE id = $1 AND name = $2","params":[1,"Alice"]}}"#,
2128 );
2129 assert!(resp.contains("\"Alice\""), "got: {resp}");
2130 assert!(!resp.contains("\"Bob\""), "got: {resp}");
2131 }
2132
2133 #[test]
2134 fn query_with_question_params_covers_select_insert_update_delete() {
2135 let rt = make_runtime();
2136 let create = handle(
2137 &rt,
2138 &query_request(1, "CREATE TABLE qp (id INTEGER, name TEXT)"),
2139 );
2140 assert!(!create.contains("\"error\""), "got: {create}");
2141
2142 let inserted = handle(
2143 &rt,
2144 &query_request_with_params(
2145 2,
2146 "INSERT INTO qp (id, name) VALUES (?, ?)",
2147 vec![json!(1), json!("O'Reilly")],
2148 ),
2149 );
2150 assert!(inserted.contains("\"affected\":1"), "got: {inserted}");
2151
2152 let selected = handle(
2153 &rt,
2154 &query_request_with_params(3, "SELECT name FROM qp WHERE id = ?", vec![json!(1)]),
2155 );
2156 let rows = result_rows(&selected);
2157 assert_eq!(rows.len(), 1, "got: {selected}");
2158 assert_eq!(
2159 rows[0].get("name").and_then(Value::as_str),
2160 Some("O'Reilly")
2161 );
2162
2163 let selected_numbered = handle(
2164 &rt,
2165 &query_request_with_params(
2166 4,
2167 "SELECT name FROM qp WHERE name = ?1 AND id = ?2",
2168 vec![json!("O'Reilly"), json!(1)],
2169 ),
2170 );
2171 assert_eq!(
2172 result_rows(&selected_numbered).len(),
2173 1,
2174 "got: {selected_numbered}"
2175 );
2176
2177 let updated = handle(
2178 &rt,
2179 &query_request_with_params(
2180 5,
2181 "UPDATE qp SET name = ? WHERE id = ?",
2182 vec![json!("Alice"), json!(1)],
2183 ),
2184 );
2185 assert!(updated.contains("\"affected\":1"), "got: {updated}");
2186
2187 let deleted = handle(
2188 &rt,
2189 &query_request_with_params(6, "DELETE FROM qp WHERE name = ?", vec![json!("Alice")]),
2190 );
2191 assert!(deleted.contains("\"affected\":1"), "got: {deleted}");
2192
2193 let remaining = handle(&rt, &query_request(7, "SELECT * FROM qp"));
2194 assert!(result_rows(&remaining).is_empty(), "got: {remaining}");
2195 }
2196
2197 #[test]
2198 fn query_with_params_insert_and_search_round_trip() {
2199 let rt = make_runtime();
2200 let insert = handle(
2201 &rt,
2202 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"INSERT INTO bun_embeddings VECTOR (dense, content) VALUES ($1, $2)","params":[[1.0,0.0],"bun vector"]}}"#,
2203 );
2204 assert!(insert.contains("\"affected\":1"), "got: {insert}");
2205
2206 let search = handle(
2207 &rt,
2208 r#"{"jsonrpc":"2.0","id":2,"method":"query","params":{"sql":"SEARCH SIMILAR $1 COLLECTION bun_embeddings LIMIT 1","params":[[1.0,0.0]]}}"#,
2209 );
2210 assert!(search.contains("\"rows\""), "got: {search}");
2211 assert!(search.contains("\"score\":1"), "got: {search}");
2212 assert!(!search.contains("\"error\""), "got: {search}");
2213 }
2214
2215 #[test]
2216 fn query_with_question_vector_param_round_trips() {
2217 let rt = make_runtime();
2218 let insert = handle(
2219 &rt,
2220 &query_request_with_params(
2221 1,
2222 "INSERT INTO question_embeddings VECTOR (dense, content) VALUES (?, ?)",
2223 vec![json!([1.0, 0.0]), json!("question vector")],
2224 ),
2225 );
2226 assert!(insert.contains("\"affected\":1"), "got: {insert}");
2227
2228 let search = handle(
2229 &rt,
2230 &query_request_with_params(
2231 2,
2232 "SEARCH SIMILAR ? COLLECTION question_embeddings LIMIT 1",
2233 vec![json!([1.0, 0.0])],
2234 ),
2235 );
2236 assert!(search.contains("\"rows\""), "got: {search}");
2237 assert!(search.contains("\"score\":1"), "got: {search}");
2238 assert!(!search.contains("\"error\""), "got: {search}");
2239 }
2240
2241 #[test]
2242 fn query_with_typed_json_rpc_params_round_trips() {
2243 let rt = make_runtime();
2244 let create = handle(
2245 &rt,
2246 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE value_params (ok BOOLEAN, score FLOAT, payload BLOB, body JSON, seen_at TIMESTAMP, ident UUID)"}}"#,
2247 );
2248 assert!(!create.contains("\"error\""), "got: {create}");
2249
2250 let insert = handle(
2251 &rt,
2252 r#"{"jsonrpc":"2.0","id":2,"method":"query","params":{"sql":"INSERT INTO value_params (ok, score, payload, body, seen_at, ident) VALUES ($1, $2, $3, $4, $5, $6)","params":[true,{"$float":"NaN"},{"$bytes":"3q2+7w=="},{"z":[1,{"a":true}],"a":null},{"$ts":"1700000000123456789"},{"$uuid":"00112233-4455-6677-8899-aabbccddeeff"}]}}"#,
2253 );
2254 assert!(insert.contains("\"affected\":1"), "got: {insert}");
2255
2256 let selected = handle(
2257 &rt,
2258 r#"{"jsonrpc":"2.0","id":3,"method":"query","params":{"sql":"SELECT * FROM value_params"}}"#,
2259 );
2260 assert!(selected.contains("\"ok\":true"), "got: {selected}");
2261 assert!(selected.contains("\"$float\":\"NaN\""), "got: {selected}");
2262 assert!(
2263 selected.contains("\"$bytes\":\"3q2+7w==\""),
2264 "got: {selected}"
2265 );
2266 assert!(
2267 selected.contains("\"body\":{\"a\":null,\"z\":[1,{\"a\":true}]}"),
2268 "got: {selected}"
2269 );
2270 assert!(
2271 selected.contains("\"$ts\":\"1700000000123456789\""),
2272 "got: {selected}"
2273 );
2274 assert!(
2275 selected.contains("\"$uuid\":\"00112233-4455-6677-8899-aabbccddeeff\""),
2276 "got: {selected}"
2277 );
2278 }
2279
2280 #[test]
2281 fn select_timeseries_tags_decodes_json_payload() {
2282 let rt = make_runtime();
2283 let create = handle(&rt, &query_request(1, "CREATE TIMESERIES ts1"));
2284 assert!(!create.contains("\"error\""), "got: {create}");
2285
2286 let insert = handle(
2287 &rt,
2288 &query_request(
2289 2,
2290 r#"INSERT INTO ts1 (metric, value, tags, timestamp) VALUES ('cpu', 85, '{"host":"a"}', 1000)"#,
2291 ),
2292 );
2293 assert!(insert.contains("\"affected\":1"), "got: {insert}");
2294
2295 let selected = handle(&rt, &query_request(3, "SELECT tags FROM ts1"));
2296 assert!(!selected.contains("<json"), "got: {selected}");
2297 let response = json::from_str::<Value>(&selected).expect("response json");
2298 let tags = response
2299 .get("result")
2300 .and_then(|result| result.get("rows"))
2301 .and_then(Value::as_array)
2302 .and_then(|rows| rows.first())
2303 .and_then(|row| row.get("tags"))
2304 .expect("tags field");
2305 assert_eq!(tags, &json!({"host": "a"}));
2306 }
2307
2308 #[test]
2309 fn select_table_json_column_round_trips_after_single_parse() {
2310 let rt = make_runtime();
2311 let create = handle(&rt, &query_request(1, "CREATE TABLE docs (payload JSON)"));
2312 assert!(!create.contains("\"error\""), "got: {create}");
2313
2314 let original = r#"{"nested":{"items":[1,true,"x"],"object":{"k":"v"}}}"#;
2315 let insert_sql = format!("INSERT INTO docs (payload) VALUES ({original})");
2316 let insert = handle(&rt, &query_request(2, &insert_sql));
2317 assert!(insert.contains("\"affected\":1"), "got: {insert}");
2318
2319 let selected = handle(&rt, &query_request(3, "SELECT payload FROM docs"));
2320 assert!(!selected.contains("<json"), "got: {selected}");
2321 let response = json::from_str::<Value>(&selected).expect("response json");
2322 let payload = response
2323 .get("result")
2324 .and_then(|result| result.get("rows"))
2325 .and_then(Value::as_array)
2326 .and_then(|rows| rows.first())
2327 .and_then(|row| row.get("payload"))
2328 .expect("payload field");
2329 let expected = json::from_str::<Value>(original).expect("expected json");
2330 assert_eq!(payload, &expected);
2331
2332 let payload_text = payload.to_string_compact();
2333 assert_eq!(
2334 json::from_str::<Value>(&payload_text).expect("single parse"),
2335 expected
2336 );
2337 }
2338
2339 #[test]
2340 fn select_json_corruption_falls_back_to_code_and_hex() {
2341 use crate::storage::query::unified::UnifiedResult;
2342
2343 let mut result = UnifiedResult::with_columns(vec!["payload".into()]);
2344 let mut record = UnifiedRecord::new();
2345 record.set("payload", SchemaValue::Json(b"{not json".to_vec()));
2346 result.push(record);
2347
2348 let json = query_result_to_json(&RuntimeQueryResult {
2349 query: "SELECT payload FROM docs".to_string(),
2350 mode: crate::storage::query::modes::QueryMode::Sql,
2351 statement: "select",
2352 engine: "runtime-table",
2353 result,
2354 affected_rows: 0,
2355 statement_type: "select",
2356 });
2357
2358 let payload = json
2359 .get("rows")
2360 .and_then(Value::as_array)
2361 .and_then(|rows| rows.first())
2362 .and_then(|row| row.get("payload"))
2363 .expect("payload field");
2364 assert_eq!(
2365 payload.get("code").and_then(Value::as_str),
2366 Some("INVALID_JSON")
2367 );
2368 assert_eq!(
2369 payload.get("hex").and_then(Value::as_str),
2370 Some("7b6e6f74206a736f6e")
2371 );
2372 }
2373
2374 #[test]
2375 fn json_value_to_schema_value_decodes_typed_envelopes() {
2376 let SchemaValue::Blob(bytes) = json_value_to_schema_value(&json!({ "$bytes": "AAECAw==" }))
2377 else {
2378 panic!("expected blob");
2379 };
2380 assert_eq!(bytes, vec![0, 1, 2, 3]);
2381
2382 assert_eq!(
2383 json_value_to_schema_value(&json!({ "$ts": "9223372036854775807" })),
2384 SchemaValue::Timestamp(i64::MAX)
2385 );
2386
2387 let SchemaValue::Uuid(bytes) = json_value_to_schema_value(&json!({
2388 "$uuid": "00112233-4455-6677-8899-aabbccddeeff"
2389 })) else {
2390 panic!("expected uuid");
2391 };
2392 assert_eq!(
2393 bytes,
2394 [
2395 0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88, 0x99, 0xaa, 0xbb, 0xcc, 0xdd,
2396 0xee, 0xff
2397 ]
2398 );
2399
2400 let SchemaValue::Float(value) =
2401 json_value_to_schema_value(&json!({ "$float": "-Infinity" }))
2402 else {
2403 panic!("expected float");
2404 };
2405 assert!(value.is_infinite() && value.is_sign_negative());
2406 }
2407
2408 #[test]
2409 fn query_with_params_arity_mismatch_rejected() {
2410 let rt = make_runtime();
2411 let _ = handle(
2412 &rt,
2413 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE pa (id INTEGER)"}}"#,
2414 );
2415 let resp = handle(
2416 &rt,
2417 r#"{"jsonrpc":"2.0","id":2,"method":"query","params":{"sql":"SELECT * FROM pa WHERE id = $1","params":[1,2]}}"#,
2418 );
2419 assert!(resp.contains("\"INVALID_PARAMS\""), "got: {resp}");
2420 }
2421
2422 #[test]
2423 fn query_with_question_params_arity_mismatch_rejected() {
2424 let rt = make_runtime();
2425 let _ = handle(&rt, &query_request(1, "CREATE TABLE qpa (id INTEGER)"));
2426 let resp = handle(
2427 &rt,
2428 &query_request_with_params(
2429 2,
2430 "SELECT * FROM qpa WHERE id = ?",
2431 vec![json!(1), json!(2)],
2432 ),
2433 );
2434 assert!(resp.contains("\"INVALID_PARAMS\""), "got: {resp}");
2435 assert!(resp.contains("SQL expects 1, got 2"), "got: {resp}");
2436 }
2437
2438 #[test]
2439 fn query_with_params_gap_rejected() {
2440 let rt = make_runtime();
2441 let _ = handle(
2442 &rt,
2443 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE pg (a INTEGER, b INTEGER)"}}"#,
2444 );
2445 let resp = handle(
2446 &rt,
2447 r#"{"jsonrpc":"2.0","id":2,"method":"query","params":{"sql":"SELECT * FROM pg WHERE a = $1 AND b = $3","params":[1,2,3]}}"#,
2448 );
2449 assert!(resp.contains("\"INVALID_PARAMS\""), "got: {resp}");
2450 }
2451
2452 #[test]
2453 fn query_with_question_numbered_gap_rejected() {
2454 let rt = make_runtime();
2455 let _ = handle(&rt, &query_request(1, "CREATE TABLE qpg (id INTEGER)"));
2456 let resp = handle(
2457 &rt,
2458 &query_request_with_params(
2459 2,
2460 "SELECT * FROM qpg WHERE id = ?2",
2461 vec![json!(1), json!(2)],
2462 ),
2463 );
2464 assert!(resp.contains("\"INVALID_PARAMS\""), "got: {resp}");
2465 assert!(resp.contains("parameter $`1` is missing"), "got: {resp}");
2466 }
2467
2468 #[test]
2469 fn query_with_question_params_type_mismatch_names_slot() {
2470 let rt = make_runtime();
2471 let _ = handle(&rt, &query_request(1, "CREATE TABLE qpt (id INTEGER)"));
2472 let resp = handle(
2473 &rt,
2474 &query_request_with_params(
2475 2,
2476 "INSERT INTO qpt (id) VALUES (?)",
2477 vec![json!("not-an-integer")],
2478 ),
2479 );
2480 assert!(resp.contains("\"QUERY_ERROR\""), "got: {resp}");
2481 assert!(resp.contains("id"), "got: {resp}");
2482 assert!(resp.contains("integer"), "got: {resp}");
2483 }
2484
2485 #[test]
2486 fn query_select_one_returns_rows() {
2487 let rt = make_runtime();
2488 let resp = handle(
2489 &rt,
2490 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"SELECT 1 AS one"}}"#,
2491 );
2492 assert!(resp.contains("\"result\""));
2493 assert!(!resp.contains("\"error\""));
2494 }
2495
2496 #[test]
2497 fn ask_query_result_uses_canonical_envelope() {
2498 use crate::storage::query::unified::UnifiedResult;
2499
2500 let mut result = UnifiedResult::with_columns(vec![
2501 "answer".into(),
2502 "provider".into(),
2503 "model".into(),
2504 "prompt_tokens".into(),
2505 "completion_tokens".into(),
2506 "sources_count".into(),
2507 "sources_flat".into(),
2508 "citations".into(),
2509 "validation".into(),
2510 ]);
2511 let mut record = UnifiedRecord::new();
2512 record.set("answer", SchemaValue::text("Deploy failed [^1]."));
2513 record.set("provider", SchemaValue::text("openai"));
2514 record.set("model", SchemaValue::text("gpt-4o-mini"));
2515 record.set("prompt_tokens", SchemaValue::Integer(11));
2516 record.set("completion_tokens", SchemaValue::Integer(7));
2517 record.set(
2518 "sources_flat",
2519 SchemaValue::Json(
2520 br#"[{"urn":"urn:reddb:row:deployments:1","kind":"row","collection":"deployments","id":"1"}]"#.to_vec(),
2521 ),
2522 );
2523 record.set(
2524 "citations",
2525 SchemaValue::Json(br#"[{"marker":1,"urn":"urn:reddb:row:deployments:1"}]"#.to_vec()),
2526 );
2527 record.set(
2528 "validation",
2529 SchemaValue::Json(br#"{"ok":true,"warnings":[],"errors":[]}"#.to_vec()),
2530 );
2531 result.push(record);
2532
2533 let json = query_result_to_json(&RuntimeQueryResult {
2534 query: "ASK 'why did deploy fail?'".to_string(),
2535 mode: crate::storage::query::modes::QueryMode::Sql,
2536 statement: "ask",
2537 engine: "runtime-ai",
2538 result,
2539 affected_rows: 0,
2540 statement_type: "select",
2541 });
2542
2543 assert_eq!(
2544 json.get("answer").and_then(Value::as_str),
2545 Some("Deploy failed [^1].")
2546 );
2547 assert_eq!(json.get("cache_hit").and_then(Value::as_bool), Some(false));
2548 assert_eq!(json.get("cost_usd").and_then(Value::as_f64), Some(0.0));
2549 assert_eq!(json.get("mode").and_then(Value::as_str), Some("strict"));
2550 assert_eq!(json.get("retry_count").and_then(Value::as_u64), Some(0));
2551 assert!(
2552 json.get("rows").is_none(),
2553 "ASK envelope must not be row-wrapped: {json}"
2554 );
2555 assert!(
2556 json.get("sources_flat")
2557 .and_then(Value::as_array)
2558 .is_some_and(|sources| sources.len() == 1
2559 && sources[0].get("payload").and_then(Value::as_str).is_some()),
2560 "sources_flat must be a parsed array: {json}"
2561 );
2562 assert!(
2563 json.get("citations")
2564 .and_then(Value::as_array)
2565 .is_some_and(|citations| citations.len() == 1),
2566 "citations must be a parsed array: {json}"
2567 );
2568 assert_eq!(
2569 json.get("validation")
2570 .and_then(|v| v.get("ok"))
2571 .and_then(Value::as_bool),
2572 Some(true)
2573 );
2574 }
2575
2576 #[test]
2581 fn tx_begin_returns_tx_id_and_isolation() {
2582 let rt = make_runtime();
2583 with_session(&rt, |call, _| {
2584 let resp = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
2585 assert!(resp.contains("\"tx_id\":1"));
2586 assert!(resp.contains("\"isolation\":\"read_committed_deferred\""));
2587 assert!(!resp.contains("\"error\""));
2588 });
2589 }
2590
2591 #[test]
2592 fn tx_begin_twice_returns_already_open() {
2593 let rt = make_runtime();
2594 with_session(&rt, |call, _| {
2595 let _ = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
2596 let resp = call(r#"{"jsonrpc":"2.0","id":2,"method":"tx.begin","params":null}"#);
2597 assert!(resp.contains("\"code\":\"TX_ALREADY_OPEN\""));
2598 });
2599 }
2600
2601 #[test]
2602 fn tx_commit_without_begin_returns_no_tx_open() {
2603 let rt = make_runtime();
2604 with_session(&rt, |call, _| {
2605 let resp = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.commit","params":null}"#);
2606 assert!(resp.contains("\"code\":\"NO_TX_OPEN\""));
2607 });
2608 }
2609
2610 #[test]
2611 fn tx_rollback_without_begin_returns_no_tx_open() {
2612 let rt = make_runtime();
2613 with_session(&rt, |call, _| {
2614 let resp = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.rollback","params":null}"#);
2615 assert!(resp.contains("\"code\":\"NO_TX_OPEN\""));
2616 });
2617 }
2618
2619 #[test]
2620 fn insert_inside_tx_returns_pending_envelope() {
2621 let rt = make_runtime();
2622 let _ = handle(
2624 &rt,
2625 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE users (name TEXT)"}}"#,
2626 );
2627 with_session(&rt, |call, _| {
2628 let _ = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
2629 let resp = call(
2630 r#"{"jsonrpc":"2.0","id":2,"method":"insert","params":{"collection":"users","payload":{"name":"alice"}}}"#,
2631 );
2632 assert!(resp.contains("\"pending\":true"));
2633 assert!(resp.contains("\"tx_id\":1"));
2634 assert!(resp.contains("\"affected\":0"));
2635 });
2636 }
2637
2638 #[test]
2639 fn begin_insert_rollback_does_not_persist() {
2640 let rt = make_runtime();
2641 let _ = handle(
2642 &rt,
2643 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE u (name TEXT)"}}"#,
2644 );
2645 with_session(&rt, |call, _| {
2646 let _ = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
2647 let _ = call(
2648 r#"{"jsonrpc":"2.0","id":2,"method":"insert","params":{"collection":"u","payload":{"name":"ghost"}}}"#,
2649 );
2650 let rollback = call(r#"{"jsonrpc":"2.0","id":3,"method":"tx.rollback","params":null}"#);
2651 assert!(rollback.contains("\"ops_discarded\":1"));
2652 assert!(rollback.contains("\"tx_id\":1"));
2653 });
2654 let resp = handle(
2656 &rt,
2657 r#"{"jsonrpc":"2.0","id":9,"method":"query","params":{"sql":"SELECT * FROM u"}}"#,
2658 );
2659 assert!(!resp.contains("\"ghost\""));
2660 }
2661
2662 #[test]
2663 fn begin_insert_commit_persists() {
2664 let rt = make_runtime();
2665 let _ = handle(
2666 &rt,
2667 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE u2 (name TEXT)"}}"#,
2668 );
2669 with_session(&rt, |call, _| {
2670 let _ = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
2671 let _ = call(
2672 r#"{"jsonrpc":"2.0","id":2,"method":"insert","params":{"collection":"u2","payload":{"name":"alice"}}}"#,
2673 );
2674 let _ = call(
2675 r#"{"jsonrpc":"2.0","id":3,"method":"insert","params":{"collection":"u2","payload":{"name":"bob"}}}"#,
2676 );
2677 let commit = call(r#"{"jsonrpc":"2.0","id":4,"method":"tx.commit","params":null}"#);
2678 assert!(commit.contains("\"ops_replayed\":2"));
2679 assert!(!commit.contains("\"error\""));
2680 });
2681 let resp = handle(
2682 &rt,
2683 r#"{"jsonrpc":"2.0","id":9,"method":"query","params":{"sql":"SELECT * FROM u2"}}"#,
2684 );
2685 assert!(resp.contains("\"alice\""));
2686 assert!(resp.contains("\"bob\""));
2687 }
2688
2689 #[test]
2690 fn bulk_insert_inside_tx_buffers_everything() {
2691 let rt = make_runtime();
2692 let _ = handle(
2693 &rt,
2694 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE u3 (name TEXT)"}}"#,
2695 );
2696 with_session(&rt, |call, _| {
2697 let _ = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
2698 let resp = call(
2699 r#"{"jsonrpc":"2.0","id":2,"method":"bulk_insert","params":{"collection":"u3","payloads":[{"name":"a"},{"name":"b"},{"name":"c"}]}}"#,
2700 );
2701 assert!(resp.contains("\"buffered\":3"));
2702 assert!(resp.contains("\"pending\":true"));
2703 assert!(resp.contains("\"affected\":0"));
2704
2705 let commit = call(r#"{"jsonrpc":"2.0","id":3,"method":"tx.commit","params":null}"#);
2706 assert!(commit.contains("\"ops_replayed\":3"));
2707 });
2708 }
2709
2710 #[test]
2711 fn bulk_insert_chunks_at_internal_500_row_limit() {
2712 assert_eq!(bulk_insert_chunk_count(0), 0);
2713 assert_eq!(bulk_insert_chunk_count(1), 1);
2714 assert_eq!(bulk_insert_chunk_count(500), 1);
2715 assert_eq!(bulk_insert_chunk_count(501), 2);
2716 assert_eq!(bulk_insert_chunk_count(1000), 2);
2717 assert_eq!(bulk_insert_chunk_count(1001), 3);
2718 }
2719
2720 proptest! {
2721 #![proptest_config(ProptestConfig {
2722 cases: 12,
2723 ..ProptestConfig::default()
2724 })]
2725
2726 #[test]
2727 fn bulk_insert_matches_sequential_insert_state(
2728 names in proptest::collection::vec("[a-z]{1,8}", 1usize..20)
2729 ) {
2730 let rt = make_runtime();
2731 let payloads = names
2732 .iter()
2733 .map(|name| format!(r#"{{"name":"{name}","kind":"bulk"}}"#))
2734 .collect::<Vec<_>>();
2735 let payload_array = payloads.join(",");
2736
2737 let bulk = handle(
2738 &rt,
2739 &format!(
2740 r#"{{"jsonrpc":"2.0","id":1,"method":"bulk_insert","params":{{"collection":"bulk_prop","payloads":[{payload_array}]}}}}"#
2741 ),
2742 );
2743 let bulk_result = json::from_str::<Value>(&bulk).expect("bulk json");
2744 let bulk_ids = bulk_result
2745 .get("result")
2746 .and_then(|result| result.get("ids"))
2747 .and_then(Value::as_array)
2748 .expect("bulk ids");
2749 prop_assert_eq!(bulk_ids.len(), names.len());
2750
2751 for (index, payload) in payloads.iter().enumerate() {
2752 let insert = handle(
2753 &rt,
2754 &format!(
2755 r#"{{"jsonrpc":"2.0","id":{},"method":"insert","params":{{"collection":"seq_prop","payload":{payload}}}}}"#,
2756 index + 10
2757 ),
2758 );
2759 let insert_result = json::from_str::<Value>(&insert).expect("insert json");
2760 prop_assert!(
2761 insert_result
2762 .get("result")
2763 .and_then(|result| result.get("id"))
2764 .is_some(),
2765 "insert response missing id: {insert}"
2766 );
2767 }
2768
2769 let bulk_rows = result_name_kind(&handle(
2770 &rt,
2771 r#"{"jsonrpc":"2.0","id":99,"method":"query","params":{"sql":"SELECT name, kind FROM bulk_prop ORDER BY red_entity_id"}}"#,
2772 ));
2773 let seq_rows = result_name_kind(&handle(
2774 &rt,
2775 r#"{"jsonrpc":"2.0","id":100,"method":"query","params":{"sql":"SELECT name, kind FROM seq_prop ORDER BY red_entity_id"}}"#,
2776 ));
2777 prop_assert_eq!(bulk_rows, seq_rows);
2778 }
2779
2780 #[test]
2781 fn question_param_select_matches_inlined_literal(value in json_scalar_param()) {
2782 let rt = make_runtime();
2783 let bound = handle(
2784 &rt,
2785 &query_request_with_params(1, "SELECT ? AS v", vec![value.clone()]),
2786 );
2787 let inline_sql = format!("SELECT {} AS v", sql_literal_for_json(&value));
2788 let inlined = handle(&rt, &query_request(2, &inline_sql));
2789 prop_assert_eq!(
2790 result_rows(&bound),
2791 result_rows(&inlined),
2792 "bound={}, inlined={}",
2793 bound,
2794 inlined
2795 );
2796 }
2797 }
2798
2799 #[test]
2800 fn bulk_insert_graph_nodes_accepts_flat_rows_and_returns_ids() {
2801 let rt = make_runtime();
2802 create_graph_collection(&rt, "social");
2803
2804 let resp = handle(
2805 &rt,
2806 r#"{"jsonrpc":"2.0","id":2,"method":"bulk_insert","params":{"collection":"social","payloads":[{"label":"User","name":"alice"},{"label":"User","name":"bob"}]}}"#,
2807 );
2808 let envelope: Value = json::from_str(&resp).expect("json response");
2809 let result = envelope.get("result").expect("result");
2810 assert_eq!(result.get("affected").and_then(Value::as_u64), Some(2));
2811 assert_eq!(
2812 result
2813 .get("ids")
2814 .and_then(Value::as_array)
2815 .map(|ids| ids.len()),
2816 Some(2)
2817 );
2818
2819 let query = handle(
2820 &rt,
2821 r#"{"jsonrpc":"2.0","id":3,"method":"query","params":{"sql":"MATCH (n:User) RETURN n.name"}}"#,
2822 );
2823 assert!(query.contains("\"alice\""), "got: {query}");
2824 assert!(query.contains("\"bob\""), "got: {query}");
2825 }
2826
2827 #[test]
2828 fn bulk_insert_graph_edges_accepts_flat_rows_and_returns_ids() {
2829 let rt = make_runtime();
2830 create_graph_collection(&rt, "network");
2831 let nodes = handle(
2832 &rt,
2833 r#"{"jsonrpc":"2.0","id":2,"method":"bulk_insert","params":{"collection":"network","payloads":[{"label":"Host","name":"app"},{"label":"Host","name":"db"}]}}"#,
2834 );
2835 let envelope: Value = json::from_str(&nodes).expect("node response");
2836 let ids = envelope
2837 .get("result")
2838 .and_then(|r| r.get("ids"))
2839 .and_then(Value::as_array)
2840 .expect("node ids");
2841 let from = ids[0].as_u64().expect("from id");
2842 let to = ids[1].as_u64().expect("to id");
2843
2844 let resp = handle(
2845 &rt,
2846 &format!(
2847 r#"{{"jsonrpc":"2.0","id":3,"method":"bulk_insert","params":{{"collection":"network","payloads":[{{"label":"connects","from":{from},"to":{to},"weight":0.5,"role":"primary"}}]}}}}"#
2848 ),
2849 );
2850 let envelope: Value = json::from_str(&resp).expect("edge response");
2851 let result = envelope.get("result").expect("result");
2852 assert_eq!(result.get("affected").and_then(Value::as_u64), Some(1));
2853 assert_eq!(
2854 result
2855 .get("ids")
2856 .and_then(Value::as_array)
2857 .map(|ids| ids.len()),
2858 Some(1)
2859 );
2860 }
2861
2862 #[test]
2863 fn delete_inside_tx_is_buffered() {
2864 let rt = make_runtime();
2865 let _ = handle(
2867 &rt,
2868 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE u4 (name TEXT)"}}"#,
2869 );
2870 let _ = handle(
2871 &rt,
2872 r#"{"jsonrpc":"2.0","id":2,"method":"query","params":{"sql":"INSERT INTO u4 (name) VALUES ('keep')"}}"#,
2873 );
2874 with_session(&rt, |call, _| {
2875 let _ = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
2876 let resp = call(
2877 r#"{"jsonrpc":"2.0","id":2,"method":"delete","params":{"collection":"u4","id":"1"}}"#,
2878 );
2879 assert!(resp.contains("\"pending\":true"));
2880 let _ = call(r#"{"jsonrpc":"2.0","id":3,"method":"tx.rollback","params":null}"#);
2881 });
2882 let resp = handle(
2884 &rt,
2885 r#"{"jsonrpc":"2.0","id":9,"method":"query","params":{"sql":"SELECT * FROM u4"}}"#,
2886 );
2887 assert!(resp.contains("\"keep\""));
2888 }
2889
2890 #[test]
2891 fn close_with_open_tx_auto_rollbacks() {
2892 let rt = make_runtime();
2893 let _ = handle(
2894 &rt,
2895 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE u5 (name TEXT)"}}"#,
2896 );
2897 with_session(&rt, |call, _| {
2898 let _ = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
2899 let _ = call(
2900 r#"{"jsonrpc":"2.0","id":2,"method":"insert","params":{"collection":"u5","payload":{"name":"ghost"}}}"#,
2901 );
2902 let close = call(r#"{"jsonrpc":"2.0","id":3,"method":"close","params":null}"#);
2903 assert!(close.contains("\"__close__\":true"));
2904 assert!(!close.contains("\"error\""));
2905 });
2906 let resp = handle(
2907 &rt,
2908 r#"{"jsonrpc":"2.0","id":9,"method":"query","params":{"sql":"SELECT * FROM u5"}}"#,
2909 );
2910 assert!(!resp.contains("\"ghost\""));
2911 }
2912
2913 fn seed_numbers_table(rt: &RedDBRuntime, table: &str, count: u32) {
2918 let _ = handle(
2919 rt,
2920 &format!(
2921 r#"{{"jsonrpc":"2.0","id":1,"method":"query","params":{{"sql":"CREATE TABLE {table} (n INTEGER)"}}}}"#,
2922 ),
2923 );
2924 for i in 0..count {
2925 let _ = handle(
2926 rt,
2927 &format!(
2928 r#"{{"jsonrpc":"2.0","id":2,"method":"query","params":{{"sql":"INSERT INTO {table} (n) VALUES ({i})"}}}}"#,
2929 ),
2930 );
2931 }
2932 }
2933
2934 #[test]
2935 fn cursor_open_returns_id_columns_and_total() {
2936 let rt = make_runtime();
2937 seed_numbers_table(&rt, "nums1", 3);
2938 with_session(&rt, |call, _| {
2939 let resp = call(
2940 r#"{"jsonrpc":"2.0","id":1,"method":"query.open","params":{"sql":"SELECT n FROM nums1"}}"#,
2941 );
2942 assert!(resp.contains("\"cursor_id\":1"));
2943 assert!(resp.contains("\"total_rows\":3"));
2944 assert!(resp.contains("\"columns\""));
2945 assert!(!resp.contains("\"error\""));
2946 });
2947 }
2948
2949 #[test]
2950 fn cursor_next_chunks_rows_and_signals_done() {
2951 let rt = make_runtime();
2952 seed_numbers_table(&rt, "nums2", 5);
2953 with_session(&rt, |call, _| {
2954 let _ = call(
2955 r#"{"jsonrpc":"2.0","id":1,"method":"query.open","params":{"sql":"SELECT n FROM nums2"}}"#,
2956 );
2957 let first = call(
2958 r#"{"jsonrpc":"2.0","id":2,"method":"query.next","params":{"cursor_id":1,"batch_size":2}}"#,
2959 );
2960 assert!(first.contains("\"done\":false"));
2961 assert!(first.contains("\"remaining\":3"));
2962
2963 let second = call(
2964 r#"{"jsonrpc":"2.0","id":3,"method":"query.next","params":{"cursor_id":1,"batch_size":2}}"#,
2965 );
2966 assert!(second.contains("\"done\":false"));
2967 assert!(second.contains("\"remaining\":1"));
2968
2969 let third = call(
2970 r#"{"jsonrpc":"2.0","id":4,"method":"query.next","params":{"cursor_id":1,"batch_size":2}}"#,
2971 );
2972 assert!(third.contains("\"done\":true"));
2973 assert!(third.contains("\"remaining\":0"));
2974 });
2975 }
2976
2977 #[test]
2978 fn cursor_auto_drops_when_exhausted() {
2979 let rt = make_runtime();
2980 seed_numbers_table(&rt, "nums3", 2);
2981 with_session(&rt, |call, _| {
2982 let _ = call(
2983 r#"{"jsonrpc":"2.0","id":1,"method":"query.open","params":{"sql":"SELECT n FROM nums3"}}"#,
2984 );
2985 let _ = call(
2986 r#"{"jsonrpc":"2.0","id":2,"method":"query.next","params":{"cursor_id":1,"batch_size":100}}"#,
2987 );
2988 let resp = call(
2991 r#"{"jsonrpc":"2.0","id":3,"method":"query.next","params":{"cursor_id":1,"batch_size":100}}"#,
2992 );
2993 assert!(resp.contains("\"code\":\"CURSOR_NOT_FOUND\""));
2994 });
2995 }
2996
2997 #[test]
2998 fn cursor_close_removes_it() {
2999 let rt = make_runtime();
3000 seed_numbers_table(&rt, "nums4", 3);
3001 with_session(&rt, |call, _| {
3002 let _ = call(
3003 r#"{"jsonrpc":"2.0","id":1,"method":"query.open","params":{"sql":"SELECT n FROM nums4"}}"#,
3004 );
3005 let close =
3006 call(r#"{"jsonrpc":"2.0","id":2,"method":"query.close","params":{"cursor_id":1}}"#);
3007 assert!(close.contains("\"closed\":true"));
3008 let after = call(
3009 r#"{"jsonrpc":"2.0","id":3,"method":"query.next","params":{"cursor_id":1,"batch_size":10}}"#,
3010 );
3011 assert!(after.contains("\"code\":\"CURSOR_NOT_FOUND\""));
3012 });
3013 }
3014
3015 #[test]
3016 fn cursor_close_unknown_errors() {
3017 let rt = make_runtime();
3018 with_session(&rt, |call, _| {
3019 let resp = call(
3020 r#"{"jsonrpc":"2.0","id":1,"method":"query.close","params":{"cursor_id":9999}}"#,
3021 );
3022 assert!(resp.contains("\"code\":\"CURSOR_NOT_FOUND\""));
3023 });
3024 }
3025
3026 #[test]
3027 fn cursor_next_without_cursor_id_errors() {
3028 let rt = make_runtime();
3029 with_session(&rt, |call, _| {
3030 let resp = call(r#"{"jsonrpc":"2.0","id":1,"method":"query.next","params":{}}"#);
3031 assert!(resp.contains("\"code\":\"INVALID_PARAMS\""));
3032 });
3033 }
3034
3035 #[test]
3036 fn cursor_default_batch_size_returns_all_when_smaller_than_default() {
3037 let rt = make_runtime();
3038 seed_numbers_table(&rt, "nums5", 7);
3039 with_session(&rt, |call, _| {
3040 let _ = call(
3041 r#"{"jsonrpc":"2.0","id":1,"method":"query.open","params":{"sql":"SELECT n FROM nums5"}}"#,
3042 );
3043 let resp =
3045 call(r#"{"jsonrpc":"2.0","id":2,"method":"query.next","params":{"cursor_id":1}}"#);
3046 assert!(resp.contains("\"done\":true"));
3047 assert!(resp.contains("\"remaining\":0"));
3048 });
3049 }
3050
3051 #[test]
3052 fn close_method_drops_open_cursors() {
3053 let rt = make_runtime();
3054 seed_numbers_table(&rt, "nums6", 3);
3055 with_session(&rt, |call, _| {
3058 let _ = call(
3059 r#"{"jsonrpc":"2.0","id":1,"method":"query.open","params":{"sql":"SELECT n FROM nums6"}}"#,
3060 );
3061 let close = call(r#"{"jsonrpc":"2.0","id":2,"method":"close","params":null}"#);
3062 assert!(close.contains("\"__close__\":true"));
3063 let after = call(
3065 r#"{"jsonrpc":"2.0","id":3,"method":"query.next","params":{"cursor_id":1,"batch_size":10}}"#,
3066 );
3067 assert!(after.contains("\"code\":\"CURSOR_NOT_FOUND\""));
3068 });
3069 }
3070
3071 #[test]
3072 fn cursor_independent_of_transaction_state() {
3073 let rt = make_runtime();
3074 seed_numbers_table(&rt, "nums7", 4);
3075 with_session(&rt, |call, _| {
3076 let _ = call(
3078 r#"{"jsonrpc":"2.0","id":1,"method":"query.open","params":{"sql":"SELECT n FROM nums7"}}"#,
3079 );
3080 let _ = call(r#"{"jsonrpc":"2.0","id":2,"method":"tx.begin","params":null}"#);
3081 let _ = call(r#"{"jsonrpc":"2.0","id":3,"method":"tx.commit","params":null}"#);
3082 let resp = call(
3083 r#"{"jsonrpc":"2.0","id":4,"method":"query.next","params":{"cursor_id":1,"batch_size":10}}"#,
3084 );
3085 assert!(resp.contains("\"done\":true"));
3086 assert!(!resp.contains("\"error\""));
3087 });
3088 }
3089
3090 #[test]
3091 fn second_tx_after_commit_gets_fresh_id() {
3092 let rt = make_runtime();
3093 let _ = handle(
3094 &rt,
3095 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE u6 (name TEXT)"}}"#,
3096 );
3097 with_session(&rt, |call, _| {
3098 let first = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
3099 assert!(first.contains("\"tx_id\":1"));
3100 let _ = call(
3101 r#"{"jsonrpc":"2.0","id":2,"method":"insert","params":{"collection":"u6","payload":{"name":"x"}}}"#,
3102 );
3103 let _ = call(r#"{"jsonrpc":"2.0","id":3,"method":"tx.commit","params":null}"#);
3104
3105 let second = call(r#"{"jsonrpc":"2.0","id":4,"method":"tx.begin","params":null}"#);
3106 assert!(second.contains("\"tx_id\":2"));
3107 let _ = call(r#"{"jsonrpc":"2.0","id":5,"method":"tx.rollback","params":null}"#);
3108 });
3109 }
3110
3111 #[test]
3112 fn prepare_and_execute_prepared_statement() {
3113 let rt = make_runtime();
3114 let _ = handle(
3116 &rt,
3117 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE ps_test (n INTEGER)"}}"#,
3118 );
3119 let _ = handle(
3120 &rt,
3121 r#"{"jsonrpc":"2.0","id":2,"method":"query","params":{"sql":"INSERT INTO ps_test (n) VALUES (42)"}}"#,
3122 );
3123
3124 with_session(&rt, |call, _| {
3125 let prep = call(
3127 r#"{"jsonrpc":"2.0","id":3,"method":"prepare","params":{"sql":"SELECT n FROM ps_test WHERE n = 42"}}"#,
3128 );
3129 assert!(prep.contains("\"prepared_id\""), "prepare response: {prep}");
3130
3131 let id: u64 = {
3133 let v: crate::json::Value = crate::json::from_str(&prep).expect("json");
3134 let result = v.get("result").expect("result");
3135 result
3136 .get("prepared_id")
3137 .and_then(|n| n.as_f64())
3138 .expect("prepared_id") as u64
3139 };
3140
3141 let exec = call(&format!(
3143 r#"{{"jsonrpc":"2.0","id":4,"method":"execute_prepared","params":{{"prepared_id":{id},"binds":[42]}}}}"#
3144 ));
3145 assert!(
3147 exec.contains("\"rows\""),
3148 "execute_prepared response: {exec}"
3149 );
3150 assert!(exec.contains("42"), "expected row with n=42 in: {exec}");
3151 });
3152 }
3153}