sochdb_storage/database.rs
1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! SochDB Database Kernel
19//!
20//! The shared core that powers both embedded mode (`SochConnection::open`) and
21//! server mode (`sochdb-server`). This is the "SQLite engine" equivalent.
22//!
23//! ## Architecture
24//!
25//! ```text
26//! ┌──────────────────────────────────────────────────────────────────┐
27//! │ Database Kernel │
28//! │ Arc<Database> - shared by all connections │
29//! ├──────────────────────────────────────────────────────────────────┤
30//! │ │
31//! │ ┌─────────────────┐ ┌─────────────────┐ ┌────────────────┐ │
32//! │ │ DurableStorage │ │ Catalog │ │ Vector Index │ │
33//! │ │ (WAL + MVCC) │ │ (Schema Mgmt) │ │ (HNSW/Vamana) │ │
34//! │ └────────┬────────┘ └────────┬────────┘ └───────┬────────┘ │
35//! │ │ │ │ │
36//! │ └─────────────────────┴─────────────────────┘ │
37//! │ │ │
38//! │ ┌─────────────────────────────────────────────────────────────┐ │
39//! │ │ Query Executor (Path-Native) │ │
40//! │ │ - Path resolution: O(|path|) │ │
41//! │ │ - Column projection: 80% I/O reduction │ │
42//! │ │ - Context selection: Token-aware chunking │ │
43//! │ └─────────────────────────────────────────────────────────────┘ │
44//! │ │
45//! └──────────────────────────────────────────────────────────────────┘
46//!
47//! Deployment Modes:
48//! ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
49//! │ Embedded │ │ IPC Server │ │ TCP Server │
50//! │ (in-proc) │ │ (Unix sock)│ │ (remote) │
51//! └──────┬──────┘ └──────┬──────┘ └──────┬──────┘
52//! │ │ │
53//! └─────────────────┴─────────────────┘
54//! │
55//! Arc<Database>
56//! ```
57//!
58//! ## Latency Model
59//!
60//! Let K = kernel processing cost for a query
61//!
62//! - Embedded: L_emb ≈ K (function call overhead negligible)
63//! - IPC: L_ipc ≈ K + δ_ipc (δ_ipc = ~10-50µs for Unix socket)
64//! - TCP: L_tcp ≈ K + δ_net (δ_net = 100µs-10ms depending on network)
65//!
66//! For LLM context queries where K >> δ_ipc, IPC is "nearly embedded".
67
68use std::collections::HashMap;
69use std::path::{Path, PathBuf};
70use std::sync::Arc;
71use std::sync::atomic::{AtomicU64, Ordering};
72
73use dashmap::DashMap;
74use parking_lot::RwLock;
75
76use crate::durable_storage::{DurableStorage, TransactionMode};
77use crate::index_policy::{IndexPolicy, TableIndexConfig, TableIndexRegistry};
78use crate::key_buffer::KeyBuffer;
79use crate::packed_row::{PackedColumnDef, PackedColumnType, PackedRow, PackedTableSchema};
80use sochdb_core::catalog::Catalog;
81use sochdb_core::{Result, SochDBError, SochValue};
82
83// Re-export key types
84pub use crate::durable_storage::RecoveryStats;
85
86/// Database configuration
87#[derive(Debug, Clone)]
88pub struct DatabaseConfig {
89 /// Enable group commit for better write throughput
90 pub group_commit: bool,
91 /// Maximum memory for memtables before flush (bytes)
92 pub memtable_size_limit: usize,
93 /// Enable WAL for durability
94 pub wal_enabled: bool,
95 /// Sync mode: fsync after every commit vs periodic
96 pub sync_mode: SyncMode,
97 /// Read-only mode
98 pub read_only: bool,
99
100 /// Enable ordered index for O(log N) prefix scans
101 ///
102 /// # Deprecation Notice
103 ///
104 /// **DEPRECATED since 0.2.0**: Use `default_index_policy` instead for per-table control.
105 /// This field will be removed in v0.3.0.
106 ///
107 /// ## Migration Guide
108 ///
109 /// Replace:
110 /// ```ignore
111 /// DatabaseConfig { enable_ordered_index: true, .. } // Old API
112 /// DatabaseConfig { enable_ordered_index: false, .. } // Old API
113 /// ```
114 ///
115 /// With:
116 /// ```ignore
117 /// DatabaseConfig { default_index_policy: IndexPolicy::ScanOptimized, .. } // Ordered index enabled
118 /// DatabaseConfig { default_index_policy: IndexPolicy::WriteOptimized, .. } // Ordered index disabled
119 /// ```
120 ///
121 /// ## Behavior
122 ///
123 /// When false, saves ~134 ns/op on writes (20% speedup)
124 /// but scan_prefix becomes O(N) instead of O(log N + K).
125 ///
126 /// Set to false for write-heavy workloads without range scans.
127 #[deprecated(
128 since = "0.2.0",
129 note = "Use `default_index_policy` field instead. This field will be removed in v0.3.0. \
130 Set IndexPolicy::ScanOptimized for ordered index, WriteOptimized to disable."
131 )]
132 ///
133 /// Set to false for write-heavy workloads without range scans.
134 pub enable_ordered_index: bool,
135 /// Group commit configuration
136 pub group_commit_config: GroupCommitSettings,
137 /// Default index policy for tables not explicitly configured
138 ///
139 /// This replaces the global `enable_ordered_index` toggle with
140 /// fine-grained per-table control. Use `index_registry` to configure
141 /// individual tables.
142 ///
143 /// | Policy | Insert Cost | Scan Cost | Use Case |
144 /// |----------------|-------------|----------------|------------------------|
145 /// | WriteOptimized | O(1) | O(N) | High-write, rare scan |
146 /// | Balanced | O(1) amort | O(output+logK) | Mixed OLTP |
147 /// | ScanOptimized | O(log N) | O(logN + K) | Analytics, range query |
148 /// | AppendOnly | O(1) | O(N) | Time-series logs |
149 pub default_index_policy: IndexPolicy,
150}
151
/// Group commit settings - mirrors SQLite's WAL mode tuning
///
/// ## Performance Model
///
/// Without group commit: Throughput = 1 / L_fsync ≈ 200 commits/sec (L=5ms)
/// With group commit (batch size K): Throughput = K / L_fsync = K × 200 commits/sec
///
/// For K=100: 20,000 commits/sec (100× speedup)
///
/// ## SQLite Comparison
///
/// | Setting                    | SQLite Equivalent           |
/// |----------------------------|-----------------------------|
/// | batch_size = 1             | PRAGMA synchronous = FULL   |
/// | batch_size = 100           | WAL mode with batching      |
/// | max_wait_us = 0            | No delay, immediate flush   |
/// | max_wait_us = 10000        | Up to 10ms delay for batch  |
#[derive(Debug, Clone)]
pub struct GroupCommitSettings {
    /// Minimum batch size before flush (default: 1)
    pub min_batch_size: usize,
    /// Maximum batch size (default: 1000)
    pub max_batch_size: usize,
    /// Maximum wait time before flush in microseconds (default: 10000 = 10ms)
    pub max_wait_us: u64,
    /// Expected fsync latency in microseconds (default: 5000 = 5ms).
    /// Used by `optimal_batch_size` for adaptive sizing.
    pub fsync_latency_us: u64,
}

impl Default for GroupCommitSettings {
    fn default() -> Self {
        Self {
            min_batch_size: 1,
            max_batch_size: 1000,
            max_wait_us: 10_000,     // 10ms
            fsync_latency_us: 5_000, // 5ms
        }
    }
}

impl GroupCommitSettings {
    /// High throughput preset - maximizes batching
    /// (batches of 50..=5000, up to 50ms wait).
    pub fn high_throughput() -> Self {
        Self {
            min_batch_size: 50,
            max_batch_size: 5000,
            max_wait_us: 50_000, // 50ms
            fsync_latency_us: 5_000,
        }
    }

    /// Low latency preset - minimal batching
    /// (batches of 1..=10, up to 1ms wait).
    pub fn low_latency() -> Self {
        Self {
            min_batch_size: 1,
            max_batch_size: 10,
            max_wait_us: 1_000, // 1ms
            fsync_latency_us: 5_000,
        }
    }

    /// Estimate a batch size that balances fsync amortisation against waiting cost.
    ///
    /// Uses the square-root trade-off, which has the same shape as the
    /// economic-order-quantity formula. It is *not* Little's Law:
    ///
    /// N* = sqrt(2 × L_fsync × λ / C_wait)
    ///
    /// where `L_fsync = fsync_latency_us / 1e6` (seconds), `λ = arrival_rate`, and
    /// `C_wait = wait_cost` floored at `0.001` to avoid division by zero.
    ///
    /// # Arguments
    /// * `arrival_rate` - Operations per second
    /// * `wait_cost` - Cost coefficient for waiting (0.0-1.0)
    ///
    /// # Returns
    ///
    /// `N*` truncated to an integer, raised to at least `min_batch_size` and then
    /// capped at `max_batch_size`.
    ///
    /// The float-to-int cast saturates. A NaN result (NaN input or negative
    /// `arrival_rate`) becomes 0 and thus `min_batch_size`. An infinite result
    /// becomes `max_batch_size`.
    ///
    /// If the bounds are inverted (`min_batch_size > max_batch_size`), the cap
    /// wins. Previously `usize::clamp` panicked in that case.
    pub fn optimal_batch_size(&self, arrival_rate: f64, wait_cost: f64) -> usize {
        let l_fsync = self.fsync_latency_us as f64 / 1_000_000.0;
        let n_star = (2.0 * l_fsync * arrival_rate / wait_cost.max(0.001)).sqrt();
        // Both bounds are public fields, so they may be misconfigured.
        // `max` then `min` equals `clamp` for ordered bounds but never panics.
        (n_star as usize)
            .max(self.min_batch_size)
            .min(self.max_batch_size)
    }
}
226
227impl Default for DatabaseConfig {
228 #[allow(deprecated)]
229 fn default() -> Self {
230 Self {
231 group_commit: true,
232 memtable_size_limit: 64 * 1024 * 1024, // 64MB
233 wal_enabled: true,
234 sync_mode: SyncMode::Normal,
235 read_only: false,
236 enable_ordered_index: true, // Default: enabled for compatibility
237 group_commit_config: GroupCommitSettings::default(),
238 default_index_policy: IndexPolicy::Balanced, // New default: balanced OLTP policy
239 }
240 }
241}
242
243impl DatabaseConfig {
244 /// Create config optimized for throughput (Fast Mode)
245 ///
246 /// - Disables ordered index (saves ~134 ns/op)
247 /// - Uses high-throughput group commit settings
248 /// - Suitable for append-only workloads
249 #[allow(deprecated)]
250 pub fn throughput_optimized() -> Self {
251 Self {
252 group_commit: true,
253 memtable_size_limit: 128 * 1024 * 1024, // 128MB
254 wal_enabled: true,
255 sync_mode: SyncMode::Normal,
256 read_only: false,
257 enable_ordered_index: false,
258 group_commit_config: GroupCommitSettings::high_throughput(),
259 default_index_policy: IndexPolicy::WriteOptimized, // No ordered index overhead
260 }
261 }
262
263 /// Create config optimized for latency
264 ///
265 /// - Keeps ordered index for fast range scans
266 /// - Uses low-latency group commit settings
267 /// - Suitable for OLTP workloads
268 #[allow(deprecated)]
269 pub fn latency_optimized() -> Self {
270 Self {
271 group_commit: true,
272 memtable_size_limit: 32 * 1024 * 1024, // 32MB
273 wal_enabled: true,
274 sync_mode: SyncMode::Full,
275 read_only: false,
276 enable_ordered_index: true,
277 group_commit_config: GroupCommitSettings::low_latency(),
278 default_index_policy: IndexPolicy::ScanOptimized, // Fast range scans
279 }
280 }
281
282 /// Create config matching SQLite defaults
283 #[allow(deprecated)]
284 pub fn sqlite_compatible() -> Self {
285 Self {
286 group_commit: false, // SQLite default is single-commit
287 memtable_size_limit: 64 * 1024 * 1024,
288 wal_enabled: true,
289 sync_mode: SyncMode::Normal, // PRAGMA synchronous = NORMAL
290 read_only: false,
291 enable_ordered_index: true,
292 group_commit_config: GroupCommitSettings::default(),
293 default_index_policy: IndexPolicy::Balanced, // Good default for mixed workloads
294 }
295 }
296
297 /// Get effective ordered index setting, derived from `default_index_policy`.
298 ///
299 /// This is the shim method for the deprecated `enable_ordered_index` field.
300 /// It returns `true` if the policy requires an ordered index (ScanOptimized),
301 /// and `false` otherwise (WriteOptimized, Balanced, AppendOnly).
302 ///
303 /// # Policy Mapping
304 ///
305 /// | IndexPolicy | Returns |
306 /// |------------------|---------|
307 /// | ScanOptimized | true |
308 /// | Balanced | false |
309 /// | WriteOptimized | false |
310 /// | AppendOnly | false |
311 ///
312 /// Note: `Balanced` uses lazy compaction rather than a live ordered index,
313 /// so it returns `false` for the low-level memtable config but still supports
314 /// efficient range scans via sorted runs.
315 pub fn effective_ordered_index(&self) -> bool {
316 matches!(self.default_index_policy, IndexPolicy::ScanOptimized)
317 }
318}
319
/// Durability level for WAL writes, following SQLite's `PRAGMA synchronous`.
///
/// | SochDB     | SQLite       | Description                                    |
/// |------------|--------------|------------------------------------------------|
/// | Off        | OFF (0)      | No fsync, risk of data loss on crash           |
/// | Normal     | NORMAL (1)   | Fsync at checkpoints, not every commit         |
/// | Full       | FULL (2)     | Fsync every commit (safest, slowest)           |
///
/// The explicit discriminants equal the SQLite pragma values. They are also the
/// numbers handed to the storage engine (`config.sync_mode as u64` in
/// `Database::open_with_config`), so they must stay fixed.
///
/// # Performance vs Durability Trade-offs
///
/// Rough figures (not measured here):
/// - **Off**: ~10x faster than Full; may lose roughly the last ~100ms of writes on crash
/// - **Normal**: ~5x faster than Full; durable at checkpoint boundaries
/// - **Full**: every commit is fsync'd, so committed data survives a crash
///   (provided the device honours fsync)
///
/// # SQLite Compatibility
///
/// ```sql
/// -- SQLite equivalent settings
/// PRAGMA synchronous = OFF;    -- SyncMode::Off
/// PRAGMA synchronous = NORMAL; -- SyncMode::Normal
/// PRAGMA synchronous = FULL;   -- SyncMode::Full
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SyncMode {
    /// No fsync (SQLite `PRAGMA synchronous = OFF`).
    ///
    /// Writes are left buffered by the OS and can be lost on power failure.
    /// Suitable for non-critical data or bulk loading.
    Off = 0,

    /// Fsync at checkpoints (SQLite `PRAGMA synchronous = NORMAL`).
    ///
    /// The default mode: the WAL is synced at checkpoint boundaries, trading a
    /// small durability window for throughput.
    Normal = 1,

    /// Fsync every commit (SQLite `PRAGMA synchronous = FULL`).
    ///
    /// Safest mode: each commit is made durable individually. Use for
    /// financial or otherwise critical data.
    Full = 2,
}

