sochdb_storage/database.rs
1// Copyright 2025 Sushanth (https://github.com/sushanthpy)
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! SochDB Database Kernel
16//!
17//! The shared core that powers both embedded mode (`SochConnection::open`) and
18//! server mode (`sochdb-server`). This is the "SQLite engine" equivalent.
19//!
20//! ## Architecture
21//!
22//! ```text
23//! ┌──────────────────────────────────────────────────────────────────┐
24//! │ Database Kernel │
25//! │ Arc<Database> - shared by all connections │
26//! ├──────────────────────────────────────────────────────────────────┤
27//! │ │
28//! │ ┌─────────────────┐ ┌─────────────────┐ ┌────────────────┐ │
29//! │ │ DurableStorage │ │ Catalog │ │ Vector Index │ │
30//! │ │ (WAL + MVCC) │ │ (Schema Mgmt) │ │ (HNSW/Vamana) │ │
31//! │ └────────┬────────┘ └────────┬────────┘ └───────┬────────┘ │
32//! │ │ │ │ │
33//! │ └─────────────────────┴─────────────────────┘ │
34//! │ │ │
35//! │ ┌─────────────────────────────────────────────────────────────┐ │
36//! │ │ Query Executor (Path-Native) │ │
37//! │ │ - Path resolution: O(|path|) │ │
38//! │ │ - Column projection: 80% I/O reduction │ │
39//! │ │ - Context selection: Token-aware chunking │ │
40//! │ └─────────────────────────────────────────────────────────────┘ │
41//! │ │
42//! └──────────────────────────────────────────────────────────────────┘
43//!
44//! Deployment Modes:
45//! ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
46//! │ Embedded │ │ IPC Server │ │ TCP Server │
47//! │ (in-proc) │ │ (Unix sock)│ │ (remote) │
48//! └──────┬──────┘ └──────┬──────┘ └──────┬──────┘
49//! │ │ │
50//! └─────────────────┴─────────────────┘
51//! │
52//! Arc<Database>
53//! ```
54//!
55//! ## Latency Model
56//!
57//! Let K = kernel processing cost for a query
58//!
59//! - Embedded: L_emb ≈ K (function call overhead negligible)
60//! - IPC: L_ipc ≈ K + δ_ipc (δ_ipc = ~10-50µs for Unix socket)
61//! - TCP: L_tcp ≈ K + δ_net (δ_net = 100µs-10ms depending on network)
62//!
63//! For LLM context queries where K >> δ_ipc, IPC is "nearly embedded".
64
65use std::collections::HashMap;
66use std::path::{Path, PathBuf};
67use std::sync::Arc;
68use std::sync::atomic::{AtomicU64, Ordering};
69
70use dashmap::DashMap;
71use parking_lot::RwLock;
72
73use crate::durable_storage::{DurableStorage, TransactionMode};
74use crate::index_policy::{IndexPolicy, TableIndexConfig, TableIndexRegistry};
75use crate::key_buffer::KeyBuffer;
76use crate::packed_row::{PackedColumnDef, PackedColumnType, PackedRow, PackedTableSchema};
77use sochdb_core::catalog::Catalog;
78use sochdb_core::{Result, SochDBError, SochValue};
79
80// Re-export key types
81pub use crate::durable_storage::RecoveryStats;
82
83/// Database configuration
84#[derive(Debug, Clone)]
85pub struct DatabaseConfig {
86 /// Enable group commit for better write throughput
87 pub group_commit: bool,
88 /// Maximum memory for memtables before flush (bytes)
89 pub memtable_size_limit: usize,
90 /// Enable WAL for durability
91 pub wal_enabled: bool,
92 /// Sync mode: fsync after every commit vs periodic
93 pub sync_mode: SyncMode,
94 /// Read-only mode
95 pub read_only: bool,
96
97 /// Enable ordered index for O(log N) prefix scans
98 ///
99 /// # Deprecation Notice
100 ///
101 /// **DEPRECATED since 0.2.0**: Use `default_index_policy` instead for per-table control.
102 /// This field will be removed in v0.3.0.
103 ///
104 /// ## Migration Guide
105 ///
106 /// Replace:
107 /// ```ignore
108 /// DatabaseConfig { enable_ordered_index: true, .. } // Old API
109 /// DatabaseConfig { enable_ordered_index: false, .. } // Old API
110 /// ```
111 ///
112 /// With:
113 /// ```ignore
114 /// DatabaseConfig { default_index_policy: IndexPolicy::ScanOptimized, .. } // Ordered index enabled
115 /// DatabaseConfig { default_index_policy: IndexPolicy::WriteOptimized, .. } // Ordered index disabled
116 /// ```
117 ///
118 /// ## Behavior
119 ///
120 /// When false, saves ~134 ns/op on writes (20% speedup)
121 /// but scan_prefix becomes O(N) instead of O(log N + K).
122 ///
123 /// Set to false for write-heavy workloads without range scans.
124 #[deprecated(
125 since = "0.2.0",
126 note = "Use `default_index_policy` field instead. This field will be removed in v0.3.0. \
127 Set IndexPolicy::ScanOptimized for ordered index, WriteOptimized to disable."
128 )]
129 ///
130 /// Set to false for write-heavy workloads without range scans.
131 pub enable_ordered_index: bool,
132 /// Group commit configuration
133 pub group_commit_config: GroupCommitSettings,
134 /// Default index policy for tables not explicitly configured
135 ///
136 /// This replaces the global `enable_ordered_index` toggle with
137 /// fine-grained per-table control. Use `index_registry` to configure
138 /// individual tables.
139 ///
140 /// | Policy | Insert Cost | Scan Cost | Use Case |
141 /// |----------------|-------------|----------------|------------------------|
142 /// | WriteOptimized | O(1) | O(N) | High-write, rare scan |
143 /// | Balanced | O(1) amort | O(output+logK) | Mixed OLTP |
144 /// | ScanOptimized | O(log N) | O(logN + K) | Analytics, range query |
145 /// | AppendOnly | O(1) | O(N) | Time-series logs |
146 pub default_index_policy: IndexPolicy,
147}
148
/// Group commit tuning, loosely mirroring SQLite's WAL-mode knobs.
///
/// ## Performance Model
///
/// Without group commit: throughput = 1 / L_fsync ≈ 200 commits/sec (L = 5ms).
/// With group commit (batch size K): throughput = K / L_fsync = K × 200 commits/sec.
///
/// For K = 100 that is 20,000 commits/sec (100× speedup).
///
/// ## SQLite Comparison
///
/// | Setting                    | SQLite Equivalent           |
/// |----------------------------|-----------------------------|
/// | batch_size = 1             | PRAGMA synchronous = FULL   |
/// | batch_size = 100           | WAL mode with batching      |
/// | max_wait_us = 0            | No delay, immediate flush   |
/// | max_wait_us = 10000        | Up to 10ms delay for batch  |
#[derive(Debug, Clone)]
pub struct GroupCommitSettings {
    /// Minimum batch size before flush (default: 1)
    pub min_batch_size: usize,
    /// Maximum batch size (default: 1000)
    pub max_batch_size: usize,
    /// Maximum wait time before flush in microseconds (default: 10000 = 10ms)
    pub max_wait_us: u64,
    /// Expected fsync latency in microseconds (used by `optimal_batch_size`)
    pub fsync_latency_us: u64,
}

impl Default for GroupCommitSettings {
    /// Batches of 1..=1000, at most 10ms of waiting, 5ms assumed fsync latency.
    fn default() -> Self {
        Self {
            min_batch_size: 1,
            max_batch_size: 1000,
            max_wait_us: 10_000,     // 10ms
            fsync_latency_us: 5_000, // 5ms
        }
    }
}

impl GroupCommitSettings {
    /// High throughput preset - larger batches and a longer wait window.
    pub fn high_throughput() -> Self {
        Self {
            min_batch_size: 50,
            max_batch_size: 5000,
            max_wait_us: 50_000, // 50ms
            fsync_latency_us: 5_000,
        }
    }

    /// Low latency preset - small batches and a short wait window.
    pub fn low_latency() -> Self {
        Self {
            min_batch_size: 1,
            max_batch_size: 10,
            max_wait_us: 1_000, // 1ms
            fsync_latency_us: 5_000,
        }
    }

