Skip to main content

reddb_server/
rpc_stdio.rs

1//! JSON-RPC 2.0 line-delimited stdio mode for the `red` binary.
2//!
3//! See `PLAN_DRIVERS.md` for the protocol spec. This module is the
4//! sole server-side implementation of the protocol — drivers in
5//! every language target this contract.
6//!
7//! Loop:
8//!   1. Read a line from stdin (UTF-8, terminated by `\n`).
9//!   2. Parse it as a JSON-RPC 2.0 request envelope.
10//!   3. Dispatch on `method` to the runtime.
11//!   4. Serialize the response as a single line on stdout, flush.
12//!   5. Repeat until EOF or `close` method received.
13//!
14//! Errors do not crash the loop. Panics inside a method handler are
15//! caught and reported as `INTERNAL_ERROR` so a buggy query cannot
16//! kill the daemon.
17
18use std::io::{BufRead, BufReader, Stdin, Write};
19use std::panic::AssertUnwindSafe;
20
21use tokio::sync::Mutex as AsyncMutex;
22
23use crate::json::{self as json, Value};
24use crate::runtime::{RedDBRuntime, RuntimeQueryResult};
25use crate::storage::query::unified::UnifiedRecord;
26use crate::storage::schema::Value as SchemaValue;
27use reddb_client_connector::RedDBClient;
28
29/// Which backend the stdio loop is wrapping.
30///
31/// `Local` = the in-process engine (embedded). `Remote` = a tonic client
32/// to a standalone `red server` talking gRPC. The remote variant is
33/// boxed because `RedDBClient` + a `tokio::Runtime` reference is ~248
34/// bytes against `Local`'s ~8 bytes (clippy::large_enum_variant).
35///
36/// The mutex uses `tokio::sync::Mutex` instead of `std::sync::Mutex`
37/// because `dispatch_method_remote` holds the guard across `.await`
38/// points inside `tokio_rt.block_on(...)` — holding a sync mutex
39/// across an await would be a correctness bug in more complex
40/// runtimes.
41enum Backend<'a> {
42    Local(&'a RedDBRuntime),
43    Remote(Box<RemoteBackend<'a>>),
44}
45
46struct RemoteBackend<'a> {
47    client: AsyncMutex<RedDBClient>,
48    tokio_rt: &'a tokio::runtime::Runtime,
49}
50
/// Wire-protocol version string returned in the `protocol` field of the
/// `version` method's result.
pub const PROTOCOL_VERSION: &str = "1.0";
53
/// Stable, string-typed error codes carried in error responses.
///
/// Drivers translate these into their language's idiomatic exceptions,
/// so the literal values are part of the wire contract.
pub mod error_code {
    /// The request line was not valid JSON.
    pub const PARSE_ERROR: &str = "PARSE_ERROR";
    /// The envelope was valid JSON but not a usable request (for example
    /// the `method` field is missing).
    pub const INVALID_REQUEST: &str = "INVALID_REQUEST";
    /// A required parameter is missing or has the wrong type.
    pub const INVALID_PARAMS: &str = "INVALID_PARAMS";
    /// The engine rejected or failed to execute a query.
    pub const QUERY_ERROR: &str = "QUERY_ERROR";
    /// The requested entity does not exist.
    pub const NOT_FOUND: &str = "NOT_FOUND";
    /// Unexpected server-side failure, including a caught handler panic.
    pub const INTERNAL_ERROR: &str = "INTERNAL_ERROR";
    /// `tx.begin` was issued while this session already had an open
    /// transaction.
    pub const TX_ALREADY_OPEN: &str = "TX_ALREADY_OPEN";
    /// `tx.commit` / `tx.rollback` was issued with no matching
    /// `tx.begin`.
    pub const NO_TX_OPEN: &str = "NO_TX_OPEN";
    /// A buffered statement failed while `tx.commit` replayed the write
    /// set. The message names the index of the failing operation.
    pub const TX_REPLAY_FAILED: &str = "TX_REPLAY_FAILED";
    /// The method is session-local and cannot be served through the
    /// remote gRPC proxy yet.
    pub const TX_NOT_SUPPORTED_REMOTE: &str = "TX_NOT_SUPPORTED_REMOTE";
    /// `query.next` / `query.close` named a cursor id this session does
    /// not hold: never opened, already closed, or dropped automatically
    /// once its rows were exhausted.
    pub const CURSOR_NOT_FOUND: &str = "CURSOR_NOT_FOUND";
    /// The session already holds the maximum number of open cursors.
    pub const CURSOR_LIMIT_EXCEEDED: &str = "CURSOR_LIMIT_EXCEEDED";
}
81
/// Upper bound on the number of cursors one stdio session may keep open
/// at the same time. Acts as a memory-pressure guard against clients
/// that keep calling `query.open` without ever closing.
pub(crate) const MAX_CURSORS_PER_SESSION: usize = 64;

/// Batch size `query.next` falls back to when the request carries no
/// `batch_size`. Tuned for small-to-medium rows; clients with large rows
/// should ask for fewer.
pub(crate) const DEFAULT_CURSOR_BATCH_SIZE: usize = 100;

/// Largest batch `query.next` will ever return; bigger requests are
/// clamped to this value so a single call cannot stall the stdio loop
/// with a multi-megabyte line.
pub(crate) const MAX_CURSOR_BATCH_SIZE: usize = 10_000;
93
94// ---------------------------------------------------------------------------
95// Session state (transaction buffer)
96// ---------------------------------------------------------------------------
97//
98// Transactions in the stdio protocol are scoped to a single connection —
99// one process = one session = at most one open transaction. The state
100// lives in the stack of `run_backend` so nothing leaks between
101// connections, and there is no cross-session visibility of buffered
102// writes.
103//
104// Isolation model: `read_committed_deferred`. Reads inside a transaction
105// observe the latest *committed* state; they do **not** see writes the
106// same session has buffered via `insert` / `delete` / `bulk_insert`.
// Atomicity: `tx.commit` replays the buffered statements inside an engine
// `BEGIN` / `COMMIT` pair and issues `ROLLBACK` when a statement fails
// (this replaced the earlier global commit lock). Auto-committed writes
// from other sessions may still interleave between commits; strict
// atomicity requires funnelling every write through a single session.
111
112/// Per-connection session that tracks the currently open transaction
113/// and any active streaming cursors.
114// A server-side prepared statement bound to this session.
115// When parameter_count == 0, shape == the exact plan (no substitution needed).
116struct StdioPreparedStatement {
117    shape: crate::storage::query::ast::QueryExpr,
118    parameter_count: usize,
119}
120
121pub(crate) struct Session {
122    next_tx_id: u64,
123    current_tx: Option<OpenTx>,
124    next_cursor_id: u64,
125    cursors: std::collections::HashMap<u64, Cursor>,
126    /// Monotone counter for prepared statement IDs within this session.
127    next_prepared_id: u64,
128    /// Active prepared statements, keyed by the ID returned to the client.
129    prepared: std::collections::HashMap<u64, StdioPreparedStatement>,
130}
131
132impl Session {
133    pub(crate) fn new() -> Self {
134        Self {
135            next_tx_id: 1,
136            current_tx: None,
137            next_cursor_id: 1,
138            cursors: std::collections::HashMap::new(),
139            next_prepared_id: 1,
140            prepared: std::collections::HashMap::new(),
141        }
142    }
143
144    fn open_tx(&mut self) -> Result<u64, (&'static str, String)> {
145        if let Some(tx) = &self.current_tx {
146            return Err((
147                error_code::TX_ALREADY_OPEN,
148                format!("transaction {} already open in this session", tx.tx_id),
149            ));
150        }
151        let tx_id = self.next_tx_id;
152        self.next_tx_id = self.next_tx_id.saturating_add(1);
153        self.current_tx = Some(OpenTx {
154            tx_id,
155            write_set: Vec::new(),
156        });
157        Ok(tx_id)
158    }
159
160    fn take_tx(&mut self) -> Option<OpenTx> {
161        self.current_tx.take()
162    }
163
164    fn current_tx_mut(&mut self) -> Option<&mut OpenTx> {
165        self.current_tx.as_mut()
166    }
167
168    #[allow(dead_code)]
169    fn has_tx(&self) -> bool {
170        self.current_tx.is_some()
171    }
172
173    /// Register a freshly materialised cursor and return its id.
174    /// Enforces [`MAX_CURSORS_PER_SESSION`] before allocating.
175    fn insert_cursor(&mut self, cursor: Cursor) -> Result<u64, (&'static str, String)> {
176        if self.cursors.len() >= MAX_CURSORS_PER_SESSION {
177            return Err((
178                error_code::CURSOR_LIMIT_EXCEEDED,
179                format!(
180                    "session already holds {} cursors (max {}) — close some before opening new ones",
181                    self.cursors.len(),
182                    MAX_CURSORS_PER_SESSION
183                ),
184            ));
185        }
186        let id = self.next_cursor_id;
187        self.next_cursor_id = self.next_cursor_id.saturating_add(1);
188        let mut cursor = cursor;
189        cursor.cursor_id = id;
190        self.cursors.insert(id, cursor);
191        Ok(id)
192    }
193
194    fn cursor_mut(&mut self, id: u64) -> Option<&mut Cursor> {
195        self.cursors.get_mut(&id)
196    }
197
198    fn drop_cursor(&mut self, id: u64) -> Option<Cursor> {
199        self.cursors.remove(&id)
200    }
201
202    fn clear_cursors(&mut self) {
203        self.cursors.clear();
204    }
205}
206
207impl Default for Session {
208    fn default() -> Self {
209        Self::new()
210    }
211}
212
213/// An in-flight transaction for a single stdio session.
214struct OpenTx {
215    tx_id: u64,
216    write_set: Vec<PendingSql>,
217}
218
/// One buffered mutation awaiting `tx.commit`.
///
/// Every variant owns a ready-to-execute SQL string, so replaying the
/// write set is just one `execute_query` call per entry.
enum PendingSql {
    Insert(String),
    Delete(String),
    #[allow(dead_code)] // reserved for future query()-in-tx routing
    Update(String),
}