impl SyncMode {
    /// Map a SQLite `synchronous` pragma value to a mode.
    ///
    /// `0` → `Off`, `1` → `Normal`, and anything larger (2, 3 = EXTRA, …) → `Full`.
    pub fn from_sqlite_pragma(value: u32) -> Self {
        if value == 0 {
            SyncMode::Off
        } else if value == 1 {
            SyncMode::Normal
        } else {
            SyncMode::Full
        }
    }

    /// The SQLite `synchronous` pragma value for this mode (0, 1 or 2).
    pub fn to_sqlite_pragma(self) -> u32 {
        match self {
            SyncMode::Off => 0,
            SyncMode::Normal => 1,
            SyncMode::Full => 2,
        }
    }

    /// Parse a mode name (`off` / `normal` / `full`, any ASCII case) or its
    /// numeric spelling (`0` / `1` / `2`).
    ///
    /// Surrounding whitespace is not trimmed. Anything else yields `None`.
    pub fn parse(s: &str) -> Option<Self> {
        const SPELLINGS: [(&str, &str, SyncMode); 3] = [
            ("OFF", "0", SyncMode::Off),
            ("NORMAL", "1", SyncMode::Normal),
            ("FULL", "2", SyncMode::Full),
        ];

        SPELLINGS
            .iter()
            .find(|(name, digit, _)| s.eq_ignore_ascii_case(name) || s == *digit)
            .map(|&(_, _, mode)| mode)
    }
}
388
/// Table schema for the kernel: a table name plus its column definitions.
#[derive(Debug, Clone)]
pub struct TableSchema {
    /// Table name
    pub name: String,
    /// Column definitions, in the order they are stored in this `Vec`
    pub columns: Vec<ColumnDef>,
}

/// Column definition: name, logical type and nullability flag.
#[derive(Debug, Clone)]
pub struct ColumnDef {
    /// Column name
    pub name: String,
    /// Logical column type
    pub col_type: ColumnType,
    /// Whether the column is declared nullable (enforcement is not visible here)
    pub nullable: bool,
}

/// Logical column types supported by the kernel schema.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ColumnType {
    /// Signed 64-bit integer
    Int64,
    /// Unsigned 64-bit integer
    UInt64,
    /// 64-bit floating point
    Float64,
    /// Text string
    Text,
    /// Raw bytes
    Binary,
    /// Boolean
    Bool,
}
414
/// Transaction handle for kernel operations.
///
/// A small `Copy` value; callers pass it back into kernel methods by value.
#[derive(Debug, Clone, Copy)]
pub struct TxnHandle {
    /// Transaction identifier
    pub txn_id: u64,
    /// Snapshot timestamp for this transaction.
    /// NOTE(review): presumably the MVCC read timestamp; confirm with `DurableStorage`.
    pub snapshot_ts: u64,
}
421
422/// Query result from the kernel
423#[derive(Debug, Clone)]
424pub struct QueryResult {
425 /// Column names
426 pub columns: Vec<String>,
427 /// Row data (each row is a map of column -> value)
428 pub rows: Vec<HashMap<String, SochValue>>,
429 /// Number of rows scanned (for stats)
430 pub rows_scanned: usize,
431 /// Bytes read from storage
432 pub bytes_read: usize,
433}
434
435impl QueryResult {
436 /// Empty result
437 pub fn empty() -> Self {
438 Self {
439 columns: vec![],
440 rows: vec![],
441 rows_scanned: 0,
442 bytes_read: 0,
443 }
444 }
445
446 /// Convert to TOON format for token efficiency
447 pub fn to_toon(&self) -> String {
448 if self.rows.is_empty() {
449 return "[]".to_string();
450 }
451
452 // TOON format: table[N]{cols}: row1; row2; ...
453 let n = self.rows.len();
454 let cols = self.columns.join(",");
455
456 let rows_str: Vec<String> = self
457 .rows
458 .iter()
459 .map(|row| {
460 self.columns
461 .iter()
462 .map(|c| {
463 row.get(c)
464 .map(format_soch_value)
465 .unwrap_or_else(|| "∅".to_string())
466 })
467 .collect::<Vec<_>>()
468 .join(",")
469 })
470 .collect();
471
472 format!("result[{}]{{{}}}:{}", n, cols, rows_str.join(";"))
473 }
474}
475
476fn format_soch_value(v: &SochValue) -> String {
477 match v {
478 SochValue::Null => "∅".to_string(),
479 SochValue::Int(i) => i.to_string(),
480 SochValue::UInt(u) => u.to_string(),
481 SochValue::Float(f) => format!("{:.6}", f),
482 SochValue::Text(s) => {
483 if s.contains(',') || s.contains(';') {
484 format!("\"{}\"", s.replace('"', "\\\""))
485 } else {
486 s.clone()
487 }
488 }
489 SochValue::Bool(b) => if *b { "T" } else { "F" }.to_string(),
490 SochValue::Binary(b) => format!("b64:{}", base64_encode(b)),
491 _ => format!("{:?}", v),
492 }
493}
494
/// Encode `data` as standard, padded base64 (alphabet `A–Z a–z 0–9 + /`, `=` padding).
///
/// Used for the `b64:` binary cells in TOON output.
fn base64_encode(data: &[u8]) -> String {
    const TABLE: &[u8; 64] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

    // Every 3-byte group, including a trailing partial one, becomes 4 characters.
    let mut encoded = String::with_capacity((data.len() + 2) / 3 * 4);

    for group in data.chunks(3) {
        let byte_at = |i: usize| u32::from(group.get(i).copied().unwrap_or(0));
        // Pack up to three bytes big-endian into a 24-bit word (missing bytes are 0).
        let word = (byte_at(0) << 16) | (byte_at(1) << 8) | byte_at(2);
        let sextet = |shift: u32| char::from(TABLE[((word >> shift) & 0x3f) as usize]);

        encoded.push(sextet(18));
        encoded.push(sextet(12));
        encoded.push(if group.len() > 1 { sextet(6) } else { '=' });
        encoded.push(if group.len() > 2 { sextet(0) } else { '=' });
    }

    encoded
}
523
524// ============================================================================
525// Columnar Query Results - SIMD-friendly result format
526// ============================================================================
527
528use sochdb_core::TypedColumn as CoreTypedColumn;
529
530/// Columnar query result - SIMD-friendly format for analytics
531///
532/// Instead of row-oriented `Vec<HashMap<String, SochValue>>`, this returns
533/// column-oriented `Vec<TypedColumn>` for efficient vectorized operations.
534///
535/// ## Memory Layout
536///
537/// Row-oriented (standard):
538/// ```text
539/// Row 0: [id=1, name="Alice", score=85]
540/// Row 1: [id=2, name="Bob", score=92]
541/// Row 2: [id=3, name="Carol", score=78]
542/// ```
543///
544/// Column-oriented (this format):
545/// ```text
546/// id: [1, 2, 3] ← contiguous i64 array (SIMD-friendly)
547/// name: ["Alice", "Bob", "Carol"] ← Arrow-style string encoding
548/// score: [85, 92, 78] ← contiguous i64 array
549/// ```
550///
551/// ## Performance Benefits
552///
553/// - SIMD: Column sums use vectorized instructions (~8× faster)
554/// - Cache: Sequential access pattern maximizes L1/L2 cache hits
555/// - Compression: Same-type data compresses better (5-10× typical)
556/// - Filtering: Bitmap operations instead of row iteration
557///
558/// ## Usage
559///
560/// ```ignore
561/// let result = db.query(txn, "users")
562/// .columns(&["id", "score"])
563/// .as_columnar()?;
564///
565/// // SIMD sum
566/// let total_score = result.column("score")
567/// .map(|c| c.sum_i64())
568/// .unwrap_or(0);
569///
570/// // Stats
571/// println!("Rows: {}, Memory: {} bytes", result.row_count(), result.memory_size());
572/// ```
573#[derive(Debug, Clone)]
574pub struct ColumnarQueryResult {
575 /// Column names in order
576 pub columns: Vec<String>,
577 /// Column data - each TypedColumn contains all values for one column
578 pub data: Vec<CoreTypedColumn>,
579 /// Number of rows
580 pub row_count: usize,
581 /// Bytes read from storage
582 pub bytes_read: usize,
583}
584
585impl ColumnarQueryResult {
586 /// Create an empty result
587 pub fn empty() -> Self {
588 Self {
589 columns: vec![],
590 data: vec![],
591 row_count: 0,
592 bytes_read: 0,
593 }
594 }
595
596 /// Get column by name
597 pub fn column(&self, name: &str) -> Option<&CoreTypedColumn> {
598 self.columns
599 .iter()
600 .position(|c| c == name)
601 .and_then(|idx| self.data.get(idx))
602 }
603
604 /// Get column index by name
605 pub fn column_index(&self, name: &str) -> Option<usize> {
606 self.columns.iter().position(|c| c == name)
607 }
608
609 /// Number of rows
610 pub fn row_count(&self) -> usize {
611 self.row_count
612 }
613
614 /// Number of columns
615 pub fn column_count(&self) -> usize {
616 self.columns.len()
617 }
618
619 /// Total memory size in bytes
620 pub fn memory_size(&self) -> usize {
621 self.data.iter().map(|c| c.memory_size()).sum()
622 }
623
624 /// Sum of an i64 column (SIMD-optimized)
625 pub fn sum_i64(&self, column: &str) -> Option<i64> {
626 self.column(column).map(|c| c.sum_i64())
627 }
628
629 /// Sum of an f64 column (SIMD-optimized)
630 pub fn sum_f64(&self, column: &str) -> Option<f64> {
631 self.column(column).map(|c| c.sum_f64())
632 }
633
634 /// Zero-allocation row access by index.
635 ///
636 /// Returns a lightweight view that resolves column values on demand
637 /// from the underlying columnar arrays — no `HashMap` per row.
638 ///
639 /// ```ignore
640 /// let result = query.as_columnar()?;
641 /// for i in 0..result.row_count() {
642 /// let row = result.row_view(i).unwrap();
643 /// let name = row.get("name"); // SochValue::Text(...)
644 /// }
645 /// ```
646 #[inline]
647 pub fn row_view(&self, index: usize) -> Option<ColumnarRowView<'_>> {
648 if index < self.row_count {
649 Some(ColumnarRowView {
650 result: self,
651 index,
652 })
653 } else {
654 None
655 }
656 }
657
658 /// Convert to row-oriented `QueryResult` for backward compatibility.
659 ///
660 /// This materialises one `HashMap<String, SochValue>` per row, so prefer
661 /// using `row_view()` or direct columnar access when performance matters.
662 pub fn into_query_result(self) -> QueryResult {
663 let rows: Vec<HashMap<String, SochValue>> = (0..self.row_count)
664 .map(|i| {
665 self.columns
666 .iter()
667 .zip(self.data.iter())
668 .map(|(name, col)| (name.clone(), col.value_at(i)))
669 .collect()
670 })
671 .collect();
672
673 QueryResult {
674 columns: self.columns,
675 rows,
676 rows_scanned: self.row_count,
677 bytes_read: self.bytes_read,
678 }
679 }
680
681 /// Get column statistics (min, max, null count)
682 pub fn column_stats(&self, column: &str) -> Option<&sochdb_core::columnar::ColumnStats> {
683 self.column(column).map(|c| c.stats())
684 }
685
686 /// Convert to TOON format (token-efficient)
687 pub fn to_toon(&self) -> String {
688 if self.row_count == 0 {
689 return "[]".to_string();
690 }
691
692 let n = self.row_count;
693 let cols = self.columns.join(",");
694
695 // Build rows from columns
696 let mut rows_str = Vec::with_capacity(n);
697 for i in 0..n {
698 let row: Vec<String> = self
699 .data
700 .iter()
701 .map(|col| format_columnar_value(col, i))
702 .collect();
703 rows_str.push(row.join(","));
704 }
705
706 format!("result[{}]{{{}}}:{}", n, cols, rows_str.join(";"))
707 }
708}
709
710/// Format a single value from a TypedColumn at index
711fn format_columnar_value(col: &CoreTypedColumn, idx: usize) -> String {
712 match col {
713 CoreTypedColumn::Int64 {
714 values, validity, ..
715 } => {
716 if validity.is_valid(idx) && idx < values.len() {
717 values[idx].to_string()
718 } else {
719 "∅".to_string()
720 }
721 }
722 CoreTypedColumn::UInt64 {
723 values, validity, ..
724 } => {
725 if validity.is_valid(idx) && idx < values.len() {
726 values[idx].to_string()
727 } else {
728 "∅".to_string()
729 }
730 }
731 CoreTypedColumn::Float64 {
732 values, validity, ..
733 } => {
734 if validity.is_valid(idx) && idx < values.len() {
735 format!("{:.6}", values[idx])
736 } else {
737 "∅".to_string()
738 }
739 }
740 CoreTypedColumn::Text {
741 offsets,
742 data,
743 validity,
744 ..
745 } => {
746 if validity.is_valid(idx) && idx + 1 < offsets.len() {
747 let start = offsets[idx] as usize;
748 let end = offsets[idx + 1] as usize;
749 std::str::from_utf8(&data[start..end])
750 .map(|s| {
751 if s.contains(',') || s.contains(';') {
752 format!("\"{}\"", s.replace('"', "\\\""))
753 } else {
754 s.to_string()
755 }
756 })
757 .unwrap_or_else(|_| "∅".to_string())
758 } else {
759 "∅".to_string()
760 }
761 }
762 CoreTypedColumn::Binary {
763 offsets,
764 data,
765 validity,
766 ..
767 } => {
768 if validity.is_valid(idx) && idx + 1 < offsets.len() {
769 let start = offsets[idx] as usize;
770 let end = offsets[idx + 1] as usize;
771 format!("b64:{}", base64_encode(&data[start..end]))
772 } else {
773 "∅".to_string()
774 }
775 }
776 CoreTypedColumn::Bool {
777 values,
778 validity,
779 len,
780 ..
781 } => {
782 if validity.is_valid(idx) && idx < *len {
783 let word = idx / 64;
784 let bit = idx % 64;
785 if (values[word] >> bit) & 1 == 1 {
786 "T"
787 } else {
788 "F"
789 }
790 .to_string()
791 } else {
792 "∅".to_string()
793 }
794 }
795 }
796}
797
798/// Zero-allocation row view into a `ColumnarQueryResult`.
799///
800/// Provides named-column access (like `HashMap<String, SochValue>`)
801/// without allocating a HashMap per row. Values are read directly
802/// from the underlying typed column arrays.
803///
804/// **Cost per access:** O(1) column index lookup + O(1) array read.
805/// **Allocation:** zero (borrows from `ColumnarQueryResult`).
806#[derive(Debug)]
807pub struct ColumnarRowView<'a> {
808 result: &'a ColumnarQueryResult,
809 index: usize,
810}
811
812impl<'a> ColumnarRowView<'a> {
813 /// Get a value by column name without allocation.
814 ///
815 /// Returns `None` if the column does not exist.
816 /// Returns `Some(SochValue::Null)` if the column exists but the value is NULL.
817 #[inline]
818 pub fn get(&self, column: &str) -> Option<SochValue> {
819 self.result
820 .column_index(column)
821 .map(|ci| self.result.data[ci].value_at(self.index))
822 }
823
824 /// Get all column values as a `Vec<SochValue>` (positional, no HashMap).
825 pub fn values(&self) -> Vec<SochValue> {
826 self.result
827 .data
828 .iter()
829 .map(|col| col.value_at(self.index))
830 .collect()
831 }
832
833 /// Row index within the result set.
834 #[inline]
835 pub fn index(&self) -> usize {
836 self.index
837 }
838
839 /// Materialise this row into a `HashMap<String, SochValue>` for backward
840 /// compatibility. Prefer `get()` for single column access.
841 pub fn to_map(&self) -> HashMap<String, SochValue> {
842 self.result
843 .columns
844 .iter()
845 .zip(self.result.data.iter())
846 .map(|(name, col)| (name.clone(), col.value_at(self.index)))
847 .collect()
848 }
849}
850
/// A single hit from a vector similarity search.
#[derive(Debug, Clone)]
pub struct VectorSearchResult {
    /// Identifier of the matched vector
    pub id: u64,
    /// Distance from the query vector.
    /// NOTE(review): metric and ordering (smaller = closer?) depend on the index; confirm.
    pub distance: f32,
    /// Optional metadata attached to the matched vector (column name -> value)
    pub metadata: Option<HashMap<String, SochValue>>,
}
858
/// The SochDB Database Kernel
///
/// This is the shared core used by both embedded (`SochConnection`) and
/// server (`sochdb-server`) modes. It owns all storage, catalog, and
/// indexing components.
///
/// # Thread Safety
///
/// Shared state is held in thread-safe containers:
/// - `storage`, `index_registry`, `concurrent_mvcc`, `cdc_log`: `Arc`
/// - `catalog`: `Arc<RwLock<Catalog>>` (parking_lot `RwLock`)
/// - `tables`, `packed_schemas`: `DashMap` (concurrent maps)
/// - statistics counters and the shutdown flag: `AtomicU64`
///
/// Readers are intended to run concurrently on MVCC snapshots, while writers
/// coordinate through the WAL and group commit. Both mechanisms live in
/// `DurableStorage`, which is not shown here.
///
/// # Concurrency Modes
///
/// ## Standard Mode (Single Process)
/// - Uses an exclusive file lock (`flock(LOCK_EX)`) in the storage layer
/// - Best for: Scripts, notebooks, CLI tools
/// - Open with: `Database::open(path)`
///
/// ## Concurrent Mode (Multi-Process/Web Apps)
/// - Uses lock-free MVCC for reads, single-writer coordination for writes
/// - Best for: Web servers, Flask/FastAPI apps, hot reloading
/// - Open with: `Database::open_concurrent(path)`
///
/// # Example
///
/// ```ignore
/// // Standard mode (single process)
/// let db = Database::open("./my_data")?;
///
/// // Concurrent mode (multi-reader, single-writer)
/// let db = Database::open_concurrent("./my_data")?;
///
/// // Begin a transaction
/// let txn = db.begin_transaction()?;
///
/// // Write data
/// db.put(txn, b"user:1:name", b"Alice")?;
///
/// // Commit
/// db.commit(txn)?;
/// ```
// NOTE(review): presumably silences warnings for fields not yet read anywhere
// (e.g. `path`, `shutdown`); confirm which fields and narrow the allow.
#[allow(dead_code)]
pub struct Database {
    /// Path to database directory
    path: PathBuf,
    /// Durable storage layer (WAL + MVCC + memtable)
    storage: Arc<DurableStorage>,
    /// Concurrent MVCC manager (for concurrent mode); `None` in the standard-mode constructors
    concurrent_mvcc: Option<Arc<crate::mvcc_concurrent::ConcurrentMvcc>>,
    /// Schema catalog
    catalog: Arc<RwLock<Catalog>>,
    /// Registered table schemas (name -> schema) - lock-free for reads
    tables: DashMap<String, TableSchema>,
    /// Cached packed schemas for fast insert (name -> packed schema)
    packed_schemas: DashMap<String, PackedTableSchema>,
    /// Per-table index policy registry
    index_registry: Arc<TableIndexRegistry>,
    /// Configuration the database was opened with
    config: DatabaseConfig,
    /// Statistics counters
    stats: DatabaseStats,
    /// Shutdown flag, initialised to 0 by the constructors.
    /// NOTE(review): presumably nonzero once shutdown begins; confirm.
    shutdown: AtomicU64,
    /// Whether this database is in concurrent mode
    is_concurrent: bool,
    /// CDC (Change Data Capture) log for streaming mutations; `None` in the
    /// standard-mode constructors
    cdc_log: Option<Arc<crate::cdc::CdcLog>>,
}
929
/// Internal statistics counters for a `Database`.
///
/// Each counter is an independent `AtomicU64`, so it can be updated through a
/// shared reference. NOTE(review): presumably copied into the public `Stats`
/// snapshot; the conversion is not visible here.
struct DatabaseStats {
    transactions_started: AtomicU64,
    transactions_committed: AtomicU64,
    transactions_aborted: AtomicU64,
    queries_executed: AtomicU64,
    bytes_written: AtomicU64,
    bytes_read: AtomicU64,
}