    /// Estimate a batch size for the given load.
    ///
    /// Computes `N* = sqrt(2 × L_fsync × λ / C_wait)` with `L_fsync` taken
    /// from `fsync_latency_us` (converted to seconds), then bounds the result
    /// by `min_batch_size` and `max_batch_size`.
    ///
    /// NOTE(review): the original comment attributes this formula to Little's
    /// Law; the expression has the shape of a square-root sizing rule, so the
    /// intended derivation should be confirmed.
    ///
    /// # Arguments
    /// * `arrival_rate` - Operations per second (λ)
    /// * `wait_cost` - Cost coefficient for waiting (0.0-1.0); values below
    ///   `0.001` are treated as `0.001` to avoid dividing by zero
    ///
    /// # Edge cases
    /// * A NaN or negative `arrival_rate` produces a NaN estimate, which the
    ///   float-to-integer cast turns into 0 before bounding.
    /// * If the settings are inconsistent (`min_batch_size > max_batch_size`)
    ///   the result is capped at `max_batch_size` instead of panicking.
    pub fn optimal_batch_size(&self, arrival_rate: f64, wait_cost: f64) -> usize {
        let l_fsync = self.fsync_latency_us as f64 / 1_000_000.0;
        let n_star = (2.0 * l_fsync * arrival_rate / wait_cost.max(0.001)).sqrt();
        // `as usize` saturates (NaN -> 0, +inf -> usize::MAX), so the cast
        // itself cannot fail. `max(..).min(..)` equals `clamp` whenever
        // min <= max, but unlike `clamp` it does not panic on an inverted
        // range, which is reachable because both bounds are public fields.
        (n_star as usize)
            .max(self.min_batch_size)
            .min(self.max_batch_size)
    }
}
223
224impl Default for DatabaseConfig {
225 #[allow(deprecated)]
226 fn default() -> Self {
227 Self {
228 group_commit: true,
229 memtable_size_limit: 64 * 1024 * 1024, // 64MB
230 wal_enabled: true,
231 sync_mode: SyncMode::Normal,
232 read_only: false,
233 enable_ordered_index: true, // Default: enabled for compatibility
234 group_commit_config: GroupCommitSettings::default(),
235 default_index_policy: IndexPolicy::Balanced, // New default: balanced OLTP policy
236 }
237 }
238}
239
240impl DatabaseConfig {
241 /// Create config optimized for throughput (Fast Mode)
242 ///
243 /// - Disables ordered index (saves ~134 ns/op)
244 /// - Uses high-throughput group commit settings
245 /// - Suitable for append-only workloads
246 #[allow(deprecated)]
247 pub fn throughput_optimized() -> Self {
248 Self {
249 group_commit: true,
250 memtable_size_limit: 128 * 1024 * 1024, // 128MB
251 wal_enabled: true,
252 sync_mode: SyncMode::Normal,
253 read_only: false,
254 enable_ordered_index: false,
255 group_commit_config: GroupCommitSettings::high_throughput(),
256 default_index_policy: IndexPolicy::WriteOptimized, // No ordered index overhead
257 }
258 }
259
260 /// Create config optimized for latency
261 ///
262 /// - Keeps ordered index for fast range scans
263 /// - Uses low-latency group commit settings
264 /// - Suitable for OLTP workloads
265 #[allow(deprecated)]
266 pub fn latency_optimized() -> Self {
267 Self {
268 group_commit: true,
269 memtable_size_limit: 32 * 1024 * 1024, // 32MB
270 wal_enabled: true,
271 sync_mode: SyncMode::Full,
272 read_only: false,
273 enable_ordered_index: true,
274 group_commit_config: GroupCommitSettings::low_latency(),
275 default_index_policy: IndexPolicy::ScanOptimized, // Fast range scans
276 }
277 }
278
279 /// Create config matching SQLite defaults
280 #[allow(deprecated)]
281 pub fn sqlite_compatible() -> Self {
282 Self {
283 group_commit: false, // SQLite default is single-commit
284 memtable_size_limit: 64 * 1024 * 1024,
285 wal_enabled: true,
286 sync_mode: SyncMode::Normal, // PRAGMA synchronous = NORMAL
287 read_only: false,
288 enable_ordered_index: true,
289 group_commit_config: GroupCommitSettings::default(),
290 default_index_policy: IndexPolicy::Balanced, // Good default for mixed workloads
291 }
292 }
293
294 /// Get effective ordered index setting, derived from `default_index_policy`.
295 ///
296 /// This is the shim method for the deprecated `enable_ordered_index` field.
297 /// It returns `true` if the policy requires an ordered index (ScanOptimized),
298 /// and `false` otherwise (WriteOptimized, Balanced, AppendOnly).
299 ///
300 /// # Policy Mapping
301 ///
302 /// | IndexPolicy | Returns |
303 /// |------------------|---------|
304 /// | ScanOptimized | true |
305 /// | Balanced | false |
306 /// | WriteOptimized | false |
307 /// | AppendOnly | false |
308 ///
309 /// Note: `Balanced` uses lazy compaction rather than a live ordered index,
310 /// so it returns `false` for the low-level memtable config but still supports
311 /// efficient range scans via sorted runs.
312 pub fn effective_ordered_index(&self) -> bool {
313 matches!(self.default_index_policy, IndexPolicy::ScanOptimized)
314 }
315}
316
/// WAL sync policy, modelled on SQLite's `PRAGMA synchronous`.
///
/// | SochDB     | SQLite       | Description                                    |
/// |------------|--------------|------------------------------------------------|
/// | Off        | OFF (0)      | No fsync, risk of data loss on crash           |
/// | Normal     | NORMAL (1)   | Fsync at checkpoints, not every commit         |
/// | Full       | FULL (2)     | Fsync every commit (safest, slowest)           |
///
/// # Performance vs Durability (as documented by the original author)
///
/// - **Off**: ~10x faster than Full, may lose the last ~100ms of data on crash
/// - **Normal**: ~5x faster than Full, durable at checkpoint boundaries
/// - **Full**: every commit is fsync'd
///
/// # SQLite Compatibility
///
/// ```sql
/// PRAGMA synchronous = OFF;     -- SyncMode::Off
/// PRAGMA synchronous = NORMAL;  -- SyncMode::Normal
/// PRAGMA synchronous = FULL;    -- SyncMode::Full
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SyncMode {
    /// No fsync (SQLite `PRAGMA synchronous = OFF`).
    ///
    /// Writes stay buffered in the OS and can be lost on power failure.
    /// Intended for non-critical data or bulk loading.
    Off = 0,

    /// Fsync at checkpoints (SQLite `PRAGMA synchronous = NORMAL`).
    ///
    /// The default mode; trades a little durability for performance.
    Normal = 1,

    /// Fsync on every commit (SQLite `PRAGMA synchronous = FULL`).
    ///
    /// The safest mode; intended for financial or otherwise critical data.
    Full = 2,
}

impl SyncMode {
    /// Map a SQLite `synchronous` pragma value to a mode.
    ///
    /// `0` is `Off`, `1` is `Normal`, and every other value (2 and above) is
    /// treated as `Full`.
    pub fn from_sqlite_pragma(value: u32) -> Self {
        if value == 0 {
            Self::Off
        } else if value == 1 {
            Self::Normal
        } else {
            Self::Full
        }
    }

    /// The SQLite `synchronous` pragma value for this mode (its discriminant).
    pub fn to_sqlite_pragma(self) -> u32 {
        match self {
            Self::Off => 0,
            Self::Normal => 1,
            Self::Full => 2,
        }
    }

