sochdb_storage/
storage_engine.rs

1// Copyright 2025 Sushanth (https://github.com/sushanthpy)
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! StorageEngine Trait Abstraction (Task 1)
16//!
17//! Decouples the query engine from concrete storage implementations.
//! Enables pluggable backends behind one trait. `Lscs` (columnar, for TOON workloads) is the
//! backend currently available; the legacy row-oriented LSMTree backend has been removed.
19//!
20//! ## I/O Reduction Model
21//!
22//! ```text
23//! Traditional Row I/O: O(N × K) where N=rows, K=total columns
24//! Columnar I/O:        O(N × k) where k=selected columns
25//!
26//! For k/K = 0.2 (typical TOON projection):
27//! I/O_reduction = 1 - (k/K) = 80%
28//! ```
29
30use std::ops::Range;
31use std::path::Path;
32use std::sync::Arc;
33use sochdb_core::{Result, SochDBError, SochRow, SochValue};
34
/// Transaction handle for ACID operations.
///
/// A small `Copy` value that identifies a transaction and pins the snapshot version
/// its MVCC reads are evaluated against.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct TxnHandle {
    /// Transaction ID
    pub txn_id: u64,
    /// Snapshot version for MVCC reads
    pub snapshot_version: u64,
    /// Transaction start timestamp: wall-clock microseconds since the Unix epoch,
    /// captured when the handle was created
    pub start_ts: u64,
}

