// rustlite_storage/lib.rs

1//! # RustLite Storage Engine
2//!
3//! LSM-tree based persistent storage engine for RustLite.
4//!
5//! ## ⚠️ Internal Implementation Detail
6//!
7//! **This crate is an internal implementation detail of RustLite.**
8//!
9//! Users should depend on the main [`rustlite`](https://crates.io/crates/rustlite) crate
10//! instead, which provides the stable public API. This crate's API may change
11//! without notice between minor versions.
12//!
13//! ```toml
14//! # In your Cargo.toml - use the main crate, not this one:
15//! [dependencies]
16//! rustlite = "0.3"
17//! ```
18//!
19//! ---
20//!
21//! This crate provides the storage engine for RustLite, implementing an
22//! LSM-tree (Log-Structured Merge-tree) architecture with:
23//!
24//! - **Memtable**: In-memory write buffer using BTreeMap for sorted order
25//! - **SSTable**: Immutable on-disk sorted string tables
26//! - **Compaction**: Background merging to reduce read amplification
27//! - **Manifest**: Metadata tracking for crash recovery
28//!
29//! ## Architecture
30//!
31//! ```text
32//! Writes → Memtable (memory) → SSTable (disk)
33//!              ↓                    ↓
34//!         Flush when full    Compact to lower levels
35//! ```
36
37use rustlite_core::{Error, Result};
38use rustlite_wal::{RecordPayload, SyncMode, WalConfig, WalManager, WalRecord};
39use std::path::{Path, PathBuf};
40use std::sync::{Arc, Mutex, RwLock};
41
42pub mod compaction;
43pub mod manifest;
44pub mod memtable;
45pub mod sstable;
46
47pub use compaction::{CompactionConfig, CompactionStats, CompactionWorker};
48pub use manifest::{Manifest, ManifestSSTable};
49pub use memtable::{Memtable, MemtableEntry};
50pub use sstable::{SSTableEntry, SSTableMeta, SSTableReader, SSTableWriter};
51
/// Default memtable flush threshold in bytes (4 MiB); used by
/// [`StorageConfig::default`] and compared against `Memtable::size_bytes`.
const DEFAULT_MEMTABLE_SIZE: u64 = 4 * 1024 * 1024;
54
/// Storage engine configuration
///
/// Construct via [`StorageConfig::default`] and override individual
/// fields with struct-update syntax:
/// `StorageConfig { memtable_size: 1 << 20, ..Default::default() }`.
#[derive(Debug, Clone)]
pub struct StorageConfig {
    /// Maximum memtable size in bytes before it is flushed to an SSTable
    /// (default: 4 MiB)
    pub memtable_size: u64,
    /// Sync mode used for WAL appends
    pub sync_mode: SyncMode,
    /// Compaction configuration
    pub compaction: CompactionConfig,
    /// Run compaction when thresholds are hit after a flush.
    /// NOTE(review): despite the `CompactionWorker` name, compaction
    /// currently runs inline on the flushing thread, not in a
    /// background thread.
    pub enable_compaction: bool,
}
67
68impl Default for StorageConfig {
69    fn default() -> Self {
70        Self {
71            memtable_size: DEFAULT_MEMTABLE_SIZE,
72            sync_mode: SyncMode::Sync,
73            compaction: CompactionConfig::default(),
74            enable_compaction: true,
75        }
76    }
77}
78
/// Storage engine manager
///
/// Provides a persistent key-value storage using LSM-tree architecture.
///
/// All shared state sits behind `Arc` + a lock (`RwLock` for read-heavy
/// state, `Mutex` otherwise), so every method takes `&self` and the
/// engine can be shared across threads.
pub struct StorageEngine {
    /// Database directory (contains the `wal/` and `sst/` subdirectories)
    dir: PathBuf,
    /// Configuration captured at open time
    config: StorageConfig,
    /// Active memtable; receives all new writes
    memtable: Arc<RwLock<Memtable>>,
    /// Immutable memtables that were swapped out and are being flushed;
    /// reads consult these (newest first) after the active memtable
    immutable_memtables: Arc<Mutex<Vec<Arc<Memtable>>>>,
    /// Write-ahead log; appended to before every memtable update
    wal: Arc<Mutex<WalManager>>,
    /// Manifest tracking live SSTables and the persisted sequence number
    manifest: Arc<Mutex<Manifest>>,
    /// Compaction worker (invoked inline after flushes when enabled)
    compactor: Arc<Mutex<CompactionWorker>>,
    /// Current sequence number; bumped on every put/delete
    sequence: Arc<RwLock<u64>>,
}
100
101impl StorageEngine {
102    /// Open or create a storage engine at the given path
103    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
104        Self::open_with_config(path, StorageConfig::default())
105    }
106
107    /// Open or create a storage engine with custom configuration
108    pub fn open_with_config(path: impl AsRef<Path>, config: StorageConfig) -> Result<Self> {
109        let dir = path.as_ref().to_path_buf();
110        std::fs::create_dir_all(&dir)?;
111
112        // Create subdirectories
113        std::fs::create_dir_all(dir.join("wal"))?;
114        std::fs::create_dir_all(dir.join("sst"))?;
115
116        // Open WAL
117        let wal_config = WalConfig {
118            wal_dir: dir.join("wal"),
119            sync_mode: config.sync_mode,
120            ..Default::default()
121        };
122        let mut wal = WalManager::new(wal_config)?;
123        wal.open()?;
124
125        // Open manifest
126        let manifest = Manifest::open(&dir)?;
127        let sequence = manifest.sequence();
128
129        // Create compactor
130        let compactor = CompactionWorker::new(&dir, config.compaction.clone());
131
132        // Create memtable
133        let memtable = Memtable::with_sequence(sequence);
134
135        let engine = Self {
136            dir,
137            config,
138            memtable: Arc::new(RwLock::new(memtable)),
139            immutable_memtables: Arc::new(Mutex::new(Vec::new())),
140            wal: Arc::new(Mutex::new(wal)),
141            manifest: Arc::new(Mutex::new(manifest)),
142            compactor: Arc::new(Mutex::new(compactor)),
143            sequence: Arc::new(RwLock::new(sequence)),
144        };
145
146        // Recover from WAL
147        engine.recover()?;
148
149        Ok(engine)
150    }
151
    /// Recover from WAL after crash
    ///
    /// Replays every recoverable WAL record into the active memtable so
    /// writes that had not reached an SSTable before the previous
    /// shutdown become visible again.
    ///
    /// NOTE(review): this replays *all* records the WAL returns,
    /// presumably including any already flushed to SSTables — verify the
    /// WAL is truncated on flush, otherwise recovery re-inserts
    /// already-persisted data (reads stay correct since the memtable is
    /// consulted first, but the memtable is inflated).
    ///
    /// NOTE(review): `self.sequence` is not advanced here — TODO confirm
    /// replayed records do not need to consume sequence numbers.
    fn recover(&self) -> Result<()> {
        let wal = self.wal.lock().map_err(|_| Error::LockPoisoned)?;
        let records = wal.recover()?;

        let mut memtable = self.memtable.write().map_err(|_| Error::LockPoisoned)?;

        for record in records {
            match &record.payload {
                RecordPayload::Put { key, value } => {
                    memtable.put(key.clone(), value.clone());
                }
                RecordPayload::Delete { key } => {
                    memtable.delete(key.clone());
                }
                // Non-data payload variants are intentionally ignored.
                _ => {}
            }
        }

        Ok(())
    }
