1use std::io::{BufRead, BufReader, Stdin, Write};
19use std::panic::AssertUnwindSafe;
20
21use tokio::sync::Mutex as AsyncMutex;
22
23use crate::json::{self as json, Value};
24use crate::runtime::{RedDBRuntime, RuntimeQueryResult};
25use crate::storage::query::unified::UnifiedRecord;
26use crate::storage::schema::Value as SchemaValue;
27use reddb_client_connector::RedDBClient;
28
29enum Backend<'a> {
42 Local(&'a RedDBRuntime),
43 Remote(Box<RemoteBackend<'a>>),
44}
45
46struct RemoteBackend<'a> {
47 client: AsyncMutex<RedDBClient>,
48 tokio_rt: &'a tokio::runtime::Runtime,
49}
50
51pub const PROTOCOL_VERSION: &str = "1.0";
53
54pub mod error_code {
56 pub const PARSE_ERROR: &str = "PARSE_ERROR";
57 pub const INVALID_REQUEST: &str = "INVALID_REQUEST";
58 pub const INVALID_PARAMS: &str = "INVALID_PARAMS";
59 pub const QUERY_ERROR: &str = "QUERY_ERROR";
60 pub const NOT_FOUND: &str = "NOT_FOUND";
61 pub const INTERNAL_ERROR: &str = "INTERNAL_ERROR";
62 pub const TX_ALREADY_OPEN: &str = "TX_ALREADY_OPEN";
65 pub const NO_TX_OPEN: &str = "NO_TX_OPEN";
68 pub const TX_REPLAY_FAILED: &str = "TX_REPLAY_FAILED";
72 pub const TX_NOT_SUPPORTED_REMOTE: &str = "TX_NOT_SUPPORTED_REMOTE";
74 pub const CURSOR_NOT_FOUND: &str = "CURSOR_NOT_FOUND";
78 pub const CURSOR_LIMIT_EXCEEDED: &str = "CURSOR_LIMIT_EXCEEDED";
80}
81
82pub(crate) const MAX_CURSORS_PER_SESSION: usize = 64;
86pub(crate) const DEFAULT_CURSOR_BATCH_SIZE: usize = 100;
90pub(crate) const MAX_CURSOR_BATCH_SIZE: usize = 10_000;
93
94struct StdioPreparedStatement {
117 shape: crate::storage::query::ast::QueryExpr,
118 parameter_count: usize,
119}
120
121pub(crate) struct Session {
122 next_tx_id: u64,
123 current_tx: Option<OpenTx>,
124 next_cursor_id: u64,
125 cursors: std::collections::HashMap<u64, Cursor>,
126 next_prepared_id: u64,
128 prepared: std::collections::HashMap<u64, StdioPreparedStatement>,
130}
131
132impl Session {
133 pub(crate) fn new() -> Self {
134 Self {
135 next_tx_id: 1,
136 current_tx: None,
137 next_cursor_id: 1,
138 cursors: std::collections::HashMap::new(),
139 next_prepared_id: 1,
140 prepared: std::collections::HashMap::new(),
141 }
142 }
143
144 fn open_tx(&mut self) -> Result<u64, (&'static str, String)> {
145 if let Some(tx) = &self.current_tx {
146 return Err((
147 error_code::TX_ALREADY_OPEN,
148 format!("transaction {} already open in this session", tx.tx_id),
149 ));
150 }
151 let tx_id = self.next_tx_id;
152 self.next_tx_id = self.next_tx_id.saturating_add(1);
153 self.current_tx = Some(OpenTx {
154 tx_id,
155 write_set: Vec::new(),
156 });
157 Ok(tx_id)
158 }
159
160 fn take_tx(&mut self) -> Option<OpenTx> {
161 self.current_tx.take()
162 }
163
164 fn current_tx_mut(&mut self) -> Option<&mut OpenTx> {
165 self.current_tx.as_mut()
166 }
167
168 #[allow(dead_code)]
169 fn has_tx(&self) -> bool {
170 self.current_tx.is_some()
171 }
172
173 fn insert_cursor(&mut self, cursor: Cursor) -> Result<u64, (&'static str, String)> {
176 if self.cursors.len() >= MAX_CURSORS_PER_SESSION {
177 return Err((
178 error_code::CURSOR_LIMIT_EXCEEDED,
179 format!(
180 "session already holds {} cursors (max {}) — close some before opening new ones",
181 self.cursors.len(),
182 MAX_CURSORS_PER_SESSION
183 ),
184 ));
185 }
186 let id = self.next_cursor_id;
187 self.next_cursor_id = self.next_cursor_id.saturating_add(1);
188 let mut cursor = cursor;
189 cursor.cursor_id = id;
190 self.cursors.insert(id, cursor);
191 Ok(id)
192 }
193
194 fn cursor_mut(&mut self, id: u64) -> Option<&mut Cursor> {
195 self.cursors.get_mut(&id)
196 }
197
198 fn drop_cursor(&mut self, id: u64) -> Option<Cursor> {
199 self.cursors.remove(&id)
200 }
201
202 fn clear_cursors(&mut self) {
203 self.cursors.clear();
204 }
205}
206
207impl Default for Session {
208 fn default() -> Self {
209 Self::new()
210 }
211}
212
213struct OpenTx {
215 tx_id: u64,
216 write_set: Vec<PendingSql>,
217}
218
219enum PendingSql {
223 Insert(String),
224 Delete(String),
225 #[allow(dead_code)] Update(String),
227}
228
229impl PendingSql {
230 fn sql(&self) -> &str {
231 match self {
232 PendingSql::Insert(s) | PendingSql::Delete(s) | PendingSql::Update(s) => s,
233 }
234 }
235}
236
237pub(crate) struct Cursor {
249 cursor_id: u64,
250 columns: Vec<String>,
251 rows: Vec<UnifiedRecord>,
252 position: usize,
253}
254
255impl Cursor {
256 fn new(columns: Vec<String>, rows: Vec<UnifiedRecord>) -> Self {
257 Self {
258 cursor_id: 0, columns,
260 rows,
261 position: 0,
262 }
263 }
264
265 fn total(&self) -> usize {
266 self.rows.len()
267 }
268
269 fn remaining(&self) -> usize {
270 self.rows.len().saturating_sub(self.position)
271 }
272
273 fn is_exhausted(&self) -> bool {
274 self.position >= self.rows.len()
275 }
276
277 fn take_batch(&mut self, batch_size: usize) -> &[UnifiedRecord] {
280 let end = (self.position + batch_size).min(self.rows.len());
281 let slice = &self.rows[self.position..end];
282 self.position = end;
283 slice
284 }
285}
286
287pub fn run(runtime: &RedDBRuntime) -> i32 {
293 run_with_io(runtime, std::io::stdin(), &mut std::io::stdout())
294}
295
296pub fn run_remote(endpoint: &str, token: Option<String>) -> i32 {
303 let tokio_rt = match tokio::runtime::Builder::new_current_thread()
304 .enable_all()
305 .build()
306 {
307 Ok(rt) => rt,
308 Err(e) => {
309 tracing::error!(err = %e, "rpc: failed to build tokio runtime");
310 return 1;
311 }
312 };
313 let client = match tokio_rt.block_on(RedDBClient::connect(endpoint, token)) {
314 Ok(c) => c,
315 Err(e) => {
316 tracing::error!(endpoint, err = %e, "rpc: failed to connect");
317 return 1;
318 }
319 };
320 let backend = Backend::Remote(Box::new(RemoteBackend {
321 client: AsyncMutex::new(client),
322 tokio_rt: &tokio_rt,
323 }));
324 run_backend(&backend, std::io::stdin(), &mut std::io::stdout())
325}
326
327pub fn run_with_io<W: Write>(runtime: &RedDBRuntime, stdin: Stdin, stdout: &mut W) -> i32 {
329 run_backend(&Backend::Local(runtime), stdin, stdout)
330}
331
332static STDIO_SESSION_CONN_ID: std::sync::atomic::AtomicU64 =
339 std::sync::atomic::AtomicU64::new(1_000_000);
340
341fn next_stdio_conn_id() -> u64 {
342 STDIO_SESSION_CONN_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
343}
344
345fn run_backend<W: Write>(backend: &Backend<'_>, stdin: Stdin, stdout: &mut W) -> i32 {
346 let reader = BufReader::new(stdin.lock());
347 let mut session = Session::new();
348 let conn_id = next_stdio_conn_id();
352 crate::runtime::impl_core::set_current_connection_id(conn_id);
353 for line_result in reader.lines() {
354 let line = match line_result {
355 Ok(l) => l,
356 Err(e) => {
357 let _ = writeln!(
358 stdout,
359 "{}",
360 error_response(&Value::Null, error_code::INTERNAL_ERROR, &e.to_string())
361 );
362 let _ = stdout.flush();
363 return 1;
364 }
365 };
366 if line.trim().is_empty() {
367 continue;
368 }
369 let response = handle_line(backend, &mut session, &line);
370 if writeln!(stdout, "{}", response).is_err() || stdout.flush().is_err() {
371 return 1;
372 }
373 if response.contains("\"__close__\":true") {
374 return 0;
375 }
376 }
377 let _ = session.take_tx();
381 crate::runtime::impl_core::clear_current_connection_id();
382 0
383}
384
385fn handle_line(backend: &Backend<'_>, session: &mut Session, line: &str) -> String {
389 let parsed: Value = match json::from_str(line) {
390 Ok(v) => v,
391 Err(err) => {
392 return error_response(
393 &Value::Null,
394 error_code::PARSE_ERROR,
395 &format!("invalid JSON: {err}"),
396 );
397 }
398 };
399
400 let id = parsed.get("id").cloned().unwrap_or(Value::Null);
401
402 let method = match parsed.get("method").and_then(Value::as_str) {
403 Some(m) => m.to_string(),
404 None => {
405 return error_response(&id, error_code::INVALID_REQUEST, "missing 'method' field");
406 }
407 };
408
409 let params = parsed.get("params").cloned().unwrap_or(Value::Null);
410
411 let dispatch = std::panic::catch_unwind(AssertUnwindSafe(|| match backend {
412 Backend::Local(rt) => dispatch_method(rt, session, &method, ¶ms),
413 Backend::Remote(remote) => {
414 if matches!(
419 method.as_str(),
420 "tx.begin"
421 | "tx.commit"
422 | "tx.rollback"
423 | "query.open"
424 | "query.next"
425 | "query.close"
426 ) {
427 Err((
428 error_code::TX_NOT_SUPPORTED_REMOTE,
429 format!("{method} is not supported over remote gRPC yet"),
430 ))
431 } else {
432 dispatch_method_remote(&remote.client, remote.tokio_rt, &method, ¶ms)
433 }
434 }
435 }));
436
437 match dispatch {
438 Ok(Ok(result)) => success_response(&id, &result, method == "close"),
439 Ok(Err((code, msg))) => error_response(&id, code, &msg),
440 Err(_) => error_response(&id, error_code::INTERNAL_ERROR, "handler panicked (caught)"),
441 }
442}
443
444fn dispatch_method(
447 runtime: &RedDBRuntime,
448 session: &mut Session,
449 method: &str,
450 params: &Value,
451) -> Result<Value, (&'static str, String)> {
452 match method {
453 "tx.begin" => {
454 let tx_id = session.open_tx()?;
455 Ok(Value::Object(
456 [
457 ("tx_id".to_string(), Value::Number(tx_id as f64)),
458 (
459 "isolation".to_string(),
460 Value::String("read_committed_deferred".to_string()),
461 ),
462 ]
463 .into_iter()
464 .collect(),
465 ))
466 }
467
468 "tx.commit" => {
469 let tx = session.take_tx().ok_or((
470 error_code::NO_TX_OPEN,
471 "no transaction is open in this session".to_string(),
472 ))?;
473 let tx_id = tx.tx_id;
474 let op_count = tx.write_set.len();
475
476 let replay: Result<(u64, usize), (usize, String)> = (|| {
483 runtime
484 .execute_query("BEGIN")
485 .map_err(|e| (0usize, format!("BEGIN: {e}")))?;
486 let mut total_affected: u64 = 0;
487 for (idx, op) in tx.write_set.iter().enumerate() {
488 match runtime.execute_query(op.sql()) {
489 Ok(qr) => total_affected += qr.affected_rows,
490 Err(e) => {
491 let _ = runtime.execute_query("ROLLBACK");
492 return Err((idx, e.to_string()));
493 }
494 }
495 }
496 runtime
497 .execute_query("COMMIT")
498 .map_err(|e| (op_count, format!("COMMIT: {e}")))?;
499 Ok((total_affected, op_count))
500 })();
501
502 match replay {
503 Ok((affected, replayed)) => Ok(Value::Object(
504 [
505 ("tx_id".to_string(), Value::Number(tx_id as f64)),
506 ("ops_replayed".to_string(), Value::Number(replayed as f64)),
507 ("affected".to_string(), Value::Number(affected as f64)),
508 ]
509 .into_iter()
510 .collect(),
511 )),
512 Err((failed_idx, msg)) => Err((
513 error_code::TX_REPLAY_FAILED,
514 format!(
515 "tx {tx_id} replay failed at op {failed_idx}/{op_count}: {msg} \
516 (ops 0..{failed_idx} already applied and are NOT rolled back)"
517 ),
518 )),
519 }
520 }
521
522 "query.open" => {
523 let sql = params.get("sql").and_then(Value::as_str).ok_or((
524 error_code::INVALID_PARAMS,
525 "missing 'sql' string".to_string(),
526 ))?;
527 let qr = runtime
528 .execute_query(sql)
529 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
530
531 let columns: Vec<String> = qr
535 .result
536 .records
537 .first()
538 .map(|first| {
539 let mut keys: Vec<String> = first
540 .column_names()
541 .into_iter()
542 .map(|k| k.to_string())
543 .collect();
544 keys.sort();
545 keys
546 })
547 .unwrap_or_default();
548
549 let cursor = Cursor::new(columns.clone(), qr.result.records);
550 let total = cursor.total();
551 let cursor_id = session.insert_cursor(cursor)?;
552
553 Ok(Value::Object(
554 [
555 ("cursor_id".to_string(), Value::Number(cursor_id as f64)),
556 (
557 "columns".to_string(),
558 Value::Array(columns.into_iter().map(Value::String).collect()),
559 ),
560 ("total_rows".to_string(), Value::Number(total as f64)),
561 ]
562 .into_iter()
563 .collect(),
564 ))
565 }
566
567 "query.next" => {
568 let cursor_id = params
569 .get("cursor_id")
570 .and_then(|v| v.as_f64())
571 .map(|n| n as u64)
572 .ok_or((
573 error_code::INVALID_PARAMS,
574 "missing 'cursor_id' number".to_string(),
575 ))?;
576 let batch_size = params
577 .get("batch_size")
578 .and_then(|v| v.as_f64())
579 .map(|n| n as usize)
580 .unwrap_or(DEFAULT_CURSOR_BATCH_SIZE)
581 .clamp(1, MAX_CURSOR_BATCH_SIZE);
582
583 let (rows, done, remaining) = {
586 let cursor = session.cursor_mut(cursor_id).ok_or((
587 error_code::CURSOR_NOT_FOUND,
588 format!("cursor {cursor_id} not found"),
589 ))?;
590 let batch = cursor.take_batch(batch_size);
591 let rows_json: Vec<Value> = batch.iter().map(record_to_json_object).collect();
592 (rows_json, cursor.is_exhausted(), cursor.remaining())
593 };
594
595 if done {
596 let _ = session.drop_cursor(cursor_id);
599 }
600
601 Ok(Value::Object(
602 [
603 ("cursor_id".to_string(), Value::Number(cursor_id as f64)),
604 ("rows".to_string(), Value::Array(rows)),
605 ("done".to_string(), Value::Bool(done)),
606 ("remaining".to_string(), Value::Number(remaining as f64)),
607 ]
608 .into_iter()
609 .collect(),
610 ))
611 }
612
613 "query.close" => {
614 let cursor_id = params
615 .get("cursor_id")
616 .and_then(|v| v.as_f64())
617 .map(|n| n as u64)
618 .ok_or((
619 error_code::INVALID_PARAMS,
620 "missing 'cursor_id' number".to_string(),
621 ))?;
622 let existed = session.drop_cursor(cursor_id).is_some();
623 if !existed {
624 return Err((
625 error_code::CURSOR_NOT_FOUND,
626 format!("cursor {cursor_id} not found"),
627 ));
628 }
629 Ok(Value::Object(
630 [
631 ("cursor_id".to_string(), Value::Number(cursor_id as f64)),
632 ("closed".to_string(), Value::Bool(true)),
633 ]
634 .into_iter()
635 .collect(),
636 ))
637 }
638
639 "tx.rollback" => {
640 let tx = session.take_tx().ok_or((
641 error_code::NO_TX_OPEN,
642 "no transaction is open in this session".to_string(),
643 ))?;
644 let ops_discarded = tx.write_set.len();
645 Ok(Value::Object(
646 [
647 ("tx_id".to_string(), Value::Number(tx.tx_id as f64)),
648 (
649 "ops_discarded".to_string(),
650 Value::Number(ops_discarded as f64),
651 ),
652 ]
653 .into_iter()
654 .collect(),
655 ))
656 }
657
658 "version" => Ok(Value::Object(
659 [
660 (
661 "version".to_string(),
662 Value::String(env!("CARGO_PKG_VERSION").to_string()),
663 ),
664 (
665 "protocol".to_string(),
666 Value::String(PROTOCOL_VERSION.to_string()),
667 ),
668 ]
669 .into_iter()
670 .collect(),
671 )),
672
673 "health" => Ok(Value::Object(
674 [
675 ("ok".to_string(), Value::Bool(true)),
676 (
677 "version".to_string(),
678 Value::String(env!("CARGO_PKG_VERSION").to_string()),
679 ),
680 ]
681 .into_iter()
682 .collect(),
683 )),
684
685 "query" => {
686 let sql = params.get("sql").and_then(Value::as_str).ok_or((
687 error_code::INVALID_PARAMS,
688 "missing 'sql' string".to_string(),
689 ))?;
690 let qr = runtime
691 .execute_query(sql)
692 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
693 Ok(query_result_to_json(&qr))
694 }
695
696 "prepare" => {
705 use crate::storage::query::modes::parse_multi;
706 use crate::storage::query::planner::shape::parameterize_query_expr;
707
708 let sql = params.get("sql").and_then(Value::as_str).ok_or((
709 error_code::INVALID_PARAMS,
710 "missing 'sql' string".to_string(),
711 ))?;
712 let parsed = parse_multi(sql).map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
713 let (shape, parameter_count) = if let Some(prepared) = parameterize_query_expr(&parsed)
714 {
715 (prepared.shape, prepared.parameter_count)
716 } else {
717 (parsed, 0)
718 };
719 let id = session.next_prepared_id;
720 session.next_prepared_id = session.next_prepared_id.saturating_add(1);
721 session.prepared.insert(
722 id,
723 StdioPreparedStatement {
724 shape,
725 parameter_count,
726 },
727 );
728 Ok(Value::Object(
729 [
730 ("prepared_id".to_string(), Value::Number(id as f64)),
731 (
732 "parameter_count".to_string(),
733 Value::Number(parameter_count as f64),
734 ),
735 ]
736 .into_iter()
737 .collect(),
738 ))
739 }
740
741 "execute_prepared" => {
742 use crate::storage::query::planner::shape::bind_parameterized_query;
743 use crate::storage::schema::Value as SV;
744
745 let id = params
746 .get("prepared_id")
747 .and_then(Value::as_f64)
748 .map(|n| n as u64)
749 .ok_or((
750 error_code::INVALID_PARAMS,
751 "missing 'prepared_id'".to_string(),
752 ))?;
753
754 let stmt = session.prepared.get(&id).ok_or((
755 error_code::QUERY_ERROR,
756 format!("no prepared statement with id {id}"),
757 ))?;
758
759 let binds_json: Vec<Value> = params
761 .get("binds")
762 .and_then(Value::as_array)
763 .map(|a| a.to_vec())
764 .unwrap_or_default();
765 if binds_json.len() != stmt.parameter_count {
766 return Err((
767 error_code::INVALID_PARAMS,
768 format!(
769 "expected {} bind values, got {}",
770 stmt.parameter_count,
771 binds_json.len()
772 ),
773 ));
774 }
775
776 let binds: Vec<SV> = binds_json.iter().map(json_value_to_schema_value).collect();
778
779 let expr = if stmt.parameter_count == 0 {
781 stmt.shape.clone()
782 } else {
783 bind_parameterized_query(&stmt.shape, &binds, stmt.parameter_count)
784 .ok_or((error_code::QUERY_ERROR, "bind failed".to_string()))?
785 };
786
787 let qr = runtime
788 .execute_query_expr(expr)
789 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
790 Ok(query_result_to_json(&qr))
791 }
792
793 "insert" => {
794 let collection = params.get("collection").and_then(Value::as_str).ok_or((
795 error_code::INVALID_PARAMS,
796 "missing 'collection' string".to_string(),
797 ))?;
798 let payload = params.get("payload").ok_or((
799 error_code::INVALID_PARAMS,
800 "missing 'payload' object".to_string(),
801 ))?;
802 let payload_obj = payload.as_object().ok_or((
803 error_code::INVALID_PARAMS,
804 "'payload' must be a JSON object".to_string(),
805 ))?;
806 let sql = build_insert_sql(collection, payload_obj.iter());
807
808 if let Some(tx) = session.current_tx_mut() {
809 tx.write_set.push(PendingSql::Insert(sql));
810 return Ok(pending_tx_response(tx.tx_id));
811 }
812
813 let qr = runtime
814 .execute_query(&sql)
815 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
816 Ok(insert_result_to_json(&qr))
817 }
818
819 "bulk_insert" => {
820 let collection = params.get("collection").and_then(Value::as_str).ok_or((
821 error_code::INVALID_PARAMS,
822 "missing 'collection' string".to_string(),
823 ))?;
824 let payloads = params.get("payloads").and_then(Value::as_array).ok_or((
825 error_code::INVALID_PARAMS,
826 "missing 'payloads' array".to_string(),
827 ))?;
828
829 if let Some(tx) = session.current_tx_mut() {
830 let mut buffered: u64 = 0;
831 for entry in payloads {
832 let obj = entry.as_object().ok_or((
833 error_code::INVALID_PARAMS,
834 "each payload must be a JSON object".to_string(),
835 ))?;
836 let sql = build_insert_sql(collection, obj.iter());
837 tx.write_set.push(PendingSql::Insert(sql));
838 buffered += 1;
839 }
840 let tx_id = tx.tx_id;
841 return Ok(Value::Object(
842 [
843 ("affected".to_string(), Value::Number(0.0)),
844 ("buffered".to_string(), Value::Number(buffered as f64)),
845 ("pending".to_string(), Value::Bool(true)),
846 ("tx_id".to_string(), Value::Number(tx_id as f64)),
847 ]
848 .into_iter()
849 .collect(),
850 ));
851 }
852
853 let mut total_affected: u64 = 0;
854 for entry in payloads {
855 let obj = entry.as_object().ok_or((
856 error_code::INVALID_PARAMS,
857 "each payload must be a JSON object".to_string(),
858 ))?;
859 let sql = build_insert_sql(collection, obj.iter());
860 let qr = runtime
861 .execute_query(&sql)
862 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
863 total_affected += qr.affected_rows;
864 }
865 Ok(Value::Object(
866 [("affected".to_string(), Value::Number(total_affected as f64))]
867 .into_iter()
868 .collect(),
869 ))
870 }
871
872 "get" => {
873 let collection = params.get("collection").and_then(Value::as_str).ok_or((
874 error_code::INVALID_PARAMS,
875 "missing 'collection' string".to_string(),
876 ))?;
877 let id = params.get("id").and_then(Value::as_str).ok_or((
878 error_code::INVALID_PARAMS,
879 "missing 'id' string".to_string(),
880 ))?;
881 let sql = format!("SELECT * FROM {collection} WHERE _entity_id = {id} LIMIT 1");
882 let qr = runtime
883 .execute_query(&sql)
884 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
885 let entity = qr
886 .result
887 .records
888 .first()
889 .map(record_to_json_object)
890 .unwrap_or(Value::Null);
891 Ok(Value::Object(
892 [("entity".to_string(), entity)].into_iter().collect(),
893 ))
894 }
895
896 "delete" => {
897 let collection = params.get("collection").and_then(Value::as_str).ok_or((
898 error_code::INVALID_PARAMS,
899 "missing 'collection' string".to_string(),
900 ))?;
901 let id = params.get("id").and_then(Value::as_str).ok_or((
902 error_code::INVALID_PARAMS,
903 "missing 'id' string".to_string(),
904 ))?;
905 let sql = format!("DELETE FROM {collection} WHERE _entity_id = {id}");
906
907 if let Some(tx) = session.current_tx_mut() {
908 tx.write_set.push(PendingSql::Delete(sql));
909 return Ok(pending_tx_response(tx.tx_id));
910 }
911
912 let qr = runtime
913 .execute_query(&sql)
914 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
915 Ok(Value::Object(
916 [(
917 "affected".to_string(),
918 Value::Number(qr.affected_rows as f64),
919 )]
920 .into_iter()
921 .collect(),
922 ))
923 }
924
925 "close" => {
926 let _ = session.take_tx();
931 session.clear_cursors();
932 let _ = runtime.checkpoint();
933 Ok(Value::Null)
934 }
935
936 "auth.login"
941 | "auth.whoami"
942 | "auth.change_password"
943 | "auth.create_api_key"
944 | "auth.revoke_api_key" => {
945 let _ = (session, params);
946 Err((
947 error_code::INVALID_REQUEST,
948 format!(
949 "{method}: auth methods are only available on grpc:// connections; \
950 embedded modes (memory://, file://) inherit caller privileges"
951 ),
952 ))
953 }
954
955 other => Err((
956 error_code::INVALID_REQUEST,
957 format!("unknown method: {other}"),
958 )),
959 }
960}
961
962fn success_response(id: &Value, result: &Value, is_close: bool) -> String {
967 let mut envelope = json::Map::new();
972 envelope.insert("jsonrpc".to_string(), Value::String("2.0".to_string()));
973 envelope.insert("id".to_string(), id.clone());
974 envelope.insert("result".to_string(), result.clone());
975 if is_close {
976 envelope.insert("__close__".to_string(), Value::Bool(true));
977 }
978 Value::Object(envelope).to_string_compact()
979}
980
981fn error_response(id: &Value, code: &str, message: &str) -> String {
982 let mut err = json::Map::new();
983 err.insert("code".to_string(), Value::String(code.to_string()));
984 err.insert("message".to_string(), Value::String(message.to_string()));
985 err.insert("data".to_string(), Value::Null);
986
987 let mut envelope = json::Map::new();
988 envelope.insert("jsonrpc".to_string(), Value::String("2.0".to_string()));
989 envelope.insert("id".to_string(), id.clone());
990 envelope.insert("error".to_string(), Value::Object(err));
991 Value::Object(envelope).to_string_compact()
992}
993
994fn pending_tx_response(tx_id: u64) -> Value {
1001 Value::Object(
1002 [
1003 ("affected".to_string(), Value::Number(0.0)),
1004 ("pending".to_string(), Value::Bool(true)),
1005 ("tx_id".to_string(), Value::Number(tx_id as f64)),
1006 ]
1007 .into_iter()
1008 .collect(),
1009 )
1010}
1011
1012pub(crate) fn build_insert_sql<'a, I>(collection: &str, fields: I) -> String
1013where
1014 I: Iterator<Item = (&'a String, &'a Value)>,
1015{
1016 let mut cols = Vec::new();
1017 let mut vals = Vec::new();
1018 for (k, v) in fields {
1019 cols.push(k.clone());
1020 vals.push(value_to_sql_literal(v));
1021 }
1022 format!(
1023 "INSERT INTO {collection} ({}) VALUES ({})",
1024 cols.join(", "),
1025 vals.join(", "),
1026 )
1027}
1028
1029pub(crate) fn value_to_sql_literal(v: &Value) -> String {
1030 match v {
1031 Value::Null => "NULL".to_string(),
1032 Value::Bool(b) => b.to_string(),
1033 Value::Number(n) => {
1034 if n.fract() == 0.0 {
1035 format!("{}", *n as i64)
1036 } else {
1037 n.to_string()
1038 }
1039 }
1040 Value::String(s) => format!("'{}'", s.replace('\'', "''")),
1041 other => format!("'{}'", other.to_string_compact().replace('\'', "''")),
1042 }
1043}
1044
1045fn query_result_to_json(qr: &RuntimeQueryResult) -> Value {
1046 let mut envelope = json::Map::new();
1047 envelope.insert(
1048 "statement".to_string(),
1049 Value::String(qr.statement_type.to_string()),
1050 );
1051 envelope.insert(
1052 "affected".to_string(),
1053 Value::Number(qr.affected_rows as f64),
1054 );
1055
1056 let mut columns = Vec::new();
1057 if let Some(first) = qr.result.records.first() {
1058 let mut keys: Vec<String> = first
1059 .column_names()
1060 .into_iter()
1061 .map(|k| k.to_string())
1062 .collect();
1063 keys.sort();
1064 columns = keys.into_iter().map(Value::String).collect();
1065 }
1066 envelope.insert("columns".to_string(), Value::Array(columns));
1067
1068 let rows: Vec<Value> = qr
1069 .result
1070 .records
1071 .iter()
1072 .map(record_to_json_object)
1073 .collect();
1074 envelope.insert("rows".to_string(), Value::Array(rows));
1075
1076 Value::Object(envelope)
1077}
1078
1079pub(crate) fn insert_result_to_json(qr: &RuntimeQueryResult) -> Value {
1080 let mut envelope = json::Map::new();
1081 envelope.insert(
1082 "affected".to_string(),
1083 Value::Number(qr.affected_rows as f64),
1084 );
1085 if let Some(first) = qr.result.records.first() {
1087 if let Some(id_val) = first
1088 .iter_fields()
1089 .find(|(k, _)| {
1090 let s: &str = k;
1091 s == "_entity_id"
1092 })
1093 .map(|(_, v)| schema_value_to_json(v))
1094 {
1095 envelope.insert("id".to_string(), id_val);
1096 }
1097 }
1098 Value::Object(envelope)
1099}
1100
1101fn record_to_json_object(record: &UnifiedRecord) -> Value {
1102 let mut map = json::Map::new();
1103 let mut entries: Vec<(&str, &SchemaValue)> =
1106 record.iter_fields().map(|(k, v)| (k.as_ref(), v)).collect();
1107 entries.sort_by(|a, b| a.0.cmp(b.0));
1108 for (k, v) in entries {
1109 map.insert(k.to_string(), schema_value_to_json(v));
1110 }
1111 Value::Object(map)
1112}
1113
1114fn schema_value_to_json(v: &SchemaValue) -> Value {
1115 match v {
1116 SchemaValue::Null => Value::Null,
1117 SchemaValue::Boolean(b) => Value::Bool(*b),
1118 SchemaValue::Integer(n) => Value::Number(*n as f64),
1119 SchemaValue::UnsignedInteger(n) => Value::Number(*n as f64),
1120 SchemaValue::Float(n) => Value::Number(*n),
1121 SchemaValue::BigInt(n) => Value::Number(*n as f64),
1122 SchemaValue::TimestampMs(n)
1123 | SchemaValue::Timestamp(n)
1124 | SchemaValue::Duration(n)
1125 | SchemaValue::Decimal(n) => Value::Number(*n as f64),
1126 SchemaValue::Password(_) | SchemaValue::Secret(_) => Value::String("***".to_string()),
1127 SchemaValue::Text(s) => Value::String(s.to_string()),
1128 SchemaValue::Email(s)
1129 | SchemaValue::Url(s)
1130 | SchemaValue::NodeRef(s)
1131 | SchemaValue::EdgeRef(s) => Value::String(s.clone()),
1132 other => Value::String(format!("{other}")),
1133 }
1134}
1135
1136fn json_value_to_schema_value(v: &Value) -> SchemaValue {
1141 match v {
1142 Value::Null => SchemaValue::Null,
1143 Value::Bool(b) => SchemaValue::Boolean(*b),
1144 Value::Number(n) => {
1145 if n.fract() == 0.0 && n.abs() < i64::MAX as f64 {
1146 SchemaValue::Integer(*n as i64)
1147 } else {
1148 SchemaValue::Float(*n)
1149 }
1150 }
1151 Value::String(s) => SchemaValue::text(s.clone()),
1152 Value::Array(_) | Value::Object(_) => {
1153 SchemaValue::text(crate::json::to_string(v).unwrap_or_default())
1154 }
1155 }
1156}
1157
1158fn dispatch_method_remote(
1168 client: &AsyncMutex<RedDBClient>,
1169 tokio_rt: &tokio::runtime::Runtime,
1170 method: &str,
1171 params: &Value,
1172) -> Result<Value, (&'static str, String)> {
1173 match method {
1174 "version" => Ok(Value::Object(
1175 [
1176 (
1177 "version".to_string(),
1178 Value::String(env!("CARGO_PKG_VERSION").to_string()),
1179 ),
1180 (
1181 "protocol".to_string(),
1182 Value::String(PROTOCOL_VERSION.to_string()),
1183 ),
1184 ]
1185 .into_iter()
1186 .collect(),
1187 )),
1188
1189 "health" => {
1190 let result = tokio_rt.block_on(async {
1191 let mut guard = client.lock().await;
1192 guard.health_status().await
1193 });
1194 match result {
1195 Ok(status) => Ok(Value::Object(
1196 [
1197 ("ok".to_string(), Value::Bool(status.healthy)),
1198 ("state".to_string(), Value::String(status.state)),
1199 (
1200 "checked_at_unix_ms".to_string(),
1201 Value::Number(status.checked_at_unix_ms as f64),
1202 ),
1203 (
1204 "version".to_string(),
1205 Value::String(env!("CARGO_PKG_VERSION").to_string()),
1206 ),
1207 ]
1208 .into_iter()
1209 .collect(),
1210 )),
1211 Err(e) => Err((error_code::INTERNAL_ERROR, e.to_string())),
1212 }
1213 }
1214
1215 "query" => {
1216 let sql = params.get("sql").and_then(Value::as_str).ok_or((
1217 error_code::INVALID_PARAMS,
1218 "missing 'sql' string".to_string(),
1219 ))?;
1220 let json_str = tokio_rt
1221 .block_on(async {
1222 let mut guard = client.lock().await;
1223 guard.query(sql).await
1224 })
1225 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
1226 let parsed = json::from_str::<Value>(&json_str)
1231 .map_err(|e| (error_code::INTERNAL_ERROR, format!("bad server JSON: {e}")))?;
1232 Ok(parsed)
1233 }
1234
1235 "insert" => {
1236 let collection = params.get("collection").and_then(Value::as_str).ok_or((
1237 error_code::INVALID_PARAMS,
1238 "missing 'collection' string".to_string(),
1239 ))?;
1240 let payload = params.get("payload").ok_or((
1241 error_code::INVALID_PARAMS,
1242 "missing 'payload' object".to_string(),
1243 ))?;
1244 if payload.as_object().is_none() {
1245 return Err((
1246 error_code::INVALID_PARAMS,
1247 "'payload' must be a JSON object".to_string(),
1248 ));
1249 }
1250 let payload_json = payload.to_string_compact();
1251 let reply = tokio_rt
1252 .block_on(async {
1253 let mut guard = client.lock().await;
1254 guard.create_row_entity(collection, &payload_json).await
1255 })
1256 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
1257 let mut out = json::Map::new();
1258 out.insert("affected".to_string(), Value::Number(1.0));
1259 out.insert("id".to_string(), Value::String(reply.id.to_string()));
1260 Ok(Value::Object(out))
1261 }
1262
1263 "bulk_insert" => {
1264 let collection = params.get("collection").and_then(Value::as_str).ok_or((
1265 error_code::INVALID_PARAMS,
1266 "missing 'collection' string".to_string(),
1267 ))?;
1268 let payloads = params.get("payloads").and_then(Value::as_array).ok_or((
1269 error_code::INVALID_PARAMS,
1270 "missing 'payloads' array".to_string(),
1271 ))?;
1272 let mut encoded = Vec::with_capacity(payloads.len());
1273 for entry in payloads {
1274 if entry.as_object().is_none() {
1275 return Err((
1276 error_code::INVALID_PARAMS,
1277 "each payload must be a JSON object".to_string(),
1278 ));
1279 }
1280 encoded.push(entry.to_string_compact());
1281 }
1282 let total = tokio_rt
1283 .block_on(async {
1284 let mut guard = client.lock().await;
1285 guard.bulk_create_rows(collection, encoded).await
1286 })
1287 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?
1288 .count;
1289 Ok(Value::Object(
1290 [("affected".to_string(), Value::Number(total as f64))]
1291 .into_iter()
1292 .collect(),
1293 ))
1294 }
1295
1296 "get" => {
1297 let collection = params.get("collection").and_then(Value::as_str).ok_or((
1298 error_code::INVALID_PARAMS,
1299 "missing 'collection' string".to_string(),
1300 ))?;
1301 let id = params.get("id").and_then(Value::as_str).ok_or((
1302 error_code::INVALID_PARAMS,
1303 "missing 'id' string".to_string(),
1304 ))?;
1305 let sql = format!("SELECT * FROM {collection} WHERE _entity_id = {id} LIMIT 1");
1306 let json_str = tokio_rt
1307 .block_on(async {
1308 let mut guard = client.lock().await;
1309 guard.query(&sql).await
1310 })
1311 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
1312 let parsed = json::from_str::<Value>(&json_str)
1313 .map_err(|e| (error_code::INTERNAL_ERROR, format!("bad server JSON: {e}")))?;
1314 let entity = parsed
1317 .get("rows")
1318 .and_then(Value::as_array)
1319 .and_then(|rows| rows.first().cloned())
1320 .unwrap_or(Value::Null);
1321 Ok(Value::Object(
1322 [("entity".to_string(), entity)].into_iter().collect(),
1323 ))
1324 }
1325
1326 "delete" => {
1327 let collection = params.get("collection").and_then(Value::as_str).ok_or((
1328 error_code::INVALID_PARAMS,
1329 "missing 'collection' string".to_string(),
1330 ))?;
1331 let id = params.get("id").and_then(Value::as_str).ok_or((
1332 error_code::INVALID_PARAMS,
1333 "missing 'id' string".to_string(),
1334 ))?;
1335 let id = id.parse::<u64>().map_err(|_| {
1336 (
1337 error_code::INVALID_PARAMS,
1338 "id must be a numeric string".to_string(),
1339 )
1340 })?;
1341 let _reply = tokio_rt
1342 .block_on(async {
1343 let mut guard = client.lock().await;
1344 guard.delete_entity(collection, id).await
1345 })
1346 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
1347 Ok(Value::Object(
1348 [("affected".to_string(), Value::Number(1.0))]
1349 .into_iter()
1350 .collect(),
1351 ))
1352 }
1353
1354 "close" => Ok(Value::Null),
1355
1356 other => Err((
1357 error_code::INVALID_REQUEST,
1358 format!("unknown method: {other}"),
1359 )),
1360 }
1361}
1362
1363#[cfg(test)]
1364mod tests {
1365 use super::*;
1366
1367 fn make_runtime() -> RedDBRuntime {
1368 RedDBRuntime::in_memory().expect("in-memory runtime")
1369 }
1370
1371 fn handle(rt: &RedDBRuntime, line: &str) -> String {
1372 let mut session = Session::new();
1373 handle_line(&Backend::Local(rt), &mut session, line)
1374 }
1375
1376 fn with_session<F>(rt: &RedDBRuntime, f: F)
1379 where
1380 F: FnOnce(&dyn Fn(&str) -> String, &RedDBRuntime),
1381 {
1382 let session = std::cell::RefCell::new(Session::new());
1383 let call = |line: &str| -> String {
1384 let mut s = session.borrow_mut();
1385 handle_line(&Backend::Local(rt), &mut s, line)
1386 };
1387 f(&call, rt);
1388 }
1389
1390 #[test]
1391 fn version_method_returns_version_and_protocol() {
1392 let rt = make_runtime();
1393 let line = r#"{"jsonrpc":"2.0","id":1,"method":"version","params":{}}"#;
1394 let resp = handle(&rt, line);
1395 assert!(resp.contains("\"id\":1"));
1396 assert!(resp.contains("\"protocol\":\"1.0\""));
1397 assert!(resp.contains("\"version\""));
1398 }
1399
1400 #[test]
1401 fn health_method_returns_ok_true() {
1402 let rt = make_runtime();
1403 let resp = handle(
1404 &rt,
1405 r#"{"jsonrpc":"2.0","id":"abc","method":"health","params":{}}"#,
1406 );
1407 assert!(resp.contains("\"ok\":true"));
1408 assert!(resp.contains("\"id\":\"abc\""));
1409 }
1410
1411 #[test]
1412 fn parse_error_for_invalid_json() {
1413 let rt = make_runtime();
1414 let resp = handle(&rt, "not json {");
1415 assert!(resp.contains("\"code\":\"PARSE_ERROR\""));
1416 assert!(resp.contains("\"id\":null"));
1417 }
1418
1419 #[test]
1420 fn invalid_request_when_method_missing() {
1421 let rt = make_runtime();
1422 let resp = handle(&rt, r#"{"jsonrpc":"2.0","id":1,"params":{}}"#);
1423 assert!(resp.contains("\"code\":\"INVALID_REQUEST\""));
1424 }
1425
1426 #[test]
1427 fn unknown_method_is_invalid_request() {
1428 let rt = make_runtime();
1429 let resp = handle(
1430 &rt,
1431 r#"{"jsonrpc":"2.0","id":1,"method":"frobnicate","params":{}}"#,
1432 );
1433 assert!(resp.contains("\"code\":\"INVALID_REQUEST\""));
1434 assert!(resp.contains("frobnicate"));
1435 }
1436
1437 #[test]
1438 fn invalid_params_when_query_sql_missing() {
1439 let rt = make_runtime();
1440 let resp = handle(
1441 &rt,
1442 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{}}"#,
1443 );
1444 assert!(resp.contains("\"code\":\"INVALID_PARAMS\""));
1445 }
1446
1447 #[test]
1448 fn close_method_marks_response_for_shutdown() {
1449 let rt = make_runtime();
1450 let resp = handle(
1451 &rt,
1452 r#"{"jsonrpc":"2.0","id":1,"method":"close","params":{}}"#,
1453 );
1454 assert!(resp.contains("\"__close__\":true"));
1455 }
1456
1457 #[test]
1458 fn query_select_one_returns_rows() {
1459 let rt = make_runtime();
1460 let resp = handle(
1461 &rt,
1462 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"SELECT 1 AS one"}}"#,
1463 );
1464 assert!(resp.contains("\"result\""));
1465 assert!(!resp.contains("\"error\""));
1466 }
1467
1468 #[test]
1473 fn tx_begin_returns_tx_id_and_isolation() {
1474 let rt = make_runtime();
1475 with_session(&rt, |call, _| {
1476 let resp = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
1477 assert!(resp.contains("\"tx_id\":1"));
1478 assert!(resp.contains("\"isolation\":\"read_committed_deferred\""));
1479 assert!(!resp.contains("\"error\""));
1480 });
1481 }
1482
1483 #[test]
1484 fn tx_begin_twice_returns_already_open() {
1485 let rt = make_runtime();
1486 with_session(&rt, |call, _| {
1487 let _ = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
1488 let resp = call(r#"{"jsonrpc":"2.0","id":2,"method":"tx.begin","params":null}"#);
1489 assert!(resp.contains("\"code\":\"TX_ALREADY_OPEN\""));
1490 });
1491 }
1492
1493 #[test]
1494 fn tx_commit_without_begin_returns_no_tx_open() {
1495 let rt = make_runtime();
1496 with_session(&rt, |call, _| {
1497 let resp = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.commit","params":null}"#);
1498 assert!(resp.contains("\"code\":\"NO_TX_OPEN\""));
1499 });
1500 }
1501
1502 #[test]
1503 fn tx_rollback_without_begin_returns_no_tx_open() {
1504 let rt = make_runtime();
1505 with_session(&rt, |call, _| {
1506 let resp = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.rollback","params":null}"#);
1507 assert!(resp.contains("\"code\":\"NO_TX_OPEN\""));
1508 });
1509 }
1510
1511 #[test]
1512 fn insert_inside_tx_returns_pending_envelope() {
1513 let rt = make_runtime();
1514 let _ = handle(
1516 &rt,
1517 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE users (name TEXT)"}}"#,
1518 );
1519 with_session(&rt, |call, _| {
1520 let _ = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
1521 let resp = call(
1522 r#"{"jsonrpc":"2.0","id":2,"method":"insert","params":{"collection":"users","payload":{"name":"alice"}}}"#,
1523 );
1524 assert!(resp.contains("\"pending\":true"));
1525 assert!(resp.contains("\"tx_id\":1"));
1526 assert!(resp.contains("\"affected\":0"));
1527 });
1528 }
1529
1530 #[test]
1531 fn begin_insert_rollback_does_not_persist() {
1532 let rt = make_runtime();
1533 let _ = handle(
1534 &rt,
1535 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE u (name TEXT)"}}"#,
1536 );
1537 with_session(&rt, |call, _| {
1538 let _ = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
1539 let _ = call(
1540 r#"{"jsonrpc":"2.0","id":2,"method":"insert","params":{"collection":"u","payload":{"name":"ghost"}}}"#,
1541 );
1542 let rollback = call(r#"{"jsonrpc":"2.0","id":3,"method":"tx.rollback","params":null}"#);
1543 assert!(rollback.contains("\"ops_discarded\":1"));
1544 assert!(rollback.contains("\"tx_id\":1"));
1545 });
1546 let resp = handle(
1548 &rt,
1549 r#"{"jsonrpc":"2.0","id":9,"method":"query","params":{"sql":"SELECT * FROM u"}}"#,
1550 );
1551 assert!(!resp.contains("\"ghost\""));
1552 }
1553
1554 #[test]
1555 fn begin_insert_commit_persists() {
1556 let rt = make_runtime();
1557 let _ = handle(
1558 &rt,
1559 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE u2 (name TEXT)"}}"#,
1560 );
1561 with_session(&rt, |call, _| {
1562 let _ = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
1563 let _ = call(
1564 r#"{"jsonrpc":"2.0","id":2,"method":"insert","params":{"collection":"u2","payload":{"name":"alice"}}}"#,
1565 );
1566 let _ = call(
1567 r#"{"jsonrpc":"2.0","id":3,"method":"insert","params":{"collection":"u2","payload":{"name":"bob"}}}"#,
1568 );
1569 let commit = call(r#"{"jsonrpc":"2.0","id":4,"method":"tx.commit","params":null}"#);
1570 assert!(commit.contains("\"ops_replayed\":2"));
1571 assert!(!commit.contains("\"error\""));
1572 });
1573 let resp = handle(
1574 &rt,
1575 r#"{"jsonrpc":"2.0","id":9,"method":"query","params":{"sql":"SELECT * FROM u2"}}"#,
1576 );
1577 assert!(resp.contains("\"alice\""));
1578 assert!(resp.contains("\"bob\""));
1579 }
1580
1581 #[test]
1582 fn bulk_insert_inside_tx_buffers_everything() {
1583 let rt = make_runtime();
1584 let _ = handle(
1585 &rt,
1586 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE u3 (name TEXT)"}}"#,
1587 );
1588 with_session(&rt, |call, _| {
1589 let _ = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
1590 let resp = call(
1591 r#"{"jsonrpc":"2.0","id":2,"method":"bulk_insert","params":{"collection":"u3","payloads":[{"name":"a"},{"name":"b"},{"name":"c"}]}}"#,
1592 );
1593 assert!(resp.contains("\"buffered\":3"));
1594 assert!(resp.contains("\"pending\":true"));
1595 assert!(resp.contains("\"affected\":0"));
1596
1597 let commit = call(r#"{"jsonrpc":"2.0","id":3,"method":"tx.commit","params":null}"#);
1598 assert!(commit.contains("\"ops_replayed\":3"));
1599 });
1600 }
1601
1602 #[test]
1603 fn delete_inside_tx_is_buffered() {
1604 let rt = make_runtime();
1605 let _ = handle(
1607 &rt,
1608 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE u4 (name TEXT)"}}"#,
1609 );
1610 let _ = handle(
1611 &rt,
1612 r#"{"jsonrpc":"2.0","id":2,"method":"query","params":{"sql":"INSERT INTO u4 (name) VALUES ('keep')"}}"#,
1613 );
1614 with_session(&rt, |call, _| {
1615 let _ = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
1616 let resp = call(
1617 r#"{"jsonrpc":"2.0","id":2,"method":"delete","params":{"collection":"u4","id":"1"}}"#,
1618 );
1619 assert!(resp.contains("\"pending\":true"));
1620 let _ = call(r#"{"jsonrpc":"2.0","id":3,"method":"tx.rollback","params":null}"#);
1621 });
1622 let resp = handle(
1624 &rt,
1625 r#"{"jsonrpc":"2.0","id":9,"method":"query","params":{"sql":"SELECT * FROM u4"}}"#,
1626 );
1627 assert!(resp.contains("\"keep\""));
1628 }
1629
1630 #[test]
1631 fn close_with_open_tx_auto_rollbacks() {
1632 let rt = make_runtime();
1633 let _ = handle(
1634 &rt,
1635 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE u5 (name TEXT)"}}"#,
1636 );
1637 with_session(&rt, |call, _| {
1638 let _ = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
1639 let _ = call(
1640 r#"{"jsonrpc":"2.0","id":2,"method":"insert","params":{"collection":"u5","payload":{"name":"ghost"}}}"#,
1641 );
1642 let close = call(r#"{"jsonrpc":"2.0","id":3,"method":"close","params":null}"#);
1643 assert!(close.contains("\"__close__\":true"));
1644 assert!(!close.contains("\"error\""));
1645 });
1646 let resp = handle(
1647 &rt,
1648 r#"{"jsonrpc":"2.0","id":9,"method":"query","params":{"sql":"SELECT * FROM u5"}}"#,
1649 );
1650 assert!(!resp.contains("\"ghost\""));
1651 }
1652
1653 fn seed_numbers_table(rt: &RedDBRuntime, table: &str, count: u32) {
1658 let _ = handle(
1659 rt,
1660 &format!(
1661 r#"{{"jsonrpc":"2.0","id":1,"method":"query","params":{{"sql":"CREATE TABLE {table} (n INTEGER)"}}}}"#,
1662 ),
1663 );
1664 for i in 0..count {
1665 let _ = handle(
1666 rt,
1667 &format!(
1668 r#"{{"jsonrpc":"2.0","id":2,"method":"query","params":{{"sql":"INSERT INTO {table} (n) VALUES ({i})"}}}}"#,
1669 ),
1670 );
1671 }
1672 }
1673
1674 #[test]
1675 fn cursor_open_returns_id_columns_and_total() {
1676 let rt = make_runtime();
1677 seed_numbers_table(&rt, "nums1", 3);
1678 with_session(&rt, |call, _| {
1679 let resp = call(
1680 r#"{"jsonrpc":"2.0","id":1,"method":"query.open","params":{"sql":"SELECT n FROM nums1"}}"#,
1681 );
1682 assert!(resp.contains("\"cursor_id\":1"));
1683 assert!(resp.contains("\"total_rows\":3"));
1684 assert!(resp.contains("\"columns\""));
1685 assert!(!resp.contains("\"error\""));
1686 });
1687 }
1688
1689 #[test]
1690 fn cursor_next_chunks_rows_and_signals_done() {
1691 let rt = make_runtime();
1692 seed_numbers_table(&rt, "nums2", 5);
1693 with_session(&rt, |call, _| {
1694 let _ = call(
1695 r#"{"jsonrpc":"2.0","id":1,"method":"query.open","params":{"sql":"SELECT n FROM nums2"}}"#,
1696 );
1697 let first = call(
1698 r#"{"jsonrpc":"2.0","id":2,"method":"query.next","params":{"cursor_id":1,"batch_size":2}}"#,
1699 );
1700 assert!(first.contains("\"done\":false"));
1701 assert!(first.contains("\"remaining\":3"));
1702
1703 let second = call(
1704 r#"{"jsonrpc":"2.0","id":3,"method":"query.next","params":{"cursor_id":1,"batch_size":2}}"#,
1705 );
1706 assert!(second.contains("\"done\":false"));
1707 assert!(second.contains("\"remaining\":1"));
1708
1709 let third = call(
1710 r#"{"jsonrpc":"2.0","id":4,"method":"query.next","params":{"cursor_id":1,"batch_size":2}}"#,
1711 );
1712 assert!(third.contains("\"done\":true"));
1713 assert!(third.contains("\"remaining\":0"));
1714 });
1715 }
1716
1717 #[test]
1718 fn cursor_auto_drops_when_exhausted() {
1719 let rt = make_runtime();
1720 seed_numbers_table(&rt, "nums3", 2);
1721 with_session(&rt, |call, _| {
1722 let _ = call(
1723 r#"{"jsonrpc":"2.0","id":1,"method":"query.open","params":{"sql":"SELECT n FROM nums3"}}"#,
1724 );
1725 let _ = call(
1726 r#"{"jsonrpc":"2.0","id":2,"method":"query.next","params":{"cursor_id":1,"batch_size":100}}"#,
1727 );
1728 let resp = call(
1731 r#"{"jsonrpc":"2.0","id":3,"method":"query.next","params":{"cursor_id":1,"batch_size":100}}"#,
1732 );
1733 assert!(resp.contains("\"code\":\"CURSOR_NOT_FOUND\""));
1734 });
1735 }
1736
1737 #[test]
1738 fn cursor_close_removes_it() {
1739 let rt = make_runtime();
1740 seed_numbers_table(&rt, "nums4", 3);
1741 with_session(&rt, |call, _| {
1742 let _ = call(
1743 r#"{"jsonrpc":"2.0","id":1,"method":"query.open","params":{"sql":"SELECT n FROM nums4"}}"#,
1744 );
1745 let close =
1746 call(r#"{"jsonrpc":"2.0","id":2,"method":"query.close","params":{"cursor_id":1}}"#);
1747 assert!(close.contains("\"closed\":true"));
1748 let after = call(
1749 r#"{"jsonrpc":"2.0","id":3,"method":"query.next","params":{"cursor_id":1,"batch_size":10}}"#,
1750 );
1751 assert!(after.contains("\"code\":\"CURSOR_NOT_FOUND\""));
1752 });
1753 }
1754
1755 #[test]
1756 fn cursor_close_unknown_errors() {
1757 let rt = make_runtime();
1758 with_session(&rt, |call, _| {
1759 let resp = call(
1760 r#"{"jsonrpc":"2.0","id":1,"method":"query.close","params":{"cursor_id":9999}}"#,
1761 );
1762 assert!(resp.contains("\"code\":\"CURSOR_NOT_FOUND\""));
1763 });
1764 }
1765
1766 #[test]
1767 fn cursor_next_without_cursor_id_errors() {
1768 let rt = make_runtime();
1769 with_session(&rt, |call, _| {
1770 let resp = call(r#"{"jsonrpc":"2.0","id":1,"method":"query.next","params":{}}"#);
1771 assert!(resp.contains("\"code\":\"INVALID_PARAMS\""));
1772 });
1773 }
1774
1775 #[test]
1776 fn cursor_default_batch_size_returns_all_when_smaller_than_default() {
1777 let rt = make_runtime();
1778 seed_numbers_table(&rt, "nums5", 7);
1779 with_session(&rt, |call, _| {
1780 let _ = call(
1781 r#"{"jsonrpc":"2.0","id":1,"method":"query.open","params":{"sql":"SELECT n FROM nums5"}}"#,
1782 );
1783 let resp =
1785 call(r#"{"jsonrpc":"2.0","id":2,"method":"query.next","params":{"cursor_id":1}}"#);
1786 assert!(resp.contains("\"done\":true"));
1787 assert!(resp.contains("\"remaining\":0"));
1788 });
1789 }
1790
1791 #[test]
1792 fn close_method_drops_open_cursors() {
1793 let rt = make_runtime();
1794 seed_numbers_table(&rt, "nums6", 3);
1795 with_session(&rt, |call, _| {
1798 let _ = call(
1799 r#"{"jsonrpc":"2.0","id":1,"method":"query.open","params":{"sql":"SELECT n FROM nums6"}}"#,
1800 );
1801 let close = call(r#"{"jsonrpc":"2.0","id":2,"method":"close","params":null}"#);
1802 assert!(close.contains("\"__close__\":true"));
1803 let after = call(
1805 r#"{"jsonrpc":"2.0","id":3,"method":"query.next","params":{"cursor_id":1,"batch_size":10}}"#,
1806 );
1807 assert!(after.contains("\"code\":\"CURSOR_NOT_FOUND\""));
1808 });
1809 }
1810
1811 #[test]
1812 fn cursor_independent_of_transaction_state() {
1813 let rt = make_runtime();
1814 seed_numbers_table(&rt, "nums7", 4);
1815 with_session(&rt, |call, _| {
1816 let _ = call(
1818 r#"{"jsonrpc":"2.0","id":1,"method":"query.open","params":{"sql":"SELECT n FROM nums7"}}"#,
1819 );
1820 let _ = call(r#"{"jsonrpc":"2.0","id":2,"method":"tx.begin","params":null}"#);
1821 let _ = call(r#"{"jsonrpc":"2.0","id":3,"method":"tx.commit","params":null}"#);
1822 let resp = call(
1823 r#"{"jsonrpc":"2.0","id":4,"method":"query.next","params":{"cursor_id":1,"batch_size":10}}"#,
1824 );
1825 assert!(resp.contains("\"done\":true"));
1826 assert!(!resp.contains("\"error\""));
1827 });
1828 }
1829
1830 #[test]
1831 fn second_tx_after_commit_gets_fresh_id() {
1832 let rt = make_runtime();
1833 let _ = handle(
1834 &rt,
1835 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE u6 (name TEXT)"}}"#,
1836 );
1837 with_session(&rt, |call, _| {
1838 let first = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
1839 assert!(first.contains("\"tx_id\":1"));
1840 let _ = call(
1841 r#"{"jsonrpc":"2.0","id":2,"method":"insert","params":{"collection":"u6","payload":{"name":"x"}}}"#,
1842 );
1843 let _ = call(r#"{"jsonrpc":"2.0","id":3,"method":"tx.commit","params":null}"#);
1844
1845 let second = call(r#"{"jsonrpc":"2.0","id":4,"method":"tx.begin","params":null}"#);
1846 assert!(second.contains("\"tx_id\":2"));
1847 let _ = call(r#"{"jsonrpc":"2.0","id":5,"method":"tx.rollback","params":null}"#);
1848 });
1849 }
1850
1851 #[test]
1852 fn prepare_and_execute_prepared_statement() {
1853 let rt = make_runtime();
1854 let _ = handle(
1856 &rt,
1857 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE ps_test (n INTEGER)"}}"#,
1858 );
1859 let _ = handle(
1860 &rt,
1861 r#"{"jsonrpc":"2.0","id":2,"method":"query","params":{"sql":"INSERT INTO ps_test (n) VALUES (42)"}}"#,
1862 );
1863
1864 with_session(&rt, |call, _| {
1865 let prep = call(
1867 r#"{"jsonrpc":"2.0","id":3,"method":"prepare","params":{"sql":"SELECT n FROM ps_test WHERE n = 42"}}"#,
1868 );
1869 assert!(prep.contains("\"prepared_id\""), "prepare response: {prep}");
1870
1871 let id: u64 = {
1873 let v: crate::json::Value = crate::json::from_str(&prep).expect("json");
1874 let result = v.get("result").expect("result");
1875 result
1876 .get("prepared_id")
1877 .and_then(|n| n.as_f64())
1878 .expect("prepared_id") as u64
1879 };
1880
1881 let exec = call(&format!(
1883 r#"{{"jsonrpc":"2.0","id":4,"method":"execute_prepared","params":{{"prepared_id":{id},"binds":[42]}}}}"#
1884 ));
1885 assert!(
1887 exec.contains("\"rows\""),
1888 "execute_prepared response: {exec}"
1889 );
1890 assert!(exec.contains("42"), "expected row with n=42 in: {exec}");
1891 });
1892 }
1893}