1use std::io::{BufRead, BufReader, Stdin, Write};
19use std::panic::AssertUnwindSafe;
20
21use tokio::sync::Mutex as AsyncMutex;
22
23use crate::application::entity::{CreateRowInput, CreateRowsBatchInput};
24use crate::application::ports::RuntimeEntityPort;
25use crate::json::{self as json, Value};
26use crate::runtime::{RedDBRuntime, RuntimeQueryResult};
27use crate::storage::query::unified::UnifiedRecord;
28use crate::storage::schema::Value as SchemaValue;
29use reddb_client_connector::RedDBClient;
30
/// Where a stdio session sends its requests.
enum Backend<'a> {
    /// Dispatch directly against a runtime borrowed from the caller.
    Local(&'a RedDBRuntime),
    /// Forward requests through a connected remote client. The payload is
    /// boxed, so this variant stores only a pointer.
    Remote(Box<RemoteBackend<'a>>),
}
47
/// State needed to serve a session against a remote server.
struct RemoteBackend<'a> {
    /// Connected client, wrapped in tokio's async mutex.
    client: AsyncMutex<RedDBClient>,
    /// Borrowed tokio runtime. `run_remote` builds it and uses it to drive
    /// the client's connect future to completion.
    tokio_rt: &'a tokio::runtime::Runtime,
}
52
/// Protocol version string reported by the `version` method.
pub const PROTOCOL_VERSION: &str = "1.0";
/// Maximum number of rows handed to a single `create_rows_batch` call by the
/// non-transactional `bulk_insert` path.
const STDIO_BULK_INSERT_CHUNK_ROWS: usize = 500;
56
/// String error codes placed in the `code` field of error responses.
pub mod error_code {
    /// The request line was not valid JSON.
    pub const PARSE_ERROR: &str = "PARSE_ERROR";
    /// The request had no `method`, named an unknown method, or named a
    /// method that is unavailable in this mode.
    pub const INVALID_REQUEST: &str = "INVALID_REQUEST";
    /// A required parameter was missing or had the wrong shape.
    pub const INVALID_PARAMS: &str = "INVALID_PARAMS";
    /// Parsing, binding or executing a query failed.
    pub const QUERY_ERROR: &str = "QUERY_ERROR";
    /// NOTE(review): not referenced by the handlers visible in this part of
    /// the file — confirm where it is emitted.
    pub const NOT_FOUND: &str = "NOT_FOUND";
    /// Reading stdin failed, or a handler panicked and the panic was caught.
    pub const INTERNAL_ERROR: &str = "INTERNAL_ERROR";
    /// `tx.begin` was called while the session already had an open transaction.
    pub const TX_ALREADY_OPEN: &str = "TX_ALREADY_OPEN";
    /// `tx.commit` / `tx.rollback` was called with no open transaction.
    pub const NO_TX_OPEN: &str = "NO_TX_OPEN";
    /// Replaying the buffered write set during `tx.commit` failed.
    pub const TX_REPLAY_FAILED: &str = "TX_REPLAY_FAILED";
    /// Returned for `tx.*` and `query.open/next/close` on a remote backend.
    pub const TX_NOT_SUPPORTED_REMOTE: &str = "TX_NOT_SUPPORTED_REMOTE";
    /// The given cursor id is not held by this session.
    pub const CURSOR_NOT_FOUND: &str = "CURSOR_NOT_FOUND";
    /// The session already holds `MAX_CURSORS_PER_SESSION` cursors.
    pub const CURSOR_LIMIT_EXCEEDED: &str = "CURSOR_LIMIT_EXCEEDED";
}
84
/// Upper bound on simultaneously open cursors per session; enforced by
/// `Session::insert_cursor`.
pub(crate) const MAX_CURSORS_PER_SESSION: usize = 64;
/// Batch size used by `query.next` when the request has no `batch_size`.
pub(crate) const DEFAULT_CURSOR_BATCH_SIZE: usize = 100;
/// Largest batch `query.next` will return; requested sizes are clamped to
/// `1..=MAX_CURSOR_BATCH_SIZE`.
pub(crate) const MAX_CURSOR_BATCH_SIZE: usize = 10_000;
96
/// A statement stored by `prepare` and replayed by `execute_prepared`.
struct StdioPreparedStatement {
    /// Parsed query: the parameterized shape when parameterization succeeded,
    /// otherwise the parsed expression as-is.
    shape: crate::storage::query::ast::QueryExpr,
    /// Number of bind values `execute_prepared` must supply (0 when the query
    /// was stored as-is).
    parameter_count: usize,
}
123
/// Per-connection state: at most one open transaction, the open cursors and
/// the prepared statements, each with its own id counter.
pub(crate) struct Session {
    /// Id handed to the next transaction opened by `open_tx`.
    next_tx_id: u64,
    /// The currently open transaction, if any.
    current_tx: Option<OpenTx>,
    /// Id handed to the next cursor registered by `insert_cursor`.
    next_cursor_id: u64,
    /// Open cursors keyed by cursor id.
    cursors: std::collections::HashMap<u64, Cursor>,
    /// Id handed to the next statement stored by `prepare`.
    next_prepared_id: u64,
    /// Prepared statements keyed by prepared id.
    prepared: std::collections::HashMap<u64, StdioPreparedStatement>,
}
134
135impl Session {
136 pub(crate) fn new() -> Self {
137 Self {
138 next_tx_id: 1,
139 current_tx: None,
140 next_cursor_id: 1,
141 cursors: std::collections::HashMap::new(),
142 next_prepared_id: 1,
143 prepared: std::collections::HashMap::new(),
144 }
145 }
146
147 fn open_tx(&mut self) -> Result<u64, (&'static str, String)> {
148 if let Some(tx) = &self.current_tx {
149 return Err((
150 error_code::TX_ALREADY_OPEN,
151 format!("transaction {} already open in this session", tx.tx_id),
152 ));
153 }
154 let tx_id = self.next_tx_id;
155 self.next_tx_id = self.next_tx_id.saturating_add(1);
156 self.current_tx = Some(OpenTx {
157 tx_id,
158 write_set: Vec::new(),
159 });
160 Ok(tx_id)
161 }
162
163 fn take_tx(&mut self) -> Option<OpenTx> {
164 self.current_tx.take()
165 }
166
167 fn current_tx_mut(&mut self) -> Option<&mut OpenTx> {
168 self.current_tx.as_mut()
169 }
170
171 #[allow(dead_code)]
172 fn has_tx(&self) -> bool {
173 self.current_tx.is_some()
174 }
175
176 fn insert_cursor(&mut self, cursor: Cursor) -> Result<u64, (&'static str, String)> {
179 if self.cursors.len() >= MAX_CURSORS_PER_SESSION {
180 return Err((
181 error_code::CURSOR_LIMIT_EXCEEDED,
182 format!(
183 "session already holds {} cursors (max {}) — close some before opening new ones",
184 self.cursors.len(),
185 MAX_CURSORS_PER_SESSION
186 ),
187 ));
188 }
189 let id = self.next_cursor_id;
190 self.next_cursor_id = self.next_cursor_id.saturating_add(1);
191 let mut cursor = cursor;
192 cursor.cursor_id = id;
193 self.cursors.insert(id, cursor);
194 Ok(id)
195 }
196
197 fn cursor_mut(&mut self, id: u64) -> Option<&mut Cursor> {
198 self.cursors.get_mut(&id)
199 }
200
201 fn drop_cursor(&mut self, id: u64) -> Option<Cursor> {
202 self.cursors.remove(&id)
203 }
204
205 fn clear_cursors(&mut self) {
206 self.cursors.clear();
207 }
208}
209
210impl Default for Session {
211 fn default() -> Self {
212 Self::new()
213 }
214}
215
/// A transaction opened with `tx.begin` and not yet committed or rolled back.
struct OpenTx {
    /// Id assigned by `Session::open_tx`.
    tx_id: u64,
    /// Statements buffered while the transaction is open; `tx.commit` replays
    /// them in order, `tx.rollback` discards them.
    write_set: Vec<PendingSql>,
}
221
/// One buffered write, stored as the SQL text to run at commit time.
enum PendingSql {
    /// Buffered by `insert` / `bulk_insert` inside a transaction.
    Insert(String),
    /// Buffered by `delete` inside a transaction.
    Delete(String),
    /// Not constructed by the handlers visible in this part of the file.
    #[allow(dead_code)]
    Update(String),
}
231
232impl PendingSql {
233 fn sql(&self) -> &str {
234 match self {
235 PendingSql::Insert(s) | PendingSql::Delete(s) | PendingSql::Update(s) => s,
236 }
237 }
238}
239
/// A fully materialized result set that is handed out in batches.
pub(crate) struct Cursor {
    /// Id assigned by `Session::insert_cursor` (0 until then).
    cursor_id: u64,
    /// Column names reported when the cursor was opened.
    columns: Vec<String>,
    /// Every row of the result.
    rows: Vec<UnifiedRecord>,
    /// Index of the next row to hand out.
    position: usize,
}
257
258impl Cursor {
259 fn new(columns: Vec<String>, rows: Vec<UnifiedRecord>) -> Self {
260 Self {
261 cursor_id: 0, columns,
263 rows,
264 position: 0,
265 }
266 }
267
268 fn total(&self) -> usize {
269 self.rows.len()
270 }
271
272 fn remaining(&self) -> usize {
273 self.rows.len().saturating_sub(self.position)
274 }
275
276 fn is_exhausted(&self) -> bool {
277 self.position >= self.rows.len()
278 }
279
280 fn take_batch(&mut self, batch_size: usize) -> &[UnifiedRecord] {
283 let end = (self.position + batch_size).min(self.rows.len());
284 let slice = &self.rows[self.position..end];
285 self.position = end;
286 slice
287 }
288}
289
290pub fn run(runtime: &RedDBRuntime) -> i32 {
296 run_with_io(runtime, std::io::stdin(), &mut std::io::stdout())
297}
298
299pub fn run_remote(endpoint: &str, token: Option<String>) -> i32 {
306 let tokio_rt = match tokio::runtime::Builder::new_current_thread()
307 .enable_all()
308 .build()
309 {
310 Ok(rt) => rt,
311 Err(e) => {
312 tracing::error!(err = %e, "rpc: failed to build tokio runtime");
313 return 1;
314 }
315 };
316 let client = match tokio_rt.block_on(RedDBClient::connect(endpoint, token)) {
317 Ok(c) => c,
318 Err(e) => {
319 tracing::error!(endpoint, err = %e, "rpc: failed to connect");
320 return 1;
321 }
322 };
323 let backend = Backend::Remote(Box::new(RemoteBackend {
324 client: AsyncMutex::new(client),
325 tokio_rt: &tokio_rt,
326 }));
327 run_backend(&backend, std::io::stdin(), &mut std::io::stdout())
328}
329
330pub fn run_with_io<W: Write>(runtime: &RedDBRuntime, stdin: Stdin, stdout: &mut W) -> i32 {
332 run_backend(&Backend::Local(runtime), stdin, stdout)
333}
334
/// Process-wide counter backing [`next_stdio_conn_id`]; the first id is
/// 1_000_000.
// NOTE(review): the high starting value presumably keeps stdio ids apart from
// ids allocated elsewhere — confirm against the other connection-id sources.
static STDIO_SESSION_CONN_ID: std::sync::atomic::AtomicU64 =
    std::sync::atomic::AtomicU64::new(1_000_000);

