sochdb_storage/database.rs
1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! SochDB Database Kernel
19//!
20//! The shared core that powers both embedded mode (`SochConnection::open`) and
21//! server mode (`sochdb-server`). This is the "SQLite engine" equivalent.
22//!
23//! ## Architecture
24//!
25//! ```text
26//! ┌──────────────────────────────────────────────────────────────────┐
27//! │ Database Kernel │
28//! │ Arc<Database> - shared by all connections │
29//! ├──────────────────────────────────────────────────────────────────┤
30//! │ │
31//! │ ┌─────────────────┐ ┌─────────────────┐ ┌────────────────┐ │
32//! │ │ DurableStorage │ │ Catalog │ │ Vector Index │ │
33//! │ │ (WAL + MVCC) │ │ (Schema Mgmt) │ │ (HNSW/Vamana) │ │
34//! │ └────────┬────────┘ └────────┬────────┘ └───────┬────────┘ │
35//! │ │ │ │ │
36//! │ └─────────────────────┴─────────────────────┘ │
37//! │ │ │
38//! │ ┌─────────────────────────────────────────────────────────────┐ │
39//! │ │ Query Executor (Path-Native) │ │
40//! │ │ - Path resolution: O(|path|) │ │
41//! │ │ - Column projection: 80% I/O reduction │ │
42//! │ │ - Context selection: Token-aware chunking │ │
43//! │ └─────────────────────────────────────────────────────────────┘ │
44//! │ │
45//! └──────────────────────────────────────────────────────────────────┘
46//!
47//! Deployment Modes:
48//! ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
49//! │ Embedded │ │ IPC Server │ │ TCP Server │
50//! │ (in-proc) │ │ (Unix sock)│ │ (remote) │
51//! └──────┬──────┘ └──────┬──────┘ └──────┬──────┘
52//! │ │ │
53//! └─────────────────┴─────────────────┘
54//! │
55//! Arc<Database>
56//! ```
57//!
58//! ## Latency Model
59//!
60//! Let K = kernel processing cost for a query
61//!
62//! - Embedded: L_emb ≈ K (function call overhead negligible)
63//! - IPC: L_ipc ≈ K + δ_ipc (δ_ipc = ~10-50µs for Unix socket)
64//! - TCP: L_tcp ≈ K + δ_net (δ_net = 100µs-10ms depending on network)
65//!
66//! For LLM context queries where K >> δ_ipc, IPC is "nearly embedded".
67
68use std::collections::HashMap;
69use std::path::{Path, PathBuf};
70use std::sync::Arc;
71use std::sync::atomic::{AtomicU64, Ordering};
72
73use dashmap::DashMap;
74use parking_lot::RwLock;
75
76use crate::durable_storage::{DurableStorage, TransactionMode};
77use crate::index_policy::{IndexPolicy, TableIndexConfig, TableIndexRegistry};
78use crate::key_buffer::KeyBuffer;
79use crate::packed_row::{PackedColumnDef, PackedColumnType, PackedRow, PackedTableSchema};
80use sochdb_core::catalog::Catalog;
81use sochdb_core::{Result, SochDBError, SochValue};
82
83// Re-export key types
84pub use crate::durable_storage::RecoveryStats;
85use crate::durable_storage::StorageEncryption;
86
/// Database configuration
///
/// Plain value type handed to `Database::open_with_config`. The presets
/// (`default`, `throughput_optimized`, `latency_optimized`,
/// `sqlite_compatible`) are the intended way to build one.
#[derive(Debug, Clone)]
pub struct DatabaseConfig {
    /// Enable group commit for better write throughput
    pub group_commit: bool,
    /// Maximum memory for memtables before flush (bytes)
    pub memtable_size_limit: usize,
    /// Enable WAL for durability
    pub wal_enabled: bool,
    /// Sync mode: fsync after every commit vs periodic
    pub sync_mode: SyncMode,
    /// Read-only mode
    pub read_only: bool,

    /// Enable ordered index for O(log N) prefix scans
    ///
    /// # Deprecation Notice
    ///
    /// **DEPRECATED since 0.2.0**: Use `default_index_policy` instead for per-table control.
    /// This field will be removed in v0.3.0.
    ///
    /// ## Migration Guide
    ///
    /// Replace:
    /// ```ignore
    /// DatabaseConfig { enable_ordered_index: true, .. }  // Old API
    /// DatabaseConfig { enable_ordered_index: false, .. } // Old API
    /// ```
    ///
    /// With:
    /// ```ignore
    /// DatabaseConfig { default_index_policy: IndexPolicy::ScanOptimized, .. }  // Ordered index enabled
    /// DatabaseConfig { default_index_policy: IndexPolicy::WriteOptimized, .. } // Ordered index disabled
    /// ```
    ///
    /// ## Behavior
    ///
    /// When false, saves ~134 ns/op on writes (20% speedup)
    /// but scan_prefix becomes O(N) instead of O(log N + K).
    ///
    /// Set to false for write-heavy workloads without range scans.
    ///
    /// NOTE(review): `Database::open_with_config_and_encryption` configures
    /// storage from `default_index_policy` and does not read this flag —
    /// confirm whether any remaining code path still honours it.
    #[deprecated(
        since = "0.2.0",
        note = "Use `default_index_policy` field instead. This field will be removed in v0.3.0. \
                Set IndexPolicy::ScanOptimized for ordered index, WriteOptimized to disable."
    )]
    pub enable_ordered_index: bool,
    /// Group commit configuration
    pub group_commit_config: GroupCommitSettings,
    /// Default index policy for tables not explicitly configured
    ///
    /// This replaces the global `enable_ordered_index` toggle with
    /// fine-grained per-table control. Use `index_registry` to configure
    /// individual tables.
    ///
    /// | Policy         | Insert Cost | Scan Cost      | Use Case               |
    /// |----------------|-------------|----------------|------------------------|
    /// | WriteOptimized | O(1)        | O(N)           | High-write, rare scan  |
    /// | Balanced       | O(1) amort  | O(output+logK) | Mixed OLTP             |
    /// | ScanOptimized  | O(log N)    | O(logN + K)    | Analytics, range query |
    /// | AppendOnly     | O(1)        | O(N)           | Time-series logs       |
    pub default_index_policy: IndexPolicy,
}
152
/// Group commit settings - mirrors SQLite's WAL mode tuning
///
/// ## Performance Model
///
/// Without group commit: Throughput = 1 / L_fsync ≈ 200 commits/sec (L=5ms)
/// With group commit (batch size K): Throughput = K / L_fsync = K × 200 commits/sec
///
/// For K=100: 20,000 commits/sec (100× speedup)
///
/// ## SQLite Comparison
///
/// | Setting             | SQLite Equivalent           |
/// |---------------------|-----------------------------|
/// | batch_size = 1      | PRAGMA synchronous = FULL   |
/// | batch_size = 100    | WAL mode with batching      |
/// | max_wait_us = 0     | No delay, immediate flush   |
/// | max_wait_us = 10000 | Up to 10ms delay for batch  |
///
/// ## Invariant
///
/// `min_batch_size <= max_batch_size` is expected but not enforced, because
/// every field is public. [`GroupCommitSettings::optimal_batch_size`]
/// tolerates a violated invariant instead of panicking.
#[derive(Debug, Clone)]
pub struct GroupCommitSettings {
    /// Minimum batch size before flush (default: 1)
    pub min_batch_size: usize,
    /// Maximum batch size (default: 1000)
    pub max_batch_size: usize,
    /// Maximum wait time before flush in microseconds (default: 10000 = 10ms)
    pub max_wait_us: u64,
    /// Expected fsync latency in microseconds (for adaptive sizing)
    pub fsync_latency_us: u64,
}

impl Default for GroupCommitSettings {
    /// Batches of 1..=1000 operations, at most 10ms of waiting, and an
    /// assumed 5ms fsync latency.
    fn default() -> Self {
        Self {
            min_batch_size: 1,
            max_batch_size: 1000,
            max_wait_us: 10_000,     // 10ms
            fsync_latency_us: 5_000, // 5ms
        }
    }
}

impl GroupCommitSettings {
    /// High throughput preset - maximizes batching
    pub fn high_throughput() -> Self {
        Self {
            min_batch_size: 50,
            max_batch_size: 5000,
            max_wait_us: 50_000, // 50ms
            fsync_latency_us: 5_000,
        }
    }

    /// Low latency preset - minimal batching
    pub fn low_latency() -> Self {
        Self {
            min_batch_size: 1,
            max_batch_size: 10,
            max_wait_us: 1_000, // 1ms
            fsync_latency_us: 5_000,
        }
    }

    /// Calculate a batch size from the square-root batching rule
    ///
    /// N* = sqrt(2 × L_fsync × λ / C_wait)
    ///
    /// (Earlier documentation attributed this expression to Little's Law;
    /// Little's Law is `L = λW`, whereas this is a square-root trade-off
    /// between fsync cost and waiting cost.)
    ///
    /// # Arguments
    /// * `arrival_rate` - Operations per second
    /// * `wait_cost` - Cost coefficient for waiting (0.0-1.0); values below
    ///   `0.001` are raised to `0.001` to avoid dividing by zero
    ///
    /// # Returns
    /// `N*` truncated to an integer, raised to at least `min_batch_size` and
    /// then capped at `max_batch_size`. A negative or NaN intermediate value
    /// truncates to 0 and therefore yields the lower bound.
    ///
    /// If the settings are misconfigured with `min_batch_size >
    /// max_batch_size`, the cap wins and `max_batch_size` is returned. The
    /// previous implementation used `usize::clamp`, which panics in that
    /// situation.
    pub fn optimal_batch_size(&self, arrival_rate: f64, wait_cost: f64) -> usize {
        let l_fsync = self.fsync_latency_us as f64 / 1_000_000.0;
        let n_star = (2.0 * l_fsync * arrival_rate / wait_cost.max(0.001)).sqrt();
        // Float-to-int `as` saturates: NaN -> 0, +inf -> usize::MAX.
        // Floor first, then cap: identical to `clamp` for valid settings,
        // but never panics when the public bounds are inverted.
        (n_star as usize)
            .max(self.min_batch_size)
            .min(self.max_batch_size)
    }
}
227
228impl Default for DatabaseConfig {
229 #[allow(deprecated)]
230 fn default() -> Self {
231 Self {
232 group_commit: true,
233 memtable_size_limit: 64 * 1024 * 1024, // 64MB
234 wal_enabled: true,
235 sync_mode: SyncMode::Normal,
236 read_only: false,
237 enable_ordered_index: true, // Default: enabled for compatibility
238 group_commit_config: GroupCommitSettings::default(),
239 default_index_policy: IndexPolicy::Balanced, // New default: balanced OLTP policy
240 }
241 }
242}
243
244impl DatabaseConfig {
245 /// Create config optimized for throughput (Fast Mode)
246 ///
247 /// - Disables ordered index (saves ~134 ns/op)
248 /// - Uses high-throughput group commit settings
249 /// - Suitable for append-only workloads
250 #[allow(deprecated)]
251 pub fn throughput_optimized() -> Self {
252 Self {
253 group_commit: true,
254 memtable_size_limit: 128 * 1024 * 1024, // 128MB
255 wal_enabled: true,
256 sync_mode: SyncMode::Normal,
257 read_only: false,
258 enable_ordered_index: false,
259 group_commit_config: GroupCommitSettings::high_throughput(),
260 default_index_policy: IndexPolicy::WriteOptimized, // No ordered index overhead
261 }
262 }
263
264 /// Create config optimized for latency
265 ///
266 /// - Keeps ordered index for fast range scans
267 /// - Uses low-latency group commit settings
268 /// - Suitable for OLTP workloads
269 #[allow(deprecated)]
270 pub fn latency_optimized() -> Self {
271 Self {
272 group_commit: true,
273 memtable_size_limit: 32 * 1024 * 1024, // 32MB
274 wal_enabled: true,
275 sync_mode: SyncMode::Full,
276 read_only: false,
277 enable_ordered_index: true,
278 group_commit_config: GroupCommitSettings::low_latency(),
279 default_index_policy: IndexPolicy::ScanOptimized, // Fast range scans
280 }
281 }
282
283 /// Create config matching SQLite defaults
284 #[allow(deprecated)]
285 pub fn sqlite_compatible() -> Self {
286 Self {
287 group_commit: false, // SQLite default is single-commit
288 memtable_size_limit: 64 * 1024 * 1024,
289 wal_enabled: true,
290 sync_mode: SyncMode::Normal, // PRAGMA synchronous = NORMAL
291 read_only: false,
292 enable_ordered_index: true,
293 group_commit_config: GroupCommitSettings::default(),
294 default_index_policy: IndexPolicy::Balanced, // Good default for mixed workloads
295 }
296 }
297
298 /// Get effective ordered index setting, derived from `default_index_policy`.
299 ///
300 /// This is the shim method for the deprecated `enable_ordered_index` field.
301 /// It returns `true` if the policy requires an ordered index (ScanOptimized),
302 /// and `false` otherwise (WriteOptimized, Balanced, AppendOnly).
303 ///
304 /// # Policy Mapping
305 ///
306 /// | IndexPolicy | Returns |
307 /// |------------------|---------|
308 /// | ScanOptimized | true |
309 /// | Balanced | false |
310 /// | WriteOptimized | false |
311 /// | AppendOnly | false |
312 ///
313 /// Note: `Balanced` uses lazy compaction rather than a live ordered index,
314 /// so it returns `false` for the low-level memtable config but still supports
315 /// efficient range scans via sorted runs.
316 pub fn effective_ordered_index(&self) -> bool {
317 matches!(self.default_index_policy, IndexPolicy::ScanOptimized)
318 }
319}
320
/// WAL sync mode - matches SQLite's PRAGMA synchronous semantics
///
/// | SochDB | SQLite     | Description                            |
/// |--------|------------|----------------------------------------|
/// | Off    | OFF (0)    | No fsync, risk of data loss on crash   |
/// | Normal | NORMAL (1) | Fsync at checkpoints, not every commit |
/// | Full   | FULL (2)   | Fsync every commit (safest, slowest)   |
///
/// # Performance vs Durability Trade-offs
///
/// - **Off**: ~10x faster than Full, but may lose last ~100ms of data on crash
/// - **Normal**: ~5x faster than Full, durable at checkpoint boundaries
/// - **Full**: Every commit is fsync'd, no data loss possible
///
/// # SQLite Compatibility
///
/// ```sql
/// -- SQLite equivalent settings
/// PRAGMA synchronous = OFF;    -- SyncMode::Off
/// PRAGMA synchronous = NORMAL; -- SyncMode::Normal
/// PRAGMA synchronous = FULL;   -- SyncMode::Full
/// ```
///
/// The explicit discriminants equal the SQLite pragma numbers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SyncMode {
    /// No fsync (equivalent to SQLite PRAGMA synchronous = OFF)
    ///
    /// Writes buffered in OS, may lose data on power failure.
    /// Use for non-critical data or bulk loading.
    Off = 0,

    /// Fsync at checkpoints (equivalent to SQLite PRAGMA synchronous = NORMAL)
    ///
    /// Default mode. Syncs WAL at checkpoint boundaries.
    /// Good balance of performance and durability.
    Normal = 1,

    /// Fsync every commit (equivalent to SQLite PRAGMA synchronous = FULL)
    ///
    /// Safest mode. Every commit is immediately durable.
    /// Required for financial/critical data.
    Full = 2,
}

