// rustlite_storage/src/lib.rs

1//! # RustLite Storage Engine
2//!
3//! LSM-tree based persistent storage engine for RustLite.
4//!
5//! ## ⚠️ Internal Implementation Detail
6//!
7//! **This crate is an internal implementation detail of RustLite.**
8//! 
9//! Users should depend on the main [`rustlite`](https://crates.io/crates/rustlite) crate
10//! instead, which provides the stable public API. This crate's API may change
11//! without notice between minor versions.
12//!
13//! ```toml
14//! # In your Cargo.toml - use the main crate, not this one:
15//! [dependencies]
16//! rustlite = "0.2"
17//! ```
18//!
19//! ---
20//!
21//! This crate provides the storage engine for RustLite, implementing an
22//! LSM-tree (Log-Structured Merge-tree) architecture with:
23//!
24//! - **Memtable**: In-memory write buffer using BTreeMap for sorted order
25//! - **SSTable**: Immutable on-disk sorted string tables
26//! - **Compaction**: Background merging to reduce read amplification
27//! - **Manifest**: Metadata tracking for crash recovery
28//!
29//! ## Architecture
30//!
31//! ```text
32//! Writes → Memtable (memory) → SSTable (disk)
33//!              ↓                    ↓
34//!         Flush when full    Compact to lower levels
35//! ```
36
37use rustlite_core::{Error, Result};
38use rustlite_wal::{RecordPayload, SyncMode, WalConfig, WalManager, WalRecord};
39use std::path::{Path, PathBuf};
40use std::sync::{Arc, Mutex, RwLock};
41
42pub mod compaction;
43pub mod manifest;
44pub mod memtable;
45pub mod sstable;
46
47pub use compaction::{CompactionConfig, CompactionStats, CompactionWorker};
48pub use manifest::{Manifest, ManifestSSTable};
49pub use memtable::{Memtable, MemtableEntry};
50pub use sstable::{SSTableEntry, SSTableMeta, SSTableReader, SSTableWriter};
51
/// Default memtable flush threshold (4 MiB): once the active memtable's
/// estimated size reaches this many bytes, it is flushed to a level-0 SSTable.
const DEFAULT_MEMTABLE_SIZE: u64 = 4 * 1024 * 1024;
54
/// Storage engine configuration
///
/// Construct via [`Default`] and override individual fields with struct
/// update syntax: `StorageConfig { memtable_size: 1 << 20, ..Default::default() }`.
#[derive(Debug, Clone)]
pub struct StorageConfig {
    /// Maximum memtable size in bytes before it is flushed to disk
    /// (default: 4 MiB, see `DEFAULT_MEMTABLE_SIZE`).
    pub memtable_size: u64,
    /// Sync mode for WAL appends (default: `SyncMode::Sync`).
    pub sync_mode: SyncMode,
    /// Compaction configuration passed to the background compaction worker.
    pub compaction: CompactionConfig,
    /// Enable compaction after flushes (default: `true`).
    pub enable_compaction: bool,
}
67
68impl Default for StorageConfig {
69    fn default() -> Self {
70        Self {
71            memtable_size: DEFAULT_MEMTABLE_SIZE,
72            sync_mode: SyncMode::Sync,
73            compaction: CompactionConfig::default(),
74            enable_compaction: true,
75        }
76    }
77}
78
/// Storage engine manager
///
/// Provides persistent key-value storage using an LSM-tree architecture:
/// every mutation is appended to the WAL and then applied to the in-memory
/// memtable; full memtables are flushed to immutable on-disk SSTables
/// tracked by the manifest, and a compaction worker merges SSTables.
///
/// Every field is behind `Arc` + a lock, so all public methods take `&self`
/// and the engine can be shared across threads.
pub struct StorageEngine {
    /// Database root directory (contains `wal/` and `sst/` subdirectories)
    dir: PathBuf,
    /// Configuration, fixed at open time
    config: StorageConfig,
    /// Active memtable receiving all new writes
    memtable: Arc<RwLock<Memtable>>,
    /// Memtables swapped out by `flush`; still consulted by reads until
    /// their SSTable has been recorded in the manifest
    immutable_memtables: Arc<Mutex<Vec<Arc<Memtable>>>>,
    /// Write-ahead log; appended to before the memtable on every mutation
    wal: Arc<Mutex<WalManager>>,
    /// Manifest tracking live SSTables and the last durable sequence number
    manifest: Arc<Mutex<Manifest>>,
    /// Compaction worker (runs synchronously from `maybe_compact`)
    compactor: Arc<Mutex<CompactionWorker>>,
    /// Monotonically increasing sequence number, seeded from the manifest
    sequence: Arc<RwLock<u64>>,
}
100
101impl StorageEngine {
102    /// Open or create a storage engine at the given path
103    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
104        Self::open_with_config(path, StorageConfig::default())
105    }
106
107    /// Open or create a storage engine with custom configuration
108    pub fn open_with_config(path: impl AsRef<Path>, config: StorageConfig) -> Result<Self> {
109        let dir = path.as_ref().to_path_buf();
110        std::fs::create_dir_all(&dir)?;
111
112        // Create subdirectories
113        std::fs::create_dir_all(dir.join("wal"))?;
114        std::fs::create_dir_all(dir.join("sst"))?;
115
116        // Open WAL
117        let wal_config = WalConfig {
118            wal_dir: dir.join("wal"),
119            sync_mode: config.sync_mode,
120            ..Default::default()
121        };
122        let mut wal = WalManager::new(wal_config)?;
123        wal.open()?;
124
125        // Open manifest
126        let manifest = Manifest::open(&dir)?;
127        let sequence = manifest.sequence();
128
129        // Create compactor
130        let compactor = CompactionWorker::new(&dir, config.compaction.clone());
131
132        // Create memtable
133        let memtable = Memtable::with_sequence(sequence);
134
135        let engine = Self {
136            dir,
137            config,
138            memtable: Arc::new(RwLock::new(memtable)),
139            immutable_memtables: Arc::new(Mutex::new(Vec::new())),
140            wal: Arc::new(Mutex::new(wal)),
141            manifest: Arc::new(Mutex::new(manifest)),
142            compactor: Arc::new(Mutex::new(compactor)),
143            sequence: Arc::new(RwLock::new(sequence)),
144        };
145
146        // Recover from WAL
147        engine.recover()?;
148
149        Ok(engine)
150    }
151
152    /// Recover from WAL after crash
153    fn recover(&self) -> Result<()> {
154        let wal = self.wal.lock().map_err(|_| Error::LockPoisoned)?;
155        let records = wal.recover()?;
156
157        let mut memtable = self.memtable.write().map_err(|_| Error::LockPoisoned)?;
158
159        for record in records {
160            match &record.payload {
161                RecordPayload::Put { key, value } => {
162                    memtable.put(key.clone(), value.clone());
163                }
164                RecordPayload::Delete { key } => {
165                    memtable.delete(key.clone());
166                }
167                _ => {}
168            }
169        }
170        
171        Ok(())
172    }
173
174    /// Insert or update a key-value pair
175    pub fn put(&self, key: &[u8], value: &[u8]) -> Result<()> {
176        // Get next sequence number
177        let _seq = {
178            let mut sequence = self.sequence.write().map_err(|_| Error::LockPoisoned)?;
179            *sequence += 1;
180            *sequence
181        };
182
183        // Write to WAL first
184        {
185            let mut wal = self.wal.lock().map_err(|_| Error::LockPoisoned)?;
186            let record = WalRecord::put(key.to_vec(), value.to_vec());
187            wal.append(record)?;
188        }
189
190        // Write to memtable
191        {
192            let mut memtable = self.memtable.write().map_err(|_| Error::LockPoisoned)?;
193            memtable.put(key.to_vec(), value.to_vec());
194        }
195
196        // Check if flush is needed
197        self.maybe_flush()?;
198
199        Ok(())
200    }
201
202    /// Retrieve a value by key
203    pub fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
204        // Check active memtable first
205        {
206            let memtable = self.memtable.read().map_err(|_| Error::LockPoisoned)?;
207            if let Some(result) = memtable.get(key) {
208                return match result {
209                    Some(value) => Ok(Some(value.to_vec())),
210                    None => Ok(None), // Tombstone
211                };
212            }
213        }
214        
215        // Check immutable memtables (newest first)
216        {
217            let immutable = self.immutable_memtables.lock().map_err(|_| Error::LockPoisoned)?;
218            for mt in immutable.iter().rev() {
219                if let Some(result) = mt.get(key) {
220                    return match result {
221                        Some(value) => Ok(Some(value.to_vec())),
222                        None => Ok(None), // Tombstone
223                    };
224                }
225            }
226        }
227        
228        // Check SSTables (newest first, level 0 first)
229        {
230            let manifest = self.manifest.lock().map_err(|_| Error::LockPoisoned)?;
231            
232            // Check each level
233            for level in 0..7 {
234                let sstables = manifest.sstables_at_level(level);
235                
236                // Sort by sequence (newest first)
237                let mut sorted: Vec<_> = sstables.iter().collect();
238                sorted.sort_by(|a, b| b.sequence.cmp(&a.sequence));
239                
240                for sst in sorted {
241                    // Quick range check
242                    if key < sst.min_key.as_slice() || key > sst.max_key.as_slice() {
243                        continue;
244                    }
245                    
246                    // Open and search SSTable
247                    let path = PathBuf::from(&sst.path);
248                    if let Ok(mut reader) = SSTableReader::open(&path) {
249                        if let Ok(Some(entry)) = reader.get(key) {
250                            if entry.is_tombstone() {
251                                return Ok(None);
252                            }
253                            return Ok(Some(entry.value));
254                        }
255                    }
256                }
257            }
258        }
259        
260        Ok(None)
261    }
262
263    /// Delete a key
264    pub fn delete(&self, key: &[u8]) -> Result<()> {
265        // Get next sequence number
266        let _seq = {
267            let mut sequence = self.sequence.write().map_err(|_| Error::LockPoisoned)?;
268            *sequence += 1;
269            *sequence
270        };
271
272        // Write to WAL first
273        {
274            let mut wal = self.wal.lock().map_err(|_| Error::LockPoisoned)?;
275            let record = WalRecord::delete(key.to_vec());
276            wal.append(record)?;
277        }
278
279        // Write tombstone to memtable
280        {
281            let mut memtable = self.memtable.write().map_err(|_| Error::LockPoisoned)?;
282            memtable.delete(key.to_vec());
283        }
284
285        Ok(())
286    }
287
288    /// Check if memtable needs flushing and trigger if so
289    fn maybe_flush(&self) -> Result<()> {
290        let should_flush = {
291            let memtable = self.memtable.read().map_err(|_| Error::LockPoisoned)?;
292            memtable.size_bytes() >= self.config.memtable_size
293        };
294        
295        if should_flush {
296            self.flush()?;
297        }
298        
299        Ok(())
300    }
301
302    /// Flush the current memtable to disk as an SSTable
303    pub fn flush(&self) -> Result<()> {
304        // Swap memtable
305        let old_memtable = {
306            let mut memtable = self.memtable.write().map_err(|_| Error::LockPoisoned)?;
307            let sequence = memtable.sequence();
308            let old = std::mem::replace(&mut *memtable, Memtable::with_sequence(sequence));
309            Arc::new(old)
310        };
311        
312        if old_memtable.is_empty() {
313            return Ok(());
314        }
315        
316        // Add to immutable list
317        {
318            let mut immutable = self.immutable_memtables.lock().map_err(|_| Error::LockPoisoned)?;
319            immutable.push(Arc::clone(&old_memtable));
320        }
321        
322        // Generate SSTable path
323        let timestamp = std::time::SystemTime::now()
324            .duration_since(std::time::UNIX_EPOCH)
325            .unwrap_or_default()
326            .as_millis();
327        let sst_path = self.dir.join("sst").join(format!("L0_{}.sst", timestamp));
328        
329        // Create a cloned memtable for iteration
330        let mt_for_iter = {
331            let entries: Vec<_> = old_memtable.iter()
332                .map(|(k, v)| (k.clone(), v.clone()))
333                .collect();
334            entries
335        };
336        
337        // Write SSTable
338        let meta = SSTableWriter::from_memtable(&sst_path, mt_for_iter.into_iter())?;
339        
340        // Update manifest
341        {
342            let mut manifest = self.manifest.lock().map_err(|_| Error::LockPoisoned)?;
343            manifest.add_sstable(&meta)?;
344            manifest.update_sequence(old_memtable.sequence())?;
345        }
346        
347        // Remove from immutable list
348        {
349            let mut immutable = self.immutable_memtables.lock().map_err(|_| Error::LockPoisoned)?;
350            immutable.retain(|m| !Arc::ptr_eq(m, &old_memtable));
351        }
352        
353        // Maybe trigger compaction
354        if self.config.enable_compaction {
355            self.maybe_compact()?;
356        }
357        
358        Ok(())
359    }
360
361    /// Check if compaction is needed and run if so
362    fn maybe_compact(&self) -> Result<()> {
363        let mut compactor = self.compactor.lock().map_err(|_| Error::LockPoisoned)?;
364        let mut manifest = self.manifest.lock().map_err(|_| Error::LockPoisoned)?;
365        
366        if compactor.needs_compaction(&manifest) {
367            compactor.compact_level0(&mut manifest)?;
368        }
369        
370        Ok(())
371    }
372
373    /// Force sync all data to disk
374    pub fn sync(&self) -> Result<()> {
375        // Sync WAL
376        {
377            let mut wal = self.wal.lock().map_err(|_| Error::LockPoisoned)?;
378            wal.sync()?;
379        }
380        
381        // Flush memtable
382        self.flush()?;
383        
384        // Rewrite manifest
385        {
386            let mut manifest = self.manifest.lock().map_err(|_| Error::LockPoisoned)?;
387            manifest.rewrite()?;
388        }
389        
390        Ok(())
391    }
392
393    /// Get storage statistics
394    pub fn stats(&self) -> StorageStats {
395        let memtable = self.memtable.read().ok();
396        let manifest = self.manifest.lock().ok();
397        let compactor = self.compactor.lock().ok();
398
399        let (memtable_size, memtable_entries) = match &memtable {
400            Some(m) => (m.size_bytes(), m.len()),
401            None => (0, 0),
402        };
403
404        StorageStats {
405            memtable_size,
406            memtable_entries,
407            sstable_count: manifest.as_ref().map(|m| m.all_sstables().len()).unwrap_or(0),
408            total_disk_size: manifest.as_ref().map(|m| m.total_size()).unwrap_or(0),
409            level_counts: manifest.map(|m| m.level_counts()).unwrap_or_default(),
410            compaction_stats: compactor.map(|c| c.stats().clone()).unwrap_or_default(),
411        }
412    }
413
414    /// Close the storage engine
415    pub fn close(self) -> Result<()> {
416        // Flush any remaining data
417        self.flush()?;
418        self.sync()?;
419        Ok(())
420    }
421}
422
/// Storage statistics
///
/// Point-in-time snapshot returned by `StorageEngine::stats`. Gathered
/// best-effort: fields whose lock could not be acquired are zero/default.
#[derive(Debug, Clone, Default)]
pub struct StorageStats {
    /// Current active memtable size in bytes
    pub memtable_size: u64,
    /// Number of entries in the active memtable
    pub memtable_entries: usize,
    /// Total number of live SSTables across all levels
    pub sstable_count: usize,
    /// Total on-disk size of all SSTables, in bytes
    pub total_disk_size: u64,
    /// Number of SSTables at each level (index = level)
    pub level_counts: Vec<usize>,
    /// Cumulative compaction statistics
    pub compaction_stats: CompactionStats,
}
439
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Open an engine with default configuration rooted at `dir`.
    fn open_default(dir: &tempfile::TempDir) -> StorageEngine {
        StorageEngine::open(dir.path()).unwrap()
    }

    #[test]
    fn test_storage_engine_basic() {
        let dir = tempdir().unwrap();
        let engine = open_default(&dir);

        engine.put(b"key1", b"value1").unwrap();
        engine.put(b"key2", b"value2").unwrap();

        // Present keys round-trip; an absent key yields None.
        assert_eq!(engine.get(b"key1").unwrap(), Some(b"value1".to_vec()));
        assert_eq!(engine.get(b"key2").unwrap(), Some(b"value2".to_vec()));
        assert_eq!(engine.get(b"key3").unwrap(), None);
    }

    #[test]
    fn test_storage_engine_update() {
        let dir = tempdir().unwrap();
        let engine = open_default(&dir);

        // The most recent write for a key must win.
        for value in [b"value1".as_slice(), b"value2".as_slice()] {
            engine.put(b"key", value).unwrap();
            assert_eq!(engine.get(b"key").unwrap(), Some(value.to_vec()));
        }
    }

    #[test]
    fn test_storage_engine_delete() {
        let dir = tempdir().unwrap();
        let engine = open_default(&dir);

        engine.put(b"key", b"value").unwrap();
        assert_eq!(engine.get(b"key").unwrap(), Some(b"value".to_vec()));

        // A tombstone must hide the key from subsequent reads.
        engine.delete(b"key").unwrap();
        assert_eq!(engine.get(b"key").unwrap(), None);
    }

    #[test]
    fn test_storage_engine_flush() {
        let dir = tempdir().unwrap();
        let config = StorageConfig {
            memtable_size: 100, // Very small to trigger flush
            enable_compaction: false,
            ..Default::default()
        };
        let engine = StorageEngine::open_with_config(dir.path(), config).unwrap();

        // Enough data to exceed the tiny threshold several times over.
        (0..10).for_each(|i| {
            let key = format!("key{:03}", i);
            let value = format!("value{}", i);
            engine.put(key.as_bytes(), value.as_bytes()).unwrap();
        });

        // An explicit flush must leave the data readable.
        engine.flush().unwrap();
        assert_eq!(engine.get(b"key000").unwrap(), Some(b"value0".to_vec()));

        // Something must have landed on disk or remain in the memtable.
        let stats = engine.stats();
        assert!(stats.sstable_count > 0 || stats.memtable_entries > 0);
    }

    #[test]
    fn test_storage_engine_recovery() {
        let dir = tempdir().unwrap();

        // Write without closing — simulates a crash; the WAL must repair it.
        {
            let engine = open_default(&dir);
            engine.put(b"persistent", b"data").unwrap();
        }

        // Reopening replays the WAL and makes the write visible again.
        {
            let engine = open_default(&dir);
            assert_eq!(engine.get(b"persistent").unwrap(), Some(b"data".to_vec()));
        }
    }

    #[test]
    fn test_storage_stats() {
        let dir = tempdir().unwrap();
        let engine = open_default(&dir);

        engine.put(b"key", b"value").unwrap();

        // A single write must be reflected in the memtable statistics.
        let stats = engine.stats();
        assert!(stats.memtable_size > 0 || stats.memtable_entries > 0);
    }
}