Skip to main content

sochdb_storage/
database.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! SochDB Database Kernel
19//!
20//! The shared core that powers both embedded mode (`SochConnection::open`) and
21//! server mode (`sochdb-server`). This is the "SQLite engine" equivalent.
22//!
23//! ## Architecture
24//!
25//! ```text
26//! ┌──────────────────────────────────────────────────────────────────┐
27//! │                        Database Kernel                            │
28//! │  Arc<Database> - shared by all connections                       │
29//! ├──────────────────────────────────────────────────────────────────┤
30//! │                                                                   │
31//! │  ┌─────────────────┐   ┌─────────────────┐   ┌────────────────┐ │
32//! │  │  DurableStorage │   │     Catalog     │   │  Vector Index  │ │
33//! │  │  (WAL + MVCC)   │   │  (Schema Mgmt)  │   │  (HNSW/Vamana) │ │
34//! │  └────────┬────────┘   └────────┬────────┘   └───────┬────────┘ │
35//! │           │                     │                     │          │
36//! │           └─────────────────────┴─────────────────────┘          │
37//! │                                 │                                 │
38//! │  ┌─────────────────────────────────────────────────────────────┐ │
39//! │  │              Query Executor (Path-Native)                    │ │
40//! │  │  - Path resolution: O(|path|)                                │ │
41//! │  │  - Column projection: 80% I/O reduction                     │ │
42//! │  │  - Context selection: Token-aware chunking                  │ │
43//! │  └─────────────────────────────────────────────────────────────┘ │
44//! │                                                                   │
45//! └──────────────────────────────────────────────────────────────────┘
46//!
47//! Deployment Modes:
48//! ┌─────────────┐   ┌─────────────┐   ┌─────────────┐
49//! │  Embedded   │   │  IPC Server │   │  TCP Server │
50//! │  (in-proc)  │   │  (Unix sock)│   │  (remote)   │
51//! └──────┬──────┘   └──────┬──────┘   └──────┬──────┘
52//!        │                 │                 │
53//!        └─────────────────┴─────────────────┘
54//!                          │
55//!                   Arc<Database>
56//! ```
57//!
58//! ## Latency Model
59//!
60//! Let K = kernel processing cost for a query
61//!
62//! - Embedded: L_emb ≈ K (function call overhead negligible)
63//! - IPC: L_ipc ≈ K + δ_ipc (δ_ipc = ~10-50µs for Unix socket)
64//! - TCP: L_tcp ≈ K + δ_net (δ_net = 100µs-10ms depending on network)
65//!
66//! For LLM context queries where K >> δ_ipc, IPC is "nearly embedded".
67
68use std::collections::HashMap;
69use std::path::{Path, PathBuf};
70use std::sync::Arc;
71use std::sync::atomic::{AtomicU64, Ordering};
72
73use dashmap::DashMap;
74use parking_lot::RwLock;
75
76use crate::durable_storage::{DurableStorage, TransactionMode};
77use crate::index_policy::{IndexPolicy, TableIndexConfig, TableIndexRegistry};
78use crate::key_buffer::KeyBuffer;
79use crate::packed_row::{PackedColumnDef, PackedColumnType, PackedRow, PackedTableSchema};
80use sochdb_core::catalog::Catalog;
81use sochdb_core::{Result, SochDBError, SochValue};
82
83// Re-export key types
84pub use crate::durable_storage::RecoveryStats;
85
/// Database configuration
///
/// Plain settings bundle handed to `Database::open_with_config`. Besides
/// `Default`, ready-made presets are provided by
/// `DatabaseConfig::throughput_optimized`, `DatabaseConfig::latency_optimized`
/// and `DatabaseConfig::sqlite_compatible`.
#[derive(Debug, Clone)]
pub struct DatabaseConfig {
    /// Enable group commit for better write throughput
    pub group_commit: bool,
    /// Maximum memory for memtables before flush (bytes)
    pub memtable_size_limit: usize,
    /// Enable WAL for durability
    pub wal_enabled: bool,
    /// Sync mode: fsync after every commit vs periodic
    pub sync_mode: SyncMode,
    /// Read-only mode
    pub read_only: bool,