impl DatabaseStats {
    /// A fresh counter set with every value at zero.
    fn new() -> Self {
        let zero = || AtomicU64::new(0);
        Self {
            transactions_started: zero(),
            transactions_committed: zero(),
            transactions_aborted: zero(),
            queries_executed: zero(),
            bytes_written: zero(),
            bytes_read: zero(),
        }
    }
}
952
/// Public statistics snapshot: plain `u64` copies of the database counters.
#[derive(Debug, Clone)]
pub struct Stats {
    /// Transactions started
    pub transactions_started: u64,
    /// Transactions committed
    pub transactions_committed: u64,
    /// Transactions aborted
    pub transactions_aborted: u64,
    /// Queries executed
    pub queries_executed: u64,
    /// Bytes written
    pub bytes_written: u64,
    /// Bytes read
    pub bytes_read: u64,
}
963
964impl Database {
965 /// Open or create a database at the given path.
966 ///
967 /// This is the primary entry point, similar to `sqlite3_open()`.
968 /// If the database exists, it will be opened and WAL recovery performed.
969 /// If it doesn't exist, a new database will be created.
970 ///
971 /// # Arguments
972 ///
973 /// * `path` - Directory path for the database files
974 ///
975 /// # Returns
976 ///
977 /// An `Arc<Database>` that can be shared across threads and connections.
978 pub fn open<P: AsRef<Path>>(path: P) -> Result<Arc<Self>> {
979 Self::open_with_config(path, DatabaseConfig::default())
980 }
981
982 /// Open without locking (for testing crash recovery scenarios)
983 ///
984 /// # Safety
985 /// This should ONLY be used in tests that simulate crashes by forgetting
986 /// the storage instance. In production, always use `open()`.
987 #[cfg(test)]
988 pub fn open_without_lock<P: AsRef<Path>>(path: P) -> Result<Arc<Self>> {
989 let path = path.as_ref().to_path_buf();
990 let config = DatabaseConfig::default();
991
992 let storage = Arc::new(DurableStorage::open_without_lock(&path)?);
993
994 let index_registry = Arc::new(TableIndexRegistry::with_default_policy(
995 config.default_index_policy,
996 ));
997
998 let db = Arc::new(Self {
999 path: path.clone(),
1000 storage,
1001 concurrent_mvcc: None,
1002 catalog: Arc::new(RwLock::new(Catalog::new("sochdb"))),
1003 tables: DashMap::new(),
1004 packed_schemas: DashMap::new(),
1005 index_registry,
1006 config,
1007 stats: DatabaseStats::new(),
1008 shutdown: AtomicU64::new(0),
1009 is_concurrent: false,
1010 cdc_log: None,
1011 });
1012
1013 db.recover()?;
1014 Ok(db)
1015 }
1016
1017 /// Open with custom configuration
1018 pub fn open_with_config<P: AsRef<Path>>(path: P, config: DatabaseConfig) -> Result<Arc<Self>> {
1019 let path = path.as_ref().to_path_buf();
1020
1021 // Use IndexPolicy-based storage configuration for automatic memtable selection
1022 // This derives ordered index and memtable type from the policy
1023 let storage = Arc::new(DurableStorage::open_with_policy(
1024 &path,
1025 config.default_index_policy,
1026 config.group_commit,
1027 )?);
1028
1029 // Propagate sync_mode from config to storage engine.
1030 // Without this, DurableStorage defaults to SyncMode::Normal (adaptive fsync).
1031 storage.set_sync_mode(config.sync_mode as u64);
1032
1033 // Create index registry with default policy from config
1034 let index_registry = Arc::new(TableIndexRegistry::with_default_policy(
1035 config.default_index_policy,
1036 ));
1037
1038 let db = Arc::new(Self {
1039 path: path.clone(),
1040 storage,
1041 concurrent_mvcc: None,
1042 catalog: Arc::new(RwLock::new(Catalog::new("sochdb"))),
1043 tables: DashMap::new(),
1044 packed_schemas: DashMap::new(),
1045 index_registry,
1046 config,
1047 stats: DatabaseStats::new(),
1048 shutdown: AtomicU64::new(0),
1049 is_concurrent: false,
1050 cdc_log: None,
1051 });
1052
1053 // Perform crash recovery if needed
1054 db.recover()?;
1055
1056 Ok(db)
1057 }
1058
1059 /// Open database in concurrent mode (multi-reader, single-writer)
1060 ///
1061 /// This mode allows multiple processes to access the database simultaneously:
1062 /// - **Readers**: Lock-free, concurrent access via MVCC snapshots
1063 /// - **Writers**: Single-writer coordination through atomic locks
1064 ///
1065 /// # Use Cases
1066 ///
1067 /// - Web applications (Flask, FastAPI, Django)
1068 /// - Hot reloading development servers
1069 /// - Multi-process worker pools
1070 /// - Any scenario with concurrent read access
1071 ///
1072 /// # Performance
1073 ///
1074 /// - Read latency: ~100ns (lock-free atomic operations)
1075 /// - Write latency: ~60μs amortized (with group commit)
1076 /// - Concurrent readers: Up to 1024 (configurable)
1077 ///
1078 /// # Example
1079 ///
1080 /// ```ignore
1081 /// // Multiple processes can open the same database
1082 /// let db = Database::open_concurrent("./my_data")?;
1083 ///
1084 /// // Reads are lock-free
1085 /// let value = db.get(b"key")?;
1086 ///
1087 /// // Writes coordinate automatically
1088 /// let txn = db.begin_transaction()?;
1089 /// db.put(txn, b"key", b"value")?;
1090 /// db.commit(txn)?;
1091 /// ```
1092 pub fn open_concurrent<P: AsRef<Path>>(path: P) -> Result<Arc<Self>> {
1093 Self::open_concurrent_with_config(path, DatabaseConfig::default())
1094 }
1095
1096 /// Open database in concurrent mode with custom configuration
1097 pub fn open_concurrent_with_config<P: AsRef<Path>>(
1098 path: P,
1099 config: DatabaseConfig,
1100 ) -> Result<Arc<Self>> {
1101 use crate::mvcc_concurrent::ConcurrentMvcc;
1102
1103 let path = path.as_ref().to_path_buf();
1104 std::fs::create_dir_all(&path)?;
1105
1106 // Open concurrent MVCC manager (this uses shared memory, not exclusive lock)
1107 let concurrent_mvcc = Arc::new(ConcurrentMvcc::open(&path)?);
1108
1109 // Open storage WITHOUT exclusive lock (concurrent MVCC handles coordination)
1110 // We use a special internal method that skips the file lock
1111 let storage = Arc::new(DurableStorage::open_for_concurrent(
1112 &path,
1113 config.default_index_policy,
1114 )?);
1115
1116 // Propagate sync_mode from config to storage engine
1117 storage.set_sync_mode(config.sync_mode as u64);
1118
1119 // Create index registry with default policy from config
1120 let index_registry = Arc::new(TableIndexRegistry::with_default_policy(
1121 config.default_index_policy,
1122 ));
1123
1124 let db = Arc::new(Self {
1125 path: path.clone(),
1126 storage,
1127 concurrent_mvcc: Some(concurrent_mvcc),
1128 catalog: Arc::new(RwLock::new(Catalog::new("sochdb"))),
1129 tables: DashMap::new(),
1130 packed_schemas: DashMap::new(),
1131 index_registry,
1132 config,
1133 stats: DatabaseStats::new(),
1134 shutdown: AtomicU64::new(0),
1135 is_concurrent: true,
1136 cdc_log: None,
1137 });
1138
1139 // Perform crash recovery if needed
1140 db.recover()?;
1141
1142 // Clean up any stale readers from crashed processes
1143 if let Some(ref mvcc) = db.concurrent_mvcc {
1144 mvcc.cleanup_stale_readers();
1145 }
1146
1147 Ok(db)
1148 }
1149
1150 /// Check if database is in concurrent mode
1151 #[inline]
1152 pub fn is_concurrent(&self) -> bool {
1153 self.is_concurrent
1154 }
1155
1156 /// Perform crash recovery
1157 fn recover(&self) -> Result<RecoveryStats> {
1158 self.storage.recover()
1159 }
1160
1161 /// Get database path
1162 pub fn path(&self) -> &Path {
1163 &self.path
1164 }
1165
1166 // =========================================================================
1167 // Transaction API
1168 // =========================================================================
1169
1170 /// Begin a new transaction
1171 pub fn begin_transaction(&self) -> Result<TxnHandle> {
1172 self.stats
1173 .transactions_started
1174 .fetch_add(1, Ordering::Relaxed);
1175 let txn_id = self.storage.begin_transaction()?;
1176
1177 // Get snapshot timestamp from MVCC
1178 // For now, use txn_id as a proxy (the real snapshot_ts is managed internally)
1179 Ok(TxnHandle {
1180 txn_id,
1181 snapshot_ts: txn_id,
1182 })
1183 }
1184
1185 /// Begin a read-only transaction (optimized: no SSI tracking)
1186 ///
1187 /// Read-only transactions skip SSI read tracking, reducing overhead
1188 /// from ~82ns to ~32ns per read (2.6x faster).
1189 ///
1190 /// Use this for:
1191 /// - SELECT queries that don't modify data
1192 /// - Analytics and reporting queries
1193 /// - Snapshot reads for backup
1194 pub fn begin_read_only(&self) -> Result<TxnHandle> {
1195 self.stats
1196 .transactions_started
1197 .fetch_add(1, Ordering::Relaxed);
1198 let txn_id = self.storage.begin_with_mode(TransactionMode::ReadOnly)?;
1199 Ok(TxnHandle {
1200 txn_id,
1201 snapshot_ts: txn_id,
1202 })
1203 }
1204
1205 /// Begin a lightweight read-only transaction (no WAL overhead).
1206 ///
1207 /// Eliminates WAL mutex acquisitions entirely for read operations.
1208 /// The txn_id is allocated atomically and MVCC snapshot state is created,
1209 /// but NO WAL records are written (no TxnBegin, no TxnAbort).
1210 ///
1211 /// ~5-10x faster per-operation than `begin_read_only()` because it avoids:
1212 /// - 2 WAL mutex lock/unlock cycles per transaction
1213 /// - 2 WAL BufWriter serializations per transaction
1214 ///
1215 /// Callers MUST use `abort_read_only_fast()` to clean up — NOT `commit()`
1216 /// or `abort()`.
1217 #[inline]
1218 pub fn begin_read_only_fast(&self) -> TxnHandle {
1219 let txn_id = self.storage.begin_read_only_fast();
1220 TxnHandle {
1221 txn_id,
1222 snapshot_ts: txn_id,
1223 }
1224 }
1225
1226 /// Abort a fast read-only transaction — O(1), no WAL, no memtable scan.
1227 #[inline]
1228 pub fn abort_read_only_fast(&self, txn: TxnHandle) {
1229 self.storage.abort_read_only_fast(txn.txn_id);
1230 }
1231
1232 /// Read a key WITHOUT any MVCC transaction tracking.
1233 ///
1234 /// Uses the current global timestamp to see all committed writes.
1235 /// Bypasses: begin/abort, active_txns DashMap, record_read, stats.
1236 /// Only safe for single-threaded access with no concurrent writers.
1237 #[inline]
1238 pub fn get_raw_read(&self, key: &[u8]) -> Option<Vec<u8>> {
1239 self.storage.read_latest(key)
1240 }
1241
1242 /// Scan by prefix WITHOUT any MVCC transaction tracking.
1243 ///
1244 /// Uses the current global timestamp. Only safe for single-threaded access.
1245 #[inline]
1246 pub fn scan_raw(&self, prefix: &[u8]) -> Vec<(Vec<u8>, Vec<u8>)> {
1247 self.storage.scan_latest(prefix)
1248 }
1249
1250 /// Begin a write-only transaction (optimized: no read tracking)
1251 ///
1252 /// Write-only transactions skip read tracking, improving insert
1253 /// throughput for bulk loading scenarios.
1254 ///
1255 /// Use this for:
1256 /// - Bulk data imports
1257 /// - Append-only logging tables
1258 /// - ETL pipelines
1259 pub fn begin_write_only(&self) -> Result<TxnHandle> {
1260 self.stats
1261 .transactions_started
1262 .fetch_add(1, Ordering::Relaxed);
1263 let txn_id = self.storage.begin_with_mode(TransactionMode::WriteOnly)?;
1264 Ok(TxnHandle {
1265 txn_id,
1266 snapshot_ts: txn_id,
1267 })
1268 }
1269
1270 /// Commit a transaction
1271 ///
1272 /// In concurrent mode, acquires the shared writer lock to ensure
1273 /// WAL writes are serialized across processes, and forces a flush+sync
1274 /// so that subsequent processes see the committed data.
1275 pub fn commit(&self, txn: TxnHandle) -> Result<u64> {
1276 self.stats
1277 .transactions_committed
1278 .fetch_add(1, Ordering::Relaxed);
1279
1280 // In concurrent mode, acquire the cross-process writer lock
1281 // to serialize WAL commits across processes
1282 let _writer_guard = if let Some(ref mvcc) = self.concurrent_mvcc {
1283 Some(mvcc.acquire_writer(std::time::Duration::from_secs(5))?)
1284 } else {
1285 None
1286 };
1287
1288 let commit_ts = self.storage.commit(txn.txn_id)?;
1289
1290 // In concurrent mode, force flush+sync so other processes can see
1291 // the committed data when they open the DB or run recovery.
1292 // Without this, the BufWriter may hold data that isn't visible
1293 // to other processes reading the WAL file.
1294 if self.is_concurrent {
1295 self.storage.flush_wal()?;
1296 self.storage.fsync()?;
1297 }
1298
1299 // Notify concurrent MVCC of commit for GC tracking
1300 if let Some(ref mvcc) = self.concurrent_mvcc {
1301 mvcc.on_commit();
1302 }
1303
1304 Ok(commit_ts)
1305 }
1306
1307 /// Abort a transaction
1308 pub fn abort(&self, txn: TxnHandle) -> Result<()> {
1309 self.stats
1310 .transactions_aborted
1311 .fetch_add(1, Ordering::Relaxed);
1312 self.storage.abort(txn.txn_id)
1313 }
1314
1315 // =========================================================================
1316 // Per-Table Index Policy API
1317 // =========================================================================
1318
1319 /// Configure index policy for a table
1320 ///
1321 /// This allows fine-grained control over write/scan trade-offs per table:
1322 ///
1323 /// | Policy | Insert Cost | Scan Cost | Use Case |
1324 /// |----------------|-------------|----------------|------------------------|
1325 /// | WriteOptimized | O(1) | O(N) | High-write, rare scan |
1326 /// | Balanced | O(1) amort | O(output+logK) | Mixed OLTP |
1327 /// | ScanOptimized | O(log N) | O(logN + K) | Analytics, range query |
1328 /// | AppendOnly | O(1) | O(N) | Time-series logs |
1329 ///
1330 /// # Example
1331 ///
1332 /// ```ignore
1333 /// // Fast inserts for logs table (no ordered index overhead)
1334 /// db.set_table_index_policy("logs", IndexPolicy::WriteOptimized);
1335 ///
1336 /// // Efficient range scans for analytics table
1337 /// db.set_table_index_policy("analytics", IndexPolicy::ScanOptimized);
1338 ///
1339 /// // Balanced for OLTP tables
1340 /// db.set_table_index_policy("users", IndexPolicy::Balanced);
1341 /// ```
1342 pub fn set_table_index_policy(&self, table: &str, policy: IndexPolicy) {
1343 self.index_registry
1344 .configure_table(TableIndexConfig::new(table, policy));
1345 }
1346
1347 /// Get the index policy for a table
1348 pub fn get_table_index_policy(&self, table: &str) -> IndexPolicy {
1349 self.index_registry.get_policy(table)
1350 }
1351
1352 /// Get the index registry for advanced configuration
1353 pub fn index_registry(&self) -> &Arc<TableIndexRegistry> {
1354 &self.index_registry
1355 }
1356
1357 // =========================================================================
1358 // Key-Value API (Low-level)
1359 // =========================================================================
1360
1361 /// Put a key-value pair
1362 ///
1363 /// In concurrent mode, acquires the shared writer lock to ensure
1364 /// WAL writes are serialized across processes.
1365 pub fn put(&self, txn: TxnHandle, key: &[u8], value: &[u8]) -> Result<()> {
1366 self.stats
1367 .bytes_written
1368 .fetch_add((key.len() + value.len()) as u64, Ordering::Relaxed);
1369
1370 // In concurrent mode, acquire cross-process writer lock
1371 let _writer_guard = if let Some(ref mvcc) = self.concurrent_mvcc {
1372 Some(mvcc.acquire_writer(std::time::Duration::from_secs(5))?)
1373 } else {
1374 None
1375 };
1376
1377 // Use write_refs to avoid unnecessary allocations
1378 self.storage.write_refs(txn.txn_id, key, value)
1379 }
1380
1381 /// Batch put multiple key-value pairs with reduced overhead
1382 ///
1383 /// This amortizes per-operation costs over the entire batch:
1384 /// - Single DashMap lookup
1385 /// - Batch MVCC tracking
1386 /// - Batch memtable writes
1387 ///
1388 /// For 100+ entries, this is 2-3x faster than individual puts.
1389 ///
1390 /// # Example
1391 ///
1392 /// ```ignore
1393 /// let writes: Vec<(&[u8], &[u8])> = vec![
1394 /// (b"key1", b"value1"),
1395 /// (b"key2", b"value2"),
1396 /// (b"key3", b"value3"),
1397 /// ];
1398 /// db.put_batch(txn, &writes)?;
1399 /// ```
1400 pub fn put_batch(&self, txn: TxnHandle, writes: &[(&[u8], &[u8])]) -> Result<()> {
1401 let bytes: u64 = writes.iter().map(|(k, v)| (k.len() + v.len()) as u64).sum();
1402 self.stats.bytes_written.fetch_add(bytes, Ordering::Relaxed);
1403
1404 // In concurrent mode, acquire cross-process writer lock
1405 let _writer_guard = if let Some(ref mvcc) = self.concurrent_mvcc {
1406 Some(mvcc.acquire_writer(std::time::Duration::from_secs(5))?)
1407 } else {
1408 None
1409 };
1410
1411 self.storage.write_batch_refs(txn.txn_id, writes)
1412 }
1413
1414 /// Get a value by key
1415 pub fn get(&self, txn: TxnHandle, key: &[u8]) -> Result<Option<Vec<u8>>> {
1416 let result = self.storage.read(txn.txn_id, key)?;
1417 if let Some(ref data) = result {
1418 self.stats
1419 .bytes_read
1420 .fetch_add(data.len() as u64, Ordering::Relaxed);
1421 }
1422 Ok(result)
1423 }
1424
1425 /// Delete a key
1426 pub fn delete(&self, txn: TxnHandle, key: &[u8]) -> Result<()> {
1427 self.storage.delete(txn.txn_id, key.to_vec())
1428 }
1429
1430 /// Minimum prefix length for scan operations.
1431 /// Prevents expensive full-table scans by requiring a meaningful prefix.
1432 pub const MIN_SCAN_PREFIX_LEN: usize = 2;
1433
1434 /// Scan keys with a prefix (enforces minimum prefix length for safety).
1435 ///
1436 /// # Prefix Safety
1437 ///
1438 /// To prevent accidental full-table scans, this method requires a minimum
1439 /// prefix length of 2 bytes. Use `scan_unchecked` for internal operations
1440 /// that need empty/short prefixes.
1441 ///
1442 /// # Errors
1443 ///
1444 /// Returns `SochDBError::InvalidInput` if prefix is too short.
1445 pub fn scan(&self, txn: TxnHandle, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
1446 if prefix.len() < Self::MIN_SCAN_PREFIX_LEN {
1447 return Err(SochDBError::InvalidArgument(format!(
1448 "Prefix too short: {} bytes (minimum {} required). \
1449 Use scan_unchecked() for unrestricted scans.",
1450 prefix.len(),
1451 Self::MIN_SCAN_PREFIX_LEN
1452 )));
1453 }
1454 self.scan_unchecked(txn, prefix)
1455 }
1456
1457 /// Scan keys with a prefix without length validation.
1458 ///
1459 /// # Warning
1460 ///
1461 /// This method allows empty/short prefixes which can cause expensive
1462 /// full-table scans. Use `scan()` unless you specifically need unrestricted
1463 /// prefix access for internal operations.
1464 pub fn scan_unchecked(&self, txn: TxnHandle, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
1465 let results = self.storage.scan(txn.txn_id, prefix)?;
1466 let bytes: u64 = results
1467 .iter()
1468 .map(|(k, v)| (k.len() + v.len()) as u64)
1469 .sum();
1470 self.stats.bytes_read.fetch_add(bytes, Ordering::Relaxed);
1471 Ok(results)
1472 }
1473
1474 /// Scan keys in range
1475 pub fn scan_range(
1476 &self,
1477 txn: TxnHandle,
1478 start: &[u8],
1479 end: &[u8],
1480 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
1481 let results = self.storage.scan_range(txn.txn_id, start, end)?;
1482 let bytes: u64 = results
1483 .iter()
1484 .map(|(k, v)| (k.len() + v.len()) as u64)
1485 .sum();
1486 self.stats.bytes_read.fetch_add(bytes, Ordering::Relaxed);
1487 Ok(results)
1488 }
1489
1490 /// Streaming scan for very large result sets
1491 ///
1492 /// Returns an iterator that yields (key, value) pairs without
1493 /// materializing the entire result set. Use this for large scans
1494 /// where memory efficiency is important.
1495 ///
1496 /// ## Performance
1497 ///
1498 /// - Memory: O(1) per iteration vs O(N) for scan_range
1499 /// - Latency: First result available immediately vs waiting for all results
1500 /// - Throughput: Slightly lower due to per-item overhead
1501 ///
1502 /// ## Usage
1503 ///
1504 /// ```ignore
1505 /// for result in db.scan_range_iter(txn, b"start", b"end") {
1506 /// let (key, value) = result?;
1507 /// // Process immediately - no need to wait for all results
1508 /// }
1509 /// ```
1510 pub fn scan_range_iter<'a>(
1511 &'a self,
1512 txn: TxnHandle,
1513 start: &'a [u8],
1514 end: &'a [u8],
1515 ) -> impl Iterator<Item = Result<(Vec<u8>, Vec<u8>)>> + 'a {
1516 let stats = &self.stats;
1517 self.storage
1518 .scan_range_iter(txn.txn_id, start, end)
1519 .map(move |item| {
1520 stats
1521 .bytes_read
1522 .fetch_add((item.0.len() + item.1.len()) as u64, Ordering::Relaxed);
1523 Ok(item)
1524 })
1525 }
1526
1527 /// Flush memtable to WAL/Disk
1528 pub fn flush(&self) -> Result<()> {
1529 self.storage.fsync()
1530 }
1531
1532 // =========================================================================
1533 // Path-Native API (SochDB's differentiator)
1534 // =========================================================================
1535
1536 /// Get storage statistics
1537 pub fn storage_stats(&self) -> crate::durable_storage::StorageStats {
1538 self.storage.stats()
1539 }
1540
1541 /// Put a value at a path
1542 ///
1543 /// Path format: "collection/doc_id/field" or "table.row_id.column"
1544 /// Resolution is O(|path|), not O(log N) like B-tree.
1545 pub fn put_path(&self, txn: TxnHandle, path: &str, value: &[u8]) -> Result<()> {
1546 self.put(txn, path.as_bytes(), value)
1547 }
1548
1549 /// Get a value at a path
1550 pub fn get_path(&self, txn: TxnHandle, path: &str) -> Result<Option<Vec<u8>>> {
1551 self.get(txn, path.as_bytes())
1552 }
1553
1554 /// Delete at a path
1555 pub fn delete_path(&self, txn: TxnHandle, path: &str) -> Result<()> {
1556 self.delete(txn, path.as_bytes())
1557 }
1558
1559 /// Scan a path prefix
1560 ///
1561 /// Returns all key-value pairs where key starts with prefix.
1562 /// Useful for: "users/123/" -> all fields of user 123
1563 pub fn scan_path(&self, txn: TxnHandle, prefix: &str) -> Result<Vec<(String, Vec<u8>)>> {
1564 self.stats.queries_executed.fetch_add(1, Ordering::Relaxed);
1565
1566 let results = self.scan(txn, prefix.as_bytes())?;
1567
1568 Ok(results
1569 .into_iter()
1570 .filter_map(|(k, v)| String::from_utf8(k).ok().map(|path| (path, v)))
1571 .collect())
1572 }
1573
1574 // =========================================================================
1575 // Query API
1576 // =========================================================================
1577
1578 /// Execute a path query and return results
1579 ///
1580 /// This is the main query interface for LLM context retrieval.
1581 /// Supports:
1582 /// - Path prefix matching
1583 /// - Column projection (for I/O reduction)
1584 /// - Limit/offset
1585 pub fn query(&self, txn: TxnHandle, path_prefix: &str) -> QueryBuilder<'_> {
1586 QueryBuilder::new(self, txn, path_prefix.to_string())
1587 }
1588
1589 // =========================================================================
1590 // Table API (Higher-level abstraction)
1591 // =========================================================================
1592
1593 /// Register a table schema
1594 pub fn register_table(&self, schema: TableSchema) -> Result<()> {
1595 if self.tables.contains_key(&schema.name) {
1596 return Err(SochDBError::InvalidArgument(format!(
1597 "Table '{}' already exists",
1598 schema.name
1599 )));
1600 }
1601 // Cache the packed schema for fast inserts
1602 let packed_schema = Self::to_packed_schema(&schema);
1603 self.packed_schemas
1604 .insert(schema.name.clone(), packed_schema);
1605 self.tables.insert(schema.name.clone(), schema);
1606 Ok(())
1607 }
1608
1609 /// Get table schema
1610 pub fn get_table_schema(&self, name: &str) -> Option<TableSchema> {
1611 self.tables.get(name).map(|s| s.clone())
1612 }
1613
1614 /// Update the schema for an existing table (used by ALTER TABLE).
1615 ///
1616 /// Replaces the schema in both the `tables` DashMap and the packed schema
1617 /// cache atomically (per-key). The caller is responsible for validating
1618 /// the new schema.
1619 pub fn update_table_schema(&self, old_name: &str, schema: TableSchema) -> Result<()> {
1620 if !self.tables.contains_key(old_name) {
1621 return Err(SochDBError::InvalidArgument(format!(
1622 "Table '{}' not found",
1623 old_name
1624 )));
1625 }
1626 // Remove old entries
1627 self.tables.remove(old_name);
1628 self.packed_schemas.remove(old_name);
1629 // Insert new
1630 let packed = Self::to_packed_schema(&schema);
1631 self.packed_schemas.insert(schema.name.clone(), packed);
1632 self.tables.insert(schema.name.clone(), schema);
1633 Ok(())
1634 }
1635
1636 /// List all tables
1637 pub fn list_tables(&self) -> Vec<String> {
1638 self.tables.iter().map(|e| e.key().clone()).collect()
1639 }
1640
1641 // =========================================================================
1642 // CDC (Change Data Capture)
1643 // =========================================================================
1644
1645 /// Enable CDC on this database, returning the CDC log handle.
1646 ///
1647 /// Subsequent mutations emitted via the SQL execution layer will be
1648 /// recorded in the CDC log for subscriber consumption.
1649 pub fn enable_cdc(&mut self, config: crate::cdc::CdcConfig) -> Arc<crate::cdc::CdcLog> {
1650 let log = crate::cdc::CdcLog::new(config);
1651 self.cdc_log = Some(log.clone());
1652 log
1653 }
1654
1655 /// Get the CDC log handle, if CDC is enabled.
1656 pub fn cdc_log(&self) -> Option<&Arc<crate::cdc::CdcLog>> {
1657 self.cdc_log.as_ref()
1658 }
1659
1660 /// Convert TableSchema to PackedTableSchema for efficient storage
1661 fn to_packed_schema(schema: &TableSchema) -> PackedTableSchema {
1662 let columns = schema
1663 .columns
1664 .iter()
1665 .map(|col| PackedColumnDef {
1666 name: col.name.clone(),
1667 col_type: match col.col_type {
1668 ColumnType::Int64 => PackedColumnType::Int64,
1669 ColumnType::UInt64 => PackedColumnType::UInt64,
1670 ColumnType::Float64 => PackedColumnType::Float64,
1671 ColumnType::Text => PackedColumnType::Text,
1672 ColumnType::Binary => PackedColumnType::Binary,
1673 ColumnType::Bool => PackedColumnType::Bool,
1674 },
1675 nullable: col.nullable,
1676 })
1677 .collect();
1678
1679 PackedTableSchema::new(&schema.name, columns)
1680 }
1681
1682 /// Insert a row into a table
1683 ///
1684 /// Uses packed row format: stores entire row as single key-value pair.
1685 /// This reduces write amplification from 4× to 1× for a 4-column table.
1686 ///
1687 /// # Performance
1688 /// - Before: 4 columns × (WAL entry + MVCC version) = 4 writes
1689 /// - After: 1 packed row = 1 write
1690 /// - Improvement: ~4× fewer WAL entries, ~48% less I/O overhead
1691 pub fn insert_row(
1692 &self,
1693 txn: TxnHandle,
1694 table: &str,
1695 row_id: u64,
1696 values: &HashMap<String, SochValue>,
1697 ) -> Result<()> {
1698 // Use cached packed schema - single DashMap lookup, no clone
1699 let packed_schema = self
1700 .packed_schemas
1701 .get(table)
1702 .ok_or_else(|| SochDBError::InvalidArgument(format!("Table '{}' not found", table)))?;
1703
1704 // Pack the row using cached schema
1705 let packed_row = PackedRow::pack(&packed_schema, values);
1706
1707 // Build key using KeyBuffer - optimized stack allocation (~12-15ns vs ~30-35ns for write!())
1708 let key = KeyBuffer::format_row_key(table, row_id);
1709
1710 self.put(txn, key.as_bytes(), packed_row.as_bytes())?;
1711
1712 Ok(())
1713 }
1714
1715 /// Read a row from a table
1716 ///
1717 /// Reads packed row and extracts requested columns in O(k) time.
1718 /// Column projection happens in memory, not storage - all columns are fetched.
1719 pub fn read_row(
1720 &self,
1721 txn: TxnHandle,
1722 table: &str,
1723 row_id: u64,
1724 columns: Option<&[&str]>,
1725 ) -> Result<Option<HashMap<String, SochValue>>> {
1726 let schema = self
1727 .tables
1728 .get(table)
1729 .ok_or_else(|| SochDBError::InvalidArgument(format!("Table '{}' not found", table)))?;
1730
1731 // Read the packed row with a single key lookup using KeyBuffer
1732 let key = KeyBuffer::format_row_key(table, row_id);
1733 let bytes = match self.get(txn, key.as_bytes())? {
1734 Some(b) => b,
1735 None => return Ok(None),
1736 };
1737
1738 // Use cached packed schema
1739 let packed_schema = self
1740 .packed_schemas
1741 .get(table)
1742 .ok_or_else(|| SochDBError::Internal("Packed schema not found".into()))?;
1743 let packed_row = PackedRow::from_bytes(bytes, packed_schema.num_columns())?;
1744
1745 // Determine which columns to return
1746 let cols_to_read: Vec<&str> = match columns {
1747 Some(c) => c.to_vec(),
1748 None => schema.columns.iter().map(|c| c.name.as_str()).collect(),
1749 };
1750
1751 let mut row = HashMap::new();
1752 for col_name in cols_to_read {
1753 if let Some(idx) = packed_schema.column_index(col_name)
1754 && let Some(col_def) = packed_schema.column(idx)
1755 && let Some(value) = packed_row.get_column(idx, col_def.col_type)
1756 {
1757 row.insert(col_name.to_string(), value);
1758 }
1759 }
1760
1761 Ok(Some(row))
1762 }
1763
1764 /// Insert multiple rows efficiently in a batch
1765 ///
1766 /// This method accumulates all rows and writes them with fewer WAL syncs.
1767 /// Ideal for bulk loading scenarios.
1768 ///
1769 /// # Performance
1770 /// - Uses group commit to batch fsync operations
1771 /// - Expected throughput: 500K-1M rows/sec depending on row size
1772 pub fn insert_rows_batch(
1773 &self,
1774 txn: TxnHandle,
1775 table: &str,
1776 rows: &[(u64, HashMap<String, SochValue>)],
1777 ) -> Result<usize> {
1778 // Use cached packed schema
1779 let packed_schema = self
1780 .packed_schemas
1781 .get(table)
1782 .ok_or_else(|| SochDBError::InvalidArgument(format!("Table '{}' not found", table)))?;
1783
1784 let mut count = 0;
1785
1786 for (row_id, values) in rows {
1787 // Pack and write using KeyBuffer for efficient key construction
1788 let packed_row = PackedRow::pack(&packed_schema, values);
1789 let key = KeyBuffer::format_row_key(table, *row_id);
1790 self.put(txn, key.as_bytes(), packed_row.as_bytes())?;
1791 count += 1;
1792 }
1793
1794 Ok(count)
1795 }
1796
    /// Ultra-fast raw put - bypasses all validation
    ///
    /// Use when you've already validated the data and just need speed.
    /// This is ~10× faster than insert_row() for bulk inserts.
    ///
    /// Writes straight through `DurableStorage::write_refs`. Unlike `put`, it:
    /// - does NOT add to the `bytes_written` statistic, and
    /// - does NOT acquire the cross-process writer lock in concurrent mode.
    ///
    /// NOTE(review): because of the second point, callers on a handle opened
    /// with `open_concurrent*` skip the WAL serialization that `put` provides.
    /// Confirm this path is never used in concurrent mode, or route it through
    /// `put`.
    #[inline]
    pub fn put_raw(&self, txn: TxnHandle, key: &[u8], value: &[u8]) -> Result<()> {
        self.storage.write_refs(txn.txn_id, key, value)
    }
