Skip to main content

reddb_server/
rpc_stdio.rs

//! JSON-RPC 2.0 line-delimited stdio mode for the `red` binary.
//!
//! See `PLAN_DRIVERS.md` for the protocol spec. This module is the
//! sole server-side implementation of the protocol — drivers in
//! every language target this contract.
//!
//! Loop:
//!   1. Read a line from stdin (UTF-8, terminated by `\n`).
//!   2. Parse it as a JSON-RPC 2.0 request envelope.
//!   3. Dispatch on `method` to the runtime.
//!   4. Serialize the response as a single line on stdout, flush.
//!   5. Repeat until EOF or until the `close` method is received.
//!
//! Errors do not crash the loop. Panics inside a method handler are
//! caught and reported as `INTERNAL_ERROR` so a buggy query cannot
//! kill the daemon.
17
18use std::io::{BufRead, BufReader, Stdin, Write};
19use std::panic::AssertUnwindSafe;
20
21use tokio::sync::Mutex as AsyncMutex;
22
23use crate::json::{self as json, Value};
24use crate::runtime::{RedDBRuntime, RuntimeQueryResult};
25use crate::storage::query::unified::UnifiedRecord;
26use crate::storage::schema::Value as SchemaValue;
27use reddb_client_connector::RedDBClient;
28
/// Which backend the stdio loop is wrapping.
///
/// `Local` = the in-process engine (embedded). `Remote` = a tonic client
/// to a standalone `red server` talking gRPC. The remote variant is
/// boxed because `RedDBClient` + a `tokio::runtime::Runtime` reference
/// is ~248 bytes against `Local`'s ~8 bytes (clippy::large_enum_variant).
///
/// The mutex uses `tokio::sync::Mutex` instead of `std::sync::Mutex`
/// because `dispatch_method_remote` holds the guard across `.await`
/// points inside `tokio_rt.block_on(...)` — holding a sync mutex
/// across an await would be a correctness bug in more complex
/// runtimes.
enum Backend<'a> {
    /// Borrowed in-process runtime; requests are routed to `dispatch_method`.
    Local(&'a RedDBRuntime),
    /// Boxed remote client state; requests are routed to
    /// `dispatch_method_remote`.
    Remote(Box<RemoteBackend<'a>>),
}
45
/// State needed to proxy stdio requests to a remote server.
struct RemoteBackend<'a> {
    /// Connected client, wrapped in an async mutex and handed to
    /// `dispatch_method_remote` for every forwarded call.
    client: AsyncMutex<RedDBClient>,
    /// Runtime borrowed from `run_remote`; passed alongside the client
    /// so forwarded calls can be driven to completion.
    tokio_rt: &'a tokio::runtime::Runtime,
}
50
/// Protocol version reported by the `version` method (in the
/// `protocol` field of its result object).
pub const PROTOCOL_VERSION: &str = "1.0";
53
/// Stable error codes. Drivers map these to idiomatic exceptions.
pub mod error_code {
    /// The input line could not be parsed as JSON.
    pub const PARSE_ERROR: &str = "PARSE_ERROR";
    /// The request envelope is malformed, e.g. the `method` field is
    /// missing or is not a string.
    pub const INVALID_REQUEST: &str = "INVALID_REQUEST";
    /// A required parameter is missing or has the wrong JSON type, or
    /// the supplied bind values do not fit the statement.
    pub const INVALID_PARAMS: &str = "INVALID_PARAMS";
    /// The statement failed to parse or execute, or it referenced an
    /// unknown prepared statement id.
    pub const QUERY_ERROR: &str = "QUERY_ERROR";
    /// NOTE(review): not referenced in the visible part of this file —
    /// confirm which methods return it.
    pub const NOT_FOUND: &str = "NOT_FOUND";
    /// A method handler panicked (the panic is caught), or reading a
    /// line from stdin failed.
    pub const INTERNAL_ERROR: &str = "INTERNAL_ERROR";
    /// `tx.begin` was called while a transaction was already open in the
    /// same session.
    pub const TX_ALREADY_OPEN: &str = "TX_ALREADY_OPEN";
    /// `tx.commit` / `tx.rollback` was called without a matching
    /// `tx.begin`.
    pub const NO_TX_OPEN: &str = "NO_TX_OPEN";
    /// A buffered statement failed during `tx.commit` replay. The error
    /// message carries the index of the failing op and the number of
    /// buffered operations.
    pub const TX_REPLAY_FAILED: &str = "TX_REPLAY_FAILED";
    /// Transactions over the remote gRPC proxy are not supported yet.
    /// The same code is returned for the cursor methods (`query.open`,
    /// `query.next`, `query.close`) when running against a remote
    /// backend.
    pub const TX_NOT_SUPPORTED_REMOTE: &str = "TX_NOT_SUPPORTED_REMOTE";
    /// `query.next` / `query.close` referenced an unknown cursor id.
    /// Either the cursor was never opened, already closed, or was
    /// automatically dropped when its rows were exhausted.
    pub const CURSOR_NOT_FOUND: &str = "CURSOR_NOT_FOUND";
    /// Too many concurrent cursors open in a single session.
    pub const CURSOR_LIMIT_EXCEEDED: &str = "CURSOR_LIMIT_EXCEEDED";
}
81
/// Maximum number of cursors a single stdio session may hold open
/// simultaneously. Serves as a memory-pressure guard against runaway
/// clients that `query.open` without ever closing. Exceeding it yields
/// `CURSOR_LIMIT_EXCEEDED`.
pub(crate) const MAX_CURSORS_PER_SESSION: usize = 64;
/// Default batch size for `query.next` when the client does not specify
/// one explicitly. Tuned for small-to-medium rows; large-row clients
/// should set a smaller value.
pub(crate) const DEFAULT_CURSOR_BATCH_SIZE: usize = 100;
/// Hard upper bound on `query.next` batch size. Prevents a single call
/// from stalling the stdio loop with a multi-megabyte line. Requested
/// sizes are clamped into `1..=MAX_CURSOR_BATCH_SIZE`.
pub(crate) const MAX_CURSOR_BATCH_SIZE: usize = 10_000;
93
// ---------------------------------------------------------------------------
// Session state (transaction buffer)
// ---------------------------------------------------------------------------
//
// Transactions in the stdio protocol are scoped to a single connection —
// one process = one session = at most one open transaction. The state
// lives on the stack of `run_backend` so nothing leaks between
// connections, and there is no cross-session visibility of buffered
// writes.
//
// Isolation model: `read_committed_deferred`. Reads inside a transaction
// observe the latest *committed* state; they do **not** see writes the
// same session has buffered via `insert` / `delete` / `bulk_insert`.
// At `tx.commit` the buffered statements are replayed inside an engine
// `BEGIN` … `COMMIT` pair, and a `ROLLBACK` is issued if one of them
// fails. Auto-committed writes from other sessions may still interleave
// between commits; strict atomicity requires funnelling every write
// through a single session.
111
/// A server-side prepared statement bound to this session.
///
/// When `parameter_count == 0`, `shape` is the exact plan and is
/// executed as-is (no substitution needed).
struct StdioPreparedStatement {
    /// Parsed query, parameterized when the statement has placeholders.
    shape: crate::storage::query::ast::QueryExpr,
    /// Number of bind values `execute_prepared` must supply.
    parameter_count: usize,
}

/// Per-connection session that tracks the currently open transaction,
/// any active streaming cursors, and the prepared statements created
/// on this connection.
pub(crate) struct Session {
    /// Id the next `tx.begin` will hand out.
    next_tx_id: u64,
    /// The open transaction, if any (at most one per session).
    current_tx: Option<OpenTx>,
    /// Id the next registered cursor will receive.
    next_cursor_id: u64,
    /// Open cursors, keyed by the id returned to the client.
    cursors: std::collections::HashMap<u64, Cursor>,
    /// Monotone counter for prepared statement IDs within this session.
    next_prepared_id: u64,
    /// Active prepared statements, keyed by the ID returned to the client.
    prepared: std::collections::HashMap<u64, StdioPreparedStatement>,
}
131
132impl Session {
133    pub(crate) fn new() -> Self {
134        Self {
135            next_tx_id: 1,
136            current_tx: None,
137            next_cursor_id: 1,
138            cursors: std::collections::HashMap::new(),
139            next_prepared_id: 1,
140            prepared: std::collections::HashMap::new(),
141        }
142    }
143
144    fn open_tx(&mut self) -> Result<u64, (&'static str, String)> {
145        if let Some(tx) = &self.current_tx {
146            return Err((
147                error_code::TX_ALREADY_OPEN,
148                format!("transaction {} already open in this session", tx.tx_id),
149            ));
150        }
151        let tx_id = self.next_tx_id;
152        self.next_tx_id = self.next_tx_id.saturating_add(1);
153        self.current_tx = Some(OpenTx {
154            tx_id,
155            write_set: Vec::new(),
156        });
157        Ok(tx_id)
158    }
159
160    fn take_tx(&mut self) -> Option<OpenTx> {
161        self.current_tx.take()
162    }
163
164    fn current_tx_mut(&mut self) -> Option<&mut OpenTx> {
165        self.current_tx.as_mut()
166    }
167
168    #[allow(dead_code)]
169    fn has_tx(&self) -> bool {
170        self.current_tx.is_some()
171    }
172
173    /// Register a freshly materialised cursor and return its id.
174    /// Enforces [`MAX_CURSORS_PER_SESSION`] before allocating.
175    fn insert_cursor(&mut self, cursor: Cursor) -> Result<u64, (&'static str, String)> {
176        if self.cursors.len() >= MAX_CURSORS_PER_SESSION {
177            return Err((
178                error_code::CURSOR_LIMIT_EXCEEDED,
179                format!(
180                    "session already holds {} cursors (max {}) — close some before opening new ones",
181                    self.cursors.len(),
182                    MAX_CURSORS_PER_SESSION
183                ),
184            ));
185        }
186        let id = self.next_cursor_id;
187        self.next_cursor_id = self.next_cursor_id.saturating_add(1);
188        let mut cursor = cursor;
189        cursor.cursor_id = id;
190        self.cursors.insert(id, cursor);
191        Ok(id)
192    }
193
194    fn cursor_mut(&mut self, id: u64) -> Option<&mut Cursor> {
195        self.cursors.get_mut(&id)
196    }
197
198    fn drop_cursor(&mut self, id: u64) -> Option<Cursor> {
199        self.cursors.remove(&id)
200    }
201
202    fn clear_cursors(&mut self) {
203        self.cursors.clear();
204    }
205}
206
207impl Default for Session {
208    fn default() -> Self {
209        Self::new()
210    }
211}
212
/// An in-flight transaction for a single stdio session.
struct OpenTx {
    /// Session-local id allocated by `Session::open_tx`.
    tx_id: u64,
    /// Buffered statements: replayed in order by `tx.commit`, discarded
    /// by `tx.rollback`.
    write_set: Vec<PendingSql>,
}
218
/// A mutation parked in the session until `tx.commit`. Every variant
/// wraps SQL text that is already complete, so replaying it is a plain
/// `execute_query` call.
enum PendingSql {
    Insert(String),
    Delete(String),
    #[allow(dead_code)] // reserved for future query()-in-tx routing
    Update(String),
}