impl SyncMode {
    /// Map a SQLite `PRAGMA synchronous` number onto a sync mode.
    ///
    /// `0` is `Off`, `1` is `Normal`; every other value (2 and above) is
    /// treated as `Full`.
    pub fn from_sqlite_pragma(value: u32) -> Self {
        if value == 0 {
            Self::Off
        } else if value == 1 {
            Self::Normal
        } else {
            Self::Full
        }
    }

    /// The SQLite `PRAGMA synchronous` number for this mode (0, 1 or 2).
    pub fn to_sqlite_pragma(self) -> u32 {
        match self {
            Self::Off => 0,
            Self::Normal => 1,
            Self::Full => 2,
        }
    }

    /// Parse a mode from its name (ASCII case-insensitive) or pragma digit.
    ///
    /// Accepts `OFF`/`0`, `NORMAL`/`1` and `FULL`/`2`. The input is not
    /// trimmed; anything else returns `None`.
    pub fn parse(s: &str) -> Option<Self> {
        let is = |name: &str, digit: &str| s.eq_ignore_ascii_case(name) || s == digit;

        if is("OFF", "0") {
            Some(Self::Off)
        } else if is("NORMAL", "1") {
            Some(Self::Normal)
        } else if is("FULL", "2") {
            Some(Self::Full)
        } else {
            None
        }
    }
}
389
/// Table schema for the kernel
///
/// A named, ordered list of column definitions.
#[derive(Debug, Clone)]
pub struct TableSchema {
    /// Table name
    pub name: String,
    /// Column definitions, in declaration order
    pub columns: Vec<ColumnDef>,
}
396
/// Column definition
///
/// One entry of [`TableSchema::columns`].
#[derive(Debug, Clone)]
pub struct ColumnDef {
    /// Column name
    pub name: String,
    /// Declared value type of the column
    pub col_type: ColumnType,
    /// Nullability flag (presumably `true` means NULL is allowed — enforcement
    /// is not visible in this part of the file)
    pub nullable: bool,
}
404
/// Column types
///
/// The set of value types a [`ColumnDef`] can declare. Descriptions below
/// follow the variant names; the storage encoding is defined elsewhere.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ColumnType {
    /// Signed 64-bit integer
    Int64,
    /// Unsigned 64-bit integer
    UInt64,
    /// 64-bit floating point number
    Float64,
    /// Text (string) value
    Text,
    /// Raw bytes
    Binary,
    /// Boolean
    Bool,
}
415
/// Transaction handle for kernel operations
///
/// A small `Copy` value that is passed to kernel operations (for example
/// `db.put(txn, ...)` and `db.commit(txn)` in the `Database` example).
#[derive(Debug, Clone, Copy)]
pub struct TxnHandle {
    /// Transaction identifier
    pub txn_id: u64,
    /// Snapshot timestamp (presumably the MVCC read snapshot of this
    /// transaction — TODO confirm against the storage layer)
    pub snapshot_ts: u64,
}
422
/// Query result from the kernel
///
/// Row-oriented result: one `HashMap` per row, keyed by column name.
/// `columns` fixes the column order used when rendering (see `to_toon`).
#[derive(Debug, Clone)]
pub struct QueryResult {
    /// Column names
    pub columns: Vec<String>,
    /// Row data (each row is a map of column -> value)
    pub rows: Vec<HashMap<String, SochValue>>,
    /// Number of rows scanned (for stats)
    pub rows_scanned: usize,
    /// Bytes read from storage
    pub bytes_read: usize,
}
435
436impl QueryResult {
437 /// Empty result
438 pub fn empty() -> Self {
439 Self {
440 columns: vec![],
441 rows: vec![],
442 rows_scanned: 0,
443 bytes_read: 0,
444 }
445 }
446
447 /// Convert to TOON format for token efficiency
448 pub fn to_toon(&self) -> String {
449 if self.rows.is_empty() {
450 return "[]".to_string();
451 }
452
453 // TOON format: table[N]{cols}: row1; row2; ...
454 let n = self.rows.len();
455 let cols = self.columns.join(",");
456
457 let rows_str: Vec<String> = self
458 .rows
459 .iter()
460 .map(|row| {
461 self.columns
462 .iter()
463 .map(|c| {
464 row.get(c)
465 .map(format_soch_value)
466 .unwrap_or_else(|| "∅".to_string())
467 })
468 .collect::<Vec<_>>()
469 .join(",")
470 })
471 .collect();
472
473 format!("result[{}]{{{}}}:{}", n, cols, rows_str.join(";"))
474 }
475}
476
477fn format_soch_value(v: &SochValue) -> String {
478 match v {
479 SochValue::Null => "∅".to_string(),
480 SochValue::Int(i) => i.to_string(),
481 SochValue::UInt(u) => u.to_string(),
482 SochValue::Float(f) => format!("{:.6}", f),
483 SochValue::Text(s) => {
484 if s.contains(',') || s.contains(';') {
485 format!("\"{}\"", s.replace('"', "\\\""))
486 } else {
487 s.clone()
488 }
489 }
490 SochValue::Bool(b) => if *b { "T" } else { "F" }.to_string(),
491 SochValue::Binary(b) => format!("b64:{}", base64_encode(b)),
492 _ => format!("{:?}", v),
493 }
494}
495
/// Encode `data` as standard base64 (RFC 4648 alphabet, `=` padding).
///
/// Every 3 input bytes become 4 output characters; a trailing group of one or
/// two bytes is zero-extended and padded with `==` or `=` respectively.
/// Empty input yields an empty string.
fn base64_encode(data: &[u8]) -> String {
    const ALPHABET: &[u8] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

    // The output length is known up front (4 chars per started 3-byte group),
    // so allocate once instead of growing the string repeatedly.
    let mut encoded = String::with_capacity((data.len() + 2) / 3 * 4);

    for chunk in data.chunks(3) {
        // Pack the (zero-extended) chunk into one 24-bit group: b0 b1 b2.
        let b0 = u32::from(chunk[0]);
        let b1 = u32::from(chunk.get(1).copied().unwrap_or(0));
        let b2 = u32::from(chunk.get(2).copied().unwrap_or(0));
        let group = (b0 << 16) | (b1 << 8) | b2;

        // Extract the 6-bit value starting `shift` bits from the right.
        let sextet = |shift: u32| ALPHABET[((group >> shift) & 0x3f) as usize] as char;

        encoded.push(sextet(18));
        encoded.push(sextet(12));
        // The third/fourth characters only carry data when the chunk has a
        // second/third byte; otherwise they are padding.
        encoded.push(if chunk.len() > 1 { sextet(6) } else { '=' });
        encoded.push(if chunk.len() > 2 { sextet(0) } else { '=' });
    }

    encoded
}
524
525// ============================================================================
526// Columnar Query Results - SIMD-friendly result format
527// ============================================================================
528
529use sochdb_core::TypedColumn as CoreTypedColumn;
530
/// Columnar query result - SIMD-friendly format for analytics
///
/// Instead of row-oriented `Vec<HashMap<String, SochValue>>`, this returns
/// column-oriented `Vec<TypedColumn>` for efficient vectorized operations.
///
/// ## Memory Layout
///
/// Row-oriented (standard):
/// ```text
/// Row 0: [id=1, name="Alice", score=85]
/// Row 1: [id=2, name="Bob", score=92]
/// Row 2: [id=3, name="Carol", score=78]
/// ```
///
/// Column-oriented (this format):
/// ```text
/// id:    [1, 2, 3]                 ← contiguous i64 array (SIMD-friendly)
/// name:  ["Alice", "Bob", "Carol"] ← Arrow-style string encoding
/// score: [85, 92, 78]              ← contiguous i64 array
/// ```
///
/// ## Performance Benefits
///
/// - SIMD: Column sums use vectorized instructions (~8× faster)
/// - Cache: Sequential access pattern maximizes L1/L2 cache hits
/// - Compression: Same-type data compresses better (5-10× typical)
/// - Filtering: Bitmap operations instead of row iteration
///
/// ## Usage
///
/// ```ignore
/// let result = db.query(txn, "users")
///     .columns(&["id", "score"])
///     .as_columnar()?;
///
/// // SIMD sum
/// let total_score = result.column("score")
///     .map(|c| c.sum_i64())
///     .unwrap_or(0);
///
/// // Stats
/// println!("Rows: {}, Memory: {} bytes", result.row_count(), result.memory_size());
/// ```
///
/// ## Invariants
///
/// `columns[i]` names `data[i]`, and every column is expected to hold
/// `row_count` values. The fields are public, so these invariants are not
/// enforced by the type.
#[derive(Debug, Clone)]
pub struct ColumnarQueryResult {
    /// Column names in order
    pub columns: Vec<String>,
    /// Column data - each TypedColumn contains all values for one column
    pub data: Vec<CoreTypedColumn>,
    /// Number of rows
    pub row_count: usize,
    /// Bytes read from storage
    pub bytes_read: usize,
}
585
586impl ColumnarQueryResult {
587 /// Create an empty result
588 pub fn empty() -> Self {
589 Self {
590 columns: vec![],
591 data: vec![],
592 row_count: 0,
593 bytes_read: 0,
594 }
595 }
596
597 /// Get column by name
598 pub fn column(&self, name: &str) -> Option<&CoreTypedColumn> {
599 self.columns
600 .iter()
601 .position(|c| c == name)
602 .and_then(|idx| self.data.get(idx))
603 }
604
605 /// Get column index by name
606 pub fn column_index(&self, name: &str) -> Option<usize> {
607 self.columns.iter().position(|c| c == name)
608 }
609
610 /// Number of rows
611 pub fn row_count(&self) -> usize {
612 self.row_count
613 }
614
615 /// Number of columns
616 pub fn column_count(&self) -> usize {
617 self.columns.len()
618 }
619
620 /// Total memory size in bytes
621 pub fn memory_size(&self) -> usize {
622 self.data.iter().map(|c| c.memory_size()).sum()
623 }
624
625 /// Sum of an i64 column (SIMD-optimized)
626 pub fn sum_i64(&self, column: &str) -> Option<i64> {
627 self.column(column).map(|c| c.sum_i64())
628 }
629
630 /// Sum of an f64 column (SIMD-optimized)
631 pub fn sum_f64(&self, column: &str) -> Option<f64> {
632 self.column(column).map(|c| c.sum_f64())
633 }
634
635 /// Zero-allocation row access by index.
636 ///
637 /// Returns a lightweight view that resolves column values on demand
638 /// from the underlying columnar arrays — no `HashMap` per row.
639 ///
640 /// ```ignore
641 /// let result = query.as_columnar()?;
642 /// for i in 0..result.row_count() {
643 /// let row = result.row_view(i).unwrap();
644 /// let name = row.get("name"); // SochValue::Text(...)
645 /// }
646 /// ```
647 #[inline]
648 pub fn row_view(&self, index: usize) -> Option<ColumnarRowView<'_>> {
649 if index < self.row_count {
650 Some(ColumnarRowView {
651 result: self,
652 index,
653 })
654 } else {
655 None
656 }
657 }
658
659 /// Convert to row-oriented `QueryResult` for backward compatibility.
660 ///
661 /// This materialises one `HashMap<String, SochValue>` per row, so prefer
662 /// using `row_view()` or direct columnar access when performance matters.
663 pub fn into_query_result(self) -> QueryResult {
664 let rows: Vec<HashMap<String, SochValue>> = (0..self.row_count)
665 .map(|i| {
666 self.columns
667 .iter()
668 .zip(self.data.iter())
669 .map(|(name, col)| (name.clone(), col.value_at(i)))
670 .collect()
671 })
672 .collect();
673
674 QueryResult {
675 columns: self.columns,
676 rows,
677 rows_scanned: self.row_count,
678 bytes_read: self.bytes_read,
679 }
680 }
681
682 /// Get column statistics (min, max, null count)
683 pub fn column_stats(&self, column: &str) -> Option<&sochdb_core::columnar::ColumnStats> {
684 self.column(column).map(|c| c.stats())
685 }
686
687 /// Convert to TOON format (token-efficient)
688 pub fn to_toon(&self) -> String {
689 if self.row_count == 0 {
690 return "[]".to_string();
691 }
692
693 let n = self.row_count;
694 let cols = self.columns.join(",");
695
696 // Build rows from columns
697 let mut rows_str = Vec::with_capacity(n);
698 for i in 0..n {
699 let row: Vec<String> = self
700 .data
701 .iter()
702 .map(|col| format_columnar_value(col, i))
703 .collect();
704 rows_str.push(row.join(","));
705 }
706
707 format!("result[{}]{{{}}}:{}", n, cols, rows_str.join(";"))
708 }
709}
710
711/// Format a single value from a TypedColumn at index
712fn format_columnar_value(col: &CoreTypedColumn, idx: usize) -> String {
713 match col {
714 CoreTypedColumn::Int64 {
715 values, validity, ..
716 } => {
717 if validity.is_valid(idx) && idx < values.len() {
718 values[idx].to_string()
719 } else {
720 "∅".to_string()
721 }
722 }
723 CoreTypedColumn::UInt64 {
724 values, validity, ..
725 } => {
726 if validity.is_valid(idx) && idx < values.len() {
727 values[idx].to_string()
728 } else {
729 "∅".to_string()
730 }
731 }
732 CoreTypedColumn::Float64 {
733 values, validity, ..
734 } => {
735 if validity.is_valid(idx) && idx < values.len() {
736 format!("{:.6}", values[idx])
737 } else {
738 "∅".to_string()
739 }
740 }
741 CoreTypedColumn::Text {
742 offsets,
743 data,
744 validity,
745 ..
746 } => {
747 if validity.is_valid(idx) && idx + 1 < offsets.len() {
748 let start = offsets[idx] as usize;
749 let end = offsets[idx + 1] as usize;
750 std::str::from_utf8(&data[start..end])
751 .map(|s| {
752 if s.contains(',') || s.contains(';') {
753 format!("\"{}\"", s.replace('"', "\\\""))
754 } else {
755 s.to_string()
756 }
757 })
758 .unwrap_or_else(|_| "∅".to_string())
759 } else {
760 "∅".to_string()
761 }
762 }
763 CoreTypedColumn::Binary {
764 offsets,
765 data,
766 validity,
767 ..
768 } => {
769 if validity.is_valid(idx) && idx + 1 < offsets.len() {
770 let start = offsets[idx] as usize;
771 let end = offsets[idx + 1] as usize;
772 format!("b64:{}", base64_encode(&data[start..end]))
773 } else {
774 "∅".to_string()
775 }
776 }
777 CoreTypedColumn::Bool {
778 values,
779 validity,
780 len,
781 ..
782 } => {
783 if validity.is_valid(idx) && idx < *len {
784 let word = idx / 64;
785 let bit = idx % 64;
786 if (values[word] >> bit) & 1 == 1 {
787 "T"
788 } else {
789 "F"
790 }
791 .to_string()
792 } else {
793 "∅".to_string()
794 }
795 }
796 }
797}
798
/// Borrowing row view into a `ColumnarQueryResult`.
///
/// Provides named-column access (like `HashMap<String, SochValue>`)
/// without building a HashMap per row. Values are read from the
/// underlying typed column arrays on demand.
///
/// **Cost per access:** a linear scan over the column names to resolve the
/// name (`ColumnarQueryResult::column_index` uses `position`), followed by
/// one `value_at` read.
/// **Allocation:** the view itself only borrows the `ColumnarQueryResult`
/// and stores a row index. NOTE(review): `value_at` returns an owned
/// `SochValue`, so text/binary cells presumably allocate per read — confirm
/// before relying on "zero allocation".
#[derive(Debug)]
pub struct ColumnarRowView<'a> {
    /// The result set this row belongs to
    result: &'a ColumnarQueryResult,
    /// Row position within `result`
    index: usize,
}
812
813impl<'a> ColumnarRowView<'a> {
814 /// Get a value by column name without allocation.
815 ///
816 /// Returns `None` if the column does not exist.
817 /// Returns `Some(SochValue::Null)` if the column exists but the value is NULL.
818 #[inline]
819 pub fn get(&self, column: &str) -> Option<SochValue> {
820 self.result
821 .column_index(column)
822 .map(|ci| self.result.data[ci].value_at(self.index))
823 }
824
825 /// Get all column values as a `Vec<SochValue>` (positional, no HashMap).
826 pub fn values(&self) -> Vec<SochValue> {
827 self.result
828 .data
829 .iter()
830 .map(|col| col.value_at(self.index))
831 .collect()
832 }
833
834 /// Row index within the result set.
835 #[inline]
836 pub fn index(&self) -> usize {
837 self.index
838 }
839
840 /// Materialise this row into a `HashMap<String, SochValue>` for backward
841 /// compatibility. Prefer `get()` for single column access.
842 pub fn to_map(&self) -> HashMap<String, SochValue> {
843 self.result
844 .columns
845 .iter()
846 .zip(self.result.data.iter())
847 .map(|(name, col)| (name.clone(), col.value_at(self.index)))
848 .collect()
849 }
850}
851
/// Vector search result
///
/// One hit returned by a vector search.
#[derive(Debug, Clone)]
pub struct VectorSearchResult {
    /// Identifier of the matched item
    pub id: u64,
    /// Distance between the query and this item (the metric is chosen by the
    /// index and is not visible here)
    pub distance: f32,
    /// Optional metadata attached to the item, keyed by field name
    pub metadata: Option<HashMap<String, SochValue>>,
}
859
/// The SochDB Database Kernel
///
/// This is the shared core used by both embedded (`SochConnection`) and
/// server (`sochdb-server`) modes. It owns all storage, catalog, and
/// indexing components.
///
/// # Thread Safety
///
/// The Database is fully thread-safe via internal synchronization:
/// - Multiple readers can operate concurrently (MVCC snapshots)
/// - Writers coordinate through WAL and group commit
/// - All state is behind Arc/RwLock for shared access
///
/// # Concurrency Modes
///
/// ## Standard Mode (Single Process)
/// - Uses exclusive file lock (`flock(LOCK_EX)`)
/// - Best for: Scripts, notebooks, CLI tools
/// - Open with: `Database::open(path)`
///
/// ## Concurrent Mode (Multi-Process/Web Apps)
/// - Uses lock-free MVCC for reads, single-writer coordination for writes
/// - Best for: Web servers, Flask/FastAPI apps, hot reloading
/// - Open with: `Database::open_concurrent(path)`
///
/// # Example
///
/// ```ignore
/// // Standard mode (single process)
/// let db = Database::open("./my_data")?;
///
/// // Concurrent mode (multi-reader, single-writer)
/// let db = Database::open_concurrent("./my_data")?;
///
/// // Begin a transaction
/// let txn = db.begin_transaction()?;
///
/// // Write data
/// db.put(txn, b"user:1:name", b"Alice")?;
///
/// // Commit
/// db.commit(txn)?;
/// ```
// `dead_code` is allowed for the whole struct; presumably some fields are
// only read in certain modes or builds — TODO confirm which ones.
#[allow(dead_code)]
pub struct Database {
    /// Path to database directory
    path: PathBuf,
    /// Durable storage layer (WAL + MVCC + memtable)
    storage: Arc<DurableStorage>,
    /// Concurrent MVCC manager (for concurrent mode)
    concurrent_mvcc: Option<Arc<crate::mvcc_concurrent::ConcurrentMvcc>>,
    /// Schema catalog
    catalog: Arc<RwLock<Catalog>>,
    /// Registered table schemas (name -> schema) - lock-free for reads
    tables: DashMap<String, TableSchema>,
    /// Cached packed schemas for fast insert (name -> packed schema)
    packed_schemas: DashMap<String, PackedTableSchema>,
    /// Per-table index policy registry
    index_registry: Arc<TableIndexRegistry>,
    /// Configuration
    config: DatabaseConfig,
    /// Statistics
    stats: DatabaseStats,
    /// Shutdown flag (stored as an `AtomicU64`; the constructors initialise it to 0)
    shutdown: AtomicU64,
    /// Whether this database is in concurrent mode
    is_concurrent: bool,
    /// CDC (Change Data Capture) log for streaming mutations
    cdc_log: Option<Arc<crate::cdc::CdcLog>>,
}
930
/// Database statistics
///
/// Internal counters kept as atomics; `Stats` is the public plain-integer
/// counterpart.
struct DatabaseStats {
    /// Transactions started
    transactions_started: AtomicU64,
    /// Transactions committed
    transactions_committed: AtomicU64,
    /// Transactions aborted
    transactions_aborted: AtomicU64,
    /// Queries executed
    queries_executed: AtomicU64,
    /// Bytes written
    bytes_written: AtomicU64,
    /// Bytes read
    bytes_read: AtomicU64,
}