1805
1806 /// Zero-allocation insert - fastest path for bulk inserts
1807 ///
1808 /// Takes values as a slice in schema column order, avoiding HashMap overhead.
1809 ///
1810 /// # Arguments
1811 /// * `txn` - Transaction handle
1812 /// * `table` - Table name
1813 /// * `row_id` - Row identifier
1814 /// * `values` - Values in schema column order (None = NULL)
1815 ///
1816 /// # Performance
1817 /// - Eliminates ~6 allocations per row vs insert_row()
1818 /// - Expected: 1.2M-1.5M inserts/sec
1819 ///
1820 /// # Example
1821 /// ```ignore
1822 /// let values: &[Option<&SochValue>] = &[
1823 /// Some(&SochValue::Int(1)),
1824 /// Some(&SochValue::Text("Alice".into())),
1825 /// None, // NULL
1826 /// ];
1827 /// db.insert_row_slice(txn, "users", 1, values)?;
1828 /// ```
1829 #[inline]
1830 pub fn insert_row_slice(
1831 &self,
1832 txn: TxnHandle,
1833 table: &str,
1834 row_id: u64,
1835 values: &[Option<&SochValue>],
1836 ) -> Result<()> {
1837 // Use cached packed schema - single DashMap lookup
1838 let packed_schema = self
1839 .packed_schemas
1840 .get(table)
1841 .ok_or_else(|| SochDBError::InvalidArgument(format!("Table '{}' not found", table)))?;
1842
1843 // Validate column count matches
1844 if values.len() != packed_schema.num_columns() {
1845 return Err(SochDBError::InvalidArgument(format!(
1846 "Expected {} columns, got {}",
1847 packed_schema.num_columns(),
1848 values.len()
1849 )));
1850 }
1851
1852 // Pack using zero-allocation path
1853 let packed_row = PackedRow::pack_slice(&packed_schema, values);
1854
1855 // Build key using KeyBuffer - optimized stack allocation (~12-15ns vs ~30-35ns for write!())
1856 let key = KeyBuffer::format_row_key(table, row_id);
1857
1858 self.put(txn, key.as_bytes(), packed_row.as_bytes())?;
1859 Ok(())
1860 }
1861
1862 // =========================================================================
1863 // Maintenance
1864 // =========================================================================
1865
1866 /// Force fsync to disk
1867 pub fn fsync(&self) -> Result<()> {
1868 self.storage.fsync()
1869 }
1870
1871 /// Create a checkpoint
1872 pub fn checkpoint(&self) -> Result<u64> {
1873 self.storage.checkpoint()
1874 }
1875
1876 /// Truncate the WAL file after a checkpoint.
1877 ///
1878 /// See [`DurableStorage::truncate_wal`] for safety notes.
1879 pub fn truncate_wal(&self) -> Result<()> {
1880 self.storage.truncate_wal()
1881 }
1882
1883 /// Run garbage collection
1884 pub fn gc(&self) -> usize {
1885 self.storage.gc()
1886 }
1887
1888 /// Get database statistics
1889 pub fn stats(&self) -> Stats {
1890 Stats {
1891 transactions_started: self.stats.transactions_started.load(Ordering::Relaxed),
1892 transactions_committed: self.stats.transactions_committed.load(Ordering::Relaxed),
1893 transactions_aborted: self.stats.transactions_aborted.load(Ordering::Relaxed),
1894 queries_executed: self.stats.queries_executed.load(Ordering::Relaxed),
1895 bytes_written: self.stats.bytes_written.load(Ordering::Relaxed),
1896 bytes_read: self.stats.bytes_read.load(Ordering::Relaxed),
1897 }
1898 }
1899
1900 /// Shutdown the database gracefully
1901 pub fn shutdown(&self) -> Result<()> {
1902 if self.shutdown.swap(1, Ordering::SeqCst) == 1 {
1903 return Ok(()); // Already shutting down
1904 }
1905
1906 // Flush any pending writes
1907 self.fsync()?;
1908
1909 // Create clean shutdown marker
1910 let marker = self.path.join(".clean_shutdown");
1911 std::fs::write(&marker, b"ok")?;
1912
1913 Ok(())
1914 }
1915}
1916
1917impl Drop for Database {
1918 fn drop(&mut self) {
1919 // Try graceful shutdown if not already done
1920 if self.shutdown.load(Ordering::SeqCst) == 0 {
1921 let _ = self.fsync();
1922 let marker = self.path.join(".clean_shutdown");
1923 let _ = std::fs::write(&marker, b"ok");
1924 }
1925 }
1926}
1927
/// Query builder for fluent query construction
///
/// Created by `Database::query`; configured through `columns`, `limit` and
/// `offset`; run with `execute`.
pub struct QueryBuilder<'a> {
    // Database the query runs against.
    db: &'a Database,
    // Transaction whose snapshot the scan reads from.
    txn: TxnHandle,
    // Key/path prefix passed to `Database::query`.
    path_prefix: String,
    // Column projection set by `columns()`; `None` means all columns.
    columns: Option<Vec<String>>,
    // Row limit set by `limit()`; `None` when not set.
    limit: Option<usize>,
    // Row offset set by `offset()`; `None` when not set.
    offset: Option<usize>,
}
1937
1938impl<'a> QueryBuilder<'a> {
1939 fn new(db: &'a Database, txn: TxnHandle, path_prefix: String) -> Self {
1940 Self {
1941 db,
1942 txn,
1943 path_prefix,
1944 columns: None,
1945 limit: None,
1946 offset: None,
1947 }
1948 }
1949
1950 /// Select specific columns (for I/O reduction)
1951 pub fn columns(mut self, cols: &[&str]) -> Self {
1952 self.columns = Some(cols.iter().map(|s| s.to_string()).collect());
1953 self
1954 }
1955
1956 /// Limit results
1957 pub fn limit(mut self, n: usize) -> Self {
1958 self.limit = Some(n);
1959 self
1960 }
1961
1962 /// Skip results
1963 pub fn offset(mut self, n: usize) -> Self {
1964 self.offset = Some(n);
1965 self
1966 }
1967
1968 /// Execute the query
1969 ///
1970 /// Scans packed rows and unpacks them. Each key is "table/row_id" pointing to a packed row.
1971 pub fn execute(self) -> Result<QueryResult> {
1972 self.db
1973 .stats
1974 .queries_executed
1975 .fetch_add(1, Ordering::Relaxed);
1976
1977 // Get schema for the table if we're querying a table
1978 let table_name = self
1979 .path_prefix
1980 .split('/')
1981 .next()
1982 .unwrap_or(&self.path_prefix);
1983 let schema = self.db.tables.get(table_name).map(|s| s.clone());
1984
1985 // When querying a registered table by its bare name, scan the row-key
1986 // prefix "table/" rather than the bare name. Row keys are "table/row_id"
1987 // (see KeyBuffer::format_row_key), so scanning the bare name would:
1988 // (1) bleed across sibling tables whose names share a prefix — e.g.
1989 // querying "user" would also match "users/123"; and
1990 // (2) trip the scan minimum-prefix guard for single-character table
1991 // names ("t" is 1 byte, below the 2-byte minimum).
1992 // Appending the separator fixes both. Path-style prefixes that already
1993 // contain '/' (e.g. "users/123" to fetch one row's columns) are left
1994 // unchanged so field-prefix scans keep working.
1995 let scan_prefix = if schema.is_some() && !self.path_prefix.contains('/') {
1996 format!("{}/", self.path_prefix)
1997 } else {
1998 self.path_prefix.clone()
1999 };
2000
2001 // Scan the path prefix
2002 let results = self.db.scan_path(self.txn, &scan_prefix)?;
2003
2004 let mut rows: Vec<HashMap<String, SochValue>> = Vec::new();
2005 let mut bytes_read = 0usize;
2006
2007 if let Some(ref schema) = schema {
2008 // We have a table schema - use cached packed schema
2009 let packed_schema = self
2010 .db
2011 .packed_schemas
2012 .get(table_name)
2013 .map(|ps| ps.clone())
2014 .unwrap_or_else(|| Database::to_packed_schema(schema));
2015
2016 for (path, value_bytes) in results {
2017 // Parse path: table/row_id
2018 let parts: Vec<&str> = path.split('/').collect();
2019 if parts.len() == 2 {
2020 // This is a packed row
2021 bytes_read += value_bytes.len();
2022
2023 if let Ok(packed_row) =
2024 PackedRow::from_bytes(value_bytes, packed_schema.num_columns())
2025 {
2026 // Unpack all columns or just requested columns
2027 let mut row = HashMap::new();
2028
2029 if let Some(ref cols) = self.columns {
2030 // Only extract requested columns
2031 for col_name in cols {
2032 if let Some(idx) = packed_schema.column_index(col_name)
2033 && let Some(col_def) = packed_schema.column(idx)
2034 && let Some(value) =
2035 packed_row.get_column(idx, col_def.col_type)
2036 {
2037 row.insert(col_name.clone(), value);
2038 }
2039 }
2040 } else {
2041 // Extract all columns
2042 row = packed_row.unpack(&packed_schema);
2043 }
2044
2045 if !row.is_empty() {
2046 rows.push(row);
2047 }
2048 }
2049 }
2050 }
2051 } else {
2052 // Fallback: no schema, try legacy column-per-key format
2053 let mut rows_map: HashMap<String, HashMap<String, SochValue>> = HashMap::new();
2054
2055 for (path, value_bytes) in results {
2056 let parts: Vec<&str> = path.split('/').collect();
2057 if parts.len() >= 3 {
2058 let row_key = format!("{}/{}", parts[0], parts[1]);
2059 let col_name = parts[2..].join("/");
2060
2061 if let Some(ref cols) = self.columns
2062 && !cols.contains(&col_name)
2063 {
2064 continue;
2065 }
2066
2067 bytes_read += value_bytes.len();
2068 let row = rows_map.entry(row_key).or_default();
2069 row.insert(col_name, deserialize_value(&value_bytes));
2070 }
2071 }
2072
2073 rows = rows_map.into_values().collect();
2074 }
2075
2076 // Apply offset
2077 if let Some(offset) = self.offset {
2078 rows = rows.into_iter().skip(offset).collect();
2079 }
2080
2081 // Apply limit
2082 if let Some(limit) = self.limit {
2083 rows.truncate(limit);
2084 }
2085
2086 // Collect column names
2087 let columns: Vec<String> = self.columns.unwrap_or_else(|| {
2088 rows.iter()
2089 .flat_map(|r| r.keys().cloned())
2090 .collect::<std::collections::HashSet<_>>()
2091 .into_iter()
2092 .collect()
2093 });
2094
2095 Ok(QueryResult {
2096 columns,
2097 rows_scanned: rows.len(),
2098 bytes_read,
2099 rows,
2100 })
2101 }
2102
2103 /// Execute and return TOON format (for LLM efficiency)
2104 pub fn to_toon(self) -> Result<String> {
2105 let result = self.execute()?;
2106 Ok(result.to_toon())
2107 }
2108
2109 /// Execute with lazy iteration - avoids materializing all rows
2110 ///
2111 /// Returns an iterator over rows as `Vec<SochValue>` in schema column order.
2112 /// This is more memory-efficient than `execute()` for large result sets.
2113 ///
2114 /// # Performance
2115 /// - No upfront materialization of all rows
2116 /// - ~40% less memory for large result sets
2117 /// - Ideal for streaming to network or aggregations
2118 ///
2119 /// # Example
2120 /// ```ignore
2121 /// for row_result in db.query(txn, "users").execute_iter()? {
2122 /// let row = row_result?;
2123 /// // row is Vec<SochValue> in column order
2124 /// }
2125 /// ```
2126 pub fn execute_iter(self) -> Result<QueryRowIterator> {
2127 self.db
2128 .stats
2129 .queries_executed
2130 .fetch_add(1, Ordering::Relaxed);
2131
2132 let table_name = self
2133 .path_prefix
2134 .split('/')
2135 .next()
2136 .unwrap_or(&self.path_prefix)
2137 .to_string();
2138
2139 // Get packed schema (clone needed for iterator ownership)
2140 let packed_schema = self.db.packed_schemas.get(&table_name).map(|ps| ps.clone());
2141
2142 // Scan the path prefix
2143 let results = self.db.scan_path(self.txn, &self.path_prefix)?;
2144
2145 Ok(QueryRowIterator {
2146 results: results.into_iter(),
2147 packed_schema,
2148 columns: self.columns,
2149 offset: self.offset.unwrap_or(0),
2150 limit: self.limit,
2151 yielded: 0,
2152 skipped: 0,
2153 })
2154 }
2155
2156 /// Execute and return columnar (SIMD-friendly) result format
2157 ///
2158 /// Instead of row-oriented `Vec<HashMap<String, SochValue>>`, returns
2159 /// column-oriented `Vec<TypedColumn>` for vectorized operations.
2160 ///
2161 /// ## Performance Benefits
2162 ///
2163 /// - SIMD: Aggregate operations (sum, avg) use vectorized instructions
2164 /// - Cache: Sequential access maximizes L1/L2 hits
2165 /// - Memory: ~30% less overhead than row-based format
2166 /// - Analytics: Ideal for ML preprocessing and statistics
2167 ///
2168 /// ## Example
2169 ///
2170 /// ```ignore
2171 /// let result = db.query(txn, "users")
2172 /// .columns(&["id", "score"])
2173 /// .as_columnar()?;
2174 ///
2175 /// // SIMD-optimized sum
2176 /// let total = result.sum_i64("score").unwrap_or(0);
2177 ///
2178 /// // Direct column access
2179 /// if let Some(scores) = result.column("score") {
2180 /// for i in 0..scores.len() {
2181 /// if let Some(v) = scores.get_i64(i) {
2182 /// println!("Score: {}", v);
2183 /// }
2184 /// }
2185 /// }
2186 /// ```
2187 pub fn as_columnar(self) -> Result<ColumnarQueryResult> {
2188 self.db
2189 .stats
2190 .queries_executed
2191 .fetch_add(1, Ordering::Relaxed);
2192
2193 let table_name = self
2194 .path_prefix
2195 .split('/')
2196 .next()
2197 .unwrap_or(&self.path_prefix);
2198 let schema = self.db.tables.get(table_name).map(|s| s.clone());
2199
2200 // Get packed schema
2201 let packed_schema = match self.db.packed_schemas.get(table_name) {
2202 Some(ps) => ps.clone(),
2203 None => return Ok(ColumnarQueryResult::empty()),
2204 };
2205
2206 // Determine columns to fetch
2207 let column_names: Vec<String> = self.columns.clone().unwrap_or_else(|| {
2208 schema
2209 .as_ref()
2210 .map(|s| s.columns.iter().map(|c| c.name.clone()).collect())
2211 .unwrap_or_default()
2212 });
2213
2214 if column_names.is_empty() {
2215 return Ok(ColumnarQueryResult::empty());
2216 }
2217
2218 // Initialize TypedColumns based on schema types
2219 let mut columns: Vec<CoreTypedColumn> = column_names
2220 .iter()
2221 .map(|col_name| {
2222 packed_schema
2223 .column_index(col_name)
2224 .and_then(|idx| packed_schema.column(idx))
2225 .map(|col_def| match col_def.col_type {
2226 PackedColumnType::Int64 => CoreTypedColumn::new_int64(),
2227 PackedColumnType::UInt64 => CoreTypedColumn::new_uint64(),
2228 PackedColumnType::Float64 => CoreTypedColumn::new_float64(),
2229 PackedColumnType::Text => CoreTypedColumn::new_text(),
2230 PackedColumnType::Binary => CoreTypedColumn::new_binary(),
2231 PackedColumnType::Bool => CoreTypedColumn::new_bool(),
2232 PackedColumnType::Null => CoreTypedColumn::new_text(), // Null column = fallback to text
2233 })
2234 .unwrap_or_else(CoreTypedColumn::new_text) // fallback
2235 })
2236 .collect();
2237
2238 // Scan the path prefix
2239 let results = self.db.scan_path(self.txn, &self.path_prefix)?;
2240
2241 let mut row_count = 0;
2242 let mut bytes_read = 0;
2243 let mut skipped = 0;
2244
2245 for (path, value_bytes) in results {
2246 // Parse path: table/row_id
2247 let parts: Vec<&str> = path.split('/').collect();
2248 if parts.len() != 2 {
2249 continue;
2250 }
2251
2252 // Apply offset
2253 if let Some(offset) = self.offset
2254 && skipped < offset
2255 {
2256 skipped += 1;
2257 continue;
2258 }
2259
2260 // Apply limit
2261 if let Some(limit) = self.limit
2262 && row_count >= limit
2263 {
2264 break;
2265 }
2266
2267 bytes_read += value_bytes.len();
2268
2269 if let Ok(packed_row) = PackedRow::from_bytes(value_bytes, packed_schema.num_columns())
2270 {
2271 // Extract each column and push to corresponding TypedColumn
2272 for (col_idx, col_name) in column_names.iter().enumerate() {
2273 if let Some(schema_idx) = packed_schema.column_index(col_name) {
2274 if let Some(col_def) = packed_schema.column(schema_idx) {
2275 let value = packed_row.get_column(schema_idx, col_def.col_type);
2276 push_value_to_typed_column(&mut columns[col_idx], value);
2277 } else {
2278 push_null_to_typed_column(&mut columns[col_idx]);
2279 }
2280 } else {
2281 push_null_to_typed_column(&mut columns[col_idx]);
2282 }
2283 }
2284 row_count += 1;
2285 }
2286 }
2287
2288 Ok(ColumnarQueryResult {
2289 columns: column_names,
2290 data: columns,
2291 row_count,
2292 bytes_read,
2293 })
2294 }
2295}
2296
2297/// Lazy iterator over query results
2298///
2299/// Unpacks rows on-demand, avoiding upfront materialization.
2300pub struct QueryRowIterator {
2301 results: std::vec::IntoIter<(String, Vec<u8>)>,
2302 packed_schema: Option<PackedTableSchema>,
2303 columns: Option<Vec<String>>,
2304 offset: usize,
2305 limit: Option<usize>,
2306 yielded: usize,
2307 skipped: usize,
2308}
2309
2310impl Iterator for QueryRowIterator {
2311 type Item = Result<Vec<SochValue>>;
2312
2313 fn next(&mut self) -> Option<Self::Item> {
2314 // Check limit
2315 if let Some(limit) = self.limit
2316 && self.yielded >= limit
2317 {
2318 return None;
2319 }
2320
2321 loop {
2322 let (path, value_bytes) = self.results.next()?;
2323
2324 // Parse path: table/row_id
2325 let parts: Vec<&str> = path.split('/').collect();
2326 if parts.len() != 2 {
2327 continue; // Skip non-row entries
2328 }
2329
2330 // Apply offset
2331 if self.skipped < self.offset {
2332 self.skipped += 1;
2333 continue;
2334 }
2335
2336 if let Some(ref schema) = self.packed_schema {
2337 match PackedRow::from_bytes(value_bytes, schema.num_columns()) {
2338 Ok(packed_row) => {
2339 let row = if let Some(ref cols) = self.columns {
2340 // Project specific columns
2341 cols.iter()
2342 .map(|col_name| {
2343 schema
2344 .column_index(col_name)
2345 .and_then(|idx| schema.column(idx))
2346 .and_then(|col_def| {
2347 packed_row.get_column(
2348 schema.column_index(col_name).unwrap(),
2349 col_def.col_type,
2350 )
2351 })
2352 .unwrap_or(SochValue::Null)
2353 })
2354 .collect()
2355 } else {
2356 // All columns in order
2357 packed_row.unpack_to_vec(schema)
2358 };
2359
2360 self.yielded += 1;
2361 return Some(Ok(row));
2362 }
2363 Err(e) => return Some(Err(e)),
2364 }
2365 } else {
2366 // No schema - return raw bytes as binary
2367 self.yielded += 1;
2368 return Some(Ok(vec![SochValue::Binary(value_bytes)]));
2369 }
2370 }
2371 }
2372}
2373
2374// Helper functions for serialization (kept for backward compatibility with legacy data)
2375
2376#[allow(dead_code)]
2377fn serialize_value(value: &SochValue) -> Vec<u8> {
2378 // Simple serialization - in production use proper format
2379 match value {
2380 SochValue::Null => vec![0],
2381 SochValue::Int(i) => {
2382 let mut buf = vec![1];
2383 buf.extend_from_slice(&i.to_le_bytes());
2384 buf
2385 }
2386 SochValue::UInt(u) => {
2387 let mut buf = vec![2];
2388 buf.extend_from_slice(&u.to_le_bytes());
2389 buf
2390 }
2391 SochValue::Float(f) => {
2392 let mut buf = vec![3];
2393 buf.extend_from_slice(&f.to_le_bytes());
2394 buf
2395 }
2396 SochValue::Text(s) => {
2397 let mut buf = vec![4];
2398 buf.extend_from_slice(s.as_bytes());
2399 buf
2400 }
2401 SochValue::Bool(b) => vec![5, if *b { 1 } else { 0 }],
2402 SochValue::Binary(b) => {
2403 let mut buf = vec![6];
2404 buf.extend_from_slice(b);
2405 buf
2406 }
2407 _ => {
2408 // Fallback: serialize as text
2409 let s = format!("{:?}", value);
2410 let mut buf = vec![4];
2411 buf.extend_from_slice(s.as_bytes());
2412 buf
2413 }
2414 }
2415}
2416
2417fn deserialize_value(bytes: &[u8]) -> SochValue {
2418 if bytes.is_empty() {
2419 return SochValue::Null;
2420 }
2421
2422 match bytes[0] {
2423 0 => SochValue::Null,
2424 1 if bytes.len() >= 9 => {
2425 let i = i64::from_le_bytes(bytes[1..9].try_into().unwrap());
2426 SochValue::Int(i)
2427 }
2428 2 if bytes.len() >= 9 => {
2429 let u = u64::from_le_bytes(bytes[1..9].try_into().unwrap());
2430 SochValue::UInt(u)
2431 }
2432 3 if bytes.len() >= 9 => {
2433 let f = f64::from_le_bytes(bytes[1..9].try_into().unwrap());
2434 SochValue::Float(f)
2435 }
2436 4 => {
2437 let s = String::from_utf8_lossy(&bytes[1..]).to_string();
2438 SochValue::Text(s)
2439 }
2440 5 if bytes.len() >= 2 => SochValue::Bool(bytes[1] != 0),
2441 6 => SochValue::Binary(bytes[1..].to_vec()),
2442 _ => {
2443 // Treat as text
2444 let s = String::from_utf8_lossy(bytes).to_string();
2445 SochValue::Text(s)
2446 }
2447 }
2448}
2449
2450// ============================================================================
2451// Helper functions for columnar query result building
2452// ============================================================================
2453
2454/// Push a SochValue into a TypedColumn
2455fn push_value_to_typed_column(col: &mut CoreTypedColumn, value: Option<SochValue>) {
2456 match value {
2457 None => push_null_to_typed_column(col),
2458 Some(v) => match (col, v) {
2459 (
2460 CoreTypedColumn::Int64 {
2461 values,
2462 validity,
2463 stats,
2464 },
2465 SochValue::Int(i),
2466 ) => {
2467 values.push(i);
2468 validity.push(true);
2469 stats.update_i64(i);
2470 }
2471 (
2472 CoreTypedColumn::Int64 {
2473 values,
2474 validity,
2475 stats,
2476 },
2477 SochValue::UInt(u),
2478 ) => {
2479 values.push(u as i64);
2480 validity.push(true);
2481 stats.update_i64(u as i64);
2482 }
2483 (
2484 CoreTypedColumn::UInt64 {
2485 values,
2486 validity,
2487 stats,
2488 },
2489 SochValue::UInt(u),
2490 ) => {
2491 values.push(u);
2492 validity.push(true);
2493 stats.update_i64(u as i64);
2494 }
2495 (
2496 CoreTypedColumn::UInt64 {
2497 values,
2498 validity,
2499 stats,
2500 },
2501 SochValue::Int(i),
2502 ) => {
2503 values.push(i as u64);
2504 validity.push(true);
2505 stats.update_i64(i);
2506 }
2507 (
2508 CoreTypedColumn::Float64 {
2509 values,
2510 validity,
2511 stats,
2512 },
2513 SochValue::Float(f),
2514 ) => {
2515 values.push(f);
2516 validity.push(true);
2517 stats.update_f64(f);
2518 }
2519 (
2520 CoreTypedColumn::Float64 {
2521 values,
2522 validity,
2523 stats,
2524 },
2525 SochValue::Int(i),
2526 ) => {
2527 values.push(i as f64);
2528 validity.push(true);
2529 stats.update_f64(i as f64);
2530 }
2531 (
2532 CoreTypedColumn::Text {
2533 offsets,
2534 data,
2535 validity,
2536 stats,
2537 },
2538 SochValue::Text(s),
2539 ) => {
2540 data.extend_from_slice(s.as_bytes());
2541 offsets.push(data.len() as u32);
2542 validity.push(true);
2543 stats.row_count += 1;
2544 }
2545 (
2546 CoreTypedColumn::Binary {
2547 offsets,
2548 data,
2549 validity,
2550 stats,
2551 },
2552 SochValue::Binary(b),
2553 ) => {
2554 data.extend_from_slice(&b);
2555 offsets.push(data.len() as u32);
2556 validity.push(true);
2557 stats.row_count += 1;
2558 }
2559 (
2560 CoreTypedColumn::Bool {
2561 values,
2562 validity,
2563 stats,
2564 len,
2565 },
2566 SochValue::Bool(b),
2567 ) => {
2568 let idx = *len;
2569 *len += 1;
2570 let num_words = (*len).div_ceil(64);
2571 while values.len() < num_words {
2572 values.push(0);
2573 }
2574 if b {
2575 let word = idx / 64;
2576 let bit = idx % 64;
2577 values[word] |= 1 << bit;
2578 }
2579 validity.push(true);
2580 stats.row_count += 1;
2581 }
2582 // Type mismatch - push as null
2583 (col, _) => push_null_to_typed_column(col),
2584 },
2585 }
2586}
2587
2588/// Push a null value into a TypedColumn
2589fn push_null_to_typed_column(col: &mut CoreTypedColumn) {
2590 match col {
2591 CoreTypedColumn::Int64 {
2592 values,
2593 validity,
2594 stats,
2595 } => {
2596 values.push(0);
2597 validity.push(false);
2598 stats.update_null();
2599 }
2600 CoreTypedColumn::UInt64 {
2601 values,
2602 validity,
2603 stats,
2604 } => {
2605 values.push(0);
2606 validity.push(false);
2607 stats.update_null();
2608 }
2609 CoreTypedColumn::Float64 {
2610 values,
2611 validity,
2612 stats,
2613 } => {
2614 values.push(0.0);
2615 validity.push(false);
2616 stats.update_null();
2617 }
2618 CoreTypedColumn::Text {
2619 offsets,
2620 data: _,
2621 validity,
2622 stats,
2623 } => {
2624 offsets.push(offsets.last().copied().unwrap_or(0));
2625 validity.push(false);
2626 stats.update_null();
2627 }
2628 CoreTypedColumn::Binary {
2629 offsets,
2630 data: _,
2631 validity,
2632 stats,
2633 } => {
2634 offsets.push(offsets.last().copied().unwrap_or(0));
2635 validity.push(false);
2636 stats.update_null();
2637 }
2638 CoreTypedColumn::Bool {
2639 values,
2640 validity,
2641 stats,
2642 len,
2643 } => {
2644 *len += 1;
2645 let num_words = (*len).div_ceil(64);
2646 while values.len() < num_words {
2647 values.push(0);
2648 }
2649 validity.push(false);
2650 stats.update_null();
2651 }
2652 }
2653}
2654
#[cfg(test)]
mod tests {
    //! Unit tests for the database kernel: transactions, path/table APIs,
    //! query builder, crash recovery, columnar results and legacy encoding.

