sochdb_storage/database.rs
1// Copyright 2025 Sushanth (https://github.com/sushanthpy)
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! SochDB Database Kernel
//!
//! The shared core that powers both embedded mode (`SochConnection::open`) and
//! server mode (`sochdb-server`). This is the "SQLite engine" equivalent.
//!
//! ## Architecture
//!
//! ```text
//! ┌──────────────────────────────────────────────────────────────────┐
//! │                         Database Kernel                          │
//! │            Arc<Database> - shared by all connections             │
//! ├──────────────────────────────────────────────────────────────────┤
//! │                                                                  │
//! │  ┌─────────────────┐  ┌─────────────────┐  ┌────────────────┐    │
//! │  │ DurableStorage  │  │     Catalog     │  │  Vector Index  │    │
//! │  │  (WAL + MVCC)   │  │  (Schema Mgmt)  │  │ (HNSW/Vamana)  │    │
//! │  └────────┬────────┘  └────────┬────────┘  └───────┬────────┘    │
//! │           │                    │                   │             │
//! │           └────────────────────┼───────────────────┘             │
//! │                                │                                 │
//! │  ┌─────────────────────────────────────────────────────────────┐ │
//! │  │                Query Executor (Path-Native)                 │ │
//! │  │  - Path resolution: O(|path|)                               │ │
//! │  │  - Column projection: 80% I/O reduction                     │ │
//! │  │  - Context selection: Token-aware chunking                  │ │
//! │  └─────────────────────────────────────────────────────────────┘ │
//! │                                                                  │
//! └──────────────────────────────────────────────────────────────────┘
//!
//! Deployment Modes:
//!   ┌─────────────┐   ┌─────────────┐   ┌─────────────┐
//!   │  Embedded   │   │ IPC Server  │   │ TCP Server  │
//!   │  (in-proc)  │   │ (Unix sock) │   │  (remote)   │
//!   └──────┬──────┘   └──────┬──────┘   └──────┬──────┘
//!          │                 │                 │
//!          └─────────────────┼─────────────────┘
//!                            │
//!                      Arc<Database>
//! ```
//!
//! ## Latency Model
//!
//! Let K = kernel processing cost for a query.
//!
//! - Embedded: L_emb ≈ K (function-call overhead is negligible)
//! - IPC: L_ipc ≈ K + δ_ipc (δ_ipc ≈ 10-50µs for a Unix socket)
//! - TCP: L_tcp ≈ K + δ_net (δ_net ≈ 100µs-10ms depending on the network)
//!
//! For LLM context queries where K >> δ_ipc, IPC is "nearly embedded".
64
65use std::collections::HashMap;
66use std::path::{Path, PathBuf};
67use std::sync::Arc;
68use std::sync::atomic::{AtomicU64, Ordering};
69
70use dashmap::DashMap;
71use parking_lot::RwLock;
72
73use crate::durable_storage::{DurableStorage, TransactionMode};
74use crate::index_policy::{IndexPolicy, TableIndexConfig, TableIndexRegistry};
75use crate::key_buffer::KeyBuffer;
76use crate::packed_row::{PackedColumnDef, PackedColumnType, PackedRow, PackedTableSchema};
77use sochdb_core::catalog::Catalog;
78use sochdb_core::{Result, SochDBError, SochValue};
79
80// Re-export key types
81pub use crate::durable_storage::RecoveryStats;
82
83/// Database configuration
84#[derive(Debug, Clone)]
85pub struct DatabaseConfig {
86 /// Enable group commit for better write throughput
87 pub group_commit: bool,
88 /// Maximum memory for memtables before flush (bytes)
89 pub memtable_size_limit: usize,
90 /// Enable WAL for durability
91 pub wal_enabled: bool,
92 /// Sync mode: fsync after every commit vs periodic
93 pub sync_mode: SyncMode,
94 /// Read-only mode
95 pub read_only: bool,
96
97 /// Enable ordered index for O(log N) prefix scans
98 ///
99 /// # Deprecation Notice
100 ///
101 /// **DEPRECATED since 0.2.0**: Use `default_index_policy` instead for per-table control.
102 /// This field will be removed in v0.3.0.
103 ///
104 /// ## Migration Guide
105 ///
106 /// Replace:
107 /// ```ignore
108 /// DatabaseConfig { enable_ordered_index: true, .. } // Old API
109 /// DatabaseConfig { enable_ordered_index: false, .. } // Old API
110 /// ```
111 ///
112 /// With:
113 /// ```ignore
114 /// DatabaseConfig { default_index_policy: IndexPolicy::ScanOptimized, .. } // Ordered index enabled
115 /// DatabaseConfig { default_index_policy: IndexPolicy::WriteOptimized, .. } // Ordered index disabled
116 /// ```
117 ///
118 /// ## Behavior
119 ///
120 /// When false, saves ~134 ns/op on writes (20% speedup)
121 /// but scan_prefix becomes O(N) instead of O(log N + K).
122 ///
123 /// Set to false for write-heavy workloads without range scans.
124 #[deprecated(
125 since = "0.2.0",
126 note = "Use `default_index_policy` field instead. This field will be removed in v0.3.0. \
127 Set IndexPolicy::ScanOptimized for ordered index, WriteOptimized to disable."
128 )]
129 ///
130 /// Set to false for write-heavy workloads without range scans.
131 pub enable_ordered_index: bool,
132 /// Group commit configuration
133 pub group_commit_config: GroupCommitSettings,
134 /// Default index policy for tables not explicitly configured
135 ///
136 /// This replaces the global `enable_ordered_index` toggle with
137 /// fine-grained per-table control. Use `index_registry` to configure
138 /// individual tables.
139 ///
140 /// | Policy | Insert Cost | Scan Cost | Use Case |
141 /// |----------------|-------------|----------------|------------------------|
142 /// | WriteOptimized | O(1) | O(N) | High-write, rare scan |
143 /// | Balanced | O(1) amort | O(output+logK) | Mixed OLTP |
144 /// | ScanOptimized | O(log N) | O(logN + K) | Analytics, range query |
145 /// | AppendOnly | O(1) | O(N) | Time-series logs |
146 pub default_index_policy: IndexPolicy,
147}
148
/// Group commit settings - mirrors SQLite's WAL mode tuning.
///
/// ## Performance Model
///
/// Without group commit: Throughput = 1 / L_fsync ≈ 200 commits/sec (L = 5ms)
/// With group commit (batch size K): Throughput = K / L_fsync = K × 200 commits/sec
///
/// For K = 100: 20,000 commits/sec (100× speedup).
///
/// ## SQLite Comparison
///
/// | Setting                    | SQLite Equivalent           |
/// |----------------------------|-----------------------------|
/// | max_batch_size = 1         | PRAGMA synchronous = FULL   |
/// | max_batch_size = 100       | WAL mode with batching      |
/// | max_wait_us = 0            | No delay, immediate flush   |
/// | max_wait_us = 10000        | Up to 10ms delay for batch  |
#[derive(Debug, Clone)]
pub struct GroupCommitSettings {
    /// Minimum batch size before flush (default: 1).
    pub min_batch_size: usize,
    /// Maximum batch size (default: 1000).
    pub max_batch_size: usize,
    /// Maximum wait time before flush in microseconds (default: 10000 = 10ms).
    pub max_wait_us: u64,
    /// Expected fsync latency in microseconds, used by
    /// [`GroupCommitSettings::optimal_batch_size`] (default: 5000 = 5ms).
    pub fsync_latency_us: u64,
}

impl Default for GroupCommitSettings {
    fn default() -> Self {
        Self {
            min_batch_size: 1,
            max_batch_size: 1000,
            max_wait_us: 10_000,    // 10ms
            fsync_latency_us: 5_000, // 5ms
        }
    }
}

impl GroupCommitSettings {
    /// High-throughput preset: batches of 50-5000 with up to 50ms of wait.
    pub fn high_throughput() -> Self {
        Self {
            min_batch_size: 50,
            max_batch_size: 5000,
            max_wait_us: 50_000, // 50ms
            fsync_latency_us: 5_000,
        }
    }

    /// Low-latency preset: batches of 1-10 with at most 1ms of wait.
    pub fn low_latency() -> Self {
        Self {
            min_batch_size: 1,
            max_batch_size: 10,
            max_wait_us: 1_000, // 1ms
            fsync_latency_us: 5_000,
        }
    }