    /// Parse a mode from its name (ASCII case-insensitive) or its digit.
    ///
    /// Accepts `"OFF"`/`"0"`, `"NORMAL"`/`"1"` and `"FULL"`/`"2"`. The input
    /// is not trimmed; anything else yields `None`.
    pub fn parse(s: &str) -> Option<Self> {
        const SPELLINGS: [(&str, &str, SyncMode); 3] = [
            ("OFF", "0", SyncMode::Off),
            ("NORMAL", "1", SyncMode::Normal),
            ("FULL", "2", SyncMode::Full),
        ];

        SPELLINGS
            .iter()
            .find(|(name, digit, _)| s.eq_ignore_ascii_case(name) || s.eq_ignore_ascii_case(digit))
            .map(|&(_, _, mode)| mode)
    }
}
385
386/// Table schema for the kernel
387#[derive(Debug, Clone)]
388pub struct TableSchema {
389 pub name: String,
390 pub columns: Vec<ColumnDef>,
391}
392
393/// Column definition
394#[derive(Debug, Clone)]
395pub struct ColumnDef {
396 pub name: String,
397 pub col_type: ColumnType,
398 pub nullable: bool,
399}
400
/// Value types a column can be declared with.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ColumnType {
    /// Signed 64-bit integer.
    Int64,
    /// Unsigned 64-bit integer.
    UInt64,
    /// 64-bit floating point number.
    Float64,
    /// Text value.
    Text,
    /// Raw binary value.
    Binary,
    /// Boolean value.
    Bool,
}
411
/// Lightweight, copyable handle identifying a transaction in kernel calls.
#[derive(Debug, Clone, Copy)]
pub struct TxnHandle {
    /// Transaction id handed out by the storage layer.
    pub txn_id: u64,
    /// Snapshot timestamp associated with the transaction.
    pub snapshot_ts: u64,
}
418
419/// Query result from the kernel
420#[derive(Debug, Clone)]
421pub struct QueryResult {
422 /// Column names
423 pub columns: Vec<String>,
424 /// Row data (each row is a map of column -> value)
425 pub rows: Vec<HashMap<String, SochValue>>,
426 /// Number of rows scanned (for stats)
427 pub rows_scanned: usize,
428 /// Bytes read from storage
429 pub bytes_read: usize,
430}
431
432impl QueryResult {
433 /// Empty result
434 pub fn empty() -> Self {
435 Self {
436 columns: vec![],
437 rows: vec![],
438 rows_scanned: 0,
439 bytes_read: 0,
440 }
441 }
442
443 /// Convert to TOON format for token efficiency
444 pub fn to_toon(&self) -> String {
445 if self.rows.is_empty() {
446 return "[]".to_string();
447 }
448
449 // TOON format: table[N]{cols}: row1; row2; ...
450 let n = self.rows.len();
451 let cols = self.columns.join(",");
452
453 let rows_str: Vec<String> = self
454 .rows
455 .iter()
456 .map(|row| {
457 self.columns
458 .iter()
459 .map(|c| {
460 row.get(c)
461 .map(format_soch_value)
462 .unwrap_or_else(|| "∅".to_string())
463 })
464 .collect::<Vec<_>>()
465 .join(",")
466 })
467 .collect();
468
469 format!("result[{}]{{{}}}:{}", n, cols, rows_str.join(";"))
470 }
471}
472
473fn format_soch_value(v: &SochValue) -> String {
474 match v {
475 SochValue::Null => "∅".to_string(),
476 SochValue::Int(i) => i.to_string(),
477 SochValue::UInt(u) => u.to_string(),
478 SochValue::Float(f) => format!("{:.6}", f),
479 SochValue::Text(s) => {
480 if s.contains(',') || s.contains(';') {
481 format!("\"{}\"", s.replace('"', "\\\""))
482 } else {
483 s.clone()
484 }
485 }
486 SochValue::Bool(b) => if *b { "T" } else { "F" }.to_string(),
487 SochValue::Binary(b) => format!("b64:{}", base64_encode(b)),
488 _ => format!("{:?}", v),
489 }
490}
491
/// Encode `data` as standard base64 (RFC 4648 alphabet, `=` padded).
///
/// Every 3 input bytes become 4 output characters; a trailing group of 1 or 2
/// bytes is zero-extended and padded with `==` or `=` respectively. Empty
/// input yields an empty string.
fn base64_encode(data: &[u8]) -> String {
    const ALPHABET: &[u8; 64] =
        b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

    // The output length is known exactly (4 chars per started 3-byte group),
    // so allocate once instead of growing the string while pushing.
    let mut encoded = String::with_capacity((data.len() + 2) / 3 * 4);

    for chunk in data.chunks(3) {
        // `chunks` never yields an empty slice, so index 0 always exists.
        let first = chunk[0];
        let second = chunk.get(1).copied();
        let third = chunk.get(2).copied();

        // Missing trailing bytes contribute zero bits to the shared sextets.
        let b1 = second.unwrap_or(0);
        let b2 = third.unwrap_or(0);

        let s0 = first >> 2;
        let s1 = ((first & 0x03) << 4) | (b1 >> 4);
        let s2 = ((b1 & 0x0f) << 2) | (b2 >> 6);
        let s3 = b2 & 0x3f;

        encoded.push(ALPHABET[s0 as usize] as char);
        encoded.push(ALPHABET[s1 as usize] as char);
        // The third/fourth characters are padding when their source byte is absent.
        encoded.push(if second.is_some() {
            ALPHABET[s2 as usize] as char
        } else {
            '='
        });
        encoded.push(if third.is_some() {
            ALPHABET[s3 as usize] as char
        } else {
            '='
        });
    }

    encoded
}
520
521// ============================================================================
522// Columnar Query Results - SIMD-friendly result format
523// ============================================================================
524
525use sochdb_core::TypedColumn as CoreTypedColumn;
526
527/// Columnar query result - SIMD-friendly format for analytics
528///
529/// Instead of row-oriented `Vec<HashMap<String, SochValue>>`, this returns
530/// column-oriented `Vec<TypedColumn>` for efficient vectorized operations.
531///
532/// ## Memory Layout
533///
534/// Row-oriented (standard):
535/// ```text
536/// Row 0: [id=1, name="Alice", score=85]
537/// Row 1: [id=2, name="Bob", score=92]
538/// Row 2: [id=3, name="Carol", score=78]
539/// ```
540///
541/// Column-oriented (this format):
542/// ```text
543/// id: [1, 2, 3] ← contiguous i64 array (SIMD-friendly)
544/// name: ["Alice", "Bob", "Carol"] ← Arrow-style string encoding
545/// score: [85, 92, 78] ← contiguous i64 array
546/// ```
547///
548/// ## Performance Benefits
549///
550/// - SIMD: Column sums use vectorized instructions (~8× faster)
551/// - Cache: Sequential access pattern maximizes L1/L2 cache hits
552/// - Compression: Same-type data compresses better (5-10× typical)
553/// - Filtering: Bitmap operations instead of row iteration
554///
555/// ## Usage
556///
557/// ```ignore
558/// let result = db.query(txn, "users")
559/// .columns(&["id", "score"])
560/// .as_columnar()?;
561///
562/// // SIMD sum
563/// let total_score = result.column("score")
564/// .map(|c| c.sum_i64())
565/// .unwrap_or(0);
566///
567/// // Stats
568/// println!("Rows: {}, Memory: {} bytes", result.row_count(), result.memory_size());
569/// ```
570#[derive(Debug, Clone)]
571pub struct ColumnarQueryResult {
572 /// Column names in order
573 pub columns: Vec<String>,
574 /// Column data - each TypedColumn contains all values for one column
575 pub data: Vec<CoreTypedColumn>,
576 /// Number of rows
577 pub row_count: usize,
578 /// Bytes read from storage
579 pub bytes_read: usize,
580}
581
582impl ColumnarQueryResult {
583 /// Create an empty result
584 pub fn empty() -> Self {
585 Self {
586 columns: vec![],
587 data: vec![],
588 row_count: 0,
589 bytes_read: 0,
590 }
591 }
592
593 /// Get column by name
594 pub fn column(&self, name: &str) -> Option<&CoreTypedColumn> {
595 self.columns
596 .iter()
597 .position(|c| c == name)
598 .and_then(|idx| self.data.get(idx))
599 }
600
601 /// Get column index by name
602 pub fn column_index(&self, name: &str) -> Option<usize> {
603 self.columns.iter().position(|c| c == name)
604 }
605
606 /// Number of rows
607 pub fn row_count(&self) -> usize {
608 self.row_count
609 }
610
611 /// Number of columns
612 pub fn column_count(&self) -> usize {
613 self.columns.len()
614 }
615
616 /// Total memory size in bytes
617 pub fn memory_size(&self) -> usize {
618 self.data.iter().map(|c| c.memory_size()).sum()
619 }
620
621 /// Sum of an i64 column (SIMD-optimized)
622 pub fn sum_i64(&self, column: &str) -> Option<i64> {
623 self.column(column).map(|c| c.sum_i64())
624 }
625
626 /// Sum of an f64 column (SIMD-optimized)
627 pub fn sum_f64(&self, column: &str) -> Option<f64> {
628 self.column(column).map(|c| c.sum_f64())
629 }
630
631 /// Get column statistics (min, max, null count)
632 pub fn column_stats(&self, column: &str) -> Option<&sochdb_core::columnar::ColumnStats> {
633 self.column(column).map(|c| c.stats())
634 }
635
636 /// Convert to TOON format (token-efficient)
637 pub fn to_toon(&self) -> String {
638 if self.row_count == 0 {
639 return "[]".to_string();
640 }
641
642 let n = self.row_count;
643 let cols = self.columns.join(",");
644
645 // Build rows from columns
646 let mut rows_str = Vec::with_capacity(n);
647 for i in 0..n {
648 let row: Vec<String> = self
649 .data
650 .iter()
651 .map(|col| format_columnar_value(col, i))
652 .collect();
653 rows_str.push(row.join(","));
654 }
655
656 format!("result[{}]{{{}}}:{}", n, cols, rows_str.join(";"))
657 }
658}
659
660/// Format a single value from a TypedColumn at index
661fn format_columnar_value(col: &CoreTypedColumn, idx: usize) -> String {
662 match col {
663 CoreTypedColumn::Int64 {
664 values, validity, ..
665 } => {
666 if validity.is_valid(idx) && idx < values.len() {
667 values[idx].to_string()
668 } else {
669 "∅".to_string()
670 }
671 }
672 CoreTypedColumn::UInt64 {
673 values, validity, ..
674 } => {
675 if validity.is_valid(idx) && idx < values.len() {
676 values[idx].to_string()
677 } else {
678 "∅".to_string()
679 }
680 }
681 CoreTypedColumn::Float64 {
682 values, validity, ..
683 } => {
684 if validity.is_valid(idx) && idx < values.len() {
685 format!("{:.6}", values[idx])
686 } else {
687 "∅".to_string()
688 }
689 }
690 CoreTypedColumn::Text {
691 offsets,
692 data,
693 validity,
694 ..
695 } => {
696 if validity.is_valid(idx) && idx + 1 < offsets.len() {
697 let start = offsets[idx] as usize;
698 let end = offsets[idx + 1] as usize;
699 std::str::from_utf8(&data[start..end])
700 .map(|s| {
701 if s.contains(',') || s.contains(';') {
702 format!("\"{}\"", s.replace('"', "\\\""))
703 } else {
704 s.to_string()
705 }
706 })
707 .unwrap_or_else(|_| "∅".to_string())
708 } else {
709 "∅".to_string()
710 }
711 }
712 CoreTypedColumn::Binary {
713 offsets,
714 data,
715 validity,
716 ..
717 } => {
718 if validity.is_valid(idx) && idx + 1 < offsets.len() {
719 let start = offsets[idx] as usize;
720 let end = offsets[idx + 1] as usize;
721 format!("b64:{}", base64_encode(&data[start..end]))
722 } else {
723 "∅".to_string()
724 }
725 }
726 CoreTypedColumn::Bool {
727 values,
728 validity,
729 len,
730 ..
731 } => {
732 if validity.is_valid(idx) && idx < *len {
733 let word = idx / 64;
734 let bit = idx % 64;
735 if (values[word] >> bit) & 1 == 1 {
736 "T"
737 } else {
738 "F"
739 }
740 .to_string()
741 } else {
742 "∅".to_string()
743 }
744 }
745 }
746}
747
748/// Vector search result
749#[derive(Debug, Clone)]
750pub struct VectorSearchResult {
751 pub id: u64,
752 pub distance: f32,
753 pub metadata: Option<HashMap<String, SochValue>>,
754}
755
756/// The SochDB Database Kernel
757///
758/// This is the shared core used by both embedded (`SochConnection`) and
759/// server (`sochdb-server`) modes. It owns all storage, catalog, and
760/// indexing components.
761///
762/// # Thread Safety
763///
764/// The Database is fully thread-safe via internal synchronization:
765/// - Multiple readers can operate concurrently (MVCC snapshots)
766/// - Writers coordinate through WAL and group commit
767/// - All state is behind Arc/RwLock for shared access
768///
769/// # Example
770///
771/// ```ignore
772/// // Open a database (SQLite-style)
773/// let db = Database::open("./my_data")?;
774///
775/// // Begin a transaction
776/// let txn = db.begin_transaction()?;
777///
778/// // Write data
779/// db.put(txn, b"user:1:name", b"Alice")?;
780///
781/// // Commit
782/// db.commit(txn)?;
783/// ```
784#[allow(dead_code)]
785pub struct Database {
786 /// Path to database directory
787 path: PathBuf,
788 /// Durable storage layer (WAL + MVCC + memtable)
789 storage: Arc<DurableStorage>,
790 /// Schema catalog
791 catalog: Arc<RwLock<Catalog>>,
792 /// Registered table schemas (name -> schema) - lock-free for reads
793 tables: DashMap<String, TableSchema>,
794 /// Cached packed schemas for fast insert (name -> packed schema)
795 packed_schemas: DashMap<String, PackedTableSchema>,
796 /// Per-table index policy registry
797 index_registry: Arc<TableIndexRegistry>,
798 /// Configuration
799 config: DatabaseConfig,
800 /// Statistics
801 stats: DatabaseStats,
802 /// Shutdown flag
803 shutdown: AtomicU64,
804}
805
/// Internal, atomically updated counters kept by the database.
struct DatabaseStats {
    transactions_started: AtomicU64,
    transactions_committed: AtomicU64,
    transactions_aborted: AtomicU64,
    queries_executed: AtomicU64,
    bytes_written: AtomicU64,
    bytes_read: AtomicU64,
}

