Skip to main content

amaters_core/storage/
lsm_tree.rs

1//! LSM-Tree (Log-Structured Merge-Tree) implementation
2//!
3//! Multi-level persistent storage engine integrating:
4//! - Memtable (in-memory writes)
5//! - WAL (durability)
6//! - SSTables (persistent storage)
7//! - Block Cache (read optimization)
8
9use crate::error::{AmateRSError, ErrorContext, Result};
10use crate::storage::{
11    BlockCache, BlockCacheConfig, BlockCacheKey, CachedBlock, CompactionConfig, CompactionExecutor,
12    CompactionPlanner, Memtable, MemtableConfig, SSTableConfig, SSTableReader, SSTableWriter,
13    ValueLog, ValueLogConfig, ValuePointer, Wal,
14};
15use crate::types::{CipherBlob, Key};
16use parking_lot::RwLock;
17use std::collections::BTreeMap;
18use std::path::{Path, PathBuf};
19use std::sync::Arc;
20
/// Descriptor for one on-disk SSTable tracked by the LSM-Tree.
///
/// Values of this type are created when a memtable is flushed to L0 and when
/// existing `.sst` files are rediscovered in the data directory at startup.
#[derive(Debug, Clone)]
pub struct SSTableMetadata {
    /// Location of the SSTable file on disk.
    pub path: PathBuf,
    /// Smallest key stored in the SSTable; point lookups treat it as an
    /// inclusive lower bound.
    pub min_key: Key,
    /// Largest key stored in the SSTable; point lookups treat it as an
    /// inclusive upper bound.
    pub max_key: Key,
    /// Number of entries in the file. For tables produced by a memtable flush
    /// this count includes tombstones.
    pub num_entries: usize,
    /// File size in bytes, as reported by the filesystem.
    pub file_size: u64,
    /// Level of the LSM-Tree this SSTable belongs to (0 = L0).
    pub level: usize,
}
37
38/// Level information
39#[derive(Debug, Clone)]
40pub struct LevelInfo {
41    /// Level number (0 = L0, 1 = L1, etc.)
42    pub level: usize,
43    /// SSTables in this level
44    pub sstables: Vec<SSTableMetadata>,
45    /// Total size in bytes
46    pub total_size: u64,
47}
48
49impl LevelInfo {
50    fn new(level: usize) -> Self {
51        Self {
52            level,
53            sstables: Vec::new(),
54            total_size: 0,
55        }
56    }
57
58    fn add_sstable(&mut self, metadata: SSTableMetadata) {
59        self.total_size += metadata.file_size;
60        self.sstables.push(metadata);
61    }
62}
63
64/// LSM-Tree configuration
65#[derive(Debug, Clone)]
66pub struct LsmTreeConfig {
67    /// Directory for storing SSTables
68    pub data_dir: PathBuf,
69    /// WAL directory
70    pub wal_dir: PathBuf,
71    /// Memtable configuration
72    pub memtable_config: MemtableConfig,
73    /// SSTable configuration
74    pub sstable_config: SSTableConfig,
75    /// Block cache configuration
76    pub block_cache_config: BlockCacheConfig,
77    /// Compaction configuration
78    pub compaction_config: CompactionConfig,
79    /// Value log configuration (optional, for WiscKey value separation)
80    pub value_log_config: Option<ValueLogConfig>,
81    /// Maximum number of levels (default: 7)
82    pub max_levels: usize,
83    /// L0 size threshold for compaction (default: 4 SSTables)
84    pub l0_compaction_threshold: usize,
85    /// Level size multiplier (default: 10x per level)
86    pub level_size_multiplier: usize,
87}
88
89impl Default for LsmTreeConfig {
90    fn default() -> Self {
91        Self {
92            data_dir: PathBuf::from("./data"),
93            wal_dir: PathBuf::from("./wal"),
94            memtable_config: MemtableConfig::default(),
95            sstable_config: SSTableConfig::default(),
96            block_cache_config: BlockCacheConfig::default(),
97            compaction_config: CompactionConfig::default(),
98            value_log_config: None, // Disabled by default for backward compatibility
99            max_levels: 7,
100            l0_compaction_threshold: 4,
101            level_size_multiplier: 10,
102        }
103    }
104}
105
/// Read-ahead settings intended for sequential scans.
#[derive(Debug, Clone)]
pub struct PrefetchConfig {
    /// How many blocks to prefetch ahead of the scan position.
    pub read_ahead_blocks: usize,
    /// Whether to issue an OS-level `madvise(MADV_SEQUENTIAL)` hint.
    /// Off by default to keep the crate portable Pure Rust.
    pub use_madvise: bool,
}