impl DatabaseStats {
    /// Create a counter set with every counter at zero.
    fn new() -> Self {
        let zero = || AtomicU64::new(0);
        Self {
            transactions_started: zero(),
            transactions_committed: zero(),
            transactions_aborted: zero(),
            queries_executed: zero(),
            bytes_written: zero(),
            bytes_read: zero(),
        }
    }
}
953
/// Public statistics snapshot
///
/// Plain-integer counterpart of the internal atomic counters; field names
/// match `DatabaseStats` one to one.
#[derive(Debug, Clone)]
pub struct Stats {
    /// Transactions started
    pub transactions_started: u64,
    /// Transactions committed
    pub transactions_committed: u64,
    /// Transactions aborted
    pub transactions_aborted: u64,
    /// Queries executed
    pub queries_executed: u64,
    /// Bytes written
    pub bytes_written: u64,
    /// Bytes read
    pub bytes_read: u64,
}
964
965impl Database {
966 /// Open or create a database at the given path.
967 ///
968 /// This is the primary entry point, similar to `sqlite3_open()`.
969 /// If the database exists, it will be opened and WAL recovery performed.
970 /// If it doesn't exist, a new database will be created.
971 ///
972 /// # Arguments
973 ///
974 /// * `path` - Directory path for the database files
975 ///
976 /// # Returns
977 ///
978 /// An `Arc<Database>` that can be shared across threads and connections.
979 pub fn open<P: AsRef<Path>>(path: P) -> Result<Arc<Self>> {
980 Self::open_with_config(path, DatabaseConfig::default())
981 }
982
983 /// Open without locking (for testing crash recovery scenarios)
984 ///
985 /// # Safety
986 /// This should ONLY be used in tests that simulate crashes by forgetting
987 /// the storage instance. In production, always use `open()`.
988 #[cfg(test)]
989 pub fn open_without_lock<P: AsRef<Path>>(path: P) -> Result<Arc<Self>> {
990 let path = path.as_ref().to_path_buf();
991 let config = DatabaseConfig::default();
992
993 let storage = Arc::new(DurableStorage::open_without_lock(&path)?);
994
995 let index_registry = Arc::new(TableIndexRegistry::with_default_policy(
996 config.default_index_policy,
997 ));
998
999 let db = Arc::new(Self {
1000 path: path.clone(),
1001 storage,
1002 concurrent_mvcc: None,
1003 catalog: Arc::new(RwLock::new(Catalog::new("sochdb"))),
1004 tables: DashMap::new(),
1005 packed_schemas: DashMap::new(),
1006 index_registry,
1007 config,
1008 stats: DatabaseStats::new(),
1009 shutdown: AtomicU64::new(0),
1010 is_concurrent: false,
1011 cdc_log: None,
1012 });
1013
1014 db.recover()?;
1015 Ok(db)
1016 }
1017
1018 /// Open with custom configuration
1019 pub fn open_with_config<P: AsRef<Path>>(path: P, config: DatabaseConfig) -> Result<Arc<Self>> {
1020 Self::open_with_config_and_encryption(path, config, StorageEncryption::disabled())
1021 }
1022
1023 /// Open with custom configuration AND at-rest encryption.
1024 ///
1025 /// Threads the supplied [`StorageEncryption`] (the KEK envelope) down to the
1026 /// keyring + WAL. `StorageEncryption::disabled()` is exactly plaintext
1027 /// [`Self::open_with_config`]. With a KEK the database is opened (or created
1028 /// on first open) encrypted; a wrong/missing key for an already-encrypted DB
1029 /// fails closed. `StorageEncryption` is move-only (zeroizing), so it is a
1030 /// by-value parameter rather than a `DatabaseConfig` field.
1031 pub fn open_with_config_and_encryption<P: AsRef<Path>>(
1032 path: P,
1033 config: DatabaseConfig,
1034 encryption: StorageEncryption,
1035 ) -> Result<Arc<Self>> {
1036 let path = path.as_ref().to_path_buf();
1037
1038 // Use IndexPolicy-based storage configuration for automatic memtable selection
1039 // This derives ordered index and memtable type from the policy
1040 let storage = Arc::new(DurableStorage::open_with_policy_encrypted(
1041 &path,
1042 config.default_index_policy,
1043 config.group_commit,
1044 encryption,
1045 )?);
1046
1047 // Propagate sync_mode from config to storage engine.
1048 // Without this, DurableStorage defaults to SyncMode::Normal (adaptive fsync).
1049 storage.set_sync_mode(config.sync_mode as u64);
1050
1051 // Create index registry with default policy from config
1052 let index_registry = Arc::new(TableIndexRegistry::with_default_policy(
1053 config.default_index_policy,
1054 ));
1055
1056 let db = Arc::new(Self {
1057 path: path.clone(),
1058 storage,
1059 concurrent_mvcc: None,
1060 catalog: Arc::new(RwLock::new(Catalog::new("sochdb"))),
1061 tables: DashMap::new(),
1062 packed_schemas: DashMap::new(),
1063 index_registry,
1064 config,
1065 stats: DatabaseStats::new(),
1066 shutdown: AtomicU64::new(0),
1067 is_concurrent: false,
1068 cdc_log: None,
1069 });
1070
1071 // Perform crash recovery if needed
1072 db.recover()?;
1073
1074 Ok(db)
1075 }
1076
1077 /// Open database in concurrent mode (multi-reader, single-writer)
1078 ///
1079 /// This mode allows multiple processes to access the database simultaneously:
1080 /// - **Readers**: Lock-free, concurrent access via MVCC snapshots
1081 /// - **Writers**: Single-writer coordination through atomic locks
1082 ///
1083 /// # Use Cases
1084 ///
1085 /// - Web applications (Flask, FastAPI, Django)
1086 /// - Hot reloading development servers
1087 /// - Multi-process worker pools
1088 /// - Any scenario with concurrent read access
1089 ///
1090 /// # Performance
1091 ///
1092 /// - Read latency: ~100ns (lock-free atomic operations)
1093 /// - Write latency: ~60μs amortized (with group commit)
1094 /// - Concurrent readers: Up to 1024 (configurable)
1095 ///
1096 /// # Example
1097 ///
1098 /// ```ignore
1099 /// // Multiple processes can open the same database
1100 /// let db = Database::open_concurrent("./my_data")?;
1101 ///
1102 /// // Reads are lock-free
1103 /// let value = db.get(b"key")?;
1104 ///
1105 /// // Writes coordinate automatically
1106 /// let txn = db.begin_transaction()?;
1107 /// db.put(txn, b"key", b"value")?;
1108 /// db.commit(txn)?;
1109 /// ```
1110 pub fn open_concurrent<P: AsRef<Path>>(path: P) -> Result<Arc<Self>> {
1111 Self::open_concurrent_with_config(path, DatabaseConfig::default())
1112 }
1113
1114 /// Open database in concurrent mode with custom configuration
1115 pub fn open_concurrent_with_config<P: AsRef<Path>>(
1116 path: P,
1117 config: DatabaseConfig,
1118 ) -> Result<Arc<Self>> {
1119 Self::open_concurrent_with_config_and_encryption(
1120 path,
1121 config,
1122 StorageEncryption::disabled(),
1123 )
1124 }
1125
1126 /// Open in concurrent (multi-reader) mode with at-rest encryption.
1127 ///
1128 /// The encryption-aware sibling of [`Self::open_concurrent_with_config`] —
1129 /// makes the multi-process path able to open an encrypted database instead of
1130 /// failing closed for lack of a key channel.
1131 pub fn open_concurrent_with_config_and_encryption<P: AsRef<Path>>(
1132 path: P,
1133 config: DatabaseConfig,
1134 encryption: StorageEncryption,
1135 ) -> Result<Arc<Self>> {
1136 use crate::mvcc_concurrent::ConcurrentMvcc;
1137
1138 let path = path.as_ref().to_path_buf();
1139 std::fs::create_dir_all(&path)?;
1140
1141 // Open concurrent MVCC manager (this uses shared memory, not exclusive lock)
1142 let concurrent_mvcc = Arc::new(ConcurrentMvcc::open(&path)?);
1143
1144 // Open storage WITHOUT exclusive lock (concurrent MVCC handles coordination)
1145 // We use a special internal method that skips the file lock
1146 let storage = Arc::new(DurableStorage::open_for_concurrent_encrypted(
1147 &path,
1148 config.default_index_policy,
1149 encryption,
1150 )?);
1151
1152 // Propagate sync_mode from config to storage engine
1153 storage.set_sync_mode(config.sync_mode as u64);
1154
1155 // Create index registry with default policy from config
1156 let index_registry = Arc::new(TableIndexRegistry::with_default_policy(
1157 config.default_index_policy,
1158 ));
1159
1160 let db = Arc::new(Self {
1161 path: path.clone(),
1162 storage,
1163 concurrent_mvcc: Some(concurrent_mvcc),
1164 catalog: Arc::new(RwLock::new(Catalog::new("sochdb"))),
1165 tables: DashMap::new(),
1166 packed_schemas: DashMap::new(),
1167 index_registry,
1168 config,
1169 stats: DatabaseStats::new(),
1170 shutdown: AtomicU64::new(0),
1171 is_concurrent: true,
1172 cdc_log: None,
1173 });
1174
1175 // Perform crash recovery if needed
1176 db.recover()?;
1177
1178 // Clean up any stale readers from crashed processes
1179 if let Some(ref mvcc) = db.concurrent_mvcc {
1180 mvcc.cleanup_stale_readers();
1181 }
1182
1183 Ok(db)
1184 }
1185
1186 /// Check if database is in concurrent mode
1187 #[inline]
1188 pub fn is_concurrent(&self) -> bool {
1189 self.is_concurrent
1190 }
1191
1192 /// Perform crash recovery
1193 fn recover(&self) -> Result<RecoveryStats> {
1194 self.storage.recover()
1195 }
1196
1197 /// Get database path
1198 pub fn path(&self) -> &Path {
1199 &self.path
1200 }
1201
1202 // =========================================================================
1203 // Transaction API
1204 // =========================================================================
1205
1206 /// Begin a new transaction
1207 pub fn begin_transaction(&self) -> Result<TxnHandle> {
1208 self.stats
1209 .transactions_started
1210 .fetch_add(1, Ordering::Relaxed);
1211 let txn_id = self.storage.begin_transaction()?;
1212
1213 // Get snapshot timestamp from MVCC
1214 // For now, use txn_id as a proxy (the real snapshot_ts is managed internally)
1215 Ok(TxnHandle {
1216 txn_id,
1217 snapshot_ts: txn_id,
1218 })
1219 }
1220
1221 /// Begin a read-only transaction (optimized: no SSI tracking)
1222 ///
1223 /// Read-only transactions skip SSI read tracking, reducing overhead
1224 /// from ~82ns to ~32ns per read (2.6x faster).
1225 ///
1226 /// Use this for:
1227 /// - SELECT queries that don't modify data
1228 /// - Analytics and reporting queries
1229 /// - Snapshot reads for backup
1230 pub fn begin_read_only(&self) -> Result<TxnHandle> {
1231 self.stats
1232 .transactions_started
1233 .fetch_add(1, Ordering::Relaxed);
1234 let txn_id = self.storage.begin_with_mode(TransactionMode::ReadOnly)?;
1235 Ok(TxnHandle {
1236 txn_id,
1237 snapshot_ts: txn_id,
1238 })
1239 }
1240
1241 /// Begin a lightweight read-only transaction (no WAL overhead).
1242 ///
1243 /// Eliminates WAL mutex acquisitions entirely for read operations.
1244 /// The txn_id is allocated atomically and MVCC snapshot state is created,
1245 /// but NO WAL records are written (no TxnBegin, no TxnAbort).
1246 ///
1247 /// ~5-10x faster per-operation than `begin_read_only()` because it avoids:
1248 /// - 2 WAL mutex lock/unlock cycles per transaction
1249 /// - 2 WAL BufWriter serializations per transaction
1250 ///
1251 /// Callers MUST use `abort_read_only_fast()` to clean up — NOT `commit()`
1252 /// or `abort()`.
1253 #[inline]
1254 pub fn begin_read_only_fast(&self) -> TxnHandle {
1255 let txn_id = self.storage.begin_read_only_fast();
1256 TxnHandle {
1257 txn_id,
1258 snapshot_ts: txn_id,
1259 }
1260 }
1261
1262 /// Abort a fast read-only transaction — O(1), no WAL, no memtable scan.
1263 #[inline]
1264 pub fn abort_read_only_fast(&self, txn: TxnHandle) {
1265 self.storage.abort_read_only_fast(txn.txn_id);
1266 }
1267
1268 /// Read a key WITHOUT any MVCC transaction tracking.
1269 ///
1270 /// Uses the current global timestamp to see all committed writes.
1271 /// Bypasses: begin/abort, active_txns DashMap, record_read, stats.
1272 /// Only safe for single-threaded access with no concurrent writers.
1273 #[inline]
1274 pub fn get_raw_read(&self, key: &[u8]) -> Option<Vec<u8>> {
1275 self.storage.read_latest(key)
1276 }
1277
1278 /// Scan by prefix WITHOUT any MVCC transaction tracking.
1279 ///
1280 /// Uses the current global timestamp. Only safe for single-threaded access.
1281 #[inline]
1282 pub fn scan_raw(&self, prefix: &[u8]) -> Vec<(Vec<u8>, Vec<u8>)> {
1283 self.storage.scan_latest(prefix)
1284 }
1285
1286 /// Begin a write-only transaction (optimized: no read tracking)
1287 ///
1288 /// Write-only transactions skip read tracking, improving insert
1289 /// throughput for bulk loading scenarios.
1290 ///
1291 /// Use this for:
1292 /// - Bulk data imports
1293 /// - Append-only logging tables
1294 /// - ETL pipelines
1295 pub fn begin_write_only(&self) -> Result<TxnHandle> {
1296 self.stats
1297 .transactions_started
1298 .fetch_add(1, Ordering::Relaxed);
1299 let txn_id = self.storage.begin_with_mode(TransactionMode::WriteOnly)?;
1300 Ok(TxnHandle {
1301 txn_id,
1302 snapshot_ts: txn_id,
1303 })
1304 }
1305
1306 /// Commit a transaction
1307 ///
1308 /// In concurrent mode, acquires the shared writer lock to ensure
1309 /// WAL writes are serialized across processes, and forces a flush+sync
1310 /// so that subsequent processes see the committed data.
1311 pub fn commit(&self, txn: TxnHandle) -> Result<u64> {
1312 self.stats
1313 .transactions_committed
1314 .fetch_add(1, Ordering::Relaxed);
1315
1316 // In concurrent mode, acquire the cross-process writer lock
1317 // to serialize WAL commits across processes
1318 let _writer_guard = if let Some(ref mvcc) = self.concurrent_mvcc {
1319 Some(mvcc.acquire_writer(std::time::Duration::from_secs(5))?)
1320 } else {
1321 None
1322 };
1323
1324 let commit_ts = self.storage.commit(txn.txn_id)?;
1325
1326 // In concurrent mode, force flush+sync so other processes can see
1327 // the committed data when they open the DB or run recovery.
1328 // Without this, the BufWriter may hold data that isn't visible
1329 // to other processes reading the WAL file.
1330 if self.is_concurrent {
1331 self.storage.flush_wal()?;
1332 self.storage.fsync()?;
1333 }
1334
1335 // Notify concurrent MVCC of commit for GC tracking
1336 if let Some(ref mvcc) = self.concurrent_mvcc {
1337 mvcc.on_commit();
1338 }
1339
1340 Ok(commit_ts)
1341 }
1342
1343 /// Abort a transaction
1344 pub fn abort(&self, txn: TxnHandle) -> Result<()> {
1345 self.stats
1346 .transactions_aborted
1347 .fetch_add(1, Ordering::Relaxed);
1348 self.storage.abort(txn.txn_id)
1349 }
1350
1351 // =========================================================================
1352 // Per-Table Index Policy API
1353 // =========================================================================
1354
1355 /// Configure index policy for a table
1356 ///
1357 /// This allows fine-grained control over write/scan trade-offs per table:
1358 ///
1359 /// | Policy | Insert Cost | Scan Cost | Use Case |
1360 /// |----------------|-------------|----------------|------------------------|
1361 /// | WriteOptimized | O(1) | O(N) | High-write, rare scan |
1362 /// | Balanced | O(1) amort | O(output+logK) | Mixed OLTP |
1363 /// | ScanOptimized | O(log N) | O(logN + K) | Analytics, range query |
1364 /// | AppendOnly | O(1) | O(N) | Time-series logs |
1365 ///
1366 /// # Example
1367 ///
1368 /// ```ignore
1369 /// // Fast inserts for logs table (no ordered index overhead)
1370 /// db.set_table_index_policy("logs", IndexPolicy::WriteOptimized);
1371 ///
1372 /// // Efficient range scans for analytics table
1373 /// db.set_table_index_policy("analytics", IndexPolicy::ScanOptimized);
1374 ///
1375 /// // Balanced for OLTP tables
1376 /// db.set_table_index_policy("users", IndexPolicy::Balanced);
1377 /// ```
1378 pub fn set_table_index_policy(&self, table: &str, policy: IndexPolicy) {
1379 self.index_registry
1380 .configure_table(TableIndexConfig::new(table, policy));
1381 }
1382
1383 /// Get the index policy for a table
1384 pub fn get_table_index_policy(&self, table: &str) -> IndexPolicy {
1385 self.index_registry.get_policy(table)
1386 }
1387
1388 /// Get the index registry for advanced configuration
1389 pub fn index_registry(&self) -> &Arc<TableIndexRegistry> {
1390 &self.index_registry
1391 }
1392
1393 // =========================================================================
1394 // Key-Value API (Low-level)
1395 // =========================================================================
1396
1397 /// Put a key-value pair
1398 ///
1399 /// In concurrent mode, acquires the shared writer lock to ensure
1400 /// WAL writes are serialized across processes.
1401 pub fn put(&self, txn: TxnHandle, key: &[u8], value: &[u8]) -> Result<()> {
1402 self.stats
1403 .bytes_written
1404 .fetch_add((key.len() + value.len()) as u64, Ordering::Relaxed);
1405
1406 // In concurrent mode, acquire cross-process writer lock
1407 let _writer_guard = if let Some(ref mvcc) = self.concurrent_mvcc {
1408 Some(mvcc.acquire_writer(std::time::Duration::from_secs(5))?)
1409 } else {
1410 None
1411 };
1412
1413 // Use write_refs to avoid unnecessary allocations
1414 self.storage.write_refs(txn.txn_id, key, value)
1415 }
1416
1417 /// Batch put multiple key-value pairs with reduced overhead
1418 ///
1419 /// This amortizes per-operation costs over the entire batch:
1420 /// - Single DashMap lookup
1421 /// - Batch MVCC tracking
1422 /// - Batch memtable writes
1423 ///
1424 /// For 100+ entries, this is 2-3x faster than individual puts.
1425 ///
1426 /// # Example
1427 ///
1428 /// ```ignore
1429 /// let writes: Vec<(&[u8], &[u8])> = vec![
1430 /// (b"key1", b"value1"),
1431 /// (b"key2", b"value2"),
1432 /// (b"key3", b"value3"),
1433 /// ];
1434 /// db.put_batch(txn, &writes)?;
1435 /// ```
1436 pub fn put_batch(&self, txn: TxnHandle, writes: &[(&[u8], &[u8])]) -> Result<()> {
1437 let bytes: u64 = writes.iter().map(|(k, v)| (k.len() + v.len()) as u64).sum();
1438 self.stats.bytes_written.fetch_add(bytes, Ordering::Relaxed);
1439
1440 // In concurrent mode, acquire cross-process writer lock
1441 let _writer_guard = if let Some(ref mvcc) = self.concurrent_mvcc {
1442 Some(mvcc.acquire_writer(std::time::Duration::from_secs(5))?)
1443 } else {
1444 None
1445 };
1446
1447 self.storage.write_batch_refs(txn.txn_id, writes)
1448 }
1449
1450 /// Get a value by key
1451 pub fn get(&self, txn: TxnHandle, key: &[u8]) -> Result<Option<Vec<u8>>> {
1452 let result = self.storage.read(txn.txn_id, key)?;
1453 if let Some(ref data) = result {
1454 self.stats
1455 .bytes_read
1456 .fetch_add(data.len() as u64, Ordering::Relaxed);
1457 }
1458 Ok(result)
1459 }
1460
1461 /// Delete a key
1462 pub fn delete(&self, txn: TxnHandle, key: &[u8]) -> Result<()> {
1463 self.storage.delete(txn.txn_id, key.to_vec())
1464 }
1465
1466 /// Minimum prefix length for scan operations.
1467 /// Prevents expensive full-table scans by requiring a meaningful prefix.
1468 pub const MIN_SCAN_PREFIX_LEN: usize = 2;
1469
1470 /// Scan keys with a prefix (enforces minimum prefix length for safety).
1471 ///
1472 /// # Prefix Safety
1473 ///
1474 /// To prevent accidental full-table scans, this method requires a minimum
1475 /// prefix length of 2 bytes. Use `scan_unchecked` for internal operations
1476 /// that need empty/short prefixes.
1477 ///
1478 /// # Errors
1479 ///
1480 /// Returns `SochDBError::InvalidInput` if prefix is too short.
1481 pub fn scan(&self, txn: TxnHandle, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
1482 if prefix.len() < Self::MIN_SCAN_PREFIX_LEN {
1483 return Err(SochDBError::InvalidArgument(format!(
1484 "Prefix too short: {} bytes (minimum {} required). \
1485 Use scan_unchecked() for unrestricted scans.",
1486 prefix.len(),
1487 Self::MIN_SCAN_PREFIX_LEN
1488 )));
1489 }
1490 self.scan_unchecked(txn, prefix)
1491 }
1492
1493 /// Scan keys with a prefix without length validation.
1494 ///
1495 /// # Warning
1496 ///
1497 /// This method allows empty/short prefixes which can cause expensive
1498 /// full-table scans. Use `scan()` unless you specifically need unrestricted
1499 /// prefix access for internal operations.
1500 pub fn scan_unchecked(&self, txn: TxnHandle, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
1501 let results = self.storage.scan(txn.txn_id, prefix)?;
1502 let bytes: u64 = results
1503 .iter()
1504 .map(|(k, v)| (k.len() + v.len()) as u64)
1505 .sum();
1506 self.stats.bytes_read.fetch_add(bytes, Ordering::Relaxed);
1507 Ok(results)
1508 }
1509
1510 /// Scan keys in range
1511 pub fn scan_range(
1512 &self,
1513 txn: TxnHandle,
1514 start: &[u8],
1515 end: &[u8],
1516 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
1517 let results = self.storage.scan_range(txn.txn_id, start, end)?;
1518 let bytes: u64 = results
1519 .iter()
1520 .map(|(k, v)| (k.len() + v.len()) as u64)
1521 .sum();
1522 self.stats.bytes_read.fetch_add(bytes, Ordering::Relaxed);
1523 Ok(results)
1524 }
1525
1526 /// Streaming scan for very large result sets
1527 ///
1528 /// Returns an iterator that yields (key, value) pairs without
1529 /// materializing the entire result set. Use this for large scans
1530 /// where memory efficiency is important.
1531 ///
1532 /// ## Performance
1533 ///
1534 /// - Memory: O(1) per iteration vs O(N) for scan_range
1535 /// - Latency: First result available immediately vs waiting for all results
1536 /// - Throughput: Slightly lower due to per-item overhead
1537 ///
1538 /// ## Usage
1539 ///
1540 /// ```ignore
1541 /// for result in db.scan_range_iter(txn, b"start", b"end") {
1542 /// let (key, value) = result?;
1543 /// // Process immediately - no need to wait for all results
1544 /// }
1545 /// ```
1546 pub fn scan_range_iter<'a>(
1547 &'a self,
1548 txn: TxnHandle,
1549 start: &'a [u8],
1550 end: &'a [u8],
1551 ) -> impl Iterator<Item = Result<(Vec<u8>, Vec<u8>)>> + 'a {
1552 let stats = &self.stats;
1553 self.storage
1554 .scan_range_iter(txn.txn_id, start, end)
1555 .map(move |item| {
1556 stats
1557 .bytes_read
1558 .fetch_add((item.0.len() + item.1.len()) as u64, Ordering::Relaxed);
1559 Ok(item)
1560 })
1561 }
1562
1563 /// Flush memtable to WAL/Disk
1564 pub fn flush(&self) -> Result<()> {
1565 self.storage.fsync()
1566 }
1567
1568 // =========================================================================
1569 // Path-Native API (SochDB's differentiator)
1570 // =========================================================================
1571
1572 /// Get storage statistics
1573 pub fn storage_stats(&self) -> crate::durable_storage::StorageStats {
1574 self.storage.stats()
1575 }
1576
1577 /// Put a value at a path
1578 ///
1579 /// Path format: "collection/doc_id/field" or "table.row_id.column"
1580 /// Resolution is O(|path|), not O(log N) like B-tree.
1581 pub fn put_path(&self, txn: TxnHandle, path: &str, value: &[u8]) -> Result<()> {
1582 self.put(txn, path.as_bytes(), value)
1583 }
1584
1585 /// Get a value at a path
1586 pub fn get_path(&self, txn: TxnHandle, path: &str) -> Result<Option<Vec<u8>>> {
1587 self.get(txn, path.as_bytes())
1588 }
1589
1590 /// Delete at a path
1591 pub fn delete_path(&self, txn: TxnHandle, path: &str) -> Result<()> {
1592 self.delete(txn, path.as_bytes())
1593 }
1594
1595 /// Scan a path prefix
1596 ///
1597 /// Returns all key-value pairs where key starts with prefix.
1598 /// Useful for: "users/123/" -> all fields of user 123
1599 pub fn scan_path(&self, txn: TxnHandle, prefix: &str) -> Result<Vec<(String, Vec<u8>)>> {
1600 self.stats.queries_executed.fetch_add(1, Ordering::Relaxed);
1601
1602 let results = self.scan(txn, prefix.as_bytes())?;
1603
1604 Ok(results
1605 .into_iter()
1606 .filter_map(|(k, v)| String::from_utf8(k).ok().map(|path| (path, v)))
1607 .collect())
1608 }
1609
1610 // =========================================================================
1611 // Query API
1612 // =========================================================================
1613
1614 /// Execute a path query and return results
1615 ///
1616 /// This is the main query interface for LLM context retrieval.
1617 /// Supports:
1618 /// - Path prefix matching
1619 /// - Column projection (for I/O reduction)
1620 /// - Limit/offset
1621 pub fn query(&self, txn: TxnHandle, path_prefix: &str) -> QueryBuilder<'_> {
1622 QueryBuilder::new(self, txn, path_prefix.to_string())
1623 }
1624
1625 // =========================================================================
1626 // Table API (Higher-level abstraction)
1627 // =========================================================================
1628
1629 /// Register a table schema
1630 pub fn register_table(&self, schema: TableSchema) -> Result<()> {
1631 if self.tables.contains_key(&schema.name) {
1632 return Err(SochDBError::InvalidArgument(format!(
1633 "Table '{}' already exists",
1634 schema.name
1635 )));
1636 }
1637 // Cache the packed schema for fast inserts
1638 let packed_schema = Self::to_packed_schema(&schema);
1639 self.packed_schemas
1640 .insert(schema.name.clone(), packed_schema);
1641 self.tables.insert(schema.name.clone(), schema);
1642 Ok(())
1643 }
1644
1645 /// Get table schema
1646 pub fn get_table_schema(&self, name: &str) -> Option<TableSchema> {
1647 self.tables.get(name).map(|s| s.clone())
1648 }
1649
1650 /// Update the schema for an existing table (used by ALTER TABLE).
1651 ///
1652 /// Replaces the schema in both the `tables` DashMap and the packed schema
1653 /// cache atomically (per-key). The caller is responsible for validating
1654 /// the new schema.
1655 pub fn update_table_schema(&self, old_name: &str, schema: TableSchema) -> Result<()> {
1656 if !self.tables.contains_key(old_name) {
1657 return Err(SochDBError::InvalidArgument(format!(
1658 "Table '{}' not found",
1659 old_name
1660 )));
1661 }
1662 // Remove old entries
1663 self.tables.remove(old_name);
1664 self.packed_schemas.remove(old_name);
1665 // Insert new
1666 let packed = Self::to_packed_schema(&schema);
1667 self.packed_schemas.insert(schema.name.clone(), packed);
1668 self.tables.insert(schema.name.clone(), schema);
1669 Ok(())
1670 }
1671
1672 /// List all tables
1673 pub fn list_tables(&self) -> Vec<String> {
1674 self.tables.iter().map(|e| e.key().clone()).collect()
1675 }
1676
1677 // =========================================================================
1678 // CDC (Change Data Capture)
1679 // =========================================================================
1680
1681 /// Enable CDC on this database, returning the CDC log handle.
1682 ///
1683 /// Subsequent mutations emitted via the SQL execution layer will be
1684 /// recorded in the CDC log for subscriber consumption.
1685 pub fn enable_cdc(&mut self, config: crate::cdc::CdcConfig) -> Arc<crate::cdc::CdcLog> {
1686 let log = crate::cdc::CdcLog::new(config);
1687 self.cdc_log = Some(log.clone());
1688 log
1689 }
1690
1691 /// Get the CDC log handle, if CDC is enabled.
1692 pub fn cdc_log(&self) -> Option<&Arc<crate::cdc::CdcLog>> {
1693 self.cdc_log.as_ref()
1694 }
1695
1696 /// Convert TableSchema to PackedTableSchema for efficient storage
1697 fn to_packed_schema(schema: &TableSchema) -> PackedTableSchema {
1698 let columns = schema
1699 .columns
1700 .iter()
1701 .map(|col| PackedColumnDef {
1702 name: col.name.clone(),
1703 col_type: match col.col_type {
1704 ColumnType::Int64 => PackedColumnType::Int64,
1705 ColumnType::UInt64 => PackedColumnType::UInt64,
1706 ColumnType::Float64 => PackedColumnType::Float64,
1707 ColumnType::Text => PackedColumnType::Text,
1708 ColumnType::Binary => PackedColumnType::Binary,
1709 ColumnType::Bool => PackedColumnType::Bool,
1710 },
1711 nullable: col.nullable,
1712 })
1713 .collect();
1714
1715 PackedTableSchema::new(&schema.name, columns)
1716 }
1717
1718 /// Insert a row into a table
1719 ///
1720 /// Uses packed row format: stores entire row as single key-value pair.
1721 /// This reduces write amplification from 4× to 1× for a 4-column table.
1722 ///
1723 /// # Performance
1724 /// - Before: 4 columns × (WAL entry + MVCC version) = 4 writes
1725 /// - After: 1 packed row = 1 write
1726 /// - Improvement: ~4× fewer WAL entries, ~48% less I/O overhead
1727 pub fn insert_row(
1728 &self,
1729 txn: TxnHandle,
1730 table: &str,
1731 row_id: u64,
1732 values: &HashMap<String, SochValue>,
1733 ) -> Result<()> {
1734 // Use cached packed schema - single DashMap lookup, no clone
1735 let packed_schema = self
1736 .packed_schemas
1737 .get(table)
1738 .ok_or_else(|| SochDBError::InvalidArgument(format!("Table '{}' not found", table)))?;
1739
1740 // Pack the row using cached schema
1741 let packed_row = PackedRow::pack(&packed_schema, values);
1742
1743 // Build key using KeyBuffer - optimized stack allocation (~12-15ns vs ~30-35ns for write!())
1744 let key = KeyBuffer::format_row_key(table, row_id);
1745
1746 self.put(txn, key.as_bytes(), packed_row.as_bytes())?;
1747
1748 Ok(())
1749 }
1750
1751 /// Read a row from a table
1752 ///
1753 /// Reads packed row and extracts requested columns in O(k) time.
1754 /// Column projection happens in memory, not storage - all columns are fetched.
1755 pub fn read_row(
1756 &self,
1757 txn: TxnHandle,
1758 table: &str,
1759 row_id: u64,
1760 columns: Option<&[&str]>,
1761 ) -> Result<Option<HashMap<String, SochValue>>> {
1762 let schema = self
1763 .tables
1764 .get(table)
1765 .ok_or_else(|| SochDBError::InvalidArgument(format!("Table '{}' not found", table)))?;
1766
1767 // Read the packed row with a single key lookup using KeyBuffer
1768 let key = KeyBuffer::format_row_key(table, row_id);
1769 let bytes = match self.get(txn, key.as_bytes())? {
1770 Some(b) => b,
1771 None => return Ok(None),
1772 };
1773
1774 // Use cached packed schema
1775 let packed_schema = self
1776 .packed_schemas
1777 .get(table)
1778 .ok_or_else(|| SochDBError::Internal("Packed schema not found".into()))?;
1779 let packed_row = PackedRow::from_bytes(bytes, packed_schema.num_columns())?;
1780
1781 // Determine which columns to return
1782 let cols_to_read: Vec<&str> = match columns {
1783 Some(c) => c.to_vec(),
1784 None => schema.columns.iter().map(|c| c.name.as_str()).collect(),
1785 };
1786
1787 let mut row = HashMap::new();
1788 for col_name in cols_to_read {
1789 if let Some(idx) = packed_schema.column_index(col_name)
1790 && let Some(col_def) = packed_schema.column(idx)
1791 && let Some(value) = packed_row.get_column(idx, col_def.col_type)
1792 {
1793 row.insert(col_name.to_string(), value);
1794 }
1795 }
1796
1797 Ok(Some(row))
1798 }
1799
1800 /// Insert multiple rows efficiently in a batch
1801 ///
1802 /// This method accumulates all rows and writes them with fewer WAL syncs.
1803 /// Ideal for bulk loading scenarios.
1804 ///
1805 /// # Performance
1806 /// - Uses group commit to batch fsync operations
1807 /// - Expected throughput: 500K-1M rows/sec depending on row size
1808 pub fn insert_rows_batch(
1809 &self,
1810 txn: TxnHandle,
1811 table: &str,
1812 rows: &[(u64, HashMap<String, SochValue>)],
1813 ) -> Result<usize> {
1814 // Use cached packed schema
1815 let packed_schema = self
1816 .packed_schemas
1817 .get(table)
1818 .ok_or_else(|| SochDBError::InvalidArgument(format!("Table '{}' not found", table)))?;
1819
1820 let mut count = 0;
1821
1822 for (row_id, values) in rows {
1823 // Pack and write using KeyBuffer for efficient key construction
1824 let packed_row = PackedRow::pack(&packed_schema, values);
1825 let key = KeyBuffer::format_row_key(table, *row_id);
1826 self.put(txn, key.as_bytes(), packed_row.as_bytes())?;
1827 count += 1;
1828 }
1829
1830 Ok(count)
1831 }
1832
1833 /// Ultra-fast raw put - bypasses all validation
1834 ///
1835 /// Use when you've already validated the data and just need speed.
1836 /// This is ~10× faster than insert_row() for bulk inserts.
1837 #[inline]
1838 pub fn put_raw(&self, txn: TxnHandle, key: &[u8], value: &[u8]) -> Result<()> {
1839 self.storage.write_refs(txn.txn_id, key, value)
1840 }
1841
1842 /// Zero-allocation insert - fastest path for bulk inserts
1843 ///
1844 /// Takes values as a slice in schema column order, avoiding HashMap overhead.
1845 ///
1846 /// # Arguments
1847 /// * `txn` - Transaction handle
1848 /// * `table` - Table name
1849 /// * `row_id` - Row identifier
1850 /// * `values` - Values in schema column order (None = NULL)
1851 ///
1852 /// # Performance
1853 /// - Eliminates ~6 allocations per row vs insert_row()
1854 /// - Expected: 1.2M-1.5M inserts/sec
1855 ///
1856 /// # Example
1857 /// ```ignore
1858 /// let values: &[Option<&SochValue>] = &[
1859 /// Some(&SochValue::Int(1)),
1860 /// Some(&SochValue::Text("Alice".into())),
1861 /// None, // NULL
1862 /// ];
1863 /// db.insert_row_slice(txn, "users", 1, values)?;
1864 /// ```
1865 #[inline]
1866 pub fn insert_row_slice(
1867 &self,
1868 txn: TxnHandle,
1869 table: &str,
1870 row_id: u64,
1871 values: &[Option<&SochValue>],
1872 ) -> Result<()> {
1873 // Use cached packed schema - single DashMap lookup
1874 let packed_schema = self
1875 .packed_schemas
1876 .get(table)
1877 .ok_or_else(|| SochDBError::InvalidArgument(format!("Table '{}' not found", table)))?;
1878
1879 // Validate column count matches
1880 if values.len() != packed_schema.num_columns() {
1881 return Err(SochDBError::InvalidArgument(format!(
1882 "Expected {} columns, got {}",
1883 packed_schema.num_columns(),
1884 values.len()
1885 )));
1886 }
1887
1888 // Pack using zero-allocation path
1889 let packed_row = PackedRow::pack_slice(&packed_schema, values);
1890
1891 // Build key using KeyBuffer - optimized stack allocation (~12-15ns vs ~30-35ns for write!())
1892 let key = KeyBuffer::format_row_key(table, row_id);
1893
1894 self.put(txn, key.as_bytes(), packed_row.as_bytes())?;
1895 Ok(())
1896 }
1897
1898 // =========================================================================
1899 // Maintenance
1900 // =========================================================================
1901
1902 /// Force fsync to disk
1903 pub fn fsync(&self) -> Result<()> {
1904 self.storage.fsync()
1905 }
1906
1907 /// Create a checkpoint
1908 pub fn checkpoint(&self) -> Result<u64> {
1909 self.storage.checkpoint()
1910 }
1911
1912 /// Truncate the WAL file after a checkpoint.
1913 ///
1914 /// See [`DurableStorage::truncate_wal`] for safety notes.
1915 pub fn truncate_wal(&self) -> Result<()> {
1916 self.storage.truncate_wal()
1917 }
1918
1919 /// Run garbage collection
1920 pub fn gc(&self) -> usize {
1921 self.storage.gc()
1922 }
1923
1924 /// Get database statistics
1925 pub fn stats(&self) -> Stats {
1926 Stats {
1927 transactions_started: self.stats.transactions_started.load(Ordering::Relaxed),
1928 transactions_committed: self.stats.transactions_committed.load(Ordering::Relaxed),
1929 transactions_aborted: self.stats.transactions_aborted.load(Ordering::Relaxed),
1930 queries_executed: self.stats.queries_executed.load(Ordering::Relaxed),
1931 bytes_written: self.stats.bytes_written.load(Ordering::Relaxed),
1932 bytes_read: self.stats.bytes_read.load(Ordering::Relaxed),
1933 }
1934 }
1935
1936 /// Shutdown the database gracefully
1937 pub fn shutdown(&self) -> Result<()> {
1938 if self.shutdown.swap(1, Ordering::SeqCst) == 1 {
1939 return Ok(()); // Already shutting down
1940 }
1941
1942 // Flush any pending writes
1943 self.fsync()?;
1944
1945 // Create clean shutdown marker
1946 let marker = self.path.join(".clean_shutdown");
1947 std::fs::write(&marker, b"ok")?;
1948
1949 Ok(())
1950 }
1951}
1952
1953impl Drop for Database {
1954 fn drop(&mut self) {
1955 // Try graceful shutdown if not already done
1956 if self.shutdown.load(Ordering::SeqCst) == 0 {
1957 let _ = self.fsync();
1958 let marker = self.path.join(".clean_shutdown");
1959 let _ = std::fs::write(&marker, b"ok");
1960 }
1961 }
1962}
1963
/// Query builder for fluent query construction.
///
/// Holds the target database, the transaction handle, the path prefix to
/// scan, and the optional projection / pagination settings that are applied
/// when the query is executed.
pub struct QueryBuilder<'a> {
    /// Database the query runs against.
    db: &'a Database,
    /// Transaction handle handed to the path scan.
    txn: TxnHandle,
    /// Path prefix to scan; its first `/`-separated segment is used as the
    /// table name when looking up a schema.
    path_prefix: String,
    /// Columns to return; `None` means no projection was requested.
    columns: Option<Vec<String>>,
    /// Maximum number of rows to return, if set.
    limit: Option<usize>,
    /// Number of leading rows to skip, if set.
    offset: Option<usize>,
}
1973
1974impl<'a> QueryBuilder<'a> {
/// Create a builder for `path_prefix` on `db` within transaction `txn`.
///
/// The builder starts with no column projection, no limit and no offset.
fn new(db: &'a Database, txn: TxnHandle, path_prefix: String) -> Self {
    Self {
        db,
        txn,
        path_prefix,
        columns: None,
        limit: None,
        offset: None,
    }
}
1985
1986 /// Select specific columns (for I/O reduction)
1987 pub fn columns(mut self, cols: &[&str]) -> Self {
1988 self.columns = Some(cols.iter().map(|s| s.to_string()).collect());
1989 self
1990 }
1991
1992 /// Limit results
1993 pub fn limit(mut self, n: usize) -> Self {
1994 self.limit = Some(n);
1995 self
1996 }
1997
1998 /// Skip results
1999 pub fn offset(mut self, n: usize) -> Self {
2000 self.offset = Some(n);
2001 self
2002 }
2003
2004 /// Execute the query
2005 ///
2006 /// Scans packed rows and unpacks them. Each key is "table/row_id" pointing to a packed row.
2007 pub fn execute(self) -> Result<QueryResult> {
2008 self.db
2009 .stats
2010 .queries_executed
2011 .fetch_add(1, Ordering::Relaxed);
2012
2013 // Get schema for the table if we're querying a table
2014 let table_name = self
2015 .path_prefix
2016 .split('/')
2017 .next()
2018 .unwrap_or(&self.path_prefix);
2019 let schema = self.db.tables.get(table_name).map(|s| s.clone());
2020
2021 // When querying a registered table by its bare name, scan the row-key
2022 // prefix "table/" rather than the bare name. Row keys are "table/row_id"
2023 // (see KeyBuffer::format_row_key), so scanning the bare name would:
2024 // (1) bleed across sibling tables whose names share a prefix — e.g.
2025 // querying "user" would also match "users/123"; and
2026 // (2) trip the scan minimum-prefix guard for single-character table
2027 // names ("t" is 1 byte, below the 2-byte minimum).
2028 // Appending the separator fixes both. Path-style prefixes that already
2029 // contain '/' (e.g. "users/123" to fetch one row's columns) are left
2030 // unchanged so field-prefix scans keep working.
2031 let scan_prefix = if schema.is_some() && !self.path_prefix.contains('/') {
2032 format!("{}/", self.path_prefix)
2033 } else {
2034 self.path_prefix.clone()
2035 };
2036
2037 // Scan the path prefix
2038 let results = self.db.scan_path(self.txn, &scan_prefix)?;
2039
2040 let mut rows: Vec<HashMap<String, SochValue>> = Vec::new();
2041 let mut bytes_read = 0usize;
2042
2043 if let Some(ref schema) = schema {
2044 // We have a table schema - use cached packed schema
2045 let packed_schema = self
2046 .db
2047 .packed_schemas
2048 .get(table_name)
2049 .map(|ps| ps.clone())
2050 .unwrap_or_else(|| Database::to_packed_schema(schema));
2051
2052 for (path, value_bytes) in results {
2053 // Parse path: table/row_id
2054 let parts: Vec<&str> = path.split('/').collect();
2055 if parts.len() == 2 {
2056 // This is a packed row
2057 bytes_read += value_bytes.len();
2058
2059 if let Ok(packed_row) =
2060 PackedRow::from_bytes(value_bytes, packed_schema.num_columns())
2061 {
2062 // Unpack all columns or just requested columns
2063 let mut row = HashMap::new();
2064
2065 if let Some(ref cols) = self.columns {
2066 // Only extract requested columns
2067 for col_name in cols {
2068 if let Some(idx) = packed_schema.column_index(col_name)
2069 && let Some(col_def) = packed_schema.column(idx)
2070 && let Some(value) =
2071 packed_row.get_column(idx, col_def.col_type)
2072 {
2073 row.insert(col_name.clone(), value);
2074 }
2075 }
2076 } else {
2077 // Extract all columns
2078 row = packed_row.unpack(&packed_schema);
2079 }
2080
2081 if !row.is_empty() {
2082 rows.push(row);
2083 }
2084 }
2085 }
2086 }
2087 } else {
2088 // Fallback: no schema, try legacy column-per-key format
2089 let mut rows_map: HashMap<String, HashMap<String, SochValue>> = HashMap::new();
2090
2091 for (path, value_bytes) in results {
2092 let parts: Vec<&str> = path.split('/').collect();
2093 if parts.len() >= 3 {
2094 let row_key = format!("{}/{}", parts[0], parts[1]);
2095 let col_name = parts[2..].join("/");
2096
2097 if let Some(ref cols) = self.columns
2098 && !cols.contains(&col_name)
2099 {
2100 continue;
2101 }
2102
2103 bytes_read += value_bytes.len();
2104 let row = rows_map.entry(row_key).or_default();
2105 row.insert(col_name, deserialize_value(&value_bytes));
2106 }
2107 }
2108
2109 rows = rows_map.into_values().collect();
2110 }
2111
2112 // Apply offset
2113 if let Some(offset) = self.offset {
2114 rows = rows.into_iter().skip(offset).collect();
2115 }
2116
2117 // Apply limit
2118 if let Some(limit) = self.limit {
2119 rows.truncate(limit);
2120 }
2121
2122 // Collect column names
2123 let columns: Vec<String> = self.columns.unwrap_or_else(|| {
2124 rows.iter()
2125 .flat_map(|r| r.keys().cloned())
2126 .collect::<std::collections::HashSet<_>>()
2127 .into_iter()
2128 .collect()
2129 });
2130
2131 Ok(QueryResult {
2132 columns,
2133 rows_scanned: rows.len(),
2134 bytes_read,
2135 rows,
2136 })
2137 }
2138
2139 /// Execute and return TOON format (for LLM efficiency)
2140 pub fn to_toon(self) -> Result<String> {
2141 let result = self.execute()?;
2142 Ok(result.to_toon())
2143 }
2144
2145 /// Execute with lazy iteration - avoids materializing all rows
2146 ///
2147 /// Returns an iterator over rows as `Vec<SochValue>` in schema column order.
2148 /// This is more memory-efficient than `execute()` for large result sets.
2149 ///
2150 /// # Performance
2151 /// - No upfront materialization of all rows
2152 /// - ~40% less memory for large result sets
2153 /// - Ideal for streaming to network or aggregations
2154 ///
2155 /// # Example
2156 /// ```ignore
2157 /// for row_result in db.query(txn, "users").execute_iter()? {
2158 /// let row = row_result?;
2159 /// // row is Vec<SochValue> in column order
2160 /// }
2161 /// ```
2162 pub fn execute_iter(self) -> Result<QueryRowIterator> {
2163 self.db
2164 .stats
2165 .queries_executed
2166 .fetch_add(1, Ordering::Relaxed);
2167
2168 let table_name = self
2169 .path_prefix
2170 .split('/')
2171 .next()
2172 .unwrap_or(&self.path_prefix)
2173 .to_string();
2174
2175 // Get packed schema (clone needed for iterator ownership)
2176 let packed_schema = self.db.packed_schemas.get(&table_name).map(|ps| ps.clone());
2177
2178 // Scan the path prefix
2179 let results = self.db.scan_path(self.txn, &self.path_prefix)?;
2180
2181 Ok(QueryRowIterator {
2182 results: results.into_iter(),
2183 packed_schema,
2184 columns: self.columns,
2185 offset: self.offset.unwrap_or(0),
2186 limit: self.limit,
2187 yielded: 0,
2188 skipped: 0,
2189 })
2190 }
2191
2192 /// Execute and return columnar (SIMD-friendly) result format
2193 ///
2194 /// Instead of row-oriented `Vec<HashMap<String, SochValue>>`, returns
2195 /// column-oriented `Vec<TypedColumn>` for vectorized operations.
2196 ///
2197 /// ## Performance Benefits
2198 ///
2199 /// - SIMD: Aggregate operations (sum, avg) use vectorized instructions
2200 /// - Cache: Sequential access maximizes L1/L2 hits
2201 /// - Memory: ~30% less overhead than row-based format
2202 /// - Analytics: Ideal for ML preprocessing and statistics
2203 ///
2204 /// ## Example
2205 ///
2206 /// ```ignore
2207 /// let result = db.query(txn, "users")
2208 /// .columns(&["id", "score"])
2209 /// .as_columnar()?;
2210 ///
2211 /// // SIMD-optimized sum
2212 /// let total = result.sum_i64("score").unwrap_or(0);
2213 ///
2214 /// // Direct column access
2215 /// if let Some(scores) = result.column("score") {
2216 /// for i in 0..scores.len() {
2217 /// if let Some(v) = scores.get_i64(i) {
2218 /// println!("Score: {}", v);
2219 /// }
2220 /// }
2221 /// }
2222 /// ```
2223 pub fn as_columnar(self) -> Result<ColumnarQueryResult> {
2224 self.db
2225 .stats
2226 .queries_executed
2227 .fetch_add(1, Ordering::Relaxed);
2228
2229 let table_name = self
2230 .path_prefix
2231 .split('/')
2232 .next()
2233 .unwrap_or(&self.path_prefix);
2234 let schema = self.db.tables.get(table_name).map(|s| s.clone());
2235
2236 // Get packed schema
2237 let packed_schema = match self.db.packed_schemas.get(table_name) {
2238 Some(ps) => ps.clone(),
2239 None => return Ok(ColumnarQueryResult::empty()),
2240 };
2241
2242 // Determine columns to fetch
2243 let column_names: Vec<String> = self.columns.clone().unwrap_or_else(|| {
2244 schema
2245 .as_ref()
2246 .map(|s| s.columns.iter().map(|c| c.name.clone()).collect())
2247 .unwrap_or_default()
2248 });
2249
2250 if column_names.is_empty() {
2251 return Ok(ColumnarQueryResult::empty());
2252 }
2253
2254 // Initialize TypedColumns based on schema types
2255 let mut columns: Vec<CoreTypedColumn> = column_names
2256 .iter()
2257 .map(|col_name| {
2258 packed_schema
2259 .column_index(col_name)
2260 .and_then(|idx| packed_schema.column(idx))
2261 .map(|col_def| match col_def.col_type {
2262 PackedColumnType::Int64 => CoreTypedColumn::new_int64(),
2263 PackedColumnType::UInt64 => CoreTypedColumn::new_uint64(),
2264 PackedColumnType::Float64 => CoreTypedColumn::new_float64(),
2265 PackedColumnType::Text => CoreTypedColumn::new_text(),
2266 PackedColumnType::Binary => CoreTypedColumn::new_binary(),
2267 PackedColumnType::Bool => CoreTypedColumn::new_bool(),
2268 PackedColumnType::Null => CoreTypedColumn::new_text(), // Null column = fallback to text
2269 })
2270 .unwrap_or_else(CoreTypedColumn::new_text) // fallback
2271 })
2272 .collect();
2273
2274 // Scan the path prefix
2275 let results = self.db.scan_path(self.txn, &self.path_prefix)?;
2276
2277 let mut row_count = 0;
2278 let mut bytes_read = 0;
2279 let mut skipped = 0;
2280
2281 for (path, value_bytes) in results {
2282 // Parse path: table/row_id
2283 let parts: Vec<&str> = path.split('/').collect();
2284 if parts.len() != 2 {
2285 continue;
2286 }
2287
2288 // Apply offset
2289 if let Some(offset) = self.offset
2290 && skipped < offset
2291 {
2292 skipped += 1;
2293 continue;
2294 }
2295
2296 // Apply limit
2297 if let Some(limit) = self.limit
2298 && row_count >= limit
2299 {
2300 break;
2301 }
2302
2303 bytes_read += value_bytes.len();
2304
2305 if let Ok(packed_row) = PackedRow::from_bytes(value_bytes, packed_schema.num_columns())
2306 {
2307 // Extract each column and push to corresponding TypedColumn
2308 for (col_idx, col_name) in column_names.iter().enumerate() {
2309 if let Some(schema_idx) = packed_schema.column_index(col_name) {
2310 if let Some(col_def) = packed_schema.column(schema_idx) {
2311 let value = packed_row.get_column(schema_idx, col_def.col_type);
2312 push_value_to_typed_column(&mut columns[col_idx], value);
2313 } else {
2314 push_null_to_typed_column(&mut columns[col_idx]);
2315 }
2316 } else {
2317 push_null_to_typed_column(&mut columns[col_idx]);
2318 }
2319 }
2320 row_count += 1;
2321 }
2322 }
2323
2324 Ok(ColumnarQueryResult {
2325 columns: column_names,
2326 data: columns,
2327 row_count,
2328 bytes_read,
2329 })
2330 }
2331}
2332
/// Lazy iterator over query results
///
/// Unpacks rows on-demand, avoiding upfront materialization of decoded rows.
/// The raw `(path, bytes)` scan entries are already held in memory.
pub struct QueryRowIterator {
    /// Remaining raw scan entries as `(path, value bytes)` pairs.
    results: std::vec::IntoIter<(String, Vec<u8>)>,
    /// Packed schema used to decode rows; `None` yields raw bytes instead.
    packed_schema: Option<PackedTableSchema>,
    /// Column projection; `None` yields all columns in schema order.
    columns: Option<Vec<String>>,
    /// Number of leading rows to skip.
    offset: usize,
    /// Maximum number of rows to yield, if set.
    limit: Option<usize>,
    /// Rows yielded so far (compared against `limit`).
    yielded: usize,
    /// Rows skipped so far (compared against `offset`).
    skipped: usize,
}
2345
2346impl Iterator for QueryRowIterator {
2347 type Item = Result<Vec<SochValue>>;
2348
2349 fn next(&mut self) -> Option<Self::Item> {
2350 // Check limit
2351 if let Some(limit) = self.limit
2352 && self.yielded >= limit
2353 {
2354 return None;
2355 }
2356
2357 loop {
2358 let (path, value_bytes) = self.results.next()?;
2359
2360 // Parse path: table/row_id
2361 let parts: Vec<&str> = path.split('/').collect();
2362 if parts.len() != 2 {
2363 continue; // Skip non-row entries
2364 }
2365
2366 // Apply offset
2367 if self.skipped < self.offset {
2368 self.skipped += 1;
2369 continue;
2370 }
2371
2372 if let Some(ref schema) = self.packed_schema {
2373 match PackedRow::from_bytes(value_bytes, schema.num_columns()) {
2374 Ok(packed_row) => {
2375 let row = if let Some(ref cols) = self.columns {
2376 // Project specific columns
2377 cols.iter()
2378 .map(|col_name| {
2379 schema
2380 .column_index(col_name)
2381 .and_then(|idx| schema.column(idx))
2382 .and_then(|col_def| {
2383 packed_row.get_column(
2384 schema.column_index(col_name).unwrap(),
2385 col_def.col_type,
2386 )
2387 })
2388 .unwrap_or(SochValue::Null)
2389 })
2390 .collect()
2391 } else {
2392 // All columns in order
2393 packed_row.unpack_to_vec(schema)
2394 };
2395
2396 self.yielded += 1;
2397 return Some(Ok(row));
2398 }
2399 Err(e) => return Some(Err(e)),
2400 }
2401 } else {
2402 // No schema - return raw bytes as binary
2403 self.yielded += 1;
2404 return Some(Ok(vec![SochValue::Binary(value_bytes)]));
2405 }
2406 }
2407 }
2408}
2409
2410// Helper functions for serialization (kept for backward compatibility with legacy data)
2411
2412#[allow(dead_code)]
2413fn serialize_value(value: &SochValue) -> Vec<u8> {
2414 // Simple serialization - in production use proper format
2415 match value {
2416 SochValue::Null => vec![0],
2417 SochValue::Int(i) => {
2418 let mut buf = vec![1];
2419 buf.extend_from_slice(&i.to_le_bytes());
2420 buf
2421 }
2422 SochValue::UInt(u) => {
2423 let mut buf = vec![2];
2424 buf.extend_from_slice(&u.to_le_bytes());
2425 buf
2426 }
2427 SochValue::Float(f) => {
2428 let mut buf = vec![3];
2429 buf.extend_from_slice(&f.to_le_bytes());
2430 buf
2431 }
2432 SochValue::Text(s) => {
2433 let mut buf = vec![4];
2434 buf.extend_from_slice(s.as_bytes());
2435 buf
2436 }
2437 SochValue::Bool(b) => vec![5, if *b { 1 } else { 0 }],
2438 SochValue::Binary(b) => {
2439 let mut buf = vec![6];
2440 buf.extend_from_slice(b);
2441 buf
2442 }
2443 _ => {
2444 // Fallback: serialize as text
2445 let s = format!("{:?}", value);
2446 let mut buf = vec![4];
2447 buf.extend_from_slice(s.as_bytes());
2448 buf
2449 }
2450 }
2451}
2452
2453fn deserialize_value(bytes: &[u8]) -> SochValue {
2454 if bytes.is_empty() {
2455 return SochValue::Null;
2456 }
2457
2458 match bytes[0] {
2459 0 => SochValue::Null,
2460 1 if bytes.len() >= 9 => {
2461 let i = i64::from_le_bytes(bytes[1..9].try_into().unwrap());
2462 SochValue::Int(i)
2463 }
2464 2 if bytes.len() >= 9 => {
2465 let u = u64::from_le_bytes(bytes[1..9].try_into().unwrap());
2466 SochValue::UInt(u)
2467 }
2468 3 if bytes.len() >= 9 => {
2469 let f = f64::from_le_bytes(bytes[1..9].try_into().unwrap());
2470 SochValue::Float(f)
2471 }
2472 4 => {
2473 let s = String::from_utf8_lossy(&bytes[1..]).to_string();
2474 SochValue::Text(s)
2475 }
2476 5 if bytes.len() >= 2 => SochValue::Bool(bytes[1] != 0),
2477 6 => SochValue::Binary(bytes[1..].to_vec()),
2478 _ => {
2479 // Treat as text
2480 let s = String::from_utf8_lossy(bytes).to_string();
2481 SochValue::Text(s)
2482 }
2483 }
2484}
2485
2486// ============================================================================
2487// Helper functions for columnar query result building
2488// ============================================================================
2489
2490/// Push a SochValue into a TypedColumn
2491fn push_value_to_typed_column(col: &mut CoreTypedColumn, value: Option<SochValue>) {
2492 match value {
2493 None => push_null_to_typed_column(col),
2494 Some(v) => match (col, v) {
2495 (
2496 CoreTypedColumn::Int64 {
2497 values,
2498 validity,
2499 stats,
2500 },
2501 SochValue::Int(i),
2502 ) => {
2503 values.push(i);
2504 validity.push(true);
2505 stats.update_i64(i);
2506 }
2507 (
2508 CoreTypedColumn::Int64 {
2509 values,
2510 validity,
2511 stats,
2512 },
2513 SochValue::UInt(u),
2514 ) => {
2515 values.push(u as i64);
2516 validity.push(true);
2517 stats.update_i64(u as i64);
2518 }
2519 (
2520 CoreTypedColumn::UInt64 {
2521 values,
2522 validity,
2523 stats,
2524 },
2525 SochValue::UInt(u),
2526 ) => {
2527 values.push(u);
2528 validity.push(true);
2529 stats.update_i64(u as i64);
2530 }
2531 (
2532 CoreTypedColumn::UInt64 {
2533 values,
2534 validity,
2535 stats,
2536 },
2537 SochValue::Int(i),
2538 ) => {
2539 values.push(i as u64);
2540 validity.push(true);
2541 stats.update_i64(i);
2542 }
2543 (
2544 CoreTypedColumn::Float64 {
2545 values,
2546 validity,
2547 stats,
2548 },
2549 SochValue::Float(f),
2550 ) => {
2551 values.push(f);
2552 validity.push(true);
2553 stats.update_f64(f);
2554 }
2555 (
2556 CoreTypedColumn::Float64 {
2557 values,
2558 validity,
2559 stats,
2560 },
2561 SochValue::Int(i),
2562 ) => {
2563 values.push(i as f64);
2564 validity.push(true);
2565 stats.update_f64(i as f64);
2566 }
2567 (
2568 CoreTypedColumn::Text {
2569 offsets,
2570 data,
2571 validity,
2572 stats,
2573 },
2574 SochValue::Text(s),
2575 ) => {
2576 data.extend_from_slice(s.as_bytes());
2577 offsets.push(data.len() as u32);
2578 validity.push(true);
2579 stats.row_count += 1;
2580 }
2581 (
2582 CoreTypedColumn::Binary {
2583 offsets,
2584 data,
2585 validity,
2586 stats,
2587 },
2588 SochValue::Binary(b),
2589 ) => {
2590 data.extend_from_slice(&b);
2591 offsets.push(data.len() as u32);
2592 validity.push(true);
2593 stats.row_count += 1;
2594 }
2595 (
2596 CoreTypedColumn::Bool {
2597 values,
2598 validity,
2599 stats,
2600 len,
2601 },
2602 SochValue::Bool(b),
2603 ) => {
2604 let idx = *len;
2605 *len += 1;
2606 let num_words = (*len).div_ceil(64);
2607 while values.len() < num_words {
2608 values.push(0);
2609 }
2610 if b {
2611 let word = idx / 64;
2612 let bit = idx % 64;
2613 values[word] |= 1 << bit;
2614 }
2615 validity.push(true);
2616 stats.row_count += 1;
2617 }
2618 // Type mismatch - push as null
2619 (col, _) => push_null_to_typed_column(col),
2620 },
2621 }
2622}
2623
2624/// Push a null value into a TypedColumn
2625fn push_null_to_typed_column(col: &mut CoreTypedColumn) {
2626 match col {
2627 CoreTypedColumn::Int64 {
2628 values,
2629 validity,
2630 stats,
2631 } => {
2632 values.push(0);
2633 validity.push(false);
2634 stats.update_null();
2635 }
2636 CoreTypedColumn::UInt64 {
2637 values,
2638 validity,
2639 stats,
2640 } => {
2641 values.push(0);
2642 validity.push(false);
2643 stats.update_null();
2644 }
2645 CoreTypedColumn::Float64 {
2646 values,
2647 validity,
2648 stats,
2649 } => {
2650 values.push(0.0);
2651 validity.push(false);
2652 stats.update_null();
2653 }
2654 CoreTypedColumn::Text {
2655 offsets,
2656 data: _,
2657 validity,
2658 stats,
2659 } => {
2660 offsets.push(offsets.last().copied().unwrap_or(0));
2661 validity.push(false);
2662 stats.update_null();
2663 }
2664 CoreTypedColumn::Binary {
2665 offsets,
2666 data: _,
2667 validity,
2668 stats,
2669 } => {
2670 offsets.push(offsets.last().copied().unwrap_or(0));
2671 validity.push(false);
2672 stats.update_null();
2673 }
2674 CoreTypedColumn::Bool {
2675 values,
2676 validity,
2677 stats,
2678 len,
2679 } => {
2680 *len += 1;
2681 let num_words = (*len).div_ceil(64);
2682 while values.len() < num_words {
2683 values.push(0);
2684 }
2685 validity.push(false);
2686 stats.update_null();
2687 }
2688 }
2689}
2690
2691#[cfg(test)]
2692mod tests {
2693 use super::*;
2694 use tempfile::tempdir;
2695
/// Opening a fresh database, starting and aborting a transaction, and
/// shutting down must all succeed.
#[test]
fn test_database_open_close() {
    let tmp = tempdir().unwrap();
    let database = Database::open(tmp.path()).unwrap();

    // A fresh database hands out transactions with a non-zero id.
    let handle = database.begin_transaction().unwrap();
    assert!(handle.txn_id > 0);

    database.abort(handle).unwrap();
    database.shutdown().unwrap();
}
2708
/// A value is readable inside the writing transaction and, once committed,
/// from a later transaction.
#[test]
fn test_database_put_get() {
    let tmp = tempdir().unwrap();
    let database = Database::open(tmp.path()).unwrap();
    let expected = Some(b"value1".to_vec());

    // Read-your-own-writes inside the writing transaction.
    let writer = database.begin_transaction().unwrap();
    database.put(writer, b"key1", b"value1").unwrap();
    assert_eq!(database.get(writer, b"key1").unwrap(), expected);
    database.commit(writer).unwrap();

    // New transaction should see committed data
    let reader = database.begin_transaction().unwrap();
    assert_eq!(database.get(reader, b"key1").unwrap(), expected);
    database.abort(reader).unwrap();
}
2728
/// Encrypted databases round-trip committed data with the right KEK and
/// refuse to open with a wrong KEK or with no key at all.
#[test]
fn test_database_encryption_end_to_end() {
    use crate::durable_storage::StorageEncryption;
    use crate::encryption::EncryptionKey;

    let tmp = tempdir().unwrap();
    let good_kek = [0x5Au8; 32];
    let encryption_for = |kek: [u8; 32]| StorageEncryption::with_kek(EncryptionKey::new(kek), "test");
    let open_encrypted = |kek: [u8; 32]| {
        Database::open_with_config_and_encryption(
            tmp.path(),
            DatabaseConfig::default(),
            encryption_for(kek),
        )
    };

    // Create encrypted DB through the kernel, write committed data.
    {
        let database = open_encrypted(good_kek).unwrap();
        assert!(database.storage.is_encrypted());
        let writer = database.begin_transaction().unwrap();
        database.put(writer, b"secret", b"value").unwrap();
        database.commit(writer).unwrap();
        database.shutdown().unwrap();
    }
    assert!(tmp.path().join("keyring.json").exists());

    // Reopen with the correct KEK -> committed data round-trips.
    {
        let database = open_encrypted(good_kek).unwrap();
        let reader = database.begin_transaction().unwrap();
        assert_eq!(
            database.get(reader, b"secret").unwrap(),
            Some(b"value".to_vec())
        );
        database.abort(reader).unwrap();
        database.shutdown().unwrap();
    }

    // Wrong KEK -> fail closed (no silent plaintext/empty open).
    {
        let wrong = open_encrypted([0u8; 32]);
        assert!(wrong.is_err(), "wrong KEK must fail closed at the kernel");
    }

    // No key on an encrypted DB -> fail closed.
    {
        let no_key = Database::open_with_config(tmp.path(), DatabaseConfig::default());
        assert!(
            no_key.is_err(),
            "encrypted DB opened without key must fail closed"
        );
    }
}
2787
/// The concurrent open path supports encryption: data committed in one
/// session is readable after reopening with the same KEK.
#[test]
fn test_database_concurrent_encryption() {
    use crate::durable_storage::StorageEncryption;
    use crate::encryption::EncryptionKey;

    let tmp = tempdir().unwrap();
    let kek = [0x33u8; 32];
    let open_concurrent = || {
        Database::open_concurrent_with_config_and_encryption(
            tmp.path(),
            DatabaseConfig::default(),
            StorageEncryption::with_kek(EncryptionKey::new(kek), "test"),
        )
        .unwrap()
    };

    // First session: write and commit under encryption.
    {
        let database = open_concurrent();
        assert!(database.storage.is_encrypted());
        let writer = database.begin_transaction().unwrap();
        database.put(writer, b"ck", b"cv").unwrap();
        database.commit(writer).unwrap();
        database.shutdown().unwrap();
    }

    // Reopen concurrent with correct key.
    let database = open_concurrent();
    let reader = database.begin_transaction().unwrap();
    assert_eq!(database.get(reader, b"ck").unwrap(), Some(b"cv".to_vec()));
    database.abort(reader).unwrap();
    database.shutdown().unwrap();
}
2822
/// Values written through the path API are found by a prefix scan that is
/// limited to a single row's keys.
#[test]
fn test_database_path_api() {
    let tmp = tempdir().unwrap();
    let database = Database::open(tmp.path()).unwrap();

    // Write using path API
    let writer = database.begin_transaction().unwrap();
    let entries: [(&str, &[u8]); 3] = [
        ("users/1/name", b"Alice"),
        ("users/1/email", b"alice@example.com"),
        ("users/2/name", b"Bob"),
    ];
    for (path, value) in entries {
        database.put_path(writer, path, value).unwrap();
    }
    database.commit(writer).unwrap();

    // Scan path prefix: only the two keys under "users/1/" match.
    let reader = database.begin_transaction().unwrap();
    let matches = database.scan_path(reader, "users/1/").unwrap();
    assert_eq!(matches.len(), 2);

    database.abort(reader).unwrap();
}
2845
/// A row inserted into a registered table can be read back by row id.
#[test]
fn test_database_table_api() {
    let tmp = tempdir().unwrap();
    let database = Database::open(tmp.path()).unwrap();

    // Register table
    let name_column = ColumnDef {
        name: "name".to_string(),
        col_type: ColumnType::Text,
        nullable: false,
    };
    let age_column = ColumnDef {
        name: "age".to_string(),
        col_type: ColumnType::Int64,
        nullable: true,
    };
    database
        .register_table(TableSchema {
            name: "users".to_string(),
            columns: vec![name_column, age_column],
        })
        .unwrap();

    // Insert row
    let writer = database.begin_transaction().unwrap();
    let mut alice = HashMap::new();
    alice.insert("name".to_string(), SochValue::Text("Alice".to_string()));
    alice.insert("age".to_string(), SochValue::Int(30));
    database.insert_row(writer, "users", 1, &alice).unwrap();
    database.commit(writer).unwrap();

    // Read row
    let reader = database.begin_transaction().unwrap();
    let fetched = database.read_row(reader, "users", 1, None).unwrap();
    assert!(fetched.is_some());

    let fetched = fetched.unwrap();
    assert_eq!(
        fetched.get("name"),
        Some(&SochValue::Text("Alice".to_string()))
    );

    database.abort(reader).unwrap();
}
2888
/// Regression: row keys are "table/row_id", so a bare-name prefix scan of
/// "user" used to also match "users/..." (cross-table bleed), and a
/// single-character table name like "t" tripped the 2-byte scan guard.
/// Querying a registered table by name must scan exactly "table/".
#[test]
fn test_query_does_not_bleed_across_prefix_sibling_tables() {
    let tmp = tempdir().unwrap();
    let database = Database::open(tmp.path()).unwrap();

    // Two sibling tables whose names share a prefix, plus a 1-char table.
    for table in ["user", "users", "t"] {
        let name_column = ColumnDef {
            name: "name".to_string(),
            col_type: ColumnType::Text,
            nullable: false,
        };
        database
            .register_table(TableSchema {
                name: table.to_string(),
                columns: vec![name_column],
            })
            .unwrap();
    }

    // Builds a one-column row whose "name" marks the table it belongs to.
    let named = |marker: &str| {
        let mut row = HashMap::new();
        row.insert("name".to_string(), SochValue::Text(marker.to_string()));
        row
    };

    let writer = database.begin_transaction().unwrap();
    database
        .insert_row(writer, "user", 1, &named("in_user"))
        .unwrap();

    let users_row = named("in_users");
    database.insert_row(writer, "users", 1, &users_row).unwrap();
    database.insert_row(writer, "users", 2, &users_row).unwrap();

    database.insert_row(writer, "t", 1, &named("in_t")).unwrap();
    database.commit(writer).unwrap();

    let reader = database.begin_read_only_fast();

    // "user" must return exactly its one row — no bleed from "users".
    let from_user = database.query(reader, "user").execute().unwrap().rows;
    assert_eq!(from_user.len(), 1, "querying 'user' bled into 'users'");
    assert_eq!(
        from_user[0].get("name"),
        Some(&SochValue::Text("in_user".to_string()))
    );

    // "users" must return its two rows.
    let from_users = database.query(reader, "users").execute().unwrap().rows;
    assert_eq!(from_users.len(), 2);

    // Single-character table name must be queryable (was rejected by the
    // 2-byte scan guard before the "table/" prefix fix).
    let from_t = database.query(reader, "t").execute().unwrap().rows;
    assert_eq!(from_t.len(), 1);
    assert_eq!(
        from_t[0].get("name"),
        Some(&SochValue::Text("in_t".to_string()))
    );

    database.abort_read_only_fast(reader);
}
2951
/// The query builder honours `limit` on legacy column-per-key data.
#[test]
fn test_database_query_builder() {
    let tmp = tempdir().unwrap();
    let database = Database::open(tmp.path()).unwrap();

    // Insert test data: two documents, two columns each.
    let writer = database.begin_transaction().unwrap();
    let entries: [(&str, &[u8]); 4] = [
        ("docs/1/title", b"Hello"),
        ("docs/1/content", b"World"),
        ("docs/2/title", b"Foo"),
        ("docs/2/content", b"Bar"),
    ];
    for (path, value) in entries {
        database.put_path(writer, path, value).unwrap();
    }
    database.commit(writer).unwrap();

    // Query with limit
    let reader = database.begin_transaction().unwrap();
    let outcome = database.query(reader, "docs/").limit(1).execute().unwrap();
    assert_eq!(outcome.rows.len(), 1);

    database.abort(reader).unwrap();
}
2972
/// Data committed with full sync survives a simulated crash (the handle is
/// leaked, so neither `shutdown` nor `Drop` runs) and is visible on reopen.
#[test]
fn test_database_crash_recovery() {
    let tmp = tempdir().unwrap();

    // Write and commit
    {
        // Use open_without_lock for crash simulation tests
        let doomed = Database::open_without_lock(tmp.path()).unwrap();
        // Set sync mode to FULL to ensure data is persisted before "crash"
        doomed.storage.set_sync_mode(2);

        let writer = doomed.begin_transaction().unwrap();
        doomed.put(writer, b"persist", b"this").unwrap();
        doomed.commit(writer).unwrap();

        // Don't call shutdown - simulate crash
        std::mem::forget(doomed);
    }

    // Reopen - should recover
    {
        let recovered = Database::open_without_lock(tmp.path()).unwrap();
        let reader = recovered.begin_transaction().unwrap();
        assert_eq!(
            recovered.get(reader, b"persist").unwrap(),
            Some(b"this".to_vec())
        );
        recovered.abort(reader).unwrap();
    }
}
2999
/// `row_view` exposes named and positional access to a columnar result,
/// reports nulls and unknown columns, and rejects out-of-range rows.
#[test]
fn test_columnar_row_view_zero_alloc() {
    use sochdb_core::columnar::TypedColumn;

    // Build a small columnar result: 3 rows × 2 columns (id: i64, name: text)
    let mut ids = TypedColumn::new_int64();
    for id in [1, 2, 3] {
        ids.push_i64(Some(id));
    }

    let mut names = TypedColumn::new_text();
    for name in [Some("Alice"), Some("Bob"), None] {
        // The last entry is NULL.
        names.push_text(name);
    }

    let columnar = ColumnarQueryResult {
        columns: vec!["id".to_string(), "name".to_string()],
        data: vec![ids, names],
        row_count: 3,
        bytes_read: 0,
    };

    // row_view access — zero HashMap allocation
    let first = columnar.row_view(0).unwrap();
    assert_eq!(first.get("id"), Some(SochValue::Int(1)));
    assert_eq!(
        first.get("name"),
        Some(SochValue::Text("Alice".to_string()))
    );
    assert_eq!(first.get("nonexistent"), None);

    let last = columnar.row_view(2).unwrap();
    assert_eq!(last.get("id"), Some(SochValue::Int(3)));
    assert_eq!(last.get("name"), Some(SochValue::Null));

    // Out of bounds
    assert!(columnar.row_view(3).is_none());

    // values() — positional
    let positional = first.values();
    assert_eq!(positional.len(), 2);
    assert_eq!(positional[0], SochValue::Int(1));

    // to_map() — backward compat
    let as_map = first.to_map();
    assert_eq!(as_map.get("id"), Some(&SochValue::Int(1)));
}
3044
/// Converting a columnar result into a row-oriented `QueryResult` keeps the
/// values, their order, and the `bytes_read` counter.
#[test]
fn test_columnar_into_query_result() {
    use sochdb_core::columnar::TypedColumn;

    let mut scores = TypedColumn::new_float64();
    for score in [9.5, 8.2] {
        scores.push_f64(Some(score));
    }

    let columnar = ColumnarQueryResult {
        columns: vec!["score".to_string()],
        data: vec![scores],
        row_count: 2,
        bytes_read: 100,
    };

    let row_oriented = columnar.into_query_result();
    assert_eq!(row_oriented.rows.len(), 2);
    assert_eq!(
        row_oriented.rows[0].get("score"),
        Some(&SochValue::Float(9.5))
    );
    assert_eq!(
        row_oriented.rows[1].get("score"),
        Some(&SochValue::Float(8.2))
    );
    assert_eq!(row_oriented.bytes_read, 100);
}
3066}