1use std::io::{BufRead, BufReader, Stdin, Write};
19use std::panic::AssertUnwindSafe;
20
21use tokio::sync::Mutex as AsyncMutex;
22
23use crate::json::{self as json, Value};
24use crate::runtime::{RedDBRuntime, RuntimeQueryResult};
25use crate::storage::query::unified::UnifiedRecord;
26use crate::storage::schema::Value as SchemaValue;
27use reddb_client_connector::RedDBClient;
28
29enum Backend<'a> {
42 Local(&'a RedDBRuntime),
43 Remote(Box<RemoteBackend<'a>>),
44}
45
46struct RemoteBackend<'a> {
47 client: AsyncMutex<RedDBClient>,
48 tokio_rt: &'a tokio::runtime::Runtime,
49}
50
/// Protocol version string reported under `protocol` by the `version` method.
pub const PROTOCOL_VERSION: &str = "1.0";
53
/// String error codes placed in the `code` field of error responses.
pub mod error_code {
    // --- request decoding / validation ---

    /// The input line was not valid JSON.
    pub const PARSE_ERROR: &str = "PARSE_ERROR";
    /// The request is malformed (e.g. no `method`) or names an unknown or
    /// unavailable method.
    pub const INVALID_REQUEST: &str = "INVALID_REQUEST";
    /// A required parameter is missing or has the wrong type.
    pub const INVALID_PARAMS: &str = "INVALID_PARAMS";

    // --- execution ---

    /// Parsing or executing a query failed.
    pub const QUERY_ERROR: &str = "QUERY_ERROR";
    /// Generic "not found" code.
    pub const NOT_FOUND: &str = "NOT_FOUND";
    /// Unexpected failure: I/O error, caught handler panic, bad server JSON.
    pub const INTERNAL_ERROR: &str = "INTERNAL_ERROR";

    // --- transactions ---

    /// `tx.begin` was called while the session already has a transaction.
    pub const TX_ALREADY_OPEN: &str = "TX_ALREADY_OPEN";
    /// `tx.commit` / `tx.rollback` was called without an open transaction.
    pub const NO_TX_OPEN: &str = "NO_TX_OPEN";
    /// Replaying the buffered write set during `tx.commit` failed.
    pub const TX_REPLAY_FAILED: &str = "TX_REPLAY_FAILED";
    /// The method (transactions, cursors) is rejected on the remote backend.
    pub const TX_NOT_SUPPORTED_REMOTE: &str = "TX_NOT_SUPPORTED_REMOTE";

    // --- cursors ---

    /// The referenced cursor id is not held by this session.
    pub const CURSOR_NOT_FOUND: &str = "CURSOR_NOT_FOUND";
    /// The session already holds the maximum number of cursors.
    pub const CURSOR_LIMIT_EXCEEDED: &str = "CURSOR_LIMIT_EXCEEDED";
}
81
/// Maximum number of cursors one session may hold at a time; enforced by
/// `Session::insert_cursor`.
pub(crate) const MAX_CURSORS_PER_SESSION: usize = 64;

/// Batch size `query.next` uses when the request carries no `batch_size`.
pub(crate) const DEFAULT_CURSOR_BATCH_SIZE: usize = 100;

/// Upper clamp applied to the `batch_size` requested from `query.next`.
pub(crate) const MAX_CURSOR_BATCH_SIZE: usize = 10_000;
93
94struct StdioPreparedStatement {
117 shape: crate::storage::query::ast::QueryExpr,
118 parameter_count: usize,
119}
120
121pub(crate) struct Session {
122 next_tx_id: u64,
123 current_tx: Option<OpenTx>,
124 next_cursor_id: u64,
125 cursors: std::collections::HashMap<u64, Cursor>,
126 next_prepared_id: u64,
128 prepared: std::collections::HashMap<u64, StdioPreparedStatement>,
130}
131
132impl Session {
133 pub(crate) fn new() -> Self {
134 Self {
135 next_tx_id: 1,
136 current_tx: None,
137 next_cursor_id: 1,
138 cursors: std::collections::HashMap::new(),
139 next_prepared_id: 1,
140 prepared: std::collections::HashMap::new(),
141 }
142 }
143
144 fn open_tx(&mut self) -> Result<u64, (&'static str, String)> {
145 if let Some(tx) = &self.current_tx {
146 return Err((
147 error_code::TX_ALREADY_OPEN,
148 format!("transaction {} already open in this session", tx.tx_id),
149 ));
150 }
151 let tx_id = self.next_tx_id;
152 self.next_tx_id = self.next_tx_id.saturating_add(1);
153 self.current_tx = Some(OpenTx {
154 tx_id,
155 write_set: Vec::new(),
156 });
157 Ok(tx_id)
158 }
159
160 fn take_tx(&mut self) -> Option<OpenTx> {
161 self.current_tx.take()
162 }
163
164 fn current_tx_mut(&mut self) -> Option<&mut OpenTx> {
165 self.current_tx.as_mut()
166 }
167
168 #[allow(dead_code)]
169 fn has_tx(&self) -> bool {
170 self.current_tx.is_some()
171 }
172
173 fn insert_cursor(&mut self, cursor: Cursor) -> Result<u64, (&'static str, String)> {
176 if self.cursors.len() >= MAX_CURSORS_PER_SESSION {
177 return Err((
178 error_code::CURSOR_LIMIT_EXCEEDED,
179 format!(
180 "session already holds {} cursors (max {}) — close some before opening new ones",
181 self.cursors.len(),
182 MAX_CURSORS_PER_SESSION
183 ),
184 ));
185 }
186 let id = self.next_cursor_id;
187 self.next_cursor_id = self.next_cursor_id.saturating_add(1);
188 let mut cursor = cursor;
189 cursor.cursor_id = id;
190 self.cursors.insert(id, cursor);
191 Ok(id)
192 }
193
194 fn cursor_mut(&mut self, id: u64) -> Option<&mut Cursor> {
195 self.cursors.get_mut(&id)
196 }
197
198 fn drop_cursor(&mut self, id: u64) -> Option<Cursor> {
199 self.cursors.remove(&id)
200 }
201
202 fn clear_cursors(&mut self) {
203 self.cursors.clear();
204 }
205}
206
207impl Default for Session {
208 fn default() -> Self {
209 Self::new()
210 }
211}
212
213struct OpenTx {
215 tx_id: u64,
216 write_set: Vec<PendingSql>,
217}
218
/// A buffered SQL statement, tagged with the kind of write it performs.
enum PendingSql {
    /// An `INSERT` statement.
    Insert(String),
    /// A `DELETE` statement.
    Delete(String),
    /// An update statement; nothing in view constructs this variant yet.
    #[allow(dead_code)]
    Update(String),
}

impl PendingSql {
    /// Returns the SQL text carried by the variant, whichever it is.
    fn sql(&self) -> &str {
        // Every variant wraps exactly one `String`, so a single irrefutable
        // or-pattern covers the whole enum.
        let (PendingSql::Insert(text) | PendingSql::Delete(text) | PendingSql::Update(text)) =
            self;
        text.as_str()
    }
}
236
237pub(crate) struct Cursor {
249 cursor_id: u64,
250 columns: Vec<String>,
251 rows: Vec<UnifiedRecord>,
252 position: usize,
253}
254
255impl Cursor {
256 fn new(columns: Vec<String>, rows: Vec<UnifiedRecord>) -> Self {
257 Self {
258 cursor_id: 0, columns,
260 rows,
261 position: 0,
262 }
263 }
264
265 fn total(&self) -> usize {
266 self.rows.len()
267 }
268
269 fn remaining(&self) -> usize {
270 self.rows.len().saturating_sub(self.position)
271 }
272
273 fn is_exhausted(&self) -> bool {
274 self.position >= self.rows.len()
275 }
276
277 fn take_batch(&mut self, batch_size: usize) -> &[UnifiedRecord] {
280 let end = (self.position + batch_size).min(self.rows.len());
281 let slice = &self.rows[self.position..end];
282 self.position = end;
283 slice
284 }
285}
286
287pub fn run(runtime: &RedDBRuntime) -> i32 {
293 run_with_io(runtime, std::io::stdin(), &mut std::io::stdout())
294}
295
296pub fn run_remote(endpoint: &str, token: Option<String>) -> i32 {
303 let tokio_rt = match tokio::runtime::Builder::new_current_thread()
304 .enable_all()
305 .build()
306 {
307 Ok(rt) => rt,
308 Err(e) => {
309 tracing::error!(err = %e, "rpc: failed to build tokio runtime");
310 return 1;
311 }
312 };
313 let client = match tokio_rt.block_on(RedDBClient::connect(endpoint, token)) {
314 Ok(c) => c,
315 Err(e) => {
316 tracing::error!(endpoint, err = %e, "rpc: failed to connect");
317 return 1;
318 }
319 };
320 let backend = Backend::Remote(Box::new(RemoteBackend {
321 client: AsyncMutex::new(client),
322 tokio_rt: &tokio_rt,
323 }));
324 run_backend(&backend, std::io::stdin(), &mut std::io::stdout())
325}
326
327pub fn run_with_io<W: Write>(runtime: &RedDBRuntime, stdin: Stdin, stdout: &mut W) -> i32 {
329 run_backend(&Backend::Local(runtime), stdin, stdout)
330}
331
/// Process-wide counter backing `next_stdio_conn_id`; the first id handed
/// out is 1_000_000.
static STDIO_SESSION_CONN_ID: std::sync::atomic::AtomicU64 =
    std::sync::atomic::AtomicU64::new(1_000_000);