impl Default for PrefetchConfig {
    /// Four blocks of read-ahead, no `madvise` hint.
    fn default() -> Self {
        const DEFAULT_READ_AHEAD_BLOCKS: usize = 4;

        Self {
            use_madvise: false,
            read_ahead_blocks: DEFAULT_READ_AHEAD_BLOCKS,
        }
    }
}
124
/// LSM-Tree storage engine.
///
/// Ties together the write path (WAL first, then memtable), the optional
/// value log for separated values, and the on-disk levels of SSTables.
/// Mutable shared state sits behind `parking_lot::RwLock`, which is why the
/// read/write operations take `&self`.
pub struct LsmTree {
    /// Configuration supplied at construction time.
    config: LsmTreeConfig,
    /// Current memtable (active writes).
    memtable: Arc<Memtable>,
    /// Memtable queued for flushing to L0, if any.
    immutable_memtable: Arc<RwLock<Option<Arc<Memtable>>>>,
    /// Write-ahead log; every put/delete is appended here before the memtable.
    wal: Arc<RwLock<Wal>>,
    /// Value log for large values (WiscKey); `None` when not configured.
    value_log: Option<Arc<ValueLog>>,
    /// Levels (L0, L1, L2, ...), indexed by level number.
    levels: Arc<RwLock<Vec<LevelInfo>>>,
    /// Block cache for SSTable blocks.
    block_cache: Arc<BlockCache>,
    /// Next SSTable ID to hand out to a flush or compaction.
    next_sstable_id: Arc<RwLock<u64>>,
    /// Decides when a level needs compaction and builds compaction tasks.
    compaction_planner: CompactionPlanner,
    /// Runs compaction tasks and tracks compaction statistics.
    compaction_executor: Arc<RwLock<CompactionExecutor>>,
    /// Read-ahead prefetch configuration for sequential scans.
    prefetch_config: PrefetchConfig,
}
150
151impl LsmTree {
152    /// Create a new LSM-Tree with default configuration
153    pub fn new<P: AsRef<Path>>(data_dir: P) -> Result<Self> {
154        let config = LsmTreeConfig {
155            data_dir: data_dir.as_ref().to_path_buf(),
156            ..Default::default()
157        };
158        Self::with_config(config)
159    }
160
161    /// Create a new LSM-Tree with custom configuration
162    pub fn with_config(config: LsmTreeConfig) -> Result<Self> {
163        // Create directories
164        std::fs::create_dir_all(&config.data_dir).map_err(|e| {
165            AmateRSError::StorageIntegrity(ErrorContext::new(format!(
166                "Failed to create data directory: {}",
167                e
168            )))
169        })?;
170
171        std::fs::create_dir_all(&config.wal_dir).map_err(|e| {
172            AmateRSError::StorageIntegrity(ErrorContext::new(format!(
173                "Failed to create WAL directory: {}",
174                e
175            )))
176        })?;
177
178        // Initialize WAL
179        let wal_path = config.wal_dir.join("wal.log");
180        let wal = Wal::create(wal_path)?;
181
182        // Initialize memtable
183        let memtable = Memtable::with_config(config.memtable_config.clone());
184
185        // Initialize levels
186        let mut levels = Vec::with_capacity(config.max_levels);
187        for i in 0..config.max_levels {
188            levels.push(LevelInfo::new(i));
189        }
190
191        // Initialize block cache
192        let block_cache = BlockCache::with_config(config.block_cache_config.clone());
193
194        // Initialize compaction
195        let compaction_planner = CompactionPlanner::new(config.compaction_config.clone());
196        let compaction_executor = CompactionExecutor::new(config.sstable_config.clone());
197
198        // Initialize value log if configured
199        let value_log = if let Some(ref vlog_config) = config.value_log_config {
200            Some(Arc::new(ValueLog::with_config(vlog_config.clone())?))
201        } else {
202            None
203        };
204
205        let mut lsm = Self {
206            config,
207            memtable: Arc::new(memtable),
208            immutable_memtable: Arc::new(RwLock::new(None)),
209            wal: Arc::new(RwLock::new(wal)),
210            value_log,
211            levels: Arc::new(RwLock::new(levels)),
212            block_cache: Arc::new(block_cache),
213            next_sstable_id: Arc::new(RwLock::new(0)),
214            compaction_planner,
215            compaction_executor: Arc::new(RwLock::new(compaction_executor)),
216            prefetch_config: PrefetchConfig::default(),
217        };
218
219        // Recover existing SSTables from disk
220        lsm.recover_sstables()?;
221
222        Ok(lsm)
223    }
224
225    /// Recover existing SSTables from the data directory
226    fn recover_sstables(&mut self) -> Result<()> {
227        use std::fs;
228
229        // Scan data directory for SSTable files
230        let entries = fs::read_dir(&self.config.data_dir).map_err(|e| {
231            AmateRSError::IoError(ErrorContext::new(format!(
232                "Failed to read data directory: {}",
233                e
234            )))
235        })?;
236
237        let mut sstables_by_level: BTreeMap<usize, Vec<SSTableMetadata>> = BTreeMap::new();
238        let mut max_id = 0u64;
239
240        for entry in entries {
241            let entry = entry.map_err(|e| {
242                AmateRSError::IoError(ErrorContext::new(format!(
243                    "Failed to read directory entry: {}",
244                    e
245                )))
246            })?;
247
248            let path = entry.path();
249            let filename = match path.file_name().and_then(|n| n.to_str()) {
250                Some(name) => name,
251                None => continue,
252            };
253
254            // Parse filename format: L{level}_{id}.sst
255            if filename.starts_with('L') && filename.ends_with(".sst") {
256                let parts: Vec<&str> = filename[1..].trim_end_matches(".sst").split('_').collect();
257                if parts.len() == 2 {
258                    if let (Ok(level), Ok(id)) =
259                        (parts[0].parse::<usize>(), parts[1].parse::<u64>())
260                    {
261                        // Update max ID
262                        if id > max_id {
263                            max_id = id;
264                        }
265
266                        // Read SSTable metadata
267                        let reader = SSTableReader::open(&path)?;
268                        let (min_key, max_key, num_entries) = reader.metadata()?;
269
270                        let file_size = std::fs::metadata(&path)
271                            .map_err(|e| {
272                                AmateRSError::IoError(ErrorContext::new(format!(
273                                    "Failed to get file size: {}",
274                                    e
275                                )))
276                            })?
277                            .len();
278
279                        let metadata = SSTableMetadata {
280                            path: path.clone(),
281                            min_key,
282                            max_key,
283                            num_entries,
284                            file_size,
285                            level,
286                        };
287
288                        sstables_by_level.entry(level).or_default().push(metadata);
289                    }
290                }
291            }
292        }
293
294        // Add recovered SSTables to levels
295        let mut levels = self.levels.write();
296        for (level, mut sstables) in sstables_by_level {
297            if level < levels.len() {
298                // Sort SSTables by min_key for non-L0 levels
299                if level > 0 {
300                    sstables.sort_by(|a, b| a.min_key.cmp(&b.min_key));
301                }
302
303                for metadata in sstables {
304                    levels[level].add_sstable(metadata);
305                }
306            }
307        }
308        drop(levels);
309
310        // Set next SSTable ID
311        *self.next_sstable_id.write() = max_id + 1;
312
313        Ok(())
314    }
315
316    /// Put a key-value pair
317    pub fn put(&self, key: Key, value: CipherBlob) -> Result<()> {
318        // Check if value should be separated to vLog
319        let stored_value = if let Some(ref vlog) = self.value_log {
320            if vlog.should_separate(&value) {
321                // Store in vLog and get pointer
322                let pointer = vlog.append(key.clone(), value)?;
323                vlog.flush()?;
324
325                // Encode pointer as CipherBlob with "VPTR" magic prefix
326                Self::encode_value_pointer(&pointer)
327            } else {
328                // Store value directly
329                value
330            }
331        } else {
332            // No vLog configured, store value directly
333            value
334        };
335
336        // Write to WAL first (durability)
337        {
338            let mut wal = self.wal.write();
339            wal.put(key.clone(), stored_value.clone())?;
340        }
341
342        // Write to memtable
343        self.memtable.put(key, stored_value)?;
344
345        // Check if memtable should be flushed
346        if self.memtable.should_flush() {
347            self.try_flush_memtable()?;
348        }
349
350        Ok(())
351    }
352
353    /// Get a value by key
354    pub fn get(&self, key: &Key) -> Result<Option<CipherBlob>> {
355        // Check memtable first (most recent data)
356        if let Some(value) = self.memtable.get(key)? {
357            return self.resolve_value(value);
358        }
359
360        // Check immutable memtable if exists
361        {
362            let immutable = self.immutable_memtable.read();
363            if let Some(ref memtable) = *immutable {
364                if let Some(value) = memtable.get(key)? {
365                    return self.resolve_value(value);
366                }
367            }
368        }
369
370        // Search through levels (L0 to Ln)
371        let levels = self.levels.read();
372        for level_info in levels.iter() {
373            if let Some(value) = self.search_level(level_info, key)? {
374                return self.resolve_value(value);
375            }
376        }
377
378        Ok(None)
379    }
380
381    /// Resolve a value: if it's a ValuePointer, read from vLog; otherwise return as-is
382    fn resolve_value(&self, value: CipherBlob) -> Result<Option<CipherBlob>> {
383        // Check for tombstone (zero-length blob)
384        if value.as_bytes().is_empty() {
385            return Ok(None);
386        }
387
388        if Self::is_value_pointer(&value) {
389            if let Some(ref vlog) = self.value_log {
390                let pointer = Self::decode_value_pointer(&value)?;
391                let actual_value = vlog.read(&pointer)?;
392                Ok(Some(actual_value))
393            } else {
394                Err(AmateRSError::StorageIntegrity(ErrorContext::new(
395                    "Found value pointer but vLog is not configured".to_string(),
396                )))
397            }
398        } else {
399            Ok(Some(value))
400        }
401    }
402
403    /// Delete a key
404    pub fn delete(&self, key: Key) -> Result<()> {
405        // Write tombstone to WAL
406        {
407            let mut wal = self.wal.write();
408            wal.delete(key.clone())?;
409        }
410
411        // Write tombstone to memtable
412        self.memtable.delete(key)?;
413
414        // Check if memtable should be flushed
415        if self.memtable.should_flush() {
416            self.try_flush_memtable()?;
417        }
418
419        Ok(())
420    }
421
422    /// Range scan
423    pub fn range(&self, start: &Key, end: &Key) -> Result<Vec<(Key, CipherBlob)>> {
424        let mut results = BTreeMap::new();
425
426        // Collect from all levels (newer data overwrites older)
427        let levels = self.levels.read();
428        for level_info in levels.iter().rev() {
429            let level_results = self.range_scan_level(level_info, start, end)?;
430            for (k, v) in level_results {
431                results.entry(k).or_insert(v);
432            }
433        }
434
435        // Check immutable memtable
436        {
437            let immutable = self.immutable_memtable.read();
438            if let Some(ref memtable) = *immutable {
439                for (k, v) in memtable.range(start, end) {
440                    results.insert(k, v);
441                }
442            }
443        }
444
445        // Check memtable (most recent)
446        for (k, v) in self.memtable.range(start, end) {
447            results.insert(k, v);
448        }
449
450        Ok(results.into_iter().collect())
451    }
452
453    /// Search a specific level for a key
454    fn search_level(&self, level_info: &LevelInfo, key: &Key) -> Result<Option<CipherBlob>> {
455        // For L0, check all SSTables (may have overlapping ranges)
456        if level_info.level == 0 {
457            // Check newest first
458            for metadata in level_info.sstables.iter().rev() {
459                if key >= &metadata.min_key && key <= &metadata.max_key {
460                    if let Some(value) = self.read_from_sstable(&metadata.path, key)? {
461                        return Ok(Some(value));
462                    }
463                }
464            }
465        } else {
466            // For L1+, SSTables are non-overlapping, use binary search
467            let idx = level_info.sstables.binary_search_by(|metadata| {
468                if key < &metadata.min_key {
469                    std::cmp::Ordering::Greater
470                } else if key > &metadata.max_key {
471                    std::cmp::Ordering::Less
472                } else {
473                    std::cmp::Ordering::Equal
474                }
475            });
476
477            if let Ok(idx) = idx {
478                let metadata = &level_info.sstables[idx];
479                if let Some(value) = self.read_from_sstable(&metadata.path, key)? {
480                    return Ok(Some(value));
481                }
482            }
483        }
484
485        Ok(None)
486    }
487
488    /// Range scan a specific level
489    fn range_scan_level(
490        &self,
491        level_info: &LevelInfo,
492        start: &Key,
493        end: &Key,
494    ) -> Result<Vec<(Key, CipherBlob)>> {
495        let mut results = Vec::new();
496
497        for metadata in &level_info.sstables {
498            // Skip if SSTable range doesn't overlap with query range
499            if &metadata.max_key < start || &metadata.min_key > end {
500                continue;
501            }
502
503            // Read from SSTable
504            let reader = SSTableReader::open(&metadata.path)?;
505            let entries = reader.iter()?;
506
507            for (k, v) in entries {
508                // Range is [start, end) - start inclusive, end exclusive
509                if &k >= start && &k < end {
510                    results.push((k, v));
511                }
512            }
513        }
514
515        Ok(results)
516    }
517
518    /// Read a key from an SSTable with block cache
519    fn read_from_sstable(&self, path: &Path, key: &Key) -> Result<Option<CipherBlob>> {
520        let reader = SSTableReader::open(path)?;
521        reader.get(key)
522    }
523
524    /// Try to flush memtable to L0
525    fn try_flush_memtable(&self) -> Result<()> {
526        // Check if already flushing
527        {
528            let immutable = self.immutable_memtable.read();
529            if immutable.is_some() {
530                // Already flushing, skip
531                return Ok(());
532            }
533        }
534
535        // Swap memtable to immutable
536        {
537            let mut immutable = self.immutable_memtable.write();
538            if immutable.is_some() {
539                return Ok(());
540            }
541
542            // Swap current memtable with a new one
543            let old_memtable = Arc::clone(&self.memtable);
544            let new_memtable = Memtable::with_config(self.config.memtable_config.clone());
545
546            // Note: In a real implementation, we'd use Arc::make_mut or similar
547            // For now, this is a simplified version
548            *immutable = Some(old_memtable);
549        }
550
551        // Flush immutable memtable to SSTable
552        self.flush_immutable_memtable()?;
553
554        Ok(())
555    }
556
557    /// Flush immutable memtable to L0 SSTable
558    fn flush_immutable_memtable(&self) -> Result<()> {
559        let _span = tracing::debug_span!("amaters.storage.flush").entered();
560        let memtable = {
561            let mut immutable = self.immutable_memtable.write();
562            immutable.take()
563        };
564
565        if let Some(memtable) = memtable {
566            // Generate SSTable path
567            let sstable_id = {
568                let mut next_id = self.next_sstable_id.write();
569                let id = *next_id;
570                *next_id += 1;
571                id
572            };
573
574            let sstable_path = self
575                .config
576                .data_dir
577                .join(format!("L0_{:08}.sst", sstable_id));
578
579            // Write SSTable
580            let mut writer = SSTableWriter::new(&sstable_path, self.config.sstable_config.clone())?;
581
582            let entries = memtable.entries();
583            let mut min_key = None;
584            let mut max_key = None;
585            let mut num_entries = 0;
586
587            for (key, value_opt) in entries {
588                // Write both values and tombstones to SSTable
589                // Tombstones are written as zero-length blobs
590                let value = value_opt.unwrap_or_else(|| CipherBlob::new(Vec::new()));
591
592                if min_key.is_none() {
593                    min_key = Some(key.clone());
594                }
595                max_key = Some(key.clone());
596                writer.add(key, value)?;
597                num_entries += 1;
598            }
599
600            writer.finish()?;
601
602            // Get file size
603            let file_size = std::fs::metadata(&sstable_path)
604                .map_err(|e| {
605                    AmateRSError::StorageIntegrity(ErrorContext::new(format!(
606                        "Failed to get SSTable size: {}",
607                        e
608                    )))
609                })?
610                .len();
611
612            // Add to L0
613            if let (Some(min_key), Some(max_key)) = (min_key, max_key) {
614                let metadata = SSTableMetadata {
615                    path: sstable_path,
616                    min_key,
617                    max_key,
618                    num_entries,
619                    file_size,
620                    level: 0,
621                };
622
623                let mut levels = self.levels.write();
624                levels[0].add_sstable(metadata);
625            }
626
627            // Trigger compaction if L0 threshold exceeded
628            self.trigger_compaction()?;
629        }
630
631        Ok(())
632    }
633
634    /// Trigger compaction if needed
635    fn trigger_compaction(&self) -> Result<()> {
636        let levels = self.levels.read();
637
638        // Check L0 compaction threshold
639        let l0_count = levels[0].sstables.len();
640        if self.compaction_planner.needs_l0_compaction(l0_count) {
641            drop(levels); // Release read lock before compaction
642            return self.compact_l0_to_l1();
643        }
644
645        // Check other levels for size-based compaction
646        for level_info in levels.iter() {
647            if level_info.level > 0
648                && self
649                    .compaction_planner
650                    .needs_level_compaction(level_info.level, level_info.total_size)
651            {
652                let source_level = level_info.level;
653                drop(levels); // Release read lock
654                return self.compact_level(source_level);
655            }
656        }
657
658        Ok(())
659    }
660
661    /// Compact L0 to L1
662    fn compact_l0_to_l1(&self) -> Result<()> {
663        let (source_sstables, target_sstables) = {
664            let levels = self.levels.read();
665            let source = levels[0].sstables.clone();
666            let target = if levels.len() > 1 {
667                levels[1].sstables.clone()
668            } else {
669                Vec::new()
670            };
671            (source, target)
672        };
673
674        if let Some(task) =
675            self.compaction_planner
676                .plan_compaction(0, source_sstables, target_sstables)
677        {
678            self.execute_compaction_task(task)?;
679        }
680
681        Ok(())
682    }
683
684    /// Compact a specific level
685    fn compact_level(&self, source_level: usize) -> Result<()> {
686        let (source_sstables, target_sstables) = {
687            let levels = self.levels.read();
688            if source_level >= levels.len() {
689                return Ok(());
690            }
691
692            let source = levels[source_level].sstables.clone();
693            let target = if source_level + 1 < levels.len() {
694                levels[source_level + 1].sstables.clone()
695            } else {
696                Vec::new()
697            };
698            (source, target)
699        };
700
701        if let Some(task) =
702            self.compaction_planner
703                .plan_compaction(source_level, source_sstables, target_sstables)
704        {
705            self.execute_compaction_task(task)?;
706        }
707
708        Ok(())
709    }
710
711    /// Execute a compaction task
712    fn execute_compaction_task(&self, task: crate::storage::CompactionTask) -> Result<()> {
713        // Execute compaction
714        let output_sstables = {
715            let mut executor = self.compaction_executor.write();
716            let mut next_id = self.next_sstable_id.write();
717            executor.execute_compaction(task.clone(), &self.config.data_dir, &mut next_id)?
718        };
719
720        // Update levels: remove old SSTables, add new ones
721        let mut levels = self.levels.write();
722
723        // Remove source SSTables
724        levels[task.source_level]
725            .sstables
726            .retain(|s| !task.source_sstables.iter().any(|ts| ts.path == s.path));
727        levels[task.source_level].total_size = levels[task.source_level]
728            .sstables
729            .iter()
730            .map(|s| s.file_size)
731            .sum();
732
733        // Remove target SSTables
734        if task.target_level < levels.len() {
735            levels[task.target_level]
736                .sstables
737                .retain(|s| !task.target_sstables.iter().any(|ts| ts.path == s.path));
738            levels[task.target_level].total_size = levels[task.target_level]
739                .sstables
740                .iter()
741                .map(|s| s.file_size)
742                .sum();
743
744            // Add new SSTables
745            for sstable in output_sstables {
746                levels[task.target_level].add_sstable(sstable);
747            }
748        }
749
750        drop(levels);
751
752        // Delete old SSTable files
753        for sstable in task.source_sstables.iter().chain(&task.target_sstables) {
754            std::fs::remove_file(&sstable.path).ok();
755        }
756
757        Ok(())
758    }
759
760    /// Get level information
761    pub fn level_info(&self, level: usize) -> Option<LevelInfo> {
762        let levels = self.levels.read();
763        if level < levels.len() {
764            Some(levels[level].clone())
765        } else {
766            None
767        }
768    }
769
770    /// Get all levels information
771    pub fn all_levels_info(&self) -> Vec<LevelInfo> {
772        self.levels.read().clone()
773    }
774
775    /// Configure read-ahead prefetching for sequential scans.
776    ///
777    /// Currently stores the config; prefetching is triggered automatically
778    /// when sequential access patterns are detected during range scans.
779    pub fn with_prefetch(mut self, config: PrefetchConfig) -> Self {
780        self.prefetch_config = config;
781        self
782    }
783
784    /// Get statistics
785    pub fn stats(&self) -> LsmTreeStats {
786        let levels = self.levels.read();
787        let cache_stats = self.block_cache.stats();
788        let compaction_stats = self.compaction_executor.read().stats_snapshot();
789
790        LsmTreeStats {
791            memtable_size: self.memtable.size_bytes(),
792            num_levels: levels.len(),
793            levels: levels.clone(),
794            cache_hit_rate: cache_stats.hit_rate(),
795            cache_size: cache_stats.size_bytes,
796            compaction_stats,
797        }
798    }
799
800    /// Get all keys from the LSM-Tree
801    pub fn keys(&self) -> Result<Vec<Key>> {
802        let mut key_set = std::collections::BTreeSet::new();
803
804        // Collect from memtable
805        for (key, value_opt) in self.memtable.entries() {
806            if value_opt.is_some() {
807                key_set.insert(key);
808            }
809        }
810
811        // Collect from immutable memtable
812        {
813            let immutable = self.immutable_memtable.read();
814            if let Some(ref memtable) = *immutable {
815                for (key, value_opt) in memtable.entries() {
816                    if value_opt.is_some() {
817                        key_set.insert(key);
818                    }
819                }
820            }
821        }
822
823        // Collect from all levels
824        let levels = self.levels.read();
825        for level_info in levels.iter() {
826            for metadata in &level_info.sstables {
827                let reader = SSTableReader::open(&metadata.path)?;
828                let entries = reader.iter()?;
829                for (key, _) in entries {
830                    key_set.insert(key);
831                }
832            }
833        }
834
835        Ok(key_set.into_iter().collect())
836    }
837
838    /// Flush all pending writes to disk
839    pub fn flush(&self) -> Result<()> {
840        let _span = tracing::debug_span!("amaters.storage.flush_explicit").entered();
841        // Flush memtable if it has data
842        if self.memtable.size_bytes() > 0 {
843            self.try_flush_memtable()?;
844        }
845
846        // Flush immutable memtable if exists
847        self.flush_immutable_memtable()?;
848
849        // Flush WAL
850        {
851            let mut wal = self.wal.write();
852            wal.flush()?;
853        }
854
855        // Flush value log if configured
856        if let Some(ref vlog) = self.value_log {
857            vlog.flush()?;
858        }
859
860        Ok(())
861    }
862
863    /// Close the LSM-Tree gracefully
864    pub fn close(&self) -> Result<()> {
865        // Flush all pending writes
866        // File handles will be closed when the structures are dropped
867        self.flush()?;
868        Ok(())
869    }
870
871    // ===== ValuePointer encoding/decoding helpers =====
872
873    /// Encode a ValuePointer as a CipherBlob with "VPTR" magic prefix
874    fn encode_value_pointer(pointer: &ValuePointer) -> CipherBlob {
875        const MAGIC: &[u8] = b"VPTR"; // 4 bytes
876        let pointer_bytes = pointer.encode();
877
878        let mut bytes = Vec::with_capacity(MAGIC.len() + pointer_bytes.len());
879        bytes.extend_from_slice(MAGIC);
880        bytes.extend_from_slice(&pointer_bytes);
881
882        CipherBlob::new(bytes)
883    }
884
885    /// Decode a CipherBlob back to a ValuePointer
886    fn decode_value_pointer(blob: &CipherBlob) -> Result<ValuePointer> {
887        const MAGIC: &[u8] = b"VPTR";
888        let bytes = blob.as_bytes();
889
890        if bytes.len() < MAGIC.len() {
891            return Err(AmateRSError::StorageIntegrity(ErrorContext::new(
892                "Invalid value pointer: too short".to_string(),
893            )));
894        }
895
896        if &bytes[..MAGIC.len()] != MAGIC {
897            return Err(AmateRSError::StorageIntegrity(ErrorContext::new(
898                "Invalid value pointer: bad magic".to_string(),
899            )));
900        }
901
902        ValuePointer::decode(&bytes[MAGIC.len()..])
903    }
904
905    /// Check if a CipherBlob contains a ValuePointer
906    fn is_value_pointer(blob: &CipherBlob) -> bool {
907        const MAGIC: &[u8] = b"VPTR";
908        let bytes = blob.as_bytes();
909        bytes.len() >= MAGIC.len() && &bytes[..MAGIC.len()] == MAGIC
910    }
911}
912
/// LSM-Tree statistics
///
/// Plain data summary of a tree's memtable, levels, block cache and compaction
/// counters. The tests in this file obtain it from `LsmTree::stats()`.
#[derive(Debug, Clone)]
pub struct LsmTreeStats {
    /// Current memtable size in bytes
    pub memtable_size: usize,
    /// Number of levels (the tests in this file expect 7 for a default-configured tree)
    pub num_levels: usize,
    /// Level information, one `LevelInfo` per level
    /// (presumably indexed by level number, so `levels[0]` is L0 — confirm)
    pub levels: Vec<LevelInfo>,
    /// Block cache hit rate
    /// (NOTE(review): scale — fraction vs. percentage — is not visible here; confirm)
    pub cache_hit_rate: f64,
    /// Block cache size in bytes
    pub cache_size: usize,
    /// Compaction statistics (counters such as `compactions_completed`,
    /// `keys_processed` and `tombstones_removed` are read from it by the tests)
    pub compaction_stats: crate::storage::CompactionStatsSnapshot,
}
929
#[cfg(test)]
mod tests {
    use super::*;
    use std::env;

