kimberlite_query/lib.rs
1//! # kmb-query: SQL query layer for `Kimberlite` projections
2//!
3//! This crate provides a minimal SQL query engine for compliance lookups
4//! against the projection store.
5//!
6//! ## SQL Subset
7//!
8//! Supported SQL features:
9//! - `SELECT` with column list or `*`
10//! - `FROM` single table or `JOIN` (INNER, LEFT)
11//! - `WHERE` with comparison predicates (`=`, `<`, `>`, `<=`, `>=`, `IN`)
12//! - `ORDER BY` (ascending/descending)
13//! - `LIMIT`
14//! - `GROUP BY` with aggregates (`COUNT`, `SUM`, `AVG`, `MIN`, `MAX`)
15//! - `HAVING` with aggregate filtering
16//! - `UNION` / `UNION ALL`
17//! - `DISTINCT`
18//! - `ALTER TABLE` (ADD COLUMN, DROP COLUMN)
19//! - Parameterized queries (`$1`, `$2`, ...)
//! - `WITH` (Common Table Expressions / CTEs)
//! - Subqueries in FROM and JOIN (`SELECT * FROM (SELECT ...) AS t`)
//! - Window functions (applied by [`QueryEngine::query`] as a post-pass over
//!   the base `SELECT` result)
//!
//! Not yet supported:
//! - `WITH RECURSIVE`
27//!
28//! ## Usage
29//!
30//! ```ignore
31//! use kimberlite_query::{QueryEngine, Schema, SchemaBuilder, ColumnDef, DataType, Value};
32//! use kimberlite_store::{BTreeStore, TableId};
33//!
34//! // Define schema
35//! let schema = SchemaBuilder::new()
36//! .table(
37//! "users",
38//! TableId::new(1),
39//! vec![
40//! ColumnDef::new("id", DataType::BigInt).not_null(),
41//! ColumnDef::new("name", DataType::Text).not_null(),
42//! ],
43//! vec!["id".into()],
44//! )
45//! .build();
46//!
47//! // Create engine
48//! let engine = QueryEngine::new(schema);
49//!
50//! // Execute query
51//! let mut store = BTreeStore::open("data/projections")?;
52//! let result = engine.query(&mut store, "SELECT * FROM users WHERE id = $1", &[Value::BigInt(42)])?;
53//! ```
54//!
55//! ## Point-in-Time Queries
56//!
57//! For compliance, you can query at a specific log position:
58//!
59//! ```ignore
60//! let result = engine.query_at(
61//! &mut store,
62//! "SELECT * FROM users WHERE id = 1",
63//! &[],
64//! Offset::new(1000), // Query state as of log position 1000
65//! )?;
66//! ```
67
68pub mod dml_planner;
69mod error;
70mod executor;
71pub mod explain;
72pub mod information_schema;
73pub mod key_encoder;
74mod parse_cache;
75mod parser;
76mod plan;
77mod planner;
78pub mod rbac_filter;
79mod schema;
80mod value;
81pub mod window;
82
83#[cfg(test)]
84mod tests;
85
86// Re-export public types
87pub use error::{QueryError, Result};
88pub use executor::{QueryResult, Row, execute};
89pub use parser::{
90 HavingCondition, HavingOp, ParsedAlterTable, ParsedColumn, ParsedCreateIndex,
91 ParsedCreateMask, ParsedCreateTable, ParsedCreateUser, ParsedCte, ParsedDelete, ParsedGrant,
92 ParsedInsert, ParsedSelect, ParsedSetClassification, ParsedStatement, ParsedUnion,
93 ParsedUpdate, Predicate, PredicateValue, TimeTravel, extract_at_offset,
94 extract_time_travel, parse_statement, try_parse_custom_statement,
95};
96pub use planner::plan_query;
97pub use schema::{
98 ColumnDef, ColumnName, DataType, IndexDef, Schema, SchemaBuilder, TableDef, TableName,
99};
100pub use value::Value;
101
102use kimberlite_store::ProjectionStore;
103use kimberlite_types::Offset;
104
/// Query engine for executing SQL against a projection store.
///
/// Holds the schema plus an optional parse cache (AUDIT-2026-04
/// S3.4). The engine is `Clone` — the parse cache is shared via
/// `Arc` so cloned handles hit the same memoised entries.
#[derive(Debug, Clone)]
pub struct QueryEngine {
    /// Table definitions that queries are planned against.
    schema: Schema,
    /// Memoised parse results, keyed by SQL text. `None` until
    /// [`QueryEngine::with_parse_cache`] is called.
    parse_cache: Option<std::sync::Arc<parse_cache::ParseCache>>,
}
115
116impl QueryEngine {
117 /// Creates a new query engine with the given schema. No
118 /// parse cache is attached by default — use
119 /// [`Self::with_parse_cache`] to opt in.
120 pub fn new(schema: Schema) -> Self {
121 Self {
122 schema,
123 parse_cache: None,
124 }
125 }
126
127 /// Attach an LRU parse cache of the given size. `0` disables
128 /// caching (every call re-parses).
129 #[must_use]
130 pub fn with_parse_cache(mut self, max_size: usize) -> Self {
131 self.parse_cache = Some(std::sync::Arc::new(parse_cache::ParseCache::new(max_size)));
132 self
133 }
134
135 /// Returns a snapshot of parse-cache stats, or `None` if no
136 /// cache is attached.
137 pub fn parse_cache_stats(&self) -> Option<parse_cache::ParseCacheStats> {
138 self.parse_cache.as_deref().map(parse_cache::ParseCache::stats)
139 }
140
141 /// Clear the parse cache, if any.
142 pub fn clear_parse_cache(&self) {
143 if let Some(c) = &self.parse_cache {
144 c.clear();
145 }
146 }
147
/// Returns a reference to the schema.
///
/// This is the schema that was supplied to [`Self::new`].
pub fn schema(&self) -> &Schema {
    &self.schema
}
152
153 /// Parses a SQL string and extracts the SELECT or UNION statement.
154 ///
155 /// Static variant — bypasses the parse cache. Used by call
156 /// sites that predate the cache and by internal recursive
157 /// parsers.
158 fn parse_query_statement(sql: &str) -> Result<parser::ParsedStatement> {
159 let stmt = parser::parse_statement(sql)?;
160 match &stmt {
161 parser::ParsedStatement::Select(_) | parser::ParsedStatement::Union(_) => Ok(stmt),
162 _ => Err(QueryError::UnsupportedFeature(
163 "only SELECT and UNION queries are supported".to_string(),
164 )),
165 }
166 }
167
168 /// Cache-aware parse wrapper.
169 ///
170 /// Looks the SQL up in the parse cache (if attached). On
171 /// miss, parses via [`Self::parse_query_statement`] and
172 /// inserts into the cache. Non-SELECT/UNION errors are
173 /// returned directly without populating the cache — they're
174 /// errors for every subsequent call anyway and we don't want
175 /// to memoise them.
176 fn parse_query_statement_cached(
177 &self,
178 sql: &str,
179 ) -> Result<parser::ParsedStatement> {
180 if let Some(cache) = &self.parse_cache {
181 if let Some(stmt) = cache.get(sql) {
182 return Ok(stmt);
183 }
184 }
185 let stmt = Self::parse_query_statement(sql)?;
186 if let Some(cache) = &self.parse_cache {
187 cache.insert(sql.to_string(), stmt.clone());
188 }
189 Ok(stmt)
190 }
191
192 /// Executes a SQL query against the current store state.
193 ///
194 /// Supports SELECT and UNION/UNION ALL queries.
195 ///
196 /// # Arguments
197 ///
198 /// * `store` - The projection store to query
199 /// * `sql` - SQL query string
200 /// * `params` - Query parameters (for `$1`, `$2`, etc.)
201 ///
202 /// # Example
203 ///
204 /// ```ignore
205 /// let result = engine.query(
206 /// &mut store,
207 /// "SELECT name FROM users WHERE id = $1",
208 /// &[Value::BigInt(42)],
209 /// )?;
210 /// ```
211 pub fn query<S: ProjectionStore>(
212 &self,
213 store: &mut S,
214 sql: &str,
215 params: &[Value],
216 ) -> Result<QueryResult> {
217 // AUDIT-2026-04 S3.5 — healthcare BREAK_GLASS prefix.
218 // `WITH BREAK_GLASS REASON='...' SELECT ...` strips the
219 // prefix, emits a warn-level structured log for the
220 // caller's audit pipeline to pick up, and falls through
221 // to the normal query path. The reason is the attribution
222 // value — enforcement (RBAC + masking) is still applied.
223 let (after_break_glass, break_glass_reason) =
224 explain::extract_break_glass(sql);
225 if let Some(ref reason) = break_glass_reason {
226 tracing::warn!(
227 break_glass_reason = %reason,
228 "BREAK_GLASS query — regulator-visible audit signal",
229 );
230 }
231 let sql = after_break_glass;
232
233 // AUDIT-2026-04 S3.4 — `information_schema.*` virtual-table
234 // interception. Synthesises results from the live schema
235 // without going through the planner/executor. Callers
236 // that point FROM at `information_schema.tables` or
237 // `.columns` get back schema introspection rows without
238 // the table needing to be registered in the store.
239 if let Some(result) = information_schema::maybe_answer(sql, &self.schema) {
240 return Ok(result);
241 }
242
243 // AUDIT-2026-04 S3.3 — EXPLAIN prefix dispatch. A caller
244 // issuing `EXPLAIN SELECT ...` gets a single-row result
245 // whose only value is the rendered plan tree, rather
246 // than executing the statement.
247 let (after_explain, is_explain) = explain::extract_explain(sql);
248 if is_explain {
249 let plan_text = self.explain(after_explain, params)?;
250 return Ok(executor::QueryResult {
251 columns: vec!["plan".into()],
252 rows: vec![vec![Value::Text(plan_text)]],
253 });
254 }
255 let sql = after_explain; // equivalent to original `sql` when EXPLAIN absent
256
257 // Extract time-travel clause (AT OFFSET / FOR SYSTEM_TIME AS OF / AS OF)
258 // before passing SQL to sqlparser. Offset syntax dispatches
259 // directly; timestamp syntax without a resolver errors out
260 // with a clear message pointing to
261 // `query_at_timestamp(..., resolver)`.
262 let (cleaned_sql, time_travel) = parser::extract_time_travel(sql);
263 match time_travel {
264 Some(parser::TimeTravel::Offset(o)) => {
265 return self.query_at(store, &cleaned_sql, params, Offset::new(o));
266 }
267 Some(parser::TimeTravel::TimestampNs(_)) => {
268 return Err(QueryError::UnsupportedFeature(
269 "FOR SYSTEM_TIME AS OF '<iso>' / AS OF '<iso>' \
270 requires a timestamp→offset resolver — use \
271 QueryEngine::query_at_timestamp(..., resolver)"
272 .to_string(),
273 ));
274 }
275 None => {}
276 }
277 let sql = sql; // unmodified; suppress unused-shadow lint
278
279 let stmt = self.parse_query_statement_cached(sql)?;
280
281 match stmt {
282 parser::ParsedStatement::Select(parsed) => {
283 let window_fns = parsed.window_fns.clone();
284 let result = if parsed.ctes.is_empty() {
285 let plan = planner::plan_query(&self.schema, &parsed, params)?;
286 let table_def = self
287 .schema
288 .get_table(&plan.table_name().into())
289 .ok_or_else(|| QueryError::TableNotFound(plan.table_name().to_string()))?;
290 executor::execute(store, &plan, table_def)?
291 } else {
292 self.execute_with_ctes(store, &parsed, params)?
293 };
294 // AUDIT-2026-04 S3.2 — window functions are a
295 // post-pass over the base SELECT result; the base
296 // plan already projected the columns the window
297 // fn references.
298 window::apply_window_fns(result, &window_fns)
299 }
300 parser::ParsedStatement::Union(union_stmt) => {
301 self.execute_union(store, &union_stmt, params)
302 }
303 _ => unreachable!("parse_query_statement only returns Select or Union"),
304 }
305 }
306
307 /// Executes a SELECT with CTEs by materializing each CTE and building
308 /// a temporary schema that includes the CTE result sets as tables.
309 fn execute_with_ctes<S: ProjectionStore>(
310 &self,
311 store: &mut S,
312 parsed: &parser::ParsedSelect,
313 params: &[Value],
314 ) -> Result<QueryResult> {
315 // Build an extended schema that includes CTE-derived tables
316 let mut extended_schema = self.schema.clone();
317
318 // Materialize each CTE
319 for cte in &parsed.ctes {
320 // Execute the CTE's inner query
321 let cte_plan = planner::plan_query(&extended_schema, &cte.query, params)?;
322 let cte_table_def = extended_schema
323 .get_table(&cte_plan.table_name().into())
324 .ok_or_else(|| QueryError::TableNotFound(cte_plan.table_name().to_string()))?;
325 let cte_result = executor::execute(store, &cte_plan, cte_table_def)?;
326
327 // Register CTE result as a virtual table in the extended schema
328 // Use a synthetic table ID based on the CTE name hash
329 let cte_table_id = {
330 use std::collections::hash_map::DefaultHasher;
331 use std::hash::{Hash, Hasher};
332 let mut hasher = DefaultHasher::new();
333 cte.name.hash(&mut hasher);
334 kimberlite_store::TableId::new(hasher.finish())
335 };
336
337 // Build column defs from the CTE result
338 let cte_columns: Vec<schema::ColumnDef> = cte_result
339 .columns
340 .iter()
341 .map(|col| schema::ColumnDef::new(col.as_str(), schema::DataType::Text))
342 .collect();
343
344 let pk_cols = if cte_columns.is_empty() {
345 vec![]
346 } else {
347 vec![cte_result.columns[0].clone()]
348 };
349
350 let cte_table = schema::TableDef::new(cte_table_id, cte_columns, pk_cols);
351 extended_schema.add_table(cte.name.as_str(), cte_table);
352
353 // Write CTE rows into the store as a temporary table
354 for (row_idx, row) in cte_result.rows.iter().enumerate() {
355 let mut row_map = serde_json::Map::new();
356 for (col, val) in cte_result.columns.iter().zip(row.iter()) {
357 row_map.insert(col.as_str().to_string(), value_to_json(val));
358 }
359
360 let json_val =
361 serde_json::to_vec(&serde_json::Value::Object(row_map)).map_err(|e| {
362 QueryError::UnsupportedFeature(format!("CTE serialization failed: {e}"))
363 })?;
364
365 let pk_key = crate::key_encoder::encode_key(&[Value::BigInt(row_idx as i64)]);
366 let batch = kimberlite_store::WriteBatch::new(kimberlite_types::Offset::new(
367 store.applied_position().as_u64() + 1,
368 ))
369 .put(cte_table_id, pk_key, bytes::Bytes::from(json_val));
370 store.apply(batch)?;
371 }
372 }
373
374 // Execute the main query against the extended schema
375 let main_query = parser::ParsedSelect {
376 ctes: vec![], // CTEs already materialized
377 ..parsed.clone()
378 };
379
380 let plan = planner::plan_query(&extended_schema, &main_query, params)?;
381 let table_def = extended_schema
382 .get_table(&plan.table_name().into())
383 .ok_or_else(|| QueryError::TableNotFound(plan.table_name().to_string()))?;
384 executor::execute(store, &plan, table_def)
385 }
386
387 /// Executes a UNION / UNION ALL query.
388 fn execute_union<S: ProjectionStore>(
389 &self,
390 store: &mut S,
391 union_stmt: &parser::ParsedUnion,
392 params: &[Value],
393 ) -> Result<QueryResult> {
394 // Plan and execute left side
395 let left_plan = planner::plan_query(&self.schema, &union_stmt.left, params)?;
396 let left_table_def = self
397 .schema
398 .get_table(&left_plan.table_name().into())
399 .ok_or_else(|| QueryError::TableNotFound(left_plan.table_name().to_string()))?;
400 let left_result = executor::execute(store, &left_plan, left_table_def)?;
401
402 // Plan and execute right side
403 let right_plan = planner::plan_query(&self.schema, &union_stmt.right, params)?;
404 let right_table_def = self
405 .schema
406 .get_table(&right_plan.table_name().into())
407 .ok_or_else(|| QueryError::TableNotFound(right_plan.table_name().to_string()))?;
408 let right_result = executor::execute(store, &right_plan, right_table_def)?;
409
410 // Use left side column names for the result
411 let column_names = left_result.columns;
412
413 // Combine rows
414 let mut all_rows = left_result.rows;
415 all_rows.extend(right_result.rows);
416
417 // UNION (not ALL) removes duplicates
418 if !union_stmt.all {
419 let mut seen = std::collections::HashSet::new();
420 all_rows.retain(|row| {
421 // Use debug format as hash key (Value doesn't impl Hash)
422 let key = format!("{row:?}");
423 seen.insert(key)
424 });
425 }
426
427 Ok(QueryResult {
428 columns: column_names,
429 rows: all_rows,
430 })
431 }
432
433 /// Executes a SQL query at a specific log position (point-in-time query).
434 ///
435 /// This enables compliance queries that show the state as it was
436 /// at a specific point in the log.
437 ///
438 /// # Arguments
439 ///
440 /// * `store` - The projection store to query
441 /// * `sql` - SQL query string
442 /// * `params` - Query parameters
443 /// * `position` - Log position to query at
444 ///
445 /// # Example
446 ///
447 /// ```ignore
448 /// // Get user state as of log position 1000
449 /// let result = engine.query_at(
450 /// &mut store,
451 /// "SELECT * FROM users WHERE id = 1",
452 /// &[],
453 /// Offset::new(1000),
454 /// )?;
455 /// ```
456 pub fn query_at<S: ProjectionStore>(
457 &self,
458 store: &mut S,
459 sql: &str,
460 params: &[Value],
461 position: Offset,
462 ) -> Result<QueryResult> {
463 let stmt = self.parse_query_statement_cached(sql)?;
464
465 match stmt {
466 parser::ParsedStatement::Select(parsed) => {
467 let plan = planner::plan_query(&self.schema, &parsed, params)?;
468 let table_def = self
469 .schema
470 .get_table(&plan.table_name().into())
471 .ok_or_else(|| QueryError::TableNotFound(plan.table_name().to_string()))?;
472 executor::execute_at(store, &plan, table_def, position)
473 }
474 parser::ParsedStatement::Union(_) => Err(QueryError::UnsupportedFeature(
475 "UNION is not supported in point-in-time queries".to_string(),
476 )),
477 _ => unreachable!("parse_query_statement only returns Select or Union"),
478 }
479 }
480
481 /// Executes a query against a historical snapshot selected by
482 /// wall-clock timestamp (AUDIT-2026-04 L-4).
483 ///
484 /// This is the user-facing ergonomic form of
485 /// [`Self::query_at`] — healthcare auditors ask "what did the
486 /// chart look like on 2026-01-15?", not "what was log offset
487 /// 948,274?". The caller supplies a `resolver` callback that
488 /// translates a Unix-nanosecond timestamp into the log offset
489 /// whose commit timestamp is the greatest value ≤ the target.
490 ///
491 /// The resolver is a callback rather than a hard dependency
492 /// so the query crate does not take a direct dep on
493 /// `kimberlite-compliance::audit` or the kernel's audit log.
494 /// A typical impl performs a binary search on the in-memory
495 /// audit index.
496 ///
497 /// # Errors
498 ///
499 /// - [`QueryError::UnsupportedFeature`] if the resolver
500 /// returns `None` (no offset exists at or before the target
501 /// — typically because the log is empty or the timestamp
502 /// predates genesis).
503 ///
504 /// # Example
505 ///
506 /// ```ignore
507 /// let resolver = |ts_ns: i64| -> Option<Offset> {
508 /// audit_log.offset_at_or_before(ts_ns)
509 /// };
510 /// let result = engine.query_at_timestamp(
511 /// &mut store,
512 /// "SELECT * FROM charts WHERE patient_id = $1",
513 /// &[Value::BigInt(42)],
514 /// 1_760_000_000_000_000_000, // 2025-10-09T07:06:40Z in ns
515 /// resolver,
516 /// )?;
517 /// ```
518 pub fn query_at_timestamp<S, R>(
519 &self,
520 store: &mut S,
521 sql: &str,
522 params: &[Value],
523 target_ns: i64,
524 resolver: R,
525 ) -> Result<QueryResult>
526 where
527 S: ProjectionStore,
528 R: FnOnce(i64) -> Option<Offset>,
529 {
530 let offset = resolver(target_ns).ok_or_else(|| {
531 QueryError::UnsupportedFeature(format!(
532 "no log offset at or before timestamp {target_ns} ns \
533 (empty log or predates genesis)"
534 ))
535 })?;
536 self.query_at(store, sql, params, offset)
537 }
538
539 /// AUDIT-2026-04 S3.3 — render a SQL query's access plan
540 /// without executing it.
541 ///
542 /// Returns a deterministic multi-line tree string — same
543 /// query always produces the same bytes, which lets apps
544 /// diff plans across schema versions and catch unexpected
545 /// regressions.
546 ///
547 /// The rendered plan **never reveals row data** — only table
548 /// names, column counts, filter presence/absence, and LIMIT
549 /// bounds. Masked column names render as their source name
550 /// (masks are applied post-projection and are not a plan
551 /// concern).
552 ///
553 /// # Errors
554 ///
555 /// Any error from parsing or planning (unsupported statement,
556 /// missing table, etc.) propagates verbatim.
557 ///
558 /// # Example
559 ///
560 /// ```ignore
561 /// let tree = engine.explain("SELECT * FROM patients WHERE id = $1", &[Value::BigInt(42)])?;
562 /// println!("{tree}");
563 /// // -> PointLookup [patients, cols=3]
564 /// ```
565 pub fn explain(&self, sql: &str, params: &[Value]) -> Result<String> {
566 let stmt = self.parse_query_statement_cached(sql)?;
567 match stmt {
568 parser::ParsedStatement::Select(parsed) => {
569 let plan = planner::plan_query(&self.schema, &parsed, params)?;
570 Ok(explain::explain_plan(&plan))
571 }
572 parser::ParsedStatement::Union(_) => Err(QueryError::UnsupportedFeature(
573 "EXPLAIN does not yet render UNION plans".to_string(),
574 )),
575 _ => unreachable!("parse_query_statement only returns Select or Union"),
576 }
577 }
578
579 /// Parses a SQL query without executing it.
580 ///
581 /// Useful for validation or query plan inspection.
582 pub fn prepare(&self, sql: &str, params: &[Value]) -> Result<PreparedQuery> {
583 let stmt = self.parse_query_statement_cached(sql)?;
584 let parsed = match stmt {
585 parser::ParsedStatement::Select(s) => s,
586 _ => {
587 return Err(QueryError::UnsupportedFeature(
588 "only SELECT queries can be prepared".to_string(),
589 ));
590 }
591 };
592 let plan = planner::plan_query(&self.schema, &parsed, params)?;
593
594 Ok(PreparedQuery {
595 plan,
596 schema: self.schema.clone(),
597 })
598 }
599}
600
/// A prepared (planned) query ready for execution.
///
/// Produced by [`QueryEngine::prepare`].
#[derive(Debug, Clone)]
pub struct PreparedQuery {
    /// The plan built when the query was prepared.
    plan: plan::QueryPlan,
    /// Clone of the engine schema taken at prepare time; used to look up
    /// the plan's table definition at execution time.
    schema: Schema,
}
607
608impl PreparedQuery {
609 /// Executes this prepared query against the current store state.
610 pub fn execute<S: ProjectionStore>(&self, store: &mut S) -> Result<QueryResult> {
611 let table_def = self
612 .schema
613 .get_table(&self.plan.table_name().into())
614 .ok_or_else(|| QueryError::TableNotFound(self.plan.table_name().to_string()))?;
615
616 executor::execute(store, &self.plan, table_def)
617 }
618
619 /// Executes this prepared query at a specific log position.
620 pub fn execute_at<S: ProjectionStore>(
621 &self,
622 store: &mut S,
623 position: Offset,
624 ) -> Result<QueryResult> {
625 let table_def = self
626 .schema
627 .get_table(&self.plan.table_name().into())
628 .ok_or_else(|| QueryError::TableNotFound(self.plan.table_name().to_string()))?;
629
630 executor::execute_at(store, &self.plan, table_def, position)
631 }
632
633 /// Returns the column names this query will return.
634 pub fn columns(&self) -> &[ColumnName] {
635 self.plan.column_names()
636 }
637
638 /// Returns the table name being queried.
639 pub fn table_name(&self) -> &str {
640 self.plan.table_name()
641 }
642}
643
644/// Converts a Value to a serde_json::Value for CTE materialization.
645fn value_to_json(val: &Value) -> serde_json::Value {
646 // NEVER: Placeholder values must be bound before reaching the CTE /
647 // query-result JSON boundary. An unbound Placeholder here indicates a
648 // parameter-binding bug (AUDIT fix: placeholders must be resolved upstream).
649 kimberlite_properties::never!(
650 matches!(val, Value::Placeholder(_)),
651 "query.placeholder_reaches_result_boundary",
652 "Value::Placeholder must never reach query-result / JSON serialization boundary"
653 );
654 match val {
655 Value::Null | Value::Placeholder(_) => serde_json::Value::Null,
656 Value::BigInt(i) => serde_json::json!(i),
657 Value::TinyInt(i) => serde_json::json!(i),
658 Value::SmallInt(i) => serde_json::json!(i),
659 Value::Integer(i) => serde_json::json!(i),
660 Value::Real(f) => serde_json::json!(f),
661 Value::Decimal(v, scale) => {
662 // Format decimal: store the raw value and scale as a string
663 if *scale == 0 {
664 serde_json::json!(v.to_string())
665 } else {
666 let divisor = 10i128.pow(u32::from(*scale));
667 let whole = v / divisor;
668 let frac = (v % divisor).unsigned_abs();
669 serde_json::json!(format!("{whole}.{frac:0>width$}", width = *scale as usize))
670 }
671 }
672 Value::Text(s) => serde_json::json!(s),
673 Value::Boolean(b) => serde_json::json!(b),
674 Value::Date(d) => serde_json::json!(d),
675 Value::Time(t) => serde_json::json!(t),
676 Value::Timestamp(ts) => serde_json::json!(ts.as_nanos()),
677 Value::Uuid(u) => {
678 // Format UUID bytes as hex string
679 let hex: String = u.iter().map(|b| format!("{b:02x}")).collect();
680 serde_json::json!(hex)
681 }
682 Value::Json(j) => j.clone(),
683 Value::Bytes(b) => {
684 use base64::Engine;
685 serde_json::json!(base64::engine::general_purpose::STANDARD.encode(b))
686 }
687 }
688}