/// Returns the current counter value and advances the counter by one.
fn next_stdio_conn_id() -> u64 {
    use std::sync::atomic::Ordering::Relaxed;

    STDIO_SESSION_CONN_ID.fetch_add(1, Relaxed)
}
347
348fn run_backend<W: Write>(backend: &Backend<'_>, stdin: Stdin, stdout: &mut W) -> i32 {
349 let reader = BufReader::new(stdin.lock());
350 let mut session = Session::new();
351 let conn_id = next_stdio_conn_id();
355 crate::runtime::impl_core::set_current_connection_id(conn_id);
356 for line_result in reader.lines() {
357 let line = match line_result {
358 Ok(l) => l,
359 Err(e) => {
360 let _ = writeln!(
361 stdout,
362 "{}",
363 error_response(&Value::Null, error_code::INTERNAL_ERROR, &e.to_string())
364 );
365 let _ = stdout.flush();
366 return 1;
367 }
368 };
369 if line.trim().is_empty() {
370 continue;
371 }
372 let response = handle_line(backend, &mut session, &line);
373 if writeln!(stdout, "{}", response).is_err() || stdout.flush().is_err() {
374 return 1;
375 }
376 if response.contains("\"__close__\":true") {
377 return 0;
378 }
379 }
380 let _ = session.take_tx();
384 crate::runtime::impl_core::clear_current_connection_id();
385 0
386}
387
388fn handle_line(backend: &Backend<'_>, session: &mut Session, line: &str) -> String {
392 let parsed: Value = match json::from_str(line) {
393 Ok(v) => v,
394 Err(err) => {
395 return error_response(
396 &Value::Null,
397 error_code::PARSE_ERROR,
398 &format!("invalid JSON: {err}"),
399 );
400 }
401 };
402
403 let id = parsed.get("id").cloned().unwrap_or(Value::Null);
404
405 let method = match parsed.get("method").and_then(Value::as_str) {
406 Some(m) => m.to_string(),
407 None => {
408 return error_response(&id, error_code::INVALID_REQUEST, "missing 'method' field");
409 }
410 };
411
412 let params = parsed.get("params").cloned().unwrap_or(Value::Null);
413
414 let dispatch = std::panic::catch_unwind(AssertUnwindSafe(|| match backend {
415 Backend::Local(rt) => dispatch_method(rt, session, &method, ¶ms),
416 Backend::Remote(remote) => {
417 if matches!(
422 method.as_str(),
423 "tx.begin"
424 | "tx.commit"
425 | "tx.rollback"
426 | "query.open"
427 | "query.next"
428 | "query.close"
429 ) {
430 Err((
431 error_code::TX_NOT_SUPPORTED_REMOTE,
432 format!("{method} is not supported over remote gRPC yet"),
433 ))
434 } else {
435 dispatch_method_remote(&remote.client, remote.tokio_rt, &method, ¶ms)
436 }
437 }
438 }));
439
440 match dispatch {
441 Ok(Ok(result)) => success_response(&id, &result, method == "close"),
442 Ok(Err((code, msg))) => error_response(&id, code, &msg),
443 Err(_) => error_response(&id, error_code::INTERNAL_ERROR, "handler panicked (caught)"),
444 }
445}
446
447fn dispatch_method(
450 runtime: &RedDBRuntime,
451 session: &mut Session,
452 method: &str,
453 params: &Value,
454) -> Result<Value, (&'static str, String)> {
455 match method {
456 "tx.begin" => {
457 let tx_id = session.open_tx()?;
458 Ok(Value::Object(
459 [
460 ("tx_id".to_string(), Value::Number(tx_id as f64)),
461 (
462 "isolation".to_string(),
463 Value::String("read_committed_deferred".to_string()),
464 ),
465 ]
466 .into_iter()
467 .collect(),
468 ))
469 }
470
471 "tx.commit" => {
472 let tx = session.take_tx().ok_or((
473 error_code::NO_TX_OPEN,
474 "no transaction is open in this session".to_string(),
475 ))?;
476 let tx_id = tx.tx_id;
477 let op_count = tx.write_set.len();
478
479 let replay: Result<(u64, usize), (usize, String)> = (|| {
486 runtime
487 .execute_query("BEGIN")
488 .map_err(|e| (0usize, format!("BEGIN: {e}")))?;
489 let mut total_affected: u64 = 0;
490 for (idx, op) in tx.write_set.iter().enumerate() {
491 match runtime.execute_query(op.sql()) {
492 Ok(qr) => total_affected += qr.affected_rows,
493 Err(e) => {
494 let _ = runtime.execute_query("ROLLBACK");
495 return Err((idx, e.to_string()));
496 }
497 }
498 }
499 runtime
500 .execute_query("COMMIT")
501 .map_err(|e| (op_count, format!("COMMIT: {e}")))?;
502 Ok((total_affected, op_count))
503 })();
504
505 match replay {
506 Ok((affected, replayed)) => Ok(Value::Object(
507 [
508 ("tx_id".to_string(), Value::Number(tx_id as f64)),
509 ("ops_replayed".to_string(), Value::Number(replayed as f64)),
510 ("affected".to_string(), Value::Number(affected as f64)),
511 ]
512 .into_iter()
513 .collect(),
514 )),
515 Err((failed_idx, msg)) => Err((
516 error_code::TX_REPLAY_FAILED,
517 format!(
518 "tx {tx_id} replay failed at op {failed_idx}/{op_count}: {msg} \
519 (ops 0..{failed_idx} already applied and are NOT rolled back)"
520 ),
521 )),
522 }
523 }
524
525 "query.open" => {
526 let sql = params.get("sql").and_then(Value::as_str).ok_or((
527 error_code::INVALID_PARAMS,
528 "missing 'sql' string".to_string(),
529 ))?;
530 let qr = runtime
531 .execute_query(sql)
532 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
533
534 let columns: Vec<String> = qr
538 .result
539 .records
540 .first()
541 .map(|first| {
542 let mut keys: Vec<String> = first
543 .column_names()
544 .into_iter()
545 .map(|k| k.to_string())
546 .collect();
547 keys.sort();
548 keys
549 })
550 .unwrap_or_default();
551
552 let cursor = Cursor::new(columns.clone(), qr.result.records);
553 let total = cursor.total();
554 let cursor_id = session.insert_cursor(cursor)?;
555
556 Ok(Value::Object(
557 [
558 ("cursor_id".to_string(), Value::Number(cursor_id as f64)),
559 (
560 "columns".to_string(),
561 Value::Array(columns.into_iter().map(Value::String).collect()),
562 ),
563 ("total_rows".to_string(), Value::Number(total as f64)),
564 ]
565 .into_iter()
566 .collect(),
567 ))
568 }
569
570 "query.next" => {
571 let cursor_id = params
572 .get("cursor_id")
573 .and_then(|v| v.as_f64())
574 .map(|n| n as u64)
575 .ok_or((
576 error_code::INVALID_PARAMS,
577 "missing 'cursor_id' number".to_string(),
578 ))?;
579 let batch_size = params
580 .get("batch_size")
581 .and_then(|v| v.as_f64())
582 .map(|n| n as usize)
583 .unwrap_or(DEFAULT_CURSOR_BATCH_SIZE)
584 .clamp(1, MAX_CURSOR_BATCH_SIZE);
585
586 let (rows, done, remaining) = {
589 let cursor = session.cursor_mut(cursor_id).ok_or((
590 error_code::CURSOR_NOT_FOUND,
591 format!("cursor {cursor_id} not found"),
592 ))?;
593 let batch = cursor.take_batch(batch_size);
594 let rows_json: Vec<Value> = batch.iter().map(record_to_json_object).collect();
595 (rows_json, cursor.is_exhausted(), cursor.remaining())
596 };
597
598 if done {
599 let _ = session.drop_cursor(cursor_id);
602 }
603
604 Ok(Value::Object(
605 [
606 ("cursor_id".to_string(), Value::Number(cursor_id as f64)),
607 ("rows".to_string(), Value::Array(rows)),
608 ("done".to_string(), Value::Bool(done)),
609 ("remaining".to_string(), Value::Number(remaining as f64)),
610 ]
611 .into_iter()
612 .collect(),
613 ))
614 }
615
616 "query.close" => {
617 let cursor_id = params
618 .get("cursor_id")
619 .and_then(|v| v.as_f64())
620 .map(|n| n as u64)
621 .ok_or((
622 error_code::INVALID_PARAMS,
623 "missing 'cursor_id' number".to_string(),
624 ))?;
625 let existed = session.drop_cursor(cursor_id).is_some();
626 if !existed {
627 return Err((
628 error_code::CURSOR_NOT_FOUND,
629 format!("cursor {cursor_id} not found"),
630 ));
631 }
632 Ok(Value::Object(
633 [
634 ("cursor_id".to_string(), Value::Number(cursor_id as f64)),
635 ("closed".to_string(), Value::Bool(true)),
636 ]
637 .into_iter()
638 .collect(),
639 ))
640 }
641
642 "tx.rollback" => {
643 let tx = session.take_tx().ok_or((
644 error_code::NO_TX_OPEN,
645 "no transaction is open in this session".to_string(),
646 ))?;
647 let ops_discarded = tx.write_set.len();
648 Ok(Value::Object(
649 [
650 ("tx_id".to_string(), Value::Number(tx.tx_id as f64)),
651 (
652 "ops_discarded".to_string(),
653 Value::Number(ops_discarded as f64),
654 ),
655 ]
656 .into_iter()
657 .collect(),
658 ))
659 }
660
661 "version" => Ok(Value::Object(
662 [
663 (
664 "version".to_string(),
665 Value::String(env!("CARGO_PKG_VERSION").to_string()),
666 ),
667 (
668 "protocol".to_string(),
669 Value::String(PROTOCOL_VERSION.to_string()),
670 ),
671 ]
672 .into_iter()
673 .collect(),
674 )),
675
676 "health" => Ok(Value::Object(
677 [
678 ("ok".to_string(), Value::Bool(true)),
679 (
680 "version".to_string(),
681 Value::String(env!("CARGO_PKG_VERSION").to_string()),
682 ),
683 ]
684 .into_iter()
685 .collect(),
686 )),
687
688 "query" => {
689 let sql = params.get("sql").and_then(Value::as_str).ok_or((
690 error_code::INVALID_PARAMS,
691 "missing 'sql' string".to_string(),
692 ))?;
693
694 let bind_values: Option<Vec<SchemaValue>> = params
697 .get("params")
698 .map(|v| {
699 v.as_array()
700 .ok_or((
701 error_code::INVALID_PARAMS,
702 "'params' must be an array".to_string(),
703 ))
704 .map(|arr| arr.iter().map(json_value_to_schema_value).collect())
705 })
706 .transpose()?;
707
708 if let Some(binds) = bind_values {
709 use crate::storage::query::modes::parse_multi;
710 use crate::storage::query::user_params;
711 let parsed =
712 parse_multi(sql).map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
713 let bound = user_params::bind(&parsed, &binds)
714 .map_err(|e| (error_code::INVALID_PARAMS, e.to_string()))?;
715 let qr = runtime
716 .execute_query_expr(bound)
717 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
718 return Ok(query_result_to_json(&qr));
719 }
720
721 let qr = runtime
722 .execute_query(sql)
723 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
724 Ok(query_result_to_json(&qr))
725 }
726
727 "prepare" => {
736 use crate::storage::query::modes::parse_multi;
737 use crate::storage::query::planner::shape::parameterize_query_expr;
738
739 let sql = params.get("sql").and_then(Value::as_str).ok_or((
740 error_code::INVALID_PARAMS,
741 "missing 'sql' string".to_string(),
742 ))?;
743 let parsed = parse_multi(sql).map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
744 let (shape, parameter_count) = if let Some(prepared) = parameterize_query_expr(&parsed)
745 {
746 (prepared.shape, prepared.parameter_count)
747 } else {
748 (parsed, 0)
749 };
750 let id = session.next_prepared_id;
751 session.next_prepared_id = session.next_prepared_id.saturating_add(1);
752 session.prepared.insert(
753 id,
754 StdioPreparedStatement {
755 shape,
756 parameter_count,
757 },
758 );
759 Ok(Value::Object(
760 [
761 ("prepared_id".to_string(), Value::Number(id as f64)),
762 (
763 "parameter_count".to_string(),
764 Value::Number(parameter_count as f64),
765 ),
766 ]
767 .into_iter()
768 .collect(),
769 ))
770 }
771
772 "execute_prepared" => {
773 use crate::storage::query::planner::shape::bind_parameterized_query;
774 use crate::storage::schema::Value as SV;
775
776 let id = params
777 .get("prepared_id")
778 .and_then(Value::as_f64)
779 .map(|n| n as u64)
780 .ok_or((
781 error_code::INVALID_PARAMS,
782 "missing 'prepared_id'".to_string(),
783 ))?;
784
785 let stmt = session.prepared.get(&id).ok_or((
786 error_code::QUERY_ERROR,
787 format!("no prepared statement with id {id}"),
788 ))?;
789
790 let binds_json: Vec<Value> = params
792 .get("binds")
793 .and_then(Value::as_array)
794 .map(|a| a.to_vec())
795 .unwrap_or_default();
796 if binds_json.len() != stmt.parameter_count {
797 return Err((
798 error_code::INVALID_PARAMS,
799 format!(
800 "expected {} bind values, got {}",
801 stmt.parameter_count,
802 binds_json.len()
803 ),
804 ));
805 }
806
807 let binds: Vec<SV> = binds_json.iter().map(json_value_to_schema_value).collect();
809
810 let expr = if stmt.parameter_count == 0 {
812 stmt.shape.clone()
813 } else {
814 bind_parameterized_query(&stmt.shape, &binds, stmt.parameter_count)
815 .ok_or((error_code::QUERY_ERROR, "bind failed".to_string()))?
816 };
817
818 let qr = runtime
819 .execute_query_expr(expr)
820 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
821 Ok(query_result_to_json(&qr))
822 }
823
824 "insert" => {
825 let collection = params.get("collection").and_then(Value::as_str).ok_or((
826 error_code::INVALID_PARAMS,
827 "missing 'collection' string".to_string(),
828 ))?;
829 let payload = params.get("payload").ok_or((
830 error_code::INVALID_PARAMS,
831 "missing 'payload' object".to_string(),
832 ))?;
833 let payload_obj = payload.as_object().ok_or((
834 error_code::INVALID_PARAMS,
835 "'payload' must be a JSON object".to_string(),
836 ))?;
837 if let Some(tx) = session.current_tx_mut() {
838 let sql = build_insert_sql(collection, payload_obj.iter());
839 tx.write_set.push(PendingSql::Insert(sql));
840 return Ok(pending_tx_response(tx.tx_id));
841 }
842
843 let output = runtime
844 .create_row(flat_payload_to_row_input(collection, payload_obj))
845 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
846 let mut out = json::Map::new();
847 out.insert("affected".to_string(), Value::Number(1.0));
848 out.insert("id".to_string(), Value::String(output.id.raw().to_string()));
849 Ok(Value::Object(out))
850 }
851
852 "bulk_insert" => {
853 let collection = params.get("collection").and_then(Value::as_str).ok_or((
854 error_code::INVALID_PARAMS,
855 "missing 'collection' string".to_string(),
856 ))?;
857 let payloads = params.get("payloads").and_then(Value::as_array).ok_or((
858 error_code::INVALID_PARAMS,
859 "missing 'payloads' array".to_string(),
860 ))?;
861
862 let mut objects = Vec::with_capacity(payloads.len());
863 for entry in payloads {
864 objects.push(entry.as_object().ok_or((
865 error_code::INVALID_PARAMS,
866 "each payload must be a JSON object".to_string(),
867 ))?);
868 }
869
870 if let Some(tx) = session.current_tx_mut() {
871 let mut buffered: u64 = 0;
872 for obj in &objects {
873 let sql = build_insert_sql(collection, obj.iter());
874 tx.write_set.push(PendingSql::Insert(sql));
875 buffered += 1;
876 }
877 let tx_id = tx.tx_id;
878 return Ok(Value::Object(
879 [
880 ("affected".to_string(), Value::Number(0.0)),
881 ("buffered".to_string(), Value::Number(buffered as f64)),
882 ("pending".to_string(), Value::Bool(true)),
883 ("tx_id".to_string(), Value::Number(tx_id as f64)),
884 ]
885 .into_iter()
886 .collect(),
887 ));
888 }
889
890 if should_bulk_insert_graph(runtime, collection, &objects) {
891 return bulk_insert_graph(runtime, collection, &objects)
892 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()));
893 }
894
895 let mut total_affected: u64 = 0;
896 let mut ids = Vec::with_capacity(objects.len());
897 for chunk in objects.chunks(STDIO_BULK_INSERT_CHUNK_ROWS) {
898 let rows = chunk
899 .iter()
900 .map(|obj| flat_payload_to_row_input(collection, obj))
901 .collect();
902 let outputs = runtime
903 .create_rows_batch(CreateRowsBatchInput {
904 collection: collection.to_string(),
905 rows,
906 suppress_events: false,
907 })
908 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
909 total_affected += outputs.len() as u64;
910 ids.extend(
911 outputs
912 .into_iter()
913 .map(|output| Value::String(output.id.raw().to_string())),
914 );
915 }
916 let mut out = json::Map::new();
917 out.insert("affected".to_string(), Value::Number(total_affected as f64));
918 out.insert("ids".to_string(), Value::Array(ids));
919 Ok(Value::Object(out))
920 }
921
922 "get" => {
923 let collection = params.get("collection").and_then(Value::as_str).ok_or((
924 error_code::INVALID_PARAMS,
925 "missing 'collection' string".to_string(),
926 ))?;
927 let id = params.get("id").and_then(Value::as_str).ok_or((
928 error_code::INVALID_PARAMS,
929 "missing 'id' string".to_string(),
930 ))?;
931 let sql = format!("SELECT * FROM {collection} WHERE red_entity_id = {id} LIMIT 1");
932 let qr = runtime
933 .execute_query(&sql)
934 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
935 let entity = qr
936 .result
937 .records
938 .first()
939 .map(record_to_json_object)
940 .unwrap_or(Value::Null);
941 Ok(Value::Object(
942 [("entity".to_string(), entity)].into_iter().collect(),
943 ))
944 }
945
946 "delete" => {
947 let collection = params.get("collection").and_then(Value::as_str).ok_or((
948 error_code::INVALID_PARAMS,
949 "missing 'collection' string".to_string(),
950 ))?;
951 let id = params.get("id").and_then(Value::as_str).ok_or((
952 error_code::INVALID_PARAMS,
953 "missing 'id' string".to_string(),
954 ))?;
955 let sql = format!("DELETE FROM {collection} WHERE red_entity_id = {id}");
956
957 if let Some(tx) = session.current_tx_mut() {
958 tx.write_set.push(PendingSql::Delete(sql));
959 return Ok(pending_tx_response(tx.tx_id));
960 }
961
962 let qr = runtime
963 .execute_query(&sql)
964 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
965 Ok(Value::Object(
966 [(
967 "affected".to_string(),
968 Value::Number(qr.affected_rows as f64),
969 )]
970 .into_iter()
971 .collect(),
972 ))
973 }
974
975 "close" => {
976 let _ = session.take_tx();
981 session.clear_cursors();
982 let _ = runtime.checkpoint();
983 Ok(Value::Null)
984 }
985
986 "auth.login"
991 | "auth.whoami"
992 | "auth.change_password"
993 | "auth.create_api_key"
994 | "auth.revoke_api_key" => {
995 let _ = (session, params);
996 Err((
997 error_code::INVALID_REQUEST,
998 format!(
999 "{method}: auth methods are only available on grpc:// connections; \
1000 embedded modes (memory://, file://) inherit caller privileges"
1001 ),
1002 ))
1003 }
1004
1005 other => Err((
1006 error_code::INVALID_REQUEST,
1007 format!("unknown method: {other}"),
1008 )),
1009 }
1010}
1011
1012fn success_response(id: &Value, result: &Value, is_close: bool) -> String {
1017 let mut envelope = json::Map::new();
1022 envelope.insert("jsonrpc".to_string(), Value::String("2.0".to_string()));
1023 envelope.insert("id".to_string(), id.clone());
1024 envelope.insert("result".to_string(), result.clone());
1025 if is_close {
1026 envelope.insert("__close__".to_string(), Value::Bool(true));
1027 }
1028 Value::Object(envelope).to_string_compact()
1029}
1030
1031fn error_response(id: &Value, code: &str, message: &str) -> String {
1032 let mut err = json::Map::new();
1033 err.insert("code".to_string(), Value::String(code.to_string()));
1034 err.insert("message".to_string(), Value::String(message.to_string()));
1035 err.insert("data".to_string(), Value::Null);
1036
1037 let mut envelope = json::Map::new();
1038 envelope.insert("jsonrpc".to_string(), Value::String("2.0".to_string()));
1039 envelope.insert("id".to_string(), id.clone());
1040 envelope.insert("error".to_string(), Value::Object(err));
1041 Value::Object(envelope).to_string_compact()
1042}
1043
1044fn pending_tx_response(tx_id: u64) -> Value {
1051 Value::Object(
1052 [
1053 ("affected".to_string(), Value::Number(0.0)),
1054 ("pending".to_string(), Value::Bool(true)),
1055 ("tx_id".to_string(), Value::Number(tx_id as f64)),
1056 ]
1057 .into_iter()
1058 .collect(),
1059 )
1060}
1061
1062pub(crate) fn build_insert_sql<'a, I>(collection: &str, fields: I) -> String
1063where
1064 I: Iterator<Item = (&'a String, &'a Value)>,
1065{
1066 let mut cols = Vec::new();
1067 let mut vals = Vec::new();
1068 for (k, v) in fields {
1069 cols.push(k.clone());
1070 vals.push(value_to_sql_literal(v));
1071 }
1072 format!(
1073 "INSERT INTO {collection} ({}) VALUES ({})",
1074 cols.join(", "),
1075 vals.join(", "),
1076 )
1077}
1078
1079fn flat_payload_to_row_input(
1080 collection: &str,
1081 payload: &json::Map<String, Value>,
1082) -> CreateRowInput {
1083 CreateRowInput {
1084 collection: collection.to_string(),
1085 fields: payload
1086 .iter()
1087 .map(|(key, value)| (key.clone(), json_value_to_schema_value(value)))
1088 .collect(),
1089 metadata: Vec::new(),
1090 node_links: Vec::new(),
1091 vector_links: Vec::new(),
1092 }
1093}
1094
1095fn bulk_insert_chunk_count(row_count: usize) -> usize {
1096 if row_count == 0 {
1097 0
1098 } else {
1099 ((row_count - 1) / STDIO_BULK_INSERT_CHUNK_ROWS) + 1
1100 }
1101}
1102
1103pub(crate) fn should_bulk_insert_graph(
1104 runtime: &RedDBRuntime,
1105 collection: &str,
1106 payloads: &[&json::Map<String, Value>],
1107) -> bool {
1108 let graph_shaped = payloads
1109 .iter()
1110 .all(|payload| payload.get("label").and_then(Value::as_str).is_some());
1111 if !graph_shaped {
1112 return false;
1113 }
1114
1115 matches!(
1116 runtime
1117 .db()
1118 .catalog_model_snapshot()
1119 .collections
1120 .iter()
1121 .find(|descriptor| descriptor.name == collection)
1122 .map(|descriptor| descriptor.declared_model.unwrap_or(descriptor.model)),
1123 Some(crate::catalog::CollectionModel::Graph | crate::catalog::CollectionModel::Mixed)
1124 )
1125}
1126
1127pub(crate) fn bulk_insert_graph(
1128 runtime: &RedDBRuntime,
1129 collection: &str,
1130 payloads: &[&json::Map<String, Value>],
1131) -> crate::RedDBResult<Value> {
1132 use crate::application::entity_payload::{parse_create_edge_input, parse_create_node_input};
1133 use crate::application::ports::RuntimeEntityPort;
1134
1135 let mut ids = Vec::with_capacity(payloads.len());
1136 for payload in payloads {
1137 let input_payload = normalize_flat_graph_payload(payload);
1138 let id = if payload.contains_key("from") || payload.contains_key("to") {
1139 runtime
1140 .create_edge(parse_create_edge_input(
1141 collection.to_string(),
1142 &input_payload,
1143 )?)?
1144 .id
1145 } else {
1146 runtime
1147 .create_node(parse_create_node_input(
1148 collection.to_string(),
1149 &input_payload,
1150 )?)?
1151 .id
1152 };
1153 ids.push(Value::Number(id.raw() as f64));
1154 }
1155
1156 let mut out = json::Map::new();
1157 out.insert("affected".to_string(), Value::Number(ids.len() as f64));
1158 out.insert("ids".to_string(), Value::Array(ids));
1159 Ok(Value::Object(out))
1160}
1161
1162fn normalize_flat_graph_payload(payload: &json::Map<String, Value>) -> Value {
1163 if payload.contains_key("properties") || payload.contains_key("fields") {
1164 return Value::Object(payload.clone());
1165 }
1166
1167 let is_edge = payload.contains_key("from") || payload.contains_key("to");
1168 let mut normalized = payload.clone();
1169 let mut properties = json::Map::new();
1170 for (key, value) in payload {
1171 let reserved = if is_edge {
1172 matches!(
1173 key.as_str(),
1174 "label"
1175 | "from"
1176 | "to"
1177 | "weight"
1178 | "metadata"
1179 | "properties"
1180 | "fields"
1181 | "_ttl_ms"
1182 | "_expires_at"
1183 )
1184 } else {
1185 matches!(
1186 key.as_str(),
1187 "label"
1188 | "node_type"
1189 | "metadata"
1190 | "links"
1191 | "embeddings"
1192 | "properties"
1193 | "fields"
1194 | "_ttl_ms"
1195 | "_expires_at"
1196 )
1197 };
1198 if !reserved {
1199 properties.insert(key.clone(), value.clone());
1200 }
1201 }
1202 if !properties.is_empty() {
1203 normalized.insert("properties".to_string(), Value::Object(properties));
1204 }
1205 Value::Object(normalized)
1206}
1207
1208pub(crate) fn value_to_sql_literal(v: &Value) -> String {
1209 match v {
1210 Value::Null => "NULL".to_string(),
1211 Value::Bool(b) => b.to_string(),
1212 Value::Number(n) => {
1213 if n.fract() == 0.0 {
1214 format!("{}", *n as i64)
1215 } else {
1216 n.to_string()
1217 }
1218 }
1219 Value::String(s) => format!("'{}'", s.replace('\'', "''")),
1220 other => format!("'{}'", other.to_string_compact().replace('\'', "''")),
1221 }
1222}
1223
1224pub(crate) fn query_result_to_json(qr: &RuntimeQueryResult) -> Value {
1225 if let Some(ask) = ask_query_result_to_json(qr) {
1226 return ask;
1227 }
1228
1229 let mut envelope = json::Map::new();
1230 envelope.insert(
1231 "statement".to_string(),
1232 Value::String(qr.statement_type.to_string()),
1233 );
1234 envelope.insert(
1235 "affected".to_string(),
1236 Value::Number(qr.affected_rows as f64),
1237 );
1238
1239 let mut columns = Vec::new();
1240 if let Some(first) = qr.result.records.first() {
1241 let mut keys: Vec<String> = first
1242 .column_names()
1243 .into_iter()
1244 .map(|k| k.to_string())
1245 .collect();
1246 keys.sort();
1247 columns = keys.into_iter().map(Value::String).collect();
1248 }
1249 envelope.insert("columns".to_string(), Value::Array(columns));
1250
1251 let rows: Vec<Value> = qr
1252 .result
1253 .records
1254 .iter()
1255 .map(record_to_json_object)
1256 .collect();
1257 envelope.insert("rows".to_string(), Value::Array(rows));
1258
1259 Value::Object(envelope)
1260}
1261
1262fn ask_query_result_to_json(qr: &RuntimeQueryResult) -> Option<Value> {
1263 if qr.statement != "ask" {
1264 return None;
1265 }
1266 let row = qr.result.records.first()?;
1267 let answer = text_field(row, "answer")?;
1268 let provider = text_field(row, "provider").unwrap_or_default();
1269 let model = text_field(row, "model").unwrap_or_default();
1270 let sources_flat_json = json_field(row, "sources_flat").unwrap_or(Value::Array(Vec::new()));
1271 let citations_json = json_field(row, "citations").unwrap_or(Value::Array(Vec::new()));
1272 let validation_json = json_field(row, "validation").unwrap_or(Value::Object(json::Map::new()));
1273
1274 let effective_mode = match text_field(row, "mode").as_deref() {
1275 Some("lenient") => crate::runtime::ai::ask_response_envelope::Mode::Lenient,
1276 _ => crate::runtime::ai::ask_response_envelope::Mode::Strict,
1277 };
1278
1279 let result = crate::runtime::ai::ask_response_envelope::AskResult {
1280 answer,
1281 sources_flat: envelope_sources_flat(&sources_flat_json),
1282 citations: envelope_citations(&citations_json),
1283 validation: envelope_validation(&validation_json),
1284 cache_hit: bool_field(row, "cache_hit").unwrap_or(false),
1285 provider,
1286 model,
1287 prompt_tokens: u32_field(row, "prompt_tokens").unwrap_or(0),
1288 completion_tokens: u32_field(row, "completion_tokens").unwrap_or(0),
1289 cost_usd: f64_field(row, "cost_usd").unwrap_or(0.0),
1290 effective_mode,
1291 retry_count: u32_field(row, "retry_count").unwrap_or(0),
1292 };
1293 Some(crate::runtime::ai::ask_response_envelope::build(&result))
1294}
1295
1296fn record_field<'a>(record: &'a UnifiedRecord, name: &str) -> Option<&'a SchemaValue> {
1297 record
1298 .iter_fields()
1299 .find_map(|(key, value)| (key.as_ref() == name).then_some(value))
1300}
1301
1302fn text_field(record: &UnifiedRecord, name: &str) -> Option<String> {
1303 match record_field(record, name)? {
1304 SchemaValue::Text(s) => Some(s.to_string()),
1305 SchemaValue::Email(s)
1306 | SchemaValue::Url(s)
1307 | SchemaValue::NodeRef(s)
1308 | SchemaValue::EdgeRef(s) => Some(s.clone()),
1309 other => Some(format!("{other}")),
1310 }
1311}
1312
1313fn u32_field(record: &UnifiedRecord, name: &str) -> Option<u32> {
1314 match record_field(record, name)? {
1315 SchemaValue::Integer(n) => (*n >= 0).then_some((*n).min(u32::MAX as i64) as u32),
1316 SchemaValue::UnsignedInteger(n) => Some((*n).min(u32::MAX as u64) as u32),
1317 SchemaValue::BigInt(n)
1318 | SchemaValue::TimestampMs(n)
1319 | SchemaValue::Timestamp(n)
1320 | SchemaValue::Duration(n)
1321 | SchemaValue::Decimal(n) => (*n >= 0).then_some((*n).min(u32::MAX as i64) as u32),
1322 SchemaValue::Float(n) => (*n >= 0.0).then_some((*n).min(u32::MAX as f64) as u32),
1323 _ => None,
1324 }
1325}
1326
1327fn f64_field(record: &UnifiedRecord, name: &str) -> Option<f64> {
1328 match record_field(record, name)? {
1329 SchemaValue::Integer(n) => Some(*n as f64),
1330 SchemaValue::UnsignedInteger(n) => Some(*n as f64),
1331 SchemaValue::BigInt(n)
1332 | SchemaValue::TimestampMs(n)
1333 | SchemaValue::Timestamp(n)
1334 | SchemaValue::Duration(n)
1335 | SchemaValue::Decimal(n) => Some(*n as f64),
1336 SchemaValue::Float(n) => Some(*n),
1337 _ => None,
1338 }
1339}
1340
1341fn bool_field(record: &UnifiedRecord, name: &str) -> Option<bool> {
1342 match record_field(record, name)? {
1343 SchemaValue::Boolean(value) => Some(*value),
1344 _ => None,
1345 }
1346}
1347
1348fn json_field(record: &UnifiedRecord, name: &str) -> Option<Value> {
1349 match record_field(record, name)? {
1350 SchemaValue::Json(bytes) => json::from_slice(bytes).ok(),
1351 SchemaValue::Text(text) => json::from_str(text).ok(),
1352 _ => None,
1353 }
1354}
1355
1356fn envelope_sources_flat(
1357 value: &Value,
1358) -> Vec<crate::runtime::ai::ask_response_envelope::SourceRow> {
1359 value
1360 .as_array()
1361 .unwrap_or(&[])
1362 .iter()
1363 .filter_map(|source| {
1364 let urn = source.get("urn").and_then(Value::as_str)?.to_string();
1365 let payload = source
1366 .get("payload")
1367 .and_then(Value::as_str)
1368 .map(ToString::to_string)
1369 .unwrap_or_else(|| source.to_string_compact());
1370 Some(crate::runtime::ai::ask_response_envelope::SourceRow { urn, payload })
1371 })
1372 .collect()
1373}
1374
1375fn envelope_citations(value: &Value) -> Vec<crate::runtime::ai::ask_response_envelope::Citation> {
1376 value
1377 .as_array()
1378 .unwrap_or(&[])
1379 .iter()
1380 .filter_map(|citation| {
1381 let marker = citation.get("marker").and_then(Value::as_u64)?;
1382 let urn = citation.get("urn").and_then(Value::as_str)?.to_string();
1383 Some(crate::runtime::ai::ask_response_envelope::Citation {
1384 marker: marker.min(u32::MAX as u64) as u32,
1385 urn,
1386 })
1387 })
1388 .collect()
1389}
1390
1391fn envelope_validation(value: &Value) -> crate::runtime::ai::ask_response_envelope::Validation {
1392 crate::runtime::ai::ask_response_envelope::Validation {
1393 ok: value.get("ok").and_then(Value::as_bool).unwrap_or(true),
1394 warnings: validation_items(value, "warnings")
1395 .into_iter()
1396 .map(
1397 |(kind, detail)| crate::runtime::ai::ask_response_envelope::ValidationWarning {
1398 kind,
1399 detail,
1400 },
1401 )
1402 .collect(),
1403 errors: validation_items(value, "errors")
1404 .into_iter()
1405 .map(
1406 |(kind, detail)| crate::runtime::ai::ask_response_envelope::ValidationError {
1407 kind,
1408 detail,
1409 },
1410 )
1411 .collect(),
1412 }
1413}
1414
1415fn validation_items(value: &Value, key: &str) -> Vec<(String, String)> {
1416 value
1417 .get(key)
1418 .and_then(Value::as_array)
1419 .unwrap_or(&[])
1420 .iter()
1421 .filter_map(|item| {
1422 Some((
1423 item.get("kind").and_then(Value::as_str)?.to_string(),
1424 item.get("detail")
1425 .and_then(Value::as_str)
1426 .unwrap_or("")
1427 .to_string(),
1428 ))
1429 })
1430 .collect()
1431}
1432
1433pub(crate) fn insert_result_to_json(qr: &RuntimeQueryResult) -> Value {
1434 let mut envelope = json::Map::new();
1435 envelope.insert(
1436 "affected".to_string(),
1437 Value::Number(qr.affected_rows as f64),
1438 );
1439 if let Some(first) = qr.result.records.first() {
1441 if let Some(id_val) = first
1442 .iter_fields()
1443 .find(|(k, _)| {
1444 let s: &str = k;
1445 s == "_entity_id"
1446 })
1447 .map(|(_, v)| schema_value_to_json(v))
1448 {
1449 envelope.insert("id".to_string(), id_val);
1450 }
1451 }
1452 Value::Object(envelope)
1453}
1454
1455fn record_to_json_object(record: &UnifiedRecord) -> Value {
1456 let mut map = json::Map::new();
1457 let mut entries: Vec<(&str, &SchemaValue)> =
1460 record.iter_fields().map(|(k, v)| (k.as_ref(), v)).collect();
1461 entries.sort_by(|a, b| a.0.cmp(b.0));
1462 for (k, v) in entries {
1463 map.insert(k.to_string(), schema_value_to_json(v));
1464 }
1465 Value::Object(map)
1466}
1467
1468fn schema_value_to_json(v: &SchemaValue) -> Value {
1469 match v {
1470 SchemaValue::Null => Value::Null,
1471 SchemaValue::Boolean(b) => Value::Bool(*b),
1472 SchemaValue::Integer(n) => Value::Number(*n as f64),
1473 SchemaValue::UnsignedInteger(n) => Value::Number(*n as f64),
1474 SchemaValue::Float(n) if n.is_finite() => Value::Number(*n),
1475 SchemaValue::Float(n) => {
1476 let token = if n.is_nan() {
1477 "NaN"
1478 } else if n.is_sign_positive() {
1479 "Infinity"
1480 } else {
1481 "-Infinity"
1482 };
1483 single_key_object("$float", Value::String(token.to_string()))
1484 }
1485 SchemaValue::BigInt(n) => Value::Number(*n as f64),
1486 SchemaValue::TimestampMs(n) | SchemaValue::Duration(n) | SchemaValue::Decimal(n) => {
1487 Value::Number(*n as f64)
1488 }
1489 SchemaValue::Timestamp(n) => single_key_object("$ts", Value::String(n.to_string())),
1490 SchemaValue::Password(_) | SchemaValue::Secret(_) => Value::String("***".to_string()),
1491 SchemaValue::Text(s) => Value::String(s.to_string()),
1492 SchemaValue::Blob(bytes) => {
1493 single_key_object("$bytes", Value::String(base64_encode(bytes)))
1494 }
1495 SchemaValue::Json(bytes) => {
1496 crate::presentation::entity_json::storage_json_bytes_to_json(bytes)
1497 }
1498 SchemaValue::Uuid(bytes) => single_key_object("$uuid", Value::String(format_uuid(bytes))),
1499 SchemaValue::Email(s)
1500 | SchemaValue::Url(s)
1501 | SchemaValue::NodeRef(s)
1502 | SchemaValue::EdgeRef(s) => Value::String(s.clone()),
1503 other => Value::String(format!("{other}")),
1504 }
1505}
1506
1507fn single_key_object(key: &str, value: Value) -> Value {
1508 Value::Object([(key.to_string(), value)].into_iter().collect())
1509}
1510
1511pub(crate) fn json_value_to_schema_value(v: &Value) -> SchemaValue {
1515 match v {
1516 Value::Null => SchemaValue::Null,
1517 Value::Bool(b) => SchemaValue::Boolean(*b),
1518 Value::Number(n) => {
1519 if n.is_finite() && n.fract() == 0.0 && *n >= i64::MIN as f64 && *n <= i64::MAX as f64 {
1520 SchemaValue::Integer(*n as i64)
1521 } else {
1522 SchemaValue::Float(*n)
1523 }
1524 }
1525 Value::String(s) => SchemaValue::text(s.clone()),
1526 Value::Array(items) => {
1527 if items.iter().all(|v| matches!(v, Value::Number(_))) {
1531 let floats: Vec<f32> = items
1532 .iter()
1533 .map(|v| v.as_f64().unwrap_or(0.0) as f32)
1534 .collect();
1535 SchemaValue::Vector(floats)
1536 } else {
1537 SchemaValue::Json(crate::json::to_vec(v).unwrap_or_default())
1538 }
1539 }
1540 Value::Object(map) => {
1541 if map.len() == 1 {
1542 if let Some(Value::String(encoded)) = map.get("$bytes") {
1543 if let Ok(bytes) = base64_decode(encoded) {
1544 return SchemaValue::Blob(bytes);
1545 }
1546 }
1547 if let Some(value) = map.get("$ts") {
1548 if let Some(ts) = json_i64(value) {
1549 return SchemaValue::Timestamp(ts);
1550 }
1551 }
1552 if let Some(Value::String(value)) = map.get("$uuid") {
1553 if let Ok(uuid) = crate::crypto::Uuid::parse_str(value) {
1554 return SchemaValue::Uuid(*uuid.as_bytes());
1555 }
1556 }
1557 if let Some(Value::String(value)) = map.get("$float") {
1558 return match value.as_str() {
1559 "NaN" => SchemaValue::Float(f64::NAN),
1560 "Infinity" | "+Infinity" | "inf" | "+inf" => {
1561 SchemaValue::Float(f64::INFINITY)
1562 }
1563 "-Infinity" | "-inf" => SchemaValue::Float(f64::NEG_INFINITY),
1564 _ => SchemaValue::Json(crate::json::to_vec(v).unwrap_or_default()),
1565 };
1566 }
1567 }
1568 SchemaValue::Json(crate::json::to_vec(v).unwrap_or_default())
1569 }
1570 }
1571}
1572
1573fn json_i64(value: &Value) -> Option<i64> {
1574 match value {
1575 Value::Number(n) => {
1576 if n.is_finite() && n.fract() == 0.0 && *n >= i64::MIN as f64 && *n <= i64::MAX as f64 {
1577 Some(*n as i64)
1578 } else {
1579 None
1580 }
1581 }
1582 Value::String(s) => s.parse::<i64>().ok(),
1583 _ => None,
1584 }
1585}
1586
/// Formats 16 raw bytes as a lowercase hyphenated UUID string
/// (`xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx`).
fn format_uuid(bytes: &[u8; 16]) -> String {
    const HEX: &[u8; 16] = b"0123456789abcdef";

    // 32 hex digits plus 4 hyphens.
    let mut text = String::with_capacity(36);
    for (index, byte) in bytes.iter().enumerate() {
        // Hyphens separate the 4-2-2-2-6 byte groups.
        if matches!(index, 4 | 6 | 8 | 10) {
            text.push('-');
        }
        text.push(HEX[(byte >> 4) as usize] as char);
        text.push(HEX[(byte & 0x0f) as usize] as char);
    }
    text
}
1608
/// Encodes `bytes` as standard base64 (`+` and `/` alphabet) with `=` padding.
fn base64_encode(bytes: &[u8]) -> String {
    const ALPHABET: &[u8; 64] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

    let mut encoded = String::with_capacity((bytes.len() + 2) / 3 * 4);
    for group in bytes.chunks(3) {
        // Missing trailing bytes of the last group count as zero bits.
        let b0 = group[0] as u32;
        let b1 = group.get(1).copied().unwrap_or(0) as u32;
        let b2 = group.get(2).copied().unwrap_or(0) as u32;
        let word = (b0 << 16) | (b1 << 8) | b2;
        let sextet = |shift: u32| ALPHABET[((word >> shift) & 0x3f) as usize] as char;

        encoded.push(sextet(18));
        encoded.push(sextet(12));
        // Positions not backed by input bytes are emitted as padding.
        encoded.push(if group.len() > 1 { sextet(6) } else { '=' });
        encoded.push(if group.len() > 2 { sextet(0) } else { '=' });
    }
    encoded
}
1640
/// Decodes standard, padded base64 (`+` and `/` alphabet).
///
/// # Errors
///
/// Returns an error message when:
/// * the input length is not a multiple of 4;
/// * padding is malformed: more than two `=` in a group, `=` anywhere other
///   than the tail of the final group, or `=` followed by data;
/// * a character is outside the base64 alphabet.
///
/// Unused trailing bits in a padded group are ignored rather than rejected.
fn base64_decode(input: &str) -> Result<Vec<u8>, String> {
    let bytes = input.as_bytes();
    if bytes.len() % 4 != 0 {
        return Err("base64 length must be a multiple of 4".to_string());
    }

    let group_count = bytes.len() / 4;
    let mut out = Vec::with_capacity(group_count * 3);
    for (index, chunk) in bytes.chunks_exact(4).enumerate() {
        let pad = chunk.iter().rev().take_while(|&&b| b == b'=').count();
        let is_last = index + 1 == group_count;
        // Padding is validated here because `base64_value` maps `=` to zero:
        // without this check inputs such as "====" or "=AAA" would decode to
        // spurious zero bytes.
        if pad > 2 || (pad > 0 && !is_last) || chunk[..4 - pad].contains(&b'=') {
            return Err("invalid base64 padding".to_string());
        }

        // Assemble the 24-bit group from the data characters only.
        let mut n = 0u32;
        for (slot, &byte) in chunk[..4 - pad].iter().enumerate() {
            n |= u32::from(base64_value(byte)?) << (18 - 6 * slot);
        }

        out.push(((n >> 16) & 0xff) as u8);
        if pad < 2 {
            out.push(((n >> 8) & 0xff) as u8);
        }
        if pad < 1 {
            out.push((n & 0xff) as u8);
        }
    }
    Ok(out)
}

