sochdb_storage/
database.rs

1// Copyright 2025 Sushanth (https://github.com/sushanthpy)
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! SochDB Database Kernel
16//!
17//! The shared core that powers both embedded mode (`SochConnection::open`) and
18//! server mode (`sochdb-server`). This is the "SQLite engine" equivalent.
19//!
20//! ## Architecture
21//!
22//! ```text
23//! ┌──────────────────────────────────────────────────────────────────┐
24//! │                        Database Kernel                            │
25//! │  Arc<Database> - shared by all connections                       │
26//! ├──────────────────────────────────────────────────────────────────┤
27//! │                                                                   │
28//! │  ┌─────────────────┐   ┌─────────────────┐   ┌────────────────┐ │
29//! │  │  DurableStorage │   │     Catalog     │   │  Vector Index  │ │
30//! │  │  (WAL + MVCC)   │   │  (Schema Mgmt)  │   │  (HNSW/Vamana) │ │
31//! │  └────────┬────────┘   └────────┬────────┘   └───────┬────────┘ │
32//! │           │                     │                     │          │
33//! │           └─────────────────────┴─────────────────────┘          │
34//! │                                 │                                 │
35//! │  ┌─────────────────────────────────────────────────────────────┐ │
36//! │  │              Query Executor (Path-Native)                    │ │
37//! │  │  - Path resolution: O(|path|)                                │ │
38//! │  │  - Column projection: 80% I/O reduction                     │ │
39//! │  │  - Context selection: Token-aware chunking                  │ │
40//! │  └─────────────────────────────────────────────────────────────┘ │
41//! │                                                                   │
42//! └──────────────────────────────────────────────────────────────────┘
43//!
44//! Deployment Modes:
45//! ┌─────────────┐   ┌─────────────┐   ┌─────────────┐
46//! │  Embedded   │   │  IPC Server │   │  TCP Server │
47//! │  (in-proc)  │   │  (Unix sock)│   │  (remote)   │
48//! └──────┬──────┘   └──────┬──────┘   └──────┬──────┘
49//!        │                 │                 │
50//!        └─────────────────┴─────────────────┘
51//!                          │
52//!                   Arc<Database>
53//! ```
54//!
55//! ## Latency Model
56//!
57//! Let K = kernel processing cost for a query
58//!
59//! - Embedded: L_emb ≈ K (function call overhead negligible)
60//! - IPC: L_ipc ≈ K + δ_ipc (δ_ipc = ~10-50µs for Unix socket)
61//! - TCP: L_tcp ≈ K + δ_net (δ_net = 100µs-10ms depending on network)
62//!
63//! For LLM context queries where K >> δ_ipc, IPC is "nearly embedded".
64
65use std::collections::HashMap;
66use std::path::{Path, PathBuf};
67use std::sync::Arc;
68use std::sync::atomic::{AtomicU64, Ordering};
69
70use dashmap::DashMap;
71use parking_lot::RwLock;
72
73use crate::durable_storage::{DurableStorage, TransactionMode};
74use crate::index_policy::{IndexPolicy, TableIndexConfig, TableIndexRegistry};
75use crate::key_buffer::KeyBuffer;
76use crate::packed_row::{PackedColumnDef, PackedColumnType, PackedRow, PackedTableSchema};
77use sochdb_core::catalog::Catalog;
78use sochdb_core::{Result, SochDBError, SochValue};
79
80// Re-export key types
81pub use crate::durable_storage::RecoveryStats;
82
/// Database configuration.
///
/// Besides `Default::default()`, ready-made presets are provided by
/// `throughput_optimized()`, `latency_optimized()` and `sqlite_compatible()`.
#[derive(Debug, Clone)]
pub struct DatabaseConfig {
    /// Enable group commit for better write throughput
    pub group_commit: bool,
    /// Maximum memory for memtables before flush (bytes)
    pub memtable_size_limit: usize,
    /// Enable WAL for durability
    pub wal_enabled: bool,
    /// Sync mode: fsync after every commit vs periodic
    pub sync_mode: SyncMode,
    /// Read-only mode
    pub read_only: bool,

    /// Enable ordered index for O(log N) prefix scans
    ///
    /// # Deprecation Notice
    ///
    /// **DEPRECATED since 0.2.0**: Use `default_index_policy` instead for per-table control.
    /// This field will be removed in v0.3.0.
    ///
    /// Note that `effective_ordered_index()` is derived from
    /// `default_index_policy` only and does not read this field.
    ///
    /// ## Migration Guide
    ///
    /// Replace:
    /// ```ignore
    /// DatabaseConfig { enable_ordered_index: true, .. }  // Old API
    /// DatabaseConfig { enable_ordered_index: false, .. } // Old API
    /// ```
    ///
    /// With:
    /// ```ignore
    /// DatabaseConfig { default_index_policy: IndexPolicy::ScanOptimized, .. }  // Ordered index enabled
    /// DatabaseConfig { default_index_policy: IndexPolicy::WriteOptimized, .. } // Ordered index disabled
    /// ```
    ///
    /// ## Behavior
    ///
    /// When false, saves ~134 ns/op on writes (20% speedup)
    /// but scan_prefix becomes O(N) instead of O(log N + K).
    ///
    /// Set to false for write-heavy workloads without range scans.
    #[deprecated(
        since = "0.2.0",
        note = "Use `default_index_policy` field instead. This field will be removed in v0.3.0. \
                Set IndexPolicy::ScanOptimized for ordered index, WriteOptimized to disable."
    )]
    pub enable_ordered_index: bool,
    /// Group commit configuration (batch sizes and wait time)
    pub group_commit_config: GroupCommitSettings,
    /// Default index policy for tables not explicitly configured
    ///
    /// This replaces the global `enable_ordered_index` toggle with
    /// fine-grained per-table control. Use `index_registry` to configure
    /// individual tables.
    ///
    /// | Policy         | Insert Cost | Scan Cost      | Use Case              |
    /// |----------------|-------------|----------------|------------------------|
    /// | WriteOptimized | O(1)        | O(N)           | High-write, rare scan  |
    /// | Balanced       | O(1) amort  | O(output+logK) | Mixed OLTP            |
    /// | ScanOptimized  | O(log N)    | O(logN + K)    | Analytics, range query |
    /// | AppendOnly     | O(1)        | O(N)           | Time-series logs       |
    pub default_index_policy: IndexPolicy,
}
148
149/// Group commit settings - mirrors SQLite's WAL mode tuning
150///
151/// ## Performance Model
152///
153/// Without group commit: Throughput = 1 / L_fsync ≈ 200 commits/sec (L=5ms)
154/// With group commit (batch size K): Throughput = K / L_fsync = K × 200 commits/sec
155///
156/// For K=100: 20,000 commits/sec (100× speedup)
157///
158/// ## SQLite Comparison
159///
160/// | Setting                    | SQLite Equivalent           |
161/// |----------------------------|-----------------------------|
162/// | batch_size = 1             | PRAGMA synchronous = FULL   |
163/// | batch_size = 100           | WAL mode with batching      |
164/// | max_wait_us = 0            | No delay, immediate flush   |
165/// | max_wait_us = 10000        | Up to 10ms delay for batch  |
#[derive(Debug, Clone)]
pub struct GroupCommitSettings {
    /// Minimum batch size before flush (default: 1)
    pub min_batch_size: usize,
    /// Maximum batch size (default: 1000)
    pub max_batch_size: usize,
    /// Maximum wait time before flush in microseconds (default: 10000 = 10ms)
    pub max_wait_us: u64,
    /// Expected fsync latency in microseconds (for adaptive sizing)
    pub fsync_latency_us: u64,
}

impl Default for GroupCommitSettings {
    /// Defaults: batches of 1..=1000, 10ms maximum wait, 5ms expected fsync.
    fn default() -> Self {
        Self {
            min_batch_size: 1,
            max_batch_size: 1000,
            max_wait_us: 10_000,     // 10ms
            fsync_latency_us: 5_000, // 5ms
        }
    }
}

impl GroupCommitSettings {
    /// High throughput preset - maximizes batching
    /// (batches of 50..=5000, up to 50ms wait).
    pub fn high_throughput() -> Self {
        Self {
            min_batch_size: 50,
            max_batch_size: 5000,
            max_wait_us: 50_000, // 50ms
            fsync_latency_us: 5_000,
        }
    }

    /// Low latency preset - minimal batching
    /// (batches of 1..=10, up to 1ms wait).
    pub fn low_latency() -> Self {
        Self {
            min_batch_size: 1,
            max_batch_size: 10,
            max_wait_us: 1_000, // 1ms
            fsync_latency_us: 5_000,
        }
    }

