1use std::io::{BufRead, BufReader, Stdin, Write};
19use std::panic::AssertUnwindSafe;
20
21use tokio::sync::Mutex as AsyncMutex;
22
23use crate::application::entity::{CreateRowInput, CreateRowsBatchInput};
24use crate::application::ports::RuntimeEntityPort;
25use crate::json::{self as json, Value};
26use crate::runtime::{RedDBRuntime, RuntimeQueryResult};
27use crate::storage::query::unified::UnifiedRecord;
28use crate::storage::schema::Value as SchemaValue;
29use reddb_client_connector::RedDBClient;
30
31enum Backend<'a> {
44 Local(&'a RedDBRuntime),
45 Remote(Box<RemoteBackend<'a>>),
46}
47
48struct RemoteBackend<'a> {
49 client: AsyncMutex<RedDBClient>,
50 tokio_rt: &'a tokio::runtime::Runtime,
51}
52
53pub const PROTOCOL_VERSION: &str = "1.0";
55const STDIO_BULK_INSERT_CHUNK_ROWS: usize = 500;
56
57pub mod error_code {
59 pub const PARSE_ERROR: &str = "PARSE_ERROR";
60 pub const INVALID_REQUEST: &str = "INVALID_REQUEST";
61 pub const INVALID_PARAMS: &str = "INVALID_PARAMS";
62 pub const QUERY_ERROR: &str = "QUERY_ERROR";
63 pub const NOT_FOUND: &str = "NOT_FOUND";
64 pub const INTERNAL_ERROR: &str = "INTERNAL_ERROR";
65 pub const TX_ALREADY_OPEN: &str = "TX_ALREADY_OPEN";
68 pub const NO_TX_OPEN: &str = "NO_TX_OPEN";
71 pub const TX_REPLAY_FAILED: &str = "TX_REPLAY_FAILED";
75 pub const TX_NOT_SUPPORTED_REMOTE: &str = "TX_NOT_SUPPORTED_REMOTE";
77 pub const CURSOR_NOT_FOUND: &str = "CURSOR_NOT_FOUND";
81 pub const CURSOR_LIMIT_EXCEEDED: &str = "CURSOR_LIMIT_EXCEEDED";
83}
84
85pub(crate) const MAX_CURSORS_PER_SESSION: usize = 64;
89pub(crate) const DEFAULT_CURSOR_BATCH_SIZE: usize = 100;
93pub(crate) const MAX_CURSOR_BATCH_SIZE: usize = 10_000;
96
97struct StdioPreparedStatement {
120 shape: crate::storage::query::ast::QueryExpr,
121 parameter_count: usize,
122}
123
124pub(crate) struct Session {
125 next_tx_id: u64,
126 current_tx: Option<OpenTx>,
127 next_cursor_id: u64,
128 cursors: std::collections::HashMap<u64, Cursor>,
129 next_prepared_id: u64,
131 prepared: std::collections::HashMap<u64, StdioPreparedStatement>,
133}
134
135impl Session {
136 pub(crate) fn new() -> Self {
137 Self {
138 next_tx_id: 1,
139 current_tx: None,
140 next_cursor_id: 1,
141 cursors: std::collections::HashMap::new(),
142 next_prepared_id: 1,
143 prepared: std::collections::HashMap::new(),
144 }
145 }
146
147 fn open_tx(&mut self) -> Result<u64, (&'static str, String)> {
148 if let Some(tx) = &self.current_tx {
149 return Err((
150 error_code::TX_ALREADY_OPEN,
151 format!("transaction {} already open in this session", tx.tx_id),
152 ));
153 }
154 let tx_id = self.next_tx_id;
155 self.next_tx_id = self.next_tx_id.saturating_add(1);
156 self.current_tx = Some(OpenTx {
157 tx_id,
158 write_set: Vec::new(),
159 });
160 Ok(tx_id)
161 }
162
163 fn take_tx(&mut self) -> Option<OpenTx> {
164 self.current_tx.take()
165 }
166
167 fn current_tx_mut(&mut self) -> Option<&mut OpenTx> {
168 self.current_tx.as_mut()
169 }
170
171 #[allow(dead_code)]
172 fn has_tx(&self) -> bool {
173 self.current_tx.is_some()
174 }
175
176 fn insert_cursor(&mut self, cursor: Cursor) -> Result<u64, (&'static str, String)> {
179 if self.cursors.len() >= MAX_CURSORS_PER_SESSION {
180 return Err((
181 error_code::CURSOR_LIMIT_EXCEEDED,
182 format!(
183 "session already holds {} cursors (max {}) — close some before opening new ones",
184 self.cursors.len(),
185 MAX_CURSORS_PER_SESSION
186 ),
187 ));
188 }
189 let id = self.next_cursor_id;
190 self.next_cursor_id = self.next_cursor_id.saturating_add(1);
191 let mut cursor = cursor;
192 cursor.cursor_id = id;
193 self.cursors.insert(id, cursor);
194 Ok(id)
195 }
196
197 fn cursor_mut(&mut self, id: u64) -> Option<&mut Cursor> {
198 self.cursors.get_mut(&id)
199 }
200
201 fn drop_cursor(&mut self, id: u64) -> Option<Cursor> {
202 self.cursors.remove(&id)
203 }
204
205 fn clear_cursors(&mut self) {
206 self.cursors.clear();
207 }
208}
209
210impl Default for Session {
211 fn default() -> Self {
212 Self::new()
213 }
214}
215
216struct OpenTx {
218 tx_id: u64,
219 write_set: Vec<PendingSql>,
220}
221
222enum PendingSql {
226 Insert(String),
227 Delete(String),
228 #[allow(dead_code)] Update(String),
230}
231
232impl PendingSql {
233 fn sql(&self) -> &str {
234 match self {
235 PendingSql::Insert(s) | PendingSql::Delete(s) | PendingSql::Update(s) => s,
236 }
237 }
238}
239
240pub(crate) struct Cursor {
252 cursor_id: u64,
253 columns: Vec<String>,
254 rows: Vec<UnifiedRecord>,
255 position: usize,
256}
257
258impl Cursor {
259 fn new(columns: Vec<String>, rows: Vec<UnifiedRecord>) -> Self {
260 Self {
261 cursor_id: 0, columns,
263 rows,
264 position: 0,
265 }
266 }
267
268 fn total(&self) -> usize {
269 self.rows.len()
270 }
271
272 fn remaining(&self) -> usize {
273 self.rows.len().saturating_sub(self.position)
274 }
275
276 fn is_exhausted(&self) -> bool {
277 self.position >= self.rows.len()
278 }
279
280 fn take_batch(&mut self, batch_size: usize) -> &[UnifiedRecord] {
283 let end = (self.position + batch_size).min(self.rows.len());
284 let slice = &self.rows[self.position..end];
285 self.position = end;
286 slice
287 }
288}
289
290pub fn run(runtime: &RedDBRuntime) -> i32 {
296 run_with_io(runtime, std::io::stdin(), &mut std::io::stdout())
297}
298
299pub fn run_remote(endpoint: &str, token: Option<String>) -> i32 {
306 let tokio_rt = match tokio::runtime::Builder::new_current_thread()
307 .enable_all()
308 .build()
309 {
310 Ok(rt) => rt,
311 Err(e) => {
312 tracing::error!(err = %e, "rpc: failed to build tokio runtime");
313 return 1;
314 }
315 };
316 let client = match tokio_rt.block_on(RedDBClient::connect(endpoint, token)) {
317 Ok(c) => c,
318 Err(e) => {
319 tracing::error!(endpoint, err = %e, "rpc: failed to connect");
320 return 1;
321 }
322 };
323 let backend = Backend::Remote(Box::new(RemoteBackend {
324 client: AsyncMutex::new(client),
325 tokio_rt: &tokio_rt,
326 }));
327 run_backend(&backend, std::io::stdin(), &mut std::io::stdout())
328}
329
330pub fn run_with_io<W: Write>(runtime: &RedDBRuntime, stdin: Stdin, stdout: &mut W) -> i32 {
332 run_backend(&Backend::Local(runtime), stdin, stdout)
333}
334
335static STDIO_SESSION_CONN_ID: std::sync::atomic::AtomicU64 =
342 std::sync::atomic::AtomicU64::new(1_000_000);
343
344fn next_stdio_conn_id() -> u64 {
345 STDIO_SESSION_CONN_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
346}
347
348fn run_backend<W: Write>(backend: &Backend<'_>, stdin: Stdin, stdout: &mut W) -> i32 {
349 let reader = BufReader::new(stdin.lock());
350 let mut session = Session::new();
351 let conn_id = next_stdio_conn_id();
355 crate::runtime::impl_core::set_current_connection_id(conn_id);
356 for line_result in reader.lines() {
357 let line = match line_result {
358 Ok(l) => l,
359 Err(e) => {
360 let _ = writeln!(
361 stdout,
362 "{}",
363 error_response(&Value::Null, error_code::INTERNAL_ERROR, &e.to_string())
364 );
365 let _ = stdout.flush();
366 return 1;
367 }
368 };
369 if line.trim().is_empty() {
370 continue;
371 }
372 let response = handle_line(backend, &mut session, &line);
373 if writeln!(stdout, "{}", response).is_err() || stdout.flush().is_err() {
374 return 1;
375 }
376 if response.contains("\"__close__\":true") {
377 return 0;
378 }
379 }
380 let _ = session.take_tx();
384 crate::runtime::impl_core::clear_current_connection_id();
385 0
386}
387
388fn handle_line(backend: &Backend<'_>, session: &mut Session, line: &str) -> String {
392 let parsed: Value = match json::from_str(line) {
393 Ok(v) => v,
394 Err(err) => {
395 return error_response(
396 &Value::Null,
397 error_code::PARSE_ERROR,
398 &format!("invalid JSON: {err}"),
399 );
400 }
401 };
402
403 let id = parsed.get("id").cloned().unwrap_or(Value::Null);
404
405 let method = match parsed.get("method").and_then(Value::as_str) {
406 Some(m) => m.to_string(),
407 None => {
408 return error_response(&id, error_code::INVALID_REQUEST, "missing 'method' field");
409 }
410 };
411
412 let params = parsed.get("params").cloned().unwrap_or(Value::Null);
413
414 let dispatch = std::panic::catch_unwind(AssertUnwindSafe(|| match backend {
415 Backend::Local(rt) => dispatch_method(rt, session, &method, ¶ms),
416 Backend::Remote(remote) => {
417 if matches!(
422 method.as_str(),
423 "tx.begin"
424 | "tx.commit"
425 | "tx.rollback"
426 | "query.open"
427 | "query.next"
428 | "query.close"
429 ) {
430 Err((
431 error_code::TX_NOT_SUPPORTED_REMOTE,
432 format!("{method} is not supported over remote gRPC yet"),
433 ))
434 } else {
435 dispatch_method_remote(&remote.client, remote.tokio_rt, &method, ¶ms)
436 }
437 }
438 }));
439
440 match dispatch {
441 Ok(Ok(result)) => success_response(&id, &result, method == "close"),
442 Ok(Err((code, msg))) => error_response(&id, code, &msg),
443 Err(_) => error_response(&id, error_code::INTERNAL_ERROR, "handler panicked (caught)"),
444 }
445}
446
447fn dispatch_method(
450 runtime: &RedDBRuntime,
451 session: &mut Session,
452 method: &str,
453 params: &Value,
454) -> Result<Value, (&'static str, String)> {
455 match method {
456 "tx.begin" => {
457 let tx_id = session.open_tx()?;
458 Ok(Value::Object(
459 [
460 ("tx_id".to_string(), Value::Number(tx_id as f64)),
461 (
462 "isolation".to_string(),
463 Value::String("read_committed_deferred".to_string()),
464 ),
465 ]
466 .into_iter()
467 .collect(),
468 ))
469 }
470
471 "tx.commit" => {
472 let tx = session.take_tx().ok_or((
473 error_code::NO_TX_OPEN,
474 "no transaction is open in this session".to_string(),
475 ))?;
476 let tx_id = tx.tx_id;
477 let op_count = tx.write_set.len();
478
479 let replay: Result<(u64, usize), (usize, String)> = (|| {
486 runtime
487 .execute_query("BEGIN")
488 .map_err(|e| (0usize, format!("BEGIN: {e}")))?;
489 let mut total_affected: u64 = 0;
490 for (idx, op) in tx.write_set.iter().enumerate() {
491 match runtime.execute_query(op.sql()) {
492 Ok(qr) => total_affected += qr.affected_rows,
493 Err(e) => {
494 let _ = runtime.execute_query("ROLLBACK");
495 return Err((idx, e.to_string()));
496 }
497 }
498 }
499 runtime
500 .execute_query("COMMIT")
501 .map_err(|e| (op_count, format!("COMMIT: {e}")))?;
502 Ok((total_affected, op_count))
503 })();
504
505 match replay {
506 Ok((affected, replayed)) => Ok(Value::Object(
507 [
508 ("tx_id".to_string(), Value::Number(tx_id as f64)),
509 ("ops_replayed".to_string(), Value::Number(replayed as f64)),
510 ("affected".to_string(), Value::Number(affected as f64)),
511 ]
512 .into_iter()
513 .collect(),
514 )),
515 Err((failed_idx, msg)) => Err((
516 error_code::TX_REPLAY_FAILED,
517 format!(
518 "tx {tx_id} replay failed at op {failed_idx}/{op_count}: {msg} \
519 (ops 0..{failed_idx} already applied and are NOT rolled back)"
520 ),
521 )),
522 }
523 }
524
525 "query.open" => {
526 let sql = params.get("sql").and_then(Value::as_str).ok_or((
527 error_code::INVALID_PARAMS,
528 "missing 'sql' string".to_string(),
529 ))?;
530 let qr = runtime
531 .execute_query(sql)
532 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
533
534 let columns: Vec<String> = qr
538 .result
539 .records
540 .first()
541 .map(|first| {
542 let mut keys: Vec<String> = first
543 .column_names()
544 .into_iter()
545 .map(|k| k.to_string())
546 .collect();
547 keys.sort();
548 keys
549 })
550 .unwrap_or_default();
551
552 let cursor = Cursor::new(columns.clone(), qr.result.records);
553 let total = cursor.total();
554 let cursor_id = session.insert_cursor(cursor)?;
555
556 Ok(Value::Object(
557 [
558 ("cursor_id".to_string(), Value::Number(cursor_id as f64)),
559 (
560 "columns".to_string(),
561 Value::Array(columns.into_iter().map(Value::String).collect()),
562 ),
563 ("total_rows".to_string(), Value::Number(total as f64)),
564 ]
565 .into_iter()
566 .collect(),
567 ))
568 }
569
570 "query.next" => {
571 let cursor_id = params
572 .get("cursor_id")
573 .and_then(|v| v.as_f64())
574 .map(|n| n as u64)
575 .ok_or((
576 error_code::INVALID_PARAMS,
577 "missing 'cursor_id' number".to_string(),
578 ))?;
579 let batch_size = params
580 .get("batch_size")
581 .and_then(|v| v.as_f64())
582 .map(|n| n as usize)
583 .unwrap_or(DEFAULT_CURSOR_BATCH_SIZE)
584 .clamp(1, MAX_CURSOR_BATCH_SIZE);
585
586 let (rows, done, remaining) = {
589 let cursor = session.cursor_mut(cursor_id).ok_or((
590 error_code::CURSOR_NOT_FOUND,
591 format!("cursor {cursor_id} not found"),
592 ))?;
593 let batch = cursor.take_batch(batch_size);
594 let rows_json: Vec<Value> = batch.iter().map(record_to_json_object).collect();
595 (rows_json, cursor.is_exhausted(), cursor.remaining())
596 };
597
598 if done {
599 let _ = session.drop_cursor(cursor_id);
602 }
603
604 Ok(Value::Object(
605 [
606 ("cursor_id".to_string(), Value::Number(cursor_id as f64)),
607 ("rows".to_string(), Value::Array(rows)),
608 ("done".to_string(), Value::Bool(done)),
609 ("remaining".to_string(), Value::Number(remaining as f64)),
610 ]
611 .into_iter()
612 .collect(),
613 ))
614 }
615
616 "query.close" => {
617 let cursor_id = params
618 .get("cursor_id")
619 .and_then(|v| v.as_f64())
620 .map(|n| n as u64)
621 .ok_or((
622 error_code::INVALID_PARAMS,
623 "missing 'cursor_id' number".to_string(),
624 ))?;
625 let existed = session.drop_cursor(cursor_id).is_some();
626 if !existed {
627 return Err((
628 error_code::CURSOR_NOT_FOUND,
629 format!("cursor {cursor_id} not found"),
630 ));
631 }
632 Ok(Value::Object(
633 [
634 ("cursor_id".to_string(), Value::Number(cursor_id as f64)),
635 ("closed".to_string(), Value::Bool(true)),
636 ]
637 .into_iter()
638 .collect(),
639 ))
640 }
641
642 "tx.rollback" => {
643 let tx = session.take_tx().ok_or((
644 error_code::NO_TX_OPEN,
645 "no transaction is open in this session".to_string(),
646 ))?;
647 let ops_discarded = tx.write_set.len();
648 Ok(Value::Object(
649 [
650 ("tx_id".to_string(), Value::Number(tx.tx_id as f64)),
651 (
652 "ops_discarded".to_string(),
653 Value::Number(ops_discarded as f64),
654 ),
655 ]
656 .into_iter()
657 .collect(),
658 ))
659 }
660
661 "version" => Ok(Value::Object(
662 [
663 (
664 "version".to_string(),
665 Value::String(env!("CARGO_PKG_VERSION").to_string()),
666 ),
667 (
668 "protocol".to_string(),
669 Value::String(PROTOCOL_VERSION.to_string()),
670 ),
671 ]
672 .into_iter()
673 .collect(),
674 )),
675
676 "health" => Ok(Value::Object(
677 [
678 ("ok".to_string(), Value::Bool(true)),
679 (
680 "version".to_string(),
681 Value::String(env!("CARGO_PKG_VERSION").to_string()),
682 ),
683 ]
684 .into_iter()
685 .collect(),
686 )),
687
688 "query" => {
689 let sql = params.get("sql").and_then(Value::as_str).ok_or((
690 error_code::INVALID_PARAMS,
691 "missing 'sql' string".to_string(),
692 ))?;
693
694 let bind_values: Option<Vec<SchemaValue>> = params
697 .get("params")
698 .map(|v| {
699 v.as_array()
700 .ok_or((
701 error_code::INVALID_PARAMS,
702 "'params' must be an array".to_string(),
703 ))
704 .map(|arr| arr.iter().map(json_value_to_schema_value).collect())
705 })
706 .transpose()?;
707
708 if let Some(binds) = bind_values {
709 use crate::storage::query::modes::parse_multi;
710 use crate::storage::query::user_params;
711 let parsed =
712 parse_multi(sql).map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
713 let bound = user_params::bind(&parsed, &binds)
714 .map_err(|e| (error_code::INVALID_PARAMS, e.to_string()))?;
715 let qr = runtime
716 .execute_query_expr(bound)
717 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
718 return Ok(query_result_to_json(&qr));
719 }
720
721 let qr = runtime
722 .execute_query(sql)
723 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
724 Ok(query_result_to_json(&qr))
725 }
726
727 "prepare" => {
736 use crate::storage::query::modes::parse_multi;
737 use crate::storage::query::planner::shape::parameterize_query_expr;
738
739 let sql = params.get("sql").and_then(Value::as_str).ok_or((
740 error_code::INVALID_PARAMS,
741 "missing 'sql' string".to_string(),
742 ))?;
743 let parsed = parse_multi(sql).map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
744 let (shape, parameter_count) = if let Some(prepared) = parameterize_query_expr(&parsed)
745 {
746 (prepared.shape, prepared.parameter_count)
747 } else {
748 (parsed, 0)
749 };
750 let id = session.next_prepared_id;
751 session.next_prepared_id = session.next_prepared_id.saturating_add(1);
752 session.prepared.insert(
753 id,
754 StdioPreparedStatement {
755 shape,
756 parameter_count,
757 },
758 );
759 Ok(Value::Object(
760 [
761 ("prepared_id".to_string(), Value::Number(id as f64)),
762 (
763 "parameter_count".to_string(),
764 Value::Number(parameter_count as f64),
765 ),
766 ]
767 .into_iter()
768 .collect(),
769 ))
770 }
771
772 "execute_prepared" => {
773 use crate::storage::query::planner::shape::bind_parameterized_query;
774 use crate::storage::schema::Value as SV;
775
776 let id = params
777 .get("prepared_id")
778 .and_then(Value::as_f64)
779 .map(|n| n as u64)
780 .ok_or((
781 error_code::INVALID_PARAMS,
782 "missing 'prepared_id'".to_string(),
783 ))?;
784
785 let stmt = session.prepared.get(&id).ok_or((
786 error_code::QUERY_ERROR,
787 format!("no prepared statement with id {id}"),
788 ))?;
789
790 let binds_json: Vec<Value> = params
792 .get("binds")
793 .and_then(Value::as_array)
794 .map(|a| a.to_vec())
795 .unwrap_or_default();
796 if binds_json.len() != stmt.parameter_count {
797 return Err((
798 error_code::INVALID_PARAMS,
799 format!(
800 "expected {} bind values, got {}",
801 stmt.parameter_count,
802 binds_json.len()
803 ),
804 ));
805 }
806
807 let binds: Vec<SV> = binds_json.iter().map(json_value_to_schema_value).collect();
809
810 let expr = if stmt.parameter_count == 0 {
812 stmt.shape.clone()
813 } else {
814 bind_parameterized_query(&stmt.shape, &binds, stmt.parameter_count)
815 .ok_or((error_code::QUERY_ERROR, "bind failed".to_string()))?
816 };
817
818 let qr = runtime
819 .execute_query_expr(expr)
820 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
821 Ok(query_result_to_json(&qr))
822 }
823
824 "insert" => {
825 let collection = params.get("collection").and_then(Value::as_str).ok_or((
826 error_code::INVALID_PARAMS,
827 "missing 'collection' string".to_string(),
828 ))?;
829 let payload = params.get("payload").ok_or((
830 error_code::INVALID_PARAMS,
831 "missing 'payload' object".to_string(),
832 ))?;
833 let payload_obj = payload.as_object().ok_or((
834 error_code::INVALID_PARAMS,
835 "'payload' must be a JSON object".to_string(),
836 ))?;
837 if let Some(tx) = session.current_tx_mut() {
838 let sql = build_insert_sql(collection, payload_obj.iter());
839 tx.write_set.push(PendingSql::Insert(sql));
840 return Ok(pending_tx_response(tx.tx_id));
841 }
842
843 let output = runtime
844 .create_row(flat_payload_to_row_input(collection, payload_obj))
845 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
846 let mut out = json::Map::new();
847 out.insert("affected".to_string(), Value::Number(1.0));
848 out.insert("id".to_string(), Value::String(output.id.raw().to_string()));
849 Ok(Value::Object(out))
850 }
851
852 "bulk_insert" => {
853 let collection = params.get("collection").and_then(Value::as_str).ok_or((
854 error_code::INVALID_PARAMS,
855 "missing 'collection' string".to_string(),
856 ))?;
857 let payloads = params.get("payloads").and_then(Value::as_array).ok_or((
858 error_code::INVALID_PARAMS,
859 "missing 'payloads' array".to_string(),
860 ))?;
861
862 let mut objects = Vec::with_capacity(payloads.len());
863 for entry in payloads {
864 objects.push(entry.as_object().ok_or((
865 error_code::INVALID_PARAMS,
866 "each payload must be a JSON object".to_string(),
867 ))?);
868 }
869
870 if let Some(tx) = session.current_tx_mut() {
871 let mut buffered: u64 = 0;
872 for obj in &objects {
873 let sql = build_insert_sql(collection, obj.iter());
874 tx.write_set.push(PendingSql::Insert(sql));
875 buffered += 1;
876 }
877 let tx_id = tx.tx_id;
878 return Ok(Value::Object(
879 [
880 ("affected".to_string(), Value::Number(0.0)),
881 ("buffered".to_string(), Value::Number(buffered as f64)),
882 ("pending".to_string(), Value::Bool(true)),
883 ("tx_id".to_string(), Value::Number(tx_id as f64)),
884 ]
885 .into_iter()
886 .collect(),
887 ));
888 }
889
890 if should_bulk_insert_graph(runtime, collection, &objects) {
891 return bulk_insert_graph(runtime, collection, &objects)
892 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()));
893 }
894
895 let mut total_affected: u64 = 0;
896 let mut ids = Vec::with_capacity(objects.len());
897 for chunk in objects.chunks(STDIO_BULK_INSERT_CHUNK_ROWS) {
898 let rows = chunk
899 .iter()
900 .map(|obj| flat_payload_to_row_input(collection, obj))
901 .collect();
902 let outputs = runtime
903 .create_rows_batch(CreateRowsBatchInput {
904 collection: collection.to_string(),
905 rows,
906 suppress_events: false,
907 })
908 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
909 total_affected += outputs.len() as u64;
910 ids.extend(
911 outputs
912 .into_iter()
913 .map(|output| Value::String(output.id.raw().to_string())),
914 );
915 }
916 let mut out = json::Map::new();
917 out.insert("affected".to_string(), Value::Number(total_affected as f64));
918 out.insert("ids".to_string(), Value::Array(ids));
919 Ok(Value::Object(out))
920 }
921
922 "get" => {
923 let collection = params.get("collection").and_then(Value::as_str).ok_or((
924 error_code::INVALID_PARAMS,
925 "missing 'collection' string".to_string(),
926 ))?;
927 let id = params.get("id").and_then(Value::as_str).ok_or((
928 error_code::INVALID_PARAMS,
929 "missing 'id' string".to_string(),
930 ))?;
931 let sql = format!("SELECT * FROM {collection} WHERE red_entity_id = {id} LIMIT 1");
932 let qr = runtime
933 .execute_query(&sql)
934 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
935 let entity = qr
936 .result
937 .records
938 .first()
939 .map(record_to_json_object)
940 .unwrap_or(Value::Null);
941 Ok(Value::Object(
942 [("entity".to_string(), entity)].into_iter().collect(),
943 ))
944 }
945
946 "delete" => {
947 let collection = params.get("collection").and_then(Value::as_str).ok_or((
948 error_code::INVALID_PARAMS,
949 "missing 'collection' string".to_string(),
950 ))?;
951 let id = params.get("id").and_then(Value::as_str).ok_or((
952 error_code::INVALID_PARAMS,
953 "missing 'id' string".to_string(),
954 ))?;
955 let sql = format!("DELETE FROM {collection} WHERE red_entity_id = {id}");
956
957 if let Some(tx) = session.current_tx_mut() {
958 tx.write_set.push(PendingSql::Delete(sql));
959 return Ok(pending_tx_response(tx.tx_id));
960 }
961
962 let qr = runtime
963 .execute_query(&sql)
964 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
965 Ok(Value::Object(
966 [(
967 "affected".to_string(),
968 Value::Number(qr.affected_rows as f64),
969 )]
970 .into_iter()
971 .collect(),
972 ))
973 }
974
975 "close" => {
976 let _ = session.take_tx();
981 session.clear_cursors();
982 let _ = runtime.checkpoint();
983 Ok(Value::Null)
984 }
985
986 "auth.login"
991 | "auth.whoami"
992 | "auth.change_password"
993 | "auth.create_api_key"
994 | "auth.revoke_api_key" => {
995 let _ = (session, params);
996 Err((
997 error_code::INVALID_REQUEST,
998 format!(
999 "{method}: auth methods are only available on grpc:// connections; \
1000 embedded modes (memory://, file://) inherit caller privileges"
1001 ),
1002 ))
1003 }
1004
1005 other => Err((
1006 error_code::INVALID_REQUEST,
1007 format!("unknown method: {other}"),
1008 )),
1009 }
1010}
1011
1012fn success_response(id: &Value, result: &Value, is_close: bool) -> String {
1017 let mut envelope = json::Map::new();
1022 envelope.insert("jsonrpc".to_string(), Value::String("2.0".to_string()));
1023 envelope.insert("id".to_string(), id.clone());
1024 envelope.insert("result".to_string(), result.clone());
1025 if is_close {
1026 envelope.insert("__close__".to_string(), Value::Bool(true));
1027 }
1028 Value::Object(envelope).to_string_compact()
1029}
1030
1031fn error_response(id: &Value, code: &str, message: &str) -> String {
1032 let mut err = json::Map::new();
1033 err.insert("code".to_string(), Value::String(code.to_string()));
1034 err.insert("message".to_string(), Value::String(message.to_string()));
1035 err.insert("data".to_string(), Value::Null);
1036
1037 let mut envelope = json::Map::new();
1038 envelope.insert("jsonrpc".to_string(), Value::String("2.0".to_string()));
1039 envelope.insert("id".to_string(), id.clone());
1040 envelope.insert("error".to_string(), Value::Object(err));
1041 Value::Object(envelope).to_string_compact()
1042}
1043
1044fn pending_tx_response(tx_id: u64) -> Value {
1051 Value::Object(
1052 [
1053 ("affected".to_string(), Value::Number(0.0)),
1054 ("pending".to_string(), Value::Bool(true)),
1055 ("tx_id".to_string(), Value::Number(tx_id as f64)),
1056 ]
1057 .into_iter()
1058 .collect(),
1059 )
1060}
1061
1062pub(crate) fn build_insert_sql<'a, I>(collection: &str, fields: I) -> String
1063where
1064 I: Iterator<Item = (&'a String, &'a Value)>,
1065{
1066 let mut cols = Vec::new();
1067 let mut vals = Vec::new();
1068 for (k, v) in fields {
1069 cols.push(k.clone());
1070 vals.push(value_to_sql_literal(v));
1071 }
1072 format!(
1073 "INSERT INTO {collection} ({}) VALUES ({})",
1074 cols.join(", "),
1075 vals.join(", "),
1076 )
1077}
1078
1079fn flat_payload_to_row_input(
1080 collection: &str,
1081 payload: &json::Map<String, Value>,
1082) -> CreateRowInput {
1083 CreateRowInput {
1084 collection: collection.to_string(),
1085 fields: payload
1086 .iter()
1087 .map(|(key, value)| (key.clone(), json_value_to_schema_value(value)))
1088 .collect(),
1089 metadata: Vec::new(),
1090 node_links: Vec::new(),
1091 vector_links: Vec::new(),
1092 }
1093}
1094
1095fn bulk_insert_chunk_count(row_count: usize) -> usize {
1096 if row_count == 0 {
1097 0
1098 } else {
1099 ((row_count - 1) / STDIO_BULK_INSERT_CHUNK_ROWS) + 1
1100 }
1101}
1102
1103pub(crate) fn should_bulk_insert_graph(
1104 runtime: &RedDBRuntime,
1105 collection: &str,
1106 payloads: &[&json::Map<String, Value>],
1107) -> bool {
1108 let graph_shaped = payloads
1109 .iter()
1110 .all(|payload| payload.get("label").and_then(Value::as_str).is_some());
1111 if !graph_shaped {
1112 return false;
1113 }
1114
1115 matches!(
1116 runtime
1117 .db()
1118 .catalog_model_snapshot()
1119 .collections
1120 .iter()
1121 .find(|descriptor| descriptor.name == collection)
1122 .map(|descriptor| descriptor.declared_model.unwrap_or(descriptor.model)),
1123 Some(crate::catalog::CollectionModel::Graph | crate::catalog::CollectionModel::Mixed)
1124 )
1125}
1126
1127pub(crate) fn bulk_insert_graph(
1128 runtime: &RedDBRuntime,
1129 collection: &str,
1130 payloads: &[&json::Map<String, Value>],
1131) -> crate::RedDBResult<Value> {
1132 use crate::application::entity_payload::{parse_create_edge_input, parse_create_node_input};
1133 use crate::application::ports::RuntimeEntityPort;
1134
1135 let mut ids = Vec::with_capacity(payloads.len());
1136 for payload in payloads {
1137 let input_payload = normalize_flat_graph_payload(payload);
1138 let id = if payload.contains_key("from") || payload.contains_key("to") {
1139 runtime
1140 .create_edge(parse_create_edge_input(
1141 collection.to_string(),
1142 &input_payload,
1143 )?)?
1144 .id
1145 } else {
1146 runtime
1147 .create_node(parse_create_node_input(
1148 collection.to_string(),
1149 &input_payload,
1150 )?)?
1151 .id
1152 };
1153 ids.push(Value::Number(id.raw() as f64));
1154 }
1155
1156 let mut out = json::Map::new();
1157 out.insert("affected".to_string(), Value::Number(ids.len() as f64));
1158 out.insert("ids".to_string(), Value::Array(ids));
1159 Ok(Value::Object(out))
1160}
1161
1162fn normalize_flat_graph_payload(payload: &json::Map<String, Value>) -> Value {
1163 if payload.contains_key("properties") || payload.contains_key("fields") {
1164 return Value::Object(payload.clone());
1165 }
1166
1167 let is_edge = payload.contains_key("from") || payload.contains_key("to");
1168 let mut normalized = payload.clone();
1169 let mut properties = json::Map::new();
1170 for (key, value) in payload {
1171 let reserved = if is_edge {
1172 matches!(
1173 key.as_str(),
1174 "label"
1175 | "from"
1176 | "to"
1177 | "weight"
1178 | "metadata"
1179 | "properties"
1180 | "fields"
1181 | "_ttl_ms"
1182 | "_expires_at"
1183 )
1184 } else {
1185 matches!(
1186 key.as_str(),
1187 "label"
1188 | "node_type"
1189 | "metadata"
1190 | "links"
1191 | "embeddings"
1192 | "properties"
1193 | "fields"
1194 | "_ttl_ms"
1195 | "_expires_at"
1196 )
1197 };
1198 if !reserved {
1199 properties.insert(key.clone(), value.clone());
1200 }
1201 }
1202 if !properties.is_empty() {
1203 normalized.insert("properties".to_string(), Value::Object(properties));
1204 }
1205 Value::Object(normalized)
1206}
1207
1208pub(crate) fn value_to_sql_literal(v: &Value) -> String {
1209 match v {
1210 Value::Null => "NULL".to_string(),
1211 Value::Bool(b) => b.to_string(),
1212 Value::Number(n) => {
1213 if n.fract() == 0.0 {
1214 format!("{}", *n as i64)
1215 } else {
1216 n.to_string()
1217 }
1218 }
1219 Value::String(s) => format!("'{}'", s.replace('\'', "''")),
1220 other => format!("'{}'", other.to_string_compact().replace('\'', "''")),
1221 }
1222}
1223
1224pub(crate) fn query_result_to_json(qr: &RuntimeQueryResult) -> Value {
1225 if let Some(ask) = ask_query_result_to_json(qr) {
1226 return ask;
1227 }
1228
1229 let mut envelope = json::Map::new();
1230 envelope.insert(
1231 "statement".to_string(),
1232 Value::String(qr.statement_type.to_string()),
1233 );
1234 envelope.insert(
1235 "affected".to_string(),
1236 Value::Number(qr.affected_rows as f64),
1237 );
1238
1239 let mut columns = Vec::new();
1240 if let Some(first) = qr.result.records.first() {
1241 let mut keys: Vec<String> = first
1242 .column_names()
1243 .into_iter()
1244 .map(|k| k.to_string())
1245 .collect();
1246 keys.sort();
1247 columns = keys.into_iter().map(Value::String).collect();
1248 }
1249 envelope.insert("columns".to_string(), Value::Array(columns));
1250
1251 let rows: Vec<Value> = qr
1252 .result
1253 .records
1254 .iter()
1255 .map(record_to_json_object)
1256 .collect();
1257 envelope.insert("rows".to_string(), Value::Array(rows));
1258
1259 Value::Object(envelope)
1260}
1261
1262fn ask_query_result_to_json(qr: &RuntimeQueryResult) -> Option<Value> {
1263 if qr.statement != "ask" {
1264 return None;
1265 }
1266 let row = qr.result.records.first()?;
1267 let answer = text_field(row, "answer")?;
1268 let provider = text_field(row, "provider").unwrap_or_default();
1269 let model = text_field(row, "model").unwrap_or_default();
1270 let sources_flat_json = json_field(row, "sources_flat").unwrap_or(Value::Array(Vec::new()));
1271 let citations_json = json_field(row, "citations").unwrap_or(Value::Array(Vec::new()));
1272 let validation_json = json_field(row, "validation").unwrap_or(Value::Object(json::Map::new()));
1273
1274 let effective_mode = match text_field(row, "mode").as_deref() {
1275 Some("lenient") => crate::runtime::ai::ask_response_envelope::Mode::Lenient,
1276 _ => crate::runtime::ai::ask_response_envelope::Mode::Strict,
1277 };
1278
1279 let result = crate::runtime::ai::ask_response_envelope::AskResult {
1280 answer,
1281 sources_flat: envelope_sources_flat(&sources_flat_json),
1282 citations: envelope_citations(&citations_json),
1283 validation: envelope_validation(&validation_json),
1284 cache_hit: bool_field(row, "cache_hit").unwrap_or(false),
1285 provider,
1286 model,
1287 prompt_tokens: u32_field(row, "prompt_tokens").unwrap_or(0),
1288 completion_tokens: u32_field(row, "completion_tokens").unwrap_or(0),
1289 cost_usd: f64_field(row, "cost_usd").unwrap_or(0.0),
1290 effective_mode,
1291 retry_count: u32_field(row, "retry_count").unwrap_or(0),
1292 };
1293 Some(crate::runtime::ai::ask_response_envelope::build(&result))
1294}
1295
1296fn record_field<'a>(record: &'a UnifiedRecord, name: &str) -> Option<&'a SchemaValue> {
1297 record
1298 .iter_fields()
1299 .find_map(|(key, value)| (key.as_ref() == name).then_some(value))
1300}
1301
1302fn text_field(record: &UnifiedRecord, name: &str) -> Option<String> {
1303 match record_field(record, name)? {
1304 SchemaValue::Text(s) => Some(s.to_string()),
1305 SchemaValue::Email(s)
1306 | SchemaValue::Url(s)
1307 | SchemaValue::NodeRef(s)
1308 | SchemaValue::EdgeRef(s) => Some(s.clone()),
1309 other => Some(format!("{other}")),
1310 }
1311}
1312
1313fn u32_field(record: &UnifiedRecord, name: &str) -> Option<u32> {
1314 match record_field(record, name)? {
1315 SchemaValue::Integer(n) => (*n >= 0).then_some((*n).min(u32::MAX as i64) as u32),
1316 SchemaValue::UnsignedInteger(n) => Some((*n).min(u32::MAX as u64) as u32),
1317 SchemaValue::BigInt(n)
1318 | SchemaValue::TimestampMs(n)
1319 | SchemaValue::Timestamp(n)
1320 | SchemaValue::Duration(n)
1321 | SchemaValue::Decimal(n) => (*n >= 0).then_some((*n).min(u32::MAX as i64) as u32),
1322 SchemaValue::Float(n) => (*n >= 0.0).then_some((*n).min(u32::MAX as f64) as u32),
1323 _ => None,
1324 }
1325}
1326
1327fn f64_field(record: &UnifiedRecord, name: &str) -> Option<f64> {
1328 match record_field(record, name)? {
1329 SchemaValue::Integer(n) => Some(*n as f64),
1330 SchemaValue::UnsignedInteger(n) => Some(*n as f64),
1331 SchemaValue::BigInt(n)
1332 | SchemaValue::TimestampMs(n)
1333 | SchemaValue::Timestamp(n)
1334 | SchemaValue::Duration(n)
1335 | SchemaValue::Decimal(n) => Some(*n as f64),
1336 SchemaValue::Float(n) => Some(*n),
1337 _ => None,
1338 }
1339}
1340
1341fn bool_field(record: &UnifiedRecord, name: &str) -> Option<bool> {
1342 match record_field(record, name)? {
1343 SchemaValue::Boolean(value) => Some(*value),
1344 _ => None,
1345 }
1346}
1347
1348fn json_field(record: &UnifiedRecord, name: &str) -> Option<Value> {
1349 match record_field(record, name)? {
1350 SchemaValue::Json(bytes) => json::from_slice(bytes).ok(),
1351 SchemaValue::Text(text) => json::from_str(text).ok(),
1352 _ => None,
1353 }
1354}
1355
1356fn envelope_sources_flat(
1357 value: &Value,
1358) -> Vec<crate::runtime::ai::ask_response_envelope::SourceRow> {
1359 value
1360 .as_array()
1361 .unwrap_or(&[])
1362 .iter()
1363 .filter_map(|source| {
1364 let urn = source.get("urn").and_then(Value::as_str)?.to_string();
1365 let payload = source
1366 .get("payload")
1367 .and_then(Value::as_str)
1368 .map(ToString::to_string)
1369 .unwrap_or_else(|| source.to_string_compact());
1370 Some(crate::runtime::ai::ask_response_envelope::SourceRow { urn, payload })
1371 })
1372 .collect()
1373}
1374
1375fn envelope_citations(value: &Value) -> Vec<crate::runtime::ai::ask_response_envelope::Citation> {
1376 value
1377 .as_array()
1378 .unwrap_or(&[])
1379 .iter()
1380 .filter_map(|citation| {
1381 let marker = citation.get("marker").and_then(Value::as_u64)?;
1382 let urn = citation.get("urn").and_then(Value::as_str)?.to_string();
1383 Some(crate::runtime::ai::ask_response_envelope::Citation {
1384 marker: marker.min(u32::MAX as u64) as u32,
1385 urn,
1386 })
1387 })
1388 .collect()
1389}
1390
1391fn envelope_validation(value: &Value) -> crate::runtime::ai::ask_response_envelope::Validation {
1392 crate::runtime::ai::ask_response_envelope::Validation {
1393 ok: value.get("ok").and_then(Value::as_bool).unwrap_or(true),
1394 warnings: validation_items(value, "warnings")
1395 .into_iter()
1396 .map(
1397 |(kind, detail)| crate::runtime::ai::ask_response_envelope::ValidationWarning {
1398 kind,
1399 detail,
1400 },
1401 )
1402 .collect(),
1403 errors: validation_items(value, "errors")
1404 .into_iter()
1405 .map(
1406 |(kind, detail)| crate::runtime::ai::ask_response_envelope::ValidationError {
1407 kind,
1408 detail,
1409 },
1410 )
1411 .collect(),
1412 }
1413}
1414
1415fn validation_items(value: &Value, key: &str) -> Vec<(String, String)> {
1416 value
1417 .get(key)
1418 .and_then(Value::as_array)
1419 .unwrap_or(&[])
1420 .iter()
1421 .filter_map(|item| {
1422 Some((
1423 item.get("kind").and_then(Value::as_str)?.to_string(),
1424 item.get("detail")
1425 .and_then(Value::as_str)
1426 .unwrap_or("")
1427 .to_string(),
1428 ))
1429 })
1430 .collect()
1431}
1432
1433pub(crate) fn insert_result_to_json(qr: &RuntimeQueryResult) -> Value {
1434 let mut envelope = json::Map::new();
1435 envelope.insert(
1436 "affected".to_string(),
1437 Value::Number(qr.affected_rows as f64),
1438 );
1439 if let Some(first) = qr.result.records.first() {
1441 if let Some(id_val) = first
1442 .iter_fields()
1443 .find(|(k, _)| {
1444 let s: &str = k;
1445 s == "_entity_id"
1446 })
1447 .map(|(_, v)| schema_value_to_json(v))
1448 {
1449 envelope.insert("id".to_string(), id_val);
1450 }
1451 }
1452 Value::Object(envelope)
1453}
1454
1455fn record_to_json_object(record: &UnifiedRecord) -> Value {
1456 let mut map = json::Map::new();
1457 let mut entries: Vec<(&str, &SchemaValue)> =
1460 record.iter_fields().map(|(k, v)| (k.as_ref(), v)).collect();
1461 entries.sort_by(|a, b| a.0.cmp(b.0));
1462 for (k, v) in entries {
1463 map.insert(k.to_string(), schema_value_to_json(v));
1464 }
1465 Value::Object(map)
1466}
1467
1468fn schema_value_to_json(v: &SchemaValue) -> Value {
1469 match v {
1470 SchemaValue::Null => Value::Null,
1471 SchemaValue::Boolean(b) => Value::Bool(*b),
1472 SchemaValue::Integer(n) => Value::Number(*n as f64),
1473 SchemaValue::UnsignedInteger(n) => Value::Number(*n as f64),
1474 SchemaValue::Float(n) if n.is_finite() => Value::Number(*n),
1475 SchemaValue::Float(n) => {
1476 let token = if n.is_nan() {
1477 "NaN"
1478 } else if n.is_sign_positive() {
1479 "Infinity"
1480 } else {
1481 "-Infinity"
1482 };
1483 single_key_object("$float", Value::String(token.to_string()))
1484 }
1485 SchemaValue::BigInt(n) => Value::Number(*n as f64),
1486 SchemaValue::TimestampMs(n) | SchemaValue::Duration(n) | SchemaValue::Decimal(n) => {
1487 Value::Number(*n as f64)
1488 }
1489 SchemaValue::Timestamp(n) => single_key_object("$ts", Value::String(n.to_string())),
1490 SchemaValue::Password(_) | SchemaValue::Secret(_) => Value::String("***".to_string()),
1491 SchemaValue::Text(s) => Value::String(s.to_string()),
1492 SchemaValue::Blob(bytes) => {
1493 single_key_object("$bytes", Value::String(base64_encode(bytes)))
1494 }
1495 SchemaValue::Json(bytes) => {
1496 crate::presentation::entity_json::storage_json_bytes_to_json(bytes)
1497 }
1498 SchemaValue::Uuid(bytes) => single_key_object("$uuid", Value::String(format_uuid(bytes))),
1499 SchemaValue::Email(s)
1500 | SchemaValue::Url(s)
1501 | SchemaValue::NodeRef(s)
1502 | SchemaValue::EdgeRef(s) => Value::String(s.clone()),
1503 other => Value::String(format!("{other}")),
1504 }
1505}
1506
1507fn single_key_object(key: &str, value: Value) -> Value {
1508 Value::Object([(key.to_string(), value)].into_iter().collect())
1509}
1510
1511pub(crate) fn json_value_to_schema_value(v: &Value) -> SchemaValue {
1515 match v {
1516 Value::Null => SchemaValue::Null,
1517 Value::Bool(b) => SchemaValue::Boolean(*b),
1518 Value::Number(n) => {
1519 if n.is_finite() && n.fract() == 0.0 && *n >= i64::MIN as f64 && *n <= i64::MAX as f64 {
1520 SchemaValue::Integer(*n as i64)
1521 } else {
1522 SchemaValue::Float(*n)
1523 }
1524 }
1525 Value::String(s) => SchemaValue::text(s.clone()),
1526 Value::Array(items) => {
1527 if items.iter().all(|v| matches!(v, Value::Number(_))) {
1531 let floats: Vec<f32> = items
1532 .iter()
1533 .map(|v| v.as_f64().unwrap_or(0.0) as f32)
1534 .collect();
1535 SchemaValue::Vector(floats)
1536 } else {
1537 SchemaValue::Json(crate::json::to_vec(v).unwrap_or_default())
1538 }
1539 }
1540 Value::Object(map) => {
1541 if map.len() == 1 {
1542 if let Some(Value::String(encoded)) = map.get("$bytes") {
1543 if let Ok(bytes) = base64_decode(encoded) {
1544 return SchemaValue::Blob(bytes);
1545 }
1546 }
1547 if let Some(value) = map.get("$ts") {
1548 if let Some(ts) = json_i64(value) {
1549 return SchemaValue::Timestamp(ts);
1550 }
1551 }
1552 if let Some(Value::String(value)) = map.get("$uuid") {
1553 if let Ok(uuid) = crate::crypto::Uuid::parse_str(value) {
1554 return SchemaValue::Uuid(*uuid.as_bytes());
1555 }
1556 }
1557 if let Some(Value::String(value)) = map.get("$float") {
1558 return match value.as_str() {
1559 "NaN" => SchemaValue::Float(f64::NAN),
1560 "Infinity" | "+Infinity" | "inf" | "+inf" => {
1561 SchemaValue::Float(f64::INFINITY)
1562 }
1563 "-Infinity" | "-inf" => SchemaValue::Float(f64::NEG_INFINITY),
1564 _ => SchemaValue::Json(crate::json::to_vec(v).unwrap_or_default()),
1565 };
1566 }
1567 }
1568 SchemaValue::Json(crate::json::to_vec(v).unwrap_or_default())
1569 }
1570 }
1571}
1572
1573fn json_i64(value: &Value) -> Option<i64> {
1574 match value {
1575 Value::Number(n) => {
1576 if n.is_finite() && n.fract() == 0.0 && *n >= i64::MIN as f64 && *n <= i64::MAX as f64 {
1577 Some(*n as i64)
1578 } else {
1579 None
1580 }
1581 }
1582 Value::String(s) => s.parse::<i64>().ok(),
1583 _ => None,
1584 }
1585}
1586
1587fn format_uuid(bytes: &[u8; 16]) -> String {
1588 format!(
1589 "{:02x}{:02x}{:02x}{:02x}-{:02x}{:02x}-{:02x}{:02x}-{:02x}{:02x}-{:02x}{:02x}{:02x}{:02x}{:02x}{:02x}",
1590 bytes[0],
1591 bytes[1],
1592 bytes[2],
1593 bytes[3],
1594 bytes[4],
1595 bytes[5],
1596 bytes[6],
1597 bytes[7],
1598 bytes[8],
1599 bytes[9],
1600 bytes[10],
1601 bytes[11],
1602 bytes[12],
1603 bytes[13],
1604 bytes[14],
1605 bytes[15]
1606 )
1607}
1608
1609fn base64_encode(bytes: &[u8]) -> String {
1610 const TABLE: &[u8; 64] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
1611 let mut out = String::with_capacity(bytes.len().div_ceil(3) * 4);
1612 let mut chunks = bytes.chunks_exact(3);
1613 for chunk in chunks.by_ref() {
1614 let n = ((chunk[0] as u32) << 16) | ((chunk[1] as u32) << 8) | chunk[2] as u32;
1615 out.push(TABLE[((n >> 18) & 0x3f) as usize] as char);
1616 out.push(TABLE[((n >> 12) & 0x3f) as usize] as char);
1617 out.push(TABLE[((n >> 6) & 0x3f) as usize] as char);
1618 out.push(TABLE[(n & 0x3f) as usize] as char);
1619 }
1620 match chunks.remainder() {
1621 [] => {}
1622 [a] => {
1623 let n = (*a as u32) << 16;
1624 out.push(TABLE[((n >> 18) & 0x3f) as usize] as char);
1625 out.push(TABLE[((n >> 12) & 0x3f) as usize] as char);
1626 out.push('=');
1627 out.push('=');
1628 }
1629 [a, b] => {
1630 let n = ((*a as u32) << 16) | ((*b as u32) << 8);
1631 out.push(TABLE[((n >> 18) & 0x3f) as usize] as char);
1632 out.push(TABLE[((n >> 12) & 0x3f) as usize] as char);
1633 out.push(TABLE[((n >> 6) & 0x3f) as usize] as char);
1634 out.push('=');
1635 }
1636 _ => unreachable!(),
1637 }
1638 out
1639}
1640
1641fn base64_decode(input: &str) -> Result<Vec<u8>, String> {
1642 let bytes = input.as_bytes();
1643 if !bytes.len().is_multiple_of(4) {
1644 return Err("base64 length must be a multiple of 4".to_string());
1645 }
1646 let mut out = Vec::with_capacity(bytes.len() / 4 * 3);
1647 for chunk in bytes.chunks_exact(4) {
1648 let pad = chunk.iter().rev().take_while(|&&b| b == b'=').count();
1649 let a = base64_value(chunk[0])?;
1650 let b = base64_value(chunk[1])?;
1651 let c = if chunk[2] == b'=' {
1652 0
1653 } else {
1654 base64_value(chunk[2])?
1655 };
1656 let d = if chunk[3] == b'=' {
1657 0
1658 } else {
1659 base64_value(chunk[3])?
1660 };
1661 let n = ((a as u32) << 18) | ((b as u32) << 12) | ((c as u32) << 6) | d as u32;
1662 out.push(((n >> 16) & 0xff) as u8);
1663 if pad < 2 {
1664 out.push(((n >> 8) & 0xff) as u8);
1665 }
1666 if pad < 1 {
1667 out.push((n & 0xff) as u8);
1668 }
1669 }
1670 Ok(out)
1671}
1672
1673fn base64_value(byte: u8) -> Result<u8, String> {
1674 match byte {
1675 b'A'..=b'Z' => Ok(byte - b'A'),
1676 b'a'..=b'z' => Ok(byte - b'a' + 26),
1677 b'0'..=b'9' => Ok(byte - b'0' + 52),
1678 b'+' => Ok(62),
1679 b'/' => Ok(63),
1680 b'=' => Ok(0),
1681 _ => Err(format!("invalid base64 character: {}", byte as char)),
1682 }
1683}
1684
1685fn dispatch_method_remote(
1695 client: &AsyncMutex<RedDBClient>,
1696 tokio_rt: &tokio::runtime::Runtime,
1697 method: &str,
1698 params: &Value,
1699) -> Result<Value, (&'static str, String)> {
1700 match method {
1701 "version" => Ok(Value::Object(
1702 [
1703 (
1704 "version".to_string(),
1705 Value::String(env!("CARGO_PKG_VERSION").to_string()),
1706 ),
1707 (
1708 "protocol".to_string(),
1709 Value::String(PROTOCOL_VERSION.to_string()),
1710 ),
1711 ]
1712 .into_iter()
1713 .collect(),
1714 )),
1715
1716 "health" => {
1717 let result = tokio_rt.block_on(async {
1718 let mut guard = client.lock().await;
1719 guard.health_status().await
1720 });
1721 match result {
1722 Ok(status) => Ok(Value::Object(
1723 [
1724 ("ok".to_string(), Value::Bool(status.healthy)),
1725 ("state".to_string(), Value::String(status.state)),
1726 (
1727 "checked_at_unix_ms".to_string(),
1728 Value::Number(status.checked_at_unix_ms as f64),
1729 ),
1730 (
1731 "version".to_string(),
1732 Value::String(env!("CARGO_PKG_VERSION").to_string()),
1733 ),
1734 ]
1735 .into_iter()
1736 .collect(),
1737 )),
1738 Err(e) => Err((error_code::INTERNAL_ERROR, e.to_string())),
1739 }
1740 }
1741
1742 "query" => {
1743 let sql = params.get("sql").and_then(Value::as_str).ok_or((
1744 error_code::INVALID_PARAMS,
1745 "missing 'sql' string".to_string(),
1746 ))?;
1747 let json_str = tokio_rt
1748 .block_on(async {
1749 let mut guard = client.lock().await;
1750 guard.query(sql).await
1751 })
1752 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
1753 let parsed = json::from_str::<Value>(&json_str)
1758 .map_err(|e| (error_code::INTERNAL_ERROR, format!("bad server JSON: {e}")))?;
1759 Ok(parsed)
1760 }
1761
1762 "insert" => {
1763 let collection = params.get("collection").and_then(Value::as_str).ok_or((
1764 error_code::INVALID_PARAMS,
1765 "missing 'collection' string".to_string(),
1766 ))?;
1767 let payload = params.get("payload").ok_or((
1768 error_code::INVALID_PARAMS,
1769 "missing 'payload' object".to_string(),
1770 ))?;
1771 if payload.as_object().is_none() {
1772 return Err((
1773 error_code::INVALID_PARAMS,
1774 "'payload' must be a JSON object".to_string(),
1775 ));
1776 }
1777 let payload_json = payload.to_string_compact();
1778 let reply = tokio_rt
1779 .block_on(async {
1780 let mut guard = client.lock().await;
1781 guard.create_row_entity(collection, &payload_json).await
1782 })
1783 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
1784 let mut out = json::Map::new();
1785 out.insert("affected".to_string(), Value::Number(1.0));
1786 out.insert("id".to_string(), Value::String(reply.id.to_string()));
1787 Ok(Value::Object(out))
1788 }
1789
1790 "bulk_insert" => {
1791 let collection = params.get("collection").and_then(Value::as_str).ok_or((
1792 error_code::INVALID_PARAMS,
1793 "missing 'collection' string".to_string(),
1794 ))?;
1795 let payloads = params.get("payloads").and_then(Value::as_array).ok_or((
1796 error_code::INVALID_PARAMS,
1797 "missing 'payloads' array".to_string(),
1798 ))?;
1799 let mut encoded = Vec::with_capacity(payloads.len());
1800 for entry in payloads {
1801 if entry.as_object().is_none() {
1802 return Err((
1803 error_code::INVALID_PARAMS,
1804 "each payload must be a JSON object".to_string(),
1805 ));
1806 }
1807 encoded.push(entry.to_string_compact());
1808 }
1809 let status = tokio_rt
1810 .block_on(async {
1811 let mut guard = client.lock().await;
1812 guard.bulk_create_rows(collection, encoded).await
1813 })
1814 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
1815 Ok(Value::Object(
1816 [
1817 ("affected".to_string(), Value::Number(status.count as f64)),
1818 (
1819 "ids".to_string(),
1820 Value::Array(
1821 status
1822 .ids
1823 .into_iter()
1824 .map(|id| Value::Number(id as f64))
1825 .collect(),
1826 ),
1827 ),
1828 ]
1829 .into_iter()
1830 .collect(),
1831 ))
1832 }
1833
1834 "get" => {
1835 let collection = params.get("collection").and_then(Value::as_str).ok_or((
1836 error_code::INVALID_PARAMS,
1837 "missing 'collection' string".to_string(),
1838 ))?;
1839 let id = params.get("id").and_then(Value::as_str).ok_or((
1840 error_code::INVALID_PARAMS,
1841 "missing 'id' string".to_string(),
1842 ))?;
1843 let sql = format!("SELECT * FROM {collection} WHERE red_entity_id = {id} LIMIT 1");
1844 let json_str = tokio_rt
1845 .block_on(async {
1846 let mut guard = client.lock().await;
1847 guard.query(&sql).await
1848 })
1849 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
1850 let parsed = json::from_str::<Value>(&json_str)
1851 .map_err(|e| (error_code::INTERNAL_ERROR, format!("bad server JSON: {e}")))?;
1852 let entity = parsed
1855 .get("rows")
1856 .and_then(Value::as_array)
1857 .and_then(|rows| rows.first().cloned())
1858 .unwrap_or(Value::Null);
1859 Ok(Value::Object(
1860 [("entity".to_string(), entity)].into_iter().collect(),
1861 ))
1862 }
1863
1864 "delete" => {
1865 let collection = params.get("collection").and_then(Value::as_str).ok_or((
1866 error_code::INVALID_PARAMS,
1867 "missing 'collection' string".to_string(),
1868 ))?;
1869 let id = params.get("id").and_then(Value::as_str).ok_or((
1870 error_code::INVALID_PARAMS,
1871 "missing 'id' string".to_string(),
1872 ))?;
1873 let id = id.parse::<u64>().map_err(|_| {
1874 (
1875 error_code::INVALID_PARAMS,
1876 "id must be a numeric string".to_string(),
1877 )
1878 })?;
1879 let _reply = tokio_rt
1880 .block_on(async {
1881 let mut guard = client.lock().await;
1882 guard.delete_entity(collection, id).await
1883 })
1884 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
1885 Ok(Value::Object(
1886 [("affected".to_string(), Value::Number(1.0))]
1887 .into_iter()
1888 .collect(),
1889 ))
1890 }
1891
1892 "close" => Ok(Value::Null),
1893
1894 other => Err((
1895 error_code::INVALID_REQUEST,
1896 format!("unknown method: {other}"),
1897 )),
1898 }
1899}
1900
1901#[cfg(test)]
1902mod tests {
1903 use super::*;
1904 use crate::json::json;
1905 use proptest::prelude::*;
1906
1907 fn make_runtime() -> RedDBRuntime {
1908 RedDBRuntime::in_memory().expect("in-memory runtime")
1909 }
1910
1911 fn create_graph_collection(rt: &RedDBRuntime, name: &str) {
1912 let db = rt.db();
1913 db.store()
1914 .create_collection(name)
1915 .expect("create collection");
1916 let now = std::time::SystemTime::now()
1917 .duration_since(std::time::UNIX_EPOCH)
1918 .unwrap_or_default()
1919 .as_millis();
1920 db.save_collection_contract(crate::physical::CollectionContract {
1921 name: name.to_string(),
1922 declared_model: crate::catalog::CollectionModel::Graph,
1923 schema_mode: crate::catalog::SchemaMode::Dynamic,
1924 origin: crate::physical::ContractOrigin::Explicit,
1925 version: 1,
1926 created_at_unix_ms: now,
1927 updated_at_unix_ms: now,
1928 default_ttl_ms: None,
1929 vector_dimension: None,
1930 vector_metric: None,
1931 context_index_fields: Vec::new(),
1932 declared_columns: Vec::new(),
1933 table_def: None,
1934 timestamps_enabled: false,
1935 context_index_enabled: false,
1936 metrics_raw_retention_ms: None,
1937 metrics_rollup_policies: Vec::new(),
1938 metrics_tenant_identity: None,
1939 metrics_namespace: None,
1940 append_only: false,
1941 subscriptions: Vec::new(),
1942 analytics_config: Vec::new(),
1943 session_key: None,
1944 session_gap_ms: None,
1945 retention_duration_ms: None,
1946 })
1947 .expect("save graph contract");
1948 }
1949
1950 fn handle(rt: &RedDBRuntime, line: &str) -> String {
1951 let mut session = Session::new();
1952 handle_line(&Backend::Local(rt), &mut session, line)
1953 }
1954
1955 fn query_request(id: u64, sql: &str) -> String {
1956 let mut params = json::Map::new();
1957 params.insert("sql".to_string(), Value::String(sql.to_string()));
1958
1959 let mut request = json::Map::new();
1960 request.insert("jsonrpc".to_string(), Value::String("2.0".to_string()));
1961 request.insert("id".to_string(), Value::Number(id as f64));
1962 request.insert("method".to_string(), Value::String("query".to_string()));
1963 request.insert("params".to_string(), Value::Object(params));
1964 Value::Object(request).to_string_compact()
1965 }
1966
1967 fn query_request_with_params(id: u64, sql: &str, binds: Vec<Value>) -> String {
1968 let mut params = json::Map::new();
1969 params.insert("sql".to_string(), Value::String(sql.to_string()));
1970 params.insert("params".to_string(), Value::Array(binds));
1971
1972 let mut request = json::Map::new();
1973 request.insert("jsonrpc".to_string(), Value::String("2.0".to_string()));
1974 request.insert("id".to_string(), Value::Number(id as f64));
1975 request.insert("method".to_string(), Value::String("query".to_string()));
1976 request.insert("params".to_string(), Value::Object(params));
1977 Value::Object(request).to_string_compact()
1978 }
1979
1980 fn with_session<F>(rt: &RedDBRuntime, f: F)
1983 where
1984 F: FnOnce(&dyn Fn(&str) -> String, &RedDBRuntime),
1985 {
1986 let session = std::cell::RefCell::new(Session::new());
1987 let call = |line: &str| -> String {
1988 let mut s = session.borrow_mut();
1989 handle_line(&Backend::Local(rt), &mut s, line)
1990 };
1991 f(&call, rt);
1992 }
1993
1994 fn result_rows(response: &str) -> Vec<Value> {
1995 json::from_str::<Value>(response)
1996 .expect("json response")
1997 .get("result")
1998 .and_then(|result| result.get("rows"))
1999 .and_then(Value::as_array)
2000 .map(|rows| rows.to_vec())
2001 .unwrap_or_default()
2002 }
2003
2004 fn result_name_kind(response: &str) -> Vec<(String, String)> {
2005 result_rows(response)
2006 .into_iter()
2007 .map(|row| {
2008 let object = row.as_object().expect("row object");
2009 let name = object
2010 .get("name")
2011 .and_then(Value::as_str)
2012 .expect("row name")
2013 .to_string();
2014 let kind = object
2015 .get("kind")
2016 .and_then(Value::as_str)
2017 .expect("row kind")
2018 .to_string();
2019 (name, kind)
2020 })
2021 .collect()
2022 }
2023
2024 fn json_scalar_param() -> impl Strategy<Value = Value> {
2025 prop_oneof![
2026 Just(Value::Null),
2027 any::<bool>().prop_map(Value::Bool),
2028 (-1000_i64..1000_i64).prop_map(|n| Value::Number(n as f64)),
2029 "[a-z']{0,8}".prop_map(Value::String),
2030 ]
2031 }
2032
2033 fn sql_literal_for_json(value: &Value) -> String {
2034 match value {
2035 Value::Null => "NULL".to_string(),
2036 Value::Bool(true) => "TRUE".to_string(),
2037 Value::Bool(false) => "FALSE".to_string(),
2038 Value::Number(n) => format!("{n:.0}"),
2039 Value::String(s) => format!("'{}'", s.replace('\'', "''")),
2040 _ => panic!("unsupported scalar param: {value:?}"),
2041 }
2042 }
2043
2044 #[test]
2045 fn version_method_returns_version_and_protocol() {
2046 let rt = make_runtime();
2047 let line = r#"{"jsonrpc":"2.0","id":1,"method":"version","params":{}}"#;
2048 let resp = handle(&rt, line);
2049 assert!(resp.contains("\"id\":1"));
2050 assert!(resp.contains("\"protocol\":\"1.0\""));
2051 assert!(resp.contains("\"version\""));
2052 }
2053
2054 #[test]
2055 fn health_method_returns_ok_true() {
2056 let rt = make_runtime();
2057 let resp = handle(
2058 &rt,
2059 r#"{"jsonrpc":"2.0","id":"abc","method":"health","params":{}}"#,
2060 );
2061 assert!(resp.contains("\"ok\":true"));
2062 assert!(resp.contains("\"id\":\"abc\""));
2063 }
2064
2065 #[test]
2066 fn parse_error_for_invalid_json() {
2067 let rt = make_runtime();
2068 let resp = handle(&rt, "not json {");
2069 assert!(resp.contains("\"code\":\"PARSE_ERROR\""));
2070 assert!(resp.contains("\"id\":null"));
2071 }
2072
2073 #[test]
2074 fn invalid_request_when_method_missing() {
2075 let rt = make_runtime();
2076 let resp = handle(&rt, r#"{"jsonrpc":"2.0","id":1,"params":{}}"#);
2077 assert!(resp.contains("\"code\":\"INVALID_REQUEST\""));
2078 }
2079
2080 #[test]
2081 fn unknown_method_is_invalid_request() {
2082 let rt = make_runtime();
2083 let resp = handle(
2084 &rt,
2085 r#"{"jsonrpc":"2.0","id":1,"method":"frobnicate","params":{}}"#,
2086 );
2087 assert!(resp.contains("\"code\":\"INVALID_REQUEST\""));
2088 assert!(resp.contains("frobnicate"));
2089 }
2090
2091 #[test]
2092 fn invalid_params_when_query_sql_missing() {
2093 let rt = make_runtime();
2094 let resp = handle(
2095 &rt,
2096 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{}}"#,
2097 );
2098 assert!(resp.contains("\"code\":\"INVALID_PARAMS\""));
2099 }
2100
2101 #[test]
2102 fn close_method_marks_response_for_shutdown() {
2103 let rt = make_runtime();
2104 let resp = handle(
2105 &rt,
2106 r#"{"jsonrpc":"2.0","id":1,"method":"close","params":{}}"#,
2107 );
2108 assert!(resp.contains("\"__close__\":true"));
2109 }
2110
2111 #[test]
2112 fn query_with_int_text_params_round_trips() {
2113 let rt = make_runtime();
2114 let _ = handle(
2115 &rt,
2116 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE p (id INTEGER, name TEXT)"}}"#,
2117 );
2118 let _ = handle(
2119 &rt,
2120 r#"{"jsonrpc":"2.0","id":2,"method":"query","params":{"sql":"INSERT INTO p (id, name) VALUES (1, 'Alice')"}}"#,
2121 );
2122 let _ = handle(
2123 &rt,
2124 r#"{"jsonrpc":"2.0","id":3,"method":"query","params":{"sql":"INSERT INTO p (id, name) VALUES (2, 'Bob')"}}"#,
2125 );
2126 let resp = handle(
2127 &rt,
2128 r#"{"jsonrpc":"2.0","id":4,"method":"query","params":{"sql":"SELECT * FROM p WHERE id = $1 AND name = $2","params":[1,"Alice"]}}"#,
2129 );
2130 assert!(resp.contains("\"Alice\""), "got: {resp}");
2131 assert!(!resp.contains("\"Bob\""), "got: {resp}");
2132 }
2133
2134 #[test]
2135 fn query_with_question_params_covers_select_insert_update_delete() {
2136 let rt = make_runtime();
2137 let create = handle(
2138 &rt,
2139 &query_request(1, "CREATE TABLE qp (id INTEGER, name TEXT)"),
2140 );
2141 assert!(!create.contains("\"error\""), "got: {create}");
2142
2143 let inserted = handle(
2144 &rt,
2145 &query_request_with_params(
2146 2,
2147 "INSERT INTO qp (id, name) VALUES (?, ?)",
2148 vec![json!(1), json!("O'Reilly")],
2149 ),
2150 );
2151 assert!(inserted.contains("\"affected\":1"), "got: {inserted}");
2152
2153 let selected = handle(
2154 &rt,
2155 &query_request_with_params(3, "SELECT name FROM qp WHERE id = ?", vec![json!(1)]),
2156 );
2157 let rows = result_rows(&selected);
2158 assert_eq!(rows.len(), 1, "got: {selected}");
2159 assert_eq!(
2160 rows[0].get("name").and_then(Value::as_str),
2161 Some("O'Reilly")
2162 );
2163
2164 let selected_numbered = handle(
2165 &rt,
2166 &query_request_with_params(
2167 4,
2168 "SELECT name FROM qp WHERE name = ?1 AND id = ?2",
2169 vec![json!("O'Reilly"), json!(1)],
2170 ),
2171 );
2172 assert_eq!(
2173 result_rows(&selected_numbered).len(),
2174 1,
2175 "got: {selected_numbered}"
2176 );
2177
2178 let updated = handle(
2179 &rt,
2180 &query_request_with_params(
2181 5,
2182 "UPDATE qp SET name = ? WHERE id = ?",
2183 vec![json!("Alice"), json!(1)],
2184 ),
2185 );
2186 assert!(updated.contains("\"affected\":1"), "got: {updated}");
2187
2188 let deleted = handle(
2189 &rt,
2190 &query_request_with_params(6, "DELETE FROM qp WHERE name = ?", vec![json!("Alice")]),
2191 );
2192 assert!(deleted.contains("\"affected\":1"), "got: {deleted}");
2193
2194 let remaining = handle(&rt, &query_request(7, "SELECT * FROM qp"));
2195 assert!(result_rows(&remaining).is_empty(), "got: {remaining}");
2196 }
2197
2198 #[test]
2199 fn query_with_params_insert_and_search_round_trip() {
2200 let rt = make_runtime();
2201 let insert = handle(
2202 &rt,
2203 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"INSERT INTO bun_embeddings VECTOR (dense, content) VALUES ($1, $2)","params":[[1.0,0.0],"bun vector"]}}"#,
2204 );
2205 assert!(insert.contains("\"affected\":1"), "got: {insert}");
2206
2207 let search = handle(
2208 &rt,
2209 r#"{"jsonrpc":"2.0","id":2,"method":"query","params":{"sql":"SEARCH SIMILAR $1 COLLECTION bun_embeddings LIMIT 1","params":[[1.0,0.0]]}}"#,
2210 );
2211 assert!(search.contains("\"rows\""), "got: {search}");
2212 assert!(search.contains("\"score\":1"), "got: {search}");
2213 assert!(!search.contains("\"error\""), "got: {search}");
2214 }
2215
2216 #[test]
2217 fn query_with_question_vector_param_round_trips() {
2218 let rt = make_runtime();
2219 let insert = handle(
2220 &rt,
2221 &query_request_with_params(
2222 1,
2223 "INSERT INTO question_embeddings VECTOR (dense, content) VALUES (?, ?)",
2224 vec![json!([1.0, 0.0]), json!("question vector")],
2225 ),
2226 );
2227 assert!(insert.contains("\"affected\":1"), "got: {insert}");
2228
2229 let search = handle(
2230 &rt,
2231 &query_request_with_params(
2232 2,
2233 "SEARCH SIMILAR ? COLLECTION question_embeddings LIMIT 1",
2234 vec![json!([1.0, 0.0])],
2235 ),
2236 );
2237 assert!(search.contains("\"rows\""), "got: {search}");
2238 assert!(search.contains("\"score\":1"), "got: {search}");
2239 assert!(!search.contains("\"error\""), "got: {search}");
2240 }
2241
2242 #[test]
2243 fn query_with_typed_json_rpc_params_round_trips() {
2244 let rt = make_runtime();
2245 let create = handle(
2246 &rt,
2247 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE value_params (ok BOOLEAN, score FLOAT, payload BLOB, body JSON, seen_at TIMESTAMP, ident UUID)"}}"#,
2248 );
2249 assert!(!create.contains("\"error\""), "got: {create}");
2250
2251 let insert = handle(
2252 &rt,
2253 r#"{"jsonrpc":"2.0","id":2,"method":"query","params":{"sql":"INSERT INTO value_params (ok, score, payload, body, seen_at, ident) VALUES ($1, $2, $3, $4, $5, $6)","params":[true,{"$float":"NaN"},{"$bytes":"3q2+7w=="},{"z":[1,{"a":true}],"a":null},{"$ts":"1700000000123456789"},{"$uuid":"00112233-4455-6677-8899-aabbccddeeff"}]}}"#,
2254 );
2255 assert!(insert.contains("\"affected\":1"), "got: {insert}");
2256
2257 let selected = handle(
2258 &rt,
2259 r#"{"jsonrpc":"2.0","id":3,"method":"query","params":{"sql":"SELECT * FROM value_params"}}"#,
2260 );
2261 assert!(selected.contains("\"ok\":true"), "got: {selected}");
2262 assert!(selected.contains("\"$float\":\"NaN\""), "got: {selected}");
2263 assert!(
2264 selected.contains("\"$bytes\":\"3q2+7w==\""),
2265 "got: {selected}"
2266 );
2267 assert!(
2268 selected.contains("\"body\":{\"a\":null,\"z\":[1,{\"a\":true}]}"),
2269 "got: {selected}"
2270 );
2271 assert!(
2272 selected.contains("\"$ts\":\"1700000000123456789\""),
2273 "got: {selected}"
2274 );
2275 assert!(
2276 selected.contains("\"$uuid\":\"00112233-4455-6677-8899-aabbccddeeff\""),
2277 "got: {selected}"
2278 );
2279 }
2280
2281 #[test]
2282 fn select_timeseries_tags_decodes_json_payload() {
2283 let rt = make_runtime();
2284 let create = handle(&rt, &query_request(1, "CREATE TIMESERIES ts1"));
2285 assert!(!create.contains("\"error\""), "got: {create}");
2286
2287 let insert = handle(
2288 &rt,
2289 &query_request(
2290 2,
2291 r#"INSERT INTO ts1 (metric, value, tags, timestamp) VALUES ('cpu', 85, '{"host":"a"}', 1000)"#,
2292 ),
2293 );
2294 assert!(insert.contains("\"affected\":1"), "got: {insert}");
2295
2296 let selected = handle(&rt, &query_request(3, "SELECT tags FROM ts1"));
2297 assert!(!selected.contains("<json"), "got: {selected}");
2298 let response = json::from_str::<Value>(&selected).expect("response json");
2299 let tags = response
2300 .get("result")
2301 .and_then(|result| result.get("rows"))
2302 .and_then(Value::as_array)
2303 .and_then(|rows| rows.first())
2304 .and_then(|row| row.get("tags"))
2305 .expect("tags field");
2306 assert_eq!(tags, &json!({"host": "a"}));
2307 }
2308
2309 #[test]
2310 fn select_table_json_column_round_trips_after_single_parse() {
2311 let rt = make_runtime();
2312 let create = handle(&rt, &query_request(1, "CREATE TABLE docs (payload JSON)"));
2313 assert!(!create.contains("\"error\""), "got: {create}");
2314
2315 let original = r#"{"nested":{"items":[1,true,"x"],"object":{"k":"v"}}}"#;
2316 let insert_sql = format!("INSERT INTO docs (payload) VALUES ({original})");
2317 let insert = handle(&rt, &query_request(2, &insert_sql));
2318 assert!(insert.contains("\"affected\":1"), "got: {insert}");
2319
2320 let selected = handle(&rt, &query_request(3, "SELECT payload FROM docs"));
2321 assert!(!selected.contains("<json"), "got: {selected}");
2322 let response = json::from_str::<Value>(&selected).expect("response json");
2323 let payload = response
2324 .get("result")
2325 .and_then(|result| result.get("rows"))
2326 .and_then(Value::as_array)
2327 .and_then(|rows| rows.first())
2328 .and_then(|row| row.get("payload"))
2329 .expect("payload field");
2330 let expected = json::from_str::<Value>(original).expect("expected json");
2331 assert_eq!(payload, &expected);
2332
2333 let payload_text = payload.to_string_compact();
2334 assert_eq!(
2335 json::from_str::<Value>(&payload_text).expect("single parse"),
2336 expected
2337 );
2338 }
2339
2340 #[test]
2341 fn select_json_corruption_falls_back_to_code_and_hex() {
2342 use crate::storage::query::unified::UnifiedResult;
2343
2344 let mut result = UnifiedResult::with_columns(vec!["payload".into()]);
2345 let mut record = UnifiedRecord::new();
2346 record.set("payload", SchemaValue::Json(b"{not json".to_vec()));
2347 result.push(record);
2348
2349 let json = query_result_to_json(&RuntimeQueryResult {
2350 query: "SELECT payload FROM docs".to_string(),
2351 mode: crate::storage::query::modes::QueryMode::Sql,
2352 statement: "select",
2353 engine: "runtime-table",
2354 result,
2355 affected_rows: 0,
2356 statement_type: "select",
2357 bookmark: None,
2358 });
2359
2360 let payload = json
2361 .get("rows")
2362 .and_then(Value::as_array)
2363 .and_then(|rows| rows.first())
2364 .and_then(|row| row.get("payload"))
2365 .expect("payload field");
2366 assert_eq!(
2367 payload.get("code").and_then(Value::as_str),
2368 Some("INVALID_JSON")
2369 );
2370 assert_eq!(
2371 payload.get("hex").and_then(Value::as_str),
2372 Some("7b6e6f74206a736f6e")
2373 );
2374 }
2375
2376 #[test]
2377 fn json_value_to_schema_value_decodes_typed_envelopes() {
2378 let SchemaValue::Blob(bytes) = json_value_to_schema_value(&json!({ "$bytes": "AAECAw==" }))
2379 else {
2380 panic!("expected blob");
2381 };
2382 assert_eq!(bytes, vec![0, 1, 2, 3]);
2383
2384 assert_eq!(
2385 json_value_to_schema_value(&json!({ "$ts": "9223372036854775807" })),
2386 SchemaValue::Timestamp(i64::MAX)
2387 );
2388
2389 let SchemaValue::Uuid(bytes) = json_value_to_schema_value(&json!({
2390 "$uuid": "00112233-4455-6677-8899-aabbccddeeff"
2391 })) else {
2392 panic!("expected uuid");
2393 };
2394 assert_eq!(
2395 bytes,
2396 [
2397 0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88, 0x99, 0xaa, 0xbb, 0xcc, 0xdd,
2398 0xee, 0xff
2399 ]
2400 );
2401
2402 let SchemaValue::Float(value) =
2403 json_value_to_schema_value(&json!({ "$float": "-Infinity" }))
2404 else {
2405 panic!("expected float");
2406 };
2407 assert!(value.is_infinite() && value.is_sign_negative());
2408 }
2409
2410 #[test]
2411 fn query_with_params_arity_mismatch_rejected() {
2412 let rt = make_runtime();
2413 let _ = handle(
2414 &rt,
2415 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE pa (id INTEGER)"}}"#,
2416 );
2417 let resp = handle(
2418 &rt,
2419 r#"{"jsonrpc":"2.0","id":2,"method":"query","params":{"sql":"SELECT * FROM pa WHERE id = $1","params":[1,2]}}"#,
2420 );
2421 assert!(resp.contains("\"INVALID_PARAMS\""), "got: {resp}");
2422 }
2423
2424 #[test]
2425 fn query_with_question_params_arity_mismatch_rejected() {
2426 let rt = make_runtime();
2427 let _ = handle(&rt, &query_request(1, "CREATE TABLE qpa (id INTEGER)"));
2428 let resp = handle(
2429 &rt,
2430 &query_request_with_params(
2431 2,
2432 "SELECT * FROM qpa WHERE id = ?",
2433 vec![json!(1), json!(2)],
2434 ),
2435 );
2436 assert!(resp.contains("\"INVALID_PARAMS\""), "got: {resp}");
2437 assert!(resp.contains("SQL expects 1, got 2"), "got: {resp}");
2438 }
2439
2440 #[test]
2441 fn query_with_params_gap_rejected() {
2442 let rt = make_runtime();
2443 let _ = handle(
2444 &rt,
2445 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE pg (a INTEGER, b INTEGER)"}}"#,
2446 );
2447 let resp = handle(
2448 &rt,
2449 r#"{"jsonrpc":"2.0","id":2,"method":"query","params":{"sql":"SELECT * FROM pg WHERE a = $1 AND b = $3","params":[1,2,3]}}"#,
2450 );
2451 assert!(resp.contains("\"INVALID_PARAMS\""), "got: {resp}");
2452 }
2453
2454 #[test]
2455 fn query_with_question_numbered_gap_rejected() {
2456 let rt = make_runtime();
2457 let _ = handle(&rt, &query_request(1, "CREATE TABLE qpg (id INTEGER)"));
2458 let resp = handle(
2459 &rt,
2460 &query_request_with_params(
2461 2,
2462 "SELECT * FROM qpg WHERE id = ?2",
2463 vec![json!(1), json!(2)],
2464 ),
2465 );
2466 assert!(resp.contains("\"INVALID_PARAMS\""), "got: {resp}");
2467 assert!(resp.contains("parameter $`1` is missing"), "got: {resp}");
2468 }
2469
2470 #[test]
2471 fn query_with_question_params_type_mismatch_names_slot() {
2472 let rt = make_runtime();
2473 let _ = handle(&rt, &query_request(1, "CREATE TABLE qpt (id INTEGER)"));
2474 let resp = handle(
2475 &rt,
2476 &query_request_with_params(
2477 2,
2478 "INSERT INTO qpt (id) VALUES (?)",
2479 vec![json!("not-an-integer")],
2480 ),
2481 );
2482 assert!(resp.contains("\"QUERY_ERROR\""), "got: {resp}");
2483 assert!(resp.contains("id"), "got: {resp}");
2484 assert!(resp.contains("integer"), "got: {resp}");
2485 }
2486
2487 #[test]
2488 fn query_select_one_returns_rows() {
2489 let rt = make_runtime();
2490 let resp = handle(
2491 &rt,
2492 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"SELECT 1 AS one"}}"#,
2493 );
2494 assert!(resp.contains("\"result\""));
2495 assert!(!resp.contains("\"error\""));
2496 }
2497
2498 #[test]
2499 fn ask_query_result_uses_canonical_envelope() {
2500 use crate::storage::query::unified::UnifiedResult;
2501
2502 let mut result = UnifiedResult::with_columns(vec![
2503 "answer".into(),
2504 "provider".into(),
2505 "model".into(),
2506 "prompt_tokens".into(),
2507 "completion_tokens".into(),
2508 "sources_count".into(),
2509 "sources_flat".into(),
2510 "citations".into(),
2511 "validation".into(),
2512 ]);
2513 let mut record = UnifiedRecord::new();
2514 record.set("answer", SchemaValue::text("Deploy failed [^1]."));
2515 record.set("provider", SchemaValue::text("openai"));
2516 record.set("model", SchemaValue::text("gpt-4o-mini"));
2517 record.set("prompt_tokens", SchemaValue::Integer(11));
2518 record.set("completion_tokens", SchemaValue::Integer(7));
2519 record.set(
2520 "sources_flat",
2521 SchemaValue::Json(
2522 br#"[{"urn":"urn:reddb:row:deployments:1","kind":"row","collection":"deployments","id":"1"}]"#.to_vec(),
2523 ),
2524 );
2525 record.set(
2526 "citations",
2527 SchemaValue::Json(br#"[{"marker":1,"urn":"urn:reddb:row:deployments:1"}]"#.to_vec()),
2528 );
2529 record.set(
2530 "validation",
2531 SchemaValue::Json(br#"{"ok":true,"warnings":[],"errors":[]}"#.to_vec()),
2532 );
2533 result.push(record);
2534
2535 let json = query_result_to_json(&RuntimeQueryResult {
2536 query: "ASK 'why did deploy fail?'".to_string(),
2537 mode: crate::storage::query::modes::QueryMode::Sql,
2538 statement: "ask",
2539 engine: "runtime-ai",
2540 result,
2541 affected_rows: 0,
2542 statement_type: "select",
2543 bookmark: None,
2544 });
2545
2546 assert_eq!(
2547 json.get("answer").and_then(Value::as_str),
2548 Some("Deploy failed [^1].")
2549 );
2550 assert_eq!(json.get("cache_hit").and_then(Value::as_bool), Some(false));
2551 assert_eq!(json.get("cost_usd").and_then(Value::as_f64), Some(0.0));
2552 assert_eq!(json.get("mode").and_then(Value::as_str), Some("strict"));
2553 assert_eq!(json.get("retry_count").and_then(Value::as_u64), Some(0));
2554 assert!(
2555 json.get("rows").is_none(),
2556 "ASK envelope must not be row-wrapped: {json}"
2557 );
2558 assert!(
2559 json.get("sources_flat")
2560 .and_then(Value::as_array)
2561 .is_some_and(|sources| sources.len() == 1
2562 && sources[0].get("payload").and_then(Value::as_str).is_some()),
2563 "sources_flat must be a parsed array: {json}"
2564 );
2565 assert!(
2566 json.get("citations")
2567 .and_then(Value::as_array)
2568 .is_some_and(|citations| citations.len() == 1),
2569 "citations must be a parsed array: {json}"
2570 );
2571 assert_eq!(
2572 json.get("validation")
2573 .and_then(|v| v.get("ok"))
2574 .and_then(Value::as_bool),
2575 Some(true)
2576 );
2577 }
2578
2579 #[test]
2584 fn tx_begin_returns_tx_id_and_isolation() {
2585 let rt = make_runtime();
2586 with_session(&rt, |call, _| {
2587 let resp = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
2588 assert!(resp.contains("\"tx_id\":1"));
2589 assert!(resp.contains("\"isolation\":\"read_committed_deferred\""));
2590 assert!(!resp.contains("\"error\""));
2591 });
2592 }
2593
2594 #[test]
2595 fn tx_begin_twice_returns_already_open() {
2596 let rt = make_runtime();
2597 with_session(&rt, |call, _| {
2598 let _ = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
2599 let resp = call(r#"{"jsonrpc":"2.0","id":2,"method":"tx.begin","params":null}"#);
2600 assert!(resp.contains("\"code\":\"TX_ALREADY_OPEN\""));
2601 });
2602 }
2603
2604 #[test]
2605 fn tx_commit_without_begin_returns_no_tx_open() {
2606 let rt = make_runtime();
2607 with_session(&rt, |call, _| {
2608 let resp = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.commit","params":null}"#);
2609 assert!(resp.contains("\"code\":\"NO_TX_OPEN\""));
2610 });
2611 }
2612
2613 #[test]
2614 fn tx_rollback_without_begin_returns_no_tx_open() {
2615 let rt = make_runtime();
2616 with_session(&rt, |call, _| {
2617 let resp = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.rollback","params":null}"#);
2618 assert!(resp.contains("\"code\":\"NO_TX_OPEN\""));
2619 });
2620 }
2621
2622 #[test]
2623 fn insert_inside_tx_returns_pending_envelope() {
2624 let rt = make_runtime();
2625 let _ = handle(
2627 &rt,
2628 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE users (name TEXT)"}}"#,
2629 );
2630 with_session(&rt, |call, _| {
2631 let _ = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
2632 let resp = call(
2633 r#"{"jsonrpc":"2.0","id":2,"method":"insert","params":{"collection":"users","payload":{"name":"alice"}}}"#,
2634 );
2635 assert!(resp.contains("\"pending\":true"));
2636 assert!(resp.contains("\"tx_id\":1"));
2637 assert!(resp.contains("\"affected\":0"));
2638 });
2639 }
2640
2641 #[test]
2642 fn begin_insert_rollback_does_not_persist() {
2643 let rt = make_runtime();
2644 let _ = handle(
2645 &rt,
2646 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE u (name TEXT)"}}"#,
2647 );
2648 with_session(&rt, |call, _| {
2649 let _ = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
2650 let _ = call(
2651 r#"{"jsonrpc":"2.0","id":2,"method":"insert","params":{"collection":"u","payload":{"name":"ghost"}}}"#,
2652 );
2653 let rollback = call(r#"{"jsonrpc":"2.0","id":3,"method":"tx.rollback","params":null}"#);
2654 assert!(rollback.contains("\"ops_discarded\":1"));
2655 assert!(rollback.contains("\"tx_id\":1"));
2656 });
2657 let resp = handle(
2659 &rt,
2660 r#"{"jsonrpc":"2.0","id":9,"method":"query","params":{"sql":"SELECT * FROM u"}}"#,
2661 );
2662 assert!(!resp.contains("\"ghost\""));
2663 }
2664
2665 #[test]
2666 fn begin_insert_commit_persists() {
2667 let rt = make_runtime();
2668 let _ = handle(
2669 &rt,
2670 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE u2 (name TEXT)"}}"#,
2671 );
2672 with_session(&rt, |call, _| {
2673 let _ = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
2674 let _ = call(
2675 r#"{"jsonrpc":"2.0","id":2,"method":"insert","params":{"collection":"u2","payload":{"name":"alice"}}}"#,
2676 );
2677 let _ = call(
2678 r#"{"jsonrpc":"2.0","id":3,"method":"insert","params":{"collection":"u2","payload":{"name":"bob"}}}"#,
2679 );
2680 let commit = call(r#"{"jsonrpc":"2.0","id":4,"method":"tx.commit","params":null}"#);
2681 assert!(commit.contains("\"ops_replayed\":2"));
2682 assert!(!commit.contains("\"error\""));
2683 });
2684 let resp = handle(
2685 &rt,
2686 r#"{"jsonrpc":"2.0","id":9,"method":"query","params":{"sql":"SELECT * FROM u2"}}"#,
2687 );
2688 assert!(resp.contains("\"alice\""));
2689 assert!(resp.contains("\"bob\""));
2690 }
2691
2692 #[test]
2693 fn bulk_insert_inside_tx_buffers_everything() {
2694 let rt = make_runtime();
2695 let _ = handle(
2696 &rt,
2697 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE u3 (name TEXT)"}}"#,
2698 );
2699 with_session(&rt, |call, _| {
2700 let _ = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
2701 let resp = call(
2702 r#"{"jsonrpc":"2.0","id":2,"method":"bulk_insert","params":{"collection":"u3","payloads":[{"name":"a"},{"name":"b"},{"name":"c"}]}}"#,
2703 );
2704 assert!(resp.contains("\"buffered\":3"));
2705 assert!(resp.contains("\"pending\":true"));
2706 assert!(resp.contains("\"affected\":0"));
2707
2708 let commit = call(r#"{"jsonrpc":"2.0","id":3,"method":"tx.commit","params":null}"#);
2709 assert!(commit.contains("\"ops_replayed\":3"));
2710 });
2711 }
2712
2713 #[test]
2714 fn bulk_insert_chunks_at_internal_500_row_limit() {
2715 assert_eq!(bulk_insert_chunk_count(0), 0);
2716 assert_eq!(bulk_insert_chunk_count(1), 1);
2717 assert_eq!(bulk_insert_chunk_count(500), 1);
2718 assert_eq!(bulk_insert_chunk_count(501), 2);
2719 assert_eq!(bulk_insert_chunk_count(1000), 2);
2720 assert_eq!(bulk_insert_chunk_count(1001), 3);
2721 }
2722
2723 proptest! {
2724 #![proptest_config(ProptestConfig {
2725 cases: 12,
2726 ..ProptestConfig::default()
2727 })]
2728
2729 #[test]
2730 fn bulk_insert_matches_sequential_insert_state(
2731 names in proptest::collection::vec("[a-z]{1,8}", 1usize..20)
2732 ) {
2733 let rt = make_runtime();
2734 let payloads = names
2735 .iter()
2736 .map(|name| format!(r#"{{"name":"{name}","kind":"bulk"}}"#))
2737 .collect::<Vec<_>>();
2738 let payload_array = payloads.join(",");
2739
2740 let bulk = handle(
2741 &rt,
2742 &format!(
2743 r#"{{"jsonrpc":"2.0","id":1,"method":"bulk_insert","params":{{"collection":"bulk_prop","payloads":[{payload_array}]}}}}"#
2744 ),
2745 );
2746 let bulk_result = json::from_str::<Value>(&bulk).expect("bulk json");
2747 let bulk_ids = bulk_result
2748 .get("result")
2749 .and_then(|result| result.get("ids"))
2750 .and_then(Value::as_array)
2751 .expect("bulk ids");
2752 prop_assert_eq!(bulk_ids.len(), names.len());
2753
2754 for (index, payload) in payloads.iter().enumerate() {
2755 let insert = handle(
2756 &rt,
2757 &format!(
2758 r#"{{"jsonrpc":"2.0","id":{},"method":"insert","params":{{"collection":"seq_prop","payload":{payload}}}}}"#,
2759 index + 10
2760 ),
2761 );
2762 let insert_result = json::from_str::<Value>(&insert).expect("insert json");
2763 prop_assert!(
2764 insert_result
2765 .get("result")
2766 .and_then(|result| result.get("id"))
2767 .is_some(),
2768 "insert response missing id: {insert}"
2769 );
2770 }
2771
2772 let bulk_rows = result_name_kind(&handle(
2773 &rt,
2774 r#"{"jsonrpc":"2.0","id":99,"method":"query","params":{"sql":"SELECT name, kind FROM bulk_prop ORDER BY red_entity_id"}}"#,
2775 ));
2776 let seq_rows = result_name_kind(&handle(
2777 &rt,
2778 r#"{"jsonrpc":"2.0","id":100,"method":"query","params":{"sql":"SELECT name, kind FROM seq_prop ORDER BY red_entity_id"}}"#,
2779 ));
2780 prop_assert_eq!(bulk_rows, seq_rows);
2781 }
2782
2783 #[test]
2784 fn question_param_select_matches_inlined_literal(value in json_scalar_param()) {
2785 let rt = make_runtime();
2786 let bound = handle(
2787 &rt,
2788 &query_request_with_params(1, "SELECT ? AS v", vec![value.clone()]),
2789 );
2790 let inline_sql = format!("SELECT {} AS v", sql_literal_for_json(&value));
2791 let inlined = handle(&rt, &query_request(2, &inline_sql));
2792 prop_assert_eq!(
2793 result_rows(&bound),
2794 result_rows(&inlined),
2795 "bound={}, inlined={}",
2796 bound,
2797 inlined
2798 );
2799 }
2800 }
2801
2802 #[test]
2803 fn bulk_insert_graph_nodes_accepts_flat_rows_and_returns_ids() {
2804 let rt = make_runtime();
2805 create_graph_collection(&rt, "social");
2806
2807 let resp = handle(
2808 &rt,
2809 r#"{"jsonrpc":"2.0","id":2,"method":"bulk_insert","params":{"collection":"social","payloads":[{"label":"User","name":"alice"},{"label":"User","name":"bob"}]}}"#,
2810 );
2811 let envelope: Value = json::from_str(&resp).expect("json response");
2812 let result = envelope.get("result").expect("result");
2813 assert_eq!(result.get("affected").and_then(Value::as_u64), Some(2));
2814 assert_eq!(
2815 result
2816 .get("ids")
2817 .and_then(Value::as_array)
2818 .map(|ids| ids.len()),
2819 Some(2)
2820 );
2821
2822 let query = handle(
2823 &rt,
2824 r#"{"jsonrpc":"2.0","id":3,"method":"query","params":{"sql":"MATCH (n:User) RETURN n.name"}}"#,
2825 );
2826 assert!(query.contains("\"alice\""), "got: {query}");
2827 assert!(query.contains("\"bob\""), "got: {query}");
2828 }
2829
2830 #[test]
2831 fn bulk_insert_graph_edges_accepts_flat_rows_and_returns_ids() {
2832 let rt = make_runtime();
2833 create_graph_collection(&rt, "network");
2834 let nodes = handle(
2835 &rt,
2836 r#"{"jsonrpc":"2.0","id":2,"method":"bulk_insert","params":{"collection":"network","payloads":[{"label":"Host","name":"app"},{"label":"Host","name":"db"}]}}"#,
2837 );
2838 let envelope: Value = json::from_str(&nodes).expect("node response");
2839 let ids = envelope
2840 .get("result")
2841 .and_then(|r| r.get("ids"))
2842 .and_then(Value::as_array)
2843 .expect("node ids");
2844 let from = ids[0].as_u64().expect("from id");
2845 let to = ids[1].as_u64().expect("to id");
2846
2847 let resp = handle(
2848 &rt,
2849 &format!(
2850 r#"{{"jsonrpc":"2.0","id":3,"method":"bulk_insert","params":{{"collection":"network","payloads":[{{"label":"connects","from":{from},"to":{to},"weight":0.5,"role":"primary"}}]}}}}"#
2851 ),
2852 );
2853 let envelope: Value = json::from_str(&resp).expect("edge response");
2854 let result = envelope.get("result").expect("result");
2855 assert_eq!(result.get("affected").and_then(Value::as_u64), Some(1));
2856 assert_eq!(
2857 result
2858 .get("ids")
2859 .and_then(Value::as_array)
2860 .map(|ids| ids.len()),
2861 Some(1)
2862 );
2863 }
2864
2865 #[test]
2866 fn delete_inside_tx_is_buffered() {
2867 let rt = make_runtime();
2868 let _ = handle(
2870 &rt,
2871 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE u4 (name TEXT)"}}"#,
2872 );
2873 let _ = handle(
2874 &rt,
2875 r#"{"jsonrpc":"2.0","id":2,"method":"query","params":{"sql":"INSERT INTO u4 (name) VALUES ('keep')"}}"#,
2876 );
2877 with_session(&rt, |call, _| {
2878 let _ = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
2879 let resp = call(
2880 r#"{"jsonrpc":"2.0","id":2,"method":"delete","params":{"collection":"u4","id":"1"}}"#,
2881 );
2882 assert!(resp.contains("\"pending\":true"));
2883 let _ = call(r#"{"jsonrpc":"2.0","id":3,"method":"tx.rollback","params":null}"#);
2884 });
2885 let resp = handle(
2887 &rt,
2888 r#"{"jsonrpc":"2.0","id":9,"method":"query","params":{"sql":"SELECT * FROM u4"}}"#,
2889 );
2890 assert!(resp.contains("\"keep\""));
2891 }
2892
2893 #[test]
2894 fn close_with_open_tx_auto_rollbacks() {
2895 let rt = make_runtime();
2896 let _ = handle(
2897 &rt,
2898 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE u5 (name TEXT)"}}"#,
2899 );
2900 with_session(&rt, |call, _| {
2901 let _ = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
2902 let _ = call(
2903 r#"{"jsonrpc":"2.0","id":2,"method":"insert","params":{"collection":"u5","payload":{"name":"ghost"}}}"#,
2904 );
2905 let close = call(r#"{"jsonrpc":"2.0","id":3,"method":"close","params":null}"#);
2906 assert!(close.contains("\"__close__\":true"));
2907 assert!(!close.contains("\"error\""));
2908 });
2909 let resp = handle(
2910 &rt,
2911 r#"{"jsonrpc":"2.0","id":9,"method":"query","params":{"sql":"SELECT * FROM u5"}}"#,
2912 );
2913 assert!(!resp.contains("\"ghost\""));
2914 }
2915
2916 fn seed_numbers_table(rt: &RedDBRuntime, table: &str, count: u32) {
2921 let _ = handle(
2922 rt,
2923 &format!(
2924 r#"{{"jsonrpc":"2.0","id":1,"method":"query","params":{{"sql":"CREATE TABLE {table} (n INTEGER)"}}}}"#,
2925 ),
2926 );
2927 for i in 0..count {
2928 let _ = handle(
2929 rt,
2930 &format!(
2931 r#"{{"jsonrpc":"2.0","id":2,"method":"query","params":{{"sql":"INSERT INTO {table} (n) VALUES ({i})"}}}}"#,
2932 ),
2933 );
2934 }
2935 }
2936
2937 #[test]
2938 fn cursor_open_returns_id_columns_and_total() {
2939 let rt = make_runtime();
2940 seed_numbers_table(&rt, "nums1", 3);
2941 with_session(&rt, |call, _| {
2942 let resp = call(
2943 r#"{"jsonrpc":"2.0","id":1,"method":"query.open","params":{"sql":"SELECT n FROM nums1"}}"#,
2944 );
2945 assert!(resp.contains("\"cursor_id\":1"));
2946 assert!(resp.contains("\"total_rows\":3"));
2947 assert!(resp.contains("\"columns\""));
2948 assert!(!resp.contains("\"error\""));
2949 });
2950 }
2951
2952 #[test]
2953 fn cursor_next_chunks_rows_and_signals_done() {
2954 let rt = make_runtime();
2955 seed_numbers_table(&rt, "nums2", 5);
2956 with_session(&rt, |call, _| {
2957 let _ = call(
2958 r#"{"jsonrpc":"2.0","id":1,"method":"query.open","params":{"sql":"SELECT n FROM nums2"}}"#,
2959 );
2960 let first = call(
2961 r#"{"jsonrpc":"2.0","id":2,"method":"query.next","params":{"cursor_id":1,"batch_size":2}}"#,
2962 );
2963 assert!(first.contains("\"done\":false"));
2964 assert!(first.contains("\"remaining\":3"));
2965
2966 let second = call(
2967 r#"{"jsonrpc":"2.0","id":3,"method":"query.next","params":{"cursor_id":1,"batch_size":2}}"#,
2968 );
2969 assert!(second.contains("\"done\":false"));
2970 assert!(second.contains("\"remaining\":1"));
2971
2972 let third = call(
2973 r#"{"jsonrpc":"2.0","id":4,"method":"query.next","params":{"cursor_id":1,"batch_size":2}}"#,
2974 );
2975 assert!(third.contains("\"done\":true"));
2976 assert!(third.contains("\"remaining\":0"));
2977 });
2978 }
2979
2980 #[test]
2981 fn cursor_auto_drops_when_exhausted() {
2982 let rt = make_runtime();
2983 seed_numbers_table(&rt, "nums3", 2);
2984 with_session(&rt, |call, _| {
2985 let _ = call(
2986 r#"{"jsonrpc":"2.0","id":1,"method":"query.open","params":{"sql":"SELECT n FROM nums3"}}"#,
2987 );
2988 let _ = call(
2989 r#"{"jsonrpc":"2.0","id":2,"method":"query.next","params":{"cursor_id":1,"batch_size":100}}"#,
2990 );
2991 let resp = call(
2994 r#"{"jsonrpc":"2.0","id":3,"method":"query.next","params":{"cursor_id":1,"batch_size":100}}"#,
2995 );
2996 assert!(resp.contains("\"code\":\"CURSOR_NOT_FOUND\""));
2997 });
2998 }
2999
3000 #[test]
3001 fn cursor_close_removes_it() {
3002 let rt = make_runtime();
3003 seed_numbers_table(&rt, "nums4", 3);
3004 with_session(&rt, |call, _| {
3005 let _ = call(
3006 r#"{"jsonrpc":"2.0","id":1,"method":"query.open","params":{"sql":"SELECT n FROM nums4"}}"#,
3007 );
3008 let close =
3009 call(r#"{"jsonrpc":"2.0","id":2,"method":"query.close","params":{"cursor_id":1}}"#);
3010 assert!(close.contains("\"closed\":true"));
3011 let after = call(
3012 r#"{"jsonrpc":"2.0","id":3,"method":"query.next","params":{"cursor_id":1,"batch_size":10}}"#,
3013 );
3014 assert!(after.contains("\"code\":\"CURSOR_NOT_FOUND\""));
3015 });
3016 }
3017
3018 #[test]
3019 fn cursor_close_unknown_errors() {
3020 let rt = make_runtime();
3021 with_session(&rt, |call, _| {
3022 let resp = call(
3023 r#"{"jsonrpc":"2.0","id":1,"method":"query.close","params":{"cursor_id":9999}}"#,
3024 );
3025 assert!(resp.contains("\"code\":\"CURSOR_NOT_FOUND\""));
3026 });
3027 }
3028
3029 #[test]
3030 fn cursor_next_without_cursor_id_errors() {
3031 let rt = make_runtime();
3032 with_session(&rt, |call, _| {
3033 let resp = call(r#"{"jsonrpc":"2.0","id":1,"method":"query.next","params":{}}"#);
3034 assert!(resp.contains("\"code\":\"INVALID_PARAMS\""));
3035 });
3036 }
3037
3038 #[test]
3039 fn cursor_default_batch_size_returns_all_when_smaller_than_default() {
3040 let rt = make_runtime();
3041 seed_numbers_table(&rt, "nums5", 7);
3042 with_session(&rt, |call, _| {
3043 let _ = call(
3044 r#"{"jsonrpc":"2.0","id":1,"method":"query.open","params":{"sql":"SELECT n FROM nums5"}}"#,
3045 );
3046 let resp =
3048 call(r#"{"jsonrpc":"2.0","id":2,"method":"query.next","params":{"cursor_id":1}}"#);
3049 assert!(resp.contains("\"done\":true"));
3050 assert!(resp.contains("\"remaining\":0"));
3051 });
3052 }
3053
3054 #[test]
3055 fn close_method_drops_open_cursors() {
3056 let rt = make_runtime();
3057 seed_numbers_table(&rt, "nums6", 3);
3058 with_session(&rt, |call, _| {
3061 let _ = call(
3062 r#"{"jsonrpc":"2.0","id":1,"method":"query.open","params":{"sql":"SELECT n FROM nums6"}}"#,
3063 );
3064 let close = call(r#"{"jsonrpc":"2.0","id":2,"method":"close","params":null}"#);
3065 assert!(close.contains("\"__close__\":true"));
3066 let after = call(
3068 r#"{"jsonrpc":"2.0","id":3,"method":"query.next","params":{"cursor_id":1,"batch_size":10}}"#,
3069 );
3070 assert!(after.contains("\"code\":\"CURSOR_NOT_FOUND\""));
3071 });
3072 }
3073
3074 #[test]
3075 fn cursor_independent_of_transaction_state() {
3076 let rt = make_runtime();
3077 seed_numbers_table(&rt, "nums7", 4);
3078 with_session(&rt, |call, _| {
3079 let _ = call(
3081 r#"{"jsonrpc":"2.0","id":1,"method":"query.open","params":{"sql":"SELECT n FROM nums7"}}"#,
3082 );
3083 let _ = call(r#"{"jsonrpc":"2.0","id":2,"method":"tx.begin","params":null}"#);
3084 let _ = call(r#"{"jsonrpc":"2.0","id":3,"method":"tx.commit","params":null}"#);
3085 let resp = call(
3086 r#"{"jsonrpc":"2.0","id":4,"method":"query.next","params":{"cursor_id":1,"batch_size":10}}"#,
3087 );
3088 assert!(resp.contains("\"done\":true"));
3089 assert!(!resp.contains("\"error\""));
3090 });
3091 }
3092
3093 #[test]
3094 fn second_tx_after_commit_gets_fresh_id() {
3095 let rt = make_runtime();
3096 let _ = handle(
3097 &rt,
3098 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE u6 (name TEXT)"}}"#,
3099 );
3100 with_session(&rt, |call, _| {
3101 let first = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
3102 assert!(first.contains("\"tx_id\":1"));
3103 let _ = call(
3104 r#"{"jsonrpc":"2.0","id":2,"method":"insert","params":{"collection":"u6","payload":{"name":"x"}}}"#,
3105 );
3106 let _ = call(r#"{"jsonrpc":"2.0","id":3,"method":"tx.commit","params":null}"#);
3107
3108 let second = call(r#"{"jsonrpc":"2.0","id":4,"method":"tx.begin","params":null}"#);
3109 assert!(second.contains("\"tx_id\":2"));
3110 let _ = call(r#"{"jsonrpc":"2.0","id":5,"method":"tx.rollback","params":null}"#);
3111 });
3112 }
3113
3114 #[test]
3115 fn prepare_and_execute_prepared_statement() {
3116 let rt = make_runtime();
3117 let _ = handle(
3119 &rt,
3120 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE ps_test (n INTEGER)"}}"#,
3121 );
3122 let _ = handle(
3123 &rt,
3124 r#"{"jsonrpc":"2.0","id":2,"method":"query","params":{"sql":"INSERT INTO ps_test (n) VALUES (42)"}}"#,
3125 );
3126
3127 with_session(&rt, |call, _| {
3128 let prep = call(
3130 r#"{"jsonrpc":"2.0","id":3,"method":"prepare","params":{"sql":"SELECT n FROM ps_test WHERE n = 42"}}"#,
3131 );
3132 assert!(prep.contains("\"prepared_id\""), "prepare response: {prep}");
3133
3134 let id: u64 = {
3136 let v: crate::json::Value = crate::json::from_str(&prep).expect("json");
3137 let result = v.get("result").expect("result");
3138 result
3139 .get("prepared_id")
3140 .and_then(|n| n.as_f64())
3141 .expect("prepared_id") as u64
3142 };
3143
3144 let exec = call(&format!(
3146 r#"{{"jsonrpc":"2.0","id":4,"method":"execute_prepared","params":{{"prepared_id":{id},"binds":[42]}}}}"#
3147 ));
3148 assert!(
3150 exec.contains("\"rows\""),
3151 "execute_prepared response: {exec}"
3152 );
3153 assert!(exec.contains("42"), "expected row with n=42 in: {exec}");
3154 });
3155 }
3156}