/// Maps one base64 character to its 6-bit value.
///
/// `=` maps to zero; callers that need strict padding must validate it
/// themselves, as `base64_decode` does.
///
/// # Errors
///
/// Returns an error message for any byte outside the alphabet.
fn base64_value(byte: u8) -> Result<u8, String> {
    match byte {
        b'A'..=b'Z' => Ok(byte - b'A'),
        b'a'..=b'z' => Ok(byte - b'a' + 26),
        b'0'..=b'9' => Ok(byte - b'0' + 52),
        b'+' => Ok(62),
        b'/' => Ok(63),
        b'=' => Ok(0),
        _ => Err(format!("invalid base64 character: {}", byte as char)),
    }
}
1684
1685fn dispatch_method_remote(
1695 client: &AsyncMutex<RedDBClient>,
1696 tokio_rt: &tokio::runtime::Runtime,
1697 method: &str,
1698 params: &Value,
1699) -> Result<Value, (&'static str, String)> {
1700 match method {
1701 "version" => Ok(Value::Object(
1702 [
1703 (
1704 "version".to_string(),
1705 Value::String(env!("CARGO_PKG_VERSION").to_string()),
1706 ),
1707 (
1708 "protocol".to_string(),
1709 Value::String(PROTOCOL_VERSION.to_string()),
1710 ),
1711 ]
1712 .into_iter()
1713 .collect(),
1714 )),
1715
1716 "health" => {
1717 let result = tokio_rt.block_on(async {
1718 let mut guard = client.lock().await;
1719 guard.health_status().await
1720 });
1721 match result {
1722 Ok(status) => Ok(Value::Object(
1723 [
1724 ("ok".to_string(), Value::Bool(status.healthy)),
1725 ("state".to_string(), Value::String(status.state)),
1726 (
1727 "checked_at_unix_ms".to_string(),
1728 Value::Number(status.checked_at_unix_ms as f64),
1729 ),
1730 (
1731 "version".to_string(),
1732 Value::String(env!("CARGO_PKG_VERSION").to_string()),
1733 ),
1734 ]
1735 .into_iter()
1736 .collect(),
1737 )),
1738 Err(e) => Err((error_code::INTERNAL_ERROR, e.to_string())),
1739 }
1740 }
1741
1742 "query" => {
1743 let sql = params.get("sql").and_then(Value::as_str).ok_or((
1744 error_code::INVALID_PARAMS,
1745 "missing 'sql' string".to_string(),
1746 ))?;
1747 let json_str = tokio_rt
1748 .block_on(async {
1749 let mut guard = client.lock().await;
1750 guard.query(sql).await
1751 })
1752 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
1753 let parsed = json::from_str::<Value>(&json_str)
1758 .map_err(|e| (error_code::INTERNAL_ERROR, format!("bad server JSON: {e}")))?;
1759 Ok(parsed)
1760 }
1761
1762 "insert" => {
1763 let collection = params.get("collection").and_then(Value::as_str).ok_or((
1764 error_code::INVALID_PARAMS,
1765 "missing 'collection' string".to_string(),
1766 ))?;
1767 let payload = params.get("payload").ok_or((
1768 error_code::INVALID_PARAMS,
1769 "missing 'payload' object".to_string(),
1770 ))?;
1771 if payload.as_object().is_none() {
1772 return Err((
1773 error_code::INVALID_PARAMS,
1774 "'payload' must be a JSON object".to_string(),
1775 ));
1776 }
1777 let payload_json = payload.to_string_compact();
1778 let reply = tokio_rt
1779 .block_on(async {
1780 let mut guard = client.lock().await;
1781 guard.create_row_entity(collection, &payload_json).await
1782 })
1783 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
1784 let mut out = json::Map::new();
1785 out.insert("affected".to_string(), Value::Number(1.0));
1786 out.insert("id".to_string(), Value::String(reply.id.to_string()));
1787 Ok(Value::Object(out))
1788 }
1789
1790 "bulk_insert" => {
1791 let collection = params.get("collection").and_then(Value::as_str).ok_or((
1792 error_code::INVALID_PARAMS,
1793 "missing 'collection' string".to_string(),
1794 ))?;
1795 let payloads = params.get("payloads").and_then(Value::as_array).ok_or((
1796 error_code::INVALID_PARAMS,
1797 "missing 'payloads' array".to_string(),
1798 ))?;
1799 let mut encoded = Vec::with_capacity(payloads.len());
1800 for entry in payloads {
1801 if entry.as_object().is_none() {
1802 return Err((
1803 error_code::INVALID_PARAMS,
1804 "each payload must be a JSON object".to_string(),
1805 ));
1806 }
1807 encoded.push(entry.to_string_compact());
1808 }
1809 let status = tokio_rt
1810 .block_on(async {
1811 let mut guard = client.lock().await;
1812 guard.bulk_create_rows(collection, encoded).await
1813 })
1814 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
1815 Ok(Value::Object(
1816 [
1817 ("affected".to_string(), Value::Number(status.count as f64)),
1818 (
1819 "ids".to_string(),
1820 Value::Array(
1821 status
1822 .ids
1823 .into_iter()
1824 .map(|id| Value::Number(id as f64))
1825 .collect(),
1826 ),
1827 ),
1828 ]
1829 .into_iter()
1830 .collect(),
1831 ))
1832 }
1833
1834 "get" => {
1835 let collection = params.get("collection").and_then(Value::as_str).ok_or((
1836 error_code::INVALID_PARAMS,
1837 "missing 'collection' string".to_string(),
1838 ))?;
1839 let id = params.get("id").and_then(Value::as_str).ok_or((
1840 error_code::INVALID_PARAMS,
1841 "missing 'id' string".to_string(),
1842 ))?;
1843 let sql = format!("SELECT * FROM {collection} WHERE red_entity_id = {id} LIMIT 1");
1844 let json_str = tokio_rt
1845 .block_on(async {
1846 let mut guard = client.lock().await;
1847 guard.query(&sql).await
1848 })
1849 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
1850 let parsed = json::from_str::<Value>(&json_str)
1851 .map_err(|e| (error_code::INTERNAL_ERROR, format!("bad server JSON: {e}")))?;
1852 let entity = parsed
1855 .get("rows")
1856 .and_then(Value::as_array)
1857 .and_then(|rows| rows.first().cloned())
1858 .unwrap_or(Value::Null);
1859 Ok(Value::Object(
1860 [("entity".to_string(), entity)].into_iter().collect(),
1861 ))
1862 }
1863
1864 "delete" => {
1865 let collection = params.get("collection").and_then(Value::as_str).ok_or((
1866 error_code::INVALID_PARAMS,
1867 "missing 'collection' string".to_string(),
1868 ))?;
1869 let id = params.get("id").and_then(Value::as_str).ok_or((
1870 error_code::INVALID_PARAMS,
1871 "missing 'id' string".to_string(),
1872 ))?;
1873 let id = id.parse::<u64>().map_err(|_| {
1874 (
1875 error_code::INVALID_PARAMS,
1876 "id must be a numeric string".to_string(),
1877 )
1878 })?;
1879 let _reply = tokio_rt
1880 .block_on(async {
1881 let mut guard = client.lock().await;
1882 guard.delete_entity(collection, id).await
1883 })
1884 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
1885 Ok(Value::Object(
1886 [("affected".to_string(), Value::Number(1.0))]
1887 .into_iter()
1888 .collect(),
1889 ))
1890 }
1891
1892 "close" => Ok(Value::Null),
1893
1894 other => Err((
1895 error_code::INVALID_REQUEST,
1896 format!("unknown method: {other}"),
1897 )),
1898 }
1899}
1900
1901#[cfg(test)]
1902mod tests {
1903 use super::*;
1904 use crate::json::json;
1905 use proptest::prelude::*;
1906
1907 fn make_runtime() -> RedDBRuntime {
1908 RedDBRuntime::in_memory().expect("in-memory runtime")
1909 }
1910
1911 fn create_graph_collection(rt: &RedDBRuntime, name: &str) {
1912 let db = rt.db();
1913 db.store()
1914 .create_collection(name)
1915 .expect("create collection");
1916 let now = std::time::SystemTime::now()
1917 .duration_since(std::time::UNIX_EPOCH)
1918 .unwrap_or_default()
1919 .as_millis();
1920 db.save_collection_contract(crate::physical::CollectionContract {
1921 name: name.to_string(),
1922 declared_model: crate::catalog::CollectionModel::Graph,
1923 schema_mode: crate::catalog::SchemaMode::Dynamic,
1924 origin: crate::physical::ContractOrigin::Explicit,
1925 version: 1,
1926 created_at_unix_ms: now,
1927 updated_at_unix_ms: now,
1928 default_ttl_ms: None,
1929 vector_dimension: None,
1930 vector_metric: None,
1931 context_index_fields: Vec::new(),
1932 declared_columns: Vec::new(),
1933 table_def: None,
1934 timestamps_enabled: false,
1935 context_index_enabled: false,
1936 metrics_raw_retention_ms: None,
1937 metrics_rollup_policies: Vec::new(),
1938 metrics_tenant_identity: None,
1939 metrics_namespace: None,
1940 append_only: false,
1941 subscriptions: Vec::new(),
1942 })
1943 .expect("save graph contract");
1944 }
1945
1946 fn handle(rt: &RedDBRuntime, line: &str) -> String {
1947 let mut session = Session::new();
1948 handle_line(&Backend::Local(rt), &mut session, line)
1949 }
1950
1951 fn query_request(id: u64, sql: &str) -> String {
1952 let mut params = json::Map::new();
1953 params.insert("sql".to_string(), Value::String(sql.to_string()));
1954
1955 let mut request = json::Map::new();
1956 request.insert("jsonrpc".to_string(), Value::String("2.0".to_string()));
1957 request.insert("id".to_string(), Value::Number(id as f64));
1958 request.insert("method".to_string(), Value::String("query".to_string()));
1959 request.insert("params".to_string(), Value::Object(params));
1960 Value::Object(request).to_string_compact()
1961 }
1962
1963 fn query_request_with_params(id: u64, sql: &str, binds: Vec<Value>) -> String {
1964 let mut params = json::Map::new();
1965 params.insert("sql".to_string(), Value::String(sql.to_string()));
1966 params.insert("params".to_string(), Value::Array(binds));
1967
1968 let mut request = json::Map::new();
1969 request.insert("jsonrpc".to_string(), Value::String("2.0".to_string()));
1970 request.insert("id".to_string(), Value::Number(id as f64));
1971 request.insert("method".to_string(), Value::String("query".to_string()));
1972 request.insert("params".to_string(), Value::Object(params));
1973 Value::Object(request).to_string_compact()
1974 }
1975
1976 fn with_session<F>(rt: &RedDBRuntime, f: F)
1979 where
1980 F: FnOnce(&dyn Fn(&str) -> String, &RedDBRuntime),
1981 {
1982 let session = std::cell::RefCell::new(Session::new());
1983 let call = |line: &str| -> String {
1984 let mut s = session.borrow_mut();
1985 handle_line(&Backend::Local(rt), &mut s, line)
1986 };
1987 f(&call, rt);
1988 }
1989
1990 fn result_rows(response: &str) -> Vec<Value> {
1991 json::from_str::<Value>(response)
1992 .expect("json response")
1993 .get("result")
1994 .and_then(|result| result.get("rows"))
1995 .and_then(Value::as_array)
1996 .map(|rows| rows.to_vec())
1997 .unwrap_or_default()
1998 }
1999
2000 fn result_name_kind(response: &str) -> Vec<(String, String)> {
2001 result_rows(response)
2002 .into_iter()
2003 .map(|row| {
2004 let object = row.as_object().expect("row object");
2005 let name = object
2006 .get("name")
2007 .and_then(Value::as_str)
2008 .expect("row name")
2009 .to_string();
2010 let kind = object
2011 .get("kind")
2012 .and_then(Value::as_str)
2013 .expect("row kind")
2014 .to_string();
2015 (name, kind)
2016 })
2017 .collect()
2018 }
2019
    /// Proptest strategy producing scalar JSON bind values: `null`, booleans,
    /// integers in `-1000..1000`, and strings of up to 8 characters drawn
    /// from lowercase letters and the single quote.
    fn json_scalar_param() -> impl Strategy<Value = Value> {
        prop_oneof![
            Just(Value::Null),
            any::<bool>().prop_map(Value::Bool),
            (-1000_i64..1000_i64).prop_map(|n| Value::Number(n as f64)),
            "[a-z']{0,8}".prop_map(Value::String),
        ]
    }
2028
2029 fn sql_literal_for_json(value: &Value) -> String {
2030 match value {
2031 Value::Null => "NULL".to_string(),
2032 Value::Bool(true) => "TRUE".to_string(),
2033 Value::Bool(false) => "FALSE".to_string(),
2034 Value::Number(n) => format!("{n:.0}"),
2035 Value::String(s) => format!("'{}'", s.replace('\'', "''")),
2036 _ => panic!("unsupported scalar param: {value:?}"),
2037 }
2038 }
2039
2040 #[test]
2041 fn version_method_returns_version_and_protocol() {
2042 let rt = make_runtime();
2043 let line = r#"{"jsonrpc":"2.0","id":1,"method":"version","params":{}}"#;
2044 let resp = handle(&rt, line);
2045 assert!(resp.contains("\"id\":1"));
2046 assert!(resp.contains("\"protocol\":\"1.0\""));
2047 assert!(resp.contains("\"version\""));
2048 }
2049
2050 #[test]
2051 fn health_method_returns_ok_true() {
2052 let rt = make_runtime();
2053 let resp = handle(
2054 &rt,
2055 r#"{"jsonrpc":"2.0","id":"abc","method":"health","params":{}}"#,
2056 );
2057 assert!(resp.contains("\"ok\":true"));
2058 assert!(resp.contains("\"id\":\"abc\""));
2059 }
2060
2061 #[test]
2062 fn parse_error_for_invalid_json() {
2063 let rt = make_runtime();
2064 let resp = handle(&rt, "not json {");
2065 assert!(resp.contains("\"code\":\"PARSE_ERROR\""));
2066 assert!(resp.contains("\"id\":null"));
2067 }
2068
2069 #[test]
2070 fn invalid_request_when_method_missing() {
2071 let rt = make_runtime();
2072 let resp = handle(&rt, r#"{"jsonrpc":"2.0","id":1,"params":{}}"#);
2073 assert!(resp.contains("\"code\":\"INVALID_REQUEST\""));
2074 }
2075
2076 #[test]
2077 fn unknown_method_is_invalid_request() {
2078 let rt = make_runtime();
2079 let resp = handle(
2080 &rt,
2081 r#"{"jsonrpc":"2.0","id":1,"method":"frobnicate","params":{}}"#,
2082 );
2083 assert!(resp.contains("\"code\":\"INVALID_REQUEST\""));
2084 assert!(resp.contains("frobnicate"));
2085 }
2086
2087 #[test]
2088 fn invalid_params_when_query_sql_missing() {
2089 let rt = make_runtime();
2090 let resp = handle(
2091 &rt,
2092 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{}}"#,
2093 );
2094 assert!(resp.contains("\"code\":\"INVALID_PARAMS\""));
2095 }
2096
2097 #[test]
2098 fn close_method_marks_response_for_shutdown() {
2099 let rt = make_runtime();
2100 let resp = handle(
2101 &rt,
2102 r#"{"jsonrpc":"2.0","id":1,"method":"close","params":{}}"#,
2103 );
2104 assert!(resp.contains("\"__close__\":true"));
2105 }
2106
2107 #[test]
2108 fn query_with_int_text_params_round_trips() {
2109 let rt = make_runtime();
2110 let _ = handle(
2111 &rt,
2112 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE p (id INTEGER, name TEXT)"}}"#,
2113 );
2114 let _ = handle(
2115 &rt,
2116 r#"{"jsonrpc":"2.0","id":2,"method":"query","params":{"sql":"INSERT INTO p (id, name) VALUES (1, 'Alice')"}}"#,
2117 );
2118 let _ = handle(
2119 &rt,
2120 r#"{"jsonrpc":"2.0","id":3,"method":"query","params":{"sql":"INSERT INTO p (id, name) VALUES (2, 'Bob')"}}"#,
2121 );
2122 let resp = handle(
2123 &rt,
2124 r#"{"jsonrpc":"2.0","id":4,"method":"query","params":{"sql":"SELECT * FROM p WHERE id = $1 AND name = $2","params":[1,"Alice"]}}"#,
2125 );
2126 assert!(resp.contains("\"Alice\""), "got: {resp}");
2127 assert!(!resp.contains("\"Bob\""), "got: {resp}");
2128 }
2129
    /// Exercises `?` and `?N` placeholders across INSERT, SELECT, UPDATE and
    /// DELETE on one table, including a bound string that contains a single
    /// quote. Each step relies on the rows left behind by the previous one.
    #[test]
    fn query_with_question_params_covers_select_insert_update_delete() {
        let rt = make_runtime();
        let create = handle(
            &rt,
            &query_request(1, "CREATE TABLE qp (id INTEGER, name TEXT)"),
        );
        assert!(!create.contains("\"error\""), "got: {create}");

        // Insert with positional `?` binds; the name carries an apostrophe.
        let inserted = handle(
            &rt,
            &query_request_with_params(
                2,
                "INSERT INTO qp (id, name) VALUES (?, ?)",
                vec![json!(1), json!("O'Reilly")],
            ),
        );
        assert!(inserted.contains("\"affected\":1"), "got: {inserted}");

        let selected = handle(
            &rt,
            &query_request_with_params(3, "SELECT name FROM qp WHERE id = ?", vec![json!(1)]),
        );
        let rows = result_rows(&selected);
        assert_eq!(rows.len(), 1, "got: {selected}");
        assert_eq!(
            rows[0].get("name").and_then(Value::as_str),
            Some("O'Reilly")
        );

        // Numbered placeholders: `?1` and `?2` refer to the binds by position.
        let selected_numbered = handle(
            &rt,
            &query_request_with_params(
                4,
                "SELECT name FROM qp WHERE name = ?1 AND id = ?2",
                vec![json!("O'Reilly"), json!(1)],
            ),
        );
        assert_eq!(
            result_rows(&selected_numbered).len(),
            1,
            "got: {selected_numbered}"
        );

        let updated = handle(
            &rt,
            &query_request_with_params(
                5,
                "UPDATE qp SET name = ? WHERE id = ?",
                vec![json!("Alice"), json!(1)],
            ),
        );
        assert!(updated.contains("\"affected\":1"), "got: {updated}");

        let deleted = handle(
            &rt,
            &query_request_with_params(6, "DELETE FROM qp WHERE name = ?", vec![json!("Alice")]),
        );
        assert!(deleted.contains("\"affected\":1"), "got: {deleted}");

        // The only row was deleted, so the table must now be empty.
        let remaining = handle(&rt, &query_request(7, "SELECT * FROM qp"));
        assert!(result_rows(&remaining).is_empty(), "got: {remaining}");
    }
2193
2194 #[test]
2195 fn query_with_params_insert_and_search_round_trip() {
2196 let rt = make_runtime();
2197 let insert = handle(
2198 &rt,
2199 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"INSERT INTO bun_embeddings VECTOR (dense, content) VALUES ($1, $2)","params":[[1.0,0.0],"bun vector"]}}"#,
2200 );
2201 assert!(insert.contains("\"affected\":1"), "got: {insert}");
2202
2203 let search = handle(
2204 &rt,
2205 r#"{"jsonrpc":"2.0","id":2,"method":"query","params":{"sql":"SEARCH SIMILAR $1 COLLECTION bun_embeddings LIMIT 1","params":[[1.0,0.0]]}}"#,
2206 );
2207 assert!(search.contains("\"rows\""), "got: {search}");
2208 assert!(search.contains("\"score\":1"), "got: {search}");
2209 assert!(!search.contains("\"error\""), "got: {search}");
2210 }
2211
2212 #[test]
2213 fn query_with_question_vector_param_round_trips() {
2214 let rt = make_runtime();
2215 let insert = handle(
2216 &rt,
2217 &query_request_with_params(
2218 1,
2219 "INSERT INTO question_embeddings VECTOR (dense, content) VALUES (?, ?)",
2220 vec![json!([1.0, 0.0]), json!("question vector")],
2221 ),
2222 );
2223 assert!(insert.contains("\"affected\":1"), "got: {insert}");
2224
2225 let search = handle(
2226 &rt,
2227 &query_request_with_params(
2228 2,
2229 "SEARCH SIMILAR ? COLLECTION question_embeddings LIMIT 1",
2230 vec![json!([1.0, 0.0])],
2231 ),
2232 );
2233 assert!(search.contains("\"rows\""), "got: {search}");
2234 assert!(search.contains("\"score\":1"), "got: {search}");
2235 assert!(!search.contains("\"error\""), "got: {search}");
2236 }
2237
    /// Binds one value of each typed envelope (`$float`, `$bytes`, `$ts`,
    /// `$uuid`) plus a boolean and a plain JSON object, then checks that a
    /// `SELECT *` returns each of them in the same wire form.
    #[test]
    fn query_with_typed_json_rpc_params_round_trips() {
        let rt = make_runtime();
        let create = handle(
            &rt,
            r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE value_params (ok BOOLEAN, score FLOAT, payload BLOB, body JSON, seen_at TIMESTAMP, ident UUID)"}}"#,
        );
        assert!(!create.contains("\"error\""), "got: {create}");

        let insert = handle(
            &rt,
            r#"{"jsonrpc":"2.0","id":2,"method":"query","params":{"sql":"INSERT INTO value_params (ok, score, payload, body, seen_at, ident) VALUES ($1, $2, $3, $4, $5, $6)","params":[true,{"$float":"NaN"},{"$bytes":"3q2+7w=="},{"z":[1,{"a":true}],"a":null},{"$ts":"1700000000123456789"},{"$uuid":"00112233-4455-6677-8899-aabbccddeeff"}]}}"#,
        );
        assert!(insert.contains("\"affected\":1"), "got: {insert}");

        let selected = handle(
            &rt,
            r#"{"jsonrpc":"2.0","id":3,"method":"query","params":{"sql":"SELECT * FROM value_params"}}"#,
        );
        assert!(selected.contains("\"ok\":true"), "got: {selected}");
        assert!(selected.contains("\"$float\":\"NaN\""), "got: {selected}");
        assert!(
            selected.contains("\"$bytes\":\"3q2+7w==\""),
            "got: {selected}"
        );
        // The JSON body is expected back with its keys in sorted order.
        assert!(
            selected.contains("\"body\":{\"a\":null,\"z\":[1,{\"a\":true}]}"),
            "got: {selected}"
        );
        assert!(
            selected.contains("\"$ts\":\"1700000000123456789\""),
            "got: {selected}"
        );
        assert!(
            selected.contains("\"$uuid\":\"00112233-4455-6677-8899-aabbccddeeff\""),
            "got: {selected}"
        );
    }