    /// Scratch directory under the system temp dir, owned by a single test.
    ///
    /// Creating it first wipes anything left at that path, so files from an
    /// earlier aborted run cannot leak into the current one. Dropping it removes
    /// the directory again, which also covers tests that panic or return early
    /// through `?`.
    ///
    /// Declare the guard *before* the `LsmTree` that uses it: locals are dropped
    /// in reverse declaration order, so the tree is dropped before its directory
    /// is deleted.
    struct TestDir {
        path: std::path::PathBuf,
    }

    impl TestDir {
        /// Create a fresh, empty directory named `name` in the system temp dir.
        fn new(name: &str) -> Self {
            let path = env::temp_dir().join(name);
            // Best effort: the directory usually does not exist yet.
            let _ = std::fs::remove_dir_all(&path);
            std::fs::create_dir_all(&path).expect("failed to create scratch directory for test");
            Self { path }
        }
    }

    impl Drop for TestDir {
        fn drop(&mut self) {
            // Best effort: a cleanup failure must not mask the test's own result.
            let _ = std::fs::remove_dir_all(&self.path);
        }
    }

    /// Configuration shared by the value-log tests: data, WAL and vLog all live
    /// under `root`, and values above 1KB are separated into the vLog.
    fn vlog_test_config(root: &std::path::Path) -> LsmTreeConfig {
        LsmTreeConfig {
            data_dir: root.to_path_buf(),
            wal_dir: root.join("wal"),
            value_log_config: Some(ValueLogConfig {
                vlog_dir: root.join("vlog"),
                max_file_size: 1024 * 1024, // 1MB
                value_threshold: 1024,      // 1KB
                sync_on_write: false,
                gc_threshold: 0.5,
            }),
            ..Default::default()
        }
    }

