Skip to main content

reddb_server/
rpc_stdio.rs

1//! JSON-RPC 2.0 line-delimited stdio mode for the `red` binary.
2//!
3//! See `PLAN_DRIVERS.md` for the protocol spec. This module is the
4//! sole server-side implementation of the protocol — drivers in
5//! every language target this contract.
6//!
7//! Loop:
8//!   1. Read a line from stdin (UTF-8, terminated by `\n`).
9//!   2. Parse it as a JSON-RPC 2.0 request envelope.
10//!   3. Dispatch on `method` to the runtime.
11//!   4. Serialize the response as a single line on stdout, flush.
12//!   5. Repeat until EOF or `close` method received.
13//!
14//! Errors do not crash the loop. Panics inside a method handler are
15//! caught and reported as `INTERNAL_ERROR` so a buggy query cannot
16//! kill the daemon.
17
18use std::io::{BufRead, BufReader, Stdin, Write};
19use std::panic::AssertUnwindSafe;
20
21use tokio::sync::Mutex as AsyncMutex;
22
23use crate::application::entity::{CreateRowInput, CreateRowsBatchInput};
24use crate::application::ports::RuntimeEntityPort;
25use crate::json::{self as json, Value};
26use crate::runtime::{RedDBRuntime, RuntimeQueryResult};
27use crate::storage::query::unified::UnifiedRecord;
28use crate::storage::schema::Value as SchemaValue;
29use reddb_client_connector::RedDBClient;
30
31/// Which backend the stdio loop is wrapping.
32///
33/// `Local` = the in-process engine (embedded). `Remote` = a tonic client
34/// to a standalone `red server` talking gRPC. The remote variant is
35/// boxed because `RedDBClient` + a `tokio::Runtime` reference is ~248
36/// bytes against `Local`'s ~8 bytes (clippy::large_enum_variant).
37///
38/// The mutex uses `tokio::sync::Mutex` instead of `std::sync::Mutex`
39/// because `dispatch_method_remote` holds the guard across `.await`
40/// points inside `tokio_rt.block_on(...)` — holding a sync mutex
41/// across an await would be a correctness bug in more complex
42/// runtimes.
43enum Backend<'a> {
44    Local(&'a RedDBRuntime),
45    Remote(Box<RemoteBackend<'a>>),
46}
47
48struct RemoteBackend<'a> {
49    client: AsyncMutex<RedDBClient>,
50    tokio_rt: &'a tokio::runtime::Runtime,
51}
52
53/// Protocol version reported by the `version` method.
54pub const PROTOCOL_VERSION: &str = "1.0";
55const STDIO_BULK_INSERT_CHUNK_ROWS: usize = 500;
56
57/// Stable error codes. Drivers map these to idiomatic exceptions.
58pub mod error_code {
59    pub const PARSE_ERROR: &str = "PARSE_ERROR";
60    pub const INVALID_REQUEST: &str = "INVALID_REQUEST";
61    pub const INVALID_PARAMS: &str = "INVALID_PARAMS";
62    pub const QUERY_ERROR: &str = "QUERY_ERROR";
63    pub const NOT_FOUND: &str = "NOT_FOUND";
64    pub const INTERNAL_ERROR: &str = "INTERNAL_ERROR";
65    /// `tx.begin` was called while a transaction was already open in the
66    /// same session.
67    pub const TX_ALREADY_OPEN: &str = "TX_ALREADY_OPEN";
68    /// `tx.commit` / `tx.rollback` was called without a matching
69    /// `tx.begin`.
70    pub const NO_TX_OPEN: &str = "NO_TX_OPEN";
71    /// A buffered statement failed during `tx.commit` replay. The error
72    /// message carries the index of the failing op and the number of
73    /// operations that successfully applied before the failure.
74    pub const TX_REPLAY_FAILED: &str = "TX_REPLAY_FAILED";
75    /// Transactions over the remote gRPC proxy are not supported yet.
76    pub const TX_NOT_SUPPORTED_REMOTE: &str = "TX_NOT_SUPPORTED_REMOTE";
77    /// `query.next` / `query.close` referenced an unknown cursor id.
78    /// Either the cursor was never opened, already closed, or was
79    /// automatically dropped when its rows were exhausted.
80    pub const CURSOR_NOT_FOUND: &str = "CURSOR_NOT_FOUND";
81    /// Too many concurrent cursors open in a single session.
82    pub const CURSOR_LIMIT_EXCEEDED: &str = "CURSOR_LIMIT_EXCEEDED";
83}
84
85/// Maximum number of cursors a single stdio session may hold open
86/// simultaneously. Serves as a memory-pressure guard against runaway
87/// clients that `query.open` without ever closing.
88pub(crate) const MAX_CURSORS_PER_SESSION: usize = 64;
89/// Default batch size for `query.next` when the client does not specify
90/// one explicitly. Tuned for small-to-medium rows; large-row clients
91/// should set a smaller value.
92pub(crate) const DEFAULT_CURSOR_BATCH_SIZE: usize = 100;
93/// Hard upper bound on `query.next` batch size. Prevents a single call
94/// from stalling the stdio loop with a multi-megabyte line.
95pub(crate) const MAX_CURSOR_BATCH_SIZE: usize = 10_000;
96
97// ---------------------------------------------------------------------------
98// Session state (transaction buffer)
99// ---------------------------------------------------------------------------
100//
101// Transactions in the stdio protocol are scoped to a single connection —
102// one process = one session = at most one open transaction. The state
103// lives in the stack of `run_backend` so nothing leaks between
104// connections, and there is no cross-session visibility of buffered
105// writes.
106//
107// Isolation model: `read_committed_deferred`. Reads inside a transaction
108// observe the latest *committed* state; they do **not** see writes the
109// same session has buffered via `insert` / `delete` / `bulk_insert`.
110// Atomicity is best-effort — a global commit lock serializes replays, but
111// auto-committed writes from other sessions may interleave between
112// commits. Strict atomicity requires funnelling every write through a
113// single session.
114
115/// Per-connection session that tracks the currently open transaction
116/// and any active streaming cursors.
117// A server-side prepared statement bound to this session.
118// When parameter_count == 0, shape == the exact plan (no substitution needed).
119struct StdioPreparedStatement {
120    shape: crate::storage::query::ast::QueryExpr,
121    parameter_count: usize,
122}
123
124pub(crate) struct Session {
125    next_tx_id: u64,
126    current_tx: Option<OpenTx>,
127    next_cursor_id: u64,
128    cursors: std::collections::HashMap<u64, Cursor>,
129    /// Monotone counter for prepared statement IDs within this session.
130    next_prepared_id: u64,
131    /// Active prepared statements, keyed by the ID returned to the client.
132    prepared: std::collections::HashMap<u64, StdioPreparedStatement>,
133}
134
135impl Session {
136    pub(crate) fn new() -> Self {
137        Self {
138            next_tx_id: 1,
139            current_tx: None,
140            next_cursor_id: 1,
141            cursors: std::collections::HashMap::new(),
142            next_prepared_id: 1,
143            prepared: std::collections::HashMap::new(),
144        }
145    }
146
147    fn open_tx(&mut self) -> Result<u64, (&'static str, String)> {
148        if let Some(tx) = &self.current_tx {
149            return Err((
150                error_code::TX_ALREADY_OPEN,
151                format!("transaction {} already open in this session", tx.tx_id),
152            ));
153        }
154        let tx_id = self.next_tx_id;
155        self.next_tx_id = self.next_tx_id.saturating_add(1);
156        self.current_tx = Some(OpenTx {
157            tx_id,
158            write_set: Vec::new(),
159        });
160        Ok(tx_id)
161    }
162
163    fn take_tx(&mut self) -> Option<OpenTx> {
164        self.current_tx.take()
165    }
166
167    fn current_tx_mut(&mut self) -> Option<&mut OpenTx> {
168        self.current_tx.as_mut()
169    }
170
171    #[allow(dead_code)]
172    fn has_tx(&self) -> bool {
173        self.current_tx.is_some()
174    }
175
176    /// Register a freshly materialised cursor and return its id.
177    /// Enforces [`MAX_CURSORS_PER_SESSION`] before allocating.
178    fn insert_cursor(&mut self, cursor: Cursor) -> Result<u64, (&'static str, String)> {
179        if self.cursors.len() >= MAX_CURSORS_PER_SESSION {
180            return Err((
181                error_code::CURSOR_LIMIT_EXCEEDED,
182                format!(
183                    "session already holds {} cursors (max {}) — close some before opening new ones",
184                    self.cursors.len(),
185                    MAX_CURSORS_PER_SESSION
186                ),
187            ));
188        }
189        let id = self.next_cursor_id;
190        self.next_cursor_id = self.next_cursor_id.saturating_add(1);
191        let mut cursor = cursor;
192        cursor.cursor_id = id;
193        self.cursors.insert(id, cursor);
194        Ok(id)
195    }
196
197    fn cursor_mut(&mut self, id: u64) -> Option<&mut Cursor> {
198        self.cursors.get_mut(&id)
199    }
200
201    fn drop_cursor(&mut self, id: u64) -> Option<Cursor> {
202        self.cursors.remove(&id)
203    }
204
205    fn clear_cursors(&mut self) {
206        self.cursors.clear();
207    }
208}
209
210impl Default for Session {
211    fn default() -> Self {
212        Self::new()
213    }
214}
215
216/// An in-flight transaction for a single stdio session.
217struct OpenTx {
218    tx_id: u64,
219    write_set: Vec<PendingSql>,
220}
221
222/// A buffered mutation waiting for `tx.commit`. Each variant carries a
223/// ready-to-execute SQL string so the replay loop is a straight
224/// `execute_query` call.
225enum PendingSql {
226    Insert(String),
227    Delete(String),
228    #[allow(dead_code)] // reserved for future query()-in-tx routing
229    Update(String),
230}
231
232impl PendingSql {
233    fn sql(&self) -> &str {
234        match self {
235            PendingSql::Insert(s) | PendingSql::Delete(s) | PendingSql::Update(s) => s,
236        }
237    }
238}
239
240/// An open streaming cursor over a materialised query result.
241///
242/// MVP model: the underlying [`RuntimeQueryResult`] has already been
243/// fully executed at `query.open` time and lives inside the cursor.
244/// Each `query.next` call slices off `batch_size` rows from the tail and
245/// advances `position`. This pays normal memory cost but lets the client
246/// consume the result in chunks, abort mid-stream, or pipeline the next
247/// batch request while processing the previous one.
248///
249/// A future iteration can swap the rows field for a lazy iterator pulled
250/// from the execution engine without changing the wire protocol.
251pub(crate) struct Cursor {
252    cursor_id: u64,
253    columns: Vec<String>,
254    rows: Vec<UnifiedRecord>,
255    position: usize,
256}
257
258impl Cursor {
259    fn new(columns: Vec<String>, rows: Vec<UnifiedRecord>) -> Self {
260        Self {
261            cursor_id: 0, // overwritten by Session::insert_cursor
262            columns,
263            rows,
264            position: 0,
265        }
266    }
267
268    fn total(&self) -> usize {
269        self.rows.len()
270    }
271
272    fn remaining(&self) -> usize {
273        self.rows.len().saturating_sub(self.position)
274    }
275
276    fn is_exhausted(&self) -> bool {
277        self.position >= self.rows.len()
278    }
279
280    /// Extract up to `batch_size` rows from the current position forward.
281    /// Advances the position to the end of the returned slice.
282    fn take_batch(&mut self, batch_size: usize) -> &[UnifiedRecord] {
283        let end = (self.position + batch_size).min(self.rows.len());
284        let slice = &self.rows[self.position..end];
285        self.position = end;
286        slice
287    }
288}
289
290/// Run the stdio JSON-RPC loop against a local in-process runtime.
291///
292/// Returns the process exit code. `0` on normal shutdown (EOF or
293/// explicit `close`). Non-zero only on fatal I/O errors reading
294/// stdin or writing stdout.
295pub fn run(runtime: &RedDBRuntime) -> i32 {
296    run_with_io(runtime, std::io::stdin(), &mut std::io::stdout())
297}
298
299/// Run the stdio JSON-RPC loop as a proxy to a remote gRPC server.
300///
301/// Every method is forwarded via tonic. This is what
302/// `red rpc --stdio --connect grpc://host:port` uses, and it is also
303/// what the JS and Python drivers spawn when the user calls
304/// `connect("grpc://...")`.
305pub fn run_remote(endpoint: &str, token: Option<String>) -> i32 {
306    let tokio_rt = match tokio::runtime::Builder::new_current_thread()
307        .enable_all()
308        .build()
309    {
310        Ok(rt) => rt,
311        Err(e) => {
312            tracing::error!(err = %e, "rpc: failed to build tokio runtime");
313            return 1;
314        }
315    };
316    let client = match tokio_rt.block_on(RedDBClient::connect(endpoint, token)) {
317        Ok(c) => c,
318        Err(e) => {
319            tracing::error!(endpoint, err = %e, "rpc: failed to connect");
320            return 1;
321        }
322    };
323    let backend = Backend::Remote(Box::new(RemoteBackend {
324        client: AsyncMutex::new(client),
325        tokio_rt: &tokio_rt,
326    }));
327    run_backend(&backend, std::io::stdin(), &mut std::io::stdout())
328}
329
330/// Same as [`run`] but takes explicit I/O handles. Used by tests.
331pub fn run_with_io<W: Write>(runtime: &RedDBRuntime, stdin: Stdin, stdout: &mut W) -> i32 {
332    run_backend(&Backend::Local(runtime), stdin, stdout)
333}
334
335/// Per-stdio-session connection-id counter. Each session captures a
336/// unique id so its `tx.commit` BEGIN/COMMIT pair routes to a distinct
337/// `TxnContext` in the runtime — without this every stdio session
338/// would share `conn_id = 0` and trample each other's transactions.
339/// Starts at a high base so we don't collide with PG-wire / gRPC
340/// transports that allocate from their own pools below.
341static STDIO_SESSION_CONN_ID: std::sync::atomic::AtomicU64 =
342    std::sync::atomic::AtomicU64::new(1_000_000);
343
344fn next_stdio_conn_id() -> u64 {
345    STDIO_SESSION_CONN_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
346}
347
348fn run_backend<W: Write>(backend: &Backend<'_>, stdin: Stdin, stdout: &mut W) -> i32 {
349    let reader = BufReader::new(stdin.lock());
350    let mut session = Session::new();
351    // Bind the session to a stable connection id so the runtime's
352    // `tx_contexts` (keyed by conn_id) survives across `handle_line`
353    // calls within the same session.
354    let conn_id = next_stdio_conn_id();
355    crate::runtime::impl_core::set_current_connection_id(conn_id);
356    for line_result in reader.lines() {
357        let line = match line_result {
358            Ok(l) => l,
359            Err(e) => {
360                let _ = writeln!(
361                    stdout,
362                    "{}",
363                    error_response(&Value::Null, error_code::INTERNAL_ERROR, &e.to_string())
364                );
365                let _ = stdout.flush();
366                return 1;
367            }
368        };
369        if line.trim().is_empty() {
370            continue;
371        }
372        let response = handle_line(backend, &mut session, &line);
373        if writeln!(stdout, "{}", response).is_err() || stdout.flush().is_err() {
374            return 1;
375        }
376        if response.contains("\"__close__\":true") {
377            return 0;
378        }
379    }
380    // EOF: silently drop any open transaction — atomicity is preserved
381    // (nothing was ever applied to the store) and no error is surfaced to
382    // the caller because EOF may be graceful client disconnect.
383    let _ = session.take_tx();
384    crate::runtime::impl_core::clear_current_connection_id();
385    0
386}
387
388/// Parse one input line and dispatch. Always returns a single-line
389/// JSON string suitable for direct write to stdout. Never panics
390/// (panics inside handlers are caught and reported).
391fn handle_line(backend: &Backend<'_>, session: &mut Session, line: &str) -> String {
392    let parsed: Value = match json::from_str(line) {
393        Ok(v) => v,
394        Err(err) => {
395            return error_response(
396                &Value::Null,
397                error_code::PARSE_ERROR,
398                &format!("invalid JSON: {err}"),
399            );
400        }
401    };
402
403    let id = parsed.get("id").cloned().unwrap_or(Value::Null);
404
405    let method = match parsed.get("method").and_then(Value::as_str) {
406        Some(m) => m.to_string(),
407        None => {
408            return error_response(&id, error_code::INVALID_REQUEST, "missing 'method' field");
409        }
410    };
411
412    let params = parsed.get("params").cloned().unwrap_or(Value::Null);
413
414    let dispatch = std::panic::catch_unwind(AssertUnwindSafe(|| match backend {
415        Backend::Local(rt) => dispatch_method(rt, session, &method, &params),
416        Backend::Remote(remote) => {
417            // Transactions are session-local and the remote path forwards
418            // each call independently — there is no place to park a tx
419            // handle across gRPC hops yet. Surface a clear error so
420            // drivers can fall back to per-call auto-commit.
421            if matches!(
422                method.as_str(),
423                "tx.begin"
424                    | "tx.commit"
425                    | "tx.rollback"
426                    | "query.open"
427                    | "query.next"
428                    | "query.close"
429            ) {
430                Err((
431                    error_code::TX_NOT_SUPPORTED_REMOTE,
432                    format!("{method} is not supported over remote gRPC yet"),
433                ))
434            } else {
435                dispatch_method_remote(&remote.client, remote.tokio_rt, &method, &params)
436            }
437        }
438    }));
439
440    match dispatch {
441        Ok(Ok(result)) => success_response(&id, &result, method == "close"),
442        Ok(Err((code, msg))) => error_response(&id, code, &msg),
443        Err(_) => error_response(&id, error_code::INTERNAL_ERROR, "handler panicked (caught)"),
444    }
445}
446
447/// Dispatch a parsed method call. Returns the `result` value on
448/// success or `(error_code, message)` on failure.
449fn dispatch_method(
450    runtime: &RedDBRuntime,
451    session: &mut Session,
452    method: &str,
453    params: &Value,
454) -> Result<Value, (&'static str, String)> {
455    match method {
456        "tx.begin" => {
457            let tx_id = session.open_tx()?;
458            Ok(Value::Object(
459                [
460                    ("tx_id".to_string(), Value::Number(tx_id as f64)),
461                    (
462                        "isolation".to_string(),
463                        Value::String("read_committed_deferred".to_string()),
464                    ),
465                ]
466                .into_iter()
467                .collect(),
468            ))
469        }
470
471        "tx.commit" => {
472            let tx = session.take_tx().ok_or((
473                error_code::NO_TX_OPEN,
474                "no transaction is open in this session".to_string(),
475            ))?;
476            let tx_id = tx.tx_id;
477            let op_count = tx.write_set.len();
478
479            // Drive the replay through a real engine transaction so
480            // failures roll back the buffered write_set atomically.
481            // Replaces the legacy `commit_lock`-serialised replay:
482            // cross-session ordering is now provided by the
483            // snapshot-manager's xid allocation, which is what the
484            // SQL `BEGIN`/`COMMIT` path has used since #31.
485            let replay: Result<(u64, usize), (usize, String)> = (|| {
486                runtime
487                    .execute_query("BEGIN")
488                    .map_err(|e| (0usize, format!("BEGIN: {e}")))?;
489                let mut total_affected: u64 = 0;
490                for (idx, op) in tx.write_set.iter().enumerate() {
491                    match runtime.execute_query(op.sql()) {
492                        Ok(qr) => total_affected += qr.affected_rows,
493                        Err(e) => {
494                            let _ = runtime.execute_query("ROLLBACK");
495                            return Err((idx, e.to_string()));
496                        }
497                    }
498                }
499                runtime
500                    .execute_query("COMMIT")
501                    .map_err(|e| (op_count, format!("COMMIT: {e}")))?;
502                Ok((total_affected, op_count))
503            })();
504
505            match replay {
506                Ok((affected, replayed)) => Ok(Value::Object(
507                    [
508                        ("tx_id".to_string(), Value::Number(tx_id as f64)),
509                        ("ops_replayed".to_string(), Value::Number(replayed as f64)),
510                        ("affected".to_string(), Value::Number(affected as f64)),
511                    ]
512                    .into_iter()
513                    .collect(),
514                )),
515                Err((failed_idx, msg)) => Err((
516                    error_code::TX_REPLAY_FAILED,
517                    format!(
518                        "tx {tx_id} replay failed at op {failed_idx}/{op_count}: {msg} \
519                         (ops 0..{failed_idx} already applied and are NOT rolled back)"
520                    ),
521                )),
522            }
523        }
524
525        "query.open" => {
526            let sql = params.get("sql").and_then(Value::as_str).ok_or((
527                error_code::INVALID_PARAMS,
528                "missing 'sql' string".to_string(),
529            ))?;
530            let qr = runtime
531                .execute_query(sql)
532                .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
533
534            // Extract the column list from the first record. Consistent
535            // with query_result_to_json which uses the first row's keys
536            // as schema.
537            let columns: Vec<String> = qr
538                .result
539                .records
540                .first()
541                .map(|first| {
542                    let mut keys: Vec<String> = first
543                        .column_names()
544                        .into_iter()
545                        .map(|k| k.to_string())
546                        .collect();
547                    keys.sort();
548                    keys
549                })
550                .unwrap_or_default();
551
552            let cursor = Cursor::new(columns.clone(), qr.result.records);
553            let total = cursor.total();
554            let cursor_id = session.insert_cursor(cursor)?;
555
556            Ok(Value::Object(
557                [
558                    ("cursor_id".to_string(), Value::Number(cursor_id as f64)),
559                    (
560                        "columns".to_string(),
561                        Value::Array(columns.into_iter().map(Value::String).collect()),
562                    ),
563                    ("total_rows".to_string(), Value::Number(total as f64)),
564                ]
565                .into_iter()
566                .collect(),
567            ))
568        }
569
570        "query.next" => {
571            let cursor_id = params
572                .get("cursor_id")
573                .and_then(|v| v.as_f64())
574                .map(|n| n as u64)
575                .ok_or((
576                    error_code::INVALID_PARAMS,
577                    "missing 'cursor_id' number".to_string(),
578                ))?;
579            let batch_size = params
580                .get("batch_size")
581                .and_then(|v| v.as_f64())
582                .map(|n| n as usize)
583                .unwrap_or(DEFAULT_CURSOR_BATCH_SIZE)
584                .clamp(1, MAX_CURSOR_BATCH_SIZE);
585
586            // Extract the batch inside a bounded borrow so we can
587            // drop the cursor afterwards without borrow-conflict.
588            let (rows, done, remaining) = {
589                let cursor = session.cursor_mut(cursor_id).ok_or((
590                    error_code::CURSOR_NOT_FOUND,
591                    format!("cursor {cursor_id} not found"),
592                ))?;
593                let batch = cursor.take_batch(batch_size);
594                let rows_json: Vec<Value> = batch.iter().map(record_to_json_object).collect();
595                (rows_json, cursor.is_exhausted(), cursor.remaining())
596            };
597
598            if done {
599                // Auto-drop exhausted cursors so long-lived sessions
600                // don't accumulate dead state.
601                let _ = session.drop_cursor(cursor_id);
602            }
603
604            Ok(Value::Object(
605                [
606                    ("cursor_id".to_string(), Value::Number(cursor_id as f64)),
607                    ("rows".to_string(), Value::Array(rows)),
608                    ("done".to_string(), Value::Bool(done)),
609                    ("remaining".to_string(), Value::Number(remaining as f64)),
610                ]
611                .into_iter()
612                .collect(),
613            ))
614        }
615
616        "query.close" => {
617            let cursor_id = params
618                .get("cursor_id")
619                .and_then(|v| v.as_f64())
620                .map(|n| n as u64)
621                .ok_or((
622                    error_code::INVALID_PARAMS,
623                    "missing 'cursor_id' number".to_string(),
624                ))?;
625            let existed = session.drop_cursor(cursor_id).is_some();
626            if !existed {
627                return Err((
628                    error_code::CURSOR_NOT_FOUND,
629                    format!("cursor {cursor_id} not found"),
630                ));
631            }
632            Ok(Value::Object(
633                [
634                    ("cursor_id".to_string(), Value::Number(cursor_id as f64)),
635                    ("closed".to_string(), Value::Bool(true)),
636                ]
637                .into_iter()
638                .collect(),
639            ))
640        }
641
642        "tx.rollback" => {
643            let tx = session.take_tx().ok_or((
644                error_code::NO_TX_OPEN,
645                "no transaction is open in this session".to_string(),
646            ))?;
647            let ops_discarded = tx.write_set.len();
648            Ok(Value::Object(
649                [
650                    ("tx_id".to_string(), Value::Number(tx.tx_id as f64)),
651                    (
652                        "ops_discarded".to_string(),
653                        Value::Number(ops_discarded as f64),
654                    ),
655                ]
656                .into_iter()
657                .collect(),
658            ))
659        }
660
661        "version" => Ok(Value::Object(
662            [
663                (
664                    "version".to_string(),
665                    Value::String(env!("CARGO_PKG_VERSION").to_string()),
666                ),
667                (
668                    "protocol".to_string(),
669                    Value::String(PROTOCOL_VERSION.to_string()),
670                ),
671            ]
672            .into_iter()
673            .collect(),
674        )),
675
676        "health" => Ok(Value::Object(
677            [
678                ("ok".to_string(), Value::Bool(true)),
679                (
680                    "version".to_string(),
681                    Value::String(env!("CARGO_PKG_VERSION").to_string()),
682                ),
683            ]
684            .into_iter()
685            .collect(),
686        )),
687
688        "query" => {
689            let sql = params.get("sql").and_then(Value::as_str).ok_or((
690                error_code::INVALID_PARAMS,
691                "missing 'sql' string".to_string(),
692            ))?;
693
694            // Optional positional `$N` bind parameters (#353 tracer slice).
695            // Absence preserves the legacy single-arg `query(sql)` path.
696            let bind_values: Option<Vec<SchemaValue>> = params
697                .get("params")
698                .map(|v| {
699                    v.as_array()
700                        .ok_or((
701                            error_code::INVALID_PARAMS,
702                            "'params' must be an array".to_string(),
703                        ))
704                        .map(|arr| arr.iter().map(json_value_to_schema_value).collect())
705                })
706                .transpose()?;
707
708            if let Some(binds) = bind_values {
709                use crate::storage::query::modes::parse_multi;
710                use crate::storage::query::user_params;
711                let parsed =
712                    parse_multi(sql).map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
713                let bound = user_params::bind(&parsed, &binds)
714                    .map_err(|e| (error_code::INVALID_PARAMS, e.to_string()))?;
715                let qr = runtime
716                    .execute_query_expr(bound)
717                    .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
718                return Ok(query_result_to_json(&qr));
719            }
720
721            let qr = runtime
722                .execute_query(sql)
723                .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
724            Ok(query_result_to_json(&qr))
725        }
726
727        // ── Prepared statements ──────────────────────────────────────────────
728        //
729        // `prepare` parses the SQL once, extracts a parameterized shape, and
730        // returns a `prepared_id` the client can reuse. `execute_prepared` takes
731        // that id plus JSON-encoded bind values and runs the plan without parsing.
732        //
733        // This mirrors the PostgreSQL extended-query protocol semantics and is the
734        // server-side half of the client driver's `PreparedStatement` abstraction.
735        "prepare" => {
736            use crate::storage::query::modes::parse_multi;
737            use crate::storage::query::planner::shape::parameterize_query_expr;
738
739            let sql = params.get("sql").and_then(Value::as_str).ok_or((
740                error_code::INVALID_PARAMS,
741                "missing 'sql' string".to_string(),
742            ))?;
743            let parsed = parse_multi(sql).map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
744            let (shape, parameter_count) = if let Some(prepared) = parameterize_query_expr(&parsed)
745            {
746                (prepared.shape, prepared.parameter_count)
747            } else {
748                (parsed, 0)
749            };
750            let id = session.next_prepared_id;
751            session.next_prepared_id = session.next_prepared_id.saturating_add(1);
752            session.prepared.insert(
753                id,
754                StdioPreparedStatement {
755                    shape,
756                    parameter_count,
757                },
758            );
759            Ok(Value::Object(
760                [
761                    ("prepared_id".to_string(), Value::Number(id as f64)),
762                    (
763                        "parameter_count".to_string(),
764                        Value::Number(parameter_count as f64),
765                    ),
766                ]
767                .into_iter()
768                .collect(),
769            ))
770        }
771
772        "execute_prepared" => {
773            use crate::storage::query::planner::shape::bind_parameterized_query;
774            use crate::storage::schema::Value as SV;
775
776            let id = params
777                .get("prepared_id")
778                .and_then(Value::as_f64)
779                .map(|n| n as u64)
780                .ok_or((
781                    error_code::INVALID_PARAMS,
782                    "missing 'prepared_id'".to_string(),
783                ))?;
784
785            let stmt = session.prepared.get(&id).ok_or((
786                error_code::QUERY_ERROR,
787                format!("no prepared statement with id {id}"),
788            ))?;
789
790            // Parse bind values from JSON array of JSON-encoded literals.
791            let binds_json: Vec<Value> = params
792                .get("binds")
793                .and_then(Value::as_array)
794                .map(|a| a.to_vec())
795                .unwrap_or_default();
796            if binds_json.len() != stmt.parameter_count {
797                return Err((
798                    error_code::INVALID_PARAMS,
799                    format!(
800                        "expected {} bind values, got {}",
801                        stmt.parameter_count,
802                        binds_json.len()
803                    ),
804                ));
805            }
806
807            // Convert JSON bind values to SchemaValue.
808            let binds: Vec<SV> = binds_json.iter().map(json_value_to_schema_value).collect();
809
810            // Bind literals into the parameterized shape.
811            let expr = if stmt.parameter_count == 0 {
812                stmt.shape.clone()
813            } else {
814                bind_parameterized_query(&stmt.shape, &binds, stmt.parameter_count)
815                    .ok_or((error_code::QUERY_ERROR, "bind failed".to_string()))?
816            };
817
818            let qr = runtime
819                .execute_query_expr(expr)
820                .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
821            Ok(query_result_to_json(&qr))
822        }
823
824        "insert" => {
825            let collection = params.get("collection").and_then(Value::as_str).ok_or((
826                error_code::INVALID_PARAMS,
827                "missing 'collection' string".to_string(),
828            ))?;
829            let payload = params.get("payload").ok_or((
830                error_code::INVALID_PARAMS,
831                "missing 'payload' object".to_string(),
832            ))?;
833            let payload_obj = payload.as_object().ok_or((
834                error_code::INVALID_PARAMS,
835                "'payload' must be a JSON object".to_string(),
836            ))?;
837            if let Some(tx) = session.current_tx_mut() {
838                let sql = build_insert_sql(collection, payload_obj.iter());
839                tx.write_set.push(PendingSql::Insert(sql));
840                return Ok(pending_tx_response(tx.tx_id));
841            }
842
843            let output = runtime
844                .create_row(flat_payload_to_row_input(collection, payload_obj))
845                .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
846            let mut out = json::Map::new();
847            out.insert("affected".to_string(), Value::Number(1.0));
848            out.insert("id".to_string(), Value::String(output.id.raw().to_string()));
849            Ok(Value::Object(out))
850        }
851
852        "bulk_insert" => {
853            let collection = params.get("collection").and_then(Value::as_str).ok_or((
854                error_code::INVALID_PARAMS,
855                "missing 'collection' string".to_string(),
856            ))?;
857            let payloads = params.get("payloads").and_then(Value::as_array).ok_or((
858                error_code::INVALID_PARAMS,
859                "missing 'payloads' array".to_string(),
860            ))?;
861
862            let mut objects = Vec::with_capacity(payloads.len());
863            for entry in payloads {
864                objects.push(entry.as_object().ok_or((
865                    error_code::INVALID_PARAMS,
866                    "each payload must be a JSON object".to_string(),
867                ))?);
868            }
869
870            if let Some(tx) = session.current_tx_mut() {
871                let mut buffered: u64 = 0;
872                for obj in &objects {
873                    let sql = build_insert_sql(collection, obj.iter());
874                    tx.write_set.push(PendingSql::Insert(sql));
875                    buffered += 1;
876                }
877                let tx_id = tx.tx_id;
878                return Ok(Value::Object(
879                    [
880                        ("affected".to_string(), Value::Number(0.0)),
881                        ("buffered".to_string(), Value::Number(buffered as f64)),
882                        ("pending".to_string(), Value::Bool(true)),
883                        ("tx_id".to_string(), Value::Number(tx_id as f64)),
884                    ]
885                    .into_iter()
886                    .collect(),
887                ));
888            }
889
890            if should_bulk_insert_graph(runtime, collection, &objects) {
891                return bulk_insert_graph(runtime, collection, &objects)
892                    .map_err(|e| (error_code::QUERY_ERROR, e.to_string()));
893            }
894
895            let mut total_affected: u64 = 0;
896            let mut ids = Vec::with_capacity(objects.len());
897            for chunk in objects.chunks(STDIO_BULK_INSERT_CHUNK_ROWS) {
898                let rows = chunk
899                    .iter()
900                    .map(|obj| flat_payload_to_row_input(collection, obj))
901                    .collect();
902                let outputs = runtime
903                    .create_rows_batch(CreateRowsBatchInput {
904                        collection: collection.to_string(),
905                        rows,
906                        suppress_events: false,
907                    })
908                    .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
909                total_affected += outputs.len() as u64;
910                ids.extend(
911                    outputs
912                        .into_iter()
913                        .map(|output| Value::String(output.id.raw().to_string())),
914                );
915            }
916            let mut out = json::Map::new();
917            out.insert("affected".to_string(), Value::Number(total_affected as f64));
918            out.insert("ids".to_string(), Value::Array(ids));
919            Ok(Value::Object(out))
920        }
921
922        "get" => {
923            let collection = params.get("collection").and_then(Value::as_str).ok_or((
924                error_code::INVALID_PARAMS,
925                "missing 'collection' string".to_string(),
926            ))?;
927            let id = params.get("id").and_then(Value::as_str).ok_or((
928                error_code::INVALID_PARAMS,
929                "missing 'id' string".to_string(),
930            ))?;
931            let sql = format!("SELECT * FROM {collection} WHERE red_entity_id = {id} LIMIT 1");
932            let qr = runtime
933                .execute_query(&sql)
934                .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
935            let entity = qr
936                .result
937                .records
938                .first()
939                .map(record_to_json_object)
940                .unwrap_or(Value::Null);
941            Ok(Value::Object(
942                [("entity".to_string(), entity)].into_iter().collect(),
943            ))
944        }
945
946        "delete" => {
947            let collection = params.get("collection").and_then(Value::as_str).ok_or((
948                error_code::INVALID_PARAMS,
949                "missing 'collection' string".to_string(),
950            ))?;
951            let id = params.get("id").and_then(Value::as_str).ok_or((
952                error_code::INVALID_PARAMS,
953                "missing 'id' string".to_string(),
954            ))?;
955            let sql = format!("DELETE FROM {collection} WHERE red_entity_id = {id}");
956
957            if let Some(tx) = session.current_tx_mut() {
958                tx.write_set.push(PendingSql::Delete(sql));
959                return Ok(pending_tx_response(tx.tx_id));
960            }
961
962            let qr = runtime
963                .execute_query(&sql)
964                .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
965            Ok(Value::Object(
966                [(
967                    "affected".to_string(),
968                    Value::Number(qr.affected_rows as f64),
969                )]
970                .into_iter()
971                .collect(),
972            ))
973        }
974
975        "close" => {
976            // Silently drop any open transaction and cursors on close.
977            // The client explicitly asked to terminate; surfacing an
978            // error here would leak state across what is effectively a
979            // reset.
980            let _ = session.take_tx();
981            session.clear_cursors();
982            let _ = runtime.checkpoint();
983            Ok(Value::Null)
984        }
985
986        // Auth surface — local stdio bridge has no auth backend
987        // (the spawned binary inherits the caller's privileges by
988        // construction). The remote bridge below maps these methods
989        // onto the gRPC server's auth endpoints.
990        "auth.login"
991        | "auth.whoami"
992        | "auth.change_password"
993        | "auth.create_api_key"
994        | "auth.revoke_api_key" => {
995            let _ = (session, params);
996            Err((
997                error_code::INVALID_REQUEST,
998                format!(
999                    "{method}: auth methods are only available on grpc:// connections; \
1000                     embedded modes (memory://, file://) inherit caller privileges"
1001                ),
1002            ))
1003        }
1004
1005        other => Err((
1006            error_code::INVALID_REQUEST,
1007            format!("unknown method: {other}"),
1008        )),
1009    }
1010}
1011
1012// ---------------------------------------------------------------------------
1013// Response builders
1014// ---------------------------------------------------------------------------
1015
1016fn success_response(id: &Value, result: &Value, is_close: bool) -> String {
1017    // For `close` we tag the response so the loop knows to exit after
1018    // flushing. The tag is stripped from the wire by replacing it
1019    // before serialization — actually we just include it as a sentinel
1020    // field that drivers ignore (forward compat).
1021    let mut envelope = json::Map::new();
1022    envelope.insert("jsonrpc".to_string(), Value::String("2.0".to_string()));
1023    envelope.insert("id".to_string(), id.clone());
1024    envelope.insert("result".to_string(), result.clone());
1025    if is_close {
1026        envelope.insert("__close__".to_string(), Value::Bool(true));
1027    }
1028    Value::Object(envelope).to_string_compact()
1029}
1030
1031fn error_response(id: &Value, code: &str, message: &str) -> String {
1032    let mut err = json::Map::new();
1033    err.insert("code".to_string(), Value::String(code.to_string()));
1034    err.insert("message".to_string(), Value::String(message.to_string()));
1035    err.insert("data".to_string(), Value::Null);
1036
1037    let mut envelope = json::Map::new();
1038    envelope.insert("jsonrpc".to_string(), Value::String("2.0".to_string()));
1039    envelope.insert("id".to_string(), id.clone());
1040    envelope.insert("error".to_string(), Value::Object(err));
1041    Value::Object(envelope).to_string_compact()
1042}
1043
1044// ---------------------------------------------------------------------------
1045// Helpers
1046// ---------------------------------------------------------------------------
1047
1048/// Envelope returned by `insert` and `delete` when the call was buffered
1049/// into an open transaction instead of being auto-committed.
1050fn pending_tx_response(tx_id: u64) -> Value {
1051    Value::Object(
1052        [
1053            ("affected".to_string(), Value::Number(0.0)),
1054            ("pending".to_string(), Value::Bool(true)),
1055            ("tx_id".to_string(), Value::Number(tx_id as f64)),
1056        ]
1057        .into_iter()
1058        .collect(),
1059    )
1060}
1061
1062pub(crate) fn build_insert_sql<'a, I>(collection: &str, fields: I) -> String
1063where
1064    I: Iterator<Item = (&'a String, &'a Value)>,
1065{
1066    let mut cols = Vec::new();
1067    let mut vals = Vec::new();
1068    for (k, v) in fields {
1069        cols.push(k.clone());
1070        vals.push(value_to_sql_literal(v));
1071    }
1072    format!(
1073        "INSERT INTO {collection} ({}) VALUES ({})",
1074        cols.join(", "),
1075        vals.join(", "),
1076    )
1077}
1078
1079fn flat_payload_to_row_input(
1080    collection: &str,
1081    payload: &json::Map<String, Value>,
1082) -> CreateRowInput {
1083    CreateRowInput {
1084        collection: collection.to_string(),
1085        fields: payload
1086            .iter()
1087            .map(|(key, value)| (key.clone(), json_value_to_schema_value(value)))
1088            .collect(),
1089        metadata: Vec::new(),
1090        node_links: Vec::new(),
1091        vector_links: Vec::new(),
1092    }
1093}
1094
1095fn bulk_insert_chunk_count(row_count: usize) -> usize {
1096    if row_count == 0 {
1097        0
1098    } else {
1099        ((row_count - 1) / STDIO_BULK_INSERT_CHUNK_ROWS) + 1
1100    }
1101}
1102
1103pub(crate) fn should_bulk_insert_graph(
1104    runtime: &RedDBRuntime,
1105    collection: &str,
1106    payloads: &[&json::Map<String, Value>],
1107) -> bool {
1108    let graph_shaped = payloads
1109        .iter()
1110        .all(|payload| payload.get("label").and_then(Value::as_str).is_some());
1111    if !graph_shaped {
1112        return false;
1113    }
1114
1115    matches!(
1116        runtime
1117            .db()
1118            .catalog_model_snapshot()
1119            .collections
1120            .iter()
1121            .find(|descriptor| descriptor.name == collection)
1122            .map(|descriptor| descriptor.declared_model.unwrap_or(descriptor.model)),
1123        Some(crate::catalog::CollectionModel::Graph | crate::catalog::CollectionModel::Mixed)
1124    )
1125}
1126
1127pub(crate) fn bulk_insert_graph(
1128    runtime: &RedDBRuntime,
1129    collection: &str,
1130    payloads: &[&json::Map<String, Value>],
1131) -> crate::RedDBResult<Value> {
1132    use crate::application::entity_payload::{parse_create_edge_input, parse_create_node_input};
1133    use crate::application::ports::RuntimeEntityPort;
1134
1135    let mut ids = Vec::with_capacity(payloads.len());
1136    for payload in payloads {
1137        let input_payload = normalize_flat_graph_payload(payload);
1138        let id = if payload.contains_key("from") || payload.contains_key("to") {
1139            runtime
1140                .create_edge(parse_create_edge_input(
1141                    collection.to_string(),
1142                    &input_payload,
1143                )?)?
1144                .id
1145        } else {
1146            runtime
1147                .create_node(parse_create_node_input(
1148                    collection.to_string(),
1149                    &input_payload,
1150                )?)?
1151                .id
1152        };
1153        ids.push(Value::Number(id.raw() as f64));
1154    }
1155
1156    let mut out = json::Map::new();
1157    out.insert("affected".to_string(), Value::Number(ids.len() as f64));
1158    out.insert("ids".to_string(), Value::Array(ids));
1159    Ok(Value::Object(out))
1160}
1161
1162fn normalize_flat_graph_payload(payload: &json::Map<String, Value>) -> Value {
1163    if payload.contains_key("properties") || payload.contains_key("fields") {
1164        return Value::Object(payload.clone());
1165    }
1166
1167    let is_edge = payload.contains_key("from") || payload.contains_key("to");
1168    let mut normalized = payload.clone();
1169    let mut properties = json::Map::new();
1170    for (key, value) in payload {
1171        let reserved = if is_edge {
1172            matches!(
1173                key.as_str(),
1174                "label"
1175                    | "from"
1176                    | "to"
1177                    | "weight"
1178                    | "metadata"
1179                    | "properties"
1180                    | "fields"
1181                    | "_ttl_ms"
1182                    | "_expires_at"
1183            )
1184        } else {
1185            matches!(
1186                key.as_str(),
1187                "label"
1188                    | "node_type"
1189                    | "metadata"
1190                    | "links"
1191                    | "embeddings"
1192                    | "properties"
1193                    | "fields"
1194                    | "_ttl_ms"
1195                    | "_expires_at"
1196            )
1197        };
1198        if !reserved {
1199            properties.insert(key.clone(), value.clone());
1200        }
1201    }
1202    if !properties.is_empty() {
1203        normalized.insert("properties".to_string(), Value::Object(properties));
1204    }
1205    Value::Object(normalized)
1206}
1207
1208pub(crate) fn value_to_sql_literal(v: &Value) -> String {
1209    match v {
1210        Value::Null => "NULL".to_string(),
1211        Value::Bool(b) => b.to_string(),
1212        Value::Number(n) => {
1213            if n.fract() == 0.0 {
1214                format!("{}", *n as i64)
1215            } else {
1216                n.to_string()
1217            }
1218        }
1219        Value::String(s) => format!("'{}'", s.replace('\'', "''")),
1220        other => format!("'{}'", other.to_string_compact().replace('\'', "''")),
1221    }
1222}
1223
1224pub(crate) fn query_result_to_json(qr: &RuntimeQueryResult) -> Value {
1225    if let Some(ask) = ask_query_result_to_json(qr) {
1226        return ask;
1227    }
1228
1229    let mut envelope = json::Map::new();
1230    envelope.insert(
1231        "statement".to_string(),
1232        Value::String(qr.statement_type.to_string()),
1233    );
1234    envelope.insert(
1235        "affected".to_string(),
1236        Value::Number(qr.affected_rows as f64),
1237    );
1238
1239    let mut columns = Vec::new();
1240    if let Some(first) = qr.result.records.first() {
1241        let mut keys: Vec<String> = first
1242            .column_names()
1243            .into_iter()
1244            .map(|k| k.to_string())
1245            .collect();
1246        keys.sort();
1247        columns = keys.into_iter().map(Value::String).collect();
1248    }
1249    envelope.insert("columns".to_string(), Value::Array(columns));
1250
1251    let rows: Vec<Value> = qr
1252        .result
1253        .records
1254        .iter()
1255        .map(record_to_json_object)
1256        .collect();
1257    envelope.insert("rows".to_string(), Value::Array(rows));
1258
1259    Value::Object(envelope)
1260}
1261
1262fn ask_query_result_to_json(qr: &RuntimeQueryResult) -> Option<Value> {
1263    if qr.statement != "ask" {
1264        return None;
1265    }
1266    let row = qr.result.records.first()?;
1267    let answer = text_field(row, "answer")?;
1268    let provider = text_field(row, "provider").unwrap_or_default();
1269    let model = text_field(row, "model").unwrap_or_default();
1270    let sources_flat_json = json_field(row, "sources_flat").unwrap_or(Value::Array(Vec::new()));
1271    let citations_json = json_field(row, "citations").unwrap_or(Value::Array(Vec::new()));
1272    let validation_json = json_field(row, "validation").unwrap_or(Value::Object(json::Map::new()));
1273
1274    let effective_mode = match text_field(row, "mode").as_deref() {
1275        Some("lenient") => crate::runtime::ai::ask_response_envelope::Mode::Lenient,
1276        _ => crate::runtime::ai::ask_response_envelope::Mode::Strict,
1277    };
1278
1279    let result = crate::runtime::ai::ask_response_envelope::AskResult {
1280        answer,
1281        sources_flat: envelope_sources_flat(&sources_flat_json),
1282        citations: envelope_citations(&citations_json),
1283        validation: envelope_validation(&validation_json),
1284        cache_hit: bool_field(row, "cache_hit").unwrap_or(false),
1285        provider,
1286        model,
1287        prompt_tokens: u32_field(row, "prompt_tokens").unwrap_or(0),
1288        completion_tokens: u32_field(row, "completion_tokens").unwrap_or(0),
1289        cost_usd: f64_field(row, "cost_usd").unwrap_or(0.0),
1290        effective_mode,
1291        retry_count: u32_field(row, "retry_count").unwrap_or(0),
1292    };
1293    Some(crate::runtime::ai::ask_response_envelope::build(&result))
1294}
1295
1296fn record_field<'a>(record: &'a UnifiedRecord, name: &str) -> Option<&'a SchemaValue> {
1297    record
1298        .iter_fields()
1299        .find_map(|(key, value)| (key.as_ref() == name).then_some(value))
1300}
1301
1302fn text_field(record: &UnifiedRecord, name: &str) -> Option<String> {
1303    match record_field(record, name)? {
1304        SchemaValue::Text(s) => Some(s.to_string()),
1305        SchemaValue::Email(s)
1306        | SchemaValue::Url(s)
1307        | SchemaValue::NodeRef(s)
1308        | SchemaValue::EdgeRef(s) => Some(s.clone()),
1309        other => Some(format!("{other}")),
1310    }
1311}
1312
1313fn u32_field(record: &UnifiedRecord, name: &str) -> Option<u32> {
1314    match record_field(record, name)? {
1315        SchemaValue::Integer(n) => (*n >= 0).then_some((*n).min(u32::MAX as i64) as u32),
1316        SchemaValue::UnsignedInteger(n) => Some((*n).min(u32::MAX as u64) as u32),
1317        SchemaValue::BigInt(n)
1318        | SchemaValue::TimestampMs(n)
1319        | SchemaValue::Timestamp(n)
1320        | SchemaValue::Duration(n)
1321        | SchemaValue::Decimal(n) => (*n >= 0).then_some((*n).min(u32::MAX as i64) as u32),
1322        SchemaValue::Float(n) => (*n >= 0.0).then_some((*n).min(u32::MAX as f64) as u32),
1323        _ => None,
1324    }
1325}
1326
1327fn f64_field(record: &UnifiedRecord, name: &str) -> Option<f64> {
1328    match record_field(record, name)? {
1329        SchemaValue::Integer(n) => Some(*n as f64),
1330        SchemaValue::UnsignedInteger(n) => Some(*n as f64),
1331        SchemaValue::BigInt(n)
1332        | SchemaValue::TimestampMs(n)
1333        | SchemaValue::Timestamp(n)
1334        | SchemaValue::Duration(n)
1335        | SchemaValue::Decimal(n) => Some(*n as f64),
1336        SchemaValue::Float(n) => Some(*n),
1337        _ => None,
1338    }
1339}
1340
1341fn bool_field(record: &UnifiedRecord, name: &str) -> Option<bool> {
1342    match record_field(record, name)? {
1343        SchemaValue::Boolean(value) => Some(*value),
1344        _ => None,
1345    }
1346}
1347
1348fn json_field(record: &UnifiedRecord, name: &str) -> Option<Value> {
1349    match record_field(record, name)? {
1350        SchemaValue::Json(bytes) => json::from_slice(bytes).ok(),
1351        SchemaValue::Text(text) => json::from_str(text).ok(),
1352        _ => None,
1353    }
1354}
1355
1356fn envelope_sources_flat(
1357    value: &Value,
1358) -> Vec<crate::runtime::ai::ask_response_envelope::SourceRow> {
1359    value
1360        .as_array()
1361        .unwrap_or(&[])
1362        .iter()
1363        .filter_map(|source| {
1364            let urn = source.get("urn").and_then(Value::as_str)?.to_string();
1365            let payload = source
1366                .get("payload")
1367                .and_then(Value::as_str)
1368                .map(ToString::to_string)
1369                .unwrap_or_else(|| source.to_string_compact());
1370            Some(crate::runtime::ai::ask_response_envelope::SourceRow { urn, payload })
1371        })
1372        .collect()
1373}
1374
1375fn envelope_citations(value: &Value) -> Vec<crate::runtime::ai::ask_response_envelope::Citation> {
1376    value
1377        .as_array()
1378        .unwrap_or(&[])
1379        .iter()
1380        .filter_map(|citation| {
1381            let marker = citation.get("marker").and_then(Value::as_u64)?;
1382            let urn = citation.get("urn").and_then(Value::as_str)?.to_string();
1383            Some(crate::runtime::ai::ask_response_envelope::Citation {
1384                marker: marker.min(u32::MAX as u64) as u32,
1385                urn,
1386            })
1387        })
1388        .collect()
1389}
1390
1391fn envelope_validation(value: &Value) -> crate::runtime::ai::ask_response_envelope::Validation {
1392    crate::runtime::ai::ask_response_envelope::Validation {
1393        ok: value.get("ok").and_then(Value::as_bool).unwrap_or(true),
1394        warnings: validation_items(value, "warnings")
1395            .into_iter()
1396            .map(
1397                |(kind, detail)| crate::runtime::ai::ask_response_envelope::ValidationWarning {
1398                    kind,
1399                    detail,
1400                },
1401            )
1402            .collect(),
1403        errors: validation_items(value, "errors")
1404            .into_iter()
1405            .map(
1406                |(kind, detail)| crate::runtime::ai::ask_response_envelope::ValidationError {
1407                    kind,
1408                    detail,
1409                },
1410            )
1411            .collect(),
1412    }
1413}
1414
1415fn validation_items(value: &Value, key: &str) -> Vec<(String, String)> {
1416    value
1417        .get(key)
1418        .and_then(Value::as_array)
1419        .unwrap_or(&[])
1420        .iter()
1421        .filter_map(|item| {
1422            Some((
1423                item.get("kind").and_then(Value::as_str)?.to_string(),
1424                item.get("detail")
1425                    .and_then(Value::as_str)
1426                    .unwrap_or("")
1427                    .to_string(),
1428            ))
1429        })
1430        .collect()
1431}
1432
1433pub(crate) fn insert_result_to_json(qr: &RuntimeQueryResult) -> Value {
1434    let mut envelope = json::Map::new();
1435    envelope.insert(
1436        "affected".to_string(),
1437        Value::Number(qr.affected_rows as f64),
1438    );
1439    // First row of the result, if any, contains the inserted entity id.
1440    if let Some(first) = qr.result.records.first() {
1441        if let Some(id_val) = first
1442            .iter_fields()
1443            .find(|(k, _)| {
1444                let s: &str = k;
1445                s == "_entity_id"
1446            })
1447            .map(|(_, v)| schema_value_to_json(v))
1448        {
1449            envelope.insert("id".to_string(), id_val);
1450        }
1451    }
1452    Value::Object(envelope)
1453}
1454
1455fn record_to_json_object(record: &UnifiedRecord) -> Value {
1456    let mut map = json::Map::new();
1457    // iter_fields merges the columnar fast-path + HashMap so scan
1458    // rows (columnar only) contribute their values.
1459    let mut entries: Vec<(&str, &SchemaValue)> =
1460        record.iter_fields().map(|(k, v)| (k.as_ref(), v)).collect();
1461    entries.sort_by(|a, b| a.0.cmp(b.0));
1462    for (k, v) in entries {
1463        map.insert(k.to_string(), schema_value_to_json(v));
1464    }
1465    Value::Object(map)
1466}
1467
1468fn schema_value_to_json(v: &SchemaValue) -> Value {
1469    match v {
1470        SchemaValue::Null => Value::Null,
1471        SchemaValue::Boolean(b) => Value::Bool(*b),
1472        SchemaValue::Integer(n) => Value::Number(*n as f64),
1473        SchemaValue::UnsignedInteger(n) => Value::Number(*n as f64),
1474        SchemaValue::Float(n) if n.is_finite() => Value::Number(*n),
1475        SchemaValue::Float(n) => {
1476            let token = if n.is_nan() {
1477                "NaN"
1478            } else if n.is_sign_positive() {
1479                "Infinity"
1480            } else {
1481                "-Infinity"
1482            };
1483            single_key_object("$float", Value::String(token.to_string()))
1484        }
1485        SchemaValue::BigInt(n) => Value::Number(*n as f64),
1486        SchemaValue::TimestampMs(n) | SchemaValue::Duration(n) | SchemaValue::Decimal(n) => {
1487            Value::Number(*n as f64)
1488        }
1489        SchemaValue::Timestamp(n) => single_key_object("$ts", Value::String(n.to_string())),
1490        SchemaValue::Password(_) | SchemaValue::Secret(_) => Value::String("***".to_string()),
1491        SchemaValue::Text(s) => Value::String(s.to_string()),
1492        SchemaValue::Blob(bytes) => {
1493            single_key_object("$bytes", Value::String(base64_encode(bytes)))
1494        }
1495        SchemaValue::Json(bytes) => {
1496            crate::presentation::entity_json::storage_json_bytes_to_json(bytes)
1497        }
1498        SchemaValue::Uuid(bytes) => single_key_object("$uuid", Value::String(format_uuid(bytes))),
1499        SchemaValue::Email(s)
1500        | SchemaValue::Url(s)
1501        | SchemaValue::NodeRef(s)
1502        | SchemaValue::EdgeRef(s) => Value::String(s.clone()),
1503        other => Value::String(format!("{other}")),
1504    }
1505}
1506
1507fn single_key_object(key: &str, value: Value) -> Value {
1508    Value::Object([(key.to_string(), value)].into_iter().collect())
1509}
1510
1511/// Convert a JSON `Value` to a `SchemaValue` for use as a bind parameter
1512/// in a prepared statement. JSON-RPC envelopes preserve values that
1513/// ordinary JSON cannot represent losslessly.
1514pub(crate) fn json_value_to_schema_value(v: &Value) -> SchemaValue {
1515    match v {
1516        Value::Null => SchemaValue::Null,
1517        Value::Bool(b) => SchemaValue::Boolean(*b),
1518        Value::Number(n) => {
1519            if n.is_finite() && n.fract() == 0.0 && *n >= i64::MIN as f64 && *n <= i64::MAX as f64 {
1520                SchemaValue::Integer(*n as i64)
1521            } else {
1522                SchemaValue::Float(*n)
1523            }
1524        }
1525        Value::String(s) => SchemaValue::text(s.clone()),
1526        Value::Array(items) => {
1527            // A JSON array of numbers (or empty) is taken as `Vector`
1528            // for the #355 query-param contract. Other arrays are
1529            // JSON values, so JSON columns can bind array payloads.
1530            if items.iter().all(|v| matches!(v, Value::Number(_))) {
1531                let floats: Vec<f32> = items
1532                    .iter()
1533                    .map(|v| v.as_f64().unwrap_or(0.0) as f32)
1534                    .collect();
1535                SchemaValue::Vector(floats)
1536            } else {
1537                SchemaValue::Json(crate::json::to_vec(v).unwrap_or_default())
1538            }
1539        }
1540        Value::Object(map) => {
1541            if map.len() == 1 {
1542                if let Some(Value::String(encoded)) = map.get("$bytes") {
1543                    if let Ok(bytes) = base64_decode(encoded) {
1544                        return SchemaValue::Blob(bytes);
1545                    }
1546                }
1547                if let Some(value) = map.get("$ts") {
1548                    if let Some(ts) = json_i64(value) {
1549                        return SchemaValue::Timestamp(ts);
1550                    }
1551                }
1552                if let Some(Value::String(value)) = map.get("$uuid") {
1553                    if let Ok(uuid) = crate::crypto::Uuid::parse_str(value) {
1554                        return SchemaValue::Uuid(*uuid.as_bytes());
1555                    }
1556                }
1557                if let Some(Value::String(value)) = map.get("$float") {
1558                    return match value.as_str() {
1559                        "NaN" => SchemaValue::Float(f64::NAN),
1560                        "Infinity" | "+Infinity" | "inf" | "+inf" => {
1561                            SchemaValue::Float(f64::INFINITY)
1562                        }
1563                        "-Infinity" | "-inf" => SchemaValue::Float(f64::NEG_INFINITY),
1564                        _ => SchemaValue::Json(crate::json::to_vec(v).unwrap_or_default()),
1565                    };
1566                }
1567            }
1568            SchemaValue::Json(crate::json::to_vec(v).unwrap_or_default())
1569        }
1570    }
1571}
1572
1573fn json_i64(value: &Value) -> Option<i64> {
1574    match value {
1575        Value::Number(n) => {
1576            if n.is_finite() && n.fract() == 0.0 && *n >= i64::MIN as f64 && *n <= i64::MAX as f64 {
1577                Some(*n as i64)
1578            } else {
1579                None
1580            }
1581        }
1582        Value::String(s) => s.parse::<i64>().ok(),
1583        _ => None,
1584    }
1585}
1586
1587fn format_uuid(bytes: &[u8; 16]) -> String {
1588    format!(
1589        "{:02x}{:02x}{:02x}{:02x}-{:02x}{:02x}-{:02x}{:02x}-{:02x}{:02x}-{:02x}{:02x}{:02x}{:02x}{:02x}{:02x}",
1590        bytes[0],
1591        bytes[1],
1592        bytes[2],
1593        bytes[3],
1594        bytes[4],
1595        bytes[5],
1596        bytes[6],
1597        bytes[7],
1598        bytes[8],
1599        bytes[9],
1600        bytes[10],
1601        bytes[11],
1602        bytes[12],
1603        bytes[13],
1604        bytes[14],
1605        bytes[15]
1606    )
1607}
1608
1609fn base64_encode(bytes: &[u8]) -> String {
1610    const TABLE: &[u8; 64] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
1611    let mut out = String::with_capacity(bytes.len().div_ceil(3) * 4);
1612    let mut chunks = bytes.chunks_exact(3);
1613    for chunk in chunks.by_ref() {
1614        let n = ((chunk[0] as u32) << 16) | ((chunk[1] as u32) << 8) | chunk[2] as u32;
1615        out.push(TABLE[((n >> 18) & 0x3f) as usize] as char);
1616        out.push(TABLE[((n >> 12) & 0x3f) as usize] as char);
1617        out.push(TABLE[((n >> 6) & 0x3f) as usize] as char);
1618        out.push(TABLE[(n & 0x3f) as usize] as char);
1619    }
1620    match chunks.remainder() {
1621        [] => {}
1622        [a] => {
1623            let n = (*a as u32) << 16;
1624            out.push(TABLE[((n >> 18) & 0x3f) as usize] as char);
1625            out.push(TABLE[((n >> 12) & 0x3f) as usize] as char);
1626            out.push('=');
1627            out.push('=');
1628        }
1629        [a, b] => {
1630            let n = ((*a as u32) << 16) | ((*b as u32) << 8);
1631            out.push(TABLE[((n >> 18) & 0x3f) as usize] as char);
1632            out.push(TABLE[((n >> 12) & 0x3f) as usize] as char);
1633            out.push(TABLE[((n >> 6) & 0x3f) as usize] as char);
1634            out.push('=');
1635        }
1636        _ => unreachable!(),
1637    }
1638    out
1639}
1640
1641fn base64_decode(input: &str) -> Result<Vec<u8>, String> {
1642    let bytes = input.as_bytes();
1643    if !bytes.len().is_multiple_of(4) {
1644        return Err("base64 length must be a multiple of 4".to_string());
1645    }
1646    let mut out = Vec::with_capacity(bytes.len() / 4 * 3);
1647    for chunk in bytes.chunks_exact(4) {
1648        let pad = chunk.iter().rev().take_while(|&&b| b == b'=').count();
1649        let a = base64_value(chunk[0])?;
1650        let b = base64_value(chunk[1])?;
1651        let c = if chunk[2] == b'=' {
1652            0
1653        } else {
1654            base64_value(chunk[2])?
1655        };
1656        let d = if chunk[3] == b'=' {
1657            0
1658        } else {
1659            base64_value(chunk[3])?
1660        };
1661        let n = ((a as u32) << 18) | ((b as u32) << 12) | ((c as u32) << 6) | d as u32;
1662        out.push(((n >> 16) & 0xff) as u8);
1663        if pad < 2 {
1664            out.push(((n >> 8) & 0xff) as u8);
1665        }
1666        if pad < 1 {
1667            out.push((n & 0xff) as u8);
1668        }
1669    }
1670    Ok(out)
1671}
1672
1673fn base64_value(byte: u8) -> Result<u8, String> {
1674    match byte {
1675        b'A'..=b'Z' => Ok(byte - b'A'),
1676        b'a'..=b'z' => Ok(byte - b'a' + 26),
1677        b'0'..=b'9' => Ok(byte - b'0' + 52),
1678        b'+' => Ok(62),
1679        b'/' => Ok(63),
1680        b'=' => Ok(0),
1681        _ => Err(format!("invalid base64 character: {}", byte as char)),
1682    }
1683}
1684
1685// ---------------------------------------------------------------------------
1686// Remote dispatch (grpc://)
1687// ---------------------------------------------------------------------------
1688
1689/// Dispatch a parsed JSON-RPC call over gRPC. Mirrors `dispatch_method`
1690/// but every operation goes through the tonic client. The server's
1691/// own `RedDBRuntime` does the actual work — we are just a wire
1692/// adapter between the JSON-RPC framing the drivers speak and the
1693/// gRPC protobuf framing the server speaks.
1694fn dispatch_method_remote(
1695    client: &AsyncMutex<RedDBClient>,
1696    tokio_rt: &tokio::runtime::Runtime,
1697    method: &str,
1698    params: &Value,
1699) -> Result<Value, (&'static str, String)> {
1700    match method {
1701        "version" => Ok(Value::Object(
1702            [
1703                (
1704                    "version".to_string(),
1705                    Value::String(env!("CARGO_PKG_VERSION").to_string()),
1706                ),
1707                (
1708                    "protocol".to_string(),
1709                    Value::String(PROTOCOL_VERSION.to_string()),
1710                ),
1711            ]
1712            .into_iter()
1713            .collect(),
1714        )),
1715
1716        "health" => {
1717            let result = tokio_rt.block_on(async {
1718                let mut guard = client.lock().await;
1719                guard.health_status().await
1720            });
1721            match result {
1722                Ok(status) => Ok(Value::Object(
1723                    [
1724                        ("ok".to_string(), Value::Bool(status.healthy)),
1725                        ("state".to_string(), Value::String(status.state)),
1726                        (
1727                            "checked_at_unix_ms".to_string(),
1728                            Value::Number(status.checked_at_unix_ms as f64),
1729                        ),
1730                        (
1731                            "version".to_string(),
1732                            Value::String(env!("CARGO_PKG_VERSION").to_string()),
1733                        ),
1734                    ]
1735                    .into_iter()
1736                    .collect(),
1737                )),
1738                Err(e) => Err((error_code::INTERNAL_ERROR, e.to_string())),
1739            }
1740        }
1741
1742        "query" => {
1743            let sql = params.get("sql").and_then(Value::as_str).ok_or((
1744                error_code::INVALID_PARAMS,
1745                "missing 'sql' string".to_string(),
1746            ))?;
1747            let json_str = tokio_rt
1748                .block_on(async {
1749                    let mut guard = client.lock().await;
1750                    guard.query(sql).await
1751                })
1752                .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
1753            // Server returned its own QueryReply.result_json. Parse and
1754            // repackage into the stdio-protocol shape. If parsing fails,
1755            // hand the raw server JSON back under a sentinel key so the
1756            // caller still gets something useful.
1757            let parsed = json::from_str::<Value>(&json_str)
1758                .map_err(|e| (error_code::INTERNAL_ERROR, format!("bad server JSON: {e}")))?;
1759            Ok(parsed)
1760        }
1761
1762        "insert" => {
1763            let collection = params.get("collection").and_then(Value::as_str).ok_or((
1764                error_code::INVALID_PARAMS,
1765                "missing 'collection' string".to_string(),
1766            ))?;
1767            let payload = params.get("payload").ok_or((
1768                error_code::INVALID_PARAMS,
1769                "missing 'payload' object".to_string(),
1770            ))?;
1771            if payload.as_object().is_none() {
1772                return Err((
1773                    error_code::INVALID_PARAMS,
1774                    "'payload' must be a JSON object".to_string(),
1775                ));
1776            }
1777            let payload_json = payload.to_string_compact();
1778            let reply = tokio_rt
1779                .block_on(async {
1780                    let mut guard = client.lock().await;
1781                    guard.create_row_entity(collection, &payload_json).await
1782                })
1783                .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
1784            let mut out = json::Map::new();
1785            out.insert("affected".to_string(), Value::Number(1.0));
1786            out.insert("id".to_string(), Value::String(reply.id.to_string()));
1787            Ok(Value::Object(out))
1788        }
1789
1790        "bulk_insert" => {
1791            let collection = params.get("collection").and_then(Value::as_str).ok_or((
1792                error_code::INVALID_PARAMS,
1793                "missing 'collection' string".to_string(),
1794            ))?;
1795            let payloads = params.get("payloads").and_then(Value::as_array).ok_or((
1796                error_code::INVALID_PARAMS,
1797                "missing 'payloads' array".to_string(),
1798            ))?;
1799            let mut encoded = Vec::with_capacity(payloads.len());
1800            for entry in payloads {
1801                if entry.as_object().is_none() {
1802                    return Err((
1803                        error_code::INVALID_PARAMS,
1804                        "each payload must be a JSON object".to_string(),
1805                    ));
1806                }
1807                encoded.push(entry.to_string_compact());
1808            }
1809            let status = tokio_rt
1810                .block_on(async {
1811                    let mut guard = client.lock().await;
1812                    guard.bulk_create_rows(collection, encoded).await
1813                })
1814                .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
1815            Ok(Value::Object(
1816                [
1817                    ("affected".to_string(), Value::Number(status.count as f64)),
1818                    (
1819                        "ids".to_string(),
1820                        Value::Array(
1821                            status
1822                                .ids
1823                                .into_iter()
1824                                .map(|id| Value::Number(id as f64))
1825                                .collect(),
1826                        ),
1827                    ),
1828                ]
1829                .into_iter()
1830                .collect(),
1831            ))
1832        }
1833
1834        "get" => {
1835            let collection = params.get("collection").and_then(Value::as_str).ok_or((
1836                error_code::INVALID_PARAMS,
1837                "missing 'collection' string".to_string(),
1838            ))?;
1839            let id = params.get("id").and_then(Value::as_str).ok_or((
1840                error_code::INVALID_PARAMS,
1841                "missing 'id' string".to_string(),
1842            ))?;
1843            let sql = format!("SELECT * FROM {collection} WHERE red_entity_id = {id} LIMIT 1");
1844            let json_str = tokio_rt
1845                .block_on(async {
1846                    let mut guard = client.lock().await;
1847                    guard.query(&sql).await
1848                })
1849                .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
1850            let parsed = json::from_str::<Value>(&json_str)
1851                .map_err(|e| (error_code::INTERNAL_ERROR, format!("bad server JSON: {e}")))?;
1852            // Server response shape: {"rows":[{...}], ...}. Extract
1853            // the first row (if any) as `entity`.
1854            let entity = parsed
1855                .get("rows")
1856                .and_then(Value::as_array)
1857                .and_then(|rows| rows.first().cloned())
1858                .unwrap_or(Value::Null);
1859            Ok(Value::Object(
1860                [("entity".to_string(), entity)].into_iter().collect(),
1861            ))
1862        }
1863
1864        "delete" => {
1865            let collection = params.get("collection").and_then(Value::as_str).ok_or((
1866                error_code::INVALID_PARAMS,
1867                "missing 'collection' string".to_string(),
1868            ))?;
1869            let id = params.get("id").and_then(Value::as_str).ok_or((
1870                error_code::INVALID_PARAMS,
1871                "missing 'id' string".to_string(),
1872            ))?;
1873            let id = id.parse::<u64>().map_err(|_| {
1874                (
1875                    error_code::INVALID_PARAMS,
1876                    "id must be a numeric string".to_string(),
1877                )
1878            })?;
1879            let _reply = tokio_rt
1880                .block_on(async {
1881                    let mut guard = client.lock().await;
1882                    guard.delete_entity(collection, id).await
1883                })
1884                .map_err(|e| (error_code::QUERY_ERROR, e.to_string()))?;
1885            Ok(Value::Object(
1886                [("affected".to_string(), Value::Number(1.0))]
1887                    .into_iter()
1888                    .collect(),
1889            ))
1890        }
1891
1892        "close" => Ok(Value::Null),
1893
1894        other => Err((
1895            error_code::INVALID_REQUEST,
1896            format!("unknown method: {other}"),
1897        )),
1898    }
1899}
1900
1901#[cfg(test)]
1902mod tests {
1903    use super::*;
1904    use crate::json::json;
1905    use proptest::prelude::*;
1906
1907    fn make_runtime() -> RedDBRuntime {
1908        RedDBRuntime::in_memory().expect("in-memory runtime")
1909    }
1910
1911    fn create_graph_collection(rt: &RedDBRuntime, name: &str) {
1912        let db = rt.db();
1913        db.store()
1914            .create_collection(name)
1915            .expect("create collection");
1916        let now = std::time::SystemTime::now()
1917            .duration_since(std::time::UNIX_EPOCH)
1918            .unwrap_or_default()
1919            .as_millis();
1920        db.save_collection_contract(crate::physical::CollectionContract {
1921            name: name.to_string(),
1922            declared_model: crate::catalog::CollectionModel::Graph,
1923            schema_mode: crate::catalog::SchemaMode::Dynamic,
1924            origin: crate::physical::ContractOrigin::Explicit,
1925            version: 1,
1926            created_at_unix_ms: now,
1927            updated_at_unix_ms: now,
1928            default_ttl_ms: None,
1929            vector_dimension: None,
1930            vector_metric: None,
1931            context_index_fields: Vec::new(),
1932            declared_columns: Vec::new(),
1933            table_def: None,
1934            timestamps_enabled: false,
1935            context_index_enabled: false,
1936            metrics_raw_retention_ms: None,
1937            metrics_rollup_policies: Vec::new(),
1938            metrics_tenant_identity: None,
1939            metrics_namespace: None,
1940            append_only: false,
1941            subscriptions: Vec::new(),
1942            analytics_config: Vec::new(),
1943            session_key: None,
1944            session_gap_ms: None,
1945            retention_duration_ms: None,
1946            analytical_storage: None,
1947        })
1948        .expect("save graph contract");
1949    }
1950
1951    fn handle(rt: &RedDBRuntime, line: &str) -> String {
1952        let mut session = Session::new();
1953        handle_line(&Backend::Local(rt), &mut session, line)
1954    }
1955
1956    fn query_request(id: u64, sql: &str) -> String {
1957        let mut params = json::Map::new();
1958        params.insert("sql".to_string(), Value::String(sql.to_string()));
1959
1960        let mut request = json::Map::new();
1961        request.insert("jsonrpc".to_string(), Value::String("2.0".to_string()));
1962        request.insert("id".to_string(), Value::Number(id as f64));
1963        request.insert("method".to_string(), Value::String("query".to_string()));
1964        request.insert("params".to_string(), Value::Object(params));
1965        Value::Object(request).to_string_compact()
1966    }
1967
1968    fn query_request_with_params(id: u64, sql: &str, binds: Vec<Value>) -> String {
1969        let mut params = json::Map::new();
1970        params.insert("sql".to_string(), Value::String(sql.to_string()));
1971        params.insert("params".to_string(), Value::Array(binds));
1972
1973        let mut request = json::Map::new();
1974        request.insert("jsonrpc".to_string(), Value::String("2.0".to_string()));
1975        request.insert("id".to_string(), Value::Number(id as f64));
1976        request.insert("method".to_string(), Value::String("query".to_string()));
1977        request.insert("params".to_string(), Value::Object(params));
1978        Value::Object(request).to_string_compact()
1979    }
1980
1981    /// Stateful helper: keeps the same `Session` across multiple calls so
1982    /// tests can exercise multi-step transaction flows in a single closure.
1983    fn with_session<F>(rt: &RedDBRuntime, f: F)
1984    where
1985        F: FnOnce(&dyn Fn(&str) -> String, &RedDBRuntime),
1986    {
1987        let session = std::cell::RefCell::new(Session::new());
1988        let call = |line: &str| -> String {
1989            let mut s = session.borrow_mut();
1990            handle_line(&Backend::Local(rt), &mut s, line)
1991        };
1992        f(&call, rt);
1993    }
1994
1995    fn result_rows(response: &str) -> Vec<Value> {
1996        json::from_str::<Value>(response)
1997            .expect("json response")
1998            .get("result")
1999            .and_then(|result| result.get("rows"))
2000            .and_then(Value::as_array)
2001            .map(|rows| rows.to_vec())
2002            .unwrap_or_default()
2003    }
2004
2005    fn result_name_kind(response: &str) -> Vec<(String, String)> {
2006        result_rows(response)
2007            .into_iter()
2008            .map(|row| {
2009                let object = row.as_object().expect("row object");
2010                let name = object
2011                    .get("name")
2012                    .and_then(Value::as_str)
2013                    .expect("row name")
2014                    .to_string();
2015                let kind = object
2016                    .get("kind")
2017                    .and_then(Value::as_str)
2018                    .expect("row kind")
2019                    .to_string();
2020                (name, kind)
2021            })
2022            .collect()
2023    }
2024
2025    fn json_scalar_param() -> impl Strategy<Value = Value> {
2026        prop_oneof![
2027            Just(Value::Null),
2028            any::<bool>().prop_map(Value::Bool),
2029            (-1000_i64..1000_i64).prop_map(|n| Value::Number(n as f64)),
2030            "[a-z']{0,8}".prop_map(Value::String),
2031        ]
2032    }
2033
2034    fn sql_literal_for_json(value: &Value) -> String {
2035        match value {
2036            Value::Null => "NULL".to_string(),
2037            Value::Bool(true) => "TRUE".to_string(),
2038            Value::Bool(false) => "FALSE".to_string(),
2039            Value::Number(n) => format!("{n:.0}"),
2040            Value::String(s) => format!("'{}'", s.replace('\'', "''")),
2041            _ => panic!("unsupported scalar param: {value:?}"),
2042        }
2043    }
2044
2045    #[test]
2046    fn version_method_returns_version_and_protocol() {
2047        let rt = make_runtime();
2048        let line = r#"{"jsonrpc":"2.0","id":1,"method":"version","params":{}}"#;
2049        let resp = handle(&rt, line);
2050        assert!(resp.contains("\"id\":1"));
2051        assert!(resp.contains("\"protocol\":\"1.0\""));
2052        assert!(resp.contains("\"version\""));
2053    }
2054
2055    #[test]
2056    fn health_method_returns_ok_true() {
2057        let rt = make_runtime();
2058        let resp = handle(
2059            &rt,
2060            r#"{"jsonrpc":"2.0","id":"abc","method":"health","params":{}}"#,
2061        );
2062        assert!(resp.contains("\"ok\":true"));
2063        assert!(resp.contains("\"id\":\"abc\""));
2064    }
2065
2066    #[test]
2067    fn parse_error_for_invalid_json() {
2068        let rt = make_runtime();
2069        let resp = handle(&rt, "not json {");
2070        assert!(resp.contains("\"code\":\"PARSE_ERROR\""));
2071        assert!(resp.contains("\"id\":null"));
2072    }
2073
2074    #[test]
2075    fn invalid_request_when_method_missing() {
2076        let rt = make_runtime();
2077        let resp = handle(&rt, r#"{"jsonrpc":"2.0","id":1,"params":{}}"#);
2078        assert!(resp.contains("\"code\":\"INVALID_REQUEST\""));
2079    }
2080
2081    #[test]
2082    fn unknown_method_is_invalid_request() {
2083        let rt = make_runtime();
2084        let resp = handle(
2085            &rt,
2086            r#"{"jsonrpc":"2.0","id":1,"method":"frobnicate","params":{}}"#,
2087        );
2088        assert!(resp.contains("\"code\":\"INVALID_REQUEST\""));
2089        assert!(resp.contains("frobnicate"));
2090    }
2091
2092    #[test]
2093    fn invalid_params_when_query_sql_missing() {
2094        let rt = make_runtime();
2095        let resp = handle(
2096            &rt,
2097            r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{}}"#,
2098        );
2099        assert!(resp.contains("\"code\":\"INVALID_PARAMS\""));
2100    }
2101
2102    #[test]
2103    fn close_method_marks_response_for_shutdown() {
2104        let rt = make_runtime();
2105        let resp = handle(
2106            &rt,
2107            r#"{"jsonrpc":"2.0","id":1,"method":"close","params":{}}"#,
2108        );
2109        assert!(resp.contains("\"__close__\":true"));
2110    }
2111
2112    #[test]
2113    fn query_with_int_text_params_round_trips() {
2114        let rt = make_runtime();
2115        let _ = handle(
2116            &rt,
2117            r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE p (id INTEGER, name TEXT)"}}"#,
2118        );
2119        let _ = handle(
2120            &rt,
2121            r#"{"jsonrpc":"2.0","id":2,"method":"query","params":{"sql":"INSERT INTO p (id, name) VALUES (1, 'Alice')"}}"#,
2122        );
2123        let _ = handle(
2124            &rt,
2125            r#"{"jsonrpc":"2.0","id":3,"method":"query","params":{"sql":"INSERT INTO p (id, name) VALUES (2, 'Bob')"}}"#,
2126        );
2127        let resp = handle(
2128            &rt,
2129            r#"{"jsonrpc":"2.0","id":4,"method":"query","params":{"sql":"SELECT * FROM p WHERE id = $1 AND name = $2","params":[1,"Alice"]}}"#,
2130        );
2131        assert!(resp.contains("\"Alice\""), "got: {resp}");
2132        assert!(!resp.contains("\"Bob\""), "got: {resp}");
2133    }
2134
2135    #[test]
2136    fn query_with_question_params_covers_select_insert_update_delete() {
2137        let rt = make_runtime();
2138        let create = handle(
2139            &rt,
2140            &query_request(1, "CREATE TABLE qp (id INTEGER, name TEXT)"),
2141        );
2142        assert!(!create.contains("\"error\""), "got: {create}");
2143
2144        let inserted = handle(
2145            &rt,
2146            &query_request_with_params(
2147                2,
2148                "INSERT INTO qp (id, name) VALUES (?, ?)",
2149                vec![json!(1), json!("O'Reilly")],
2150            ),
2151        );
2152        assert!(inserted.contains("\"affected\":1"), "got: {inserted}");
2153
2154        let selected = handle(
2155            &rt,
2156            &query_request_with_params(3, "SELECT name FROM qp WHERE id = ?", vec![json!(1)]),
2157        );
2158        let rows = result_rows(&selected);
2159        assert_eq!(rows.len(), 1, "got: {selected}");
2160        assert_eq!(
2161            rows[0].get("name").and_then(Value::as_str),
2162            Some("O'Reilly")
2163        );
2164
2165        let selected_numbered = handle(
2166            &rt,
2167            &query_request_with_params(
2168                4,
2169                "SELECT name FROM qp WHERE name = ?1 AND id = ?2",
2170                vec![json!("O'Reilly"), json!(1)],
2171            ),
2172        );
2173        assert_eq!(
2174            result_rows(&selected_numbered).len(),
2175            1,
2176            "got: {selected_numbered}"
2177        );
2178
2179        let updated = handle(
2180            &rt,
2181            &query_request_with_params(
2182                5,
2183                "UPDATE qp SET name = ? WHERE id = ?",
2184                vec![json!("Alice"), json!(1)],
2185            ),
2186        );
2187        assert!(updated.contains("\"affected\":1"), "got: {updated}");
2188
2189        let deleted = handle(
2190            &rt,
2191            &query_request_with_params(6, "DELETE FROM qp WHERE name = ?", vec![json!("Alice")]),
2192        );
2193        assert!(deleted.contains("\"affected\":1"), "got: {deleted}");
2194
2195        let remaining = handle(&rt, &query_request(7, "SELECT * FROM qp"));
2196        assert!(result_rows(&remaining).is_empty(), "got: {remaining}");
2197    }
2198
2199    #[test]
2200    fn query_with_params_insert_and_search_round_trip() {
2201        let rt = make_runtime();
2202        let insert = handle(
2203            &rt,
2204            r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"INSERT INTO bun_embeddings VECTOR (dense, content) VALUES ($1, $2)","params":[[1.0,0.0],"bun vector"]}}"#,
2205        );
2206        assert!(insert.contains("\"affected\":1"), "got: {insert}");
2207
2208        let search = handle(
2209            &rt,
2210            r#"{"jsonrpc":"2.0","id":2,"method":"query","params":{"sql":"SEARCH SIMILAR $1 COLLECTION bun_embeddings LIMIT 1","params":[[1.0,0.0]]}}"#,
2211        );
2212        assert!(search.contains("\"rows\""), "got: {search}");
2213        assert!(search.contains("\"score\":1"), "got: {search}");
2214        assert!(!search.contains("\"error\""), "got: {search}");
2215    }
2216
2217    #[test]
2218    fn query_with_question_vector_param_round_trips() {
2219        let rt = make_runtime();
2220        let insert = handle(
2221            &rt,
2222            &query_request_with_params(
2223                1,
2224                "INSERT INTO question_embeddings VECTOR (dense, content) VALUES (?, ?)",
2225                vec![json!([1.0, 0.0]), json!("question vector")],
2226            ),
2227        );
2228        assert!(insert.contains("\"affected\":1"), "got: {insert}");
2229
2230        let search = handle(
2231            &rt,
2232            &query_request_with_params(
2233                2,
2234                "SEARCH SIMILAR ? COLLECTION question_embeddings LIMIT 1",
2235                vec![json!([1.0, 0.0])],
2236            ),
2237        );
2238        assert!(search.contains("\"rows\""), "got: {search}");
2239        assert!(search.contains("\"score\":1"), "got: {search}");
2240        assert!(!search.contains("\"error\""), "got: {search}");
2241    }
2242
2243    #[test]
2244    fn query_with_typed_json_rpc_params_round_trips() {
2245        let rt = make_runtime();
2246        let create = handle(
2247            &rt,
2248            r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE value_params (ok BOOLEAN, score FLOAT, payload BLOB, body JSON, seen_at TIMESTAMP, ident UUID)"}}"#,
2249        );
2250        assert!(!create.contains("\"error\""), "got: {create}");
2251
2252        let insert = handle(
2253            &rt,
2254            r#"{"jsonrpc":"2.0","id":2,"method":"query","params":{"sql":"INSERT INTO value_params (ok, score, payload, body, seen_at, ident) VALUES ($1, $2, $3, $4, $5, $6)","params":[true,{"$float":"NaN"},{"$bytes":"3q2+7w=="},{"z":[1,{"a":true}],"a":null},{"$ts":"1700000000123456789"},{"$uuid":"00112233-4455-6677-8899-aabbccddeeff"}]}}"#,
2255        );
2256        assert!(insert.contains("\"affected\":1"), "got: {insert}");
2257
2258        let selected = handle(
2259            &rt,
2260            r#"{"jsonrpc":"2.0","id":3,"method":"query","params":{"sql":"SELECT * FROM value_params"}}"#,
2261        );
2262        assert!(selected.contains("\"ok\":true"), "got: {selected}");
2263        assert!(selected.contains("\"$float\":\"NaN\""), "got: {selected}");
2264        assert!(
2265            selected.contains("\"$bytes\":\"3q2+7w==\""),
2266            "got: {selected}"
2267        );
2268        assert!(
2269            selected.contains("\"body\":{\"a\":null,\"z\":[1,{\"a\":true}]}"),
2270            "got: {selected}"
2271        );
2272        assert!(
2273            selected.contains("\"$ts\":\"1700000000123456789\""),
2274            "got: {selected}"
2275        );
2276        assert!(
2277            selected.contains("\"$uuid\":\"00112233-4455-6677-8899-aabbccddeeff\""),
2278            "got: {selected}"
2279        );
2280    }
2281
2282    #[test]
2283    fn select_timeseries_tags_decodes_json_payload() {
2284        let rt = make_runtime();
2285        let create = handle(&rt, &query_request(1, "CREATE TIMESERIES ts1"));
2286        assert!(!create.contains("\"error\""), "got: {create}");
2287
2288        let insert = handle(
2289            &rt,
2290            &query_request(
2291                2,
2292                r#"INSERT INTO ts1 (metric, value, tags, timestamp) VALUES ('cpu', 85, '{"host":"a"}', 1000)"#,
2293            ),
2294        );
2295        assert!(insert.contains("\"affected\":1"), "got: {insert}");
2296
2297        let selected = handle(&rt, &query_request(3, "SELECT tags FROM ts1"));
2298        assert!(!selected.contains("<json"), "got: {selected}");
2299        let response = json::from_str::<Value>(&selected).expect("response json");
2300        let tags = response
2301            .get("result")
2302            .and_then(|result| result.get("rows"))
2303            .and_then(Value::as_array)
2304            .and_then(|rows| rows.first())
2305            .and_then(|row| row.get("tags"))
2306            .expect("tags field");
2307        assert_eq!(tags, &json!({"host": "a"}));
2308    }
2309
2310    #[test]
2311    fn select_table_json_column_round_trips_after_single_parse() {
2312        let rt = make_runtime();
2313        let create = handle(&rt, &query_request(1, "CREATE TABLE docs (payload JSON)"));
2314        assert!(!create.contains("\"error\""), "got: {create}");
2315
2316        let original = r#"{"nested":{"items":[1,true,"x"],"object":{"k":"v"}}}"#;
2317        let insert_sql = format!("INSERT INTO docs (payload) VALUES ({original})");
2318        let insert = handle(&rt, &query_request(2, &insert_sql));
2319        assert!(insert.contains("\"affected\":1"), "got: {insert}");
2320
2321        let selected = handle(&rt, &query_request(3, "SELECT payload FROM docs"));
2322        assert!(!selected.contains("<json"), "got: {selected}");
2323        let response = json::from_str::<Value>(&selected).expect("response json");
2324        let payload = response
2325            .get("result")
2326            .and_then(|result| result.get("rows"))
2327            .and_then(Value::as_array)
2328            .and_then(|rows| rows.first())
2329            .and_then(|row| row.get("payload"))
2330            .expect("payload field");
2331        let expected = json::from_str::<Value>(original).expect("expected json");
2332        assert_eq!(payload, &expected);
2333
2334        let payload_text = payload.to_string_compact();
2335        assert_eq!(
2336            json::from_str::<Value>(&payload_text).expect("single parse"),
2337            expected
2338        );
2339    }
2340
2341    #[test]
2342    fn select_json_corruption_falls_back_to_code_and_hex() {
2343        use crate::storage::query::unified::UnifiedResult;
2344
2345        let mut result = UnifiedResult::with_columns(vec!["payload".into()]);
2346        let mut record = UnifiedRecord::new();
2347        record.set("payload", SchemaValue::Json(b"{not json".to_vec()));
2348        result.push(record);
2349
2350        let json = query_result_to_json(&RuntimeQueryResult {
2351            query: "SELECT payload FROM docs".to_string(),
2352            mode: crate::storage::query::modes::QueryMode::Sql,
2353            statement: "select",
2354            engine: "runtime-table",
2355            result,
2356            affected_rows: 0,
2357            statement_type: "select",
2358            bookmark: None,
2359        });
2360
2361        let payload = json
2362            .get("rows")
2363            .and_then(Value::as_array)
2364            .and_then(|rows| rows.first())
2365            .and_then(|row| row.get("payload"))
2366            .expect("payload field");
2367        assert_eq!(
2368            payload.get("code").and_then(Value::as_str),
2369            Some("INVALID_JSON")
2370        );
2371        assert_eq!(
2372            payload.get("hex").and_then(Value::as_str),
2373            Some("7b6e6f74206a736f6e")
2374        );
2375    }
2376
2377    #[test]
2378    fn json_value_to_schema_value_decodes_typed_envelopes() {
2379        let SchemaValue::Blob(bytes) = json_value_to_schema_value(&json!({ "$bytes": "AAECAw==" }))
2380        else {
2381            panic!("expected blob");
2382        };
2383        assert_eq!(bytes, vec![0, 1, 2, 3]);
2384
2385        assert_eq!(
2386            json_value_to_schema_value(&json!({ "$ts": "9223372036854775807" })),
2387            SchemaValue::Timestamp(i64::MAX)
2388        );
2389
2390        let SchemaValue::Uuid(bytes) = json_value_to_schema_value(&json!({
2391            "$uuid": "00112233-4455-6677-8899-aabbccddeeff"
2392        })) else {
2393            panic!("expected uuid");
2394        };
2395        assert_eq!(
2396            bytes,
2397            [
2398                0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88, 0x99, 0xaa, 0xbb, 0xcc, 0xdd,
2399                0xee, 0xff
2400            ]
2401        );
2402
2403        let SchemaValue::Float(value) =
2404            json_value_to_schema_value(&json!({ "$float": "-Infinity" }))
2405        else {
2406            panic!("expected float");
2407        };
2408        assert!(value.is_infinite() && value.is_sign_negative());
2409    }
2410
2411    #[test]
2412    fn query_with_params_arity_mismatch_rejected() {
2413        let rt = make_runtime();
2414        let _ = handle(
2415            &rt,
2416            r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE pa (id INTEGER)"}}"#,
2417        );
2418        let resp = handle(
2419            &rt,
2420            r#"{"jsonrpc":"2.0","id":2,"method":"query","params":{"sql":"SELECT * FROM pa WHERE id = $1","params":[1,2]}}"#,
2421        );
2422        assert!(resp.contains("\"INVALID_PARAMS\""), "got: {resp}");
2423    }
2424
2425    #[test]
2426    fn query_with_question_params_arity_mismatch_rejected() {
2427        let rt = make_runtime();
2428        let _ = handle(&rt, &query_request(1, "CREATE TABLE qpa (id INTEGER)"));
2429        let resp = handle(
2430            &rt,
2431            &query_request_with_params(
2432                2,
2433                "SELECT * FROM qpa WHERE id = ?",
2434                vec![json!(1), json!(2)],
2435            ),
2436        );
2437        assert!(resp.contains("\"INVALID_PARAMS\""), "got: {resp}");
2438        assert!(resp.contains("SQL expects 1, got 2"), "got: {resp}");
2439    }
2440
2441    #[test]
2442    fn query_with_params_gap_rejected() {
2443        let rt = make_runtime();
2444        let _ = handle(
2445            &rt,
2446            r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE pg (a INTEGER, b INTEGER)"}}"#,
2447        );
2448        let resp = handle(
2449            &rt,
2450            r#"{"jsonrpc":"2.0","id":2,"method":"query","params":{"sql":"SELECT * FROM pg WHERE a = $1 AND b = $3","params":[1,2,3]}}"#,
2451        );
2452        assert!(resp.contains("\"INVALID_PARAMS\""), "got: {resp}");
2453    }
2454
2455    #[test]
2456    fn query_with_question_numbered_gap_rejected() {
2457        let rt = make_runtime();
2458        let _ = handle(&rt, &query_request(1, "CREATE TABLE qpg (id INTEGER)"));
2459        let resp = handle(
2460            &rt,
2461            &query_request_with_params(
2462                2,
2463                "SELECT * FROM qpg WHERE id = ?2",
2464                vec![json!(1), json!(2)],
2465            ),
2466        );
2467        assert!(resp.contains("\"INVALID_PARAMS\""), "got: {resp}");
2468        assert!(resp.contains("parameter $`1` is missing"), "got: {resp}");
2469    }
2470
2471    #[test]
2472    fn query_with_question_params_type_mismatch_names_slot() {
2473        let rt = make_runtime();
2474        let _ = handle(&rt, &query_request(1, "CREATE TABLE qpt (id INTEGER)"));
2475        let resp = handle(
2476            &rt,
2477            &query_request_with_params(
2478                2,
2479                "INSERT INTO qpt (id) VALUES (?)",
2480                vec![json!("not-an-integer")],
2481            ),
2482        );
2483        assert!(resp.contains("\"QUERY_ERROR\""), "got: {resp}");
2484        assert!(resp.contains("id"), "got: {resp}");
2485        assert!(resp.contains("integer"), "got: {resp}");
2486    }
2487
2488    #[test]
2489    fn query_select_one_returns_rows() {
2490        let rt = make_runtime();
2491        let resp = handle(
2492            &rt,
2493            r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"SELECT 1 AS one"}}"#,
2494        );
2495        assert!(resp.contains("\"result\""));
2496        assert!(!resp.contains("\"error\""));
2497    }
2498
2499    #[test]
2500    fn ask_query_result_uses_canonical_envelope() {
2501        use crate::storage::query::unified::UnifiedResult;
2502
2503        let mut result = UnifiedResult::with_columns(vec![
2504            "answer".into(),
2505            "provider".into(),
2506            "model".into(),
2507            "prompt_tokens".into(),
2508            "completion_tokens".into(),
2509            "sources_count".into(),
2510            "sources_flat".into(),
2511            "citations".into(),
2512            "validation".into(),
2513        ]);
2514        let mut record = UnifiedRecord::new();
2515        record.set("answer", SchemaValue::text("Deploy failed [^1]."));
2516        record.set("provider", SchemaValue::text("openai"));
2517        record.set("model", SchemaValue::text("gpt-4o-mini"));
2518        record.set("prompt_tokens", SchemaValue::Integer(11));
2519        record.set("completion_tokens", SchemaValue::Integer(7));
2520        record.set(
2521            "sources_flat",
2522            SchemaValue::Json(
2523                br#"[{"urn":"urn:reddb:row:deployments:1","kind":"row","collection":"deployments","id":"1"}]"#.to_vec(),
2524            ),
2525        );
2526        record.set(
2527            "citations",
2528            SchemaValue::Json(br#"[{"marker":1,"urn":"urn:reddb:row:deployments:1"}]"#.to_vec()),
2529        );
2530        record.set(
2531            "validation",
2532            SchemaValue::Json(br#"{"ok":true,"warnings":[],"errors":[]}"#.to_vec()),
2533        );
2534        result.push(record);
2535
2536        let json = query_result_to_json(&RuntimeQueryResult {
2537            query: "ASK 'why did deploy fail?'".to_string(),
2538            mode: crate::storage::query::modes::QueryMode::Sql,
2539            statement: "ask",
2540            engine: "runtime-ai",
2541            result,
2542            affected_rows: 0,
2543            statement_type: "select",
2544            bookmark: None,
2545        });
2546
2547        assert_eq!(
2548            json.get("answer").and_then(Value::as_str),
2549            Some("Deploy failed [^1].")
2550        );
2551        assert_eq!(json.get("cache_hit").and_then(Value::as_bool), Some(false));
2552        assert_eq!(json.get("cost_usd").and_then(Value::as_f64), Some(0.0));
2553        assert_eq!(json.get("mode").and_then(Value::as_str), Some("strict"));
2554        assert_eq!(json.get("retry_count").and_then(Value::as_u64), Some(0));
2555        assert!(
2556            json.get("rows").is_none(),
2557            "ASK envelope must not be row-wrapped: {json}"
2558        );
2559        assert!(
2560            json.get("sources_flat")
2561                .and_then(Value::as_array)
2562                .is_some_and(|sources| sources.len() == 1
2563                    && sources[0].get("payload").and_then(Value::as_str).is_some()),
2564            "sources_flat must be a parsed array: {json}"
2565        );
2566        assert!(
2567            json.get("citations")
2568                .and_then(Value::as_array)
2569                .is_some_and(|citations| citations.len() == 1),
2570            "citations must be a parsed array: {json}"
2571        );
2572        assert_eq!(
2573            json.get("validation")
2574                .and_then(|v| v.get("ok"))
2575                .and_then(Value::as_bool),
2576            Some(true)
2577        );
2578    }
2579
2580    // -----------------------------------------------------------------
2581    // Transaction tests
2582    // -----------------------------------------------------------------
2583
2584    #[test]
2585    fn tx_begin_returns_tx_id_and_isolation() {
2586        let rt = make_runtime();
2587        with_session(&rt, |call, _| {
2588            let resp = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
2589            assert!(resp.contains("\"tx_id\":1"));
2590            assert!(resp.contains("\"isolation\":\"read_committed_deferred\""));
2591            assert!(!resp.contains("\"error\""));
2592        });
2593    }
2594
2595    #[test]
2596    fn tx_begin_twice_returns_already_open() {
2597        let rt = make_runtime();
2598        with_session(&rt, |call, _| {
2599            let _ = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
2600            let resp = call(r#"{"jsonrpc":"2.0","id":2,"method":"tx.begin","params":null}"#);
2601            assert!(resp.contains("\"code\":\"TX_ALREADY_OPEN\""));
2602        });
2603    }
2604
2605    #[test]
2606    fn tx_commit_without_begin_returns_no_tx_open() {
2607        let rt = make_runtime();
2608        with_session(&rt, |call, _| {
2609            let resp = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.commit","params":null}"#);
2610            assert!(resp.contains("\"code\":\"NO_TX_OPEN\""));
2611        });
2612    }
2613
2614    #[test]
2615    fn tx_rollback_without_begin_returns_no_tx_open() {
2616        let rt = make_runtime();
2617        with_session(&rt, |call, _| {
2618            let resp = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.rollback","params":null}"#);
2619            assert!(resp.contains("\"code\":\"NO_TX_OPEN\""));
2620        });
2621    }
2622
2623    #[test]
2624    fn insert_inside_tx_returns_pending_envelope() {
2625        let rt = make_runtime();
2626        // Create the collection first (outside any tx).
2627        let _ = handle(
2628            &rt,
2629            r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE users (name TEXT)"}}"#,
2630        );
2631        with_session(&rt, |call, _| {
2632            let _ = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
2633            let resp = call(
2634                r#"{"jsonrpc":"2.0","id":2,"method":"insert","params":{"collection":"users","payload":{"name":"alice"}}}"#,
2635            );
2636            assert!(resp.contains("\"pending\":true"));
2637            assert!(resp.contains("\"tx_id\":1"));
2638            assert!(resp.contains("\"affected\":0"));
2639        });
2640    }
2641
2642    #[test]
2643    fn begin_insert_rollback_does_not_persist() {
2644        let rt = make_runtime();
2645        let _ = handle(
2646            &rt,
2647            r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE u (name TEXT)"}}"#,
2648        );
2649        with_session(&rt, |call, _| {
2650            let _ = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
2651            let _ = call(
2652                r#"{"jsonrpc":"2.0","id":2,"method":"insert","params":{"collection":"u","payload":{"name":"ghost"}}}"#,
2653            );
2654            let rollback = call(r#"{"jsonrpc":"2.0","id":3,"method":"tx.rollback","params":null}"#);
2655            assert!(rollback.contains("\"ops_discarded\":1"));
2656            assert!(rollback.contains("\"tx_id\":1"));
2657        });
2658        // After rollback, the row must not be visible to a fresh query.
2659        let resp = handle(
2660            &rt,
2661            r#"{"jsonrpc":"2.0","id":9,"method":"query","params":{"sql":"SELECT * FROM u"}}"#,
2662        );
2663        assert!(!resp.contains("\"ghost\""));
2664    }
2665
2666    #[test]
2667    fn begin_insert_commit_persists() {
2668        let rt = make_runtime();
2669        let _ = handle(
2670            &rt,
2671            r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE u2 (name TEXT)"}}"#,
2672        );
2673        with_session(&rt, |call, _| {
2674            let _ = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
2675            let _ = call(
2676                r#"{"jsonrpc":"2.0","id":2,"method":"insert","params":{"collection":"u2","payload":{"name":"alice"}}}"#,
2677            );
2678            let _ = call(
2679                r#"{"jsonrpc":"2.0","id":3,"method":"insert","params":{"collection":"u2","payload":{"name":"bob"}}}"#,
2680            );
2681            let commit = call(r#"{"jsonrpc":"2.0","id":4,"method":"tx.commit","params":null}"#);
2682            assert!(commit.contains("\"ops_replayed\":2"));
2683            assert!(!commit.contains("\"error\""));
2684        });
2685        let resp = handle(
2686            &rt,
2687            r#"{"jsonrpc":"2.0","id":9,"method":"query","params":{"sql":"SELECT * FROM u2"}}"#,
2688        );
2689        assert!(resp.contains("\"alice\""));
2690        assert!(resp.contains("\"bob\""));
2691    }
2692
2693    #[test]
2694    fn bulk_insert_inside_tx_buffers_everything() {
2695        let rt = make_runtime();
2696        let _ = handle(
2697            &rt,
2698            r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE u3 (name TEXT)"}}"#,
2699        );
2700        with_session(&rt, |call, _| {
2701            let _ = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
2702            let resp = call(
2703                r#"{"jsonrpc":"2.0","id":2,"method":"bulk_insert","params":{"collection":"u3","payloads":[{"name":"a"},{"name":"b"},{"name":"c"}]}}"#,
2704            );
2705            assert!(resp.contains("\"buffered\":3"));
2706            assert!(resp.contains("\"pending\":true"));
2707            assert!(resp.contains("\"affected\":0"));
2708
2709            let commit = call(r#"{"jsonrpc":"2.0","id":3,"method":"tx.commit","params":null}"#);
2710            assert!(commit.contains("\"ops_replayed\":3"));
2711        });
2712    }
2713
2714    #[test]
2715    fn bulk_insert_chunks_at_internal_500_row_limit() {
2716        assert_eq!(bulk_insert_chunk_count(0), 0);
2717        assert_eq!(bulk_insert_chunk_count(1), 1);
2718        assert_eq!(bulk_insert_chunk_count(500), 1);
2719        assert_eq!(bulk_insert_chunk_count(501), 2);
2720        assert_eq!(bulk_insert_chunk_count(1000), 2);
2721        assert_eq!(bulk_insert_chunk_count(1001), 3);
2722    }
2723
2724    proptest! {
2725        #![proptest_config(ProptestConfig {
2726            cases: 12,
2727            ..ProptestConfig::default()
2728        })]
2729
2730        #[test]
2731        fn bulk_insert_matches_sequential_insert_state(
2732            names in proptest::collection::vec("[a-z]{1,8}", 1usize..20)
2733        ) {
2734            let rt = make_runtime();
2735            let payloads = names
2736                .iter()
2737                .map(|name| format!(r#"{{"name":"{name}","kind":"bulk"}}"#))
2738                .collect::<Vec<_>>();
2739            let payload_array = payloads.join(",");
2740
2741            let bulk = handle(
2742                &rt,
2743                &format!(
2744                    r#"{{"jsonrpc":"2.0","id":1,"method":"bulk_insert","params":{{"collection":"bulk_prop","payloads":[{payload_array}]}}}}"#
2745                ),
2746            );
2747            let bulk_result = json::from_str::<Value>(&bulk).expect("bulk json");
2748            let bulk_ids = bulk_result
2749                .get("result")
2750                .and_then(|result| result.get("ids"))
2751                .and_then(Value::as_array)
2752                .expect("bulk ids");
2753            prop_assert_eq!(bulk_ids.len(), names.len());
2754
2755            for (index, payload) in payloads.iter().enumerate() {
2756                let insert = handle(
2757                    &rt,
2758                    &format!(
2759                        r#"{{"jsonrpc":"2.0","id":{},"method":"insert","params":{{"collection":"seq_prop","payload":{payload}}}}}"#,
2760                        index + 10
2761                    ),
2762                );
2763                let insert_result = json::from_str::<Value>(&insert).expect("insert json");
2764                prop_assert!(
2765                    insert_result
2766                        .get("result")
2767                        .and_then(|result| result.get("id"))
2768                        .is_some(),
2769                    "insert response missing id: {insert}"
2770                );
2771            }
2772
2773            let bulk_rows = result_name_kind(&handle(
2774                &rt,
2775                r#"{"jsonrpc":"2.0","id":99,"method":"query","params":{"sql":"SELECT name, kind FROM bulk_prop ORDER BY red_entity_id"}}"#,
2776            ));
2777            let seq_rows = result_name_kind(&handle(
2778                &rt,
2779                r#"{"jsonrpc":"2.0","id":100,"method":"query","params":{"sql":"SELECT name, kind FROM seq_prop ORDER BY red_entity_id"}}"#,
2780            ));
2781            prop_assert_eq!(bulk_rows, seq_rows);
2782        }
2783
2784        #[test]
2785        fn question_param_select_matches_inlined_literal(value in json_scalar_param()) {
2786            let rt = make_runtime();
2787            let bound = handle(
2788                &rt,
2789                &query_request_with_params(1, "SELECT ? AS v", vec![value.clone()]),
2790            );
2791            let inline_sql = format!("SELECT {} AS v", sql_literal_for_json(&value));
2792            let inlined = handle(&rt, &query_request(2, &inline_sql));
2793            prop_assert_eq!(
2794                result_rows(&bound),
2795                result_rows(&inlined),
2796                "bound={}, inlined={}",
2797                bound,
2798                inlined
2799            );
2800        }
2801    }
2802
2803    #[test]
2804    fn bulk_insert_graph_nodes_accepts_flat_rows_and_returns_ids() {
2805        let rt = make_runtime();
2806        create_graph_collection(&rt, "social");
2807
2808        let resp = handle(
2809            &rt,
2810            r#"{"jsonrpc":"2.0","id":2,"method":"bulk_insert","params":{"collection":"social","payloads":[{"label":"User","name":"alice"},{"label":"User","name":"bob"}]}}"#,
2811        );
2812        let envelope: Value = json::from_str(&resp).expect("json response");
2813        let result = envelope.get("result").expect("result");
2814        assert_eq!(result.get("affected").and_then(Value::as_u64), Some(2));
2815        assert_eq!(
2816            result
2817                .get("ids")
2818                .and_then(Value::as_array)
2819                .map(|ids| ids.len()),
2820            Some(2)
2821        );
2822
2823        let query = handle(
2824            &rt,
2825            r#"{"jsonrpc":"2.0","id":3,"method":"query","params":{"sql":"MATCH (n:User) RETURN n.name"}}"#,
2826        );
2827        assert!(query.contains("\"alice\""), "got: {query}");
2828        assert!(query.contains("\"bob\""), "got: {query}");
2829    }
2830
2831    #[test]
2832    fn bulk_insert_graph_edges_accepts_flat_rows_and_returns_ids() {
2833        let rt = make_runtime();
2834        create_graph_collection(&rt, "network");
2835        let nodes = handle(
2836            &rt,
2837            r#"{"jsonrpc":"2.0","id":2,"method":"bulk_insert","params":{"collection":"network","payloads":[{"label":"Host","name":"app"},{"label":"Host","name":"db"}]}}"#,
2838        );
2839        let envelope: Value = json::from_str(&nodes).expect("node response");
2840        let ids = envelope
2841            .get("result")
2842            .and_then(|r| r.get("ids"))
2843            .and_then(Value::as_array)
2844            .expect("node ids");
2845        let from = ids[0].as_u64().expect("from id");
2846        let to = ids[1].as_u64().expect("to id");
2847
2848        let resp = handle(
2849            &rt,
2850            &format!(
2851                r#"{{"jsonrpc":"2.0","id":3,"method":"bulk_insert","params":{{"collection":"network","payloads":[{{"label":"connects","from":{from},"to":{to},"weight":0.5,"role":"primary"}}]}}}}"#
2852            ),
2853        );
2854        let envelope: Value = json::from_str(&resp).expect("edge response");
2855        let result = envelope.get("result").expect("result");
2856        assert_eq!(result.get("affected").and_then(Value::as_u64), Some(1));
2857        assert_eq!(
2858            result
2859                .get("ids")
2860                .and_then(Value::as_array)
2861                .map(|ids| ids.len()),
2862            Some(1)
2863        );
2864    }
2865
2866    #[test]
2867    fn delete_inside_tx_is_buffered() {
2868        let rt = make_runtime();
2869        // Seed two rows outside any tx.
2870        let _ = handle(
2871            &rt,
2872            r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE u4 (name TEXT)"}}"#,
2873        );
2874        let _ = handle(
2875            &rt,
2876            r#"{"jsonrpc":"2.0","id":2,"method":"query","params":{"sql":"INSERT INTO u4 (name) VALUES ('keep')"}}"#,
2877        );
2878        with_session(&rt, |call, _| {
2879            let _ = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
2880            let resp = call(
2881                r#"{"jsonrpc":"2.0","id":2,"method":"delete","params":{"collection":"u4","id":"1"}}"#,
2882            );
2883            assert!(resp.contains("\"pending\":true"));
2884            let _ = call(r#"{"jsonrpc":"2.0","id":3,"method":"tx.rollback","params":null}"#);
2885        });
2886        // Row should still be present after rollback of the delete.
2887        let resp = handle(
2888            &rt,
2889            r#"{"jsonrpc":"2.0","id":9,"method":"query","params":{"sql":"SELECT * FROM u4"}}"#,
2890        );
2891        assert!(resp.contains("\"keep\""));
2892    }
2893
2894    #[test]
2895    fn close_with_open_tx_auto_rollbacks() {
2896        let rt = make_runtime();
2897        let _ = handle(
2898            &rt,
2899            r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE u5 (name TEXT)"}}"#,
2900        );
2901        with_session(&rt, |call, _| {
2902            let _ = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
2903            let _ = call(
2904                r#"{"jsonrpc":"2.0","id":2,"method":"insert","params":{"collection":"u5","payload":{"name":"ghost"}}}"#,
2905            );
2906            let close = call(r#"{"jsonrpc":"2.0","id":3,"method":"close","params":null}"#);
2907            assert!(close.contains("\"__close__\":true"));
2908            assert!(!close.contains("\"error\""));
2909        });
2910        let resp = handle(
2911            &rt,
2912            r#"{"jsonrpc":"2.0","id":9,"method":"query","params":{"sql":"SELECT * FROM u5"}}"#,
2913        );
2914        assert!(!resp.contains("\"ghost\""));
2915    }
2916
2917    // -----------------------------------------------------------------
2918    // Cursor streaming tests
2919    // -----------------------------------------------------------------
2920
2921    fn seed_numbers_table(rt: &RedDBRuntime, table: &str, count: u32) {
2922        let _ = handle(
2923            rt,
2924            &format!(
2925                r#"{{"jsonrpc":"2.0","id":1,"method":"query","params":{{"sql":"CREATE TABLE {table} (n INTEGER)"}}}}"#,
2926            ),
2927        );
2928        for i in 0..count {
2929            let _ = handle(
2930                rt,
2931                &format!(
2932                    r#"{{"jsonrpc":"2.0","id":2,"method":"query","params":{{"sql":"INSERT INTO {table} (n) VALUES ({i})"}}}}"#,
2933                ),
2934            );
2935        }
2936    }
2937
2938    #[test]
2939    fn cursor_open_returns_id_columns_and_total() {
2940        let rt = make_runtime();
2941        seed_numbers_table(&rt, "nums1", 3);
2942        with_session(&rt, |call, _| {
2943            let resp = call(
2944                r#"{"jsonrpc":"2.0","id":1,"method":"query.open","params":{"sql":"SELECT n FROM nums1"}}"#,
2945            );
2946            assert!(resp.contains("\"cursor_id\":1"));
2947            assert!(resp.contains("\"total_rows\":3"));
2948            assert!(resp.contains("\"columns\""));
2949            assert!(!resp.contains("\"error\""));
2950        });
2951    }
2952
2953    #[test]
2954    fn cursor_next_chunks_rows_and_signals_done() {
2955        let rt = make_runtime();
2956        seed_numbers_table(&rt, "nums2", 5);
2957        with_session(&rt, |call, _| {
2958            let _ = call(
2959                r#"{"jsonrpc":"2.0","id":1,"method":"query.open","params":{"sql":"SELECT n FROM nums2"}}"#,
2960            );
2961            let first = call(
2962                r#"{"jsonrpc":"2.0","id":2,"method":"query.next","params":{"cursor_id":1,"batch_size":2}}"#,
2963            );
2964            assert!(first.contains("\"done\":false"));
2965            assert!(first.contains("\"remaining\":3"));
2966
2967            let second = call(
2968                r#"{"jsonrpc":"2.0","id":3,"method":"query.next","params":{"cursor_id":1,"batch_size":2}}"#,
2969            );
2970            assert!(second.contains("\"done\":false"));
2971            assert!(second.contains("\"remaining\":1"));
2972
2973            let third = call(
2974                r#"{"jsonrpc":"2.0","id":4,"method":"query.next","params":{"cursor_id":1,"batch_size":2}}"#,
2975            );
2976            assert!(third.contains("\"done\":true"));
2977            assert!(third.contains("\"remaining\":0"));
2978        });
2979    }
2980
2981    #[test]
2982    fn cursor_auto_drops_when_exhausted() {
2983        let rt = make_runtime();
2984        seed_numbers_table(&rt, "nums3", 2);
2985        with_session(&rt, |call, _| {
2986            let _ = call(
2987                r#"{"jsonrpc":"2.0","id":1,"method":"query.open","params":{"sql":"SELECT n FROM nums3"}}"#,
2988            );
2989            let _ = call(
2990                r#"{"jsonrpc":"2.0","id":2,"method":"query.next","params":{"cursor_id":1,"batch_size":100}}"#,
2991            );
2992            // Cursor was auto-dropped after done=true; subsequent next
2993            // must error with CURSOR_NOT_FOUND.
2994            let resp = call(
2995                r#"{"jsonrpc":"2.0","id":3,"method":"query.next","params":{"cursor_id":1,"batch_size":100}}"#,
2996            );
2997            assert!(resp.contains("\"code\":\"CURSOR_NOT_FOUND\""));
2998        });
2999    }
3000
3001    #[test]
3002    fn cursor_close_removes_it() {
3003        let rt = make_runtime();
3004        seed_numbers_table(&rt, "nums4", 3);
3005        with_session(&rt, |call, _| {
3006            let _ = call(
3007                r#"{"jsonrpc":"2.0","id":1,"method":"query.open","params":{"sql":"SELECT n FROM nums4"}}"#,
3008            );
3009            let close =
3010                call(r#"{"jsonrpc":"2.0","id":2,"method":"query.close","params":{"cursor_id":1}}"#);
3011            assert!(close.contains("\"closed\":true"));
3012            let after = call(
3013                r#"{"jsonrpc":"2.0","id":3,"method":"query.next","params":{"cursor_id":1,"batch_size":10}}"#,
3014            );
3015            assert!(after.contains("\"code\":\"CURSOR_NOT_FOUND\""));
3016        });
3017    }
3018
3019    #[test]
3020    fn cursor_close_unknown_errors() {
3021        let rt = make_runtime();
3022        with_session(&rt, |call, _| {
3023            let resp = call(
3024                r#"{"jsonrpc":"2.0","id":1,"method":"query.close","params":{"cursor_id":9999}}"#,
3025            );
3026            assert!(resp.contains("\"code\":\"CURSOR_NOT_FOUND\""));
3027        });
3028    }
3029
3030    #[test]
3031    fn cursor_next_without_cursor_id_errors() {
3032        let rt = make_runtime();
3033        with_session(&rt, |call, _| {
3034            let resp = call(r#"{"jsonrpc":"2.0","id":1,"method":"query.next","params":{}}"#);
3035            assert!(resp.contains("\"code\":\"INVALID_PARAMS\""));
3036        });
3037    }
3038
3039    #[test]
3040    fn cursor_default_batch_size_returns_all_when_smaller_than_default() {
3041        let rt = make_runtime();
3042        seed_numbers_table(&rt, "nums5", 7);
3043        with_session(&rt, |call, _| {
3044            let _ = call(
3045                r#"{"jsonrpc":"2.0","id":1,"method":"query.open","params":{"sql":"SELECT n FROM nums5"}}"#,
3046            );
3047            // No batch_size → default 100, table has 7 rows, all in one call.
3048            let resp =
3049                call(r#"{"jsonrpc":"2.0","id":2,"method":"query.next","params":{"cursor_id":1}}"#);
3050            assert!(resp.contains("\"done\":true"));
3051            assert!(resp.contains("\"remaining\":0"));
3052        });
3053    }
3054
3055    #[test]
3056    fn close_method_drops_open_cursors() {
3057        let rt = make_runtime();
3058        seed_numbers_table(&rt, "nums6", 3);
3059        // Single session: open a cursor, call close, verify cursor is gone by reopening
3060        // fresh session and attempting to use cursor_id 1.
3061        with_session(&rt, |call, _| {
3062            let _ = call(
3063                r#"{"jsonrpc":"2.0","id":1,"method":"query.open","params":{"sql":"SELECT n FROM nums6"}}"#,
3064            );
3065            let close = call(r#"{"jsonrpc":"2.0","id":2,"method":"close","params":null}"#);
3066            assert!(close.contains("\"__close__\":true"));
3067            // Cursor must be gone after close within the same session.
3068            let after = call(
3069                r#"{"jsonrpc":"2.0","id":3,"method":"query.next","params":{"cursor_id":1,"batch_size":10}}"#,
3070            );
3071            assert!(after.contains("\"code\":\"CURSOR_NOT_FOUND\""));
3072        });
3073    }
3074
3075    #[test]
3076    fn cursor_independent_of_transaction_state() {
3077        let rt = make_runtime();
3078        seed_numbers_table(&rt, "nums7", 4);
3079        with_session(&rt, |call, _| {
3080            // Open cursor, begin tx, commit tx — cursor survives.
3081            let _ = call(
3082                r#"{"jsonrpc":"2.0","id":1,"method":"query.open","params":{"sql":"SELECT n FROM nums7"}}"#,
3083            );
3084            let _ = call(r#"{"jsonrpc":"2.0","id":2,"method":"tx.begin","params":null}"#);
3085            let _ = call(r#"{"jsonrpc":"2.0","id":3,"method":"tx.commit","params":null}"#);
3086            let resp = call(
3087                r#"{"jsonrpc":"2.0","id":4,"method":"query.next","params":{"cursor_id":1,"batch_size":10}}"#,
3088            );
3089            assert!(resp.contains("\"done\":true"));
3090            assert!(!resp.contains("\"error\""));
3091        });
3092    }
3093
3094    #[test]
3095    fn second_tx_after_commit_gets_fresh_id() {
3096        let rt = make_runtime();
3097        let _ = handle(
3098            &rt,
3099            r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE u6 (name TEXT)"}}"#,
3100        );
3101        with_session(&rt, |call, _| {
3102            let first = call(r#"{"jsonrpc":"2.0","id":1,"method":"tx.begin","params":null}"#);
3103            assert!(first.contains("\"tx_id\":1"));
3104            let _ = call(
3105                r#"{"jsonrpc":"2.0","id":2,"method":"insert","params":{"collection":"u6","payload":{"name":"x"}}}"#,
3106            );
3107            let _ = call(r#"{"jsonrpc":"2.0","id":3,"method":"tx.commit","params":null}"#);
3108
3109            let second = call(r#"{"jsonrpc":"2.0","id":4,"method":"tx.begin","params":null}"#);
3110            assert!(second.contains("\"tx_id\":2"));
3111            let _ = call(r#"{"jsonrpc":"2.0","id":5,"method":"tx.rollback","params":null}"#);
3112        });
3113    }
3114
3115    #[test]
3116    fn prepare_and_execute_prepared_statement() {
3117        let rt = make_runtime();
3118        // Create table + insert a row
3119        let _ = handle(
3120            &rt,
3121            r#"{"jsonrpc":"2.0","id":1,"method":"query","params":{"sql":"CREATE TABLE ps_test (n INTEGER)"}}"#,
3122        );
3123        let _ = handle(
3124            &rt,
3125            r#"{"jsonrpc":"2.0","id":2,"method":"query","params":{"sql":"INSERT INTO ps_test (n) VALUES (42)"}}"#,
3126        );
3127
3128        with_session(&rt, |call, _| {
3129            // Prepare a parameterized SELECT.
3130            let prep = call(
3131                r#"{"jsonrpc":"2.0","id":3,"method":"prepare","params":{"sql":"SELECT n FROM ps_test WHERE n = 42"}}"#,
3132            );
3133            assert!(prep.contains("\"prepared_id\""), "prepare response: {prep}");
3134
3135            // Extract the prepared_id.
3136            let id: u64 = {
3137                let v: crate::json::Value = crate::json::from_str(&prep).expect("json");
3138                let result = v.get("result").expect("result");
3139                result
3140                    .get("prepared_id")
3141                    .and_then(|n| n.as_f64())
3142                    .expect("prepared_id") as u64
3143            };
3144
3145            // Execute with the bind value for the parameterized literal.
3146            let exec = call(&format!(
3147                r#"{{"jsonrpc":"2.0","id":4,"method":"execute_prepared","params":{{"prepared_id":{id},"binds":[42]}}}}"#
3148            ));
3149            // Response uses "rows" key (see query_result_to_json).
3150            assert!(
3151                exec.contains("\"rows\""),
3152                "execute_prepared response: {exec}"
3153            );
3154            assert!(exec.contains("42"), "expected row with n=42 in: {exec}");
3155        });
3156    }
3157}