    /// Estimate the optimal batch size with a square-root batching rule.
    ///
    /// N* = sqrt(2 × L_fsync × λ / C_wait)
    ///
    /// where `L_fsync` is `fsync_latency_us` converted to seconds.
    ///
    /// # Arguments
    /// * `arrival_rate` - Operations per second (λ)
    /// * `wait_cost` - Cost coefficient for waiting (0.0-1.0); values below
    ///   0.001 (and NaN) are treated as 0.001 to avoid dividing by zero
    ///
    /// # Returns
    ///
    /// `N*` truncated to an integer and limited to
    /// `min_batch_size..=max_batch_size`. A NaN estimate (e.g. from a negative
    /// or NaN `arrival_rate`) truncates to 0 and therefore yields the lower
    /// bound. If the settings are inconsistent (`min_batch_size >
    /// max_batch_size`), `max_batch_size` is treated as the hard cap instead
    /// of panicking.
    pub fn optimal_batch_size(&self, arrival_rate: f64, wait_cost: f64) -> usize {
        let l_fsync = self.fsync_latency_us as f64 / 1_000_000.0;
        let n_star = (2.0 * l_fsync * arrival_rate / wait_cost.max(0.001)).sqrt();

        // `clamp` panics when min > max; both fields are public and
        // unvalidated, so normalise the lower bound first.
        let upper = self.max_batch_size;
        let lower = self.min_batch_size.min(upper);

        // Float-to-int `as` saturates and maps NaN to 0, so this cannot overflow.
        (n_star as usize).clamp(lower, upper)
    }
}
223
224impl Default for DatabaseConfig {
225    #[allow(deprecated)]
226    fn default() -> Self {
227        Self {
228            group_commit: true,
229            memtable_size_limit: 64 * 1024 * 1024, // 64MB
230            wal_enabled: true,
231            sync_mode: SyncMode::Normal,
232            read_only: false,
233            enable_ordered_index: true, // Default: enabled for compatibility
234            group_commit_config: GroupCommitSettings::default(),
235            default_index_policy: IndexPolicy::Balanced, // New default: balanced OLTP policy
236        }
237    }
238}
239
240impl DatabaseConfig {
241    /// Create config optimized for throughput (Fast Mode)
242    ///
243    /// - Disables ordered index (saves ~134 ns/op)
244    /// - Uses high-throughput group commit settings
245    /// - Suitable for append-only workloads
246    #[allow(deprecated)]
247    pub fn throughput_optimized() -> Self {
248        Self {
249            group_commit: true,
250            memtable_size_limit: 128 * 1024 * 1024, // 128MB
251            wal_enabled: true,
252            sync_mode: SyncMode::Normal,
253            read_only: false,
254            enable_ordered_index: false,
255            group_commit_config: GroupCommitSettings::high_throughput(),
256            default_index_policy: IndexPolicy::WriteOptimized, // No ordered index overhead
257        }
258    }
259
260    /// Create config optimized for latency
261    ///
262    /// - Keeps ordered index for fast range scans
263    /// - Uses low-latency group commit settings
264    /// - Suitable for OLTP workloads
265    #[allow(deprecated)]
266    pub fn latency_optimized() -> Self {
267        Self {
268            group_commit: true,
269            memtable_size_limit: 32 * 1024 * 1024, // 32MB
270            wal_enabled: true,
271            sync_mode: SyncMode::Full,
272            read_only: false,
273            enable_ordered_index: true,
274            group_commit_config: GroupCommitSettings::low_latency(),
275            default_index_policy: IndexPolicy::ScanOptimized, // Fast range scans
276        }
277    }
278
279    /// Create config matching SQLite defaults
280    #[allow(deprecated)]
281    pub fn sqlite_compatible() -> Self {
282        Self {
283            group_commit: false, // SQLite default is single-commit
284            memtable_size_limit: 64 * 1024 * 1024,
285            wal_enabled: true,
286            sync_mode: SyncMode::Normal, // PRAGMA synchronous = NORMAL
287            read_only: false,
288            enable_ordered_index: true,
289            group_commit_config: GroupCommitSettings::default(),
290            default_index_policy: IndexPolicy::Balanced, // Good default for mixed workloads
291        }
292    }
293
294    /// Get effective ordered index setting, derived from `default_index_policy`.
295    /// 
296    /// This is the shim method for the deprecated `enable_ordered_index` field.
297    /// It returns `true` if the policy requires an ordered index (ScanOptimized),
298    /// and `false` otherwise (WriteOptimized, Balanced, AppendOnly).
299    ///
300    /// # Policy Mapping
301    ///
302    /// | IndexPolicy      | Returns |
303    /// |------------------|---------|
304    /// | ScanOptimized    | true    |
305    /// | Balanced         | false   |
306    /// | WriteOptimized   | false   |
307    /// | AppendOnly       | false   |
308    ///
309    /// Note: `Balanced` uses lazy compaction rather than a live ordered index,
310    /// so it returns `false` for the low-level memtable config but still supports
311    /// efficient range scans via sorted runs.
312    pub fn effective_ordered_index(&self) -> bool {
313        matches!(self.default_index_policy, IndexPolicy::ScanOptimized)
314    }
315}
316
317/// WAL sync mode - matches SQLite's PRAGMA synchronous semantics
318///
319/// | SochDB     | SQLite       | Description                                    |
320/// |------------|--------------|------------------------------------------------|
321/// | Off        | OFF (0)      | No fsync, risk of data loss on crash           |
322/// | Normal     | NORMAL (1)   | Fsync at checkpoints, not every commit         |
323/// | Full       | FULL (2)     | Fsync every commit (safest, slowest)           |
324///
325/// # Performance vs Durability Trade-offs
326///
327/// - **Off**: ~10x faster than Full, but may lose last ~100ms of data on crash
328/// - **Normal**: ~5x faster than Full, durable at checkpoint boundaries
329/// - **Full**: Every commit is fsync'd, no data loss possible
330///
331/// # SQLite Compatibility
332///
333/// ```sql
334/// -- SQLite equivalent settings
335/// PRAGMA synchronous = OFF;    -- SyncMode::Off
336/// PRAGMA synchronous = NORMAL; -- SyncMode::Normal  
337/// PRAGMA synchronous = FULL;   -- SyncMode::Full
338/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SyncMode {
    /// No fsync (equivalent to SQLite PRAGMA synchronous = OFF)
    ///
    /// Writes buffered in OS, may lose data on power failure.
    /// Use for non-critical data or bulk loading.
    Off = 0,

    /// Fsync at checkpoints (equivalent to SQLite PRAGMA synchronous = NORMAL)
    ///
    /// Default mode. Syncs WAL at checkpoint boundaries.
    /// Good balance of performance and durability.
    Normal = 1,

    /// Fsync every commit (equivalent to SQLite PRAGMA synchronous = FULL)
    ///
    /// Safest mode. Every commit is immediately durable.
    /// Required for financial/critical data.
    Full = 2,
}

impl SyncMode {
    /// Map a SQLite `synchronous` pragma number onto a sync mode.
    ///
    /// `0` is `Off`, `1` is `Normal`; every other value (2 and above) is
    /// treated as `Full`.
    pub fn from_sqlite_pragma(value: u32) -> Self {
        if value == 0 {
            SyncMode::Off
        } else if value == 1 {
            SyncMode::Normal
        } else {
            SyncMode::Full
        }
    }

    /// Numeric SQLite `synchronous` pragma value for this mode (0, 1 or 2).
    pub fn to_sqlite_pragma(self) -> u32 {
        match self {
            SyncMode::Off => 0,
            SyncMode::Normal => 1,
            SyncMode::Full => 2,
        }
    }