2276
2277 #[test]
2278 fn select_timeseries_tags_decodes_json_payload() {
2279 let rt = make_runtime();
2280 let create = handle(&rt, &query_request(1, "CREATE TIMESERIES ts1"));
2281 assert!(!create.contains("\"error\""), "got: {create}");
2282
2283 let insert = handle(
2284 &rt,
2285 &query_request(
2286 2,
2287 r#"INSERT INTO ts1 (metric, value, tags, timestamp) VALUES ('cpu', 85, '{"host":"a"}', 1000)"#,
2288 ),
2289 );
2290 assert!(insert.contains("\"affected\":1"), "got: {insert}");
2291
2292 let selected = handle(&rt, &query_request(3, "SELECT tags FROM ts1"));
2293 assert!(!selected.contains("<json"), "got: {selected}");
2294 let response = json::from_str::<Value>(&selected).expect("response json");
2295 let tags = response
2296 .get("result")
2297 .and_then(|result| result.get("rows"))
2298 .and_then(Value::as_array)
2299 .and_then(|rows| rows.first())
2300 .and_then(|row| row.get("tags"))
2301 .expect("tags field");
2302 assert_eq!(tags, &json!({"host": "a"}));
2303 }
2304
    /// A nested JSON document stored in a JSON column is returned as a JSON
    /// value equal to the original, and serializing that value and parsing it
    /// once yields the same document again.
    #[test]
    fn select_table_json_column_round_trips_after_single_parse() {
        let rt = make_runtime();
        let create = handle(&rt, &query_request(1, "CREATE TABLE docs (payload JSON)"));
        assert!(!create.contains("\"error\""), "got: {create}");

        // The document is placed in the VALUES list as-is, without quoting.
        let original = r#"{"nested":{"items":[1,true,"x"],"object":{"k":"v"}}}"#;
        let insert_sql = format!("INSERT INTO docs (payload) VALUES ({original})");
        let insert = handle(&rt, &query_request(2, &insert_sql));
        assert!(insert.contains("\"affected\":1"), "got: {insert}");

        let selected = handle(&rt, &query_request(3, "SELECT payload FROM docs"));
        assert!(!selected.contains("<json"), "got: {selected}");
        let response = json::from_str::<Value>(&selected).expect("response json");
        let payload = response
            .get("result")
            .and_then(|result| result.get("rows"))
            .and_then(Value::as_array)
            .and_then(|rows| rows.first())
            .and_then(|row| row.get("payload"))
            .expect("payload field");
        let expected = json::from_str::<Value>(original).expect("expected json");
        assert_eq!(payload, &expected);

        // One parse of the serialized payload must be enough to recover it.
        let payload_text = payload.to_string_compact();
        assert_eq!(
            json::from_str::<Value>(&payload_text).expect("single parse"),
            expected
        );
    }