    #[test]
    fn test_lsm_tree_basic_operations() -> Result<()> {
        let dir = TestDir::new("test_lsm_basic");

        let lsm = LsmTree::new(&dir.path)?;

        // Put
        let key = Key::from_str("test_key");
        let value = CipherBlob::new(vec![1, 2, 3, 4, 5]);
        lsm.put(key.clone(), value)?;

        // Get
        let retrieved = lsm.get(&key)?;
        assert!(retrieved.is_some());
        assert_eq!(
            retrieved
                .expect("Value should be retrievable after put")
                .as_bytes(),
            &[1, 2, 3, 4, 5]
        );

        // Delete
        lsm.delete(key.clone())?;

        // Verify deleted
        let retrieved = lsm.get(&key)?;
        assert!(retrieved.is_none());

        Ok(())
    }

    #[test]
    fn test_lsm_tree_multiple_keys() -> Result<()> {
        let dir = TestDir::new("test_lsm_multiple");

        let lsm = LsmTree::new(&dir.path)?;

        // Write multiple keys
        for i in 0..10 {
            let key = Key::from_str(&format!("key_{:03}", i));
            let value = CipherBlob::new(vec![i as u8; 100]);
            lsm.put(key, value)?;
        }

        // Read back
        for i in 0..10 {
            let key = Key::from_str(&format!("key_{:03}", i));
            let value = lsm.get(&key)?;
            assert!(value.is_some());
            assert_eq!(value.expect("Value should exist").as_bytes()[0], i as u8);
        }

        Ok(())
    }