    /// Parse a mode from its name (ASCII case-insensitive) or its digit.
    ///
    /// Accepts `OFF`/`0`, `NORMAL`/`1` and `FULL`/`2`. The input is not
    /// trimmed; anything else yields `None`.
    pub fn parse(s: &str) -> Option<Self> {
        let is = |word: &str| s.eq_ignore_ascii_case(word);

        if is("OFF") || is("0") {
            Some(SyncMode::Off)
        } else if is("NORMAL") || is("1") {
            Some(SyncMode::Normal)
        } else if is("FULL") || is("2") {
            Some(SyncMode::Full)
        } else {
            None
        }
    }
}
385
/// Table schema for the kernel: a table name plus its column definitions.
#[derive(Debug, Clone)]
pub struct TableSchema {
    /// Table name
    pub name: String,
    /// Column definitions for this table
    pub columns: Vec<ColumnDef>,
}
392
/// Column definition: name, declared type and nullability flag.
#[derive(Debug, Clone)]
pub struct ColumnDef {
    /// Column name
    pub name: String,
    /// Declared column type
    pub col_type: ColumnType,
    /// Nullability flag (presumably `true` means the column may hold NULL —
    /// enforcement is not visible in this part of the file)
    pub nullable: bool,
}
400
/// Column types that a [`ColumnDef`] can declare.
///
/// The per-variant descriptions below follow the variant names; the exact
/// storage encoding is defined elsewhere.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ColumnType {
    /// Signed 64-bit integer
    Int64,
    /// Unsigned 64-bit integer
    UInt64,
    /// 64-bit floating point number
    Float64,
    /// Text value
    Text,
    /// Raw binary value
    Binary,
    /// Boolean value
    Bool,
}
411
/// Transaction handle for kernel operations.
///
/// A small `Copy` value handed out by `Database::begin_transaction`.
#[derive(Debug, Clone, Copy)]
pub struct TxnHandle {
    /// Transaction id as returned by the storage layer's `begin_transaction`
    pub txn_id: u64,
    /// Snapshot timestamp. `Database::begin_transaction` currently fills this
    /// with `txn_id` as a proxy; the real snapshot timestamp is managed
    /// internally by the storage layer.
    pub snapshot_ts: u64,
}
418
/// Query result from the kernel (row-oriented).
///
/// `columns` fixes the output order used by `to_toon`; each row is a map and
/// may lack some of the listed columns.
#[derive(Debug, Clone)]
pub struct QueryResult {
    /// Column names
    pub columns: Vec<String>,
    /// Row data (each row is a map of column -> value)
    pub rows: Vec<HashMap<String, SochValue>>,
    /// Number of rows scanned (for stats)
    pub rows_scanned: usize,
    /// Bytes read from storage
    pub bytes_read: usize,
}
431
432impl QueryResult {
433    /// Empty result
434    pub fn empty() -> Self {
435        Self {
436            columns: vec![],
437            rows: vec![],
438            rows_scanned: 0,
439            bytes_read: 0,
440        }
441    }
442
443    /// Convert to TOON format for token efficiency
444    pub fn to_toon(&self) -> String {
445        if self.rows.is_empty() {
446            return "[]".to_string();
447        }
448
449        // TOON format: table[N]{cols}: row1; row2; ...
450        let n = self.rows.len();
451        let cols = self.columns.join(",");
452
453        let rows_str: Vec<String> = self
454            .rows
455            .iter()
456            .map(|row| {
457                self.columns
458                    .iter()
459                    .map(|c| {
460                        row.get(c)
461                            .map(format_soch_value)
462                            .unwrap_or_else(|| "∅".to_string())
463                    })
464                    .collect::<Vec<_>>()
465                    .join(",")
466            })
467            .collect();
468
469        format!("result[{}]{{{}}}:{}", n, cols, rows_str.join(";"))
470    }
471}
472
473fn format_soch_value(v: &SochValue) -> String {
474    match v {
475        SochValue::Null => "∅".to_string(),
476        SochValue::Int(i) => i.to_string(),
477        SochValue::UInt(u) => u.to_string(),
478        SochValue::Float(f) => format!("{:.6}", f),
479        SochValue::Text(s) => {
480            if s.contains(',') || s.contains(';') {
481                format!("\"{}\"", s.replace('"', "\\\""))
482            } else {
483                s.clone()
484            }
485        }
486        SochValue::Bool(b) => if *b { "T" } else { "F" }.to_string(),
487        SochValue::Binary(b) => format!("b64:{}", base64_encode(b)),
488        _ => format!("{:?}", v),
489    }
490}
491
/// Encode `data` as standard base64 (RFC 4648 alphabet, `=` padding).
///
/// Every group of up to three input bytes produces exactly four output
/// characters, so the output length is `4 * ceil(len / 3)`; an empty input
/// yields an empty string.
fn base64_encode(data: &[u8]) -> String {
    const ALPHABET: &[u8; 64] =
        b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
    const PAD: char = '=';

    // The output size is known up front: allocate once instead of growing.
    let mut encoded = String::with_capacity(data.len().div_ceil(3) * 4);

    for group in data.chunks(3) {
        // `chunks` never yields an empty slice, so index 0 always exists.
        let b0 = group[0];
        let b1 = group.get(1).copied();
        let b2 = group.get(2).copied();

        // Pack the (zero-padded) bytes into one 24-bit group, then peel off
        // 6-bit indices from the most significant end.
        let bits = (u32::from(b0) << 16)
            | (u32::from(b1.unwrap_or(0)) << 8)
            | u32::from(b2.unwrap_or(0));
        let sextet = |shift: u32| ALPHABET[((bits >> shift) & 0x3f) as usize] as char;

        encoded.push(sextet(18));
        encoded.push(sextet(12));
        // Positions that would encode only padding bytes are emitted as '='.
        encoded.push(if b1.is_some() { sextet(6) } else { PAD });
        encoded.push(if b2.is_some() { sextet(0) } else { PAD });
    }

    encoded
}
520
521// ============================================================================
522// Columnar Query Results - SIMD-friendly result format
523// ============================================================================
524
525use sochdb_core::TypedColumn as CoreTypedColumn;
526
527/// Columnar query result - SIMD-friendly format for analytics
528///
529/// Instead of row-oriented `Vec<HashMap<String, SochValue>>`, this returns
530/// column-oriented `Vec<TypedColumn>` for efficient vectorized operations.
531///
532/// ## Memory Layout
533///
534/// Row-oriented (standard):
535/// ```text
536/// Row 0: [id=1, name="Alice", score=85]
537/// Row 1: [id=2, name="Bob", score=92]
538/// Row 2: [id=3, name="Carol", score=78]
539/// ```
540///
541/// Column-oriented (this format):
542/// ```text
543/// id:    [1, 2, 3]           ← contiguous i64 array (SIMD-friendly)
544/// name:  ["Alice", "Bob", "Carol"] ← Arrow-style string encoding
545/// score: [85, 92, 78]        ← contiguous i64 array
546/// ```
547///
548/// ## Performance Benefits
549///
550/// - SIMD: Column sums use vectorized instructions (~8× faster)
551/// - Cache: Sequential access pattern maximizes L1/L2 cache hits
552/// - Compression: Same-type data compresses better (5-10× typical)
553/// - Filtering: Bitmap operations instead of row iteration
554///
555/// ## Usage
556///
557/// ```ignore
558/// let result = db.query(txn, "users")
559///     .columns(&["id", "score"])
560///     .as_columnar()?;
561///
562/// // SIMD sum
563/// let total_score = result.column("score")
564///     .map(|c| c.sum_i64())
565///     .unwrap_or(0);
566///
567/// // Stats
568/// println!("Rows: {}, Memory: {} bytes", result.row_count(), result.memory_size());
569/// ```
#[derive(Debug, Clone)]
pub struct ColumnarQueryResult {
    /// Column names in order; `columns[i]` names the data stored in `data[i]`
    /// (name lookups resolve a position here and index `data` with it)
    pub columns: Vec<String>,
    /// Column data - each TypedColumn contains all values for one column
    pub data: Vec<CoreTypedColumn>,
    /// Number of rows
    pub row_count: usize,
    /// Bytes read from storage
    pub bytes_read: usize,
}
581
582impl ColumnarQueryResult {
583    /// Create an empty result
584    pub fn empty() -> Self {
585        Self {
586            columns: vec![],
587            data: vec![],
588            row_count: 0,
589            bytes_read: 0,
590        }
591    }
592
593    /// Get column by name
594    pub fn column(&self, name: &str) -> Option<&CoreTypedColumn> {
595        self.columns
596            .iter()
597            .position(|c| c == name)
598            .and_then(|idx| self.data.get(idx))
599    }
600
601    /// Get column index by name
602    pub fn column_index(&self, name: &str) -> Option<usize> {
603        self.columns.iter().position(|c| c == name)
604    }
605
606    /// Number of rows
607    pub fn row_count(&self) -> usize {
608        self.row_count
609    }
610
611    /// Number of columns
612    pub fn column_count(&self) -> usize {
613        self.columns.len()
614    }
615
616    /// Total memory size in bytes
617    pub fn memory_size(&self) -> usize {
618        self.data.iter().map(|c| c.memory_size()).sum()
619    }
620
621    /// Sum of an i64 column (SIMD-optimized)
622    pub fn sum_i64(&self, column: &str) -> Option<i64> {
623        self.column(column).map(|c| c.sum_i64())
624    }
625
626    /// Sum of an f64 column (SIMD-optimized)
627    pub fn sum_f64(&self, column: &str) -> Option<f64> {
628        self.column(column).map(|c| c.sum_f64())
629    }
630
631    /// Get column statistics (min, max, null count)
632    pub fn column_stats(&self, column: &str) -> Option<&sochdb_core::columnar::ColumnStats> {
633        self.column(column).map(|c| c.stats())
634    }
635
636    /// Convert to TOON format (token-efficient)
637    pub fn to_toon(&self) -> String {
638        if self.row_count == 0 {
639            return "[]".to_string();
640        }
641
642        let n = self.row_count;
643        let cols = self.columns.join(",");
644
645        // Build rows from columns
646        let mut rows_str = Vec::with_capacity(n);
647        for i in 0..n {
648            let row: Vec<String> = self
649                .data
650                .iter()
651                .map(|col| format_columnar_value(col, i))
652                .collect();
653            rows_str.push(row.join(","));
654        }
655
656        format!("result[{}]{{{}}}:{}", n, cols, rows_str.join(";"))
657    }
658}
659
660/// Format a single value from a TypedColumn at index
661fn format_columnar_value(col: &CoreTypedColumn, idx: usize) -> String {
662    match col {
663        CoreTypedColumn::Int64 {
664            values, validity, ..
665        } => {
666            if validity.is_valid(idx) && idx < values.len() {
667                values[idx].to_string()
668            } else {
669                "∅".to_string()
670            }
671        }
672        CoreTypedColumn::UInt64 {
673            values, validity, ..
674        } => {
675            if validity.is_valid(idx) && idx < values.len() {
676                values[idx].to_string()
677            } else {
678                "∅".to_string()
679            }
680        }
681        CoreTypedColumn::Float64 {
682            values, validity, ..
683        } => {
684            if validity.is_valid(idx) && idx < values.len() {
685                format!("{:.6}", values[idx])
686            } else {
687                "∅".to_string()
688            }
689        }
690        CoreTypedColumn::Text {
691            offsets,
692            data,
693            validity,
694            ..
695        } => {
696            if validity.is_valid(idx) && idx + 1 < offsets.len() {
697                let start = offsets[idx] as usize;
698                let end = offsets[idx + 1] as usize;
699                std::str::from_utf8(&data[start..end])
700                    .map(|s| {
701                        if s.contains(',') || s.contains(';') {
702                            format!("\"{}\"", s.replace('"', "\\\""))
703                        } else {
704                            s.to_string()
705                        }
706                    })
707                    .unwrap_or_else(|_| "∅".to_string())
708            } else {
709                "∅".to_string()
710            }
711        }
712        CoreTypedColumn::Binary {
713            offsets,
714            data,
715            validity,
716            ..
717        } => {
718            if validity.is_valid(idx) && idx + 1 < offsets.len() {
719                let start = offsets[idx] as usize;
720                let end = offsets[idx + 1] as usize;
721                format!("b64:{}", base64_encode(&data[start..end]))
722            } else {
723                "∅".to_string()
724            }
725        }
726        CoreTypedColumn::Bool {
727            values,
728            validity,
729            len,
730            ..
731        } => {
732            if validity.is_valid(idx) && idx < *len {
733                let word = idx / 64;
734                let bit = idx % 64;
735                if (values[word] >> bit) & 1 == 1 {
736                    "T"
737                } else {
738                    "F"
739                }
740                .to_string()
741            } else {
742                "∅".to_string()
743            }
744        }
745    }
746}
747
/// Vector search result: one matched item with its distance.
#[derive(Debug, Clone)]
pub struct VectorSearchResult {
    /// Identifier of the matched item
    pub id: u64,
    /// Distance to the query (the metric is not specified in this part of
    /// the file; presumably smaller means closer — TODO confirm)
    pub distance: f32,
    /// Optional metadata attached to the match (column/key -> value)
    pub metadata: Option<HashMap<String, SochValue>>,
}
755
756/// The SochDB Database Kernel
757///
758/// This is the shared core used by both embedded (`SochConnection`) and
759/// server (`sochdb-server`) modes. It owns all storage, catalog, and
760/// indexing components.
761///
762/// # Thread Safety
763///
764/// The Database is fully thread-safe via internal synchronization:
765/// - Multiple readers can operate concurrently (MVCC snapshots)
766/// - Writers coordinate through WAL and group commit
767/// - All state is behind Arc/RwLock for shared access
768///
769/// # Example
770///
771/// ```ignore
772/// // Open a database (SQLite-style)
773/// let db = Database::open("./my_data")?;
774///
775/// // Begin a transaction
776/// let txn = db.begin_transaction()?;
777///
778/// // Write data
779/// db.put(txn, b"user:1:name", b"Alice")?;
780///
781/// // Commit
782/// db.commit(txn)?;
783/// ```
// NOTE(review): dead-code warnings are silenced for the whole struct,
// presumably because some fields are not read yet — confirm and narrow.
#[allow(dead_code)]
pub struct Database {
    /// Path to database directory (as passed to the `open*` constructors)
    path: PathBuf,
    /// Durable storage layer (WAL + MVCC + memtable)
    storage: Arc<DurableStorage>,
    /// Schema catalog (created as `Catalog::new("sochdb")` on open)
    catalog: Arc<RwLock<Catalog>>,
    /// Registered table schemas (name -> schema) - lock-free for reads
    tables: DashMap<String, TableSchema>,
    /// Cached packed schemas for fast insert (name -> packed schema)
    packed_schemas: DashMap<String, PackedTableSchema>,
    /// Per-table index policy registry, seeded with the config's
    /// `default_index_policy`
    index_registry: Arc<TableIndexRegistry>,
    /// Configuration the database was opened with
    config: DatabaseConfig,
    /// Statistics (atomic counters)
    stats: DatabaseStats,
    /// Shutdown flag; initialised to 0 by the constructors
    shutdown: AtomicU64,
}
805
/// Internal, atomically updated database counters.
struct DatabaseStats {
    transactions_started: AtomicU64,
    transactions_committed: AtomicU64,
    transactions_aborted: AtomicU64,
    queries_executed: AtomicU64,
    bytes_written: AtomicU64,
    bytes_read: AtomicU64,
}