    /// Suggest a batch size with the square-root rule
    ///
    /// N* = sqrt(2 × L_fsync × λ / C_wait)
    ///
    /// This balances the fsync cost amortized over a batch against the cost of
    /// holding commits while the batch fills. `L_fsync × λ` is the expected
    /// number of arrivals during one fsync.
    ///
    /// The result is bounded to `[min_batch_size, max_batch_size]`. The bounds
    /// are applied one after the other, so inconsistent settings
    /// (`min_batch_size > max_batch_size`) yield `max_batch_size` instead of
    /// panicking.
    ///
    /// # Arguments
    /// * `arrival_rate` - Operations per second. Zero, negative or NaN rates
    ///   give `min_batch_size` (or `max_batch_size` if that is smaller).
    /// * `wait_cost` - Cost coefficient for waiting (0.0-1.0). Values below
    ///   0.001, including NaN, are treated as 0.001 to avoid division by zero.
    pub fn optimal_batch_size(&self, arrival_rate: f64, wait_cost: f64) -> usize {
        let l_fsync = self.fsync_latency_us as f64 / 1_000_000.0;
        let n_star = (2.0 * l_fsync * arrival_rate / wait_cost.max(0.001)).sqrt();
        // `Ord::clamp` panics when min > max, and both bounds are public
        // fields, so apply them individually. Float-to-int `as` saturates, and
        // NaN becomes 0.
        (n_star as usize)
            .max(self.min_batch_size)
            .min(self.max_batch_size)
    }
}
223
224impl Default for DatabaseConfig {
225 #[allow(deprecated)]
226 fn default() -> Self {
227 Self {
228 group_commit: true,
229 memtable_size_limit: 64 * 1024 * 1024, // 64MB
230 wal_enabled: true,
231 sync_mode: SyncMode::Normal,
232 read_only: false,
233 enable_ordered_index: true, // Default: enabled for compatibility
234 group_commit_config: GroupCommitSettings::default(),
235 default_index_policy: IndexPolicy::Balanced, // New default: balanced OLTP policy
236 }
237 }
238}
239
240impl DatabaseConfig {
241 /// Create config optimized for throughput (Fast Mode)
242 ///
243 /// - Disables ordered index (saves ~134 ns/op)
244 /// - Uses high-throughput group commit settings
245 /// - Suitable for append-only workloads
246 #[allow(deprecated)]
247 pub fn throughput_optimized() -> Self {
248 Self {
249 group_commit: true,
250 memtable_size_limit: 128 * 1024 * 1024, // 128MB
251 wal_enabled: true,
252 sync_mode: SyncMode::Normal,
253 read_only: false,
254 enable_ordered_index: false,
255 group_commit_config: GroupCommitSettings::high_throughput(),
256 default_index_policy: IndexPolicy::WriteOptimized, // No ordered index overhead
257 }
258 }
259
260 /// Create config optimized for latency
261 ///
262 /// - Keeps ordered index for fast range scans
263 /// - Uses low-latency group commit settings
264 /// - Suitable for OLTP workloads
265 #[allow(deprecated)]
266 pub fn latency_optimized() -> Self {
267 Self {
268 group_commit: true,
269 memtable_size_limit: 32 * 1024 * 1024, // 32MB
270 wal_enabled: true,
271 sync_mode: SyncMode::Full,
272 read_only: false,
273 enable_ordered_index: true,
274 group_commit_config: GroupCommitSettings::low_latency(),
275 default_index_policy: IndexPolicy::ScanOptimized, // Fast range scans
276 }
277 }
278
279 /// Create config matching SQLite defaults
280 #[allow(deprecated)]
281 pub fn sqlite_compatible() -> Self {
282 Self {
283 group_commit: false, // SQLite default is single-commit
284 memtable_size_limit: 64 * 1024 * 1024,
285 wal_enabled: true,
286 sync_mode: SyncMode::Normal, // PRAGMA synchronous = NORMAL
287 read_only: false,
288 enable_ordered_index: true,
289 group_commit_config: GroupCommitSettings::default(),
290 default_index_policy: IndexPolicy::Balanced, // Good default for mixed workloads
291 }
292 }
293
294 /// Get effective ordered index setting, derived from `default_index_policy`.
295 ///
296 /// This is the shim method for the deprecated `enable_ordered_index` field.
297 /// It returns `true` if the policy requires an ordered index (ScanOptimized),
298 /// and `false` otherwise (WriteOptimized, Balanced, AppendOnly).
299 ///
300 /// # Policy Mapping
301 ///
302 /// | IndexPolicy | Returns |
303 /// |------------------|---------|
304 /// | ScanOptimized | true |
305 /// | Balanced | false |
306 /// | WriteOptimized | false |
307 /// | AppendOnly | false |
308 ///
309 /// Note: `Balanced` uses lazy compaction rather than a live ordered index,
310 /// so it returns `false` for the low-level memtable config but still supports
311 /// efficient range scans via sorted runs.
312 pub fn effective_ordered_index(&self) -> bool {
313 matches!(self.default_index_policy, IndexPolicy::ScanOptimized)
314 }
315}
316
/// WAL sync mode - matches SQLite's `PRAGMA synchronous` semantics.
///
/// | SochDB     | SQLite       | Description                                    |
/// |------------|--------------|------------------------------------------------|
/// | Off        | OFF (0)      | No fsync, risk of data loss on crash           |
/// | Normal     | NORMAL (1)   | Fsync at checkpoints, not every commit         |
/// | Full       | FULL (2)     | Fsync every commit (safest, slowest)           |
///
/// SQLite's `EXTRA (3)` has no separate SochDB mode. Both
/// [`SyncMode::from_sqlite_pragma`] and [`SyncMode::parse`] map it to `Full`.
///
/// # Performance vs Durability Trade-offs
///
/// Rough, workload-dependent figures:
///
/// - **Off**: ~10x faster than Full, but may lose the last ~100ms of data on crash
/// - **Normal**: ~5x faster than Full, durable at checkpoint boundaries
/// - **Full**: every commit is fsync'd; committed data survives a crash or power
///   loss as long as the storage device honors fsync
///
/// # SQLite Compatibility
///
/// ```sql
/// -- SQLite equivalent settings
/// PRAGMA synchronous = OFF;    -- SyncMode::Off
/// PRAGMA synchronous = NORMAL; -- SyncMode::Normal
/// PRAGMA synchronous = FULL;   -- SyncMode::Full
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SyncMode {
    /// No fsync (equivalent to SQLite PRAGMA synchronous = OFF).
    ///
    /// Writes are buffered in the OS and may be lost on power failure.
    /// Use for non-critical data or bulk loading.
    Off = 0,

    /// Fsync at checkpoints (equivalent to SQLite PRAGMA synchronous = NORMAL).
    ///
    /// Default mode. Syncs the WAL at checkpoint boundaries.
    /// Good balance of performance and durability.
    Normal = 1,

    /// Fsync every commit (equivalent to SQLite PRAGMA synchronous = FULL).
    ///
    /// Safest mode. Every commit is made durable before it is acknowledged.
    /// Required for financial/critical data.
    Full = 2,
}

impl SyncMode {
    /// Convert from a SQLite `synchronous` pragma value.
    ///
    /// `0` → `Off`, `1` → `Normal`. Any value ≥ 2 (FULL, EXTRA, …) → `Full`.
    pub fn from_sqlite_pragma(value: u32) -> Self {
        match value {
            0 => SyncMode::Off,
            1 => SyncMode::Normal,
            _ => SyncMode::Full, // 2+ treated as Full
        }
    }

    /// Convert to the SQLite `synchronous` pragma value (0, 1 or 2).
    pub fn to_sqlite_pragma(self) -> u32 {
        self as u32
    }

    /// Parse a pragma-style string, ignoring ASCII case and surrounding whitespace.
    ///
    /// Accepts the SQLite names and numbers: `OFF`/`0`, `NORMAL`/`1`,
    /// `FULL`/`2`, and `EXTRA`/`3`. `EXTRA` maps to `Full`, consistent with
    /// [`SyncMode::from_sqlite_pragma`]. Returns `None` for anything else.
    pub fn parse(s: &str) -> Option<Self> {
        match s.trim().to_ascii_uppercase().as_str() {
            "OFF" | "0" => Some(SyncMode::Off),
            "NORMAL" | "1" => Some(SyncMode::Normal),
            "FULL" | "2" | "EXTRA" | "3" => Some(SyncMode::Full),
            _ => None,
        }
    }
}
385
/// Schema of a table registered with the kernel: its name plus its columns.
#[derive(Debug, Clone)]
pub struct TableSchema {
    /// Table name.
    pub name: String,
    /// Column definitions.
    pub columns: Vec<ColumnDef>,
}

/// Definition of one table column.
#[derive(Debug, Clone)]
pub struct ColumnDef {
    /// Column name.
    pub name: String,
    /// Logical value type stored in the column.
    pub col_type: ColumnType,
    /// Whether the column may hold null values.
    pub nullable: bool,
}