    /// Enable ordered index for O(log N) prefix scans
    ///
    /// # Deprecation Notice
    ///
    /// **DEPRECATED since 0.2.0**: Use `default_index_policy` instead for per-table control.
    /// This field will be removed in v0.3.0.
    ///
    /// ## Migration Guide
    ///
    /// Replace:
    /// ```ignore
    /// DatabaseConfig { enable_ordered_index: true, .. }  // Old API
    /// DatabaseConfig { enable_ordered_index: false, .. } // Old API
    /// ```
    ///
    /// With:
    /// ```ignore
    /// DatabaseConfig { default_index_policy: IndexPolicy::ScanOptimized, .. }  // Ordered index enabled
    /// DatabaseConfig { default_index_policy: IndexPolicy::WriteOptimized, .. } // Ordered index disabled
    /// ```
    ///
    /// ## Behavior
    ///
    /// When false, saves ~134 ns/op on writes (20% speedup)
    /// but scan_prefix becomes O(N) instead of O(log N + K).
    ///
    /// Set to false for write-heavy workloads without range scans.
    ///
    /// See `effective_ordered_index()` for the value derived from
    /// `default_index_policy`, which is the non-deprecated replacement.
    #[deprecated(
        since = "0.2.0",
        note = "Use `default_index_policy` field instead. This field will be removed in v0.3.0. \
                Set IndexPolicy::ScanOptimized for ordered index, WriteOptimized to disable."
    )]
    pub enable_ordered_index: bool,
    /// Group commit configuration
    pub group_commit_config: GroupCommitSettings,
    /// Default index policy for tables not explicitly configured
    ///
    /// This replaces the global `enable_ordered_index` toggle with
    /// fine-grained per-table control. Use `index_registry` to configure
    /// individual tables.
    ///
    /// | Policy         | Insert Cost | Scan Cost      | Use Case               |
    /// |----------------|-------------|----------------|------------------------|
    /// | WriteOptimized | O(1)        | O(N)           | High-write, rare scan  |
    /// | Balanced       | O(1) amort  | O(output+logK) | Mixed OLTP             |
    /// | ScanOptimized  | O(log N)    | O(logN + K)    | Analytics, range query |
    /// | AppendOnly     | O(1)        | O(N)           | Time-series logs       |
    pub default_index_policy: IndexPolicy,
}
151
/// Group commit settings - mirrors SQLite's WAL mode tuning
///
/// ## Performance Model
///
/// Without group commit: Throughput = 1 / L_fsync ≈ 200 commits/sec (L=5ms)
/// With group commit (batch size K): Throughput = K / L_fsync = K × 200 commits/sec
///
/// For K=100: 20,000 commits/sec (100× speedup)
///
/// ## SQLite Comparison
///
/// `batch_size` below refers to the effective batch size, which is bounded by
/// `min_batch_size` and `max_batch_size`.
///
/// | Setting                    | SQLite Equivalent           |
/// |----------------------------|-----------------------------|
/// | batch_size = 1             | PRAGMA synchronous = FULL   |
/// | batch_size = 100           | WAL mode with batching      |
/// | max_wait_us = 0            | No delay, immediate flush   |
/// | max_wait_us = 10000        | Up to 10ms delay for batch  |
///
/// All fields are public and are not validated on construction
/// (in particular nothing enforces `min_batch_size <= max_batch_size`).
#[derive(Debug, Clone)]
pub struct GroupCommitSettings {
    /// Minimum batch size before flush (default: 1)
    pub min_batch_size: usize,
    /// Maximum batch size (default: 1000)
    pub max_batch_size: usize,
    /// Maximum wait time before flush in microseconds (default: 10000 = 10ms)
    pub max_wait_us: u64,
    /// Expected fsync latency in microseconds (for adaptive sizing)
    ///
    /// Read by `optimal_batch_size`; the default is 5000 (5ms).
    pub fsync_latency_us: u64,
}
180
181impl Default for GroupCommitSettings {
182    fn default() -> Self {
183        Self {
184            min_batch_size: 1,
185            max_batch_size: 1000,
186            max_wait_us: 10_000,     // 10ms
187            fsync_latency_us: 5_000, // 5ms
188        }
189    }
190}
191
192impl GroupCommitSettings {
193    /// High throughput preset - maximizes batching
194    pub fn high_throughput() -> Self {
195        Self {
196            min_batch_size: 50,
197            max_batch_size: 5000,
198            max_wait_us: 50_000, // 50ms
199            fsync_latency_us: 5_000,
200        }
201    }
202
203    /// Low latency preset - minimal batching
204    pub fn low_latency() -> Self {
205        Self {
206            min_batch_size: 1,
207            max_batch_size: 10,
208            max_wait_us: 1_000, // 1ms
209            fsync_latency_us: 5_000,
210        }
211    }
212
213    /// Calculate optimal batch size using Little's Law
214    ///
215    /// N* = sqrt(2 × L_fsync × λ / C_wait)
216    ///
217    /// # Arguments
218    /// * `arrival_rate` - Operations per second
219    /// * `wait_cost` - Cost coefficient for waiting (0.0-1.0)
220    pub fn optimal_batch_size(&self, arrival_rate: f64, wait_cost: f64) -> usize {
221        let l_fsync = self.fsync_latency_us as f64 / 1_000_000.0;
222        let n_star = (2.0 * l_fsync * arrival_rate / wait_cost.max(0.001)).sqrt();
223        (n_star as usize).clamp(self.min_batch_size, self.max_batch_size)
224    }
225}
226
227impl Default for DatabaseConfig {
228    #[allow(deprecated)]
229    fn default() -> Self {
230        Self {
231            group_commit: true,
232            memtable_size_limit: 64 * 1024 * 1024, // 64MB
233            wal_enabled: true,
234            sync_mode: SyncMode::Normal,
235            read_only: false,
236            enable_ordered_index: true, // Default: enabled for compatibility
237            group_commit_config: GroupCommitSettings::default(),
238            default_index_policy: IndexPolicy::Balanced, // New default: balanced OLTP policy
239        }
240    }
241}
242
243impl DatabaseConfig {
244    /// Create config optimized for throughput (Fast Mode)
245    ///
246    /// - Disables ordered index (saves ~134 ns/op)
247    /// - Uses high-throughput group commit settings
248    /// - Suitable for append-only workloads
249    #[allow(deprecated)]
250    pub fn throughput_optimized() -> Self {
251        Self {
252            group_commit: true,
253            memtable_size_limit: 128 * 1024 * 1024, // 128MB
254            wal_enabled: true,
255            sync_mode: SyncMode::Normal,
256            read_only: false,
257            enable_ordered_index: false,
258            group_commit_config: GroupCommitSettings::high_throughput(),
259            default_index_policy: IndexPolicy::WriteOptimized, // No ordered index overhead
260        }
261    }
262
263    /// Create config optimized for latency
264    ///
265    /// - Keeps ordered index for fast range scans
266    /// - Uses low-latency group commit settings
267    /// - Suitable for OLTP workloads
268    #[allow(deprecated)]
269    pub fn latency_optimized() -> Self {
270        Self {
271            group_commit: true,
272            memtable_size_limit: 32 * 1024 * 1024, // 32MB
273            wal_enabled: true,
274            sync_mode: SyncMode::Full,
275            read_only: false,
276            enable_ordered_index: true,
277            group_commit_config: GroupCommitSettings::low_latency(),
278            default_index_policy: IndexPolicy::ScanOptimized, // Fast range scans
279        }
280    }
281
282    /// Create config matching SQLite defaults
283    #[allow(deprecated)]
284    pub fn sqlite_compatible() -> Self {
285        Self {
286            group_commit: false, // SQLite default is single-commit
287            memtable_size_limit: 64 * 1024 * 1024,
288            wal_enabled: true,
289            sync_mode: SyncMode::Normal, // PRAGMA synchronous = NORMAL
290            read_only: false,
291            enable_ordered_index: true,
292            group_commit_config: GroupCommitSettings::default(),
293            default_index_policy: IndexPolicy::Balanced, // Good default for mixed workloads
294        }
295    }
296
297    /// Get effective ordered index setting, derived from `default_index_policy`.
298    /// 
299    /// This is the shim method for the deprecated `enable_ordered_index` field.
300    /// It returns `true` if the policy requires an ordered index (ScanOptimized),
301    /// and `false` otherwise (WriteOptimized, Balanced, AppendOnly).
302    ///
303    /// # Policy Mapping
304    ///
305    /// | IndexPolicy      | Returns |
306    /// |------------------|---------|
307    /// | ScanOptimized    | true    |
308    /// | Balanced         | false   |
309    /// | WriteOptimized   | false   |
310    /// | AppendOnly       | false   |
311    ///
312    /// Note: `Balanced` uses lazy compaction rather than a live ordered index,
313    /// so it returns `false` for the low-level memtable config but still supports
314    /// efficient range scans via sorted runs.
315    pub fn effective_ordered_index(&self) -> bool {
316        matches!(self.default_index_policy, IndexPolicy::ScanOptimized)
317    }
318}
319
/// WAL sync mode - matches SQLite's PRAGMA synchronous semantics
///
/// | SochDB     | SQLite       | Description                                    |
/// |------------|--------------|------------------------------------------------|
/// | Off        | OFF (0)      | No fsync, risk of data loss on crash           |
/// | Normal     | NORMAL (1)   | Fsync at checkpoints, not every commit         |
/// | Full       | FULL (2)     | Fsync every commit (safest, slowest)           |
///
/// The explicit discriminants (0, 1, 2) are the SQLite pragma values;
/// `to_sqlite_pragma` relies on them via an `as u32` cast.
///
/// # Performance vs Durability Trade-offs
///
/// - **Off**: ~10x faster than Full, but may lose last ~100ms of data on crash
/// - **Normal**: ~5x faster than Full, durable at checkpoint boundaries
/// - **Full**: Every commit is fsync'd, no data loss possible
///
/// # SQLite Compatibility
///
/// ```sql
/// -- SQLite equivalent settings
/// PRAGMA synchronous = OFF;    -- SyncMode::Off
/// PRAGMA synchronous = NORMAL; -- SyncMode::Normal
/// PRAGMA synchronous = FULL;   -- SyncMode::Full
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SyncMode {
    /// No fsync (equivalent to SQLite PRAGMA synchronous = OFF)
    ///
    /// Writes buffered in OS, may lose data on power failure.
    /// Use for non-critical data or bulk loading.
    Off = 0,

    /// Fsync at checkpoints (equivalent to SQLite PRAGMA synchronous = NORMAL)
    ///
    /// Default mode (this is what `DatabaseConfig::default()` selects).
    /// Syncs WAL at checkpoint boundaries.
    /// Good balance of performance and durability.
    Normal = 1,

    /// Fsync every commit (equivalent to SQLite PRAGMA synchronous = FULL)
    ///
    /// Safest mode. Every commit is immediately durable.
    /// Required for financial/critical data.
    Full = 2,
}
362
363impl SyncMode {
364    /// Convert from SQLite synchronous pragma value
365    pub fn from_sqlite_pragma(value: u32) -> Self {
366        match value {
367            0 => SyncMode::Off,
368            1 => SyncMode::Normal,
369            _ => SyncMode::Full, // 2+ treated as Full
370        }
371    }
372
373    /// Convert to SQLite synchronous pragma value
374    pub fn to_sqlite_pragma(self) -> u32 {
375        self as u32
376    }
377
378    /// Parse from string (case-insensitive)
379    pub fn parse(s: &str) -> Option<Self> {
380        match s.to_ascii_uppercase().as_str() {
381            "OFF" | "0" => Some(SyncMode::Off),
382            "NORMAL" | "1" => Some(SyncMode::Normal),
383            "FULL" | "2" => Some(SyncMode::Full),
384            _ => None,
385        }
386    }
387}
388
/// Table schema for the kernel
///
/// Stored per table name in `Database::tables`.
#[derive(Debug, Clone)]
pub struct TableSchema {
    /// Table name
    pub name: String,
    /// Column definitions (presumably in declaration order — confirm with callers)
    pub columns: Vec<ColumnDef>,
}
395
/// Column definition
///
/// One entry of `TableSchema::columns`.
#[derive(Debug, Clone)]
pub struct ColumnDef {
    /// Column name
    pub name: String,
    /// Logical type of the column
    pub col_type: ColumnType,
    /// Whether the column is declared nullable
    pub nullable: bool,
}
403
/// Column types
///
/// Logical types usable in a `ColumnDef`. The storage encoding of each type
/// is defined elsewhere and is not visible from this definition.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ColumnType {
    Int64,
    UInt64,
    Float64,
    Text,
    Binary,
    Bool,
}
414
/// Transaction handle for kernel operations
///
/// A small `Copy` value, so it can be passed by value to kernel calls.
#[derive(Debug, Clone, Copy)]
pub struct TxnHandle {
    /// Transaction identifier
    pub txn_id: u64,
    /// Snapshot timestamp (presumably the MVCC read timestamp — confirm)
    pub snapshot_ts: u64,
}
421
/// Query result from the kernel
///
/// Row-oriented result set. `to_toon` renders the rows using the order given
/// by `columns`.
#[derive(Debug, Clone)]
pub struct QueryResult {
    /// Column names
    pub columns: Vec<String>,
    /// Row data (each row is a map of column -> value)
    ///
    /// A row may lack an entry for a column; `to_toon` renders such cells as `∅`.
    pub rows: Vec<HashMap<String, SochValue>>,
    /// Number of rows scanned (for stats)
    pub rows_scanned: usize,
    /// Bytes read from storage
    pub bytes_read: usize,
}
434
435impl QueryResult {
436    /// Empty result
437    pub fn empty() -> Self {
438        Self {
439            columns: vec![],
440            rows: vec![],
441            rows_scanned: 0,
442            bytes_read: 0,
443        }
444    }
445
446    /// Convert to TOON format for token efficiency
447    pub fn to_toon(&self) -> String {
448        if self.rows.is_empty() {
449            return "[]".to_string();
450        }
451
452        // TOON format: table[N]{cols}: row1; row2; ...
453        let n = self.rows.len();
454        let cols = self.columns.join(",");
455
456        let rows_str: Vec<String> = self
457            .rows
458            .iter()
459            .map(|row| {
460                self.columns
461                    .iter()
462                    .map(|c| {
463                        row.get(c)
464                            .map(format_soch_value)
465                            .unwrap_or_else(|| "∅".to_string())
466                    })
467                    .collect::<Vec<_>>()
468                    .join(",")
469            })
470            .collect();
471
472        format!("result[{}]{{{}}}:{}", n, cols, rows_str.join(";"))
473    }
474}
475
476fn format_soch_value(v: &SochValue) -> String {
477    match v {
478        SochValue::Null => "∅".to_string(),
479        SochValue::Int(i) => i.to_string(),
480        SochValue::UInt(u) => u.to_string(),
481        SochValue::Float(f) => format!("{:.6}", f),
482        SochValue::Text(s) => {
483            if s.contains(',') || s.contains(';') {
484                format!("\"{}\"", s.replace('"', "\\\""))
485            } else {
486                s.clone()
487            }
488        }
489        SochValue::Bool(b) => if *b { "T" } else { "F" }.to_string(),
490        SochValue::Binary(b) => format!("b64:{}", base64_encode(b)),
491        _ => format!("{:?}", v),
492    }
493}
494
/// Encode `data` as standard base64 (alphabet `A-Z a-z 0-9 + /`) with `=`
/// padding. Empty input yields an empty string.
fn base64_encode(data: &[u8]) -> String {
    const ALPHABET: &[u8; 64] =
        b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

    // Look up the character for the low six bits of `bits`.
    let sextet = |bits: u32| ALPHABET[(bits & 0x3f) as usize] as char;

    // Every (possibly partial) 3-byte group produces exactly 4 characters.
    let mut encoded = String::with_capacity((data.len() + 2) / 3 * 4);

    for group in data.chunks(3) {
        // Pack up to three bytes into the top of a 24-bit word; absent bytes stay zero.
        let mut word: u32 = 0;
        for (i, &byte) in group.iter().enumerate() {
            word |= (byte as u32) << (16 - 8 * i);
        }

        encoded.push(sextet(word >> 18));
        encoded.push(sextet(word >> 12));
        encoded.push(if group.len() > 1 { sextet(word >> 6) } else { '=' });
        encoded.push(if group.len() > 2 { sextet(word) } else { '=' });
    }

    encoded
}
523
524// ============================================================================
525// Columnar Query Results - SIMD-friendly result format
526// ============================================================================
527
528use sochdb_core::TypedColumn as CoreTypedColumn;
529
/// Columnar query result - SIMD-friendly format for analytics
///
/// Instead of row-oriented `Vec<HashMap<String, SochValue>>`, this returns
/// column-oriented `Vec<TypedColumn>` for efficient vectorized operations.
///
/// ## Memory Layout
///
/// Row-oriented (standard):
/// ```text
/// Row 0: [id=1, name="Alice", score=85]
/// Row 1: [id=2, name="Bob", score=92]
/// Row 2: [id=3, name="Carol", score=78]
/// ```
///
/// Column-oriented (this format):
/// ```text
/// id:    [1, 2, 3]           ← contiguous i64 array (SIMD-friendly)
/// name:  ["Alice", "Bob", "Carol"] ← Arrow-style string encoding
/// score: [85, 92, 78]        ← contiguous i64 array
/// ```
///
/// ## Performance Benefits
///
/// - SIMD: Column sums use vectorized instructions (~8× faster)
/// - Cache: Sequential access pattern maximizes L1/L2 cache hits
/// - Compression: Same-type data compresses better (5-10× typical)
/// - Filtering: Bitmap operations instead of row iteration
///
/// ## Usage
///
/// ```ignore
/// let result = db.query(txn, "users")
///     .columns(&["id", "score"])
///     .as_columnar()?;
///
/// // SIMD sum
/// let total_score = result.column("score")
///     .map(|c| c.sum_i64())
///     .unwrap_or(0);
///
/// // Stats
/// println!("Rows: {}, Memory: {} bytes", result.row_count(), result.memory_size());
/// ```
#[derive(Debug, Clone)]
pub struct ColumnarQueryResult {
    /// Column names in order
    ///
    /// Lookups by name (`column`, `column_index`) use the position of the
    /// name in this list as the index into `data`.
    pub columns: Vec<String>,
    /// Column data - each TypedColumn contains all values for one column
    pub data: Vec<CoreTypedColumn>,
    /// Number of rows
    pub row_count: usize,
    /// Bytes read from storage
    pub bytes_read: usize,
}
584
585impl ColumnarQueryResult {
586    /// Create an empty result
587    pub fn empty() -> Self {
588        Self {
589            columns: vec![],
590            data: vec![],
591            row_count: 0,
592            bytes_read: 0,
593        }
594    }
595
596    /// Get column by name
597    pub fn column(&self, name: &str) -> Option<&CoreTypedColumn> {
598        self.columns
599            .iter()
600            .position(|c| c == name)
601            .and_then(|idx| self.data.get(idx))
602    }
603
604    /// Get column index by name
605    pub fn column_index(&self, name: &str) -> Option<usize> {
606        self.columns.iter().position(|c| c == name)
607    }
608
609    /// Number of rows
610    pub fn row_count(&self) -> usize {
611        self.row_count
612    }
613
614    /// Number of columns
615    pub fn column_count(&self) -> usize {
616        self.columns.len()
617    }
618
619    /// Total memory size in bytes
620    pub fn memory_size(&self) -> usize {
621        self.data.iter().map(|c| c.memory_size()).sum()
622    }
623
624    /// Sum of an i64 column (SIMD-optimized)
625    pub fn sum_i64(&self, column: &str) -> Option<i64> {
626        self.column(column).map(|c| c.sum_i64())
627    }
628
629    /// Sum of an f64 column (SIMD-optimized)
630    pub fn sum_f64(&self, column: &str) -> Option<f64> {
631        self.column(column).map(|c| c.sum_f64())
632    }
633
634    /// Get column statistics (min, max, null count)
635    pub fn column_stats(&self, column: &str) -> Option<&sochdb_core::columnar::ColumnStats> {
636        self.column(column).map(|c| c.stats())
637    }
638
639    /// Convert to TOON format (token-efficient)
640    pub fn to_toon(&self) -> String {
641        if self.row_count == 0 {
642            return "[]".to_string();
643        }
644
645        let n = self.row_count;
646        let cols = self.columns.join(",");
647
648        // Build rows from columns
649        let mut rows_str = Vec::with_capacity(n);
650        for i in 0..n {
651            let row: Vec<String> = self
652                .data
653                .iter()
654                .map(|col| format_columnar_value(col, i))
655                .collect();
656            rows_str.push(row.join(","));
657        }
658
659        format!("result[{}]{{{}}}:{}", n, cols, rows_str.join(";"))
660    }
661}
662
663/// Format a single value from a TypedColumn at index
664fn format_columnar_value(col: &CoreTypedColumn, idx: usize) -> String {
665    match col {
666        CoreTypedColumn::Int64 {
667            values, validity, ..
668        } => {
669            if validity.is_valid(idx) && idx < values.len() {
670                values[idx].to_string()
671            } else {
672                "∅".to_string()
673            }
674        }
675        CoreTypedColumn::UInt64 {
676            values, validity, ..
677        } => {
678            if validity.is_valid(idx) && idx < values.len() {
679                values[idx].to_string()
680            } else {
681                "∅".to_string()
682            }
683        }
684        CoreTypedColumn::Float64 {
685            values, validity, ..
686        } => {
687            if validity.is_valid(idx) && idx < values.len() {
688                format!("{:.6}", values[idx])
689            } else {
690                "∅".to_string()
691            }
692        }
693        CoreTypedColumn::Text {
694            offsets,
695            data,
696            validity,
697            ..
698        } => {
699            if validity.is_valid(idx) && idx + 1 < offsets.len() {
700                let start = offsets[idx] as usize;
701                let end = offsets[idx + 1] as usize;
702                std::str::from_utf8(&data[start..end])
703                    .map(|s| {
704                        if s.contains(',') || s.contains(';') {
705                            format!("\"{}\"", s.replace('"', "\\\""))
706                        } else {
707                            s.to_string()
708                        }
709                    })
710                    .unwrap_or_else(|_| "∅".to_string())
711            } else {
712                "∅".to_string()
713            }
714        }
715        CoreTypedColumn::Binary {
716            offsets,
717            data,
718            validity,
719            ..
720        } => {
721            if validity.is_valid(idx) && idx + 1 < offsets.len() {
722                let start = offsets[idx] as usize;
723                let end = offsets[idx + 1] as usize;
724                format!("b64:{}", base64_encode(&data[start..end]))
725            } else {
726                "∅".to_string()
727            }
728        }
729        CoreTypedColumn::Bool {
730            values,
731            validity,
732            len,
733            ..
734        } => {
735            if validity.is_valid(idx) && idx < *len {
736                let word = idx / 64;
737                let bit = idx % 64;
738                if (values[word] >> bit) & 1 == 1 {
739                    "T"
740                } else {
741                    "F"
742                }
743                .to_string()
744            } else {
745                "∅".to_string()
746            }
747        }
748    }
749}
750
/// Vector search result
///
/// One hit returned by a vector search.
#[derive(Debug, Clone)]
pub struct VectorSearchResult {
    /// Identifier of the matched item
    pub id: u64,
    /// Distance to the query vector (the metric is not specified here;
    /// presumably smaller means closer — confirm against the index)
    pub distance: f32,
    /// Optional metadata attached to the hit, keyed by field name
    pub metadata: Option<HashMap<String, SochValue>>,
}
758
/// The SochDB Database Kernel
///
/// This is the shared core used by both embedded (`SochConnection`) and
/// server (`sochdb-server`) modes. It owns all storage, catalog, and
/// indexing components.
///
/// # Thread Safety
///
/// The Database is fully thread-safe via internal synchronization:
/// - Multiple readers can operate concurrently (MVCC snapshots)
/// - Writers coordinate through WAL and group commit
/// - All state is behind Arc/RwLock for shared access
///
/// # Concurrency Modes
///
/// ## Standard Mode (Single Process)
/// - Uses exclusive file lock (`flock(LOCK_EX)`)
/// - Best for: Scripts, notebooks, CLI tools
/// - Open with: `Database::open(path)`
///
/// ## Concurrent Mode (Multi-Process/Web Apps)
/// - Uses lock-free MVCC for reads, single-writer coordination for writes
/// - Best for: Web servers, Flask/FastAPI apps, hot reloading
/// - Open with: `Database::open_concurrent(path)`
///
/// # Example
///
/// ```ignore
/// // Standard mode (single process)
/// let db = Database::open("./my_data")?;
///
/// // Concurrent mode (multi-reader, single-writer)
/// let db = Database::open_concurrent("./my_data")?;
///
/// // Begin a transaction
/// let txn = db.begin_transaction()?;
///
/// // Write data
/// db.put(txn, b"user:1:name", b"Alice")?;
///
/// // Commit
/// db.commit(txn)?;
/// ```
// NOTE(review): the blanket `dead_code` allowance presumably covers fields
// that are stored but not yet read everywhere — confirm and narrow if possible.
#[allow(dead_code)]
pub struct Database {
    /// Path to database directory
    path: PathBuf,
    /// Durable storage layer (WAL + MVCC + memtable)
    storage: Arc<DurableStorage>,
    /// Concurrent MVCC manager (for concurrent mode)
    ///
    /// `None` for databases opened via `open_without_lock`.
    concurrent_mvcc: Option<Arc<crate::mvcc_concurrent::ConcurrentMvcc>>,
    /// Schema catalog
    catalog: Arc<RwLock<Catalog>>,
    /// Registered table schemas (name -> schema) - lock-free for reads
    tables: DashMap<String, TableSchema>,
    /// Cached packed schemas for fast insert (name -> packed schema)
    packed_schemas: DashMap<String, PackedTableSchema>,
    /// Per-table index policy registry
    ///
    /// Created with the config's `default_index_policy` as its default.
    index_registry: Arc<TableIndexRegistry>,
    /// Configuration
    config: DatabaseConfig,
    /// Statistics
    stats: DatabaseStats,
    /// Shutdown flag
    ///
    /// Stored as an `AtomicU64` and initialised to 0 when the database is opened.
    shutdown: AtomicU64,
    /// Whether this database is in concurrent mode
    is_concurrent: bool,
}
827
/// Database statistics
///
/// Internal atomic counters; the public, plain-integer counterpart with the
/// same field names is `Stats`.
struct DatabaseStats {
    /// Transactions started
    transactions_started: AtomicU64,
    /// Transactions committed
    transactions_committed: AtomicU64,
    /// Transactions aborted
    transactions_aborted: AtomicU64,
    /// Queries executed
    queries_executed: AtomicU64,
    /// Bytes written
    bytes_written: AtomicU64,
    /// Bytes read
    bytes_read: AtomicU64,
}
837
838impl DatabaseStats {
839    fn new() -> Self {
840        Self {
841            transactions_started: AtomicU64::new(0),
842            transactions_committed: AtomicU64::new(0),
843            transactions_aborted: AtomicU64::new(0),
844            queries_executed: AtomicU64::new(0),
845            bytes_written: AtomicU64::new(0),
846            bytes_read: AtomicU64::new(0),
847        }
848    }
849}
850
/// Public statistics snapshot
///
/// Plain-integer copy of the counters kept in the internal `DatabaseStats`;
/// the fields carry the same names.
#[derive(Debug, Clone)]
pub struct Stats {
    /// Transactions started
    pub transactions_started: u64,
    /// Transactions committed
    pub transactions_committed: u64,
    /// Transactions aborted
    pub transactions_aborted: u64,
    /// Queries executed
    pub queries_executed: u64,
    /// Bytes written
    pub bytes_written: u64,
    /// Bytes read
    pub bytes_read: u64,
}
861
862impl Database {
863    /// Open or create a database at the given path.
864    ///
865    /// This is the primary entry point, similar to `sqlite3_open()`.
866    /// If the database exists, it will be opened and WAL recovery performed.
867    /// If it doesn't exist, a new database will be created.
868    ///
869    /// # Arguments
870    ///
871    /// * `path` - Directory path for the database files
872    ///
873    /// # Returns
874    ///
875    /// An `Arc<Database>` that can be shared across threads and connections.
876    pub fn open<P: AsRef<Path>>(path: P) -> Result<Arc<Self>> {
877        Self::open_with_config(path, DatabaseConfig::default())
878    }
879
880    /// Open without locking (for testing crash recovery scenarios)
881    ///
882    /// # Safety
883    /// This should ONLY be used in tests that simulate crashes by forgetting
884    /// the storage instance. In production, always use `open()`.
885    #[cfg(test)]
886    pub fn open_without_lock<P: AsRef<Path>>(path: P) -> Result<Arc<Self>> {
887        let path = path.as_ref().to_path_buf();
888        let config = DatabaseConfig::default();
889
890        let storage = Arc::new(DurableStorage::open_without_lock(&path)?);
891
892        let index_registry = Arc::new(TableIndexRegistry::with_default_policy(
893            config.default_index_policy,
894        ));
895
896        let db = Arc::new(Self {
897            path: path.clone(),
898            storage,
899            concurrent_mvcc: None,
900            catalog: Arc::new(RwLock::new(Catalog::new("sochdb"))),
901            tables: DashMap::new(),
902            packed_schemas: DashMap::new(),
903            index_registry,
904            config,
905            stats: DatabaseStats::new(),
906            shutdown: AtomicU64::new(0),
907            is_concurrent: false,
908        });
909
910        db.recover()?;
911        Ok(db)
912    }
913
914    /// Open with custom configuration
915    pub fn open_with_config<P: AsRef<Path>>(path: P, config: DatabaseConfig) -> Result<Arc<Self>> {
916        let path = path.as_ref().to_path_buf();
917
918        // Use IndexPolicy-based storage configuration for automatic memtable selection
919        // This derives ordered index and memtable type from the policy
920        let storage = Arc::new(DurableStorage::open_with_policy(
921            &path,
922            config.default_index_policy,
923            config.group_commit,
924        )?);
925
926        // Create index registry with default policy from config
927        let index_registry = Arc::new(TableIndexRegistry::with_default_policy(
928            config.default_index_policy,
929        ));
930
931        let db = Arc::new(Self {
932            path: path.clone(),
933            storage,
934            concurrent_mvcc: None,
935            catalog: Arc::new(RwLock::new(Catalog::new("sochdb"))),
936            tables: DashMap::new(),
937            packed_schemas: DashMap::new(),
938            index_registry,
939            config,
940            stats: DatabaseStats::new(),
941            shutdown: AtomicU64::new(0),
942            is_concurrent: false,
943        });
944
945        // Perform crash recovery if needed
946        db.recover()?;
947
948        Ok(db)
949    }
950
951    /// Open database in concurrent mode (multi-reader, single-writer)
952    ///
953    /// This mode allows multiple processes to access the database simultaneously:
954    /// - **Readers**: Lock-free, concurrent access via MVCC snapshots
955    /// - **Writers**: Single-writer coordination through atomic locks
956    ///
957    /// # Use Cases
958    ///
959    /// - Web applications (Flask, FastAPI, Django)
960    /// - Hot reloading development servers
961    /// - Multi-process worker pools
962    /// - Any scenario with concurrent read access
963    ///
964    /// # Performance
965    ///
966    /// - Read latency: ~100ns (lock-free atomic operations)
967    /// - Write latency: ~60μs amortized (with group commit)
968    /// - Concurrent readers: Up to 1024 (configurable)
969    ///
970    /// # Example
971    ///
972    /// ```ignore
973    /// // Multiple processes can open the same database
974    /// let db = Database::open_concurrent("./my_data")?;
975    ///
976    /// // Reads are lock-free
977    /// let value = db.get(b"key")?;
978    ///
979    /// // Writes coordinate automatically
980    /// let txn = db.begin_transaction()?;
981    /// db.put(txn, b"key", b"value")?;
982    /// db.commit(txn)?;
983    /// ```
984    pub fn open_concurrent<P: AsRef<Path>>(path: P) -> Result<Arc<Self>> {
985        Self::open_concurrent_with_config(path, DatabaseConfig::default())
986    }
987
988    /// Open database in concurrent mode with custom configuration
989    pub fn open_concurrent_with_config<P: AsRef<Path>>(
990        path: P,
991        config: DatabaseConfig,
992    ) -> Result<Arc<Self>> {
993        use crate::mvcc_concurrent::ConcurrentMvcc;
994
995        let path = path.as_ref().to_path_buf();
996        std::fs::create_dir_all(&path)?;
997
998        // Open concurrent MVCC manager (this uses shared memory, not exclusive lock)
999        let concurrent_mvcc = Arc::new(ConcurrentMvcc::open(&path)?);
1000
1001        // Open storage WITHOUT exclusive lock (concurrent MVCC handles coordination)
1002        // We use a special internal method that skips the file lock
1003        let storage = Arc::new(DurableStorage::open_for_concurrent(&path, config.default_index_policy)?);
1004
1005        // Create index registry with default policy from config
1006        let index_registry = Arc::new(TableIndexRegistry::with_default_policy(
1007            config.default_index_policy,
1008        ));
1009
1010        let db = Arc::new(Self {
1011            path: path.clone(),
1012            storage,
1013            concurrent_mvcc: Some(concurrent_mvcc),
1014            catalog: Arc::new(RwLock::new(Catalog::new("sochdb"))),
1015            tables: DashMap::new(),
1016            packed_schemas: DashMap::new(),
1017            index_registry,
1018            config,
1019            stats: DatabaseStats::new(),
1020            shutdown: AtomicU64::new(0),
1021            is_concurrent: true,
1022        });
1023
1024        // Perform crash recovery if needed
1025        db.recover()?;
1026
1027        // Clean up any stale readers from crashed processes
1028        if let Some(ref mvcc) = db.concurrent_mvcc {
1029            mvcc.cleanup_stale_readers();
1030        }
1031
1032        Ok(db)
1033    }
1034
1035    /// Check if database is in concurrent mode
1036    #[inline]
1037    pub fn is_concurrent(&self) -> bool {
1038        self.is_concurrent
1039    }
1040
1041    /// Perform crash recovery
1042    fn recover(&self) -> Result<RecoveryStats> {
1043        self.storage.recover()
1044    }
1045
1046    /// Get database path
1047    pub fn path(&self) -> &Path {
1048        &self.path
1049    }
1050
1051    // =========================================================================
1052    // Transaction API
1053    // =========================================================================
1054
1055    /// Begin a new transaction
1056    pub fn begin_transaction(&self) -> Result<TxnHandle> {
1057        self.stats
1058            .transactions_started
1059            .fetch_add(1, Ordering::Relaxed);
1060        let txn_id = self.storage.begin_transaction()?;
1061
1062        // Get snapshot timestamp from MVCC
1063        // For now, use txn_id as a proxy (the real snapshot_ts is managed internally)
1064        Ok(TxnHandle {
1065            txn_id,
1066            snapshot_ts: txn_id,
1067        })
1068    }
1069
1070    /// Begin a read-only transaction (optimized: no SSI tracking)
1071    ///
1072    /// Read-only transactions skip SSI read tracking, reducing overhead
1073    /// from ~82ns to ~32ns per read (2.6x faster).
1074    ///
1075    /// Use this for:
1076    /// - SELECT queries that don't modify data
1077    /// - Analytics and reporting queries
1078    /// - Snapshot reads for backup
1079    pub fn begin_read_only(&self) -> Result<TxnHandle> {
1080        self.stats
1081            .transactions_started
1082            .fetch_add(1, Ordering::Relaxed);
1083        let txn_id = self.storage.begin_with_mode(TransactionMode::ReadOnly)?;
1084        Ok(TxnHandle {
1085            txn_id,
1086            snapshot_ts: txn_id,
1087        })
1088    }
1089
1090    /// Begin a write-only transaction (optimized: no read tracking)
1091    ///
1092    /// Write-only transactions skip read tracking, improving insert
1093    /// throughput for bulk loading scenarios.
1094    ///
1095    /// Use this for:
1096    /// - Bulk data imports
1097    /// - Append-only logging tables
1098    /// - ETL pipelines
1099    pub fn begin_write_only(&self) -> Result<TxnHandle> {
1100        self.stats
1101            .transactions_started
1102            .fetch_add(1, Ordering::Relaxed);
1103        let txn_id = self.storage.begin_with_mode(TransactionMode::WriteOnly)?;
1104        Ok(TxnHandle {
1105            txn_id,
1106            snapshot_ts: txn_id,
1107        })
1108    }
1109
1110    /// Commit a transaction
1111    ///
1112    /// In concurrent mode, acquires the shared writer lock to ensure
1113    /// WAL writes are serialized across processes, and forces a flush+sync
1114    /// so that subsequent processes see the committed data.
1115    pub fn commit(&self, txn: TxnHandle) -> Result<u64> {
1116        self.stats
1117            .transactions_committed
1118            .fetch_add(1, Ordering::Relaxed);
1119
1120        // In concurrent mode, acquire the cross-process writer lock
1121        // to serialize WAL commits across processes
1122        let _writer_guard = if let Some(ref mvcc) = self.concurrent_mvcc {
1123            Some(mvcc.acquire_writer(std::time::Duration::from_secs(5))?)
1124        } else {
1125            None
1126        };
1127
1128        let commit_ts = self.storage.commit(txn.txn_id)?;
1129
1130        // In concurrent mode, force flush+sync so other processes can see
1131        // the committed data when they open the DB or run recovery.
1132        // Without this, the BufWriter may hold data that isn't visible
1133        // to other processes reading the WAL file.
1134        if self.is_concurrent {
1135            self.storage.flush_wal()?;
1136            self.storage.fsync()?;
1137        }
1138
1139        // Notify concurrent MVCC of commit for GC tracking
1140        if let Some(ref mvcc) = self.concurrent_mvcc {
1141            mvcc.on_commit();
1142        }
1143
1144        Ok(commit_ts)
1145    }
1146
1147    /// Abort a transaction
1148    pub fn abort(&self, txn: TxnHandle) -> Result<()> {
1149        self.stats
1150            .transactions_aborted
1151            .fetch_add(1, Ordering::Relaxed);
1152        self.storage.abort(txn.txn_id)
1153    }
1154
1155    // =========================================================================
1156    // Per-Table Index Policy API
1157    // =========================================================================
1158
1159    /// Configure index policy for a table
1160    ///
1161    /// This allows fine-grained control over write/scan trade-offs per table:
1162    ///
1163    /// | Policy         | Insert Cost | Scan Cost      | Use Case              |
1164    /// |----------------|-------------|----------------|------------------------|
1165    /// | WriteOptimized | O(1)        | O(N)           | High-write, rare scan  |
1166    /// | Balanced       | O(1) amort  | O(output+logK) | Mixed OLTP            |
1167    /// | ScanOptimized  | O(log N)    | O(logN + K)    | Analytics, range query |
1168    /// | AppendOnly     | O(1)        | O(N)           | Time-series logs       |
1169    ///
1170    /// # Example
1171    ///
1172    /// ```ignore
1173    /// // Fast inserts for logs table (no ordered index overhead)
1174    /// db.set_table_index_policy("logs", IndexPolicy::WriteOptimized);
1175    ///
1176    /// // Efficient range scans for analytics table
1177    /// db.set_table_index_policy("analytics", IndexPolicy::ScanOptimized);
1178    ///
1179    /// // Balanced for OLTP tables
1180    /// db.set_table_index_policy("users", IndexPolicy::Balanced);
1181    /// ```
1182    pub fn set_table_index_policy(&self, table: &str, policy: IndexPolicy) {
1183        self.index_registry.configure_table(
1184            TableIndexConfig::new(table, policy)
1185        );
1186    }
1187
1188    /// Get the index policy for a table
1189    pub fn get_table_index_policy(&self, table: &str) -> IndexPolicy {
1190        self.index_registry.get_policy(table)
1191    }
1192
1193    /// Get the index registry for advanced configuration
1194    pub fn index_registry(&self) -> &Arc<TableIndexRegistry> {
1195        &self.index_registry
1196    }
1197
1198    // =========================================================================
1199    // Key-Value API (Low-level)
1200    // =========================================================================
1201
1202    /// Put a key-value pair
1203    ///
1204    /// In concurrent mode, acquires the shared writer lock to ensure
1205    /// WAL writes are serialized across processes.
1206    pub fn put(&self, txn: TxnHandle, key: &[u8], value: &[u8]) -> Result<()> {
1207        self.stats
1208            .bytes_written
1209            .fetch_add((key.len() + value.len()) as u64, Ordering::Relaxed);
1210
1211        // In concurrent mode, acquire cross-process writer lock
1212        let _writer_guard = if let Some(ref mvcc) = self.concurrent_mvcc {
1213            Some(mvcc.acquire_writer(std::time::Duration::from_secs(5))?)
1214        } else {
1215            None
1216        };
1217
1218        // Use write_refs to avoid unnecessary allocations
1219        self.storage.write_refs(txn.txn_id, key, value)
1220    }
1221
1222    /// Batch put multiple key-value pairs with reduced overhead
1223    ///
1224    /// This amortizes per-operation costs over the entire batch:
1225    /// - Single DashMap lookup
1226    /// - Batch MVCC tracking
1227    /// - Batch memtable writes
1228    ///
1229    /// For 100+ entries, this is 2-3x faster than individual puts.
1230    ///
1231    /// # Example
1232    ///
1233    /// ```ignore
1234    /// let writes: Vec<(&[u8], &[u8])> = vec![
1235    ///     (b"key1", b"value1"),
1236    ///     (b"key2", b"value2"),
1237    ///     (b"key3", b"value3"),
1238    /// ];
1239    /// db.put_batch(txn, &writes)?;
1240    /// ```
1241    pub fn put_batch(&self, txn: TxnHandle, writes: &[(&[u8], &[u8])]) -> Result<()> {
1242        let bytes: u64 = writes
1243            .iter()
1244            .map(|(k, v)| (k.len() + v.len()) as u64)
1245            .sum();
1246        self.stats.bytes_written.fetch_add(bytes, Ordering::Relaxed);
1247
1248        // In concurrent mode, acquire cross-process writer lock
1249        let _writer_guard = if let Some(ref mvcc) = self.concurrent_mvcc {
1250            Some(mvcc.acquire_writer(std::time::Duration::from_secs(5))?)
1251        } else {
1252            None
1253        };
1254
1255        self.storage.write_batch_refs(txn.txn_id, writes)
1256    }
1257
1258    /// Get a value by key
1259    pub fn get(&self, txn: TxnHandle, key: &[u8]) -> Result<Option<Vec<u8>>> {
1260        let result = self.storage.read(txn.txn_id, key)?;
1261        if let Some(ref data) = result {
1262            self.stats
1263                .bytes_read
1264                .fetch_add(data.len() as u64, Ordering::Relaxed);
1265        }
1266        Ok(result)
1267    }
1268
1269    /// Delete a key
1270    pub fn delete(&self, txn: TxnHandle, key: &[u8]) -> Result<()> {
1271        self.storage.delete(txn.txn_id, key.to_vec())
1272    }
1273
1274    /// Minimum prefix length for scan operations.
1275    /// Prevents expensive full-table scans by requiring a meaningful prefix.
1276    pub const MIN_SCAN_PREFIX_LEN: usize = 2;
1277
1278    /// Scan keys with a prefix (enforces minimum prefix length for safety).
1279    ///
1280    /// # Prefix Safety
1281    /// 
1282    /// To prevent accidental full-table scans, this method requires a minimum
1283    /// prefix length of 2 bytes. Use `scan_unchecked` for internal operations
1284    /// that need empty/short prefixes.
1285    ///
1286    /// # Errors
1287    ///
1288    /// Returns `SochDBError::InvalidInput` if prefix is too short.
1289    pub fn scan(&self, txn: TxnHandle, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
1290        if prefix.len() < Self::MIN_SCAN_PREFIX_LEN {
1291            return Err(SochDBError::InvalidArgument(format!(
1292                "Prefix too short: {} bytes (minimum {} required). \
1293                 Use scan_unchecked() for unrestricted scans.",
1294                prefix.len(),
1295                Self::MIN_SCAN_PREFIX_LEN
1296            )));
1297        }
1298        self.scan_unchecked(txn, prefix)
1299    }
1300
1301    /// Scan keys with a prefix without length validation.
1302    ///
1303    /// # Warning
1304    ///
1305    /// This method allows empty/short prefixes which can cause expensive
1306    /// full-table scans. Use `scan()` unless you specifically need unrestricted
1307    /// prefix access for internal operations.
1308    pub fn scan_unchecked(&self, txn: TxnHandle, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
1309        let results = self.storage.scan(txn.txn_id, prefix)?;
1310        let bytes: u64 = results
1311            .iter()
1312            .map(|(k, v)| (k.len() + v.len()) as u64)
1313            .sum();
1314        self.stats.bytes_read.fetch_add(bytes, Ordering::Relaxed);
1315        Ok(results)
1316    }
1317
1318    /// Scan keys in range
1319    pub fn scan_range(
1320        &self,
1321        txn: TxnHandle,
1322        start: &[u8],
1323        end: &[u8],
1324    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
1325        let results = self.storage.scan_range(txn.txn_id, start, end)?;
1326        let bytes: u64 = results
1327            .iter()
1328            .map(|(k, v)| (k.len() + v.len()) as u64)
1329            .sum();
1330        self.stats.bytes_read.fetch_add(bytes, Ordering::Relaxed);
1331        Ok(results)
1332    }
1333
1334    /// Streaming scan for very large result sets
1335    /// 
1336    /// Returns an iterator that yields (key, value) pairs without
1337    /// materializing the entire result set. Use this for large scans
1338    /// where memory efficiency is important.
1339    /// 
1340    /// ## Performance
1341    /// 
1342    /// - Memory: O(1) per iteration vs O(N) for scan_range
1343    /// - Latency: First result available immediately vs waiting for all results
1344    /// - Throughput: Slightly lower due to per-item overhead
1345    /// 
1346    /// ## Usage
1347    /// 
1348    /// ```ignore
1349    /// for result in db.scan_range_iter(txn, b"start", b"end") {
1350    ///     let (key, value) = result?;
1351    ///     // Process immediately - no need to wait for all results
1352    /// }
1353    /// ```
1354    pub fn scan_range_iter<'a>(
1355        &'a self,
1356        txn: TxnHandle,
1357        start: &'a [u8],
1358        end: &'a [u8],
1359    ) -> impl Iterator<Item = Result<(Vec<u8>, Vec<u8>)>> + 'a {
1360        let stats = &self.stats;
1361        self.storage
1362            .scan_range_iter(txn.txn_id, start, end)
1363            .map(move |item| {
1364                stats.bytes_read.fetch_add(
1365                    (item.0.len() + item.1.len()) as u64,
1366                    Ordering::Relaxed,
1367                );
1368                Ok(item)
1369            })
1370    }
1371
1372    /// Flush memtable to WAL/Disk
1373    pub fn flush(&self) -> Result<()> {
1374        self.storage.fsync()
1375    }
1376
1377    // =========================================================================
1378    // Path-Native API (SochDB's differentiator)
1379    // =========================================================================
1380
1381    /// Get storage statistics
1382    pub fn storage_stats(&self) -> crate::durable_storage::StorageStats {
1383        self.storage.stats()
1384    }
1385
1386    /// Put a value at a path
1387    ///
1388    /// Path format: "collection/doc_id/field" or "table.row_id.column"
1389    /// Resolution is O(|path|), not O(log N) like B-tree.
1390    pub fn put_path(&self, txn: TxnHandle, path: &str, value: &[u8]) -> Result<()> {
1391        self.put(txn, path.as_bytes(), value)
1392    }
1393
1394    /// Get a value at a path
1395    pub fn get_path(&self, txn: TxnHandle, path: &str) -> Result<Option<Vec<u8>>> {
1396        self.get(txn, path.as_bytes())
1397    }
1398
1399    /// Delete at a path
1400    pub fn delete_path(&self, txn: TxnHandle, path: &str) -> Result<()> {
1401        self.delete(txn, path.as_bytes())
1402    }
1403
1404    /// Scan a path prefix
1405    ///
1406    /// Returns all key-value pairs where key starts with prefix.
1407    /// Useful for: "users/123/" -> all fields of user 123
1408    pub fn scan_path(&self, txn: TxnHandle, prefix: &str) -> Result<Vec<(String, Vec<u8>)>> {
1409        self.stats.queries_executed.fetch_add(1, Ordering::Relaxed);
1410
1411        let results = self.scan(txn, prefix.as_bytes())?;
1412
1413        Ok(results
1414            .into_iter()
1415            .filter_map(|(k, v)| String::from_utf8(k).ok().map(|path| (path, v)))
1416            .collect())
1417    }
1418
1419    // =========================================================================
1420    // Query API
1421    // =========================================================================
1422
1423    /// Execute a path query and return results
1424    ///
1425    /// This is the main query interface for LLM context retrieval.
1426    /// Supports:
1427    /// - Path prefix matching
1428    /// - Column projection (for I/O reduction)
1429    /// - Limit/offset
1430    pub fn query(&self, txn: TxnHandle, path_prefix: &str) -> QueryBuilder<'_> {
1431        QueryBuilder::new(self, txn, path_prefix.to_string())
1432    }
1433
1434    // =========================================================================
1435    // Table API (Higher-level abstraction)
1436    // =========================================================================
1437
1438    /// Register a table schema
1439    pub fn register_table(&self, schema: TableSchema) -> Result<()> {
1440        if self.tables.contains_key(&schema.name) {
1441            return Err(SochDBError::InvalidArgument(format!(
1442                "Table '{}' already exists",
1443                schema.name
1444            )));
1445        }
1446        // Cache the packed schema for fast inserts
1447        let packed_schema = Self::to_packed_schema(&schema);
1448        self.packed_schemas
1449            .insert(schema.name.clone(), packed_schema);
1450        self.tables.insert(schema.name.clone(), schema);
1451        Ok(())
1452    }
1453
1454    /// Get table schema
1455    pub fn get_table_schema(&self, name: &str) -> Option<TableSchema> {
1456        self.tables.get(name).map(|s| s.clone())
1457    }
1458
1459    /// List all tables
1460    pub fn list_tables(&self) -> Vec<String> {
1461        self.tables.iter().map(|e| e.key().clone()).collect()
1462    }
1463    /// Convert TableSchema to PackedTableSchema for efficient storage
1464    fn to_packed_schema(schema: &TableSchema) -> PackedTableSchema {
1465        let columns = schema
1466            .columns
1467            .iter()
1468            .map(|col| PackedColumnDef {
1469                name: col.name.clone(),
1470                col_type: match col.col_type {
1471                    ColumnType::Int64 => PackedColumnType::Int64,
1472                    ColumnType::UInt64 => PackedColumnType::UInt64,
1473                    ColumnType::Float64 => PackedColumnType::Float64,
1474                    ColumnType::Text => PackedColumnType::Text,
1475                    ColumnType::Binary => PackedColumnType::Binary,
1476                    ColumnType::Bool => PackedColumnType::Bool,
1477                },
1478                nullable: col.nullable,
1479            })
1480            .collect();
1481
1482        PackedTableSchema::new(&schema.name, columns)
1483    }
1484
1485    /// Insert a row into a table
1486    ///
1487    /// Uses packed row format: stores entire row as single key-value pair.
1488    /// This reduces write amplification from 4× to 1× for a 4-column table.
1489    ///
1490    /// # Performance
1491    /// - Before: 4 columns × (WAL entry + MVCC version) = 4 writes
1492    /// - After: 1 packed row = 1 write
1493    /// - Improvement: ~4× fewer WAL entries, ~48% less I/O overhead
1494    pub fn insert_row(
1495        &self,
1496        txn: TxnHandle,
1497        table: &str,
1498        row_id: u64,
1499        values: &HashMap<String, SochValue>,
1500    ) -> Result<()> {
1501        // Use cached packed schema - single DashMap lookup, no clone
1502        let packed_schema = self
1503            .packed_schemas
1504            .get(table)
1505            .ok_or_else(|| SochDBError::InvalidArgument(format!("Table '{}' not found", table)))?;
1506
1507        // Pack the row using cached schema
1508        let packed_row = PackedRow::pack(&packed_schema, values);
1509
1510        // Build key using KeyBuffer - optimized stack allocation (~12-15ns vs ~30-35ns for write!())
1511        let key = KeyBuffer::format_row_key(table, row_id);
1512
1513        self.put(txn, key.as_bytes(), packed_row.as_bytes())?;
1514
1515        Ok(())
1516    }
1517
1518    /// Read a row from a table
1519    ///
1520    /// Reads packed row and extracts requested columns in O(k) time.
1521    /// Column projection happens in memory, not storage - all columns are fetched.
1522    pub fn read_row(
1523        &self,
1524        txn: TxnHandle,
1525        table: &str,
1526        row_id: u64,
1527        columns: Option<&[&str]>,
1528    ) -> Result<Option<HashMap<String, SochValue>>> {
1529        let schema = self
1530            .tables
1531            .get(table)
1532            .ok_or_else(|| SochDBError::InvalidArgument(format!("Table '{}' not found", table)))?;
1533
1534        // Read the packed row with a single key lookup using KeyBuffer
1535        let key = KeyBuffer::format_row_key(table, row_id);
1536        let bytes = match self.get(txn, key.as_bytes())? {
1537            Some(b) => b,
1538            None => return Ok(None),
1539        };
1540
1541        // Use cached packed schema
1542        let packed_schema = self
1543            .packed_schemas
1544            .get(table)
1545            .ok_or_else(|| SochDBError::Internal("Packed schema not found".into()))?;
1546        let packed_row = PackedRow::from_bytes(bytes, packed_schema.num_columns())?;
1547
1548        // Determine which columns to return
1549        let cols_to_read: Vec<&str> = match columns {
1550            Some(c) => c.to_vec(),
1551            None => schema.columns.iter().map(|c| c.name.as_str()).collect(),
1552        };
1553
1554        let mut row = HashMap::new();
1555        for col_name in cols_to_read {
1556            if let Some(idx) = packed_schema.column_index(col_name)
1557                && let Some(col_def) = packed_schema.column(idx)
1558                && let Some(value) = packed_row.get_column(idx, col_def.col_type)
1559            {
1560                row.insert(col_name.to_string(), value);
1561            }
1562        }
1563
1564        Ok(Some(row))
1565    }
1566
1567    /// Insert multiple rows efficiently in a batch
1568    ///
1569    /// This method accumulates all rows and writes them with fewer WAL syncs.
1570    /// Ideal for bulk loading scenarios.
1571    ///
1572    /// # Performance
1573    /// - Uses group commit to batch fsync operations
1574    /// - Expected throughput: 500K-1M rows/sec depending on row size
1575    pub fn insert_rows_batch(
1576        &self,
1577        txn: TxnHandle,
1578        table: &str,
1579        rows: &[(u64, HashMap<String, SochValue>)],
1580    ) -> Result<usize> {
1581        // Use cached packed schema
1582        let packed_schema = self
1583            .packed_schemas
1584            .get(table)
1585            .ok_or_else(|| SochDBError::InvalidArgument(format!("Table '{}' not found", table)))?;
1586
1587        let mut count = 0;
1588
1589        for (row_id, values) in rows {
1590            // Pack and write using KeyBuffer for efficient key construction
1591            let packed_row = PackedRow::pack(&packed_schema, values);
1592            let key = KeyBuffer::format_row_key(table, *row_id);
1593            self.put(txn, key.as_bytes(), packed_row.as_bytes())?;
1594            count += 1;
1595        }
1596
1597        Ok(count)
1598    }
1599
1600    /// Ultra-fast raw put - bypasses all validation
1601    ///
1602    /// Use when you've already validated the data and just need speed.
1603    /// This is ~10× faster than insert_row() for bulk inserts.
1604    #[inline]
1605    pub fn put_raw(&self, txn: TxnHandle, key: &[u8], value: &[u8]) -> Result<()> {
1606        self.storage.write_refs(txn.txn_id, key, value)
1607    }
1608
1609    /// Zero-allocation insert - fastest path for bulk inserts
1610    ///
1611    /// Takes values as a slice in schema column order, avoiding HashMap overhead.
1612    ///
1613    /// # Arguments
1614    /// * `txn` - Transaction handle
1615    /// * `table` - Table name
1616    /// * `row_id` - Row identifier
1617    /// * `values` - Values in schema column order (None = NULL)
1618    ///
1619    /// # Performance
1620    /// - Eliminates ~6 allocations per row vs insert_row()
1621    /// - Expected: 1.2M-1.5M inserts/sec
1622    ///
1623    /// # Example
1624    /// ```ignore
1625    /// let values: &[Option<&SochValue>] = &[
1626    ///     Some(&SochValue::Int(1)),
1627    ///     Some(&SochValue::Text("Alice".into())),
1628    ///     None, // NULL
1629    /// ];
1630    /// db.insert_row_slice(txn, "users", 1, values)?;
1631    /// ```
1632    #[inline]
1633    pub fn insert_row_slice(
1634        &self,
1635        txn: TxnHandle,
1636        table: &str,
1637        row_id: u64,
1638        values: &[Option<&SochValue>],
1639    ) -> Result<()> {
1640        // Use cached packed schema - single DashMap lookup
1641        let packed_schema = self
1642            .packed_schemas
1643            .get(table)
1644            .ok_or_else(|| SochDBError::InvalidArgument(format!("Table '{}' not found", table)))?;
1645
1646        // Validate column count matches
1647        if values.len() != packed_schema.num_columns() {
1648            return Err(SochDBError::InvalidArgument(format!(
1649                "Expected {} columns, got {}",
1650                packed_schema.num_columns(),
1651                values.len()
1652            )));
1653        }
1654
1655        // Pack using zero-allocation path
1656        let packed_row = PackedRow::pack_slice(&packed_schema, values);
1657
1658        // Build key using KeyBuffer - optimized stack allocation (~12-15ns vs ~30-35ns for write!())
1659        let key = KeyBuffer::format_row_key(table, row_id);
1660
1661        self.put(txn, key.as_bytes(), packed_row.as_bytes())?;
1662        Ok(())
1663    }
1664
1665    // =========================================================================
1666    // Maintenance
1667    // =========================================================================
1668
1669    /// Force fsync to disk
1670    pub fn fsync(&self) -> Result<()> {
1671        self.storage.fsync()
1672    }
1673
1674    /// Create a checkpoint
1675    pub fn checkpoint(&self) -> Result<u64> {
1676        self.storage.checkpoint()
1677    }
1678
1679    /// Run garbage collection
1680    pub fn gc(&self) -> usize {
1681        self.storage.gc()
1682    }
1683
1684    /// Get database statistics
1685    pub fn stats(&self) -> Stats {
1686        Stats {
1687            transactions_started: self.stats.transactions_started.load(Ordering::Relaxed),
1688            transactions_committed: self.stats.transactions_committed.load(Ordering::Relaxed),
1689            transactions_aborted: self.stats.transactions_aborted.load(Ordering::Relaxed),
1690            queries_executed: self.stats.queries_executed.load(Ordering::Relaxed),
1691            bytes_written: self.stats.bytes_written.load(Ordering::Relaxed),
1692            bytes_read: self.stats.bytes_read.load(Ordering::Relaxed),
1693        }
1694    }
1695
1696    /// Shutdown the database gracefully
1697    pub fn shutdown(&self) -> Result<()> {
1698        if self.shutdown.swap(1, Ordering::SeqCst) == 1 {
1699            return Ok(()); // Already shutting down
1700        }
1701
1702        // Flush any pending writes
1703        self.fsync()?;
1704
1705        // Create clean shutdown marker
1706        let marker = self.path.join(".clean_shutdown");
1707        std::fs::write(&marker, b"ok")?;
1708
1709        Ok(())
1710    }
1711}
1712
1713impl Drop for Database {
1714    fn drop(&mut self) {
1715        // Try graceful shutdown if not already done
1716        if self.shutdown.load(Ordering::SeqCst) == 0 {
1717            let _ = self.fsync();
1718            let marker = self.path.join(".clean_shutdown");
1719            let _ = std::fs::write(&marker, b"ok");
1720        }
1721    }
1722}
1723
1724/// Query builder for fluent query construction
1725pub struct QueryBuilder<'a> {
1726    db: &'a Database,
1727    txn: TxnHandle,
1728    path_prefix: String,
1729    columns: Option<Vec<String>>,
1730    limit: Option<usize>,
1731    offset: Option<usize>,
1732}
1733
1734impl<'a> QueryBuilder<'a> {
1735    fn new(db: &'a Database, txn: TxnHandle, path_prefix: String) -> Self {
1736        Self {
1737            db,
1738            txn,
1739            path_prefix,
1740            columns: None,
1741            limit: None,
1742            offset: None,
1743        }
1744    }
1745
1746    /// Select specific columns (for I/O reduction)
1747    pub fn columns(mut self, cols: &[&str]) -> Self {
1748        self.columns = Some(cols.iter().map(|s| s.to_string()).collect());
1749        self
1750    }
1751
1752    /// Limit results
1753    pub fn limit(mut self, n: usize) -> Self {
1754        self.limit = Some(n);
1755        self
1756    }
1757
1758    /// Skip results
1759    pub fn offset(mut self, n: usize) -> Self {
1760        self.offset = Some(n);
1761        self
1762    }
1763
1764    /// Execute the query
1765    ///
1766    /// Scans packed rows and unpacks them. Each key is "table/row_id" pointing to a packed row.
1767    pub fn execute(self) -> Result<QueryResult> {
1768        self.db
1769            .stats
1770            .queries_executed
1771            .fetch_add(1, Ordering::Relaxed);
1772
1773        // Get schema for the table if we're querying a table
1774        let table_name = self
1775            .path_prefix
1776            .split('/')
1777            .next()
1778            .unwrap_or(&self.path_prefix);
1779        let schema = self.db.tables.get(table_name).map(|s| s.clone());
1780
1781        // Scan the path prefix
1782        let results = self.db.scan_path(self.txn, &self.path_prefix)?;
1783
1784        let mut rows: Vec<HashMap<String, SochValue>> = Vec::new();
1785        let mut bytes_read = 0usize;
1786
1787        if let Some(ref schema) = schema {
1788            // We have a table schema - use cached packed schema
1789            let packed_schema = self
1790                .db
1791                .packed_schemas
1792                .get(table_name)
1793                .map(|ps| ps.clone())
1794                .unwrap_or_else(|| Database::to_packed_schema(schema));
1795
1796            for (path, value_bytes) in results {
1797                // Parse path: table/row_id
1798                let parts: Vec<&str> = path.split('/').collect();
1799                if parts.len() == 2 {
1800                    // This is a packed row
1801                    bytes_read += value_bytes.len();
1802
1803                    if let Ok(packed_row) =
1804                        PackedRow::from_bytes(value_bytes, packed_schema.num_columns())
1805                    {
1806                        // Unpack all columns or just requested columns
1807                        let mut row = HashMap::new();
1808
1809                        if let Some(ref cols) = self.columns {
1810                            // Only extract requested columns
1811                            for col_name in cols {
1812                                if let Some(idx) = packed_schema.column_index(col_name)
1813                                    && let Some(col_def) = packed_schema.column(idx)
1814                                    && let Some(value) =
1815                                        packed_row.get_column(idx, col_def.col_type)
1816                                {
1817                                    row.insert(col_name.clone(), value);
1818                                }
1819                            }
1820                        } else {
1821                            // Extract all columns
1822                            row = packed_row.unpack(&packed_schema);
1823                        }
1824
1825                        if !row.is_empty() {
1826                            rows.push(row);
1827                        }
1828                    }
1829                }
1830            }
1831        } else {
1832            // Fallback: no schema, try legacy column-per-key format
1833            let mut rows_map: HashMap<String, HashMap<String, SochValue>> = HashMap::new();
1834
1835            for (path, value_bytes) in results {
1836                let parts: Vec<&str> = path.split('/').collect();
1837                if parts.len() >= 3 {
1838                    let row_key = format!("{}/{}", parts[0], parts[1]);
1839                    let col_name = parts[2..].join("/");
1840
1841                    if let Some(ref cols) = self.columns
1842                        && !cols.contains(&col_name)
1843                    {
1844                        continue;
1845                    }
1846
1847                    bytes_read += value_bytes.len();
1848                    let row = rows_map.entry(row_key).or_default();
1849                    row.insert(col_name, deserialize_value(&value_bytes));
1850                }
1851            }
1852
1853            rows = rows_map.into_values().collect();
1854        }
1855
1856        // Apply offset
1857        if let Some(offset) = self.offset {
1858            rows = rows.into_iter().skip(offset).collect();
1859        }
1860
1861        // Apply limit
1862        if let Some(limit) = self.limit {
1863            rows.truncate(limit);
1864        }
1865
1866        // Collect column names
1867        let columns: Vec<String> = self.columns.unwrap_or_else(|| {
1868            rows.iter()
1869                .flat_map(|r| r.keys().cloned())
1870                .collect::<std::collections::HashSet<_>>()
1871                .into_iter()
1872                .collect()
1873        });
1874
1875        Ok(QueryResult {
1876            columns,
1877            rows_scanned: rows.len(),
1878            bytes_read,
1879            rows,
1880        })
1881    }
1882
1883    /// Execute and return TOON format (for LLM efficiency)
1884    pub fn to_toon(self) -> Result<String> {
1885        let result = self.execute()?;
1886        Ok(result.to_toon())
1887    }
1888
1889    /// Execute with lazy iteration - avoids materializing all rows
1890    ///
1891    /// Returns an iterator over rows as `Vec<SochValue>` in schema column order.
1892    /// This is more memory-efficient than `execute()` for large result sets.
1893    ///
1894    /// # Performance
1895    /// - No upfront materialization of all rows
1896    /// - ~40% less memory for large result sets
1897    /// - Ideal for streaming to network or aggregations
1898    ///
1899    /// # Example
1900    /// ```ignore
1901    /// for row_result in db.query(txn, "users").execute_iter()? {
1902    ///     let row = row_result?;
1903    ///     // row is Vec<SochValue> in column order
1904    /// }
1905    /// ```
1906    pub fn execute_iter(self) -> Result<QueryRowIterator> {
1907        self.db
1908            .stats
1909            .queries_executed
1910            .fetch_add(1, Ordering::Relaxed);
1911
1912        let table_name = self
1913            .path_prefix
1914            .split('/')
1915            .next()
1916            .unwrap_or(&self.path_prefix)
1917            .to_string();
1918
1919        // Get packed schema (clone needed for iterator ownership)
1920        let packed_schema = self.db.packed_schemas.get(&table_name).map(|ps| ps.clone());
1921
1922        // Scan the path prefix
1923        let results = self.db.scan_path(self.txn, &self.path_prefix)?;
1924
1925        Ok(QueryRowIterator {
1926            results: results.into_iter(),
1927            packed_schema,
1928            columns: self.columns,
1929            offset: self.offset.unwrap_or(0),
1930            limit: self.limit,
1931            yielded: 0,
1932            skipped: 0,
1933        })
1934    }
1935
1936    /// Execute and return columnar (SIMD-friendly) result format
1937    ///
1938    /// Instead of row-oriented `Vec<HashMap<String, SochValue>>`, returns
1939    /// column-oriented `Vec<TypedColumn>` for vectorized operations.
1940    ///
1941    /// ## Performance Benefits
1942    ///
1943    /// - SIMD: Aggregate operations (sum, avg) use vectorized instructions
1944    /// - Cache: Sequential access maximizes L1/L2 hits
1945    /// - Memory: ~30% less overhead than row-based format
1946    /// - Analytics: Ideal for ML preprocessing and statistics
1947    ///
1948    /// ## Example
1949    ///
1950    /// ```ignore
1951    /// let result = db.query(txn, "users")
1952    ///     .columns(&["id", "score"])
1953    ///     .as_columnar()?;
1954    ///
1955    /// // SIMD-optimized sum
1956    /// let total = result.sum_i64("score").unwrap_or(0);
1957    ///
1958    /// // Direct column access
1959    /// if let Some(scores) = result.column("score") {
1960    ///     for i in 0..scores.len() {
1961    ///         if let Some(v) = scores.get_i64(i) {
1962    ///             println!("Score: {}", v);
1963    ///         }
1964    ///     }
1965    /// }
1966    /// ```
1967    pub fn as_columnar(self) -> Result<ColumnarQueryResult> {
1968        self.db
1969            .stats
1970            .queries_executed
1971            .fetch_add(1, Ordering::Relaxed);
1972
1973        let table_name = self
1974            .path_prefix
1975            .split('/')
1976            .next()
1977            .unwrap_or(&self.path_prefix);
1978        let schema = self.db.tables.get(table_name).map(|s| s.clone());
1979
1980        // Get packed schema
1981        let packed_schema = match self.db.packed_schemas.get(table_name) {
1982            Some(ps) => ps.clone(),
1983            None => return Ok(ColumnarQueryResult::empty()),
1984        };
1985
1986        // Determine columns to fetch
1987        let column_names: Vec<String> = self.columns.clone().unwrap_or_else(|| {
1988            schema
1989                .as_ref()
1990                .map(|s| s.columns.iter().map(|c| c.name.clone()).collect())
1991                .unwrap_or_default()
1992        });
1993
1994        if column_names.is_empty() {
1995            return Ok(ColumnarQueryResult::empty());
1996        }
1997
1998        // Initialize TypedColumns based on schema types
1999        let mut columns: Vec<CoreTypedColumn> = column_names
2000            .iter()
2001            .map(|col_name| {
2002                packed_schema
2003                    .column_index(col_name)
2004                    .and_then(|idx| packed_schema.column(idx))
2005                    .map(|col_def| match col_def.col_type {
2006                        PackedColumnType::Int64 => CoreTypedColumn::new_int64(),
2007                        PackedColumnType::UInt64 => CoreTypedColumn::new_uint64(),
2008                        PackedColumnType::Float64 => CoreTypedColumn::new_float64(),
2009                        PackedColumnType::Text => CoreTypedColumn::new_text(),
2010                        PackedColumnType::Binary => CoreTypedColumn::new_binary(),
2011                        PackedColumnType::Bool => CoreTypedColumn::new_bool(),
2012                        PackedColumnType::Null => CoreTypedColumn::new_text(), // Null column = fallback to text
2013                    })
2014                    .unwrap_or_else(CoreTypedColumn::new_text) // fallback
2015            })
2016            .collect();
2017
2018        // Scan the path prefix
2019        let results = self.db.scan_path(self.txn, &self.path_prefix)?;
2020
2021        let mut row_count = 0;
2022        let mut bytes_read = 0;
2023        let mut skipped = 0;
2024
2025        for (path, value_bytes) in results {
2026            // Parse path: table/row_id
2027            let parts: Vec<&str> = path.split('/').collect();
2028            if parts.len() != 2 {
2029                continue;
2030            }
2031
2032            // Apply offset
2033            if let Some(offset) = self.offset
2034                && skipped < offset
2035            {
2036                skipped += 1;
2037                continue;
2038            }
2039
2040            // Apply limit
2041            if let Some(limit) = self.limit
2042                && row_count >= limit
2043            {
2044                break;
2045            }
2046
2047            bytes_read += value_bytes.len();
2048
2049            if let Ok(packed_row) = PackedRow::from_bytes(value_bytes, packed_schema.num_columns())
2050            {
2051                // Extract each column and push to corresponding TypedColumn
2052                for (col_idx, col_name) in column_names.iter().enumerate() {
2053                    if let Some(schema_idx) = packed_schema.column_index(col_name) {
2054                        if let Some(col_def) = packed_schema.column(schema_idx) {
2055                            let value = packed_row.get_column(schema_idx, col_def.col_type);
2056                            push_value_to_typed_column(&mut columns[col_idx], value);
2057                        } else {
2058                            push_null_to_typed_column(&mut columns[col_idx]);
2059                        }
2060                    } else {
2061                        push_null_to_typed_column(&mut columns[col_idx]);
2062                    }
2063                }
2064                row_count += 1;
2065            }
2066        }
2067
2068        Ok(ColumnarQueryResult {
2069            columns: column_names,
2070            data: columns,
2071            row_count,
2072            bytes_read,
2073        })
2074    }
2075}
2076
2077/// Lazy iterator over query results
2078///
2079/// Unpacks rows on-demand, avoiding upfront materialization.
2080pub struct QueryRowIterator {
2081    results: std::vec::IntoIter<(String, Vec<u8>)>,
2082    packed_schema: Option<PackedTableSchema>,
2083    columns: Option<Vec<String>>,
2084    offset: usize,
2085    limit: Option<usize>,
2086    yielded: usize,
2087    skipped: usize,
2088}
2089
2090impl Iterator for QueryRowIterator {
2091    type Item = Result<Vec<SochValue>>;
2092
2093    fn next(&mut self) -> Option<Self::Item> {
2094        // Check limit
2095        if let Some(limit) = self.limit
2096            && self.yielded >= limit
2097        {
2098            return None;
2099        }
2100
2101        loop {
2102            let (path, value_bytes) = self.results.next()?;
2103
2104            // Parse path: table/row_id
2105            let parts: Vec<&str> = path.split('/').collect();
2106            if parts.len() != 2 {
2107                continue; // Skip non-row entries
2108            }
2109
2110            // Apply offset
2111            if self.skipped < self.offset {
2112                self.skipped += 1;
2113                continue;
2114            }
2115
2116            if let Some(ref schema) = self.packed_schema {
2117                match PackedRow::from_bytes(value_bytes, schema.num_columns()) {
2118                    Ok(packed_row) => {
2119                        let row = if let Some(ref cols) = self.columns {
2120                            // Project specific columns
2121                            cols.iter()
2122                                .map(|col_name| {
2123                                    schema
2124                                        .column_index(col_name)
2125                                        .and_then(|idx| schema.column(idx))
2126                                        .and_then(|col_def| {
2127                                            packed_row.get_column(
2128                                                schema.column_index(col_name).unwrap(),
2129                                                col_def.col_type,
2130                                            )
2131                                        })
2132                                        .unwrap_or(SochValue::Null)
2133                                })
2134                                .collect()
2135                        } else {
2136                            // All columns in order
2137                            packed_row.unpack_to_vec(schema)
2138                        };
2139
2140                        self.yielded += 1;
2141                        return Some(Ok(row));
2142                    }
2143                    Err(e) => return Some(Err(e)),
2144                }
2145            } else {
2146                // No schema - return raw bytes as binary
2147                self.yielded += 1;
2148                return Some(Ok(vec![SochValue::Binary(value_bytes)]));
2149            }
2150        }
2151    }
2152}
2153
2154// Helper functions for serialization (kept for backward compatibility with legacy data)
2155
2156#[allow(dead_code)]
2157fn serialize_value(value: &SochValue) -> Vec<u8> {
2158    // Simple serialization - in production use proper format
2159    match value {
2160        SochValue::Null => vec![0],
2161        SochValue::Int(i) => {
2162            let mut buf = vec![1];
2163            buf.extend_from_slice(&i.to_le_bytes());
2164            buf
2165        }
2166        SochValue::UInt(u) => {
2167            let mut buf = vec![2];
2168            buf.extend_from_slice(&u.to_le_bytes());
2169            buf
2170        }
2171        SochValue::Float(f) => {
2172            let mut buf = vec![3];
2173            buf.extend_from_slice(&f.to_le_bytes());
2174            buf
2175        }
2176        SochValue::Text(s) => {
2177            let mut buf = vec![4];
2178            buf.extend_from_slice(s.as_bytes());
2179            buf
2180        }
2181        SochValue::Bool(b) => vec![5, if *b { 1 } else { 0 }],
2182        SochValue::Binary(b) => {
2183            let mut buf = vec![6];
2184            buf.extend_from_slice(b);
2185            buf
2186        }
2187        _ => {
2188            // Fallback: serialize as text
2189            let s = format!("{:?}", value);
2190            let mut buf = vec![4];
2191            buf.extend_from_slice(s.as_bytes());
2192            buf
2193        }
2194    }
2195}
2196
2197fn deserialize_value(bytes: &[u8]) -> SochValue {
2198    if bytes.is_empty() {
2199        return SochValue::Null;
2200    }
2201
2202    match bytes[0] {
2203        0 => SochValue::Null,
2204        1 if bytes.len() >= 9 => {
2205            let i = i64::from_le_bytes(bytes[1..9].try_into().unwrap());
2206            SochValue::Int(i)
2207        }
2208        2 if bytes.len() >= 9 => {
2209            let u = u64::from_le_bytes(bytes[1..9].try_into().unwrap());
2210            SochValue::UInt(u)
2211        }
2212        3 if bytes.len() >= 9 => {
2213            let f = f64::from_le_bytes(bytes[1..9].try_into().unwrap());
2214            SochValue::Float(f)
2215        }
2216        4 => {
2217            let s = String::from_utf8_lossy(&bytes[1..]).to_string();
2218            SochValue::Text(s)
2219        }
2220        5 if bytes.len() >= 2 => SochValue::Bool(bytes[1] != 0),
2221        6 => SochValue::Binary(bytes[1..].to_vec()),
2222        _ => {
2223            // Treat as text
2224            let s = String::from_utf8_lossy(bytes).to_string();
2225            SochValue::Text(s)
2226        }
2227    }
2228}
2229
2230// ============================================================================
2231// Helper functions for columnar query result building
2232// ============================================================================
2233
2234/// Push a SochValue into a TypedColumn
2235fn push_value_to_typed_column(col: &mut CoreTypedColumn, value: Option<SochValue>) {
2236    match value {
2237        None => push_null_to_typed_column(col),
2238        Some(v) => match (col, v) {
2239            (
2240                CoreTypedColumn::Int64 {
2241                    values,
2242                    validity,
2243                    stats,
2244                },
2245                SochValue::Int(i),
2246            ) => {
2247                values.push(i);
2248                validity.push(true);
2249                stats.update_i64(i);
2250            }
2251            (
2252                CoreTypedColumn::Int64 {
2253                    values,
2254                    validity,
2255                    stats,
2256                },
2257                SochValue::UInt(u),
2258            ) => {
2259                values.push(u as i64);
2260                validity.push(true);
2261                stats.update_i64(u as i64);
2262            }
2263            (
2264                CoreTypedColumn::UInt64 {
2265                    values,
2266                    validity,
2267                    stats,
2268                },
2269                SochValue::UInt(u),
2270            ) => {
2271                values.push(u);
2272                validity.push(true);
2273                stats.update_i64(u as i64);
2274            }
2275            (
2276                CoreTypedColumn::UInt64 {
2277                    values,
2278                    validity,
2279                    stats,
2280                },
2281                SochValue::Int(i),
2282            ) => {
2283                values.push(i as u64);
2284                validity.push(true);
2285                stats.update_i64(i);
2286            }
2287            (
2288                CoreTypedColumn::Float64 {
2289                    values,
2290                    validity,
2291                    stats,
2292                },
2293                SochValue::Float(f),
2294            ) => {
2295                values.push(f);
2296                validity.push(true);
2297                stats.update_f64(f);
2298            }
2299            (
2300                CoreTypedColumn::Float64 {
2301                    values,
2302                    validity,
2303                    stats,
2304                },
2305                SochValue::Int(i),
2306            ) => {
2307                values.push(i as f64);
2308                validity.push(true);
2309                stats.update_f64(i as f64);
2310            }
2311            (
2312                CoreTypedColumn::Text {
2313                    offsets,
2314                    data,
2315                    validity,
2316                    stats,
2317                },
2318                SochValue::Text(s),
2319            ) => {
2320                data.extend_from_slice(s.as_bytes());
2321                offsets.push(data.len() as u32);
2322                validity.push(true);
2323                stats.row_count += 1;
2324            }
2325            (
2326                CoreTypedColumn::Binary {
2327                    offsets,
2328                    data,
2329                    validity,
2330                    stats,
2331                },
2332                SochValue::Binary(b),
2333            ) => {
2334                data.extend_from_slice(&b);
2335                offsets.push(data.len() as u32);
2336                validity.push(true);
2337                stats.row_count += 1;
2338            }
2339            (
2340                CoreTypedColumn::Bool {
2341                    values,
2342                    validity,
2343                    stats,
2344                    len,
2345                },
2346                SochValue::Bool(b),
2347            ) => {
2348                let idx = *len;
2349                *len += 1;
2350                let num_words = (*len).div_ceil(64);
2351                while values.len() < num_words {
2352                    values.push(0);
2353                }
2354                if b {
2355                    let word = idx / 64;
2356                    let bit = idx % 64;
2357                    values[word] |= 1 << bit;
2358                }
2359                validity.push(true);
2360                stats.row_count += 1;
2361            }
2362            // Type mismatch - push as null
2363            (col, _) => push_null_to_typed_column(col),
2364        },
2365    }
2366}
2367
2368/// Push a null value into a TypedColumn
2369fn push_null_to_typed_column(col: &mut CoreTypedColumn) {
2370    match col {
2371        CoreTypedColumn::Int64 {
2372            values,
2373            validity,
2374            stats,
2375        } => {
2376            values.push(0);
2377            validity.push(false);
2378            stats.update_null();
2379        }
2380        CoreTypedColumn::UInt64 {
2381            values,
2382            validity,
2383            stats,
2384        } => {
2385            values.push(0);
2386            validity.push(false);
2387            stats.update_null();
2388        }
2389        CoreTypedColumn::Float64 {
2390            values,
2391            validity,
2392            stats,
2393        } => {
2394            values.push(0.0);
2395            validity.push(false);
2396            stats.update_null();
2397        }
2398        CoreTypedColumn::Text {
2399            offsets,
2400            data: _,
2401            validity,
2402            stats,
2403        } => {
2404            offsets.push(offsets.last().copied().unwrap_or(0));
2405            validity.push(false);
2406            stats.update_null();
2407        }
2408        CoreTypedColumn::Binary {
2409            offsets,
2410            data: _,
2411            validity,
2412            stats,
2413        } => {
2414            offsets.push(offsets.last().copied().unwrap_or(0));
2415            validity.push(false);
2416            stats.update_null();
2417        }
2418        CoreTypedColumn::Bool {
2419            values,
2420            validity,
2421            stats,
2422            len,
2423        } => {
2424            *len += 1;
2425            let num_words = (*len).div_ceil(64);
2426            while values.len() < num_words {
2427                values.push(0);
2428            }
2429            validity.push(false);
2430            stats.update_null();
2431        }
2432    }
2433}
2434
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Opening a database, starting a transaction and shutting down all succeed.
    #[test]
    fn test_database_open_close() {
        let tmp = tempdir().unwrap();
        let database = Database::open(tmp.path()).unwrap();

        // Should be able to begin a transaction
        let handle = database.begin_transaction().unwrap();
        assert!(handle.txn_id > 0);

        database.abort(handle).unwrap();
        database.shutdown().unwrap();
    }

    /// A written value is visible inside its transaction and after commit.
    #[test]
    fn test_database_put_get() {
        let tmp = tempdir().unwrap();
        let database = Database::open(tmp.path()).unwrap();
        let expected = Some(b"value1".to_vec());

        let writer = database.begin_transaction().unwrap();
        database.put(writer, b"key1", b"value1").unwrap();

        let seen_by_writer = database.get(writer, b"key1").unwrap();
        assert_eq!(seen_by_writer, expected);

        database.commit(writer).unwrap();

        // New transaction should see committed data
        let reader = database.begin_transaction().unwrap();
        let seen_by_reader = database.get(reader, b"key1").unwrap();
        assert_eq!(seen_by_reader, expected);
        database.abort(reader).unwrap();
    }

    /// Path writes can be read back with a prefix scan.
    #[test]
    fn test_database_path_api() {
        let tmp = tempdir().unwrap();
        let database = Database::open(tmp.path()).unwrap();

        // Write using path API
        let writer = database.begin_transaction().unwrap();
        database.put_path(writer, "users/1/name", b"Alice").unwrap();
        database
            .put_path(writer, "users/1/email", b"alice@example.com")
            .unwrap();
        database.put_path(writer, "users/2/name", b"Bob").unwrap();
        database.commit(writer).unwrap();

        // Scan path prefix: only the two entries of user 1 match
        let reader = database.begin_transaction().unwrap();
        let matches = database.scan_path(reader, "users/1/").unwrap();
        assert_eq!(matches.len(), 2);

        database.abort(reader).unwrap();
    }

    /// A row inserted into a registered table can be read back.
    #[test]
    fn test_database_table_api() {
        let tmp = tempdir().unwrap();
        let database = Database::open(tmp.path()).unwrap();

        // Register table
        let name_column = ColumnDef {
            name: "name".to_string(),
            col_type: ColumnType::Text,
            nullable: false,
        };
        let age_column = ColumnDef {
            name: "age".to_string(),
            col_type: ColumnType::Int64,
            nullable: true,
        };
        database
            .register_table(TableSchema {
                name: "users".to_string(),
                columns: vec![name_column, age_column],
            })
            .unwrap();

        // Insert row
        let values = HashMap::from([
            ("name".to_string(), SochValue::Text("Alice".to_string())),
            ("age".to_string(), SochValue::Int(30)),
        ]);
        let writer = database.begin_transaction().unwrap();
        database.insert_row(writer, "users", 1, &values).unwrap();
        database.commit(writer).unwrap();

        // Read row
        let reader = database.begin_transaction().unwrap();
        let fetched = database.read_row(reader, "users", 1, None).unwrap();
        assert!(fetched.is_some());

        let fetched = fetched.unwrap();
        assert_eq!(
            fetched.get("name"),
            Some(&SochValue::Text("Alice".to_string()))
        );

        database.abort(reader).unwrap();
    }

    /// The query builder honours `limit`.
    #[test]
    fn test_database_query_builder() {
        let tmp = tempdir().unwrap();
        let database = Database::open(tmp.path()).unwrap();

        // Insert test data
        let writer = database.begin_transaction().unwrap();
        let entries: [(&str, &[u8]); 4] = [
            ("docs/1/title", b"Hello"),
            ("docs/1/content", b"World"),
            ("docs/2/title", b"Foo"),
            ("docs/2/content", b"Bar"),
        ];
        for (path, payload) in entries {
            database.put_path(writer, path, payload).unwrap();
        }
        database.commit(writer).unwrap();

        // Query with limit
        let reader = database.begin_transaction().unwrap();
        let outcome = database.query(reader, "docs/").limit(1).execute().unwrap();

        assert_eq!(outcome.rows.len(), 1);
        database.abort(reader).unwrap();
    }

    /// Committed data survives a simulated crash (no shutdown, no drop).
    #[test]
    fn test_database_crash_recovery() {
        let tmp = tempdir().unwrap();

        // Write and commit
        {
            // Use open_without_lock for crash simulation tests
            let database = Database::open_without_lock(tmp.path()).unwrap();
            // Set sync mode to FULL to ensure data is persisted before "crash"
            database.storage.set_sync_mode(2);
            let writer = database.begin_transaction().unwrap();
            database.put(writer, b"persist", b"this").unwrap();
            database.commit(writer).unwrap();
            // Don't call shutdown - simulate crash (forget also skips Drop)
            std::mem::forget(database);
        }

        // Reopen - should recover
        {
            let database = Database::open_without_lock(tmp.path()).unwrap();
            let reader = database.begin_transaction().unwrap();
            let recovered = database.get(reader, b"persist").unwrap();
            assert_eq!(recovered, Some(b"this".to_vec()));
            database.abort(reader).unwrap();
        }
    }
}