    use super::*;
    use tempfile::tempdir;

    #[test]
    fn test_database_open_close() {
        let dir = tempdir().unwrap();
        let db = Database::open(dir.path()).unwrap();

        // Should be able to begin a transaction
        let txn = db.begin_transaction().unwrap();
        assert!(txn.txn_id > 0);

        db.abort(txn).unwrap();
        db.shutdown().unwrap();
    }

    #[test]
    fn test_database_put_get() {
        let dir = tempdir().unwrap();
        let db = Database::open(dir.path()).unwrap();

        let txn = db.begin_transaction().unwrap();
        db.put(txn, b"key1", b"value1").unwrap();

        let val = db.get(txn, b"key1").unwrap();
        assert_eq!(val, Some(b"value1".to_vec()));

        db.commit(txn).unwrap();

        // New transaction should see committed data
        let txn2 = db.begin_transaction().unwrap();
        let val = db.get(txn2, b"key1").unwrap();
        assert_eq!(val, Some(b"value1".to_vec()));
        db.abort(txn2).unwrap();
    }

    #[test]
    fn test_database_path_api() {
        let dir = tempdir().unwrap();
        let db = Database::open(dir.path()).unwrap();

        let txn = db.begin_transaction().unwrap();

        // Write using path API
        db.put_path(txn, "users/1/name", b"Alice").unwrap();
        db.put_path(txn, "users/1/email", b"alice@example.com")
            .unwrap();
        db.put_path(txn, "users/2/name", b"Bob").unwrap();

        db.commit(txn).unwrap();

        // Scan path prefix
        let txn2 = db.begin_transaction().unwrap();
        let results = db.scan_path(txn2, "users/1/").unwrap();
        assert_eq!(results.len(), 2);

        db.abort(txn2).unwrap();
    }

