Skip to main content

sochdb_storage/
storage_engine.rs

// SPDX-License-Identifier: AGPL-3.0-or-later
// SochDB - LLM-Optimized Embedded Database
// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
//! StorageEngine Trait Abstraction (Task 1)
//!
//! Decouples the query engine from concrete storage implementations.
//! Enables pluggable backends: LSMTree for compatibility, Lscs for columnar TOON workloads.
//!
//! ## I/O Reduction Model
//!
//! ```text
//! Traditional Row I/O: O(N × K) where N=rows, K=total columns
//! Columnar I/O:        O(N × k) where k=selected columns
//!
//! For k/K = 0.2 (typical TOON projection):
//! I/O_reduction = 1 - (k/K) = 80%
//! ```
32
use sochdb_core::{Result, SochDBError, SochRow, SochValue};
use std::ops::Range;
use std::path::Path;
use std::sync::atomic::Ordering;
use std::sync::Arc;
37
/// Transaction handle for ACID operations.
///
/// A small `Copy` value made of identifiers only; it holds no reference to
/// the engine that issued it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct TxnHandle {
    /// Transaction ID
    pub txn_id: u64,
    /// Snapshot version for MVCC reads
    pub snapshot_version: u64,
    /// Transaction start timestamp, in microseconds since the Unix epoch
    pub start_ts: u64,
}

impl TxnHandle {
    /// Create a new transaction handle stamped with the current wall-clock time.
    ///
    /// `start_ts` is the number of microseconds since the Unix epoch. If the
    /// system clock reports a time before the epoch, `start_ts` is `0`; this
    /// constructor never panics.
    pub fn new(txn_id: u64, snapshot_version: u64) -> Self {
        // `duration_since` fails when the clock is set before 1970. That is an
        // environment problem, not a caller bug, so degrade to 0 rather than
        // panic. Saturate instead of silently truncating the u128 count.
        let start_ts = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|elapsed| elapsed.as_micros().min(u128::from(u64::MAX)) as u64)
            .unwrap_or(0);
        Self {
            txn_id,
            snapshot_version,
            start_ts,
        }
    }
}
63
/// Identifier of a column (32-bit); `scan_columns` casts it to a `usize` index
pub type ColumnId = u32;

