sochdb_storage/
database.rs

1// Copyright 2025 Sushanth (https://github.com/sushanthpy)
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! SochDB Database Kernel
16//!
17//! The shared core that powers both embedded mode (`SochConnection::open`) and
18//! server mode (`sochdb-server`). This is the "SQLite engine" equivalent.
19//!
20//! ## Architecture
21//!
22//! ```text
23//! ┌──────────────────────────────────────────────────────────────────┐
24//! │                        Database Kernel                            │
25//! │  Arc<Database> - shared by all connections                       │
26//! ├──────────────────────────────────────────────────────────────────┤
27//! │                                                                   │
28//! │  ┌─────────────────┐   ┌─────────────────┐   ┌────────────────┐ │
29//! │  │  DurableStorage │   │     Catalog     │   │  Vector Index  │ │
30//! │  │  (WAL + MVCC)   │   │  (Schema Mgmt)  │   │  (HNSW/Vamana) │ │
31//! │  └────────┬────────┘   └────────┬────────┘   └───────┬────────┘ │
32//! │           │                     │                     │          │
33//! │           └─────────────────────┴─────────────────────┘          │
34//! │                                 │                                 │
35//! │  ┌─────────────────────────────────────────────────────────────┐ │
36//! │  │              Query Executor (Path-Native)                    │ │
37//! │  │  - Path resolution: O(|path|)                                │ │
38//! │  │  - Column projection: 80% I/O reduction                     │ │
39//! │  │  - Context selection: Token-aware chunking                  │ │
40//! │  └─────────────────────────────────────────────────────────────┘ │
41//! │                                                                   │
42//! └──────────────────────────────────────────────────────────────────┘
43//!
44//! Deployment Modes:
45//! ┌─────────────┐   ┌─────────────┐   ┌─────────────┐
46//! │  Embedded   │   │  IPC Server │   │  TCP Server │
47//! │  (in-proc)  │   │  (Unix sock)│   │  (remote)   │
48//! └──────┬──────┘   └──────┬──────┘   └──────┬──────┘
49//!        │                 │                 │
50//!        └─────────────────┴─────────────────┘
51//!                          │
52//!                   Arc<Database>
53//! ```
54//!
55//! ## Latency Model
56//!
57//! Let K = kernel processing cost for a query
58//!
59//! - Embedded: L_emb ≈ K (function call overhead negligible)
60//! - IPC: L_ipc ≈ K + δ_ipc (δ_ipc = ~10-50µs for Unix socket)
61//! - TCP: L_tcp ≈ K + δ_net (δ_net = 100µs-10ms depending on network)
62//!
63//! For LLM context queries where K >> δ_ipc, IPC is "nearly embedded".
64
65use std::collections::HashMap;
66use std::path::{Path, PathBuf};
67use std::sync::Arc;
68use std::sync::atomic::{AtomicU64, Ordering};
69
70use dashmap::DashMap;
71use parking_lot::RwLock;
72
73use crate::durable_storage::{DurableStorage, TransactionMode};
74use crate::index_policy::{IndexPolicy, TableIndexConfig, TableIndexRegistry};
75use crate::key_buffer::KeyBuffer;
76use crate::packed_row::{PackedColumnDef, PackedColumnType, PackedRow, PackedTableSchema};
77use sochdb_core::catalog::Catalog;
78use sochdb_core::{Result, SochDBError, SochValue};
79
80// Re-export key types
81pub use crate::durable_storage::RecoveryStats;
82
83/// Database configuration
84#[derive(Debug, Clone)]
85pub struct DatabaseConfig {
86    /// Enable group commit for better write throughput
87    pub group_commit: bool,
88    /// Maximum memory for memtables before flush (bytes)
89    pub memtable_size_limit: usize,
90    /// Enable WAL for durability
91    pub wal_enabled: bool,
92    /// Sync mode: fsync after every commit vs periodic
93    pub sync_mode: SyncMode,
94    /// Read-only mode
95    pub read_only: bool,
96    
97    /// Enable ordered index for O(log N) prefix scans
98    ///
99    /// # Deprecation Notice
100    /// 
101    /// **DEPRECATED since 0.2.0**: Use `default_index_policy` instead for per-table control.
102    /// This field will be removed in v0.3.0.
103    ///
104    /// ## Migration Guide
105    ///
106    /// Replace:
107    /// ```ignore
108    /// DatabaseConfig { enable_ordered_index: true, .. }  // Old API
109    /// DatabaseConfig { enable_ordered_index: false, .. } // Old API
110    /// ```
111    ///
112    /// With:
113    /// ```ignore
114    /// DatabaseConfig { default_index_policy: IndexPolicy::ScanOptimized, .. }  // Ordered index enabled
115    /// DatabaseConfig { default_index_policy: IndexPolicy::WriteOptimized, .. } // Ordered index disabled
116    /// ```
117    ///
118    /// ## Behavior
119    ///
120    /// When false, saves ~134 ns/op on writes (20% speedup)
121    /// but scan_prefix becomes O(N) instead of O(log N + K).
122    /// 
123    /// Set to false for write-heavy workloads without range scans.
124    #[deprecated(
125        since = "0.2.0",
126        note = "Use `default_index_policy` field instead. This field will be removed in v0.3.0. \
127                Set IndexPolicy::ScanOptimized for ordered index, WriteOptimized to disable."
128    )]
129    ///
130    /// Set to false for write-heavy workloads without range scans.
131    pub enable_ordered_index: bool,
132    /// Group commit configuration
133    pub group_commit_config: GroupCommitSettings,
134    /// Default index policy for tables not explicitly configured
135    ///
136    /// This replaces the global `enable_ordered_index` toggle with
137    /// fine-grained per-table control. Use `index_registry` to configure
138    /// individual tables.
139    ///
140    /// | Policy         | Insert Cost | Scan Cost      | Use Case              |
141    /// |----------------|-------------|----------------|------------------------|
142    /// | WriteOptimized | O(1)        | O(N)           | High-write, rare scan  |
143    /// | Balanced       | O(1) amort  | O(output+logK) | Mixed OLTP            |
144    /// | ScanOptimized  | O(log N)    | O(logN + K)    | Analytics, range query |
145    /// | AppendOnly     | O(1)        | O(N)           | Time-series logs       |
146    pub default_index_policy: IndexPolicy,
147}
148
/// Group commit settings - mirrors SQLite's WAL mode tuning
///
/// ## Performance Model
///
/// Without group commit: Throughput = 1 / L_fsync ≈ 200 commits/sec (L_fsync = 5 ms)
/// With group commit (batch size K): Throughput = K / L_fsync = K × 200 commits/sec
///
/// For K = 100: 20,000 commits/sec (100× speedup)
///
/// ## SQLite Comparison
///
/// | Setting                    | SQLite Equivalent           |
/// |----------------------------|-----------------------------|
/// | max_batch_size = 1         | PRAGMA synchronous = FULL   |
/// | max_batch_size = 100       | WAL mode with batching      |
/// | max_wait_us = 0            | No delay, immediate flush   |
/// | max_wait_us = 10000        | Up to 10ms delay for batch  |
#[derive(Debug, Clone)]
pub struct GroupCommitSettings {
    /// Minimum batch size before flush (default: 1)
    pub min_batch_size: usize,
    /// Maximum batch size, i.e. the hard cap on commits per fsync (default: 1000)
    pub max_batch_size: usize,
    /// Maximum wait time before flush in microseconds (default: 10000 = 10ms)
    pub max_wait_us: u64,
    /// Expected fsync latency in microseconds, used for adaptive sizing (default: 5000 = 5ms)
    pub fsync_latency_us: u64,
}
177
178impl Default for GroupCommitSettings {
179    fn default() -> Self {
180        Self {
181            min_batch_size: 1,
182            max_batch_size: 1000,
183            max_wait_us: 10_000,     // 10ms
184            fsync_latency_us: 5_000, // 5ms
185        }
186    }
187}
188
189impl GroupCommitSettings {
190    /// High throughput preset - maximizes batching
191    pub fn high_throughput() -> Self {
192        Self {
193            min_batch_size: 50,
194            max_batch_size: 5000,
195            max_wait_us: 50_000, // 50ms
196            fsync_latency_us: 5_000,
197        }
198    }
199
200    /// Low latency preset - minimal batching
201    pub fn low_latency() -> Self {
202        Self {
203            min_batch_size: 1,
204            max_batch_size: 10,
205            max_wait_us: 1_000, // 1ms
206            fsync_latency_us: 5_000,
207        }
208    }
209
210    /// Calculate optimal batch size using Little's Law
211    ///
212    /// N* = sqrt(2 × L_fsync × λ / C_wait)
213    ///
214    /// # Arguments
215    /// * `arrival_rate` - Operations per second
216    /// * `wait_cost` - Cost coefficient for waiting (0.0-1.0)
217    pub fn optimal_batch_size(&self, arrival_rate: f64, wait_cost: f64) -> usize {
218        let l_fsync = self.fsync_latency_us as f64 / 1_000_000.0;
219        let n_star = (2.0 * l_fsync * arrival_rate / wait_cost.max(0.001)).sqrt();
220        (n_star as usize).clamp(self.min_batch_size, self.max_batch_size)
221    }
222}
223
224impl Default for DatabaseConfig {
225    #[allow(deprecated)]
226    fn default() -> Self {
227        Self {
228            group_commit: true,
229            memtable_size_limit: 64 * 1024 * 1024, // 64MB
230            wal_enabled: true,
231            sync_mode: SyncMode::Normal,
232            read_only: false,
233            enable_ordered_index: true, // Default: enabled for compatibility
234            group_commit_config: GroupCommitSettings::default(),
235            default_index_policy: IndexPolicy::Balanced, // New default: balanced OLTP policy
236        }
237    }
238}
239
240impl DatabaseConfig {
241    /// Create config optimized for throughput (Fast Mode)
242    ///
243    /// - Disables ordered index (saves ~134 ns/op)
244    /// - Uses high-throughput group commit settings
245    /// - Suitable for append-only workloads
246    #[allow(deprecated)]
247    pub fn throughput_optimized() -> Self {
248        Self {
249            group_commit: true,
250            memtable_size_limit: 128 * 1024 * 1024, // 128MB
251            wal_enabled: true,
252            sync_mode: SyncMode::Normal,
253            read_only: false,
254            enable_ordered_index: false,
255            group_commit_config: GroupCommitSettings::high_throughput(),
256            default_index_policy: IndexPolicy::WriteOptimized, // No ordered index overhead
257        }
258    }
259
260    /// Create config optimized for latency
261    ///
262    /// - Keeps ordered index for fast range scans
263    /// - Uses low-latency group commit settings
264    /// - Suitable for OLTP workloads
265    #[allow(deprecated)]
266    pub fn latency_optimized() -> Self {
267        Self {
268            group_commit: true,
269            memtable_size_limit: 32 * 1024 * 1024, // 32MB
270            wal_enabled: true,
271            sync_mode: SyncMode::Full,
272            read_only: false,
273            enable_ordered_index: true,
274            group_commit_config: GroupCommitSettings::low_latency(),
275            default_index_policy: IndexPolicy::ScanOptimized, // Fast range scans
276        }
277    }
278
279    /// Create config matching SQLite defaults
280    #[allow(deprecated)]
281    pub fn sqlite_compatible() -> Self {
282        Self {
283            group_commit: false, // SQLite default is single-commit
284            memtable_size_limit: 64 * 1024 * 1024,
285            wal_enabled: true,
286            sync_mode: SyncMode::Normal, // PRAGMA synchronous = NORMAL
287            read_only: false,
288            enable_ordered_index: true,
289            group_commit_config: GroupCommitSettings::default(),
290            default_index_policy: IndexPolicy::Balanced, // Good default for mixed workloads
291        }
292    }
293
294    /// Get effective ordered index setting, derived from `default_index_policy`.
295    /// 
296    /// This is the shim method for the deprecated `enable_ordered_index` field.
297    /// It returns `true` if the policy requires an ordered index (ScanOptimized),
298    /// and `false` otherwise (WriteOptimized, Balanced, AppendOnly).
299    ///
300    /// # Policy Mapping
301    ///
302    /// | IndexPolicy      | Returns |
303    /// |------------------|---------|
304    /// | ScanOptimized    | true    |
305    /// | Balanced         | false   |
306    /// | WriteOptimized   | false   |
307    /// | AppendOnly       | false   |
308    ///
309    /// Note: `Balanced` uses lazy compaction rather than a live ordered index,
310    /// so it returns `false` for the low-level memtable config but still supports
311    /// efficient range scans via sorted runs.
312    pub fn effective_ordered_index(&self) -> bool {
313        matches!(self.default_index_policy, IndexPolicy::ScanOptimized)
314    }
315}
316
/// Durability level for WAL writes, modelled on SQLite's `PRAGMA synchronous`.
///
/// | SochDB     | SQLite       | Description                                    |
/// |------------|--------------|------------------------------------------------|
/// | Off        | OFF (0)      | No fsync, risk of data loss on crash           |
/// | Normal     | NORMAL (1)   | Fsync at checkpoints, not every commit         |
/// | Full       | FULL (2)     | Fsync every commit (safest, slowest)           |
///
/// Each variant's discriminant equals its SQLite pragma number, so
/// `mode as u32` is the pragma value.
///
/// # Performance vs Durability Trade-offs
///
/// - **Off**: ~10x faster than Full, but may lose last ~100ms of data on crash
/// - **Normal**: ~5x faster than Full, durable at checkpoint boundaries
/// - **Full**: every commit is fsync'd, so committed data survives a crash
///
/// # SQLite Compatibility
///
/// ```sql
/// -- SQLite equivalent settings
/// PRAGMA synchronous = OFF;    -- SyncMode::Off
/// PRAGMA synchronous = NORMAL; -- SyncMode::Normal
/// PRAGMA synchronous = FULL;   -- SyncMode::Full
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SyncMode {
    /// Never fsync (SQLite `PRAGMA synchronous = OFF`).
    ///
    /// Writes stay in OS buffers and can be lost on power failure; suited to
    /// bulk loads and data that can be regenerated.
    Off = 0,

    /// Fsync at checkpoint boundaries (SQLite `PRAGMA synchronous = NORMAL`).
    ///
    /// The default mode: a balance between throughput and durability.
    Normal = 1,

    /// Fsync on every commit (SQLite `PRAGMA synchronous = FULL`).
    ///
    /// The safest mode: each commit is durable immediately. Use for
    /// financial or otherwise critical data.
    Full = 2,
}
359
360impl SyncMode {
361    /// Convert from SQLite synchronous pragma value
362    pub fn from_sqlite_pragma(value: u32) -> Self {
363        match value {
364            0 => SyncMode::Off,
365            1 => SyncMode::Normal,
366            _ => SyncMode::Full, // 2+ treated as Full
367        }
368    }
369
370    /// Convert to SQLite synchronous pragma value
371    pub fn to_sqlite_pragma(self) -> u32 {
372        self as u32
373    }
374
375    /// Parse from string (case-insensitive)
376    pub fn parse(s: &str) -> Option<Self> {
377        match s.to_ascii_uppercase().as_str() {
378            "OFF" | "0" => Some(SyncMode::Off),
379            "NORMAL" | "1" => Some(SyncMode::Normal),
380            "FULL" | "2" => Some(SyncMode::Full),
381            _ => None,
382        }
383    }
384}
385
386/// Table schema for the kernel
387#[derive(Debug, Clone)]
388pub struct TableSchema {
389    pub name: String,
390    pub columns: Vec<ColumnDef>,
391}
392
393/// Column definition
394#[derive(Debug, Clone)]
395pub struct ColumnDef {
396    pub name: String,
397    pub col_type: ColumnType,
398    pub nullable: bool,
399}
400
/// Value types a kernel column can declare.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ColumnType {
    /// Signed 64-bit integer
    Int64,
    /// Unsigned 64-bit integer
    UInt64,
    /// 64-bit floating point
    Float64,
    /// Text string
    Text,
    /// Raw bytes
    Binary,
    /// Boolean
    Bool,
}
411
/// Lightweight, copyable handle identifying a kernel transaction.
#[derive(Debug, Clone, Copy)]
pub struct TxnHandle {
    /// Transaction id assigned by the storage layer
    pub txn_id: u64,
    /// Snapshot timestamp the transaction reads at
    pub snapshot_ts: u64,
}
418
419/// Query result from the kernel
420#[derive(Debug, Clone)]
421pub struct QueryResult {
422    /// Column names
423    pub columns: Vec<String>,
424    /// Row data (each row is a map of column -> value)
425    pub rows: Vec<HashMap<String, SochValue>>,
426    /// Number of rows scanned (for stats)
427    pub rows_scanned: usize,
428    /// Bytes read from storage
429    pub bytes_read: usize,
430}
431
432impl QueryResult {
433    /// Empty result
434    pub fn empty() -> Self {
435        Self {
436            columns: vec![],
437            rows: vec![],
438            rows_scanned: 0,
439            bytes_read: 0,
440        }
441    }
442
443    /// Convert to TOON format for token efficiency
444    pub fn to_toon(&self) -> String {
445        if self.rows.is_empty() {
446            return "[]".to_string();
447        }
448
449        // TOON format: table[N]{cols}: row1; row2; ...
450        let n = self.rows.len();
451        let cols = self.columns.join(",");
452
453        let rows_str: Vec<String> = self
454            .rows
455            .iter()
456            .map(|row| {
457                self.columns
458                    .iter()
459                    .map(|c| {
460                        row.get(c)
461                            .map(format_soch_value)
462                            .unwrap_or_else(|| "∅".to_string())
463                    })
464                    .collect::<Vec<_>>()
465                    .join(",")
466            })
467            .collect();
468
469        format!("result[{}]{{{}}}:{}", n, cols, rows_str.join(";"))
470    }
471}
472
473fn format_soch_value(v: &SochValue) -> String {
474    match v {
475        SochValue::Null => "∅".to_string(),
476        SochValue::Int(i) => i.to_string(),
477        SochValue::UInt(u) => u.to_string(),
478        SochValue::Float(f) => format!("{:.6}", f),
479        SochValue::Text(s) => {
480            if s.contains(',') || s.contains(';') {
481                format!("\"{}\"", s.replace('"', "\\\""))
482            } else {
483                s.clone()
484            }
485        }
486        SochValue::Bool(b) => if *b { "T" } else { "F" }.to_string(),
487        SochValue::Binary(b) => format!("b64:{}", base64_encode(b)),
488        _ => format!("{:?}", v),
489    }
490}
491
/// Encode `data` as standard base64 (RFC 4648 alphabet) with `=` padding.
fn base64_encode(data: &[u8]) -> String {
    const ALPHABET: &[u8; 64] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

    // Every 3 input bytes (or final partial group) yields exactly 4 output chars.
    let mut encoded = String::with_capacity((data.len() + 2) / 3 * 4);

    for group in data.chunks(3) {
        let len = group.len();
        // Pack the group big-endian into 24 bits; missing bytes count as zero.
        let packed = (u32::from(group[0]) << 16)
            | (u32::from(*group.get(1).unwrap_or(&0)) << 8)
            | u32::from(*group.get(2).unwrap_or(&0));
        let sextet = |shift: u32| ALPHABET[((packed >> shift) & 0x3f) as usize] as char;

        encoded.push(sextet(18));
        encoded.push(sextet(12));
        encoded.push(if len > 1 { sextet(6) } else { '=' });
        encoded.push(if len > 2 { sextet(0) } else { '=' });
    }

    encoded
}
520
521// ============================================================================
522// Columnar Query Results - SIMD-friendly result format
523// ============================================================================
524
525use sochdb_core::TypedColumn as CoreTypedColumn;
526
527/// Columnar query result - SIMD-friendly format for analytics
528///
529/// Instead of row-oriented `Vec<HashMap<String, SochValue>>`, this returns
530/// column-oriented `Vec<TypedColumn>` for efficient vectorized operations.
531///
532/// ## Memory Layout
533///
534/// Row-oriented (standard):
535/// ```text
536/// Row 0: [id=1, name="Alice", score=85]
537/// Row 1: [id=2, name="Bob", score=92]
538/// Row 2: [id=3, name="Carol", score=78]
539/// ```
540///
541/// Column-oriented (this format):
542/// ```text
543/// id:    [1, 2, 3]           ← contiguous i64 array (SIMD-friendly)
544/// name:  ["Alice", "Bob", "Carol"] ← Arrow-style string encoding
545/// score: [85, 92, 78]        ← contiguous i64 array
546/// ```
547///
548/// ## Performance Benefits
549///
550/// - SIMD: Column sums use vectorized instructions (~8× faster)
551/// - Cache: Sequential access pattern maximizes L1/L2 cache hits
552/// - Compression: Same-type data compresses better (5-10× typical)
553/// - Filtering: Bitmap operations instead of row iteration
554///
555/// ## Usage
556///
557/// ```ignore
558/// let result = db.query(txn, "users")
559///     .columns(&["id", "score"])
560///     .as_columnar()?;
561///
562/// // SIMD sum
563/// let total_score = result.column("score")
564///     .map(|c| c.sum_i64())
565///     .unwrap_or(0);
566///
567/// // Stats
568/// println!("Rows: {}, Memory: {} bytes", result.row_count(), result.memory_size());
569/// ```
570#[derive(Debug, Clone)]
571pub struct ColumnarQueryResult {
572    /// Column names in order
573    pub columns: Vec<String>,
574    /// Column data - each TypedColumn contains all values for one column
575    pub data: Vec<CoreTypedColumn>,
576    /// Number of rows
577    pub row_count: usize,
578    /// Bytes read from storage
579    pub bytes_read: usize,
580}
581
582impl ColumnarQueryResult {
583    /// Create an empty result
584    pub fn empty() -> Self {
585        Self {
586            columns: vec![],
587            data: vec![],
588            row_count: 0,
589            bytes_read: 0,
590        }
591    }
592
593    /// Get column by name
594    pub fn column(&self, name: &str) -> Option<&CoreTypedColumn> {
595        self.columns
596            .iter()
597            .position(|c| c == name)
598            .and_then(|idx| self.data.get(idx))
599    }
600
601    /// Get column index by name
602    pub fn column_index(&self, name: &str) -> Option<usize> {
603        self.columns.iter().position(|c| c == name)
604    }
605
606    /// Number of rows
607    pub fn row_count(&self) -> usize {
608        self.row_count
609    }
610
611    /// Number of columns
612    pub fn column_count(&self) -> usize {
613        self.columns.len()
614    }
615
616    /// Total memory size in bytes
617    pub fn memory_size(&self) -> usize {
618        self.data.iter().map(|c| c.memory_size()).sum()
619    }
620
621    /// Sum of an i64 column (SIMD-optimized)
622    pub fn sum_i64(&self, column: &str) -> Option<i64> {
623        self.column(column).map(|c| c.sum_i64())
624    }
625
626    /// Sum of an f64 column (SIMD-optimized)
627    pub fn sum_f64(&self, column: &str) -> Option<f64> {
628        self.column(column).map(|c| c.sum_f64())
629    }
630
631    /// Get column statistics (min, max, null count)
632    pub fn column_stats(&self, column: &str) -> Option<&sochdb_core::columnar::ColumnStats> {
633        self.column(column).map(|c| c.stats())
634    }
635
636    /// Convert to TOON format (token-efficient)
637    pub fn to_toon(&self) -> String {
638        if self.row_count == 0 {
639            return "[]".to_string();
640        }
641
642        let n = self.row_count;
643        let cols = self.columns.join(",");
644
645        // Build rows from columns
646        let mut rows_str = Vec::with_capacity(n);
647        for i in 0..n {
648            let row: Vec<String> = self
649                .data
650                .iter()
651                .map(|col| format_columnar_value(col, i))
652                .collect();
653            rows_str.push(row.join(","));
654        }
655
656        format!("result[{}]{{{}}}:{}", n, cols, rows_str.join(";"))
657    }
658}
659
660/// Format a single value from a TypedColumn at index
661fn format_columnar_value(col: &CoreTypedColumn, idx: usize) -> String {
662    match col {
663        CoreTypedColumn::Int64 {
664            values, validity, ..
665        } => {
666            if validity.is_valid(idx) && idx < values.len() {
667                values[idx].to_string()
668            } else {
669                "∅".to_string()
670            }
671        }
672        CoreTypedColumn::UInt64 {
673            values, validity, ..
674        } => {
675            if validity.is_valid(idx) && idx < values.len() {
676                values[idx].to_string()
677            } else {
678                "∅".to_string()
679            }
680        }
681        CoreTypedColumn::Float64 {
682            values, validity, ..
683        } => {
684            if validity.is_valid(idx) && idx < values.len() {
685                format!("{:.6}", values[idx])
686            } else {
687                "∅".to_string()
688            }
689        }
690        CoreTypedColumn::Text {
691            offsets,
692            data,
693            validity,
694            ..
695        } => {
696            if validity.is_valid(idx) && idx + 1 < offsets.len() {
697                let start = offsets[idx] as usize;
698                let end = offsets[idx + 1] as usize;
699                std::str::from_utf8(&data[start..end])
700                    .map(|s| {
701                        if s.contains(',') || s.contains(';') {
702                            format!("\"{}\"", s.replace('"', "\\\""))
703                        } else {
704                            s.to_string()
705                        }
706                    })
707                    .unwrap_or_else(|_| "∅".to_string())
708            } else {
709                "∅".to_string()
710            }
711        }
712        CoreTypedColumn::Binary {
713            offsets,
714            data,
715            validity,
716            ..
717        } => {
718            if validity.is_valid(idx) && idx + 1 < offsets.len() {
719                let start = offsets[idx] as usize;
720                let end = offsets[idx + 1] as usize;
721                format!("b64:{}", base64_encode(&data[start..end]))
722            } else {
723                "∅".to_string()
724            }
725        }
726        CoreTypedColumn::Bool {
727            values,
728            validity,
729            len,
730            ..
731        } => {
732            if validity.is_valid(idx) && idx < *len {
733                let word = idx / 64;
734                let bit = idx % 64;
735                if (values[word] >> bit) & 1 == 1 {
736                    "T"
737                } else {
738                    "F"
739                }
740                .to_string()
741            } else {
742                "∅".to_string()
743            }
744        }
745    }
746}
747
748/// Vector search result
749#[derive(Debug, Clone)]
750pub struct VectorSearchResult {
751    pub id: u64,
752    pub distance: f32,
753    pub metadata: Option<HashMap<String, SochValue>>,
754}
755
756/// The SochDB Database Kernel
757///
758/// This is the shared core used by both embedded (`SochConnection`) and
759/// server (`sochdb-server`) modes. It owns all storage, catalog, and
760/// indexing components.
761///
762/// # Thread Safety
763///
764/// The Database is fully thread-safe via internal synchronization:
765/// - Multiple readers can operate concurrently (MVCC snapshots)
766/// - Writers coordinate through WAL and group commit
767/// - All state is behind Arc/RwLock for shared access
768///
769/// # Example
770///
771/// ```ignore
772/// // Open a database (SQLite-style)
773/// let db = Database::open("./my_data")?;
774///
775/// // Begin a transaction
776/// let txn = db.begin_transaction()?;
777///
778/// // Write data
779/// db.put(txn, b"user:1:name", b"Alice")?;
780///
781/// // Commit
782/// db.commit(txn)?;
783/// ```
784#[allow(dead_code)]
785pub struct Database {
786    /// Path to database directory
787    path: PathBuf,
788    /// Durable storage layer (WAL + MVCC + memtable)
789    storage: Arc<DurableStorage>,
790    /// Schema catalog
791    catalog: Arc<RwLock<Catalog>>,
792    /// Registered table schemas (name -> schema) - lock-free for reads
793    tables: DashMap<String, TableSchema>,
794    /// Cached packed schemas for fast insert (name -> packed schema)
795    packed_schemas: DashMap<String, PackedTableSchema>,
796    /// Per-table index policy registry
797    index_registry: Arc<TableIndexRegistry>,
798    /// Configuration
799    config: DatabaseConfig,
800    /// Statistics
801    stats: DatabaseStats,
802    /// Shutdown flag
803    shutdown: AtomicU64,
804}
805
/// Internal operation counters for a [`Database`].
///
/// Each counter is an `AtomicU64`, so it can be updated through a shared
/// reference without locking.
struct DatabaseStats {
    /// Bumped by the transaction-begin methods
    transactions_started: AtomicU64,
    /// Committed transactions
    transactions_committed: AtomicU64,
    /// Aborted transactions
    transactions_aborted: AtomicU64,
    /// Queries executed
    queries_executed: AtomicU64,
    /// Bytes written to storage
    bytes_written: AtomicU64,
    /// Bytes read from storage
    bytes_read: AtomicU64,
}
815
816impl DatabaseStats {
817    fn new() -> Self {
818        Self {
819            transactions_started: AtomicU64::new(0),
820            transactions_committed: AtomicU64::new(0),
821            transactions_aborted: AtomicU64::new(0),
822            queries_executed: AtomicU64::new(0),
823            bytes_written: AtomicU64::new(0),
824            bytes_read: AtomicU64::new(0),
825        }
826    }
827}
828
/// Point-in-time copy of a database's operation counters.
///
/// Unlike the internal atomic counters these are plain `u64`s, so a snapshot
/// can be cloned, logged, or sent elsewhere freely.
#[derive(Debug, Clone)]
pub struct Stats {
    /// Transactions begun
    pub transactions_started: u64,
    /// Transactions committed
    pub transactions_committed: u64,
    /// Transactions aborted
    pub transactions_aborted: u64,
    /// Queries executed
    pub queries_executed: u64,
    /// Bytes written to storage
    pub bytes_written: u64,
    /// Bytes read from storage
    pub bytes_read: u64,
}
839
840impl Database {
841    /// Open or create a database at the given path.
842    ///
843    /// This is the primary entry point, similar to `sqlite3_open()`.
844    /// If the database exists, it will be opened and WAL recovery performed.
845    /// If it doesn't exist, a new database will be created.
846    ///
847    /// # Arguments
848    ///
849    /// * `path` - Directory path for the database files
850    ///
851    /// # Returns
852    ///
853    /// An `Arc<Database>` that can be shared across threads and connections.
854    pub fn open<P: AsRef<Path>>(path: P) -> Result<Arc<Self>> {
855        Self::open_with_config(path, DatabaseConfig::default())
856    }
857
858    /// Open with custom configuration
859    pub fn open_with_config<P: AsRef<Path>>(path: P, config: DatabaseConfig) -> Result<Arc<Self>> {
860        let path = path.as_ref().to_path_buf();
861
862        // Use IndexPolicy-based storage configuration for automatic memtable selection
863        // This derives ordered index and memtable type from the policy
864        let storage = Arc::new(DurableStorage::open_with_policy(
865            &path,
866            config.default_index_policy,
867            config.group_commit,
868        )?);
869
870        // Create index registry with default policy from config
871        let index_registry = Arc::new(TableIndexRegistry::with_default_policy(
872            config.default_index_policy,
873        ));
874
875        let db = Arc::new(Self {
876            path: path.clone(),
877            storage,
878            catalog: Arc::new(RwLock::new(Catalog::new("sochdb"))),
879            tables: DashMap::new(),
880            packed_schemas: DashMap::new(),
881            index_registry,
882            config,
883            stats: DatabaseStats::new(),
884            shutdown: AtomicU64::new(0),
885        });
886
887        // Perform crash recovery if needed
888        db.recover()?;
889
890        Ok(db)
891    }
892
893    /// Perform crash recovery
894    fn recover(&self) -> Result<RecoveryStats> {
895        self.storage.recover()
896    }
897
898    /// Get database path
899    pub fn path(&self) -> &Path {
900        &self.path
901    }
902
903    // =========================================================================
904    // Transaction API
905    // =========================================================================
906
907    /// Begin a new transaction
908    pub fn begin_transaction(&self) -> Result<TxnHandle> {
909        self.stats
910            .transactions_started
911            .fetch_add(1, Ordering::Relaxed);
912        let txn_id = self.storage.begin_transaction()?;
913
914        // Get snapshot timestamp from MVCC
915        // For now, use txn_id as a proxy (the real snapshot_ts is managed internally)
916        Ok(TxnHandle {
917            txn_id,
918            snapshot_ts: txn_id,
919        })
920    }
921
922    /// Begin a read-only transaction (optimized: no SSI tracking)
923    ///
924    /// Read-only transactions skip SSI read tracking, reducing overhead
925    /// from ~82ns to ~32ns per read (2.6x faster).
926    ///
927    /// Use this for:
928    /// - SELECT queries that don't modify data
929    /// - Analytics and reporting queries
930    /// - Snapshot reads for backup
931    pub fn begin_read_only(&self) -> Result<TxnHandle> {
932        self.stats
933            .transactions_started
934            .fetch_add(1, Ordering::Relaxed);
935        let txn_id = self.storage.begin_with_mode(TransactionMode::ReadOnly)?;
936        Ok(TxnHandle {
937            txn_id,
938            snapshot_ts: txn_id,
939        })
940    }
941
942    /// Begin a write-only transaction (optimized: no read tracking)
943    ///
944    /// Write-only transactions skip read tracking, improving insert
945    /// throughput for bulk loading scenarios.
946    ///
947    /// Use this for:
948    /// - Bulk data imports
949    /// - Append-only logging tables
950    /// - ETL pipelines
951    pub fn begin_write_only(&self) -> Result<TxnHandle> {
952        self.stats
953            .transactions_started
954            .fetch_add(1, Ordering::Relaxed);
955        let txn_id = self.storage.begin_with_mode(TransactionMode::WriteOnly)?;
956        Ok(TxnHandle {
957            txn_id,
958            snapshot_ts: txn_id,
959        })
960    }
961
962    /// Commit a transaction
963    pub fn commit(&self, txn: TxnHandle) -> Result<u64> {
964        self.stats
965            .transactions_committed
966            .fetch_add(1, Ordering::Relaxed);
967        self.storage.commit(txn.txn_id)
968    }
969
970    /// Abort a transaction
971    pub fn abort(&self, txn: TxnHandle) -> Result<()> {
972        self.stats
973            .transactions_aborted
974            .fetch_add(1, Ordering::Relaxed);
975        self.storage.abort(txn.txn_id)
976    }
977
978    // =========================================================================
979    // Per-Table Index Policy API
980    // =========================================================================
981
982    /// Configure index policy for a table
983    ///
984    /// This allows fine-grained control over write/scan trade-offs per table:
985    ///
986    /// | Policy         | Insert Cost | Scan Cost      | Use Case              |
987    /// |----------------|-------------|----------------|------------------------|
988    /// | WriteOptimized | O(1)        | O(N)           | High-write, rare scan  |
989    /// | Balanced       | O(1) amort  | O(output+logK) | Mixed OLTP            |
990    /// | ScanOptimized  | O(log N)    | O(logN + K)    | Analytics, range query |
991    /// | AppendOnly     | O(1)        | O(N)           | Time-series logs       |
992    ///
993    /// # Example
994    ///
995    /// ```ignore
996    /// // Fast inserts for logs table (no ordered index overhead)
997    /// db.set_table_index_policy("logs", IndexPolicy::WriteOptimized);
998    ///
999    /// // Efficient range scans for analytics table
1000    /// db.set_table_index_policy("analytics", IndexPolicy::ScanOptimized);
1001    ///
1002    /// // Balanced for OLTP tables
1003    /// db.set_table_index_policy("users", IndexPolicy::Balanced);
1004    /// ```
1005    pub fn set_table_index_policy(&self, table: &str, policy: IndexPolicy) {
1006        self.index_registry.configure_table(
1007            TableIndexConfig::new(table, policy)
1008        );
1009    }
1010
1011    /// Get the index policy for a table
1012    pub fn get_table_index_policy(&self, table: &str) -> IndexPolicy {
1013        self.index_registry.get_policy(table)
1014    }
1015
1016    /// Get the index registry for advanced configuration
1017    pub fn index_registry(&self) -> &Arc<TableIndexRegistry> {
1018        &self.index_registry
1019    }
1020
1021    // =========================================================================
1022    // Key-Value API (Low-level)
1023    // =========================================================================
1024
1025    /// Put a key-value pair
1026    pub fn put(&self, txn: TxnHandle, key: &[u8], value: &[u8]) -> Result<()> {
1027        self.stats
1028            .bytes_written
1029            .fetch_add((key.len() + value.len()) as u64, Ordering::Relaxed);
1030        // Use write_refs to avoid unnecessary allocations
1031        self.storage.write_refs(txn.txn_id, key, value)
1032    }
1033
1034    /// Batch put multiple key-value pairs with reduced overhead
1035    ///
1036    /// This amortizes per-operation costs over the entire batch:
1037    /// - Single DashMap lookup
1038    /// - Batch MVCC tracking
1039    /// - Batch memtable writes
1040    ///
1041    /// For 100+ entries, this is 2-3x faster than individual puts.
1042    ///
1043    /// # Example
1044    ///
1045    /// ```ignore
1046    /// let writes: Vec<(&[u8], &[u8])> = vec![
1047    ///     (b"key1", b"value1"),
1048    ///     (b"key2", b"value2"),
1049    ///     (b"key3", b"value3"),
1050    /// ];
1051    /// db.put_batch(txn, &writes)?;
1052    /// ```
1053    pub fn put_batch(&self, txn: TxnHandle, writes: &[(&[u8], &[u8])]) -> Result<()> {
1054        let bytes: u64 = writes
1055            .iter()
1056            .map(|(k, v)| (k.len() + v.len()) as u64)
1057            .sum();
1058        self.stats.bytes_written.fetch_add(bytes, Ordering::Relaxed);
1059        self.storage.write_batch_refs(txn.txn_id, writes)
1060    }
1061
1062    /// Get a value by key
1063    pub fn get(&self, txn: TxnHandle, key: &[u8]) -> Result<Option<Vec<u8>>> {
1064        let result = self.storage.read(txn.txn_id, key)?;
1065        if let Some(ref data) = result {
1066            self.stats
1067                .bytes_read
1068                .fetch_add(data.len() as u64, Ordering::Relaxed);
1069        }
1070        Ok(result)
1071    }
1072
1073    /// Delete a key
1074    pub fn delete(&self, txn: TxnHandle, key: &[u8]) -> Result<()> {
1075        self.storage.delete(txn.txn_id, key.to_vec())
1076    }
1077
1078    /// Minimum prefix length for scan operations.
1079    /// Prevents expensive full-table scans by requiring a meaningful prefix.
1080    pub const MIN_SCAN_PREFIX_LEN: usize = 2;
1081
1082    /// Scan keys with a prefix (enforces minimum prefix length for safety).
1083    ///
1084    /// # Prefix Safety
1085    /// 
1086    /// To prevent accidental full-table scans, this method requires a minimum
1087    /// prefix length of 2 bytes. Use `scan_unchecked` for internal operations
1088    /// that need empty/short prefixes.
1089    ///
1090    /// # Errors
1091    ///
1092    /// Returns `SochDBError::InvalidInput` if prefix is too short.
1093    pub fn scan(&self, txn: TxnHandle, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
1094        if prefix.len() < Self::MIN_SCAN_PREFIX_LEN {
1095            return Err(SochDBError::InvalidArgument(format!(
1096                "Prefix too short: {} bytes (minimum {} required). \
1097                 Use scan_unchecked() for unrestricted scans.",
1098                prefix.len(),
1099                Self::MIN_SCAN_PREFIX_LEN
1100            )));
1101        }
1102        self.scan_unchecked(txn, prefix)
1103    }
1104
1105    /// Scan keys with a prefix without length validation.
1106    ///
1107    /// # Warning
1108    ///
1109    /// This method allows empty/short prefixes which can cause expensive
1110    /// full-table scans. Use `scan()` unless you specifically need unrestricted
1111    /// prefix access for internal operations.
1112    pub fn scan_unchecked(&self, txn: TxnHandle, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
1113        let results = self.storage.scan(txn.txn_id, prefix)?;
1114        let bytes: u64 = results
1115            .iter()
1116            .map(|(k, v)| (k.len() + v.len()) as u64)
1117            .sum();
1118        self.stats.bytes_read.fetch_add(bytes, Ordering::Relaxed);
1119        Ok(results)
1120    }
1121
1122    /// Scan keys in range
1123    pub fn scan_range(
1124        &self,
1125        txn: TxnHandle,
1126        start: &[u8],
1127        end: &[u8],
1128    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
1129        let results = self.storage.scan_range(txn.txn_id, start, end)?;
1130        let bytes: u64 = results
1131            .iter()
1132            .map(|(k, v)| (k.len() + v.len()) as u64)
1133            .sum();
1134        self.stats.bytes_read.fetch_add(bytes, Ordering::Relaxed);
1135        Ok(results)
1136    }
1137
1138    /// Streaming scan for very large result sets
1139    /// 
1140    /// Returns an iterator that yields (key, value) pairs without
1141    /// materializing the entire result set. Use this for large scans
1142    /// where memory efficiency is important.
1143    /// 
1144    /// ## Performance
1145    /// 
1146    /// - Memory: O(1) per iteration vs O(N) for scan_range
1147    /// - Latency: First result available immediately vs waiting for all results
1148    /// - Throughput: Slightly lower due to per-item overhead
1149    /// 
1150    /// ## Usage
1151    /// 
1152    /// ```ignore
1153    /// for result in db.scan_range_iter(txn, b"start", b"end") {
1154    ///     let (key, value) = result?;
1155    ///     // Process immediately - no need to wait for all results
1156    /// }
1157    /// ```
1158    pub fn scan_range_iter<'a>(
1159        &'a self,
1160        txn: TxnHandle,
1161        start: &'a [u8],
1162        end: &'a [u8],
1163    ) -> impl Iterator<Item = Result<(Vec<u8>, Vec<u8>)>> + 'a {
1164        let stats = &self.stats;
1165        self.storage
1166            .scan_range_iter(txn.txn_id, start, end)
1167            .map(move |item| {
1168                stats.bytes_read.fetch_add(
1169                    (item.0.len() + item.1.len()) as u64,
1170                    Ordering::Relaxed,
1171                );
1172                Ok(item)
1173            })
1174    }
1175
1176    /// Flush memtable to WAL/Disk
1177    pub fn flush(&self) -> Result<()> {
1178        self.storage.fsync()
1179    }
1180
1181    // =========================================================================
1182    // Path-Native API (SochDB's differentiator)
1183    // =========================================================================
1184
1185    /// Get storage statistics
1186    pub fn storage_stats(&self) -> crate::durable_storage::StorageStats {
1187        self.storage.stats()
1188    }
1189
1190    /// Put a value at a path
1191    ///
1192    /// Path format: "collection/doc_id/field" or "table.row_id.column"
1193    /// Resolution is O(|path|), not O(log N) like B-tree.
1194    pub fn put_path(&self, txn: TxnHandle, path: &str, value: &[u8]) -> Result<()> {
1195        self.put(txn, path.as_bytes(), value)
1196    }
1197
1198    /// Get a value at a path
1199    pub fn get_path(&self, txn: TxnHandle, path: &str) -> Result<Option<Vec<u8>>> {
1200        self.get(txn, path.as_bytes())
1201    }
1202
1203    /// Delete at a path
1204    pub fn delete_path(&self, txn: TxnHandle, path: &str) -> Result<()> {
1205        self.delete(txn, path.as_bytes())
1206    }
1207
1208    /// Scan a path prefix
1209    ///
1210    /// Returns all key-value pairs where key starts with prefix.
1211    /// Useful for: "users/123/" -> all fields of user 123
1212    pub fn scan_path(&self, txn: TxnHandle, prefix: &str) -> Result<Vec<(String, Vec<u8>)>> {
1213        self.stats.queries_executed.fetch_add(1, Ordering::Relaxed);
1214
1215        let results = self.scan(txn, prefix.as_bytes())?;
1216
1217        Ok(results
1218            .into_iter()
1219            .filter_map(|(k, v)| String::from_utf8(k).ok().map(|path| (path, v)))
1220            .collect())
1221    }
1222
1223    // =========================================================================
1224    // Query API
1225    // =========================================================================
1226
1227    /// Execute a path query and return results
1228    ///
1229    /// This is the main query interface for LLM context retrieval.
1230    /// Supports:
1231    /// - Path prefix matching
1232    /// - Column projection (for I/O reduction)
1233    /// - Limit/offset
1234    pub fn query(&self, txn: TxnHandle, path_prefix: &str) -> QueryBuilder<'_> {
1235        QueryBuilder::new(self, txn, path_prefix.to_string())
1236    }
1237
1238    // =========================================================================
1239    // Table API (Higher-level abstraction)
1240    // =========================================================================
1241
1242    /// Register a table schema
1243    pub fn register_table(&self, schema: TableSchema) -> Result<()> {
1244        if self.tables.contains_key(&schema.name) {
1245            return Err(SochDBError::InvalidArgument(format!(
1246                "Table '{}' already exists",
1247                schema.name
1248            )));
1249        }
1250        // Cache the packed schema for fast inserts
1251        let packed_schema = Self::to_packed_schema(&schema);
1252        self.packed_schemas
1253            .insert(schema.name.clone(), packed_schema);
1254        self.tables.insert(schema.name.clone(), schema);
1255        Ok(())
1256    }
1257
1258    /// Get table schema
1259    pub fn get_table_schema(&self, name: &str) -> Option<TableSchema> {
1260        self.tables.get(name).map(|s| s.clone())
1261    }
1262
1263    /// List all tables
1264    pub fn list_tables(&self) -> Vec<String> {
1265        self.tables.iter().map(|e| e.key().clone()).collect()
1266    }
1267    /// Convert TableSchema to PackedTableSchema for efficient storage
1268    fn to_packed_schema(schema: &TableSchema) -> PackedTableSchema {
1269        let columns = schema
1270            .columns
1271            .iter()
1272            .map(|col| PackedColumnDef {
1273                name: col.name.clone(),
1274                col_type: match col.col_type {
1275                    ColumnType::Int64 => PackedColumnType::Int64,
1276                    ColumnType::UInt64 => PackedColumnType::UInt64,
1277                    ColumnType::Float64 => PackedColumnType::Float64,
1278                    ColumnType::Text => PackedColumnType::Text,
1279                    ColumnType::Binary => PackedColumnType::Binary,
1280                    ColumnType::Bool => PackedColumnType::Bool,
1281                },
1282                nullable: col.nullable,
1283            })
1284            .collect();
1285
1286        PackedTableSchema::new(&schema.name, columns)
1287    }
1288
1289    /// Insert a row into a table
1290    ///
1291    /// Uses packed row format: stores entire row as single key-value pair.
1292    /// This reduces write amplification from 4× to 1× for a 4-column table.
1293    ///
1294    /// # Performance
1295    /// - Before: 4 columns × (WAL entry + MVCC version) = 4 writes
1296    /// - After: 1 packed row = 1 write
1297    /// - Improvement: ~4× fewer WAL entries, ~48% less I/O overhead
1298    pub fn insert_row(
1299        &self,
1300        txn: TxnHandle,
1301        table: &str,
1302        row_id: u64,
1303        values: &HashMap<String, SochValue>,
1304    ) -> Result<()> {
1305        // Use cached packed schema - single DashMap lookup, no clone
1306        let packed_schema = self
1307            .packed_schemas
1308            .get(table)
1309            .ok_or_else(|| SochDBError::InvalidArgument(format!("Table '{}' not found", table)))?;
1310
1311        // Pack the row using cached schema
1312        let packed_row = PackedRow::pack(&packed_schema, values);
1313
1314        // Build key using KeyBuffer - optimized stack allocation (~12-15ns vs ~30-35ns for write!())
1315        let key = KeyBuffer::format_row_key(table, row_id);
1316
1317        self.put(txn, key.as_bytes(), packed_row.as_bytes())?;
1318
1319        Ok(())
1320    }
1321
1322    /// Read a row from a table
1323    ///
1324    /// Reads packed row and extracts requested columns in O(k) time.
1325    /// Column projection happens in memory, not storage - all columns are fetched.
1326    pub fn read_row(
1327        &self,
1328        txn: TxnHandle,
1329        table: &str,
1330        row_id: u64,
1331        columns: Option<&[&str]>,
1332    ) -> Result<Option<HashMap<String, SochValue>>> {
1333        let schema = self
1334            .tables
1335            .get(table)
1336            .ok_or_else(|| SochDBError::InvalidArgument(format!("Table '{}' not found", table)))?;
1337
1338        // Read the packed row with a single key lookup using KeyBuffer
1339        let key = KeyBuffer::format_row_key(table, row_id);
1340        let bytes = match self.get(txn, key.as_bytes())? {
1341            Some(b) => b,
1342            None => return Ok(None),
1343        };
1344
1345        // Use cached packed schema
1346        let packed_schema = self
1347            .packed_schemas
1348            .get(table)
1349            .ok_or_else(|| SochDBError::Internal("Packed schema not found".into()))?;
1350        let packed_row = PackedRow::from_bytes(bytes, packed_schema.num_columns())?;
1351
1352        // Determine which columns to return
1353        let cols_to_read: Vec<&str> = match columns {
1354            Some(c) => c.to_vec(),
1355            None => schema.columns.iter().map(|c| c.name.as_str()).collect(),
1356        };
1357
1358        let mut row = HashMap::new();
1359        for col_name in cols_to_read {
1360            if let Some(idx) = packed_schema.column_index(col_name)
1361                && let Some(col_def) = packed_schema.column(idx)
1362                && let Some(value) = packed_row.get_column(idx, col_def.col_type)
1363            {
1364                row.insert(col_name.to_string(), value);
1365            }
1366        }
1367
1368        Ok(Some(row))
1369    }
1370
1371    /// Insert multiple rows efficiently in a batch
1372    ///
1373    /// This method accumulates all rows and writes them with fewer WAL syncs.
1374    /// Ideal for bulk loading scenarios.
1375    ///
1376    /// # Performance
1377    /// - Uses group commit to batch fsync operations
1378    /// - Expected throughput: 500K-1M rows/sec depending on row size
1379    pub fn insert_rows_batch(
1380        &self,
1381        txn: TxnHandle,
1382        table: &str,
1383        rows: &[(u64, HashMap<String, SochValue>)],
1384    ) -> Result<usize> {
1385        // Use cached packed schema
1386        let packed_schema = self
1387            .packed_schemas
1388            .get(table)
1389            .ok_or_else(|| SochDBError::InvalidArgument(format!("Table '{}' not found", table)))?;
1390
1391        let mut count = 0;
1392
1393        for (row_id, values) in rows {
1394            // Pack and write using KeyBuffer for efficient key construction
1395            let packed_row = PackedRow::pack(&packed_schema, values);
1396            let key = KeyBuffer::format_row_key(table, *row_id);
1397            self.put(txn, key.as_bytes(), packed_row.as_bytes())?;
1398            count += 1;
1399        }
1400
1401        Ok(count)
1402    }
1403
1404    /// Ultra-fast raw put - bypasses all validation
1405    ///
1406    /// Use when you've already validated the data and just need speed.
1407    /// This is ~10× faster than insert_row() for bulk inserts.
1408    #[inline]
1409    pub fn put_raw(&self, txn: TxnHandle, key: &[u8], value: &[u8]) -> Result<()> {
1410        self.storage.write_refs(txn.txn_id, key, value)
1411    }
1412
1413    /// Zero-allocation insert - fastest path for bulk inserts
1414    ///
1415    /// Takes values as a slice in schema column order, avoiding HashMap overhead.
1416    ///
1417    /// # Arguments
1418    /// * `txn` - Transaction handle
1419    /// * `table` - Table name
1420    /// * `row_id` - Row identifier
1421    /// * `values` - Values in schema column order (None = NULL)
1422    ///
1423    /// # Performance
1424    /// - Eliminates ~6 allocations per row vs insert_row()
1425    /// - Expected: 1.2M-1.5M inserts/sec
1426    ///
1427    /// # Example
1428    /// ```ignore
1429    /// let values: &[Option<&SochValue>] = &[
1430    ///     Some(&SochValue::Int(1)),
1431    ///     Some(&SochValue::Text("Alice".into())),
1432    ///     None, // NULL
1433    /// ];
1434    /// db.insert_row_slice(txn, "users", 1, values)?;
1435    /// ```
1436    #[inline]
1437    pub fn insert_row_slice(
1438        &self,
1439        txn: TxnHandle,
1440        table: &str,
1441        row_id: u64,
1442        values: &[Option<&SochValue>],
1443    ) -> Result<()> {
1444        // Use cached packed schema - single DashMap lookup
1445        let packed_schema = self
1446            .packed_schemas
1447            .get(table)
1448            .ok_or_else(|| SochDBError::InvalidArgument(format!("Table '{}' not found", table)))?;
1449
1450        // Validate column count matches
1451        if values.len() != packed_schema.num_columns() {
1452            return Err(SochDBError::InvalidArgument(format!(
1453                "Expected {} columns, got {}",
1454                packed_schema.num_columns(),
1455                values.len()
1456            )));
1457        }
1458
1459        // Pack using zero-allocation path
1460        let packed_row = PackedRow::pack_slice(&packed_schema, values);
1461
1462        // Build key using KeyBuffer - optimized stack allocation (~12-15ns vs ~30-35ns for write!())
1463        let key = KeyBuffer::format_row_key(table, row_id);
1464
1465        self.put(txn, key.as_bytes(), packed_row.as_bytes())?;
1466        Ok(())
1467    }
1468
1469    // =========================================================================
1470    // Maintenance
1471    // =========================================================================
1472
1473    /// Force fsync to disk
1474    pub fn fsync(&self) -> Result<()> {
1475        self.storage.fsync()
1476    }
1477
1478    /// Create a checkpoint
1479    pub fn checkpoint(&self) -> Result<u64> {
1480        self.storage.checkpoint()
1481    }
1482
1483    /// Run garbage collection
1484    pub fn gc(&self) -> usize {
1485        self.storage.gc()
1486    }
1487
1488    /// Get database statistics
1489    pub fn stats(&self) -> Stats {
1490        Stats {
1491            transactions_started: self.stats.transactions_started.load(Ordering::Relaxed),
1492            transactions_committed: self.stats.transactions_committed.load(Ordering::Relaxed),
1493            transactions_aborted: self.stats.transactions_aborted.load(Ordering::Relaxed),
1494            queries_executed: self.stats.queries_executed.load(Ordering::Relaxed),
1495            bytes_written: self.stats.bytes_written.load(Ordering::Relaxed),
1496            bytes_read: self.stats.bytes_read.load(Ordering::Relaxed),
1497        }
1498    }
1499
1500    /// Shutdown the database gracefully
1501    pub fn shutdown(&self) -> Result<()> {
1502        if self.shutdown.swap(1, Ordering::SeqCst) == 1 {
1503            return Ok(()); // Already shutting down
1504        }
1505
1506        // Flush any pending writes
1507        self.fsync()?;
1508
1509        // Create clean shutdown marker
1510        let marker = self.path.join(".clean_shutdown");
1511        std::fs::write(&marker, b"ok")?;
1512
1513        Ok(())
1514    }
1515}
1516
1517impl Drop for Database {
1518    fn drop(&mut self) {
1519        // Try graceful shutdown if not already done
1520        if self.shutdown.load(Ordering::SeqCst) == 0 {
1521            let _ = self.fsync();
1522            let marker = self.path.join(".clean_shutdown");
1523            let _ = std::fs::write(&marker, b"ok");
1524        }
1525    }
1526}
1527
1528/// Query builder for fluent query construction
1529pub struct QueryBuilder<'a> {
1530    db: &'a Database,
1531    txn: TxnHandle,
1532    path_prefix: String,
1533    columns: Option<Vec<String>>,
1534    limit: Option<usize>,
1535    offset: Option<usize>,
1536}
1537
1538impl<'a> QueryBuilder<'a> {
1539    fn new(db: &'a Database, txn: TxnHandle, path_prefix: String) -> Self {
1540        Self {
1541            db,
1542            txn,
1543            path_prefix,
1544            columns: None,
1545            limit: None,
1546            offset: None,
1547        }
1548    }
1549
1550    /// Select specific columns (for I/O reduction)
1551    pub fn columns(mut self, cols: &[&str]) -> Self {
1552        self.columns = Some(cols.iter().map(|s| s.to_string()).collect());
1553        self
1554    }
1555
1556    /// Limit results
1557    pub fn limit(mut self, n: usize) -> Self {
1558        self.limit = Some(n);
1559        self
1560    }
1561
1562    /// Skip results
1563    pub fn offset(mut self, n: usize) -> Self {
1564        self.offset = Some(n);
1565        self
1566    }
1567
1568    /// Execute the query
1569    ///
1570    /// Scans packed rows and unpacks them. Each key is "table/row_id" pointing to a packed row.
1571    pub fn execute(self) -> Result<QueryResult> {
1572        self.db
1573            .stats
1574            .queries_executed
1575            .fetch_add(1, Ordering::Relaxed);
1576
1577        // Get schema for the table if we're querying a table
1578        let table_name = self
1579            .path_prefix
1580            .split('/')
1581            .next()
1582            .unwrap_or(&self.path_prefix);
1583        let schema = self.db.tables.get(table_name).map(|s| s.clone());
1584
1585        // Scan the path prefix
1586        let results = self.db.scan_path(self.txn, &self.path_prefix)?;
1587
1588        let mut rows: Vec<HashMap<String, SochValue>> = Vec::new();
1589        let mut bytes_read = 0usize;
1590
1591        if let Some(ref schema) = schema {
1592            // We have a table schema - use cached packed schema
1593            let packed_schema = self
1594                .db
1595                .packed_schemas
1596                .get(table_name)
1597                .map(|ps| ps.clone())
1598                .unwrap_or_else(|| Database::to_packed_schema(schema));
1599
1600            for (path, value_bytes) in results {
1601                // Parse path: table/row_id
1602                let parts: Vec<&str> = path.split('/').collect();
1603                if parts.len() == 2 {
1604                    // This is a packed row
1605                    bytes_read += value_bytes.len();
1606
1607                    if let Ok(packed_row) =
1608                        PackedRow::from_bytes(value_bytes, packed_schema.num_columns())
1609                    {
1610                        // Unpack all columns or just requested columns
1611                        let mut row = HashMap::new();
1612
1613                        if let Some(ref cols) = self.columns {
1614                            // Only extract requested columns
1615                            for col_name in cols {
1616                                if let Some(idx) = packed_schema.column_index(col_name)
1617                                    && let Some(col_def) = packed_schema.column(idx)
1618                                    && let Some(value) =
1619                                        packed_row.get_column(idx, col_def.col_type)
1620                                {
1621                                    row.insert(col_name.clone(), value);
1622                                }
1623                            }
1624                        } else {
1625                            // Extract all columns
1626                            row = packed_row.unpack(&packed_schema);
1627                        }
1628
1629                        if !row.is_empty() {
1630                            rows.push(row);
1631                        }
1632                    }
1633                }
1634            }
1635        } else {
1636            // Fallback: no schema, try legacy column-per-key format
1637            let mut rows_map: HashMap<String, HashMap<String, SochValue>> = HashMap::new();
1638
1639            for (path, value_bytes) in results {
1640                let parts: Vec<&str> = path.split('/').collect();
1641                if parts.len() >= 3 {
1642                    let row_key = format!("{}/{}", parts[0], parts[1]);
1643                    let col_name = parts[2..].join("/");
1644
1645                    if let Some(ref cols) = self.columns
1646                        && !cols.contains(&col_name)
1647                    {
1648                        continue;
1649                    }
1650
1651                    bytes_read += value_bytes.len();
1652                    let row = rows_map.entry(row_key).or_default();
1653                    row.insert(col_name, deserialize_value(&value_bytes));
1654                }
1655            }
1656
1657            rows = rows_map.into_values().collect();
1658        }
1659
1660        // Apply offset
1661        if let Some(offset) = self.offset {
1662            rows = rows.into_iter().skip(offset).collect();
1663        }
1664
1665        // Apply limit
1666        if let Some(limit) = self.limit {
1667            rows.truncate(limit);
1668        }
1669
1670        // Collect column names
1671        let columns: Vec<String> = self.columns.unwrap_or_else(|| {
1672            rows.iter()
1673                .flat_map(|r| r.keys().cloned())
1674                .collect::<std::collections::HashSet<_>>()
1675                .into_iter()
1676                .collect()
1677        });
1678
1679        Ok(QueryResult {
1680            columns,
1681            rows_scanned: rows.len(),
1682            bytes_read,
1683            rows,
1684        })
1685    }
1686
1687    /// Execute and return TOON format (for LLM efficiency)
1688    pub fn to_toon(self) -> Result<String> {
1689        let result = self.execute()?;
1690        Ok(result.to_toon())
1691    }
1692
1693    /// Execute with lazy iteration - avoids materializing all rows
1694    ///
1695    /// Returns an iterator over rows as `Vec<SochValue>` in schema column order.
1696    /// This is more memory-efficient than `execute()` for large result sets.
1697    ///
1698    /// # Performance
1699    /// - No upfront materialization of all rows
1700    /// - ~40% less memory for large result sets
1701    /// - Ideal for streaming to network or aggregations
1702    ///
1703    /// # Example
1704    /// ```ignore
1705    /// for row_result in db.query(txn, "users").execute_iter()? {
1706    ///     let row = row_result?;
1707    ///     // row is Vec<SochValue> in column order
1708    /// }
1709    /// ```
1710    pub fn execute_iter(self) -> Result<QueryRowIterator> {
1711        self.db
1712            .stats
1713            .queries_executed
1714            .fetch_add(1, Ordering::Relaxed);
1715
1716        let table_name = self
1717            .path_prefix
1718            .split('/')
1719            .next()
1720            .unwrap_or(&self.path_prefix)
1721            .to_string();
1722
1723        // Get packed schema (clone needed for iterator ownership)
1724        let packed_schema = self.db.packed_schemas.get(&table_name).map(|ps| ps.clone());
1725
1726        // Scan the path prefix
1727        let results = self.db.scan_path(self.txn, &self.path_prefix)?;
1728
1729        Ok(QueryRowIterator {
1730            results: results.into_iter(),
1731            packed_schema,
1732            columns: self.columns,
1733            offset: self.offset.unwrap_or(0),
1734            limit: self.limit,
1735            yielded: 0,
1736            skipped: 0,
1737        })
1738    }
1739
1740    /// Execute and return columnar (SIMD-friendly) result format
1741    ///
1742    /// Instead of row-oriented `Vec<HashMap<String, SochValue>>`, returns
1743    /// column-oriented `Vec<TypedColumn>` for vectorized operations.
1744    ///
1745    /// ## Performance Benefits
1746    ///
1747    /// - SIMD: Aggregate operations (sum, avg) use vectorized instructions
1748    /// - Cache: Sequential access maximizes L1/L2 hits
1749    /// - Memory: ~30% less overhead than row-based format
1750    /// - Analytics: Ideal for ML preprocessing and statistics
1751    ///
1752    /// ## Example
1753    ///
1754    /// ```ignore
1755    /// let result = db.query(txn, "users")
1756    ///     .columns(&["id", "score"])
1757    ///     .as_columnar()?;
1758    ///
1759    /// // SIMD-optimized sum
1760    /// let total = result.sum_i64("score").unwrap_or(0);
1761    ///
1762    /// // Direct column access
1763    /// if let Some(scores) = result.column("score") {
1764    ///     for i in 0..scores.len() {
1765    ///         if let Some(v) = scores.get_i64(i) {
1766    ///             println!("Score: {}", v);
1767    ///         }
1768    ///     }
1769    /// }
1770    /// ```
1771    pub fn as_columnar(self) -> Result<ColumnarQueryResult> {
1772        self.db
1773            .stats
1774            .queries_executed
1775            .fetch_add(1, Ordering::Relaxed);
1776
1777        let table_name = self
1778            .path_prefix
1779            .split('/')
1780            .next()
1781            .unwrap_or(&self.path_prefix);
1782        let schema = self.db.tables.get(table_name).map(|s| s.clone());
1783
1784        // Get packed schema
1785        let packed_schema = match self.db.packed_schemas.get(table_name) {
1786            Some(ps) => ps.clone(),
1787            None => return Ok(ColumnarQueryResult::empty()),
1788        };
1789
1790        // Determine columns to fetch
1791        let column_names: Vec<String> = self.columns.clone().unwrap_or_else(|| {
1792            schema
1793                .as_ref()
1794                .map(|s| s.columns.iter().map(|c| c.name.clone()).collect())
1795                .unwrap_or_default()
1796        });
1797
1798        if column_names.is_empty() {
1799            return Ok(ColumnarQueryResult::empty());
1800        }
1801
1802        // Initialize TypedColumns based on schema types
1803        let mut columns: Vec<CoreTypedColumn> = column_names
1804            .iter()
1805            .map(|col_name| {
1806                packed_schema
1807                    .column_index(col_name)
1808                    .and_then(|idx| packed_schema.column(idx))
1809                    .map(|col_def| match col_def.col_type {
1810                        PackedColumnType::Int64 => CoreTypedColumn::new_int64(),
1811                        PackedColumnType::UInt64 => CoreTypedColumn::new_uint64(),
1812                        PackedColumnType::Float64 => CoreTypedColumn::new_float64(),
1813                        PackedColumnType::Text => CoreTypedColumn::new_text(),
1814                        PackedColumnType::Binary => CoreTypedColumn::new_binary(),
1815                        PackedColumnType::Bool => CoreTypedColumn::new_bool(),
1816                        PackedColumnType::Null => CoreTypedColumn::new_text(), // Null column = fallback to text
1817                    })
1818                    .unwrap_or_else(CoreTypedColumn::new_text) // fallback
1819            })
1820            .collect();
1821
1822        // Scan the path prefix
1823        let results = self.db.scan_path(self.txn, &self.path_prefix)?;
1824
1825        let mut row_count = 0;
1826        let mut bytes_read = 0;
1827        let mut skipped = 0;
1828
1829        for (path, value_bytes) in results {
1830            // Parse path: table/row_id
1831            let parts: Vec<&str> = path.split('/').collect();
1832            if parts.len() != 2 {
1833                continue;
1834            }
1835
1836            // Apply offset
1837            if let Some(offset) = self.offset
1838                && skipped < offset
1839            {
1840                skipped += 1;
1841                continue;
1842            }
1843
1844            // Apply limit
1845            if let Some(limit) = self.limit
1846                && row_count >= limit
1847            {
1848                break;
1849            }
1850
1851            bytes_read += value_bytes.len();
1852
1853            if let Ok(packed_row) = PackedRow::from_bytes(value_bytes, packed_schema.num_columns())
1854            {
1855                // Extract each column and push to corresponding TypedColumn
1856                for (col_idx, col_name) in column_names.iter().enumerate() {
1857                    if let Some(schema_idx) = packed_schema.column_index(col_name) {
1858                        if let Some(col_def) = packed_schema.column(schema_idx) {
1859                            let value = packed_row.get_column(schema_idx, col_def.col_type);
1860                            push_value_to_typed_column(&mut columns[col_idx], value);
1861                        } else {
1862                            push_null_to_typed_column(&mut columns[col_idx]);
1863                        }
1864                    } else {
1865                        push_null_to_typed_column(&mut columns[col_idx]);
1866                    }
1867                }
1868                row_count += 1;
1869            }
1870        }
1871
1872        Ok(ColumnarQueryResult {
1873            columns: column_names,
1874            data: columns,
1875            row_count,
1876            bytes_read,
1877        })
1878    }
1879}
1880
1881/// Lazy iterator over query results
1882///
1883/// Unpacks rows on-demand, avoiding upfront materialization.
1884pub struct QueryRowIterator {
1885    results: std::vec::IntoIter<(String, Vec<u8>)>,
1886    packed_schema: Option<PackedTableSchema>,
1887    columns: Option<Vec<String>>,
1888    offset: usize,
1889    limit: Option<usize>,
1890    yielded: usize,
1891    skipped: usize,
1892}
1893
1894impl Iterator for QueryRowIterator {
1895    type Item = Result<Vec<SochValue>>;
1896
1897    fn next(&mut self) -> Option<Self::Item> {
1898        // Check limit
1899        if let Some(limit) = self.limit
1900            && self.yielded >= limit
1901        {
1902            return None;
1903        }
1904
1905        loop {
1906            let (path, value_bytes) = self.results.next()?;
1907
1908            // Parse path: table/row_id
1909            let parts: Vec<&str> = path.split('/').collect();
1910            if parts.len() != 2 {
1911                continue; // Skip non-row entries
1912            }
1913
1914            // Apply offset
1915            if self.skipped < self.offset {
1916                self.skipped += 1;
1917                continue;
1918            }
1919
1920            if let Some(ref schema) = self.packed_schema {
1921                match PackedRow::from_bytes(value_bytes, schema.num_columns()) {
1922                    Ok(packed_row) => {
1923                        let row = if let Some(ref cols) = self.columns {
1924                            // Project specific columns
1925                            cols.iter()
1926                                .map(|col_name| {
1927                                    schema
1928                                        .column_index(col_name)
1929                                        .and_then(|idx| schema.column(idx))
1930                                        .and_then(|col_def| {
1931                                            packed_row.get_column(
1932                                                schema.column_index(col_name).unwrap(),
1933                                                col_def.col_type,
1934                                            )
1935                                        })
1936                                        .unwrap_or(SochValue::Null)
1937                                })
1938                                .collect()
1939                        } else {
1940                            // All columns in order
1941                            packed_row.unpack_to_vec(schema)
1942                        };
1943
1944                        self.yielded += 1;
1945                        return Some(Ok(row));
1946                    }
1947                    Err(e) => return Some(Err(e)),
1948                }
1949            } else {
1950                // No schema - return raw bytes as binary
1951                self.yielded += 1;
1952                return Some(Ok(vec![SochValue::Binary(value_bytes)]));
1953            }
1954        }
1955    }
1956}
1957
1958// Helper functions for serialization (kept for backward compatibility with legacy data)
1959
1960#[allow(dead_code)]
1961fn serialize_value(value: &SochValue) -> Vec<u8> {
1962    // Simple serialization - in production use proper format
1963    match value {
1964        SochValue::Null => vec![0],
1965        SochValue::Int(i) => {
1966            let mut buf = vec![1];
1967            buf.extend_from_slice(&i.to_le_bytes());
1968            buf
1969        }
1970        SochValue::UInt(u) => {
1971            let mut buf = vec![2];
1972            buf.extend_from_slice(&u.to_le_bytes());
1973            buf
1974        }
1975        SochValue::Float(f) => {
1976            let mut buf = vec![3];
1977            buf.extend_from_slice(&f.to_le_bytes());
1978            buf
1979        }
1980        SochValue::Text(s) => {
1981            let mut buf = vec![4];
1982            buf.extend_from_slice(s.as_bytes());
1983            buf
1984        }
1985        SochValue::Bool(b) => vec![5, if *b { 1 } else { 0 }],
1986        SochValue::Binary(b) => {
1987            let mut buf = vec![6];
1988            buf.extend_from_slice(b);
1989            buf
1990        }
1991        _ => {
1992            // Fallback: serialize as text
1993            let s = format!("{:?}", value);
1994            let mut buf = vec![4];
1995            buf.extend_from_slice(s.as_bytes());
1996            buf
1997        }
1998    }
1999}
2000
2001fn deserialize_value(bytes: &[u8]) -> SochValue {
2002    if bytes.is_empty() {
2003        return SochValue::Null;
2004    }
2005
2006    match bytes[0] {
2007        0 => SochValue::Null,
2008        1 if bytes.len() >= 9 => {
2009            let i = i64::from_le_bytes(bytes[1..9].try_into().unwrap());
2010            SochValue::Int(i)
2011        }
2012        2 if bytes.len() >= 9 => {
2013            let u = u64::from_le_bytes(bytes[1..9].try_into().unwrap());
2014            SochValue::UInt(u)
2015        }
2016        3 if bytes.len() >= 9 => {
2017            let f = f64::from_le_bytes(bytes[1..9].try_into().unwrap());
2018            SochValue::Float(f)
2019        }
2020        4 => {
2021            let s = String::from_utf8_lossy(&bytes[1..]).to_string();
2022            SochValue::Text(s)
2023        }
2024        5 if bytes.len() >= 2 => SochValue::Bool(bytes[1] != 0),
2025        6 => SochValue::Binary(bytes[1..].to_vec()),
2026        _ => {
2027            // Treat as text
2028            let s = String::from_utf8_lossy(bytes).to_string();
2029            SochValue::Text(s)
2030        }
2031    }
2032}
2033
2034// ============================================================================
2035// Helper functions for columnar query result building
2036// ============================================================================
2037
2038/// Push a SochValue into a TypedColumn
2039fn push_value_to_typed_column(col: &mut CoreTypedColumn, value: Option<SochValue>) {
2040    match value {
2041        None => push_null_to_typed_column(col),
2042        Some(v) => match (col, v) {
2043            (
2044                CoreTypedColumn::Int64 {
2045                    values,
2046                    validity,
2047                    stats,
2048                },
2049                SochValue::Int(i),
2050            ) => {
2051                values.push(i);
2052                validity.push(true);
2053                stats.update_i64(i);
2054            }
2055            (
2056                CoreTypedColumn::Int64 {
2057                    values,
2058                    validity,
2059                    stats,
2060                },
2061                SochValue::UInt(u),
2062            ) => {
2063                values.push(u as i64);
2064                validity.push(true);
2065                stats.update_i64(u as i64);
2066            }
2067            (
2068                CoreTypedColumn::UInt64 {
2069                    values,
2070                    validity,
2071                    stats,
2072                },
2073                SochValue::UInt(u),
2074            ) => {
2075                values.push(u);
2076                validity.push(true);
2077                stats.update_i64(u as i64);
2078            }
2079            (
2080                CoreTypedColumn::UInt64 {
2081                    values,
2082                    validity,
2083                    stats,
2084                },
2085                SochValue::Int(i),
2086            ) => {
2087                values.push(i as u64);
2088                validity.push(true);
2089                stats.update_i64(i);
2090            }
2091            (
2092                CoreTypedColumn::Float64 {
2093                    values,
2094                    validity,
2095                    stats,
2096                },
2097                SochValue::Float(f),
2098            ) => {
2099                values.push(f);
2100                validity.push(true);
2101                stats.update_f64(f);
2102            }
2103            (
2104                CoreTypedColumn::Float64 {
2105                    values,
2106                    validity,
2107                    stats,
2108                },
2109                SochValue::Int(i),
2110            ) => {
2111                values.push(i as f64);
2112                validity.push(true);
2113                stats.update_f64(i as f64);
2114            }
2115            (
2116                CoreTypedColumn::Text {
2117                    offsets,
2118                    data,
2119                    validity,
2120                    stats,
2121                },
2122                SochValue::Text(s),
2123            ) => {
2124                data.extend_from_slice(s.as_bytes());
2125                offsets.push(data.len() as u32);
2126                validity.push(true);
2127                stats.row_count += 1;
2128            }
2129            (
2130                CoreTypedColumn::Binary {
2131                    offsets,
2132                    data,
2133                    validity,
2134                    stats,
2135                },
2136                SochValue::Binary(b),
2137            ) => {
2138                data.extend_from_slice(&b);
2139                offsets.push(data.len() as u32);
2140                validity.push(true);
2141                stats.row_count += 1;
2142            }
2143            (
2144                CoreTypedColumn::Bool {
2145                    values,
2146                    validity,
2147                    stats,
2148                    len,
2149                },
2150                SochValue::Bool(b),
2151            ) => {
2152                let idx = *len;
2153                *len += 1;
2154                let num_words = (*len).div_ceil(64);
2155                while values.len() < num_words {
2156                    values.push(0);
2157                }
2158                if b {
2159                    let word = idx / 64;
2160                    let bit = idx % 64;
2161                    values[word] |= 1 << bit;
2162                }
2163                validity.push(true);
2164                stats.row_count += 1;
2165            }
2166            // Type mismatch - push as null
2167            (col, _) => push_null_to_typed_column(col),
2168        },
2169    }
2170}
2171
2172/// Push a null value into a TypedColumn
2173fn push_null_to_typed_column(col: &mut CoreTypedColumn) {
2174    match col {
2175        CoreTypedColumn::Int64 {
2176            values,
2177            validity,
2178            stats,
2179        } => {
2180            values.push(0);
2181            validity.push(false);
2182            stats.update_null();
2183        }
2184        CoreTypedColumn::UInt64 {
2185            values,
2186            validity,
2187            stats,
2188        } => {
2189            values.push(0);
2190            validity.push(false);
2191            stats.update_null();
2192        }
2193        CoreTypedColumn::Float64 {
2194            values,
2195            validity,
2196            stats,
2197        } => {
2198            values.push(0.0);
2199            validity.push(false);
2200            stats.update_null();
2201        }
2202        CoreTypedColumn::Text {
2203            offsets,
2204            data: _,
2205            validity,
2206            stats,
2207        } => {
2208            offsets.push(offsets.last().copied().unwrap_or(0));
2209            validity.push(false);
2210            stats.update_null();
2211        }
2212        CoreTypedColumn::Binary {
2213            offsets,
2214            data: _,
2215            validity,
2216            stats,
2217        } => {
2218            offsets.push(offsets.last().copied().unwrap_or(0));
2219            validity.push(false);
2220            stats.update_null();
2221        }
2222        CoreTypedColumn::Bool {
2223            values,
2224            validity,
2225            stats,
2226            len,
2227        } => {
2228            *len += 1;
2229            let num_words = (*len).div_ceil(64);
2230            while values.len() < num_words {
2231                values.push(0);
2232            }
2233            validity.push(false);
2234            stats.update_null();
2235        }
2236    }
2237}
2238
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    #[test]
    fn test_database_open_close() {
        let tmp = tempdir().unwrap();
        let db = Database::open(tmp.path()).unwrap();

        // A freshly opened database must hand out a transaction with a non-zero id.
        let txn = db.begin_transaction().unwrap();
        assert!(txn.txn_id > 0);

        db.abort(txn).unwrap();
        db.shutdown().unwrap();
    }

    #[test]
    fn test_database_put_get() {
        let tmp = tempdir().unwrap();
        let db = Database::open(tmp.path()).unwrap();
        let expected = Some(b"value1".to_vec());

        // The writing transaction sees its own write...
        let writer = db.begin_transaction().unwrap();
        db.put(writer, b"key1", b"value1").unwrap();
        assert_eq!(db.get(writer, b"key1").unwrap(), expected);
        db.commit(writer).unwrap();

        // ...and so does a transaction started after the commit.
        let reader = db.begin_transaction().unwrap();
        assert_eq!(db.get(reader, b"key1").unwrap(), expected);
        db.abort(reader).unwrap();
    }

    #[test]
    fn test_database_path_api() {
        let tmp = tempdir().unwrap();
        let db = Database::open(tmp.path()).unwrap();

        let writer = db.begin_transaction().unwrap();
        for (path, value) in [
            ("users/1/name", &b"Alice"[..]),
            ("users/1/email", &b"alice@example.com"[..]),
            ("users/2/name", &b"Bob"[..]),
        ] {
            db.put_path(writer, path, value).unwrap();
        }
        db.commit(writer).unwrap();

        // Only the two fields stored under `users/1/` match that prefix.
        let reader = db.begin_transaction().unwrap();
        let matches = db.scan_path(reader, "users/1/").unwrap();
        assert_eq!(matches.len(), 2);

        db.abort(reader).unwrap();
    }

    #[test]
    fn test_database_table_api() {
        let tmp = tempdir().unwrap();
        let db = Database::open(tmp.path()).unwrap();

        let users = TableSchema {
            name: "users".to_string(),
            columns: vec![
                ColumnDef {
                    name: "name".to_string(),
                    col_type: ColumnType::Text,
                    nullable: false,
                },
                ColumnDef {
                    name: "age".to_string(),
                    col_type: ColumnType::Int64,
                    nullable: true,
                },
            ],
        };
        db.register_table(users).unwrap();

        let writer = db.begin_transaction().unwrap();
        let values = HashMap::from([
            ("name".to_string(), SochValue::Text("Alice".to_string())),
            ("age".to_string(), SochValue::Int(30)),
        ]);
        db.insert_row(writer, "users", 1, &values).unwrap();
        db.commit(writer).unwrap();

        let reader = db.begin_transaction().unwrap();
        let row = db
            .read_row(reader, "users", 1, None)
            .unwrap()
            .expect("committed row 1 must be readable");
        assert_eq!(row.get("name"), Some(&SochValue::Text("Alice".to_string())));

        db.abort(reader).unwrap();
    }

    #[test]
    fn test_database_query_builder() {
        let tmp = tempdir().unwrap();
        let db = Database::open(tmp.path()).unwrap();

        let writer = db.begin_transaction().unwrap();
        for (path, value) in [
            ("docs/1/title", &b"Hello"[..]),
            ("docs/1/content", &b"World"[..]),
            ("docs/2/title", &b"Foo"[..]),
            ("docs/2/content", &b"Bar"[..]),
        ] {
            db.put_path(writer, path, value).unwrap();
        }
        db.commit(writer).unwrap();

        // `limit(1)` must cap the result at a single row.
        let reader = db.begin_transaction().unwrap();
        let result = db.query(reader, "docs/").limit(1).execute().unwrap();
        assert_eq!(result.rows.len(), 1);

        db.abort(reader).unwrap();
    }

    #[test]
    fn test_database_crash_recovery() {
        let tmp = tempdir().unwrap();

        // Commit a write, then leak the handle instead of shutting down, so the
        // next open must recover from what reached disk (simulated crash).
        {
            let db = Database::open(tmp.path()).unwrap();
            let writer = db.begin_transaction().unwrap();
            db.put(writer, b"persist", b"this").unwrap();
            db.commit(writer).unwrap();
            std::mem::forget(db);
        }

        // The committed write must survive the reopen.
        {
            let db = Database::open(tmp.path()).unwrap();
            let reader = db.begin_transaction().unwrap();
            assert_eq!(db.get(reader, b"persist").unwrap(), Some(b"this".to_vec()));
            db.abort(reader).unwrap();
        }
    }
}