impl PendingSql {
    /// Borrow the SQL text, whichever kind of mutation this is.
    fn sql(&self) -> &str {
        match self {
            PendingSql::Insert(text) => text.as_str(),
            PendingSql::Delete(text) => text.as_str(),
            PendingSql::Update(text) => text.as_str(),
        }
    }
}
236
237/// An open streaming cursor over a materialised query result.
238///
239/// MVP model: the underlying [`RuntimeQueryResult`] has already been
240/// fully executed at `query.open` time and lives inside the cursor.
241/// Each `query.next` call slices off `batch_size` rows from the tail and
242/// advances `position`. This pays normal memory cost but lets the client
243/// consume the result in chunks, abort mid-stream, or pipeline the next
244/// batch request while processing the previous one.
245///
246/// A future iteration can swap the rows field for a lazy iterator pulled
247/// from the execution engine without changing the wire protocol.
248pub(crate) struct Cursor {
249    cursor_id: u64,
250    columns: Vec<String>,
251    rows: Vec<UnifiedRecord>,
252    position: usize,
253}
254
255impl Cursor {
256    fn new(columns: Vec<String>, rows: Vec<UnifiedRecord>) -> Self {
257        Self {
258            cursor_id: 0, // overwritten by Session::insert_cursor
259            columns,
260            rows,
261            position: 0,
262        }
263    }
264
265    fn total(&self) -> usize {
266        self.rows.len()
267    }
268
269    fn remaining(&self) -> usize {
270        self.rows.len().saturating_sub(self.position)
271    }
272
273    fn is_exhausted(&self) -> bool {
274        self.position >= self.rows.len()
275    }
276
277    /// Extract up to `batch_size` rows from the current position forward.
278    /// Advances the position to the end of the returned slice.
279    fn take_batch(&mut self, batch_size: usize) -> &[UnifiedRecord] {
280        let end = (self.position + batch_size).min(self.rows.len());
281        let slice = &self.rows[self.position..end];
282        self.position = end;
283        slice
284    }
285}
286
287/// Run the stdio JSON-RPC loop against a local in-process runtime.
288///
289/// Returns the process exit code. `0` on normal shutdown (EOF or
290/// explicit `close`). Non-zero only on fatal I/O errors reading
291/// stdin or writing stdout.
292pub fn run(runtime: &RedDBRuntime) -> i32 {
293    run_with_io(runtime, std::io::stdin(), &mut std::io::stdout())
294}
295
296/// Run the stdio JSON-RPC loop as a proxy to a remote gRPC server.
297///
298/// Every method is forwarded via tonic. This is what
299/// `red rpc --stdio --connect grpc://host:port` uses, and it is also
300/// what the JS and Python drivers spawn when the user calls
301/// `connect("grpc://...")`.
302pub fn run_remote(endpoint: &str, token: Option<String>) -> i32 {
303    let tokio_rt = match tokio::runtime::Builder::new_current_thread()
304        .enable_all()
305        .build()
306    {
307        Ok(rt) => rt,
308        Err(e) => {
309            tracing::error!(err = %e, "rpc: failed to build tokio runtime");
310            return 1;
311        }
312    };
313    let client = match tokio_rt.block_on(RedDBClient::connect(endpoint, token)) {
314        Ok(c) => c,
315        Err(e) => {
316            tracing::error!(endpoint, err = %e, "rpc: failed to connect");
317            return 1;
318        }
319    };
320    let backend = Backend::Remote(Box::new(RemoteBackend {
321        client: AsyncMutex::new(client),
322        tokio_rt: &tokio_rt,
323    }));
324    run_backend(&backend, std::io::stdin(), &mut std::io::stdout())
325}
326
327/// Same as [`run`] but takes explicit I/O handles. Used by tests.
328pub fn run_with_io<W: Write>(runtime: &RedDBRuntime, stdin: Stdin, stdout: &mut W) -> i32 {
329    run_backend(&Backend::Local(runtime), stdin, stdout)
330}
331
/// Source of connection ids for stdio sessions. Every session takes
/// its own id so that its `tx.commit` BEGIN/COMMIT pair lands in a
/// distinct `TxnContext` in the runtime — otherwise all stdio sessions
/// would share `conn_id = 0` and trample each other's transactions.
/// The counter starts at a high base to stay clear of the ids that the
/// PG-wire / gRPC transports allocate from their own, lower pools.
static STDIO_SESSION_CONN_ID: std::sync::atomic::AtomicU64 =
    std::sync::atomic::AtomicU64::new(1_000_000);