/// Identifier of a row (64-bit); `get`/`delete` decode it from an 8-byte little-endian key
pub type RowId = u64;
69
70/// A row of data with column values
71#[derive(Debug, Clone)]
72pub struct Row {
73    /// Row ID
74    pub id: RowId,
75    /// Column values (indexed by column position)
76    pub values: Vec<Option<Vec<u8>>>,
77    /// Transaction start timestamp (MVCC)
78    pub txn_start: u64,
79    /// Transaction end timestamp (MVCC, 0 = active)
80    pub txn_end: u64,
81}
82
83impl Row {
84    /// Create a new row
85    pub fn new(id: RowId, values: Vec<Option<Vec<u8>>>) -> Self {
86        let now = std::time::SystemTime::now()
87            .duration_since(std::time::UNIX_EPOCH)
88            .unwrap()
89            .as_micros() as u64;
90        Self {
91            id,
92            values,
93            txn_start: now,
94            txn_end: 0,
95        }
96    }
97
98    /// Check if row is visible at given snapshot
99    pub fn is_visible(&self, snapshot_version: u64) -> bool {
100        self.txn_start <= snapshot_version && (self.txn_end == 0 || self.txn_end > snapshot_version)
101    }
102
103    /// Convert to SochRow
104    pub fn to_soch_row(&self, _schema: &[String]) -> SochRow {
105        let values: Vec<SochValue> = self
106            .values
107            .iter()
108            .map(|v| match v {
109                Some(bytes) => {
110                    // Try to interpret as string first
111                    if let Ok(s) = std::str::from_utf8(bytes) {
112                        SochValue::Text(s.to_string())
113                    } else {
114                        SochValue::Binary(bytes.clone())
115                    }
116                }
117                None => SochValue::Null,
118            })
119            .collect();
120        SochRow::new(values)
121    }
122}
123
124/// Iterator over columns
125pub struct ColumnIterator {
126    /// Current position
127    position: usize,
128    /// Rows with projected columns
129    rows: Vec<Row>,
130    /// Column IDs being iterated
131    column_ids: Vec<ColumnId>,
132}
133
134impl ColumnIterator {
135    /// Create a new column iterator
136    pub fn new(rows: Vec<Row>, column_ids: Vec<ColumnId>) -> Self {
137        Self {
138            position: 0,
139            rows,
140            column_ids,
141        }
142    }
143
144    /// Get column IDs
145    pub fn column_ids(&self) -> &[ColumnId] {
146        &self.column_ids
147    }
148}
149
150impl Iterator for ColumnIterator {
151    type Item = Row;
152
153    fn next(&mut self) -> Option<Self::Item> {
154        if self.position < self.rows.len() {
155            let row = self.rows[self.position].clone();
156            self.position += 1;
157            Some(row)
158        } else {
159            None
160        }
161    }
162}
163
/// Point-in-time statistics reported by a storage engine.
///
/// `Default` yields all-zero counters and an empty `files_per_level`.
#[derive(Debug, Clone, Default)]
pub struct StorageStats {
    /// Number of rows stored
    pub total_rows: u64,
    /// Bytes occupied on disk
    pub disk_bytes: u64,
    /// Bytes held in memory (memtables)
    pub memory_bytes: u64,
    /// Number of storage levels
    pub num_levels: u32,
    /// File count for each level, one entry per level
    pub files_per_level: Vec<u32>,
    /// Read amplification factor
    pub read_amplification: f64,
    /// Write amplification factor
    pub write_amplification: f64,
}
182
183/// StorageEngine trait - the core abstraction for pluggable storage backends
184///
185/// Implementations:
186/// - `Lscs`: Columnar storage for TOON workloads (80% I/O reduction for projections)
187/// - `LegacyLsmTree`: Row-oriented storage for compatibility
188pub trait StorageEngine: Send + Sync {
189    /// Begin a new transaction
190    fn begin_txn(&self) -> Result<TxnHandle>;
191
192    /// Get a single row by key
193    fn get(&self, txn: &TxnHandle, key: &[u8]) -> Result<Option<Row>>;
194
195    /// Put a row (insert or update)
196    fn put(&self, txn: &TxnHandle, key: &[u8], row: Row) -> Result<()>;
197
198    /// Delete a row
199    fn delete(&self, txn: &TxnHandle, key: &[u8]) -> Result<()>;
200
201    /// Scan a range of rows
202    fn scan(&self, txn: &TxnHandle, range: Range<Vec<u8>>) -> Result<Vec<Row>>;
203
204    /// Scan columns selectively (columnar optimization)
205    ///
206    /// This is the key optimization for TOON workloads:
207    /// - Traditional: Read all columns O(N × K)
208    /// - Columnar: Read only selected columns O(N × k)
209    ///
210    /// For k/K = 0.2, this is 80% I/O reduction
211    fn scan_columns(
212        &self,
213        txn: &TxnHandle,
214        range: Range<Vec<u8>>,
215        cols: &[ColumnId],
216    ) -> Result<ColumnIterator>;
217
218    /// Commit a transaction
219    fn commit(&self, txn: TxnHandle) -> Result<()>;
220
221    /// Abort a transaction
222    fn abort(&self, txn: TxnHandle) -> Result<()>;
223
224    /// Get storage statistics
225    fn stats(&self) -> StorageStats;
226
227    /// Force flush memtables to disk
228    fn flush(&self) -> Result<()>;
229
230    /// Trigger compaction
231    fn compact(&self) -> Result<()>;
232
233    /// Close the storage engine
234    fn close(&self) -> Result<()>;
235}
236
237/// Open a storage engine from a path
238pub fn open_storage_engine<P: AsRef<Path>>(
239    path: P,
240    engine_type: StorageEngineType,
241) -> Result<Arc<dyn StorageEngine>> {
242    match engine_type {
243        StorageEngineType::Lscs => {
244            use crate::lscs::{ColumnDef, ColumnType, Lscs, LscsConfig, TableSchema};
245
246            // Create default schema for general-purpose storage
247            let schema = TableSchema::new(
248                "default".to_string(),
249                vec![
250                    ColumnDef {
251                        name: "key".to_string(),
252                        col_type: ColumnType::Binary,
253                        nullable: false,
254                    },
255                    ColumnDef {
256                        name: "value".to_string(),
257                        col_type: ColumnType::Binary,
258                        nullable: true,
259                    },
260                ],
261            )
262            .with_mvcc();
263
264            let lscs = Lscs::new(path.as_ref().to_path_buf(), schema, LscsConfig::default())?;
265            Ok(Arc::new(LscsAdapter::new(lscs)))
266        }
267        StorageEngineType::Legacy => Err(SochDBError::InvalidArgument(
268            "Legacy LSMTree has been removed; use Lscs".to_string(),
269        )),
270    }
271}
272
/// Selects which storage backend [`open_storage_engine`] should open.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum StorageEngineType {
    /// LSCS columnar storage; this is the `Default` variant
    #[default]
    Lscs,
    /// Legacy row-oriented storage (deprecated); opening it returns an error
    Legacy,
}
282
283/// Adapter to make Lscs implement StorageEngine
284pub struct LscsAdapter {
285    inner: crate::lscs::Lscs,
286    next_txn_id: std::sync::atomic::AtomicU64,
287    version_counter: std::sync::atomic::AtomicU64,
288}
289
290impl LscsAdapter {
291    /// Create a new adapter
292    pub fn new(lscs: crate::lscs::Lscs) -> Self {
293        Self {
294            inner: lscs,
295            next_txn_id: std::sync::atomic::AtomicU64::new(1),
296            version_counter: std::sync::atomic::AtomicU64::new(1),
297        }
298    }
299}
300
301impl StorageEngine for LscsAdapter {
302    fn begin_txn(&self) -> Result<TxnHandle> {
303        let txn_id = self
304            .next_txn_id
305            .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
306        let snapshot = self
307            .version_counter
308            .load(std::sync::atomic::Ordering::Acquire);
309        Ok(TxnHandle::new(txn_id, snapshot))
310    }
311
312    fn get(&self, txn: &TxnHandle, key: &[u8]) -> Result<Option<Row>> {
313        // Use learned index for O(1) lookup
314        let row_id = u64::from_le_bytes(
315            key.try_into()
316                .map_err(|_| SochDBError::InvalidArgument("Key must be 8 bytes".to_string()))?,
317        );
318
319        // Use LSCS get() with MVCC filtering
320        if let Some(values) = self.inner.get(row_id)? {
321            // Extract MVCC timestamps from the last two columns (__txn_start, __txn_end)
322            let num_cols = values.len();
323            let (txn_start, txn_end) = if num_cols >= 2 {
324                let start = values[num_cols - 2]
325                    .as_ref()
326                    .and_then(|v| v.get(..8))
327                    .map(|b| u64::from_le_bytes(b.try_into().unwrap()))
328                    .unwrap_or(0);
329                let end = values[num_cols - 1]
330                    .as_ref()
331                    .and_then(|v| v.get(..8))
332                    .map(|b| u64::from_le_bytes(b.try_into().unwrap()))
333                    .unwrap_or(0);
334                (start, end)
335            } else {
336                (0, 0)
337            };
338
339            let row = Row {
340                id: row_id,
341                values,
342                txn_start,
343                txn_end,
344            };
345
346            // Apply MVCC visibility check
347            if row.is_visible(txn.snapshot_version) {
348                return Ok(Some(row));
349            }
350        }
351
352        Ok(None)
353    }
354
355    fn put(&self, _txn: &TxnHandle, key: &[u8], row: Row) -> Result<()> {
356        let values: Vec<Option<&[u8]>> = row.values.iter().map(|v| v.as_deref()).collect();
357        let _ = key; // Key is derived from row ID
358        self.inner.insert(&values)?;
359        Ok(())
360    }
361
362    fn delete(&self, txn: &TxnHandle, key: &[u8]) -> Result<()> {
363        // Mark row as deleted by setting txn_end
364        // In MVCC, delete is a "tombstone" - we mark the row with an end timestamp
365        let row_id = u64::from_le_bytes(
366            key.try_into()
367                .map_err(|_| SochDBError::InvalidArgument("Key must be 8 bytes".to_string()))?,
368        );
369
370        // Write tombstone by updating __txn_end column to current transaction timestamp
371        self.inner
372            .mark_deleted(row_id, txn.txn_id, txn.snapshot_version)?;
373        Ok(())
374    }
375
376    fn scan(&self, txn: &TxnHandle, range: Range<Vec<u8>>) -> Result<Vec<Row>> {
377        // Parse range as row IDs
378        let start = if range.start.len() >= 8 {
379            u64::from_le_bytes(range.start[..8].try_into().unwrap())
380        } else {
381            0
382        };
383        let end = if range.end.len() >= 8 {
384            u64::from_le_bytes(range.end[..8].try_into().unwrap())
385        } else {
386            u64::MAX
387        };
388
389        // Use LSCS scan_range
390        let scan_results = self.inner.scan_range(start, end)?;
391        let rows: Vec<Row> = scan_results
392            .into_iter()
393            .filter_map(|(row_id, values)| {
394                let row = Row {
395                    id: row_id,
396                    values,
397                    txn_start: 0,
398                    txn_end: 0,
399                };
400                if row.is_visible(txn.snapshot_version) {
401                    Some(row)
402                } else {
403                    None
404                }
405            })
406            .collect();
407
408        Ok(rows)
409    }
410
411    fn scan_columns(
412        &self,
413        txn: &TxnHandle,
414        range: Range<Vec<u8>>,
415        cols: &[ColumnId],
416    ) -> Result<ColumnIterator> {
417        // Parse range as row IDs
418        let start = if range.start.len() >= 8 {
419            u64::from_le_bytes(range.start[..8].try_into().unwrap())
420        } else {
421            0
422        };
423        let end = if range.end.len() >= 8 {
424            u64::from_le_bytes(range.end[..8].try_into().unwrap())
425        } else {
426            u64::MAX
427        };
428
429        // Columnar selective read - only read requested columns
430        // This achieves 80% I/O reduction when reading 20% of columns
431        let col_indices: Vec<usize> = cols.iter().map(|&c| c as usize).collect();
432
433        let scan_results = self.inner.scan_columns_range(start, end, &col_indices)?;
434
435        let rows: Vec<Row> = scan_results
436            .into_iter()
437            .filter_map(|(row_id, values)| {
438                let row = Row {
439                    id: row_id,
440                    values,
441                    txn_start: 0,
442                    txn_end: 0,
443                };
444                if row.is_visible(txn.snapshot_version) {
445                    Some(row)
446                } else {
447                    None
448                }
449            })
450            .collect();
451
452        Ok(ColumnIterator::new(rows, cols.to_vec()))
453    }
454
455    fn commit(&self, txn: TxnHandle) -> Result<()> {
456        // Ensure durability by calling fsync
457        self.inner.fsync()?;
458        self.version_counter
459            .fetch_add(1, std::sync::atomic::Ordering::Release);
460        let _ = txn;
461        Ok(())
462    }
463
464    fn abort(&self, _txn: TxnHandle) -> Result<()> {
465        Ok(())
466    }
467
468    fn stats(&self) -> StorageStats {
469        let lscs_stats = self.inner.stats();
470        StorageStats {
471            total_rows: lscs_stats.next_row_id,
472            disk_bytes: lscs_stats.disk_bytes,
473            memory_bytes: lscs_stats.active_memtable_bytes as u64,
474            num_levels: lscs_stats.level_row_counts.len() as u32,
475            files_per_level: vec![0; lscs_stats.level_row_counts.len()],
476            read_amplification: 1.0,
477            write_amplification: 1.0,
478        }
479    }
480
481    fn flush(&self) -> Result<()> {
482        self.inner.flush()
483    }
484
485    fn compact(&self) -> Result<()> {
486        self.inner.compact()
487    }
488
489    fn close(&self) -> Result<()> {
490        self.flush()
491    }
492}
493
#[cfg(test)]
mod tests {
    use super::*;