    #[test]
    fn test_lsm_tree_range_scan() -> Result<()> {
        let dir = TestDir::new("test_lsm_range");

        let lsm = LsmTree::new(&dir.path)?;

        // Write keys
        for i in 0..20 {
            let key = Key::from_str(&format!("key_{:03}", i));
            let value = CipherBlob::new(vec![i as u8; 50]);
            lsm.put(key, value)?;
        }

        // Range scan
        let start = Key::from_str("key_005");
        let end = Key::from_str("key_015");
        let results = lsm.range(&start, &end)?;

        assert!(results.len() >= 10);

        Ok(())
    }

    #[test]
    fn test_lsm_tree_stats() -> Result<()> {
        let dir = TestDir::new("test_lsm_stats");

        let lsm = LsmTree::new(&dir.path)?;

        // Write some data
        for i in 0..5 {
            let key = Key::from_str(&format!("key_{}", i));
            let value = CipherBlob::new(vec![i as u8; 100]);
            lsm.put(key, value)?;
        }

        let stats = lsm.stats();
        assert!(stats.memtable_size > 0);
        assert_eq!(stats.num_levels, 7); // Default max levels

        Ok(())
    }

    #[test]
    fn test_lsm_tree_compaction_trigger() -> Result<()> {
        let dir = TestDir::new("test_lsm_compaction");

        let mut config = LsmTreeConfig {
            data_dir: dir.path.clone(),
            ..Default::default()
        };
        config.memtable_config.max_size_bytes = 1024; // Small memtable to trigger flushes
        config.l0_compaction_threshold = 2; // Low threshold for testing

        let lsm = LsmTree::with_config(config)?;

        // Write enough data to trigger multiple flushes and compaction
        for i in 0..100 {
            let key = Key::from_str(&format!("key_{:04}", i));
            let value = CipherBlob::new(vec![i as u8; 200]);
            lsm.put(key, value)?;
        }

        // Either a compaction has run, or flushed SSTables are still sitting in L0
        let stats = lsm.stats();
        assert!(
            stats.compaction_stats.compactions_completed > 0
                || !stats.levels[0].sstables.is_empty()
        );

        Ok(())
    }