2335
/// Feeds `query_result_to_json` a `Json` cell holding bytes that are not valid
/// JSON and checks that the rendered cell carries an `INVALID_JSON` code plus
/// the hex encoding of the raw bytes.
#[test]
fn select_json_corruption_falls_back_to_code_and_hex() {
    use crate::storage::query::unified::UnifiedResult;

    let mut row = UnifiedRecord::new();
    row.set("payload", SchemaValue::Json(b"{not json".to_vec()));

    let mut table = UnifiedResult::with_columns(vec!["payload".into()]);
    table.push(row);

    let query_result = RuntimeQueryResult {
        query: "SELECT payload FROM docs".to_string(),
        mode: crate::storage::query::modes::QueryMode::Sql,
        statement: "select",
        engine: "runtime-table",
        result: table,
        affected_rows: 0,
        statement_type: "select",
    };
    let rendered = query_result_to_json(&query_result);

    let payload = rendered
        .get("rows")
        .and_then(Value::as_array)
        .and_then(|rows| rows.first())
        .and_then(|row| row.get("payload"))
        .expect("payload field");

    let code = payload.get("code").and_then(Value::as_str);
    assert_eq!(code, Some("INVALID_JSON"));

    // "7b6e6f74206a736f6e" is the hex form of the bytes `{not json`.
    let hex = payload.get("hex").and_then(Value::as_str);
    assert_eq!(hex, Some("7b6e6f74206a736f6e"));
}
2370
/// Checks that `json_value_to_schema_value` decodes the `$bytes`, `$ts`,
/// `$uuid` and `$float` envelope objects into the matching `SchemaValue`
/// variants.
#[test]
fn json_value_to_schema_value_decodes_typed_envelopes() {
    // `$bytes` carries base64 text; "AAECAw==" decodes to 0, 1, 2, 3.
    let blob = match json_value_to_schema_value(&json!({ "$bytes": "AAECAw==" })) {
        SchemaValue::Blob(bytes) => bytes,
        _ => panic!("expected blob"),
    };
    assert_eq!(blob, vec![0, 1, 2, 3]);

    // `$ts` carries the timestamp as a decimal string.
    let timestamp = json_value_to_schema_value(&json!({ "$ts": "9223372036854775807" }));
    assert_eq!(timestamp, SchemaValue::Timestamp(i64::MAX));

    let uuid = match json_value_to_schema_value(&json!({
        "$uuid": "00112233-4455-6677-8899-aabbccddeeff"
    })) {
        SchemaValue::Uuid(bytes) => bytes,
        _ => panic!("expected uuid"),
    };
    assert_eq!(
        uuid,
        [
            0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88, 0x99, 0xaa, 0xbb, 0xcc, 0xdd,
            0xee, 0xff
        ]
    );

    // `$float` accepts the textual spelling of negative infinity.
    let value = match json_value_to_schema_value(&json!({ "$float": "-Infinity" })) {
        SchemaValue::Float(value) => value,
        _ => panic!("expected float"),
    };
    assert!(value.is_infinite() && value.is_sign_negative());
}
2404
/// A query using only `$1` but supplied with two bind values must be
/// answered with `INVALID_PARAMS`.
#[test]
fn query_with_params_arity_mismatch_rejected() {
    const CREATE_TABLE: &str = r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE pa (id INTEGER)"}}"#;
    const SELECT_WITH_EXTRA_BIND: &str = r#"{"jsonrpc":"2.0","id":2,"method":"query","params":{"sql":"SELECT * FROM pa WHERE id = $1","params":[1,2]}}"#;

    let rt = make_runtime();
    let _ = handle(&rt, CREATE_TABLE);

    let response = handle(&rt, SELECT_WITH_EXTRA_BIND);
    assert!(response.contains("\"INVALID_PARAMS\""), "got: {response}");
}
2418
/// A query with a single `?` placeholder but two bind values must be
/// answered with `INVALID_PARAMS` and a message stating both counts.
#[test]
fn query_with_question_params_arity_mismatch_rejected() {
    let rt = make_runtime();
    let _ = handle(&rt, &query_request(1, "CREATE TABLE qpa (id INTEGER)"));

    let binds = vec![json!(1), json!(2)];
    let request = query_request_with_params(2, "SELECT * FROM qpa WHERE id = ?", binds);
    let response = handle(&rt, &request);

    assert!(response.contains("\"INVALID_PARAMS\""), "got: {response}");
    assert!(response.contains("SQL expects 1, got 2"), "got: {response}");
}
2434
/// A query that references `$1` and `$3` but never `$2` must be answered
/// with `INVALID_PARAMS`, even when three bind values are supplied.
#[test]
fn query_with_params_gap_rejected() {
    const CREATE_TABLE: &str = r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE pg (a INTEGER, b INTEGER)"}}"#;
    const SELECT_WITH_GAP: &str = r#"{"jsonrpc":"2.0","id":2,"method":"query","params":{"sql":"SELECT * FROM pg WHERE a = $1 AND b = $3","params":[1,2,3]}}"#;

    let rt = make_runtime();
    let _ = handle(&rt, CREATE_TABLE);

    let response = handle(&rt, SELECT_WITH_GAP);
    assert!(response.contains("\"INVALID_PARAMS\""), "got: {response}");
}
2448
/// A query that references only `?2` must be answered with `INVALID_PARAMS`
/// and a message naming parameter 1 as missing.
#[test]
fn query_with_question_numbered_gap_rejected() {
    let rt = make_runtime();
    let _ = handle(&rt, &query_request(1, "CREATE TABLE qpg (id INTEGER)"));

    let binds = vec![json!(1), json!(2)];
    let request = query_request_with_params(2, "SELECT * FROM qpg WHERE id = ?2", binds);
    let response = handle(&rt, &request);

    assert!(response.contains("\"INVALID_PARAMS\""), "got: {response}");
    assert!(
        response.contains("parameter $`1` is missing"),
        "got: {response}"
    );
}
2464
/// Binding a string to an `INTEGER` column through `?` must produce a
/// `QUERY_ERROR` whose text mentions both `id` and `integer`.
#[test]
fn query_with_question_params_type_mismatch_names_slot() {
    let rt = make_runtime();
    let _ = handle(&rt, &query_request(1, "CREATE TABLE qpt (id INTEGER)"));

    let binds = vec![json!("not-an-integer")];
    let request = query_request_with_params(2, "INSERT INTO qpt (id) VALUES (?)", binds);
    let response = handle(&rt, &request);

    for needle in ["\"QUERY_ERROR\"", "id", "integer"] {
        assert!(response.contains(needle), "got: {response}");
    }
}
2481
/// Sends a `query` request running `SELECT 1 AS one` and checks that the
/// response carries a `result` member and no `error` member.
#[test]
fn query_select_one_returns_rows() {
    let rt = make_runtime();
    let resp = handle(
        &rt,
        r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"SELECT 1 AS one"}}"#,
    );
    // Include the raw response so a failure shows what actually came back.
    assert!(resp.contains("\"result\""), "got: {resp}");
    assert!(!resp.contains("\"error\""), "got: {resp}");
}
2492
/// Renders a hand-built `ask` result through `query_result_to_json` and
/// checks the envelope: top-level answer fields, default metadata values,
/// no `rows` wrapper, and JSON columns surfaced as parsed values.
#[test]
fn ask_query_result_uses_canonical_envelope() {
    use crate::storage::query::unified::UnifiedResult;

    let mut record = UnifiedRecord::new();
    for (column, text) in [
        ("answer", "Deploy failed [^1]."),
        ("provider", "openai"),
        ("model", "gpt-4o-mini"),
    ] {
        record.set(column, SchemaValue::text(text));
    }
    for (column, count) in [("prompt_tokens", 11), ("completion_tokens", 7)] {
        record.set(column, SchemaValue::Integer(count));
    }

    let sources_flat =
        br#"[{"urn":"urn:reddb:row:deployments:1","kind":"row","collection":"deployments","id":"1"}]"#.to_vec();
    record.set("sources_flat", SchemaValue::Json(sources_flat));

    let citations = br#"[{"marker":1,"urn":"urn:reddb:row:deployments:1"}]"#.to_vec();
    record.set("citations", SchemaValue::Json(citations));

    let validation = br#"{"ok":true,"warnings":[],"errors":[]}"#.to_vec();
    record.set("validation", SchemaValue::Json(validation));

    // `sources_count` is declared as a column but left unset on the record.
    let mut result = UnifiedResult::with_columns(vec![
        "answer".into(),
        "provider".into(),
        "model".into(),
        "prompt_tokens".into(),
        "completion_tokens".into(),
        "sources_count".into(),
        "sources_flat".into(),
        "citations".into(),
        "validation".into(),
    ]);
    result.push(record);

    let query_result = RuntimeQueryResult {
        query: "ASK 'why did deploy fail?'".to_string(),
        mode: crate::storage::query::modes::QueryMode::Sql,
        statement: "ask",
        engine: "runtime-ai",
        result,
        affected_rows: 0,
        statement_type: "select",
    };
    let envelope = query_result_to_json(&query_result);

    let answer = envelope.get("answer").and_then(Value::as_str);
    assert_eq!(answer, Some("Deploy failed [^1]."));

    let cache_hit = envelope.get("cache_hit").and_then(Value::as_bool);
    assert_eq!(cache_hit, Some(false));

    let cost_usd = envelope.get("cost_usd").and_then(Value::as_f64);
    assert_eq!(cost_usd, Some(0.0));

    let mode = envelope.get("mode").and_then(Value::as_str);
    assert_eq!(mode, Some("strict"));

    let retry_count = envelope.get("retry_count").and_then(Value::as_u64);
    assert_eq!(retry_count, Some(0));

    assert!(
        envelope.get("rows").is_none(),
        "ASK envelope must not be row-wrapped: {envelope}"
    );

    let sources_parsed = envelope
        .get("sources_flat")
        .and_then(Value::as_array)
        .is_some_and(|sources| {
            sources.len() == 1 && sources[0].get("payload").and_then(Value::as_str).is_some()
        });
    assert!(
        sources_parsed,
        "sources_flat must be a parsed array: {envelope}"
    );

    let citations_parsed = envelope
        .get("citations")
        .and_then(Value::as_array)
        .is_some_and(|citations| citations.len() == 1);
    assert!(
        citations_parsed,
        "citations must be a parsed array: {envelope}"
    );

    let validation_ok = envelope
        .get("validation")
        .and_then(|v| v.get("ok"))
        .and_then(Value::as_bool);
    assert_eq!(validation_ok, Some(true));
}
2572
/// The first `tx.begin` in a session must report `tx_id` 1 and the
/// `read_committed_deferred` isolation level, with no error.
#[test]
fn tx_begin_returns_tx_id_and_isolation() {
    let rt = make_runtime();
    with_session(&rt, |call, _| {
        let resp = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
        // Each assertion echoes the raw response so failures are diagnosable.
        assert!(resp.contains("\"tx_id\":1"), "got: {resp}");
        assert!(
            resp.contains("\"isolation\":\"read_committed_deferred\""),
            "got: {resp}"
        );
        assert!(!resp.contains("\"error\""), "got: {resp}");
    });
}
2587
/// A second `tx.begin` while a transaction is already open must fail with
/// `TX_ALREADY_OPEN`.
#[test]
fn tx_begin_twice_returns_already_open() {
    let rt = make_runtime();
    with_session(&rt, |call, _| {
        let _ = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
        let resp = call(r#"{"jsonrpc":"2.0","id":2,"method":"tx.begin","params":null}"#);
        // Echo the raw response so a failure shows which code came back.
        assert!(
            resp.contains("\"code\":\"TX_ALREADY_OPEN\""),
            "got: {resp}"
        );
    });
}
2597
/// `tx.commit` with no open transaction must fail with `NO_TX_OPEN`.
#[test]
fn tx_commit_without_begin_returns_no_tx_open() {
    let rt = make_runtime();
    with_session(&rt, |call, _| {
        let resp = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.commit","params":null}"#);
        // Echo the raw response so a failure shows which code came back.
        assert!(resp.contains("\"code\":\"NO_TX_OPEN\""), "got: {resp}");
    });
}
2606
/// `tx.rollback` with no open transaction must fail with `NO_TX_OPEN`.
#[test]
fn tx_rollback_without_begin_returns_no_tx_open() {
    let rt = make_runtime();
    with_session(&rt, |call, _| {
        let resp = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.rollback","params":null}"#);
        // Echo the raw response so a failure shows which code came back.
        assert!(resp.contains("\"code\":\"NO_TX_OPEN\""), "got: {resp}");
    });
}
2615
/// An `insert` issued inside an open transaction must answer with a pending
/// envelope: `pending` true, the transaction id, and zero affected rows.
#[test]
fn insert_inside_tx_returns_pending_envelope() {
    let rt = make_runtime();
    let _ = handle(
        &rt,
        r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE users (name TEXT)"}}"#,
    );
    with_session(&rt, |call, _| {
        let _ = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
        let resp = call(
            r#"{"jsonrpc":"2.0","id":2,"method":"insert","params":{"collection":"users","payload":{"name":"alice"}}}"#,
        );
        // Each assertion echoes the raw response so failures are diagnosable.
        assert!(resp.contains("\"pending\":true"), "got: {resp}");
        assert!(resp.contains("\"tx_id\":1"), "got: {resp}");
        assert!(resp.contains("\"affected\":0"), "got: {resp}");
    });
}
2634
/// A row inserted inside a transaction that is then rolled back must be
/// reported as one discarded op and must not appear in a later `SELECT`.
#[test]
fn begin_insert_rollback_does_not_persist() {
    let rt = make_runtime();
    let _ = handle(
        &rt,
        r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE u (name TEXT)"}}"#,
    );
    with_session(&rt, |call, _| {
        let _ = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
        let _ = call(
            r#"{"jsonrpc":"2.0","id":2,"method":"insert","params":{"collection":"u","payload":{"name":"ghost"}}}"#,
        );
        let rollback = call(r#"{"jsonrpc":"2.0","id":3,"method":"tx.rollback","params":null}"#);
        assert!(
            rollback.contains("\"ops_discarded\":1"),
            "got: {rollback}"
        );
        assert!(rollback.contains("\"tx_id\":1"), "got: {rollback}");
    });
    // Read back outside the session: the rolled-back row must be absent.
    let resp = handle(
        &rt,
        r#"{"jsonrpc":"2.0","id":9,"method":"query","params":{"sql":"SELECT * FROM u"}}"#,
    );
    assert!(!resp.contains("\"ghost\""), "got: {resp}");
}
2658
/// Two rows inserted inside a transaction must be replayed on `tx.commit`
/// and be visible to a later `SELECT`.
#[test]
fn begin_insert_commit_persists() {
    let rt = make_runtime();
    let _ = handle(
        &rt,
        r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE u2 (name TEXT)"}}"#,
    );
    with_session(&rt, |call, _| {
        let _ = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
        let _ = call(
            r#"{"jsonrpc":"2.0","id":2,"method":"insert","params":{"collection":"u2","payload":{"name":"alice"}}}"#,
        );
        let _ = call(
            r#"{"jsonrpc":"2.0","id":3,"method":"insert","params":{"collection":"u2","payload":{"name":"bob"}}}"#,
        );
        let commit = call(r#"{"jsonrpc":"2.0","id":4,"method":"tx.commit","params":null}"#);
        assert!(commit.contains("\"ops_replayed\":2"), "got: {commit}");
        assert!(!commit.contains("\"error\""), "got: {commit}");
    });
    // Read back outside the session: both committed rows must be present.
    let resp = handle(
        &rt,
        r#"{"jsonrpc":"2.0","id":9,"method":"query","params":{"sql":"SELECT * FROM u2"}}"#,
    );
    assert!(resp.contains("\"alice\""), "got: {resp}");
    assert!(resp.contains("\"bob\""), "got: {resp}");
}
2685
/// A three-row `bulk_insert` inside a transaction must be buffered as three
/// pending ops, which `tx.commit` then reports as replayed.
#[test]
fn bulk_insert_inside_tx_buffers_everything() {
    let rt = make_runtime();
    let _ = handle(
        &rt,
        r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE u3 (name TEXT)"}}"#,
    );
    with_session(&rt, |call, _| {
        let _ = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
        let resp = call(
            r#"{"jsonrpc":"2.0","id":2,"method":"bulk_insert","params":{"collection":"u3","payloads":[{"name":"a"},{"name":"b"},{"name":"c"}]}}"#,
        );
        // Each assertion echoes the raw response so failures are diagnosable.
        assert!(resp.contains("\"buffered\":3"), "got: {resp}");
        assert!(resp.contains("\"pending\":true"), "got: {resp}");
        assert!(resp.contains("\"affected\":0"), "got: {resp}");

        let commit = call(r#"{"jsonrpc":"2.0","id":3,"method":"tx.commit","params":null}"#);
        assert!(commit.contains("\"ops_replayed\":3"), "got: {commit}");
    });
}
2706
/// Pins `bulk_insert_chunk_count` at and around multiples of 500 rows.
#[test]
fn bulk_insert_chunks_at_internal_500_row_limit() {
    // (row count, expected number of chunks)
    let cases = [(0, 0), (1, 1), (500, 1), (501, 2), (1000, 2), (1001, 3)];
    for (rows, chunks) in cases {
        assert_eq!(bulk_insert_chunk_count(rows), chunks, "rows={rows}");
    }
}
2716
2717 proptest! {
2718 #![proptest_config(ProptestConfig {
2719 cases: 12,
2720 ..ProptestConfig::default()
2721 })]
2722
2723 #[test]
2724 fn bulk_insert_matches_sequential_insert_state(
2725 names in proptest::collection::vec("[a-z]{1,8}", 1usize..20)
2726 ) {
2727 let rt = make_runtime();
2728 let payloads = names
2729 .iter()
2730 .map(|name| format!(r#"{{"name":"{name}","kind":"bulk"}}"#))
2731 .collect::<Vec<_>>();
2732 let payload_array = payloads.join(",");
2733
2734 let bulk = handle(
2735 &rt,
2736 &format!(
2737 r#"{{"jsonrpc":"2.0","id":1,"method":"bulk_insert","params":{{"collection":"bulk_prop","payloads":[{payload_array}]}}}}"#
2738 ),
2739 );
2740 let bulk_result = json::from_str::<Value>(&bulk).expect("bulk json");
2741 let bulk_ids = bulk_result
2742 .get("result")
2743 .and_then(|result| result.get("ids"))
2744 .and_then(Value::as_array)
2745 .expect("bulk ids");
2746 prop_assert_eq!(bulk_ids.len(), names.len());
2747
2748 for (index, payload) in payloads.iter().enumerate() {
2749 let insert = handle(
2750 &rt,
2751 &format!(
2752 r#"{{"jsonrpc":"2.0","id":{},"method":"insert","params":{{"collection":"seq_prop","payload":{payload}}}}}"#,
2753 index + 10
2754 ),
2755 );
2756 let insert_result = json::from_str::<Value>(&insert).expect("insert json");
2757 prop_assert!(
2758 insert_result
2759 .get("result")
2760 .and_then(|result| result.get("id"))
2761 .is_some(),
2762 "insert response missing id: {insert}"
2763 );
2764 }
2765
2766 let bulk_rows = result_name_kind(&handle(
2767 &rt,
2768 r#"{"jsonrpc":"2.0","id":99,"method":"query","params":{"sql":"SELECT name, kind FROM bulk_prop ORDER BY red_entity_id"}}"#,
2769 ));
2770 let seq_rows = result_name_kind(&handle(
2771 &rt,
2772 r#"{"jsonrpc":"2.0","id":100,"method":"query","params":{"sql":"SELECT name, kind FROM seq_prop ORDER BY red_entity_id"}}"#,
2773 ));
2774 prop_assert_eq!(bulk_rows, seq_rows);
2775 }
2776
2777 #[test]
2778 fn question_param_select_matches_inlined_literal(value in json_scalar_param()) {
2779 let rt = make_runtime();
2780 let bound = handle(
2781 &rt,
2782 &query_request_with_params(1, "SELECT ? AS v", vec![value.clone()]),
2783 );
2784 let inline_sql = format!("SELECT {} AS v", sql_literal_for_json(&value));
2785 let inlined = handle(&rt, &query_request(2, &inline_sql));
2786 prop_assert_eq!(
2787 result_rows(&bound),
2788 result_rows(&inlined),
2789 "bound={}, inlined={}",
2790 bound,
2791 inlined
2792 );
2793 }
2794 }
2795
/// Bulk-inserts two flat `User` rows into a graph collection, checks that two
/// ids come back, and checks that both names are returned by a `MATCH` query.
#[test]
fn bulk_insert_graph_nodes_accepts_flat_rows_and_returns_ids() {
    let rt = make_runtime();
    create_graph_collection(&rt, "social");

    let inserted = handle(
        &rt,
        r#"{"jsonrpc":"2.0","id":2,"method":"bulk_insert","params":{"collection":"social","payloads":[{"label":"User","name":"alice"},{"label":"User","name":"bob"}]}}"#,
    );
    let envelope = json::from_str::<Value>(&inserted).expect("json response");
    let result = envelope.get("result").expect("result");

    let affected = result.get("affected").and_then(Value::as_u64);
    assert_eq!(affected, Some(2));

    let id_count = result
        .get("ids")
        .and_then(Value::as_array)
        .map(|ids| ids.len());
    assert_eq!(id_count, Some(2));

    let matched = handle(
        &rt,
        r#"{"jsonrpc":"2.0","id":3,"method":"query","params":{"sql":"MATCH (n:User) RETURN n.name"}}"#,
    );
    for quoted_name in ["\"alice\"", "\"bob\""] {
        assert!(matched.contains(quoted_name), "got: {matched}");
    }
}
2823
/// Bulk-inserts two `Host` nodes, then bulk-inserts one flat edge row that
/// references their ids, and checks that one affected row and one id are
/// reported for the edge.
#[test]
fn bulk_insert_graph_edges_accepts_flat_rows_and_returns_ids() {
    let rt = make_runtime();
    create_graph_collection(&rt, "network");

    let nodes = handle(
        &rt,
        r#"{"jsonrpc":"2.0","id":2,"method":"bulk_insert","params":{"collection":"network","payloads":[{"label":"Host","name":"app"},{"label":"Host","name":"db"}]}}"#,
    );
    let node_envelope = json::from_str::<Value>(&nodes).expect("node response");
    let node_ids = node_envelope
        .get("result")
        .and_then(|r| r.get("ids"))
        .and_then(Value::as_array)
        .expect("node ids");
    // The two node ids become the edge endpoints.
    let from = node_ids[0].as_u64().expect("from id");
    let to = node_ids[1].as_u64().expect("to id");

    let edge_request = format!(
        r#"{{"jsonrpc":"2.0","id":3,"method":"bulk_insert","params":{{"collection":"network","payloads":[{{"label":"connects","from":{from},"to":{to},"weight":0.5,"role":"primary"}}]}}}}"#
    );
    let edges = handle(&rt, &edge_request);
    let edge_envelope = json::from_str::<Value>(&edges).expect("edge response");
    let result = edge_envelope.get("result").expect("result");

    let affected = result.get("affected").and_then(Value::as_u64);
    assert_eq!(affected, Some(1));

    let id_count = result
        .get("ids")
        .and_then(Value::as_array)
        .map(|ids| ids.len());
    assert_eq!(id_count, Some(1));
}
2858
/// A `delete` issued inside a transaction must answer as pending, and after
/// the transaction is rolled back the existing row must still be selectable.
#[test]
fn delete_inside_tx_is_buffered() {
    let rt = make_runtime();
    let _ = handle(
        &rt,
        r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE u4 (name TEXT)"}}"#,
    );
    let _ = handle(
        &rt,
        r#"{"jsonrpc":"2.0","id":2,"method":"query","params":{"sql":"INSERT INTO u4 (name) VALUES ('keep')"}}"#,
    );
    with_session(&rt, |call, _| {
        let _ = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
        let resp = call(
            r#"{"jsonrpc":"2.0","id":2,"method":"delete","params":{"collection":"u4","id":"1"}}"#,
        );
        // Echo the raw response so a failure shows what actually came back.
        assert!(resp.contains("\"pending\":true"), "got: {resp}");
        let _ = call(r#"{"jsonrpc":"2.0","id":3,"method":"tx.rollback","params":null}"#);
    });
    // Read back outside the session: the row must have survived the rollback.
    let resp = handle(
        &rt,
        r#"{"jsonrpc":"2.0","id":9,"method":"query","params":{"sql":"SELECT * FROM u4"}}"#,
    );
    assert!(resp.contains("\"keep\""), "got: {resp}");
}
2886
/// Calling `close` while a transaction holds a buffered insert must succeed
/// with the `__close__` marker, and the buffered row must not be visible to a
/// later `SELECT`.
#[test]
fn close_with_open_tx_auto_rollbacks() {
    let rt = make_runtime();
    let _ = handle(
        &rt,
        r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE u5 (name TEXT)"}}"#,
    );
    with_session(&rt, |call, _| {
        let _ = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
        let _ = call(
            r#"{"jsonrpc":"2.0","id":2,"method":"insert","params":{"collection":"u5","payload":{"name":"ghost"}}}"#,
        );
        let close = call(r#"{"jsonrpc":"2.0","id":3,"method":"close","params":null}"#);
        // Echo the raw response so a failure shows what actually came back.
        assert!(close.contains("\"__close__\":true"), "got: {close}");
        assert!(!close.contains("\"error\""), "got: {close}");
    });
    // Read back outside the session: the uncommitted row must be absent.
    let resp = handle(
        &rt,
        r#"{"jsonrpc":"2.0","id":9,"method":"query","params":{"sql":"SELECT * FROM u5"}}"#,
    );
    assert!(!resp.contains("\"ghost\""), "got: {resp}");
}
2909
2910 fn seed_numbers_table(rt: &RedDBRuntime, table: &str, count: u32) {
2915 let _ = handle(
2916 rt,
2917 &format!(
2918 r#"{{"jsonrpc":"2.0","id":1,"method":"query","params":{{"sql":"CREATE TABLE {table} (n INTEGER)"}}}}"#,
2919 ),
2920 );
2921 for i in 0..count {
2922 let _ = handle(
2923 rt,
2924 &format!(
2925 r#"{{"jsonrpc":"2.0","id":2,"method":"query","params":{{"sql":"INSERT INTO {table} (n) VALUES ({i})"}}}}"#,
2926 ),
2927 );
2928 }
2929 }
2930
/// `query.open` over a three-row table must report cursor id 1, a total of
/// three rows, and a `columns` member, with no error.
#[test]
fn cursor_open_returns_id_columns_and_total() {
    let rt = make_runtime();
    seed_numbers_table(&rt, "nums1", 3);
    with_session(&rt, |call, _| {
        let resp = call(
            r#"{"jsonrpc":"2.0","id":1,"method":"query.open","params":{"sql":"SELECT n FROM nums1"}}"#,
        );
        // Each assertion echoes the raw response so failures are diagnosable.
        assert!(resp.contains("\"cursor_id\":1"), "got: {resp}");
        assert!(resp.contains("\"total_rows\":3"), "got: {resp}");
        assert!(resp.contains("\"columns\""), "got: {resp}");
        assert!(!resp.contains("\"error\""), "got: {resp}");
    });
}
2945
/// Reading a five-row cursor in batches of two must report 3, then 1, then 0
/// rows remaining, with `done` true only on the last batch.
#[test]
fn cursor_next_chunks_rows_and_signals_done() {
    let rt = make_runtime();
    seed_numbers_table(&rt, "nums2", 5);
    with_session(&rt, |call, _| {
        let _ = call(
            r#"{"jsonrpc":"2.0","id":1,"method":"query.open","params":{"sql":"SELECT n FROM nums2"}}"#,
        );
        let first = call(
            r#"{"jsonrpc":"2.0","id":2,"method":"query.next","params":{"cursor_id":1,"batch_size":2}}"#,
        );
        assert!(first.contains("\"done\":false"), "got: {first}");
        assert!(first.contains("\"remaining\":3"), "got: {first}");

        let second = call(
            r#"{"jsonrpc":"2.0","id":3,"method":"query.next","params":{"cursor_id":1,"batch_size":2}}"#,
        );
        assert!(second.contains("\"done\":false"), "got: {second}");
        assert!(second.contains("\"remaining\":1"), "got: {second}");

        let third = call(
            r#"{"jsonrpc":"2.0","id":4,"method":"query.next","params":{"cursor_id":1,"batch_size":2}}"#,
        );
        assert!(third.contains("\"done\":true"), "got: {third}");
        assert!(third.contains("\"remaining\":0"), "got: {third}");
    });
}
2973
/// Once a single `query.next` has read every row of a cursor, a further
/// `query.next` on the same id must fail with `CURSOR_NOT_FOUND`.
#[test]
fn cursor_auto_drops_when_exhausted() {
    let rt = make_runtime();
    seed_numbers_table(&rt, "nums3", 2);
    with_session(&rt, |call, _| {
        let _ = call(
            r#"{"jsonrpc":"2.0","id":1,"method":"query.open","params":{"sql":"SELECT n FROM nums3"}}"#,
        );
        // A batch of 100 covers both seeded rows in one read.
        let _ = call(
            r#"{"jsonrpc":"2.0","id":2,"method":"query.next","params":{"cursor_id":1,"batch_size":100}}"#,
        );
        let resp = call(
            r#"{"jsonrpc":"2.0","id":3,"method":"query.next","params":{"cursor_id":1,"batch_size":100}}"#,
        );
        assert!(
            resp.contains("\"code\":\"CURSOR_NOT_FOUND\""),
            "got: {resp}"
        );
    });
}
2993
/// `query.close` must report `closed` true, after which `query.next` on the
/// same cursor id must fail with `CURSOR_NOT_FOUND`.
#[test]
fn cursor_close_removes_it() {
    let rt = make_runtime();
    seed_numbers_table(&rt, "nums4", 3);
    with_session(&rt, |call, _| {
        let _ = call(
            r#"{"jsonrpc":"2.0","id":1,"method":"query.open","params":{"sql":"SELECT n FROM nums4"}}"#,
        );
        let close =
            call(r#"{"jsonrpc":"2.0","id":2,"method":"query.close","params":{"cursor_id":1}}"#);
        assert!(close.contains("\"closed\":true"), "got: {close}");
        let after = call(
            r#"{"jsonrpc":"2.0","id":3,"method":"query.next","params":{"cursor_id":1,"batch_size":10}}"#,
        );
        assert!(
            after.contains("\"code\":\"CURSOR_NOT_FOUND\""),
            "got: {after}"
        );
    });
}
3011
/// `query.close` on a cursor id that was never opened must fail with
/// `CURSOR_NOT_FOUND`.
#[test]
fn cursor_close_unknown_errors() {
    let rt = make_runtime();
    with_session(&rt, |call, _| {
        let resp = call(
            r#"{"jsonrpc":"2.0","id":1,"method":"query.close","params":{"cursor_id":9999}}"#,
        );
        // Echo the raw response so a failure shows which code came back.
        assert!(
            resp.contains("\"code\":\"CURSOR_NOT_FOUND\""),
            "got: {resp}"
        );
    });
}
3022
/// `query.next` with an empty params object (no `cursor_id`) must fail with
/// `INVALID_PARAMS`.
#[test]
fn cursor_next_without_cursor_id_errors() {
    let rt = make_runtime();
    with_session(&rt, |call, _| {
        let resp = call(r#"{"jsonrpc":"2.0","id":1,"method":"query.next","params":{}}"#);
        // Echo the raw response so a failure shows which code came back.
        assert!(
            resp.contains("\"code\":\"INVALID_PARAMS\""),
            "got: {resp}"
        );
    });
}
3031
/// `query.next` without a `batch_size` on a seven-row cursor must return
/// everything in one batch: `done` true and zero rows remaining.
#[test]
fn cursor_default_batch_size_returns_all_when_smaller_than_default() {
    let rt = make_runtime();
    seed_numbers_table(&rt, "nums5", 7);
    with_session(&rt, |call, _| {
        let _ = call(
            r#"{"jsonrpc":"2.0","id":1,"method":"query.open","params":{"sql":"SELECT n FROM nums5"}}"#,
        );
        // No `batch_size` is sent, so the server-side default applies.
        let resp =
            call(r#"{"jsonrpc":"2.0","id":2,"method":"query.next","params":{"cursor_id":1}}"#);
        assert!(resp.contains("\"done\":true"), "got: {resp}");
        assert!(resp.contains("\"remaining\":0"), "got: {resp}");
    });
}
3047
/// After `close`, a cursor opened earlier in the same session must no longer
/// be readable: `query.next` must fail with `CURSOR_NOT_FOUND`.
#[test]
fn close_method_drops_open_cursors() {
    let rt = make_runtime();
    seed_numbers_table(&rt, "nums6", 3);
    with_session(&rt, |call, _| {
        let _ = call(
            r#"{"jsonrpc":"2.0","id":1,"method":"query.open","params":{"sql":"SELECT n FROM nums6"}}"#,
        );
        let close = call(r#"{"jsonrpc":"2.0","id":2,"method":"close","params":null}"#);
        assert!(close.contains("\"__close__\":true"), "got: {close}");
        let after = call(
            r#"{"jsonrpc":"2.0","id":3,"method":"query.next","params":{"cursor_id":1,"batch_size":10}}"#,
        );
        assert!(
            after.contains("\"code\":\"CURSOR_NOT_FOUND\""),
            "got: {after}"
        );
    });
}
3067
/// A cursor opened before a `tx.begin` / `tx.commit` pair must still be
/// readable afterwards, returning its rows without error.
#[test]
fn cursor_independent_of_transaction_state() {
    let rt = make_runtime();
    seed_numbers_table(&rt, "nums7", 4);
    with_session(&rt, |call, _| {
        let _ = call(
            r#"{"jsonrpc":"2.0","id":1,"method":"query.open","params":{"sql":"SELECT n FROM nums7"}}"#,
        );
        let _ = call(r#"{"jsonrpc":"2.0","id":2,"method":"tx.begin","params":null}"#);
        let _ = call(r#"{"jsonrpc":"2.0","id":3,"method":"tx.commit","params":null}"#);
        let resp = call(
            r#"{"jsonrpc":"2.0","id":4,"method":"query.next","params":{"cursor_id":1,"batch_size":10}}"#,
        );
        // Echo the raw response so a failure shows what actually came back.
        assert!(resp.contains("\"done\":true"), "got: {resp}");
        assert!(!resp.contains("\"error\""), "got: {resp}");
    });
}
3086
/// Transaction ids must not be reused within a session: the first
/// `tx.begin` reports `tx_id` 1 and, after a commit, the next reports 2.
#[test]
fn second_tx_after_commit_gets_fresh_id() {
    let rt = make_runtime();
    let _ = handle(
        &rt,
        r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE u6 (name TEXT)"}}"#,
    );
    with_session(&rt, |call, _| {
        let first = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
        assert!(first.contains("\"tx_id\":1"), "got: {first}");
        let _ = call(
            r#"{"jsonrpc":"2.0","id":2,"method":"insert","params":{"collection":"u6","payload":{"name":"x"}}}"#,
        );
        let _ = call(r#"{"jsonrpc":"2.0","id":3,"method":"tx.commit","params":null}"#);

        let second = call(r#"{"jsonrpc":"2.0","id":4,"method":"tx.begin","params":null}"#);
        assert!(second.contains("\"tx_id\":2"), "got: {second}");
        let _ = call(r#"{"jsonrpc":"2.0","id":5,"method":"tx.rollback","params":null}"#);
    });
}
3107
/// Prepares a `SELECT` in a session, reads the returned `prepared_id`, then
/// runs it through `execute_prepared` with one bind and checks that the
/// response contains rows including the value 42.
#[test]
fn prepare_and_execute_prepared_statement() {
    let rt = make_runtime();
    let setup_requests = [
        r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE ps_test (n INTEGER)"}}"#,
        r#"{"jsonrpc":"2.0","id":2,"method":"query","params":{"sql":"INSERT INTO ps_test (n) VALUES (42)"}}"#,
    ];
    for request in setup_requests {
        let _ = handle(&rt, request);
    }

    with_session(&rt, |call, _| {
        let prep = call(
            r#"{"jsonrpc":"2.0","id":3,"method":"prepare","params":{"sql":"SELECT n FROM ps_test WHERE n = 42"}}"#,
        );
        assert!(prep.contains("\"prepared_id\""), "prepare response: {prep}");

        // The id is read as a float and then narrowed to an integer.
        let prepared = json::from_str::<Value>(&prep).expect("json");
        let id = prepared
            .get("result")
            .expect("result")
            .get("prepared_id")
            .and_then(Value::as_f64)
            .expect("prepared_id") as u64;

        let exec_request = format!(
            r#"{{"jsonrpc":"2.0","id":4,"method":"execute_prepared","params":{{"prepared_id":{id},"binds":[42]}}}}"#
        );
        let exec = call(&exec_request);
        assert!(
            exec.contains("\"rows\""),
            "execute_prepared response: {exec}"
        );
        assert!(exec.contains("42"), "expected row with n=42 in: {exec}");
    });
}
3150}