    /// `TxnHandle::new` stores both identifiers and stamps a non-zero start time.
    #[test]
    fn test_txn_handle() {
        let TxnHandle {
            txn_id,
            snapshot_version,
            start_ts,
        } = TxnHandle::new(1, 100);

        assert_eq!(txn_id, 1);
        assert_eq!(snapshot_version, 100);
        assert!(start_ts > 0);
    }

    /// Visibility follows the `[txn_start, txn_end)` window, with `txn_end == 0` meaning active.
    #[test]
    fn test_row_visibility() {
        let active = Row {
            txn_start: 100,
            txn_end: 0,
            ..Row::new(1, vec![Some(b"test".to_vec())])
        };

        // An active row is visible from its start version onwards.
        assert!(active.is_visible(100));
        assert!(active.is_visible(200));
        assert!(!active.is_visible(99));

        // Once deleted, the row is visible only before the deletion version.
        let deleted = Row {
            txn_end: 150,
            ..active
        };
        assert!(deleted.is_visible(120)); // inside the window
        assert!(!deleted.is_visible(150)); // exactly at deletion
        assert!(!deleted.is_visible(200)); // after deletion
    }

    /// The iterator reports its column IDs and yields each row exactly once.
    #[test]
    fn test_column_iterator() {
        let first = Row::new(1, vec![Some(b"a".to_vec()), Some(b"b".to_vec())]);
        let second = Row::new(2, vec![Some(b"c".to_vec()), Some(b"d".to_vec())]);
        let mut iter = ColumnIterator::new(vec![first, second], vec![0, 1]);

        assert_eq!(iter.column_ids(), &[0, 1]);
        assert!(iter.next().is_some());
        assert!(iter.next().is_some());
        assert!(iter.next().is_none());
    }
}