Skip to main content

kimberlite_query/
lib.rs

1//! # kmb-query: SQL query layer for `Kimberlite` projections
2//!
3//! This crate provides a minimal SQL query engine for compliance lookups
4//! against the projection store.
5//!
6//! ## SQL Subset
7//!
8//! Supported SQL features:
9//! - `SELECT` with column list or `*`
10//! - `FROM` single table or `JOIN` (INNER, LEFT)
11//! - `WHERE` with comparison predicates (`=`, `<`, `>`, `<=`, `>=`, `IN`)
12//! - `ORDER BY` (ascending/descending)
13//! - `LIMIT`
14//! - `GROUP BY` with aggregates (`COUNT`, `SUM`, `AVG`, `MIN`, `MAX`)
15//! - `HAVING` with aggregate filtering
16//! - `UNION` / `UNION ALL`
17//! - `DISTINCT`
18//! - `ALTER TABLE` (ADD COLUMN, DROP COLUMN)
19//! - Parameterized queries (`$1`, `$2`, ...)
//! - `WITH` (Common Table Expressions / CTEs)
//! - Subqueries in FROM and JOIN (`SELECT * FROM (SELECT ...) AS t`)
//! - Window functions (applied as a post-pass by `QueryEngine::query`; see the `window` module)
//!
//! Not yet supported:
//! - `WITH RECURSIVE`
27//!
28//! ## Usage
29//!
30//! ```ignore
31//! use kimberlite_query::{QueryEngine, Schema, SchemaBuilder, ColumnDef, DataType, Value};
32//! use kimberlite_store::{BTreeStore, TableId};
33//!
34//! // Define schema
35//! let schema = SchemaBuilder::new()
36//!     .table(
37//!         "users",
38//!         TableId::new(1),
39//!         vec![
40//!             ColumnDef::new("id", DataType::BigInt).not_null(),
41//!             ColumnDef::new("name", DataType::Text).not_null(),
42//!         ],
43//!         vec!["id".into()],
44//!     )
45//!     .build();
46//!
47//! // Create engine
48//! let engine = QueryEngine::new(schema);
49//!
50//! // Execute query
51//! let mut store = BTreeStore::open("data/projections")?;
52//! let result = engine.query(&mut store, "SELECT * FROM users WHERE id = $1", &[Value::BigInt(42)])?;
53//! ```
54//!
55//! ## Point-in-Time Queries
56//!
57//! For compliance, you can query at a specific log position:
58//!
59//! ```ignore
60//! let result = engine.query_at(
61//!     &mut store,
62//!     "SELECT * FROM users WHERE id = 1",
63//!     &[],
64//!     Offset::new(1000),  // Query state as of log position 1000
65//! )?;
66//! ```
67
68pub mod dml_planner;
69mod error;
70mod executor;
71pub mod explain;
72pub mod information_schema;
73pub mod key_encoder;
74mod parse_cache;
75mod parser;
76mod plan;
77mod planner;
78pub mod rbac_filter;
79mod schema;
80mod value;
81pub mod window;
82
83#[cfg(test)]
84mod tests;
85
86// Re-export public types
87pub use error::{QueryError, Result};
88pub use executor::{QueryResult, Row, execute};
89pub use parser::{
90    HavingCondition, HavingOp, ParsedAlterTable, ParsedColumn, ParsedCreateIndex,
91    ParsedCreateMask, ParsedCreateTable, ParsedCreateUser, ParsedCte, ParsedDelete, ParsedGrant,
92    ParsedInsert, ParsedSelect, ParsedSetClassification, ParsedStatement, ParsedUnion,
93    ParsedUpdate, Predicate, PredicateValue, TimeTravel, extract_at_offset,
94    extract_time_travel, parse_statement, try_parse_custom_statement,
95};
96pub use planner::plan_query;
97pub use schema::{
98    ColumnDef, ColumnName, DataType, IndexDef, Schema, SchemaBuilder, TableDef, TableName,
99};
100pub use value::Value;
101
102use kimberlite_store::ProjectionStore;
103use kimberlite_types::Offset;
104
105/// Query engine for executing SQL against a projection store.
106///
107/// Holds the schema plus an optional parse cache (AUDIT-2026-04
108/// S3.4). The engine is `Clone` — the parse cache is shared via
109/// `Arc` so cloned handles hit the same memoised entries.
110#[derive(Debug, Clone)]
111pub struct QueryEngine {
112    schema: Schema,
113    parse_cache: Option<std::sync::Arc<parse_cache::ParseCache>>,
114}
115
116impl QueryEngine {
117    /// Creates a new query engine with the given schema. No
118    /// parse cache is attached by default — use
119    /// [`Self::with_parse_cache`] to opt in.
120    pub fn new(schema: Schema) -> Self {
121        Self {
122            schema,
123            parse_cache: None,
124        }
125    }
126
127    /// Attach an LRU parse cache of the given size. `0` disables
128    /// caching (every call re-parses).
129    #[must_use]
130    pub fn with_parse_cache(mut self, max_size: usize) -> Self {
131        self.parse_cache = Some(std::sync::Arc::new(parse_cache::ParseCache::new(max_size)));
132        self
133    }
134
135    /// Returns a snapshot of parse-cache stats, or `None` if no
136    /// cache is attached.
137    pub fn parse_cache_stats(&self) -> Option<parse_cache::ParseCacheStats> {
138        self.parse_cache.as_deref().map(parse_cache::ParseCache::stats)
139    }
140
141    /// Clear the parse cache, if any.
142    pub fn clear_parse_cache(&self) {
143        if let Some(c) = &self.parse_cache {
144            c.clear();
145        }
146    }
147
148    /// Returns a reference to the schema.
149    pub fn schema(&self) -> &Schema {
150        &self.schema
151    }
152
153    /// Parses a SQL string and extracts the SELECT or UNION statement.
154    ///
155    /// Static variant — bypasses the parse cache. Used by call
156    /// sites that predate the cache and by internal recursive
157    /// parsers.
158    fn parse_query_statement(sql: &str) -> Result<parser::ParsedStatement> {
159        let stmt = parser::parse_statement(sql)?;
160        match &stmt {
161            parser::ParsedStatement::Select(_) | parser::ParsedStatement::Union(_) => Ok(stmt),
162            _ => Err(QueryError::UnsupportedFeature(
163                "only SELECT and UNION queries are supported".to_string(),
164            )),
165        }
166    }
167
168    /// Cache-aware parse wrapper.
169    ///
170    /// Looks the SQL up in the parse cache (if attached). On
171    /// miss, parses via [`Self::parse_query_statement`] and
172    /// inserts into the cache. Non-SELECT/UNION errors are
173    /// returned directly without populating the cache — they're
174    /// errors for every subsequent call anyway and we don't want
175    /// to memoise them.
176    fn parse_query_statement_cached(
177        &self,
178        sql: &str,
179    ) -> Result<parser::ParsedStatement> {
180        if let Some(cache) = &self.parse_cache {
181            if let Some(stmt) = cache.get(sql) {
182                return Ok(stmt);
183            }
184        }
185        let stmt = Self::parse_query_statement(sql)?;
186        if let Some(cache) = &self.parse_cache {
187            cache.insert(sql.to_string(), stmt.clone());
188        }
189        Ok(stmt)
190    }
191
192    /// Executes a SQL query against the current store state.
193    ///
194    /// Supports SELECT and UNION/UNION ALL queries.
195    ///
196    /// # Arguments
197    ///
198    /// * `store` - The projection store to query
199    /// * `sql` - SQL query string
200    /// * `params` - Query parameters (for `$1`, `$2`, etc.)
201    ///
202    /// # Example
203    ///
204    /// ```ignore
205    /// let result = engine.query(
206    ///     &mut store,
207    ///     "SELECT name FROM users WHERE id = $1",
208    ///     &[Value::BigInt(42)],
209    /// )?;
210    /// ```
211    pub fn query<S: ProjectionStore>(
212        &self,
213        store: &mut S,
214        sql: &str,
215        params: &[Value],
216    ) -> Result<QueryResult> {
217        // AUDIT-2026-04 S3.5 — healthcare BREAK_GLASS prefix.
218        // `WITH BREAK_GLASS REASON='...' SELECT ...` strips the
219        // prefix, emits a warn-level structured log for the
220        // caller's audit pipeline to pick up, and falls through
221        // to the normal query path. The reason is the attribution
222        // value — enforcement (RBAC + masking) is still applied.
223        let (after_break_glass, break_glass_reason) =
224            explain::extract_break_glass(sql);
225        if let Some(ref reason) = break_glass_reason {
226            tracing::warn!(
227                break_glass_reason = %reason,
228                "BREAK_GLASS query — regulator-visible audit signal",
229            );
230        }
231        let sql = after_break_glass;
232
233        // AUDIT-2026-04 S3.4 — `information_schema.*` virtual-table
234        // interception. Synthesises results from the live schema
235        // without going through the planner/executor. Callers
236        // that point FROM at `information_schema.tables` or
237        // `.columns` get back schema introspection rows without
238        // the table needing to be registered in the store.
239        if let Some(result) = information_schema::maybe_answer(sql, &self.schema) {
240            return Ok(result);
241        }
242
243        // AUDIT-2026-04 S3.3 — EXPLAIN prefix dispatch. A caller
244        // issuing `EXPLAIN SELECT ...` gets a single-row result
245        // whose only value is the rendered plan tree, rather
246        // than executing the statement.
247        let (after_explain, is_explain) = explain::extract_explain(sql);
248        if is_explain {
249            let plan_text = self.explain(after_explain, params)?;
250            return Ok(executor::QueryResult {
251                columns: vec!["plan".into()],
252                rows: vec![vec![Value::Text(plan_text)]],
253            });
254        }
255        let sql = after_explain; // equivalent to original `sql` when EXPLAIN absent
256
257        // Extract time-travel clause (AT OFFSET / FOR SYSTEM_TIME AS OF / AS OF)
258        // before passing SQL to sqlparser. Offset syntax dispatches
259        // directly; timestamp syntax without a resolver errors out
260        // with a clear message pointing to
261        // `query_at_timestamp(..., resolver)`.
262        let (cleaned_sql, time_travel) = parser::extract_time_travel(sql);
263        match time_travel {
264            Some(parser::TimeTravel::Offset(o)) => {
265                return self.query_at(store, &cleaned_sql, params, Offset::new(o));
266            }
267            Some(parser::TimeTravel::TimestampNs(_)) => {
268                return Err(QueryError::UnsupportedFeature(
269                    "FOR SYSTEM_TIME AS OF '<iso>' / AS OF '<iso>' \
270                     requires a timestamp→offset resolver — use \
271                     QueryEngine::query_at_timestamp(..., resolver)"
272                        .to_string(),
273                ));
274            }
275            None => {}
276        }
277        let sql = sql;  // unmodified; suppress unused-shadow lint
278
279        let stmt = self.parse_query_statement_cached(sql)?;
280
281        match stmt {
282            parser::ParsedStatement::Select(parsed) => {
283                let window_fns = parsed.window_fns.clone();
284                let result = if parsed.ctes.is_empty() {
285                    let plan = planner::plan_query(&self.schema, &parsed, params)?;
286                    let table_def = self
287                        .schema
288                        .get_table(&plan.table_name().into())
289                        .ok_or_else(|| QueryError::TableNotFound(plan.table_name().to_string()))?;
290                    executor::execute(store, &plan, table_def)?
291                } else {
292                    self.execute_with_ctes(store, &parsed, params)?
293                };
294                // AUDIT-2026-04 S3.2 — window functions are a
295                // post-pass over the base SELECT result; the base
296                // plan already projected the columns the window
297                // fn references.
298                window::apply_window_fns(result, &window_fns)
299            }
300            parser::ParsedStatement::Union(union_stmt) => {
301                self.execute_union(store, &union_stmt, params)
302            }
303            _ => unreachable!("parse_query_statement only returns Select or Union"),
304        }
305    }
306
307    /// Executes a SELECT with CTEs by materializing each CTE and building
308    /// a temporary schema that includes the CTE result sets as tables.
309    fn execute_with_ctes<S: ProjectionStore>(
310        &self,
311        store: &mut S,
312        parsed: &parser::ParsedSelect,
313        params: &[Value],
314    ) -> Result<QueryResult> {
315        // Build an extended schema that includes CTE-derived tables
316        let mut extended_schema = self.schema.clone();
317
318        // Materialize each CTE
319        for cte in &parsed.ctes {
320            // Execute the CTE's inner query
321            let cte_plan = planner::plan_query(&extended_schema, &cte.query, params)?;
322            let cte_table_def = extended_schema
323                .get_table(&cte_plan.table_name().into())
324                .ok_or_else(|| QueryError::TableNotFound(cte_plan.table_name().to_string()))?;
325            let cte_result = executor::execute(store, &cte_plan, cte_table_def)?;
326
327            // Register CTE result as a virtual table in the extended schema
328            // Use a synthetic table ID based on the CTE name hash
329            let cte_table_id = {
330                use std::collections::hash_map::DefaultHasher;
331                use std::hash::{Hash, Hasher};
332                let mut hasher = DefaultHasher::new();
333                cte.name.hash(&mut hasher);
334                kimberlite_store::TableId::new(hasher.finish())
335            };
336
337            // Build column defs from the CTE result
338            let cte_columns: Vec<schema::ColumnDef> = cte_result
339                .columns
340                .iter()
341                .map(|col| schema::ColumnDef::new(col.as_str(), schema::DataType::Text))
342                .collect();
343
344            let pk_cols = if cte_columns.is_empty() {
345                vec![]
346            } else {
347                vec![cte_result.columns[0].clone()]
348            };
349
350            let cte_table = schema::TableDef::new(cte_table_id, cte_columns, pk_cols);
351            extended_schema.add_table(cte.name.as_str(), cte_table);
352
353            // Write CTE rows into the store as a temporary table
354            for (row_idx, row) in cte_result.rows.iter().enumerate() {
355                let mut row_map = serde_json::Map::new();
356                for (col, val) in cte_result.columns.iter().zip(row.iter()) {
357                    row_map.insert(col.as_str().to_string(), value_to_json(val));
358                }
359
360                let json_val =
361                    serde_json::to_vec(&serde_json::Value::Object(row_map)).map_err(|e| {
362                        QueryError::UnsupportedFeature(format!("CTE serialization failed: {e}"))
363                    })?;
364
365                let pk_key = crate::key_encoder::encode_key(&[Value::BigInt(row_idx as i64)]);
366                let batch = kimberlite_store::WriteBatch::new(kimberlite_types::Offset::new(
367                    store.applied_position().as_u64() + 1,
368                ))
369                .put(cte_table_id, pk_key, bytes::Bytes::from(json_val));
370                store.apply(batch)?;
371            }
372        }
373
374        // Execute the main query against the extended schema
375        let main_query = parser::ParsedSelect {
376            ctes: vec![], // CTEs already materialized
377            ..parsed.clone()
378        };
379
380        let plan = planner::plan_query(&extended_schema, &main_query, params)?;
381        let table_def = extended_schema
382            .get_table(&plan.table_name().into())
383            .ok_or_else(|| QueryError::TableNotFound(plan.table_name().to_string()))?;
384        executor::execute(store, &plan, table_def)
385    }
386
387    /// Executes a UNION / UNION ALL query.
388    fn execute_union<S: ProjectionStore>(
389        &self,
390        store: &mut S,
391        union_stmt: &parser::ParsedUnion,
392        params: &[Value],
393    ) -> Result<QueryResult> {
394        // Plan and execute left side
395        let left_plan = planner::plan_query(&self.schema, &union_stmt.left, params)?;
396        let left_table_def = self
397            .schema
398            .get_table(&left_plan.table_name().into())
399            .ok_or_else(|| QueryError::TableNotFound(left_plan.table_name().to_string()))?;
400        let left_result = executor::execute(store, &left_plan, left_table_def)?;
401
402        // Plan and execute right side
403        let right_plan = planner::plan_query(&self.schema, &union_stmt.right, params)?;
404        let right_table_def = self
405            .schema
406            .get_table(&right_plan.table_name().into())
407            .ok_or_else(|| QueryError::TableNotFound(right_plan.table_name().to_string()))?;
408        let right_result = executor::execute(store, &right_plan, right_table_def)?;
409
410        // Use left side column names for the result
411        let column_names = left_result.columns;
412
413        // Combine rows
414        let mut all_rows = left_result.rows;
415        all_rows.extend(right_result.rows);
416
417        // UNION (not ALL) removes duplicates
418        if !union_stmt.all {
419            let mut seen = std::collections::HashSet::new();
420            all_rows.retain(|row| {
421                // Use debug format as hash key (Value doesn't impl Hash)
422                let key = format!("{row:?}");
423                seen.insert(key)
424            });
425        }
426
427        Ok(QueryResult {
428            columns: column_names,
429            rows: all_rows,
430        })
431    }
432
433    /// Executes a SQL query at a specific log position (point-in-time query).
434    ///
435    /// This enables compliance queries that show the state as it was
436    /// at a specific point in the log.
437    ///
438    /// # Arguments
439    ///
440    /// * `store` - The projection store to query
441    /// * `sql` - SQL query string
442    /// * `params` - Query parameters
443    /// * `position` - Log position to query at
444    ///
445    /// # Example
446    ///
447    /// ```ignore
448    /// // Get user state as of log position 1000
449    /// let result = engine.query_at(
450    ///     &mut store,
451    ///     "SELECT * FROM users WHERE id = 1",
452    ///     &[],
453    ///     Offset::new(1000),
454    /// )?;
455    /// ```
456    pub fn query_at<S: ProjectionStore>(
457        &self,
458        store: &mut S,
459        sql: &str,
460        params: &[Value],
461        position: Offset,
462    ) -> Result<QueryResult> {
463        let stmt = self.parse_query_statement_cached(sql)?;
464
465        match stmt {
466            parser::ParsedStatement::Select(parsed) => {
467                let plan = planner::plan_query(&self.schema, &parsed, params)?;
468                let table_def = self
469                    .schema
470                    .get_table(&plan.table_name().into())
471                    .ok_or_else(|| QueryError::TableNotFound(plan.table_name().to_string()))?;
472                executor::execute_at(store, &plan, table_def, position)
473            }
474            parser::ParsedStatement::Union(_) => Err(QueryError::UnsupportedFeature(
475                "UNION is not supported in point-in-time queries".to_string(),
476            )),
477            _ => unreachable!("parse_query_statement only returns Select or Union"),
478        }
479    }
480
481    /// Executes a query against a historical snapshot selected by
482    /// wall-clock timestamp (AUDIT-2026-04 L-4).
483    ///
484    /// This is the user-facing ergonomic form of
485    /// [`Self::query_at`] — healthcare auditors ask "what did the
486    /// chart look like on 2026-01-15?", not "what was log offset
487    /// 948,274?". The caller supplies a `resolver` callback that
488    /// translates a Unix-nanosecond timestamp into the log offset
489    /// whose commit timestamp is the greatest value ≤ the target.
490    ///
491    /// The resolver is a callback rather than a hard dependency
492    /// so the query crate does not take a direct dep on
493    /// `kimberlite-compliance::audit` or the kernel's audit log.
494    /// A typical impl performs a binary search on the in-memory
495    /// audit index.
496    ///
497    /// # Errors
498    ///
499    /// - [`QueryError::UnsupportedFeature`] if the resolver
500    ///   returns `None` (no offset exists at or before the target
501    ///   — typically because the log is empty or the timestamp
502    ///   predates genesis).
503    ///
504    /// # Example
505    ///
506    /// ```ignore
507    /// let resolver = |ts_ns: i64| -> Option<Offset> {
508    ///     audit_log.offset_at_or_before(ts_ns)
509    /// };
510    /// let result = engine.query_at_timestamp(
511    ///     &mut store,
512    ///     "SELECT * FROM charts WHERE patient_id = $1",
513    ///     &[Value::BigInt(42)],
514    ///     1_760_000_000_000_000_000, // 2025-10-09T07:06:40Z in ns
515    ///     resolver,
516    /// )?;
517    /// ```
518    pub fn query_at_timestamp<S, R>(
519        &self,
520        store: &mut S,
521        sql: &str,
522        params: &[Value],
523        target_ns: i64,
524        resolver: R,
525    ) -> Result<QueryResult>
526    where
527        S: ProjectionStore,
528        R: FnOnce(i64) -> Option<Offset>,
529    {
530        let offset = resolver(target_ns).ok_or_else(|| {
531            QueryError::UnsupportedFeature(format!(
532                "no log offset at or before timestamp {target_ns} ns \
533                 (empty log or predates genesis)"
534            ))
535        })?;
536        self.query_at(store, sql, params, offset)
537    }
538
539    /// AUDIT-2026-04 S3.3 — render a SQL query's access plan
540    /// without executing it.
541    ///
542    /// Returns a deterministic multi-line tree string — same
543    /// query always produces the same bytes, which lets apps
544    /// diff plans across schema versions and catch unexpected
545    /// regressions.
546    ///
547    /// The rendered plan **never reveals row data** — only table
548    /// names, column counts, filter presence/absence, and LIMIT
549    /// bounds. Masked column names render as their source name
550    /// (masks are applied post-projection and are not a plan
551    /// concern).
552    ///
553    /// # Errors
554    ///
555    /// Any error from parsing or planning (unsupported statement,
556    /// missing table, etc.) propagates verbatim.
557    ///
558    /// # Example
559    ///
560    /// ```ignore
561    /// let tree = engine.explain("SELECT * FROM patients WHERE id = $1", &[Value::BigInt(42)])?;
562    /// println!("{tree}");
563    /// // -> PointLookup [patients, cols=3]
564    /// ```
565    pub fn explain(&self, sql: &str, params: &[Value]) -> Result<String> {
566        let stmt = self.parse_query_statement_cached(sql)?;
567        match stmt {
568            parser::ParsedStatement::Select(parsed) => {
569                let plan = planner::plan_query(&self.schema, &parsed, params)?;
570                Ok(explain::explain_plan(&plan))
571            }
572            parser::ParsedStatement::Union(_) => Err(QueryError::UnsupportedFeature(
573                "EXPLAIN does not yet render UNION plans".to_string(),
574            )),
575            _ => unreachable!("parse_query_statement only returns Select or Union"),
576        }
577    }
578
579    /// Parses a SQL query without executing it.
580    ///
581    /// Useful for validation or query plan inspection.
582    pub fn prepare(&self, sql: &str, params: &[Value]) -> Result<PreparedQuery> {
583        let stmt = self.parse_query_statement_cached(sql)?;
584        let parsed = match stmt {
585            parser::ParsedStatement::Select(s) => s,
586            _ => {
587                return Err(QueryError::UnsupportedFeature(
588                    "only SELECT queries can be prepared".to_string(),
589                ));
590            }
591        };
592        let plan = planner::plan_query(&self.schema, &parsed, params)?;
593
594        Ok(PreparedQuery {
595            plan,
596            schema: self.schema.clone(),
597        })
598    }
599}
600
601/// A prepared (planned) query ready for execution.
602#[derive(Debug, Clone)]
603pub struct PreparedQuery {
604    plan: plan::QueryPlan,
605    schema: Schema,
606}
607
608impl PreparedQuery {
609    /// Executes this prepared query against the current store state.
610    pub fn execute<S: ProjectionStore>(&self, store: &mut S) -> Result<QueryResult> {
611        let table_def = self
612            .schema
613            .get_table(&self.plan.table_name().into())
614            .ok_or_else(|| QueryError::TableNotFound(self.plan.table_name().to_string()))?;
615
616        executor::execute(store, &self.plan, table_def)
617    }
618
619    /// Executes this prepared query at a specific log position.
620    pub fn execute_at<S: ProjectionStore>(
621        &self,
622        store: &mut S,
623        position: Offset,
624    ) -> Result<QueryResult> {
625        let table_def = self
626            .schema
627            .get_table(&self.plan.table_name().into())
628            .ok_or_else(|| QueryError::TableNotFound(self.plan.table_name().to_string()))?;
629
630        executor::execute_at(store, &self.plan, table_def, position)
631    }
632
633    /// Returns the column names this query will return.
634    pub fn columns(&self) -> &[ColumnName] {
635        self.plan.column_names()
636    }
637
638    /// Returns the table name being queried.
639    pub fn table_name(&self) -> &str {
640        self.plan.table_name()
641    }
642}
643
644/// Converts a Value to a serde_json::Value for CTE materialization.
645fn value_to_json(val: &Value) -> serde_json::Value {
646    // NEVER: Placeholder values must be bound before reaching the CTE /
647    // query-result JSON boundary. An unbound Placeholder here indicates a
648    // parameter-binding bug (AUDIT fix: placeholders must be resolved upstream).
649    kimberlite_properties::never!(
650        matches!(val, Value::Placeholder(_)),
651        "query.placeholder_reaches_result_boundary",
652        "Value::Placeholder must never reach query-result / JSON serialization boundary"
653    );
654    match val {
655        Value::Null | Value::Placeholder(_) => serde_json::Value::Null,
656        Value::BigInt(i) => serde_json::json!(i),
657        Value::TinyInt(i) => serde_json::json!(i),
658        Value::SmallInt(i) => serde_json::json!(i),
659        Value::Integer(i) => serde_json::json!(i),
660        Value::Real(f) => serde_json::json!(f),
661        Value::Decimal(v, scale) => {
662            // Format decimal: store the raw value and scale as a string
663            if *scale == 0 {
664                serde_json::json!(v.to_string())
665            } else {
666                let divisor = 10i128.pow(u32::from(*scale));
667                let whole = v / divisor;
668                let frac = (v % divisor).unsigned_abs();
669                serde_json::json!(format!("{whole}.{frac:0>width$}", width = *scale as usize))
670            }
671        }
672        Value::Text(s) => serde_json::json!(s),
673        Value::Boolean(b) => serde_json::json!(b),
674        Value::Date(d) => serde_json::json!(d),
675        Value::Time(t) => serde_json::json!(t),
676        Value::Timestamp(ts) => serde_json::json!(ts.as_nanos()),
677        Value::Uuid(u) => {
678            // Format UUID bytes as hex string
679            let hex: String = u.iter().map(|b| format!("{b:02x}")).collect();
680            serde_json::json!(hex)
681        }
682        Value::Json(j) => j.clone(),
683        Value::Bytes(b) => {
684            use base64::Engine;
685            serde_json::json!(base64::engine::general_purpose::STANDARD.encode(b))
686        }
687    }
688}