sochdb_storage/database.rs
1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! SochDB Database Kernel
19//!
20//! The shared core that powers both embedded mode (`SochConnection::open`) and
21//! server mode (`sochdb-server`). This is the "SQLite engine" equivalent.
22//!
23//! ## Architecture
24//!
25//! ```text
26//! ┌──────────────────────────────────────────────────────────────────┐
27//! │ Database Kernel │
28//! │ Arc<Database> - shared by all connections │
29//! ├──────────────────────────────────────────────────────────────────┤
30//! │ │
31//! │ ┌─────────────────┐ ┌─────────────────┐ ┌────────────────┐ │
32//! │ │ DurableStorage │ │ Catalog │ │ Vector Index │ │
33//! │ │ (WAL + MVCC) │ │ (Schema Mgmt) │ │ (HNSW/Vamana) │ │
34//! │ └────────┬────────┘ └────────┬────────┘ └───────┬────────┘ │
35//! │ │ │ │ │
36//! │ └─────────────────────┴─────────────────────┘ │
37//! │ │ │
38//! │ ┌─────────────────────────────────────────────────────────────┐ │
39//! │ │ Query Executor (Path-Native) │ │
40//! │ │ - Path resolution: O(|path|) │ │
41//! │ │ - Column projection: 80% I/O reduction │ │
42//! │ │ - Context selection: Token-aware chunking │ │
43//! │ └─────────────────────────────────────────────────────────────┘ │
44//! │ │
45//! └──────────────────────────────────────────────────────────────────┘
46//!
47//! Deployment Modes:
48//! ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
49//! │ Embedded │ │ IPC Server │ │ TCP Server │
50//! │ (in-proc) │ │ (Unix sock)│ │ (remote) │
51//! └──────┬──────┘ └──────┬──────┘ └──────┬──────┘
52//! │ │ │
53//! └─────────────────┴─────────────────┘
54//! │
55//! Arc<Database>
56//! ```
57//!
58//! ## Latency Model
59//!
60//! Let K = kernel processing cost for a query
61//!
62//! - Embedded: L_emb ≈ K (function call overhead negligible)
63//! - IPC: L_ipc ≈ K + δ_ipc (δ_ipc = ~10-50µs for Unix socket)
64//! - TCP: L_tcp ≈ K + δ_net (δ_net = 100µs-10ms depending on network)
65//!
66//! For LLM context queries where K >> δ_ipc, IPC is "nearly embedded".
67
68use std::collections::HashMap;
69use std::path::{Path, PathBuf};
70use std::sync::Arc;
71use std::sync::atomic::{AtomicU64, Ordering};
72
73use dashmap::DashMap;
74use parking_lot::RwLock;
75
76use crate::durable_storage::{DurableStorage, TransactionMode};
77use crate::index_policy::{IndexPolicy, TableIndexConfig, TableIndexRegistry};
78use crate::key_buffer::KeyBuffer;
79use crate::packed_row::{PackedColumnDef, PackedColumnType, PackedRow, PackedTableSchema};
80use sochdb_core::catalog::Catalog;
81use sochdb_core::{Result, SochDBError, SochValue};
82
83// Re-export key types
84pub use crate::durable_storage::RecoveryStats;
85
86/// Database configuration
87#[derive(Debug, Clone)]
88pub struct DatabaseConfig {
89 /// Enable group commit for better write throughput
90 pub group_commit: bool,
91 /// Maximum memory for memtables before flush (bytes)
92 pub memtable_size_limit: usize,
93 /// Enable WAL for durability
94 pub wal_enabled: bool,
95 /// Sync mode: fsync after every commit vs periodic
96 pub sync_mode: SyncMode,
97 /// Read-only mode
98 pub read_only: bool,
99
100 /// Enable ordered index for O(log N) prefix scans
101 ///
102 /// # Deprecation Notice
103 ///
104 /// **DEPRECATED since 0.2.0**: Use `default_index_policy` instead for per-table control.
105 /// This field will be removed in v0.3.0.
106 ///
107 /// ## Migration Guide
108 ///
109 /// Replace:
110 /// ```ignore
111 /// DatabaseConfig { enable_ordered_index: true, .. } // Old API
112 /// DatabaseConfig { enable_ordered_index: false, .. } // Old API
113 /// ```
114 ///
115 /// With:
116 /// ```ignore
117 /// DatabaseConfig { default_index_policy: IndexPolicy::ScanOptimized, .. } // Ordered index enabled
118 /// DatabaseConfig { default_index_policy: IndexPolicy::WriteOptimized, .. } // Ordered index disabled
119 /// ```
120 ///
121 /// ## Behavior
122 ///
123 /// When false, saves ~134 ns/op on writes (20% speedup)
124 /// but scan_prefix becomes O(N) instead of O(log N + K).
125 ///
126 /// Set to false for write-heavy workloads without range scans.
127 #[deprecated(
128 since = "0.2.0",
129 note = "Use `default_index_policy` field instead. This field will be removed in v0.3.0. \
130 Set IndexPolicy::ScanOptimized for ordered index, WriteOptimized to disable."
131 )]
132 ///
133 /// Set to false for write-heavy workloads without range scans.
134 pub enable_ordered_index: bool,
135 /// Group commit configuration
136 pub group_commit_config: GroupCommitSettings,
137 /// Default index policy for tables not explicitly configured
138 ///
139 /// This replaces the global `enable_ordered_index` toggle with
140 /// fine-grained per-table control. Use `index_registry` to configure
141 /// individual tables.
142 ///
143 /// | Policy | Insert Cost | Scan Cost | Use Case |
144 /// |----------------|-------------|----------------|------------------------|
145 /// | WriteOptimized | O(1) | O(N) | High-write, rare scan |
146 /// | Balanced | O(1) amort | O(output+logK) | Mixed OLTP |
147 /// | ScanOptimized | O(log N) | O(logN + K) | Analytics, range query |
148 /// | AppendOnly | O(1) | O(N) | Time-series logs |
149 pub default_index_policy: IndexPolicy,
150}
151
/// Group commit settings - mirrors SQLite's WAL mode tuning
///
/// ## Performance Model
///
/// Without group commit: Throughput = 1 / L_fsync ≈ 200 commits/sec (L=5ms)
/// With group commit (batch size K): Throughput = K / L_fsync = K × 200 commits/sec
///
/// For K=100: 20,000 commits/sec (100× speedup)
///
/// ## SQLite Comparison
///
/// | Setting                    | SQLite Equivalent           |
/// |----------------------------|-----------------------------|
/// | batch_size = 1             | PRAGMA synchronous = FULL   |
/// | batch_size = 100           | WAL mode with batching      |
/// | max_wait_us = 0            | No delay, immediate flush   |
/// | max_wait_us = 10000        | Up to 10ms delay for batch  |
#[derive(Debug, Clone)]
pub struct GroupCommitSettings {
    /// Minimum batch size before flush (default: 1)
    pub min_batch_size: usize,
    /// Maximum batch size (default: 1000)
    pub max_batch_size: usize,
    /// Maximum wait time before flush in microseconds (default: 10000 = 10ms)
    pub max_wait_us: u64,
    /// Expected fsync latency in microseconds (for adaptive sizing)
    pub fsync_latency_us: u64,
}

impl Default for GroupCommitSettings {
    /// Defaults: batches of 1..=1000, 10ms maximum wait, 5ms assumed fsync latency.
    fn default() -> Self {
        Self {
            min_batch_size: 1,
            max_batch_size: 1000,
            max_wait_us: 10_000,     // 10ms
            fsync_latency_us: 5_000, // 5ms
        }
    }
}

impl GroupCommitSettings {
    /// High throughput preset - maximizes batching
    pub fn high_throughput() -> Self {
        Self {
            min_batch_size: 50,
            max_batch_size: 5000,
            max_wait_us: 50_000, // 50ms
            fsync_latency_us: 5_000,
        }
    }

    /// Low latency preset - minimal batching
    pub fn low_latency() -> Self {
        Self {
            min_batch_size: 1,
            max_batch_size: 10,
            max_wait_us: 1_000, // 1ms
            fsync_latency_us: 5_000,
        }
    }

    /// Calculate a batch size from the square-root trade-off rule
    ///
    /// N* = sqrt(2 × L_fsync × λ / C_wait)
    ///
    /// where `L_fsync` is `fsync_latency_us` converted to seconds, `λ` is
    /// `arrival_rate` and `C_wait` is `wait_cost` (floored at 0.001 to avoid
    /// division by zero). The result is truncated to an integer and bounded
    /// to `min_batch_size..=max_batch_size`.
    ///
    /// # Arguments
    /// * `arrival_rate` - Operations per second
    /// * `wait_cost` - Cost coefficient for waiting (0.0-1.0)
    ///
    /// # Returns
    ///
    /// The bounded batch size. Non-finite or negative intermediate values
    /// collapse to the lower bound. If the bounds are inverted
    /// (`min_batch_size > max_batch_size`), `max_batch_size` is returned as
    /// the hard cap; this function never panics.
    pub fn optimal_batch_size(&self, arrival_rate: f64, wait_cost: f64) -> usize {
        let l_fsync = self.fsync_latency_us as f64 / 1_000_000.0;
        let n_star = (2.0 * l_fsync * arrival_rate / wait_cost.max(0.001)).sqrt();

        // Float -> usize `as` casts saturate: NaN and negatives become 0,
        // +inf becomes usize::MAX.
        let unbounded = n_star as usize;

        // `Ord::clamp` panics when min > max, and both bounds are public
        // fields a caller may set inconsistently. Apply the bounds one at a
        // time instead so the upper bound always wins.
        unbounded
            .max(self.min_batch_size)
            .min(self.max_batch_size)
    }
}
226
227impl Default for DatabaseConfig {
228 #[allow(deprecated)]
229 fn default() -> Self {
230 Self {
231 group_commit: true,
232 memtable_size_limit: 64 * 1024 * 1024, // 64MB
233 wal_enabled: true,
234 sync_mode: SyncMode::Normal,
235 read_only: false,
236 enable_ordered_index: true, // Default: enabled for compatibility
237 group_commit_config: GroupCommitSettings::default(),
238 default_index_policy: IndexPolicy::Balanced, // New default: balanced OLTP policy
239 }
240 }
241}
242
243impl DatabaseConfig {
244 /// Create config optimized for throughput (Fast Mode)
245 ///
246 /// - Disables ordered index (saves ~134 ns/op)
247 /// - Uses high-throughput group commit settings
248 /// - Suitable for append-only workloads
249 #[allow(deprecated)]
250 pub fn throughput_optimized() -> Self {
251 Self {
252 group_commit: true,
253 memtable_size_limit: 128 * 1024 * 1024, // 128MB
254 wal_enabled: true,
255 sync_mode: SyncMode::Normal,
256 read_only: false,
257 enable_ordered_index: false,
258 group_commit_config: GroupCommitSettings::high_throughput(),
259 default_index_policy: IndexPolicy::WriteOptimized, // No ordered index overhead
260 }
261 }
262
263 /// Create config optimized for latency
264 ///
265 /// - Keeps ordered index for fast range scans
266 /// - Uses low-latency group commit settings
267 /// - Suitable for OLTP workloads
268 #[allow(deprecated)]
269 pub fn latency_optimized() -> Self {
270 Self {
271 group_commit: true,
272 memtable_size_limit: 32 * 1024 * 1024, // 32MB
273 wal_enabled: true,
274 sync_mode: SyncMode::Full,
275 read_only: false,
276 enable_ordered_index: true,
277 group_commit_config: GroupCommitSettings::low_latency(),
278 default_index_policy: IndexPolicy::ScanOptimized, // Fast range scans
279 }
280 }
281
282 /// Create config matching SQLite defaults
283 #[allow(deprecated)]
284 pub fn sqlite_compatible() -> Self {
285 Self {
286 group_commit: false, // SQLite default is single-commit
287 memtable_size_limit: 64 * 1024 * 1024,
288 wal_enabled: true,
289 sync_mode: SyncMode::Normal, // PRAGMA synchronous = NORMAL
290 read_only: false,
291 enable_ordered_index: true,
292 group_commit_config: GroupCommitSettings::default(),
293 default_index_policy: IndexPolicy::Balanced, // Good default for mixed workloads
294 }
295 }
296
297 /// Get effective ordered index setting, derived from `default_index_policy`.
298 ///
299 /// This is the shim method for the deprecated `enable_ordered_index` field.
300 /// It returns `true` if the policy requires an ordered index (ScanOptimized),
301 /// and `false` otherwise (WriteOptimized, Balanced, AppendOnly).
302 ///
303 /// # Policy Mapping
304 ///
305 /// | IndexPolicy | Returns |
306 /// |------------------|---------|
307 /// | ScanOptimized | true |
308 /// | Balanced | false |
309 /// | WriteOptimized | false |
310 /// | AppendOnly | false |
311 ///
312 /// Note: `Balanced` uses lazy compaction rather than a live ordered index,
313 /// so it returns `false` for the low-level memtable config but still supports
314 /// efficient range scans via sorted runs.
315 pub fn effective_ordered_index(&self) -> bool {
316 matches!(self.default_index_policy, IndexPolicy::ScanOptimized)
317 }
318}
319
/// WAL sync mode, modelled on SQLite's `PRAGMA synchronous`
///
/// | SochDB     | SQLite       | Description                                    |
/// |------------|--------------|------------------------------------------------|
/// | Off        | OFF (0)      | No fsync, risk of data loss on crash           |
/// | Normal     | NORMAL (1)   | Fsync at checkpoints, not every commit         |
/// | Full       | FULL (2)     | Fsync every commit (safest, slowest)           |
///
/// # Performance vs Durability Trade-offs
///
/// - **Off**: ~10x faster than Full, but may lose last ~100ms of data on crash
/// - **Normal**: ~5x faster than Full, durable at checkpoint boundaries
/// - **Full**: Every commit is fsync'd, no data loss possible
///
/// # SQLite Compatibility
///
/// ```sql
/// -- SQLite equivalent settings
/// PRAGMA synchronous = OFF;     -- SyncMode::Off
/// PRAGMA synchronous = NORMAL;  -- SyncMode::Normal
/// PRAGMA synchronous = FULL;    -- SyncMode::Full
/// ```
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SyncMode {
    /// No fsync (SQLite `PRAGMA synchronous = OFF`)
    ///
    /// Writes stay buffered in the OS and may be lost on power failure.
    /// Use for non-critical data or bulk loading.
    Off = 0,

    /// Fsync at checkpoints (SQLite `PRAGMA synchronous = NORMAL`)
    ///
    /// Default mode. Syncs the WAL at checkpoint boundaries.
    /// Good balance of performance and durability.
    Normal = 1,

    /// Fsync every commit (SQLite `PRAGMA synchronous = FULL`)
    ///
    /// Safest mode. Every commit is immediately durable.
    /// Required for financial/critical data.
    Full = 2,
}

