// Source: sochdb_storage/database.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! SochDB Database Kernel
19//!
20//! The shared core that powers both embedded mode (`SochConnection::open`) and
21//! server mode (`sochdb-server`). This is the "SQLite engine" equivalent.
22//!
23//! ## Architecture
24//!
25//! ```text
26//! ┌──────────────────────────────────────────────────────────────────┐
27//! │                        Database Kernel                            │
28//! │  Arc<Database> - shared by all connections                       │
29//! ├──────────────────────────────────────────────────────────────────┤
30//! │                                                                   │
31//! │  ┌─────────────────┐   ┌─────────────────┐   ┌────────────────┐ │
32//! │  │  DurableStorage │   │     Catalog     │   │  Vector Index  │ │
33//! │  │  (WAL + MVCC)   │   │  (Schema Mgmt)  │   │  (HNSW/Vamana) │ │
34//! │  └────────┬────────┘   └────────┬────────┘   └───────┬────────┘ │
35//! │           │                     │                     │          │
36//! │           └─────────────────────┴─────────────────────┘          │
37//! │                                 │                                 │
38//! │  ┌─────────────────────────────────────────────────────────────┐ │
39//! │  │              Query Executor (Path-Native)                    │ │
40//! │  │  - Path resolution: O(|path|)                                │ │
41//! │  │  - Column projection: 80% I/O reduction                     │ │
42//! │  │  - Context selection: Token-aware chunking                  │ │
43//! │  └─────────────────────────────────────────────────────────────┘ │
44//! │                                                                   │
45//! └──────────────────────────────────────────────────────────────────┘
46//!
47//! Deployment Modes:
48//! ┌─────────────┐   ┌─────────────┐   ┌─────────────┐
49//! │  Embedded   │   │  IPC Server │   │  TCP Server │
50//! │  (in-proc)  │   │  (Unix sock)│   │  (remote)   │
51//! └──────┬──────┘   └──────┬──────┘   └──────┬──────┘
52//!        │                 │                 │
53//!        └─────────────────┴─────────────────┘
54//!                          │
55//!                   Arc<Database>
56//! ```
57//!
58//! ## Latency Model
59//!
60//! Let K = kernel processing cost for a query
61//!
62//! - Embedded: L_emb ≈ K (function call overhead negligible)
63//! - IPC: L_ipc ≈ K + δ_ipc (δ_ipc = ~10-50µs for Unix socket)
64//! - TCP: L_tcp ≈ K + δ_net (δ_net = 100µs-10ms depending on network)
65//!
66//! For LLM context queries where K >> δ_ipc, IPC is "nearly embedded".
67
68use std::collections::HashMap;
69use std::path::{Path, PathBuf};
70use std::sync::Arc;
71use std::sync::atomic::{AtomicU64, Ordering};
72
73use dashmap::DashMap;
74use parking_lot::RwLock;
75
76use crate::durable_storage::{DurableStorage, TransactionMode};
77use crate::index_policy::{IndexPolicy, TableIndexConfig, TableIndexRegistry};
78use crate::key_buffer::KeyBuffer;
79use crate::packed_row::{PackedColumnDef, PackedColumnType, PackedRow, PackedTableSchema};
80use sochdb_core::catalog::Catalog;
81use sochdb_core::{Result, SochDBError, SochValue};
82
83// Re-export key types
84pub use crate::durable_storage::RecoveryStats;
85
86/// Database configuration
87#[derive(Debug, Clone)]
88pub struct DatabaseConfig {
89    /// Enable group commit for better write throughput
90    pub group_commit: bool,
91    /// Maximum memory for memtables before flush (bytes)
92    pub memtable_size_limit: usize,
93    /// Enable WAL for durability
94    pub wal_enabled: bool,
95    /// Sync mode: fsync after every commit vs periodic
96    pub sync_mode: SyncMode,
97    /// Read-only mode
98    pub read_only: bool,
99
100    /// Enable ordered index for O(log N) prefix scans
101    ///
102    /// # Deprecation Notice
103    ///
104    /// **DEPRECATED since 0.2.0**: Use `default_index_policy` instead for per-table control.
105    /// This field will be removed in v0.3.0.
106    ///
107    /// ## Migration Guide
108    ///
109    /// Replace:
110    /// ```ignore
111    /// DatabaseConfig { enable_ordered_index: true, .. }  // Old API
112    /// DatabaseConfig { enable_ordered_index: false, .. } // Old API
113    /// ```
114    ///
115    /// With:
116    /// ```ignore
117    /// DatabaseConfig { default_index_policy: IndexPolicy::ScanOptimized, .. }  // Ordered index enabled
118    /// DatabaseConfig { default_index_policy: IndexPolicy::WriteOptimized, .. } // Ordered index disabled
119    /// ```
120    ///
121    /// ## Behavior
122    ///
123    /// When false, saves ~134 ns/op on writes (20% speedup)
124    /// but scan_prefix becomes O(N) instead of O(log N + K).
125    ///
126    /// Set to false for write-heavy workloads without range scans.
127    #[deprecated(
128        since = "0.2.0",
129        note = "Use `default_index_policy` field instead. This field will be removed in v0.3.0. \
130                Set IndexPolicy::ScanOptimized for ordered index, WriteOptimized to disable."
131    )]
132    ///
133    /// Set to false for write-heavy workloads without range scans.
134    pub enable_ordered_index: bool,
135    /// Group commit configuration
136    pub group_commit_config: GroupCommitSettings,
137    /// Default index policy for tables not explicitly configured
138    ///
139    /// This replaces the global `enable_ordered_index` toggle with
140    /// fine-grained per-table control. Use `index_registry` to configure
141    /// individual tables.
142    ///
143    /// | Policy         | Insert Cost | Scan Cost      | Use Case              |
144    /// |----------------|-------------|----------------|------------------------|
145    /// | WriteOptimized | O(1)        | O(N)           | High-write, rare scan  |
146    /// | Balanced       | O(1) amort  | O(output+logK) | Mixed OLTP            |
147    /// | ScanOptimized  | O(log N)    | O(logN + K)    | Analytics, range query |
148    /// | AppendOnly     | O(1)        | O(N)           | Time-series logs       |
149    pub default_index_policy: IndexPolicy,
150}
151
/// Group commit settings - mirrors SQLite's WAL mode tuning
///
/// ## Performance Model
///
/// Without group commit: Throughput = 1 / L_fsync ≈ 200 commits/sec (L=5ms)
/// With group commit (batch size K): Throughput = K / L_fsync = K × 200 commits/sec
///
/// For K=100: 20,000 commits/sec (100× speedup)
///
/// ## SQLite Comparison
///
/// | Setting                    | SQLite Equivalent           |
/// |----------------------------|-----------------------------|
/// | batch_size = 1             | PRAGMA synchronous = FULL   |
/// | batch_size = 100           | WAL mode with batching      |
/// | max_wait_us = 0            | No delay, immediate flush   |
/// | max_wait_us = 10000        | Up to 10ms delay for batch  |
#[derive(Debug, Clone)]
pub struct GroupCommitSettings {
    /// Minimum batch size before flush (default: 1)
    pub min_batch_size: usize,
    /// Maximum batch size (default: 1000)
    pub max_batch_size: usize,
    /// Maximum wait time before flush in microseconds (default: 10000 = 10ms)
    pub max_wait_us: u64,
    /// Expected fsync latency in microseconds (for adaptive sizing)
    pub fsync_latency_us: u64,
}

impl Default for GroupCommitSettings {
    /// Batches of 1..=1000 commits, 10ms maximum wait, 5ms assumed fsync latency.
    fn default() -> Self {
        Self {
            min_batch_size: 1,
            max_batch_size: 1000,
            max_wait_us: 10_000,     // 10ms
            fsync_latency_us: 5_000, // 5ms
        }
    }
}

impl GroupCommitSettings {
    /// High throughput preset - maximizes batching
    pub fn high_throughput() -> Self {
        Self {
            min_batch_size: 50,
            max_batch_size: 5000,
            max_wait_us: 50_000, // 50ms
            fsync_latency_us: 5_000,
        }
    }

    /// Low latency preset - minimal batching
    pub fn low_latency() -> Self {
        Self {
            min_batch_size: 1,
            max_batch_size: 10,
            max_wait_us: 1_000, // 1ms
            fsync_latency_us: 5_000,
        }
    }

    /// Calculate the optimal batch size with a square-root batching rule.
    ///
    /// N* = sqrt(2 × L_fsync × λ / C_wait)
    ///
    /// where `L_fsync` is `fsync_latency_us` converted to seconds. The result
    /// is truncated to an integer and limited to
    /// `min_batch_size..=max_batch_size`.
    ///
    /// # Arguments
    /// * `arrival_rate` - Operations per second (λ)
    /// * `wait_cost` - Cost coefficient for waiting (0.0-1.0); values below
    ///   0.001 are treated as 0.001 so the division stays finite
    ///
    /// # Robustness
    ///
    /// The bounds are public fields and may be set inconsistently
    /// (`min_batch_size > max_batch_size`). In that case `max_batch_size` is
    /// used as the hard cap instead of panicking.
    pub fn optimal_batch_size(&self, arrival_rate: f64, wait_cost: f64) -> usize {
        let fsync_secs = self.fsync_latency_us as f64 / 1_000_000.0;
        let n_star = (2.0 * fsync_secs * arrival_rate / wait_cost.max(0.001)).sqrt();

        // `clamp` panics when lower > upper, so normalise the lower bound first.
        let upper = self.max_batch_size;
        let lower = self.min_batch_size.min(upper);

        // Float-to-int `as` saturates: NaN becomes 0 and +inf becomes usize::MAX,
        // both of which are then pulled back into range by the clamp.
        (n_star as usize).clamp(lower, upper)
    }
}
226
227impl Default for DatabaseConfig {
228    #[allow(deprecated)]
229    fn default() -> Self {
230        Self {
231            group_commit: true,
232            memtable_size_limit: 64 * 1024 * 1024, // 64MB
233            wal_enabled: true,
234            sync_mode: SyncMode::Normal,
235            read_only: false,
236            enable_ordered_index: true, // Default: enabled for compatibility
237            group_commit_config: GroupCommitSettings::default(),
238            default_index_policy: IndexPolicy::Balanced, // New default: balanced OLTP policy
239        }
240    }
241}
242
243impl DatabaseConfig {
244    /// Create config optimized for throughput (Fast Mode)
245    ///
246    /// - Disables ordered index (saves ~134 ns/op)
247    /// - Uses high-throughput group commit settings
248    /// - Suitable for append-only workloads
249    #[allow(deprecated)]
250    pub fn throughput_optimized() -> Self {
251        Self {
252            group_commit: true,
253            memtable_size_limit: 128 * 1024 * 1024, // 128MB
254            wal_enabled: true,
255            sync_mode: SyncMode::Normal,
256            read_only: false,
257            enable_ordered_index: false,
258            group_commit_config: GroupCommitSettings::high_throughput(),
259            default_index_policy: IndexPolicy::WriteOptimized, // No ordered index overhead
260        }
261    }
262
263    /// Create config optimized for latency
264    ///
265    /// - Keeps ordered index for fast range scans
266    /// - Uses low-latency group commit settings
267    /// - Suitable for OLTP workloads
268    #[allow(deprecated)]
269    pub fn latency_optimized() -> Self {
270        Self {
271            group_commit: true,
272            memtable_size_limit: 32 * 1024 * 1024, // 32MB
273            wal_enabled: true,
274            sync_mode: SyncMode::Full,
275            read_only: false,
276            enable_ordered_index: true,
277            group_commit_config: GroupCommitSettings::low_latency(),
278            default_index_policy: IndexPolicy::ScanOptimized, // Fast range scans
279        }
280    }
281
282    /// Create config matching SQLite defaults
283    #[allow(deprecated)]
284    pub fn sqlite_compatible() -> Self {
285        Self {
286            group_commit: false, // SQLite default is single-commit
287            memtable_size_limit: 64 * 1024 * 1024,
288            wal_enabled: true,
289            sync_mode: SyncMode::Normal, // PRAGMA synchronous = NORMAL
290            read_only: false,
291            enable_ordered_index: true,
292            group_commit_config: GroupCommitSettings::default(),
293            default_index_policy: IndexPolicy::Balanced, // Good default for mixed workloads
294        }
295    }
296
297    /// Get effective ordered index setting, derived from `default_index_policy`.
298    ///
299    /// This is the shim method for the deprecated `enable_ordered_index` field.
300    /// It returns `true` if the policy requires an ordered index (ScanOptimized),
301    /// and `false` otherwise (WriteOptimized, Balanced, AppendOnly).
302    ///
303    /// # Policy Mapping
304    ///
305    /// | IndexPolicy      | Returns |
306    /// |------------------|---------|
307    /// | ScanOptimized    | true    |
308    /// | Balanced         | false   |
309    /// | WriteOptimized   | false   |
310    /// | AppendOnly       | false   |
311    ///
312    /// Note: `Balanced` uses lazy compaction rather than a live ordered index,
313    /// so it returns `false` for the low-level memtable config but still supports
314    /// efficient range scans via sorted runs.
315    pub fn effective_ordered_index(&self) -> bool {
316        matches!(self.default_index_policy, IndexPolicy::ScanOptimized)
317    }
318}
319
/// WAL sync mode - follows SQLite's `PRAGMA synchronous` semantics.
///
/// | SochDB     | SQLite       | Description                                    |
/// |------------|--------------|------------------------------------------------|
/// | Off        | OFF (0)      | No fsync, risk of data loss on crash           |
/// | Normal     | NORMAL (1)   | Fsync at checkpoints, not every commit         |
/// | Full       | FULL (2)     | Fsync every commit (safest, slowest)           |
///
/// # Performance vs Durability Trade-offs
///
/// - **Off**: ~10x faster than Full, but may lose last ~100ms of data on crash
/// - **Normal**: ~5x faster than Full, durable at checkpoint boundaries
/// - **Full**: Every commit is fsync'd, no data loss possible
///
/// # SQLite Compatibility
///
/// ```sql
/// -- SQLite equivalent settings
/// PRAGMA synchronous = OFF;    -- SyncMode::Off
/// PRAGMA synchronous = NORMAL; -- SyncMode::Normal
/// PRAGMA synchronous = FULL;   -- SyncMode::Full
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SyncMode {
    /// No fsync (SQLite `PRAGMA synchronous = OFF`).
    ///
    /// Writes stay buffered in the OS and may be lost on power failure.
    /// Use for non-critical data or bulk loading.
    Off = 0,

    /// Fsync at checkpoints (SQLite `PRAGMA synchronous = NORMAL`).
    ///
    /// Default mode. Syncs the WAL at checkpoint boundaries.
    /// Good balance of performance and durability.
    Normal = 1,

    /// Fsync every commit (SQLite `PRAGMA synchronous = FULL`).
    ///
    /// Safest mode. Every commit is immediately durable.
    /// Required for financial/critical data.
    Full = 2,
}