impl TxnHandle {
    /// Create a new transaction handle stamped with the current wall-clock time.
    ///
    /// `txn_id` and `snapshot_version` are stored as given. `start_ts` is the number of
    /// microseconds since the Unix epoch at the time of the call.
    ///
    /// This never panics: if the system clock reports a time before the Unix epoch,
    /// `start_ts` is `0`, and a microsecond count that does not fit in `u64` saturates
    /// at `u64::MAX` instead of being silently truncated.
    pub fn new(txn_id: u64, snapshot_version: u64) -> Self {
        let start_ts = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            // `Err` means the clock is set before the epoch; treat that as "time zero".
            .map_or(0, |elapsed| {
                elapsed.as_micros().min(u128::from(u64::MAX)) as u64
            });

        Self {
            txn_id,
            snapshot_version,
            start_ts,
        }
    }
}
60
/// Column identifier
///
/// `LscsAdapter::scan_columns` converts these to positional column indices (`as usize`)
/// before handing them to the underlying store.
pub type ColumnId = u32;
63
/// Row identifier
///
/// `LscsAdapter` decodes 8-byte little-endian keys into this type.
pub type RowId = u64;
66
/// A row of data with column values
///
/// Besides the raw cells, a row carries the MVCC lifetime (`txn_start` / `txn_end`)
/// that `Row::is_visible` compares against a snapshot version.
#[derive(Debug, Clone)]
pub struct Row {
    /// Row ID
    pub id: RowId,
    /// Column values (indexed by column position); `None` represents a NULL cell
    pub values: Vec<Option<Vec<u8>>>,
    /// Transaction start timestamp (MVCC): the row is visible only to snapshots whose
    /// version is greater than or equal to this value
    pub txn_start: u64,
    /// Transaction end timestamp (MVCC, 0 = active): when non-zero, the row is hidden
    /// from snapshots whose version is greater than or equal to this value
    pub txn_end: u64,
}
79
80impl Row {
81    /// Create a new row
82    pub fn new(id: RowId, values: Vec<Option<Vec<u8>>>) -> Self {
83        let now = std::time::SystemTime::now()
84            .duration_since(std::time::UNIX_EPOCH)
85            .unwrap()
86            .as_micros() as u64;
87        Self {
88            id,
89            values,
90            txn_start: now,
91            txn_end: 0,
92        }
93    }
94
95    /// Check if row is visible at given snapshot
96    pub fn is_visible(&self, snapshot_version: u64) -> bool {
97        self.txn_start <= snapshot_version && (self.txn_end == 0 || self.txn_end > snapshot_version)
98    }
99
100    /// Convert to SochRow
101    pub fn to_soch_row(&self, _schema: &[String]) -> SochRow {
102        let values: Vec<SochValue> = self
103            .values
104            .iter()
105            .map(|v| match v {
106                Some(bytes) => {
107                    // Try to interpret as string first
108                    if let Ok(s) = std::str::from_utf8(bytes) {
109                        SochValue::Text(s.to_string())
110                    } else {
111                        SochValue::Binary(bytes.clone())
112                    }
113                }
114                None => SochValue::Null,
115            })
116            .collect();
117        SochRow::new(values)
118    }
119}
120
121/// Iterator over columns
122pub struct ColumnIterator {
123    /// Current position
124    position: usize,
125    /// Rows with projected columns
126    rows: Vec<Row>,
127    /// Column IDs being iterated
128    column_ids: Vec<ColumnId>,
129}
130
131impl ColumnIterator {
132    /// Create a new column iterator
133    pub fn new(rows: Vec<Row>, column_ids: Vec<ColumnId>) -> Self {
134        Self {
135            position: 0,
136            rows,
137            column_ids,
138        }
139    }
140
141    /// Get column IDs
142    pub fn column_ids(&self) -> &[ColumnId] {
143        &self.column_ids
144    }
145}
146
147impl Iterator for ColumnIterator {
148    type Item = Row;
149
150    fn next(&mut self) -> Option<Self::Item> {
151        if self.position < self.rows.len() {
152            let row = self.rows[self.position].clone();
153            self.position += 1;
154            Some(row)
155        } else {
156            None
157        }
158    }
159}
160
/// Storage engine statistics
///
/// A plain snapshot of counters returned by `StorageEngine::stats`. `Default` yields
/// all-zero counters and an empty `files_per_level`.
#[derive(Debug, Clone, Default)]
pub struct StorageStats {
    /// Total rows stored
    /// (`LscsAdapter::stats` reports the store's `next_row_id` here)
    pub total_rows: u64,
    /// Total bytes on disk
    pub disk_bytes: u64,
    /// Bytes in memory (memtables)
    pub memory_bytes: u64,
    /// Number of levels
    pub num_levels: u32,
    /// Files per level, one entry per level
    /// (`LscsAdapter::stats` currently fills this with zeros)
    pub files_per_level: Vec<u32>,
    /// Read amplification
    /// (`LscsAdapter::stats` currently reports a fixed `1.0`)
    pub read_amplification: f64,
    /// Write amplification
    /// (`LscsAdapter::stats` currently reports a fixed `1.0`)
    pub write_amplification: f64,
}
179
/// StorageEngine trait - the core abstraction for pluggable storage backends
///
/// Implementations:
/// - `LscsAdapter` (wrapping `Lscs`): Columnar storage for TOON workloads
///   (80% I/O reduction for projections)
///
/// The legacy row-oriented LSMTree backend is still selectable through
/// `StorageEngineType::Legacy`, but `open_storage_engine` rejects it because that
/// backend has been removed.
///
/// Every method takes `&self` and the trait requires `Send + Sync`, so an engine can be
/// shared behind an `Arc` (which is how `open_storage_engine` returns it).
pub trait StorageEngine: Send + Sync {
    /// Begin a new transaction
    ///
    /// Returns a handle carrying the transaction ID and the snapshot version that
    /// subsequent reads with this handle are evaluated against.
    fn begin_txn(&self) -> Result<TxnHandle>;

    /// Get a single row by key
    ///
    /// Returns `Ok(None)` when there is no row to report for `key` under `txn`.
    /// The accepted key format is backend-specific; `LscsAdapter` requires exactly
    /// 8 bytes (a little-endian row ID) and fails with `InvalidArgument` otherwise.
    fn get(&self, txn: &TxnHandle, key: &[u8]) -> Result<Option<Row>>;

    /// Put a row (insert or update)
    fn put(&self, txn: &TxnHandle, key: &[u8], row: Row) -> Result<()>;

    /// Delete a row
    fn delete(&self, txn: &TxnHandle, key: &[u8]) -> Result<()>;

    /// Scan a range of rows
    ///
    /// `range` is a `std::ops::Range` of encoded keys (`start..end`); how the bounds are
    /// decoded is backend-specific.
    fn scan(&self, txn: &TxnHandle, range: Range<Vec<u8>>) -> Result<Vec<Row>>;