impl SyncMode {
    /// Map a SQLite `synchronous` pragma value onto a mode.
    ///
    /// `0` is `Off`, `1` is `Normal`; every other value (2 and above) is
    /// treated as `Full`.
    pub fn from_sqlite_pragma(value: u32) -> Self {
        if value == 0 {
            SyncMode::Off
        } else if value == 1 {
            SyncMode::Normal
        } else {
            SyncMode::Full
        }
    }

    /// Numeric SQLite `synchronous` pragma value for this mode (its discriminant).
    pub fn to_sqlite_pragma(self) -> u32 {
        match self {
            SyncMode::Off => 0,
            SyncMode::Normal => 1,
            SyncMode::Full => 2,
        }
    }

    /// Parse a mode from its name (ASCII case-insensitive) or its pragma digit.
    ///
    /// Accepts `"OFF"`/`"0"`, `"NORMAL"`/`"1"` and `"FULL"`/`"2"`. The input is
    /// not trimmed; anything else yields `None`.
    pub fn parse(s: &str) -> Option<Self> {
        let spelled = |name: &str, digit: &str| s.eq_ignore_ascii_case(name) || s == digit;

        if spelled("OFF", "0") {
            Some(SyncMode::Off)
        } else if spelled("NORMAL", "1") {
            Some(SyncMode::Normal)
        } else if spelled("FULL", "2") {
            Some(SyncMode::Full)
        } else {
            None
        }
    }
}
388
389/// Table schema for the kernel
390#[derive(Debug, Clone)]
391pub struct TableSchema {
392 pub name: String,
393 pub columns: Vec<ColumnDef>,
394}
395
396/// Column definition
397#[derive(Debug, Clone)]
398pub struct ColumnDef {
399 pub name: String,
400 pub col_type: ColumnType,
401 pub nullable: bool,
402}
403
/// Value types a column can be declared with.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ColumnType {
    /// Signed 64-bit integer.
    Int64,
    /// Unsigned 64-bit integer.
    UInt64,
    /// 64-bit floating point number.
    Float64,
    /// Text value.
    Text,
    /// Binary value.
    Binary,
    /// Boolean value.
    Bool,
}
414
/// Lightweight, copyable handle identifying a transaction in kernel calls.
#[derive(Clone, Copy, Debug)]
pub struct TxnHandle {
    /// Transaction identifier.
    pub txn_id: u64,
    /// Snapshot timestamp associated with the transaction.
    pub snapshot_ts: u64,
}
421
422/// Query result from the kernel
423#[derive(Debug, Clone)]
424pub struct QueryResult {
425 /// Column names
426 pub columns: Vec<String>,
427 /// Row data (each row is a map of column -> value)
428 pub rows: Vec<HashMap<String, SochValue>>,
429 /// Number of rows scanned (for stats)
430 pub rows_scanned: usize,
431 /// Bytes read from storage
432 pub bytes_read: usize,
433}
434
435impl QueryResult {
436 /// Empty result
437 pub fn empty() -> Self {
438 Self {
439 columns: vec![],
440 rows: vec![],
441 rows_scanned: 0,
442 bytes_read: 0,
443 }
444 }
445
446 /// Convert to TOON format for token efficiency
447 pub fn to_toon(&self) -> String {
448 if self.rows.is_empty() {
449 return "[]".to_string();
450 }
451
452 // TOON format: table[N]{cols}: row1; row2; ...
453 let n = self.rows.len();
454 let cols = self.columns.join(",");
455
456 let rows_str: Vec<String> = self
457 .rows
458 .iter()
459 .map(|row| {
460 self.columns
461 .iter()
462 .map(|c| {
463 row.get(c)
464 .map(format_soch_value)
465 .unwrap_or_else(|| "∅".to_string())
466 })
467 .collect::<Vec<_>>()
468 .join(",")
469 })
470 .collect();
471
472 format!("result[{}]{{{}}}:{}", n, cols, rows_str.join(";"))
473 }
474}
475
476fn format_soch_value(v: &SochValue) -> String {
477 match v {
478 SochValue::Null => "∅".to_string(),
479 SochValue::Int(i) => i.to_string(),
480 SochValue::UInt(u) => u.to_string(),
481 SochValue::Float(f) => format!("{:.6}", f),
482 SochValue::Text(s) => {
483 if s.contains(',') || s.contains(';') {
484 format!("\"{}\"", s.replace('"', "\\\""))
485 } else {
486 s.clone()
487 }
488 }
489 SochValue::Bool(b) => if *b { "T" } else { "F" }.to_string(),
490 SochValue::Binary(b) => format!("b64:{}", base64_encode(b)),
491 _ => format!("{:?}", v),
492 }
493}
494
/// Encode `data` as standard base64 (alphabet `A-Z a-z 0-9 + /`) with `=` padding.
///
/// Input is consumed three bytes at a time; each group is packed into a
/// 24-bit word and emitted as four 6-bit symbols. A trailing group of one or
/// two bytes is zero-filled and its unused symbols are replaced by `=`.
fn base64_encode(data: &[u8]) -> String {
    const ALPHABET: &[u8] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
    const PAD: char = '=';

    // Look up the symbol for the low six bits of `bits`.
    let symbol = |bits: u32| ALPHABET[(bits & 0x3f) as usize] as char;

    let mut encoded = String::new();
    for group in data.chunks(3) {
        let first = u32::from(group[0]);
        let second = group.get(1).map(|&byte| u32::from(byte));
        let third = group.get(2).map(|&byte| u32::from(byte));

        // Missing trailing bytes contribute zero bits.
        let word = (first << 16) | (second.unwrap_or(0) << 8) | third.unwrap_or(0);

        encoded.push(symbol(word >> 18));
        encoded.push(symbol(word >> 12));
        encoded.push(match second {
            Some(_) => symbol(word >> 6),
            None => PAD,
        });
        encoded.push(match third {
            Some(_) => symbol(word),
            None => PAD,
        });
    }
    encoded
}
523
524// ============================================================================
525// Columnar Query Results - SIMD-friendly result format
526// ============================================================================
527
528use sochdb_core::TypedColumn as CoreTypedColumn;
529
530/// Columnar query result - SIMD-friendly format for analytics
531///
532/// Instead of row-oriented `Vec<HashMap<String, SochValue>>`, this returns
533/// column-oriented `Vec<TypedColumn>` for efficient vectorized operations.
534///
535/// ## Memory Layout
536///
537/// Row-oriented (standard):
538/// ```text
539/// Row 0: [id=1, name="Alice", score=85]
540/// Row 1: [id=2, name="Bob", score=92]
541/// Row 2: [id=3, name="Carol", score=78]
542/// ```
543///
544/// Column-oriented (this format):
545/// ```text
546/// id: [1, 2, 3] ← contiguous i64 array (SIMD-friendly)
547/// name: ["Alice", "Bob", "Carol"] ← Arrow-style string encoding
548/// score: [85, 92, 78] ← contiguous i64 array
549/// ```
550///
551/// ## Performance Benefits
552///
553/// - SIMD: Column sums use vectorized instructions (~8× faster)
554/// - Cache: Sequential access pattern maximizes L1/L2 cache hits
555/// - Compression: Same-type data compresses better (5-10× typical)
556/// - Filtering: Bitmap operations instead of row iteration
557///
558/// ## Usage
559///
560/// ```ignore
561/// let result = db.query(txn, "users")
562/// .columns(&["id", "score"])
563/// .as_columnar()?;
564///
565/// // SIMD sum
566/// let total_score = result.column("score")
567/// .map(|c| c.sum_i64())
568/// .unwrap_or(0);
569///
570/// // Stats
571/// println!("Rows: {}, Memory: {} bytes", result.row_count(), result.memory_size());
572/// ```
573#[derive(Debug, Clone)]
574pub struct ColumnarQueryResult {
575 /// Column names in order
576 pub columns: Vec<String>,
577 /// Column data - each TypedColumn contains all values for one column
578 pub data: Vec<CoreTypedColumn>,
579 /// Number of rows
580 pub row_count: usize,
581 /// Bytes read from storage
582 pub bytes_read: usize,
583}
584
585impl ColumnarQueryResult {
586 /// Create an empty result
587 pub fn empty() -> Self {
588 Self {
589 columns: vec![],
590 data: vec![],
591 row_count: 0,
592 bytes_read: 0,
593 }
594 }
595
596 /// Get column by name
597 pub fn column(&self, name: &str) -> Option<&CoreTypedColumn> {
598 self.columns
599 .iter()
600 .position(|c| c == name)
601 .and_then(|idx| self.data.get(idx))
602 }
603
604 /// Get column index by name
605 pub fn column_index(&self, name: &str) -> Option<usize> {
606 self.columns.iter().position(|c| c == name)
607 }
608
609 /// Number of rows
610 pub fn row_count(&self) -> usize {
611 self.row_count
612 }
613
614 /// Number of columns
615 pub fn column_count(&self) -> usize {
616 self.columns.len()
617 }
618
619 /// Total memory size in bytes
620 pub fn memory_size(&self) -> usize {
621 self.data.iter().map(|c| c.memory_size()).sum()
622 }
623
624 /// Sum of an i64 column (SIMD-optimized)
625 pub fn sum_i64(&self, column: &str) -> Option<i64> {
626 self.column(column).map(|c| c.sum_i64())
627 }
628
629 /// Sum of an f64 column (SIMD-optimized)
630 pub fn sum_f64(&self, column: &str) -> Option<f64> {
631 self.column(column).map(|c| c.sum_f64())
632 }
633
634 /// Get column statistics (min, max, null count)
635 pub fn column_stats(&self, column: &str) -> Option<&sochdb_core::columnar::ColumnStats> {
636 self.column(column).map(|c| c.stats())
637 }
638
639 /// Convert to TOON format (token-efficient)
640 pub fn to_toon(&self) -> String {
641 if self.row_count == 0 {
642 return "[]".to_string();
643 }
644
645 let n = self.row_count;
646 let cols = self.columns.join(",");
647
648 // Build rows from columns
649 let mut rows_str = Vec::with_capacity(n);
650 for i in 0..n {
651 let row: Vec<String> = self
652 .data
653 .iter()
654 .map(|col| format_columnar_value(col, i))
655 .collect();
656 rows_str.push(row.join(","));
657 }
658
659 format!("result[{}]{{{}}}:{}", n, cols, rows_str.join(";"))
660 }
661}
662
663/// Format a single value from a TypedColumn at index
664fn format_columnar_value(col: &CoreTypedColumn, idx: usize) -> String {
665 match col {
666 CoreTypedColumn::Int64 {
667 values, validity, ..
668 } => {
669 if validity.is_valid(idx) && idx < values.len() {
670 values[idx].to_string()
671 } else {
672 "∅".to_string()
673 }
674 }
675 CoreTypedColumn::UInt64 {
676 values, validity, ..
677 } => {
678 if validity.is_valid(idx) && idx < values.len() {
679 values[idx].to_string()
680 } else {
681 "∅".to_string()
682 }
683 }
684 CoreTypedColumn::Float64 {
685 values, validity, ..
686 } => {
687 if validity.is_valid(idx) && idx < values.len() {
688 format!("{:.6}", values[idx])
689 } else {
690 "∅".to_string()
691 }
692 }
693 CoreTypedColumn::Text {
694 offsets,
695 data,
696 validity,
697 ..
698 } => {
699 if validity.is_valid(idx) && idx + 1 < offsets.len() {
700 let start = offsets[idx] as usize;
701 let end = offsets[idx + 1] as usize;
702 std::str::from_utf8(&data[start..end])
703 .map(|s| {
704 if s.contains(',') || s.contains(';') {
705 format!("\"{}\"", s.replace('"', "\\\""))
706 } else {
707 s.to_string()
708 }
709 })
710 .unwrap_or_else(|_| "∅".to_string())
711 } else {
712 "∅".to_string()
713 }
714 }
715 CoreTypedColumn::Binary {
716 offsets,
717 data,
718 validity,
719 ..
720 } => {
721 if validity.is_valid(idx) && idx + 1 < offsets.len() {
722 let start = offsets[idx] as usize;
723 let end = offsets[idx + 1] as usize;
724 format!("b64:{}", base64_encode(&data[start..end]))
725 } else {
726 "∅".to_string()
727 }
728 }
729 CoreTypedColumn::Bool {
730 values,
731 validity,
732 len,
733 ..
734 } => {
735 if validity.is_valid(idx) && idx < *len {
736 let word = idx / 64;
737 let bit = idx % 64;
738 if (values[word] >> bit) & 1 == 1 {
739 "T"
740 } else {
741 "F"
742 }
743 .to_string()
744 } else {
745 "∅".to_string()
746 }
747 }
748 }
749}
750
751/// Vector search result
752#[derive(Debug, Clone)]
753pub struct VectorSearchResult {
754 pub id: u64,
755 pub distance: f32,
756 pub metadata: Option<HashMap<String, SochValue>>,
757}
758
759/// The SochDB Database Kernel
760///
761/// This is the shared core used by both embedded (`SochConnection`) and
762/// server (`sochdb-server`) modes. It owns all storage, catalog, and
763/// indexing components.
764///
765/// # Thread Safety
766///
767/// The Database is fully thread-safe via internal synchronization:
768/// - Multiple readers can operate concurrently (MVCC snapshots)
769/// - Writers coordinate through WAL and group commit
770/// - All state is behind Arc/RwLock for shared access
771///
772/// # Concurrency Modes
773///
774/// ## Standard Mode (Single Process)
775/// - Uses exclusive file lock (`flock(LOCK_EX)`)
776/// - Best for: Scripts, notebooks, CLI tools
777/// - Open with: `Database::open(path)`
778///
779/// ## Concurrent Mode (Multi-Process/Web Apps)
780/// - Uses lock-free MVCC for reads, single-writer coordination for writes
781/// - Best for: Web servers, Flask/FastAPI apps, hot reloading
782/// - Open with: `Database::open_concurrent(path)`
783///
784/// # Example
785///
786/// ```ignore
787/// // Standard mode (single process)
788/// let db = Database::open("./my_data")?;
789///
790/// // Concurrent mode (multi-reader, single-writer)
791/// let db = Database::open_concurrent("./my_data")?;
792///
793/// // Begin a transaction
794/// let txn = db.begin_transaction()?;
795///
796/// // Write data
797/// db.put(txn, b"user:1:name", b"Alice")?;
798///
799/// // Commit
800/// db.commit(txn)?;
801/// ```
802#[allow(dead_code)]
803pub struct Database {
804 /// Path to database directory
805 path: PathBuf,
806 /// Durable storage layer (WAL + MVCC + memtable)
807 storage: Arc<DurableStorage>,
808 /// Concurrent MVCC manager (for concurrent mode)
809 concurrent_mvcc: Option<Arc<crate::mvcc_concurrent::ConcurrentMvcc>>,
810 /// Schema catalog
811 catalog: Arc<RwLock<Catalog>>,
812 /// Registered table schemas (name -> schema) - lock-free for reads
813 tables: DashMap<String, TableSchema>,
814 /// Cached packed schemas for fast insert (name -> packed schema)
815 packed_schemas: DashMap<String, PackedTableSchema>,
816 /// Per-table index policy registry
817 index_registry: Arc<TableIndexRegistry>,
818 /// Configuration
819 config: DatabaseConfig,
820 /// Statistics
821 stats: DatabaseStats,
822 /// Shutdown flag
823 shutdown: AtomicU64,
824 /// Whether this database is in concurrent mode
825 is_concurrent: bool,
826}
827
/// Internal, atomically updated database counters.
struct DatabaseStats {
    /// Transactions begun.
    transactions_started: AtomicU64,
    /// Transactions committed.
    transactions_committed: AtomicU64,
    /// Transactions aborted.
    transactions_aborted: AtomicU64,
    /// Queries executed.
    queries_executed: AtomicU64,
    /// Bytes written.
    bytes_written: AtomicU64,
    /// Bytes read.
    bytes_read: AtomicU64,
}