/// Logical column types understood by the kernel.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ColumnType {
    /// Signed 64-bit integer.
    Int64,
    /// Unsigned 64-bit integer.
    UInt64,
    /// 64-bit floating point.
    Float64,
    /// Text string.
    Text,
    /// Raw bytes.
    Binary,
    /// Boolean.
    Bool,
}
411
/// Small, copyable handle naming an open transaction.
///
/// Produced by the `begin_*` methods and passed by value to `put`, `get`,
/// `commit`, `abort` and the other kernel operations.
#[derive(Debug, Clone, Copy)]
pub struct TxnHandle {
    /// Transaction id assigned by the storage layer.
    pub txn_id: u64,
    /// Snapshot timestamp. The `begin_*` methods currently fill this with
    /// `txn_id` as a stand-in.
    pub snapshot_ts: u64,
}
418
419/// Query result from the kernel
420#[derive(Debug, Clone)]
421pub struct QueryResult {
422 /// Column names
423 pub columns: Vec<String>,
424 /// Row data (each row is a map of column -> value)
425 pub rows: Vec<HashMap<String, SochValue>>,
426 /// Number of rows scanned (for stats)
427 pub rows_scanned: usize,
428 /// Bytes read from storage
429 pub bytes_read: usize,
430}
431
432impl QueryResult {
433 /// Empty result
434 pub fn empty() -> Self {
435 Self {
436 columns: vec![],
437 rows: vec![],
438 rows_scanned: 0,
439 bytes_read: 0,
440 }
441 }
442
443 /// Convert to TOON format for token efficiency
444 pub fn to_toon(&self) -> String {
445 if self.rows.is_empty() {
446 return "[]".to_string();
447 }
448
449 // TOON format: table[N]{cols}: row1; row2; ...
450 let n = self.rows.len();
451 let cols = self.columns.join(",");
452
453 let rows_str: Vec<String> = self
454 .rows
455 .iter()
456 .map(|row| {
457 self.columns
458 .iter()
459 .map(|c| {
460 row.get(c)
461 .map(format_soch_value)
462 .unwrap_or_else(|| "∅".to_string())
463 })
464 .collect::<Vec<_>>()
465 .join(",")
466 })
467 .collect();
468
469 format!("result[{}]{{{}}}:{}", n, cols, rows_str.join(";"))
470 }
471}
472
473fn format_soch_value(v: &SochValue) -> String {
474 match v {
475 SochValue::Null => "∅".to_string(),
476 SochValue::Int(i) => i.to_string(),
477 SochValue::UInt(u) => u.to_string(),
478 SochValue::Float(f) => format!("{:.6}", f),
479 SochValue::Text(s) => {
480 if s.contains(',') || s.contains(';') {
481 format!("\"{}\"", s.replace('"', "\\\""))
482 } else {
483 s.clone()
484 }
485 }
486 SochValue::Bool(b) => if *b { "T" } else { "F" }.to_string(),
487 SochValue::Binary(b) => format!("b64:{}", base64_encode(b)),
488 _ => format!("{:?}", v),
489 }
490}
491
/// Encode `data` as standard base64 (RFC 4648 alphabet) with `=` padding.
///
/// Hand-rolled so this module needs no base64 dependency.
fn base64_encode(data: &[u8]) -> String {
    const TABLE: &[u8; 64] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

    let mut encoded = String::with_capacity((data.len() + 2) / 3 * 4);
    for group in data.chunks(3) {
        // Pack up to three bytes into a 24-bit word; missing bytes stay zero.
        let word = group
            .iter()
            .enumerate()
            .fold(0u32, |acc, (i, &byte)| acc | (u32::from(byte) << (16 - 8 * i)));

        // A group of k bytes yields k + 1 meaningful sextets; pad the rest.
        let significant = group.len() + 1;
        for position in 0..4 {
            if position < significant {
                let index = (word >> (18 - 6 * position)) & 0x3f;
                encoded.push(char::from(TABLE[index as usize]));
            } else {
                encoded.push('=');
            }
        }
    }
    encoded
}
520
521// ============================================================================
522// Columnar Query Results - SIMD-friendly result format
523// ============================================================================
524
525use sochdb_core::TypedColumn as CoreTypedColumn;
526
527/// Columnar query result - SIMD-friendly format for analytics
528///
529/// Instead of row-oriented `Vec<HashMap<String, SochValue>>`, this returns
530/// column-oriented `Vec<TypedColumn>` for efficient vectorized operations.
531///
532/// ## Memory Layout
533///
534/// Row-oriented (standard):
535/// ```text
536/// Row 0: [id=1, name="Alice", score=85]
537/// Row 1: [id=2, name="Bob", score=92]
538/// Row 2: [id=3, name="Carol", score=78]
539/// ```
540///
541/// Column-oriented (this format):
542/// ```text
543/// id: [1, 2, 3] ← contiguous i64 array (SIMD-friendly)
544/// name: ["Alice", "Bob", "Carol"] ← Arrow-style string encoding
545/// score: [85, 92, 78] ← contiguous i64 array
546/// ```
547///
548/// ## Performance Benefits
549///
550/// - SIMD: Column sums use vectorized instructions (~8× faster)
551/// - Cache: Sequential access pattern maximizes L1/L2 cache hits
552/// - Compression: Same-type data compresses better (5-10× typical)
553/// - Filtering: Bitmap operations instead of row iteration
554///
555/// ## Usage
556///
557/// ```ignore
558/// let result = db.query(txn, "users")
559/// .columns(&["id", "score"])
560/// .as_columnar()?;
561///
562/// // SIMD sum
563/// let total_score = result.column("score")
564/// .map(|c| c.sum_i64())
565/// .unwrap_or(0);
566///
567/// // Stats
568/// println!("Rows: {}, Memory: {} bytes", result.row_count(), result.memory_size());
569/// ```
570#[derive(Debug, Clone)]
571pub struct ColumnarQueryResult {
572 /// Column names in order
573 pub columns: Vec<String>,
574 /// Column data - each TypedColumn contains all values for one column
575 pub data: Vec<CoreTypedColumn>,
576 /// Number of rows
577 pub row_count: usize,
578 /// Bytes read from storage
579 pub bytes_read: usize,
580}
581
582impl ColumnarQueryResult {
583 /// Create an empty result
584 pub fn empty() -> Self {
585 Self {
586 columns: vec![],
587 data: vec![],
588 row_count: 0,
589 bytes_read: 0,
590 }
591 }
592
593 /// Get column by name
594 pub fn column(&self, name: &str) -> Option<&CoreTypedColumn> {
595 self.columns
596 .iter()
597 .position(|c| c == name)
598 .and_then(|idx| self.data.get(idx))
599 }
600
601 /// Get column index by name
602 pub fn column_index(&self, name: &str) -> Option<usize> {
603 self.columns.iter().position(|c| c == name)
604 }
605
606 /// Number of rows
607 pub fn row_count(&self) -> usize {
608 self.row_count
609 }
610
611 /// Number of columns
612 pub fn column_count(&self) -> usize {
613 self.columns.len()
614 }
615
616 /// Total memory size in bytes
617 pub fn memory_size(&self) -> usize {
618 self.data.iter().map(|c| c.memory_size()).sum()
619 }
620
621 /// Sum of an i64 column (SIMD-optimized)
622 pub fn sum_i64(&self, column: &str) -> Option<i64> {
623 self.column(column).map(|c| c.sum_i64())
624 }
625
626 /// Sum of an f64 column (SIMD-optimized)
627 pub fn sum_f64(&self, column: &str) -> Option<f64> {
628 self.column(column).map(|c| c.sum_f64())
629 }
630
631 /// Get column statistics (min, max, null count)
632 pub fn column_stats(&self, column: &str) -> Option<&sochdb_core::columnar::ColumnStats> {
633 self.column(column).map(|c| c.stats())
634 }
635
636 /// Convert to TOON format (token-efficient)
637 pub fn to_toon(&self) -> String {
638 if self.row_count == 0 {
639 return "[]".to_string();
640 }
641
642 let n = self.row_count;
643 let cols = self.columns.join(",");
644
645 // Build rows from columns
646 let mut rows_str = Vec::with_capacity(n);
647 for i in 0..n {
648 let row: Vec<String> = self
649 .data
650 .iter()
651 .map(|col| format_columnar_value(col, i))
652 .collect();
653 rows_str.push(row.join(","));
654 }
655
656 format!("result[{}]{{{}}}:{}", n, cols, rows_str.join(";"))
657 }
658}
659
660/// Format a single value from a TypedColumn at index
661fn format_columnar_value(col: &CoreTypedColumn, idx: usize) -> String {
662 match col {
663 CoreTypedColumn::Int64 {
664 values, validity, ..
665 } => {
666 if validity.is_valid(idx) && idx < values.len() {
667 values[idx].to_string()
668 } else {
669 "∅".to_string()
670 }
671 }
672 CoreTypedColumn::UInt64 {
673 values, validity, ..
674 } => {
675 if validity.is_valid(idx) && idx < values.len() {
676 values[idx].to_string()
677 } else {
678 "∅".to_string()
679 }
680 }
681 CoreTypedColumn::Float64 {
682 values, validity, ..
683 } => {
684 if validity.is_valid(idx) && idx < values.len() {
685 format!("{:.6}", values[idx])
686 } else {
687 "∅".to_string()
688 }
689 }
690 CoreTypedColumn::Text {
691 offsets,
692 data,
693 validity,
694 ..
695 } => {
696 if validity.is_valid(idx) && idx + 1 < offsets.len() {
697 let start = offsets[idx] as usize;
698 let end = offsets[idx + 1] as usize;
699 std::str::from_utf8(&data[start..end])
700 .map(|s| {
701 if s.contains(',') || s.contains(';') {
702 format!("\"{}\"", s.replace('"', "\\\""))
703 } else {
704 s.to_string()
705 }
706 })
707 .unwrap_or_else(|_| "∅".to_string())
708 } else {
709 "∅".to_string()
710 }
711 }
712 CoreTypedColumn::Binary {
713 offsets,
714 data,
715 validity,
716 ..
717 } => {
718 if validity.is_valid(idx) && idx + 1 < offsets.len() {
719 let start = offsets[idx] as usize;
720 let end = offsets[idx + 1] as usize;
721 format!("b64:{}", base64_encode(&data[start..end]))
722 } else {
723 "∅".to_string()
724 }
725 }
726 CoreTypedColumn::Bool {
727 values,
728 validity,
729 len,
730 ..
731 } => {
732 if validity.is_valid(idx) && idx < *len {
733 let word = idx / 64;
734 let bit = idx % 64;
735 if (values[word] >> bit) & 1 == 1 {
736 "T"
737 } else {
738 "F"
739 }
740 .to_string()
741 } else {
742 "∅".to_string()
743 }
744 }
745 }
746}
747
748/// Vector search result
749#[derive(Debug, Clone)]
750pub struct VectorSearchResult {
751 pub id: u64,
752 pub distance: f32,
753 pub metadata: Option<HashMap<String, SochValue>>,
754}
755
756/// The SochDB Database Kernel
757///
758/// This is the shared core used by both embedded (`SochConnection`) and
759/// server (`sochdb-server`) modes. It owns all storage, catalog, and
760/// indexing components.
761///
762/// # Thread Safety
763///
764/// The Database is fully thread-safe via internal synchronization:
765/// - Multiple readers can operate concurrently (MVCC snapshots)
766/// - Writers coordinate through WAL and group commit
767/// - All state is behind Arc/RwLock for shared access
768///
769/// # Example
770///
771/// ```ignore
772/// // Open a database (SQLite-style)
773/// let db = Database::open("./my_data")?;
774///
775/// // Begin a transaction
776/// let txn = db.begin_transaction()?;
777///
778/// // Write data
779/// db.put(txn, b"user:1:name", b"Alice")?;
780///
781/// // Commit
782/// db.commit(txn)?;
783/// ```
784#[allow(dead_code)]
785pub struct Database {
786 /// Path to database directory
787 path: PathBuf,
788 /// Durable storage layer (WAL + MVCC + memtable)
789 storage: Arc<DurableStorage>,
790 /// Schema catalog
791 catalog: Arc<RwLock<Catalog>>,
792 /// Registered table schemas (name -> schema) - lock-free for reads
793 tables: DashMap<String, TableSchema>,
794 /// Cached packed schemas for fast insert (name -> packed schema)
795 packed_schemas: DashMap<String, PackedTableSchema>,
796 /// Per-table index policy registry
797 index_registry: Arc<TableIndexRegistry>,
798 /// Configuration
799 config: DatabaseConfig,
800 /// Statistics
801 stats: DatabaseStats,
802 /// Shutdown flag
803 shutdown: AtomicU64,
804}
805
/// Live operation counters for a `Database`.
///
/// Every counter is an `AtomicU64`, so it can be bumped through `&self`
/// from any thread.
struct DatabaseStats {
    transactions_started: AtomicU64,
    transactions_committed: AtomicU64,
    transactions_aborted: AtomicU64,
    queries_executed: AtomicU64,
    bytes_written: AtomicU64,
    bytes_read: AtomicU64,
}