173
    /// Insert or update a key-value pair
    ///
    /// Durability ordering: the record is appended to the WAL before the
    /// memtable is updated, so a crash between the two steps is repaired
    /// by WAL replay in `recover`.
    ///
    /// # Errors
    ///
    /// Returns an error if a lock is poisoned, the WAL append fails, or
    /// a triggered flush fails.
    pub fn put(&self, key: &[u8], value: &[u8]) -> Result<()> {
        // Get next sequence number
        //
        // NOTE(review): the sequence number is bumped but never attached
        // to the WAL record or memtable entry — TODO: thread `_seq`
        // through once the record/entry APIs accept it.
        let _seq = {
            let mut sequence = self.sequence.write().map_err(|_| Error::LockPoisoned)?;
            *sequence += 1;
            *sequence
        };

        // Write to WAL first
        //
        // NOTE(review): the WAL lock is released before the memtable
        // lock is taken, so two concurrent writers may apply in a
        // different order to the WAL than to the memtable — confirm this
        // is acceptable for recovery semantics.
        {
            let mut wal = self.wal.lock().map_err(|_| Error::LockPoisoned)?;
            let record = WalRecord::put(key.to_vec(), value.to_vec());
            wal.append(record)?;
        }

        // Write to memtable
        {
            let mut memtable = self.memtable.write().map_err(|_| Error::LockPoisoned)?;
            memtable.put(key.to_vec(), value.to_vec());
        }

        // Check if flush is needed
        self.maybe_flush()?;

        Ok(())
    }
