Skip to main content

sochdb_storage/
database.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! SochDB Database Kernel
19//!
20//! The shared core that powers both embedded mode (`SochConnection::open`) and
21//! server mode (`sochdb-server`). This is the "SQLite engine" equivalent.
22//!
23//! ## Architecture
24//!
25//! ```text
26//! ┌──────────────────────────────────────────────────────────────────┐
27//! │                        Database Kernel                            │
28//! │  Arc<Database> - shared by all connections                       │
29//! ├──────────────────────────────────────────────────────────────────┤
30//! │                                                                   │
31//! │  ┌─────────────────┐   ┌─────────────────┐   ┌────────────────┐ │
32//! │  │  DurableStorage │   │     Catalog     │   │  Vector Index  │ │
33//! │  │  (WAL + MVCC)   │   │  (Schema Mgmt)  │   │  (HNSW/Vamana) │ │
34//! │  └────────┬────────┘   └────────┬────────┘   └───────┬────────┘ │
35//! │           │                     │                     │          │
36//! │           └─────────────────────┴─────────────────────┘          │
37//! │                                 │                                 │
38//! │  ┌─────────────────────────────────────────────────────────────┐ │
39//! │  │              Query Executor (Path-Native)                    │ │
40//! │  │  - Path resolution: O(|path|)                                │ │
41//! │  │  - Column projection: 80% I/O reduction                     │ │
42//! │  │  - Context selection: Token-aware chunking                  │ │
43//! │  └─────────────────────────────────────────────────────────────┘ │
44//! │                                                                   │
45//! └──────────────────────────────────────────────────────────────────┘
46//!
47//! Deployment Modes:
48//! ┌─────────────┐   ┌─────────────┐   ┌─────────────┐
49//! │  Embedded   │   │  IPC Server │   │  TCP Server │
50//! │  (in-proc)  │   │  (Unix sock)│   │  (remote)   │
51//! └──────┬──────┘   └──────┬──────┘   └──────┬──────┘
52//!        │                 │                 │
53//!        └─────────────────┴─────────────────┘
54//!                          │
55//!                   Arc<Database>
56//! ```
57//!
58//! ## Latency Model
59//!
60//! Let K = kernel processing cost for a query
61//!
62//! - Embedded: L_emb ≈ K (function call overhead negligible)
63//! - IPC: L_ipc ≈ K + δ_ipc (δ_ipc = ~10-50µs for Unix socket)
64//! - TCP: L_tcp ≈ K + δ_net (δ_net = 100µs-10ms depending on network)
65//!
66//! For LLM context queries where K >> δ_ipc, IPC is "nearly embedded".
67
68use std::collections::HashMap;
69use std::path::{Path, PathBuf};
70use std::sync::Arc;
71use std::sync::atomic::{AtomicU64, Ordering};
72
73use dashmap::DashMap;
74use parking_lot::RwLock;
75
76use crate::durable_storage::{DurableStorage, TransactionMode};
77use crate::index_policy::{IndexPolicy, TableIndexConfig, TableIndexRegistry};
78use crate::key_buffer::KeyBuffer;
79use crate::packed_row::{PackedColumnDef, PackedColumnType, PackedRow, PackedTableSchema};
80use sochdb_core::catalog::Catalog;
81use sochdb_core::{Result, SochDBError, SochValue};
82
83// Re-export key types
84pub use crate::durable_storage::RecoveryStats;
85use crate::durable_storage::StorageEncryption;
86
87/// Database configuration
88#[derive(Debug, Clone)]
89pub struct DatabaseConfig {
90    /// Enable group commit for better write throughput
91    pub group_commit: bool,
92    /// Maximum memory for memtables before flush (bytes)
93    pub memtable_size_limit: usize,
94    /// Enable WAL for durability
95    pub wal_enabled: bool,
96    /// Sync mode: fsync after every commit vs periodic
97    pub sync_mode: SyncMode,
98    /// Read-only mode
99    pub read_only: bool,
100
101    /// Enable ordered index for O(log N) prefix scans
102    ///
103    /// # Deprecation Notice
104    ///
105    /// **DEPRECATED since 0.2.0**: Use `default_index_policy` instead for per-table control.
106    /// This field will be removed in v0.3.0.
107    ///
108    /// ## Migration Guide
109    ///
110    /// Replace:
111    /// ```ignore
112    /// DatabaseConfig { enable_ordered_index: true, .. }  // Old API
113    /// DatabaseConfig { enable_ordered_index: false, .. } // Old API
114    /// ```
115    ///
116    /// With:
117    /// ```ignore
118    /// DatabaseConfig { default_index_policy: IndexPolicy::ScanOptimized, .. }  // Ordered index enabled
119    /// DatabaseConfig { default_index_policy: IndexPolicy::WriteOptimized, .. } // Ordered index disabled
120    /// ```
121    ///
122    /// ## Behavior
123    ///
124    /// When false, saves ~134 ns/op on writes (20% speedup)
125    /// but scan_prefix becomes O(N) instead of O(log N + K).
126    ///
127    /// Set to false for write-heavy workloads without range scans.
128    #[deprecated(
129        since = "0.2.0",
130        note = "Use `default_index_policy` field instead. This field will be removed in v0.3.0. \
131                Set IndexPolicy::ScanOptimized for ordered index, WriteOptimized to disable."
132    )]
133    ///
134    /// Set to false for write-heavy workloads without range scans.
135    pub enable_ordered_index: bool,
136    /// Group commit configuration
137    pub group_commit_config: GroupCommitSettings,
138    /// Default index policy for tables not explicitly configured
139    ///
140    /// This replaces the global `enable_ordered_index` toggle with
141    /// fine-grained per-table control. Use `index_registry` to configure
142    /// individual tables.
143    ///
144    /// | Policy         | Insert Cost | Scan Cost      | Use Case              |
145    /// |----------------|-------------|----------------|------------------------|
146    /// | WriteOptimized | O(1)        | O(N)           | High-write, rare scan  |
147    /// | Balanced       | O(1) amort  | O(output+logK) | Mixed OLTP            |
148    /// | ScanOptimized  | O(log N)    | O(logN + K)    | Analytics, range query |
149    /// | AppendOnly     | O(1)        | O(N)           | Time-series logs       |
150    pub default_index_policy: IndexPolicy,
151}
152
153/// Group commit settings - mirrors SQLite's WAL mode tuning
154///
155/// ## Performance Model
156///
157/// Without group commit: Throughput = 1 / L_fsync ≈ 200 commits/sec (L=5ms)
158/// With group commit (batch size K): Throughput = K / L_fsync = K × 200 commits/sec
159///
160/// For K=100: 20,000 commits/sec (100× speedup)
161///
162/// ## SQLite Comparison
163///
164/// | Setting                    | SQLite Equivalent           |
165/// |----------------------------|-----------------------------|
166/// | batch_size = 1             | PRAGMA synchronous = FULL   |
167/// | batch_size = 100           | WAL mode with batching      |
168/// | max_wait_us = 0            | No delay, immediate flush   |
169/// | max_wait_us = 10000        | Up to 10ms delay for batch  |
170#[derive(Debug, Clone)]
171pub struct GroupCommitSettings {
172    /// Minimum batch size before flush (default: 1)
173    pub min_batch_size: usize,
174    /// Maximum batch size (default: 1000)
175    pub max_batch_size: usize,
176    /// Maximum wait time before flush in microseconds (default: 10000 = 10ms)
177    pub max_wait_us: u64,
178    /// Expected fsync latency in microseconds (for adaptive sizing)
179    pub fsync_latency_us: u64,
180}
181
182impl Default for GroupCommitSettings {
183    fn default() -> Self {
184        Self {
185            min_batch_size: 1,
186            max_batch_size: 1000,
187            max_wait_us: 10_000,     // 10ms
188            fsync_latency_us: 5_000, // 5ms
189        }
190    }
191}
192
193impl GroupCommitSettings {
194    /// High throughput preset - maximizes batching
195    pub fn high_throughput() -> Self {
196        Self {
197            min_batch_size: 50,
198            max_batch_size: 5000,
199            max_wait_us: 50_000, // 50ms
200            fsync_latency_us: 5_000,
201        }
202    }
203
204    /// Low latency preset - minimal batching
205    pub fn low_latency() -> Self {
206        Self {
207            min_batch_size: 1,
208            max_batch_size: 10,
209            max_wait_us: 1_000, // 1ms
210            fsync_latency_us: 5_000,
211        }
212    }
213
214    /// Calculate optimal batch size using Little's Law
215    ///
216    /// N* = sqrt(2 × L_fsync × λ / C_wait)
217    ///
218    /// # Arguments
219    /// * `arrival_rate` - Operations per second
220    /// * `wait_cost` - Cost coefficient for waiting (0.0-1.0)
221    pub fn optimal_batch_size(&self, arrival_rate: f64, wait_cost: f64) -> usize {
222        let l_fsync = self.fsync_latency_us as f64 / 1_000_000.0;
223        let n_star = (2.0 * l_fsync * arrival_rate / wait_cost.max(0.001)).sqrt();
224        (n_star as usize).clamp(self.min_batch_size, self.max_batch_size)
225    }
226}
227
228impl Default for DatabaseConfig {
229    #[allow(deprecated)]
230    fn default() -> Self {
231        Self {
232            group_commit: true,
233            memtable_size_limit: 64 * 1024 * 1024, // 64MB
234            wal_enabled: true,
235            sync_mode: SyncMode::Normal,
236            read_only: false,
237            enable_ordered_index: true, // Default: enabled for compatibility
238            group_commit_config: GroupCommitSettings::default(),
239            default_index_policy: IndexPolicy::Balanced, // New default: balanced OLTP policy
240        }
241    }
242}
243
244impl DatabaseConfig {
245    /// Create config optimized for throughput (Fast Mode)
246    ///
247    /// - Disables ordered index (saves ~134 ns/op)
248    /// - Uses high-throughput group commit settings
249    /// - Suitable for append-only workloads
250    #[allow(deprecated)]
251    pub fn throughput_optimized() -> Self {
252        Self {
253            group_commit: true,
254            memtable_size_limit: 128 * 1024 * 1024, // 128MB
255            wal_enabled: true,
256            sync_mode: SyncMode::Normal,
257            read_only: false,
258            enable_ordered_index: false,
259            group_commit_config: GroupCommitSettings::high_throughput(),
260            default_index_policy: IndexPolicy::WriteOptimized, // No ordered index overhead
261        }
262    }
263
264    /// Create config optimized for latency
265    ///
266    /// - Keeps ordered index for fast range scans
267    /// - Uses low-latency group commit settings
268    /// - Suitable for OLTP workloads
269    #[allow(deprecated)]
270    pub fn latency_optimized() -> Self {
271        Self {
272            group_commit: true,
273            memtable_size_limit: 32 * 1024 * 1024, // 32MB
274            wal_enabled: true,
275            sync_mode: SyncMode::Full,
276            read_only: false,
277            enable_ordered_index: true,
278            group_commit_config: GroupCommitSettings::low_latency(),
279            default_index_policy: IndexPolicy::ScanOptimized, // Fast range scans
280        }
281    }
282
283    /// Create config matching SQLite defaults
284    #[allow(deprecated)]
285    pub fn sqlite_compatible() -> Self {
286        Self {
287            group_commit: false, // SQLite default is single-commit
288            memtable_size_limit: 64 * 1024 * 1024,
289            wal_enabled: true,
290            sync_mode: SyncMode::Normal, // PRAGMA synchronous = NORMAL
291            read_only: false,
292            enable_ordered_index: true,
293            group_commit_config: GroupCommitSettings::default(),
294            default_index_policy: IndexPolicy::Balanced, // Good default for mixed workloads
295        }
296    }
297
298    /// Get effective ordered index setting, derived from `default_index_policy`.
299    ///
300    /// This is the shim method for the deprecated `enable_ordered_index` field.
301    /// It returns `true` if the policy requires an ordered index (ScanOptimized),
302    /// and `false` otherwise (WriteOptimized, Balanced, AppendOnly).
303    ///
304    /// # Policy Mapping
305    ///
306    /// | IndexPolicy      | Returns |
307    /// |------------------|---------|
308    /// | ScanOptimized    | true    |
309    /// | Balanced         | false   |
310    /// | WriteOptimized   | false   |
311    /// | AppendOnly       | false   |
312    ///
313    /// Note: `Balanced` uses lazy compaction rather than a live ordered index,
314    /// so it returns `false` for the low-level memtable config but still supports
315    /// efficient range scans via sorted runs.
316    pub fn effective_ordered_index(&self) -> bool {
317        matches!(self.default_index_policy, IndexPolicy::ScanOptimized)
318    }
319}
320
321/// WAL sync mode - matches SQLite's PRAGMA synchronous semantics
322///
323/// | SochDB     | SQLite       | Description                                    |
324/// |------------|--------------|------------------------------------------------|
325/// | Off        | OFF (0)      | No fsync, risk of data loss on crash           |
326/// | Normal     | NORMAL (1)   | Fsync at checkpoints, not every commit         |
327/// | Full       | FULL (2)     | Fsync every commit (safest, slowest)           |
328///
329/// # Performance vs Durability Trade-offs
330///
331/// - **Off**: ~10x faster than Full, but may lose last ~100ms of data on crash
332/// - **Normal**: ~5x faster than Full, durable at checkpoint boundaries
333/// - **Full**: Every commit is fsync'd, no data loss possible
334///
335/// # SQLite Compatibility
336///
337/// ```sql
338/// -- SQLite equivalent settings
339/// PRAGMA synchronous = OFF;    -- SyncMode::Off
340/// PRAGMA synchronous = NORMAL; -- SyncMode::Normal  
341/// PRAGMA synchronous = FULL;   -- SyncMode::Full
342/// ```
343#[derive(Debug, Clone, Copy, PartialEq, Eq)]
344pub enum SyncMode {
345    /// No fsync (equivalent to SQLite PRAGMA synchronous = OFF)
346    ///
347    /// Writes buffered in OS, may lose data on power failure.
348    /// Use for non-critical data or bulk loading.
349    Off = 0,
350
351    /// Fsync at checkpoints (equivalent to SQLite PRAGMA synchronous = NORMAL)
352    ///
353    /// Default mode. Syncs WAL at checkpoint boundaries.
354    /// Good balance of performance and durability.
355    Normal = 1,
356
357    /// Fsync every commit (equivalent to SQLite PRAGMA synchronous = FULL)
358    ///
359    /// Safest mode. Every commit is immediately durable.
360    /// Required for financial/critical data.
361    Full = 2,
362}
363
364impl SyncMode {
365    /// Convert from SQLite synchronous pragma value
366    pub fn from_sqlite_pragma(value: u32) -> Self {
367        match value {
368            0 => SyncMode::Off,
369            1 => SyncMode::Normal,
370            _ => SyncMode::Full, // 2+ treated as Full
371        }
372    }
373
374    /// Convert to SQLite synchronous pragma value
375    pub fn to_sqlite_pragma(self) -> u32 {
376        self as u32
377    }
378
379    /// Parse from string (case-insensitive)
380    pub fn parse(s: &str) -> Option<Self> {
381        match s.to_ascii_uppercase().as_str() {
382            "OFF" | "0" => Some(SyncMode::Off),
383            "NORMAL" | "1" => Some(SyncMode::Normal),
384            "FULL" | "2" => Some(SyncMode::Full),
385            _ => None,
386        }
387    }
388}
389
390/// Table schema for the kernel
391#[derive(Debug, Clone)]
392pub struct TableSchema {
393    pub name: String,
394    pub columns: Vec<ColumnDef>,
395}
396
397/// Column definition
398#[derive(Debug, Clone)]
399pub struct ColumnDef {
400    pub name: String,
401    pub col_type: ColumnType,
402    pub nullable: bool,
403}
404
405/// Column types
406#[derive(Debug, Clone, Copy, PartialEq, Eq)]
407pub enum ColumnType {
408    Int64,
409    UInt64,
410    Float64,
411    Text,
412    Binary,
413    Bool,
414}
415
416/// Transaction handle for kernel operations
417#[derive(Debug, Clone, Copy)]
418pub struct TxnHandle {
419    pub txn_id: u64,
420    pub snapshot_ts: u64,
421}
422
423/// Query result from the kernel
424#[derive(Debug, Clone)]
425pub struct QueryResult {
426    /// Column names
427    pub columns: Vec<String>,
428    /// Row data (each row is a map of column -> value)
429    pub rows: Vec<HashMap<String, SochValue>>,
430    /// Number of rows scanned (for stats)
431    pub rows_scanned: usize,
432    /// Bytes read from storage
433    pub bytes_read: usize,
434}
435
436impl QueryResult {
437    /// Empty result
438    pub fn empty() -> Self {
439        Self {
440            columns: vec![],
441            rows: vec![],
442            rows_scanned: 0,
443            bytes_read: 0,
444        }
445    }
446
447    /// Convert to TOON format for token efficiency
448    pub fn to_toon(&self) -> String {
449        if self.rows.is_empty() {
450            return "[]".to_string();
451        }
452
453        // TOON format: table[N]{cols}: row1; row2; ...
454        let n = self.rows.len();
455        let cols = self.columns.join(",");
456
457        let rows_str: Vec<String> = self
458            .rows
459            .iter()
460            .map(|row| {
461                self.columns
462                    .iter()
463                    .map(|c| {
464                        row.get(c)
465                            .map(format_soch_value)
466                            .unwrap_or_else(|| "∅".to_string())
467                    })
468                    .collect::<Vec<_>>()
469                    .join(",")
470            })
471            .collect();
472
473        format!("result[{}]{{{}}}:{}", n, cols, rows_str.join(";"))
474    }
475}
476
477fn format_soch_value(v: &SochValue) -> String {
478    match v {
479        SochValue::Null => "∅".to_string(),
480        SochValue::Int(i) => i.to_string(),
481        SochValue::UInt(u) => u.to_string(),
482        SochValue::Float(f) => format!("{:.6}", f),
483        SochValue::Text(s) => {
484            if s.contains(',') || s.contains(';') {
485                format!("\"{}\"", s.replace('"', "\\\""))
486            } else {
487                s.clone()
488            }
489        }
490        SochValue::Bool(b) => if *b { "T" } else { "F" }.to_string(),
491        SochValue::Binary(b) => format!("b64:{}", base64_encode(b)),
492        _ => format!("{:?}", v),
493    }
494}
495
496fn base64_encode(data: &[u8]) -> String {
497    // Simple base64 encoding
498    const ALPHABET: &[u8] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
499    let mut result = String::new();
500
501    for chunk in data.chunks(3) {
502        let b0 = chunk[0] as usize;
503        let b1 = chunk.get(1).copied().unwrap_or(0) as usize;
504        let b2 = chunk.get(2).copied().unwrap_or(0) as usize;
505
506        result.push(ALPHABET[b0 >> 2] as char);
507        result.push(ALPHABET[((b0 & 0x03) << 4) | (b1 >> 4)] as char);
508
509        if chunk.len() > 1 {
510            result.push(ALPHABET[((b1 & 0x0f) << 2) | (b2 >> 6)] as char);
511        } else {
512            result.push('=');
513        }
514
515        if chunk.len() > 2 {
516            result.push(ALPHABET[b2 & 0x3f] as char);
517        } else {
518            result.push('=');
519        }
520    }
521
522    result
523}
524
525// ============================================================================
526// Columnar Query Results - SIMD-friendly result format
527// ============================================================================
528
529use sochdb_core::TypedColumn as CoreTypedColumn;
530
531/// Columnar query result - SIMD-friendly format for analytics
532///
533/// Instead of row-oriented `Vec<HashMap<String, SochValue>>`, this returns
534/// column-oriented `Vec<TypedColumn>` for efficient vectorized operations.
535///
536/// ## Memory Layout
537///
538/// Row-oriented (standard):
539/// ```text
540/// Row 0: [id=1, name="Alice", score=85]
541/// Row 1: [id=2, name="Bob", score=92]
542/// Row 2: [id=3, name="Carol", score=78]
543/// ```
544///
545/// Column-oriented (this format):
546/// ```text
547/// id:    [1, 2, 3]           ← contiguous i64 array (SIMD-friendly)
548/// name:  ["Alice", "Bob", "Carol"] ← Arrow-style string encoding
549/// score: [85, 92, 78]        ← contiguous i64 array
550/// ```
551///
552/// ## Performance Benefits
553///
554/// - SIMD: Column sums use vectorized instructions (~8× faster)
555/// - Cache: Sequential access pattern maximizes L1/L2 cache hits
556/// - Compression: Same-type data compresses better (5-10× typical)
557/// - Filtering: Bitmap operations instead of row iteration
558///
559/// ## Usage
560///
561/// ```ignore
562/// let result = db.query(txn, "users")
563///     .columns(&["id", "score"])
564///     .as_columnar()?;
565///
566/// // SIMD sum
567/// let total_score = result.column("score")
568///     .map(|c| c.sum_i64())
569///     .unwrap_or(0);
570///
571/// // Stats
572/// println!("Rows: {}, Memory: {} bytes", result.row_count(), result.memory_size());
573/// ```
574#[derive(Debug, Clone)]
575pub struct ColumnarQueryResult {
576    /// Column names in order
577    pub columns: Vec<String>,
578    /// Column data - each TypedColumn contains all values for one column
579    pub data: Vec<CoreTypedColumn>,
580    /// Number of rows
581    pub row_count: usize,
582    /// Bytes read from storage
583    pub bytes_read: usize,
584}
585
586impl ColumnarQueryResult {
587    /// Create an empty result
588    pub fn empty() -> Self {
589        Self {
590            columns: vec![],
591            data: vec![],
592            row_count: 0,
593            bytes_read: 0,
594        }
595    }
596
597    /// Get column by name
598    pub fn column(&self, name: &str) -> Option<&CoreTypedColumn> {
599        self.columns
600            .iter()
601            .position(|c| c == name)
602            .and_then(|idx| self.data.get(idx))
603    }
604
605    /// Get column index by name
606    pub fn column_index(&self, name: &str) -> Option<usize> {
607        self.columns.iter().position(|c| c == name)
608    }
609
610    /// Number of rows
611    pub fn row_count(&self) -> usize {
612        self.row_count
613    }
614
615    /// Number of columns
616    pub fn column_count(&self) -> usize {
617        self.columns.len()
618    }
619
620    /// Total memory size in bytes
621    pub fn memory_size(&self) -> usize {
622        self.data.iter().map(|c| c.memory_size()).sum()
623    }
624
625    /// Sum of an i64 column (SIMD-optimized)
626    pub fn sum_i64(&self, column: &str) -> Option<i64> {
627        self.column(column).map(|c| c.sum_i64())
628    }
629
630    /// Sum of an f64 column (SIMD-optimized)
631    pub fn sum_f64(&self, column: &str) -> Option<f64> {
632        self.column(column).map(|c| c.sum_f64())
633    }
634
635    /// Zero-allocation row access by index.
636    ///
637    /// Returns a lightweight view that resolves column values on demand
638    /// from the underlying columnar arrays — no `HashMap` per row.
639    ///
640    /// ```ignore
641    /// let result = query.as_columnar()?;
642    /// for i in 0..result.row_count() {
643    ///     let row = result.row_view(i).unwrap();
644    ///     let name = row.get("name"); // SochValue::Text(...)
645    /// }
646    /// ```
647    #[inline]
648    pub fn row_view(&self, index: usize) -> Option<ColumnarRowView<'_>> {
649        if index < self.row_count {
650            Some(ColumnarRowView {
651                result: self,
652                index,
653            })
654        } else {
655            None
656        }
657    }
658
659    /// Convert to row-oriented `QueryResult` for backward compatibility.
660    ///
661    /// This materialises one `HashMap<String, SochValue>` per row, so prefer
662    /// using `row_view()` or direct columnar access when performance matters.
663    pub fn into_query_result(self) -> QueryResult {
664        let rows: Vec<HashMap<String, SochValue>> = (0..self.row_count)
665            .map(|i| {
666                self.columns
667                    .iter()
668                    .zip(self.data.iter())
669                    .map(|(name, col)| (name.clone(), col.value_at(i)))
670                    .collect()
671            })
672            .collect();
673
674        QueryResult {
675            columns: self.columns,
676            rows,
677            rows_scanned: self.row_count,
678            bytes_read: self.bytes_read,
679        }
680    }
681
682    /// Get column statistics (min, max, null count)
683    pub fn column_stats(&self, column: &str) -> Option<&sochdb_core::columnar::ColumnStats> {
684        self.column(column).map(|c| c.stats())
685    }
686
687    /// Convert to TOON format (token-efficient)
688    pub fn to_toon(&self) -> String {
689        if self.row_count == 0 {
690            return "[]".to_string();
691        }
692
693        let n = self.row_count;
694        let cols = self.columns.join(",");
695
696        // Build rows from columns
697        let mut rows_str = Vec::with_capacity(n);
698        for i in 0..n {
699            let row: Vec<String> = self
700                .data
701                .iter()
702                .map(|col| format_columnar_value(col, i))
703                .collect();
704            rows_str.push(row.join(","));
705        }
706
707        format!("result[{}]{{{}}}:{}", n, cols, rows_str.join(";"))
708    }
709}
710
711/// Format a single value from a TypedColumn at index
712fn format_columnar_value(col: &CoreTypedColumn, idx: usize) -> String {
713    match col {
714        CoreTypedColumn::Int64 {
715            values, validity, ..
716        } => {
717            if validity.is_valid(idx) && idx < values.len() {
718                values[idx].to_string()
719            } else {
720                "∅".to_string()
721            }
722        }
723        CoreTypedColumn::UInt64 {
724            values, validity, ..
725        } => {
726            if validity.is_valid(idx) && idx < values.len() {
727                values[idx].to_string()
728            } else {
729                "∅".to_string()
730            }
731        }
732        CoreTypedColumn::Float64 {
733            values, validity, ..
734        } => {
735            if validity.is_valid(idx) && idx < values.len() {
736                format!("{:.6}", values[idx])
737            } else {
738                "∅".to_string()
739            }
740        }
741        CoreTypedColumn::Text {
742            offsets,
743            data,
744            validity,
745            ..
746        } => {
747            if validity.is_valid(idx) && idx + 1 < offsets.len() {
748                let start = offsets[idx] as usize;
749                let end = offsets[idx + 1] as usize;
750                std::str::from_utf8(&data[start..end])
751                    .map(|s| {
752                        if s.contains(',') || s.contains(';') {
753                            format!("\"{}\"", s.replace('"', "\\\""))
754                        } else {
755                            s.to_string()
756                        }
757                    })
758                    .unwrap_or_else(|_| "∅".to_string())
759            } else {
760                "∅".to_string()
761            }
762        }
763        CoreTypedColumn::Binary {
764            offsets,
765            data,
766            validity,
767            ..
768        } => {
769            if validity.is_valid(idx) && idx + 1 < offsets.len() {
770                let start = offsets[idx] as usize;
771                let end = offsets[idx + 1] as usize;
772                format!("b64:{}", base64_encode(&data[start..end]))
773            } else {
774                "∅".to_string()
775            }
776        }
777        CoreTypedColumn::Bool {
778            values,
779            validity,
780            len,
781            ..
782        } => {
783            if validity.is_valid(idx) && idx < *len {
784                let word = idx / 64;
785                let bit = idx % 64;
786                if (values[word] >> bit) & 1 == 1 {
787                    "T"
788                } else {
789                    "F"
790                }
791                .to_string()
792            } else {
793                "∅".to_string()
794            }
795        }
796    }
797}
798
799/// Zero-allocation row view into a `ColumnarQueryResult`.
800///
801/// Provides named-column access (like `HashMap<String, SochValue>`)
802/// without allocating a HashMap per row. Values are read directly
803/// from the underlying typed column arrays.
804///
805/// **Cost per access:** O(1) column index lookup + O(1) array read.
806/// **Allocation:** zero (borrows from `ColumnarQueryResult`).
807#[derive(Debug)]
808pub struct ColumnarRowView<'a> {
809    result: &'a ColumnarQueryResult,
810    index: usize,
811}
812
813impl<'a> ColumnarRowView<'a> {
814    /// Get a value by column name without allocation.
815    ///
816    /// Returns `None` if the column does not exist.
817    /// Returns `Some(SochValue::Null)` if the column exists but the value is NULL.
818    #[inline]
819    pub fn get(&self, column: &str) -> Option<SochValue> {
820        self.result
821            .column_index(column)
822            .map(|ci| self.result.data[ci].value_at(self.index))
823    }
824
825    /// Get all column values as a `Vec<SochValue>` (positional, no HashMap).
826    pub fn values(&self) -> Vec<SochValue> {
827        self.result
828            .data
829            .iter()
830            .map(|col| col.value_at(self.index))
831            .collect()
832    }
833
834    /// Row index within the result set.
835    #[inline]
836    pub fn index(&self) -> usize {
837        self.index
838    }
839
840    /// Materialise this row into a `HashMap<String, SochValue>` for backward
841    /// compatibility.  Prefer `get()` for single column access.
842    pub fn to_map(&self) -> HashMap<String, SochValue> {
843        self.result
844            .columns
845            .iter()
846            .zip(self.result.data.iter())
847            .map(|(name, col)| (name.clone(), col.value_at(self.index)))
848            .collect()
849    }
850}
851
852/// Vector search result
853#[derive(Debug, Clone)]
854pub struct VectorSearchResult {
855    pub id: u64,
856    pub distance: f32,
857    pub metadata: Option<HashMap<String, SochValue>>,
858}
859
860/// The SochDB Database Kernel
861///
862/// This is the shared core used by both embedded (`SochConnection`) and
863/// server (`sochdb-server`) modes. It owns all storage, catalog, and
864/// indexing components.
865///
866/// # Thread Safety
867///
868/// The Database is fully thread-safe via internal synchronization:
869/// - Multiple readers can operate concurrently (MVCC snapshots)
870/// - Writers coordinate through WAL and group commit
871/// - All state is behind Arc/RwLock for shared access
872///
873/// # Concurrency Modes
874///
875/// ## Standard Mode (Single Process)
876/// - Uses exclusive file lock (`flock(LOCK_EX)`)
877/// - Best for: Scripts, notebooks, CLI tools
878/// - Open with: `Database::open(path)`
879///
880/// ## Concurrent Mode (Multi-Process/Web Apps)
881/// - Uses lock-free MVCC for reads, single-writer coordination for writes
882/// - Best for: Web servers, Flask/FastAPI apps, hot reloading
883/// - Open with: `Database::open_concurrent(path)`
884///
885/// # Example
886///
887/// ```ignore
888/// // Standard mode (single process)
889/// let db = Database::open("./my_data")?;
890///
891/// // Concurrent mode (multi-reader, single-writer)
892/// let db = Database::open_concurrent("./my_data")?;
893///
894/// // Begin a transaction
895/// let txn = db.begin_transaction()?;
896///
897/// // Write data
898/// db.put(txn, b"user:1:name", b"Alice")?;
899///
900/// // Commit
901/// db.commit(txn)?;
902/// ```
903#[allow(dead_code)]
904pub struct Database {
905    /// Path to database directory
906    path: PathBuf,
907    /// Durable storage layer (WAL + MVCC + memtable)
908    storage: Arc<DurableStorage>,
909    /// Concurrent MVCC manager (for concurrent mode)
910    concurrent_mvcc: Option<Arc<crate::mvcc_concurrent::ConcurrentMvcc>>,
911    /// Schema catalog
912    catalog: Arc<RwLock<Catalog>>,
913    /// Registered table schemas (name -> schema) - lock-free for reads
914    tables: DashMap<String, TableSchema>,
915    /// Cached packed schemas for fast insert (name -> packed schema)
916    packed_schemas: DashMap<String, PackedTableSchema>,
917    /// Per-table index policy registry
918    index_registry: Arc<TableIndexRegistry>,
919    /// Configuration
920    config: DatabaseConfig,
921    /// Statistics
922    stats: DatabaseStats,
923    /// Shutdown flag
924    shutdown: AtomicU64,
925    /// Whether this database is in concurrent mode
926    is_concurrent: bool,
927    /// CDC (Change Data Capture) log for streaming mutations
928    cdc_log: Option<Arc<crate::cdc::CdcLog>>,
929}
930
931/// Database statistics
932struct DatabaseStats {
933    transactions_started: AtomicU64,
934    transactions_committed: AtomicU64,
935    transactions_aborted: AtomicU64,
936    queries_executed: AtomicU64,
937    bytes_written: AtomicU64,
938    bytes_read: AtomicU64,
939}
940
941impl DatabaseStats {
942    fn new() -> Self {
943        Self {
944            transactions_started: AtomicU64::new(0),
945            transactions_committed: AtomicU64::new(0),
946            transactions_aborted: AtomicU64::new(0),
947            queries_executed: AtomicU64::new(0),
948            bytes_written: AtomicU64::new(0),
949            bytes_read: AtomicU64::new(0),
950        }
951    }
952}
953
954/// Public statistics snapshot
955#[derive(Debug, Clone)]
956pub struct Stats {
957    pub transactions_started: u64,
958    pub transactions_committed: u64,
959    pub transactions_aborted: u64,
960    pub queries_executed: u64,
961    pub bytes_written: u64,
962    pub bytes_read: u64,
963}
964
965impl Database {
966    /// Open or create a database at the given path.
967    ///
968    /// This is the primary entry point, similar to `sqlite3_open()`.
969    /// If the database exists, it will be opened and WAL recovery performed.
970    /// If it doesn't exist, a new database will be created.
971    ///
972    /// # Arguments
973    ///
974    /// * `path` - Directory path for the database files
975    ///
976    /// # Returns
977    ///
978    /// An `Arc<Database>` that can be shared across threads and connections.
979    pub fn open<P: AsRef<Path>>(path: P) -> Result<Arc<Self>> {
980        Self::open_with_config(path, DatabaseConfig::default())
981    }
982
983    /// Open without locking (for testing crash recovery scenarios)
984    ///
985    /// # Safety
986    /// This should ONLY be used in tests that simulate crashes by forgetting
987    /// the storage instance. In production, always use `open()`.
988    #[cfg(test)]
989    pub fn open_without_lock<P: AsRef<Path>>(path: P) -> Result<Arc<Self>> {
990        let path = path.as_ref().to_path_buf();
991        let config = DatabaseConfig::default();
992
993        let storage = Arc::new(DurableStorage::open_without_lock(&path)?);
994
995        let index_registry = Arc::new(TableIndexRegistry::with_default_policy(
996            config.default_index_policy,
997        ));
998
999        let db = Arc::new(Self {
1000            path: path.clone(),
1001            storage,
1002            concurrent_mvcc: None,
1003            catalog: Arc::new(RwLock::new(Catalog::new("sochdb"))),
1004            tables: DashMap::new(),
1005            packed_schemas: DashMap::new(),
1006            index_registry,
1007            config,
1008            stats: DatabaseStats::new(),
1009            shutdown: AtomicU64::new(0),
1010            is_concurrent: false,
1011            cdc_log: None,
1012        });
1013
1014        db.recover()?;
1015        Ok(db)
1016    }
1017
1018    /// Open with custom configuration
1019    pub fn open_with_config<P: AsRef<Path>>(path: P, config: DatabaseConfig) -> Result<Arc<Self>> {
1020        Self::open_with_config_and_encryption(path, config, StorageEncryption::disabled())
1021    }
1022
1023    /// Open with custom configuration AND at-rest encryption.
1024    ///
1025    /// Threads the supplied [`StorageEncryption`] (the KEK envelope) down to the
1026    /// keyring + WAL. `StorageEncryption::disabled()` is exactly plaintext
1027    /// [`Self::open_with_config`]. With a KEK the database is opened (or created
1028    /// on first open) encrypted; a wrong/missing key for an already-encrypted DB
1029    /// fails closed. `StorageEncryption` is move-only (zeroizing), so it is a
1030    /// by-value parameter rather than a `DatabaseConfig` field.
1031    pub fn open_with_config_and_encryption<P: AsRef<Path>>(
1032        path: P,
1033        config: DatabaseConfig,
1034        encryption: StorageEncryption,
1035    ) -> Result<Arc<Self>> {
1036        let path = path.as_ref().to_path_buf();
1037
1038        // Use IndexPolicy-based storage configuration for automatic memtable selection
1039        // This derives ordered index and memtable type from the policy
1040        let storage = Arc::new(DurableStorage::open_with_policy_encrypted(
1041            &path,
1042            config.default_index_policy,
1043            config.group_commit,
1044            encryption,
1045        )?);
1046
1047        // Propagate sync_mode from config to storage engine.
1048        // Without this, DurableStorage defaults to SyncMode::Normal (adaptive fsync).
1049        storage.set_sync_mode(config.sync_mode as u64);
1050
1051        // Create index registry with default policy from config
1052        let index_registry = Arc::new(TableIndexRegistry::with_default_policy(
1053            config.default_index_policy,
1054        ));
1055
1056        let db = Arc::new(Self {
1057            path: path.clone(),
1058            storage,
1059            concurrent_mvcc: None,
1060            catalog: Arc::new(RwLock::new(Catalog::new("sochdb"))),
1061            tables: DashMap::new(),
1062            packed_schemas: DashMap::new(),
1063            index_registry,
1064            config,
1065            stats: DatabaseStats::new(),
1066            shutdown: AtomicU64::new(0),
1067            is_concurrent: false,
1068            cdc_log: None,
1069        });
1070
1071        // Perform crash recovery if needed
1072        db.recover()?;
1073
1074        Ok(db)
1075    }
1076
1077    /// Open database in concurrent mode (multi-reader, single-writer)
1078    ///
1079    /// This mode allows multiple processes to access the database simultaneously:
1080    /// - **Readers**: Lock-free, concurrent access via MVCC snapshots
1081    /// - **Writers**: Single-writer coordination through atomic locks
1082    ///
1083    /// # Use Cases
1084    ///
1085    /// - Web applications (Flask, FastAPI, Django)
1086    /// - Hot reloading development servers
1087    /// - Multi-process worker pools
1088    /// - Any scenario with concurrent read access
1089    ///
1090    /// # Performance
1091    ///
1092    /// - Read latency: ~100ns (lock-free atomic operations)
1093    /// - Write latency: ~60μs amortized (with group commit)
1094    /// - Concurrent readers: Up to 1024 (configurable)
1095    ///
1096    /// # Example
1097    ///
1098    /// ```ignore
1099    /// // Multiple processes can open the same database
1100    /// let db = Database::open_concurrent("./my_data")?;
1101    ///
1102    /// // Reads are lock-free
1103    /// let value = db.get(b"key")?;
1104    ///
1105    /// // Writes coordinate automatically
1106    /// let txn = db.begin_transaction()?;
1107    /// db.put(txn, b"key", b"value")?;
1108    /// db.commit(txn)?;
1109    /// ```
1110    pub fn open_concurrent<P: AsRef<Path>>(path: P) -> Result<Arc<Self>> {
1111        Self::open_concurrent_with_config(path, DatabaseConfig::default())
1112    }
1113
1114    /// Open database in concurrent mode with custom configuration
1115    pub fn open_concurrent_with_config<P: AsRef<Path>>(
1116        path: P,
1117        config: DatabaseConfig,
1118    ) -> Result<Arc<Self>> {
1119        Self::open_concurrent_with_config_and_encryption(
1120            path,
1121            config,
1122            StorageEncryption::disabled(),
1123        )
1124    }
1125
1126    /// Open in concurrent (multi-reader) mode with at-rest encryption.
1127    ///
1128    /// The encryption-aware sibling of [`Self::open_concurrent_with_config`] —
1129    /// makes the multi-process path able to open an encrypted database instead of
1130    /// failing closed for lack of a key channel.
1131    pub fn open_concurrent_with_config_and_encryption<P: AsRef<Path>>(
1132        path: P,
1133        config: DatabaseConfig,
1134        encryption: StorageEncryption,
1135    ) -> Result<Arc<Self>> {
1136        use crate::mvcc_concurrent::ConcurrentMvcc;
1137
1138        let path = path.as_ref().to_path_buf();
1139        std::fs::create_dir_all(&path)?;
1140
1141        // Open concurrent MVCC manager (this uses shared memory, not exclusive lock)
1142        let concurrent_mvcc = Arc::new(ConcurrentMvcc::open(&path)?);
1143
1144        // Open storage WITHOUT exclusive lock (concurrent MVCC handles coordination)
1145        // We use a special internal method that skips the file lock
1146        let storage = Arc::new(DurableStorage::open_for_concurrent_encrypted(
1147            &path,
1148            config.default_index_policy,
1149            encryption,
1150        )?);
1151
1152        // Propagate sync_mode from config to storage engine
1153        storage.set_sync_mode(config.sync_mode as u64);
1154
1155        // Create index registry with default policy from config
1156        let index_registry = Arc::new(TableIndexRegistry::with_default_policy(
1157            config.default_index_policy,
1158        ));
1159
1160        let db = Arc::new(Self {
1161            path: path.clone(),
1162            storage,
1163            concurrent_mvcc: Some(concurrent_mvcc),
1164            catalog: Arc::new(RwLock::new(Catalog::new("sochdb"))),
1165            tables: DashMap::new(),
1166            packed_schemas: DashMap::new(),
1167            index_registry,
1168            config,
1169            stats: DatabaseStats::new(),
1170            shutdown: AtomicU64::new(0),
1171            is_concurrent: true,
1172            cdc_log: None,
1173        });
1174
1175        // Perform crash recovery if needed
1176        db.recover()?;
1177
1178        // Clean up any stale readers from crashed processes
1179        if let Some(ref mvcc) = db.concurrent_mvcc {
1180            mvcc.cleanup_stale_readers();
1181        }
1182
1183        Ok(db)
1184    }
1185
1186    /// Check if database is in concurrent mode
1187    #[inline]
1188    pub fn is_concurrent(&self) -> bool {
1189        self.is_concurrent
1190    }
1191
1192    /// Perform crash recovery
1193    fn recover(&self) -> Result<RecoveryStats> {
1194        self.storage.recover()
1195    }
1196
1197    /// Get database path
1198    pub fn path(&self) -> &Path {
1199        &self.path
1200    }
1201
1202    // =========================================================================
1203    // Transaction API
1204    // =========================================================================
1205
1206    /// Begin a new transaction
1207    pub fn begin_transaction(&self) -> Result<TxnHandle> {
1208        self.stats
1209            .transactions_started
1210            .fetch_add(1, Ordering::Relaxed);
1211        let txn_id = self.storage.begin_transaction()?;
1212
1213        // Get snapshot timestamp from MVCC
1214        // For now, use txn_id as a proxy (the real snapshot_ts is managed internally)
1215        Ok(TxnHandle {
1216            txn_id,
1217            snapshot_ts: txn_id,
1218        })
1219    }
1220
1221    /// Begin a read-only transaction (optimized: no SSI tracking)
1222    ///
1223    /// Read-only transactions skip SSI read tracking, reducing overhead
1224    /// from ~82ns to ~32ns per read (2.6x faster).
1225    ///
1226    /// Use this for:
1227    /// - SELECT queries that don't modify data
1228    /// - Analytics and reporting queries
1229    /// - Snapshot reads for backup
1230    pub fn begin_read_only(&self) -> Result<TxnHandle> {
1231        self.stats
1232            .transactions_started
1233            .fetch_add(1, Ordering::Relaxed);
1234        let txn_id = self.storage.begin_with_mode(TransactionMode::ReadOnly)?;
1235        Ok(TxnHandle {
1236            txn_id,
1237            snapshot_ts: txn_id,
1238        })
1239    }
1240
1241    /// Begin a lightweight read-only transaction (no WAL overhead).
1242    ///
1243    /// Eliminates WAL mutex acquisitions entirely for read operations.
1244    /// The txn_id is allocated atomically and MVCC snapshot state is created,
1245    /// but NO WAL records are written (no TxnBegin, no TxnAbort).
1246    ///
1247    /// ~5-10x faster per-operation than `begin_read_only()` because it avoids:
1248    /// - 2 WAL mutex lock/unlock cycles per transaction
1249    /// - 2 WAL BufWriter serializations per transaction
1250    ///
1251    /// Callers MUST use `abort_read_only_fast()` to clean up — NOT `commit()`
1252    /// or `abort()`.
1253    #[inline]
1254    pub fn begin_read_only_fast(&self) -> TxnHandle {
1255        let txn_id = self.storage.begin_read_only_fast();
1256        TxnHandle {
1257            txn_id,
1258            snapshot_ts: txn_id,
1259        }
1260    }
1261
1262    /// Abort a fast read-only transaction — O(1), no WAL, no memtable scan.
1263    #[inline]
1264    pub fn abort_read_only_fast(&self, txn: TxnHandle) {
1265        self.storage.abort_read_only_fast(txn.txn_id);
1266    }
1267
1268    /// Read a key WITHOUT any MVCC transaction tracking.
1269    ///
1270    /// Uses the current global timestamp to see all committed writes.
1271    /// Bypasses: begin/abort, active_txns DashMap, record_read, stats.
1272    /// Only safe for single-threaded access with no concurrent writers.
1273    #[inline]
1274    pub fn get_raw_read(&self, key: &[u8]) -> Option<Vec<u8>> {
1275        self.storage.read_latest(key)
1276    }
1277
1278    /// Scan by prefix WITHOUT any MVCC transaction tracking.
1279    ///
1280    /// Uses the current global timestamp. Only safe for single-threaded access.
1281    #[inline]
1282    pub fn scan_raw(&self, prefix: &[u8]) -> Vec<(Vec<u8>, Vec<u8>)> {
1283        self.storage.scan_latest(prefix)
1284    }
1285
1286    /// Begin a write-only transaction (optimized: no read tracking)
1287    ///
1288    /// Write-only transactions skip read tracking, improving insert
1289    /// throughput for bulk loading scenarios.
1290    ///
1291    /// Use this for:
1292    /// - Bulk data imports
1293    /// - Append-only logging tables
1294    /// - ETL pipelines
1295    pub fn begin_write_only(&self) -> Result<TxnHandle> {
1296        self.stats
1297            .transactions_started
1298            .fetch_add(1, Ordering::Relaxed);
1299        let txn_id = self.storage.begin_with_mode(TransactionMode::WriteOnly)?;
1300        Ok(TxnHandle {
1301            txn_id,
1302            snapshot_ts: txn_id,
1303        })
1304    }
1305
1306    /// Commit a transaction
1307    ///
1308    /// In concurrent mode, acquires the shared writer lock to ensure
1309    /// WAL writes are serialized across processes, and forces a flush+sync
1310    /// so that subsequent processes see the committed data.
1311    pub fn commit(&self, txn: TxnHandle) -> Result<u64> {
1312        self.stats
1313            .transactions_committed
1314            .fetch_add(1, Ordering::Relaxed);
1315
1316        // In concurrent mode, acquire the cross-process writer lock
1317        // to serialize WAL commits across processes
1318        let _writer_guard = if let Some(ref mvcc) = self.concurrent_mvcc {
1319            Some(mvcc.acquire_writer(std::time::Duration::from_secs(5))?)
1320        } else {
1321            None
1322        };
1323
1324        let commit_ts = self.storage.commit(txn.txn_id)?;
1325
1326        // In concurrent mode, force flush+sync so other processes can see
1327        // the committed data when they open the DB or run recovery.
1328        // Without this, the BufWriter may hold data that isn't visible
1329        // to other processes reading the WAL file.
1330        if self.is_concurrent {
1331            self.storage.flush_wal()?;
1332            self.storage.fsync()?;
1333        }
1334
1335        // Notify concurrent MVCC of commit for GC tracking
1336        if let Some(ref mvcc) = self.concurrent_mvcc {
1337            mvcc.on_commit();
1338        }
1339
1340        Ok(commit_ts)
1341    }
1342
1343    /// Abort a transaction
1344    pub fn abort(&self, txn: TxnHandle) -> Result<()> {
1345        self.stats
1346            .transactions_aborted
1347            .fetch_add(1, Ordering::Relaxed);
1348        self.storage.abort(txn.txn_id)
1349    }
1350
1351    // =========================================================================
1352    // Per-Table Index Policy API
1353    // =========================================================================
1354
1355    /// Configure index policy for a table
1356    ///
1357    /// This allows fine-grained control over write/scan trade-offs per table:
1358    ///
1359    /// | Policy         | Insert Cost | Scan Cost      | Use Case              |
1360    /// |----------------|-------------|----------------|------------------------|
1361    /// | WriteOptimized | O(1)        | O(N)           | High-write, rare scan  |
1362    /// | Balanced       | O(1) amort  | O(output+logK) | Mixed OLTP            |
1363    /// | ScanOptimized  | O(log N)    | O(logN + K)    | Analytics, range query |
1364    /// | AppendOnly     | O(1)        | O(N)           | Time-series logs       |
1365    ///
1366    /// # Example
1367    ///
1368    /// ```ignore
1369    /// // Fast inserts for logs table (no ordered index overhead)
1370    /// db.set_table_index_policy("logs", IndexPolicy::WriteOptimized);
1371    ///
1372    /// // Efficient range scans for analytics table
1373    /// db.set_table_index_policy("analytics", IndexPolicy::ScanOptimized);
1374    ///
1375    /// // Balanced for OLTP tables
1376    /// db.set_table_index_policy("users", IndexPolicy::Balanced);
1377    /// ```
1378    pub fn set_table_index_policy(&self, table: &str, policy: IndexPolicy) {
1379        self.index_registry
1380            .configure_table(TableIndexConfig::new(table, policy));
1381    }
1382
1383    /// Get the index policy for a table
1384    pub fn get_table_index_policy(&self, table: &str) -> IndexPolicy {
1385        self.index_registry.get_policy(table)
1386    }
1387
1388    /// Get the index registry for advanced configuration
1389    pub fn index_registry(&self) -> &Arc<TableIndexRegistry> {
1390        &self.index_registry
1391    }
1392
1393    // =========================================================================
1394    // Key-Value API (Low-level)
1395    // =========================================================================
1396
1397    /// Put a key-value pair
1398    ///
1399    /// In concurrent mode, acquires the shared writer lock to ensure
1400    /// WAL writes are serialized across processes.
1401    pub fn put(&self, txn: TxnHandle, key: &[u8], value: &[u8]) -> Result<()> {
1402        self.stats
1403            .bytes_written
1404            .fetch_add((key.len() + value.len()) as u64, Ordering::Relaxed);
1405
1406        // In concurrent mode, acquire cross-process writer lock
1407        let _writer_guard = if let Some(ref mvcc) = self.concurrent_mvcc {
1408            Some(mvcc.acquire_writer(std::time::Duration::from_secs(5))?)
1409        } else {
1410            None
1411        };
1412
1413        // Use write_refs to avoid unnecessary allocations
1414        self.storage.write_refs(txn.txn_id, key, value)
1415    }
1416
1417    /// Batch put multiple key-value pairs with reduced overhead
1418    ///
1419    /// This amortizes per-operation costs over the entire batch:
1420    /// - Single DashMap lookup
1421    /// - Batch MVCC tracking
1422    /// - Batch memtable writes
1423    ///
1424    /// For 100+ entries, this is 2-3x faster than individual puts.
1425    ///
1426    /// # Example
1427    ///
1428    /// ```ignore
1429    /// let writes: Vec<(&[u8], &[u8])> = vec![
1430    ///     (b"key1", b"value1"),
1431    ///     (b"key2", b"value2"),
1432    ///     (b"key3", b"value3"),
1433    /// ];
1434    /// db.put_batch(txn, &writes)?;
1435    /// ```
1436    pub fn put_batch(&self, txn: TxnHandle, writes: &[(&[u8], &[u8])]) -> Result<()> {
1437        let bytes: u64 = writes.iter().map(|(k, v)| (k.len() + v.len()) as u64).sum();
1438        self.stats.bytes_written.fetch_add(bytes, Ordering::Relaxed);
1439
1440        // In concurrent mode, acquire cross-process writer lock
1441        let _writer_guard = if let Some(ref mvcc) = self.concurrent_mvcc {
1442            Some(mvcc.acquire_writer(std::time::Duration::from_secs(5))?)
1443        } else {
1444            None
1445        };
1446
1447        self.storage.write_batch_refs(txn.txn_id, writes)
1448    }
1449
1450    /// Get a value by key
1451    pub fn get(&self, txn: TxnHandle, key: &[u8]) -> Result<Option<Vec<u8>>> {
1452        let result = self.storage.read(txn.txn_id, key)?;
1453        if let Some(ref data) = result {
1454            self.stats
1455                .bytes_read
1456                .fetch_add(data.len() as u64, Ordering::Relaxed);
1457        }
1458        Ok(result)
1459    }
1460
1461    /// Delete a key
1462    pub fn delete(&self, txn: TxnHandle, key: &[u8]) -> Result<()> {
1463        self.storage.delete(txn.txn_id, key.to_vec())
1464    }
1465
1466    /// Minimum prefix length for scan operations.
1467    /// Prevents expensive full-table scans by requiring a meaningful prefix.
1468    pub const MIN_SCAN_PREFIX_LEN: usize = 2;
1469
1470    /// Scan keys with a prefix (enforces minimum prefix length for safety).
1471    ///
1472    /// # Prefix Safety
1473    ///
1474    /// To prevent accidental full-table scans, this method requires a minimum
1475    /// prefix length of 2 bytes. Use `scan_unchecked` for internal operations
1476    /// that need empty/short prefixes.
1477    ///
1478    /// # Errors
1479    ///
1480    /// Returns `SochDBError::InvalidInput` if prefix is too short.
1481    pub fn scan(&self, txn: TxnHandle, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
1482        if prefix.len() < Self::MIN_SCAN_PREFIX_LEN {
1483            return Err(SochDBError::InvalidArgument(format!(
1484                "Prefix too short: {} bytes (minimum {} required). \
1485                 Use scan_unchecked() for unrestricted scans.",
1486                prefix.len(),
1487                Self::MIN_SCAN_PREFIX_LEN
1488            )));
1489        }
1490        self.scan_unchecked(txn, prefix)
1491    }
1492
1493    /// Scan keys with a prefix without length validation.
1494    ///
1495    /// # Warning
1496    ///
1497    /// This method allows empty/short prefixes which can cause expensive
1498    /// full-table scans. Use `scan()` unless you specifically need unrestricted
1499    /// prefix access for internal operations.
1500    pub fn scan_unchecked(&self, txn: TxnHandle, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
1501        let results = self.storage.scan(txn.txn_id, prefix)?;
1502        let bytes: u64 = results
1503            .iter()
1504            .map(|(k, v)| (k.len() + v.len()) as u64)
1505            .sum();
1506        self.stats.bytes_read.fetch_add(bytes, Ordering::Relaxed);
1507        Ok(results)
1508    }
1509
1510    /// Scan keys in range
1511    pub fn scan_range(
1512        &self,
1513        txn: TxnHandle,
1514        start: &[u8],
1515        end: &[u8],
1516    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
1517        let results = self.storage.scan_range(txn.txn_id, start, end)?;
1518        let bytes: u64 = results
1519            .iter()
1520            .map(|(k, v)| (k.len() + v.len()) as u64)
1521            .sum();
1522        self.stats.bytes_read.fetch_add(bytes, Ordering::Relaxed);
1523        Ok(results)
1524    }
1525
1526    /// Streaming scan for very large result sets
1527    ///
1528    /// Returns an iterator that yields (key, value) pairs without
1529    /// materializing the entire result set. Use this for large scans
1530    /// where memory efficiency is important.
1531    ///
1532    /// ## Performance
1533    ///
1534    /// - Memory: O(1) per iteration vs O(N) for scan_range
1535    /// - Latency: First result available immediately vs waiting for all results
1536    /// - Throughput: Slightly lower due to per-item overhead
1537    ///
1538    /// ## Usage
1539    ///
1540    /// ```ignore
1541    /// for result in db.scan_range_iter(txn, b"start", b"end") {
1542    ///     let (key, value) = result?;
1543    ///     // Process immediately - no need to wait for all results
1544    /// }
1545    /// ```
1546    pub fn scan_range_iter<'a>(
1547        &'a self,
1548        txn: TxnHandle,
1549        start: &'a [u8],
1550        end: &'a [u8],
1551    ) -> impl Iterator<Item = Result<(Vec<u8>, Vec<u8>)>> + 'a {
1552        let stats = &self.stats;
1553        self.storage
1554            .scan_range_iter(txn.txn_id, start, end)
1555            .map(move |item| {
1556                stats
1557                    .bytes_read
1558                    .fetch_add((item.0.len() + item.1.len()) as u64, Ordering::Relaxed);
1559                Ok(item)
1560            })
1561    }
1562
1563    /// Flush memtable to WAL/Disk
1564    pub fn flush(&self) -> Result<()> {
1565        self.storage.fsync()
1566    }
1567
1568    // =========================================================================
1569    // Path-Native API (SochDB's differentiator)
1570    // =========================================================================
1571
1572    /// Get storage statistics
1573    pub fn storage_stats(&self) -> crate::durable_storage::StorageStats {
1574        self.storage.stats()
1575    }
1576
1577    /// Put a value at a path
1578    ///
1579    /// Path format: "collection/doc_id/field" or "table.row_id.column"
1580    /// Resolution is O(|path|), not O(log N) like B-tree.
1581    pub fn put_path(&self, txn: TxnHandle, path: &str, value: &[u8]) -> Result<()> {
1582        self.put(txn, path.as_bytes(), value)
1583    }
1584
1585    /// Get a value at a path
1586    pub fn get_path(&self, txn: TxnHandle, path: &str) -> Result<Option<Vec<u8>>> {
1587        self.get(txn, path.as_bytes())
1588    }
1589
1590    /// Delete at a path
1591    pub fn delete_path(&self, txn: TxnHandle, path: &str) -> Result<()> {
1592        self.delete(txn, path.as_bytes())
1593    }
1594
1595    /// Scan a path prefix
1596    ///
1597    /// Returns all key-value pairs where key starts with prefix.
1598    /// Useful for: "users/123/" -> all fields of user 123
1599    pub fn scan_path(&self, txn: TxnHandle, prefix: &str) -> Result<Vec<(String, Vec<u8>)>> {
1600        self.stats.queries_executed.fetch_add(1, Ordering::Relaxed);
1601
1602        let results = self.scan(txn, prefix.as_bytes())?;
1603
1604        Ok(results
1605            .into_iter()
1606            .filter_map(|(k, v)| String::from_utf8(k).ok().map(|path| (path, v)))
1607            .collect())
1608    }
1609
1610    // =========================================================================
1611    // Query API
1612    // =========================================================================
1613
1614    /// Execute a path query and return results
1615    ///
1616    /// This is the main query interface for LLM context retrieval.
1617    /// Supports:
1618    /// - Path prefix matching
1619    /// - Column projection (for I/O reduction)
1620    /// - Limit/offset
1621    pub fn query(&self, txn: TxnHandle, path_prefix: &str) -> QueryBuilder<'_> {
1622        QueryBuilder::new(self, txn, path_prefix.to_string())
1623    }
1624
1625    // =========================================================================
1626    // Table API (Higher-level abstraction)
1627    // =========================================================================
1628
1629    /// Register a table schema
1630    pub fn register_table(&self, schema: TableSchema) -> Result<()> {
1631        if self.tables.contains_key(&schema.name) {
1632            return Err(SochDBError::InvalidArgument(format!(
1633                "Table '{}' already exists",
1634                schema.name
1635            )));
1636        }
1637        // Cache the packed schema for fast inserts
1638        let packed_schema = Self::to_packed_schema(&schema);
1639        self.packed_schemas
1640            .insert(schema.name.clone(), packed_schema);
1641        self.tables.insert(schema.name.clone(), schema);
1642        Ok(())
1643    }
1644
1645    /// Get table schema
1646    pub fn get_table_schema(&self, name: &str) -> Option<TableSchema> {
1647        self.tables.get(name).map(|s| s.clone())
1648    }
1649
1650    /// Update the schema for an existing table (used by ALTER TABLE).
1651    ///
1652    /// Replaces the schema in both the `tables` DashMap and the packed schema
1653    /// cache atomically (per-key). The caller is responsible for validating
1654    /// the new schema.
1655    pub fn update_table_schema(&self, old_name: &str, schema: TableSchema) -> Result<()> {
1656        if !self.tables.contains_key(old_name) {
1657            return Err(SochDBError::InvalidArgument(format!(
1658                "Table '{}' not found",
1659                old_name
1660            )));
1661        }
1662        // Remove old entries
1663        self.tables.remove(old_name);
1664        self.packed_schemas.remove(old_name);
1665        // Insert new
1666        let packed = Self::to_packed_schema(&schema);
1667        self.packed_schemas.insert(schema.name.clone(), packed);
1668        self.tables.insert(schema.name.clone(), schema);
1669        Ok(())
1670    }
1671
1672    /// List all tables
1673    pub fn list_tables(&self) -> Vec<String> {
1674        self.tables.iter().map(|e| e.key().clone()).collect()
1675    }
1676
1677    // =========================================================================
1678    // CDC (Change Data Capture)
1679    // =========================================================================
1680
1681    /// Enable CDC on this database, returning the CDC log handle.
1682    ///
1683    /// Subsequent mutations emitted via the SQL execution layer will be
1684    /// recorded in the CDC log for subscriber consumption.
1685    pub fn enable_cdc(&mut self, config: crate::cdc::CdcConfig) -> Arc<crate::cdc::CdcLog> {
1686        let log = crate::cdc::CdcLog::new(config);
1687        self.cdc_log = Some(log.clone());
1688        log
1689    }
1690
1691    /// Get the CDC log handle, if CDC is enabled.
1692    pub fn cdc_log(&self) -> Option<&Arc<crate::cdc::CdcLog>> {
1693        self.cdc_log.as_ref()
1694    }
1695
1696    /// Convert TableSchema to PackedTableSchema for efficient storage
1697    fn to_packed_schema(schema: &TableSchema) -> PackedTableSchema {
1698        let columns = schema
1699            .columns
1700            .iter()
1701            .map(|col| PackedColumnDef {
1702                name: col.name.clone(),
1703                col_type: match col.col_type {
1704                    ColumnType::Int64 => PackedColumnType::Int64,
1705                    ColumnType::UInt64 => PackedColumnType::UInt64,
1706                    ColumnType::Float64 => PackedColumnType::Float64,
1707                    ColumnType::Text => PackedColumnType::Text,
1708                    ColumnType::Binary => PackedColumnType::Binary,
1709                    ColumnType::Bool => PackedColumnType::Bool,
1710                },
1711                nullable: col.nullable,
1712            })
1713            .collect();
1714
1715        PackedTableSchema::new(&schema.name, columns)
1716    }
1717
1718    /// Insert a row into a table
1719    ///
1720    /// Uses packed row format: stores entire row as single key-value pair.
1721    /// This reduces write amplification from 4× to 1× for a 4-column table.
1722    ///
1723    /// # Performance
1724    /// - Before: 4 columns × (WAL entry + MVCC version) = 4 writes
1725    /// - After: 1 packed row = 1 write
1726    /// - Improvement: ~4× fewer WAL entries, ~48% less I/O overhead
1727    pub fn insert_row(
1728        &self,
1729        txn: TxnHandle,
1730        table: &str,
1731        row_id: u64,
1732        values: &HashMap<String, SochValue>,
1733    ) -> Result<()> {
1734        // Use cached packed schema - single DashMap lookup, no clone
1735        let packed_schema = self
1736            .packed_schemas
1737            .get(table)
1738            .ok_or_else(|| SochDBError::InvalidArgument(format!("Table '{}' not found", table)))?;
1739
1740        // Pack the row using cached schema
1741        let packed_row = PackedRow::pack(&packed_schema, values);
1742
1743        // Build key using KeyBuffer - optimized stack allocation (~12-15ns vs ~30-35ns for write!())
1744        let key = KeyBuffer::format_row_key(table, row_id);
1745
1746        self.put(txn, key.as_bytes(), packed_row.as_bytes())?;
1747
1748        Ok(())
1749    }
1750
1751    /// Read a row from a table
1752    ///
1753    /// Reads packed row and extracts requested columns in O(k) time.
1754    /// Column projection happens in memory, not storage - all columns are fetched.
1755    pub fn read_row(
1756        &self,
1757        txn: TxnHandle,
1758        table: &str,
1759        row_id: u64,
1760        columns: Option<&[&str]>,
1761    ) -> Result<Option<HashMap<String, SochValue>>> {
1762        let schema = self
1763            .tables
1764            .get(table)
1765            .ok_or_else(|| SochDBError::InvalidArgument(format!("Table '{}' not found", table)))?;
1766
1767        // Read the packed row with a single key lookup using KeyBuffer
1768        let key = KeyBuffer::format_row_key(table, row_id);
1769        let bytes = match self.get(txn, key.as_bytes())? {
1770            Some(b) => b,
1771            None => return Ok(None),
1772        };
1773
1774        // Use cached packed schema
1775        let packed_schema = self
1776            .packed_schemas
1777            .get(table)
1778            .ok_or_else(|| SochDBError::Internal("Packed schema not found".into()))?;
1779        let packed_row = PackedRow::from_bytes(bytes, packed_schema.num_columns())?;
1780
1781        // Determine which columns to return
1782        let cols_to_read: Vec<&str> = match columns {
1783            Some(c) => c.to_vec(),
1784            None => schema.columns.iter().map(|c| c.name.as_str()).collect(),
1785        };
1786
1787        let mut row = HashMap::new();
1788        for col_name in cols_to_read {
1789            if let Some(idx) = packed_schema.column_index(col_name)
1790                && let Some(col_def) = packed_schema.column(idx)
1791                && let Some(value) = packed_row.get_column(idx, col_def.col_type)
1792            {
1793                row.insert(col_name.to_string(), value);
1794            }
1795        }
1796
1797        Ok(Some(row))
1798    }
1799
1800    /// Insert multiple rows efficiently in a batch
1801    ///
1802    /// This method accumulates all rows and writes them with fewer WAL syncs.
1803    /// Ideal for bulk loading scenarios.
1804    ///
1805    /// # Performance
1806    /// - Uses group commit to batch fsync operations
1807    /// - Expected throughput: 500K-1M rows/sec depending on row size
1808    pub fn insert_rows_batch(
1809        &self,
1810        txn: TxnHandle,
1811        table: &str,
1812        rows: &[(u64, HashMap<String, SochValue>)],
1813    ) -> Result<usize> {
1814        // Use cached packed schema
1815        let packed_schema = self
1816            .packed_schemas
1817            .get(table)
1818            .ok_or_else(|| SochDBError::InvalidArgument(format!("Table '{}' not found", table)))?;
1819
1820        let mut count = 0;
1821
1822        for (row_id, values) in rows {
1823            // Pack and write using KeyBuffer for efficient key construction
1824            let packed_row = PackedRow::pack(&packed_schema, values);
1825            let key = KeyBuffer::format_row_key(table, *row_id);
1826            self.put(txn, key.as_bytes(), packed_row.as_bytes())?;
1827            count += 1;
1828        }
1829
1830        Ok(count)
1831    }
1832
1833    /// Ultra-fast raw put - bypasses all validation
1834    ///
1835    /// Use when you've already validated the data and just need speed.
1836    /// This is ~10× faster than insert_row() for bulk inserts.
1837    #[inline]
1838    pub fn put_raw(&self, txn: TxnHandle, key: &[u8], value: &[u8]) -> Result<()> {
1839        self.storage.write_refs(txn.txn_id, key, value)
1840    }
1841
1842    /// Zero-allocation insert - fastest path for bulk inserts
1843    ///
1844    /// Takes values as a slice in schema column order, avoiding HashMap overhead.
1845    ///
1846    /// # Arguments
1847    /// * `txn` - Transaction handle
1848    /// * `table` - Table name
1849    /// * `row_id` - Row identifier
1850    /// * `values` - Values in schema column order (None = NULL)
1851    ///
1852    /// # Performance
1853    /// - Eliminates ~6 allocations per row vs insert_row()
1854    /// - Expected: 1.2M-1.5M inserts/sec
1855    ///
1856    /// # Example
1857    /// ```ignore
1858    /// let values: &[Option<&SochValue>] = &[
1859    ///     Some(&SochValue::Int(1)),
1860    ///     Some(&SochValue::Text("Alice".into())),
1861    ///     None, // NULL
1862    /// ];
1863    /// db.insert_row_slice(txn, "users", 1, values)?;
1864    /// ```
1865    #[inline]
1866    pub fn insert_row_slice(
1867        &self,
1868        txn: TxnHandle,
1869        table: &str,
1870        row_id: u64,
1871        values: &[Option<&SochValue>],
1872    ) -> Result<()> {
1873        // Use cached packed schema - single DashMap lookup
1874        let packed_schema = self
1875            .packed_schemas
1876            .get(table)
1877            .ok_or_else(|| SochDBError::InvalidArgument(format!("Table '{}' not found", table)))?;
1878
1879        // Validate column count matches
1880        if values.len() != packed_schema.num_columns() {
1881            return Err(SochDBError::InvalidArgument(format!(
1882                "Expected {} columns, got {}",
1883                packed_schema.num_columns(),
1884                values.len()
1885            )));
1886        }
1887
1888        // Pack using zero-allocation path
1889        let packed_row = PackedRow::pack_slice(&packed_schema, values);
1890
1891        // Build key using KeyBuffer - optimized stack allocation (~12-15ns vs ~30-35ns for write!())
1892        let key = KeyBuffer::format_row_key(table, row_id);
1893
1894        self.put(txn, key.as_bytes(), packed_row.as_bytes())?;
1895        Ok(())
1896    }
1897
1898    // =========================================================================
1899    // Maintenance
1900    // =========================================================================
1901
1902    /// Force fsync to disk
1903    pub fn fsync(&self) -> Result<()> {
1904        self.storage.fsync()
1905    }
1906
1907    /// Create a checkpoint
1908    pub fn checkpoint(&self) -> Result<u64> {
1909        self.storage.checkpoint()
1910    }
1911
1912    /// Truncate the WAL file after a checkpoint.
1913    ///
1914    /// See [`DurableStorage::truncate_wal`] for safety notes.
1915    pub fn truncate_wal(&self) -> Result<()> {
1916        self.storage.truncate_wal()
1917    }
1918
1919    /// Run garbage collection
1920    pub fn gc(&self) -> usize {
1921        self.storage.gc()
1922    }
1923
1924    /// Get database statistics
1925    pub fn stats(&self) -> Stats {
1926        Stats {
1927            transactions_started: self.stats.transactions_started.load(Ordering::Relaxed),
1928            transactions_committed: self.stats.transactions_committed.load(Ordering::Relaxed),
1929            transactions_aborted: self.stats.transactions_aborted.load(Ordering::Relaxed),
1930            queries_executed: self.stats.queries_executed.load(Ordering::Relaxed),
1931            bytes_written: self.stats.bytes_written.load(Ordering::Relaxed),
1932            bytes_read: self.stats.bytes_read.load(Ordering::Relaxed),
1933        }
1934    }
1935
1936    /// Shutdown the database gracefully
1937    pub fn shutdown(&self) -> Result<()> {
1938        if self.shutdown.swap(1, Ordering::SeqCst) == 1 {
1939            return Ok(()); // Already shutting down
1940        }
1941
1942        // Flush any pending writes
1943        self.fsync()?;
1944
1945        // Create clean shutdown marker
1946        let marker = self.path.join(".clean_shutdown");
1947        std::fs::write(&marker, b"ok")?;
1948
1949        Ok(())
1950    }
1951}
1952
1953impl Drop for Database {
1954    fn drop(&mut self) {
1955        // Try graceful shutdown if not already done
1956        if self.shutdown.load(Ordering::SeqCst) == 0 {
1957            let _ = self.fsync();
1958            let marker = self.path.join(".clean_shutdown");
1959            let _ = std::fs::write(&marker, b"ok");
1960        }
1961    }
1962}
1963
1964/// Query builder for fluent query construction
1965pub struct QueryBuilder<'a> {
1966    db: &'a Database,
1967    txn: TxnHandle,
1968    path_prefix: String,
1969    columns: Option<Vec<String>>,
1970    limit: Option<usize>,
1971    offset: Option<usize>,
1972}
1973
1974impl<'a> QueryBuilder<'a> {
1975    fn new(db: &'a Database, txn: TxnHandle, path_prefix: String) -> Self {
1976        Self {
1977            db,
1978            txn,
1979            path_prefix,
1980            columns: None,
1981            limit: None,
1982            offset: None,
1983        }
1984    }
1985
1986    /// Select specific columns (for I/O reduction)
1987    pub fn columns(mut self, cols: &[&str]) -> Self {
1988        self.columns = Some(cols.iter().map(|s| s.to_string()).collect());
1989        self
1990    }
1991
1992    /// Limit results
1993    pub fn limit(mut self, n: usize) -> Self {
1994        self.limit = Some(n);
1995        self
1996    }
1997
1998    /// Skip results
1999    pub fn offset(mut self, n: usize) -> Self {
2000        self.offset = Some(n);
2001        self
2002    }
2003
2004    /// Execute the query
2005    ///
2006    /// Scans packed rows and unpacks them. Each key is "table/row_id" pointing to a packed row.
2007    pub fn execute(self) -> Result<QueryResult> {
2008        self.db
2009            .stats
2010            .queries_executed
2011            .fetch_add(1, Ordering::Relaxed);
2012
2013        // Get schema for the table if we're querying a table
2014        let table_name = self
2015            .path_prefix
2016            .split('/')
2017            .next()
2018            .unwrap_or(&self.path_prefix);
2019        let schema = self.db.tables.get(table_name).map(|s| s.clone());
2020
2021        // When querying a registered table by its bare name, scan the row-key
2022        // prefix "table/" rather than the bare name. Row keys are "table/row_id"
2023        // (see KeyBuffer::format_row_key), so scanning the bare name would:
2024        //   (1) bleed across sibling tables whose names share a prefix — e.g.
2025        //       querying "user" would also match "users/123"; and
2026        //   (2) trip the scan minimum-prefix guard for single-character table
2027        //       names ("t" is 1 byte, below the 2-byte minimum).
2028        // Appending the separator fixes both. Path-style prefixes that already
2029        // contain '/' (e.g. "users/123" to fetch one row's columns) are left
2030        // unchanged so field-prefix scans keep working.
2031        let scan_prefix = if schema.is_some() && !self.path_prefix.contains('/') {
2032            format!("{}/", self.path_prefix)
2033        } else {
2034            self.path_prefix.clone()
2035        };
2036
2037        // Scan the path prefix
2038        let results = self.db.scan_path(self.txn, &scan_prefix)?;
2039
2040        let mut rows: Vec<HashMap<String, SochValue>> = Vec::new();
2041        let mut bytes_read = 0usize;
2042
2043        if let Some(ref schema) = schema {
2044            // We have a table schema - use cached packed schema
2045            let packed_schema = self
2046                .db
2047                .packed_schemas
2048                .get(table_name)
2049                .map(|ps| ps.clone())
2050                .unwrap_or_else(|| Database::to_packed_schema(schema));
2051
2052            for (path, value_bytes) in results {
2053                // Parse path: table/row_id
2054                let parts: Vec<&str> = path.split('/').collect();
2055                if parts.len() == 2 {
2056                    // This is a packed row
2057                    bytes_read += value_bytes.len();
2058
2059                    if let Ok(packed_row) =
2060                        PackedRow::from_bytes(value_bytes, packed_schema.num_columns())
2061                    {
2062                        // Unpack all columns or just requested columns
2063                        let mut row = HashMap::new();
2064
2065                        if let Some(ref cols) = self.columns {
2066                            // Only extract requested columns
2067                            for col_name in cols {
2068                                if let Some(idx) = packed_schema.column_index(col_name)
2069                                    && let Some(col_def) = packed_schema.column(idx)
2070                                    && let Some(value) =
2071                                        packed_row.get_column(idx, col_def.col_type)
2072                                {
2073                                    row.insert(col_name.clone(), value);
2074                                }
2075                            }
2076                        } else {
2077                            // Extract all columns
2078                            row = packed_row.unpack(&packed_schema);
2079                        }
2080
2081                        if !row.is_empty() {
2082                            rows.push(row);
2083                        }
2084                    }
2085                }
2086            }
2087        } else {
2088            // Fallback: no schema, try legacy column-per-key format
2089            let mut rows_map: HashMap<String, HashMap<String, SochValue>> = HashMap::new();
2090
2091            for (path, value_bytes) in results {
2092                let parts: Vec<&str> = path.split('/').collect();
2093                if parts.len() >= 3 {
2094                    let row_key = format!("{}/{}", parts[0], parts[1]);
2095                    let col_name = parts[2..].join("/");
2096
2097                    if let Some(ref cols) = self.columns
2098                        && !cols.contains(&col_name)
2099                    {
2100                        continue;
2101                    }
2102
2103                    bytes_read += value_bytes.len();
2104                    let row = rows_map.entry(row_key).or_default();
2105                    row.insert(col_name, deserialize_value(&value_bytes));
2106                }
2107            }
2108
2109            rows = rows_map.into_values().collect();
2110        }
2111
2112        // Apply offset
2113        if let Some(offset) = self.offset {
2114            rows = rows.into_iter().skip(offset).collect();
2115        }
2116
2117        // Apply limit
2118        if let Some(limit) = self.limit {
2119            rows.truncate(limit);
2120        }
2121
2122        // Collect column names
2123        let columns: Vec<String> = self.columns.unwrap_or_else(|| {
2124            rows.iter()
2125                .flat_map(|r| r.keys().cloned())
2126                .collect::<std::collections::HashSet<_>>()
2127                .into_iter()
2128                .collect()
2129        });
2130
2131        Ok(QueryResult {
2132            columns,
2133            rows_scanned: rows.len(),
2134            bytes_read,
2135            rows,
2136        })
2137    }
2138
2139    /// Execute and return TOON format (for LLM efficiency)
2140    pub fn to_toon(self) -> Result<String> {
2141        let result = self.execute()?;
2142        Ok(result.to_toon())
2143    }
2144
2145    /// Execute with lazy iteration - avoids materializing all rows
2146    ///
2147    /// Returns an iterator over rows as `Vec<SochValue>` in schema column order.
2148    /// This is more memory-efficient than `execute()` for large result sets.
2149    ///
2150    /// # Performance
2151    /// - No upfront materialization of all rows
2152    /// - ~40% less memory for large result sets
2153    /// - Ideal for streaming to network or aggregations
2154    ///
2155    /// # Example
2156    /// ```ignore
2157    /// for row_result in db.query(txn, "users").execute_iter()? {
2158    ///     let row = row_result?;
2159    ///     // row is Vec<SochValue> in column order
2160    /// }
2161    /// ```
2162    pub fn execute_iter(self) -> Result<QueryRowIterator> {
2163        self.db
2164            .stats
2165            .queries_executed
2166            .fetch_add(1, Ordering::Relaxed);
2167
2168        let table_name = self
2169            .path_prefix
2170            .split('/')
2171            .next()
2172            .unwrap_or(&self.path_prefix)
2173            .to_string();
2174
2175        // Get packed schema (clone needed for iterator ownership)
2176        let packed_schema = self.db.packed_schemas.get(&table_name).map(|ps| ps.clone());
2177
2178        // Scan the path prefix
2179        let results = self.db.scan_path(self.txn, &self.path_prefix)?;
2180
2181        Ok(QueryRowIterator {
2182            results: results.into_iter(),
2183            packed_schema,
2184            columns: self.columns,
2185            offset: self.offset.unwrap_or(0),
2186            limit: self.limit,
2187            yielded: 0,
2188            skipped: 0,
2189        })
2190    }
2191
2192    /// Execute and return columnar (SIMD-friendly) result format
2193    ///
2194    /// Instead of row-oriented `Vec<HashMap<String, SochValue>>`, returns
2195    /// column-oriented `Vec<TypedColumn>` for vectorized operations.
2196    ///
2197    /// ## Performance Benefits
2198    ///
2199    /// - SIMD: Aggregate operations (sum, avg) use vectorized instructions
2200    /// - Cache: Sequential access maximizes L1/L2 hits
2201    /// - Memory: ~30% less overhead than row-based format
2202    /// - Analytics: Ideal for ML preprocessing and statistics
2203    ///
2204    /// ## Example
2205    ///
2206    /// ```ignore
2207    /// let result = db.query(txn, "users")
2208    ///     .columns(&["id", "score"])
2209    ///     .as_columnar()?;
2210    ///
2211    /// // SIMD-optimized sum
2212    /// let total = result.sum_i64("score").unwrap_or(0);
2213    ///
2214    /// // Direct column access
2215    /// if let Some(scores) = result.column("score") {
2216    ///     for i in 0..scores.len() {
2217    ///         if let Some(v) = scores.get_i64(i) {
2218    ///             println!("Score: {}", v);
2219    ///         }
2220    ///     }
2221    /// }
2222    /// ```
2223    pub fn as_columnar(self) -> Result<ColumnarQueryResult> {
2224        self.db
2225            .stats
2226            .queries_executed
2227            .fetch_add(1, Ordering::Relaxed);
2228
2229        let table_name = self
2230            .path_prefix
2231            .split('/')
2232            .next()
2233            .unwrap_or(&self.path_prefix);
2234        let schema = self.db.tables.get(table_name).map(|s| s.clone());
2235
2236        // Get packed schema
2237        let packed_schema = match self.db.packed_schemas.get(table_name) {
2238            Some(ps) => ps.clone(),
2239            None => return Ok(ColumnarQueryResult::empty()),
2240        };
2241
2242        // Determine columns to fetch
2243        let column_names: Vec<String> = self.columns.clone().unwrap_or_else(|| {
2244            schema
2245                .as_ref()
2246                .map(|s| s.columns.iter().map(|c| c.name.clone()).collect())
2247                .unwrap_or_default()
2248        });
2249
2250        if column_names.is_empty() {
2251            return Ok(ColumnarQueryResult::empty());
2252        }
2253
2254        // Initialize TypedColumns based on schema types
2255        let mut columns: Vec<CoreTypedColumn> = column_names
2256            .iter()
2257            .map(|col_name| {
2258                packed_schema
2259                    .column_index(col_name)
2260                    .and_then(|idx| packed_schema.column(idx))
2261                    .map(|col_def| match col_def.col_type {
2262                        PackedColumnType::Int64 => CoreTypedColumn::new_int64(),
2263                        PackedColumnType::UInt64 => CoreTypedColumn::new_uint64(),
2264                        PackedColumnType::Float64 => CoreTypedColumn::new_float64(),
2265                        PackedColumnType::Text => CoreTypedColumn::new_text(),
2266                        PackedColumnType::Binary => CoreTypedColumn::new_binary(),
2267                        PackedColumnType::Bool => CoreTypedColumn::new_bool(),
2268                        PackedColumnType::Null => CoreTypedColumn::new_text(), // Null column = fallback to text
2269                    })
2270                    .unwrap_or_else(CoreTypedColumn::new_text) // fallback
2271            })
2272            .collect();
2273
2274        // Scan the path prefix
2275        let results = self.db.scan_path(self.txn, &self.path_prefix)?;
2276
2277        let mut row_count = 0;
2278        let mut bytes_read = 0;
2279        let mut skipped = 0;
2280
2281        for (path, value_bytes) in results {
2282            // Parse path: table/row_id
2283            let parts: Vec<&str> = path.split('/').collect();
2284            if parts.len() != 2 {
2285                continue;
2286            }
2287
2288            // Apply offset
2289            if let Some(offset) = self.offset
2290                && skipped < offset
2291            {
2292                skipped += 1;
2293                continue;
2294            }
2295
2296            // Apply limit
2297            if let Some(limit) = self.limit
2298                && row_count >= limit
2299            {
2300                break;
2301            }
2302
2303            bytes_read += value_bytes.len();
2304
2305            if let Ok(packed_row) = PackedRow::from_bytes(value_bytes, packed_schema.num_columns())
2306            {
2307                // Extract each column and push to corresponding TypedColumn
2308                for (col_idx, col_name) in column_names.iter().enumerate() {
2309                    if let Some(schema_idx) = packed_schema.column_index(col_name) {
2310                        if let Some(col_def) = packed_schema.column(schema_idx) {
2311                            let value = packed_row.get_column(schema_idx, col_def.col_type);
2312                            push_value_to_typed_column(&mut columns[col_idx], value);
2313                        } else {
2314                            push_null_to_typed_column(&mut columns[col_idx]);
2315                        }
2316                    } else {
2317                        push_null_to_typed_column(&mut columns[col_idx]);
2318                    }
2319                }
2320                row_count += 1;
2321            }
2322        }
2323
2324        Ok(ColumnarQueryResult {
2325            columns: column_names,
2326            data: columns,
2327            row_count,
2328            bytes_read,
2329        })
2330    }
2331}
2332
2333/// Lazy iterator over query results
2334///
2335/// Unpacks rows on-demand, avoiding upfront materialization.
2336pub struct QueryRowIterator {
2337    results: std::vec::IntoIter<(String, Vec<u8>)>,
2338    packed_schema: Option<PackedTableSchema>,
2339    columns: Option<Vec<String>>,
2340    offset: usize,
2341    limit: Option<usize>,
2342    yielded: usize,
2343    skipped: usize,
2344}
2345
2346impl Iterator for QueryRowIterator {
2347    type Item = Result<Vec<SochValue>>;
2348
2349    fn next(&mut self) -> Option<Self::Item> {
2350        // Check limit
2351        if let Some(limit) = self.limit
2352            && self.yielded >= limit
2353        {
2354            return None;
2355        }
2356
2357        loop {
2358            let (path, value_bytes) = self.results.next()?;
2359
2360            // Parse path: table/row_id
2361            let parts: Vec<&str> = path.split('/').collect();
2362            if parts.len() != 2 {
2363                continue; // Skip non-row entries
2364            }
2365
2366            // Apply offset
2367            if self.skipped < self.offset {
2368                self.skipped += 1;
2369                continue;
2370            }
2371
2372            if let Some(ref schema) = self.packed_schema {
2373                match PackedRow::from_bytes(value_bytes, schema.num_columns()) {
2374                    Ok(packed_row) => {
2375                        let row = if let Some(ref cols) = self.columns {
2376                            // Project specific columns
2377                            cols.iter()
2378                                .map(|col_name| {
2379                                    schema
2380                                        .column_index(col_name)
2381                                        .and_then(|idx| schema.column(idx))
2382                                        .and_then(|col_def| {
2383                                            packed_row.get_column(
2384                                                schema.column_index(col_name).unwrap(),
2385                                                col_def.col_type,
2386                                            )
2387                                        })
2388                                        .unwrap_or(SochValue::Null)
2389                                })
2390                                .collect()
2391                        } else {
2392                            // All columns in order
2393                            packed_row.unpack_to_vec(schema)
2394                        };
2395
2396                        self.yielded += 1;
2397                        return Some(Ok(row));
2398                    }
2399                    Err(e) => return Some(Err(e)),
2400                }
2401            } else {
2402                // No schema - return raw bytes as binary
2403                self.yielded += 1;
2404                return Some(Ok(vec![SochValue::Binary(value_bytes)]));
2405            }
2406        }
2407    }
2408}
2409
2410// Helper functions for serialization (kept for backward compatibility with legacy data)
2411
2412#[allow(dead_code)]
2413fn serialize_value(value: &SochValue) -> Vec<u8> {
2414    // Simple serialization - in production use proper format
2415    match value {
2416        SochValue::Null => vec![0],
2417        SochValue::Int(i) => {
2418            let mut buf = vec![1];
2419            buf.extend_from_slice(&i.to_le_bytes());
2420            buf
2421        }
2422        SochValue::UInt(u) => {
2423            let mut buf = vec![2];
2424            buf.extend_from_slice(&u.to_le_bytes());
2425            buf
2426        }
2427        SochValue::Float(f) => {
2428            let mut buf = vec![3];
2429            buf.extend_from_slice(&f.to_le_bytes());
2430            buf
2431        }
2432        SochValue::Text(s) => {
2433            let mut buf = vec![4];
2434            buf.extend_from_slice(s.as_bytes());
2435            buf
2436        }
2437        SochValue::Bool(b) => vec![5, if *b { 1 } else { 0 }],
2438        SochValue::Binary(b) => {
2439            let mut buf = vec![6];
2440            buf.extend_from_slice(b);
2441            buf
2442        }
2443        _ => {
2444            // Fallback: serialize as text
2445            let s = format!("{:?}", value);
2446            let mut buf = vec![4];
2447            buf.extend_from_slice(s.as_bytes());
2448            buf
2449        }
2450    }
2451}
2452
2453fn deserialize_value(bytes: &[u8]) -> SochValue {
2454    if bytes.is_empty() {
2455        return SochValue::Null;
2456    }
2457
2458    match bytes[0] {
2459        0 => SochValue::Null,
2460        1 if bytes.len() >= 9 => {
2461            let i = i64::from_le_bytes(bytes[1..9].try_into().unwrap());
2462            SochValue::Int(i)
2463        }
2464        2 if bytes.len() >= 9 => {
2465            let u = u64::from_le_bytes(bytes[1..9].try_into().unwrap());
2466            SochValue::UInt(u)
2467        }
2468        3 if bytes.len() >= 9 => {
2469            let f = f64::from_le_bytes(bytes[1..9].try_into().unwrap());
2470            SochValue::Float(f)
2471        }
2472        4 => {
2473            let s = String::from_utf8_lossy(&bytes[1..]).to_string();
2474            SochValue::Text(s)
2475        }
2476        5 if bytes.len() >= 2 => SochValue::Bool(bytes[1] != 0),
2477        6 => SochValue::Binary(bytes[1..].to_vec()),
2478        _ => {
2479            // Treat as text
2480            let s = String::from_utf8_lossy(bytes).to_string();
2481            SochValue::Text(s)
2482        }
2483    }
2484}
2485
2486// ============================================================================
2487// Helper functions for columnar query result building
2488// ============================================================================
2489
2490/// Push a SochValue into a TypedColumn
2491fn push_value_to_typed_column(col: &mut CoreTypedColumn, value: Option<SochValue>) {
2492    match value {
2493        None => push_null_to_typed_column(col),
2494        Some(v) => match (col, v) {
2495            (
2496                CoreTypedColumn::Int64 {
2497                    values,
2498                    validity,
2499                    stats,
2500                },
2501                SochValue::Int(i),
2502            ) => {
2503                values.push(i);
2504                validity.push(true);
2505                stats.update_i64(i);
2506            }
2507            (
2508                CoreTypedColumn::Int64 {
2509                    values,
2510                    validity,
2511                    stats,
2512                },
2513                SochValue::UInt(u),
2514            ) => {
2515                values.push(u as i64);
2516                validity.push(true);
2517                stats.update_i64(u as i64);
2518            }
2519            (
2520                CoreTypedColumn::UInt64 {
2521                    values,
2522                    validity,
2523                    stats,
2524                },
2525                SochValue::UInt(u),
2526            ) => {
2527                values.push(u);
2528                validity.push(true);
2529                stats.update_i64(u as i64);
2530            }
2531            (
2532                CoreTypedColumn::UInt64 {
2533                    values,
2534                    validity,
2535                    stats,
2536                },
2537                SochValue::Int(i),
2538            ) => {
2539                values.push(i as u64);
2540                validity.push(true);
2541                stats.update_i64(i);
2542            }
2543            (
2544                CoreTypedColumn::Float64 {
2545                    values,
2546                    validity,
2547                    stats,
2548                },
2549                SochValue::Float(f),
2550            ) => {
2551                values.push(f);
2552                validity.push(true);
2553                stats.update_f64(f);
2554            }
2555            (
2556                CoreTypedColumn::Float64 {
2557                    values,
2558                    validity,
2559                    stats,
2560                },
2561                SochValue::Int(i),
2562            ) => {
2563                values.push(i as f64);
2564                validity.push(true);
2565                stats.update_f64(i as f64);
2566            }
2567            (
2568                CoreTypedColumn::Text {
2569                    offsets,
2570                    data,
2571                    validity,
2572                    stats,
2573                },
2574                SochValue::Text(s),
2575            ) => {
2576                data.extend_from_slice(s.as_bytes());
2577                offsets.push(data.len() as u32);
2578                validity.push(true);
2579                stats.row_count += 1;
2580            }
2581            (
2582                CoreTypedColumn::Binary {
2583                    offsets,
2584                    data,
2585                    validity,
2586                    stats,
2587                },
2588                SochValue::Binary(b),
2589            ) => {
2590                data.extend_from_slice(&b);
2591                offsets.push(data.len() as u32);
2592                validity.push(true);
2593                stats.row_count += 1;
2594            }
2595            (
2596                CoreTypedColumn::Bool {
2597                    values,
2598                    validity,
2599                    stats,
2600                    len,
2601                },
2602                SochValue::Bool(b),
2603            ) => {
2604                let idx = *len;
2605                *len += 1;
2606                let num_words = (*len).div_ceil(64);
2607                while values.len() < num_words {
2608                    values.push(0);
2609                }
2610                if b {
2611                    let word = idx / 64;
2612                    let bit = idx % 64;
2613                    values[word] |= 1 << bit;
2614                }
2615                validity.push(true);
2616                stats.row_count += 1;
2617            }
2618            // Type mismatch - push as null
2619            (col, _) => push_null_to_typed_column(col),
2620        },
2621    }
2622}
2623
2624/// Push a null value into a TypedColumn
2625fn push_null_to_typed_column(col: &mut CoreTypedColumn) {
2626    match col {
2627        CoreTypedColumn::Int64 {
2628            values,
2629            validity,
2630            stats,
2631        } => {
2632            values.push(0);
2633            validity.push(false);
2634            stats.update_null();
2635        }
2636        CoreTypedColumn::UInt64 {
2637            values,
2638            validity,
2639            stats,
2640        } => {
2641            values.push(0);
2642            validity.push(false);
2643            stats.update_null();
2644        }
2645        CoreTypedColumn::Float64 {
2646            values,
2647            validity,
2648            stats,
2649        } => {
2650            values.push(0.0);
2651            validity.push(false);
2652            stats.update_null();
2653        }
2654        CoreTypedColumn::Text {
2655            offsets,
2656            data: _,
2657            validity,
2658            stats,
2659        } => {
2660            offsets.push(offsets.last().copied().unwrap_or(0));
2661            validity.push(false);
2662            stats.update_null();
2663        }
2664        CoreTypedColumn::Binary {
2665            offsets,
2666            data: _,
2667            validity,
2668            stats,
2669        } => {
2670            offsets.push(offsets.last().copied().unwrap_or(0));
2671            validity.push(false);
2672            stats.update_null();
2673        }
2674        CoreTypedColumn::Bool {
2675            values,
2676            validity,
2677            stats,
2678            len,
2679        } => {
2680            *len += 1;
2681            let num_words = (*len).div_ceil(64);
2682            while values.len() < num_words {
2683                values.push(0);
2684            }
2685            validity.push(false);
2686            stats.update_null();
2687        }
2688    }
2689}
2690
2691#[cfg(test)]
2692mod tests {
2693    use super::*;
2694    use tempfile::tempdir;
2695
2696    #[test]
2697    fn test_database_open_close() {
2698        let dir = tempdir().unwrap();
2699        let db = Database::open(dir.path()).unwrap();
2700
2701        // Should be able to begin a transaction
2702        let txn = db.begin_transaction().unwrap();
2703        assert!(txn.txn_id > 0);
2704
2705        db.abort(txn).unwrap();
2706        db.shutdown().unwrap();
2707    }
2708
2709    #[test]
2710    fn test_database_put_get() {
2711        let dir = tempdir().unwrap();
2712        let db = Database::open(dir.path()).unwrap();
2713
2714        let txn = db.begin_transaction().unwrap();
2715        db.put(txn, b"key1", b"value1").unwrap();
2716
2717        let val = db.get(txn, b"key1").unwrap();
2718        assert_eq!(val, Some(b"value1".to_vec()));
2719
2720        db.commit(txn).unwrap();
2721
2722        // New transaction should see committed data
2723        let txn2 = db.begin_transaction().unwrap();
2724        let val = db.get(txn2, b"key1").unwrap();
2725        assert_eq!(val, Some(b"value1".to_vec()));
2726        db.abort(txn2).unwrap();
2727    }
2728
2729    #[test]
2730    fn test_database_encryption_end_to_end() {
2731        use crate::durable_storage::StorageEncryption;
2732        use crate::encryption::EncryptionKey;
2733
2734        let dir = tempdir().unwrap();
2735        let kek = [0x5Au8; 32];
2736        let enc = || StorageEncryption::with_kek(EncryptionKey::new(kek), "test");
2737
2738        // Create encrypted DB through the kernel, write committed data.
2739        {
2740            let db = Database::open_with_config_and_encryption(
2741                dir.path(),
2742                DatabaseConfig::default(),
2743                enc(),
2744            )
2745            .unwrap();
2746            assert!(db.storage.is_encrypted());
2747            let txn = db.begin_transaction().unwrap();
2748            db.put(txn, b"secret", b"value").unwrap();
2749            db.commit(txn).unwrap();
2750            db.shutdown().unwrap();
2751        }
2752        assert!(dir.path().join("keyring.json").exists());
2753
2754        // Reopen with the correct KEK -> committed data round-trips.
2755        {
2756            let db = Database::open_with_config_and_encryption(
2757                dir.path(),
2758                DatabaseConfig::default(),
2759                enc(),
2760            )
2761            .unwrap();
2762            let txn = db.begin_transaction().unwrap();
2763            assert_eq!(db.get(txn, b"secret").unwrap(), Some(b"value".to_vec()));
2764            db.abort(txn).unwrap();
2765            db.shutdown().unwrap();
2766        }
2767
2768        // Wrong KEK -> fail closed (no silent plaintext/empty open).
2769        {
2770            let wrong = Database::open_with_config_and_encryption(
2771                dir.path(),
2772                DatabaseConfig::default(),
2773                StorageEncryption::with_kek(EncryptionKey::new([0u8; 32]), "test"),
2774            );
2775            assert!(wrong.is_err(), "wrong KEK must fail closed at the kernel");
2776        }
2777
2778        // No key on an encrypted DB -> fail closed.
2779        {
2780            let no_key = Database::open_with_config(dir.path(), DatabaseConfig::default());
2781            assert!(
2782                no_key.is_err(),
2783                "encrypted DB opened without key must fail closed"
2784            );
2785        }
2786    }
2787
2788    #[test]
2789    fn test_database_concurrent_encryption() {
2790        use crate::durable_storage::StorageEncryption;
2791        use crate::encryption::EncryptionKey;
2792
2793        let dir = tempdir().unwrap();
2794        let kek = [0x33u8; 32];
2795
2796        {
2797            let db = Database::open_concurrent_with_config_and_encryption(
2798                dir.path(),
2799                DatabaseConfig::default(),
2800                StorageEncryption::with_kek(EncryptionKey::new(kek), "test"),
2801            )
2802            .unwrap();
2803            assert!(db.storage.is_encrypted());
2804            let txn = db.begin_transaction().unwrap();
2805            db.put(txn, b"ck", b"cv").unwrap();
2806            db.commit(txn).unwrap();
2807            db.shutdown().unwrap();
2808        }
2809
2810        // Reopen concurrent with correct key.
2811        let db = Database::open_concurrent_with_config_and_encryption(
2812            dir.path(),
2813            DatabaseConfig::default(),
2814            StorageEncryption::with_kek(EncryptionKey::new(kek), "test"),
2815        )
2816        .unwrap();
2817        let txn = db.begin_transaction().unwrap();
2818        assert_eq!(db.get(txn, b"ck").unwrap(), Some(b"cv".to_vec()));
2819        db.abort(txn).unwrap();
2820        db.shutdown().unwrap();
2821    }
2822
2823    #[test]
2824    fn test_database_path_api() {
2825        let dir = tempdir().unwrap();
2826        let db = Database::open(dir.path()).unwrap();
2827
2828        let txn = db.begin_transaction().unwrap();
2829
2830        // Write using path API
2831        db.put_path(txn, "users/1/name", b"Alice").unwrap();
2832        db.put_path(txn, "users/1/email", b"alice@example.com")
2833            .unwrap();
2834        db.put_path(txn, "users/2/name", b"Bob").unwrap();
2835
2836        db.commit(txn).unwrap();
2837
2838        // Scan path prefix
2839        let txn2 = db.begin_transaction().unwrap();
2840        let results = db.scan_path(txn2, "users/1/").unwrap();
2841        assert_eq!(results.len(), 2);
2842
2843        db.abort(txn2).unwrap();
2844    }
2845
2846    #[test]
2847    fn test_database_table_api() {
2848        let dir = tempdir().unwrap();
2849        let db = Database::open(dir.path()).unwrap();
2850
2851        // Register table
2852        db.register_table(TableSchema {
2853            name: "users".to_string(),
2854            columns: vec![
2855                ColumnDef {
2856                    name: "name".to_string(),
2857                    col_type: ColumnType::Text,
2858                    nullable: false,
2859                },
2860                ColumnDef {
2861                    name: "age".to_string(),
2862                    col_type: ColumnType::Int64,
2863                    nullable: true,
2864                },
2865            ],
2866        })
2867        .unwrap();
2868
2869        // Insert row
2870        let txn = db.begin_transaction().unwrap();
2871        let mut values = HashMap::new();
2872        values.insert("name".to_string(), SochValue::Text("Alice".to_string()));
2873        values.insert("age".to_string(), SochValue::Int(30));
2874
2875        db.insert_row(txn, "users", 1, &values).unwrap();
2876        db.commit(txn).unwrap();
2877
2878        // Read row
2879        let txn2 = db.begin_transaction().unwrap();
2880        let row = db.read_row(txn2, "users", 1, None).unwrap();
2881        assert!(row.is_some());
2882
2883        let row = row.unwrap();
2884        assert_eq!(row.get("name"), Some(&SochValue::Text("Alice".to_string())));
2885
2886        db.abort(txn2).unwrap();
2887    }
2888
2889    #[test]
2890    fn test_query_does_not_bleed_across_prefix_sibling_tables() {
2891        // Regression: row keys are "table/row_id", so a bare-name prefix scan of
2892        // "user" used to also match "users/..." (cross-table bleed), and a
2893        // single-character table name like "t" tripped the 2-byte scan guard.
2894        // Querying a registered table by name must scan exactly "table/".
2895        let dir = tempdir().unwrap();
2896        let db = Database::open(dir.path()).unwrap();
2897
2898        let col = || ColumnDef {
2899            name: "name".to_string(),
2900            col_type: ColumnType::Text,
2901            nullable: false,
2902        };
2903
2904        // Two sibling tables whose names share a prefix, plus a 1-char table.
2905        for table in ["user", "users", "t"] {
2906            db.register_table(TableSchema {
2907                name: table.to_string(),
2908                columns: vec![col()],
2909            })
2910            .unwrap();
2911        }
2912
2913        let txn = db.begin_transaction().unwrap();
2914        let mut v_user = HashMap::new();
2915        v_user.insert("name".to_string(), SochValue::Text("in_user".to_string()));
2916        db.insert_row(txn, "user", 1, &v_user).unwrap();
2917
2918        let mut v_users = HashMap::new();
2919        v_users.insert("name".to_string(), SochValue::Text("in_users".to_string()));
2920        db.insert_row(txn, "users", 1, &v_users).unwrap();
2921        db.insert_row(txn, "users", 2, &v_users).unwrap();
2922
2923        let mut v_t = HashMap::new();
2924        v_t.insert("name".to_string(), SochValue::Text("in_t".to_string()));
2925        db.insert_row(txn, "t", 1, &v_t).unwrap();
2926        db.commit(txn).unwrap();
2927
2928        // "user" must return exactly its one row — no bleed from "users".
2929        let txn_r = db.begin_read_only_fast();
2930        let user_rows = db.query(txn_r, "user").execute().unwrap().rows;
2931        assert_eq!(user_rows.len(), 1, "querying 'user' bled into 'users'");
2932        assert_eq!(
2933            user_rows[0].get("name"),
2934            Some(&SochValue::Text("in_user".to_string()))
2935        );
2936
2937        // "users" must return its two rows.
2938        let users_rows = db.query(txn_r, "users").execute().unwrap().rows;
2939        assert_eq!(users_rows.len(), 2);
2940
2941        // Single-character table name must be queryable (was rejected by the
2942        // 2-byte scan guard before the "table/" prefix fix).
2943        let t_rows = db.query(txn_r, "t").execute().unwrap().rows;
2944        assert_eq!(t_rows.len(), 1);
2945        assert_eq!(
2946            t_rows[0].get("name"),
2947            Some(&SochValue::Text("in_t".to_string()))
2948        );
2949        db.abort_read_only_fast(txn_r);
2950    }
2951
2952    #[test]
2953    fn test_database_query_builder() {
2954        let dir = tempdir().unwrap();
2955        let db = Database::open(dir.path()).unwrap();
2956
2957        // Insert test data
2958        let txn = db.begin_transaction().unwrap();
2959        db.put_path(txn, "docs/1/title", b"Hello").unwrap();
2960        db.put_path(txn, "docs/1/content", b"World").unwrap();
2961        db.put_path(txn, "docs/2/title", b"Foo").unwrap();
2962        db.put_path(txn, "docs/2/content", b"Bar").unwrap();
2963        db.commit(txn).unwrap();
2964
2965        // Query with limit
2966        let txn2 = db.begin_transaction().unwrap();
2967        let result = db.query(txn2, "docs/").limit(1).execute().unwrap();
2968
2969        assert_eq!(result.rows.len(), 1);
2970        db.abort(txn2).unwrap();
2971    }
2972
2973    #[test]
2974    fn test_database_crash_recovery() {
2975        let dir = tempdir().unwrap();
2976
2977        // Write and commit
2978        {
2979            // Use open_without_lock for crash simulation tests
2980            let db = Database::open_without_lock(dir.path()).unwrap();
2981            // Set sync mode to FULL to ensure data is persisted before "crash"
2982            db.storage.set_sync_mode(2);
2983            let txn = db.begin_transaction().unwrap();
2984            db.put(txn, b"persist", b"this").unwrap();
2985            db.commit(txn).unwrap();
2986            // Don't call shutdown - simulate crash
2987            std::mem::forget(db);
2988        }
2989
2990        // Reopen - should recover
2991        {
2992            let db = Database::open_without_lock(dir.path()).unwrap();
2993            let txn = db.begin_transaction().unwrap();
2994            let val = db.get(txn, b"persist").unwrap();
2995            assert_eq!(val, Some(b"this".to_vec()));
2996            db.abort(txn).unwrap();
2997        }
2998    }
2999
3000    #[test]
3001    fn test_columnar_row_view_zero_alloc() {
3002        use sochdb_core::columnar::TypedColumn;
3003
3004        // Build a small columnar result: 3 rows × 2 columns (id: i64, name: text)
3005        let mut id_col = TypedColumn::new_int64();
3006        id_col.push_i64(Some(1));
3007        id_col.push_i64(Some(2));
3008        id_col.push_i64(Some(3));
3009
3010        let mut name_col = TypedColumn::new_text();
3011        name_col.push_text(Some("Alice"));
3012        name_col.push_text(Some("Bob"));
3013        name_col.push_text(None); // NULL
3014
3015        let cr = ColumnarQueryResult {
3016            columns: vec!["id".to_string(), "name".to_string()],
3017            data: vec![id_col, name_col],
3018            row_count: 3,
3019            bytes_read: 0,
3020        };
3021
3022        // row_view access — zero HashMap allocation
3023        let row0 = cr.row_view(0).unwrap();
3024        assert_eq!(row0.get("id"), Some(SochValue::Int(1)));
3025        assert_eq!(row0.get("name"), Some(SochValue::Text("Alice".to_string())));
3026        assert_eq!(row0.get("nonexistent"), None);
3027
3028        let row2 = cr.row_view(2).unwrap();
3029        assert_eq!(row2.get("id"), Some(SochValue::Int(3)));
3030        assert_eq!(row2.get("name"), Some(SochValue::Null));
3031
3032        // Out of bounds
3033        assert!(cr.row_view(3).is_none());
3034
3035        // values() — positional
3036        let vals = row0.values();
3037        assert_eq!(vals.len(), 2);
3038        assert_eq!(vals[0], SochValue::Int(1));
3039
3040        // to_map() — backward compat
3041        let map = row0.to_map();
3042        assert_eq!(map.get("id"), Some(&SochValue::Int(1)));
3043    }
3044
3045    #[test]
3046    fn test_columnar_into_query_result() {
3047        use sochdb_core::columnar::TypedColumn;
3048
3049        let mut score_col = TypedColumn::new_float64();
3050        score_col.push_f64(Some(9.5));
3051        score_col.push_f64(Some(8.2));
3052
3053        let cr = ColumnarQueryResult {
3054            columns: vec!["score".to_string()],
3055            data: vec![score_col],
3056            row_count: 2,
3057            bytes_read: 100,
3058        };
3059
3060        let qr = cr.into_query_result();
3061        assert_eq!(qr.rows.len(), 2);
3062        assert_eq!(qr.rows[0].get("score"), Some(&SochValue::Float(9.5)));
3063        assert_eq!(qr.rows[1].get("score"), Some(&SochValue::Float(8.2)));
3064        assert_eq!(qr.bytes_read, 100);
3065    }
3066}