    #[test]
    fn test_lsm_tree_compaction_stats() -> Result<()> {
        let dir = TestDir::new("test_lsm_compaction_stats");

        let mut config = LsmTreeConfig {
            data_dir: dir.path.clone(),
            ..Default::default()
        };
        config.memtable_config.max_size_bytes = 512;

        let lsm = LsmTree::with_config(config)?;

        // Write data
        for i in 0..50 {
            let key = Key::from_str(&format!("key_{:04}", i));
            let value = CipherBlob::new(vec![i as u8; 100]);
            lsm.put(key, value)?;
        }

        let stats = lsm.stats();
        // Compaction may have occurred due to small memtable size
        // Just verify stats structure is available (stats are unsigned, so always >= 0)
        let _ = stats.compaction_stats.keys_processed;
        let _ = stats.compaction_stats.tombstones_removed;

        Ok(())
    }

    #[test]
    fn test_lsm_tree_level_organization() -> Result<()> {
        let dir = TestDir::new("test_lsm_levels");

        let mut config = LsmTreeConfig {
            data_dir: dir.path.clone(),
            ..Default::default()
        };
        config.memtable_config.max_size_bytes = 1024;

        let lsm = LsmTree::with_config(config)?;

        // Write data to populate levels
        for i in 0..200 {
            let key = Key::from_str(&format!("key_{:05}", i));
            let value = CipherBlob::new(vec![i as u8; 150]);
            lsm.put(key, value)?;
        }

        // Verify all data is still readable (regardless of which level)
        for i in 0..200 {
            let key = Key::from_str(&format!("key_{:05}", i));
            let value = lsm.get(&key)?;
            assert!(value.is_some());
        }

        // Check that data exists somewhere in the levels
        let stats = lsm.stats();
        let total_sstables: usize = stats.levels.iter().map(|l| l.sstables.len()).sum();
        assert!(total_sstables > 0 || stats.memtable_size > 0);

        Ok(())
    }