impl DatabaseStats {
    /// Create a counter set with every counter starting at zero.
    fn new() -> Self {
        let zero = || AtomicU64::new(0);
        Self {
            transactions_started: zero(),
            transactions_committed: zero(),
            transactions_aborted: zero(),
            queries_executed: zero(),
            bytes_written: zero(),
            bytes_read: zero(),
        }
    }
}
850
/// Plain-value snapshot of the database counters for public consumption.
#[derive(Clone, Debug)]
pub struct Stats {
    /// Transactions begun.
    pub transactions_started: u64,
    /// Transactions committed.
    pub transactions_committed: u64,
    /// Transactions aborted.
    pub transactions_aborted: u64,
    /// Queries executed.
    pub queries_executed: u64,
    /// Bytes written.
    pub bytes_written: u64,
    /// Bytes read.
    pub bytes_read: u64,
}
861
862impl Database {
863 /// Open or create a database at the given path.
864 ///
865 /// This is the primary entry point, similar to `sqlite3_open()`.
866 /// If the database exists, it will be opened and WAL recovery performed.
867 /// If it doesn't exist, a new database will be created.
868 ///
869 /// # Arguments
870 ///
871 /// * `path` - Directory path for the database files
872 ///
873 /// # Returns
874 ///
875 /// An `Arc<Database>` that can be shared across threads and connections.
876 pub fn open<P: AsRef<Path>>(path: P) -> Result<Arc<Self>> {
877 Self::open_with_config(path, DatabaseConfig::default())
878 }
879
880 /// Open without locking (for testing crash recovery scenarios)
881 ///
882 /// # Safety
883 /// This should ONLY be used in tests that simulate crashes by forgetting
884 /// the storage instance. In production, always use `open()`.
885 #[cfg(test)]
886 pub fn open_without_lock<P: AsRef<Path>>(path: P) -> Result<Arc<Self>> {
887 let path = path.as_ref().to_path_buf();
888 let config = DatabaseConfig::default();
889
890 let storage = Arc::new(DurableStorage::open_without_lock(&path)?);
891
892 let index_registry = Arc::new(TableIndexRegistry::with_default_policy(
893 config.default_index_policy,
894 ));
895
896 let db = Arc::new(Self {
897 path: path.clone(),
898 storage,
899 concurrent_mvcc: None,
900 catalog: Arc::new(RwLock::new(Catalog::new("sochdb"))),
901 tables: DashMap::new(),
902 packed_schemas: DashMap::new(),
903 index_registry,
904 config,
905 stats: DatabaseStats::new(),
906 shutdown: AtomicU64::new(0),
907 is_concurrent: false,
908 });
909
910 db.recover()?;
911 Ok(db)
912 }
913
914 /// Open with custom configuration
915 pub fn open_with_config<P: AsRef<Path>>(path: P, config: DatabaseConfig) -> Result<Arc<Self>> {
916 let path = path.as_ref().to_path_buf();
917
918 // Use IndexPolicy-based storage configuration for automatic memtable selection
919 // This derives ordered index and memtable type from the policy
920 let storage = Arc::new(DurableStorage::open_with_policy(
921 &path,
922 config.default_index_policy,
923 config.group_commit,
924 )?);
925
926 // Create index registry with default policy from config
927 let index_registry = Arc::new(TableIndexRegistry::with_default_policy(
928 config.default_index_policy,
929 ));
930
931 let db = Arc::new(Self {
932 path: path.clone(),
933 storage,
934 concurrent_mvcc: None,
935 catalog: Arc::new(RwLock::new(Catalog::new("sochdb"))),
936 tables: DashMap::new(),
937 packed_schemas: DashMap::new(),
938 index_registry,
939 config,
940 stats: DatabaseStats::new(),
941 shutdown: AtomicU64::new(0),
942 is_concurrent: false,
943 });
944
945 // Perform crash recovery if needed
946 db.recover()?;
947
948 Ok(db)
949 }
950
951 /// Open database in concurrent mode (multi-reader, single-writer)
952 ///
953 /// This mode allows multiple processes to access the database simultaneously:
954 /// - **Readers**: Lock-free, concurrent access via MVCC snapshots
955 /// - **Writers**: Single-writer coordination through atomic locks
956 ///
957 /// # Use Cases
958 ///
959 /// - Web applications (Flask, FastAPI, Django)
960 /// - Hot reloading development servers
961 /// - Multi-process worker pools
962 /// - Any scenario with concurrent read access
963 ///
964 /// # Performance
965 ///
966 /// - Read latency: ~100ns (lock-free atomic operations)
967 /// - Write latency: ~60μs amortized (with group commit)
968 /// - Concurrent readers: Up to 1024 (configurable)
969 ///
970 /// # Example
971 ///
972 /// ```ignore
973 /// // Multiple processes can open the same database
974 /// let db = Database::open_concurrent("./my_data")?;
975 ///
976 /// // Reads are lock-free
977 /// let value = db.get(b"key")?;
978 ///
979 /// // Writes coordinate automatically
980 /// let txn = db.begin_transaction()?;
981 /// db.put(txn, b"key", b"value")?;
982 /// db.commit(txn)?;
983 /// ```
984 pub fn open_concurrent<P: AsRef<Path>>(path: P) -> Result<Arc<Self>> {
985 Self::open_concurrent_with_config(path, DatabaseConfig::default())
986 }
987
988 /// Open database in concurrent mode with custom configuration
989 pub fn open_concurrent_with_config<P: AsRef<Path>>(
990 path: P,
991 config: DatabaseConfig,
992 ) -> Result<Arc<Self>> {
993 use crate::mvcc_concurrent::ConcurrentMvcc;
994
995 let path = path.as_ref().to_path_buf();
996 std::fs::create_dir_all(&path)?;
997
998 // Open concurrent MVCC manager (this uses shared memory, not exclusive lock)
999 let concurrent_mvcc = Arc::new(ConcurrentMvcc::open(&path)?);
1000
1001 // Open storage WITHOUT exclusive lock (concurrent MVCC handles coordination)
1002 // We use a special internal method that skips the file lock
1003 let storage = Arc::new(DurableStorage::open_for_concurrent(&path, config.default_index_policy)?);
1004
1005 // Create index registry with default policy from config
1006 let index_registry = Arc::new(TableIndexRegistry::with_default_policy(
1007 config.default_index_policy,
1008 ));
1009
1010 let db = Arc::new(Self {
1011 path: path.clone(),
1012 storage,
1013 concurrent_mvcc: Some(concurrent_mvcc),
1014 catalog: Arc::new(RwLock::new(Catalog::new("sochdb"))),
1015 tables: DashMap::new(),
1016 packed_schemas: DashMap::new(),
1017 index_registry,
1018 config,
1019 stats: DatabaseStats::new(),
1020 shutdown: AtomicU64::new(0),
1021 is_concurrent: true,
1022 });
1023
1024 // Perform crash recovery if needed
1025 db.recover()?;
1026
1027 // Clean up any stale readers from crashed processes
1028 if let Some(ref mvcc) = db.concurrent_mvcc {
1029 mvcc.cleanup_stale_readers();
1030 }
1031
1032 Ok(db)
1033 }
1034
1035 /// Check if database is in concurrent mode
1036 #[inline]
1037 pub fn is_concurrent(&self) -> bool {
1038 self.is_concurrent
1039 }
1040
1041 /// Perform crash recovery
1042 fn recover(&self) -> Result<RecoveryStats> {
1043 self.storage.recover()
1044 }
1045
1046 /// Get database path
1047 pub fn path(&self) -> &Path {
1048 &self.path
1049 }
1050
1051 // =========================================================================
1052 // Transaction API
1053 // =========================================================================
1054
1055 /// Begin a new transaction
1056 pub fn begin_transaction(&self) -> Result<TxnHandle> {
1057 self.stats
1058 .transactions_started
1059 .fetch_add(1, Ordering::Relaxed);
1060 let txn_id = self.storage.begin_transaction()?;
1061
1062 // Get snapshot timestamp from MVCC
1063 // For now, use txn_id as a proxy (the real snapshot_ts is managed internally)
1064 Ok(TxnHandle {
1065 txn_id,
1066 snapshot_ts: txn_id,
1067 })
1068 }
1069
1070 /// Begin a read-only transaction (optimized: no SSI tracking)
1071 ///
1072 /// Read-only transactions skip SSI read tracking, reducing overhead
1073 /// from ~82ns to ~32ns per read (2.6x faster).
1074 ///
1075 /// Use this for:
1076 /// - SELECT queries that don't modify data
1077 /// - Analytics and reporting queries
1078 /// - Snapshot reads for backup
1079 pub fn begin_read_only(&self) -> Result<TxnHandle> {
1080 self.stats
1081 .transactions_started
1082 .fetch_add(1, Ordering::Relaxed);
1083 let txn_id = self.storage.begin_with_mode(TransactionMode::ReadOnly)?;
1084 Ok(TxnHandle {
1085 txn_id,
1086 snapshot_ts: txn_id,
1087 })
1088 }
1089
1090 /// Begin a write-only transaction (optimized: no read tracking)
1091 ///
1092 /// Write-only transactions skip read tracking, improving insert
1093 /// throughput for bulk loading scenarios.
1094 ///
1095 /// Use this for:
1096 /// - Bulk data imports
1097 /// - Append-only logging tables
1098 /// - ETL pipelines
1099 pub fn begin_write_only(&self) -> Result<TxnHandle> {
1100 self.stats
1101 .transactions_started
1102 .fetch_add(1, Ordering::Relaxed);
1103 let txn_id = self.storage.begin_with_mode(TransactionMode::WriteOnly)?;
1104 Ok(TxnHandle {
1105 txn_id,
1106 snapshot_ts: txn_id,
1107 })
1108 }
1109
1110 /// Commit a transaction
1111 ///
1112 /// In concurrent mode, acquires the shared writer lock to ensure
1113 /// WAL writes are serialized across processes, and forces a flush+sync
1114 /// so that subsequent processes see the committed data.
1115 pub fn commit(&self, txn: TxnHandle) -> Result<u64> {
1116 self.stats
1117 .transactions_committed
1118 .fetch_add(1, Ordering::Relaxed);
1119
1120 // In concurrent mode, acquire the cross-process writer lock
1121 // to serialize WAL commits across processes
1122 let _writer_guard = if let Some(ref mvcc) = self.concurrent_mvcc {
1123 Some(mvcc.acquire_writer(std::time::Duration::from_secs(5))?)
1124 } else {
1125 None
1126 };
1127
1128 let commit_ts = self.storage.commit(txn.txn_id)?;
1129
1130 // In concurrent mode, force flush+sync so other processes can see
1131 // the committed data when they open the DB or run recovery.
1132 // Without this, the BufWriter may hold data that isn't visible
1133 // to other processes reading the WAL file.
1134 if self.is_concurrent {
1135 self.storage.flush_wal()?;
1136 self.storage.fsync()?;
1137 }
1138
1139 // Notify concurrent MVCC of commit for GC tracking
1140 if let Some(ref mvcc) = self.concurrent_mvcc {
1141 mvcc.on_commit();
1142 }
1143
1144 Ok(commit_ts)
1145 }
1146
1147 /// Abort a transaction
1148 pub fn abort(&self, txn: TxnHandle) -> Result<()> {
1149 self.stats
1150 .transactions_aborted
1151 .fetch_add(1, Ordering::Relaxed);
1152 self.storage.abort(txn.txn_id)
1153 }
1154
1155 // =========================================================================
1156 // Per-Table Index Policy API
1157 // =========================================================================
1158
1159 /// Configure index policy for a table
1160 ///
1161 /// This allows fine-grained control over write/scan trade-offs per table:
1162 ///
1163 /// | Policy | Insert Cost | Scan Cost | Use Case |
1164 /// |----------------|-------------|----------------|------------------------|
1165 /// | WriteOptimized | O(1) | O(N) | High-write, rare scan |
1166 /// | Balanced | O(1) amort | O(output+logK) | Mixed OLTP |
1167 /// | ScanOptimized | O(log N) | O(logN + K) | Analytics, range query |
1168 /// | AppendOnly | O(1) | O(N) | Time-series logs |
1169 ///
1170 /// # Example
1171 ///
1172 /// ```ignore
1173 /// // Fast inserts for logs table (no ordered index overhead)
1174 /// db.set_table_index_policy("logs", IndexPolicy::WriteOptimized);
1175 ///
1176 /// // Efficient range scans for analytics table
1177 /// db.set_table_index_policy("analytics", IndexPolicy::ScanOptimized);
1178 ///
1179 /// // Balanced for OLTP tables
1180 /// db.set_table_index_policy("users", IndexPolicy::Balanced);
1181 /// ```
1182 pub fn set_table_index_policy(&self, table: &str, policy: IndexPolicy) {
1183 self.index_registry.configure_table(
1184 TableIndexConfig::new(table, policy)
1185 );
1186 }
1187
1188 /// Get the index policy for a table
1189 pub fn get_table_index_policy(&self, table: &str) -> IndexPolicy {
1190 self.index_registry.get_policy(table)
1191 }
1192
1193 /// Get the index registry for advanced configuration
1194 pub fn index_registry(&self) -> &Arc<TableIndexRegistry> {
1195 &self.index_registry
1196 }
1197
1198 // =========================================================================
1199 // Key-Value API (Low-level)
1200 // =========================================================================
1201
1202 /// Put a key-value pair
1203 ///
1204 /// In concurrent mode, acquires the shared writer lock to ensure
1205 /// WAL writes are serialized across processes.
1206 pub fn put(&self, txn: TxnHandle, key: &[u8], value: &[u8]) -> Result<()> {
1207 self.stats
1208 .bytes_written
1209 .fetch_add((key.len() + value.len()) as u64, Ordering::Relaxed);
1210
1211 // In concurrent mode, acquire cross-process writer lock
1212 let _writer_guard = if let Some(ref mvcc) = self.concurrent_mvcc {
1213 Some(mvcc.acquire_writer(std::time::Duration::from_secs(5))?)
1214 } else {
1215 None
1216 };
1217
1218 // Use write_refs to avoid unnecessary allocations
1219 self.storage.write_refs(txn.txn_id, key, value)
1220 }
1221
1222 /// Batch put multiple key-value pairs with reduced overhead
1223 ///
1224 /// This amortizes per-operation costs over the entire batch:
1225 /// - Single DashMap lookup
1226 /// - Batch MVCC tracking
1227 /// - Batch memtable writes
1228 ///
1229 /// For 100+ entries, this is 2-3x faster than individual puts.
1230 ///
1231 /// # Example
1232 ///
1233 /// ```ignore
1234 /// let writes: Vec<(&[u8], &[u8])> = vec![
1235 /// (b"key1", b"value1"),
1236 /// (b"key2", b"value2"),
1237 /// (b"key3", b"value3"),
1238 /// ];
1239 /// db.put_batch(txn, &writes)?;
1240 /// ```
1241 pub fn put_batch(&self, txn: TxnHandle, writes: &[(&[u8], &[u8])]) -> Result<()> {
1242 let bytes: u64 = writes
1243 .iter()
1244 .map(|(k, v)| (k.len() + v.len()) as u64)
1245 .sum();
1246 self.stats.bytes_written.fetch_add(bytes, Ordering::Relaxed);
1247
1248 // In concurrent mode, acquire cross-process writer lock
1249 let _writer_guard = if let Some(ref mvcc) = self.concurrent_mvcc {
1250 Some(mvcc.acquire_writer(std::time::Duration::from_secs(5))?)
1251 } else {
1252 None
1253 };
1254
1255 self.storage.write_batch_refs(txn.txn_id, writes)
1256 }
1257
1258 /// Get a value by key
1259 pub fn get(&self, txn: TxnHandle, key: &[u8]) -> Result<Option<Vec<u8>>> {
1260 let result = self.storage.read(txn.txn_id, key)?;
1261 if let Some(ref data) = result {
1262 self.stats
1263 .bytes_read
1264 .fetch_add(data.len() as u64, Ordering::Relaxed);
1265 }
1266 Ok(result)
1267 }
1268
1269 /// Delete a key
1270 pub fn delete(&self, txn: TxnHandle, key: &[u8]) -> Result<()> {
1271 self.storage.delete(txn.txn_id, key.to_vec())
1272 }
1273
/// Minimum prefix length, in bytes, accepted by [`Self::scan`].
///
/// `scan` rejects shorter prefixes so that an empty or one-byte prefix
/// cannot trigger an accidental full-table scan; `scan_unchecked` does not
/// apply this limit.
pub const MIN_SCAN_PREFIX_LEN: usize = 2;
1277
1278 /// Scan keys with a prefix (enforces minimum prefix length for safety).
1279 ///
1280 /// # Prefix Safety
1281 ///
1282 /// To prevent accidental full-table scans, this method requires a minimum
1283 /// prefix length of 2 bytes. Use `scan_unchecked` for internal operations
1284 /// that need empty/short prefixes.
1285 ///
1286 /// # Errors
1287 ///
1288 /// Returns `SochDBError::InvalidInput` if prefix is too short.
1289 pub fn scan(&self, txn: TxnHandle, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
1290 if prefix.len() < Self::MIN_SCAN_PREFIX_LEN {
1291 return Err(SochDBError::InvalidArgument(format!(
1292 "Prefix too short: {} bytes (minimum {} required). \
1293 Use scan_unchecked() for unrestricted scans.",
1294 prefix.len(),
1295 Self::MIN_SCAN_PREFIX_LEN
1296 )));
1297 }
1298 self.scan_unchecked(txn, prefix)
1299 }
1300
1301 /// Scan keys with a prefix without length validation.
1302 ///
1303 /// # Warning
1304 ///
1305 /// This method allows empty/short prefixes which can cause expensive
1306 /// full-table scans. Use `scan()` unless you specifically need unrestricted
1307 /// prefix access for internal operations.
1308 pub fn scan_unchecked(&self, txn: TxnHandle, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
1309 let results = self.storage.scan(txn.txn_id, prefix)?;
1310 let bytes: u64 = results
1311 .iter()
1312 .map(|(k, v)| (k.len() + v.len()) as u64)
1313 .sum();
1314 self.stats.bytes_read.fetch_add(bytes, Ordering::Relaxed);
1315 Ok(results)
1316 }
1317
1318 /// Scan keys in range
1319 pub fn scan_range(
1320 &self,
1321 txn: TxnHandle,
1322 start: &[u8],
1323 end: &[u8],
1324 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
1325 let results = self.storage.scan_range(txn.txn_id, start, end)?;
1326 let bytes: u64 = results
1327 .iter()
1328 .map(|(k, v)| (k.len() + v.len()) as u64)
1329 .sum();
1330 self.stats.bytes_read.fetch_add(bytes, Ordering::Relaxed);
1331 Ok(results)
1332 }
1333
1334 /// Streaming scan for very large result sets
1335 ///
1336 /// Returns an iterator that yields (key, value) pairs without
1337 /// materializing the entire result set. Use this for large scans
1338 /// where memory efficiency is important.
1339 ///
1340 /// ## Performance
1341 ///
1342 /// - Memory: O(1) per iteration vs O(N) for scan_range
1343 /// - Latency: First result available immediately vs waiting for all results
1344 /// - Throughput: Slightly lower due to per-item overhead
1345 ///
1346 /// ## Usage
1347 ///
1348 /// ```ignore
1349 /// for result in db.scan_range_iter(txn, b"start", b"end") {
1350 /// let (key, value) = result?;
1351 /// // Process immediately - no need to wait for all results
1352 /// }
1353 /// ```
1354 pub fn scan_range_iter<'a>(
1355 &'a self,
1356 txn: TxnHandle,
1357 start: &'a [u8],
1358 end: &'a [u8],
1359 ) -> impl Iterator<Item = Result<(Vec<u8>, Vec<u8>)>> + 'a {
1360 let stats = &self.stats;
1361 self.storage
1362 .scan_range_iter(txn.txn_id, start, end)
1363 .map(move |item| {
1364 stats.bytes_read.fetch_add(
1365 (item.0.len() + item.1.len()) as u64,
1366 Ordering::Relaxed,
1367 );
1368 Ok(item)
1369 })
1370 }
1371
1372 /// Flush memtable to WAL/Disk
1373 pub fn flush(&self) -> Result<()> {
1374 self.storage.fsync()
1375 }
1376
1377 // =========================================================================
1378 // Path-Native API (SochDB's differentiator)
1379 // =========================================================================
1380
1381 /// Get storage statistics
1382 pub fn storage_stats(&self) -> crate::durable_storage::StorageStats {
1383 self.storage.stats()
1384 }
1385
1386 /// Put a value at a path
1387 ///
1388 /// Path format: "collection/doc_id/field" or "table.row_id.column"
1389 /// Resolution is O(|path|), not O(log N) like B-tree.
1390 pub fn put_path(&self, txn: TxnHandle, path: &str, value: &[u8]) -> Result<()> {
1391 self.put(txn, path.as_bytes(), value)
1392 }
1393
1394 /// Get a value at a path
1395 pub fn get_path(&self, txn: TxnHandle, path: &str) -> Result<Option<Vec<u8>>> {
1396 self.get(txn, path.as_bytes())
1397 }
1398
1399 /// Delete at a path
1400 pub fn delete_path(&self, txn: TxnHandle, path: &str) -> Result<()> {
1401 self.delete(txn, path.as_bytes())
1402 }
1403
1404 /// Scan a path prefix
1405 ///
1406 /// Returns all key-value pairs where key starts with prefix.
1407 /// Useful for: "users/123/" -> all fields of user 123
1408 pub fn scan_path(&self, txn: TxnHandle, prefix: &str) -> Result<Vec<(String, Vec<u8>)>> {
1409 self.stats.queries_executed.fetch_add(1, Ordering::Relaxed);
1410
1411 let results = self.scan(txn, prefix.as_bytes())?;
1412
1413 Ok(results
1414 .into_iter()
1415 .filter_map(|(k, v)| String::from_utf8(k).ok().map(|path| (path, v)))
1416 .collect())
1417 }
1418
1419 // =========================================================================
1420 // Query API
1421 // =========================================================================
1422
1423 /// Execute a path query and return results
1424 ///
1425 /// This is the main query interface for LLM context retrieval.
1426 /// Supports:
1427 /// - Path prefix matching
1428 /// - Column projection (for I/O reduction)
1429 /// - Limit/offset
1430 pub fn query(&self, txn: TxnHandle, path_prefix: &str) -> QueryBuilder<'_> {
1431 QueryBuilder::new(self, txn, path_prefix.to_string())
1432 }
1433
1434 // =========================================================================
1435 // Table API (Higher-level abstraction)
1436 // =========================================================================
1437
1438 /// Register a table schema
1439 pub fn register_table(&self, schema: TableSchema) -> Result<()> {
1440 if self.tables.contains_key(&schema.name) {
1441 return Err(SochDBError::InvalidArgument(format!(
1442 "Table '{}' already exists",
1443 schema.name
1444 )));
1445 }
1446 // Cache the packed schema for fast inserts
1447 let packed_schema = Self::to_packed_schema(&schema);
1448 self.packed_schemas
1449 .insert(schema.name.clone(), packed_schema);
1450 self.tables.insert(schema.name.clone(), schema);
1451 Ok(())
1452 }
1453
1454 /// Get table schema
1455 pub fn get_table_schema(&self, name: &str) -> Option<TableSchema> {
1456 self.tables.get(name).map(|s| s.clone())
1457 }
1458
1459 /// List all tables
1460 pub fn list_tables(&self) -> Vec<String> {
1461 self.tables.iter().map(|e| e.key().clone()).collect()
1462 }
1463 /// Convert TableSchema to PackedTableSchema for efficient storage
1464 fn to_packed_schema(schema: &TableSchema) -> PackedTableSchema {
1465 let columns = schema
1466 .columns
1467 .iter()
1468 .map(|col| PackedColumnDef {
1469 name: col.name.clone(),
1470 col_type: match col.col_type {
1471 ColumnType::Int64 => PackedColumnType::Int64,
1472 ColumnType::UInt64 => PackedColumnType::UInt64,
1473 ColumnType::Float64 => PackedColumnType::Float64,
1474 ColumnType::Text => PackedColumnType::Text,
1475 ColumnType::Binary => PackedColumnType::Binary,
1476 ColumnType::Bool => PackedColumnType::Bool,
1477 },
1478 nullable: col.nullable,
1479 })
1480 .collect();
1481
1482 PackedTableSchema::new(&schema.name, columns)
1483 }
1484
1485 /// Insert a row into a table
1486 ///
1487 /// Uses packed row format: stores entire row as single key-value pair.
1488 /// This reduces write amplification from 4× to 1× for a 4-column table.
1489 ///
1490 /// # Performance
1491 /// - Before: 4 columns × (WAL entry + MVCC version) = 4 writes
1492 /// - After: 1 packed row = 1 write
1493 /// - Improvement: ~4× fewer WAL entries, ~48% less I/O overhead
1494 pub fn insert_row(
1495 &self,
1496 txn: TxnHandle,
1497 table: &str,
1498 row_id: u64,
1499 values: &HashMap<String, SochValue>,
1500 ) -> Result<()> {
1501 // Use cached packed schema - single DashMap lookup, no clone
1502 let packed_schema = self
1503 .packed_schemas
1504 .get(table)
1505 .ok_or_else(|| SochDBError::InvalidArgument(format!("Table '{}' not found", table)))?;
1506
1507 // Pack the row using cached schema
1508 let packed_row = PackedRow::pack(&packed_schema, values);
1509
1510 // Build key using KeyBuffer - optimized stack allocation (~12-15ns vs ~30-35ns for write!())
1511 let key = KeyBuffer::format_row_key(table, row_id);
1512
1513 self.put(txn, key.as_bytes(), packed_row.as_bytes())?;
1514
1515 Ok(())
1516 }
1517
1518 /// Read a row from a table
1519 ///
1520 /// Reads packed row and extracts requested columns in O(k) time.
1521 /// Column projection happens in memory, not storage - all columns are fetched.
1522 pub fn read_row(
1523 &self,
1524 txn: TxnHandle,
1525 table: &str,
1526 row_id: u64,
1527 columns: Option<&[&str]>,
1528 ) -> Result<Option<HashMap<String, SochValue>>> {
1529 let schema = self
1530 .tables
1531 .get(table)
1532 .ok_or_else(|| SochDBError::InvalidArgument(format!("Table '{}' not found", table)))?;
1533
1534 // Read the packed row with a single key lookup using KeyBuffer
1535 let key = KeyBuffer::format_row_key(table, row_id);
1536 let bytes = match self.get(txn, key.as_bytes())? {
1537 Some(b) => b,
1538 None => return Ok(None),
1539 };
1540
1541 // Use cached packed schema
1542 let packed_schema = self
1543 .packed_schemas
1544 .get(table)
1545 .ok_or_else(|| SochDBError::Internal("Packed schema not found".into()))?;
1546 let packed_row = PackedRow::from_bytes(bytes, packed_schema.num_columns())?;
1547
1548 // Determine which columns to return
1549 let cols_to_read: Vec<&str> = match columns {
1550 Some(c) => c.to_vec(),
1551 None => schema.columns.iter().map(|c| c.name.as_str()).collect(),
1552 };
1553
1554 let mut row = HashMap::new();
1555 for col_name in cols_to_read {
1556 if let Some(idx) = packed_schema.column_index(col_name)
1557 && let Some(col_def) = packed_schema.column(idx)
1558 && let Some(value) = packed_row.get_column(idx, col_def.col_type)
1559 {
1560 row.insert(col_name.to_string(), value);
1561 }
1562 }
1563
1564 Ok(Some(row))
1565 }
1566
1567 /// Insert multiple rows efficiently in a batch
1568 ///
1569 /// This method accumulates all rows and writes them with fewer WAL syncs.
1570 /// Ideal for bulk loading scenarios.
1571 ///
1572 /// # Performance
1573 /// - Uses group commit to batch fsync operations
1574 /// - Expected throughput: 500K-1M rows/sec depending on row size
1575 pub fn insert_rows_batch(
1576 &self,
1577 txn: TxnHandle,
1578 table: &str,
1579 rows: &[(u64, HashMap<String, SochValue>)],
1580 ) -> Result<usize> {
1581 // Use cached packed schema
1582 let packed_schema = self
1583 .packed_schemas
1584 .get(table)
1585 .ok_or_else(|| SochDBError::InvalidArgument(format!("Table '{}' not found", table)))?;
1586
1587 let mut count = 0;
1588
1589 for (row_id, values) in rows {
1590 // Pack and write using KeyBuffer for efficient key construction
1591 let packed_row = PackedRow::pack(&packed_schema, values);
1592 let key = KeyBuffer::format_row_key(table, *row_id);
1593 self.put(txn, key.as_bytes(), packed_row.as_bytes())?;
1594 count += 1;
1595 }
1596
1597 Ok(count)
1598 }
1599
1600 /// Ultra-fast raw put - bypasses all validation
1601 ///
1602 /// Use when you've already validated the data and just need speed.
1603 /// This is ~10× faster than insert_row() for bulk inserts.
1604 #[inline]
1605 pub fn put_raw(&self, txn: TxnHandle, key: &[u8], value: &[u8]) -> Result<()> {
1606 self.storage.write_refs(txn.txn_id, key, value)
1607 }
1608
1609 /// Zero-allocation insert - fastest path for bulk inserts
1610 ///
1611 /// Takes values as a slice in schema column order, avoiding HashMap overhead.
1612 ///
1613 /// # Arguments
1614 /// * `txn` - Transaction handle
1615 /// * `table` - Table name
1616 /// * `row_id` - Row identifier
1617 /// * `values` - Values in schema column order (None = NULL)
1618 ///
1619 /// # Performance
1620 /// - Eliminates ~6 allocations per row vs insert_row()
1621 /// - Expected: 1.2M-1.5M inserts/sec
1622 ///
1623 /// # Example
1624 /// ```ignore
1625 /// let values: &[Option<&SochValue>] = &[
1626 /// Some(&SochValue::Int(1)),
1627 /// Some(&SochValue::Text("Alice".into())),
1628 /// None, // NULL
1629 /// ];
1630 /// db.insert_row_slice(txn, "users", 1, values)?;
1631 /// ```
1632 #[inline]
1633 pub fn insert_row_slice(
1634 &self,
1635 txn: TxnHandle,
1636 table: &str,
1637 row_id: u64,
1638 values: &[Option<&SochValue>],
1639 ) -> Result<()> {
1640 // Use cached packed schema - single DashMap lookup
1641 let packed_schema = self
1642 .packed_schemas
1643 .get(table)
1644 .ok_or_else(|| SochDBError::InvalidArgument(format!("Table '{}' not found", table)))?;
1645
1646 // Validate column count matches
1647 if values.len() != packed_schema.num_columns() {
1648 return Err(SochDBError::InvalidArgument(format!(
1649 "Expected {} columns, got {}",
1650 packed_schema.num_columns(),
1651 values.len()
1652 )));
1653 }
1654
1655 // Pack using zero-allocation path
1656 let packed_row = PackedRow::pack_slice(&packed_schema, values);
1657
1658 // Build key using KeyBuffer - optimized stack allocation (~12-15ns vs ~30-35ns for write!())
1659 let key = KeyBuffer::format_row_key(table, row_id);
1660
1661 self.put(txn, key.as_bytes(), packed_row.as_bytes())?;
1662 Ok(())
1663 }
1664
1665 // =========================================================================
1666 // Maintenance
1667 // =========================================================================
1668
1669 /// Force fsync to disk
1670 pub fn fsync(&self) -> Result<()> {
1671 self.storage.fsync()
1672 }
1673
1674 /// Create a checkpoint
1675 pub fn checkpoint(&self) -> Result<u64> {
1676 self.storage.checkpoint()
1677 }
1678
1679 /// Truncate the WAL file after a checkpoint.
1680 ///
1681 /// See [`DurableStorage::truncate_wal`] for safety notes.
1682 pub fn truncate_wal(&self) -> Result<()> {
1683 self.storage.truncate_wal()
1684 }
1685
1686 /// Run garbage collection
1687 pub fn gc(&self) -> usize {
1688 self.storage.gc()
1689 }
1690
1691 /// Get database statistics
1692 pub fn stats(&self) -> Stats {
1693 Stats {
1694 transactions_started: self.stats.transactions_started.load(Ordering::Relaxed),
1695 transactions_committed: self.stats.transactions_committed.load(Ordering::Relaxed),
1696 transactions_aborted: self.stats.transactions_aborted.load(Ordering::Relaxed),
1697 queries_executed: self.stats.queries_executed.load(Ordering::Relaxed),
1698 bytes_written: self.stats.bytes_written.load(Ordering::Relaxed),
1699 bytes_read: self.stats.bytes_read.load(Ordering::Relaxed),
1700 }
1701 }
1702
1703 /// Shutdown the database gracefully
1704 pub fn shutdown(&self) -> Result<()> {
1705 if self.shutdown.swap(1, Ordering::SeqCst) == 1 {
1706 return Ok(()); // Already shutting down
1707 }
1708
1709 // Flush any pending writes
1710 self.fsync()?;
1711
1712 // Create clean shutdown marker
1713 let marker = self.path.join(".clean_shutdown");
1714 std::fs::write(&marker, b"ok")?;
1715
1716 Ok(())
1717 }
1718}
1719
1720impl Drop for Database {
1721 fn drop(&mut self) {
1722 // Try graceful shutdown if not already done
1723 if self.shutdown.load(Ordering::SeqCst) == 0 {
1724 let _ = self.fsync();
1725 let marker = self.path.join(".clean_shutdown");
1726 let _ = std::fs::write(&marker, b"ok");
1727 }
1728 }
1729}
1730
/// Query builder for fluent query construction.
///
/// Created by `Database::query`; nothing is scanned until one of the
/// terminal methods (`execute`, `to_toon`, `execute_iter`, `as_columnar`)
/// is called.
pub struct QueryBuilder<'a> {
    // Database the query runs against.
    db: &'a Database,
    // Transaction whose snapshot the scan uses.
    txn: TxnHandle,
    // Key prefix to scan; its first '/'-separated segment is treated as the table name.
    path_prefix: String,
    // Optional projection; `None` means all columns.
    columns: Option<Vec<String>>,
    // Maximum number of rows to return, if set.
    limit: Option<usize>,
    // Number of leading rows to skip, if set.
    offset: Option<usize>,
}
1740
1741impl<'a> QueryBuilder<'a> {
1742 fn new(db: &'a Database, txn: TxnHandle, path_prefix: String) -> Self {
1743 Self {
1744 db,
1745 txn,
1746 path_prefix,
1747 columns: None,
1748 limit: None,
1749 offset: None,
1750 }
1751 }
1752
1753 /// Select specific columns (for I/O reduction)
1754 pub fn columns(mut self, cols: &[&str]) -> Self {
1755 self.columns = Some(cols.iter().map(|s| s.to_string()).collect());
1756 self
1757 }
1758
1759 /// Limit results
1760 pub fn limit(mut self, n: usize) -> Self {
1761 self.limit = Some(n);
1762 self
1763 }
1764
1765 /// Skip results
1766 pub fn offset(mut self, n: usize) -> Self {
1767 self.offset = Some(n);
1768 self
1769 }
1770
1771 /// Execute the query
1772 ///
1773 /// Scans packed rows and unpacks them. Each key is "table/row_id" pointing to a packed row.
1774 pub fn execute(self) -> Result<QueryResult> {
1775 self.db
1776 .stats
1777 .queries_executed
1778 .fetch_add(1, Ordering::Relaxed);
1779
1780 // Get schema for the table if we're querying a table
1781 let table_name = self
1782 .path_prefix
1783 .split('/')
1784 .next()
1785 .unwrap_or(&self.path_prefix);
1786 let schema = self.db.tables.get(table_name).map(|s| s.clone());
1787
1788 // Scan the path prefix
1789 let results = self.db.scan_path(self.txn, &self.path_prefix)?;
1790
1791 let mut rows: Vec<HashMap<String, SochValue>> = Vec::new();
1792 let mut bytes_read = 0usize;
1793
1794 if let Some(ref schema) = schema {
1795 // We have a table schema - use cached packed schema
1796 let packed_schema = self
1797 .db
1798 .packed_schemas
1799 .get(table_name)
1800 .map(|ps| ps.clone())
1801 .unwrap_or_else(|| Database::to_packed_schema(schema));
1802
1803 for (path, value_bytes) in results {
1804 // Parse path: table/row_id
1805 let parts: Vec<&str> = path.split('/').collect();
1806 if parts.len() == 2 {
1807 // This is a packed row
1808 bytes_read += value_bytes.len();
1809
1810 if let Ok(packed_row) =
1811 PackedRow::from_bytes(value_bytes, packed_schema.num_columns())
1812 {
1813 // Unpack all columns or just requested columns
1814 let mut row = HashMap::new();
1815
1816 if let Some(ref cols) = self.columns {
1817 // Only extract requested columns
1818 for col_name in cols {
1819 if let Some(idx) = packed_schema.column_index(col_name)
1820 && let Some(col_def) = packed_schema.column(idx)
1821 && let Some(value) =
1822 packed_row.get_column(idx, col_def.col_type)
1823 {
1824 row.insert(col_name.clone(), value);
1825 }
1826 }
1827 } else {
1828 // Extract all columns
1829 row = packed_row.unpack(&packed_schema);
1830 }
1831
1832 if !row.is_empty() {
1833 rows.push(row);
1834 }
1835 }
1836 }
1837 }
1838 } else {
1839 // Fallback: no schema, try legacy column-per-key format
1840 let mut rows_map: HashMap<String, HashMap<String, SochValue>> = HashMap::new();
1841
1842 for (path, value_bytes) in results {
1843 let parts: Vec<&str> = path.split('/').collect();
1844 if parts.len() >= 3 {
1845 let row_key = format!("{}/{}", parts[0], parts[1]);
1846 let col_name = parts[2..].join("/");
1847
1848 if let Some(ref cols) = self.columns
1849 && !cols.contains(&col_name)
1850 {
1851 continue;
1852 }
1853
1854 bytes_read += value_bytes.len();
1855 let row = rows_map.entry(row_key).or_default();
1856 row.insert(col_name, deserialize_value(&value_bytes));
1857 }
1858 }
1859
1860 rows = rows_map.into_values().collect();
1861 }
1862
1863 // Apply offset
1864 if let Some(offset) = self.offset {
1865 rows = rows.into_iter().skip(offset).collect();
1866 }
1867
1868 // Apply limit
1869 if let Some(limit) = self.limit {
1870 rows.truncate(limit);
1871 }
1872
1873 // Collect column names
1874 let columns: Vec<String> = self.columns.unwrap_or_else(|| {
1875 rows.iter()
1876 .flat_map(|r| r.keys().cloned())
1877 .collect::<std::collections::HashSet<_>>()
1878 .into_iter()
1879 .collect()
1880 });
1881
1882 Ok(QueryResult {
1883 columns,
1884 rows_scanned: rows.len(),
1885 bytes_read,
1886 rows,
1887 })
1888 }
1889
1890 /// Execute and return TOON format (for LLM efficiency)
1891 pub fn to_toon(self) -> Result<String> {
1892 let result = self.execute()?;
1893 Ok(result.to_toon())
1894 }
1895
1896 /// Execute with lazy iteration - avoids materializing all rows
1897 ///
1898 /// Returns an iterator over rows as `Vec<SochValue>` in schema column order.
1899 /// This is more memory-efficient than `execute()` for large result sets.
1900 ///
1901 /// # Performance
1902 /// - No upfront materialization of all rows
1903 /// - ~40% less memory for large result sets
1904 /// - Ideal for streaming to network or aggregations
1905 ///
1906 /// # Example
1907 /// ```ignore
1908 /// for row_result in db.query(txn, "users").execute_iter()? {
1909 /// let row = row_result?;
1910 /// // row is Vec<SochValue> in column order
1911 /// }
1912 /// ```
1913 pub fn execute_iter(self) -> Result<QueryRowIterator> {
1914 self.db
1915 .stats
1916 .queries_executed
1917 .fetch_add(1, Ordering::Relaxed);
1918
1919 let table_name = self
1920 .path_prefix
1921 .split('/')
1922 .next()
1923 .unwrap_or(&self.path_prefix)
1924 .to_string();
1925
1926 // Get packed schema (clone needed for iterator ownership)
1927 let packed_schema = self.db.packed_schemas.get(&table_name).map(|ps| ps.clone());
1928
1929 // Scan the path prefix
1930 let results = self.db.scan_path(self.txn, &self.path_prefix)?;
1931
1932 Ok(QueryRowIterator {
1933 results: results.into_iter(),
1934 packed_schema,
1935 columns: self.columns,
1936 offset: self.offset.unwrap_or(0),
1937 limit: self.limit,
1938 yielded: 0,
1939 skipped: 0,
1940 })
1941 }
1942
1943 /// Execute and return columnar (SIMD-friendly) result format
1944 ///
1945 /// Instead of row-oriented `Vec<HashMap<String, SochValue>>`, returns
1946 /// column-oriented `Vec<TypedColumn>` for vectorized operations.
1947 ///
1948 /// ## Performance Benefits
1949 ///
1950 /// - SIMD: Aggregate operations (sum, avg) use vectorized instructions
1951 /// - Cache: Sequential access maximizes L1/L2 hits
1952 /// - Memory: ~30% less overhead than row-based format
1953 /// - Analytics: Ideal for ML preprocessing and statistics
1954 ///
1955 /// ## Example
1956 ///
1957 /// ```ignore
1958 /// let result = db.query(txn, "users")
1959 /// .columns(&["id", "score"])
1960 /// .as_columnar()?;
1961 ///
1962 /// // SIMD-optimized sum
1963 /// let total = result.sum_i64("score").unwrap_or(0);
1964 ///
1965 /// // Direct column access
1966 /// if let Some(scores) = result.column("score") {
1967 /// for i in 0..scores.len() {
1968 /// if let Some(v) = scores.get_i64(i) {
1969 /// println!("Score: {}", v);
1970 /// }
1971 /// }
1972 /// }
1973 /// ```
1974 pub fn as_columnar(self) -> Result<ColumnarQueryResult> {
1975 self.db
1976 .stats
1977 .queries_executed
1978 .fetch_add(1, Ordering::Relaxed);
1979
1980 let table_name = self
1981 .path_prefix
1982 .split('/')
1983 .next()
1984 .unwrap_or(&self.path_prefix);
1985 let schema = self.db.tables.get(table_name).map(|s| s.clone());
1986
1987 // Get packed schema
1988 let packed_schema = match self.db.packed_schemas.get(table_name) {
1989 Some(ps) => ps.clone(),
1990 None => return Ok(ColumnarQueryResult::empty()),
1991 };
1992
1993 // Determine columns to fetch
1994 let column_names: Vec<String> = self.columns.clone().unwrap_or_else(|| {
1995 schema
1996 .as_ref()
1997 .map(|s| s.columns.iter().map(|c| c.name.clone()).collect())
1998 .unwrap_or_default()
1999 });
2000
2001 if column_names.is_empty() {
2002 return Ok(ColumnarQueryResult::empty());
2003 }
2004
2005 // Initialize TypedColumns based on schema types
2006 let mut columns: Vec<CoreTypedColumn> = column_names
2007 .iter()
2008 .map(|col_name| {
2009 packed_schema
2010 .column_index(col_name)
2011 .and_then(|idx| packed_schema.column(idx))
2012 .map(|col_def| match col_def.col_type {
2013 PackedColumnType::Int64 => CoreTypedColumn::new_int64(),
2014 PackedColumnType::UInt64 => CoreTypedColumn::new_uint64(),
2015 PackedColumnType::Float64 => CoreTypedColumn::new_float64(),
2016 PackedColumnType::Text => CoreTypedColumn::new_text(),
2017 PackedColumnType::Binary => CoreTypedColumn::new_binary(),
2018 PackedColumnType::Bool => CoreTypedColumn::new_bool(),
2019 PackedColumnType::Null => CoreTypedColumn::new_text(), // Null column = fallback to text
2020 })
2021 .unwrap_or_else(CoreTypedColumn::new_text) // fallback
2022 })
2023 .collect();
2024
2025 // Scan the path prefix
2026 let results = self.db.scan_path(self.txn, &self.path_prefix)?;
2027
2028 let mut row_count = 0;
2029 let mut bytes_read = 0;
2030 let mut skipped = 0;
2031
2032 for (path, value_bytes) in results {
2033 // Parse path: table/row_id
2034 let parts: Vec<&str> = path.split('/').collect();
2035 if parts.len() != 2 {
2036 continue;
2037 }
2038
2039 // Apply offset
2040 if let Some(offset) = self.offset
2041 && skipped < offset
2042 {
2043 skipped += 1;
2044 continue;
2045 }
2046
2047 // Apply limit
2048 if let Some(limit) = self.limit
2049 && row_count >= limit
2050 {
2051 break;
2052 }
2053
2054 bytes_read += value_bytes.len();
2055
2056 if let Ok(packed_row) = PackedRow::from_bytes(value_bytes, packed_schema.num_columns())
2057 {
2058 // Extract each column and push to corresponding TypedColumn
2059 for (col_idx, col_name) in column_names.iter().enumerate() {
2060 if let Some(schema_idx) = packed_schema.column_index(col_name) {
2061 if let Some(col_def) = packed_schema.column(schema_idx) {
2062 let value = packed_row.get_column(schema_idx, col_def.col_type);
2063 push_value_to_typed_column(&mut columns[col_idx], value);
2064 } else {
2065 push_null_to_typed_column(&mut columns[col_idx]);
2066 }
2067 } else {
2068 push_null_to_typed_column(&mut columns[col_idx]);
2069 }
2070 }
2071 row_count += 1;
2072 }
2073 }
2074
2075 Ok(ColumnarQueryResult {
2076 columns: column_names,
2077 data: columns,
2078 row_count,
2079 bytes_read,
2080 })
2081 }
2082}
2083
/// Lazy iterator over query results
///
/// Unpacks rows on-demand, avoiding upfront materialization.
///
/// Each call to `next` pulls entries from the underlying scan until it finds
/// one whose path has exactly two `/`-separated segments, applies the offset
/// and limit, and decodes that single entry.
pub struct QueryRowIterator {
    /// Remaining `(path, value bytes)` pairs produced by the scan.
    results: std::vec::IntoIter<(String, Vec<u8>)>,
    /// Schema used to decode rows; when `None`, each entry is yielded as a
    /// single `SochValue::Binary` holding the raw bytes.
    packed_schema: Option<PackedTableSchema>,
    /// Column projection; `None` yields all columns in schema order.
    columns: Option<Vec<String>>,
    /// Number of row entries to skip before yielding anything.
    offset: usize,
    /// Maximum number of rows to yield; `None` means no limit.
    limit: Option<usize>,
    /// Number of rows yielded so far (compared against `limit`).
    yielded: usize,
    /// Number of row entries skipped so far (compared against `offset`).
    skipped: usize,
}
2096
2097impl Iterator for QueryRowIterator {
2098 type Item = Result<Vec<SochValue>>;
2099
2100 fn next(&mut self) -> Option<Self::Item> {
2101 // Check limit
2102 if let Some(limit) = self.limit
2103 && self.yielded >= limit
2104 {
2105 return None;
2106 }
2107
2108 loop {
2109 let (path, value_bytes) = self.results.next()?;
2110
2111 // Parse path: table/row_id
2112 let parts: Vec<&str> = path.split('/').collect();
2113 if parts.len() != 2 {
2114 continue; // Skip non-row entries
2115 }
2116
2117 // Apply offset
2118 if self.skipped < self.offset {
2119 self.skipped += 1;
2120 continue;
2121 }
2122
2123 if let Some(ref schema) = self.packed_schema {
2124 match PackedRow::from_bytes(value_bytes, schema.num_columns()) {
2125 Ok(packed_row) => {
2126 let row = if let Some(ref cols) = self.columns {
2127 // Project specific columns
2128 cols.iter()
2129 .map(|col_name| {
2130 schema
2131 .column_index(col_name)
2132 .and_then(|idx| schema.column(idx))
2133 .and_then(|col_def| {
2134 packed_row.get_column(
2135 schema.column_index(col_name).unwrap(),
2136 col_def.col_type,
2137 )
2138 })
2139 .unwrap_or(SochValue::Null)
2140 })
2141 .collect()
2142 } else {
2143 // All columns in order
2144 packed_row.unpack_to_vec(schema)
2145 };
2146
2147 self.yielded += 1;
2148 return Some(Ok(row));
2149 }
2150 Err(e) => return Some(Err(e)),
2151 }
2152 } else {
2153 // No schema - return raw bytes as binary
2154 self.yielded += 1;
2155 return Some(Ok(vec![SochValue::Binary(value_bytes)]));
2156 }
2157 }
2158 }
2159}
2160
2161// Helper functions for serialization (kept for backward compatibility with legacy data)
2162
2163#[allow(dead_code)]
2164fn serialize_value(value: &SochValue) -> Vec<u8> {
2165 // Simple serialization - in production use proper format
2166 match value {
2167 SochValue::Null => vec![0],
2168 SochValue::Int(i) => {
2169 let mut buf = vec![1];
2170 buf.extend_from_slice(&i.to_le_bytes());
2171 buf
2172 }
2173 SochValue::UInt(u) => {
2174 let mut buf = vec![2];
2175 buf.extend_from_slice(&u.to_le_bytes());
2176 buf
2177 }
2178 SochValue::Float(f) => {
2179 let mut buf = vec![3];
2180 buf.extend_from_slice(&f.to_le_bytes());
2181 buf
2182 }
2183 SochValue::Text(s) => {
2184 let mut buf = vec![4];
2185 buf.extend_from_slice(s.as_bytes());
2186 buf
2187 }
2188 SochValue::Bool(b) => vec![5, if *b { 1 } else { 0 }],
2189 SochValue::Binary(b) => {
2190 let mut buf = vec![6];
2191 buf.extend_from_slice(b);
2192 buf
2193 }
2194 _ => {
2195 // Fallback: serialize as text
2196 let s = format!("{:?}", value);
2197 let mut buf = vec![4];
2198 buf.extend_from_slice(s.as_bytes());
2199 buf
2200 }
2201 }
2202}
2203
2204fn deserialize_value(bytes: &[u8]) -> SochValue {
2205 if bytes.is_empty() {
2206 return SochValue::Null;
2207 }
2208
2209 match bytes[0] {
2210 0 => SochValue::Null,
2211 1 if bytes.len() >= 9 => {
2212 let i = i64::from_le_bytes(bytes[1..9].try_into().unwrap());
2213 SochValue::Int(i)
2214 }
2215 2 if bytes.len() >= 9 => {
2216 let u = u64::from_le_bytes(bytes[1..9].try_into().unwrap());
2217 SochValue::UInt(u)
2218 }
2219 3 if bytes.len() >= 9 => {
2220 let f = f64::from_le_bytes(bytes[1..9].try_into().unwrap());
2221 SochValue::Float(f)
2222 }
2223 4 => {
2224 let s = String::from_utf8_lossy(&bytes[1..]).to_string();
2225 SochValue::Text(s)
2226 }
2227 5 if bytes.len() >= 2 => SochValue::Bool(bytes[1] != 0),
2228 6 => SochValue::Binary(bytes[1..].to_vec()),
2229 _ => {
2230 // Treat as text
2231 let s = String::from_utf8_lossy(bytes).to_string();
2232 SochValue::Text(s)
2233 }
2234 }
2235}
2236
2237// ============================================================================
2238// Helper functions for columnar query result building
2239// ============================================================================
2240
2241/// Push a SochValue into a TypedColumn
2242fn push_value_to_typed_column(col: &mut CoreTypedColumn, value: Option<SochValue>) {
2243 match value {
2244 None => push_null_to_typed_column(col),
2245 Some(v) => match (col, v) {
2246 (
2247 CoreTypedColumn::Int64 {
2248 values,
2249 validity,
2250 stats,
2251 },
2252 SochValue::Int(i),
2253 ) => {
2254 values.push(i);
2255 validity.push(true);
2256 stats.update_i64(i);
2257 }
2258 (
2259 CoreTypedColumn::Int64 {
2260 values,
2261 validity,
2262 stats,
2263 },
2264 SochValue::UInt(u),
2265 ) => {
2266 values.push(u as i64);
2267 validity.push(true);
2268 stats.update_i64(u as i64);
2269 }
2270 (
2271 CoreTypedColumn::UInt64 {
2272 values,
2273 validity,
2274 stats,
2275 },
2276 SochValue::UInt(u),
2277 ) => {
2278 values.push(u);
2279 validity.push(true);
2280 stats.update_i64(u as i64);
2281 }
2282 (
2283 CoreTypedColumn::UInt64 {
2284 values,
2285 validity,
2286 stats,
2287 },
2288 SochValue::Int(i),
2289 ) => {
2290 values.push(i as u64);
2291 validity.push(true);
2292 stats.update_i64(i);
2293 }
2294 (
2295 CoreTypedColumn::Float64 {
2296 values,
2297 validity,
2298 stats,
2299 },
2300 SochValue::Float(f),
2301 ) => {
2302 values.push(f);
2303 validity.push(true);
2304 stats.update_f64(f);
2305 }
2306 (
2307 CoreTypedColumn::Float64 {
2308 values,
2309 validity,
2310 stats,
2311 },
2312 SochValue::Int(i),
2313 ) => {
2314 values.push(i as f64);
2315 validity.push(true);
2316 stats.update_f64(i as f64);
2317 }
2318 (
2319 CoreTypedColumn::Text {
2320 offsets,
2321 data,
2322 validity,
2323 stats,
2324 },
2325 SochValue::Text(s),
2326 ) => {
2327 data.extend_from_slice(s.as_bytes());
2328 offsets.push(data.len() as u32);
2329 validity.push(true);
2330 stats.row_count += 1;
2331 }
2332 (
2333 CoreTypedColumn::Binary {
2334 offsets,
2335 data,
2336 validity,
2337 stats,
2338 },
2339 SochValue::Binary(b),
2340 ) => {
2341 data.extend_from_slice(&b);
2342 offsets.push(data.len() as u32);
2343 validity.push(true);
2344 stats.row_count += 1;
2345 }
2346 (
2347 CoreTypedColumn::Bool {
2348 values,
2349 validity,
2350 stats,
2351 len,
2352 },
2353 SochValue::Bool(b),
2354 ) => {
2355 let idx = *len;
2356 *len += 1;
2357 let num_words = (*len).div_ceil(64);
2358 while values.len() < num_words {
2359 values.push(0);
2360 }
2361 if b {
2362 let word = idx / 64;
2363 let bit = idx % 64;
2364 values[word] |= 1 << bit;
2365 }
2366 validity.push(true);
2367 stats.row_count += 1;
2368 }
2369 // Type mismatch - push as null
2370 (col, _) => push_null_to_typed_column(col),
2371 },
2372 }
2373}
2374
2375/// Push a null value into a TypedColumn
2376fn push_null_to_typed_column(col: &mut CoreTypedColumn) {
2377 match col {
2378 CoreTypedColumn::Int64 {
2379 values,
2380 validity,
2381 stats,
2382 } => {
2383 values.push(0);
2384 validity.push(false);
2385 stats.update_null();
2386 }
2387 CoreTypedColumn::UInt64 {
2388 values,
2389 validity,
2390 stats,
2391 } => {
2392 values.push(0);
2393 validity.push(false);
2394 stats.update_null();
2395 }
2396 CoreTypedColumn::Float64 {
2397 values,
2398 validity,
2399 stats,
2400 } => {
2401 values.push(0.0);
2402 validity.push(false);
2403 stats.update_null();
2404 }
2405 CoreTypedColumn::Text {
2406 offsets,
2407 data: _,
2408 validity,
2409 stats,
2410 } => {
2411 offsets.push(offsets.last().copied().unwrap_or(0));
2412 validity.push(false);
2413 stats.update_null();
2414 }
2415 CoreTypedColumn::Binary {
2416 offsets,
2417 data: _,
2418 validity,
2419 stats,
2420 } => {
2421 offsets.push(offsets.last().copied().unwrap_or(0));
2422 validity.push(false);
2423 stats.update_null();
2424 }
2425 CoreTypedColumn::Bool {
2426 values,
2427 validity,
2428 stats,
2429 len,
2430 } => {
2431 *len += 1;
2432 let num_words = (*len).div_ceil(64);
2433 while values.len() < num_words {
2434 values.push(0);
2435 }
2436 validity.push(false);
2437 stats.update_null();
2438 }
2439 }
2440}
2441
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Opens a database in a fresh temp directory, begins and aborts one
    /// transaction, then shuts the database down.
    #[test]
    fn test_database_open_close() {
        let dir = tempdir().unwrap();
        let db = Database::open(dir.path()).unwrap();

        // Should be able to begin a transaction
        let txn = db.begin_transaction().unwrap();
        assert!(txn.txn_id > 0);

        db.abort(txn).unwrap();
        db.shutdown().unwrap();
    }

    /// A value written with `put` is readable inside the writing transaction
    /// and, after commit, from a new transaction.
    #[test]
    fn test_database_put_get() {
        let dir = tempdir().unwrap();
        let db = Database::open(dir.path()).unwrap();

        let txn = db.begin_transaction().unwrap();
        db.put(txn, b"key1", b"value1").unwrap();

        let val = db.get(txn, b"key1").unwrap();
        assert_eq!(val, Some(b"value1".to_vec()));

        db.commit(txn).unwrap();

        // New transaction should see committed data
        let txn2 = db.begin_transaction().unwrap();
        let val = db.get(txn2, b"key1").unwrap();
        assert_eq!(val, Some(b"value1".to_vec()));
        db.abort(txn2).unwrap();
    }

    /// Writes three path-keyed values and checks that scanning the
    /// `users/1/` prefix returns only the two entries under that prefix.
    #[test]
    fn test_database_path_api() {
        let dir = tempdir().unwrap();
        let db = Database::open(dir.path()).unwrap();

        let txn = db.begin_transaction().unwrap();

        // Write using path API
        db.put_path(txn, "users/1/name", b"Alice").unwrap();
        db.put_path(txn, "users/1/email", b"alice@example.com")
            .unwrap();
        db.put_path(txn, "users/2/name", b"Bob").unwrap();

        db.commit(txn).unwrap();

        // Scan path prefix
        let txn2 = db.begin_transaction().unwrap();
        let results = db.scan_path(txn2, "users/1/").unwrap();
        assert_eq!(results.len(), 2);

        db.abort(txn2).unwrap();
    }

    /// Registers a two-column table, inserts one row, and checks that the row
    /// can be read back with its `name` column intact.
    #[test]
    fn test_database_table_api() {
        let dir = tempdir().unwrap();
        let db = Database::open(dir.path()).unwrap();

        // Register table
        db.register_table(TableSchema {
            name: "users".to_string(),
            columns: vec![
                ColumnDef {
                    name: "name".to_string(),
                    col_type: ColumnType::Text,
                    nullable: false,
                },
                ColumnDef {
                    name: "age".to_string(),
                    col_type: ColumnType::Int64,
                    nullable: true,
                },
            ],
        })
        .unwrap();

        // Insert row
        let txn = db.begin_transaction().unwrap();
        let mut values = HashMap::new();
        values.insert("name".to_string(), SochValue::Text("Alice".to_string()));
        values.insert("age".to_string(), SochValue::Int(30));

        db.insert_row(txn, "users", 1, &values).unwrap();
        db.commit(txn).unwrap();

        // Read row
        let txn2 = db.begin_transaction().unwrap();
        let row = db.read_row(txn2, "users", 1, None).unwrap();
        assert!(row.is_some());

        let row = row.unwrap();
        assert_eq!(row.get("name"), Some(&SochValue::Text("Alice".to_string())));

        db.abort(txn2).unwrap();
    }

    /// Inserts four path-keyed values under `docs/` and checks that a query
    /// with `limit(1)` returns exactly one row.
    #[test]
    fn test_database_query_builder() {
        let dir = tempdir().unwrap();
        let db = Database::open(dir.path()).unwrap();

        // Insert test data
        let txn = db.begin_transaction().unwrap();
        db.put_path(txn, "docs/1/title", b"Hello").unwrap();
        db.put_path(txn, "docs/1/content", b"World").unwrap();
        db.put_path(txn, "docs/2/title", b"Foo").unwrap();
        db.put_path(txn, "docs/2/content", b"Bar").unwrap();
        db.commit(txn).unwrap();

        // Query with limit
        let txn2 = db.begin_transaction().unwrap();
        let result = db.query(txn2, "docs/").limit(1).execute().unwrap();

        assert_eq!(result.rows.len(), 1);
        db.abort(txn2).unwrap();
    }

    /// Commits a value, drops the handle without shutdown (via
    /// `std::mem::forget`) to simulate a crash, then reopens the same
    /// directory and checks the committed value is still readable.
    #[test]
    fn test_database_crash_recovery() {
        let dir = tempdir().unwrap();

        // Write and commit
        {
            // Use open_without_lock for crash simulation tests
            let db = Database::open_without_lock(dir.path()).unwrap();
            // Set sync mode to FULL to ensure data is persisted before "crash"
            db.storage.set_sync_mode(2);
            let txn = db.begin_transaction().unwrap();
            db.put(txn, b"persist", b"this").unwrap();
            db.commit(txn).unwrap();
            // Don't call shutdown - simulate crash
            std::mem::forget(db);
        }

        // Reopen - should recover
        {
            let db = Database::open_without_lock(dir.path()).unwrap();
            let txn = db.begin_transaction().unwrap();
            let val = db.get(txn, b"persist").unwrap();
            assert_eq!(val, Some(b"this".to_vec()));
            db.abort(txn).unwrap();
        }
    }
}
2593}