impl SyncMode {
    /// Map a SQLite `synchronous` pragma value to a mode.
    ///
    /// `0` is `Off`, `1` is `Normal`, and every other value (2 and above)
    /// is treated as `Full`.
    pub fn from_sqlite_pragma(value: u32) -> Self {
        if value == 0 {
            SyncMode::Off
        } else if value == 1 {
            SyncMode::Normal
        } else {
            SyncMode::Full
        }
    }

    /// Numeric SQLite `synchronous` pragma value for this mode (0, 1 or 2).
    pub fn to_sqlite_pragma(self) -> u32 {
        match self {
            SyncMode::Off => 0,
            SyncMode::Normal => 1,
            SyncMode::Full => 2,
        }
    }

    /// Parse a mode from its name (ASCII case-insensitive) or its digit.
    ///
    /// Accepts `"OFF"`/`"0"`, `"NORMAL"`/`"1"` and `"FULL"`/`"2"`. The input
    /// is not trimmed; anything else yields `None`.
    pub fn parse(s: &str) -> Option<Self> {
        let is = |name: &str, digit: &str| s.eq_ignore_ascii_case(name) || s == digit;

        if is("OFF", "0") {
            Some(SyncMode::Off)
        } else if is("NORMAL", "1") {
            Some(SyncMode::Normal)
        } else if is("FULL", "2") {
            Some(SyncMode::Full)
        } else {
            None
        }
    }
}
388
389/// Table schema for the kernel
390#[derive(Debug, Clone)]
391pub struct TableSchema {
392    pub name: String,
393    pub columns: Vec<ColumnDef>,
394}
395
396/// Column definition
397#[derive(Debug, Clone)]
398pub struct ColumnDef {
399    pub name: String,
400    pub col_type: ColumnType,
401    pub nullable: bool,
402}
403
/// Value types a kernel table column can be declared with.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ColumnType {
    Int64,
    UInt64,
    Float64,
    Text,
    Binary,
    Bool,
}
414
/// Lightweight, copyable handle identifying a transaction in kernel calls.
#[derive(Debug, Clone, Copy)]
pub struct TxnHandle {
    /// Transaction identifier.
    pub txn_id: u64,
    /// Snapshot timestamp associated with the transaction
    /// (presumably the MVCC read snapshot - confirm against the storage layer).
    pub snapshot_ts: u64,
}
421
422/// Query result from the kernel
423#[derive(Debug, Clone)]
424pub struct QueryResult {
425    /// Column names
426    pub columns: Vec<String>,
427    /// Row data (each row is a map of column -> value)
428    pub rows: Vec<HashMap<String, SochValue>>,
429    /// Number of rows scanned (for stats)
430    pub rows_scanned: usize,
431    /// Bytes read from storage
432    pub bytes_read: usize,
433}
434
435impl QueryResult {
436    /// Empty result
437    pub fn empty() -> Self {
438        Self {
439            columns: vec![],
440            rows: vec![],
441            rows_scanned: 0,
442            bytes_read: 0,
443        }
444    }
445
446    /// Convert to TOON format for token efficiency
447    pub fn to_toon(&self) -> String {
448        if self.rows.is_empty() {
449            return "[]".to_string();
450        }
451
452        // TOON format: table[N]{cols}: row1; row2; ...
453        let n = self.rows.len();
454        let cols = self.columns.join(",");
455
456        let rows_str: Vec<String> = self
457            .rows
458            .iter()
459            .map(|row| {
460                self.columns
461                    .iter()
462                    .map(|c| {
463                        row.get(c)
464                            .map(format_soch_value)
465                            .unwrap_or_else(|| "∅".to_string())
466                    })
467                    .collect::<Vec<_>>()
468                    .join(",")
469            })
470            .collect();
471
472        format!("result[{}]{{{}}}:{}", n, cols, rows_str.join(";"))
473    }
474}
475
476fn format_soch_value(v: &SochValue) -> String {
477    match v {
478        SochValue::Null => "∅".to_string(),
479        SochValue::Int(i) => i.to_string(),
480        SochValue::UInt(u) => u.to_string(),
481        SochValue::Float(f) => format!("{:.6}", f),
482        SochValue::Text(s) => {
483            if s.contains(',') || s.contains(';') {
484                format!("\"{}\"", s.replace('"', "\\\""))
485            } else {
486                s.clone()
487            }
488        }
489        SochValue::Bool(b) => if *b { "T" } else { "F" }.to_string(),
490        SochValue::Binary(b) => format!("b64:{}", base64_encode(b)),
491        _ => format!("{:?}", v),
492    }
493}
494
/// Encode `data` as standard base64 (RFC 4648 alphabet, `=` padding).
///
/// Empty input produces an empty string. The output buffer is allocated once
/// up front: every started 3-byte group yields exactly 4 output characters.
fn base64_encode(data: &[u8]) -> String {
    const ALPHABET: &[u8; 64] =
        b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
    const PAD: char = '=';

    // Look up the character for the low six bits of `bits`.
    let sextet = |bits: usize| ALPHABET[bits & 0x3f] as char;

    let mut encoded = String::with_capacity((data.len() + 2) / 3 * 4);

    for chunk in data.chunks(3) {
        // `chunks(3)` never yields an empty chunk, so index 0 always exists;
        // missing trailing bytes are treated as zero.
        let b0 = usize::from(chunk[0]);
        let b1 = chunk.get(1).map_or(0, |&b| usize::from(b));
        let b2 = chunk.get(2).map_or(0, |&b| usize::from(b));
        let group = (b0 << 16) | (b1 << 8) | b2;

        encoded.push(sextet(group >> 18));
        encoded.push(sextet(group >> 12));
        // The third and fourth characters only carry data when the
        // corresponding input byte was present; otherwise pad.
        encoded.push(if chunk.len() > 1 { sextet(group >> 6) } else { PAD });
        encoded.push(if chunk.len() > 2 { sextet(group) } else { PAD });
    }

    encoded
}
523
524// ============================================================================
525// Columnar Query Results - SIMD-friendly result format
526// ============================================================================
527
528use sochdb_core::TypedColumn as CoreTypedColumn;
529
530/// Columnar query result - SIMD-friendly format for analytics
531///
532/// Instead of row-oriented `Vec<HashMap<String, SochValue>>`, this returns
533/// column-oriented `Vec<TypedColumn>` for efficient vectorized operations.
534///
535/// ## Memory Layout
536///
537/// Row-oriented (standard):
538/// ```text
539/// Row 0: [id=1, name="Alice", score=85]
540/// Row 1: [id=2, name="Bob", score=92]
541/// Row 2: [id=3, name="Carol", score=78]
542/// ```
543///
544/// Column-oriented (this format):
545/// ```text
546/// id:    [1, 2, 3]           ← contiguous i64 array (SIMD-friendly)
547/// name:  ["Alice", "Bob", "Carol"] ← Arrow-style string encoding
548/// score: [85, 92, 78]        ← contiguous i64 array
549/// ```
550///
551/// ## Performance Benefits
552///
553/// - SIMD: Column sums use vectorized instructions (~8× faster)
554/// - Cache: Sequential access pattern maximizes L1/L2 cache hits
555/// - Compression: Same-type data compresses better (5-10× typical)
556/// - Filtering: Bitmap operations instead of row iteration
557///
558/// ## Usage
559///
560/// ```ignore
561/// let result = db.query(txn, "users")
562///     .columns(&["id", "score"])
563///     .as_columnar()?;
564///
565/// // SIMD sum
566/// let total_score = result.column("score")
567///     .map(|c| c.sum_i64())
568///     .unwrap_or(0);
569///
570/// // Stats
571/// println!("Rows: {}, Memory: {} bytes", result.row_count(), result.memory_size());
572/// ```
573#[derive(Debug, Clone)]
574pub struct ColumnarQueryResult {
575    /// Column names in order
576    pub columns: Vec<String>,
577    /// Column data - each TypedColumn contains all values for one column
578    pub data: Vec<CoreTypedColumn>,
579    /// Number of rows
580    pub row_count: usize,
581    /// Bytes read from storage
582    pub bytes_read: usize,
583}
584
585impl ColumnarQueryResult {
586    /// Create an empty result
587    pub fn empty() -> Self {
588        Self {
589            columns: vec![],
590            data: vec![],
591            row_count: 0,
592            bytes_read: 0,
593        }
594    }
595
596    /// Get column by name
597    pub fn column(&self, name: &str) -> Option<&CoreTypedColumn> {
598        self.columns
599            .iter()
600            .position(|c| c == name)
601            .and_then(|idx| self.data.get(idx))
602    }
603
604    /// Get column index by name
605    pub fn column_index(&self, name: &str) -> Option<usize> {
606        self.columns.iter().position(|c| c == name)
607    }
608
609    /// Number of rows
610    pub fn row_count(&self) -> usize {
611        self.row_count
612    }
613
614    /// Number of columns
615    pub fn column_count(&self) -> usize {
616        self.columns.len()
617    }
618
619    /// Total memory size in bytes
620    pub fn memory_size(&self) -> usize {
621        self.data.iter().map(|c| c.memory_size()).sum()
622    }
623
624    /// Sum of an i64 column (SIMD-optimized)
625    pub fn sum_i64(&self, column: &str) -> Option<i64> {
626        self.column(column).map(|c| c.sum_i64())
627    }
628
629    /// Sum of an f64 column (SIMD-optimized)
630    pub fn sum_f64(&self, column: &str) -> Option<f64> {
631        self.column(column).map(|c| c.sum_f64())
632    }
633
634    /// Zero-allocation row access by index.
635    ///
636    /// Returns a lightweight view that resolves column values on demand
637    /// from the underlying columnar arrays — no `HashMap` per row.
638    ///
639    /// ```ignore
640    /// let result = query.as_columnar()?;
641    /// for i in 0..result.row_count() {
642    ///     let row = result.row_view(i).unwrap();
643    ///     let name = row.get("name"); // SochValue::Text(...)
644    /// }
645    /// ```
646    #[inline]
647    pub fn row_view(&self, index: usize) -> Option<ColumnarRowView<'_>> {
648        if index < self.row_count {
649            Some(ColumnarRowView {
650                result: self,
651                index,
652            })
653        } else {
654            None
655        }
656    }
657
658    /// Convert to row-oriented `QueryResult` for backward compatibility.
659    ///
660    /// This materialises one `HashMap<String, SochValue>` per row, so prefer
661    /// using `row_view()` or direct columnar access when performance matters.
662    pub fn into_query_result(self) -> QueryResult {
663        let rows: Vec<HashMap<String, SochValue>> = (0..self.row_count)
664            .map(|i| {
665                self.columns
666                    .iter()
667                    .zip(self.data.iter())
668                    .map(|(name, col)| (name.clone(), col.value_at(i)))
669                    .collect()
670            })
671            .collect();
672
673        QueryResult {
674            columns: self.columns,
675            rows,
676            rows_scanned: self.row_count,
677            bytes_read: self.bytes_read,
678        }
679    }
680
681    /// Get column statistics (min, max, null count)
682    pub fn column_stats(&self, column: &str) -> Option<&sochdb_core::columnar::ColumnStats> {
683        self.column(column).map(|c| c.stats())
684    }
685
686    /// Convert to TOON format (token-efficient)
687    pub fn to_toon(&self) -> String {
688        if self.row_count == 0 {
689            return "[]".to_string();
690        }
691
692        let n = self.row_count;
693        let cols = self.columns.join(",");
694
695        // Build rows from columns
696        let mut rows_str = Vec::with_capacity(n);
697        for i in 0..n {
698            let row: Vec<String> = self
699                .data
700                .iter()
701                .map(|col| format_columnar_value(col, i))
702                .collect();
703            rows_str.push(row.join(","));
704        }
705
706        format!("result[{}]{{{}}}:{}", n, cols, rows_str.join(";"))
707    }
708}
709
710/// Format a single value from a TypedColumn at index
711fn format_columnar_value(col: &CoreTypedColumn, idx: usize) -> String {
712    match col {
713        CoreTypedColumn::Int64 {
714            values, validity, ..
715        } => {
716            if validity.is_valid(idx) && idx < values.len() {
717                values[idx].to_string()
718            } else {
719                "∅".to_string()
720            }
721        }
722        CoreTypedColumn::UInt64 {
723            values, validity, ..
724        } => {
725            if validity.is_valid(idx) && idx < values.len() {
726                values[idx].to_string()
727            } else {
728                "∅".to_string()
729            }
730        }
731        CoreTypedColumn::Float64 {
732            values, validity, ..
733        } => {
734            if validity.is_valid(idx) && idx < values.len() {
735                format!("{:.6}", values[idx])
736            } else {
737                "∅".to_string()
738            }
739        }
740        CoreTypedColumn::Text {
741            offsets,
742            data,
743            validity,
744            ..
745        } => {
746            if validity.is_valid(idx) && idx + 1 < offsets.len() {
747                let start = offsets[idx] as usize;
748                let end = offsets[idx + 1] as usize;
749                std::str::from_utf8(&data[start..end])
750                    .map(|s| {
751                        if s.contains(',') || s.contains(';') {
752                            format!("\"{}\"", s.replace('"', "\\\""))
753                        } else {
754                            s.to_string()
755                        }
756                    })
757                    .unwrap_or_else(|_| "∅".to_string())
758            } else {
759                "∅".to_string()
760            }
761        }
762        CoreTypedColumn::Binary {
763            offsets,
764            data,
765            validity,
766            ..
767        } => {
768            if validity.is_valid(idx) && idx + 1 < offsets.len() {
769                let start = offsets[idx] as usize;
770                let end = offsets[idx + 1] as usize;
771                format!("b64:{}", base64_encode(&data[start..end]))
772            } else {
773                "∅".to_string()
774            }
775        }
776        CoreTypedColumn::Bool {
777            values,
778            validity,
779            len,
780            ..
781        } => {
782            if validity.is_valid(idx) && idx < *len {
783                let word = idx / 64;
784                let bit = idx % 64;
785                if (values[word] >> bit) & 1 == 1 {
786                    "T"
787                } else {
788                    "F"
789                }
790                .to_string()
791            } else {
792                "∅".to_string()
793            }
794        }
795    }
796}
797
798/// Zero-allocation row view into a `ColumnarQueryResult`.
799///
800/// Provides named-column access (like `HashMap<String, SochValue>`)
801/// without allocating a HashMap per row. Values are read directly
802/// from the underlying typed column arrays.
803///
804/// **Cost per access:** O(1) column index lookup + O(1) array read.
805/// **Allocation:** zero (borrows from `ColumnarQueryResult`).
806#[derive(Debug)]
807pub struct ColumnarRowView<'a> {
808    result: &'a ColumnarQueryResult,
809    index: usize,
810}
811
812impl<'a> ColumnarRowView<'a> {
813    /// Get a value by column name without allocation.
814    ///
815    /// Returns `None` if the column does not exist.
816    /// Returns `Some(SochValue::Null)` if the column exists but the value is NULL.
817    #[inline]
818    pub fn get(&self, column: &str) -> Option<SochValue> {
819        self.result
820            .column_index(column)
821            .map(|ci| self.result.data[ci].value_at(self.index))
822    }
823
824    /// Get all column values as a `Vec<SochValue>` (positional, no HashMap).
825    pub fn values(&self) -> Vec<SochValue> {
826        self.result
827            .data
828            .iter()
829            .map(|col| col.value_at(self.index))
830            .collect()
831    }
832
833    /// Row index within the result set.
834    #[inline]
835    pub fn index(&self) -> usize {
836        self.index
837    }
838
839    /// Materialise this row into a `HashMap<String, SochValue>` for backward
840    /// compatibility.  Prefer `get()` for single column access.
841    pub fn to_map(&self) -> HashMap<String, SochValue> {
842        self.result
843            .columns
844            .iter()
845            .zip(self.result.data.iter())
846            .map(|(name, col)| (name.clone(), col.value_at(self.index)))
847            .collect()
848    }
849}
850
851/// Vector search result
852#[derive(Debug, Clone)]
853pub struct VectorSearchResult {
854    pub id: u64,
855    pub distance: f32,
856    pub metadata: Option<HashMap<String, SochValue>>,
857}
858
/// The SochDB Database Kernel
///
/// This is the shared core used by both embedded (`SochConnection`) and
/// server (`sochdb-server`) modes. It owns all storage, catalog, and
/// indexing components.
///
/// # Thread Safety
///
/// The Database is fully thread-safe via internal synchronization:
/// - Multiple readers can operate concurrently (MVCC snapshots)
/// - Writers coordinate through WAL and group commit
/// - All state is behind Arc/RwLock for shared access
///
/// # Concurrency Modes
///
/// ## Standard Mode (Single Process)
/// - Uses exclusive file lock (`flock(LOCK_EX)`)
/// - Best for: Scripts, notebooks, CLI tools
/// - Open with: `Database::open(path)`
///
/// ## Concurrent Mode (Multi-Process/Web Apps)
/// - Uses lock-free MVCC for reads, single-writer coordination for writes
/// - Best for: Web servers, Flask/FastAPI apps, hot reloading
/// - Open with: `Database::open_concurrent(path)`
///
/// # Example
///
/// ```ignore
/// // Standard mode (single process)
/// let db = Database::open("./my_data")?;
///
/// // Concurrent mode (multi-reader, single-writer)
/// let db = Database::open_concurrent("./my_data")?;
///
/// // Begin a transaction
/// let txn = db.begin_transaction()?;
///
/// // Write data
/// db.put(txn, b"user:1:name", b"Alice")?;
///
/// // Commit
/// db.commit(txn)?;
/// ```
// NOTE(review): `dead_code` is presumably allowed because some fields are
// stored but not read within this file — confirm and narrow the allow to
// those fields if possible.
#[allow(dead_code)]
pub struct Database {
    /// Path to database directory
    path: PathBuf,
    /// Durable storage layer (WAL + MVCC + memtable)
    storage: Arc<DurableStorage>,
    /// Concurrent MVCC manager (for concurrent mode).
    /// `None` when opened through the standard (non-concurrent) constructors.
    concurrent_mvcc: Option<Arc<crate::mvcc_concurrent::ConcurrentMvcc>>,
    /// Schema catalog
    catalog: Arc<RwLock<Catalog>>,
    /// Registered table schemas (name -> schema) - lock-free for reads
    tables: DashMap<String, TableSchema>,
    /// Cached packed schemas for fast insert (name -> packed schema).
    /// Kept in step with `tables` by `register_table` / `update_table_schema`.
    packed_schemas: DashMap<String, PackedTableSchema>,
    /// Per-table index policy registry
    index_registry: Arc<TableIndexRegistry>,
    /// Configuration
    config: DatabaseConfig,
    /// Statistics
    stats: DatabaseStats,
    /// Shutdown flag (initialised to 0 by every constructor)
    shutdown: AtomicU64,
    /// Whether this database is in concurrent mode
    is_concurrent: bool,
    /// CDC (Change Data Capture) log for streaming mutations.
    /// `None` until `enable_cdc` is called.
    cdc_log: Option<Arc<crate::cdc::CdcLog>>,
}
929
/// Internal, lock-free counters tracking database activity.
///
/// Each counter is an `AtomicU64` so it can be bumped through `&self`.
#[derive(Default)]
struct DatabaseStats {
    transactions_started: AtomicU64,
    transactions_committed: AtomicU64,
    transactions_aborted: AtomicU64,
    queries_executed: AtomicU64,
    bytes_written: AtomicU64,
    bytes_read: AtomicU64,
}