impl DatabaseStats {
    /// Create a counter set with every value at zero.
    fn new() -> Self {
        let zero = || AtomicU64::new(0);
        Self {
            transactions_started: zero(),
            transactions_committed: zero(),
            transactions_aborted: zero(),
            queries_executed: zero(),
            bytes_written: zero(),
            bytes_read: zero(),
        }
    }
}
828
/// Plain-value snapshot of a database's operation counters.
#[derive(Debug, Clone)]
pub struct Stats {
    /// Transactions begun.
    pub transactions_started: u64,
    /// Transactions committed.
    pub transactions_committed: u64,
    /// Transactions aborted.
    pub transactions_aborted: u64,
    /// Queries executed.
    pub queries_executed: u64,
    /// Bytes written (keys plus values).
    pub bytes_written: u64,
    /// Bytes read.
    pub bytes_read: u64,
}
839
840impl Database {
841 /// Open or create a database at the given path.
842 ///
843 /// This is the primary entry point, similar to `sqlite3_open()`.
844 /// If the database exists, it will be opened and WAL recovery performed.
845 /// If it doesn't exist, a new database will be created.
846 ///
847 /// # Arguments
848 ///
849 /// * `path` - Directory path for the database files
850 ///
851 /// # Returns
852 ///
853 /// An `Arc<Database>` that can be shared across threads and connections.
854 pub fn open<P: AsRef<Path>>(path: P) -> Result<Arc<Self>> {
855 Self::open_with_config(path, DatabaseConfig::default())
856 }
857
858 /// Open with custom configuration
859 pub fn open_with_config<P: AsRef<Path>>(path: P, config: DatabaseConfig) -> Result<Arc<Self>> {
860 let path = path.as_ref().to_path_buf();
861
862 // Use IndexPolicy-based storage configuration for automatic memtable selection
863 // This derives ordered index and memtable type from the policy
864 let storage = Arc::new(DurableStorage::open_with_policy(
865 &path,
866 config.default_index_policy,
867 config.group_commit,
868 )?);
869
870 // Create index registry with default policy from config
871 let index_registry = Arc::new(TableIndexRegistry::with_default_policy(
872 config.default_index_policy,
873 ));
874
875 let db = Arc::new(Self {
876 path: path.clone(),
877 storage,
878 catalog: Arc::new(RwLock::new(Catalog::new("sochdb"))),
879 tables: DashMap::new(),
880 packed_schemas: DashMap::new(),
881 index_registry,
882 config,
883 stats: DatabaseStats::new(),
884 shutdown: AtomicU64::new(0),
885 });
886
887 // Perform crash recovery if needed
888 db.recover()?;
889
890 Ok(db)
891 }
892
893 /// Perform crash recovery
894 fn recover(&self) -> Result<RecoveryStats> {
895 self.storage.recover()
896 }
897
898 /// Get database path
899 pub fn path(&self) -> &Path {
900 &self.path
901 }
902
903 // =========================================================================
904 // Transaction API
905 // =========================================================================
906
907 /// Begin a new transaction
908 pub fn begin_transaction(&self) -> Result<TxnHandle> {
909 self.stats
910 .transactions_started
911 .fetch_add(1, Ordering::Relaxed);
912 let txn_id = self.storage.begin_transaction()?;
913
914 // Get snapshot timestamp from MVCC
915 // For now, use txn_id as a proxy (the real snapshot_ts is managed internally)
916 Ok(TxnHandle {
917 txn_id,
918 snapshot_ts: txn_id,
919 })
920 }
921
922 /// Begin a read-only transaction (optimized: no SSI tracking)
923 ///
924 /// Read-only transactions skip SSI read tracking, reducing overhead
925 /// from ~82ns to ~32ns per read (2.6x faster).
926 ///
927 /// Use this for:
928 /// - SELECT queries that don't modify data
929 /// - Analytics and reporting queries
930 /// - Snapshot reads for backup
931 pub fn begin_read_only(&self) -> Result<TxnHandle> {
932 self.stats
933 .transactions_started
934 .fetch_add(1, Ordering::Relaxed);
935 let txn_id = self.storage.begin_with_mode(TransactionMode::ReadOnly)?;
936 Ok(TxnHandle {
937 txn_id,
938 snapshot_ts: txn_id,
939 })
940 }
941
942 /// Begin a write-only transaction (optimized: no read tracking)
943 ///
944 /// Write-only transactions skip read tracking, improving insert
945 /// throughput for bulk loading scenarios.
946 ///
947 /// Use this for:
948 /// - Bulk data imports
949 /// - Append-only logging tables
950 /// - ETL pipelines
951 pub fn begin_write_only(&self) -> Result<TxnHandle> {
952 self.stats
953 .transactions_started
954 .fetch_add(1, Ordering::Relaxed);
955 let txn_id = self.storage.begin_with_mode(TransactionMode::WriteOnly)?;
956 Ok(TxnHandle {
957 txn_id,
958 snapshot_ts: txn_id,
959 })
960 }
961
962 /// Commit a transaction
963 pub fn commit(&self, txn: TxnHandle) -> Result<u64> {
964 self.stats
965 .transactions_committed
966 .fetch_add(1, Ordering::Relaxed);
967 self.storage.commit(txn.txn_id)
968 }
969
970 /// Abort a transaction
971 pub fn abort(&self, txn: TxnHandle) -> Result<()> {
972 self.stats
973 .transactions_aborted
974 .fetch_add(1, Ordering::Relaxed);
975 self.storage.abort(txn.txn_id)
976 }
977
978 // =========================================================================
979 // Per-Table Index Policy API
980 // =========================================================================
981
982 /// Configure index policy for a table
983 ///
984 /// This allows fine-grained control over write/scan trade-offs per table:
985 ///
986 /// | Policy | Insert Cost | Scan Cost | Use Case |
987 /// |----------------|-------------|----------------|------------------------|
988 /// | WriteOptimized | O(1) | O(N) | High-write, rare scan |
989 /// | Balanced | O(1) amort | O(output+logK) | Mixed OLTP |
990 /// | ScanOptimized | O(log N) | O(logN + K) | Analytics, range query |
991 /// | AppendOnly | O(1) | O(N) | Time-series logs |
992 ///
993 /// # Example
994 ///
995 /// ```ignore
996 /// // Fast inserts for logs table (no ordered index overhead)
997 /// db.set_table_index_policy("logs", IndexPolicy::WriteOptimized);
998 ///
999 /// // Efficient range scans for analytics table
1000 /// db.set_table_index_policy("analytics", IndexPolicy::ScanOptimized);
1001 ///
1002 /// // Balanced for OLTP tables
1003 /// db.set_table_index_policy("users", IndexPolicy::Balanced);
1004 /// ```
1005 pub fn set_table_index_policy(&self, table: &str, policy: IndexPolicy) {
1006 self.index_registry.configure_table(
1007 TableIndexConfig::new(table, policy)
1008 );
1009 }
1010
1011 /// Get the index policy for a table
1012 pub fn get_table_index_policy(&self, table: &str) -> IndexPolicy {
1013 self.index_registry.get_policy(table)
1014 }
1015
1016 /// Get the index registry for advanced configuration
1017 pub fn index_registry(&self) -> &Arc<TableIndexRegistry> {
1018 &self.index_registry
1019 }
1020
1021 // =========================================================================
1022 // Key-Value API (Low-level)
1023 // =========================================================================
1024
1025 /// Put a key-value pair
1026 pub fn put(&self, txn: TxnHandle, key: &[u8], value: &[u8]) -> Result<()> {
1027 self.stats
1028 .bytes_written
1029 .fetch_add((key.len() + value.len()) as u64, Ordering::Relaxed);
1030 // Use write_refs to avoid unnecessary allocations
1031 self.storage.write_refs(txn.txn_id, key, value)
1032 }
1033
1034 /// Batch put multiple key-value pairs with reduced overhead
1035 ///
1036 /// This amortizes per-operation costs over the entire batch:
1037 /// - Single DashMap lookup
1038 /// - Batch MVCC tracking
1039 /// - Batch memtable writes
1040 ///
1041 /// For 100+ entries, this is 2-3x faster than individual puts.
1042 ///
1043 /// # Example
1044 ///
1045 /// ```ignore
1046 /// let writes: Vec<(&[u8], &[u8])> = vec![
1047 /// (b"key1", b"value1"),
1048 /// (b"key2", b"value2"),
1049 /// (b"key3", b"value3"),
1050 /// ];
1051 /// db.put_batch(txn, &writes)?;
1052 /// ```
1053 pub fn put_batch(&self, txn: TxnHandle, writes: &[(&[u8], &[u8])]) -> Result<()> {
1054 let bytes: u64 = writes
1055 .iter()
1056 .map(|(k, v)| (k.len() + v.len()) as u64)
1057 .sum();
1058 self.stats.bytes_written.fetch_add(bytes, Ordering::Relaxed);
1059 self.storage.write_batch_refs(txn.txn_id, writes)
1060 }
1061
1062 /// Get a value by key
1063 pub fn get(&self, txn: TxnHandle, key: &[u8]) -> Result<Option<Vec<u8>>> {
1064 let result = self.storage.read(txn.txn_id, key)?;
1065 if let Some(ref data) = result {
1066 self.stats
1067 .bytes_read
1068 .fetch_add(data.len() as u64, Ordering::Relaxed);
1069 }
1070 Ok(result)
1071 }
1072
1073 /// Delete a key
1074 pub fn delete(&self, txn: TxnHandle, key: &[u8]) -> Result<()> {
1075 self.storage.delete(txn.txn_id, key.to_vec())
1076 }
1077
    /// Minimum prefix length, in bytes, accepted by [`Database::scan`].
    ///
    /// Shorter prefixes are rejected to prevent accidental full-table scans.
    /// Callers that really need them can use [`Database::scan_unchecked`].
    pub const MIN_SCAN_PREFIX_LEN: usize = 2;
1081
1082 /// Scan keys with a prefix (enforces minimum prefix length for safety).
1083 ///
1084 /// # Prefix Safety
1085 ///
1086 /// To prevent accidental full-table scans, this method requires a minimum
1087 /// prefix length of 2 bytes. Use `scan_unchecked` for internal operations
1088 /// that need empty/short prefixes.
1089 ///
1090 /// # Errors
1091 ///
1092 /// Returns `SochDBError::InvalidInput` if prefix is too short.
1093 pub fn scan(&self, txn: TxnHandle, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
1094 if prefix.len() < Self::MIN_SCAN_PREFIX_LEN {
1095 return Err(SochDBError::InvalidArgument(format!(
1096 "Prefix too short: {} bytes (minimum {} required). \
1097 Use scan_unchecked() for unrestricted scans.",
1098 prefix.len(),
1099 Self::MIN_SCAN_PREFIX_LEN
1100 )));
1101 }
1102 self.scan_unchecked(txn, prefix)
1103 }
1104
1105 /// Scan keys with a prefix without length validation.
1106 ///
1107 /// # Warning
1108 ///
1109 /// This method allows empty/short prefixes which can cause expensive
1110 /// full-table scans. Use `scan()` unless you specifically need unrestricted
1111 /// prefix access for internal operations.
1112 pub fn scan_unchecked(&self, txn: TxnHandle, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
1113 let results = self.storage.scan(txn.txn_id, prefix)?;
1114 let bytes: u64 = results
1115 .iter()
1116 .map(|(k, v)| (k.len() + v.len()) as u64)
1117 .sum();
1118 self.stats.bytes_read.fetch_add(bytes, Ordering::Relaxed);
1119 Ok(results)
1120 }
1121
1122 /// Scan keys in range
1123 pub fn scan_range(
1124 &self,
1125 txn: TxnHandle,
1126 start: &[u8],
1127 end: &[u8],
1128 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
1129 let results = self.storage.scan_range(txn.txn_id, start, end)?;
1130 let bytes: u64 = results
1131 .iter()
1132 .map(|(k, v)| (k.len() + v.len()) as u64)
1133 .sum();
1134 self.stats.bytes_read.fetch_add(bytes, Ordering::Relaxed);
1135 Ok(results)
1136 }
1137
1138 /// Streaming scan for very large result sets
1139 ///
1140 /// Returns an iterator that yields (key, value) pairs without
1141 /// materializing the entire result set. Use this for large scans
1142 /// where memory efficiency is important.
1143 ///
1144 /// ## Performance
1145 ///
1146 /// - Memory: O(1) per iteration vs O(N) for scan_range
1147 /// - Latency: First result available immediately vs waiting for all results
1148 /// - Throughput: Slightly lower due to per-item overhead
1149 ///
1150 /// ## Usage
1151 ///
1152 /// ```ignore
1153 /// for result in db.scan_range_iter(txn, b"start", b"end") {
1154 /// let (key, value) = result?;
1155 /// // Process immediately - no need to wait for all results
1156 /// }
1157 /// ```
1158 pub fn scan_range_iter<'a>(
1159 &'a self,
1160 txn: TxnHandle,
1161 start: &'a [u8],
1162 end: &'a [u8],
1163 ) -> impl Iterator<Item = Result<(Vec<u8>, Vec<u8>)>> + 'a {
1164 let stats = &self.stats;
1165 self.storage
1166 .scan_range_iter(txn.txn_id, start, end)
1167 .map(move |item| {
1168 stats.bytes_read.fetch_add(
1169 (item.0.len() + item.1.len()) as u64,
1170 Ordering::Relaxed,
1171 );
1172 Ok(item)
1173 })
1174 }
1175
1176 /// Flush memtable to WAL/Disk
1177 pub fn flush(&self) -> Result<()> {
1178 self.storage.fsync()
1179 }
1180
1181 // =========================================================================
1182 // Path-Native API (SochDB's differentiator)
1183 // =========================================================================
1184
1185 /// Get storage statistics
1186 pub fn storage_stats(&self) -> crate::durable_storage::StorageStats {
1187 self.storage.stats()
1188 }
1189
1190 /// Put a value at a path
1191 ///
1192 /// Path format: "collection/doc_id/field" or "table.row_id.column"
1193 /// Resolution is O(|path|), not O(log N) like B-tree.
1194 pub fn put_path(&self, txn: TxnHandle, path: &str, value: &[u8]) -> Result<()> {
1195 self.put(txn, path.as_bytes(), value)
1196 }
1197
1198 /// Get a value at a path
1199 pub fn get_path(&self, txn: TxnHandle, path: &str) -> Result<Option<Vec<u8>>> {
1200 self.get(txn, path.as_bytes())
1201 }
1202
1203 /// Delete at a path
1204 pub fn delete_path(&self, txn: TxnHandle, path: &str) -> Result<()> {
1205 self.delete(txn, path.as_bytes())
1206 }
1207
1208 /// Scan a path prefix
1209 ///
1210 /// Returns all key-value pairs where key starts with prefix.
1211 /// Useful for: "users/123/" -> all fields of user 123
1212 pub fn scan_path(&self, txn: TxnHandle, prefix: &str) -> Result<Vec<(String, Vec<u8>)>> {
1213 self.stats.queries_executed.fetch_add(1, Ordering::Relaxed);
1214
1215 let results = self.scan(txn, prefix.as_bytes())?;
1216
1217 Ok(results
1218 .into_iter()
1219 .filter_map(|(k, v)| String::from_utf8(k).ok().map(|path| (path, v)))
1220 .collect())
1221 }
1222
1223 // =========================================================================
1224 // Query API
1225 // =========================================================================
1226
1227 /// Execute a path query and return results
1228 ///
1229 /// This is the main query interface for LLM context retrieval.
1230 /// Supports:
1231 /// - Path prefix matching
1232 /// - Column projection (for I/O reduction)
1233 /// - Limit/offset
1234 pub fn query(&self, txn: TxnHandle, path_prefix: &str) -> QueryBuilder<'_> {
1235 QueryBuilder::new(self, txn, path_prefix.to_string())
1236 }
1237
1238 // =========================================================================
1239 // Table API (Higher-level abstraction)
1240 // =========================================================================
1241
1242 /// Register a table schema
1243 pub fn register_table(&self, schema: TableSchema) -> Result<()> {
1244 if self.tables.contains_key(&schema.name) {
1245 return Err(SochDBError::InvalidArgument(format!(
1246 "Table '{}' already exists",
1247 schema.name
1248 )));
1249 }
1250 // Cache the packed schema for fast inserts
1251 let packed_schema = Self::to_packed_schema(&schema);
1252 self.packed_schemas
1253 .insert(schema.name.clone(), packed_schema);
1254 self.tables.insert(schema.name.clone(), schema);
1255 Ok(())
1256 }
1257
1258 /// Get table schema
1259 pub fn get_table_schema(&self, name: &str) -> Option<TableSchema> {
1260 self.tables.get(name).map(|s| s.clone())
1261 }
1262
1263 /// List all tables
1264 pub fn list_tables(&self) -> Vec<String> {
1265 self.tables.iter().map(|e| e.key().clone()).collect()
1266 }
1267 /// Convert TableSchema to PackedTableSchema for efficient storage
1268 fn to_packed_schema(schema: &TableSchema) -> PackedTableSchema {
1269 let columns = schema
1270 .columns
1271 .iter()
1272 .map(|col| PackedColumnDef {
1273 name: col.name.clone(),
1274 col_type: match col.col_type {
1275 ColumnType::Int64 => PackedColumnType::Int64,
1276 ColumnType::UInt64 => PackedColumnType::UInt64,
1277 ColumnType::Float64 => PackedColumnType::Float64,
1278 ColumnType::Text => PackedColumnType::Text,
1279 ColumnType::Binary => PackedColumnType::Binary,
1280 ColumnType::Bool => PackedColumnType::Bool,
1281 },
1282 nullable: col.nullable,
1283 })
1284 .collect();
1285
1286 PackedTableSchema::new(&schema.name, columns)
1287 }
1288
1289 /// Insert a row into a table
1290 ///
1291 /// Uses packed row format: stores entire row as single key-value pair.
1292 /// This reduces write amplification from 4× to 1× for a 4-column table.
1293 ///
1294 /// # Performance
1295 /// - Before: 4 columns × (WAL entry + MVCC version) = 4 writes
1296 /// - After: 1 packed row = 1 write
1297 /// - Improvement: ~4× fewer WAL entries, ~48% less I/O overhead
1298 pub fn insert_row(
1299 &self,
1300 txn: TxnHandle,
1301 table: &str,
1302 row_id: u64,
1303 values: &HashMap<String, SochValue>,
1304 ) -> Result<()> {
1305 // Use cached packed schema - single DashMap lookup, no clone
1306 let packed_schema = self
1307 .packed_schemas
1308 .get(table)
1309 .ok_or_else(|| SochDBError::InvalidArgument(format!("Table '{}' not found", table)))?;
1310
1311 // Pack the row using cached schema
1312 let packed_row = PackedRow::pack(&packed_schema, values);
1313
1314 // Build key using KeyBuffer - optimized stack allocation (~12-15ns vs ~30-35ns for write!())
1315 let key = KeyBuffer::format_row_key(table, row_id);
1316
1317 self.put(txn, key.as_bytes(), packed_row.as_bytes())?;
1318
1319 Ok(())
1320 }
1321
1322 /// Read a row from a table
1323 ///
1324 /// Reads packed row and extracts requested columns in O(k) time.
1325 /// Column projection happens in memory, not storage - all columns are fetched.
1326 pub fn read_row(
1327 &self,
1328 txn: TxnHandle,
1329 table: &str,
1330 row_id: u64,
1331 columns: Option<&[&str]>,
1332 ) -> Result<Option<HashMap<String, SochValue>>> {
1333 let schema = self
1334 .tables
1335 .get(table)
1336 .ok_or_else(|| SochDBError::InvalidArgument(format!("Table '{}' not found", table)))?;
1337
1338 // Read the packed row with a single key lookup using KeyBuffer
1339 let key = KeyBuffer::format_row_key(table, row_id);
1340 let bytes = match self.get(txn, key.as_bytes())? {
1341 Some(b) => b,
1342 None => return Ok(None),
1343 };
1344
1345 // Use cached packed schema
1346 let packed_schema = self
1347 .packed_schemas
1348 .get(table)
1349 .ok_or_else(|| SochDBError::Internal("Packed schema not found".into()))?;
1350 let packed_row = PackedRow::from_bytes(bytes, packed_schema.num_columns())?;
1351
1352 // Determine which columns to return
1353 let cols_to_read: Vec<&str> = match columns {
1354 Some(c) => c.to_vec(),
1355 None => schema.columns.iter().map(|c| c.name.as_str()).collect(),
1356 };
1357
1358 let mut row = HashMap::new();
1359 for col_name in cols_to_read {
1360 if let Some(idx) = packed_schema.column_index(col_name)
1361 && let Some(col_def) = packed_schema.column(idx)
1362 && let Some(value) = packed_row.get_column(idx, col_def.col_type)
1363 {
1364 row.insert(col_name.to_string(), value);
1365 }
1366 }
1367
1368 Ok(Some(row))
1369 }
1370
1371 /// Insert multiple rows efficiently in a batch
1372 ///
1373 /// This method accumulates all rows and writes them with fewer WAL syncs.
1374 /// Ideal for bulk loading scenarios.
1375 ///
1376 /// # Performance
1377 /// - Uses group commit to batch fsync operations
1378 /// - Expected throughput: 500K-1M rows/sec depending on row size
1379 pub fn insert_rows_batch(
1380 &self,
1381 txn: TxnHandle,
1382 table: &str,
1383 rows: &[(u64, HashMap<String, SochValue>)],
1384 ) -> Result<usize> {
1385 // Use cached packed schema
1386 let packed_schema = self
1387 .packed_schemas
1388 .get(table)
1389 .ok_or_else(|| SochDBError::InvalidArgument(format!("Table '{}' not found", table)))?;
1390
1391 let mut count = 0;
1392
1393 for (row_id, values) in rows {
1394 // Pack and write using KeyBuffer for efficient key construction
1395 let packed_row = PackedRow::pack(&packed_schema, values);
1396 let key = KeyBuffer::format_row_key(table, *row_id);
1397 self.put(txn, key.as_bytes(), packed_row.as_bytes())?;
1398 count += 1;
1399 }
1400
1401 Ok(count)
1402 }
1403
1404 /// Ultra-fast raw put - bypasses all validation
1405 ///
1406 /// Use when you've already validated the data and just need speed.
1407 /// This is ~10× faster than insert_row() for bulk inserts.
1408 #[inline]
1409 pub fn put_raw(&self, txn: TxnHandle, key: &[u8], value: &[u8]) -> Result<()> {
1410 self.storage.write_refs(txn.txn_id, key, value)
1411 }
1412
1413 /// Zero-allocation insert - fastest path for bulk inserts
1414 ///
1415 /// Takes values as a slice in schema column order, avoiding HashMap overhead.
1416 ///
1417 /// # Arguments
1418 /// * `txn` - Transaction handle
1419 /// * `table` - Table name
1420 /// * `row_id` - Row identifier
1421 /// * `values` - Values in schema column order (None = NULL)
1422 ///
1423 /// # Performance
1424 /// - Eliminates ~6 allocations per row vs insert_row()
1425 /// - Expected: 1.2M-1.5M inserts/sec
1426 ///
1427 /// # Example
1428 /// ```ignore
1429 /// let values: &[Option<&SochValue>] = &[
1430 /// Some(&SochValue::Int(1)),
1431 /// Some(&SochValue::Text("Alice".into())),
1432 /// None, // NULL
1433 /// ];
1434 /// db.insert_row_slice(txn, "users", 1, values)?;
1435 /// ```
1436 #[inline]
1437 pub fn insert_row_slice(
1438 &self,
1439 txn: TxnHandle,
1440 table: &str,
1441 row_id: u64,
1442 values: &[Option<&SochValue>],
1443 ) -> Result<()> {
1444 // Use cached packed schema - single DashMap lookup
1445 let packed_schema = self
1446 .packed_schemas
1447 .get(table)
1448 .ok_or_else(|| SochDBError::InvalidArgument(format!("Table '{}' not found", table)))?;
1449
1450 // Validate column count matches
1451 if values.len() != packed_schema.num_columns() {
1452 return Err(SochDBError::InvalidArgument(format!(
1453 "Expected {} columns, got {}",
1454 packed_schema.num_columns(),
1455 values.len()
1456 )));
1457 }
1458
1459 // Pack using zero-allocation path
1460 let packed_row = PackedRow::pack_slice(&packed_schema, values);
1461
1462 // Build key using KeyBuffer - optimized stack allocation (~12-15ns vs ~30-35ns for write!())
1463 let key = KeyBuffer::format_row_key(table, row_id);
1464
1465 self.put(txn, key.as_bytes(), packed_row.as_bytes())?;
1466 Ok(())
1467 }
1468
1469 // =========================================================================
1470 // Maintenance
1471 // =========================================================================
1472
1473 /// Force fsync to disk
1474 pub fn fsync(&self) -> Result<()> {
1475 self.storage.fsync()
1476 }
1477
1478 /// Create a checkpoint
1479 pub fn checkpoint(&self) -> Result<u64> {
1480 self.storage.checkpoint()
1481 }
1482
1483 /// Run garbage collection
1484 pub fn gc(&self) -> usize {
1485 self.storage.gc()
1486 }
1487
1488 /// Get database statistics
1489 pub fn stats(&self) -> Stats {
1490 Stats {
1491 transactions_started: self.stats.transactions_started.load(Ordering::Relaxed),
1492 transactions_committed: self.stats.transactions_committed.load(Ordering::Relaxed),
1493 transactions_aborted: self.stats.transactions_aborted.load(Ordering::Relaxed),
1494 queries_executed: self.stats.queries_executed.load(Ordering::Relaxed),
1495 bytes_written: self.stats.bytes_written.load(Ordering::Relaxed),
1496 bytes_read: self.stats.bytes_read.load(Ordering::Relaxed),
1497 }
1498 }
1499
1500 /// Shutdown the database gracefully
1501 pub fn shutdown(&self) -> Result<()> {
1502 if self.shutdown.swap(1, Ordering::SeqCst) == 1 {
1503 return Ok(()); // Already shutting down
1504 }
1505
1506 // Flush any pending writes
1507 self.fsync()?;
1508
1509 // Create clean shutdown marker
1510 let marker = self.path.join(".clean_shutdown");
1511 std::fs::write(&marker, b"ok")?;
1512
1513 Ok(())
1514 }
1515}
1516
1517impl Drop for Database {
1518 fn drop(&mut self) {
1519 // Try graceful shutdown if not already done
1520 if self.shutdown.load(Ordering::SeqCst) == 0 {
1521 let _ = self.fsync();
1522 let marker = self.path.join(".clean_shutdown");
1523 let _ = std::fs::write(&marker, b"ok");
1524 }
1525 }
1526}
1527
1528/// Query builder for fluent query construction
1529pub struct QueryBuilder<'a> {
1530 db: &'a Database,
1531 txn: TxnHandle,
1532 path_prefix: String,
1533 columns: Option<Vec<String>>,
1534 limit: Option<usize>,
1535 offset: Option<usize>,
1536}
1537
1538impl<'a> QueryBuilder<'a> {
1539 fn new(db: &'a Database, txn: TxnHandle, path_prefix: String) -> Self {
1540 Self {
1541 db,
1542 txn,
1543 path_prefix,
1544 columns: None,
1545 limit: None,
1546 offset: None,
1547 }
1548 }
1549
1550 /// Select specific columns (for I/O reduction)
1551 pub fn columns(mut self, cols: &[&str]) -> Self {
1552 self.columns = Some(cols.iter().map(|s| s.to_string()).collect());
1553 self
1554 }
1555
1556 /// Limit results
1557 pub fn limit(mut self, n: usize) -> Self {
1558 self.limit = Some(n);
1559 self
1560 }
1561
1562 /// Skip results
1563 pub fn offset(mut self, n: usize) -> Self {
1564 self.offset = Some(n);
1565 self
1566 }
1567
1568 /// Execute the query
1569 ///
1570 /// Scans packed rows and unpacks them. Each key is "table/row_id" pointing to a packed row.
1571 pub fn execute(self) -> Result<QueryResult> {
1572 self.db
1573 .stats
1574 .queries_executed
1575 .fetch_add(1, Ordering::Relaxed);
1576
1577 // Get schema for the table if we're querying a table
1578 let table_name = self
1579 .path_prefix
1580 .split('/')
1581 .next()
1582 .unwrap_or(&self.path_prefix);
1583 let schema = self.db.tables.get(table_name).map(|s| s.clone());
1584
1585 // Scan the path prefix
1586 let results = self.db.scan_path(self.txn, &self.path_prefix)?;
1587
1588 let mut rows: Vec<HashMap<String, SochValue>> = Vec::new();
1589 let mut bytes_read = 0usize;
1590
1591 if let Some(ref schema) = schema {
1592 // We have a table schema - use cached packed schema
1593 let packed_schema = self
1594 .db
1595 .packed_schemas
1596 .get(table_name)
1597 .map(|ps| ps.clone())
1598 .unwrap_or_else(|| Database::to_packed_schema(schema));
1599
1600 for (path, value_bytes) in results {
1601 // Parse path: table/row_id
1602 let parts: Vec<&str> = path.split('/').collect();
1603 if parts.len() == 2 {
1604 // This is a packed row
1605 bytes_read += value_bytes.len();
1606
1607 if let Ok(packed_row) =
1608 PackedRow::from_bytes(value_bytes, packed_schema.num_columns())
1609 {
1610 // Unpack all columns or just requested columns
1611 let mut row = HashMap::new();
1612
1613 if let Some(ref cols) = self.columns {
1614 // Only extract requested columns
1615 for col_name in cols {
1616 if let Some(idx) = packed_schema.column_index(col_name)
1617 && let Some(col_def) = packed_schema.column(idx)
1618 && let Some(value) =
1619 packed_row.get_column(idx, col_def.col_type)
1620 {
1621 row.insert(col_name.clone(), value);
1622 }
1623 }
1624 } else {
1625 // Extract all columns
1626 row = packed_row.unpack(&packed_schema);
1627 }
1628
1629 if !row.is_empty() {
1630 rows.push(row);
1631 }
1632 }
1633 }
1634 }
1635 } else {
1636 // Fallback: no schema, try legacy column-per-key format
1637 let mut rows_map: HashMap<String, HashMap<String, SochValue>> = HashMap::new();
1638
1639 for (path, value_bytes) in results {
1640 let parts: Vec<&str> = path.split('/').collect();
1641 if parts.len() >= 3 {
1642 let row_key = format!("{}/{}", parts[0], parts[1]);
1643 let col_name = parts[2..].join("/");
1644
1645 if let Some(ref cols) = self.columns
1646 && !cols.contains(&col_name)
1647 {
1648 continue;
1649 }
1650
1651 bytes_read += value_bytes.len();
1652 let row = rows_map.entry(row_key).or_default();
1653 row.insert(col_name, deserialize_value(&value_bytes));
1654 }
1655 }
1656
1657 rows = rows_map.into_values().collect();
1658 }
1659
1660 // Apply offset
1661 if let Some(offset) = self.offset {
1662 rows = rows.into_iter().skip(offset).collect();
1663 }
1664
1665 // Apply limit
1666 if let Some(limit) = self.limit {
1667 rows.truncate(limit);
1668 }
1669
1670 // Collect column names
1671 let columns: Vec<String> = self.columns.unwrap_or_else(|| {
1672 rows.iter()
1673 .flat_map(|r| r.keys().cloned())
1674 .collect::<std::collections::HashSet<_>>()
1675 .into_iter()
1676 .collect()
1677 });
1678
1679 Ok(QueryResult {
1680 columns,
1681 rows_scanned: rows.len(),
1682 bytes_read,
1683 rows,
1684 })
1685 }
1686
1687 /// Execute and return TOON format (for LLM efficiency)
1688 pub fn to_toon(self) -> Result<String> {
1689 let result = self.execute()?;
1690 Ok(result.to_toon())
1691 }
1692
1693 /// Execute with lazy iteration - avoids materializing all rows
1694 ///
1695 /// Returns an iterator over rows as `Vec<SochValue>` in schema column order.
1696 /// This is more memory-efficient than `execute()` for large result sets.
1697 ///
1698 /// # Performance
1699 /// - No upfront materialization of all rows
1700 /// - ~40% less memory for large result sets
1701 /// - Ideal for streaming to network or aggregations
1702 ///
1703 /// # Example
1704 /// ```ignore
1705 /// for row_result in db.query(txn, "users").execute_iter()? {
1706 /// let row = row_result?;
1707 /// // row is Vec<SochValue> in column order
1708 /// }
1709 /// ```
1710 pub fn execute_iter(self) -> Result<QueryRowIterator> {
1711 self.db
1712 .stats
1713 .queries_executed
1714 .fetch_add(1, Ordering::Relaxed);
1715
1716 let table_name = self
1717 .path_prefix
1718 .split('/')
1719 .next()
1720 .unwrap_or(&self.path_prefix)
1721 .to_string();
1722
1723 // Get packed schema (clone needed for iterator ownership)
1724 let packed_schema = self.db.packed_schemas.get(&table_name).map(|ps| ps.clone());
1725
1726 // Scan the path prefix
1727 let results = self.db.scan_path(self.txn, &self.path_prefix)?;
1728
1729 Ok(QueryRowIterator {
1730 results: results.into_iter(),
1731 packed_schema,
1732 columns: self.columns,
1733 offset: self.offset.unwrap_or(0),
1734 limit: self.limit,
1735 yielded: 0,
1736 skipped: 0,
1737 })
1738 }
1739
1740 /// Execute and return columnar (SIMD-friendly) result format
1741 ///
1742 /// Instead of row-oriented `Vec<HashMap<String, SochValue>>`, returns
1743 /// column-oriented `Vec<TypedColumn>` for vectorized operations.
1744 ///
1745 /// ## Performance Benefits
1746 ///
1747 /// - SIMD: Aggregate operations (sum, avg) use vectorized instructions
1748 /// - Cache: Sequential access maximizes L1/L2 hits
1749 /// - Memory: ~30% less overhead than row-based format
1750 /// - Analytics: Ideal for ML preprocessing and statistics
1751 ///
1752 /// ## Example
1753 ///
1754 /// ```ignore
1755 /// let result = db.query(txn, "users")
1756 /// .columns(&["id", "score"])
1757 /// .as_columnar()?;
1758 ///
1759 /// // SIMD-optimized sum
1760 /// let total = result.sum_i64("score").unwrap_or(0);
1761 ///
1762 /// // Direct column access
1763 /// if let Some(scores) = result.column("score") {
1764 /// for i in 0..scores.len() {
1765 /// if let Some(v) = scores.get_i64(i) {
1766 /// println!("Score: {}", v);
1767 /// }
1768 /// }
1769 /// }
1770 /// ```
1771 pub fn as_columnar(self) -> Result<ColumnarQueryResult> {
1772 self.db
1773 .stats
1774 .queries_executed
1775 .fetch_add(1, Ordering::Relaxed);
1776
1777 let table_name = self
1778 .path_prefix
1779 .split('/')
1780 .next()
1781 .unwrap_or(&self.path_prefix);
1782 let schema = self.db.tables.get(table_name).map(|s| s.clone());
1783
1784 // Get packed schema
1785 let packed_schema = match self.db.packed_schemas.get(table_name) {
1786 Some(ps) => ps.clone(),
1787 None => return Ok(ColumnarQueryResult::empty()),
1788 };
1789
1790 // Determine columns to fetch
1791 let column_names: Vec<String> = self.columns.clone().unwrap_or_else(|| {
1792 schema
1793 .as_ref()
1794 .map(|s| s.columns.iter().map(|c| c.name.clone()).collect())
1795 .unwrap_or_default()
1796 });
1797
1798 if column_names.is_empty() {
1799 return Ok(ColumnarQueryResult::empty());
1800 }
1801
1802 // Initialize TypedColumns based on schema types
1803 let mut columns: Vec<CoreTypedColumn> = column_names
1804 .iter()
1805 .map(|col_name| {
1806 packed_schema
1807 .column_index(col_name)
1808 .and_then(|idx| packed_schema.column(idx))
1809 .map(|col_def| match col_def.col_type {
1810 PackedColumnType::Int64 => CoreTypedColumn::new_int64(),
1811 PackedColumnType::UInt64 => CoreTypedColumn::new_uint64(),
1812 PackedColumnType::Float64 => CoreTypedColumn::new_float64(),
1813 PackedColumnType::Text => CoreTypedColumn::new_text(),
1814 PackedColumnType::Binary => CoreTypedColumn::new_binary(),
1815 PackedColumnType::Bool => CoreTypedColumn::new_bool(),
1816 PackedColumnType::Null => CoreTypedColumn::new_text(), // Null column = fallback to text
1817 })
1818 .unwrap_or_else(CoreTypedColumn::new_text) // fallback
1819 })
1820 .collect();
1821
1822 // Scan the path prefix
1823 let results = self.db.scan_path(self.txn, &self.path_prefix)?;
1824
1825 let mut row_count = 0;
1826 let mut bytes_read = 0;
1827 let mut skipped = 0;
1828
1829 for (path, value_bytes) in results {
1830 // Parse path: table/row_id
1831 let parts: Vec<&str> = path.split('/').collect();
1832 if parts.len() != 2 {
1833 continue;
1834 }
1835
1836 // Apply offset
1837 if let Some(offset) = self.offset
1838 && skipped < offset
1839 {
1840 skipped += 1;
1841 continue;
1842 }
1843
1844 // Apply limit
1845 if let Some(limit) = self.limit
1846 && row_count >= limit
1847 {
1848 break;
1849 }
1850
1851 bytes_read += value_bytes.len();
1852
1853 if let Ok(packed_row) = PackedRow::from_bytes(value_bytes, packed_schema.num_columns())
1854 {
1855 // Extract each column and push to corresponding TypedColumn
1856 for (col_idx, col_name) in column_names.iter().enumerate() {
1857 if let Some(schema_idx) = packed_schema.column_index(col_name) {
1858 if let Some(col_def) = packed_schema.column(schema_idx) {
1859 let value = packed_row.get_column(schema_idx, col_def.col_type);
1860 push_value_to_typed_column(&mut columns[col_idx], value);
1861 } else {
1862 push_null_to_typed_column(&mut columns[col_idx]);
1863 }
1864 } else {
1865 push_null_to_typed_column(&mut columns[col_idx]);
1866 }
1867 }
1868 row_count += 1;
1869 }
1870 }
1871
1872 Ok(ColumnarQueryResult {
1873 columns: column_names,
1874 data: columns,
1875 row_count,
1876 bytes_read,
1877 })
1878 }
1879}
1880
1881/// Lazy iterator over query results
1882///
1883/// Unpacks rows on-demand, avoiding upfront materialization.
1884pub struct QueryRowIterator {
1885 results: std::vec::IntoIter<(String, Vec<u8>)>,
1886 packed_schema: Option<PackedTableSchema>,
1887 columns: Option<Vec<String>>,
1888 offset: usize,
1889 limit: Option<usize>,
1890 yielded: usize,
1891 skipped: usize,
1892}
1893
1894impl Iterator for QueryRowIterator {
1895 type Item = Result<Vec<SochValue>>;
1896
1897 fn next(&mut self) -> Option<Self::Item> {
1898 // Check limit
1899 if let Some(limit) = self.limit
1900 && self.yielded >= limit
1901 {
1902 return None;
1903 }
1904
1905 loop {
1906 let (path, value_bytes) = self.results.next()?;
1907
1908 // Parse path: table/row_id
1909 let parts: Vec<&str> = path.split('/').collect();
1910 if parts.len() != 2 {
1911 continue; // Skip non-row entries
1912 }
1913
1914 // Apply offset
1915 if self.skipped < self.offset {
1916 self.skipped += 1;
1917 continue;
1918 }
1919
1920 if let Some(ref schema) = self.packed_schema {
1921 match PackedRow::from_bytes(value_bytes, schema.num_columns()) {
1922 Ok(packed_row) => {
1923 let row = if let Some(ref cols) = self.columns {
1924 // Project specific columns
1925 cols.iter()
1926 .map(|col_name| {
1927 schema
1928 .column_index(col_name)
1929 .and_then(|idx| schema.column(idx))
1930 .and_then(|col_def| {
1931 packed_row.get_column(
1932 schema.column_index(col_name).unwrap(),
1933 col_def.col_type,
1934 )
1935 })
1936 .unwrap_or(SochValue::Null)
1937 })
1938 .collect()
1939 } else {
1940 // All columns in order
1941 packed_row.unpack_to_vec(schema)
1942 };
1943
1944 self.yielded += 1;
1945 return Some(Ok(row));
1946 }
1947 Err(e) => return Some(Err(e)),
1948 }
1949 } else {
1950 // No schema - return raw bytes as binary
1951 self.yielded += 1;
1952 return Some(Ok(vec![SochValue::Binary(value_bytes)]));
1953 }
1954 }
1955 }
1956}
1957
1958// Helper functions for serialization (kept for backward compatibility with legacy data)
1959
1960#[allow(dead_code)]
1961fn serialize_value(value: &SochValue) -> Vec<u8> {
1962 // Simple serialization - in production use proper format
1963 match value {
1964 SochValue::Null => vec![0],
1965 SochValue::Int(i) => {
1966 let mut buf = vec![1];
1967 buf.extend_from_slice(&i.to_le_bytes());
1968 buf
1969 }
1970 SochValue::UInt(u) => {
1971 let mut buf = vec![2];
1972 buf.extend_from_slice(&u.to_le_bytes());
1973 buf
1974 }
1975 SochValue::Float(f) => {
1976 let mut buf = vec![3];
1977 buf.extend_from_slice(&f.to_le_bytes());
1978 buf
1979 }
1980 SochValue::Text(s) => {
1981 let mut buf = vec![4];
1982 buf.extend_from_slice(s.as_bytes());
1983 buf
1984 }
1985 SochValue::Bool(b) => vec![5, if *b { 1 } else { 0 }],
1986 SochValue::Binary(b) => {
1987 let mut buf = vec![6];
1988 buf.extend_from_slice(b);
1989 buf
1990 }
1991 _ => {
1992 // Fallback: serialize as text
1993 let s = format!("{:?}", value);
1994 let mut buf = vec![4];
1995 buf.extend_from_slice(s.as_bytes());
1996 buf
1997 }
1998 }
1999}
2000
2001fn deserialize_value(bytes: &[u8]) -> SochValue {
2002 if bytes.is_empty() {
2003 return SochValue::Null;
2004 }
2005
2006 match bytes[0] {
2007 0 => SochValue::Null,
2008 1 if bytes.len() >= 9 => {
2009 let i = i64::from_le_bytes(bytes[1..9].try_into().unwrap());
2010 SochValue::Int(i)
2011 }
2012 2 if bytes.len() >= 9 => {
2013 let u = u64::from_le_bytes(bytes[1..9].try_into().unwrap());
2014 SochValue::UInt(u)
2015 }
2016 3 if bytes.len() >= 9 => {
2017 let f = f64::from_le_bytes(bytes[1..9].try_into().unwrap());
2018 SochValue::Float(f)
2019 }
2020 4 => {
2021 let s = String::from_utf8_lossy(&bytes[1..]).to_string();
2022 SochValue::Text(s)
2023 }
2024 5 if bytes.len() >= 2 => SochValue::Bool(bytes[1] != 0),
2025 6 => SochValue::Binary(bytes[1..].to_vec()),
2026 _ => {
2027 // Treat as text
2028 let s = String::from_utf8_lossy(bytes).to_string();
2029 SochValue::Text(s)
2030 }
2031 }
2032}
2033
2034// ============================================================================
2035// Helper functions for columnar query result building
2036// ============================================================================
2037
2038/// Push a SochValue into a TypedColumn
2039fn push_value_to_typed_column(col: &mut CoreTypedColumn, value: Option<SochValue>) {
2040 match value {
2041 None => push_null_to_typed_column(col),
2042 Some(v) => match (col, v) {
2043 (
2044 CoreTypedColumn::Int64 {
2045 values,
2046 validity,
2047 stats,
2048 },
2049 SochValue::Int(i),
2050 ) => {
2051 values.push(i);
2052 validity.push(true);
2053 stats.update_i64(i);
2054 }
2055 (
2056 CoreTypedColumn::Int64 {
2057 values,
2058 validity,
2059 stats,
2060 },
2061 SochValue::UInt(u),
2062 ) => {
2063 values.push(u as i64);
2064 validity.push(true);
2065 stats.update_i64(u as i64);
2066 }
2067 (
2068 CoreTypedColumn::UInt64 {
2069 values,
2070 validity,
2071 stats,
2072 },
2073 SochValue::UInt(u),
2074 ) => {
2075 values.push(u);
2076 validity.push(true);
2077 stats.update_i64(u as i64);
2078 }
2079 (
2080 CoreTypedColumn::UInt64 {
2081 values,
2082 validity,
2083 stats,
2084 },
2085 SochValue::Int(i),
2086 ) => {
2087 values.push(i as u64);
2088 validity.push(true);
2089 stats.update_i64(i);
2090 }
2091 (
2092 CoreTypedColumn::Float64 {
2093 values,
2094 validity,
2095 stats,
2096 },
2097 SochValue::Float(f),
2098 ) => {
2099 values.push(f);
2100 validity.push(true);
2101 stats.update_f64(f);
2102 }
2103 (
2104 CoreTypedColumn::Float64 {
2105 values,
2106 validity,
2107 stats,
2108 },
2109 SochValue::Int(i),
2110 ) => {
2111 values.push(i as f64);
2112 validity.push(true);
2113 stats.update_f64(i as f64);
2114 }
2115 (
2116 CoreTypedColumn::Text {
2117 offsets,
2118 data,
2119 validity,
2120 stats,
2121 },
2122 SochValue::Text(s),
2123 ) => {
2124 data.extend_from_slice(s.as_bytes());
2125 offsets.push(data.len() as u32);
2126 validity.push(true);
2127 stats.row_count += 1;
2128 }
2129 (
2130 CoreTypedColumn::Binary {
2131 offsets,
2132 data,
2133 validity,
2134 stats,
2135 },
2136 SochValue::Binary(b),
2137 ) => {
2138 data.extend_from_slice(&b);
2139 offsets.push(data.len() as u32);
2140 validity.push(true);
2141 stats.row_count += 1;
2142 }
2143 (
2144 CoreTypedColumn::Bool {
2145 values,
2146 validity,
2147 stats,
2148 len,
2149 },
2150 SochValue::Bool(b),
2151 ) => {
2152 let idx = *len;
2153 *len += 1;
2154 let num_words = (*len).div_ceil(64);
2155 while values.len() < num_words {
2156 values.push(0);
2157 }
2158 if b {
2159 let word = idx / 64;
2160 let bit = idx % 64;
2161 values[word] |= 1 << bit;
2162 }
2163 validity.push(true);
2164 stats.row_count += 1;
2165 }
2166 // Type mismatch - push as null
2167 (col, _) => push_null_to_typed_column(col),
2168 },
2169 }
2170}
2171
2172/// Push a null value into a TypedColumn
2173fn push_null_to_typed_column(col: &mut CoreTypedColumn) {
2174 match col {
2175 CoreTypedColumn::Int64 {
2176 values,
2177 validity,
2178 stats,
2179 } => {
2180 values.push(0);
2181 validity.push(false);
2182 stats.update_null();
2183 }
2184 CoreTypedColumn::UInt64 {
2185 values,
2186 validity,
2187 stats,
2188 } => {
2189 values.push(0);
2190 validity.push(false);
2191 stats.update_null();
2192 }
2193 CoreTypedColumn::Float64 {
2194 values,
2195 validity,
2196 stats,
2197 } => {
2198 values.push(0.0);
2199 validity.push(false);
2200 stats.update_null();
2201 }
2202 CoreTypedColumn::Text {
2203 offsets,
2204 data: _,
2205 validity,
2206 stats,
2207 } => {
2208 offsets.push(offsets.last().copied().unwrap_or(0));
2209 validity.push(false);
2210 stats.update_null();
2211 }
2212 CoreTypedColumn::Binary {
2213 offsets,
2214 data: _,
2215 validity,
2216 stats,
2217 } => {
2218 offsets.push(offsets.last().copied().unwrap_or(0));
2219 validity.push(false);
2220 stats.update_null();
2221 }
2222 CoreTypedColumn::Bool {
2223 values,
2224 validity,
2225 stats,
2226 len,
2227 } => {
2228 *len += 1;
2229 let num_words = (*len).div_ceil(64);
2230 while values.len() < num_words {
2231 values.push(0);
2232 }
2233 validity.push(false);
2234 stats.update_null();
2235 }
2236 }
2237}
2238
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// A fresh database hands out a non-zero transaction id and shuts down cleanly.
    #[test]
    fn test_database_open_close() {
        let tmp = tempdir().unwrap();
        let db = Database::open(tmp.path()).unwrap();

        let txn = db.begin_transaction().unwrap();
        assert!(txn.txn_id > 0);

        db.abort(txn).unwrap();
        db.shutdown().unwrap();
    }

    /// A write is visible inside its own transaction and, once committed, to a
    /// later transaction.
    #[test]
    fn test_database_put_get() {
        let tmp = tempdir().unwrap();
        let db = Database::open(tmp.path()).unwrap();
        let expected = Some(b"value1".to_vec());

        let writer = db.begin_transaction().unwrap();
        db.put(writer, b"key1", b"value1").unwrap();
        assert_eq!(db.get(writer, b"key1").unwrap(), expected);
        db.commit(writer).unwrap();

        let reader = db.begin_transaction().unwrap();
        assert_eq!(db.get(reader, b"key1").unwrap(), expected);
        db.abort(reader).unwrap();
    }

    /// Path-keyed writes can be read back with a prefix scan.
    #[test]
    fn test_database_path_api() {
        let tmp = tempdir().unwrap();
        let db = Database::open(tmp.path()).unwrap();

        let entries: [(&str, &[u8]); 3] = [
            ("users/1/name", b"Alice"),
            ("users/1/email", b"alice@example.com"),
            ("users/2/name", b"Bob"),
        ];

        let writer = db.begin_transaction().unwrap();
        for (path, value) in entries {
            db.put_path(writer, path, value).unwrap();
        }
        db.commit(writer).unwrap();

        let reader = db.begin_transaction().unwrap();
        let user_one = db.scan_path(reader, "users/1/").unwrap();
        assert_eq!(user_one.len(), 2);
        db.abort(reader).unwrap();
    }

    /// A row inserted into a registered table can be read back by id.
    #[test]
    fn test_database_table_api() {
        let tmp = tempdir().unwrap();
        let db = Database::open(tmp.path()).unwrap();

        let schema = TableSchema {
            name: "users".to_string(),
            columns: vec![
                ColumnDef {
                    name: "name".to_string(),
                    col_type: ColumnType::Text,
                    nullable: false,
                },
                ColumnDef {
                    name: "age".to_string(),
                    col_type: ColumnType::Int64,
                    nullable: true,
                },
            ],
        };
        db.register_table(schema).unwrap();

        let mut fields = HashMap::new();
        fields.insert("name".to_string(), SochValue::Text("Alice".to_string()));
        fields.insert("age".to_string(), SochValue::Int(30));

        let writer = db.begin_transaction().unwrap();
        db.insert_row(writer, "users", 1, &fields).unwrap();
        db.commit(writer).unwrap();

        let reader = db.begin_transaction().unwrap();
        let row = db.read_row(reader, "users", 1, None).unwrap();
        assert!(row.is_some());
        let record = row.unwrap();
        let alice = SochValue::Text("Alice".to_string());
        assert_eq!(record.get("name"), Some(&alice));

        db.abort(reader).unwrap();
    }

    /// The query builder honours `limit`.
    #[test]
    fn test_database_query_builder() {
        let tmp = tempdir().unwrap();
        let db = Database::open(tmp.path()).unwrap();

        let docs: [(&str, &[u8]); 4] = [
            ("docs/1/title", b"Hello"),
            ("docs/1/content", b"World"),
            ("docs/2/title", b"Foo"),
            ("docs/2/content", b"Bar"),
        ];

        let writer = db.begin_transaction().unwrap();
        for (path, value) in docs {
            db.put_path(writer, path, value).unwrap();
        }
        db.commit(writer).unwrap();

        let reader = db.begin_transaction().unwrap();
        let limited = db.query(reader, "docs/").limit(1).execute().unwrap();
        assert_eq!(limited.rows.len(), 1);
        db.abort(reader).unwrap();
    }

    /// Committed data survives a reopen after an unclean exit.
    #[test]
    fn test_database_crash_recovery() {
        let tmp = tempdir().unwrap();

        // First session: commit a write, then leak the handle with
        // `mem::forget` so neither `shutdown` nor `Drop` runs, simulating a crash.
        {
            let db = Database::open(tmp.path()).unwrap();
            let writer = db.begin_transaction().unwrap();
            db.put(writer, b"persist", b"this").unwrap();
            db.commit(writer).unwrap();
            std::mem::forget(db);
        }

        // Second session: the committed value must be recovered on reopen.
        {
            let db = Database::open(tmp.path()).unwrap();
            let reader = db.begin_transaction().unwrap();
            let recovered = db.get(reader, b"persist").unwrap();
            assert_eq!(recovered, Some(b"this".to_vec()));
            db.abort(reader).unwrap();
        }
    }
}