    #[test]
    fn test_lsm_tree_bloom_filter_negative_lookups() -> Result<()> {
        let dir = TestDir::new("test_lsm_bloom");

        let mut config = LsmTreeConfig {
            data_dir: dir.path.clone(),
            ..Default::default()
        };
        config.memtable_config.max_size_bytes = 512; // Small to trigger flushes

        let lsm = LsmTree::with_config(config)?;

        // Write keys that will be flushed to SSTables
        for i in 0..100 {
            let key = Key::from_str(&format!("exists_{:04}", i));
            let value = CipherBlob::new(vec![i as u8; 100]);
            lsm.put(key, value)?;
        }

        // Query for keys that exist (should find them)
        for i in 0..100 {
            let key = Key::from_str(&format!("exists_{:04}", i));
            let result = lsm.get(&key)?;
            assert!(result.is_some());
        }

        // Query for keys that don't exist (bloom filter should help avoid disk reads)
        for i in 0..100 {
            let key = Key::from_str(&format!("notexists_{:04}", i));
            let result = lsm.get(&key)?;
            assert!(result.is_none());
        }

        Ok(())
    }

    // ===== WiscKey Value Separation Tests =====

    #[test]
    fn test_lsm_tree_vlog_basic() -> Result<()> {
        let dir = TestDir::new("test_lsm_vlog_basic");

        let lsm = LsmTree::with_config(vlog_test_config(&dir.path))?;

        // Small value (< 1KB) - stored inline
        let small_key = Key::from_str("small_key");
        let small_value = CipherBlob::new(vec![1u8; 512]);
        lsm.put(small_key.clone(), small_value)?;

        // Large value (> 1KB) - stored in vLog
        let large_key = Key::from_str("large_key");
        let large_value = CipherBlob::new(vec![2u8; 2048]);
        lsm.put(large_key.clone(), large_value)?;

        // Verify both values can be retrieved
        let retrieved_small = lsm.get(&small_key)?;
        assert!(retrieved_small.is_some());
        assert_eq!(
            retrieved_small
                .expect("Small value should be retrievable")
                .as_bytes(),
            &vec![1u8; 512]
        );

        let retrieved_large = lsm.get(&large_key)?;
        assert!(retrieved_large.is_some());
        assert_eq!(
            retrieved_large
                .expect("Large value should be retrievable")
                .as_bytes(),
            &vec![2u8; 2048]
        );

        Ok(())
    }