    #[test]
    fn test_database_table_api() {
        let dir = tempdir().unwrap();
        let db = Database::open(dir.path()).unwrap();

        // Register table
        db.register_table(TableSchema {
            name: "users".to_string(),
            columns: vec![
                ColumnDef {
                    name: "name".to_string(),
                    col_type: ColumnType::Text,
                    nullable: false,
                },
                ColumnDef {
                    name: "age".to_string(),
                    col_type: ColumnType::Int64,
                    nullable: true,
                },
            ],
        })
        .unwrap();

        // Insert row
        let txn = db.begin_transaction().unwrap();
        let mut values = HashMap::new();
        values.insert("name".to_string(), SochValue::Text("Alice".to_string()));
        values.insert("age".to_string(), SochValue::Int(30));

        db.insert_row(txn, "users", 1, &values).unwrap();
        db.commit(txn).unwrap();

        // Read row
        let txn2 = db.begin_transaction().unwrap();
        let row = db.read_row(txn2, "users", 1, None).unwrap();
        assert!(row.is_some());

        let row = row.unwrap();
        assert_eq!(row.get("name"), Some(&SochValue::Text("Alice".to_string())));

        db.abort(txn2).unwrap();
    }

    #[test]
    fn test_query_does_not_bleed_across_prefix_sibling_tables() {
        // Regression: row keys are "table/row_id", so a bare-name prefix scan of
        // "user" used to also match "users/..." (cross-table bleed), and a
        // single-character table name like "t" tripped the 2-byte scan guard.
        // Querying a registered table by name must scan exactly "table/".
        let dir = tempdir().unwrap();
        let db = Database::open(dir.path()).unwrap();

        let col = || ColumnDef {
            name: "name".to_string(),
            col_type: ColumnType::Text,
            nullable: false,
        };

        // Two sibling tables whose names share a prefix, plus a 1-char table.
        for table in ["user", "users", "t"] {
            db.register_table(TableSchema {
                name: table.to_string(),
                columns: vec![col()],
            })
            .unwrap();
        }

        let txn = db.begin_transaction().unwrap();
        let mut v_user = HashMap::new();
        v_user.insert("name".to_string(), SochValue::Text("in_user".to_string()));
        db.insert_row(txn, "user", 1, &v_user).unwrap();

        let mut v_users = HashMap::new();
        v_users.insert("name".to_string(), SochValue::Text("in_users".to_string()));
        db.insert_row(txn, "users", 1, &v_users).unwrap();
        db.insert_row(txn, "users", 2, &v_users).unwrap();

        let mut v_t = HashMap::new();
        v_t.insert("name".to_string(), SochValue::Text("in_t".to_string()));
        db.insert_row(txn, "t", 1, &v_t).unwrap();
        db.commit(txn).unwrap();

        // "user" must return exactly its one row — no bleed from "users".
        let txn_r = db.begin_read_only_fast();
        let user_rows = db.query(txn_r, "user").execute().unwrap().rows;
        assert_eq!(user_rows.len(), 1, "querying 'user' bled into 'users'");
        assert_eq!(
            user_rows[0].get("name"),
            Some(&SochValue::Text("in_user".to_string()))
        );

        // "users" must return its two rows.
        let users_rows = db.query(txn_r, "users").execute().unwrap().rows;
        assert_eq!(users_rows.len(), 2);

        // Single-character table name must be queryable (was rejected by the
        // 2-byte scan guard before the "table/" prefix fix).
        let t_rows = db.query(txn_r, "t").execute().unwrap().rows;
        assert_eq!(t_rows.len(), 1);
        assert_eq!(
            t_rows[0].get("name"),
            Some(&SochValue::Text("in_t".to_string()))
        );
        db.abort_read_only_fast(txn_r);
    }