    /// Scan columns selectively (columnar optimization)
    ///
    /// This is the key optimization for TOON workloads:
    /// - Traditional: Read all columns O(N × K)
    /// - Columnar: Read only selected columns O(N × k)
    ///
    /// For k/K = 0.2, this is 80% I/O reduction
    ///
    /// `cols` lists the columns to read; the returned iterator yields rows and reports
    /// the same column IDs through `ColumnIterator::column_ids`.
    fn scan_columns(
        &self,
        txn: &TxnHandle,
        range: Range<Vec<u8>>,
        cols: &[ColumnId],
    ) -> Result<ColumnIterator>;

    /// Commit a transaction
    ///
    /// Takes the handle by value; `TxnHandle` is `Copy`, so the caller's copy is not
    /// invalidated by the type system.
    fn commit(&self, txn: TxnHandle) -> Result<()>;

    /// Abort a transaction
    ///
    /// Takes the handle by value, like `commit`.
    fn abort(&self, txn: TxnHandle) -> Result<()>;

    /// Get storage statistics
    fn stats(&self) -> StorageStats;

    /// Force flush memtables to disk
    fn flush(&self) -> Result<()>;

    /// Trigger compaction
    fn compact(&self) -> Result<()>;

    /// Close the storage engine
    fn close(&self) -> Result<()>;
}
233
234/// Open a storage engine from a path
235pub fn open_storage_engine<P: AsRef<Path>>(
236    path: P,
237    engine_type: StorageEngineType,
238) -> Result<Arc<dyn StorageEngine>> {
239    match engine_type {
240        StorageEngineType::Lscs => {
241            use crate::lscs::{ColumnDef, ColumnType, Lscs, LscsConfig, TableSchema};
242
243            // Create default schema for general-purpose storage
244            let schema = TableSchema::new(
245                "default".to_string(),
246                vec![
247                    ColumnDef {
248                        name: "key".to_string(),
249                        col_type: ColumnType::Binary,
250                        nullable: false,
251                    },
252                    ColumnDef {
253                        name: "value".to_string(),
254                        col_type: ColumnType::Binary,
255                        nullable: true,
256                    },
257                ],
258            )
259            .with_mvcc();
260
261            let lscs = Lscs::new(path.as_ref().to_path_buf(), schema, LscsConfig::default())?;
262            Ok(Arc::new(LscsAdapter::new(lscs)))
263        }
264        StorageEngineType::Legacy => Err(SochDBError::InvalidArgument(
265            "Legacy LSMTree has been removed; use Lscs".to_string(),
266        )),
267    }
268}
269
/// Storage engine types
///
/// Selects which backend `open_storage_engine` constructs. `Lscs` is the `Default`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum StorageEngineType {
    /// LSCS columnar storage (default)
    #[default]
    Lscs,
    /// Legacy row-oriented storage (deprecated); `open_storage_engine` rejects this
    /// variant with `SochDBError::InvalidArgument` because the backend has been removed
    Legacy,
}
279
/// Adapter to make Lscs implement StorageEngine
///
/// Wraps an `Lscs` store and adds the two counters the `StorageEngine` transaction
/// methods need: one for handing out transaction IDs and one for snapshot versions.
pub struct LscsAdapter {
    /// The wrapped LSCS store that reads and writes are delegated to
    inner: crate::lscs::Lscs,
    /// Next transaction ID to hand out; starts at 1 and is bumped by `begin_txn`
    next_txn_id: std::sync::atomic::AtomicU64,
    /// Version counter; starts at 1, is read by `begin_txn` as the snapshot version and
    /// is bumped by `commit`
    version_counter: std::sync::atomic::AtomicU64,
}
286
287impl LscsAdapter {
288    /// Create a new adapter
289    pub fn new(lscs: crate::lscs::Lscs) -> Self {
290        Self {
291            inner: lscs,
292            next_txn_id: std::sync::atomic::AtomicU64::new(1),
293            version_counter: std::sync::atomic::AtomicU64::new(1),
294        }
295    }
296}
297
298impl StorageEngine for LscsAdapter {
299    fn begin_txn(&self) -> Result<TxnHandle> {
300        let txn_id = self
301            .next_txn_id
302            .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
303        let snapshot = self
304            .version_counter
305            .load(std::sync::atomic::Ordering::Acquire);
306        Ok(TxnHandle::new(txn_id, snapshot))
307    }
308
309    fn get(&self, txn: &TxnHandle, key: &[u8]) -> Result<Option<Row>> {
310        // Use learned index for O(1) lookup
311        let row_id = u64::from_le_bytes(
312            key.try_into()
313                .map_err(|_| SochDBError::InvalidArgument("Key must be 8 bytes".to_string()))?,
314        );
315
316        // Use LSCS get() with MVCC filtering
317        if let Some(values) = self.inner.get(row_id)? {
318            // Extract MVCC timestamps from the last two columns (__txn_start, __txn_end)
319            let num_cols = values.len();
320            let (txn_start, txn_end) = if num_cols >= 2 {
321                let start = values[num_cols - 2]
322                    .as_ref()
323                    .and_then(|v| v.get(..8))
324                    .map(|b| u64::from_le_bytes(b.try_into().unwrap()))
325                    .unwrap_or(0);
326                let end = values[num_cols - 1]
327                    .as_ref()
328                    .and_then(|v| v.get(..8))
329                    .map(|b| u64::from_le_bytes(b.try_into().unwrap()))
330                    .unwrap_or(0);
331                (start, end)
332            } else {
333                (0, 0)
334            };
335
336            let row = Row {
337                id: row_id,
338                values,
339                txn_start,
340                txn_end,
341            };
342
343            // Apply MVCC visibility check
344            if row.is_visible(txn.snapshot_version) {
345                return Ok(Some(row));
346            }
347        }
348
349        Ok(None)
350    }
351
352    fn put(&self, _txn: &TxnHandle, key: &[u8], row: Row) -> Result<()> {
353        let values: Vec<Option<&[u8]>> = row.values.iter().map(|v| v.as_deref()).collect();
354        let _ = key; // Key is derived from row ID
355        self.inner.insert(&values)?;
356        Ok(())
357    }
358
359    fn delete(&self, txn: &TxnHandle, key: &[u8]) -> Result<()> {
360        // Mark row as deleted by setting txn_end
361        // In MVCC, delete is a "tombstone" - we mark the row with an end timestamp
362        let row_id = u64::from_le_bytes(
363            key.try_into()
364                .map_err(|_| SochDBError::InvalidArgument("Key must be 8 bytes".to_string()))?,
365        );
366
367        // Write tombstone by updating __txn_end column to current transaction timestamp
368        self.inner
369            .mark_deleted(row_id, txn.txn_id, txn.snapshot_version)?;
370        Ok(())
371    }
372
373    fn scan(&self, txn: &TxnHandle, range: Range<Vec<u8>>) -> Result<Vec<Row>> {
374        // Parse range as row IDs
375        let start = if range.start.len() >= 8 {
376            u64::from_le_bytes(range.start[..8].try_into().unwrap())
377        } else {
378            0
379        };
380        let end = if range.end.len() >= 8 {
381            u64::from_le_bytes(range.end[..8].try_into().unwrap())
382        } else {
383            u64::MAX
384        };
385
386        // Use LSCS scan_range
387        let scan_results = self.inner.scan_range(start, end)?;
388        let rows: Vec<Row> = scan_results
389            .into_iter()
390            .filter_map(|(row_id, values)| {
391                let row = Row {
392                    id: row_id,
393                    values,
394                    txn_start: 0,
395                    txn_end: 0,
396                };
397                if row.is_visible(txn.snapshot_version) {
398                    Some(row)
399                } else {
400                    None
401                }
402            })
403            .collect();
404
405        Ok(rows)
406    }
407
408    fn scan_columns(
409        &self,
410        txn: &TxnHandle,
411        range: Range<Vec<u8>>,
412        cols: &[ColumnId],
413    ) -> Result<ColumnIterator> {
414        // Parse range as row IDs
415        let start = if range.start.len() >= 8 {
416            u64::from_le_bytes(range.start[..8].try_into().unwrap())
417        } else {
418            0
419        };
420        let end = if range.end.len() >= 8 {
421            u64::from_le_bytes(range.end[..8].try_into().unwrap())
422        } else {
423            u64::MAX
424        };
425
426        // Columnar selective read - only read requested columns
427        // This achieves 80% I/O reduction when reading 20% of columns
428        let col_indices: Vec<usize> = cols.iter().map(|&c| c as usize).collect();
429
430        let scan_results = self.inner.scan_columns_range(start, end, &col_indices)?;
431
432        let rows: Vec<Row> = scan_results
433            .into_iter()
434            .filter_map(|(row_id, values)| {
435                let row = Row {
436                    id: row_id,
437                    values,
438                    txn_start: 0,
439                    txn_end: 0,
440                };
441                if row.is_visible(txn.snapshot_version) {
442                    Some(row)
443                } else {
444                    None
445                }
446            })
447            .collect();
448
449        Ok(ColumnIterator::new(rows, cols.to_vec()))
450    }
451
452    fn commit(&self, txn: TxnHandle) -> Result<()> {
453        // Ensure durability by calling fsync
454        self.inner.fsync()?;
455        self.version_counter
456            .fetch_add(1, std::sync::atomic::Ordering::Release);
457        let _ = txn;
458        Ok(())
459    }
460
    /// Abort a transaction.
    ///
    /// Currently a no-op that always returns `Ok(())`: the handle is ignored and nothing
    /// is undone here.
    // NOTE(review): `put`/`delete` call straight into the inner store, so presumably their
    // effects are not rolled back on abort — confirm this is intended.
    fn abort(&self, _txn: TxnHandle) -> Result<()> {
        Ok(())
    }
464
465    fn stats(&self) -> StorageStats {
466        let lscs_stats = self.inner.stats();
467        StorageStats {
468            total_rows: lscs_stats.next_row_id,
469            disk_bytes: lscs_stats.disk_bytes,
470            memory_bytes: lscs_stats.active_memtable_bytes as u64,
471            num_levels: lscs_stats.level_row_counts.len() as u32,
472            files_per_level: vec![0; lscs_stats.level_row_counts.len()],
473            read_amplification: 1.0,
474            write_amplification: 1.0,
475        }
476    }
477
    /// Flush by delegating directly to the underlying store's `flush`, returning its
    /// result unchanged.
    fn flush(&self) -> Result<()> {
        self.inner.flush()
    }
481
    /// Compact by delegating directly to the underlying store's `compact`, returning its
    /// result unchanged.
    fn compact(&self) -> Result<()> {
        self.inner.compact()
    }
485
    /// Close the engine. This only performs a `flush`; no other teardown happens here.
    fn close(&self) -> Result<()> {
        self.flush()
    }
489}
490
#[cfg(test)]
mod tests {
    use super::*;