/// Hand out the next unused stdio connection id (the counter's value
/// before it is incremented).
fn next_stdio_conn_id() -> u64 {
    use std::sync::atomic::Ordering;

    let counter = &STDIO_SESSION_CONN_ID;
    counter.fetch_add(1, Ordering::Relaxed)
}
344
345fn run_backend<W: Write>(backend: &Backend<'_>, stdin: Stdin, stdout: &mut W) -> i32 {
346    let reader = BufReader::new(stdin.lock());
347    let mut session = Session::new();
348    // Bind the session to a stable connection id so the runtime's
349    // `tx_contexts` (keyed by conn_id) survives across `handle_line`
350    // calls within the same session.
351    let conn_id = next_stdio_conn_id();
352    crate::runtime::impl_core::set_current_connection_id(conn_id);
353    for line_result in reader.lines() {
354        let line = match line_result {
355            Ok(l) => l,
356            Err(e) => {
357                let _ = writeln!(
358                    stdout,
359                    "{}",
360                    error_response(&Value::Null, error_code::INTERNAL_ERROR, &e.to_string())
361                );
362                let _ = stdout.flush();
363                return 1;
364            }
365        };
366        if line.trim().is_empty() {
367            continue;
368        }
369        let response = handle_line(backend, &mut session, &line);
370        if writeln!(stdout, "{}", response).is_err() || stdout.flush().is_err() {
371            return 1;
372        }
373        if response.contains("\"__close__\":true") {
374            return 0;
375        }
376    }
377    // EOF: silently drop any open transaction — atomicity is preserved
378    // (nothing was ever applied to the store) and no error is surfaced to
379    // the caller because EOF may be graceful client disconnect.
380    let _ = session.take_tx();
381    crate::runtime::impl_core::clear_current_connection_id();
382    0
383}
384
385/// Parse one input line and dispatch. Always returns a single-line
386/// JSON string suitable for direct write to stdout. Never panics
387/// (panics inside handlers are caught and reported).
388fn handle_line(backend: &Backend<'_>, session: &mut Session, line: &str) -> String {
389    let parsed: Value = match json::from_str(line) {
390        Ok(v) => v,
391        Err(err) => {
392            return error_response(
393                &Value::Null,
394                error_code::PARSE_ERROR,
395                &format!("invalid JSON: {err}"),
396            );
397        }
398    };
399
400    let id = parsed.get("id").cloned().unwrap_or(Value::Null);
401
402    let method = match parsed.get("method").and_then(Value::as_str) {
403        Some(m) => m.to_string(),
404        None => {
405            return error_response(&id, error_code::INVALID_REQUEST, "missing 'method' field");
406        }
407    };
408
409    let params = parsed.get("params").cloned().unwrap_or(Value::Null);
410
411    let dispatch = std::panic::catch_unwind(AssertUnwindSafe(|| match backend {
412        Backend::Local(rt) => dispatch_method(rt, session, &method, &params),
413        Backend::Remote(remote) => {
414            // Transactions are session-local and the remote path forwards
415            // each call independently — there is no place to park a tx
416            // handle across gRPC hops yet. Surface a clear error so
417            // drivers can fall back to per-call auto-commit.
418            if matches!(
419                method.as_str(),
420                "tx.begin"
421                    | "tx.commit"
422                    | "tx.rollback"
423                    | "query.open"
424                    | "query.next"
425                    | "query.close"
426            ) {
427                Err((
428                    error_code::TX_NOT_SUPPORTED_REMOTE,
429                    format!("{method} is not supported over remote gRPC yet"),
430                ))
431            } else {
432                dispatch_method_remote(&remote.client, remote.tokio_rt, &method, &params)
433            }
434        }
435    }));
436
437    match dispatch {
438        Ok(Ok(result)) => success_response(&id, &result, method == "close"),
439        Ok(Err((code, msg))) => error_response(&id, code, &msg),
440        Err(_) => error_response(&id, error_code::INTERNAL_ERROR, "handler panicked (caught)"),
441    }
442}
443
444/// Dispatch a parsed method call. Returns the `result` value on
445/// success or `(error_code, message)` on failure.
446fn dispatch_method(
447    runtime: &RedDBRuntime,
448    session: &mut Session,
449    method: &str,
450    params: &Value,
451) -> Result<Value, (&'static str, String)> {
452    match method {
453        "tx.begin" => {
454            let tx_id = session.open_tx()?;
455            Ok(Value::Object(
456                [
457                    ("tx_id".to_string(), Value::Number(tx_id as f64)),
458                    (
459                        "isolation".to_string(),
460                        Value::String("read_committed_deferred".to_string()),
461                    ),
462                ]
463                .into_iter()
464                .collect(),
465            ))
466        }
467
468        "tx.commit" => {
469            let tx = session.take_tx().ok_or((
470                error_code::NO_TX_OPEN,
471                "no transaction is open in this session".to_string(),
472            ))?;
473            let tx_id = tx.tx_id;
474            let op_count = tx.write_set.len();
475
476            // Drive the replay through a real engine transaction so
477            // failures roll back the buffered write_set atomically.
478            // Replaces the legacy `commit_lock`-serialised replay:
479            // cross-session ordering is now provided by the
480            // snapshot-manager's xid allocation, which is what the
481            // SQL `BEGIN`/`COMMIT` path has used since #31.
482            let replay: Result<(u64, usize), (usize, String)> = (|| {
483                runtime
484                    .execute_query("BEGIN")
485                    .map_err(|e| (0usize, format!("BEGIN: {e}")))?;
486                let mut total_affected: u64 = 0;
487                for (idx, op) in tx.write_set.iter().enumerate() {
488                    match runtime.execute_query(op.sql()) {
489                        Ok(qr) => total_affected += qr.affected_rows,
490                        Err(e) => {
491                            let _ = runtime.execute_query("ROLLBACK");
492                            return Err((idx, e.to_string()));
493                        }
494                    }
495                }
496                runtime
497                    .execute_query("COMMIT")
498                    .map_err(|e| (op_count, format!("COMMIT: {e}")))?;
499                Ok((total_affected, op_count))
500            })();
501
502            match replay {
503                Ok((affected, replayed)) => Ok(Value::Object(
504                    [
505                        ("tx_id".to_string(), Value::Number(tx_id as f64)),
506                        ("ops_replayed".to_string(), Value::Number(replayed as f64)),
507                        ("affected".to_string(), Value::Number(affected as f64)),
508                    ]
509                    .into_iter()
510                    .collect(),
511                )),
512                Err((failed_idx, msg)) => Err((
513                    error_code::TX_REPLAY_FAILED,
514                    format!(
515                        "tx {tx_id} replay failed at op {failed_idx}/{op_count}: {msg} \
516                         (ops 0..{failed_idx} already applied and are NOT rolled back)"
517                    ),
518                )),
519            }
520        }
521
522        "query.open" => {
523            let sql = params.get("sql").and_then(Value::as_str).ok_or((
524                error_code::INVALID_PARAMS,
525                "missing 'sql' string".to_string(),
526            ))?;
527            let qr = runtime
528                .execute_query(sql)
529                .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
530
531            // Extract the column list from the first record. Consistent
532            // with query_result_to_json which uses the first row's keys
533            // as schema.
534            let columns: Vec<String> = qr
535                .result
536                .records
537                .first()
538                .map(|first| {
539                    let mut keys: Vec<String> = first
540                        .column_names()
541                        .into_iter()
542                        .map(|k| k.to_string())
543                        .collect();
544                    keys.sort();
545                    keys
546                })
547                .unwrap_or_default();
548
549            let cursor = Cursor::new(columns.clone(), qr.result.records);
550            let total = cursor.total();
551            let cursor_id = session.insert_cursor(cursor)?;
552
553            Ok(Value::Object(
554                [
555                    ("cursor_id".to_string(), Value::Number(cursor_id as f64)),
556                    (
557                        "columns".to_string(),
558                        Value::Array(columns.into_iter().map(Value::String).collect()),
559                    ),
560                    ("total_rows".to_string(), Value::Number(total as f64)),
561                ]
562                .into_iter()
563                .collect(),
564            ))
565        }
566
567        "query.next" => {
568            let cursor_id = params
569                .get("cursor_id")
570                .and_then(|v| v.as_f64())
571                .map(|n| n as u64)
572                .ok_or((
573                    error_code::INVALID_PARAMS,
574                    "missing 'cursor_id' number".to_string(),
575                ))?;
576            let batch_size = params
577                .get("batch_size")
578                .and_then(|v| v.as_f64())
579                .map(|n| n as usize)
580                .unwrap_or(DEFAULT_CURSOR_BATCH_SIZE)
581                .clamp(1, MAX_CURSOR_BATCH_SIZE);
582
583            // Extract the batch inside a bounded borrow so we can
584            // drop the cursor afterwards without borrow-conflict.
585            let (rows, done, remaining) = {
586                let cursor = session.cursor_mut(cursor_id).ok_or((
587                    error_code::CURSOR_NOT_FOUND,
588                    format!("cursor {cursor_id} not found"),
589                ))?;
590                let batch = cursor.take_batch(batch_size);
591                let rows_json: Vec<Value> = batch.iter().map(record_to_json_object).collect();
592                (rows_json, cursor.is_exhausted(), cursor.remaining())
593            };
594
595            if done {
596                // Auto-drop exhausted cursors so long-lived sessions
597                // don't accumulate dead state.
598                let _ = session.drop_cursor(cursor_id);
599            }
600
601            Ok(Value::Object(
602                [
603                    ("cursor_id".to_string(), Value::Number(cursor_id as f64)),
604                    ("rows".to_string(), Value::Array(rows)),
605                    ("done".to_string(), Value::Bool(done)),
606                    ("remaining".to_string(), Value::Number(remaining as f64)),
607                ]
608                .into_iter()
609                .collect(),
610            ))
611        }
612
613        "query.close" => {
614            let cursor_id = params
615                .get("cursor_id")
616                .and_then(|v| v.as_f64())
617                .map(|n| n as u64)
618                .ok_or((
619                    error_code::INVALID_PARAMS,
620                    "missing 'cursor_id' number".to_string(),
621                ))?;
622            let existed = session.drop_cursor(cursor_id).is_some();
623            if !existed {
624                return Err((
625                    error_code::CURSOR_NOT_FOUND,
626                    format!("cursor {cursor_id} not found"),
627                ));
628            }
629            Ok(Value::Object(
630                [
631                    ("cursor_id".to_string(), Value::Number(cursor_id as f64)),
632                    ("closed".to_string(), Value::Bool(true)),
633                ]
634                .into_iter()
635                .collect(),
636            ))
637        }
638
639        "tx.rollback" => {
640            let tx = session.take_tx().ok_or((
641                error_code::NO_TX_OPEN,
642                "no transaction is open in this session".to_string(),
643            ))?;
644            let ops_discarded = tx.write_set.len();
645            Ok(Value::Object(
646                [
647                    ("tx_id".to_string(), Value::Number(tx.tx_id as f64)),
648                    (
649                        "ops_discarded".to_string(),
650                        Value::Number(ops_discarded as f64),
651                    ),
652                ]
653                .into_iter()
654                .collect(),
655            ))
656        }
657
658        "version" => Ok(Value::Object(
659            [
660                (
661                    "version".to_string(),
662                    Value::String(env!("CARGO_PKG_VERSION").to_string()),
663                ),
664                (
665                    "protocol".to_string(),
666                    Value::String(PROTOCOL_VERSION.to_string()),
667                ),
668            ]
669            .into_iter()
670            .collect(),
671        )),
672
673        "health" => Ok(Value::Object(
674            [
675                ("ok".to_string(), Value::Bool(true)),
676                (
677                    "version".to_string(),
678                    Value::String(env!("CARGO_PKG_VERSION").to_string()),
679                ),
680            ]
681            .into_iter()
682            .collect(),
683        )),
684
685        "query" => {
686            let sql = params.get("sql").and_then(Value::as_str).ok_or((
687                error_code::INVALID_PARAMS,
688                "missing 'sql' string".to_string(),
689            ))?;
690
691            // Optional positional `$N` bind parameters (#353 tracer slice).
692            // Absence preserves the legacy single-arg `query(sql)` path.
693            let bind_values: Option<Vec<SchemaValue>> = params
694                .get("params")
695                .map(|v| {
696                    v.as_array()
697                        .ok_or((
698                            error_code::INVALID_PARAMS,
699                            "'params' must be an array".to_string(),
700                        ))
701                        .map(|arr| arr.iter().map(json_value_to_schema_value).collect())
702                })
703                .transpose()?;
704
705            if let Some(binds) = bind_values {
706                use crate::storage::query::modes::parse_multi;
707                use crate::storage::query::user_params;
708                let parsed =
709                    parse_multi(sql).map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
710                let bound = user_params::bind(&parsed, &binds)
711                    .map_err(|e| (error_code::INVALID_PARAMS, e.to_string()))?;
712                let qr = runtime
713                    .execute_query_expr(bound)
714                    .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
715                return Ok(query_result_to_json(&qr));
716            }
717
718            let qr = runtime
719                .execute_query(sql)
720                .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
721            Ok(query_result_to_json(&qr))
722        }
723
724        // ── Prepared statements ──────────────────────────────────────────────
725        //
726        // `prepare` parses the SQL once, extracts a parameterized shape, and
727        // returns a `prepared_id` the client can reuse. `execute_prepared` takes
728        // that id plus JSON-encoded bind values and runs the plan without parsing.
729        //
730        // This mirrors the PostgreSQL extended-query protocol semantics and is the
731        // server-side half of the client driver's `PreparedStatement` abstraction.
732        "prepare" => {
733            use crate::storage::query::modes::parse_multi;
734            use crate::storage::query::planner::shape::parameterize_query_expr;
735
736            let sql = params.get("sql").and_then(Value::as_str).ok_or((
737                error_code::INVALID_PARAMS,
738                "missing 'sql' string".to_string(),
739            ))?;
740            let parsed = parse_multi(sql).map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
741            let (shape, parameter_count) = if let Some(prepared) = parameterize_query_expr(&parsed)
742            {
743                (prepared.shape, prepared.parameter_count)
744            } else {
745                (parsed, 0)
746            };
747            let id = session.next_prepared_id;
748            session.next_prepared_id = session.next_prepared_id.saturating_add(1);
749            session.prepared.insert(
750                id,
751                StdioPreparedStatement {
752                    shape,
753                    parameter_count,
754                },
755            );
756            Ok(Value::Object(
757                [
758                    ("prepared_id".to_string(), Value::Number(id as f64)),
759                    (
760                        "parameter_count".to_string(),
761                        Value::Number(parameter_count as f64),
762                    ),
763                ]
764                .into_iter()
765                .collect(),
766            ))
767        }
768
769        "execute_prepared" => {
770            use crate::storage::query::planner::shape::bind_parameterized_query;
771            use crate::storage::schema::Value as SV;
772
773            let id = params
774                .get("prepared_id")
775                .and_then(Value::as_f64)
776                .map(|n| n as u64)
777                .ok_or((
778                    error_code::INVALID_PARAMS,
779                    "missing 'prepared_id'".to_string(),
780                ))?;
781
782            let stmt = session.prepared.get(&id).ok_or((
783                error_code::QUERY_ERROR,
784                format!("no prepared statement with id {id}"),
785            ))?;
786
787            // Parse bind values from JSON array of JSON-encoded literals.
788            let binds_json: Vec<Value> = params
789                .get("binds")
790                .and_then(Value::as_array)
791                .map(|a| a.to_vec())
792                .unwrap_or_default();
793            if binds_json.len() != stmt.parameter_count {
794                return Err((
795                    error_code::INVALID_PARAMS,
796                    format!(
797                        "expected {} bind values, got {}",
798                        stmt.parameter_count,
799                        binds_json.len()
800                    ),
801                ));
802            }
803
804            // Convert JSON bind values to SchemaValue.
805            let binds: Vec<SV> = binds_json.iter().map(json_value_to_schema_value).collect();
806
807            // Bind literals into the parameterized shape.
808            let expr = if stmt.parameter_count == 0 {
809                stmt.shape.clone()
810            } else {
811                bind_parameterized_query(&stmt.shape, &binds, stmt.parameter_count)
812                    .ok_or((error_code::QUERY_ERROR, "bind failed".to_string()))?
813            };
814
815            let qr = runtime
816                .execute_query_expr(expr)
817                .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
818            Ok(query_result_to_json(&qr))
819        }
820
821        "insert" => {
822            let collection = params.get("collection").and_then(Value::as_str).ok_or((
823                error_code::INVALID_PARAMS,
824                "missing 'collection' string".to_string(),
825            ))?;
826            let payload = params.get("payload").ok_or((
827                error_code::INVALID_PARAMS,
828                "missing 'payload' object".to_string(),
829            ))?;
830            let payload_obj = payload.as_object().ok_or((
831                error_code::INVALID_PARAMS,
832                "'payload' must be a JSON object".to_string(),
833            ))?;
834            let sql = build_insert_sql(collection, payload_obj.iter());
835
836            if let Some(tx) = session.current_tx_mut() {
837                tx.write_set.push(PendingSql::Insert(sql));
838                return Ok(pending_tx_response(tx.tx_id));
839            }
840
841            let qr = runtime
842                .execute_query(&sql)
843                .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
844            Ok(insert_result_to_json(&qr))
845        }
846
847        "bulk_insert" => {
848            let collection = params.get("collection").and_then(Value::as_str).ok_or((
849                error_code::INVALID_PARAMS,
850                "missing 'collection' string".to_string(),
851            ))?;
852            let payloads = params.get("payloads").and_then(Value::as_array).ok_or((
853                error_code::INVALID_PARAMS,
854                "missing 'payloads' array".to_string(),
855            ))?;
856
857            if let Some(tx) = session.current_tx_mut() {
858                let mut buffered: u64 = 0;
859                for entry in payloads {
860                    let obj = entry.as_object().ok_or((
861                        error_code::INVALID_PARAMS,
862                        "each payload must be a JSON object".to_string(),
863                    ))?;
864                    let sql = build_insert_sql(collection, obj.iter());
865                    tx.write_set.push(PendingSql::Insert(sql));
866                    buffered += 1;
867                }
868                let tx_id = tx.tx_id;
869                return Ok(Value::Object(
870                    [
871                        ("affected".to_string(), Value::Number(0.0)),
872                        ("buffered".to_string(), Value::Number(buffered as f64)),
873                        ("pending".to_string(), Value::Bool(true)),
874                        ("tx_id".to_string(), Value::Number(tx_id as f64)),
875                    ]
876                    .into_iter()
877                    .collect(),
878                ));
879            }
880
881            let mut total_affected: u64 = 0;
882            for entry in payloads {
883                let obj = entry.as_object().ok_or((
884                    error_code::INVALID_PARAMS,
885                    "each payload must be a JSON object".to_string(),
886                ))?;
887                let sql = build_insert_sql(collection, obj.iter());
888                let qr = runtime
889                    .execute_query(&sql)
890                    .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
891                total_affected += qr.affected_rows;
892            }
893            Ok(Value::Object(
894                [("affected".to_string(), Value::Number(total_affected as f64))]
895                    .into_iter()
896                    .collect(),
897            ))
898        }
899
900        "get" => {
901            let collection = params.get("collection").and_then(Value::as_str).ok_or((
902                error_code::INVALID_PARAMS,
903                "missing 'collection' string".to_string(),
904            ))?;
905            let id = params.get("id").and_then(Value::as_str).ok_or((
906                error_code::INVALID_PARAMS,
907                "missing 'id' string".to_string(),
908            ))?;
909            let sql = format!("SELECT * FROM {collection} WHERE _entity_id = {id} LIMIT 1");
910            let qr = runtime
911                .execute_query(&sql)
912                .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
913            let entity = qr
914                .result
915                .records
916                .first()
917                .map(record_to_json_object)
918                .unwrap_or(Value::Null);
919            Ok(Value::Object(
920                [("entity".to_string(), entity)].into_iter().collect(),
921            ))
922        }
923
924        "delete" => {
925            let collection = params.get("collection").and_then(Value::as_str).ok_or((
926                error_code::INVALID_PARAMS,
927                "missing 'collection' string".to_string(),
928            ))?;
929            let id = params.get("id").and_then(Value::as_str).ok_or((
930                error_code::INVALID_PARAMS,
931                "missing 'id' string".to_string(),
932            ))?;
933            let sql = format!("DELETE FROM {collection} WHERE _entity_id = {id}");
934
935            if let Some(tx) = session.current_tx_mut() {
936                tx.write_set.push(PendingSql::Delete(sql));
937                return Ok(pending_tx_response(tx.tx_id));
938            }
939
940            let qr = runtime
941                .execute_query(&sql)
942                .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
943            Ok(Value::Object(
944                [(
945                    "affected".to_string(),
946                    Value::Number(qr.affected_rows as f64),
947                )]
948                .into_iter()
949                .collect(),
950            ))
951        }
952
953        "close" => {
954            // Silently drop any open transaction and cursors on close.
955            // The client explicitly asked to terminate; surfacing an
956            // error here would leak state across what is effectively a
957            // reset.
958            let _ = session.take_tx();
959            session.clear_cursors();
960            let _ = runtime.checkpoint();
961            Ok(Value::Null)
962        }
963
964        // Auth surface — local stdio bridge has no auth backend
965        // (the spawned binary inherits the caller's privileges by
966        // construction). The remote bridge below maps these methods
967        // onto the gRPC server's auth endpoints.
968        "auth.login"
969        | "auth.whoami"
970        | "auth.change_password"
971        | "auth.create_api_key"
972        | "auth.revoke_api_key" => {
973            let _ = (session, params);
974            Err((
975                error_code::INVALID_REQUEST,
976                format!(
977                    "{method}: auth methods are only available on grpc:// connections; \
978                     embedded modes (memory://, file://) inherit caller privileges"
979                ),
980            ))
981        }
982
983        other => Err((
984            error_code::INVALID_REQUEST,
985            format!("unknown method: {other}"),
986        )),
987    }
988}
989
990// ---------------------------------------------------------------------------
991// Response builders
992// ---------------------------------------------------------------------------
993
994fn success_response(id: &Value, result: &Value, is_close: bool) -> String {
995    // For `close` we tag the response so the loop knows to exit after
996    // flushing. The tag is stripped from the wire by replacing it
997    // before serialization — actually we just include it as a sentinel
998    // field that drivers ignore (forward compat).
999    let mut envelope = json::Map::new();
1000    envelope.insert("jsonrpc".to_string(), Value::String("2.0".to_string()));
1001    envelope.insert("id".to_string(), id.clone());
1002    envelope.insert("result".to_string(), result.clone());
1003    if is_close {
1004        envelope.insert("__close__".to_string(), Value::Bool(true));
1005    }
1006    Value::Object(envelope).to_string_compact()
1007}
1008
1009fn error_response(id: &Value, code: &str, message: &str) -> String {
1010    let mut err = json::Map::new();
1011    err.insert("code".to_string(), Value::String(code.to_string()));
1012    err.insert("message".to_string(), Value::String(message.to_string()));
1013    err.insert("data".to_string(), Value::Null);
1014
1015    let mut envelope = json::Map::new();
1016    envelope.insert("jsonrpc".to_string(), Value::String("2.0".to_string()));
1017    envelope.insert("id".to_string(), id.clone());
1018    envelope.insert("error".to_string(), Value::Object(err));
1019    Value::Object(envelope).to_string_compact()
1020}
1021
1022// ---------------------------------------------------------------------------
1023// Helpers
1024// ---------------------------------------------------------------------------
1025
1026/// Envelope returned by `insert` and `delete` when the call was buffered
1027/// into an open transaction instead of being auto-committed.
1028fn pending_tx_response(tx_id: u64) -> Value {
1029    Value::Object(
1030        [
1031            ("affected".to_string(), Value::Number(0.0)),
1032            ("pending".to_string(), Value::Bool(true)),
1033            ("tx_id".to_string(), Value::Number(tx_id as f64)),
1034        ]
1035        .into_iter()
1036        .collect(),
1037    )
1038}
1039
1040pub(crate) fn build_insert_sql<'a, I>(collection: &str, fields: I) -> String
1041where
1042    I: Iterator<Item = (&'a String, &'a Value)>,
1043{
1044    let mut cols = Vec::new();
1045    let mut vals = Vec::new();
1046    for (k, v) in fields {
1047        cols.push(k.clone());
1048        vals.push(value_to_sql_literal(v));
1049    }
1050    format!(
1051        "INSERT INTO {collection} ({}) VALUES ({})",
1052        cols.join(", "),
1053        vals.join(", "),
1054    )
1055}
1056
1057pub(crate) fn value_to_sql_literal(v: &Value) -> String {
1058    match v {
1059        Value::Null => "NULL".to_string(),
1060        Value::Bool(b) => b.to_string(),
1061        Value::Number(n) => {
1062            if n.fract() == 0.0 {
1063                format!("{}", *n as i64)
1064            } else {
1065                n.to_string()
1066            }
1067        }
1068        Value::String(s) => format!("'{}'", s.replace('\'', "''")),
1069        other => format!("'{}'", other.to_string_compact().replace('\'', "''")),
1070    }
1071}
1072
1073fn query_result_to_json(qr: &RuntimeQueryResult) -> Value {
1074    let mut envelope = json::Map::new();
1075    envelope.insert(
1076        "statement".to_string(),
1077        Value::String(qr.statement_type.to_string()),
1078    );
1079    envelope.insert(
1080        "affected".to_string(),
1081        Value::Number(qr.affected_rows as f64),
1082    );
1083
1084    let mut columns = Vec::new();
1085    if let Some(first) = qr.result.records.first() {
1086        let mut keys: Vec<String> = first
1087            .column_names()
1088            .into_iter()
1089            .map(|k| k.to_string())
1090            .collect();
1091        keys.sort();
1092        columns = keys.into_iter().map(Value::String).collect();
1093    }
1094    envelope.insert("columns".to_string(), Value::Array(columns));
1095
1096    let rows: Vec<Value> = qr
1097        .result
1098        .records
1099        .iter()
1100        .map(record_to_json_object)
1101        .collect();
1102    envelope.insert("rows".to_string(), Value::Array(rows));
1103
1104    Value::Object(envelope)
1105}
1106
1107pub(crate) fn insert_result_to_json(qr: &RuntimeQueryResult) -> Value {
1108    let mut envelope = json::Map::new();
1109    envelope.insert(
1110        "affected".to_string(),
1111        Value::Number(qr.affected_rows as f64),
1112    );
1113    // First row of the result, if any, contains the inserted entity id.
1114    if let Some(first) = qr.result.records.first() {
1115        if let Some(id_val) = first
1116            .iter_fields()
1117            .find(|(k, _)| {
1118                let s: &str = k;
1119                s == "_entity_id"
1120            })
1121            .map(|(_, v)| schema_value_to_json(v))
1122        {
1123            envelope.insert("id".to_string(), id_val);
1124        }
1125    }
1126    Value::Object(envelope)
1127}
1128
1129fn record_to_json_object(record: &UnifiedRecord) -> Value {
1130    let mut map = json::Map::new();
1131    // iter_fields merges the columnar fast-path + HashMap so scan
1132    // rows (columnar only) contribute their values.
1133    let mut entries: Vec<(&str, &SchemaValue)> =
1134        record.iter_fields().map(|(k, v)| (k.as_ref(), v)).collect();
1135    entries.sort_by(|a, b| a.0.cmp(b.0));
1136    for (k, v) in entries {
1137        map.insert(k.to_string(), schema_value_to_json(v));
1138    }
1139    Value::Object(map)
1140}
1141
1142fn schema_value_to_json(v: &SchemaValue) -> Value {
1143    match v {
1144        SchemaValue::Null => Value::Null,
1145        SchemaValue::Boolean(b) => Value::Bool(*b),
1146        SchemaValue::Integer(n) => Value::Number(*n as f64),
1147        SchemaValue::UnsignedInteger(n) => Value::Number(*n as f64),
1148        SchemaValue::Float(n) => Value::Number(*n),
1149        SchemaValue::BigInt(n) => Value::Number(*n as f64),
1150        SchemaValue::TimestampMs(n)
1151        | SchemaValue::Timestamp(n)
1152        | SchemaValue::Duration(n)
1153        | SchemaValue::Decimal(n) => Value::Number(*n as f64),
1154        SchemaValue::Password(_) | SchemaValue::Secret(_) => Value::String("***".to_string()),
1155        SchemaValue::Text(s) => Value::String(s.to_string()),
1156        SchemaValue::Email(s)
1157        | SchemaValue::Url(s)
1158        | SchemaValue::NodeRef(s)
1159        | SchemaValue::EdgeRef(s) => Value::String(s.clone()),
1160        other => Value::String(format!("{other}")),
1161    }
1162}
1163
1164/// Convert a JSON `Value` to a `SchemaValue` for use as a bind parameter
1165/// in a prepared statement. Mirrors PostgreSQL's implicit type coercion:
1166/// JSON numbers become `Float`, strings become `Text`, booleans map to
1167/// `Boolean`, and `null` becomes `Null`.
1168pub(crate) fn json_value_to_schema_value(v: &Value) -> SchemaValue {
1169    match v {
1170        Value::Null => SchemaValue::Null,
1171        Value::Bool(b) => SchemaValue::Boolean(*b),
1172        Value::Number(n) => {
1173            if n.fract() == 0.0 && n.abs() < i64::MAX as f64 {
1174                SchemaValue::Integer(*n as i64)
1175            } else {
1176                SchemaValue::Float(*n)
1177            }
1178        }
1179        Value::String(s) => SchemaValue::text(s.clone()),
1180        Value::Array(items) => {
1181            // Tracer for #355: a JSON array of numbers (or empty) is taken
1182            // as a `Value::Vector`. Mixed/non-number arrays fall back to
1183            // the JSON-string form so the binder can reject them with a
1184            // typed error when they land in a vector slot.
1185            if items.iter().all(|v| matches!(v, Value::Number(_))) {
1186                let floats: Vec<f32> = items
1187                    .iter()
1188                    .map(|v| v.as_f64().unwrap_or(0.0) as f32)
1189                    .collect();
1190                SchemaValue::Vector(floats)
1191            } else {
1192                SchemaValue::text(crate::json::to_string(v).unwrap_or_default())
1193            }
1194        }
1195        Value::Object(_) => SchemaValue::text(crate::json::to_string(v).unwrap_or_default()),
1196    }
1197}
1198
1199// ---------------------------------------------------------------------------
1200// Remote dispatch (grpc://)
1201// ---------------------------------------------------------------------------
1202
1203/// Dispatch a parsed JSON-RPC call over gRPC. Mirrors `dispatch_method`
1204/// but every operation goes through the tonic client. The server's
1205/// own `RedDBRuntime` does the actual work — we are just a wire
1206/// adapter between the JSON-RPC framing the drivers speak and the
1207/// gRPC protobuf framing the server speaks.
1208fn dispatch_method_remote(
1209    client: &AsyncMutex<RedDBClient>,
1210    tokio_rt: &tokio::runtime::Runtime,
1211    method: &str,
1212    params: &Value,
1213) -> Result<Value, (&'static str, String)> {
1214    match method {
1215        "version" => Ok(Value::Object(
1216            [
1217                (
1218                    "version".to_string(),
1219                    Value::String(env!("CARGO_PKG_VERSION").to_string()),
1220                ),
1221                (
1222                    "protocol".to_string(),
1223                    Value::String(PROTOCOL_VERSION.to_string()),
1224                ),
1225            ]
1226            .into_iter()
1227            .collect(),
1228        )),
1229
1230        "health" => {
1231            let result = tokio_rt.block_on(async {
1232                let mut guard = client.lock().await;
1233                guard.health_status().await
1234            });
1235            match result {
1236                Ok(status) => Ok(Value::Object(
1237                    [
1238                        ("ok".to_string(), Value::Bool(status.healthy)),
1239                        ("state".to_string(), Value::String(status.state)),
1240                        (
1241                            "checked_at_unix_ms".to_string(),
1242                            Value::Number(status.checked_at_unix_ms as f64),
1243                        ),
1244                        (
1245                            "version".to_string(),
1246                            Value::String(env!("CARGO_PKG_VERSION").to_string()),
1247                        ),
1248                    ]
1249                    .into_iter()
1250                    .collect(),
1251                )),
1252                Err(e) => Err((error_code::INTERNAL_ERROR, e.to_string())),
1253            }
1254        }
1255
1256        "query" => {
1257            let sql = params.get("sql").and_then(Value::as_str).ok_or((
1258                error_code::INVALID_PARAMS,
1259                "missing 'sql' string".to_string(),
1260            ))?;
1261            let json_str = tokio_rt
1262                .block_on(async {
1263                    let mut guard = client.lock().await;
1264                    guard.query(sql).await
1265                })
1266                .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
1267            // Server returned its own QueryReply.result_json. Parse and
1268            // repackage into the stdio-protocol shape. If parsing fails,
1269            // hand the raw server JSON back under a sentinel key so the
1270            // caller still gets something useful.
1271            let parsed = json::from_str::<Value>(&json_str)
1272                .map_err(|e| (error_code::INTERNAL_ERROR, format!("bad server JSON: {e}")))?;
1273            Ok(parsed)
1274        }
1275
1276        "insert" => {
1277            let collection = params.get("collection").and_then(Value::as_str).ok_or((
1278                error_code::INVALID_PARAMS,
1279                "missing 'collection' string".to_string(),
1280            ))?;
1281            let payload = params.get("payload").ok_or((
1282                error_code::INVALID_PARAMS,
1283                "missing 'payload' object".to_string(),
1284            ))?;
1285            if payload.as_object().is_none() {
1286                return Err((
1287                    error_code::INVALID_PARAMS,
1288                    "'payload' must be a JSON object".to_string(),
1289                ));
1290            }
1291            let payload_json = payload.to_string_compact();
1292            let reply = tokio_rt
1293                .block_on(async {
1294                    let mut guard = client.lock().await;
1295                    guard.create_row_entity(collection, &payload_json).await
1296                })
1297                .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
1298            let mut out = json::Map::new();
1299            out.insert("affected".to_string(), Value::Number(1.0));
1300            out.insert("id".to_string(), Value::String(reply.id.to_string()));
1301            Ok(Value::Object(out))
1302        }
1303
1304        "bulk_insert" => {
1305            let collection = params.get("collection").and_then(Value::as_str).ok_or((
1306                error_code::INVALID_PARAMS,
1307                "missing 'collection' string".to_string(),
1308            ))?;
1309            let payloads = params.get("payloads").and_then(Value::as_array).ok_or((
1310                error_code::INVALID_PARAMS,
1311                "missing 'payloads' array".to_string(),
1312            ))?;
1313            let mut encoded = Vec::with_capacity(payloads.len());
1314            for entry in payloads {
1315                if entry.as_object().is_none() {
1316                    return Err((
1317                        error_code::INVALID_PARAMS,
1318                        "each payload must be a JSON object".to_string(),
1319                    ));
1320                }
1321                encoded.push(entry.to_string_compact());
1322            }
1323            let total = tokio_rt
1324                .block_on(async {
1325                    let mut guard = client.lock().await;
1326                    guard.bulk_create_rows(collection, encoded).await
1327                })
1328                .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?
1329                .count;
1330            Ok(Value::Object(
1331                [("affected".to_string(), Value::Number(total as f64))]
1332                    .into_iter()
1333                    .collect(),
1334            ))
1335        }
1336
1337        "get" => {
1338            let collection = params.get("collection").and_then(Value::as_str).ok_or((
1339                error_code::INVALID_PARAMS,
1340                "missing 'collection' string".to_string(),
1341            ))?;
1342            let id = params.get("id").and_then(Value::as_str).ok_or((
1343                error_code::INVALID_PARAMS,
1344                "missing 'id' string".to_string(),
1345            ))?;
1346            let sql = format!("SELECT * FROM {collection} WHERE _entity_id = {id} LIMIT 1");
1347            let json_str = tokio_rt
1348                .block_on(async {
1349                    let mut guard = client.lock().await;
1350                    guard.query(&sql).await
1351                })
1352                .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
1353            let parsed = json::from_str::<Value>(&json_str)
1354                .map_err(|e| (error_code::INTERNAL_ERROR, format!("bad server JSON: {e}")))?;
1355            // Server response shape: {"rows":[{...}], ...}. Extract
1356            // the first row (if any) as `entity`.
1357            let entity = parsed
1358                .get("rows")
1359                .and_then(Value::as_array)
1360                .and_then(|rows| rows.first().cloned())
1361                .unwrap_or(Value::Null);
1362            Ok(Value::Object(
1363                [("entity".to_string(), entity)].into_iter().collect(),
1364            ))
1365        }
1366
1367        "delete" => {
1368            let collection = params.get("collection").and_then(Value::as_str).ok_or((
1369                error_code::INVALID_PARAMS,
1370                "missing 'collection' string".to_string(),
1371            ))?;
1372            let id = params.get("id").and_then(Value::as_str).ok_or((
1373                error_code::INVALID_PARAMS,
1374                "missing 'id' string".to_string(),
1375            ))?;
1376            let id = id.parse::<u64>().map_err(|_| {
1377                (
1378                    error_code::INVALID_PARAMS,
1379                    "id must be a numeric string".to_string(),
1380                )
1381            })?;
1382            let _reply = tokio_rt
1383                .block_on(async {
1384                    let mut guard = client.lock().await;
1385                    guard.delete_entity(collection, id).await
1386                })
1387                .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
1388            Ok(Value::Object(
1389                [("affected".to_string(), Value::Number(1.0))]
1390                    .into_iter()
1391                    .collect(),
1392            ))
1393        }
1394
1395        "close" => Ok(Value::Null),
1396
1397        other => Err((
1398            error_code::INVALID_REQUEST,
1399            format!("unknown method: {other}"),
1400        )),
1401    }
1402}
1403
1404#[cfg(test)]
1405mod tests {
1406    use super::*;
1407
1408    fn make_runtime() -> RedDBRuntime {
1409        RedDBRuntime::in_memory().expect("in-memory runtime")
1410    }
1411
1412    fn handle(rt: &RedDBRuntime, line: &str) -> String {
1413        let mut session = Session::new();
1414        handle_line(&Backend::Local(rt), &mut session, line)
1415    }
1416
1417    /// Stateful helper: keeps the same `Session` across multiple calls so
1418    /// tests can exercise multi-step transaction flows in a single closure.
1419    fn with_session<F>(rt: &RedDBRuntime, f: F)
1420    where
1421        F: FnOnce(&dyn Fn(&str) -> String, &RedDBRuntime),
1422    {
1423        let session = std::cell::RefCell::new(Session::new());
1424        let call = |line: &str| -> String {
1425            let mut s = session.borrow_mut();
1426            handle_line(&Backend::Local(rt), &mut s, line)
1427        };
1428        f(&call, rt);
1429    }
1430
1431    #[test]
1432    fn version_method_returns_version_and_protocol() {
1433        let rt = make_runtime();
1434        let line = r#"{"jsonrpc":"2.0","id":1,"method":"version","params":{}}"#;
1435        let resp = handle(&rt, line);
1436        assert!(resp.contains("\"id\":1"));
1437        assert!(resp.contains("\"protocol\":\"1.0\""));
1438        assert!(resp.contains("\"version\""));
1439    }
1440
1441    #[test]
1442    fn health_method_returns_ok_true() {
1443        let rt = make_runtime();
1444        let resp = handle(
1445            &rt,
1446            r#"{"jsonrpc":"2.0","id":"abc","method":"health","params":{}}"#,
1447        );
1448        assert!(resp.contains("\"ok\":true"));
1449        assert!(resp.contains("\"id\":\"abc\""));
1450    }
1451
1452    #[test]
1453    fn parse_error_for_invalid_json() {
1454        let rt = make_runtime();
1455        let resp = handle(&rt, "not json {");
1456        assert!(resp.contains("\"code\":\"PARSE_ERROR\""));
1457        assert!(resp.contains("\"id\":null"));
1458    }
1459
1460    #[test]
1461    fn invalid_request_when_method_missing() {
1462        let rt = make_runtime();
1463        let resp = handle(&rt, r#"{"jsonrpc":"2.0","id":1,"params":{}}"#);
1464        assert!(resp.contains("\"code\":\"INVALID_REQUEST\""));
1465    }
1466
1467    #[test]
1468    fn unknown_method_is_invalid_request() {
1469        let rt = make_runtime();
1470        let resp = handle(
1471            &rt,
1472            r#"{"jsonrpc":"2.0","id":1,"method":"frobnicate","params":{}}"#,
1473        );
1474        assert!(resp.contains("\"code\":\"INVALID_REQUEST\""));
1475        assert!(resp.contains("frobnicate"));
1476    }
1477
1478    #[test]
1479    fn invalid_params_when_query_sql_missing() {
1480        let rt = make_runtime();
1481        let resp = handle(
1482            &rt,
1483            r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{}}"#,
1484        );
1485        assert!(resp.contains("\"code\":\"INVALID_PARAMS\""));
1486    }
1487
1488    #[test]
1489    fn close_method_marks_response_for_shutdown() {
1490        let rt = make_runtime();
1491        let resp = handle(
1492            &rt,
1493            r#"{"jsonrpc":"2.0","id":1,"method":"close","params":{}}"#,
1494        );
1495        assert!(resp.contains("\"__close__\":true"));
1496    }
1497
1498    #[test]
1499    fn query_with_int_text_params_round_trips() {
1500        let rt = make_runtime();
1501        let _ = handle(
1502            &rt,
1503            r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE p (id INTEGER, name TEXT)"}}"#,
1504        );
1505        let _ = handle(
1506            &rt,
1507            r#"{"jsonrpc":"2.0","id":2,"method":"query","params":{"sql":"INSERT INTO p (id, name) VALUES (1, 'Alice')"}}"#,
1508        );
1509        let _ = handle(
1510            &rt,
1511            r#"{"jsonrpc":"2.0","id":3,"method":"query","params":{"sql":"INSERT INTO p (id, name) VALUES (2, 'Bob')"}}"#,
1512        );
1513        let resp = handle(
1514            &rt,
1515            r#"{"jsonrpc":"2.0","id":4,"method":"query","params":{"sql":"SELECT * FROM p WHERE id = $1 AND name = $2","params":[1,"Alice"]}}"#,
1516        );
1517        assert!(resp.contains("\"Alice\""), "got: {resp}");
1518        assert!(!resp.contains("\"Bob\""), "got: {resp}");
1519    }
1520
1521    #[test]
1522    fn query_with_params_arity_mismatch_rejected() {
1523        let rt = make_runtime();
1524        let _ = handle(
1525            &rt,
1526            r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE pa (id INTEGER)"}}"#,
1527        );
1528        let resp = handle(
1529            &rt,
1530            r#"{"jsonrpc":"2.0","id":2,"method":"query","params":{"sql":"SELECT * FROM pa WHERE id = $1","params":[1,2]}}"#,
1531        );
1532        assert!(resp.contains("\"INVALID_PARAMS\""), "got: {resp}");
1533    }
1534
1535    #[test]
1536    fn query_with_params_gap_rejected() {
1537        let rt = make_runtime();
1538        let _ = handle(
1539            &rt,
1540            r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE pg (a INTEGER, b INTEGER)"}}"#,
1541        );
1542        let resp = handle(
1543            &rt,
1544            r#"{"jsonrpc":"2.0","id":2,"method":"query","params":{"sql":"SELECT * FROM pg WHERE a = $1 AND b = $3","params":[1,2,3]}}"#,
1545        );
1546        assert!(resp.contains("\"INVALID_PARAMS\""), "got: {resp}");
1547    }
1548
1549    #[test]
1550    fn query_select_one_returns_rows() {
1551        let rt = make_runtime();
1552        let resp = handle(
1553            &rt,
1554            r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"SELECT 1 AS one"}}"#,
1555        );
1556        assert!(resp.contains("\"result\""));
1557        assert!(!resp.contains("\"error\""));
1558    }
1559
1560    // -----------------------------------------------------------------
1561    // Transaction tests
1562    // -----------------------------------------------------------------
1563
1564    #[test]
1565    fn tx_begin_returns_tx_id_and_isolation() {
1566        let rt = make_runtime();
1567        with_session(&rt, |call, _| {
1568            let resp = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
1569            assert!(resp.contains("\"tx_id\":1"));
1570            assert!(resp.contains("\"isolation\":\"read_committed_deferred\""));
1571            assert!(!resp.contains("\"error\""));
1572        });
1573    }
1574
1575    #[test]
1576    fn tx_begin_twice_returns_already_open() {
1577        let rt = make_runtime();
1578        with_session(&rt, |call, _| {
1579            let _ = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
1580            let resp = call(r#"{"jsonrpc":"2.0","id":2,"method":"tx.begin","params":null}"#);
1581            assert!(resp.contains("\"code\":\"TX_ALREADY_OPEN\""));
1582        });
1583    }
1584
1585    #[test]
1586    fn tx_commit_without_begin_returns_no_tx_open() {
1587        let rt = make_runtime();
1588        with_session(&rt, |call, _| {
1589            let resp = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.commit","params":null}"#);
1590            assert!(resp.contains("\"code\":\"NO_TX_OPEN\""));
1591        });
1592    }
1593
1594    #[test]
1595    fn tx_rollback_without_begin_returns_no_tx_open() {
1596        let rt = make_runtime();
1597        with_session(&rt, |call, _| {
1598            let resp = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.rollback","params":null}"#);
1599            assert!(resp.contains("\"code\":\"NO_TX_OPEN\""));
1600        });
1601    }
1602
1603    #[test]
1604    fn insert_inside_tx_returns_pending_envelope() {
1605        let rt = make_runtime();
1606        // Create the collection first (outside any tx).
1607        let _ = handle(
1608            &rt,
1609            r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE users (name TEXT)"}}"#,
1610        );
1611        with_session(&rt, |call, _| {
1612            let _ = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
1613            let resp = call(
1614                r#"{"jsonrpc":"2.0","id":2,"method":"insert","params":{"collection":"users","payload":{"name":"alice"}}}"#,
1615            );
1616            assert!(resp.contains("\"pending\":true"));
1617            assert!(resp.contains("\"tx_id\":1"));
1618            assert!(resp.contains("\"affected\":0"));
1619        });
1620    }
1621
1622    #[test]
1623    fn begin_insert_rollback_does_not_persist() {
1624        let rt = make_runtime();
1625        let _ = handle(
1626            &rt,
1627            r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE u (name TEXT)"}}"#,
1628        );
1629        with_session(&rt, |call, _| {
1630            let _ = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
1631            let _ = call(
1632                r#"{"jsonrpc":"2.0","id":2,"method":"insert","params":{"collection":"u","payload":{"name":"ghost"}}}"#,
1633            );
1634            let rollback = call(r#"{"jsonrpc":"2.0","id":3,"method":"tx.rollback","params":null}"#);
1635            assert!(rollback.contains("\"ops_discarded\":1"));
1636            assert!(rollback.contains("\"tx_id\":1"));
1637        });
1638        // After rollback, the row must not be visible to a fresh query.
1639        let resp = handle(
1640            &rt,
1641            r#"{"jsonrpc":"2.0","id":9,"method":"query","params":{"sql":"SELECT * FROM u"}}"#,
1642        );
1643        assert!(!resp.contains("\"ghost\""));
1644    }
1645
1646    #[test]
1647    fn begin_insert_commit_persists() {
1648        let rt = make_runtime();
1649        let _ = handle(
1650            &rt,
1651            r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE u2 (name TEXT)"}}"#,
1652        );
1653        with_session(&rt, |call, _| {
1654            let _ = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
1655            let _ = call(
1656                r#"{"jsonrpc":"2.0","id":2,"method":"insert","params":{"collection":"u2","payload":{"name":"alice"}}}"#,
1657            );
1658            let _ = call(
1659                r#"{"jsonrpc":"2.0","id":3,"method":"insert","params":{"collection":"u2","payload":{"name":"bob"}}}"#,
1660            );
1661            let commit = call(r#"{"jsonrpc":"2.0","id":4,"method":"tx.commit","params":null}"#);
1662            assert!(commit.contains("\"ops_replayed\":2"));
1663            assert!(!commit.contains("\"error\""));
1664        });
1665        let resp = handle(
1666            &rt,
1667            r#"{"jsonrpc":"2.0","id":9,"method":"query","params":{"sql":"SELECT * FROM u2"}}"#,
1668        );
1669        assert!(resp.contains("\"alice\""));
1670        assert!(resp.contains("\"bob\""));
1671    }
1672
1673    #[test]
1674    fn bulk_insert_inside_tx_buffers_everything() {
1675        let rt = make_runtime();
1676        let _ = handle(
1677            &rt,
1678            r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE u3 (name TEXT)"}}"#,
1679        );
1680        with_session(&rt, |call, _| {
1681            let _ = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
1682            let resp = call(
1683                r#"{"jsonrpc":"2.0","id":2,"method":"bulk_insert","params":{"collection":"u3","payloads":[{"name":"a"},{"name":"b"},{"name":"c"}]}}"#,
1684            );
1685            assert!(resp.contains("\"buffered\":3"));
1686            assert!(resp.contains("\"pending\":true"));
1687            assert!(resp.contains("\"affected\":0"));
1688
1689            let commit = call(r#"{"jsonrpc":"2.0","id":3,"method":"tx.commit","params":null}"#);
1690            assert!(commit.contains("\"ops_replayed\":3"));
1691        });
1692    }
1693
1694    #[test]
1695    fn delete_inside_tx_is_buffered() {
1696        let rt = make_runtime();
1697        // Seed two rows outside any tx.
1698        let _ = handle(
1699            &rt,
1700            r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE u4 (name TEXT)"}}"#,
1701        );
1702        let _ = handle(
1703            &rt,
1704            r#"{"jsonrpc":"2.0","id":2,"method":"query","params":{"sql":"INSERT INTO u4 (name) VALUES ('keep')"}}"#,
1705        );
1706        with_session(&rt, |call, _| {
1707            let _ = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
1708            let resp = call(
1709                r#"{"jsonrpc":"2.0","id":2,"method":"delete","params":{"collection":"u4","id":"1"}}"#,
1710            );
1711            assert!(resp.contains("\"pending\":true"));
1712            let _ = call(r#"{"jsonrpc":"2.0","id":3,"method":"tx.rollback","params":null}"#);
1713        });
1714        // Row should still be present after rollback of the delete.
1715        let resp = handle(
1716            &rt,
1717            r#"{"jsonrpc":"2.0","id":9,"method":"query","params":{"sql":"SELECT * FROM u4"}}"#,
1718        );
1719        assert!(resp.contains("\"keep\""));
1720    }
1721
1722    #[test]
1723    fn close_with_open_tx_auto_rollbacks() {
1724        let rt = make_runtime();
1725        let _ = handle(
1726            &rt,
1727            r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE u5 (name TEXT)"}}"#,
1728        );
1729        with_session(&rt, |call, _| {
1730            let _ = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
1731            let _ = call(
1732                r#"{"jsonrpc":"2.0","id":2,"method":"insert","params":{"collection":"u5","payload":{"name":"ghost"}}}"#,
1733            );
1734            let close = call(r#"{"jsonrpc":"2.0","id":3,"method":"close","params":null}"#);
1735            assert!(close.contains("\"__close__\":true"));
1736            assert!(!close.contains("\"error\""));
1737        });
1738        let resp = handle(
1739            &rt,
1740            r#"{"jsonrpc":"2.0","id":9,"method":"query","params":{"sql":"SELECT * FROM u5"}}"#,
1741        );
1742        assert!(!resp.contains("\"ghost\""));
1743    }
1744
1745    // -----------------------------------------------------------------
1746    // Cursor streaming tests
1747    // -----------------------------------------------------------------
1748
1749    fn seed_numbers_table(rt: &RedDBRuntime, table: &str, count: u32) {
1750        let _ = handle(
1751            rt,
1752            &format!(
1753                r#"{{"jsonrpc":"2.0","id":1,"method":"query","params":{{"sql":"CREATE TABLE {table} (n INTEGER)"}}}}"#,
1754            ),
1755        );
1756        for i in 0..count {
1757            let _ = handle(
1758                rt,
1759                &format!(
1760                    r#"{{"jsonrpc":"2.0","id":2,"method":"query","params":{{"sql":"INSERT INTO {table} (n) VALUES ({i})"}}}}"#,
1761                ),
1762            );
1763        }
1764    }
1765
1766    #[test]
1767    fn cursor_open_returns_id_columns_and_total() {
1768        let rt = make_runtime();
1769        seed_numbers_table(&rt, "nums1", 3);
1770        with_session(&rt, |call, _| {
1771            let resp = call(
1772                r#"{"jsonrpc":"2.0","id":1,"method":"query.open","params":{"sql":"SELECT n FROM nums1"}}"#,
1773            );
1774            assert!(resp.contains("\"cursor_id\":1"));
1775            assert!(resp.contains("\"total_rows\":3"));
1776            assert!(resp.contains("\"columns\""));
1777            assert!(!resp.contains("\"error\""));
1778        });
1779    }
1780
1781    #[test]
1782    fn cursor_next_chunks_rows_and_signals_done() {
1783        let rt = make_runtime();
1784        seed_numbers_table(&rt, "nums2", 5);
1785        with_session(&rt, |call, _| {
1786            let _ = call(
1787                r#"{"jsonrpc":"2.0","id":1,"method":"query.open","params":{"sql":"SELECT n FROM nums2"}}"#,
1788            );
1789            let first = call(
1790                r#"{"jsonrpc":"2.0","id":2,"method":"query.next","params":{"cursor_id":1,"batch_size":2}}"#,
1791            );
1792            assert!(first.contains("\"done\":false"));
1793            assert!(first.contains("\"remaining\":3"));
1794
1795            let second = call(
1796                r#"{"jsonrpc":"2.0","id":3,"method":"query.next","params":{"cursor_id":1,"batch_size":2}}"#,
1797            );
1798            assert!(second.contains("\"done\":false"));
1799            assert!(second.contains("\"remaining\":1"));
1800
1801            let third = call(
1802                r#"{"jsonrpc":"2.0","id":4,"method":"query.next","params":{"cursor_id":1,"batch_size":2}}"#,
1803            );
1804            assert!(third.contains("\"done\":true"));
1805            assert!(third.contains("\"remaining\":0"));
1806        });
1807    }
1808
1809    #[test]
1810    fn cursor_auto_drops_when_exhausted() {
1811        let rt = make_runtime();
1812        seed_numbers_table(&rt, "nums3", 2);
1813        with_session(&rt, |call, _| {
1814            let _ = call(
1815                r#"{"jsonrpc":"2.0","id":1,"method":"query.open","params":{"sql":"SELECT n FROM nums3"}}"#,
1816            );
1817            let _ = call(
1818                r#"{"jsonrpc":"2.0","id":2,"method":"query.next","params":{"cursor_id":1,"batch_size":100}}"#,
1819            );
1820            // Cursor was auto-dropped after done=true; subsequent next
1821            // must error with CURSOR_NOT_FOUND.
1822            let resp = call(
1823                r#"{"jsonrpc":"2.0","id":3,"method":"query.next","params":{"cursor_id":1,"batch_size":100}}"#,
1824            );
1825            assert!(resp.contains("\"code\":\"CURSOR_NOT_FOUND\""));
1826        });
1827    }
1828
1829    #[test]
1830    fn cursor_close_removes_it() {
1831        let rt = make_runtime();
1832        seed_numbers_table(&rt, "nums4", 3);
1833        with_session(&rt, |call, _| {
1834            let _ = call(
1835                r#"{"jsonrpc":"2.0","id":1,"method":"query.open","params":{"sql":"SELECT n FROM nums4"}}"#,
1836            );
1837            let close =
1838                call(r#"{"jsonrpc":"2.0","id":2,"method":"query.close","params":{"cursor_id":1}}"#);
1839            assert!(close.contains("\"closed\":true"));
1840            let after = call(
1841                r#"{"jsonrpc":"2.0","id":3,"method":"query.next","params":{"cursor_id":1,"batch_size":10}}"#,
1842            );
1843            assert!(after.contains("\"code\":\"CURSOR_NOT_FOUND\""));
1844        });
1845    }
1846
1847    #[test]
1848    fn cursor_close_unknown_errors() {
1849        let rt = make_runtime();
1850        with_session(&rt, |call, _| {
1851            let resp = call(
1852                r#"{"jsonrpc":"2.0","id":1,"method":"query.close","params":{"cursor_id":9999}}"#,
1853            );
1854            assert!(resp.contains("\"code\":\"CURSOR_NOT_FOUND\""));
1855        });
1856    }
1857
1858    #[test]
1859    fn cursor_next_without_cursor_id_errors() {
1860        let rt = make_runtime();
1861        with_session(&rt, |call, _| {
1862            let resp = call(r#"{"jsonrpc":"2.0","id":1,"method":"query.next","params":{}}"#);
1863            assert!(resp.contains("\"code\":\"INVALID_PARAMS\""));
1864        });
1865    }
1866
1867    #[test]
1868    fn cursor_default_batch_size_returns_all_when_smaller_than_default() {
1869        let rt = make_runtime();
1870        seed_numbers_table(&rt, "nums5", 7);
1871        with_session(&rt, |call, _| {
1872            let _ = call(
1873                r#"{"jsonrpc":"2.0","id":1,"method":"query.open","params":{"sql":"SELECT n FROM nums5"}}"#,
1874            );
1875            // No batch_size → default 100, table has 7 rows, all in one call.
1876            let resp =
1877                call(r#"{"jsonrpc":"2.0","id":2,"method":"query.next","params":{"cursor_id":1}}"#);
1878            assert!(resp.contains("\"done\":true"));
1879            assert!(resp.contains("\"remaining\":0"));
1880        });
1881    }
1882
1883    #[test]
1884    fn close_method_drops_open_cursors() {
1885        let rt = make_runtime();
1886        seed_numbers_table(&rt, "nums6", 3);
1887        // Single session: open a cursor, call close, verify cursor is gone by reopening
1888        // fresh session and attempting to use cursor_id 1.
1889        with_session(&rt, |call, _| {
1890            let _ = call(
1891                r#"{"jsonrpc":"2.0","id":1,"method":"query.open","params":{"sql":"SELECT n FROM nums6"}}"#,
1892            );
1893            let close = call(r#"{"jsonrpc":"2.0","id":2,"method":"close","params":null}"#);
1894            assert!(close.contains("\"__close__\":true"));
1895            // Cursor must be gone after close within the same session.
1896            let after = call(
1897                r#"{"jsonrpc":"2.0","id":3,"method":"query.next","params":{"cursor_id":1,"batch_size":10}}"#,
1898            );
1899            assert!(after.contains("\"code\":\"CURSOR_NOT_FOUND\""));
1900        });
1901    }
1902
1903    #[test]
1904    fn cursor_independent_of_transaction_state() {
1905        let rt = make_runtime();
1906        seed_numbers_table(&rt, "nums7", 4);
1907        with_session(&rt, |call, _| {
1908            // Open cursor, begin tx, commit tx — cursor survives.
1909            let _ = call(
1910                r#"{"jsonrpc":"2.0","id":1,"method":"query.open","params":{"sql":"SELECT n FROM nums7"}}"#,
1911            );
1912            let _ = call(r#"{"jsonrpc":"2.0","id":2,"method":"tx.begin","params":null}"#);
1913            let _ = call(r#"{"jsonrpc":"2.0","id":3,"method":"tx.commit","params":null}"#);
1914            let resp = call(
1915                r#"{"jsonrpc":"2.0","id":4,"method":"query.next","params":{"cursor_id":1,"batch_size":10}}"#,
1916            );
1917            assert!(resp.contains("\"done\":true"));
1918            assert!(!resp.contains("\"error\""));
1919        });
1920    }
1921
1922    #[test]
1923    fn second_tx_after_commit_gets_fresh_id() {
1924        let rt = make_runtime();
1925        let _ = handle(
1926            &rt,
1927            r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE u6 (name TEXT)"}}"#,
1928        );
1929        with_session(&rt, |call, _| {
1930            let first = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
1931            assert!(first.contains("\"tx_id\":1"));
1932            let _ = call(
1933                r#"{"jsonrpc":"2.0","id":2,"method":"insert","params":{"collection":"u6","payload":{"name":"x"}}}"#,
1934            );
1935            let _ = call(r#"{"jsonrpc":"2.0","id":3,"method":"tx.commit","params":null}"#);
1936
1937            let second = call(r#"{"jsonrpc":"2.0","id":4,"method":"tx.begin","params":null}"#);
1938            assert!(second.contains("\"tx_id\":2"));
1939            let _ = call(r#"{"jsonrpc":"2.0","id":5,"method":"tx.rollback","params":null}"#);
1940        });
1941    }
1942
1943    #[test]
1944    fn prepare_and_execute_prepared_statement() {
1945        let rt = make_runtime();
1946        // Create table + insert a row
1947        let _ = handle(
1948            &rt,
1949            r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE ps_test (n INTEGER)"}}"#,
1950        );
1951        let _ = handle(
1952            &rt,
1953            r#"{"jsonrpc":"2.0","id":2,"method":"query","params":{"sql":"INSERT INTO ps_test (n) VALUES (42)"}}"#,
1954        );
1955
1956        with_session(&rt, |call, _| {
1957            // Prepare a parameterized SELECT.
1958            let prep = call(
1959                r#"{"jsonrpc":"2.0","id":3,"method":"prepare","params":{"sql":"SELECT n FROM ps_test WHERE n = 42"}}"#,
1960            );
1961            assert!(prep.contains("\"prepared_id\""), "prepare response: {prep}");
1962
1963            // Extract the prepared_id.
1964            let id: u64 = {
1965                let v: crate::json::Value = crate::json::from_str(&prep).expect("json");
1966                let result = v.get("result").expect("result");
1967                result
1968                    .get("prepared_id")
1969                    .and_then(|n| n.as_f64())
1970                    .expect("prepared_id") as u64
1971            };
1972
1973            // Execute with the bind value for the parameterized literal.
1974            let exec = call(&format!(
1975                r#"{{"jsonrpc":"2.0","id":4,"method":"execute_prepared","params":{{"prepared_id":{id},"binds":[42]}}}}"#
1976            ));
1977            // Response uses "rows" key (see query_result_to_json).
1978            assert!(
1979                exec.contains("\"rows\""),
1980                "execute_prepared response: {exec}"
1981            );
1982            assert!(exec.contains("42"), "expected row with n=42 in: {exec}");
1983        });
1984    }
1985}