/// Returns the current counter value and advances the counter by one.
fn next_stdio_conn_id() -> u64 {
    use std::sync::atomic::Ordering;

    STDIO_SESSION_CONN_ID.fetch_add(1, Ordering::Relaxed)
}
344
345fn run_backend<W: Write>(backend: &Backend<'_>, stdin: Stdin, stdout: &mut W) -> i32 {
346 let reader = BufReader::new(stdin.lock());
347 let mut session = Session::new();
348 let conn_id = next_stdio_conn_id();
352 crate::runtime::impl_core::set_current_connection_id(conn_id);
353 for line_result in reader.lines() {
354 let line = match line_result {
355 Ok(l) => l,
356 Err(e) => {
357 let _ = writeln!(
358 stdout,
359 "{}",
360 error_response(&Value::Null, error_code::INTERNAL_ERROR, &e.to_string())
361 );
362 let _ = stdout.flush();
363 return 1;
364 }
365 };
366 if line.trim().is_empty() {
367 continue;
368 }
369 let response = handle_line(backend, &mut session, &line);
370 if writeln!(stdout, "{}", response).is_err() || stdout.flush().is_err() {
371 return 1;
372 }
373 if response.contains("\"__close__\":true") {
374 return 0;
375 }
376 }
377 let _ = session.take_tx();
381 crate::runtime::impl_core::clear_current_connection_id();
382 0
383}
384
385fn handle_line(backend: &Backend<'_>, session: &mut Session, line: &str) -> String {
389 let parsed: Value = match json::from_str(line) {
390 Ok(v) => v,
391 Err(err) => {
392 return error_response(
393 &Value::Null,
394 error_code::PARSE_ERROR,
395 &format!("invalid JSON: {err}"),
396 );
397 }
398 };
399
400 let id = parsed.get("id").cloned().unwrap_or(Value::Null);
401
402 let method = match parsed.get("method").and_then(Value::as_str) {
403 Some(m) => m.to_string(),
404 None => {
405 return error_response(&id, error_code::INVALID_REQUEST, "missing 'method' field");
406 }
407 };
408
409 let params = parsed.get("params").cloned().unwrap_or(Value::Null);
410
411 let dispatch = std::panic::catch_unwind(AssertUnwindSafe(|| match backend {
412 Backend::Local(rt) => dispatch_method(rt, session, &method, ¶ms),
413 Backend::Remote(remote) => {
414 if matches!(
419 method.as_str(),
420 "tx.begin"
421 | "tx.commit"
422 | "tx.rollback"
423 | "query.open"
424 | "query.next"
425 | "query.close"
426 ) {
427 Err((
428 error_code::TX_NOT_SUPPORTED_REMOTE,
429 format!("{method} is not supported over remote gRPC yet"),
430 ))
431 } else {
432 dispatch_method_remote(&remote.client, remote.tokio_rt, &method, ¶ms)
433 }
434 }
435 }));
436
437 match dispatch {
438 Ok(Ok(result)) => success_response(&id, &result, method == "close"),
439 Ok(Err((code, msg))) => error_response(&id, code, &msg),
440 Err(_) => error_response(&id, error_code::INTERNAL_ERROR, "handler panicked (caught)"),
441 }
442}
443
444fn dispatch_method(
447 runtime: &RedDBRuntime,
448 session: &mut Session,
449 method: &str,
450 params: &Value,
451) -> Result<Value, (&'static str, String)> {
452 match method {
453 "tx.begin" => {
454 let tx_id = session.open_tx()?;
455 Ok(Value::Object(
456 [
457 ("tx_id".to_string(), Value::Number(tx_id as f64)),
458 (
459 "isolation".to_string(),
460 Value::String("read_committed_deferred".to_string()),
461 ),
462 ]
463 .into_iter()
464 .collect(),
465 ))
466 }
467
468 "tx.commit" => {
469 let tx = session.take_tx().ok_or((
470 error_code::NO_TX_OPEN,
471 "no transaction is open in this session".to_string(),
472 ))?;
473 let tx_id = tx.tx_id;
474 let op_count = tx.write_set.len();
475
476 let replay: Result<(u64, usize), (usize, String)> = (|| {
483 runtime
484 .execute_query("BEGIN")
485 .map_err(|e| (0usize, format!("BEGIN: {e}")))?;
486 let mut total_affected: u64 = 0;
487 for (idx, op) in tx.write_set.iter().enumerate() {
488 match runtime.execute_query(op.sql()) {
489 Ok(qr) => total_affected += qr.affected_rows,
490 Err(e) => {
491 let _ = runtime.execute_query("ROLLBACK");
492 return Err((idx, e.to_string()));
493 }
494 }
495 }
496 runtime
497 .execute_query("COMMIT")
498 .map_err(|e| (op_count, format!("COMMIT: {e}")))?;
499 Ok((total_affected, op_count))
500 })();
501
502 match replay {
503 Ok((affected, replayed)) => Ok(Value::Object(
504 [
505 ("tx_id".to_string(), Value::Number(tx_id as f64)),
506 ("ops_replayed".to_string(), Value::Number(replayed as f64)),
507 ("affected".to_string(), Value::Number(affected as f64)),
508 ]
509 .into_iter()
510 .collect(),
511 )),
512 Err((failed_idx, msg)) => Err((
513 error_code::TX_REPLAY_FAILED,
514 format!(
515 "tx {tx_id} replay failed at op {failed_idx}/{op_count}: {msg} \
516 (ops 0..{failed_idx} already applied and are NOT rolled back)"
517 ),
518 )),
519 }
520 }
521
522 "query.open" => {
523 let sql = params.get("sql").and_then(Value::as_str).ok_or((
524 error_code::INVALID_PARAMS,
525 "missing 'sql' string".to_string(),
526 ))?;
527 let qr = runtime
528 .execute_query(sql)
529 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
530
531 let columns: Vec<String> = qr
535 .result
536 .records
537 .first()
538 .map(|first| {
539 let mut keys: Vec<String> = first
540 .column_names()
541 .into_iter()
542 .map(|k| k.to_string())
543 .collect();
544 keys.sort();
545 keys
546 })
547 .unwrap_or_default();
548
549 let cursor = Cursor::new(columns.clone(), qr.result.records);
550 let total = cursor.total();
551 let cursor_id = session.insert_cursor(cursor)?;
552
553 Ok(Value::Object(
554 [
555 ("cursor_id".to_string(), Value::Number(cursor_id as f64)),
556 (
557 "columns".to_string(),
558 Value::Array(columns.into_iter().map(Value::String).collect()),
559 ),
560 ("total_rows".to_string(), Value::Number(total as f64)),
561 ]
562 .into_iter()
563 .collect(),
564 ))
565 }
566
567 "query.next" => {
568 let cursor_id = params
569 .get("cursor_id")
570 .and_then(|v| v.as_f64())
571 .map(|n| n as u64)
572 .ok_or((
573 error_code::INVALID_PARAMS,
574 "missing 'cursor_id' number".to_string(),
575 ))?;
576 let batch_size = params
577 .get("batch_size")
578 .and_then(|v| v.as_f64())
579 .map(|n| n as usize)
580 .unwrap_or(DEFAULT_CURSOR_BATCH_SIZE)
581 .clamp(1, MAX_CURSOR_BATCH_SIZE);
582
583 let (rows, done, remaining) = {
586 let cursor = session.cursor_mut(cursor_id).ok_or((
587 error_code::CURSOR_NOT_FOUND,
588 format!("cursor {cursor_id} not found"),
589 ))?;
590 let batch = cursor.take_batch(batch_size);
591 let rows_json: Vec<Value> = batch.iter().map(record_to_json_object).collect();
592 (rows_json, cursor.is_exhausted(), cursor.remaining())
593 };
594
595 if done {
596 let _ = session.drop_cursor(cursor_id);
599 }
600
601 Ok(Value::Object(
602 [
603 ("cursor_id".to_string(), Value::Number(cursor_id as f64)),
604 ("rows".to_string(), Value::Array(rows)),
605 ("done".to_string(), Value::Bool(done)),
606 ("remaining".to_string(), Value::Number(remaining as f64)),
607 ]
608 .into_iter()
609 .collect(),
610 ))
611 }
612
613 "query.close" => {
614 let cursor_id = params
615 .get("cursor_id")
616 .and_then(|v| v.as_f64())
617 .map(|n| n as u64)
618 .ok_or((
619 error_code::INVALID_PARAMS,
620 "missing 'cursor_id' number".to_string(),
621 ))?;
622 let existed = session.drop_cursor(cursor_id).is_some();
623 if !existed {
624 return Err((
625 error_code::CURSOR_NOT_FOUND,
626 format!("cursor {cursor_id} not found"),
627 ));
628 }
629 Ok(Value::Object(
630 [
631 ("cursor_id".to_string(), Value::Number(cursor_id as f64)),
632 ("closed".to_string(), Value::Bool(true)),
633 ]
634 .into_iter()
635 .collect(),
636 ))
637 }
638
639 "tx.rollback" => {
640 let tx = session.take_tx().ok_or((
641 error_code::NO_TX_OPEN,
642 "no transaction is open in this session".to_string(),
643 ))?;
644 let ops_discarded = tx.write_set.len();
645 Ok(Value::Object(
646 [
647 ("tx_id".to_string(), Value::Number(tx.tx_id as f64)),
648 (
649 "ops_discarded".to_string(),
650 Value::Number(ops_discarded as f64),
651 ),
652 ]
653 .into_iter()
654 .collect(),
655 ))
656 }
657
658 "version" => Ok(Value::Object(
659 [
660 (
661 "version".to_string(),
662 Value::String(env!("CARGO_PKG_VERSION").to_string()),
663 ),
664 (
665 "protocol".to_string(),
666 Value::String(PROTOCOL_VERSION.to_string()),
667 ),
668 ]
669 .into_iter()
670 .collect(),
671 )),
672
673 "health" => Ok(Value::Object(
674 [
675 ("ok".to_string(), Value::Bool(true)),
676 (
677 "version".to_string(),
678 Value::String(env!("CARGO_PKG_VERSION").to_string()),
679 ),
680 ]
681 .into_iter()
682 .collect(),
683 )),
684
685 "query" => {
686 let sql = params.get("sql").and_then(Value::as_str).ok_or((
687 error_code::INVALID_PARAMS,
688 "missing 'sql' string".to_string(),
689 ))?;
690
691 let bind_values: Option<Vec<SchemaValue>> = params
694 .get("params")
695 .map(|v| {
696 v.as_array()
697 .ok_or((
698 error_code::INVALID_PARAMS,
699 "'params' must be an array".to_string(),
700 ))
701 .map(|arr| arr.iter().map(json_value_to_schema_value).collect())
702 })
703 .transpose()?;
704
705 if let Some(binds) = bind_values {
706 use crate::storage::query::modes::parse_multi;
707 use crate::storage::query::user_params;
708 let parsed =
709 parse_multi(sql).map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
710 let bound = user_params::bind(&parsed, &binds)
711 .map_err(|e| (error_code::INVALID_PARAMS, e.to_string()))?;
712 let qr = runtime
713 .execute_query_expr(bound)
714 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
715 return Ok(query_result_to_json(&qr));
716 }
717
718 let qr = runtime
719 .execute_query(sql)
720 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
721 Ok(query_result_to_json(&qr))
722 }
723
724 "prepare" => {
733 use crate::storage::query::modes::parse_multi;
734 use crate::storage::query::planner::shape::parameterize_query_expr;
735
736 let sql = params.get("sql").and_then(Value::as_str).ok_or((
737 error_code::INVALID_PARAMS,
738 "missing 'sql' string".to_string(),
739 ))?;
740 let parsed = parse_multi(sql).map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
741 let (shape, parameter_count) = if let Some(prepared) = parameterize_query_expr(&parsed)
742 {
743 (prepared.shape, prepared.parameter_count)
744 } else {
745 (parsed, 0)
746 };
747 let id = session.next_prepared_id;
748 session.next_prepared_id = session.next_prepared_id.saturating_add(1);
749 session.prepared.insert(
750 id,
751 StdioPreparedStatement {
752 shape,
753 parameter_count,
754 },
755 );
756 Ok(Value::Object(
757 [
758 ("prepared_id".to_string(), Value::Number(id as f64)),
759 (
760 "parameter_count".to_string(),
761 Value::Number(parameter_count as f64),
762 ),
763 ]
764 .into_iter()
765 .collect(),
766 ))
767 }
768
769 "execute_prepared" => {
770 use crate::storage::query::planner::shape::bind_parameterized_query;
771 use crate::storage::schema::Value as SV;
772
773 let id = params
774 .get("prepared_id")
775 .and_then(Value::as_f64)
776 .map(|n| n as u64)
777 .ok_or((
778 error_code::INVALID_PARAMS,
779 "missing 'prepared_id'".to_string(),
780 ))?;
781
782 let stmt = session.prepared.get(&id).ok_or((
783 error_code::QUERY_ERROR,
784 format!("no prepared statement with id {id}"),
785 ))?;
786
787 let binds_json: Vec<Value> = params
789 .get("binds")
790 .and_then(Value::as_array)
791 .map(|a| a.to_vec())
792 .unwrap_or_default();
793 if binds_json.len() != stmt.parameter_count {
794 return Err((
795 error_code::INVALID_PARAMS,
796 format!(
797 "expected {} bind values, got {}",
798 stmt.parameter_count,
799 binds_json.len()
800 ),
801 ));
802 }
803
804 let binds: Vec<SV> = binds_json.iter().map(json_value_to_schema_value).collect();
806
807 let expr = if stmt.parameter_count == 0 {
809 stmt.shape.clone()
810 } else {
811 bind_parameterized_query(&stmt.shape, &binds, stmt.parameter_count)
812 .ok_or((error_code::QUERY_ERROR, "bind failed".to_string()))?
813 };
814
815 let qr = runtime
816 .execute_query_expr(expr)
817 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
818 Ok(query_result_to_json(&qr))
819 }
820
821 "insert" => {
822 let collection = params.get("collection").and_then(Value::as_str).ok_or((
823 error_code::INVALID_PARAMS,
824 "missing 'collection' string".to_string(),
825 ))?;
826 let payload = params.get("payload").ok_or((
827 error_code::INVALID_PARAMS,
828 "missing 'payload' object".to_string(),
829 ))?;
830 let payload_obj = payload.as_object().ok_or((
831 error_code::INVALID_PARAMS,
832 "'payload' must be a JSON object".to_string(),
833 ))?;
834 let sql = build_insert_sql(collection, payload_obj.iter());
835
836 if let Some(tx) = session.current_tx_mut() {
837 tx.write_set.push(PendingSql::Insert(sql));
838 return Ok(pending_tx_response(tx.tx_id));
839 }
840
841 let qr = runtime
842 .execute_query(&sql)
843 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
844 Ok(insert_result_to_json(&qr))
845 }
846
847 "bulk_insert" => {
848 let collection = params.get("collection").and_then(Value::as_str).ok_or((
849 error_code::INVALID_PARAMS,
850 "missing 'collection' string".to_string(),
851 ))?;
852 let payloads = params.get("payloads").and_then(Value::as_array).ok_or((
853 error_code::INVALID_PARAMS,
854 "missing 'payloads' array".to_string(),
855 ))?;
856
857 if let Some(tx) = session.current_tx_mut() {
858 let mut buffered: u64 = 0;
859 for entry in payloads {
860 let obj = entry.as_object().ok_or((
861 error_code::INVALID_PARAMS,
862 "each payload must be a JSON object".to_string(),
863 ))?;
864 let sql = build_insert_sql(collection, obj.iter());
865 tx.write_set.push(PendingSql::Insert(sql));
866 buffered += 1;
867 }
868 let tx_id = tx.tx_id;
869 return Ok(Value::Object(
870 [
871 ("affected".to_string(), Value::Number(0.0)),
872 ("buffered".to_string(), Value::Number(buffered as f64)),
873 ("pending".to_string(), Value::Bool(true)),
874 ("tx_id".to_string(), Value::Number(tx_id as f64)),
875 ]
876 .into_iter()
877 .collect(),
878 ));
879 }
880
881 let mut total_affected: u64 = 0;
882 for entry in payloads {
883 let obj = entry.as_object().ok_or((
884 error_code::INVALID_PARAMS,
885 "each payload must be a JSON object".to_string(),
886 ))?;
887 let sql = build_insert_sql(collection, obj.iter());
888 let qr = runtime
889 .execute_query(&sql)
890 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
891 total_affected += qr.affected_rows;
892 }
893 Ok(Value::Object(
894 [("affected".to_string(), Value::Number(total_affected as f64))]
895 .into_iter()
896 .collect(),
897 ))
898 }
899
900 "get" => {
901 let collection = params.get("collection").and_then(Value::as_str).ok_or((
902 error_code::INVALID_PARAMS,
903 "missing 'collection' string".to_string(),
904 ))?;
905 let id = params.get("id").and_then(Value::as_str).ok_or((
906 error_code::INVALID_PARAMS,
907 "missing 'id' string".to_string(),
908 ))?;
909 let sql = format!("SELECT * FROM {collection} WHERE _entity_id = {id} LIMIT 1");
910 let qr = runtime
911 .execute_query(&sql)
912 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
913 let entity = qr
914 .result
915 .records
916 .first()
917 .map(record_to_json_object)
918 .unwrap_or(Value::Null);
919 Ok(Value::Object(
920 [("entity".to_string(), entity)].into_iter().collect(),
921 ))
922 }
923
924 "delete" => {
925 let collection = params.get("collection").and_then(Value::as_str).ok_or((
926 error_code::INVALID_PARAMS,
927 "missing 'collection' string".to_string(),
928 ))?;
929 let id = params.get("id").and_then(Value::as_str).ok_or((
930 error_code::INVALID_PARAMS,
931 "missing 'id' string".to_string(),
932 ))?;
933 let sql = format!("DELETE FROM {collection} WHERE _entity_id = {id}");
934
935 if let Some(tx) = session.current_tx_mut() {
936 tx.write_set.push(PendingSql::Delete(sql));
937 return Ok(pending_tx_response(tx.tx_id));
938 }
939
940 let qr = runtime
941 .execute_query(&sql)
942 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
943 Ok(Value::Object(
944 [(
945 "affected".to_string(),
946 Value::Number(qr.affected_rows as f64),
947 )]
948 .into_iter()
949 .collect(),
950 ))
951 }
952
953 "close" => {
954 let _ = session.take_tx();
959 session.clear_cursors();
960 let _ = runtime.checkpoint();
961 Ok(Value::Null)
962 }
963
964 "auth.login"
969 | "auth.whoami"
970 | "auth.change_password"
971 | "auth.create_api_key"
972 | "auth.revoke_api_key" => {
973 let _ = (session, params);
974 Err((
975 error_code::INVALID_REQUEST,
976 format!(
977 "{method}: auth methods are only available on grpc:// connections; \
978 embedded modes (memory://, file://) inherit caller privileges"
979 ),
980 ))
981 }
982
983 other => Err((
984 error_code::INVALID_REQUEST,
985 format!("unknown method: {other}"),
986 )),
987 }
988}
989
990fn success_response(id: &Value, result: &Value, is_close: bool) -> String {
995 let mut envelope = json::Map::new();
1000 envelope.insert("jsonrpc".to_string(), Value::String("2.0".to_string()));
1001 envelope.insert("id".to_string(), id.clone());
1002 envelope.insert("result".to_string(), result.clone());
1003 if is_close {
1004 envelope.insert("__close__".to_string(), Value::Bool(true));
1005 }
1006 Value::Object(envelope).to_string_compact()
1007}
1008
1009fn error_response(id: &Value, code: &str, message: &str) -> String {
1010 let mut err = json::Map::new();
1011 err.insert("code".to_string(), Value::String(code.to_string()));
1012 err.insert("message".to_string(), Value::String(message.to_string()));
1013 err.insert("data".to_string(), Value::Null);
1014
1015 let mut envelope = json::Map::new();
1016 envelope.insert("jsonrpc".to_string(), Value::String("2.0".to_string()));
1017 envelope.insert("id".to_string(), id.clone());
1018 envelope.insert("error".to_string(), Value::Object(err));
1019 Value::Object(envelope).to_string_compact()
1020}
1021
1022fn pending_tx_response(tx_id: u64) -> Value {
1029 Value::Object(
1030 [
1031 ("affected".to_string(), Value::Number(0.0)),
1032 ("pending".to_string(), Value::Bool(true)),
1033 ("tx_id".to_string(), Value::Number(tx_id as f64)),
1034 ]
1035 .into_iter()
1036 .collect(),
1037 )
1038}
1039
1040pub(crate) fn build_insert_sql<'a, I>(collection: &str, fields: I) -> String
1041where
1042 I: Iterator<Item = (&'a String, &'a Value)>,
1043{
1044 let mut cols = Vec::new();
1045 let mut vals = Vec::new();
1046 for (k, v) in fields {
1047 cols.push(k.clone());
1048 vals.push(value_to_sql_literal(v));
1049 }
1050 format!(
1051 "INSERT INTO {collection} ({}) VALUES ({})",
1052 cols.join(", "),
1053 vals.join(", "),
1054 )
1055}
1056
1057pub(crate) fn value_to_sql_literal(v: &Value) -> String {
1058 match v {
1059 Value::Null => "NULL".to_string(),
1060 Value::Bool(b) => b.to_string(),
1061 Value::Number(n) => {
1062 if n.fract() == 0.0 {
1063 format!("{}", *n as i64)
1064 } else {
1065 n.to_string()
1066 }
1067 }
1068 Value::String(s) => format!("'{}'", s.replace('\'', "''")),
1069 other => format!("'{}'", other.to_string_compact().replace('\'', "''")),
1070 }
1071}
1072
1073fn query_result_to_json(qr: &RuntimeQueryResult) -> Value {
1074 let mut envelope = json::Map::new();
1075 envelope.insert(
1076 "statement".to_string(),
1077 Value::String(qr.statement_type.to_string()),
1078 );
1079 envelope.insert(
1080 "affected".to_string(),
1081 Value::Number(qr.affected_rows as f64),
1082 );
1083
1084 let mut columns = Vec::new();
1085 if let Some(first) = qr.result.records.first() {
1086 let mut keys: Vec<String> = first
1087 .column_names()
1088 .into_iter()
1089 .map(|k| k.to_string())
1090 .collect();
1091 keys.sort();
1092 columns = keys.into_iter().map(Value::String).collect();
1093 }
1094 envelope.insert("columns".to_string(), Value::Array(columns));
1095
1096 let rows: Vec<Value> = qr
1097 .result
1098 .records
1099 .iter()
1100 .map(record_to_json_object)
1101 .collect();
1102 envelope.insert("rows".to_string(), Value::Array(rows));
1103
1104 Value::Object(envelope)
1105}
1106
1107pub(crate) fn insert_result_to_json(qr: &RuntimeQueryResult) -> Value {
1108 let mut envelope = json::Map::new();
1109 envelope.insert(
1110 "affected".to_string(),
1111 Value::Number(qr.affected_rows as f64),
1112 );
1113 if let Some(first) = qr.result.records.first() {
1115 if let Some(id_val) = first
1116 .iter_fields()
1117 .find(|(k, _)| {
1118 let s: &str = k;
1119 s == "_entity_id"
1120 })
1121 .map(|(_, v)| schema_value_to_json(v))
1122 {
1123 envelope.insert("id".to_string(), id_val);
1124 }
1125 }
1126 Value::Object(envelope)
1127}
1128
1129fn record_to_json_object(record: &UnifiedRecord) -> Value {
1130 let mut map = json::Map::new();
1131 let mut entries: Vec<(&str, &SchemaValue)> =
1134 record.iter_fields().map(|(k, v)| (k.as_ref(), v)).collect();
1135 entries.sort_by(|a, b| a.0.cmp(b.0));
1136 for (k, v) in entries {
1137 map.insert(k.to_string(), schema_value_to_json(v));
1138 }
1139 Value::Object(map)
1140}
1141
1142fn schema_value_to_json(v: &SchemaValue) -> Value {
1143 match v {
1144 SchemaValue::Null => Value::Null,
1145 SchemaValue::Boolean(b) => Value::Bool(*b),
1146 SchemaValue::Integer(n) => Value::Number(*n as f64),
1147 SchemaValue::UnsignedInteger(n) => Value::Number(*n as f64),
1148 SchemaValue::Float(n) => Value::Number(*n),
1149 SchemaValue::BigInt(n) => Value::Number(*n as f64),
1150 SchemaValue::TimestampMs(n)
1151 | SchemaValue::Timestamp(n)
1152 | SchemaValue::Duration(n)
1153 | SchemaValue::Decimal(n) => Value::Number(*n as f64),
1154 SchemaValue::Password(_) | SchemaValue::Secret(_) => Value::String("***".to_string()),
1155 SchemaValue::Text(s) => Value::String(s.to_string()),
1156 SchemaValue::Email(s)
1157 | SchemaValue::Url(s)
1158 | SchemaValue::NodeRef(s)
1159 | SchemaValue::EdgeRef(s) => Value::String(s.clone()),
1160 other => Value::String(format!("{other}")),
1161 }
1162}
1163
1164pub(crate) fn json_value_to_schema_value(v: &Value) -> SchemaValue {
1169 match v {
1170 Value::Null => SchemaValue::Null,
1171 Value::Bool(b) => SchemaValue::Boolean(*b),
1172 Value::Number(n) => {
1173 if n.fract() == 0.0 && n.abs() < i64::MAX as f64 {
1174 SchemaValue::Integer(*n as i64)
1175 } else {
1176 SchemaValue::Float(*n)
1177 }
1178 }
1179 Value::String(s) => SchemaValue::text(s.clone()),
1180 Value::Array(items) => {
1181 if items.iter().all(|v| matches!(v, Value::Number(_))) {
1186 let floats: Vec<f32> = items
1187 .iter()
1188 .map(|v| v.as_f64().unwrap_or(0.0) as f32)
1189 .collect();
1190 SchemaValue::Vector(floats)
1191 } else {
1192 SchemaValue::text(crate::json::to_string(v).unwrap_or_default())
1193 }
1194 }
1195 Value::Object(_) => SchemaValue::text(crate::json::to_string(v).unwrap_or_default()),
1196 }
1197}
1198
1199fn dispatch_method_remote(
1209 client: &AsyncMutex<RedDBClient>,
1210 tokio_rt: &tokio::runtime::Runtime,
1211 method: &str,
1212 params: &Value,
1213) -> Result<Value, (&'static str, String)> {
1214 match method {
1215 "version" => Ok(Value::Object(
1216 [
1217 (
1218 "version".to_string(),
1219 Value::String(env!("CARGO_PKG_VERSION").to_string()),
1220 ),
1221 (
1222 "protocol".to_string(),
1223 Value::String(PROTOCOL_VERSION.to_string()),
1224 ),
1225 ]
1226 .into_iter()
1227 .collect(),
1228 )),
1229
1230 "health" => {
1231 let result = tokio_rt.block_on(async {
1232 let mut guard = client.lock().await;
1233 guard.health_status().await
1234 });
1235 match result {
1236 Ok(status) => Ok(Value::Object(
1237 [
1238 ("ok".to_string(), Value::Bool(status.healthy)),
1239 ("state".to_string(), Value::String(status.state)),
1240 (
1241 "checked_at_unix_ms".to_string(),
1242 Value::Number(status.checked_at_unix_ms as f64),
1243 ),
1244 (
1245 "version".to_string(),
1246 Value::String(env!("CARGO_PKG_VERSION").to_string()),
1247 ),
1248 ]
1249 .into_iter()
1250 .collect(),
1251 )),
1252 Err(e) => Err((error_code::INTERNAL_ERROR, e.to_string())),
1253 }
1254 }
1255
1256 "query" => {
1257 let sql = params.get("sql").and_then(Value::as_str).ok_or((
1258 error_code::INVALID_PARAMS,
1259 "missing 'sql' string".to_string(),
1260 ))?;
1261 let json_str = tokio_rt
1262 .block_on(async {
1263 let mut guard = client.lock().await;
1264 guard.query(sql).await
1265 })
1266 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
1267 let parsed = json::from_str::<Value>(&json_str)
1272 .map_err(|e| (error_code::INTERNAL_ERROR, format!("bad server JSON: {e}")))?;
1273 Ok(parsed)
1274 }
1275
1276 "insert" => {
1277 let collection = params.get("collection").and_then(Value::as_str).ok_or((
1278 error_code::INVALID_PARAMS,
1279 "missing 'collection' string".to_string(),
1280 ))?;
1281 let payload = params.get("payload").ok_or((
1282 error_code::INVALID_PARAMS,
1283 "missing 'payload' object".to_string(),
1284 ))?;
1285 if payload.as_object().is_none() {
1286 return Err((
1287 error_code::INVALID_PARAMS,
1288 "'payload' must be a JSON object".to_string(),
1289 ));
1290 }
1291 let payload_json = payload.to_string_compact();
1292 let reply = tokio_rt
1293 .block_on(async {
1294 let mut guard = client.lock().await;
1295 guard.create_row_entity(collection, &payload_json).await
1296 })
1297 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
1298 let mut out = json::Map::new();
1299 out.insert("affected".to_string(), Value::Number(1.0));
1300 out.insert("id".to_string(), Value::String(reply.id.to_string()));
1301 Ok(Value::Object(out))
1302 }
1303
1304 "bulk_insert" => {
1305 let collection = params.get("collection").and_then(Value::as_str).ok_or((
1306 error_code::INVALID_PARAMS,
1307 "missing 'collection' string".to_string(),
1308 ))?;
1309 let payloads = params.get("payloads").and_then(Value::as_array).ok_or((
1310 error_code::INVALID_PARAMS,
1311 "missing 'payloads' array".to_string(),
1312 ))?;
1313 let mut encoded = Vec::with_capacity(payloads.len());
1314 for entry in payloads {
1315 if entry.as_object().is_none() {
1316 return Err((
1317 error_code::INVALID_PARAMS,
1318 "each payload must be a JSON object".to_string(),
1319 ));
1320 }
1321 encoded.push(entry.to_string_compact());
1322 }
1323 let total = tokio_rt
1324 .block_on(async {
1325 let mut guard = client.lock().await;
1326 guard.bulk_create_rows(collection, encoded).await
1327 })
1328 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?
1329 .count;
1330 Ok(Value::Object(
1331 [("affected".to_string(), Value::Number(total as f64))]
1332 .into_iter()
1333 .collect(),
1334 ))
1335 }
1336
1337 "get" => {
1338 let collection = params.get("collection").and_then(Value::as_str).ok_or((
1339 error_code::INVALID_PARAMS,
1340 "missing 'collection' string".to_string(),
1341 ))?;
1342 let id = params.get("id").and_then(Value::as_str).ok_or((
1343 error_code::INVALID_PARAMS,
1344 "missing 'id' string".to_string(),
1345 ))?;
1346 let sql = format!("SELECT * FROM {collection} WHERE _entity_id = {id} LIMIT 1");
1347 let json_str = tokio_rt
1348 .block_on(async {
1349 let mut guard = client.lock().await;
1350 guard.query(&sql).await
1351 })
1352 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
1353 let parsed = json::from_str::<Value>(&json_str)
1354 .map_err(|e| (error_code::INTERNAL_ERROR, format!("bad server JSON: {e}")))?;
1355 let entity = parsed
1358 .get("rows")
1359 .and_then(Value::as_array)
1360 .and_then(|rows| rows.first().cloned())
1361 .unwrap_or(Value::Null);
1362 Ok(Value::Object(
1363 [("entity".to_string(), entity)].into_iter().collect(),
1364 ))
1365 }
1366
1367 "delete" => {
1368 let collection = params.get("collection").and_then(Value::as_str).ok_or((
1369 error_code::INVALID_PARAMS,
1370 "missing 'collection' string".to_string(),
1371 ))?;
1372 let id = params.get("id").and_then(Value::as_str).ok_or((
1373 error_code::INVALID_PARAMS,
1374 "missing 'id' string".to_string(),
1375 ))?;
1376 let id = id.parse::<u64>().map_err(|_| {
1377 (
1378 error_code::INVALID_PARAMS,
1379 "id must be a numeric string".to_string(),
1380 )
1381 })?;
1382 let _reply = tokio_rt
1383 .block_on(async {
1384 let mut guard = client.lock().await;
1385 guard.delete_entity(collection, id).await
1386 })
1387 .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
1388 Ok(Value::Object(
1389 [("affected".to_string(), Value::Number(1.0))]
1390 .into_iter()
1391 .collect(),
1392 ))
1393 }
1394
1395 "close" => Ok(Value::Null),
1396
1397 other => Err((
1398 error_code::INVALID_REQUEST,
1399 format!("unknown method: {other}"),
1400 )),
1401 }
1402}
1403
1404#[cfg(test)]
1405mod tests {
1406 use super::*;
1407
1408 fn make_runtime() -> RedDBRuntime {
1409 RedDBRuntime::in_memory().expect("in-memory runtime")
1410 }
1411
1412 fn handle(rt: &RedDBRuntime, line: &str) -> String {
1413 let mut session = Session::new();
1414 handle_line(&Backend::Local(rt), &mut session, line)
1415 }
1416
1417 fn with_session<F>(rt: &RedDBRuntime, f: F)
1420 where
1421 F: FnOnce(&dyn Fn(&str) -> String, &RedDBRuntime),
1422 {
1423 let session = std::cell::RefCell::new(Session::new());
1424 let call = |line: &str| -> String {
1425 let mut s = session.borrow_mut();
1426 handle_line(&Backend::Local(rt), &mut s, line)
1427 };
1428 f(&call, rt);
1429 }
1430
1431 #[test]
1432 fn version_method_returns_version_and_protocol() {
1433 let rt = make_runtime();
1434 let line = r#"{"jsonrpc":"2.0","id":1,"method":"version","params":{}}"#;
1435 let resp = handle(&rt, line);
1436 assert!(resp.contains("\"id\":1"));
1437 assert!(resp.contains("\"protocol\":\"1.0\""));
1438 assert!(resp.contains("\"version\""));
1439 }
1440
1441 #[test]
1442 fn health_method_returns_ok_true() {
1443 let rt = make_runtime();
1444 let resp = handle(
1445 &rt,
1446 r#"{"jsonrpc":"2.0","id":"abc","method":"health","params":{}}"#,
1447 );
1448 assert!(resp.contains("\"ok\":true"));
1449 assert!(resp.contains("\"id\":\"abc\""));
1450 }
1451
1452 #[test]
1453 fn parse_error_for_invalid_json() {
1454 let rt = make_runtime();
1455 let resp = handle(&rt, "not json {");
1456 assert!(resp.contains("\"code\":\"PARSE_ERROR\""));
1457 assert!(resp.contains("\"id\":null"));
1458 }
1459
1460 #[test]
1461 fn invalid_request_when_method_missing() {
1462 let rt = make_runtime();
1463 let resp = handle(&rt, r#"{"jsonrpc":"2.0","id":1,"params":{}}"#);
1464 assert!(resp.contains("\"code\":\"INVALID_REQUEST\""));
1465 }
1466
1467 #[test]
1468 fn unknown_method_is_invalid_request() {
1469 let rt = make_runtime();
1470 let resp = handle(
1471 &rt,
1472 r#"{"jsonrpc":"2.0","id":1,"method":"frobnicate","params":{}}"#,
1473 );
1474 assert!(resp.contains("\"code\":\"INVALID_REQUEST\""));
1475 assert!(resp.contains("frobnicate"));
1476 }
1477
1478 #[test]
1479 fn invalid_params_when_query_sql_missing() {
1480 let rt = make_runtime();
1481 let resp = handle(
1482 &rt,
1483 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{}}"#,
1484 );
1485 assert!(resp.contains("\"code\":\"INVALID_PARAMS\""));
1486 }
1487
1488 #[test]
1489 fn close_method_marks_response_for_shutdown() {
1490 let rt = make_runtime();
1491 let resp = handle(
1492 &rt,
1493 r#"{"jsonrpc":"2.0","id":1,"method":"close","params":{}}"#,
1494 );
1495 assert!(resp.contains("\"__close__\":true"));
1496 }
1497
1498 #[test]
1499 fn query_with_int_text_params_round_trips() {
1500 let rt = make_runtime();
1501 let _ = handle(
1502 &rt,
1503 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE p (id INTEGER, name TEXT)"}}"#,
1504 );
1505 let _ = handle(
1506 &rt,
1507 r#"{"jsonrpc":"2.0","id":2,"method":"query","params":{"sql":"INSERT INTO p (id, name) VALUES (1, 'Alice')"}}"#,
1508 );
1509 let _ = handle(
1510 &rt,
1511 r#"{"jsonrpc":"2.0","id":3,"method":"query","params":{"sql":"INSERT INTO p (id, name) VALUES (2, 'Bob')"}}"#,
1512 );
1513 let resp = handle(
1514 &rt,
1515 r#"{"jsonrpc":"2.0","id":4,"method":"query","params":{"sql":"SELECT * FROM p WHERE id = $1 AND name = $2","params":[1,"Alice"]}}"#,
1516 );
1517 assert!(resp.contains("\"Alice\""), "got: {resp}");
1518 assert!(!resp.contains("\"Bob\""), "got: {resp}");
1519 }
1520
1521 #[test]
1522 fn query_with_params_arity_mismatch_rejected() {
1523 let rt = make_runtime();
1524 let _ = handle(
1525 &rt,
1526 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE pa (id INTEGER)"}}"#,
1527 );
1528 let resp = handle(
1529 &rt,
1530 r#"{"jsonrpc":"2.0","id":2,"method":"query","params":{"sql":"SELECT * FROM pa WHERE id = $1","params":[1,2]}}"#,
1531 );
1532 assert!(resp.contains("\"INVALID_PARAMS\""), "got: {resp}");
1533 }
1534
1535 #[test]
1536 fn query_with_params_gap_rejected() {
1537 let rt = make_runtime();
1538 let _ = handle(
1539 &rt,
1540 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE pg (a INTEGER, b INTEGER)"}}"#,
1541 );
1542 let resp = handle(
1543 &rt,
1544 r#"{"jsonrpc":"2.0","id":2,"method":"query","params":{"sql":"SELECT * FROM pg WHERE a = $1 AND b = $3","params":[1,2,3]}}"#,
1545 );
1546 assert!(resp.contains("\"INVALID_PARAMS\""), "got: {resp}");
1547 }
1548
1549 #[test]
1550 fn query_select_one_returns_rows() {
1551 let rt = make_runtime();
1552 let resp = handle(
1553 &rt,
1554 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"SELECT 1 AS one"}}"#,
1555 );
1556 assert!(resp.contains("\"result\""));
1557 assert!(!resp.contains("\"error\""));
1558 }
1559
1560 #[test]
1565 fn tx_begin_returns_tx_id_and_isolation() {
1566 let rt = make_runtime();
1567 with_session(&rt, |call, _| {
1568 let resp = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
1569 assert!(resp.contains("\"tx_id\":1"));
1570 assert!(resp.contains("\"isolation\":\"read_committed_deferred\""));
1571 assert!(!resp.contains("\"error\""));
1572 });
1573 }
1574
1575 #[test]
1576 fn tx_begin_twice_returns_already_open() {
1577 let rt = make_runtime();
1578 with_session(&rt, |call, _| {
1579 let _ = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
1580 let resp = call(r#"{"jsonrpc":"2.0","id":2,"method":"tx.begin","params":null}"#);
1581 assert!(resp.contains("\"code\":\"TX_ALREADY_OPEN\""));
1582 });
1583 }
1584
1585 #[test]
1586 fn tx_commit_without_begin_returns_no_tx_open() {
1587 let rt = make_runtime();
1588 with_session(&rt, |call, _| {
1589 let resp = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.commit","params":null}"#);
1590 assert!(resp.contains("\"code\":\"NO_TX_OPEN\""));
1591 });
1592 }
1593
1594 #[test]
1595 fn tx_rollback_without_begin_returns_no_tx_open() {
1596 let rt = make_runtime();
1597 with_session(&rt, |call, _| {
1598 let resp = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.rollback","params":null}"#);
1599 assert!(resp.contains("\"code\":\"NO_TX_OPEN\""));
1600 });
1601 }
1602
1603 #[test]
1604 fn insert_inside_tx_returns_pending_envelope() {
1605 let rt = make_runtime();
1606 let _ = handle(
1608 &rt,
1609 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE users (name TEXT)"}}"#,
1610 );
1611 with_session(&rt, |call, _| {
1612 let _ = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
1613 let resp = call(
1614 r#"{"jsonrpc":"2.0","id":2,"method":"insert","params":{"collection":"users","payload":{"name":"alice"}}}"#,
1615 );
1616 assert!(resp.contains("\"pending\":true"));
1617 assert!(resp.contains("\"tx_id\":1"));
1618 assert!(resp.contains("\"affected\":0"));
1619 });
1620 }
1621
1622 #[test]
1623 fn begin_insert_rollback_does_not_persist() {
1624 let rt = make_runtime();
1625 let _ = handle(
1626 &rt,
1627 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE u (name TEXT)"}}"#,
1628 );
1629 with_session(&rt, |call, _| {
1630 let _ = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
1631 let _ = call(
1632 r#"{"jsonrpc":"2.0","id":2,"method":"insert","params":{"collection":"u","payload":{"name":"ghost"}}}"#,
1633 );
1634 let rollback = call(r#"{"jsonrpc":"2.0","id":3,"method":"tx.rollback","params":null}"#);
1635 assert!(rollback.contains("\"ops_discarded\":1"));
1636 assert!(rollback.contains("\"tx_id\":1"));
1637 });
1638 let resp = handle(
1640 &rt,
1641 r#"{"jsonrpc":"2.0","id":9,"method":"query","params":{"sql":"SELECT * FROM u"}}"#,
1642 );
1643 assert!(!resp.contains("\"ghost\""));
1644 }
1645
1646 #[test]
1647 fn begin_insert_commit_persists() {
1648 let rt = make_runtime();
1649 let _ = handle(
1650 &rt,
1651 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE u2 (name TEXT)"}}"#,
1652 );
1653 with_session(&rt, |call, _| {
1654 let _ = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
1655 let _ = call(
1656 r#"{"jsonrpc":"2.0","id":2,"method":"insert","params":{"collection":"u2","payload":{"name":"alice"}}}"#,
1657 );
1658 let _ = call(
1659 r#"{"jsonrpc":"2.0","id":3,"method":"insert","params":{"collection":"u2","payload":{"name":"bob"}}}"#,
1660 );
1661 let commit = call(r#"{"jsonrpc":"2.0","id":4,"method":"tx.commit","params":null}"#);
1662 assert!(commit.contains("\"ops_replayed\":2"));
1663 assert!(!commit.contains("\"error\""));
1664 });
1665 let resp = handle(
1666 &rt,
1667 r#"{"jsonrpc":"2.0","id":9,"method":"query","params":{"sql":"SELECT * FROM u2"}}"#,
1668 );
1669 assert!(resp.contains("\"alice\""));
1670 assert!(resp.contains("\"bob\""));
1671 }
1672
1673 #[test]
1674 fn bulk_insert_inside_tx_buffers_everything() {
1675 let rt = make_runtime();
1676 let _ = handle(
1677 &rt,
1678 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE u3 (name TEXT)"}}"#,
1679 );
1680 with_session(&rt, |call, _| {
1681 let _ = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
1682 let resp = call(
1683 r#"{"jsonrpc":"2.0","id":2,"method":"bulk_insert","params":{"collection":"u3","payloads":[{"name":"a"},{"name":"b"},{"name":"c"}]}}"#,
1684 );
1685 assert!(resp.contains("\"buffered\":3"));
1686 assert!(resp.contains("\"pending\":true"));
1687 assert!(resp.contains("\"affected\":0"));
1688
1689 let commit = call(r#"{"jsonrpc":"2.0","id":3,"method":"tx.commit","params":null}"#);
1690 assert!(commit.contains("\"ops_replayed\":3"));
1691 });
1692 }
1693
1694 #[test]
1695 fn delete_inside_tx_is_buffered() {
1696 let rt = make_runtime();
1697 let _ = handle(
1699 &rt,
1700 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE u4 (name TEXT)"}}"#,
1701 );
1702 let _ = handle(
1703 &rt,
1704 r#"{"jsonrpc":"2.0","id":2,"method":"query","params":{"sql":"INSERT INTO u4 (name) VALUES ('keep')"}}"#,
1705 );
1706 with_session(&rt, |call, _| {
1707 let _ = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
1708 let resp = call(
1709 r#"{"jsonrpc":"2.0","id":2,"method":"delete","params":{"collection":"u4","id":"1"}}"#,
1710 );
1711 assert!(resp.contains("\"pending\":true"));
1712 let _ = call(r#"{"jsonrpc":"2.0","id":3,"method":"tx.rollback","params":null}"#);
1713 });
1714 let resp = handle(
1716 &rt,
1717 r#"{"jsonrpc":"2.0","id":9,"method":"query","params":{"sql":"SELECT * FROM u4"}}"#,
1718 );
1719 assert!(resp.contains("\"keep\""));
1720 }
1721
1722 #[test]
1723 fn close_with_open_tx_auto_rollbacks() {
1724 let rt = make_runtime();
1725 let _ = handle(
1726 &rt,
1727 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE u5 (name TEXT)"}}"#,
1728 );
1729 with_session(&rt, |call, _| {
1730 let _ = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
1731 let _ = call(
1732 r#"{"jsonrpc":"2.0","id":2,"method":"insert","params":{"collection":"u5","payload":{"name":"ghost"}}}"#,
1733 );
1734 let close = call(r#"{"jsonrpc":"2.0","id":3,"method":"close","params":null}"#);
1735 assert!(close.contains("\"__close__\":true"));
1736 assert!(!close.contains("\"error\""));
1737 });
1738 let resp = handle(
1739 &rt,
1740 r#"{"jsonrpc":"2.0","id":9,"method":"query","params":{"sql":"SELECT * FROM u5"}}"#,
1741 );
1742 assert!(!resp.contains("\"ghost\""));
1743 }
1744
1745 fn seed_numbers_table(rt: &RedDBRuntime, table: &str, count: u32) {
1750 let _ = handle(
1751 rt,
1752 &format!(
1753 r#"{{"jsonrpc":"2.0","id":1,"method":"query","params":{{"sql":"CREATE TABLE {table} (n INTEGER)"}}}}"#,
1754 ),
1755 );
1756 for i in 0..count {
1757 let _ = handle(
1758 rt,
1759 &format!(
1760 r#"{{"jsonrpc":"2.0","id":2,"method":"query","params":{{"sql":"INSERT INTO {table} (n) VALUES ({i})"}}}}"#,
1761 ),
1762 );
1763 }
1764 }
1765
1766 #[test]
1767 fn cursor_open_returns_id_columns_and_total() {
1768 let rt = make_runtime();
1769 seed_numbers_table(&rt, "nums1", 3);
1770 with_session(&rt, |call, _| {
1771 let resp = call(
1772 r#"{"jsonrpc":"2.0","id":1,"method":"query.open","params":{"sql":"SELECT n FROM nums1"}}"#,
1773 );
1774 assert!(resp.contains("\"cursor_id\":1"));
1775 assert!(resp.contains("\"total_rows\":3"));
1776 assert!(resp.contains("\"columns\""));
1777 assert!(!resp.contains("\"error\""));
1778 });
1779 }
1780
1781 #[test]
1782 fn cursor_next_chunks_rows_and_signals_done() {
1783 let rt = make_runtime();
1784 seed_numbers_table(&rt, "nums2", 5);
1785 with_session(&rt, |call, _| {
1786 let _ = call(
1787 r#"{"jsonrpc":"2.0","id":1,"method":"query.open","params":{"sql":"SELECT n FROM nums2"}}"#,
1788 );
1789 let first = call(
1790 r#"{"jsonrpc":"2.0","id":2,"method":"query.next","params":{"cursor_id":1,"batch_size":2}}"#,
1791 );
1792 assert!(first.contains("\"done\":false"));
1793 assert!(first.contains("\"remaining\":3"));
1794
1795 let second = call(
1796 r#"{"jsonrpc":"2.0","id":3,"method":"query.next","params":{"cursor_id":1,"batch_size":2}}"#,
1797 );
1798 assert!(second.contains("\"done\":false"));
1799 assert!(second.contains("\"remaining\":1"));
1800
1801 let third = call(
1802 r#"{"jsonrpc":"2.0","id":4,"method":"query.next","params":{"cursor_id":1,"batch_size":2}}"#,
1803 );
1804 assert!(third.contains("\"done\":true"));
1805 assert!(third.contains("\"remaining\":0"));
1806 });
1807 }
1808
1809 #[test]
1810 fn cursor_auto_drops_when_exhausted() {
1811 let rt = make_runtime();
1812 seed_numbers_table(&rt, "nums3", 2);
1813 with_session(&rt, |call, _| {
1814 let _ = call(
1815 r#"{"jsonrpc":"2.0","id":1,"method":"query.open","params":{"sql":"SELECT n FROM nums3"}}"#,
1816 );
1817 let _ = call(
1818 r#"{"jsonrpc":"2.0","id":2,"method":"query.next","params":{"cursor_id":1,"batch_size":100}}"#,
1819 );
1820 let resp = call(
1823 r#"{"jsonrpc":"2.0","id":3,"method":"query.next","params":{"cursor_id":1,"batch_size":100}}"#,
1824 );
1825 assert!(resp.contains("\"code\":\"CURSOR_NOT_FOUND\""));
1826 });
1827 }
1828
1829 #[test]
1830 fn cursor_close_removes_it() {
1831 let rt = make_runtime();
1832 seed_numbers_table(&rt, "nums4", 3);
1833 with_session(&rt, |call, _| {
1834 let _ = call(
1835 r#"{"jsonrpc":"2.0","id":1,"method":"query.open","params":{"sql":"SELECT n FROM nums4"}}"#,
1836 );
1837 let close =
1838 call(r#"{"jsonrpc":"2.0","id":2,"method":"query.close","params":{"cursor_id":1}}"#);
1839 assert!(close.contains("\"closed\":true"));
1840 let after = call(
1841 r#"{"jsonrpc":"2.0","id":3,"method":"query.next","params":{"cursor_id":1,"batch_size":10}}"#,
1842 );
1843 assert!(after.contains("\"code\":\"CURSOR_NOT_FOUND\""));
1844 });
1845 }
1846
1847 #[test]
1848 fn cursor_close_unknown_errors() {
1849 let rt = make_runtime();
1850 with_session(&rt, |call, _| {
1851 let resp = call(
1852 r#"{"jsonrpc":"2.0","id":1,"method":"query.close","params":{"cursor_id":9999}}"#,
1853 );
1854 assert!(resp.contains("\"code\":\"CURSOR_NOT_FOUND\""));
1855 });
1856 }
1857
1858 #[test]
1859 fn cursor_next_without_cursor_id_errors() {
1860 let rt = make_runtime();
1861 with_session(&rt, |call, _| {
1862 let resp = call(r#"{"jsonrpc":"2.0","id":1,"method":"query.next","params":{}}"#);
1863 assert!(resp.contains("\"code\":\"INVALID_PARAMS\""));
1864 });
1865 }
1866
1867 #[test]
1868 fn cursor_default_batch_size_returns_all_when_smaller_than_default() {
1869 let rt = make_runtime();
1870 seed_numbers_table(&rt, "nums5", 7);
1871 with_session(&rt, |call, _| {
1872 let _ = call(
1873 r#"{"jsonrpc":"2.0","id":1,"method":"query.open","params":{"sql":"SELECT n FROM nums5"}}"#,
1874 );
1875 let resp =
1877 call(r#"{"jsonrpc":"2.0","id":2,"method":"query.next","params":{"cursor_id":1}}"#);
1878 assert!(resp.contains("\"done\":true"));
1879 assert!(resp.contains("\"remaining\":0"));
1880 });
1881 }
1882
1883 #[test]
1884 fn close_method_drops_open_cursors() {
1885 let rt = make_runtime();
1886 seed_numbers_table(&rt, "nums6", 3);
1887 with_session(&rt, |call, _| {
1890 let _ = call(
1891 r#"{"jsonrpc":"2.0","id":1,"method":"query.open","params":{"sql":"SELECT n FROM nums6"}}"#,
1892 );
1893 let close = call(r#"{"jsonrpc":"2.0","id":2,"method":"close","params":null}"#);
1894 assert!(close.contains("\"__close__\":true"));
1895 let after = call(
1897 r#"{"jsonrpc":"2.0","id":3,"method":"query.next","params":{"cursor_id":1,"batch_size":10}}"#,
1898 );
1899 assert!(after.contains("\"code\":\"CURSOR_NOT_FOUND\""));
1900 });
1901 }
1902
1903 #[test]
1904 fn cursor_independent_of_transaction_state() {
1905 let rt = make_runtime();
1906 seed_numbers_table(&rt, "nums7", 4);
1907 with_session(&rt, |call, _| {
1908 let _ = call(
1910 r#"{"jsonrpc":"2.0","id":1,"method":"query.open","params":{"sql":"SELECT n FROM nums7"}}"#,
1911 );
1912 let _ = call(r#"{"jsonrpc":"2.0","id":2,"method":"tx.begin","params":null}"#);
1913 let _ = call(r#"{"jsonrpc":"2.0","id":3,"method":"tx.commit","params":null}"#);
1914 let resp = call(
1915 r#"{"jsonrpc":"2.0","id":4,"method":"query.next","params":{"cursor_id":1,"batch_size":10}}"#,
1916 );
1917 assert!(resp.contains("\"done\":true"));
1918 assert!(!resp.contains("\"error\""));
1919 });
1920 }
1921
1922 #[test]
1923 fn second_tx_after_commit_gets_fresh_id() {
1924 let rt = make_runtime();
1925 let _ = handle(
1926 &rt,
1927 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE u6 (name TEXT)"}}"#,
1928 );
1929 with_session(&rt, |call, _| {
1930 let first = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
1931 assert!(first.contains("\"tx_id\":1"));
1932 let _ = call(
1933 r#"{"jsonrpc":"2.0","id":2,"method":"insert","params":{"collection":"u6","payload":{"name":"x"}}}"#,
1934 );
1935 let _ = call(r#"{"jsonrpc":"2.0","id":3,"method":"tx.commit","params":null}"#);
1936
1937 let second = call(r#"{"jsonrpc":"2.0","id":4,"method":"tx.begin","params":null}"#);
1938 assert!(second.contains("\"tx_id\":2"));
1939 let _ = call(r#"{"jsonrpc":"2.0","id":5,"method":"tx.rollback","params":null}"#);
1940 });
1941 }
1942
1943 #[test]
1944 fn prepare_and_execute_prepared_statement() {
1945 let rt = make_runtime();
1946 let _ = handle(
1948 &rt,
1949 r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE ps_test (n INTEGER)"}}"#,
1950 );
1951 let _ = handle(
1952 &rt,
1953 r#"{"jsonrpc":"2.0","id":2,"method":"query","params":{"sql":"INSERT INTO ps_test (n) VALUES (42)"}}"#,
1954 );
1955
1956 with_session(&rt, |call, _| {
1957 let prep = call(
1959 r#"{"jsonrpc":"2.0","id":3,"method":"prepare","params":{"sql":"SELECT n FROM ps_test WHERE n = 42"}}"#,
1960 );
1961 assert!(prep.contains("\"prepared_id\""), "prepare response: {prep}");
1962
1963 let id: u64 = {
1965 let v: crate::json::Value = crate::json::from_str(&prep).expect("json");
1966 let result = v.get("result").expect("result");
1967 result
1968 .get("prepared_id")
1969 .and_then(|n| n.as_f64())
1970 .expect("prepared_id") as u64
1971 };
1972
1973 let exec = call(&format!(
1975 r#"{{"jsonrpc":"2.0","id":4,"method":"execute_prepared","params":{{"prepared_id":{id},"binds":[42]}}}}"#
1976 ));
1977 assert!(
1979 exec.contains("\"rows\""),
1980 "execute_prepared response: {exec}"
1981 );
1982 assert!(exec.contains("42"), "expected row with n=42 in: {exec}");
1983 });
1984 }
1985}