impl DatabaseStats {
    /// Create a fresh set of counters, all starting at zero.
    fn new() -> Self {
        let zero = || AtomicU64::new(0);

        Self {
            transactions_started: zero(),
            transactions_committed: zero(),
            transactions_aborted: zero(),
            queries_executed: zero(),
            bytes_written: zero(),
            bytes_read: zero(),
        }
    }
}
828
/// Public statistics snapshot.
///
/// Plain-integer counterpart of the internal atomic counters; the field
/// names match `DatabaseStats` one to one.
#[derive(Debug, Clone)]
pub struct Stats {
    /// Transactions started
    pub transactions_started: u64,
    /// Transactions committed
    pub transactions_committed: u64,
    /// Transactions aborted
    pub transactions_aborted: u64,
    /// Queries executed
    pub queries_executed: u64,
    /// Bytes written
    pub bytes_written: u64,
    /// Bytes read
    pub bytes_read: u64,
}
839
840impl Database {
841    /// Open or create a database at the given path.
842    ///
843    /// This is the primary entry point, similar to `sqlite3_open()`.
844    /// If the database exists, it will be opened and WAL recovery performed.
845    /// If it doesn't exist, a new database will be created.
846    ///
847    /// # Arguments
848    ///
849    /// * `path` - Directory path for the database files
850    ///
851    /// # Returns
852    ///
853    /// An `Arc<Database>` that can be shared across threads and connections.
854    pub fn open<P: AsRef<Path>>(path: P) -> Result<Arc<Self>> {
855        Self::open_with_config(path, DatabaseConfig::default())
856    }
857
858    /// Open without locking (for testing crash recovery scenarios)
859    ///
860    /// # Safety
861    /// This should ONLY be used in tests that simulate crashes by forgetting
862    /// the storage instance. In production, always use `open()`.
863    #[cfg(test)]
864    pub fn open_without_lock<P: AsRef<Path>>(path: P) -> Result<Arc<Self>> {
865        let path = path.as_ref().to_path_buf();
866        let config = DatabaseConfig::default();
867
868        let storage = Arc::new(DurableStorage::open_without_lock(&path)?);
869
870        let index_registry = Arc::new(TableIndexRegistry::with_default_policy(
871            config.default_index_policy,
872        ));
873
874        let db = Arc::new(Self {
875            path: path.clone(),
876            storage,
877            catalog: Arc::new(RwLock::new(Catalog::new("sochdb"))),
878            tables: DashMap::new(),
879            packed_schemas: DashMap::new(),
880            index_registry,
881            config,
882            stats: DatabaseStats::new(),
883            shutdown: AtomicU64::new(0),
884        });
885
886        db.recover()?;
887        Ok(db)
888    }
889
890    /// Open with custom configuration
891    pub fn open_with_config<P: AsRef<Path>>(path: P, config: DatabaseConfig) -> Result<Arc<Self>> {
892        let path = path.as_ref().to_path_buf();
893
894        // Use IndexPolicy-based storage configuration for automatic memtable selection
895        // This derives ordered index and memtable type from the policy
896        let storage = Arc::new(DurableStorage::open_with_policy(
897            &path,
898            config.default_index_policy,
899            config.group_commit,
900        )?);
901
902        // Create index registry with default policy from config
903        let index_registry = Arc::new(TableIndexRegistry::with_default_policy(
904            config.default_index_policy,
905        ));
906
907        let db = Arc::new(Self {
908            path: path.clone(),
909            storage,
910            catalog: Arc::new(RwLock::new(Catalog::new("sochdb"))),
911            tables: DashMap::new(),
912            packed_schemas: DashMap::new(),
913            index_registry,
914            config,
915            stats: DatabaseStats::new(),
916            shutdown: AtomicU64::new(0),
917        });
918
919        // Perform crash recovery if needed
920        db.recover()?;
921
922        Ok(db)
923    }
924
925    /// Perform crash recovery
926    fn recover(&self) -> Result<RecoveryStats> {
927        self.storage.recover()
928    }
929
930    /// Get database path
931    pub fn path(&self) -> &Path {
932        &self.path
933    }
934
935    // =========================================================================
936    // Transaction API
937    // =========================================================================
938
939    /// Begin a new transaction
940    pub fn begin_transaction(&self) -> Result<TxnHandle> {
941        self.stats
942            .transactions_started
943            .fetch_add(1, Ordering::Relaxed);
944        let txn_id = self.storage.begin_transaction()?;
945
946        // Get snapshot timestamp from MVCC
947        // For now, use txn_id as a proxy (the real snapshot_ts is managed internally)
948        Ok(TxnHandle {
949            txn_id,
950            snapshot_ts: txn_id,
951        })
952    }
953
954    /// Begin a read-only transaction (optimized: no SSI tracking)
955    ///
956    /// Read-only transactions skip SSI read tracking, reducing overhead
957    /// from ~82ns to ~32ns per read (2.6x faster).
958    ///
959    /// Use this for:
960    /// - SELECT queries that don't modify data
961    /// - Analytics and reporting queries
962    /// - Snapshot reads for backup
963    pub fn begin_read_only(&self) -> Result<TxnHandle> {
964        self.stats
965            .transactions_started
966            .fetch_add(1, Ordering::Relaxed);
967        let txn_id = self.storage.begin_with_mode(TransactionMode::ReadOnly)?;
968        Ok(TxnHandle {
969            txn_id,
970            snapshot_ts: txn_id,
971        })
972    }
973
974    /// Begin a write-only transaction (optimized: no read tracking)
975    ///
976    /// Write-only transactions skip read tracking, improving insert
977    /// throughput for bulk loading scenarios.
978    ///
979    /// Use this for:
980    /// - Bulk data imports
981    /// - Append-only logging tables
982    /// - ETL pipelines
983    pub fn begin_write_only(&self) -> Result<TxnHandle> {
984        self.stats
985            .transactions_started
986            .fetch_add(1, Ordering::Relaxed);
987        let txn_id = self.storage.begin_with_mode(TransactionMode::WriteOnly)?;
988        Ok(TxnHandle {
989            txn_id,
990            snapshot_ts: txn_id,
991        })
992    }
993
994    /// Commit a transaction
995    pub fn commit(&self, txn: TxnHandle) -> Result<u64> {
996        self.stats
997            .transactions_committed
998            .fetch_add(1, Ordering::Relaxed);
999        self.storage.commit(txn.txn_id)
1000    }
1001
1002    /// Abort a transaction
1003    pub fn abort(&self, txn: TxnHandle) -> Result<()> {
1004        self.stats
1005            .transactions_aborted
1006            .fetch_add(1, Ordering::Relaxed);
1007        self.storage.abort(txn.txn_id)
1008    }
1009
1010    // =========================================================================
1011    // Per-Table Index Policy API
1012    // =========================================================================
1013
1014    /// Configure index policy for a table
1015    ///
1016    /// This allows fine-grained control over write/scan trade-offs per table:
1017    ///
1018    /// | Policy         | Insert Cost | Scan Cost      | Use Case              |
1019    /// |----------------|-------------|----------------|------------------------|
1020    /// | WriteOptimized | O(1)        | O(N)           | High-write, rare scan  |
1021    /// | Balanced       | O(1) amort  | O(output+logK) | Mixed OLTP            |
1022    /// | ScanOptimized  | O(log N)    | O(logN + K)    | Analytics, range query |
1023    /// | AppendOnly     | O(1)        | O(N)           | Time-series logs       |
1024    ///
1025    /// # Example
1026    ///
1027    /// ```ignore
1028    /// // Fast inserts for logs table (no ordered index overhead)
1029    /// db.set_table_index_policy("logs", IndexPolicy::WriteOptimized);
1030    ///
1031    /// // Efficient range scans for analytics table
1032    /// db.set_table_index_policy("analytics", IndexPolicy::ScanOptimized);
1033    ///
1034    /// // Balanced for OLTP tables
1035    /// db.set_table_index_policy("users", IndexPolicy::Balanced);
1036    /// ```
1037    pub fn set_table_index_policy(&self, table: &str, policy: IndexPolicy) {
1038        self.index_registry.configure_table(
1039            TableIndexConfig::new(table, policy)
1040        );
1041    }
1042
1043    /// Get the index policy for a table
1044    pub fn get_table_index_policy(&self, table: &str) -> IndexPolicy {
1045        self.index_registry.get_policy(table)
1046    }
1047
1048    /// Get the index registry for advanced configuration
1049    pub fn index_registry(&self) -> &Arc<TableIndexRegistry> {
1050        &self.index_registry
1051    }
1052
1053    // =========================================================================
1054    // Key-Value API (Low-level)
1055    // =========================================================================
1056
1057    /// Put a key-value pair
1058    pub fn put(&self, txn: TxnHandle, key: &[u8], value: &[u8]) -> Result<()> {
1059        self.stats
1060            .bytes_written
1061            .fetch_add((key.len() + value.len()) as u64, Ordering::Relaxed);
1062        // Use write_refs to avoid unnecessary allocations
1063        self.storage.write_refs(txn.txn_id, key, value)
1064    }
1065
1066    /// Batch put multiple key-value pairs with reduced overhead
1067    ///
1068    /// This amortizes per-operation costs over the entire batch:
1069    /// - Single DashMap lookup
1070    /// - Batch MVCC tracking
1071    /// - Batch memtable writes
1072    ///
1073    /// For 100+ entries, this is 2-3x faster than individual puts.
1074    ///
1075    /// # Example
1076    ///
1077    /// ```ignore
1078    /// let writes: Vec<(&[u8], &[u8])> = vec![
1079    ///     (b"key1", b"value1"),
1080    ///     (b"key2", b"value2"),
1081    ///     (b"key3", b"value3"),
1082    /// ];
1083    /// db.put_batch(txn, &writes)?;
1084    /// ```
1085    pub fn put_batch(&self, txn: TxnHandle, writes: &[(&[u8], &[u8])]) -> Result<()> {
1086        let bytes: u64 = writes
1087            .iter()
1088            .map(|(k, v)| (k.len() + v.len()) as u64)
1089            .sum();
1090        self.stats.bytes_written.fetch_add(bytes, Ordering::Relaxed);
1091        self.storage.write_batch_refs(txn.txn_id, writes)
1092    }
1093
1094    /// Get a value by key
1095    pub fn get(&self, txn: TxnHandle, key: &[u8]) -> Result<Option<Vec<u8>>> {
1096        let result = self.storage.read(txn.txn_id, key)?;
1097        if let Some(ref data) = result {
1098            self.stats
1099                .bytes_read
1100                .fetch_add(data.len() as u64, Ordering::Relaxed);
1101        }
1102        Ok(result)
1103    }
1104
1105    /// Delete a key
1106    pub fn delete(&self, txn: TxnHandle, key: &[u8]) -> Result<()> {
1107        self.storage.delete(txn.txn_id, key.to_vec())
1108    }
1109
    /// Minimum prefix length, in bytes, accepted by [`Self::scan`].
    ///
    /// Prevents expensive full-table scans by requiring a meaningful prefix;
    /// shorter prefixes are rejected by `scan` with an `InvalidArgument` error.
    /// `scan_unchecked` does not apply this limit.
    pub const MIN_SCAN_PREFIX_LEN: usize = 2;
1113
1114    /// Scan keys with a prefix (enforces minimum prefix length for safety).
1115    ///
1116    /// # Prefix Safety
1117    /// 
1118    /// To prevent accidental full-table scans, this method requires a minimum
1119    /// prefix length of 2 bytes. Use `scan_unchecked` for internal operations
1120    /// that need empty/short prefixes.
1121    ///
1122    /// # Errors
1123    ///
1124    /// Returns `SochDBError::InvalidInput` if prefix is too short.
1125    pub fn scan(&self, txn: TxnHandle, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
1126        if prefix.len() < Self::MIN_SCAN_PREFIX_LEN {
1127            return Err(SochDBError::InvalidArgument(format!(
1128                "Prefix too short: {} bytes (minimum {} required). \
1129                 Use scan_unchecked() for unrestricted scans.",
1130                prefix.len(),
1131                Self::MIN_SCAN_PREFIX_LEN
1132            )));
1133        }
1134        self.scan_unchecked(txn, prefix)
1135    }
1136
1137    /// Scan keys with a prefix without length validation.
1138    ///
1139    /// # Warning
1140    ///
1141    /// This method allows empty/short prefixes which can cause expensive
1142    /// full-table scans. Use `scan()` unless you specifically need unrestricted
1143    /// prefix access for internal operations.
1144    pub fn scan_unchecked(&self, txn: TxnHandle, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
1145        let results = self.storage.scan(txn.txn_id, prefix)?;
1146        let bytes: u64 = results
1147            .iter()
1148            .map(|(k, v)| (k.len() + v.len()) as u64)
1149            .sum();
1150        self.stats.bytes_read.fetch_add(bytes, Ordering::Relaxed);
1151        Ok(results)
1152    }
1153
1154    /// Scan keys in range
1155    pub fn scan_range(
1156        &self,
1157        txn: TxnHandle,
1158        start: &[u8],
1159        end: &[u8],
1160    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
1161        let results = self.storage.scan_range(txn.txn_id, start, end)?;
1162        let bytes: u64 = results
1163            .iter()
1164            .map(|(k, v)| (k.len() + v.len()) as u64)
1165            .sum();
1166        self.stats.bytes_read.fetch_add(bytes, Ordering::Relaxed);
1167        Ok(results)
1168    }
1169
1170    /// Streaming scan for very large result sets
1171    /// 
1172    /// Returns an iterator that yields (key, value) pairs without
1173    /// materializing the entire result set. Use this for large scans
1174    /// where memory efficiency is important.
1175    /// 
1176    /// ## Performance
1177    /// 
1178    /// - Memory: O(1) per iteration vs O(N) for scan_range
1179    /// - Latency: First result available immediately vs waiting for all results
1180    /// - Throughput: Slightly lower due to per-item overhead
1181    /// 
1182    /// ## Usage
1183    /// 
1184    /// ```ignore
1185    /// for result in db.scan_range_iter(txn, b"start", b"end") {
1186    ///     let (key, value) = result?;
1187    ///     // Process immediately - no need to wait for all results
1188    /// }
1189    /// ```
1190    pub fn scan_range_iter<'a>(
1191        &'a self,
1192        txn: TxnHandle,
1193        start: &'a [u8],
1194        end: &'a [u8],
1195    ) -> impl Iterator<Item = Result<(Vec<u8>, Vec<u8>)>> + 'a {
1196        let stats = &self.stats;
1197        self.storage
1198            .scan_range_iter(txn.txn_id, start, end)
1199            .map(move |item| {
1200                stats.bytes_read.fetch_add(
1201                    (item.0.len() + item.1.len()) as u64,
1202                    Ordering::Relaxed,
1203                );
1204                Ok(item)
1205            })
1206    }
1207
1208    /// Flush memtable to WAL/Disk
1209    pub fn flush(&self) -> Result<()> {
1210        self.storage.fsync()
1211    }
1212
1213    // =========================================================================
1214    // Path-Native API (SochDB's differentiator)
1215    // =========================================================================
1216
1217    /// Get storage statistics
1218    pub fn storage_stats(&self) -> crate::durable_storage::StorageStats {
1219        self.storage.stats()
1220    }
1221
1222    /// Put a value at a path
1223    ///
1224    /// Path format: "collection/doc_id/field" or "table.row_id.column"
1225    /// Resolution is O(|path|), not O(log N) like B-tree.
1226    pub fn put_path(&self, txn: TxnHandle, path: &str, value: &[u8]) -> Result<()> {
1227        self.put(txn, path.as_bytes(), value)
1228    }
1229
1230    /// Get a value at a path
1231    pub fn get_path(&self, txn: TxnHandle, path: &str) -> Result<Option<Vec<u8>>> {
1232        self.get(txn, path.as_bytes())
1233    }
1234
1235    /// Delete at a path
1236    pub fn delete_path(&self, txn: TxnHandle, path: &str) -> Result<()> {
1237        self.delete(txn, path.as_bytes())
1238    }
1239
1240    /// Scan a path prefix
1241    ///
1242    /// Returns all key-value pairs where key starts with prefix.
1243    /// Useful for: "users/123/" -> all fields of user 123
1244    pub fn scan_path(&self, txn: TxnHandle, prefix: &str) -> Result<Vec<(String, Vec<u8>)>> {
1245        self.stats.queries_executed.fetch_add(1, Ordering::Relaxed);
1246
1247        let results = self.scan(txn, prefix.as_bytes())?;
1248
1249        Ok(results
1250            .into_iter()
1251            .filter_map(|(k, v)| String::from_utf8(k).ok().map(|path| (path, v)))
1252            .collect())
1253    }
1254
1255    // =========================================================================
1256    // Query API
1257    // =========================================================================
1258
1259    /// Execute a path query and return results
1260    ///
1261    /// This is the main query interface for LLM context retrieval.
1262    /// Supports:
1263    /// - Path prefix matching
1264    /// - Column projection (for I/O reduction)
1265    /// - Limit/offset
1266    pub fn query(&self, txn: TxnHandle, path_prefix: &str) -> QueryBuilder<'_> {
1267        QueryBuilder::new(self, txn, path_prefix.to_string())
1268    }
1269
1270    // =========================================================================
1271    // Table API (Higher-level abstraction)
1272    // =========================================================================
1273
1274    /// Register a table schema
1275    pub fn register_table(&self, schema: TableSchema) -> Result<()> {
1276        if self.tables.contains_key(&schema.name) {
1277            return Err(SochDBError::InvalidArgument(format!(
1278                "Table '{}' already exists",
1279                schema.name
1280            )));
1281        }
1282        // Cache the packed schema for fast inserts
1283        let packed_schema = Self::to_packed_schema(&schema);
1284        self.packed_schemas
1285            .insert(schema.name.clone(), packed_schema);
1286        self.tables.insert(schema.name.clone(), schema);
1287        Ok(())
1288    }
1289
1290    /// Get table schema
1291    pub fn get_table_schema(&self, name: &str) -> Option<TableSchema> {
1292        self.tables.get(name).map(|s| s.clone())
1293    }
1294
1295    /// List all tables
1296    pub fn list_tables(&self) -> Vec<String> {
1297        self.tables.iter().map(|e| e.key().clone()).collect()
1298    }
1299    /// Convert TableSchema to PackedTableSchema for efficient storage
1300    fn to_packed_schema(schema: &TableSchema) -> PackedTableSchema {
1301        let columns = schema
1302            .columns
1303            .iter()
1304            .map(|col| PackedColumnDef {
1305                name: col.name.clone(),
1306                col_type: match col.col_type {
1307                    ColumnType::Int64 => PackedColumnType::Int64,
1308                    ColumnType::UInt64 => PackedColumnType::UInt64,
1309                    ColumnType::Float64 => PackedColumnType::Float64,
1310                    ColumnType::Text => PackedColumnType::Text,
1311                    ColumnType::Binary => PackedColumnType::Binary,
1312                    ColumnType::Bool => PackedColumnType::Bool,
1313                },
1314                nullable: col.nullable,
1315            })
1316            .collect();
1317
1318        PackedTableSchema::new(&schema.name, columns)
1319    }
1320
1321    /// Insert a row into a table
1322    ///
1323    /// Uses packed row format: stores entire row as single key-value pair.
1324    /// This reduces write amplification from 4× to 1× for a 4-column table.
1325    ///
1326    /// # Performance
1327    /// - Before: 4 columns × (WAL entry + MVCC version) = 4 writes
1328    /// - After: 1 packed row = 1 write
1329    /// - Improvement: ~4× fewer WAL entries, ~48% less I/O overhead
1330    pub fn insert_row(
1331        &self,
1332        txn: TxnHandle,
1333        table: &str,
1334        row_id: u64,
1335        values: &HashMap<String, SochValue>,
1336    ) -> Result<()> {
1337        // Use cached packed schema - single DashMap lookup, no clone
1338        let packed_schema = self
1339            .packed_schemas
1340            .get(table)
1341            .ok_or_else(|| SochDBError::InvalidArgument(format!("Table '{}' not found", table)))?;
1342
1343        // Pack the row using cached schema
1344        let packed_row = PackedRow::pack(&packed_schema, values);
1345
1346        // Build key using KeyBuffer - optimized stack allocation (~12-15ns vs ~30-35ns for write!())
1347        let key = KeyBuffer::format_row_key(table, row_id);
1348
1349        self.put(txn, key.as_bytes(), packed_row.as_bytes())?;
1350
1351        Ok(())
1352    }
1353
1354    /// Read a row from a table
1355    ///
1356    /// Reads packed row and extracts requested columns in O(k) time.
1357    /// Column projection happens in memory, not storage - all columns are fetched.
1358    pub fn read_row(
1359        &self,
1360        txn: TxnHandle,
1361        table: &str,
1362        row_id: u64,
1363        columns: Option<&[&str]>,
1364    ) -> Result<Option<HashMap<String, SochValue>>> {
1365        let schema = self
1366            .tables
1367            .get(table)
1368            .ok_or_else(|| SochDBError::InvalidArgument(format!("Table '{}' not found", table)))?;
1369
1370        // Read the packed row with a single key lookup using KeyBuffer
1371        let key = KeyBuffer::format_row_key(table, row_id);
1372        let bytes = match self.get(txn, key.as_bytes())? {
1373            Some(b) => b,
1374            None => return Ok(None),
1375        };
1376
1377        // Use cached packed schema
1378        let packed_schema = self
1379            .packed_schemas
1380            .get(table)
1381            .ok_or_else(|| SochDBError::Internal("Packed schema not found".into()))?;
1382        let packed_row = PackedRow::from_bytes(bytes, packed_schema.num_columns())?;
1383
1384        // Determine which columns to return
1385        let cols_to_read: Vec<&str> = match columns {
1386            Some(c) => c.to_vec(),
1387            None => schema.columns.iter().map(|c| c.name.as_str()).collect(),
1388        };
1389
1390        let mut row = HashMap::new();
1391        for col_name in cols_to_read {
1392            if let Some(idx) = packed_schema.column_index(col_name)
1393                && let Some(col_def) = packed_schema.column(idx)
1394                && let Some(value) = packed_row.get_column(idx, col_def.col_type)
1395            {
1396                row.insert(col_name.to_string(), value);
1397            }
1398        }
1399
1400        Ok(Some(row))
1401    }
1402
1403    /// Insert multiple rows efficiently in a batch
1404    ///
1405    /// This method accumulates all rows and writes them with fewer WAL syncs.
1406    /// Ideal for bulk loading scenarios.
1407    ///
1408    /// # Performance
1409    /// - Uses group commit to batch fsync operations
1410    /// - Expected throughput: 500K-1M rows/sec depending on row size
1411    pub fn insert_rows_batch(
1412        &self,
1413        txn: TxnHandle,
1414        table: &str,
1415        rows: &[(u64, HashMap<String, SochValue>)],
1416    ) -> Result<usize> {
1417        // Use cached packed schema
1418        let packed_schema = self
1419            .packed_schemas
1420            .get(table)
1421            .ok_or_else(|| SochDBError::InvalidArgument(format!("Table '{}' not found", table)))?;
1422
1423        let mut count = 0;
1424
1425        for (row_id, values) in rows {
1426            // Pack and write using KeyBuffer for efficient key construction
1427            let packed_row = PackedRow::pack(&packed_schema, values);
1428            let key = KeyBuffer::format_row_key(table, *row_id);
1429            self.put(txn, key.as_bytes(), packed_row.as_bytes())?;
1430            count += 1;
1431        }
1432
1433        Ok(count)
1434    }
1435
1436    /// Ultra-fast raw put - bypasses all validation
1437    ///
1438    /// Use when you've already validated the data and just need speed.
1439    /// This is ~10× faster than insert_row() for bulk inserts.
1440    #[inline]
1441    pub fn put_raw(&self, txn: TxnHandle, key: &[u8], value: &[u8]) -> Result<()> {
1442        self.storage.write_refs(txn.txn_id, key, value)
1443    }
1444
1445    /// Zero-allocation insert - fastest path for bulk inserts
1446    ///
1447    /// Takes values as a slice in schema column order, avoiding HashMap overhead.
1448    ///
1449    /// # Arguments
1450    /// * `txn` - Transaction handle
1451    /// * `table` - Table name
1452    /// * `row_id` - Row identifier
1453    /// * `values` - Values in schema column order (None = NULL)
1454    ///
1455    /// # Performance
1456    /// - Eliminates ~6 allocations per row vs insert_row()
1457    /// - Expected: 1.2M-1.5M inserts/sec
1458    ///
1459    /// # Example
1460    /// ```ignore
1461    /// let values: &[Option<&SochValue>] = &[
1462    ///     Some(&SochValue::Int(1)),
1463    ///     Some(&SochValue::Text("Alice".into())),
1464    ///     None, // NULL
1465    /// ];
1466    /// db.insert_row_slice(txn, "users", 1, values)?;
1467    /// ```
1468    #[inline]
1469    pub fn insert_row_slice(
1470        &self,
1471        txn: TxnHandle,
1472        table: &str,
1473        row_id: u64,
1474        values: &[Option<&SochValue>],
1475    ) -> Result<()> {
1476        // Use cached packed schema - single DashMap lookup
1477        let packed_schema = self
1478            .packed_schemas
1479            .get(table)
1480            .ok_or_else(|| SochDBError::InvalidArgument(format!("Table '{}' not found", table)))?;
1481
1482        // Validate column count matches
1483        if values.len() != packed_schema.num_columns() {
1484            return Err(SochDBError::InvalidArgument(format!(
1485                "Expected {} columns, got {}",
1486                packed_schema.num_columns(),
1487                values.len()
1488            )));
1489        }
1490
1491        // Pack using zero-allocation path
1492        let packed_row = PackedRow::pack_slice(&packed_schema, values);
1493
1494        // Build key using KeyBuffer - optimized stack allocation (~12-15ns vs ~30-35ns for write!())
1495        let key = KeyBuffer::format_row_key(table, row_id);
1496
1497        self.put(txn, key.as_bytes(), packed_row.as_bytes())?;
1498        Ok(())
1499    }
1500
1501    // =========================================================================
1502    // Maintenance
1503    // =========================================================================
1504
1505    /// Force fsync to disk
1506    pub fn fsync(&self) -> Result<()> {
1507        self.storage.fsync()
1508    }
1509
1510    /// Create a checkpoint
1511    pub fn checkpoint(&self) -> Result<u64> {
1512        self.storage.checkpoint()
1513    }
1514
1515    /// Run garbage collection
1516    pub fn gc(&self) -> usize {
1517        self.storage.gc()
1518    }
1519
1520    /// Get database statistics
1521    pub fn stats(&self) -> Stats {
1522        Stats {
1523            transactions_started: self.stats.transactions_started.load(Ordering::Relaxed),
1524            transactions_committed: self.stats.transactions_committed.load(Ordering::Relaxed),
1525            transactions_aborted: self.stats.transactions_aborted.load(Ordering::Relaxed),
1526            queries_executed: self.stats.queries_executed.load(Ordering::Relaxed),
1527            bytes_written: self.stats.bytes_written.load(Ordering::Relaxed),
1528            bytes_read: self.stats.bytes_read.load(Ordering::Relaxed),
1529        }
1530    }
1531
1532    /// Shutdown the database gracefully
1533    pub fn shutdown(&self) -> Result<()> {
1534        if self.shutdown.swap(1, Ordering::SeqCst) == 1 {
1535            return Ok(()); // Already shutting down
1536        }
1537
1538        // Flush any pending writes
1539        self.fsync()?;
1540
1541        // Create clean shutdown marker
1542        let marker = self.path.join(".clean_shutdown");
1543        std::fs::write(&marker, b"ok")?;
1544
1545        Ok(())
1546    }
1547}
1548
1549impl Drop for Database {
1550    fn drop(&mut self) {
1551        // Try graceful shutdown if not already done
1552        if self.shutdown.load(Ordering::SeqCst) == 0 {
1553            let _ = self.fsync();
1554            let marker = self.path.join(".clean_shutdown");
1555            let _ = std::fs::write(&marker, b"ok");
1556        }
1557    }
1558}
1559
/// Query builder for fluent query construction
///
/// Created by `Database::query`; configured through `columns`, `limit` and
/// `offset`, then consumed by one of the `execute*` methods.
pub struct QueryBuilder<'a> {
    /// Database the query runs against.
    db: &'a Database,
    /// Transaction handle used for the underlying scan.
    txn: TxnHandle,
    /// Path prefix to scan; its first `/`-separated segment is treated as the table name.
    path_prefix: String,
    /// Optional column projection; `None` means all columns.
    columns: Option<Vec<String>>,
    /// Maximum number of rows to return, if set.
    limit: Option<usize>,
    /// Number of leading rows to skip, if set.
    offset: Option<usize>,
}
1569
1570impl<'a> QueryBuilder<'a> {
1571    fn new(db: &'a Database, txn: TxnHandle, path_prefix: String) -> Self {
1572        Self {
1573            db,
1574            txn,
1575            path_prefix,
1576            columns: None,
1577            limit: None,
1578            offset: None,
1579        }
1580    }
1581
1582    /// Select specific columns (for I/O reduction)
1583    pub fn columns(mut self, cols: &[&str]) -> Self {
1584        self.columns = Some(cols.iter().map(|s| s.to_string()).collect());
1585        self
1586    }
1587
1588    /// Limit results
1589    pub fn limit(mut self, n: usize) -> Self {
1590        self.limit = Some(n);
1591        self
1592    }
1593
1594    /// Skip results
1595    pub fn offset(mut self, n: usize) -> Self {
1596        self.offset = Some(n);
1597        self
1598    }
1599
1600    /// Execute the query
1601    ///
1602    /// Scans packed rows and unpacks them. Each key is "table/row_id" pointing to a packed row.
1603    pub fn execute(self) -> Result<QueryResult> {
1604        self.db
1605            .stats
1606            .queries_executed
1607            .fetch_add(1, Ordering::Relaxed);
1608
1609        // Get schema for the table if we're querying a table
1610        let table_name = self
1611            .path_prefix
1612            .split('/')
1613            .next()
1614            .unwrap_or(&self.path_prefix);
1615        let schema = self.db.tables.get(table_name).map(|s| s.clone());
1616
1617        // Scan the path prefix
1618        let results = self.db.scan_path(self.txn, &self.path_prefix)?;
1619
1620        let mut rows: Vec<HashMap<String, SochValue>> = Vec::new();
1621        let mut bytes_read = 0usize;
1622
1623        if let Some(ref schema) = schema {
1624            // We have a table schema - use cached packed schema
1625            let packed_schema = self
1626                .db
1627                .packed_schemas
1628                .get(table_name)
1629                .map(|ps| ps.clone())
1630                .unwrap_or_else(|| Database::to_packed_schema(schema));
1631
1632            for (path, value_bytes) in results {
1633                // Parse path: table/row_id
1634                let parts: Vec<&str> = path.split('/').collect();
1635                if parts.len() == 2 {
1636                    // This is a packed row
1637                    bytes_read += value_bytes.len();
1638
1639                    if let Ok(packed_row) =
1640                        PackedRow::from_bytes(value_bytes, packed_schema.num_columns())
1641                    {
1642                        // Unpack all columns or just requested columns
1643                        let mut row = HashMap::new();
1644
1645                        if let Some(ref cols) = self.columns {
1646                            // Only extract requested columns
1647                            for col_name in cols {
1648                                if let Some(idx) = packed_schema.column_index(col_name)
1649                                    && let Some(col_def) = packed_schema.column(idx)
1650                                    && let Some(value) =
1651                                        packed_row.get_column(idx, col_def.col_type)
1652                                {
1653                                    row.insert(col_name.clone(), value);
1654                                }
1655                            }
1656                        } else {
1657                            // Extract all columns
1658                            row = packed_row.unpack(&packed_schema);
1659                        }
1660
1661                        if !row.is_empty() {
1662                            rows.push(row);
1663                        }
1664                    }
1665                }
1666            }
1667        } else {
1668            // Fallback: no schema, try legacy column-per-key format
1669            let mut rows_map: HashMap<String, HashMap<String, SochValue>> = HashMap::new();
1670
1671            for (path, value_bytes) in results {
1672                let parts: Vec<&str> = path.split('/').collect();
1673                if parts.len() >= 3 {
1674                    let row_key = format!("{}/{}", parts[0], parts[1]);
1675                    let col_name = parts[2..].join("/");
1676
1677                    if let Some(ref cols) = self.columns
1678                        && !cols.contains(&col_name)
1679                    {
1680                        continue;
1681                    }
1682
1683                    bytes_read += value_bytes.len();
1684                    let row = rows_map.entry(row_key).or_default();
1685                    row.insert(col_name, deserialize_value(&value_bytes));
1686                }
1687            }
1688
1689            rows = rows_map.into_values().collect();
1690        }
1691
1692        // Apply offset
1693        if let Some(offset) = self.offset {
1694            rows = rows.into_iter().skip(offset).collect();
1695        }
1696
1697        // Apply limit
1698        if let Some(limit) = self.limit {
1699            rows.truncate(limit);
1700        }
1701
1702        // Collect column names
1703        let columns: Vec<String> = self.columns.unwrap_or_else(|| {
1704            rows.iter()
1705                .flat_map(|r| r.keys().cloned())
1706                .collect::<std::collections::HashSet<_>>()
1707                .into_iter()
1708                .collect()
1709        });
1710
1711        Ok(QueryResult {
1712            columns,
1713            rows_scanned: rows.len(),
1714            bytes_read,
1715            rows,
1716        })
1717    }
1718
1719    /// Execute and return TOON format (for LLM efficiency)
1720    pub fn to_toon(self) -> Result<String> {
1721        let result = self.execute()?;
1722        Ok(result.to_toon())
1723    }
1724
1725    /// Execute with lazy iteration - avoids materializing all rows
1726    ///
1727    /// Returns an iterator over rows as `Vec<SochValue>` in schema column order.
1728    /// This is more memory-efficient than `execute()` for large result sets.
1729    ///
1730    /// # Performance
1731    /// - No upfront materialization of all rows
1732    /// - ~40% less memory for large result sets
1733    /// - Ideal for streaming to network or aggregations
1734    ///
1735    /// # Example
1736    /// ```ignore
1737    /// for row_result in db.query(txn, "users").execute_iter()? {
1738    ///     let row = row_result?;
1739    ///     // row is Vec<SochValue> in column order
1740    /// }
1741    /// ```
1742    pub fn execute_iter(self) -> Result<QueryRowIterator> {
1743        self.db
1744            .stats
1745            .queries_executed
1746            .fetch_add(1, Ordering::Relaxed);
1747
1748        let table_name = self
1749            .path_prefix
1750            .split('/')
1751            .next()
1752            .unwrap_or(&self.path_prefix)
1753            .to_string();
1754
1755        // Get packed schema (clone needed for iterator ownership)
1756        let packed_schema = self.db.packed_schemas.get(&table_name).map(|ps| ps.clone());
1757
1758        // Scan the path prefix
1759        let results = self.db.scan_path(self.txn, &self.path_prefix)?;
1760
1761        Ok(QueryRowIterator {
1762            results: results.into_iter(),
1763            packed_schema,
1764            columns: self.columns,
1765            offset: self.offset.unwrap_or(0),
1766            limit: self.limit,
1767            yielded: 0,
1768            skipped: 0,
1769        })
1770    }
1771
1772    /// Execute and return columnar (SIMD-friendly) result format
1773    ///
1774    /// Instead of row-oriented `Vec<HashMap<String, SochValue>>`, returns
1775    /// column-oriented `Vec<TypedColumn>` for vectorized operations.
1776    ///
1777    /// ## Performance Benefits
1778    ///
1779    /// - SIMD: Aggregate operations (sum, avg) use vectorized instructions
1780    /// - Cache: Sequential access maximizes L1/L2 hits
1781    /// - Memory: ~30% less overhead than row-based format
1782    /// - Analytics: Ideal for ML preprocessing and statistics
1783    ///
1784    /// ## Example
1785    ///
1786    /// ```ignore
1787    /// let result = db.query(txn, "users")
1788    ///     .columns(&["id", "score"])
1789    ///     .as_columnar()?;
1790    ///
1791    /// // SIMD-optimized sum
1792    /// let total = result.sum_i64("score").unwrap_or(0);
1793    ///
1794    /// // Direct column access
1795    /// if let Some(scores) = result.column("score") {
1796    ///     for i in 0..scores.len() {
1797    ///         if let Some(v) = scores.get_i64(i) {
1798    ///             println!("Score: {}", v);
1799    ///         }
1800    ///     }
1801    /// }
1802    /// ```
1803    pub fn as_columnar(self) -> Result<ColumnarQueryResult> {
1804        self.db
1805            .stats
1806            .queries_executed
1807            .fetch_add(1, Ordering::Relaxed);
1808
1809        let table_name = self
1810            .path_prefix
1811            .split('/')
1812            .next()
1813            .unwrap_or(&self.path_prefix);
1814        let schema = self.db.tables.get(table_name).map(|s| s.clone());
1815
1816        // Get packed schema
1817        let packed_schema = match self.db.packed_schemas.get(table_name) {
1818            Some(ps) => ps.clone(),
1819            None => return Ok(ColumnarQueryResult::empty()),
1820        };
1821
1822        // Determine columns to fetch
1823        let column_names: Vec<String> = self.columns.clone().unwrap_or_else(|| {
1824            schema
1825                .as_ref()
1826                .map(|s| s.columns.iter().map(|c| c.name.clone()).collect())
1827                .unwrap_or_default()
1828        });
1829
1830        if column_names.is_empty() {
1831            return Ok(ColumnarQueryResult::empty());
1832        }
1833
1834        // Initialize TypedColumns based on schema types
1835        let mut columns: Vec<CoreTypedColumn> = column_names
1836            .iter()
1837            .map(|col_name| {
1838                packed_schema
1839                    .column_index(col_name)
1840                    .and_then(|idx| packed_schema.column(idx))
1841                    .map(|col_def| match col_def.col_type {
1842                        PackedColumnType::Int64 => CoreTypedColumn::new_int64(),
1843                        PackedColumnType::UInt64 => CoreTypedColumn::new_uint64(),
1844                        PackedColumnType::Float64 => CoreTypedColumn::new_float64(),
1845                        PackedColumnType::Text => CoreTypedColumn::new_text(),
1846                        PackedColumnType::Binary => CoreTypedColumn::new_binary(),
1847                        PackedColumnType::Bool => CoreTypedColumn::new_bool(),
1848                        PackedColumnType::Null => CoreTypedColumn::new_text(), // Null column = fallback to text
1849                    })
1850                    .unwrap_or_else(CoreTypedColumn::new_text) // fallback
1851            })
1852            .collect();
1853
1854        // Scan the path prefix
1855        let results = self.db.scan_path(self.txn, &self.path_prefix)?;
1856
1857        let mut row_count = 0;
1858        let mut bytes_read = 0;
1859        let mut skipped = 0;
1860
1861        for (path, value_bytes) in results {
1862            // Parse path: table/row_id
1863            let parts: Vec<&str> = path.split('/').collect();
1864            if parts.len() != 2 {
1865                continue;
1866            }
1867
1868            // Apply offset
1869            if let Some(offset) = self.offset
1870                && skipped < offset
1871            {
1872                skipped += 1;
1873                continue;
1874            }
1875
1876            // Apply limit
1877            if let Some(limit) = self.limit
1878                && row_count >= limit
1879            {
1880                break;
1881            }
1882
1883            bytes_read += value_bytes.len();
1884
1885            if let Ok(packed_row) = PackedRow::from_bytes(value_bytes, packed_schema.num_columns())
1886            {
1887                // Extract each column and push to corresponding TypedColumn
1888                for (col_idx, col_name) in column_names.iter().enumerate() {
1889                    if let Some(schema_idx) = packed_schema.column_index(col_name) {
1890                        if let Some(col_def) = packed_schema.column(schema_idx) {
1891                            let value = packed_row.get_column(schema_idx, col_def.col_type);
1892                            push_value_to_typed_column(&mut columns[col_idx], value);
1893                        } else {
1894                            push_null_to_typed_column(&mut columns[col_idx]);
1895                        }
1896                    } else {
1897                        push_null_to_typed_column(&mut columns[col_idx]);
1898                    }
1899                }
1900                row_count += 1;
1901            }
1902        }
1903
1904        Ok(ColumnarQueryResult {
1905            columns: column_names,
1906            data: columns,
1907            row_count,
1908            bytes_read,
1909        })
1910    }
1911}
1912
/// Lazy iterator over query results
///
/// Unpacks rows on-demand, avoiding upfront materialization of decoded rows.
/// The raw `(path, bytes)` pairs from the scan are already held in `results`;
/// decoding, offset skipping and limit enforcement happen in `next()`.
pub struct QueryRowIterator {
    /// Raw `(path, value bytes)` pairs from the prefix scan, consumed in order.
    results: std::vec::IntoIter<(String, Vec<u8>)>,
    /// Packed schema used to decode row bytes; when `None`, each row is
    /// yielded as a single raw binary value.
    packed_schema: Option<PackedTableSchema>,
    /// Optional projection: column names to extract, in output order.
    columns: Option<Vec<String>>,
    /// Number of leading row entries to skip before yielding anything.
    offset: usize,
    /// Maximum number of rows to yield; `None` means no cap.
    limit: Option<usize>,
    /// Rows successfully yielded so far (compared against `limit`).
    yielded: usize,
    /// Row entries skipped so far (compared against `offset`).
    skipped: usize,
}
1925
1926impl Iterator for QueryRowIterator {
1927    type Item = Result<Vec<SochValue>>;
1928
1929    fn next(&mut self) -> Option<Self::Item> {
1930        // Check limit
1931        if let Some(limit) = self.limit
1932            && self.yielded >= limit
1933        {
1934            return None;
1935        }
1936
1937        loop {
1938            let (path, value_bytes) = self.results.next()?;
1939
1940            // Parse path: table/row_id
1941            let parts: Vec<&str> = path.split('/').collect();
1942            if parts.len() != 2 {
1943                continue; // Skip non-row entries
1944            }
1945
1946            // Apply offset
1947            if self.skipped < self.offset {
1948                self.skipped += 1;
1949                continue;
1950            }
1951
1952            if let Some(ref schema) = self.packed_schema {
1953                match PackedRow::from_bytes(value_bytes, schema.num_columns()) {
1954                    Ok(packed_row) => {
1955                        let row = if let Some(ref cols) = self.columns {
1956                            // Project specific columns
1957                            cols.iter()
1958                                .map(|col_name| {
1959                                    schema
1960                                        .column_index(col_name)
1961                                        .and_then(|idx| schema.column(idx))
1962                                        .and_then(|col_def| {
1963                                            packed_row.get_column(
1964                                                schema.column_index(col_name).unwrap(),
1965                                                col_def.col_type,
1966                                            )
1967                                        })
1968                                        .unwrap_or(SochValue::Null)
1969                                })
1970                                .collect()
1971                        } else {
1972                            // All columns in order
1973                            packed_row.unpack_to_vec(schema)
1974                        };
1975
1976                        self.yielded += 1;
1977                        return Some(Ok(row));
1978                    }
1979                    Err(e) => return Some(Err(e)),
1980                }
1981            } else {
1982                // No schema - return raw bytes as binary
1983                self.yielded += 1;
1984                return Some(Ok(vec![SochValue::Binary(value_bytes)]));
1985            }
1986        }
1987    }
1988}
1989
1990// Helper functions for serialization (kept for backward compatibility with legacy data)
1991
1992#[allow(dead_code)]
1993fn serialize_value(value: &SochValue) -> Vec<u8> {
1994    // Simple serialization - in production use proper format
1995    match value {
1996        SochValue::Null => vec![0],
1997        SochValue::Int(i) => {
1998            let mut buf = vec![1];
1999            buf.extend_from_slice(&i.to_le_bytes());
2000            buf
2001        }
2002        SochValue::UInt(u) => {
2003            let mut buf = vec![2];
2004            buf.extend_from_slice(&u.to_le_bytes());
2005            buf
2006        }
2007        SochValue::Float(f) => {
2008            let mut buf = vec![3];
2009            buf.extend_from_slice(&f.to_le_bytes());
2010            buf
2011        }
2012        SochValue::Text(s) => {
2013            let mut buf = vec![4];
2014            buf.extend_from_slice(s.as_bytes());
2015            buf
2016        }
2017        SochValue::Bool(b) => vec![5, if *b { 1 } else { 0 }],
2018        SochValue::Binary(b) => {
2019            let mut buf = vec![6];
2020            buf.extend_from_slice(b);
2021            buf
2022        }
2023        _ => {
2024            // Fallback: serialize as text
2025            let s = format!("{:?}", value);
2026            let mut buf = vec![4];
2027            buf.extend_from_slice(s.as_bytes());
2028            buf
2029        }
2030    }
2031}
2032
2033fn deserialize_value(bytes: &[u8]) -> SochValue {
2034    if bytes.is_empty() {
2035        return SochValue::Null;
2036    }
2037
2038    match bytes[0] {
2039        0 => SochValue::Null,
2040        1 if bytes.len() >= 9 => {
2041            let i = i64::from_le_bytes(bytes[1..9].try_into().unwrap());
2042            SochValue::Int(i)
2043        }
2044        2 if bytes.len() >= 9 => {
2045            let u = u64::from_le_bytes(bytes[1..9].try_into().unwrap());
2046            SochValue::UInt(u)
2047        }
2048        3 if bytes.len() >= 9 => {
2049            let f = f64::from_le_bytes(bytes[1..9].try_into().unwrap());
2050            SochValue::Float(f)
2051        }
2052        4 => {
2053            let s = String::from_utf8_lossy(&bytes[1..]).to_string();
2054            SochValue::Text(s)
2055        }
2056        5 if bytes.len() >= 2 => SochValue::Bool(bytes[1] != 0),
2057        6 => SochValue::Binary(bytes[1..].to_vec()),
2058        _ => {
2059            // Treat as text
2060            let s = String::from_utf8_lossy(bytes).to_string();
2061            SochValue::Text(s)
2062        }
2063    }
2064}
2065
2066// ============================================================================
2067// Helper functions for columnar query result building
2068// ============================================================================
2069
2070/// Push a SochValue into a TypedColumn
2071fn push_value_to_typed_column(col: &mut CoreTypedColumn, value: Option<SochValue>) {
2072    match value {
2073        None => push_null_to_typed_column(col),
2074        Some(v) => match (col, v) {
2075            (
2076                CoreTypedColumn::Int64 {
2077                    values,
2078                    validity,
2079                    stats,
2080                },
2081                SochValue::Int(i),
2082            ) => {
2083                values.push(i);
2084                validity.push(true);
2085                stats.update_i64(i);
2086            }
2087            (
2088                CoreTypedColumn::Int64 {
2089                    values,
2090                    validity,
2091                    stats,
2092                },
2093                SochValue::UInt(u),
2094            ) => {
2095                values.push(u as i64);
2096                validity.push(true);
2097                stats.update_i64(u as i64);
2098            }
2099            (
2100                CoreTypedColumn::UInt64 {
2101                    values,
2102                    validity,
2103                    stats,
2104                },
2105                SochValue::UInt(u),
2106            ) => {
2107                values.push(u);
2108                validity.push(true);
2109                stats.update_i64(u as i64);
2110            }
2111            (
2112                CoreTypedColumn::UInt64 {
2113                    values,
2114                    validity,
2115                    stats,
2116                },
2117                SochValue::Int(i),
2118            ) => {
2119                values.push(i as u64);
2120                validity.push(true);
2121                stats.update_i64(i);
2122            }
2123            (
2124                CoreTypedColumn::Float64 {
2125                    values,
2126                    validity,
2127                    stats,
2128                },
2129                SochValue::Float(f),
2130            ) => {
2131                values.push(f);
2132                validity.push(true);
2133                stats.update_f64(f);
2134            }
2135            (
2136                CoreTypedColumn::Float64 {
2137                    values,
2138                    validity,
2139                    stats,
2140                },
2141                SochValue::Int(i),
2142            ) => {
2143                values.push(i as f64);
2144                validity.push(true);
2145                stats.update_f64(i as f64);
2146            }
2147            (
2148                CoreTypedColumn::Text {
2149                    offsets,
2150                    data,
2151                    validity,
2152                    stats,
2153                },
2154                SochValue::Text(s),
2155            ) => {
2156                data.extend_from_slice(s.as_bytes());
2157                offsets.push(data.len() as u32);
2158                validity.push(true);
2159                stats.row_count += 1;
2160            }
2161            (
2162                CoreTypedColumn::Binary {
2163                    offsets,
2164                    data,
2165                    validity,
2166                    stats,
2167                },
2168                SochValue::Binary(b),
2169            ) => {
2170                data.extend_from_slice(&b);
2171                offsets.push(data.len() as u32);
2172                validity.push(true);
2173                stats.row_count += 1;
2174            }
2175            (
2176                CoreTypedColumn::Bool {
2177                    values,
2178                    validity,
2179                    stats,
2180                    len,
2181                },
2182                SochValue::Bool(b),
2183            ) => {
2184                let idx = *len;
2185                *len += 1;
2186                let num_words = (*len).div_ceil(64);
2187                while values.len() < num_words {
2188                    values.push(0);
2189                }
2190                if b {
2191                    let word = idx / 64;
2192                    let bit = idx % 64;
2193                    values[word] |= 1 << bit;
2194                }
2195                validity.push(true);
2196                stats.row_count += 1;
2197            }
2198            // Type mismatch - push as null
2199            (col, _) => push_null_to_typed_column(col),
2200        },
2201    }
2202}
2203
2204/// Push a null value into a TypedColumn
2205fn push_null_to_typed_column(col: &mut CoreTypedColumn) {
2206    match col {
2207        CoreTypedColumn::Int64 {
2208            values,
2209            validity,
2210            stats,
2211        } => {
2212            values.push(0);
2213            validity.push(false);
2214            stats.update_null();
2215        }
2216        CoreTypedColumn::UInt64 {
2217            values,
2218            validity,
2219            stats,
2220        } => {
2221            values.push(0);
2222            validity.push(false);
2223            stats.update_null();
2224        }
2225        CoreTypedColumn::Float64 {
2226            values,
2227            validity,
2228            stats,
2229        } => {
2230            values.push(0.0);
2231            validity.push(false);
2232            stats.update_null();
2233        }
2234        CoreTypedColumn::Text {
2235            offsets,
2236            data: _,
2237            validity,
2238            stats,
2239        } => {
2240            offsets.push(offsets.last().copied().unwrap_or(0));
2241            validity.push(false);
2242            stats.update_null();
2243        }
2244        CoreTypedColumn::Binary {
2245            offsets,
2246            data: _,
2247            validity,
2248            stats,
2249        } => {
2250            offsets.push(offsets.last().copied().unwrap_or(0));
2251            validity.push(false);
2252            stats.update_null();
2253        }
2254        CoreTypedColumn::Bool {
2255            values,
2256            validity,
2257            stats,
2258            len,
2259        } => {
2260            *len += 1;
2261            let num_words = (*len).div_ceil(64);
2262            while values.len() < num_words {
2263                values.push(0);
2264            }
2265            validity.push(false);
2266            stats.update_null();
2267        }
2268    }
2269}
2270
// Integration-style tests for the database kernel. Each test opens a fresh
// database in its own temporary directory.
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Opening a database, beginning and aborting a transaction, and shutting
    /// down all succeed; the transaction id is positive.
    #[test]
    fn test_database_open_close() {
        let dir = tempdir().unwrap();
        let db = Database::open(dir.path()).unwrap();

        // Should be able to begin a transaction
        let txn = db.begin_transaction().unwrap();
        assert!(txn.txn_id > 0);

        db.abort(txn).unwrap();
        db.shutdown().unwrap();
    }

    /// A value written with `put` is readable inside the same transaction
    /// and, after commit, from a later transaction.
    #[test]
    fn test_database_put_get() {
        let dir = tempdir().unwrap();
        let db = Database::open(dir.path()).unwrap();

        let txn = db.begin_transaction().unwrap();
        db.put(txn, b"key1", b"value1").unwrap();

        // Read-your-own-writes before commit
        let val = db.get(txn, b"key1").unwrap();
        assert_eq!(val, Some(b"value1".to_vec()));

        db.commit(txn).unwrap();

        // New transaction should see committed data
        let txn2 = db.begin_transaction().unwrap();
        let val = db.get(txn2, b"key1").unwrap();
        assert_eq!(val, Some(b"value1".to_vec()));
        db.abort(txn2).unwrap();
    }

    /// `scan_path` with the prefix `users/1/` returns only the two entries
    /// written under that row, not the one under `users/2/`.
    #[test]
    fn test_database_path_api() {
        let dir = tempdir().unwrap();
        let db = Database::open(dir.path()).unwrap();

        let txn = db.begin_transaction().unwrap();

        // Write using path API
        db.put_path(txn, "users/1/name", b"Alice").unwrap();
        db.put_path(txn, "users/1/email", b"alice@example.com")
            .unwrap();
        db.put_path(txn, "users/2/name", b"Bob").unwrap();

        db.commit(txn).unwrap();

        // Scan path prefix
        let txn2 = db.begin_transaction().unwrap();
        let results = db.scan_path(txn2, "users/1/").unwrap();
        assert_eq!(results.len(), 2);

        db.abort(txn2).unwrap();
    }

    /// A row inserted into a registered table can be read back by row id
    /// from a later transaction, with its `name` column intact.
    #[test]
    fn test_database_table_api() {
        let dir = tempdir().unwrap();
        let db = Database::open(dir.path()).unwrap();

        // Register table
        db.register_table(TableSchema {
            name: "users".to_string(),
            columns: vec![
                ColumnDef {
                    name: "name".to_string(),
                    col_type: ColumnType::Text,
                    nullable: false,
                },
                ColumnDef {
                    name: "age".to_string(),
                    col_type: ColumnType::Int64,
                    nullable: true,
                },
            ],
        })
        .unwrap();

        // Insert row
        let txn = db.begin_transaction().unwrap();
        let mut values = HashMap::new();
        values.insert("name".to_string(), SochValue::Text("Alice".to_string()));
        values.insert("age".to_string(), SochValue::Int(30));

        db.insert_row(txn, "users", 1, &values).unwrap();
        db.commit(txn).unwrap();

        // Read row
        let txn2 = db.begin_transaction().unwrap();
        let row = db.read_row(txn2, "users", 1, None).unwrap();
        assert!(row.is_some());

        let row = row.unwrap();
        assert_eq!(row.get("name"), Some(&SochValue::Text("Alice".to_string())));

        db.abort(txn2).unwrap();
    }

    /// With two rows stored under `docs/`, a query limited to one row
    /// returns exactly one row.
    #[test]
    fn test_database_query_builder() {
        let dir = tempdir().unwrap();
        let db = Database::open(dir.path()).unwrap();

        // Insert test data
        let txn = db.begin_transaction().unwrap();
        db.put_path(txn, "docs/1/title", b"Hello").unwrap();
        db.put_path(txn, "docs/1/content", b"World").unwrap();
        db.put_path(txn, "docs/2/title", b"Foo").unwrap();
        db.put_path(txn, "docs/2/content", b"Bar").unwrap();
        db.commit(txn).unwrap();

        // Query with limit
        let txn2 = db.begin_transaction().unwrap();
        let result = db.query(txn2, "docs/").limit(1).execute().unwrap();

        assert_eq!(result.rows.len(), 1);
        db.abort(txn2).unwrap();
    }

    /// Committed data survives a simulated crash: the first handle is leaked
    /// with `mem::forget` (no shutdown, no drop) and the value is read back
    /// after reopening the same directory.
    #[test]
    fn test_database_crash_recovery() {
        let dir = tempdir().unwrap();

        // Write and commit
        {
            // Use open_without_lock for crash simulation tests
            let db = Database::open_without_lock(dir.path()).unwrap();
            // Set sync mode to FULL to ensure data is persisted before "crash"
            db.storage.set_sync_mode(2);
            let txn = db.begin_transaction().unwrap();
            db.put(txn, b"persist", b"this").unwrap();
            db.commit(txn).unwrap();
            // Don't call shutdown - simulate crash
            std::mem::forget(db);
        }

        // Reopen - should recover
        {
            let db = Database::open_without_lock(dir.path()).unwrap();
            let txn = db.begin_transaction().unwrap();
            let val = db.get(txn, b"persist").unwrap();
            assert_eq!(val, Some(b"this".to_vec()));
            db.abort(txn).unwrap();
        }
    }
}