impl DatabaseStats {
    /// Create a fresh set of counters, all starting at zero.
    fn new() -> Self {
        // `AtomicU64::default()` is zero, so the derived `Default` yields the
        // same all-zero state as spelling out each field.
        Self::default()
    }
}
952
/// Public statistics snapshot.
///
/// A plain-value copy of the database counters. It derives `Default`
/// (all zeros) plus `PartialEq`/`Eq` so snapshots can be compared directly,
/// e.g. to detect whether anything changed between two samples.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct Stats {
    /// Number of transactions started.
    pub transactions_started: u64,
    /// Number of transactions committed.
    pub transactions_committed: u64,
    /// Number of transactions aborted.
    pub transactions_aborted: u64,
    /// Number of queries executed.
    pub queries_executed: u64,
    /// Total bytes written (key + value lengths).
    pub bytes_written: u64,
    /// Total bytes read.
    pub bytes_read: u64,
}
963
964impl Database {
965    /// Open or create a database at the given path.
966    ///
967    /// This is the primary entry point, similar to `sqlite3_open()`.
968    /// If the database exists, it will be opened and WAL recovery performed.
969    /// If it doesn't exist, a new database will be created.
970    ///
971    /// # Arguments
972    ///
973    /// * `path` - Directory path for the database files
974    ///
975    /// # Returns
976    ///
977    /// An `Arc<Database>` that can be shared across threads and connections.
978    pub fn open<P: AsRef<Path>>(path: P) -> Result<Arc<Self>> {
979        Self::open_with_config(path, DatabaseConfig::default())
980    }
981
982    /// Open without locking (for testing crash recovery scenarios)
983    ///
984    /// # Safety
985    /// This should ONLY be used in tests that simulate crashes by forgetting
986    /// the storage instance. In production, always use `open()`.
987    #[cfg(test)]
988    pub fn open_without_lock<P: AsRef<Path>>(path: P) -> Result<Arc<Self>> {
989        let path = path.as_ref().to_path_buf();
990        let config = DatabaseConfig::default();
991
992        let storage = Arc::new(DurableStorage::open_without_lock(&path)?);
993
994        let index_registry = Arc::new(TableIndexRegistry::with_default_policy(
995            config.default_index_policy,
996        ));
997
998        let db = Arc::new(Self {
999            path: path.clone(),
1000            storage,
1001            concurrent_mvcc: None,
1002            catalog: Arc::new(RwLock::new(Catalog::new("sochdb"))),
1003            tables: DashMap::new(),
1004            packed_schemas: DashMap::new(),
1005            index_registry,
1006            config,
1007            stats: DatabaseStats::new(),
1008            shutdown: AtomicU64::new(0),
1009            is_concurrent: false,
1010            cdc_log: None,
1011        });
1012
1013        db.recover()?;
1014        Ok(db)
1015    }
1016
1017    /// Open with custom configuration
1018    pub fn open_with_config<P: AsRef<Path>>(path: P, config: DatabaseConfig) -> Result<Arc<Self>> {
1019        let path = path.as_ref().to_path_buf();
1020
1021        // Use IndexPolicy-based storage configuration for automatic memtable selection
1022        // This derives ordered index and memtable type from the policy
1023        let storage = Arc::new(DurableStorage::open_with_policy(
1024            &path,
1025            config.default_index_policy,
1026            config.group_commit,
1027        )?);
1028
1029        // Propagate sync_mode from config to storage engine.
1030        // Without this, DurableStorage defaults to SyncMode::Normal (adaptive fsync).
1031        storage.set_sync_mode(config.sync_mode as u64);
1032
1033        // Create index registry with default policy from config
1034        let index_registry = Arc::new(TableIndexRegistry::with_default_policy(
1035            config.default_index_policy,
1036        ));
1037
1038        let db = Arc::new(Self {
1039            path: path.clone(),
1040            storage,
1041            concurrent_mvcc: None,
1042            catalog: Arc::new(RwLock::new(Catalog::new("sochdb"))),
1043            tables: DashMap::new(),
1044            packed_schemas: DashMap::new(),
1045            index_registry,
1046            config,
1047            stats: DatabaseStats::new(),
1048            shutdown: AtomicU64::new(0),
1049            is_concurrent: false,
1050            cdc_log: None,
1051        });
1052
1053        // Perform crash recovery if needed
1054        db.recover()?;
1055
1056        Ok(db)
1057    }
1058
1059    /// Open database in concurrent mode (multi-reader, single-writer)
1060    ///
1061    /// This mode allows multiple processes to access the database simultaneously:
1062    /// - **Readers**: Lock-free, concurrent access via MVCC snapshots
1063    /// - **Writers**: Single-writer coordination through atomic locks
1064    ///
1065    /// # Use Cases
1066    ///
1067    /// - Web applications (Flask, FastAPI, Django)
1068    /// - Hot reloading development servers
1069    /// - Multi-process worker pools
1070    /// - Any scenario with concurrent read access
1071    ///
1072    /// # Performance
1073    ///
1074    /// - Read latency: ~100ns (lock-free atomic operations)
1075    /// - Write latency: ~60μs amortized (with group commit)
1076    /// - Concurrent readers: Up to 1024 (configurable)
1077    ///
1078    /// # Example
1079    ///
1080    /// ```ignore
1081    /// // Multiple processes can open the same database
1082    /// let db = Database::open_concurrent("./my_data")?;
1083    ///
1084    /// // Reads are lock-free
1085    /// let value = db.get(b"key")?;
1086    ///
1087    /// // Writes coordinate automatically
1088    /// let txn = db.begin_transaction()?;
1089    /// db.put(txn, b"key", b"value")?;
1090    /// db.commit(txn)?;
1091    /// ```
1092    pub fn open_concurrent<P: AsRef<Path>>(path: P) -> Result<Arc<Self>> {
1093        Self::open_concurrent_with_config(path, DatabaseConfig::default())
1094    }
1095
1096    /// Open database in concurrent mode with custom configuration
1097    pub fn open_concurrent_with_config<P: AsRef<Path>>(
1098        path: P,
1099        config: DatabaseConfig,
1100    ) -> Result<Arc<Self>> {
1101        use crate::mvcc_concurrent::ConcurrentMvcc;
1102
1103        let path = path.as_ref().to_path_buf();
1104        std::fs::create_dir_all(&path)?;
1105
1106        // Open concurrent MVCC manager (this uses shared memory, not exclusive lock)
1107        let concurrent_mvcc = Arc::new(ConcurrentMvcc::open(&path)?);
1108
1109        // Open storage WITHOUT exclusive lock (concurrent MVCC handles coordination)
1110        // We use a special internal method that skips the file lock
1111        let storage = Arc::new(DurableStorage::open_for_concurrent(
1112            &path,
1113            config.default_index_policy,
1114        )?);
1115
1116        // Propagate sync_mode from config to storage engine
1117        storage.set_sync_mode(config.sync_mode as u64);
1118
1119        // Create index registry with default policy from config
1120        let index_registry = Arc::new(TableIndexRegistry::with_default_policy(
1121            config.default_index_policy,
1122        ));
1123
1124        let db = Arc::new(Self {
1125            path: path.clone(),
1126            storage,
1127            concurrent_mvcc: Some(concurrent_mvcc),
1128            catalog: Arc::new(RwLock::new(Catalog::new("sochdb"))),
1129            tables: DashMap::new(),
1130            packed_schemas: DashMap::new(),
1131            index_registry,
1132            config,
1133            stats: DatabaseStats::new(),
1134            shutdown: AtomicU64::new(0),
1135            is_concurrent: true,
1136            cdc_log: None,
1137        });
1138
1139        // Perform crash recovery if needed
1140        db.recover()?;
1141
1142        // Clean up any stale readers from crashed processes
1143        if let Some(ref mvcc) = db.concurrent_mvcc {
1144            mvcc.cleanup_stale_readers();
1145        }
1146
1147        Ok(db)
1148    }
1149
    /// Check if database is in concurrent mode.
    ///
    /// Returns the flag set at construction time: `true` only when the
    /// database was opened through `open_concurrent` /
    /// `open_concurrent_with_config`.
    #[inline]
    pub fn is_concurrent(&self) -> bool {
        self.is_concurrent
    }
1155
    /// Perform crash recovery.
    ///
    /// Delegates to the storage layer's `recover()` and returns its
    /// `RecoveryStats`. Every constructor in this file calls it once, right
    /// after building the `Database`, and discards the statistics.
    fn recover(&self) -> Result<RecoveryStats> {
        self.storage.recover()
    }
1160
    /// Get database path.
    ///
    /// This is the directory path the database was opened with.
    pub fn path(&self) -> &Path {
        &self.path
    }
1165
1166    // =========================================================================
1167    // Transaction API
1168    // =========================================================================
1169
1170    /// Begin a new transaction
1171    pub fn begin_transaction(&self) -> Result<TxnHandle> {
1172        self.stats
1173            .transactions_started
1174            .fetch_add(1, Ordering::Relaxed);
1175        let txn_id = self.storage.begin_transaction()?;
1176
1177        // Get snapshot timestamp from MVCC
1178        // For now, use txn_id as a proxy (the real snapshot_ts is managed internally)
1179        Ok(TxnHandle {
1180            txn_id,
1181            snapshot_ts: txn_id,
1182        })
1183    }
1184
1185    /// Begin a read-only transaction (optimized: no SSI tracking)
1186    ///
1187    /// Read-only transactions skip SSI read tracking, reducing overhead
1188    /// from ~82ns to ~32ns per read (2.6x faster).
1189    ///
1190    /// Use this for:
1191    /// - SELECT queries that don't modify data
1192    /// - Analytics and reporting queries
1193    /// - Snapshot reads for backup
1194    pub fn begin_read_only(&self) -> Result<TxnHandle> {
1195        self.stats
1196            .transactions_started
1197            .fetch_add(1, Ordering::Relaxed);
1198        let txn_id = self.storage.begin_with_mode(TransactionMode::ReadOnly)?;
1199        Ok(TxnHandle {
1200            txn_id,
1201            snapshot_ts: txn_id,
1202        })
1203    }
1204
1205    /// Begin a lightweight read-only transaction (no WAL overhead).
1206    ///
1207    /// Eliminates WAL mutex acquisitions entirely for read operations.
1208    /// The txn_id is allocated atomically and MVCC snapshot state is created,
1209    /// but NO WAL records are written (no TxnBegin, no TxnAbort).
1210    ///
1211    /// ~5-10x faster per-operation than `begin_read_only()` because it avoids:
1212    /// - 2 WAL mutex lock/unlock cycles per transaction
1213    /// - 2 WAL BufWriter serializations per transaction
1214    ///
1215    /// Callers MUST use `abort_read_only_fast()` to clean up — NOT `commit()`
1216    /// or `abort()`.
1217    #[inline]
1218    pub fn begin_read_only_fast(&self) -> TxnHandle {
1219        let txn_id = self.storage.begin_read_only_fast();
1220        TxnHandle {
1221            txn_id,
1222            snapshot_ts: txn_id,
1223        }
1224    }
1225
1226    /// Abort a fast read-only transaction — O(1), no WAL, no memtable scan.
1227    #[inline]
1228    pub fn abort_read_only_fast(&self, txn: TxnHandle) {
1229        self.storage.abort_read_only_fast(txn.txn_id);
1230    }
1231
    /// Read a key WITHOUT any MVCC transaction tracking.
    ///
    /// Uses the current global timestamp to see all committed writes.
    /// Bypasses: begin/abort, active_txns DashMap, record_read, stats.
    /// Only safe for single-threaded access with no concurrent writers.
    ///
    /// Delegates directly to the storage layer's `read_latest`; returns
    /// `None` when that call yields no value for `key`.
    #[inline]
    pub fn get_raw_read(&self, key: &[u8]) -> Option<Vec<u8>> {
        self.storage.read_latest(key)
    }
1241
    /// Scan by prefix WITHOUT any MVCC transaction tracking.
    ///
    /// Uses the current global timestamp. Only safe for single-threaded access.
    ///
    /// Delegates directly to the storage layer's `scan_latest`. Unlike
    /// `scan()`, no minimum prefix length is enforced here and the
    /// `bytes_read` counter is not updated.
    #[inline]
    pub fn scan_raw(&self, prefix: &[u8]) -> Vec<(Vec<u8>, Vec<u8>)> {
        self.storage.scan_latest(prefix)
    }
1249
1250    /// Begin a write-only transaction (optimized: no read tracking)
1251    ///
1252    /// Write-only transactions skip read tracking, improving insert
1253    /// throughput for bulk loading scenarios.
1254    ///
1255    /// Use this for:
1256    /// - Bulk data imports
1257    /// - Append-only logging tables
1258    /// - ETL pipelines
1259    pub fn begin_write_only(&self) -> Result<TxnHandle> {
1260        self.stats
1261            .transactions_started
1262            .fetch_add(1, Ordering::Relaxed);
1263        let txn_id = self.storage.begin_with_mode(TransactionMode::WriteOnly)?;
1264        Ok(TxnHandle {
1265            txn_id,
1266            snapshot_ts: txn_id,
1267        })
1268    }
1269
1270    /// Commit a transaction
1271    ///
1272    /// In concurrent mode, acquires the shared writer lock to ensure
1273    /// WAL writes are serialized across processes, and forces a flush+sync
1274    /// so that subsequent processes see the committed data.
1275    pub fn commit(&self, txn: TxnHandle) -> Result<u64> {
1276        self.stats
1277            .transactions_committed
1278            .fetch_add(1, Ordering::Relaxed);
1279
1280        // In concurrent mode, acquire the cross-process writer lock
1281        // to serialize WAL commits across processes
1282        let _writer_guard = if let Some(ref mvcc) = self.concurrent_mvcc {
1283            Some(mvcc.acquire_writer(std::time::Duration::from_secs(5))?)
1284        } else {
1285            None
1286        };
1287
1288        let commit_ts = self.storage.commit(txn.txn_id)?;
1289
1290        // In concurrent mode, force flush+sync so other processes can see
1291        // the committed data when they open the DB or run recovery.
1292        // Without this, the BufWriter may hold data that isn't visible
1293        // to other processes reading the WAL file.
1294        if self.is_concurrent {
1295            self.storage.flush_wal()?;
1296            self.storage.fsync()?;
1297        }
1298
1299        // Notify concurrent MVCC of commit for GC tracking
1300        if let Some(ref mvcc) = self.concurrent_mvcc {
1301            mvcc.on_commit();
1302        }
1303
1304        Ok(commit_ts)
1305    }
1306
1307    /// Abort a transaction
1308    pub fn abort(&self, txn: TxnHandle) -> Result<()> {
1309        self.stats
1310            .transactions_aborted
1311            .fetch_add(1, Ordering::Relaxed);
1312        self.storage.abort(txn.txn_id)
1313    }
1314
1315    // =========================================================================
1316    // Per-Table Index Policy API
1317    // =========================================================================
1318
1319    /// Configure index policy for a table
1320    ///
1321    /// This allows fine-grained control over write/scan trade-offs per table:
1322    ///
1323    /// | Policy         | Insert Cost | Scan Cost      | Use Case              |
1324    /// |----------------|-------------|----------------|------------------------|
1325    /// | WriteOptimized | O(1)        | O(N)           | High-write, rare scan  |
1326    /// | Balanced       | O(1) amort  | O(output+logK) | Mixed OLTP            |
1327    /// | ScanOptimized  | O(log N)    | O(logN + K)    | Analytics, range query |
1328    /// | AppendOnly     | O(1)        | O(N)           | Time-series logs       |
1329    ///
1330    /// # Example
1331    ///
1332    /// ```ignore
1333    /// // Fast inserts for logs table (no ordered index overhead)
1334    /// db.set_table_index_policy("logs", IndexPolicy::WriteOptimized);
1335    ///
1336    /// // Efficient range scans for analytics table
1337    /// db.set_table_index_policy("analytics", IndexPolicy::ScanOptimized);
1338    ///
1339    /// // Balanced for OLTP tables
1340    /// db.set_table_index_policy("users", IndexPolicy::Balanced);
1341    /// ```
1342    pub fn set_table_index_policy(&self, table: &str, policy: IndexPolicy) {
1343        self.index_registry
1344            .configure_table(TableIndexConfig::new(table, policy));
1345    }
1346
    /// Get the index policy for a table.
    ///
    /// Looks the table up in the index registry.
    // NOTE(review): presumably falls back to the registry's default policy
    // for tables that were never configured — confirm in `TableIndexRegistry`.
    pub fn get_table_index_policy(&self, table: &str) -> IndexPolicy {
        self.index_registry.get_policy(table)
    }
1351
    /// Get the index registry for advanced configuration.
    ///
    /// Returns a borrow of the shared `Arc`; clone it if an owned handle is
    /// needed.
    pub fn index_registry(&self) -> &Arc<TableIndexRegistry> {
        &self.index_registry
    }
1356
1357    // =========================================================================
1358    // Key-Value API (Low-level)
1359    // =========================================================================
1360
1361    /// Put a key-value pair
1362    ///
1363    /// In concurrent mode, acquires the shared writer lock to ensure
1364    /// WAL writes are serialized across processes.
1365    pub fn put(&self, txn: TxnHandle, key: &[u8], value: &[u8]) -> Result<()> {
1366        self.stats
1367            .bytes_written
1368            .fetch_add((key.len() + value.len()) as u64, Ordering::Relaxed);
1369
1370        // In concurrent mode, acquire cross-process writer lock
1371        let _writer_guard = if let Some(ref mvcc) = self.concurrent_mvcc {
1372            Some(mvcc.acquire_writer(std::time::Duration::from_secs(5))?)
1373        } else {
1374            None
1375        };
1376
1377        // Use write_refs to avoid unnecessary allocations
1378        self.storage.write_refs(txn.txn_id, key, value)
1379    }
1380
1381    /// Batch put multiple key-value pairs with reduced overhead
1382    ///
1383    /// This amortizes per-operation costs over the entire batch:
1384    /// - Single DashMap lookup
1385    /// - Batch MVCC tracking
1386    /// - Batch memtable writes
1387    ///
1388    /// For 100+ entries, this is 2-3x faster than individual puts.
1389    ///
1390    /// # Example
1391    ///
1392    /// ```ignore
1393    /// let writes: Vec<(&[u8], &[u8])> = vec![
1394    ///     (b"key1", b"value1"),
1395    ///     (b"key2", b"value2"),
1396    ///     (b"key3", b"value3"),
1397    /// ];
1398    /// db.put_batch(txn, &writes)?;
1399    /// ```
1400    pub fn put_batch(&self, txn: TxnHandle, writes: &[(&[u8], &[u8])]) -> Result<()> {
1401        let bytes: u64 = writes.iter().map(|(k, v)| (k.len() + v.len()) as u64).sum();
1402        self.stats.bytes_written.fetch_add(bytes, Ordering::Relaxed);
1403
1404        // In concurrent mode, acquire cross-process writer lock
1405        let _writer_guard = if let Some(ref mvcc) = self.concurrent_mvcc {
1406            Some(mvcc.acquire_writer(std::time::Duration::from_secs(5))?)
1407        } else {
1408            None
1409        };
1410
1411        self.storage.write_batch_refs(txn.txn_id, writes)
1412    }
1413
1414    /// Get a value by key
1415    pub fn get(&self, txn: TxnHandle, key: &[u8]) -> Result<Option<Vec<u8>>> {
1416        let result = self.storage.read(txn.txn_id, key)?;
1417        if let Some(ref data) = result {
1418            self.stats
1419                .bytes_read
1420                .fetch_add(data.len() as u64, Ordering::Relaxed);
1421        }
1422        Ok(result)
1423    }
1424
1425    /// Delete a key
1426    pub fn delete(&self, txn: TxnHandle, key: &[u8]) -> Result<()> {
1427        self.storage.delete(txn.txn_id, key.to_vec())
1428    }
1429
    /// Minimum prefix length (in bytes) for scan operations.
    /// Prevents expensive full-table scans by requiring a meaningful prefix.
    /// Enforced by `scan()`; `scan_unchecked()` bypasses it.
    pub const MIN_SCAN_PREFIX_LEN: usize = 2;
1433
1434    /// Scan keys with a prefix (enforces minimum prefix length for safety).
1435    ///
1436    /// # Prefix Safety
1437    ///
1438    /// To prevent accidental full-table scans, this method requires a minimum
1439    /// prefix length of 2 bytes. Use `scan_unchecked` for internal operations
1440    /// that need empty/short prefixes.
1441    ///
1442    /// # Errors
1443    ///
1444    /// Returns `SochDBError::InvalidInput` if prefix is too short.
1445    pub fn scan(&self, txn: TxnHandle, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
1446        if prefix.len() < Self::MIN_SCAN_PREFIX_LEN {
1447            return Err(SochDBError::InvalidArgument(format!(
1448                "Prefix too short: {} bytes (minimum {} required). \
1449                 Use scan_unchecked() for unrestricted scans.",
1450                prefix.len(),
1451                Self::MIN_SCAN_PREFIX_LEN
1452            )));
1453        }
1454        self.scan_unchecked(txn, prefix)
1455    }
1456
1457    /// Scan keys with a prefix without length validation.
1458    ///
1459    /// # Warning
1460    ///
1461    /// This method allows empty/short prefixes which can cause expensive
1462    /// full-table scans. Use `scan()` unless you specifically need unrestricted
1463    /// prefix access for internal operations.
1464    pub fn scan_unchecked(&self, txn: TxnHandle, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
1465        let results = self.storage.scan(txn.txn_id, prefix)?;
1466        let bytes: u64 = results
1467            .iter()
1468            .map(|(k, v)| (k.len() + v.len()) as u64)
1469            .sum();
1470        self.stats.bytes_read.fetch_add(bytes, Ordering::Relaxed);
1471        Ok(results)
1472    }
1473
1474    /// Scan keys in range
1475    pub fn scan_range(
1476        &self,
1477        txn: TxnHandle,
1478        start: &[u8],
1479        end: &[u8],
1480    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
1481        let results = self.storage.scan_range(txn.txn_id, start, end)?;
1482        let bytes: u64 = results
1483            .iter()
1484            .map(|(k, v)| (k.len() + v.len()) as u64)
1485            .sum();
1486        self.stats.bytes_read.fetch_add(bytes, Ordering::Relaxed);
1487        Ok(results)
1488    }
1489
1490    /// Streaming scan for very large result sets
1491    ///
1492    /// Returns an iterator that yields (key, value) pairs without
1493    /// materializing the entire result set. Use this for large scans
1494    /// where memory efficiency is important.
1495    ///
1496    /// ## Performance
1497    ///
1498    /// - Memory: O(1) per iteration vs O(N) for scan_range
1499    /// - Latency: First result available immediately vs waiting for all results
1500    /// - Throughput: Slightly lower due to per-item overhead
1501    ///
1502    /// ## Usage
1503    ///
1504    /// ```ignore
1505    /// for result in db.scan_range_iter(txn, b"start", b"end") {
1506    ///     let (key, value) = result?;
1507    ///     // Process immediately - no need to wait for all results
1508    /// }
1509    /// ```
1510    pub fn scan_range_iter<'a>(
1511        &'a self,
1512        txn: TxnHandle,
1513        start: &'a [u8],
1514        end: &'a [u8],
1515    ) -> impl Iterator<Item = Result<(Vec<u8>, Vec<u8>)>> + 'a {
1516        let stats = &self.stats;
1517        self.storage
1518            .scan_range_iter(txn.txn_id, start, end)
1519            .map(move |item| {
1520                stats
1521                    .bytes_read
1522                    .fetch_add((item.0.len() + item.1.len()) as u64, Ordering::Relaxed);
1523                Ok(item)
1524            })
1525    }
1526
1527    /// Flush memtable to WAL/Disk
1528    pub fn flush(&self) -> Result<()> {
1529        self.storage.fsync()
1530    }
1531
1532    // =========================================================================
1533    // Path-Native API (SochDB's differentiator)
1534    // =========================================================================
1535
    /// Get storage statistics.
    ///
    /// Returns the storage layer's own `StorageStats`; these are separate
    /// from the `Database`-level counters kept in `DatabaseStats`.
    pub fn storage_stats(&self) -> crate::durable_storage::StorageStats {
        self.storage.stats()
    }