    /// A new handle echoes its arguments and carries a non-zero start timestamp.
    #[test]
    fn test_txn_handle() {
        let TxnHandle {
            txn_id,
            snapshot_version,
            start_ts,
        } = TxnHandle::new(1, 100);

        assert_eq!(txn_id, 1);
        assert_eq!(snapshot_version, 100);
        assert!(start_ts > 0);
    }

    /// Visibility follows `txn_start <= snapshot` and `txn_end == 0 || txn_end > snapshot`.
    #[test]
    fn test_row_visibility() {
        let active = Row {
            txn_start: 100,
            txn_end: 0,
            ..Row::new(1, vec![Some(b"test".to_vec())])
        };

        // Active row visible at any version >= 100
        for (snapshot, expected) in [(100, true), (200, true), (99, false)] {
            assert_eq!(active.is_visible(snapshot), expected);
        }

        // Deleted row: visible between start and end, hidden at and after deletion
        let deleted = Row {
            txn_end: 150,
            ..active
        };
        for (snapshot, expected) in [(120, true), (150, false), (200, false)] {
            assert_eq!(deleted.is_visible(snapshot), expected);
        }
    }

    /// The iterator reports its column IDs and yields exactly the rows it was given.
    #[test]
    fn test_column_iterator() {
        let first = Row::new(1, vec![Some(b"a".to_vec()), Some(b"b".to_vec())]);
        let second = Row::new(2, vec![Some(b"c".to_vec()), Some(b"d".to_vec())]);
        let mut iter = ColumnIterator::new(vec![first, second], vec![0, 1]);

        assert_eq!(iter.column_ids(), &[0, 1]);
        assert_eq!(iter.by_ref().count(), 2);
        assert!(iter.next().is_none());
    }
}