impl PendingSql {
    /// The SQL text carried by this entry, whatever its kind.
    fn sql(&self) -> &str {
        match self {
            Self::Insert(statement) => statement.as_str(),
            Self::Delete(statement) => statement.as_str(),
            Self::Update(statement) => statement.as_str(),
        }
    }
}
236
237/// An open streaming cursor over a materialised query result.
238///
239/// MVP model: the underlying [`RuntimeQueryResult`] has already been
240/// fully executed at `query.open` time and lives inside the cursor.
241/// Each `query.next` call slices off `batch_size` rows from the tail and
242/// advances `position`. This pays normal memory cost but lets the client
243/// consume the result in chunks, abort mid-stream, or pipeline the next
244/// batch request while processing the previous one.
245///
246/// A future iteration can swap the rows field for a lazy iterator pulled
247/// from the execution engine without changing the wire protocol.
248pub(crate) struct Cursor {
249    cursor_id: u64,
250    columns: Vec<String>,
251    rows: Vec<UnifiedRecord>,
252    position: usize,
253}
254
255impl Cursor {
256    fn new(columns: Vec<String>, rows: Vec<UnifiedRecord>) -> Self {
257        Self {
258            cursor_id: 0, // overwritten by Session::insert_cursor
259            columns,
260            rows,
261            position: 0,
262        }
263    }
264
265    fn total(&self) -> usize {
266        self.rows.len()
267    }
268
269    fn remaining(&self) -> usize {
270        self.rows.len().saturating_sub(self.position)
271    }
272
273    fn is_exhausted(&self) -> bool {
274        self.position >= self.rows.len()
275    }
276
277    /// Extract up to `batch_size` rows from the current position forward.
278    /// Advances the position to the end of the returned slice.
279    fn take_batch(&mut self, batch_size: usize) -> &[UnifiedRecord] {
280        let end = (self.position + batch_size).min(self.rows.len());
281        let slice = &self.rows[self.position..end];
282        self.position = end;
283        slice
284    }
285}
286
287/// Run the stdio JSON-RPC loop against a local in-process runtime.
288///
289/// Returns the process exit code. `0` on normal shutdown (EOF or
290/// explicit `close`). Non-zero only on fatal I/O errors reading
291/// stdin or writing stdout.
292pub fn run(runtime: &RedDBRuntime) -> i32 {
293    run_with_io(runtime, std::io::stdin(), &mut std::io::stdout())
294}
295
296/// Run the stdio JSON-RPC loop as a proxy to a remote gRPC server.
297///
298/// Every method is forwarded via tonic. This is what
299/// `red rpc --stdio --connect grpc://host:port` uses, and it is also
300/// what the JS and Python drivers spawn when the user calls
301/// `connect("grpc://...")`.
302pub fn run_remote(endpoint: &str, token: Option<String>) -> i32 {
303    let tokio_rt = match tokio::runtime::Builder::new_current_thread()
304        .enable_all()
305        .build()
306    {
307        Ok(rt) => rt,
308        Err(e) => {
309            tracing::error!(err = %e, "rpc: failed to build tokio runtime");
310            return 1;
311        }
312    };
313    let client = match tokio_rt.block_on(RedDBClient::connect(endpoint, token)) {
314        Ok(c) => c,
315        Err(e) => {
316            tracing::error!(endpoint, err = %e, "rpc: failed to connect");
317            return 1;
318        }
319    };
320    let backend = Backend::Remote(Box::new(RemoteBackend {
321        client: AsyncMutex::new(client),
322        tokio_rt: &tokio_rt,
323    }));
324    run_backend(&backend, std::io::stdin(), &mut std::io::stdout())
325}
326
327/// Same as [`run`] but takes explicit I/O handles. Used by tests.
328pub fn run_with_io<W: Write>(runtime: &RedDBRuntime, stdin: Stdin, stdout: &mut W) -> i32 {
329    run_backend(&Backend::Local(runtime), stdin, stdout)
330}
331
/// Source of connection ids for stdio sessions.
///
/// Every session takes a distinct id so that its `tx.commit`
/// BEGIN/COMMIT pair is routed to its own `TxnContext` in the runtime;
/// without it all stdio sessions would share `conn_id = 0` and trample
/// each other's transactions. The counter starts at a high base so it
/// does not collide with the PG-wire / gRPC transports, which allocate
/// from their own pools below.
static STDIO_SESSION_CONN_ID: std::sync::atomic::AtomicU64 =
    std::sync::atomic::AtomicU64::new(1_000_000);