    #[test]
    fn test_database_query_builder() {
        let dir = tempdir().unwrap();
        let db = Database::open(dir.path()).unwrap();

        // Insert test data
        let txn = db.begin_transaction().unwrap();
        db.put_path(txn, "docs/1/title", b"Hello").unwrap();
        db.put_path(txn, "docs/1/content", b"World").unwrap();
        db.put_path(txn, "docs/2/title", b"Foo").unwrap();
        db.put_path(txn, "docs/2/content", b"Bar").unwrap();
        db.commit(txn).unwrap();

        // Query with limit
        let txn2 = db.begin_transaction().unwrap();
        let result = db.query(txn2, "docs/").limit(1).execute().unwrap();

        assert_eq!(result.rows.len(), 1);
        db.abort(txn2).unwrap();
    }

    #[test]
    fn test_database_crash_recovery() {
        let dir = tempdir().unwrap();

        // Write and commit
        {
            // Use open_without_lock for crash simulation tests
            let db = Database::open_without_lock(dir.path()).unwrap();
            // Set sync mode to FULL to ensure data is persisted before "crash"
            db.storage.set_sync_mode(2);
            let txn = db.begin_transaction().unwrap();
            db.put(txn, b"persist", b"this").unwrap();
            db.commit(txn).unwrap();
            // Don't call shutdown - simulate crash
            std::mem::forget(db);
        }

        // Reopen - should recover
        {
            let db = Database::open_without_lock(dir.path()).unwrap();
            let txn = db.begin_transaction().unwrap();
            let val = db.get(txn, b"persist").unwrap();
            assert_eq!(val, Some(b"this".to_vec()));
            db.abort(txn).unwrap();
        }
    }