    #[test]
    fn test_lsm_tree_vlog_multiple_large_values() -> Result<()> {
        let dir = TestDir::new("test_lsm_vlog_multiple");

        let lsm = LsmTree::with_config(vlog_test_config(&dir.path))?;

        // Write 20 large values
        for i in 0..20 {
            let key = Key::from_str(&format!("large_key_{:02}", i));
            let value = CipherBlob::new(vec![i as u8; 2048]);
            lsm.put(key, value)?;
        }

        // Read back and verify
        for i in 0..20 {
            let key = Key::from_str(&format!("large_key_{:02}", i));
            let value = lsm.get(&key)?;
            assert!(value.is_some());
            let retrieved = value.expect("Value should exist");
            assert_eq!(retrieved.as_bytes()[0], i as u8);
            assert_eq!(retrieved.as_bytes().len(), 2048);
        }

        Ok(())
    }

    #[test]
    fn test_lsm_tree_vlog_with_flush() -> Result<()> {
        let dir = TestDir::new("test_lsm_vlog_flush");

        let mut config = vlog_test_config(&dir.path);
        config.memtable_config.max_size_bytes = 4096; // Small memtable to trigger flushes

        let lsm = LsmTree::with_config(config)?;

        // Write enough data to trigger memtable flush
        for i in 0..50 {
            let key = Key::from_str(&format!("key_{:03}", i));
            let value = CipherBlob::new(vec![i as u8; 1500]); // > 1KB, stored in vLog
            lsm.put(key, value)?;
        }

        // Verify all data is readable after flush
        for i in 0..50 {
            let key = Key::from_str(&format!("key_{:03}", i));
            let value = lsm.get(&key)?;
            assert!(value.is_some());
            let retrieved = value.expect("Value should exist");
            assert_eq!(retrieved.as_bytes()[0], i as u8);
        }

        Ok(())
    }

    #[test]
    fn test_lsm_tree_vlog_disabled() -> Result<()> {
        let dir = TestDir::new("test_lsm_vlog_disabled");

        let config = LsmTreeConfig {
            data_dir: dir.path.clone(),
            value_log_config: None, // vLog disabled
            ..Default::default()
        };

        let lsm = LsmTree::with_config(config)?;

        // Large value should still work (stored inline)
        let key = Key::from_str("large_key");
        let value = CipherBlob::new(vec![42u8; 5000]);
        lsm.put(key.clone(), value)?;

        let retrieved = lsm.get(&key)?;
        assert!(retrieved.is_some());
        assert_eq!(
            retrieved
                .expect("Value should be retrievable after put")
                .as_bytes(),
            &vec![42u8; 5000]
        );

        Ok(())
    }

    #[test]
    fn test_lsm_tree_vlog_update() -> Result<()> {
        let dir = TestDir::new("test_lsm_vlog_update");

        let lsm = LsmTree::with_config(vlog_test_config(&dir.path))?;

        let key = Key::from_str("update_key");

        // Write initial large value
        let value1 = CipherBlob::new(vec![1u8; 2048]);
        lsm.put(key.clone(), value1)?;

        // Update with new large value
        let value2 = CipherBlob::new(vec![2u8; 2048]);
        lsm.put(key.clone(), value2)?;

        // Verify latest value is retrieved
        let retrieved = lsm.get(&key)?;
        assert!(retrieved.is_some());
        assert_eq!(
            retrieved
                .expect("Value should be retrievable after put")
                .as_bytes()[0],
            2u8
        );

        Ok(())
    }

    #[test]
    fn test_lsm_tree_vlog_delete() -> Result<()> {
        let dir = TestDir::new("test_lsm_vlog_delete");

        let lsm = LsmTree::with_config(vlog_test_config(&dir.path))?;

        let key = Key::from_str("delete_key");

        // Write large value
        let value = CipherBlob::new(vec![42u8; 2048]);
        lsm.put(key.clone(), value)?;

        // Verify it exists
        assert!(lsm.get(&key)?.is_some());

        // Delete it
        lsm.delete(key.clone())?;

        // Verify it's gone
        assert!(lsm.get(&key)?.is_none());

        Ok(())
    }

    #[test]
    fn test_lsm_tree_value_pointer_encoding() -> Result<()> {
        // Test ValuePointer encoding/decoding
        let pointer = ValuePointer {
            file_id: 123,
            offset: 456789,
            length: 2048,
            checksum: 0xDEADBEEF,
        };

        // Encode
        let encoded = LsmTree::encode_value_pointer(&pointer);

        // Check it's marked as a pointer
        assert!(LsmTree::is_value_pointer(&encoded));

        // Decode
        let decoded = LsmTree::decode_value_pointer(&encoded)?;

        // Verify
        assert_eq!(decoded.file_id, 123);
        assert_eq!(decoded.offset, 456789);
        assert_eq!(decoded.length, 2048);
        assert_eq!(decoded.checksum, 0xDEADBEEF);

        // Test non-pointer value
        let regular_value = CipherBlob::new(vec![1, 2, 3, 4, 5]);
        assert!(!LsmTree::is_value_pointer(&regular_value));

        Ok(())
    }

    #[test]
    fn test_prefetch_config_default() {
        let cfg = PrefetchConfig::default();
        assert_eq!(cfg.read_ahead_blocks, 4);
        assert!(!cfg.use_madvise);
    }

    #[test]
    fn test_lsm_tree_with_prefetch_config() {
        let dir = TestDir::new("test_lsm_prefetch_config");

        let lsm = LsmTree::new(&dir.path)
            .expect("LsmTree::new failed")
            .with_prefetch(PrefetchConfig {
                read_ahead_blocks: 8,
                use_madvise: false,
            });

        // Verify config was stored without panic
        assert_eq!(lsm.prefetch_config.read_ahead_blocks, 8);
        assert!(!lsm.prefetch_config.use_madvise);
    }
}