/// Hand out the next unused stdio connection id.
fn next_stdio_conn_id() -> u64 {
    use std::sync::atomic::Ordering;

    // Only uniqueness matters here, so a relaxed increment is sufficient.
    let allocated = STDIO_SESSION_CONN_ID.fetch_add(1, Ordering::Relaxed);
    allocated
}
344
345fn run_backend<W: Write>(backend: &Backend<'_>, stdin: Stdin, stdout: &mut W) -> i32 {
346    let reader = BufReader::new(stdin.lock());
347    let mut session = Session::new();
348    // Bind the session to a stable connection id so the runtime's
349    // `tx_contexts` (keyed by conn_id) survives across `handle_line`
350    // calls within the same session.
351    let conn_id = next_stdio_conn_id();
352    crate::runtime::impl_core::set_current_connection_id(conn_id);
353    for line_result in reader.lines() {
354        let line = match line_result {
355            Ok(l) => l,
356            Err(e) => {
357                let _ = writeln!(
358                    stdout,
359                    "{}",
360                    error_response(&Value::Null, error_code::INTERNAL_ERROR, &e.to_string())
361                );
362                let _ = stdout.flush();
363                return 1;
364            }
365        };
366        if line.trim().is_empty() {
367            continue;
368        }
369        let response = handle_line(backend, &mut session, &line);
370        if writeln!(stdout, "{}", response).is_err() || stdout.flush().is_err() {
371            return 1;
372        }
373        if response.contains("\"__close__\":true") {
374            return 0;
375        }
376    }
377    // EOF: silently drop any open transaction — atomicity is preserved
378    // (nothing was ever applied to the store) and no error is surfaced to
379    // the caller because EOF may be graceful client disconnect.
380    let _ = session.take_tx();
381    crate::runtime::impl_core::clear_current_connection_id();
382    0
383}
384
385/// Parse one input line and dispatch. Always returns a single-line
386/// JSON string suitable for direct write to stdout. Never panics
387/// (panics inside handlers are caught and reported).
388fn handle_line(backend: &Backend<'_>, session: &mut Session, line: &str) -> String {
389    let parsed: Value = match json::from_str(line) {
390        Ok(v) => v,
391        Err(err) => {
392            return error_response(
393                &Value::Null,
394                error_code::PARSE_ERROR,
395                &format!("invalid JSON: {err}"),
396            );
397        }
398    };
399
400    let id = parsed.get("id").cloned().unwrap_or(Value::Null);
401
402    let method = match parsed.get("method").and_then(Value::as_str) {
403        Some(m) => m.to_string(),
404        None => {
405            return error_response(&id, error_code::INVALID_REQUEST, "missing 'method' field");
406        }
407    };
408
409    let params = parsed.get("params").cloned().unwrap_or(Value::Null);
410
411    let dispatch = std::panic::catch_unwind(AssertUnwindSafe(|| match backend {
412        Backend::Local(rt) => dispatch_method(rt, session, &method, &params),
413        Backend::Remote(remote) => {
414            // Transactions are session-local and the remote path forwards
415            // each call independently — there is no place to park a tx
416            // handle across gRPC hops yet. Surface a clear error so
417            // drivers can fall back to per-call auto-commit.
418            if matches!(
419                method.as_str(),
420                "tx.begin"
421                    | "tx.commit"
422                    | "tx.rollback"
423                    | "query.open"
424                    | "query.next"
425                    | "query.close"
426            ) {
427                Err((
428                    error_code::TX_NOT_SUPPORTED_REMOTE,
429                    format!("{method} is not supported over remote gRPC yet"),
430                ))
431            } else {
432                dispatch_method_remote(&remote.client, remote.tokio_rt, &method, &params)
433            }
434        }
435    }));
436
437    match dispatch {
438        Ok(Ok(result)) => success_response(&id, &result, method == "close"),
439        Ok(Err((code, msg))) => error_response(&id, code, &msg),
440        Err(_) => error_response(&id, error_code::INTERNAL_ERROR, "handler panicked (caught)"),
441    }
442}
443
444/// Dispatch a parsed method call. Returns the `result` value on
445/// success or `(error_code, message)` on failure.
446fn dispatch_method(
447    runtime: &RedDBRuntime,
448    session: &mut Session,
449    method: &str,
450    params: &Value,
451) -> Result<Value, (&'static str, String)> {
452    match method {
453        "tx.begin" => {
454            let tx_id = session.open_tx()?;
455            Ok(Value::Object(
456                [
457                    ("tx_id".to_string(), Value::Number(tx_id as f64)),
458                    (
459                        "isolation".to_string(),
460                        Value::String("read_committed_deferred".to_string()),
461                    ),
462                ]
463                .into_iter()
464                .collect(),
465            ))
466        }
467
468        "tx.commit" => {
469            let tx = session.take_tx().ok_or((
470                error_code::NO_TX_OPEN,
471                "no transaction is open in this session".to_string(),
472            ))?;
473            let tx_id = tx.tx_id;
474            let op_count = tx.write_set.len();
475
476            // Drive the replay through a real engine transaction so
477            // failures roll back the buffered write_set atomically.
478            // Replaces the legacy `commit_lock`-serialised replay:
479            // cross-session ordering is now provided by the
480            // snapshot-manager's xid allocation, which is what the
481            // SQL `BEGIN`/`COMMIT` path has used since #31.
482            let replay: Result<(u64, usize), (usize, String)> = (|| {
483                runtime
484                    .execute_query("BEGIN")
485                    .map_err(|e| (0usize, format!("BEGIN: {e}")))?;
486                let mut total_affected: u64 = 0;
487                for (idx, op) in tx.write_set.iter().enumerate() {
488                    match runtime.execute_query(op.sql()) {
489                        Ok(qr) => total_affected += qr.affected_rows,
490                        Err(e) => {
491                            let _ = runtime.execute_query("ROLLBACK");
492                            return Err((idx, e.to_string()));
493                        }
494                    }
495                }
496                runtime
497                    .execute_query("COMMIT")
498                    .map_err(|e| (op_count, format!("COMMIT: {e}")))?;
499                Ok((total_affected, op_count))
500            })();
501
502            match replay {
503                Ok((affected, replayed)) => Ok(Value::Object(
504                    [
505                        ("tx_id".to_string(), Value::Number(tx_id as f64)),
506                        ("ops_replayed".to_string(), Value::Number(replayed as f64)),
507                        ("affected".to_string(), Value::Number(affected as f64)),
508                    ]
509                    .into_iter()
510                    .collect(),
511                )),
512                Err((failed_idx, msg)) => Err((
513                    error_code::TX_REPLAY_FAILED,
514                    format!(
515                        "tx {tx_id} replay failed at op {failed_idx}/{op_count}: {msg} \
516                         (ops 0..{failed_idx} already applied and are NOT rolled back)"
517                    ),
518                )),
519            }
520        }
521
522        "query.open" => {
523            let sql = params.get("sql").and_then(Value::as_str).ok_or((
524                error_code::INVALID_PARAMS,
525                "missing 'sql' string".to_string(),
526            ))?;
527            let qr = runtime
528                .execute_query(sql)
529                .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
530
531            // Extract the column list from the first record. Consistent
532            // with query_result_to_json which uses the first row's keys
533            // as schema.
534            let columns: Vec<String> = qr
535                .result
536                .records
537                .first()
538                .map(|first| {
539                    let mut keys: Vec<String> = first
540                        .column_names()
541                        .into_iter()
542                        .map(|k| k.to_string())
543                        .collect();
544                    keys.sort();
545                    keys
546                })
547                .unwrap_or_default();
548
549            let cursor = Cursor::new(columns.clone(), qr.result.records);
550            let total = cursor.total();
551            let cursor_id = session.insert_cursor(cursor)?;
552
553            Ok(Value::Object(
554                [
555                    ("cursor_id".to_string(), Value::Number(cursor_id as f64)),
556                    (
557                        "columns".to_string(),
558                        Value::Array(columns.into_iter().map(Value::String).collect()),
559                    ),
560                    ("total_rows".to_string(), Value::Number(total as f64)),
561                ]
562                .into_iter()
563                .collect(),
564            ))
565        }
566
567        "query.next" => {
568            let cursor_id = params
569                .get("cursor_id")
570                .and_then(|v| v.as_f64())
571                .map(|n| n as u64)
572                .ok_or((
573                    error_code::INVALID_PARAMS,
574                    "missing 'cursor_id' number".to_string(),
575                ))?;
576            let batch_size = params
577                .get("batch_size")
578                .and_then(|v| v.as_f64())
579                .map(|n| n as usize)
580                .unwrap_or(DEFAULT_CURSOR_BATCH_SIZE)
581                .clamp(1, MAX_CURSOR_BATCH_SIZE);
582
583            // Extract the batch inside a bounded borrow so we can
584            // drop the cursor afterwards without borrow-conflict.
585            let (rows, done, remaining) = {
586                let cursor = session.cursor_mut(cursor_id).ok_or((
587                    error_code::CURSOR_NOT_FOUND,
588                    format!("cursor {cursor_id} not found"),
589                ))?;
590                let batch = cursor.take_batch(batch_size);
591                let rows_json: Vec<Value> = batch.iter().map(record_to_json_object).collect();
592                (rows_json, cursor.is_exhausted(), cursor.remaining())
593            };
594
595            if done {
596                // Auto-drop exhausted cursors so long-lived sessions
597                // don't accumulate dead state.
598                let _ = session.drop_cursor(cursor_id);
599            }
600
601            Ok(Value::Object(
602                [
603                    ("cursor_id".to_string(), Value::Number(cursor_id as f64)),
604                    ("rows".to_string(), Value::Array(rows)),
605                    ("done".to_string(), Value::Bool(done)),
606                    ("remaining".to_string(), Value::Number(remaining as f64)),
607                ]
608                .into_iter()
609                .collect(),
610            ))
611        }
612
613        "query.close" => {
614            let cursor_id = params
615                .get("cursor_id")
616                .and_then(|v| v.as_f64())
617                .map(|n| n as u64)
618                .ok_or((
619                    error_code::INVALID_PARAMS,
620                    "missing 'cursor_id' number".to_string(),
621                ))?;
622            let existed = session.drop_cursor(cursor_id).is_some();
623            if !existed {
624                return Err((
625                    error_code::CURSOR_NOT_FOUND,
626                    format!("cursor {cursor_id} not found"),
627                ));
628            }
629            Ok(Value::Object(
630                [
631                    ("cursor_id".to_string(), Value::Number(cursor_id as f64)),
632                    ("closed".to_string(), Value::Bool(true)),
633                ]
634                .into_iter()
635                .collect(),
636            ))
637        }
638
639        "tx.rollback" => {
640            let tx = session.take_tx().ok_or((
641                error_code::NO_TX_OPEN,
642                "no transaction is open in this session".to_string(),
643            ))?;
644            let ops_discarded = tx.write_set.len();
645            Ok(Value::Object(
646                [
647                    ("tx_id".to_string(), Value::Number(tx.tx_id as f64)),
648                    (
649                        "ops_discarded".to_string(),
650                        Value::Number(ops_discarded as f64),
651                    ),
652                ]
653                .into_iter()
654                .collect(),
655            ))
656        }
657
658        "version" => Ok(Value::Object(
659            [
660                (
661                    "version".to_string(),
662                    Value::String(env!("CARGO_PKG_VERSION").to_string()),
663                ),
664                (
665                    "protocol".to_string(),
666                    Value::String(PROTOCOL_VERSION.to_string()),
667                ),
668            ]
669            .into_iter()
670            .collect(),
671        )),
672
673        "health" => Ok(Value::Object(
674            [
675                ("ok".to_string(), Value::Bool(true)),
676                (
677                    "version".to_string(),
678                    Value::String(env!("CARGO_PKG_VERSION").to_string()),
679                ),
680            ]
681            .into_iter()
682            .collect(),
683        )),
684
685        "query" => {
686            let sql = params.get("sql").and_then(Value::as_str).ok_or((
687                error_code::INVALID_PARAMS,
688                "missing 'sql' string".to_string(),
689            ))?;
690            let qr = runtime
691                .execute_query(sql)
692                .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
693            Ok(query_result_to_json(&qr))
694        }
695
696        // ── Prepared statements ──────────────────────────────────────────────
697        //
698        // `prepare` parses the SQL once, extracts a parameterized shape, and
699        // returns a `prepared_id` the client can reuse. `execute_prepared` takes
700        // that id plus JSON-encoded bind values and runs the plan without parsing.
701        //
702        // This mirrors the PostgreSQL extended-query protocol semantics and is the
703        // server-side half of the client driver's `PreparedStatement` abstraction.
704        "prepare" => {
705            use crate::storage::query::modes::parse_multi;
706            use crate::storage::query::planner::shape::parameterize_query_expr;
707
708            let sql = params.get("sql").and_then(Value::as_str).ok_or((
709                error_code::INVALID_PARAMS,
710                "missing 'sql' string".to_string(),
711            ))?;
712            let parsed = parse_multi(sql).map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
713            let (shape, parameter_count) = if let Some(prepared) = parameterize_query_expr(&parsed)
714            {
715                (prepared.shape, prepared.parameter_count)
716            } else {
717                (parsed, 0)
718            };
719            let id = session.next_prepared_id;
720            session.next_prepared_id = session.next_prepared_id.saturating_add(1);
721            session.prepared.insert(
722                id,
723                StdioPreparedStatement {
724                    shape,
725                    parameter_count,
726                },
727            );
728            Ok(Value::Object(
729                [
730                    ("prepared_id".to_string(), Value::Number(id as f64)),
731                    (
732                        "parameter_count".to_string(),
733                        Value::Number(parameter_count as f64),
734                    ),
735                ]
736                .into_iter()
737                .collect(),
738            ))
739        }
740
741        "execute_prepared" => {
742            use crate::storage::query::planner::shape::bind_parameterized_query;
743            use crate::storage::schema::Value as SV;
744
745            let id = params
746                .get("prepared_id")
747                .and_then(Value::as_f64)
748                .map(|n| n as u64)
749                .ok_or((
750                    error_code::INVALID_PARAMS,
751                    "missing 'prepared_id'".to_string(),
752                ))?;
753
754            let stmt = session.prepared.get(&id).ok_or((
755                error_code::QUERY_ERROR,
756                format!("no prepared statement with id {id}"),
757            ))?;
758
759            // Parse bind values from JSON array of JSON-encoded literals.
760            let binds_json: Vec<Value> = params
761                .get("binds")
762                .and_then(Value::as_array)
763                .map(|a| a.to_vec())
764                .unwrap_or_default();
765            if binds_json.len() != stmt.parameter_count {
766                return Err((
767                    error_code::INVALID_PARAMS,
768                    format!(
769                        "expected {} bind values, got {}",
770                        stmt.parameter_count,
771                        binds_json.len()
772                    ),
773                ));
774            }
775
776            // Convert JSON bind values to SchemaValue.
777            let binds: Vec<SV> = binds_json.iter().map(json_value_to_schema_value).collect();
778
779            // Bind literals into the parameterized shape.
780            let expr = if stmt.parameter_count == 0 {
781                stmt.shape.clone()
782            } else {
783                bind_parameterized_query(&stmt.shape, &binds, stmt.parameter_count)
784                    .ok_or((error_code::QUERY_ERROR, "bind failed".to_string()))?
785            };
786
787            let qr = runtime
788                .execute_query_expr(expr)
789                .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
790            Ok(query_result_to_json(&qr))
791        }
792
793        "insert" => {
794            let collection = params.get("collection").and_then(Value::as_str).ok_or((
795                error_code::INVALID_PARAMS,
796                "missing 'collection' string".to_string(),
797            ))?;
798            let payload = params.get("payload").ok_or((
799                error_code::INVALID_PARAMS,
800                "missing 'payload' object".to_string(),
801            ))?;
802            let payload_obj = payload.as_object().ok_or((
803                error_code::INVALID_PARAMS,
804                "'payload' must be a JSON object".to_string(),
805            ))?;
806            let sql = build_insert_sql(collection, payload_obj.iter());
807
808            if let Some(tx) = session.current_tx_mut() {
809                tx.write_set.push(PendingSql::Insert(sql));
810                return Ok(pending_tx_response(tx.tx_id));
811            }
812
813            let qr = runtime
814                .execute_query(&sql)
815                .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
816            Ok(insert_result_to_json(&qr))
817        }
818
819        "bulk_insert" => {
820            let collection = params.get("collection").and_then(Value::as_str).ok_or((
821                error_code::INVALID_PARAMS,
822                "missing 'collection' string".to_string(),
823            ))?;
824            let payloads = params.get("payloads").and_then(Value::as_array).ok_or((
825                error_code::INVALID_PARAMS,
826                "missing 'payloads' array".to_string(),
827            ))?;
828
829            if let Some(tx) = session.current_tx_mut() {
830                let mut buffered: u64 = 0;
831                for entry in payloads {
832                    let obj = entry.as_object().ok_or((
833                        error_code::INVALID_PARAMS,
834                        "each payload must be a JSON object".to_string(),
835                    ))?;
836                    let sql = build_insert_sql(collection, obj.iter());
837                    tx.write_set.push(PendingSql::Insert(sql));
838                    buffered += 1;
839                }
840                let tx_id = tx.tx_id;
841                return Ok(Value::Object(
842                    [
843                        ("affected".to_string(), Value::Number(0.0)),
844                        ("buffered".to_string(), Value::Number(buffered as f64)),
845                        ("pending".to_string(), Value::Bool(true)),
846                        ("tx_id".to_string(), Value::Number(tx_id as f64)),
847                    ]
848                    .into_iter()
849                    .collect(),
850                ));
851            }
852
853            let mut total_affected: u64 = 0;
854            for entry in payloads {
855                let obj = entry.as_object().ok_or((
856                    error_code::INVALID_PARAMS,
857                    "each payload must be a JSON object".to_string(),
858                ))?;
859                let sql = build_insert_sql(collection, obj.iter());
860                let qr = runtime
861                    .execute_query(&sql)
862                    .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
863                total_affected += qr.affected_rows;
864            }
865            Ok(Value::Object(
866                [("affected".to_string(), Value::Number(total_affected as f64))]
867                    .into_iter()
868                    .collect(),
869            ))
870        }
871
872        "get" => {
873            let collection = params.get("collection").and_then(Value::as_str).ok_or((
874                error_code::INVALID_PARAMS,
875                "missing 'collection' string".to_string(),
876            ))?;
877            let id = params.get("id").and_then(Value::as_str).ok_or((
878                error_code::INVALID_PARAMS,
879                "missing 'id' string".to_string(),
880            ))?;
881            let sql = format!("SELECT * FROM {collection} WHERE _entity_id = {id} LIMIT 1");
882            let qr = runtime
883                .execute_query(&sql)
884                .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
885            let entity = qr
886                .result
887                .records
888                .first()
889                .map(record_to_json_object)
890                .unwrap_or(Value::Null);
891            Ok(Value::Object(
892                [("entity".to_string(), entity)].into_iter().collect(),
893            ))
894        }
895
896        "delete" => {
897            let collection = params.get("collection").and_then(Value::as_str).ok_or((
898                error_code::INVALID_PARAMS,
899                "missing 'collection' string".to_string(),
900            ))?;
901            let id = params.get("id").and_then(Value::as_str).ok_or((
902                error_code::INVALID_PARAMS,
903                "missing 'id' string".to_string(),
904            ))?;
905            let sql = format!("DELETE FROM {collection} WHERE _entity_id = {id}");
906
907            if let Some(tx) = session.current_tx_mut() {
908                tx.write_set.push(PendingSql::Delete(sql));
909                return Ok(pending_tx_response(tx.tx_id));
910            }
911
912            let qr = runtime
913                .execute_query(&sql)
914                .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
915            Ok(Value::Object(
916                [(
917                    "affected".to_string(),
918                    Value::Number(qr.affected_rows as f64),
919                )]
920                .into_iter()
921                .collect(),
922            ))
923        }
924
925        "close" => {
926            // Silently drop any open transaction and cursors on close.
927            // The client explicitly asked to terminate; surfacing an
928            // error here would leak state across what is effectively a
929            // reset.
930            let _ = session.take_tx();
931            session.clear_cursors();
932            let _ = runtime.checkpoint();
933            Ok(Value::Null)
934        }
935
936        // Auth surface — local stdio bridge has no auth backend
937        // (the spawned binary inherits the caller's privileges by
938        // construction). The remote bridge below maps these methods
939        // onto the gRPC server's auth endpoints.
940        "auth.login"
941        | "auth.whoami"
942        | "auth.change_password"
943        | "auth.create_api_key"
944        | "auth.revoke_api_key" => {
945            let _ = (session, params);
946            Err((
947                error_code::INVALID_REQUEST,
948                format!(
949                    "{method}: auth methods are only available on grpc:// connections; \
950                     embedded modes (memory://, file://) inherit caller privileges"
951                ),
952            ))
953        }
954
955        other => Err((
956            error_code::INVALID_REQUEST,
957            format!("unknown method: {other}"),
958        )),
959    }
960}
961
962// ---------------------------------------------------------------------------
963// Response builders
964// ---------------------------------------------------------------------------
965
966fn success_response(id: &Value, result: &Value, is_close: bool) -> String {
967    // For `close` we tag the response so the loop knows to exit after
968    // flushing. The tag is stripped from the wire by replacing it
969    // before serialization — actually we just include it as a sentinel
970    // field that drivers ignore (forward compat).
971    let mut envelope = json::Map::new();
972    envelope.insert("jsonrpc".to_string(), Value::String("2.0".to_string()));
973    envelope.insert("id".to_string(), id.clone());
974    envelope.insert("result".to_string(), result.clone());
975    if is_close {
976        envelope.insert("__close__".to_string(), Value::Bool(true));
977    }
978    Value::Object(envelope).to_string_compact()
979}
980
981fn error_response(id: &Value, code: &str, message: &str) -> String {
982    let mut err = json::Map::new();
983    err.insert("code".to_string(), Value::String(code.to_string()));
984    err.insert("message".to_string(), Value::String(message.to_string()));
985    err.insert("data".to_string(), Value::Null);
986
987    let mut envelope = json::Map::new();
988    envelope.insert("jsonrpc".to_string(), Value::String("2.0".to_string()));
989    envelope.insert("id".to_string(), id.clone());
990    envelope.insert("error".to_string(), Value::Object(err));
991    Value::Object(envelope).to_string_compact()
992}
993
994// ---------------------------------------------------------------------------
995// Helpers
996// ---------------------------------------------------------------------------
997
998/// Envelope returned by `insert` and `delete` when the call was buffered
999/// into an open transaction instead of being auto-committed.
1000fn pending_tx_response(tx_id: u64) -> Value {
1001    Value::Object(
1002        [
1003            ("affected".to_string(), Value::Number(0.0)),
1004            ("pending".to_string(), Value::Bool(true)),
1005            ("tx_id".to_string(), Value::Number(tx_id as f64)),
1006        ]
1007        .into_iter()
1008        .collect(),
1009    )
1010}
1011
1012pub(crate) fn build_insert_sql<'a, I>(collection: &str, fields: I) -> String
1013where
1014    I: Iterator<Item = (&'a String, &'a Value)>,
1015{
1016    let mut cols = Vec::new();
1017    let mut vals = Vec::new();
1018    for (k, v) in fields {
1019        cols.push(k.clone());
1020        vals.push(value_to_sql_literal(v));
1021    }
1022    format!(
1023        "INSERT INTO {collection} ({}) VALUES ({})",
1024        cols.join(", "),
1025        vals.join(", "),
1026    )
1027}
1028
1029pub(crate) fn value_to_sql_literal(v: &Value) -> String {
1030    match v {
1031        Value::Null => "NULL".to_string(),
1032        Value::Bool(b) => b.to_string(),
1033        Value::Number(n) => {
1034            if n.fract() == 0.0 {
1035                format!("{}", *n as i64)
1036            } else {
1037                n.to_string()
1038            }
1039        }
1040        Value::String(s) => format!("'{}'", s.replace('\'', "''")),
1041        other => format!("'{}'", other.to_string_compact().replace('\'', "''")),
1042    }
1043}
1044
1045fn query_result_to_json(qr: &RuntimeQueryResult) -> Value {
1046    let mut envelope = json::Map::new();
1047    envelope.insert(
1048        "statement".to_string(),
1049        Value::String(qr.statement_type.to_string()),
1050    );
1051    envelope.insert(
1052        "affected".to_string(),
1053        Value::Number(qr.affected_rows as f64),
1054    );
1055
1056    let mut columns = Vec::new();
1057    if let Some(first) = qr.result.records.first() {
1058        let mut keys: Vec<String> = first
1059            .column_names()
1060            .into_iter()
1061            .map(|k| k.to_string())
1062            .collect();
1063        keys.sort();
1064        columns = keys.into_iter().map(Value::String).collect();
1065    }
1066    envelope.insert("columns".to_string(), Value::Array(columns));
1067
1068    let rows: Vec<Value> = qr
1069        .result
1070        .records
1071        .iter()
1072        .map(record_to_json_object)
1073        .collect();
1074    envelope.insert("rows".to_string(), Value::Array(rows));
1075
1076    Value::Object(envelope)
1077}
1078
1079pub(crate) fn insert_result_to_json(qr: &RuntimeQueryResult) -> Value {
1080    let mut envelope = json::Map::new();
1081    envelope.insert(
1082        "affected".to_string(),
1083        Value::Number(qr.affected_rows as f64),
1084    );
1085    // First row of the result, if any, contains the inserted entity id.
1086    if let Some(first) = qr.result.records.first() {
1087        if let Some(id_val) = first
1088            .iter_fields()
1089            .find(|(k, _)| {
1090                let s: &str = k;
1091                s == "_entity_id"
1092            })
1093            .map(|(_, v)| schema_value_to_json(v))
1094        {
1095            envelope.insert("id".to_string(), id_val);
1096        }
1097    }
1098    Value::Object(envelope)
1099}
1100
1101fn record_to_json_object(record: &UnifiedRecord) -> Value {
1102    let mut map = json::Map::new();
1103    // iter_fields merges the columnar fast-path + HashMap so scan
1104    // rows (columnar only) contribute their values.
1105    let mut entries: Vec<(&str, &SchemaValue)> =
1106        record.iter_fields().map(|(k, v)| (k.as_ref(), v)).collect();
1107    entries.sort_by(|a, b| a.0.cmp(b.0));
1108    for (k, v) in entries {
1109        map.insert(k.to_string(), schema_value_to_json(v));
1110    }
1111    Value::Object(map)
1112}
1113
1114fn schema_value_to_json(v: &SchemaValue) -> Value {
1115    match v {
1116        SchemaValue::Null => Value::Null,
1117        SchemaValue::Boolean(b) => Value::Bool(*b),
1118        SchemaValue::Integer(n) => Value::Number(*n as f64),
1119        SchemaValue::UnsignedInteger(n) => Value::Number(*n as f64),
1120        SchemaValue::Float(n) => Value::Number(*n),
1121        SchemaValue::BigInt(n) => Value::Number(*n as f64),
1122        SchemaValue::TimestampMs(n)
1123        | SchemaValue::Timestamp(n)
1124        | SchemaValue::Duration(n)
1125        | SchemaValue::Decimal(n) => Value::Number(*n as f64),
1126        SchemaValue::Password(_) | SchemaValue::Secret(_) => Value::String("***".to_string()),
1127        SchemaValue::Text(s) => Value::String(s.to_string()),
1128        SchemaValue::Email(s)
1129        | SchemaValue::Url(s)
1130        | SchemaValue::NodeRef(s)
1131        | SchemaValue::EdgeRef(s) => Value::String(s.clone()),
1132        other => Value::String(format!("{other}")),
1133    }
1134}
1135
1136/// Convert a JSON `Value` to a `SchemaValue` for use as a bind parameter
1137/// in a prepared statement. Mirrors PostgreSQL's implicit type coercion:
1138/// JSON numbers become `Float`, strings become `Text`, booleans map to
1139/// `Boolean`, and `null` becomes `Null`.
1140fn json_value_to_schema_value(v: &Value) -> SchemaValue {
1141    match v {
1142        Value::Null => SchemaValue::Null,
1143        Value::Bool(b) => SchemaValue::Boolean(*b),
1144        Value::Number(n) => {
1145            if n.fract() == 0.0 && n.abs() < i64::MAX as f64 {
1146                SchemaValue::Integer(*n as i64)
1147            } else {
1148                SchemaValue::Float(*n)
1149            }
1150        }
1151        Value::String(s) => SchemaValue::text(s.clone()),
1152        Value::Array(_) | Value::Object(_) => {
1153            SchemaValue::text(crate::json::to_string(v).unwrap_or_default())
1154        }
1155    }
1156}
1157
1158// ---------------------------------------------------------------------------
1159// Remote dispatch (grpc://)
1160// ---------------------------------------------------------------------------
1161
1162/// Dispatch a parsed JSON-RPC call over gRPC. Mirrors `dispatch_method`
1163/// but every operation goes through the tonic client. The server's
1164/// own `RedDBRuntime` does the actual work — we are just a wire
1165/// adapter between the JSON-RPC framing the drivers speak and the
1166/// gRPC protobuf framing the server speaks.
1167fn dispatch_method_remote(
1168    client: &AsyncMutex<RedDBClient>,
1169    tokio_rt: &tokio::runtime::Runtime,
1170    method: &str,
1171    params: &Value,
1172) -> Result<Value, (&'static str, String)> {
1173    match method {
1174        "version" => Ok(Value::Object(
1175            [
1176                (
1177                    "version".to_string(),
1178                    Value::String(env!("CARGO_PKG_VERSION").to_string()),
1179                ),
1180                (
1181                    "protocol".to_string(),
1182                    Value::String(PROTOCOL_VERSION.to_string()),
1183                ),
1184            ]
1185            .into_iter()
1186            .collect(),
1187        )),
1188
1189        "health" => {
1190            let result = tokio_rt.block_on(async {
1191                let mut guard = client.lock().await;
1192                guard.health_status().await
1193            });
1194            match result {
1195                Ok(status) => Ok(Value::Object(
1196                    [
1197                        ("ok".to_string(), Value::Bool(status.healthy)),
1198                        ("state".to_string(), Value::String(status.state)),
1199                        (
1200                            "checked_at_unix_ms".to_string(),
1201                            Value::Number(status.checked_at_unix_ms as f64),
1202                        ),
1203                        (
1204                            "version".to_string(),
1205                            Value::String(env!("CARGO_PKG_VERSION").to_string()),
1206                        ),
1207                    ]
1208                    .into_iter()
1209                    .collect(),
1210                )),
1211                Err(e) => Err((error_code::INTERNAL_ERROR, e.to_string())),
1212            }
1213        }
1214
1215        "query" => {
1216            let sql = params.get("sql").and_then(Value::as_str).ok_or((
1217                error_code::INVALID_PARAMS,
1218                "missing 'sql' string".to_string(),
1219            ))?;
1220            let json_str = tokio_rt
1221                .block_on(async {
1222                    let mut guard = client.lock().await;
1223                    guard.query(sql).await
1224                })
1225                .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
1226            // Server returned its own QueryReply.result_json. Parse and
1227            // repackage into the stdio-protocol shape. If parsing fails,
1228            // hand the raw server JSON back under a sentinel key so the
1229            // caller still gets something useful.
1230            let parsed = json::from_str::<Value>(&json_str)
1231                .map_err(|e| (error_code::INTERNAL_ERROR, format!("bad server JSON: {e}")))?;
1232            Ok(parsed)
1233        }
1234
1235        "insert" => {
1236            let collection = params.get("collection").and_then(Value::as_str).ok_or((
1237                error_code::INVALID_PARAMS,
1238                "missing 'collection' string".to_string(),
1239            ))?;
1240            let payload = params.get("payload").ok_or((
1241                error_code::INVALID_PARAMS,
1242                "missing 'payload' object".to_string(),
1243            ))?;
1244            if payload.as_object().is_none() {
1245                return Err((
1246                    error_code::INVALID_PARAMS,
1247                    "'payload' must be a JSON object".to_string(),
1248                ));
1249            }
1250            let payload_json = payload.to_string_compact();
1251            let reply = tokio_rt
1252                .block_on(async {
1253                    let mut guard = client.lock().await;
1254                    guard.create_row_entity(collection, &payload_json).await
1255                })
1256                .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
1257            let mut out = json::Map::new();
1258            out.insert("affected".to_string(), Value::Number(1.0));
1259            out.insert("id".to_string(), Value::String(reply.id.to_string()));
1260            Ok(Value::Object(out))
1261        }
1262
1263        "bulk_insert" => {
1264            let collection = params.get("collection").and_then(Value::as_str).ok_or((
1265                error_code::INVALID_PARAMS,
1266                "missing 'collection' string".to_string(),
1267            ))?;
1268            let payloads = params.get("payloads").and_then(Value::as_array).ok_or((
1269                error_code::INVALID_PARAMS,
1270                "missing 'payloads' array".to_string(),
1271            ))?;
1272            let mut encoded = Vec::with_capacity(payloads.len());
1273            for entry in payloads {
1274                if entry.as_object().is_none() {
1275                    return Err((
1276                        error_code::INVALID_PARAMS,
1277                        "each payload must be a JSON object".to_string(),
1278                    ));
1279                }
1280                encoded.push(entry.to_string_compact());
1281            }
1282            let total = tokio_rt
1283                .block_on(async {
1284                    let mut guard = client.lock().await;
1285                    guard.bulk_create_rows(collection, encoded).await
1286                })
1287                .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?
1288                .count;
1289            Ok(Value::Object(
1290                [("affected".to_string(), Value::Number(total as f64))]
1291                    .into_iter()
1292                    .collect(),
1293            ))
1294        }
1295
1296        "get" => {
1297            let collection = params.get("collection").and_then(Value::as_str).ok_or((
1298                error_code::INVALID_PARAMS,
1299                "missing 'collection' string".to_string(),
1300            ))?;
1301            let id = params.get("id").and_then(Value::as_str).ok_or((
1302                error_code::INVALID_PARAMS,
1303                "missing 'id' string".to_string(),
1304            ))?;
1305            let sql = format!("SELECT * FROM {collection} WHERE _entity_id = {id} LIMIT 1");
1306            let json_str = tokio_rt
1307                .block_on(async {
1308                    let mut guard = client.lock().await;
1309                    guard.query(&sql).await
1310                })
1311                .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
1312            let parsed = json::from_str::<Value>(&json_str)
1313                .map_err(|e| (error_code::INTERNAL_ERROR, format!("bad server JSON: {e}")))?;
1314            // Server response shape: {"rows":[{...}], ...}. Extract
1315            // the first row (if any) as `entity`.
1316            let entity = parsed
1317                .get("rows")
1318                .and_then(Value::as_array)
1319                .and_then(|rows| rows.first().cloned())
1320                .unwrap_or(Value::Null);
1321            Ok(Value::Object(
1322                [("entity".to_string(), entity)].into_iter().collect(),
1323            ))
1324        }
1325
1326        "delete" => {
1327            let collection = params.get("collection").and_then(Value::as_str).ok_or((
1328                error_code::INVALID_PARAMS,
1329                "missing 'collection' string".to_string(),
1330            ))?;
1331            let id = params.get("id").and_then(Value::as_str).ok_or((
1332                error_code::INVALID_PARAMS,
1333                "missing 'id' string".to_string(),
1334            ))?;
1335            let id = id.parse::<u64>().map_err(|_| {
1336                (
1337                    error_code::INVALID_PARAMS,
1338                    "id must be a numeric string".to_string(),
1339                )
1340            })?;
1341            let _reply = tokio_rt
1342                .block_on(async {
1343                    let mut guard = client.lock().await;
1344                    guard.delete_entity(collection, id).await
1345                })
1346                .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
1347            Ok(Value::Object(
1348                [("affected".to_string(), Value::Number(1.0))]
1349                    .into_iter()
1350                    .collect(),
1351            ))
1352        }
1353
1354        "close" => Ok(Value::Null),
1355
1356        other => Err((
1357            error_code::INVALID_REQUEST,
1358            format!("unknown method: {other}"),
1359        )),
1360    }
1361}
1362
1363#[cfg(test)]
1364mod tests {
1365    use super::*;
1366
1367    fn make_runtime() -> RedDBRuntime {
1368        RedDBRuntime::in_memory().expect("in-memory runtime")
1369    }
1370
1371    fn handle(rt: &RedDBRuntime, line: &str) -> String {
1372        let mut session = Session::new();
1373        handle_line(&Backend::Local(rt), &mut session, line)
1374    }
1375
1376    /// Stateful helper: keeps the same `Session` across multiple calls so
1377    /// tests can exercise multi-step transaction flows in a single closure.
1378    fn with_session<F>(rt: &RedDBRuntime, f: F)
1379    where
1380        F: FnOnce(&dyn Fn(&str) -> String, &RedDBRuntime),
1381    {
1382        let session = std::cell::RefCell::new(Session::new());
1383        let call = |line: &str| -> String {
1384            let mut s = session.borrow_mut();
1385            handle_line(&Backend::Local(rt), &mut s, line)
1386        };
1387        f(&call, rt);
1388    }
1389
1390    #[test]
1391    fn version_method_returns_version_and_protocol() {
1392        let rt = make_runtime();
1393        let line = r#"{"jsonrpc":"2.0","id":1,"method":"version","params":{}}"#;
1394        let resp = handle(&rt, line);
1395        assert!(resp.contains("\"id\":1"));
1396        assert!(resp.contains("\"protocol\":\"1.0\""));
1397        assert!(resp.contains("\"version\""));
1398    }
1399
1400    #[test]
1401    fn health_method_returns_ok_true() {
1402        let rt = make_runtime();
1403        let resp = handle(
1404            &rt,
1405            r#"{"jsonrpc":"2.0","id":"abc","method":"health","params":{}}"#,
1406        );
1407        assert!(resp.contains("\"ok\":true"));
1408        assert!(resp.contains("\"id\":\"abc\""));
1409    }
1410
1411    #[test]
1412    fn parse_error_for_invalid_json() {
1413        let rt = make_runtime();
1414        let resp = handle(&rt, "not json {");
1415        assert!(resp.contains("\"code\":\"PARSE_ERROR\""));
1416        assert!(resp.contains("\"id\":null"));
1417    }
1418
1419    #[test]
1420    fn invalid_request_when_method_missing() {
1421        let rt = make_runtime();
1422        let resp = handle(&rt, r#"{"jsonrpc":"2.0","id":1,"params":{}}"#);
1423        assert!(resp.contains("\"code\":\"INVALID_REQUEST\""));
1424    }
1425
1426    #[test]
1427    fn unknown_method_is_invalid_request() {
1428        let rt = make_runtime();
1429        let resp = handle(
1430            &rt,
1431            r#"{"jsonrpc":"2.0","id":1,"method":"frobnicate","params":{}}"#,
1432        );
1433        assert!(resp.contains("\"code\":\"INVALID_REQUEST\""));
1434        assert!(resp.contains("frobnicate"));
1435    }
1436
1437    #[test]
1438    fn invalid_params_when_query_sql_missing() {
1439        let rt = make_runtime();
1440        let resp = handle(
1441            &rt,
1442            r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{}}"#,
1443        );
1444        assert!(resp.contains("\"code\":\"INVALID_PARAMS\""));
1445    }
1446
1447    #[test]
1448    fn close_method_marks_response_for_shutdown() {
1449        let rt = make_runtime();
1450        let resp = handle(
1451            &rt,
1452            r#"{"jsonrpc":"2.0","id":1,"method":"close","params":{}}"#,
1453        );
1454        assert!(resp.contains("\"__close__\":true"));
1455    }
1456
1457    #[test]
1458    fn query_select_one_returns_rows() {
1459        let rt = make_runtime();
1460        let resp = handle(
1461            &rt,
1462            r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"SELECT 1 AS one"}}"#,
1463        );
1464        assert!(resp.contains("\"result\""));
1465        assert!(!resp.contains("\"error\""));
1466    }
1467
1468    // -----------------------------------------------------------------
1469    // Transaction tests
1470    // -----------------------------------------------------------------
1471
1472    #[test]
1473    fn tx_begin_returns_tx_id_and_isolation() {
1474        let rt = make_runtime();
1475        with_session(&rt, |call, _| {
1476            let resp = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
1477            assert!(resp.contains("\"tx_id\":1"));
1478            assert!(resp.contains("\"isolation\":\"read_committed_deferred\""));
1479            assert!(!resp.contains("\"error\""));
1480        });
1481    }
1482
1483    #[test]
1484    fn tx_begin_twice_returns_already_open() {
1485        let rt = make_runtime();
1486        with_session(&rt, |call, _| {
1487            let _ = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
1488            let resp = call(r#"{"jsonrpc":"2.0","id":2,"method":"tx.begin","params":null}"#);
1489            assert!(resp.contains("\"code\":\"TX_ALREADY_OPEN\""));
1490        });
1491    }
1492
1493    #[test]
1494    fn tx_commit_without_begin_returns_no_tx_open() {
1495        let rt = make_runtime();
1496        with_session(&rt, |call, _| {
1497            let resp = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.commit","params":null}"#);
1498            assert!(resp.contains("\"code\":\"NO_TX_OPEN\""));
1499        });
1500    }
1501
1502    #[test]
1503    fn tx_rollback_without_begin_returns_no_tx_open() {
1504        let rt = make_runtime();
1505        with_session(&rt, |call, _| {
1506            let resp = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.rollback","params":null}"#);
1507            assert!(resp.contains("\"code\":\"NO_TX_OPEN\""));
1508        });
1509    }
1510
1511    #[test]
1512    fn insert_inside_tx_returns_pending_envelope() {
1513        let rt = make_runtime();
1514        // Create the collection first (outside any tx).
1515        let _ = handle(
1516            &rt,
1517            r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE users (name TEXT)"}}"#,
1518        );
1519        with_session(&rt, |call, _| {
1520            let _ = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
1521            let resp = call(
1522                r#"{"jsonrpc":"2.0","id":2,"method":"insert","params":{"collection":"users","payload":{"name":"alice"}}}"#,
1523            );
1524            assert!(resp.contains("\"pending\":true"));
1525            assert!(resp.contains("\"tx_id\":1"));
1526            assert!(resp.contains("\"affected\":0"));
1527        });
1528    }
1529
1530    #[test]
1531    fn begin_insert_rollback_does_not_persist() {
1532        let rt = make_runtime();
1533        let _ = handle(
1534            &rt,
1535            r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE u (name TEXT)"}}"#,
1536        );
1537        with_session(&rt, |call, _| {
1538            let _ = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
1539            let _ = call(
1540                r#"{"jsonrpc":"2.0","id":2,"method":"insert","params":{"collection":"u","payload":{"name":"ghost"}}}"#,
1541            );
1542            let rollback = call(r#"{"jsonrpc":"2.0","id":3,"method":"tx.rollback","params":null}"#);
1543            assert!(rollback.contains("\"ops_discarded\":1"));
1544            assert!(rollback.contains("\"tx_id\":1"));
1545        });
1546        // After rollback, the row must not be visible to a fresh query.
1547        let resp = handle(
1548            &rt,
1549            r#"{"jsonrpc":"2.0","id":9,"method":"query","params":{"sql":"SELECT * FROM u"}}"#,
1550        );
1551        assert!(!resp.contains("\"ghost\""));
1552    }
1553
1554    #[test]
1555    fn begin_insert_commit_persists() {
1556        let rt = make_runtime();
1557        let _ = handle(
1558            &rt,
1559            r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE u2 (name TEXT)"}}"#,
1560        );
1561        with_session(&rt, |call, _| {
1562            let _ = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
1563            let _ = call(
1564                r#"{"jsonrpc":"2.0","id":2,"method":"insert","params":{"collection":"u2","payload":{"name":"alice"}}}"#,
1565            );
1566            let _ = call(
1567                r#"{"jsonrpc":"2.0","id":3,"method":"insert","params":{"collection":"u2","payload":{"name":"bob"}}}"#,
1568            );
1569            let commit = call(r#"{"jsonrpc":"2.0","id":4,"method":"tx.commit","params":null}"#);
1570            assert!(commit.contains("\"ops_replayed\":2"));
1571            assert!(!commit.contains("\"error\""));
1572        });
1573        let resp = handle(
1574            &rt,
1575            r#"{"jsonrpc":"2.0","id":9,"method":"query","params":{"sql":"SELECT * FROM u2"}}"#,
1576        );
1577        assert!(resp.contains("\"alice\""));
1578        assert!(resp.contains("\"bob\""));
1579    }
1580
1581    #[test]
1582    fn bulk_insert_inside_tx_buffers_everything() {
1583        let rt = make_runtime();
1584        let _ = handle(
1585            &rt,
1586            r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE u3 (name TEXT)"}}"#,
1587        );
1588        with_session(&rt, |call, _| {
1589            let _ = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
1590            let resp = call(
1591                r#"{"jsonrpc":"2.0","id":2,"method":"bulk_insert","params":{"collection":"u3","payloads":[{"name":"a"},{"name":"b"},{"name":"c"}]}}"#,
1592            );
1593            assert!(resp.contains("\"buffered\":3"));
1594            assert!(resp.contains("\"pending\":true"));
1595            assert!(resp.contains("\"affected\":0"));
1596
1597            let commit = call(r#"{"jsonrpc":"2.0","id":3,"method":"tx.commit","params":null}"#);
1598            assert!(commit.contains("\"ops_replayed\":3"));
1599        });
1600    }
1601
1602    #[test]
1603    fn delete_inside_tx_is_buffered() {
1604        let rt = make_runtime();
1605        // Seed two rows outside any tx.
1606        let _ = handle(
1607            &rt,
1608            r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE u4 (name TEXT)"}}"#,
1609        );
1610        let _ = handle(
1611            &rt,
1612            r#"{"jsonrpc":"2.0","id":2,"method":"query","params":{"sql":"INSERT INTO u4 (name) VALUES ('keep')"}}"#,
1613        );
1614        with_session(&rt, |call, _| {
1615            let _ = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
1616            let resp = call(
1617                r#"{"jsonrpc":"2.0","id":2,"method":"delete","params":{"collection":"u4","id":"1"}}"#,
1618            );
1619            assert!(resp.contains("\"pending\":true"));
1620            let _ = call(r#"{"jsonrpc":"2.0","id":3,"method":"tx.rollback","params":null}"#);
1621        });
1622        // Row should still be present after rollback of the delete.
1623        let resp = handle(
1624            &rt,
1625            r#"{"jsonrpc":"2.0","id":9,"method":"query","params":{"sql":"SELECT * FROM u4"}}"#,
1626        );
1627        assert!(resp.contains("\"keep\""));
1628    }
1629
1630    #[test]
1631    fn close_with_open_tx_auto_rollbacks() {
1632        let rt = make_runtime();
1633        let _ = handle(
1634            &rt,
1635            r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE u5 (name TEXT)"}}"#,
1636        );
1637        with_session(&rt, |call, _| {
1638            let _ = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
1639            let _ = call(
1640                r#"{"jsonrpc":"2.0","id":2,"method":"insert","params":{"collection":"u5","payload":{"name":"ghost"}}}"#,
1641            );
1642            let close = call(r#"{"jsonrpc":"2.0","id":3,"method":"close","params":null}"#);
1643            assert!(close.contains("\"__close__\":true"));
1644            assert!(!close.contains("\"error\""));
1645        });
1646        let resp = handle(
1647            &rt,
1648            r#"{"jsonrpc":"2.0","id":9,"method":"query","params":{"sql":"SELECT * FROM u5"}}"#,
1649        );
1650        assert!(!resp.contains("\"ghost\""));
1651    }
1652
1653    // -----------------------------------------------------------------
1654    // Cursor streaming tests
1655    // -----------------------------------------------------------------
1656
1657    fn seed_numbers_table(rt: &RedDBRuntime, table: &str, count: u32) {
1658        let _ = handle(
1659            rt,
1660            &format!(
1661                r#"{{"jsonrpc":"2.0","id":1,"method":"query","params":{{"sql":"CREATE TABLE {table} (n INTEGER)"}}}}"#,
1662            ),
1663        );
1664        for i in 0..count {
1665            let _ = handle(
1666                rt,
1667                &format!(
1668                    r#"{{"jsonrpc":"2.0","id":2,"method":"query","params":{{"sql":"INSERT INTO {table} (n) VALUES ({i})"}}}}"#,
1669                ),
1670            );
1671        }
1672    }
1673
1674    #[test]
1675    fn cursor_open_returns_id_columns_and_total() {
1676        let rt = make_runtime();
1677        seed_numbers_table(&rt, "nums1", 3);
1678        with_session(&rt, |call, _| {
1679            let resp = call(
1680                r#"{"jsonrpc":"2.0","id":1,"method":"query.open","params":{"sql":"SELECT n FROM nums1"}}"#,
1681            );
1682            assert!(resp.contains("\"cursor_id\":1"));
1683            assert!(resp.contains("\"total_rows\":3"));
1684            assert!(resp.contains("\"columns\""));
1685            assert!(!resp.contains("\"error\""));
1686        });
1687    }
1688
1689    #[test]
1690    fn cursor_next_chunks_rows_and_signals_done() {
1691        let rt = make_runtime();
1692        seed_numbers_table(&rt, "nums2", 5);
1693        with_session(&rt, |call, _| {
1694            let _ = call(
1695                r#"{"jsonrpc":"2.0","id":1,"method":"query.open","params":{"sql":"SELECT n FROM nums2"}}"#,
1696            );
1697            let first = call(
1698                r#"{"jsonrpc":"2.0","id":2,"method":"query.next","params":{"cursor_id":1,"batch_size":2}}"#,
1699            );
1700            assert!(first.contains("\"done\":false"));
1701            assert!(first.contains("\"remaining\":3"));
1702
1703            let second = call(
1704                r#"{"jsonrpc":"2.0","id":3,"method":"query.next","params":{"cursor_id":1,"batch_size":2}}"#,
1705            );
1706            assert!(second.contains("\"done\":false"));
1707            assert!(second.contains("\"remaining\":1"));
1708
1709            let third = call(
1710                r#"{"jsonrpc":"2.0","id":4,"method":"query.next","params":{"cursor_id":1,"batch_size":2}}"#,
1711            );
1712            assert!(third.contains("\"done\":true"));
1713            assert!(third.contains("\"remaining\":0"));
1714        });
1715    }
1716
1717    #[test]
1718    fn cursor_auto_drops_when_exhausted() {
1719        let rt = make_runtime();
1720        seed_numbers_table(&rt, "nums3", 2);
1721        with_session(&rt, |call, _| {
1722            let _ = call(
1723                r#"{"jsonrpc":"2.0","id":1,"method":"query.open","params":{"sql":"SELECT n FROM nums3"}}"#,
1724            );
1725            let _ = call(
1726                r#"{"jsonrpc":"2.0","id":2,"method":"query.next","params":{"cursor_id":1,"batch_size":100}}"#,
1727            );
1728            // Cursor was auto-dropped after done=true; subsequent next
1729            // must error with CURSOR_NOT_FOUND.
1730            let resp = call(
1731                r#"{"jsonrpc":"2.0","id":3,"method":"query.next","params":{"cursor_id":1,"batch_size":100}}"#,
1732            );
1733            assert!(resp.contains("\"code\":\"CURSOR_NOT_FOUND\""));
1734        });
1735    }
1736
1737    #[test]
1738    fn cursor_close_removes_it() {
1739        let rt = make_runtime();
1740        seed_numbers_table(&rt, "nums4", 3);
1741        with_session(&rt, |call, _| {
1742            let _ = call(
1743                r#"{"jsonrpc":"2.0","id":1,"method":"query.open","params":{"sql":"SELECT n FROM nums4"}}"#,
1744            );
1745            let close =
1746                call(r#"{"jsonrpc":"2.0","id":2,"method":"query.close","params":{"cursor_id":1}}"#);
1747            assert!(close.contains("\"closed\":true"));
1748            let after = call(
1749                r#"{"jsonrpc":"2.0","id":3,"method":"query.next","params":{"cursor_id":1,"batch_size":10}}"#,
1750            );
1751            assert!(after.contains("\"code\":\"CURSOR_NOT_FOUND\""));
1752        });
1753    }
1754
1755    #[test]
1756    fn cursor_close_unknown_errors() {
1757        let rt = make_runtime();
1758        with_session(&rt, |call, _| {
1759            let resp = call(
1760                r#"{"jsonrpc":"2.0","id":1,"method":"query.close","params":{"cursor_id":9999}}"#,
1761            );
1762            assert!(resp.contains("\"code\":\"CURSOR_NOT_FOUND\""));
1763        });
1764    }
1765
1766    #[test]
1767    fn cursor_next_without_cursor_id_errors() {
1768        let rt = make_runtime();
1769        with_session(&rt, |call, _| {
1770            let resp = call(r#"{"jsonrpc":"2.0","id":1,"method":"query.next","params":{}}"#);
1771            assert!(resp.contains("\"code\":\"INVALID_PARAMS\""));
1772        });
1773    }
1774
1775    #[test]
1776    fn cursor_default_batch_size_returns_all_when_smaller_than_default() {
1777        let rt = make_runtime();
1778        seed_numbers_table(&rt, "nums5", 7);
1779        with_session(&rt, |call, _| {
1780            let _ = call(
1781                r#"{"jsonrpc":"2.0","id":1,"method":"query.open","params":{"sql":"SELECT n FROM nums5"}}"#,
1782            );
1783            // No batch_size → default 100, table has 7 rows, all in one call.
1784            let resp =
1785                call(r#"{"jsonrpc":"2.0","id":2,"method":"query.next","params":{"cursor_id":1}}"#);
1786            assert!(resp.contains("\"done\":true"));
1787            assert!(resp.contains("\"remaining\":0"));
1788        });
1789    }
1790
1791    #[test]
1792    fn close_method_drops_open_cursors() {
1793        let rt = make_runtime();
1794        seed_numbers_table(&rt, "nums6", 3);
1795        // Single session: open a cursor, call close, verify cursor is gone by reopening
1796        // fresh session and attempting to use cursor_id 1.
1797        with_session(&rt, |call, _| {
1798            let _ = call(
1799                r#"{"jsonrpc":"2.0","id":1,"method":"query.open","params":{"sql":"SELECT n FROM nums6"}}"#,
1800            );
1801            let close = call(r#"{"jsonrpc":"2.0","id":2,"method":"close","params":null}"#);
1802            assert!(close.contains("\"__close__\":true"));
1803            // Cursor must be gone after close within the same session.
1804            let after = call(
1805                r#"{"jsonrpc":"2.0","id":3,"method":"query.next","params":{"cursor_id":1,"batch_size":10}}"#,
1806            );
1807            assert!(after.contains("\"code\":\"CURSOR_NOT_FOUND\""));
1808        });
1809    }
1810
1811    #[test]
1812    fn cursor_independent_of_transaction_state() {
1813        let rt = make_runtime();
1814        seed_numbers_table(&rt, "nums7", 4);
1815        with_session(&rt, |call, _| {
1816            // Open cursor, begin tx, commit tx — cursor survives.
1817            let _ = call(
1818                r#"{"jsonrpc":"2.0","id":1,"method":"query.open","params":{"sql":"SELECT n FROM nums7"}}"#,
1819            );
1820            let _ = call(r#"{"jsonrpc":"2.0","id":2,"method":"tx.begin","params":null}"#);
1821            let _ = call(r#"{"jsonrpc":"2.0","id":3,"method":"tx.commit","params":null}"#);
1822            let resp = call(
1823                r#"{"jsonrpc":"2.0","id":4,"method":"query.next","params":{"cursor_id":1,"batch_size":10}}"#,
1824            );
1825            assert!(resp.contains("\"done\":true"));
1826            assert!(!resp.contains("\"error\""));
1827        });
1828    }
1829
1830    #[test]
1831    fn second_tx_after_commit_gets_fresh_id() {
1832        let rt = make_runtime();
1833        let _ = handle(
1834            &rt,
1835            r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE u6 (name TEXT)"}}"#,
1836        );
1837        with_session(&rt, |call, _| {
1838            let first = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
1839            assert!(first.contains("\"tx_id\":1"));
1840            let _ = call(
1841                r#"{"jsonrpc":"2.0","id":2,"method":"insert","params":{"collection":"u6","payload":{"name":"x"}}}"#,
1842            );
1843            let _ = call(r#"{"jsonrpc":"2.0","id":3,"method":"tx.commit","params":null}"#);
1844
1845            let second = call(r#"{"jsonrpc":"2.0","id":4,"method":"tx.begin","params":null}"#);
1846            assert!(second.contains("\"tx_id\":2"));
1847            let _ = call(r#"{"jsonrpc":"2.0","id":5,"method":"tx.rollback","params":null}"#);
1848        });
1849    }
1850
1851    #[test]
1852    fn prepare_and_execute_prepared_statement() {
1853        let rt = make_runtime();
1854        // Create table + insert a row
1855        let _ = handle(
1856            &rt,
1857            r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE ps_test (n INTEGER)"}}"#,
1858        );
1859        let _ = handle(
1860            &rt,
1861            r#"{"jsonrpc":"2.0","id":2,"method":"query","params":{"sql":"INSERT INTO ps_test (n) VALUES (42)"}}"#,
1862        );
1863
1864        with_session(&rt, |call, _| {
1865            // Prepare a parameterized SELECT.
1866            let prep = call(
1867                r#"{"jsonrpc":"2.0","id":3,"method":"prepare","params":{"sql":"SELECT n FROM ps_test WHERE n = 42"}}"#,
1868            );
1869            assert!(prep.contains("\"prepared_id\""), "prepare response: {prep}");
1870
1871            // Extract the prepared_id.
1872            let id: u64 = {
1873                let v: crate::json::Value = crate::json::from_str(&prep).expect("json");
1874                let result = v.get("result").expect("result");
1875                result
1876                    .get("prepared_id")
1877                    .and_then(|n| n.as_f64())
1878                    .expect("prepared_id") as u64
1879            };
1880
1881            // Execute with the bind value for the parameterized literal.
1882            let exec = call(&format!(
1883                r#"{{"jsonrpc":"2.0","id":4,"method":"execute_prepared","params":{{"prepared_id":{id},"binds":[42]}}}}"#
1884            ));
1885            // Response uses "rows" key (see query_result_to_json).
1886            assert!(
1887                exec.contains("\"rows\""),
1888                "execute_prepared response: {exec}"
1889            );
1890            assert!(exec.contains("42"), "expected row with n=42 in: {exec}");
1891        });
1892    }
1893}