    #[test]
    fn test_columnar_row_view_zero_alloc() {
        use sochdb_core::columnar::TypedColumn;

        // Build a small columnar result: 3 rows × 2 columns (id: i64, name: text)
        let mut id_col = TypedColumn::new_int64();
        id_col.push_i64(Some(1));
        id_col.push_i64(Some(2));
        id_col.push_i64(Some(3));

        let mut name_col = TypedColumn::new_text();
        name_col.push_text(Some("Alice"));
        name_col.push_text(Some("Bob"));
        name_col.push_text(None); // NULL

        let cr = ColumnarQueryResult {
            columns: vec!["id".to_string(), "name".to_string()],
            data: vec![id_col, name_col],
            row_count: 3,
            bytes_read: 0,
        };

        // row_view access — zero HashMap allocation
        let row0 = cr.row_view(0).unwrap();
        assert_eq!(row0.get("id"), Some(SochValue::Int(1)));
        assert_eq!(row0.get("name"), Some(SochValue::Text("Alice".to_string())));
        assert_eq!(row0.get("nonexistent"), None);

        let row2 = cr.row_view(2).unwrap();
        assert_eq!(row2.get("id"), Some(SochValue::Int(3)));
        assert_eq!(row2.get("name"), Some(SochValue::Null));

        // Out of bounds
        assert!(cr.row_view(3).is_none());

        // values() — positional
        let vals = row0.values();
        assert_eq!(vals.len(), 2);
        assert_eq!(vals[0], SochValue::Int(1));

        // to_map() — backward compat
        let map = row0.to_map();
        assert_eq!(map.get("id"), Some(&SochValue::Int(1)));
    }

    #[test]
    fn test_columnar_into_query_result() {
        use sochdb_core::columnar::TypedColumn;

        let mut score_col = TypedColumn::new_float64();
        score_col.push_f64(Some(9.5));
        score_col.push_f64(Some(8.2));

        let cr = ColumnarQueryResult {
            columns: vec!["score".to_string()],
            data: vec![score_col],
            row_count: 2,
            bytes_read: 100,
        };

        let qr = cr.into_query_result();
        assert_eq!(qr.rows.len(), 2);
        assert_eq!(qr.rows[0].get("score"), Some(&SochValue::Float(9.5)));
        assert_eq!(qr.rows[1].get("score"), Some(&SochValue::Float(8.2)));
        assert_eq!(qr.bytes_read, 100);
    }

    #[test]
    fn test_legacy_value_encoding_round_trip() {
        // Every natively-encoded variant must survive serialize -> deserialize.
        let samples = vec![
            SochValue::Null,
            SochValue::Int(-42),
            SochValue::Int(i64::MIN),
            SochValue::UInt(u64::MAX),
            SochValue::Float(1.5),
            SochValue::Text(String::new()),
            SochValue::Text("héllo".to_string()),
            SochValue::Bool(true),
            SochValue::Bool(false),
            SochValue::Binary(vec![0, 1, 255]),
        ];
        for value in samples {
            assert_eq!(deserialize_value(&serialize_value(&value)), value);
        }

        // Empty input decodes to Null rather than panicking.
        assert_eq!(deserialize_value(&[]), SochValue::Null);
        // A truncated fixed-width payload falls back to lossy text of the
        // whole buffer (tag byte included) instead of panicking.
        assert_eq!(
            deserialize_value(&[1, b'A']),
            SochValue::Text("\u{1}A".to_string())
        );
        // Bool tag without its payload byte takes the same fallback.
        assert_eq!(
            deserialize_value(&[5]),
            SochValue::Text("\u{5}".to_string())
        );
    }
}