1540
1541    /// Put a value at a path
1542    ///
1543    /// Path format: "collection/doc_id/field" or "table.row_id.column"
1544    /// Resolution is O(|path|), not O(log N) like B-tree.
1545    pub fn put_path(&self, txn: TxnHandle, path: &str, value: &[u8]) -> Result<()> {
1546        self.put(txn, path.as_bytes(), value)
1547    }
1548
1549    /// Get a value at a path
1550    pub fn get_path(&self, txn: TxnHandle, path: &str) -> Result<Option<Vec<u8>>> {
1551        self.get(txn, path.as_bytes())
1552    }
1553
1554    /// Delete at a path
1555    pub fn delete_path(&self, txn: TxnHandle, path: &str) -> Result<()> {
1556        self.delete(txn, path.as_bytes())
1557    }
1558
1559    /// Scan a path prefix
1560    ///
1561    /// Returns all key-value pairs where key starts with prefix.
1562    /// Useful for: "users/123/" -> all fields of user 123
1563    pub fn scan_path(&self, txn: TxnHandle, prefix: &str) -> Result<Vec<(String, Vec<u8>)>> {
1564        self.stats.queries_executed.fetch_add(1, Ordering::Relaxed);
1565
1566        let results = self.scan(txn, prefix.as_bytes())?;
1567
1568        Ok(results
1569            .into_iter()
1570            .filter_map(|(k, v)| String::from_utf8(k).ok().map(|path| (path, v)))
1571            .collect())
1572    }
1573
1574    // =========================================================================
1575    // Query API
1576    // =========================================================================
1577
1578    /// Execute a path query and return results
1579    ///
1580    /// This is the main query interface for LLM context retrieval.
1581    /// Supports:
1582    /// - Path prefix matching
1583    /// - Column projection (for I/O reduction)
1584    /// - Limit/offset
1585    pub fn query(&self, txn: TxnHandle, path_prefix: &str) -> QueryBuilder<'_> {
1586        QueryBuilder::new(self, txn, path_prefix.to_string())
1587    }
1588
1589    // =========================================================================
1590    // Table API (Higher-level abstraction)
1591    // =========================================================================
1592
1593    /// Register a table schema
1594    pub fn register_table(&self, schema: TableSchema) -> Result<()> {
1595        if self.tables.contains_key(&schema.name) {
1596            return Err(SochDBError::InvalidArgument(format!(
1597                "Table '{}' already exists",
1598                schema.name
1599            )));
1600        }
1601        // Cache the packed schema for fast inserts
1602        let packed_schema = Self::to_packed_schema(&schema);
1603        self.packed_schemas
1604            .insert(schema.name.clone(), packed_schema);
1605        self.tables.insert(schema.name.clone(), schema);
1606        Ok(())
1607    }
1608
1609    /// Get table schema
1610    pub fn get_table_schema(&self, name: &str) -> Option<TableSchema> {
1611        self.tables.get(name).map(|s| s.clone())
1612    }
1613
1614    /// Update the schema for an existing table (used by ALTER TABLE).
1615    ///
1616    /// Replaces the schema in both the `tables` DashMap and the packed schema
1617    /// cache atomically (per-key). The caller is responsible for validating
1618    /// the new schema.
1619    pub fn update_table_schema(&self, old_name: &str, schema: TableSchema) -> Result<()> {
1620        if !self.tables.contains_key(old_name) {
1621            return Err(SochDBError::InvalidArgument(format!(
1622                "Table '{}' not found",
1623                old_name
1624            )));
1625        }
1626        // Remove old entries
1627        self.tables.remove(old_name);
1628        self.packed_schemas.remove(old_name);
1629        // Insert new
1630        let packed = Self::to_packed_schema(&schema);
1631        self.packed_schemas.insert(schema.name.clone(), packed);
1632        self.tables.insert(schema.name.clone(), schema);
1633        Ok(())
1634    }
1635
1636    /// List all tables
1637    pub fn list_tables(&self) -> Vec<String> {
1638        self.tables.iter().map(|e| e.key().clone()).collect()
1639    }
1640
1641    // =========================================================================
1642    // CDC (Change Data Capture)
1643    // =========================================================================
1644
1645    /// Enable CDC on this database, returning the CDC log handle.
1646    ///
1647    /// Subsequent mutations emitted via the SQL execution layer will be
1648    /// recorded in the CDC log for subscriber consumption.
1649    pub fn enable_cdc(&mut self, config: crate::cdc::CdcConfig) -> Arc<crate::cdc::CdcLog> {
1650        let log = crate::cdc::CdcLog::new(config);
1651        self.cdc_log = Some(log.clone());
1652        log
1653    }
1654
    /// Get the CDC log handle, if CDC is enabled.
    ///
    /// Returns `None` until `enable_cdc` has been called; every constructor
    /// in this file starts with CDC disabled.
    pub fn cdc_log(&self) -> Option<&Arc<crate::cdc::CdcLog>> {
        self.cdc_log.as_ref()
    }