impl DatabaseStats {
    /// Create a set of counters that all start at zero.
    fn new() -> Self {
        let zero = || AtomicU64::new(0);

        DatabaseStats {
            transactions_started: zero(),
            transactions_committed: zero(),
            transactions_aborted: zero(),
            queries_executed: zero(),
            bytes_written: zero(),
            bytes_read: zero(),
        }
    }
}
828
/// Plain-value snapshot of the database counters for public consumption.
#[derive(Debug, Clone)]
pub struct Stats {
    /// Transactions begun.
    pub transactions_started: u64,
    /// Transactions committed.
    pub transactions_committed: u64,
    /// Transactions aborted.
    pub transactions_aborted: u64,
    /// Queries executed.
    pub queries_executed: u64,
    /// Bytes written through the kernel API.
    pub bytes_written: u64,
    /// Bytes read through the kernel API.
    pub bytes_read: u64,
}
839
840impl Database {
841 /// Open or create a database at the given path.
842 ///
843 /// This is the primary entry point, similar to `sqlite3_open()`.
844 /// If the database exists, it will be opened and WAL recovery performed.
845 /// If it doesn't exist, a new database will be created.
846 ///
847 /// # Arguments
848 ///
849 /// * `path` - Directory path for the database files
850 ///
851 /// # Returns
852 ///
853 /// An `Arc<Database>` that can be shared across threads and connections.
854 pub fn open<P: AsRef<Path>>(path: P) -> Result<Arc<Self>> {
855 Self::open_with_config(path, DatabaseConfig::default())
856 }
857
/// Test-only constructor that opens the storage without taking its lock.
///
/// Uses `DatabaseConfig::default()` and runs recovery before returning.
///
/// # Safety
/// Only meant for tests that simulate a crash by forgetting the storage
/// instance. Production code must go through `open()`.
#[cfg(test)]
pub fn open_without_lock<P: AsRef<Path>>(path: P) -> Result<Arc<Self>> {
    let db_path = path.as_ref().to_path_buf();
    let config = DatabaseConfig::default();

    let storage = DurableStorage::open_without_lock(&db_path)?;
    let registry = TableIndexRegistry::with_default_policy(config.default_index_policy);

    let db = Arc::new(Self {
        path: db_path,
        storage: Arc::new(storage),
        catalog: Arc::new(RwLock::new(Catalog::new("sochdb"))),
        tables: DashMap::new(),
        packed_schemas: DashMap::new(),
        index_registry: Arc::new(registry),
        config,
        stats: DatabaseStats::new(),
        shutdown: AtomicU64::new(0),
    });

    // Replay whatever the storage layer needs before handing the handle out.
    db.recover()?;
    Ok(db)
}
889
890 /// Open with custom configuration
891 pub fn open_with_config<P: AsRef<Path>>(path: P, config: DatabaseConfig) -> Result<Arc<Self>> {
892 let path = path.as_ref().to_path_buf();
893
894 // Use IndexPolicy-based storage configuration for automatic memtable selection
895 // This derives ordered index and memtable type from the policy
896 let storage = Arc::new(DurableStorage::open_with_policy(
897 &path,
898 config.default_index_policy,
899 config.group_commit,
900 )?);
901
902 // Create index registry with default policy from config
903 let index_registry = Arc::new(TableIndexRegistry::with_default_policy(
904 config.default_index_policy,
905 ));
906
907 let db = Arc::new(Self {
908 path: path.clone(),
909 storage,
910 catalog: Arc::new(RwLock::new(Catalog::new("sochdb"))),
911 tables: DashMap::new(),
912 packed_schemas: DashMap::new(),
913 index_registry,
914 config,
915 stats: DatabaseStats::new(),
916 shutdown: AtomicU64::new(0),
917 });
918
919 // Perform crash recovery if needed
920 db.recover()?;
921
922 Ok(db)
923 }
924
925 /// Perform crash recovery
926 fn recover(&self) -> Result<RecoveryStats> {
927 self.storage.recover()
928 }
929
930 /// Get database path
931 pub fn path(&self) -> &Path {
932 &self.path
933 }
934
935 // =========================================================================
936 // Transaction API
937 // =========================================================================
938
939 /// Begin a new transaction
940 pub fn begin_transaction(&self) -> Result<TxnHandle> {
941 self.stats
942 .transactions_started
943 .fetch_add(1, Ordering::Relaxed);
944 let txn_id = self.storage.begin_transaction()?;
945
946 // Get snapshot timestamp from MVCC
947 // For now, use txn_id as a proxy (the real snapshot_ts is managed internally)
948 Ok(TxnHandle {
949 txn_id,
950 snapshot_ts: txn_id,
951 })
952 }
953
954 /// Begin a read-only transaction (optimized: no SSI tracking)
955 ///
956 /// Read-only transactions skip SSI read tracking, reducing overhead
957 /// from ~82ns to ~32ns per read (2.6x faster).
958 ///
959 /// Use this for:
960 /// - SELECT queries that don't modify data
961 /// - Analytics and reporting queries
962 /// - Snapshot reads for backup
963 pub fn begin_read_only(&self) -> Result<TxnHandle> {
964 self.stats
965 .transactions_started
966 .fetch_add(1, Ordering::Relaxed);
967 let txn_id = self.storage.begin_with_mode(TransactionMode::ReadOnly)?;
968 Ok(TxnHandle {
969 txn_id,
970 snapshot_ts: txn_id,
971 })
972 }
973
974 /// Begin a write-only transaction (optimized: no read tracking)
975 ///
976 /// Write-only transactions skip read tracking, improving insert
977 /// throughput for bulk loading scenarios.
978 ///
979 /// Use this for:
980 /// - Bulk data imports
981 /// - Append-only logging tables
982 /// - ETL pipelines
983 pub fn begin_write_only(&self) -> Result<TxnHandle> {
984 self.stats
985 .transactions_started
986 .fetch_add(1, Ordering::Relaxed);
987 let txn_id = self.storage.begin_with_mode(TransactionMode::WriteOnly)?;
988 Ok(TxnHandle {
989 txn_id,
990 snapshot_ts: txn_id,
991 })
992 }
993
994 /// Commit a transaction
995 pub fn commit(&self, txn: TxnHandle) -> Result<u64> {
996 self.stats
997 .transactions_committed
998 .fetch_add(1, Ordering::Relaxed);
999 self.storage.commit(txn.txn_id)
1000 }
1001
1002 /// Abort a transaction
1003 pub fn abort(&self, txn: TxnHandle) -> Result<()> {
1004 self.stats
1005 .transactions_aborted
1006 .fetch_add(1, Ordering::Relaxed);
1007 self.storage.abort(txn.txn_id)
1008 }
1009
1010 // =========================================================================
1011 // Per-Table Index Policy API
1012 // =========================================================================
1013
1014 /// Configure index policy for a table
1015 ///
1016 /// This allows fine-grained control over write/scan trade-offs per table:
1017 ///
1018 /// | Policy | Insert Cost | Scan Cost | Use Case |
1019 /// |----------------|-------------|----------------|------------------------|
1020 /// | WriteOptimized | O(1) | O(N) | High-write, rare scan |
1021 /// | Balanced | O(1) amort | O(output+logK) | Mixed OLTP |
1022 /// | ScanOptimized | O(log N) | O(logN + K) | Analytics, range query |
1023 /// | AppendOnly | O(1) | O(N) | Time-series logs |
1024 ///
1025 /// # Example
1026 ///
1027 /// ```ignore
1028 /// // Fast inserts for logs table (no ordered index overhead)
1029 /// db.set_table_index_policy("logs", IndexPolicy::WriteOptimized);
1030 ///
1031 /// // Efficient range scans for analytics table
1032 /// db.set_table_index_policy("analytics", IndexPolicy::ScanOptimized);
1033 ///
1034 /// // Balanced for OLTP tables
1035 /// db.set_table_index_policy("users", IndexPolicy::Balanced);
1036 /// ```
1037 pub fn set_table_index_policy(&self, table: &str, policy: IndexPolicy) {
1038 self.index_registry.configure_table(
1039 TableIndexConfig::new(table, policy)
1040 );
1041 }
1042
1043 /// Get the index policy for a table
1044 pub fn get_table_index_policy(&self, table: &str) -> IndexPolicy {
1045 self.index_registry.get_policy(table)
1046 }
1047
1048 /// Get the index registry for advanced configuration
1049 pub fn index_registry(&self) -> &Arc<TableIndexRegistry> {
1050 &self.index_registry
1051 }
1052
1053 // =========================================================================
1054 // Key-Value API (Low-level)
1055 // =========================================================================
1056
1057 /// Put a key-value pair
1058 pub fn put(&self, txn: TxnHandle, key: &[u8], value: &[u8]) -> Result<()> {
1059 self.stats
1060 .bytes_written
1061 .fetch_add((key.len() + value.len()) as u64, Ordering::Relaxed);
1062 // Use write_refs to avoid unnecessary allocations
1063 self.storage.write_refs(txn.txn_id, key, value)
1064 }
1065
1066 /// Batch put multiple key-value pairs with reduced overhead
1067 ///
1068 /// This amortizes per-operation costs over the entire batch:
1069 /// - Single DashMap lookup
1070 /// - Batch MVCC tracking
1071 /// - Batch memtable writes
1072 ///
1073 /// For 100+ entries, this is 2-3x faster than individual puts.
1074 ///
1075 /// # Example
1076 ///
1077 /// ```ignore
1078 /// let writes: Vec<(&[u8], &[u8])> = vec![
1079 /// (b"key1", b"value1"),
1080 /// (b"key2", b"value2"),
1081 /// (b"key3", b"value3"),
1082 /// ];
1083 /// db.put_batch(txn, &writes)?;
1084 /// ```
1085 pub fn put_batch(&self, txn: TxnHandle, writes: &[(&[u8], &[u8])]) -> Result<()> {
1086 let bytes: u64 = writes
1087 .iter()
1088 .map(|(k, v)| (k.len() + v.len()) as u64)
1089 .sum();
1090 self.stats.bytes_written.fetch_add(bytes, Ordering::Relaxed);
1091 self.storage.write_batch_refs(txn.txn_id, writes)
1092 }
1093
1094 /// Get a value by key
1095 pub fn get(&self, txn: TxnHandle, key: &[u8]) -> Result<Option<Vec<u8>>> {
1096 let result = self.storage.read(txn.txn_id, key)?;
1097 if let Some(ref data) = result {
1098 self.stats
1099 .bytes_read
1100 .fetch_add(data.len() as u64, Ordering::Relaxed);
1101 }
1102 Ok(result)
1103 }
1104
1105 /// Delete a key
1106 pub fn delete(&self, txn: TxnHandle, key: &[u8]) -> Result<()> {
1107 self.storage.delete(txn.txn_id, key.to_vec())
1108 }
1109
1110 /// Minimum prefix length for scan operations.
1111 /// Prevents expensive full-table scans by requiring a meaningful prefix.
1112 pub const MIN_SCAN_PREFIX_LEN: usize = 2;
1113
1114 /// Scan keys with a prefix (enforces minimum prefix length for safety).
1115 ///
1116 /// # Prefix Safety
1117 ///
1118 /// To prevent accidental full-table scans, this method requires a minimum
1119 /// prefix length of 2 bytes. Use `scan_unchecked` for internal operations
1120 /// that need empty/short prefixes.
1121 ///
1122 /// # Errors
1123 ///
1124 /// Returns `SochDBError::InvalidInput` if prefix is too short.
1125 pub fn scan(&self, txn: TxnHandle, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
1126 if prefix.len() < Self::MIN_SCAN_PREFIX_LEN {
1127 return Err(SochDBError::InvalidArgument(format!(
1128 "Prefix too short: {} bytes (minimum {} required). \
1129 Use scan_unchecked() for unrestricted scans.",
1130 prefix.len(),
1131 Self::MIN_SCAN_PREFIX_LEN
1132 )));
1133 }
1134 self.scan_unchecked(txn, prefix)
1135 }
1136
1137 /// Scan keys with a prefix without length validation.
1138 ///
1139 /// # Warning
1140 ///
1141 /// This method allows empty/short prefixes which can cause expensive
1142 /// full-table scans. Use `scan()` unless you specifically need unrestricted
1143 /// prefix access for internal operations.
1144 pub fn scan_unchecked(&self, txn: TxnHandle, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
1145 let results = self.storage.scan(txn.txn_id, prefix)?;
1146 let bytes: u64 = results
1147 .iter()
1148 .map(|(k, v)| (k.len() + v.len()) as u64)
1149 .sum();
1150 self.stats.bytes_read.fetch_add(bytes, Ordering::Relaxed);
1151 Ok(results)
1152 }
1153
1154 /// Scan keys in range
1155 pub fn scan_range(
1156 &self,
1157 txn: TxnHandle,
1158 start: &[u8],
1159 end: &[u8],
1160 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
1161 let results = self.storage.scan_range(txn.txn_id, start, end)?;
1162 let bytes: u64 = results
1163 .iter()
1164 .map(|(k, v)| (k.len() + v.len()) as u64)
1165 .sum();
1166 self.stats.bytes_read.fetch_add(bytes, Ordering::Relaxed);
1167 Ok(results)
1168 }
1169
1170 /// Streaming scan for very large result sets
1171 ///
1172 /// Returns an iterator that yields (key, value) pairs without
1173 /// materializing the entire result set. Use this for large scans
1174 /// where memory efficiency is important.
1175 ///
1176 /// ## Performance
1177 ///
1178 /// - Memory: O(1) per iteration vs O(N) for scan_range
1179 /// - Latency: First result available immediately vs waiting for all results
1180 /// - Throughput: Slightly lower due to per-item overhead
1181 ///
1182 /// ## Usage
1183 ///
1184 /// ```ignore
1185 /// for result in db.scan_range_iter(txn, b"start", b"end") {
1186 /// let (key, value) = result?;
1187 /// // Process immediately - no need to wait for all results
1188 /// }
1189 /// ```
1190 pub fn scan_range_iter<'a>(
1191 &'a self,
1192 txn: TxnHandle,
1193 start: &'a [u8],
1194 end: &'a [u8],
1195 ) -> impl Iterator<Item = Result<(Vec<u8>, Vec<u8>)>> + 'a {
1196 let stats = &self.stats;
1197 self.storage
1198 .scan_range_iter(txn.txn_id, start, end)
1199 .map(move |item| {
1200 stats.bytes_read.fetch_add(
1201 (item.0.len() + item.1.len()) as u64,
1202 Ordering::Relaxed,
1203 );
1204 Ok(item)
1205 })
1206 }
1207
1208 /// Flush memtable to WAL/Disk
1209 pub fn flush(&self) -> Result<()> {
1210 self.storage.fsync()
1211 }
1212
1213 // =========================================================================
1214 // Path-Native API (SochDB's differentiator)
1215 // =========================================================================
1216
1217 /// Get storage statistics
1218 pub fn storage_stats(&self) -> crate::durable_storage::StorageStats {
1219 self.storage.stats()
1220 }
1221
1222 /// Put a value at a path
1223 ///
1224 /// Path format: "collection/doc_id/field" or "table.row_id.column"
1225 /// Resolution is O(|path|), not O(log N) like B-tree.
1226 pub fn put_path(&self, txn: TxnHandle, path: &str, value: &[u8]) -> Result<()> {
1227 self.put(txn, path.as_bytes(), value)
1228 }
1229
1230 /// Get a value at a path
1231 pub fn get_path(&self, txn: TxnHandle, path: &str) -> Result<Option<Vec<u8>>> {
1232 self.get(txn, path.as_bytes())
1233 }
1234
1235 /// Delete at a path
1236 pub fn delete_path(&self, txn: TxnHandle, path: &str) -> Result<()> {
1237 self.delete(txn, path.as_bytes())
1238 }
1239
1240 /// Scan a path prefix
1241 ///
1242 /// Returns all key-value pairs where key starts with prefix.
1243 /// Useful for: "users/123/" -> all fields of user 123
1244 pub fn scan_path(&self, txn: TxnHandle, prefix: &str) -> Result<Vec<(String, Vec<u8>)>> {
1245 self.stats.queries_executed.fetch_add(1, Ordering::Relaxed);
1246
1247 let results = self.scan(txn, prefix.as_bytes())?;
1248
1249 Ok(results
1250 .into_iter()
1251 .filter_map(|(k, v)| String::from_utf8(k).ok().map(|path| (path, v)))
1252 .collect())
1253 }
1254
1255 // =========================================================================
1256 // Query API
1257 // =========================================================================
1258
1259 /// Execute a path query and return results
1260 ///
1261 /// This is the main query interface for LLM context retrieval.
1262 /// Supports:
1263 /// - Path prefix matching
1264 /// - Column projection (for I/O reduction)
1265 /// - Limit/offset
1266 pub fn query(&self, txn: TxnHandle, path_prefix: &str) -> QueryBuilder<'_> {
1267 QueryBuilder::new(self, txn, path_prefix.to_string())
1268 }
1269
1270 // =========================================================================
1271 // Table API (Higher-level abstraction)
1272 // =========================================================================
1273
1274 /// Register a table schema
1275 pub fn register_table(&self, schema: TableSchema) -> Result<()> {
1276 if self.tables.contains_key(&schema.name) {
1277 return Err(SochDBError::InvalidArgument(format!(
1278 "Table '{}' already exists",
1279 schema.name
1280 )));
1281 }
1282 // Cache the packed schema for fast inserts
1283 let packed_schema = Self::to_packed_schema(&schema);
1284 self.packed_schemas
1285 .insert(schema.name.clone(), packed_schema);
1286 self.tables.insert(schema.name.clone(), schema);
1287 Ok(())
1288 }
1289
1290 /// Get table schema
1291 pub fn get_table_schema(&self, name: &str) -> Option<TableSchema> {
1292 self.tables.get(name).map(|s| s.clone())
1293 }
1294
1295 /// List all tables
1296 pub fn list_tables(&self) -> Vec<String> {
1297 self.tables.iter().map(|e| e.key().clone()).collect()
1298 }
1299 /// Convert TableSchema to PackedTableSchema for efficient storage
1300 fn to_packed_schema(schema: &TableSchema) -> PackedTableSchema {
1301 let columns = schema
1302 .columns
1303 .iter()
1304 .map(|col| PackedColumnDef {
1305 name: col.name.clone(),
1306 col_type: match col.col_type {
1307 ColumnType::Int64 => PackedColumnType::Int64,
1308 ColumnType::UInt64 => PackedColumnType::UInt64,
1309 ColumnType::Float64 => PackedColumnType::Float64,
1310 ColumnType::Text => PackedColumnType::Text,
1311 ColumnType::Binary => PackedColumnType::Binary,
1312 ColumnType::Bool => PackedColumnType::Bool,
1313 },
1314 nullable: col.nullable,
1315 })
1316 .collect();
1317
1318 PackedTableSchema::new(&schema.name, columns)
1319 }
1320
1321 /// Insert a row into a table
1322 ///
1323 /// Uses packed row format: stores entire row as single key-value pair.
1324 /// This reduces write amplification from 4× to 1× for a 4-column table.
1325 ///
1326 /// # Performance
1327 /// - Before: 4 columns × (WAL entry + MVCC version) = 4 writes
1328 /// - After: 1 packed row = 1 write
1329 /// - Improvement: ~4× fewer WAL entries, ~48% less I/O overhead
1330 pub fn insert_row(
1331 &self,
1332 txn: TxnHandle,
1333 table: &str,
1334 row_id: u64,
1335 values: &HashMap<String, SochValue>,
1336 ) -> Result<()> {
1337 // Use cached packed schema - single DashMap lookup, no clone
1338 let packed_schema = self
1339 .packed_schemas
1340 .get(table)
1341 .ok_or_else(|| SochDBError::InvalidArgument(format!("Table '{}' not found", table)))?;
1342
1343 // Pack the row using cached schema
1344 let packed_row = PackedRow::pack(&packed_schema, values);
1345
1346 // Build key using KeyBuffer - optimized stack allocation (~12-15ns vs ~30-35ns for write!())
1347 let key = KeyBuffer::format_row_key(table, row_id);
1348
1349 self.put(txn, key.as_bytes(), packed_row.as_bytes())?;
1350
1351 Ok(())
1352 }
1353
1354 /// Read a row from a table
1355 ///
1356 /// Reads packed row and extracts requested columns in O(k) time.
1357 /// Column projection happens in memory, not storage - all columns are fetched.
1358 pub fn read_row(
1359 &self,
1360 txn: TxnHandle,
1361 table: &str,
1362 row_id: u64,
1363 columns: Option<&[&str]>,
1364 ) -> Result<Option<HashMap<String, SochValue>>> {
1365 let schema = self
1366 .tables
1367 .get(table)
1368 .ok_or_else(|| SochDBError::InvalidArgument(format!("Table '{}' not found", table)))?;
1369
1370 // Read the packed row with a single key lookup using KeyBuffer
1371 let key = KeyBuffer::format_row_key(table, row_id);
1372 let bytes = match self.get(txn, key.as_bytes())? {
1373 Some(b) => b,
1374 None => return Ok(None),
1375 };
1376
1377 // Use cached packed schema
1378 let packed_schema = self
1379 .packed_schemas
1380 .get(table)
1381 .ok_or_else(|| SochDBError::Internal("Packed schema not found".into()))?;
1382 let packed_row = PackedRow::from_bytes(bytes, packed_schema.num_columns())?;
1383
1384 // Determine which columns to return
1385 let cols_to_read: Vec<&str> = match columns {
1386 Some(c) => c.to_vec(),
1387 None => schema.columns.iter().map(|c| c.name.as_str()).collect(),
1388 };
1389
1390 let mut row = HashMap::new();
1391 for col_name in cols_to_read {
1392 if let Some(idx) = packed_schema.column_index(col_name)
1393 && let Some(col_def) = packed_schema.column(idx)
1394 && let Some(value) = packed_row.get_column(idx, col_def.col_type)
1395 {
1396 row.insert(col_name.to_string(), value);
1397 }
1398 }
1399
1400 Ok(Some(row))
1401 }
1402
1403 /// Insert multiple rows efficiently in a batch
1404 ///
1405 /// This method accumulates all rows and writes them with fewer WAL syncs.
1406 /// Ideal for bulk loading scenarios.
1407 ///
1408 /// # Performance
1409 /// - Uses group commit to batch fsync operations
1410 /// - Expected throughput: 500K-1M rows/sec depending on row size
1411 pub fn insert_rows_batch(
1412 &self,
1413 txn: TxnHandle,
1414 table: &str,
1415 rows: &[(u64, HashMap<String, SochValue>)],
1416 ) -> Result<usize> {
1417 // Use cached packed schema
1418 let packed_schema = self
1419 .packed_schemas
1420 .get(table)
1421 .ok_or_else(|| SochDBError::InvalidArgument(format!("Table '{}' not found", table)))?;
1422
1423 let mut count = 0;
1424
1425 for (row_id, values) in rows {
1426 // Pack and write using KeyBuffer for efficient key construction
1427 let packed_row = PackedRow::pack(&packed_schema, values);
1428 let key = KeyBuffer::format_row_key(table, *row_id);
1429 self.put(txn, key.as_bytes(), packed_row.as_bytes())?;
1430 count += 1;
1431 }
1432
1433 Ok(count)
1434 }
1435
1436 /// Ultra-fast raw put - bypasses all validation
1437 ///
1438 /// Use when you've already validated the data and just need speed.
1439 /// This is ~10× faster than insert_row() for bulk inserts.
1440 #[inline]
1441 pub fn put_raw(&self, txn: TxnHandle, key: &[u8], value: &[u8]) -> Result<()> {
1442 self.storage.write_refs(txn.txn_id, key, value)
1443 }
1444
1445 /// Zero-allocation insert - fastest path for bulk inserts
1446 ///
1447 /// Takes values as a slice in schema column order, avoiding HashMap overhead.
1448 ///
1449 /// # Arguments
1450 /// * `txn` - Transaction handle
1451 /// * `table` - Table name
1452 /// * `row_id` - Row identifier
1453 /// * `values` - Values in schema column order (None = NULL)
1454 ///
1455 /// # Performance
1456 /// - Eliminates ~6 allocations per row vs insert_row()
1457 /// - Expected: 1.2M-1.5M inserts/sec
1458 ///
1459 /// # Example
1460 /// ```ignore
1461 /// let values: &[Option<&SochValue>] = &[
1462 /// Some(&SochValue::Int(1)),
1463 /// Some(&SochValue::Text("Alice".into())),
1464 /// None, // NULL
1465 /// ];
1466 /// db.insert_row_slice(txn, "users", 1, values)?;
1467 /// ```
1468 #[inline]
1469 pub fn insert_row_slice(
1470 &self,
1471 txn: TxnHandle,
1472 table: &str,
1473 row_id: u64,
1474 values: &[Option<&SochValue>],
1475 ) -> Result<()> {
1476 // Use cached packed schema - single DashMap lookup
1477 let packed_schema = self
1478 .packed_schemas
1479 .get(table)
1480 .ok_or_else(|| SochDBError::InvalidArgument(format!("Table '{}' not found", table)))?;
1481
1482 // Validate column count matches
1483 if values.len() != packed_schema.num_columns() {
1484 return Err(SochDBError::InvalidArgument(format!(
1485 "Expected {} columns, got {}",
1486 packed_schema.num_columns(),
1487 values.len()
1488 )));
1489 }
1490
1491 // Pack using zero-allocation path
1492 let packed_row = PackedRow::pack_slice(&packed_schema, values);
1493
1494 // Build key using KeyBuffer - optimized stack allocation (~12-15ns vs ~30-35ns for write!())
1495 let key = KeyBuffer::format_row_key(table, row_id);
1496
1497 self.put(txn, key.as_bytes(), packed_row.as_bytes())?;
1498 Ok(())
1499 }
1500
1501 // =========================================================================
1502 // Maintenance
1503 // =========================================================================
1504
1505 /// Force fsync to disk
1506 pub fn fsync(&self) -> Result<()> {
1507 self.storage.fsync()
1508 }
1509
1510 /// Create a checkpoint
1511 pub fn checkpoint(&self) -> Result<u64> {
1512 self.storage.checkpoint()
1513 }
1514
1515 /// Run garbage collection
1516 pub fn gc(&self) -> usize {
1517 self.storage.gc()
1518 }
1519
1520 /// Get database statistics
1521 pub fn stats(&self) -> Stats {
1522 Stats {
1523 transactions_started: self.stats.transactions_started.load(Ordering::Relaxed),
1524 transactions_committed: self.stats.transactions_committed.load(Ordering::Relaxed),
1525 transactions_aborted: self.stats.transactions_aborted.load(Ordering::Relaxed),
1526 queries_executed: self.stats.queries_executed.load(Ordering::Relaxed),
1527 bytes_written: self.stats.bytes_written.load(Ordering::Relaxed),
1528 bytes_read: self.stats.bytes_read.load(Ordering::Relaxed),
1529 }
1530 }
1531
1532 /// Shutdown the database gracefully
1533 pub fn shutdown(&self) -> Result<()> {
1534 if self.shutdown.swap(1, Ordering::SeqCst) == 1 {
1535 return Ok(()); // Already shutting down
1536 }
1537
1538 // Flush any pending writes
1539 self.fsync()?;
1540
1541 // Create clean shutdown marker
1542 let marker = self.path.join(".clean_shutdown");
1543 std::fs::write(&marker, b"ok")?;
1544
1545 Ok(())
1546 }
1547}
1548
1549impl Drop for Database {
1550 fn drop(&mut self) {
1551 // Try graceful shutdown if not already done
1552 if self.shutdown.load(Ordering::SeqCst) == 0 {
1553 let _ = self.fsync();
1554 let marker = self.path.join(".clean_shutdown");
1555 let _ = std::fs::write(&marker, b"ok");
1556 }
1557 }
1558}
1559
/// Query builder for fluent query construction
///
/// Created by `Database::query`; configure it with `columns`, `limit` and
/// `offset`, then run it with `execute`, `to_toon`, `execute_iter` or
/// `as_columnar`.
pub struct QueryBuilder<'a> {
    /// Database the query runs against.
    db: &'a Database,
    /// Transaction handle passed to the underlying scan.
    txn: TxnHandle,
    /// Path prefix to scan; the segment before the first `/` is used as the table name.
    path_prefix: String,
    /// Optional column projection; `None` means all columns.
    columns: Option<Vec<String>>,
    /// Optional maximum number of rows to return.
    limit: Option<usize>,
    /// Optional number of rows to skip before returning results.
    offset: Option<usize>,
}
1569
1570impl<'a> QueryBuilder<'a> {
1571 fn new(db: &'a Database, txn: TxnHandle, path_prefix: String) -> Self {
1572 Self {
1573 db,
1574 txn,
1575 path_prefix,
1576 columns: None,
1577 limit: None,
1578 offset: None,
1579 }
1580 }
1581
1582 /// Select specific columns (for I/O reduction)
1583 pub fn columns(mut self, cols: &[&str]) -> Self {
1584 self.columns = Some(cols.iter().map(|s| s.to_string()).collect());
1585 self
1586 }
1587
1588 /// Limit results
1589 pub fn limit(mut self, n: usize) -> Self {
1590 self.limit = Some(n);
1591 self
1592 }
1593
1594 /// Skip results
1595 pub fn offset(mut self, n: usize) -> Self {
1596 self.offset = Some(n);
1597 self
1598 }
1599
1600 /// Execute the query
1601 ///
1602 /// Scans packed rows and unpacks them. Each key is "table/row_id" pointing to a packed row.
1603 pub fn execute(self) -> Result<QueryResult> {
1604 self.db
1605 .stats
1606 .queries_executed
1607 .fetch_add(1, Ordering::Relaxed);
1608
1609 // Get schema for the table if we're querying a table
1610 let table_name = self
1611 .path_prefix
1612 .split('/')
1613 .next()
1614 .unwrap_or(&self.path_prefix);
1615 let schema = self.db.tables.get(table_name).map(|s| s.clone());
1616
1617 // Scan the path prefix
1618 let results = self.db.scan_path(self.txn, &self.path_prefix)?;
1619
1620 let mut rows: Vec<HashMap<String, SochValue>> = Vec::new();
1621 let mut bytes_read = 0usize;
1622
1623 if let Some(ref schema) = schema {
1624 // We have a table schema - use cached packed schema
1625 let packed_schema = self
1626 .db
1627 .packed_schemas
1628 .get(table_name)
1629 .map(|ps| ps.clone())
1630 .unwrap_or_else(|| Database::to_packed_schema(schema));
1631
1632 for (path, value_bytes) in results {
1633 // Parse path: table/row_id
1634 let parts: Vec<&str> = path.split('/').collect();
1635 if parts.len() == 2 {
1636 // This is a packed row
1637 bytes_read += value_bytes.len();
1638
1639 if let Ok(packed_row) =
1640 PackedRow::from_bytes(value_bytes, packed_schema.num_columns())
1641 {
1642 // Unpack all columns or just requested columns
1643 let mut row = HashMap::new();
1644
1645 if let Some(ref cols) = self.columns {
1646 // Only extract requested columns
1647 for col_name in cols {
1648 if let Some(idx) = packed_schema.column_index(col_name)
1649 && let Some(col_def) = packed_schema.column(idx)
1650 && let Some(value) =
1651 packed_row.get_column(idx, col_def.col_type)
1652 {
1653 row.insert(col_name.clone(), value);
1654 }
1655 }
1656 } else {
1657 // Extract all columns
1658 row = packed_row.unpack(&packed_schema);
1659 }
1660
1661 if !row.is_empty() {
1662 rows.push(row);
1663 }
1664 }
1665 }
1666 }
1667 } else {
1668 // Fallback: no schema, try legacy column-per-key format
1669 let mut rows_map: HashMap<String, HashMap<String, SochValue>> = HashMap::new();
1670
1671 for (path, value_bytes) in results {
1672 let parts: Vec<&str> = path.split('/').collect();
1673 if parts.len() >= 3 {
1674 let row_key = format!("{}/{}", parts[0], parts[1]);
1675 let col_name = parts[2..].join("/");
1676
1677 if let Some(ref cols) = self.columns
1678 && !cols.contains(&col_name)
1679 {
1680 continue;
1681 }
1682
1683 bytes_read += value_bytes.len();
1684 let row = rows_map.entry(row_key).or_default();
1685 row.insert(col_name, deserialize_value(&value_bytes));
1686 }
1687 }
1688
1689 rows = rows_map.into_values().collect();
1690 }
1691
1692 // Apply offset
1693 if let Some(offset) = self.offset {
1694 rows = rows.into_iter().skip(offset).collect();
1695 }
1696
1697 // Apply limit
1698 if let Some(limit) = self.limit {
1699 rows.truncate(limit);
1700 }
1701
1702 // Collect column names
1703 let columns: Vec<String> = self.columns.unwrap_or_else(|| {
1704 rows.iter()
1705 .flat_map(|r| r.keys().cloned())
1706 .collect::<std::collections::HashSet<_>>()
1707 .into_iter()
1708 .collect()
1709 });
1710
1711 Ok(QueryResult {
1712 columns,
1713 rows_scanned: rows.len(),
1714 bytes_read,
1715 rows,
1716 })
1717 }
1718
1719 /// Execute and return TOON format (for LLM efficiency)
1720 pub fn to_toon(self) -> Result<String> {
1721 let result = self.execute()?;
1722 Ok(result.to_toon())
1723 }
1724
1725 /// Execute with lazy iteration - avoids materializing all rows
1726 ///
1727 /// Returns an iterator over rows as `Vec<SochValue>` in schema column order.
1728 /// This is more memory-efficient than `execute()` for large result sets.
1729 ///
1730 /// # Performance
1731 /// - No upfront materialization of all rows
1732 /// - ~40% less memory for large result sets
1733 /// - Ideal for streaming to network or aggregations
1734 ///
1735 /// # Example
1736 /// ```ignore
1737 /// for row_result in db.query(txn, "users").execute_iter()? {
1738 /// let row = row_result?;
1739 /// // row is Vec<SochValue> in column order
1740 /// }
1741 /// ```
1742 pub fn execute_iter(self) -> Result<QueryRowIterator> {
1743 self.db
1744 .stats
1745 .queries_executed
1746 .fetch_add(1, Ordering::Relaxed);
1747
1748 let table_name = self
1749 .path_prefix
1750 .split('/')
1751 .next()
1752 .unwrap_or(&self.path_prefix)
1753 .to_string();
1754
1755 // Get packed schema (clone needed for iterator ownership)
1756 let packed_schema = self.db.packed_schemas.get(&table_name).map(|ps| ps.clone());
1757
1758 // Scan the path prefix
1759 let results = self.db.scan_path(self.txn, &self.path_prefix)?;
1760
1761 Ok(QueryRowIterator {
1762 results: results.into_iter(),
1763 packed_schema,
1764 columns: self.columns,
1765 offset: self.offset.unwrap_or(0),
1766 limit: self.limit,
1767 yielded: 0,
1768 skipped: 0,
1769 })
1770 }
1771
1772 /// Execute and return columnar (SIMD-friendly) result format
1773 ///
1774 /// Instead of row-oriented `Vec<HashMap<String, SochValue>>`, returns
1775 /// column-oriented `Vec<TypedColumn>` for vectorized operations.
1776 ///
1777 /// ## Performance Benefits
1778 ///
1779 /// - SIMD: Aggregate operations (sum, avg) use vectorized instructions
1780 /// - Cache: Sequential access maximizes L1/L2 hits
1781 /// - Memory: ~30% less overhead than row-based format
1782 /// - Analytics: Ideal for ML preprocessing and statistics
1783 ///
1784 /// ## Example
1785 ///
1786 /// ```ignore
1787 /// let result = db.query(txn, "users")
1788 /// .columns(&["id", "score"])
1789 /// .as_columnar()?;
1790 ///
1791 /// // SIMD-optimized sum
1792 /// let total = result.sum_i64("score").unwrap_or(0);
1793 ///
1794 /// // Direct column access
1795 /// if let Some(scores) = result.column("score") {
1796 /// for i in 0..scores.len() {
1797 /// if let Some(v) = scores.get_i64(i) {
1798 /// println!("Score: {}", v);
1799 /// }
1800 /// }
1801 /// }
1802 /// ```
1803 pub fn as_columnar(self) -> Result<ColumnarQueryResult> {
1804 self.db
1805 .stats
1806 .queries_executed
1807 .fetch_add(1, Ordering::Relaxed);
1808
1809 let table_name = self
1810 .path_prefix
1811 .split('/')
1812 .next()
1813 .unwrap_or(&self.path_prefix);
1814 let schema = self.db.tables.get(table_name).map(|s| s.clone());
1815
1816 // Get packed schema
1817 let packed_schema = match self.db.packed_schemas.get(table_name) {
1818 Some(ps) => ps.clone(),
1819 None => return Ok(ColumnarQueryResult::empty()),
1820 };
1821
1822 // Determine columns to fetch
1823 let column_names: Vec<String> = self.columns.clone().unwrap_or_else(|| {
1824 schema
1825 .as_ref()
1826 .map(|s| s.columns.iter().map(|c| c.name.clone()).collect())
1827 .unwrap_or_default()
1828 });
1829
1830 if column_names.is_empty() {
1831 return Ok(ColumnarQueryResult::empty());
1832 }
1833
1834 // Initialize TypedColumns based on schema types
1835 let mut columns: Vec<CoreTypedColumn> = column_names
1836 .iter()
1837 .map(|col_name| {
1838 packed_schema
1839 .column_index(col_name)
1840 .and_then(|idx| packed_schema.column(idx))
1841 .map(|col_def| match col_def.col_type {
1842 PackedColumnType::Int64 => CoreTypedColumn::new_int64(),
1843 PackedColumnType::UInt64 => CoreTypedColumn::new_uint64(),
1844 PackedColumnType::Float64 => CoreTypedColumn::new_float64(),
1845 PackedColumnType::Text => CoreTypedColumn::new_text(),
1846 PackedColumnType::Binary => CoreTypedColumn::new_binary(),
1847 PackedColumnType::Bool => CoreTypedColumn::new_bool(),
1848 PackedColumnType::Null => CoreTypedColumn::new_text(), // Null column = fallback to text
1849 })
1850 .unwrap_or_else(CoreTypedColumn::new_text) // fallback
1851 })
1852 .collect();
1853
1854 // Scan the path prefix
1855 let results = self.db.scan_path(self.txn, &self.path_prefix)?;
1856
1857 let mut row_count = 0;
1858 let mut bytes_read = 0;
1859 let mut skipped = 0;
1860
1861 for (path, value_bytes) in results {
1862 // Parse path: table/row_id
1863 let parts: Vec<&str> = path.split('/').collect();
1864 if parts.len() != 2 {
1865 continue;
1866 }
1867
1868 // Apply offset
1869 if let Some(offset) = self.offset
1870 && skipped < offset
1871 {
1872 skipped += 1;
1873 continue;
1874 }
1875
1876 // Apply limit
1877 if let Some(limit) = self.limit
1878 && row_count >= limit
1879 {
1880 break;
1881 }
1882
1883 bytes_read += value_bytes.len();
1884
1885 if let Ok(packed_row) = PackedRow::from_bytes(value_bytes, packed_schema.num_columns())
1886 {
1887 // Extract each column and push to corresponding TypedColumn
1888 for (col_idx, col_name) in column_names.iter().enumerate() {
1889 if let Some(schema_idx) = packed_schema.column_index(col_name) {
1890 if let Some(col_def) = packed_schema.column(schema_idx) {
1891 let value = packed_row.get_column(schema_idx, col_def.col_type);
1892 push_value_to_typed_column(&mut columns[col_idx], value);
1893 } else {
1894 push_null_to_typed_column(&mut columns[col_idx]);
1895 }
1896 } else {
1897 push_null_to_typed_column(&mut columns[col_idx]);
1898 }
1899 }
1900 row_count += 1;
1901 }
1902 }
1903
1904 Ok(ColumnarQueryResult {
1905 columns: column_names,
1906 data: columns,
1907 row_count,
1908 bytes_read,
1909 })
1910 }
1911}
1912
1913/// Lazy iterator over query results
1914///
1915/// Unpacks rows on-demand, avoiding upfront materialization.
1916pub struct QueryRowIterator {
1917 results: std::vec::IntoIter<(String, Vec<u8>)>,
1918 packed_schema: Option<PackedTableSchema>,
1919 columns: Option<Vec<String>>,
1920 offset: usize,
1921 limit: Option<usize>,
1922 yielded: usize,
1923 skipped: usize,
1924}
1925
1926impl Iterator for QueryRowIterator {
1927 type Item = Result<Vec<SochValue>>;
1928
1929 fn next(&mut self) -> Option<Self::Item> {
1930 // Check limit
1931 if let Some(limit) = self.limit
1932 && self.yielded >= limit
1933 {
1934 return None;
1935 }
1936
1937 loop {
1938 let (path, value_bytes) = self.results.next()?;
1939
1940 // Parse path: table/row_id
1941 let parts: Vec<&str> = path.split('/').collect();
1942 if parts.len() != 2 {
1943 continue; // Skip non-row entries
1944 }
1945
1946 // Apply offset
1947 if self.skipped < self.offset {
1948 self.skipped += 1;
1949 continue;
1950 }
1951
1952 if let Some(ref schema) = self.packed_schema {
1953 match PackedRow::from_bytes(value_bytes, schema.num_columns()) {
1954 Ok(packed_row) => {
1955 let row = if let Some(ref cols) = self.columns {
1956 // Project specific columns
1957 cols.iter()
1958 .map(|col_name| {
1959 schema
1960 .column_index(col_name)
1961 .and_then(|idx| schema.column(idx))
1962 .and_then(|col_def| {
1963 packed_row.get_column(
1964 schema.column_index(col_name).unwrap(),
1965 col_def.col_type,
1966 )
1967 })
1968 .unwrap_or(SochValue::Null)
1969 })
1970 .collect()
1971 } else {
1972 // All columns in order
1973 packed_row.unpack_to_vec(schema)
1974 };
1975
1976 self.yielded += 1;
1977 return Some(Ok(row));
1978 }
1979 Err(e) => return Some(Err(e)),
1980 }
1981 } else {
1982 // No schema - return raw bytes as binary
1983 self.yielded += 1;
1984 return Some(Ok(vec![SochValue::Binary(value_bytes)]));
1985 }
1986 }
1987 }
1988}
1989
1990// Helper functions for serialization (kept for backward compatibility with legacy data)
1991
1992#[allow(dead_code)]
1993fn serialize_value(value: &SochValue) -> Vec<u8> {
1994 // Simple serialization - in production use proper format
1995 match value {
1996 SochValue::Null => vec![0],
1997 SochValue::Int(i) => {
1998 let mut buf = vec![1];
1999 buf.extend_from_slice(&i.to_le_bytes());
2000 buf
2001 }
2002 SochValue::UInt(u) => {
2003 let mut buf = vec![2];
2004 buf.extend_from_slice(&u.to_le_bytes());
2005 buf
2006 }
2007 SochValue::Float(f) => {
2008 let mut buf = vec![3];
2009 buf.extend_from_slice(&f.to_le_bytes());
2010 buf
2011 }
2012 SochValue::Text(s) => {
2013 let mut buf = vec![4];
2014 buf.extend_from_slice(s.as_bytes());
2015 buf
2016 }
2017 SochValue::Bool(b) => vec![5, if *b { 1 } else { 0 }],
2018 SochValue::Binary(b) => {
2019 let mut buf = vec![6];
2020 buf.extend_from_slice(b);
2021 buf
2022 }
2023 _ => {
2024 // Fallback: serialize as text
2025 let s = format!("{:?}", value);
2026 let mut buf = vec![4];
2027 buf.extend_from_slice(s.as_bytes());
2028 buf
2029 }
2030 }
2031}
2032
2033fn deserialize_value(bytes: &[u8]) -> SochValue {
2034 if bytes.is_empty() {
2035 return SochValue::Null;
2036 }
2037
2038 match bytes[0] {
2039 0 => SochValue::Null,
2040 1 if bytes.len() >= 9 => {
2041 let i = i64::from_le_bytes(bytes[1..9].try_into().unwrap());
2042 SochValue::Int(i)
2043 }
2044 2 if bytes.len() >= 9 => {
2045 let u = u64::from_le_bytes(bytes[1..9].try_into().unwrap());
2046 SochValue::UInt(u)
2047 }
2048 3 if bytes.len() >= 9 => {
2049 let f = f64::from_le_bytes(bytes[1..9].try_into().unwrap());
2050 SochValue::Float(f)
2051 }
2052 4 => {
2053 let s = String::from_utf8_lossy(&bytes[1..]).to_string();
2054 SochValue::Text(s)
2055 }
2056 5 if bytes.len() >= 2 => SochValue::Bool(bytes[1] != 0),
2057 6 => SochValue::Binary(bytes[1..].to_vec()),
2058 _ => {
2059 // Treat as text
2060 let s = String::from_utf8_lossy(bytes).to_string();
2061 SochValue::Text(s)
2062 }
2063 }
2064}
2065
2066// ============================================================================
2067// Helper functions for columnar query result building
2068// ============================================================================
2069
2070/// Push a SochValue into a TypedColumn
2071fn push_value_to_typed_column(col: &mut CoreTypedColumn, value: Option<SochValue>) {
2072 match value {
2073 None => push_null_to_typed_column(col),
2074 Some(v) => match (col, v) {
2075 (
2076 CoreTypedColumn::Int64 {
2077 values,
2078 validity,
2079 stats,
2080 },
2081 SochValue::Int(i),
2082 ) => {
2083 values.push(i);
2084 validity.push(true);
2085 stats.update_i64(i);
2086 }
2087 (
2088 CoreTypedColumn::Int64 {
2089 values,
2090 validity,
2091 stats,
2092 },
2093 SochValue::UInt(u),
2094 ) => {
2095 values.push(u as i64);
2096 validity.push(true);
2097 stats.update_i64(u as i64);
2098 }
2099 (
2100 CoreTypedColumn::UInt64 {
2101 values,
2102 validity,
2103 stats,
2104 },
2105 SochValue::UInt(u),
2106 ) => {
2107 values.push(u);
2108 validity.push(true);
2109 stats.update_i64(u as i64);
2110 }
2111 (
2112 CoreTypedColumn::UInt64 {
2113 values,
2114 validity,
2115 stats,
2116 },
2117 SochValue::Int(i),
2118 ) => {
2119 values.push(i as u64);
2120 validity.push(true);
2121 stats.update_i64(i);
2122 }
2123 (
2124 CoreTypedColumn::Float64 {
2125 values,
2126 validity,
2127 stats,
2128 },
2129 SochValue::Float(f),
2130 ) => {
2131 values.push(f);
2132 validity.push(true);
2133 stats.update_f64(f);
2134 }
2135 (
2136 CoreTypedColumn::Float64 {
2137 values,
2138 validity,
2139 stats,
2140 },
2141 SochValue::Int(i),
2142 ) => {
2143 values.push(i as f64);
2144 validity.push(true);
2145 stats.update_f64(i as f64);
2146 }
2147 (
2148 CoreTypedColumn::Text {
2149 offsets,
2150 data,
2151 validity,
2152 stats,
2153 },
2154 SochValue::Text(s),
2155 ) => {
2156 data.extend_from_slice(s.as_bytes());
2157 offsets.push(data.len() as u32);
2158 validity.push(true);
2159 stats.row_count += 1;
2160 }
2161 (
2162 CoreTypedColumn::Binary {
2163 offsets,
2164 data,
2165 validity,
2166 stats,
2167 },
2168 SochValue::Binary(b),
2169 ) => {
2170 data.extend_from_slice(&b);
2171 offsets.push(data.len() as u32);
2172 validity.push(true);
2173 stats.row_count += 1;
2174 }
2175 (
2176 CoreTypedColumn::Bool {
2177 values,
2178 validity,
2179 stats,
2180 len,
2181 },
2182 SochValue::Bool(b),
2183 ) => {
2184 let idx = *len;
2185 *len += 1;
2186 let num_words = (*len).div_ceil(64);
2187 while values.len() < num_words {
2188 values.push(0);
2189 }
2190 if b {
2191 let word = idx / 64;
2192 let bit = idx % 64;
2193 values[word] |= 1 << bit;
2194 }
2195 validity.push(true);
2196 stats.row_count += 1;
2197 }
2198 // Type mismatch - push as null
2199 (col, _) => push_null_to_typed_column(col),
2200 },
2201 }
2202}
2203
2204/// Push a null value into a TypedColumn
2205fn push_null_to_typed_column(col: &mut CoreTypedColumn) {
2206 match col {
2207 CoreTypedColumn::Int64 {
2208 values,
2209 validity,
2210 stats,
2211 } => {
2212 values.push(0);
2213 validity.push(false);
2214 stats.update_null();
2215 }
2216 CoreTypedColumn::UInt64 {
2217 values,
2218 validity,
2219 stats,
2220 } => {
2221 values.push(0);
2222 validity.push(false);
2223 stats.update_null();
2224 }
2225 CoreTypedColumn::Float64 {
2226 values,
2227 validity,
2228 stats,
2229 } => {
2230 values.push(0.0);
2231 validity.push(false);
2232 stats.update_null();
2233 }
2234 CoreTypedColumn::Text {
2235 offsets,
2236 data: _,
2237 validity,
2238 stats,
2239 } => {
2240 offsets.push(offsets.last().copied().unwrap_or(0));
2241 validity.push(false);
2242 stats.update_null();
2243 }
2244 CoreTypedColumn::Binary {
2245 offsets,
2246 data: _,
2247 validity,
2248 stats,
2249 } => {
2250 offsets.push(offsets.last().copied().unwrap_or(0));
2251 validity.push(false);
2252 stats.update_null();
2253 }
2254 CoreTypedColumn::Bool {
2255 values,
2256 validity,
2257 stats,
2258 len,
2259 } => {
2260 *len += 1;
2261 let num_words = (*len).div_ceil(64);
2262 while values.len() < num_words {
2263 values.push(0);
2264 }
2265 validity.push(false);
2266 stats.update_null();
2267 }
2268 }
2269}
2270
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// A freshly opened database hands out a transaction and shuts down cleanly.
    #[test]
    fn test_database_open_close() {
        let tmp = tempdir().unwrap();
        let database = Database::open(tmp.path()).unwrap();

        // Beginning a transaction must work and yield a non-zero id.
        let handle = database.begin_transaction().unwrap();
        assert!(handle.txn_id > 0);

        database.abort(handle).unwrap();
        database.shutdown().unwrap();
    }

    /// A write is visible inside its own transaction and to later ones after commit.
    #[test]
    fn test_database_put_get() {
        let tmp = tempdir().unwrap();
        let database = Database::open(tmp.path()).unwrap();
        let expected = Some(b"value1".to_vec());

        let writer = database.begin_transaction().unwrap();
        database.put(writer, b"key1", b"value1").unwrap();

        // Read-your-own-write before commit.
        assert_eq!(database.get(writer, b"key1").unwrap(), expected);

        database.commit(writer).unwrap();

        // A transaction started after the commit sees the committed data.
        let reader = database.begin_transaction().unwrap();
        assert_eq!(database.get(reader, b"key1").unwrap(), expected);
        database.abort(reader).unwrap();
    }

    /// Values written through the path API can be found again by prefix scan.
    #[test]
    fn test_database_path_api() {
        let tmp = tempdir().unwrap();
        let database = Database::open(tmp.path()).unwrap();

        let entries: [(&str, &[u8]); 3] = [
            ("users/1/name", b"Alice"),
            ("users/1/email", b"alice@example.com"),
            ("users/2/name", b"Bob"),
        ];

        // Write everything in a single transaction.
        let writer = database.begin_transaction().unwrap();
        for (path, payload) in entries {
            database.put_path(writer, path, payload).unwrap();
        }
        database.commit(writer).unwrap();

        // Only the two entries under user 1 match the prefix.
        let reader = database.begin_transaction().unwrap();
        let matches = database.scan_path(reader, "users/1/").unwrap();
        assert_eq!(matches.len(), 2);

        database.abort(reader).unwrap();
    }

    /// A row inserted into a registered table can be read back by row id.
    #[test]
    fn test_database_table_api() {
        let tmp = tempdir().unwrap();
        let database = Database::open(tmp.path()).unwrap();

        // Schema: a required text column and an optional integer column.
        let name_col = ColumnDef {
            name: "name".to_string(),
            col_type: ColumnType::Text,
            nullable: false,
        };
        let age_col = ColumnDef {
            name: "age".to_string(),
            col_type: ColumnType::Int64,
            nullable: true,
        };
        database
            .register_table(TableSchema {
                name: "users".to_string(),
                columns: vec![name_col, age_col],
            })
            .unwrap();

        // Insert a single row with id 1.
        let writer = database.begin_transaction().unwrap();
        let values = [
            ("name".to_string(), SochValue::Text("Alice".to_string())),
            ("age".to_string(), SochValue::Int(30)),
        ]
        .into_iter()
        .collect::<HashMap<_, _>>();

        database.insert_row(writer, "users", 1, &values).unwrap();
        database.commit(writer).unwrap();

        // Read it back in a fresh transaction.
        let reader = database.begin_transaction().unwrap();
        let fetched = database.read_row(reader, "users", 1, None).unwrap();
        assert!(fetched.is_some());

        let record = fetched.unwrap();
        assert_eq!(
            record.get("name"),
            Some(&SochValue::Text("Alice".to_string()))
        );

        database.abort(reader).unwrap();
    }

    /// The query builder honours `limit`.
    #[test]
    fn test_database_query_builder() {
        let tmp = tempdir().unwrap();
        let database = Database::open(tmp.path()).unwrap();

        let fixtures: [(&str, &[u8]); 4] = [
            ("docs/1/title", b"Hello"),
            ("docs/1/content", b"World"),
            ("docs/2/title", b"Foo"),
            ("docs/2/content", b"Bar"),
        ];

        // Seed the test data.
        let writer = database.begin_transaction().unwrap();
        for (path, payload) in fixtures {
            database.put_path(writer, path, payload).unwrap();
        }
        database.commit(writer).unwrap();

        // Four entries were written, but the limit caps the result at one row.
        let reader = database.begin_transaction().unwrap();
        let result = database.query(reader, "docs/").limit(1).execute().unwrap();

        assert_eq!(result.rows.len(), 1);
        database.abort(reader).unwrap();
    }

    /// Committed data survives when the database is never shut down.
    #[test]
    fn test_database_crash_recovery() {
        let tmp = tempdir().unwrap();

        // Phase 1: write, commit, then "crash".
        {
            // Crash-simulation tests open the database without the lock.
            let database = Database::open_without_lock(tmp.path()).unwrap();
            // Sync mode FULL so the commit is persisted before the "crash".
            database.storage.set_sync_mode(2);
            let writer = database.begin_transaction().unwrap();
            database.put(writer, b"persist", b"this").unwrap();
            database.commit(writer).unwrap();
            // Leak the handle instead of shutting down: no destructor runs.
            std::mem::forget(database);
        }

        // Phase 2: reopen and expect the committed value to be recovered.
        {
            let database = Database::open_without_lock(tmp.path()).unwrap();
            let reader = database.begin_transaction().unwrap();
            let recovered = database.get(reader, b"persist").unwrap();
            assert_eq!(recovered, Some(b"this".to_vec()));
            database.abort(reader).unwrap();
        }
    }
}
2422}