201
202    /// Retrieve a value by key
203    pub fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
204        // Check active memtable first
205        {
206            let memtable = self.memtable.read().map_err(|_| Error::LockPoisoned)?;
207            if let Some(result) = memtable.get(key) {
208                return match result {
209                    Some(value) => Ok(Some(value.to_vec())),
210                    None => Ok(None), // Tombstone
211                };
212            }
213        }
214
215        // Check immutable memtables (newest first)
216        {
217            let immutable = self
218                .immutable_memtables
219                .lock()
220                .map_err(|_| Error::LockPoisoned)?;
221            for mt in immutable.iter().rev() {
222                if let Some(result) = mt.get(key) {
223                    return match result {
224                        Some(value) => Ok(Some(value.to_vec())),
225                        None => Ok(None), // Tombstone
226                    };
227                }
228            }
229        }
230
231        // Check SSTables (newest first, level 0 first)
232        {
233            let manifest = self.manifest.lock().map_err(|_| Error::LockPoisoned)?;
234
235            // Check each level
236            for level in 0..7 {
237                let sstables = manifest.sstables_at_level(level);
238
239                // Sort by sequence (newest first)
240                let mut sorted: Vec<_> = sstables.iter().collect();
241                sorted.sort_by(|a, b| b.sequence.cmp(&a.sequence));
242
243                for sst in sorted {
244                    // Quick range check
245                    if key < sst.min_key.as_slice() || key > sst.max_key.as_slice() {
246                        continue;
247                    }
248
249                    // Open and search SSTable
250                    let path = PathBuf::from(&sst.path);
251                    if let Ok(mut reader) = SSTableReader::open(&path) {
252                        if let Ok(Some(entry)) = reader.get(key) {
253                            if entry.is_tombstone() {
254                                return Ok(None);
255                            }
256                            return Ok(Some(entry.value));
257                        }
258                    }
259                }
260            }
261        }
262
263        Ok(None)
264    }
265
266    /// Delete a key
267    pub fn delete(&self, key: &[u8]) -> Result<()> {
268        // Get next sequence number
269        let _seq = {
270            let mut sequence = self.sequence.write().map_err(|_| Error::LockPoisoned)?;
271            *sequence += 1;
272            *sequence
273        };
274
275        // Write to WAL first
276        {
277            let mut wal = self.wal.lock().map_err(|_| Error::LockPoisoned)?;
278            let record = WalRecord::delete(key.to_vec());
279            wal.append(record)?;
280        }
281
282        // Write tombstone to memtable
283        {
284            let mut memtable = self.memtable.write().map_err(|_| Error::LockPoisoned)?;
285            memtable.delete(key.to_vec());
286        }
287
288        Ok(())
289    }
290
291    /// Check if memtable needs flushing and trigger if so
292    fn maybe_flush(&self) -> Result<()> {
293        let should_flush = {
294            let memtable = self.memtable.read().map_err(|_| Error::LockPoisoned)?;
295            memtable.size_bytes() >= self.config.memtable_size
296        };
297
298        if should_flush {
299            self.flush()?;
300        }
301
302        Ok(())
303    }
304
305    /// Flush the current memtable to disk as an SSTable
306    pub fn flush(&self) -> Result<()> {
307        // Swap memtable
308        let old_memtable = {
309            let mut memtable = self.memtable.write().map_err(|_| Error::LockPoisoned)?;
310            let sequence = memtable.sequence();
311            let old = std::mem::replace(&mut *memtable, Memtable::with_sequence(sequence));
312            Arc::new(old)
313        };
314
315        if old_memtable.is_empty() {
316            return Ok(());
317        }
318
319        // Add to immutable list
320        {
321            let mut immutable = self
322                .immutable_memtables
323                .lock()
324                .map_err(|_| Error::LockPoisoned)?;
325            immutable.push(Arc::clone(&old_memtable));
326        }
327
328        // Generate SSTable path
329        let timestamp = std::time::SystemTime::now()
330            .duration_since(std::time::UNIX_EPOCH)
331            .unwrap_or_default()
332            .as_millis();
333        let sst_path = self.dir.join("sst").join(format!("L0_{}.sst", timestamp));
334
335        // Create a cloned memtable for iteration
336        let mt_for_iter = {
337            let entries: Vec<_> = old_memtable
338                .iter()
339                .map(|(k, v)| (k.clone(), v.clone()))
340                .collect();
341            entries
342        };
343
344        // Write SSTable
345        let meta = SSTableWriter::from_memtable(&sst_path, mt_for_iter.into_iter())?;
346
347        // Update manifest
348        {
349            let mut manifest = self.manifest.lock().map_err(|_| Error::LockPoisoned)?;
350            manifest.add_sstable(&meta)?;
351            manifest.update_sequence(old_memtable.sequence())?;
352        }
353
354        // Remove from immutable list
355        {
356            let mut immutable = self
357                .immutable_memtables
358                .lock()
359                .map_err(|_| Error::LockPoisoned)?;
360            immutable.retain(|m| !Arc::ptr_eq(m, &old_memtable));
361        }
362
363        // Maybe trigger compaction
364        if self.config.enable_compaction {
365            self.maybe_compact()?;
366        }
367
368        Ok(())
369    }
370
371    /// Check if compaction is needed and run if so
372    fn maybe_compact(&self) -> Result<()> {
373        let mut compactor = self.compactor.lock().map_err(|_| Error::LockPoisoned)?;
374        let mut manifest = self.manifest.lock().map_err(|_| Error::LockPoisoned)?;
375
376        if compactor.needs_compaction(&manifest) {
377            compactor.compact_level0(&mut manifest)?;
378        }
379
380        Ok(())
381    }
382
383    /// Force sync all data to disk
384    pub fn sync(&self) -> Result<()> {
385        // Sync WAL
386        {
387            let mut wal = self.wal.lock().map_err(|_| Error::LockPoisoned)?;
388            wal.sync()?;
389        }
390
391        // Flush memtable
392        self.flush()?;
393
394        // Rewrite manifest
395        {
396            let mut manifest = self.manifest.lock().map_err(|_| Error::LockPoisoned)?;
397            manifest.rewrite()?;
398        }
399
400        Ok(())
401    }
402
403    /// Get storage statistics
404    pub fn stats(&self) -> StorageStats {
405        let memtable = self.memtable.read().ok();
406        let manifest = self.manifest.lock().ok();
407        let compactor = self.compactor.lock().ok();
408
409        let (memtable_size, memtable_entries) = match &memtable {
410            Some(m) => (m.size_bytes(), m.len()),
411            None => (0, 0),
412        };
413
414        StorageStats {
415            memtable_size,
416            memtable_entries,
417            sstable_count: manifest
418                .as_ref()
419                .map(|m| m.all_sstables().len())
420                .unwrap_or(0),
421            total_disk_size: manifest.as_ref().map(|m| m.total_size()).unwrap_or(0),
422            level_counts: manifest.map(|m| m.level_counts()).unwrap_or_default(),
423            compaction_stats: compactor.map(|c| c.stats().clone()).unwrap_or_default(),
424        }
425    }
426
427    /// Close the storage engine
428    pub fn close(self) -> Result<()> {
429        // Flush any remaining data
430        self.flush()?;
431        self.sync()?;
432        Ok(())
433    }
434}
435
/// Storage statistics
///
/// A point-in-time snapshot produced by [`StorageEngine::stats`];
/// values are not updated after the call returns.
#[derive(Debug, Clone, Default)]
pub struct StorageStats {
    /// Current memtable size in bytes
    pub memtable_size: u64,
    /// Number of entries in the active memtable
    pub memtable_entries: usize,
    /// Total number of SSTables across all levels
    pub sstable_count: usize,
    /// Total disk size of SSTables in bytes
    pub total_disk_size: u64,
    /// Number of SSTables at each level (index = level); empty if the
    /// manifest lock was poisoned
    pub level_counts: Vec<usize>,
    /// Compaction statistics
    pub compaction_stats: CompactionStats,
}
452
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Open a fresh engine in its own temporary directory.
    ///
    /// The `TempDir` must outlive the engine (dropping it deletes the
    /// directory), so it is returned alongside.
    fn fresh_engine() -> (tempfile::TempDir, StorageEngine) {
        let dir = tempdir().unwrap();
        let engine = StorageEngine::open(dir.path()).unwrap();
        (dir, engine)
    }

    #[test]
    fn test_storage_engine_basic() {
        let (_dir, engine) = fresh_engine();

        engine.put(b"key1", b"value1").unwrap();
        engine.put(b"key2", b"value2").unwrap();

        // Present keys round-trip; an absent key reads as None.
        assert_eq!(engine.get(b"key1").unwrap(), Some(b"value1".to_vec()));
        assert_eq!(engine.get(b"key2").unwrap(), Some(b"value2".to_vec()));
        assert_eq!(engine.get(b"key3").unwrap(), None);
    }

    #[test]
    fn test_storage_engine_update() {
        let (_dir, engine) = fresh_engine();

        // A second put to the same key shadows the first value.
        engine.put(b"key", b"value1").unwrap();
        assert_eq!(engine.get(b"key").unwrap(), Some(b"value1".to_vec()));

        engine.put(b"key", b"value2").unwrap();
        assert_eq!(engine.get(b"key").unwrap(), Some(b"value2".to_vec()));
    }

    #[test]
    fn test_storage_engine_delete() {
        let (_dir, engine) = fresh_engine();

        engine.put(b"key", b"value").unwrap();
        assert_eq!(engine.get(b"key").unwrap(), Some(b"value".to_vec()));

        // Deletion hides the key from subsequent reads.
        engine.delete(b"key").unwrap();
        assert_eq!(engine.get(b"key").unwrap(), None);
    }

    #[test]
    fn test_storage_engine_flush() {
        let dir = tempdir().unwrap();
        // Tiny memtable so ordinary writes cross the flush threshold.
        let config = StorageConfig {
            memtable_size: 100,
            enable_compaction: false,
            ..Default::default()
        };
        let engine = StorageEngine::open_with_config(dir.path(), config).unwrap();

        for i in 0..10 {
            let key = format!("key{:03}", i);
            let value = format!("value{}", i);
            engine.put(key.as_bytes(), value.as_bytes()).unwrap();
        }

        // Force whatever remains in memory onto disk.
        engine.flush().unwrap();

        // Flushed data must remain readable through the SSTable path.
        assert_eq!(engine.get(b"key000").unwrap(), Some(b"value0".to_vec()));

        let stats = engine.stats();
        assert!(stats.sstable_count > 0 || stats.memtable_entries > 0);
    }

    #[test]
    fn test_storage_engine_recovery() {
        let dir = tempdir().unwrap();

        // Write, then drop the engine without `close` to simulate a crash.
        {
            let engine = StorageEngine::open(dir.path()).unwrap();
            engine.put(b"persistent", b"data").unwrap();
        }

        // Reopening the same directory replays the WAL.
        {
            let engine = StorageEngine::open(dir.path()).unwrap();
            assert_eq!(engine.get(b"persistent").unwrap(), Some(b"data".to_vec()));
        }
    }

    #[test]
    fn test_storage_stats() {
        let (_dir, engine) = fresh_engine();

        engine.put(b"key", b"value").unwrap();

        let stats = engine.stats();
        assert!(stats.memtable_size > 0 || stats.memtable_entries > 0);
    }
}