1659
1660    /// Convert TableSchema to PackedTableSchema for efficient storage
1661    fn to_packed_schema(schema: &TableSchema) -> PackedTableSchema {
1662        let columns = schema
1663            .columns
1664            .iter()
1665            .map(|col| PackedColumnDef {
1666                name: col.name.clone(),
1667                col_type: match col.col_type {
1668                    ColumnType::Int64 => PackedColumnType::Int64,
1669                    ColumnType::UInt64 => PackedColumnType::UInt64,
1670                    ColumnType::Float64 => PackedColumnType::Float64,
1671                    ColumnType::Text => PackedColumnType::Text,
1672                    ColumnType::Binary => PackedColumnType::Binary,
1673                    ColumnType::Bool => PackedColumnType::Bool,
1674                },
1675                nullable: col.nullable,
1676            })
1677            .collect();
1678
1679        PackedTableSchema::new(&schema.name, columns)
1680    }
1681
1682    /// Insert a row into a table
1683    ///
1684    /// Uses packed row format: stores entire row as single key-value pair.
1685    /// This reduces write amplification from 4× to 1× for a 4-column table.
1686    ///
1687    /// # Performance
1688    /// - Before: 4 columns × (WAL entry + MVCC version) = 4 writes
1689    /// - After: 1 packed row = 1 write
1690    /// - Improvement: ~4× fewer WAL entries, ~48% less I/O overhead
1691    pub fn insert_row(
1692        &self,
1693        txn: TxnHandle,
1694        table: &str,
1695        row_id: u64,
1696        values: &HashMap<String, SochValue>,
1697    ) -> Result<()> {
1698        // Use cached packed schema - single DashMap lookup, no clone
1699        let packed_schema = self
1700            .packed_schemas
1701            .get(table)
1702            .ok_or_else(|| SochDBError::InvalidArgument(format!("Table '{}' not found", table)))?;
1703
1704        // Pack the row using cached schema
1705        let packed_row = PackedRow::pack(&packed_schema, values);
1706
1707        // Build key using KeyBuffer - optimized stack allocation (~12-15ns vs ~30-35ns for write!())
1708        let key = KeyBuffer::format_row_key(table, row_id);
1709
1710        self.put(txn, key.as_bytes(), packed_row.as_bytes())?;
1711
1712        Ok(())
1713    }
1714
1715    /// Read a row from a table
1716    ///
1717    /// Reads packed row and extracts requested columns in O(k) time.
1718    /// Column projection happens in memory, not storage - all columns are fetched.
1719    pub fn read_row(
1720        &self,
1721        txn: TxnHandle,
1722        table: &str,
1723        row_id: u64,
1724        columns: Option<&[&str]>,
1725    ) -> Result<Option<HashMap<String, SochValue>>> {
1726        let schema = self
1727            .tables
1728            .get(table)
1729            .ok_or_else(|| SochDBError::InvalidArgument(format!("Table '{}' not found", table)))?;
1730
1731        // Read the packed row with a single key lookup using KeyBuffer
1732        let key = KeyBuffer::format_row_key(table, row_id);
1733        let bytes = match self.get(txn, key.as_bytes())? {
1734            Some(b) => b,
1735            None => return Ok(None),
1736        };
1737
1738        // Use cached packed schema
1739        let packed_schema = self
1740            .packed_schemas
1741            .get(table)
1742            .ok_or_else(|| SochDBError::Internal("Packed schema not found".into()))?;
1743        let packed_row = PackedRow::from_bytes(bytes, packed_schema.num_columns())?;
1744
1745        // Determine which columns to return
1746        let cols_to_read: Vec<&str> = match columns {
1747            Some(c) => c.to_vec(),
1748            None => schema.columns.iter().map(|c| c.name.as_str()).collect(),
1749        };
1750
1751        let mut row = HashMap::new();
1752        for col_name in cols_to_read {
1753            if let Some(idx) = packed_schema.column_index(col_name)
1754                && let Some(col_def) = packed_schema.column(idx)
1755                && let Some(value) = packed_row.get_column(idx, col_def.col_type)
1756            {
1757                row.insert(col_name.to_string(), value);
1758            }
1759        }
1760
1761        Ok(Some(row))
1762    }
1763
1764    /// Insert multiple rows efficiently in a batch
1765    ///
1766    /// This method accumulates all rows and writes them with fewer WAL syncs.
1767    /// Ideal for bulk loading scenarios.
1768    ///
1769    /// # Performance
1770    /// - Uses group commit to batch fsync operations
1771    /// - Expected throughput: 500K-1M rows/sec depending on row size
1772    pub fn insert_rows_batch(
1773        &self,
1774        txn: TxnHandle,
1775        table: &str,
1776        rows: &[(u64, HashMap<String, SochValue>)],
1777    ) -> Result<usize> {
1778        // Use cached packed schema
1779        let packed_schema = self
1780            .packed_schemas
1781            .get(table)
1782            .ok_or_else(|| SochDBError::InvalidArgument(format!("Table '{}' not found", table)))?;
1783
1784        let mut count = 0;
1785
1786        for (row_id, values) in rows {
1787            // Pack and write using KeyBuffer for efficient key construction
1788            let packed_row = PackedRow::pack(&packed_schema, values);
1789            let key = KeyBuffer::format_row_key(table, *row_id);
1790            self.put(txn, key.as_bytes(), packed_row.as_bytes())?;
1791            count += 1;
1792        }
1793
1794        Ok(count)
1795    }
1796
1797    /// Ultra-fast raw put - bypasses all validation
1798    ///
1799    /// Use when you've already validated the data and just need speed.
1800    /// This is ~10× faster than insert_row() for bulk inserts.
1801    #[inline]
1802    pub fn put_raw(&self, txn: TxnHandle, key: &[u8], value: &[u8]) -> Result<()> {
1803        self.storage.write_refs(txn.txn_id, key, value)
1804    }
1805
1806    /// Zero-allocation insert - fastest path for bulk inserts
1807    ///
1808    /// Takes values as a slice in schema column order, avoiding HashMap overhead.
1809    ///
1810    /// # Arguments
1811    /// * `txn` - Transaction handle
1812    /// * `table` - Table name
1813    /// * `row_id` - Row identifier
1814    /// * `values` - Values in schema column order (None = NULL)
1815    ///
1816    /// # Performance
1817    /// - Eliminates ~6 allocations per row vs insert_row()
1818    /// - Expected: 1.2M-1.5M inserts/sec
1819    ///
1820    /// # Example
1821    /// ```ignore
1822    /// let values: &[Option<&SochValue>] = &[
1823    ///     Some(&SochValue::Int(1)),
1824    ///     Some(&SochValue::Text("Alice".into())),
1825    ///     None, // NULL
1826    /// ];
1827    /// db.insert_row_slice(txn, "users", 1, values)?;
1828    /// ```
1829    #[inline]
1830    pub fn insert_row_slice(
1831        &self,
1832        txn: TxnHandle,
1833        table: &str,
1834        row_id: u64,
1835        values: &[Option<&SochValue>],
1836    ) -> Result<()> {
1837        // Use cached packed schema - single DashMap lookup
1838        let packed_schema = self
1839            .packed_schemas
1840            .get(table)
1841            .ok_or_else(|| SochDBError::InvalidArgument(format!("Table '{}' not found", table)))?;
1842
1843        // Validate column count matches
1844        if values.len() != packed_schema.num_columns() {
1845            return Err(SochDBError::InvalidArgument(format!(
1846                "Expected {} columns, got {}",
1847                packed_schema.num_columns(),
1848                values.len()
1849            )));
1850        }
1851
1852        // Pack using zero-allocation path
1853        let packed_row = PackedRow::pack_slice(&packed_schema, values);
1854
1855        // Build key using KeyBuffer - optimized stack allocation (~12-15ns vs ~30-35ns for write!())
1856        let key = KeyBuffer::format_row_key(table, row_id);
1857
1858        self.put(txn, key.as_bytes(), packed_row.as_bytes())?;
1859        Ok(())
1860    }
1861
1862    // =========================================================================
1863    // Maintenance
1864    // =========================================================================
1865
1866    /// Force fsync to disk
1867    pub fn fsync(&self) -> Result<()> {
1868        self.storage.fsync()
1869    }
1870
1871    /// Create a checkpoint
1872    pub fn checkpoint(&self) -> Result<u64> {
1873        self.storage.checkpoint()
1874    }
1875
1876    /// Truncate the WAL file after a checkpoint.
1877    ///
1878    /// See [`DurableStorage::truncate_wal`] for safety notes.
1879    pub fn truncate_wal(&self) -> Result<()> {
1880        self.storage.truncate_wal()
1881    }
1882
1883    /// Run garbage collection
1884    pub fn gc(&self) -> usize {
1885        self.storage.gc()
1886    }
1887
1888    /// Get database statistics
1889    pub fn stats(&self) -> Stats {
1890        Stats {
1891            transactions_started: self.stats.transactions_started.load(Ordering::Relaxed),
1892            transactions_committed: self.stats.transactions_committed.load(Ordering::Relaxed),
1893            transactions_aborted: self.stats.transactions_aborted.load(Ordering::Relaxed),
1894            queries_executed: self.stats.queries_executed.load(Ordering::Relaxed),
1895            bytes_written: self.stats.bytes_written.load(Ordering::Relaxed),
1896            bytes_read: self.stats.bytes_read.load(Ordering::Relaxed),
1897        }
1898    }
1899
1900    /// Shutdown the database gracefully
1901    pub fn shutdown(&self) -> Result<()> {
1902        if self.shutdown.swap(1, Ordering::SeqCst) == 1 {
1903            return Ok(()); // Already shutting down
1904        }
1905
1906        // Flush any pending writes
1907        self.fsync()?;
1908
1909        // Create clean shutdown marker
1910        let marker = self.path.join(".clean_shutdown");
1911        std::fs::write(&marker, b"ok")?;
1912
1913        Ok(())
1914    }
1915}
1916
1917impl Drop for Database {
1918    fn drop(&mut self) {
1919        // Try graceful shutdown if not already done
1920        if self.shutdown.load(Ordering::SeqCst) == 0 {
1921            let _ = self.fsync();
1922            let marker = self.path.join(".clean_shutdown");
1923            let _ = std::fs::write(&marker, b"ok");
1924        }
1925    }
1926}
1927
/// Query builder for fluent query construction
///
/// Created with a database reference, a transaction and a path prefix; the
/// projection, limit and offset start out unset and are filled in through the
/// builder methods before one of the execute methods consumes the builder.
pub struct QueryBuilder<'a> {
    /// Database the query runs against.
    db: &'a Database,
    /// Transaction handle passed to the storage scan.
    txn: TxnHandle,
    /// Path prefix to scan; the segment before the first '/' is looked up as
    /// the table name.
    path_prefix: String,
    /// Columns to return; `None` means no projection was requested.
    columns: Option<Vec<String>>,
    /// Maximum number of rows to return; `None` means unlimited.
    limit: Option<usize>,
    /// Number of leading rows to skip; `None` means skip nothing.
    offset: Option<usize>,
}
1937
1938impl<'a> QueryBuilder<'a> {
1939    fn new(db: &'a Database, txn: TxnHandle, path_prefix: String) -> Self {
1940        Self {
1941            db,
1942            txn,
1943            path_prefix,
1944            columns: None,
1945            limit: None,
1946            offset: None,
1947        }
1948    }
1949
1950    /// Select specific columns (for I/O reduction)
1951    pub fn columns(mut self, cols: &[&str]) -> Self {
1952        self.columns = Some(cols.iter().map(|s| s.to_string()).collect());
1953        self
1954    }
1955
1956    /// Limit results
1957    pub fn limit(mut self, n: usize) -> Self {
1958        self.limit = Some(n);
1959        self
1960    }
1961
1962    /// Skip results
1963    pub fn offset(mut self, n: usize) -> Self {
1964        self.offset = Some(n);
1965        self
1966    }
1967
1968    /// Execute the query
1969    ///
1970    /// Scans packed rows and unpacks them. Each key is "table/row_id" pointing to a packed row.
1971    pub fn execute(self) -> Result<QueryResult> {
1972        self.db
1973            .stats
1974            .queries_executed
1975            .fetch_add(1, Ordering::Relaxed);
1976
1977        // Get schema for the table if we're querying a table
1978        let table_name = self
1979            .path_prefix
1980            .split('/')
1981            .next()
1982            .unwrap_or(&self.path_prefix);
1983        let schema = self.db.tables.get(table_name).map(|s| s.clone());
1984
1985        // When querying a registered table by its bare name, scan the row-key
1986        // prefix "table/" rather than the bare name. Row keys are "table/row_id"
1987        // (see KeyBuffer::format_row_key), so scanning the bare name would:
1988        //   (1) bleed across sibling tables whose names share a prefix — e.g.
1989        //       querying "user" would also match "users/123"; and
1990        //   (2) trip the scan minimum-prefix guard for single-character table
1991        //       names ("t" is 1 byte, below the 2-byte minimum).
1992        // Appending the separator fixes both. Path-style prefixes that already
1993        // contain '/' (e.g. "users/123" to fetch one row's columns) are left
1994        // unchanged so field-prefix scans keep working.
1995        let scan_prefix = if schema.is_some() && !self.path_prefix.contains('/') {
1996            format!("{}/", self.path_prefix)
1997        } else {
1998            self.path_prefix.clone()
1999        };
2000
2001        // Scan the path prefix
2002        let results = self.db.scan_path(self.txn, &scan_prefix)?;
2003
2004        let mut rows: Vec<HashMap<String, SochValue>> = Vec::new();
2005        let mut bytes_read = 0usize;
2006
2007        if let Some(ref schema) = schema {
2008            // We have a table schema - use cached packed schema
2009            let packed_schema = self
2010                .db
2011                .packed_schemas
2012                .get(table_name)
2013                .map(|ps| ps.clone())
2014                .unwrap_or_else(|| Database::to_packed_schema(schema));
2015
2016            for (path, value_bytes) in results {
2017                // Parse path: table/row_id
2018                let parts: Vec<&str> = path.split('/').collect();
2019                if parts.len() == 2 {
2020                    // This is a packed row
2021                    bytes_read += value_bytes.len();
2022
2023                    if let Ok(packed_row) =
2024                        PackedRow::from_bytes(value_bytes, packed_schema.num_columns())
2025                    {
2026                        // Unpack all columns or just requested columns
2027                        let mut row = HashMap::new();
2028
2029                        if let Some(ref cols) = self.columns {
2030                            // Only extract requested columns
2031                            for col_name in cols {
2032                                if let Some(idx) = packed_schema.column_index(col_name)
2033                                    && let Some(col_def) = packed_schema.column(idx)
2034                                    && let Some(value) =
2035                                        packed_row.get_column(idx, col_def.col_type)
2036                                {
2037                                    row.insert(col_name.clone(), value);
2038                                }
2039                            }
2040                        } else {
2041                            // Extract all columns
2042                            row = packed_row.unpack(&packed_schema);
2043                        }
2044
2045                        if !row.is_empty() {
2046                            rows.push(row);
2047                        }
2048                    }
2049                }
2050            }
2051        } else {
2052            // Fallback: no schema, try legacy column-per-key format
2053            let mut rows_map: HashMap<String, HashMap<String, SochValue>> = HashMap::new();
2054
2055            for (path, value_bytes) in results {
2056                let parts: Vec<&str> = path.split('/').collect();
2057                if parts.len() >= 3 {
2058                    let row_key = format!("{}/{}", parts[0], parts[1]);
2059                    let col_name = parts[2..].join("/");
2060
2061                    if let Some(ref cols) = self.columns
2062                        && !cols.contains(&col_name)
2063                    {
2064                        continue;
2065                    }
2066
2067                    bytes_read += value_bytes.len();
2068                    let row = rows_map.entry(row_key).or_default();
2069                    row.insert(col_name, deserialize_value(&value_bytes));
2070                }
2071            }
2072
2073            rows = rows_map.into_values().collect();
2074        }
2075
2076        // Apply offset
2077        if let Some(offset) = self.offset {
2078            rows = rows.into_iter().skip(offset).collect();
2079        }
2080
2081        // Apply limit
2082        if let Some(limit) = self.limit {
2083            rows.truncate(limit);
2084        }
2085
2086        // Collect column names
2087        let columns: Vec<String> = self.columns.unwrap_or_else(|| {
2088            rows.iter()
2089                .flat_map(|r| r.keys().cloned())
2090                .collect::<std::collections::HashSet<_>>()
2091                .into_iter()
2092                .collect()
2093        });
2094
2095        Ok(QueryResult {
2096            columns,
2097            rows_scanned: rows.len(),
2098            bytes_read,
2099            rows,
2100        })
2101    }
2102
2103    /// Execute and return TOON format (for LLM efficiency)
2104    pub fn to_toon(self) -> Result<String> {
2105        let result = self.execute()?;
2106        Ok(result.to_toon())
2107    }
2108
2109    /// Execute with lazy iteration - avoids materializing all rows
2110    ///
2111    /// Returns an iterator over rows as `Vec<SochValue>` in schema column order.
2112    /// This is more memory-efficient than `execute()` for large result sets.
2113    ///
2114    /// # Performance
2115    /// - No upfront materialization of all rows
2116    /// - ~40% less memory for large result sets
2117    /// - Ideal for streaming to network or aggregations
2118    ///
2119    /// # Example
2120    /// ```ignore
2121    /// for row_result in db.query(txn, "users").execute_iter()? {
2122    ///     let row = row_result?;
2123    ///     // row is Vec<SochValue> in column order
2124    /// }
2125    /// ```
2126    pub fn execute_iter(self) -> Result<QueryRowIterator> {
2127        self.db
2128            .stats
2129            .queries_executed
2130            .fetch_add(1, Ordering::Relaxed);
2131
2132        let table_name = self
2133            .path_prefix
2134            .split('/')
2135            .next()
2136            .unwrap_or(&self.path_prefix)
2137            .to_string();
2138
2139        // Get packed schema (clone needed for iterator ownership)
2140        let packed_schema = self.db.packed_schemas.get(&table_name).map(|ps| ps.clone());
2141
2142        // Scan the path prefix
2143        let results = self.db.scan_path(self.txn, &self.path_prefix)?;
2144
2145        Ok(QueryRowIterator {
2146            results: results.into_iter(),
2147            packed_schema,
2148            columns: self.columns,
2149            offset: self.offset.unwrap_or(0),
2150            limit: self.limit,
2151            yielded: 0,
2152            skipped: 0,
2153        })
2154    }
2155
2156    /// Execute and return columnar (SIMD-friendly) result format
2157    ///
2158    /// Instead of row-oriented `Vec<HashMap<String, SochValue>>`, returns
2159    /// column-oriented `Vec<TypedColumn>` for vectorized operations.
2160    ///
2161    /// ## Performance Benefits
2162    ///
2163    /// - SIMD: Aggregate operations (sum, avg) use vectorized instructions
2164    /// - Cache: Sequential access maximizes L1/L2 hits
2165    /// - Memory: ~30% less overhead than row-based format
2166    /// - Analytics: Ideal for ML preprocessing and statistics
2167    ///
2168    /// ## Example
2169    ///
2170    /// ```ignore
2171    /// let result = db.query(txn, "users")
2172    ///     .columns(&["id", "score"])
2173    ///     .as_columnar()?;
2174    ///
2175    /// // SIMD-optimized sum
2176    /// let total = result.sum_i64("score").unwrap_or(0);
2177    ///
2178    /// // Direct column access
2179    /// if let Some(scores) = result.column("score") {
2180    ///     for i in 0..scores.len() {
2181    ///         if let Some(v) = scores.get_i64(i) {
2182    ///             println!("Score: {}", v);
2183    ///         }
2184    ///     }
2185    /// }
2186    /// ```
2187    pub fn as_columnar(self) -> Result<ColumnarQueryResult> {
2188        self.db
2189            .stats
2190            .queries_executed
2191            .fetch_add(1, Ordering::Relaxed);
2192
2193        let table_name = self
2194            .path_prefix
2195            .split('/')
2196            .next()
2197            .unwrap_or(&self.path_prefix);
2198        let schema = self.db.tables.get(table_name).map(|s| s.clone());
2199
2200        // Get packed schema
2201        let packed_schema = match self.db.packed_schemas.get(table_name) {
2202            Some(ps) => ps.clone(),
2203            None => return Ok(ColumnarQueryResult::empty()),
2204        };
2205
2206        // Determine columns to fetch
2207        let column_names: Vec<String> = self.columns.clone().unwrap_or_else(|| {
2208            schema
2209                .as_ref()
2210                .map(|s| s.columns.iter().map(|c| c.name.clone()).collect())
2211                .unwrap_or_default()
2212        });
2213
2214        if column_names.is_empty() {
2215            return Ok(ColumnarQueryResult::empty());
2216        }
2217
2218        // Initialize TypedColumns based on schema types
2219        let mut columns: Vec<CoreTypedColumn> = column_names
2220            .iter()
2221            .map(|col_name| {
2222                packed_schema
2223                    .column_index(col_name)
2224                    .and_then(|idx| packed_schema.column(idx))
2225                    .map(|col_def| match col_def.col_type {
2226                        PackedColumnType::Int64 => CoreTypedColumn::new_int64(),
2227                        PackedColumnType::UInt64 => CoreTypedColumn::new_uint64(),
2228                        PackedColumnType::Float64 => CoreTypedColumn::new_float64(),
2229                        PackedColumnType::Text => CoreTypedColumn::new_text(),
2230                        PackedColumnType::Binary => CoreTypedColumn::new_binary(),
2231                        PackedColumnType::Bool => CoreTypedColumn::new_bool(),
2232                        PackedColumnType::Null => CoreTypedColumn::new_text(), // Null column = fallback to text
2233                    })
2234                    .unwrap_or_else(CoreTypedColumn::new_text) // fallback
2235            })
2236            .collect();
2237
2238        // Scan the path prefix
2239        let results = self.db.scan_path(self.txn, &self.path_prefix)?;
2240
2241        let mut row_count = 0;
2242        let mut bytes_read = 0;
2243        let mut skipped = 0;
2244
2245        for (path, value_bytes) in results {
2246            // Parse path: table/row_id
2247            let parts: Vec<&str> = path.split('/').collect();
2248            if parts.len() != 2 {
2249                continue;
2250            }
2251
2252            // Apply offset
2253            if let Some(offset) = self.offset
2254                && skipped < offset
2255            {
2256                skipped += 1;
2257                continue;
2258            }
2259
2260            // Apply limit
2261            if let Some(limit) = self.limit
2262                && row_count >= limit
2263            {
2264                break;
2265            }
2266
2267            bytes_read += value_bytes.len();
2268
2269            if let Ok(packed_row) = PackedRow::from_bytes(value_bytes, packed_schema.num_columns())
2270            {
2271                // Extract each column and push to corresponding TypedColumn
2272                for (col_idx, col_name) in column_names.iter().enumerate() {
2273                    if let Some(schema_idx) = packed_schema.column_index(col_name) {
2274                        if let Some(col_def) = packed_schema.column(schema_idx) {
2275                            let value = packed_row.get_column(schema_idx, col_def.col_type);
2276                            push_value_to_typed_column(&mut columns[col_idx], value);
2277                        } else {
2278                            push_null_to_typed_column(&mut columns[col_idx]);
2279                        }
2280                    } else {
2281                        push_null_to_typed_column(&mut columns[col_idx]);
2282                    }
2283                }
2284                row_count += 1;
2285            }
2286        }
2287
2288        Ok(ColumnarQueryResult {
2289            columns: column_names,
2290            data: columns,
2291            row_count,
2292            bytes_read,
2293        })
2294    }
2295}
2296
/// Lazy iterator over query results
///
/// Unpacks rows on-demand, avoiding upfront materialization.
pub struct QueryRowIterator {
    /// Scanned `(path, value bytes)` pairs still to be processed.
    results: std::vec::IntoIter<(String, Vec<u8>)>,
    /// Packed schema used to decode rows; when `None`, each row is yielded as
    /// a single `SochValue::Binary` holding the raw bytes.
    packed_schema: Option<PackedTableSchema>,
    /// Columns to project, in output order; `None` yields all schema columns.
    columns: Option<Vec<String>>,
    /// Number of row entries to skip before yielding.
    offset: usize,
    /// Maximum number of rows to yield; `None` means unlimited.
    limit: Option<usize>,
    /// Rows yielded so far (compared against `limit`).
    yielded: usize,
    /// Row entries skipped so far (compared against `offset`).
    skipped: usize,
}
2309
2310impl Iterator for QueryRowIterator {
2311    type Item = Result<Vec<SochValue>>;
2312
2313    fn next(&mut self) -> Option<Self::Item> {
2314        // Check limit
2315        if let Some(limit) = self.limit
2316            && self.yielded >= limit
2317        {
2318            return None;
2319        }
2320
2321        loop {
2322            let (path, value_bytes) = self.results.next()?;
2323
2324            // Parse path: table/row_id
2325            let parts: Vec<&str> = path.split('/').collect();
2326            if parts.len() != 2 {
2327                continue; // Skip non-row entries
2328            }
2329
2330            // Apply offset
2331            if self.skipped < self.offset {
2332                self.skipped += 1;
2333                continue;
2334            }
2335
2336            if let Some(ref schema) = self.packed_schema {
2337                match PackedRow::from_bytes(value_bytes, schema.num_columns()) {
2338                    Ok(packed_row) => {
2339                        let row = if let Some(ref cols) = self.columns {
2340                            // Project specific columns
2341                            cols.iter()
2342                                .map(|col_name| {
2343                                    schema
2344                                        .column_index(col_name)
2345                                        .and_then(|idx| schema.column(idx))
2346                                        .and_then(|col_def| {
2347                                            packed_row.get_column(
2348                                                schema.column_index(col_name).unwrap(),
2349                                                col_def.col_type,
2350                                            )
2351                                        })
2352                                        .unwrap_or(SochValue::Null)
2353                                })
2354                                .collect()
2355                        } else {
2356                            // All columns in order
2357                            packed_row.unpack_to_vec(schema)
2358                        };
2359
2360                        self.yielded += 1;
2361                        return Some(Ok(row));
2362                    }
2363                    Err(e) => return Some(Err(e)),
2364                }
2365            } else {
2366                // No schema - return raw bytes as binary
2367                self.yielded += 1;
2368                return Some(Ok(vec![SochValue::Binary(value_bytes)]));
2369            }
2370        }
2371    }
2372}
2373
2374// Helper functions for serialization (kept for backward compatibility with legacy data)
2375
2376#[allow(dead_code)]
2377fn serialize_value(value: &SochValue) -> Vec<u8> {
2378    // Simple serialization - in production use proper format
2379    match value {
2380        SochValue::Null => vec![0],
2381        SochValue::Int(i) => {
2382            let mut buf = vec![1];
2383            buf.extend_from_slice(&i.to_le_bytes());
2384            buf
2385        }
2386        SochValue::UInt(u) => {
2387            let mut buf = vec![2];
2388            buf.extend_from_slice(&u.to_le_bytes());
2389            buf
2390        }
2391        SochValue::Float(f) => {
2392            let mut buf = vec![3];
2393            buf.extend_from_slice(&f.to_le_bytes());
2394            buf
2395        }
2396        SochValue::Text(s) => {
2397            let mut buf = vec![4];
2398            buf.extend_from_slice(s.as_bytes());
2399            buf
2400        }
2401        SochValue::Bool(b) => vec![5, if *b { 1 } else { 0 }],
2402        SochValue::Binary(b) => {
2403            let mut buf = vec![6];
2404            buf.extend_from_slice(b);
2405            buf
2406        }
2407        _ => {
2408            // Fallback: serialize as text
2409            let s = format!("{:?}", value);
2410            let mut buf = vec![4];
2411            buf.extend_from_slice(s.as_bytes());
2412            buf
2413        }
2414    }
2415}
2416
2417fn deserialize_value(bytes: &[u8]) -> SochValue {
2418    if bytes.is_empty() {
2419        return SochValue::Null;
2420    }
2421
2422    match bytes[0] {
2423        0 => SochValue::Null,
2424        1 if bytes.len() >= 9 => {
2425            let i = i64::from_le_bytes(bytes[1..9].try_into().unwrap());
2426            SochValue::Int(i)
2427        }
2428        2 if bytes.len() >= 9 => {
2429            let u = u64::from_le_bytes(bytes[1..9].try_into().unwrap());
2430            SochValue::UInt(u)
2431        }
2432        3 if bytes.len() >= 9 => {
2433            let f = f64::from_le_bytes(bytes[1..9].try_into().unwrap());
2434            SochValue::Float(f)
2435        }
2436        4 => {
2437            let s = String::from_utf8_lossy(&bytes[1..]).to_string();
2438            SochValue::Text(s)
2439        }
2440        5 if bytes.len() >= 2 => SochValue::Bool(bytes[1] != 0),
2441        6 => SochValue::Binary(bytes[1..].to_vec()),
2442        _ => {
2443            // Treat as text
2444            let s = String::from_utf8_lossy(bytes).to_string();
2445            SochValue::Text(s)
2446        }
2447    }
2448}
2449
2450// ============================================================================
2451// Helper functions for columnar query result building
2452// ============================================================================
2453
2454/// Push a SochValue into a TypedColumn
2455fn push_value_to_typed_column(col: &mut CoreTypedColumn, value: Option<SochValue>) {
2456    match value {
2457        None => push_null_to_typed_column(col),
2458        Some(v) => match (col, v) {
2459            (
2460                CoreTypedColumn::Int64 {
2461                    values,
2462                    validity,
2463                    stats,
2464                },
2465                SochValue::Int(i),
2466            ) => {
2467                values.push(i);
2468                validity.push(true);
2469                stats.update_i64(i);
2470            }
2471            (
2472                CoreTypedColumn::Int64 {
2473                    values,
2474                    validity,
2475                    stats,
2476                },
2477                SochValue::UInt(u),
2478            ) => {
2479                values.push(u as i64);
2480                validity.push(true);
2481                stats.update_i64(u as i64);
2482            }
2483            (
2484                CoreTypedColumn::UInt64 {
2485                    values,
2486                    validity,
2487                    stats,
2488                },
2489                SochValue::UInt(u),
2490            ) => {
2491                values.push(u);
2492                validity.push(true);
2493                stats.update_i64(u as i64);
2494            }
2495            (
2496                CoreTypedColumn::UInt64 {
2497                    values,
2498                    validity,
2499                    stats,
2500                },
2501                SochValue::Int(i),
2502            ) => {
2503                values.push(i as u64);
2504                validity.push(true);
2505                stats.update_i64(i);
2506            }
2507            (
2508                CoreTypedColumn::Float64 {
2509                    values,
2510                    validity,
2511                    stats,
2512                },
2513                SochValue::Float(f),
2514            ) => {
2515                values.push(f);
2516                validity.push(true);
2517                stats.update_f64(f);
2518            }
2519            (
2520                CoreTypedColumn::Float64 {
2521                    values,
2522                    validity,
2523                    stats,
2524                },
2525                SochValue::Int(i),
2526            ) => {
2527                values.push(i as f64);
2528                validity.push(true);
2529                stats.update_f64(i as f64);
2530            }
2531            (
2532                CoreTypedColumn::Text {
2533                    offsets,
2534                    data,
2535                    validity,
2536                    stats,
2537                },
2538                SochValue::Text(s),
2539            ) => {
2540                data.extend_from_slice(s.as_bytes());
2541                offsets.push(data.len() as u32);
2542                validity.push(true);
2543                stats.row_count += 1;
2544            }
2545            (
2546                CoreTypedColumn::Binary {
2547                    offsets,
2548                    data,
2549                    validity,
2550                    stats,
2551                },
2552                SochValue::Binary(b),
2553            ) => {
2554                data.extend_from_slice(&b);
2555                offsets.push(data.len() as u32);
2556                validity.push(true);
2557                stats.row_count += 1;
2558            }
2559            (
2560                CoreTypedColumn::Bool {
2561                    values,
2562                    validity,
2563                    stats,
2564                    len,
2565                },
2566                SochValue::Bool(b),
2567            ) => {
2568                let idx = *len;
2569                *len += 1;
2570                let num_words = (*len).div_ceil(64);
2571                while values.len() < num_words {
2572                    values.push(0);
2573                }
2574                if b {
2575                    let word = idx / 64;
2576                    let bit = idx % 64;
2577                    values[word] |= 1 << bit;
2578                }
2579                validity.push(true);
2580                stats.row_count += 1;
2581            }
2582            // Type mismatch - push as null
2583            (col, _) => push_null_to_typed_column(col),
2584        },
2585    }
2586}
2587
2588/// Push a null value into a TypedColumn
2589fn push_null_to_typed_column(col: &mut CoreTypedColumn) {
2590    match col {
2591        CoreTypedColumn::Int64 {
2592            values,
2593            validity,
2594            stats,
2595        } => {
2596            values.push(0);
2597            validity.push(false);
2598            stats.update_null();
2599        }
2600        CoreTypedColumn::UInt64 {
2601            values,
2602            validity,
2603            stats,
2604        } => {
2605            values.push(0);
2606            validity.push(false);
2607            stats.update_null();
2608        }
2609        CoreTypedColumn::Float64 {
2610            values,
2611            validity,
2612            stats,
2613        } => {
2614            values.push(0.0);
2615            validity.push(false);
2616            stats.update_null();
2617        }
2618        CoreTypedColumn::Text {
2619            offsets,
2620            data: _,
2621            validity,
2622            stats,
2623        } => {
2624            offsets.push(offsets.last().copied().unwrap_or(0));
2625            validity.push(false);
2626            stats.update_null();
2627        }
2628        CoreTypedColumn::Binary {
2629            offsets,
2630            data: _,
2631            validity,
2632            stats,
2633        } => {
2634            offsets.push(offsets.last().copied().unwrap_or(0));
2635            validity.push(false);
2636            stats.update_null();
2637        }
2638        CoreTypedColumn::Bool {
2639            values,
2640            validity,
2641            stats,
2642            len,
2643        } => {
2644            *len += 1;
2645            let num_words = (*len).div_ceil(64);
2646            while values.len() < num_words {
2647                values.push(0);
2648            }
2649            validity.push(false);
2650            stats.update_null();
2651        }
2652    }
2653}
2654
2655#[cfg(test)]
2656mod tests {
2657    use super::*;
2658    use tempfile::tempdir;
2659
2660    #[test]
2661    fn test_database_open_close() {
2662        let dir = tempdir().unwrap();
2663        let db = Database::open(dir.path()).unwrap();
2664
2665        // Should be able to begin a transaction
2666        let txn = db.begin_transaction().unwrap();
2667        assert!(txn.txn_id > 0);
2668
2669        db.abort(txn).unwrap();
2670        db.shutdown().unwrap();
2671    }
2672
2673    #[test]
2674    fn test_database_put_get() {
2675        let dir = tempdir().unwrap();
2676        let db = Database::open(dir.path()).unwrap();
2677
2678        let txn = db.begin_transaction().unwrap();
2679        db.put(txn, b"key1", b"value1").unwrap();
2680
2681        let val = db.get(txn, b"key1").unwrap();
2682        assert_eq!(val, Some(b"value1".to_vec()));
2683
2684        db.commit(txn).unwrap();
2685
2686        // New transaction should see committed data
2687        let txn2 = db.begin_transaction().unwrap();
2688        let val = db.get(txn2, b"key1").unwrap();
2689        assert_eq!(val, Some(b"value1".to_vec()));
2690        db.abort(txn2).unwrap();
2691    }
2692
2693    #[test]
2694    fn test_database_path_api() {
2695        let dir = tempdir().unwrap();
2696        let db = Database::open(dir.path()).unwrap();
2697
2698        let txn = db.begin_transaction().unwrap();
2699
2700        // Write using path API
2701        db.put_path(txn, "users/1/name", b"Alice").unwrap();
2702        db.put_path(txn, "users/1/email", b"alice@example.com")
2703            .unwrap();
2704        db.put_path(txn, "users/2/name", b"Bob").unwrap();
2705
2706        db.commit(txn).unwrap();
2707
2708        // Scan path prefix
2709        let txn2 = db.begin_transaction().unwrap();
2710        let results = db.scan_path(txn2, "users/1/").unwrap();
2711        assert_eq!(results.len(), 2);
2712
2713        db.abort(txn2).unwrap();
2714    }
2715
2716    #[test]
2717    fn test_database_table_api() {
2718        let dir = tempdir().unwrap();
2719        let db = Database::open(dir.path()).unwrap();
2720
2721        // Register table
2722        db.register_table(TableSchema {
2723            name: "users".to_string(),
2724            columns: vec![
2725                ColumnDef {
2726                    name: "name".to_string(),
2727                    col_type: ColumnType::Text,
2728                    nullable: false,
2729                },
2730                ColumnDef {
2731                    name: "age".to_string(),
2732                    col_type: ColumnType::Int64,
2733                    nullable: true,
2734                },
2735            ],
2736        })
2737        .unwrap();
2738
2739        // Insert row
2740        let txn = db.begin_transaction().unwrap();
2741        let mut values = HashMap::new();
2742        values.insert("name".to_string(), SochValue::Text("Alice".to_string()));
2743        values.insert("age".to_string(), SochValue::Int(30));
2744
2745        db.insert_row(txn, "users", 1, &values).unwrap();
2746        db.commit(txn).unwrap();
2747
2748        // Read row
2749        let txn2 = db.begin_transaction().unwrap();
2750        let row = db.read_row(txn2, "users", 1, None).unwrap();
2751        assert!(row.is_some());
2752
2753        let row = row.unwrap();
2754        assert_eq!(row.get("name"), Some(&SochValue::Text("Alice".to_string())));
2755
2756        db.abort(txn2).unwrap();
2757    }
2758
2759    #[test]
2760    fn test_query_does_not_bleed_across_prefix_sibling_tables() {
2761        // Regression: row keys are "table/row_id", so a bare-name prefix scan of
2762        // "user" used to also match "users/..." (cross-table bleed), and a
2763        // single-character table name like "t" tripped the 2-byte scan guard.
2764        // Querying a registered table by name must scan exactly "table/".
2765        let dir = tempdir().unwrap();
2766        let db = Database::open(dir.path()).unwrap();
2767
2768        let col = || ColumnDef {
2769            name: "name".to_string(),
2770            col_type: ColumnType::Text,
2771            nullable: false,
2772        };
2773
2774        // Two sibling tables whose names share a prefix, plus a 1-char table.
2775        for table in ["user", "users", "t"] {
2776            db.register_table(TableSchema {
2777                name: table.to_string(),
2778                columns: vec![col()],
2779            })
2780            .unwrap();
2781        }
2782
2783        let txn = db.begin_transaction().unwrap();
2784        let mut v_user = HashMap::new();
2785        v_user.insert("name".to_string(), SochValue::Text("in_user".to_string()));
2786        db.insert_row(txn, "user", 1, &v_user).unwrap();
2787
2788        let mut v_users = HashMap::new();
2789        v_users.insert("name".to_string(), SochValue::Text("in_users".to_string()));
2790        db.insert_row(txn, "users", 1, &v_users).unwrap();
2791        db.insert_row(txn, "users", 2, &v_users).unwrap();
2792
2793        let mut v_t = HashMap::new();
2794        v_t.insert("name".to_string(), SochValue::Text("in_t".to_string()));
2795        db.insert_row(txn, "t", 1, &v_t).unwrap();
2796        db.commit(txn).unwrap();
2797
2798        // "user" must return exactly its one row — no bleed from "users".
2799        let txn_r = db.begin_read_only_fast();
2800        let user_rows = db.query(txn_r, "user").execute().unwrap().rows;
2801        assert_eq!(user_rows.len(), 1, "querying 'user' bled into 'users'");
2802        assert_eq!(
2803            user_rows[0].get("name"),
2804            Some(&SochValue::Text("in_user".to_string()))
2805        );
2806
2807        // "users" must return its two rows.
2808        let users_rows = db.query(txn_r, "users").execute().unwrap().rows;
2809        assert_eq!(users_rows.len(), 2);
2810
2811        // Single-character table name must be queryable (was rejected by the
2812        // 2-byte scan guard before the "table/" prefix fix).
2813        let t_rows = db.query(txn_r, "t").execute().unwrap().rows;
2814        assert_eq!(t_rows.len(), 1);
2815        assert_eq!(
2816            t_rows[0].get("name"),
2817            Some(&SochValue::Text("in_t".to_string()))
2818        );
2819        db.abort_read_only_fast(txn_r);
2820    }
2821
2822    #[test]
2823    fn test_database_query_builder() {
2824        let dir = tempdir().unwrap();
2825        let db = Database::open(dir.path()).unwrap();
2826
2827        // Insert test data
2828        let txn = db.begin_transaction().unwrap();
2829        db.put_path(txn, "docs/1/title", b"Hello").unwrap();
2830        db.put_path(txn, "docs/1/content", b"World").unwrap();
2831        db.put_path(txn, "docs/2/title", b"Foo").unwrap();
2832        db.put_path(txn, "docs/2/content", b"Bar").unwrap();
2833        db.commit(txn).unwrap();
2834
2835        // Query with limit
2836        let txn2 = db.begin_transaction().unwrap();
2837        let result = db.query(txn2, "docs/").limit(1).execute().unwrap();
2838
2839        assert_eq!(result.rows.len(), 1);
2840        db.abort(txn2).unwrap();
2841    }
2842
2843    #[test]
2844    fn test_database_crash_recovery() {
2845        let dir = tempdir().unwrap();
2846
2847        // Write and commit
2848        {
2849            // Use open_without_lock for crash simulation tests
2850            let db = Database::open_without_lock(dir.path()).unwrap();
2851            // Set sync mode to FULL to ensure data is persisted before "crash"
2852            db.storage.set_sync_mode(2);
2853            let txn = db.begin_transaction().unwrap();
2854            db.put(txn, b"persist", b"this").unwrap();
2855            db.commit(txn).unwrap();
2856            // Don't call shutdown - simulate crash
2857            std::mem::forget(db);
2858        }
2859
2860        // Reopen - should recover
2861        {
2862            let db = Database::open_without_lock(dir.path()).unwrap();
2863            let txn = db.begin_transaction().unwrap();
2864            let val = db.get(txn, b"persist").unwrap();
2865            assert_eq!(val, Some(b"this".to_vec()));
2866            db.abort(txn).unwrap();
2867        }
2868    }
2869
2870    #[test]
2871    fn test_columnar_row_view_zero_alloc() {
2872        use sochdb_core::columnar::TypedColumn;
2873
2874        // Build a small columnar result: 3 rows × 2 columns (id: i64, name: text)
2875        let mut id_col = TypedColumn::new_int64();
2876        id_col.push_i64(Some(1));
2877        id_col.push_i64(Some(2));
2878        id_col.push_i64(Some(3));
2879
2880        let mut name_col = TypedColumn::new_text();
2881        name_col.push_text(Some("Alice"));
2882        name_col.push_text(Some("Bob"));
2883        name_col.push_text(None); // NULL
2884
2885        let cr = ColumnarQueryResult {
2886            columns: vec!["id".to_string(), "name".to_string()],
2887            data: vec![id_col, name_col],
2888            row_count: 3,
2889            bytes_read: 0,
2890        };
2891
2892        // row_view access — zero HashMap allocation
2893        let row0 = cr.row_view(0).unwrap();
2894        assert_eq!(row0.get("id"), Some(SochValue::Int(1)));
2895        assert_eq!(row0.get("name"), Some(SochValue::Text("Alice".to_string())));
2896        assert_eq!(row0.get("nonexistent"), None);
2897
2898        let row2 = cr.row_view(2).unwrap();
2899        assert_eq!(row2.get("id"), Some(SochValue::Int(3)));
2900        assert_eq!(row2.get("name"), Some(SochValue::Null));
2901
2902        // Out of bounds
2903        assert!(cr.row_view(3).is_none());
2904
2905        // values() — positional
2906        let vals = row0.values();
2907        assert_eq!(vals.len(), 2);
2908        assert_eq!(vals[0], SochValue::Int(1));
2909
2910        // to_map() — backward compat
2911        let map = row0.to_map();
2912        assert_eq!(map.get("id"), Some(&SochValue::Int(1)));
2913    }
2914
2915    #[test]
2916    fn test_columnar_into_query_result() {
2917        use sochdb_core::columnar::TypedColumn;
2918
2919        let mut score_col = TypedColumn::new_float64();
2920        score_col.push_f64(Some(9.5));
2921        score_col.push_f64(Some(8.2));
2922
2923        let cr = ColumnarQueryResult {
2924            columns: vec!["score".to_string()],
2925            data: vec![score_col],
2926            row_count: 2,
2927            bytes_read: 100,
2928        };
2929
2930        let qr = cr.into_query_result();
2931        assert_eq!(qr.rows.len(), 2);
2932        assert_eq!(qr.rows[0].get("score"), Some(&SochValue::Float(9.5)));
2933        assert_eq!(qr.rows[1].get("score"), Some(&SochValue::Float(8.2)));
2934        assert_eq!(qr.bytes_read, 100);
2935    }
2936}