Skip to main content

amaters_core/storage/
lsm_tree.rs

1//! LSM-Tree (Log-Structured Merge-Tree) implementation
2//!
3//! Multi-level persistent storage engine integrating:
4//! - Memtable (in-memory writes)
5//! - WAL (durability)
6//! - SSTables (persistent storage)
7//! - Block Cache (read optimization)
8
9use crate::error::{AmateRSError, ErrorContext, Result};
10use crate::storage::{
11    BlockCache, BlockCacheConfig, BlockCacheKey, CachedBlock, CompactionConfig, CompactionExecutor,
12    CompactionPlanner, Memtable, MemtableConfig, SSTableConfig, SSTableReader, SSTableWriter,
13    ValueLog, ValueLogConfig, ValuePointer, Wal,
14};
15use crate::types::{CipherBlob, Key};
16use parking_lot::RwLock;
17use std::collections::BTreeMap;
18use std::path::{Path, PathBuf};
19use std::sync::Arc;
20
/// Descriptor for one SSTable file tracked by the LSM-Tree.
///
/// Instances are created when a memtable is flushed to L0, when compaction
/// produces output tables, and when existing files are rediscovered at startup.
#[derive(Debug, Clone)]
pub struct SSTableMetadata {
    /// SSTable file path
    pub path: PathBuf,
    /// Minimum key in the SSTable (treated as an inclusive bound by lookups)
    pub min_key: Key,
    /// Maximum key in the SSTable (treated as an inclusive bound by lookups)
    pub max_key: Key,
    /// Number of entries (tombstones written during a flush are counted too)
    pub num_entries: usize,
    /// File size in bytes
    pub file_size: u64,
    /// Level in the LSM-Tree
    pub level: usize,
}
37
/// Bookkeeping for a single level of the LSM-Tree.
///
/// Point lookups scan L0 from the last element backwards (newest first) and
/// binary-search every other level, so the order of `sstables` is significant.
#[derive(Debug, Clone)]
pub struct LevelInfo {
    /// Level number (0 = L0, 1 = L1, etc.)
    pub level: usize,
    /// SSTables in this level
    pub sstables: Vec<SSTableMetadata>,
    /// Total size in bytes (sum of `file_size` over `sstables`)
    pub total_size: u64,
}
48
49impl LevelInfo {
50    fn new(level: usize) -> Self {
51        Self {
52            level,
53            sstables: Vec::new(),
54            total_size: 0,
55        }
56    }
57
58    fn add_sstable(&mut self, metadata: SSTableMetadata) {
59        self.total_size += metadata.file_size;
60        self.sstables.push(metadata);
61    }
62}
63
/// LSM-Tree configuration
///
/// Passed to `LsmTree::with_config`; `Default` supplies relative `./data` and
/// `./wal` directories and leaves value separation disabled.
#[derive(Debug, Clone)]
pub struct LsmTreeConfig {
    /// Directory for storing SSTables
    pub data_dir: PathBuf,
    /// WAL directory (the log file is created as `wal.log` inside it)
    pub wal_dir: PathBuf,
    /// Memtable configuration
    pub memtable_config: MemtableConfig,
    /// SSTable configuration
    pub sstable_config: SSTableConfig,
    /// Block cache configuration
    pub block_cache_config: BlockCacheConfig,
    /// Compaction configuration
    pub compaction_config: CompactionConfig,
    /// Value log configuration (optional, for WiscKey value separation)
    pub value_log_config: Option<ValueLogConfig>,
    /// Maximum number of levels (default: 7)
    pub max_levels: usize,
    /// L0 size threshold for compaction (default: 4 SSTables)
    // NOTE(review): `LsmTree` does not read this field itself; compaction
    // decisions go through the planner built from `compaction_config`.
    // Confirm whether it is consumed elsewhere.
    pub l0_compaction_threshold: usize,
    /// Level size multiplier (default: 10x per level)
    // NOTE(review): likewise not read by `LsmTree` — confirm it is still needed.
    pub level_size_multiplier: usize,
}
88
89impl Default for LsmTreeConfig {
90    fn default() -> Self {
91        Self {
92            data_dir: PathBuf::from("./data"),
93            wal_dir: PathBuf::from("./wal"),
94            memtable_config: MemtableConfig::default(),
95            sstable_config: SSTableConfig::default(),
96            block_cache_config: BlockCacheConfig::default(),
97            compaction_config: CompactionConfig::default(),
98            value_log_config: None, // Disabled by default for backward compatibility
99            max_levels: 7,
100            l0_compaction_threshold: 4,
101            level_size_multiplier: 10,
102        }
103    }
104}
105
/// LSM-Tree storage engine
///
/// Writes go to the WAL and then the memtable; reads consult the memtable,
/// the immutable memtable (if any) and then each level from L0 downwards.
pub struct LsmTree {
    /// Configuration
    config: LsmTreeConfig,
    /// Current memtable (active writes)
    // NOTE(review): a plain `Arc` that is never reassigned through `&self`,
    // so a flush cannot install a fresh memtable — see `try_flush_memtable`.
    memtable: Arc<Memtable>,
    /// Immutable memtable being flushed (if any)
    immutable_memtable: Arc<RwLock<Option<Arc<Memtable>>>>,
    /// Write-ahead log
    wal: Arc<RwLock<Wal>>,
    /// Value log for large values (WiscKey)
    value_log: Option<Arc<ValueLog>>,
    /// Levels (L0, L1, L2, ...)
    levels: Arc<RwLock<Vec<LevelInfo>>>,
    /// Block cache for SSTable blocks
    // NOTE(review): within this type the cache is only queried for
    // statistics; SSTable reads do not go through it here.
    block_cache: Arc<BlockCache>,
    /// Next SSTable ID
    next_sstable_id: Arc<RwLock<u64>>,
    /// Compaction planner
    compaction_planner: CompactionPlanner,
    /// Compaction executor
    compaction_executor: Arc<RwLock<CompactionExecutor>>,
}
129
130impl LsmTree {
131    /// Create a new LSM-Tree with default configuration
132    pub fn new<P: AsRef<Path>>(data_dir: P) -> Result<Self> {
133        let config = LsmTreeConfig {
134            data_dir: data_dir.as_ref().to_path_buf(),
135            ..Default::default()
136        };
137        Self::with_config(config)
138    }
139
140    /// Create a new LSM-Tree with custom configuration
141    pub fn with_config(config: LsmTreeConfig) -> Result<Self> {
142        // Create directories
143        std::fs::create_dir_all(&config.data_dir).map_err(|e| {
144            AmateRSError::StorageIntegrity(ErrorContext::new(format!(
145                "Failed to create data directory: {}",
146                e
147            )))
148        })?;
149
150        std::fs::create_dir_all(&config.wal_dir).map_err(|e| {
151            AmateRSError::StorageIntegrity(ErrorContext::new(format!(
152                "Failed to create WAL directory: {}",
153                e
154            )))
155        })?;
156
157        // Initialize WAL
158        let wal_path = config.wal_dir.join("wal.log");
159        let wal = Wal::create(wal_path)?;
160
161        // Initialize memtable
162        let memtable = Memtable::with_config(config.memtable_config.clone());
163
164        // Initialize levels
165        let mut levels = Vec::with_capacity(config.max_levels);
166        for i in 0..config.max_levels {
167            levels.push(LevelInfo::new(i));
168        }
169
170        // Initialize block cache
171        let block_cache = BlockCache::with_config(config.block_cache_config.clone());
172
173        // Initialize compaction
174        let compaction_planner = CompactionPlanner::new(config.compaction_config.clone());
175        let compaction_executor = CompactionExecutor::new(config.sstable_config.clone());
176
177        // Initialize value log if configured
178        let value_log = if let Some(ref vlog_config) = config.value_log_config {
179            Some(Arc::new(ValueLog::with_config(vlog_config.clone())?))
180        } else {
181            None
182        };
183
184        let mut lsm = Self {
185            config,
186            memtable: Arc::new(memtable),
187            immutable_memtable: Arc::new(RwLock::new(None)),
188            wal: Arc::new(RwLock::new(wal)),
189            value_log,
190            levels: Arc::new(RwLock::new(levels)),
191            block_cache: Arc::new(block_cache),
192            next_sstable_id: Arc::new(RwLock::new(0)),
193            compaction_planner,
194            compaction_executor: Arc::new(RwLock::new(compaction_executor)),
195        };
196
197        // Recover existing SSTables from disk
198        lsm.recover_sstables()?;
199
200        Ok(lsm)
201    }
202
203    /// Recover existing SSTables from the data directory
204    fn recover_sstables(&mut self) -> Result<()> {
205        use std::fs;
206
207        // Scan data directory for SSTable files
208        let entries = fs::read_dir(&self.config.data_dir).map_err(|e| {
209            AmateRSError::IoError(ErrorContext::new(format!(
210                "Failed to read data directory: {}",
211                e
212            )))
213        })?;
214
215        let mut sstables_by_level: BTreeMap<usize, Vec<SSTableMetadata>> = BTreeMap::new();
216        let mut max_id = 0u64;
217
218        for entry in entries {
219            let entry = entry.map_err(|e| {
220                AmateRSError::IoError(ErrorContext::new(format!(
221                    "Failed to read directory entry: {}",
222                    e
223                )))
224            })?;
225
226            let path = entry.path();
227            let filename = match path.file_name().and_then(|n| n.to_str()) {
228                Some(name) => name,
229                None => continue,
230            };
231
232            // Parse filename format: L{level}_{id}.sst
233            if filename.starts_with('L') && filename.ends_with(".sst") {
234                let parts: Vec<&str> = filename[1..].trim_end_matches(".sst").split('_').collect();
235                if parts.len() == 2 {
236                    if let (Ok(level), Ok(id)) =
237                        (parts[0].parse::<usize>(), parts[1].parse::<u64>())
238                    {
239                        // Update max ID
240                        if id > max_id {
241                            max_id = id;
242                        }
243
244                        // Read SSTable metadata
245                        let reader = SSTableReader::open(&path)?;
246                        let (min_key, max_key, num_entries) = reader.metadata()?;
247
248                        let file_size = std::fs::metadata(&path)
249                            .map_err(|e| {
250                                AmateRSError::IoError(ErrorContext::new(format!(
251                                    "Failed to get file size: {}",
252                                    e
253                                )))
254                            })?
255                            .len();
256
257                        let metadata = SSTableMetadata {
258                            path: path.clone(),
259                            min_key,
260                            max_key,
261                            num_entries,
262                            file_size,
263                            level,
264                        };
265
266                        sstables_by_level.entry(level).or_default().push(metadata);
267                    }
268                }
269            }
270        }
271
272        // Add recovered SSTables to levels
273        let mut levels = self.levels.write();
274        for (level, mut sstables) in sstables_by_level {
275            if level < levels.len() {
276                // Sort SSTables by min_key for non-L0 levels
277                if level > 0 {
278                    sstables.sort_by(|a, b| a.min_key.cmp(&b.min_key));
279                }
280
281                for metadata in sstables {
282                    levels[level].add_sstable(metadata);
283                }
284            }
285        }
286        drop(levels);
287
288        // Set next SSTable ID
289        *self.next_sstable_id.write() = max_id + 1;
290
291        Ok(())
292    }
293
294    /// Put a key-value pair
295    pub fn put(&self, key: Key, value: CipherBlob) -> Result<()> {
296        // Check if value should be separated to vLog
297        let stored_value = if let Some(ref vlog) = self.value_log {
298            if vlog.should_separate(&value) {
299                // Store in vLog and get pointer
300                let pointer = vlog.append(key.clone(), value)?;
301                vlog.flush()?;
302
303                // Encode pointer as CipherBlob with "VPTR" magic prefix
304                Self::encode_value_pointer(&pointer)
305            } else {
306                // Store value directly
307                value
308            }
309        } else {
310            // No vLog configured, store value directly
311            value
312        };
313
314        // Write to WAL first (durability)
315        {
316            let mut wal = self.wal.write();
317            wal.put(key.clone(), stored_value.clone())?;
318        }
319
320        // Write to memtable
321        self.memtable.put(key, stored_value)?;
322
323        // Check if memtable should be flushed
324        if self.memtable.should_flush() {
325            self.try_flush_memtable()?;
326        }
327
328        Ok(())
329    }
330
331    /// Get a value by key
332    pub fn get(&self, key: &Key) -> Result<Option<CipherBlob>> {
333        // Check memtable first (most recent data)
334        if let Some(value) = self.memtable.get(key)? {
335            return self.resolve_value(value);
336        }
337
338        // Check immutable memtable if exists
339        {
340            let immutable = self.immutable_memtable.read();
341            if let Some(ref memtable) = *immutable {
342                if let Some(value) = memtable.get(key)? {
343                    return self.resolve_value(value);
344                }
345            }
346        }
347
348        // Search through levels (L0 to Ln)
349        let levels = self.levels.read();
350        for level_info in levels.iter() {
351            if let Some(value) = self.search_level(level_info, key)? {
352                return self.resolve_value(value);
353            }
354        }
355
356        Ok(None)
357    }
358
359    /// Resolve a value: if it's a ValuePointer, read from vLog; otherwise return as-is
360    fn resolve_value(&self, value: CipherBlob) -> Result<Option<CipherBlob>> {
361        // Check for tombstone (zero-length blob)
362        if value.as_bytes().is_empty() {
363            return Ok(None);
364        }
365
366        if Self::is_value_pointer(&value) {
367            if let Some(ref vlog) = self.value_log {
368                let pointer = Self::decode_value_pointer(&value)?;
369                let actual_value = vlog.read(&pointer)?;
370                Ok(Some(actual_value))
371            } else {
372                Err(AmateRSError::StorageIntegrity(ErrorContext::new(
373                    "Found value pointer but vLog is not configured".to_string(),
374                )))
375            }
376        } else {
377            Ok(Some(value))
378        }
379    }
380
381    /// Delete a key
382    pub fn delete(&self, key: Key) -> Result<()> {
383        // Write tombstone to WAL
384        {
385            let mut wal = self.wal.write();
386            wal.delete(key.clone())?;
387        }
388
389        // Write tombstone to memtable
390        self.memtable.delete(key)?;
391
392        // Check if memtable should be flushed
393        if self.memtable.should_flush() {
394            self.try_flush_memtable()?;
395        }
396
397        Ok(())
398    }
399
400    /// Range scan
401    pub fn range(&self, start: &Key, end: &Key) -> Result<Vec<(Key, CipherBlob)>> {
402        let mut results = BTreeMap::new();
403
404        // Collect from all levels (newer data overwrites older)
405        let levels = self.levels.read();
406        for level_info in levels.iter().rev() {
407            let level_results = self.range_scan_level(level_info, start, end)?;
408            for (k, v) in level_results {
409                results.entry(k).or_insert(v);
410            }
411        }
412
413        // Check immutable memtable
414        {
415            let immutable = self.immutable_memtable.read();
416            if let Some(ref memtable) = *immutable {
417                for (k, v) in memtable.range(start, end) {
418                    results.insert(k, v);
419                }
420            }
421        }
422
423        // Check memtable (most recent)
424        for (k, v) in self.memtable.range(start, end) {
425            results.insert(k, v);
426        }
427
428        Ok(results.into_iter().collect())
429    }
430
431    /// Search a specific level for a key
432    fn search_level(&self, level_info: &LevelInfo, key: &Key) -> Result<Option<CipherBlob>> {
433        // For L0, check all SSTables (may have overlapping ranges)
434        if level_info.level == 0 {
435            // Check newest first
436            for metadata in level_info.sstables.iter().rev() {
437                if key >= &metadata.min_key && key <= &metadata.max_key {
438                    if let Some(value) = self.read_from_sstable(&metadata.path, key)? {
439                        return Ok(Some(value));
440                    }
441                }
442            }
443        } else {
444            // For L1+, SSTables are non-overlapping, use binary search
445            let idx = level_info.sstables.binary_search_by(|metadata| {
446                if key < &metadata.min_key {
447                    std::cmp::Ordering::Greater
448                } else if key > &metadata.max_key {
449                    std::cmp::Ordering::Less
450                } else {
451                    std::cmp::Ordering::Equal
452                }
453            });
454
455            if let Ok(idx) = idx {
456                let metadata = &level_info.sstables[idx];
457                if let Some(value) = self.read_from_sstable(&metadata.path, key)? {
458                    return Ok(Some(value));
459                }
460            }
461        }
462
463        Ok(None)
464    }
465
466    /// Range scan a specific level
467    fn range_scan_level(
468        &self,
469        level_info: &LevelInfo,
470        start: &Key,
471        end: &Key,
472    ) -> Result<Vec<(Key, CipherBlob)>> {
473        let mut results = Vec::new();
474
475        for metadata in &level_info.sstables {
476            // Skip if SSTable range doesn't overlap with query range
477            if &metadata.max_key < start || &metadata.min_key > end {
478                continue;
479            }
480
481            // Read from SSTable
482            let reader = SSTableReader::open(&metadata.path)?;
483            let entries = reader.iter()?;
484
485            for (k, v) in entries {
486                // Range is [start, end) - start inclusive, end exclusive
487                if &k >= start && &k < end {
488                    results.push((k, v));
489                }
490            }
491        }
492
493        Ok(results)
494    }
495
496    /// Read a key from an SSTable with block cache
497    fn read_from_sstable(&self, path: &Path, key: &Key) -> Result<Option<CipherBlob>> {
498        let reader = SSTableReader::open(path)?;
499        reader.get(key)
500    }
501
502    /// Try to flush memtable to L0
503    fn try_flush_memtable(&self) -> Result<()> {
504        // Check if already flushing
505        {
506            let immutable = self.immutable_memtable.read();
507            if immutable.is_some() {
508                // Already flushing, skip
509                return Ok(());
510            }
511        }
512
513        // Swap memtable to immutable
514        {
515            let mut immutable = self.immutable_memtable.write();
516            if immutable.is_some() {
517                return Ok(());
518            }
519
520            // Swap current memtable with a new one
521            let old_memtable = Arc::clone(&self.memtable);
522            let new_memtable = Memtable::with_config(self.config.memtable_config.clone());
523
524            // Note: In a real implementation, we'd use Arc::make_mut or similar
525            // For now, this is a simplified version
526            *immutable = Some(old_memtable);
527        }
528
529        // Flush immutable memtable to SSTable
530        self.flush_immutable_memtable()?;
531
532        Ok(())
533    }
534
535    /// Flush immutable memtable to L0 SSTable
536    fn flush_immutable_memtable(&self) -> Result<()> {
537        let memtable = {
538            let mut immutable = self.immutable_memtable.write();
539            immutable.take()
540        };
541
542        if let Some(memtable) = memtable {
543            // Generate SSTable path
544            let sstable_id = {
545                let mut next_id = self.next_sstable_id.write();
546                let id = *next_id;
547                *next_id += 1;
548                id
549            };
550
551            let sstable_path = self
552                .config
553                .data_dir
554                .join(format!("L0_{:08}.sst", sstable_id));
555
556            // Write SSTable
557            let mut writer = SSTableWriter::new(&sstable_path, self.config.sstable_config.clone())?;
558
559            let entries = memtable.entries();
560            let mut min_key = None;
561            let mut max_key = None;
562            let mut num_entries = 0;
563
564            for (key, value_opt) in entries {
565                // Write both values and tombstones to SSTable
566                // Tombstones are written as zero-length blobs
567                let value = value_opt.unwrap_or_else(|| CipherBlob::new(Vec::new()));
568
569                if min_key.is_none() {
570                    min_key = Some(key.clone());
571                }
572                max_key = Some(key.clone());
573                writer.add(key, value)?;
574                num_entries += 1;
575            }
576
577            writer.finish()?;
578
579            // Get file size
580            let file_size = std::fs::metadata(&sstable_path)
581                .map_err(|e| {
582                    AmateRSError::StorageIntegrity(ErrorContext::new(format!(
583                        "Failed to get SSTable size: {}",
584                        e
585                    )))
586                })?
587                .len();
588
589            // Add to L0
590            if let (Some(min_key), Some(max_key)) = (min_key, max_key) {
591                let metadata = SSTableMetadata {
592                    path: sstable_path,
593                    min_key,
594                    max_key,
595                    num_entries,
596                    file_size,
597                    level: 0,
598                };
599
600                let mut levels = self.levels.write();
601                levels[0].add_sstable(metadata);
602            }
603
604            // Trigger compaction if L0 threshold exceeded
605            self.trigger_compaction()?;
606        }
607
608        Ok(())
609    }
610
611    /// Trigger compaction if needed
612    fn trigger_compaction(&self) -> Result<()> {
613        let levels = self.levels.read();
614
615        // Check L0 compaction threshold
616        let l0_count = levels[0].sstables.len();
617        if self.compaction_planner.needs_l0_compaction(l0_count) {
618            drop(levels); // Release read lock before compaction
619            return self.compact_l0_to_l1();
620        }
621
622        // Check other levels for size-based compaction
623        for level_info in levels.iter() {
624            if level_info.level > 0
625                && self
626                    .compaction_planner
627                    .needs_level_compaction(level_info.level, level_info.total_size)
628            {
629                let source_level = level_info.level;
630                drop(levels); // Release read lock
631                return self.compact_level(source_level);
632            }
633        }
634
635        Ok(())
636    }
637
638    /// Compact L0 to L1
639    fn compact_l0_to_l1(&self) -> Result<()> {
640        let (source_sstables, target_sstables) = {
641            let levels = self.levels.read();
642            let source = levels[0].sstables.clone();
643            let target = if levels.len() > 1 {
644                levels[1].sstables.clone()
645            } else {
646                Vec::new()
647            };
648            (source, target)
649        };
650
651        if let Some(task) =
652            self.compaction_planner
653                .plan_compaction(0, source_sstables, target_sstables)
654        {
655            self.execute_compaction_task(task)?;
656        }
657
658        Ok(())
659    }
660
661    /// Compact a specific level
662    fn compact_level(&self, source_level: usize) -> Result<()> {
663        let (source_sstables, target_sstables) = {
664            let levels = self.levels.read();
665            if source_level >= levels.len() {
666                return Ok(());
667            }
668
669            let source = levels[source_level].sstables.clone();
670            let target = if source_level + 1 < levels.len() {
671                levels[source_level + 1].sstables.clone()
672            } else {
673                Vec::new()
674            };
675            (source, target)
676        };
677
678        if let Some(task) =
679            self.compaction_planner
680                .plan_compaction(source_level, source_sstables, target_sstables)
681        {
682            self.execute_compaction_task(task)?;
683        }
684
685        Ok(())
686    }
687
688    /// Execute a compaction task
689    fn execute_compaction_task(&self, task: crate::storage::CompactionTask) -> Result<()> {
690        // Execute compaction
691        let output_sstables = {
692            let mut executor = self.compaction_executor.write();
693            let mut next_id = self.next_sstable_id.write();
694            executor.execute_compaction(task.clone(), &self.config.data_dir, &mut next_id)?
695        };
696
697        // Update levels: remove old SSTables, add new ones
698        let mut levels = self.levels.write();
699
700        // Remove source SSTables
701        levels[task.source_level]
702            .sstables
703            .retain(|s| !task.source_sstables.iter().any(|ts| ts.path == s.path));
704        levels[task.source_level].total_size = levels[task.source_level]
705            .sstables
706            .iter()
707            .map(|s| s.file_size)
708            .sum();
709
710        // Remove target SSTables
711        if task.target_level < levels.len() {
712            levels[task.target_level]
713                .sstables
714                .retain(|s| !task.target_sstables.iter().any(|ts| ts.path == s.path));
715            levels[task.target_level].total_size = levels[task.target_level]
716                .sstables
717                .iter()
718                .map(|s| s.file_size)
719                .sum();
720
721            // Add new SSTables
722            for sstable in output_sstables {
723                levels[task.target_level].add_sstable(sstable);
724            }
725        }
726
727        drop(levels);
728
729        // Delete old SSTable files
730        for sstable in task.source_sstables.iter().chain(&task.target_sstables) {
731            std::fs::remove_file(&sstable.path).ok();
732        }
733
734        Ok(())
735    }
736
737    /// Get level information
738    pub fn level_info(&self, level: usize) -> Option<LevelInfo> {
739        let levels = self.levels.read();
740        if level < levels.len() {
741            Some(levels[level].clone())
742        } else {
743            None
744        }
745    }
746
747    /// Get all levels information
748    pub fn all_levels_info(&self) -> Vec<LevelInfo> {
749        self.levels.read().clone()
750    }
751
752    /// Get statistics
753    pub fn stats(&self) -> LsmTreeStats {
754        let levels = self.levels.read();
755        let cache_stats = self.block_cache.stats();
756        let compaction_stats = self.compaction_executor.read().stats_snapshot();
757
758        LsmTreeStats {
759            memtable_size: self.memtable.size_bytes(),
760            num_levels: levels.len(),
761            levels: levels.clone(),
762            cache_hit_rate: cache_stats.hit_rate(),
763            cache_size: cache_stats.size_bytes,
764            compaction_stats,
765        }
766    }
767
768    /// Get all keys from the LSM-Tree
769    pub fn keys(&self) -> Result<Vec<Key>> {
770        let mut key_set = std::collections::BTreeSet::new();
771
772        // Collect from memtable
773        for (key, value_opt) in self.memtable.entries() {
774            if value_opt.is_some() {
775                key_set.insert(key);
776            }
777        }
778
779        // Collect from immutable memtable
780        {
781            let immutable = self.immutable_memtable.read();
782            if let Some(ref memtable) = *immutable {
783                for (key, value_opt) in memtable.entries() {
784                    if value_opt.is_some() {
785                        key_set.insert(key);
786                    }
787                }
788            }
789        }
790
791        // Collect from all levels
792        let levels = self.levels.read();
793        for level_info in levels.iter() {
794            for metadata in &level_info.sstables {
795                let reader = SSTableReader::open(&metadata.path)?;
796                let entries = reader.iter()?;
797                for (key, _) in entries {
798                    key_set.insert(key);
799                }
800            }
801        }
802
803        Ok(key_set.into_iter().collect())
804    }
805
806    /// Flush all pending writes to disk
807    pub fn flush(&self) -> Result<()> {
808        // Flush memtable if it has data
809        if self.memtable.size_bytes() > 0 {
810            self.try_flush_memtable()?;
811        }
812
813        // Flush immutable memtable if exists
814        self.flush_immutable_memtable()?;
815
816        // Flush WAL
817        {
818            let mut wal = self.wal.write();
819            wal.flush()?;
820        }
821
822        // Flush value log if configured
823        if let Some(ref vlog) = self.value_log {
824            vlog.flush()?;
825        }
826
827        Ok(())
828    }
829
830    /// Close the LSM-Tree gracefully
831    pub fn close(&self) -> Result<()> {
832        // Flush all pending writes
833        // File handles will be closed when the structures are dropped
834        self.flush()?;
835        Ok(())
836    }
837
838    // ===== ValuePointer encoding/decoding helpers =====
839
840    /// Encode a ValuePointer as a CipherBlob with "VPTR" magic prefix
841    fn encode_value_pointer(pointer: &ValuePointer) -> CipherBlob {
842        const MAGIC: &[u8] = b"VPTR"; // 4 bytes
843        let pointer_bytes = pointer.encode();
844
845        let mut bytes = Vec::with_capacity(MAGIC.len() + pointer_bytes.len());
846        bytes.extend_from_slice(MAGIC);
847        bytes.extend_from_slice(&pointer_bytes);
848
849        CipherBlob::new(bytes)
850    }
851
852    /// Decode a CipherBlob back to a ValuePointer
853    fn decode_value_pointer(blob: &CipherBlob) -> Result<ValuePointer> {
854        const MAGIC: &[u8] = b"VPTR";
855        let bytes = blob.as_bytes();
856
857        if bytes.len() < MAGIC.len() {
858            return Err(AmateRSError::StorageIntegrity(ErrorContext::new(
859                "Invalid value pointer: too short".to_string(),
860            )));
861        }
862
863        if &bytes[..MAGIC.len()] != MAGIC {
864            return Err(AmateRSError::StorageIntegrity(ErrorContext::new(
865                "Invalid value pointer: bad magic".to_string(),
866            )));
867        }
868
869        ValuePointer::decode(&bytes[MAGIC.len()..])
870    }
871
872    /// Check if a CipherBlob contains a ValuePointer
873    fn is_value_pointer(blob: &CipherBlob) -> bool {
874        const MAGIC: &[u8] = b"VPTR";
875        let bytes = blob.as_bytes();
876        bytes.len() >= MAGIC.len() && &bytes[..MAGIC.len()] == MAGIC
877    }
878}
879
/// LSM-Tree statistics
///
/// A point-in-time snapshot produced by `LsmTree::stats`; it owns copies of
/// the data and is not updated afterwards.
#[derive(Debug, Clone)]
pub struct LsmTreeStats {
    /// Current memtable size in bytes
    pub memtable_size: usize,
    /// Number of levels
    pub num_levels: usize,
    /// Level information (one entry per level, L0 first)
    pub levels: Vec<LevelInfo>,
    /// Block cache hit rate
    pub cache_hit_rate: f64,
    /// Block cache size in bytes
    pub cache_size: usize,
    /// Compaction statistics
    pub compaction_stats: crate::storage::CompactionStatsSnapshot,
}
896
897#[cfg(test)]
898mod tests {
899    use super::*;
900    use std::env;
901
902    #[test]
903    fn test_lsm_tree_basic_operations() -> Result<()> {
904        let dir = env::temp_dir().join("test_lsm_basic");
905        std::fs::create_dir_all(&dir).ok();
906
907        let lsm = LsmTree::new(&dir)?;
908
909        // Put
910        let key = Key::from_str("test_key");
911        let value = CipherBlob::new(vec![1, 2, 3, 4, 5]);
912        lsm.put(key.clone(), value.clone())?;
913
914        // Get
915        let retrieved = lsm.get(&key)?;
916        assert!(retrieved.is_some());
917        assert_eq!(
918            retrieved
919                .expect("Value should be retrievable after put")
920                .as_bytes(),
921            &[1, 2, 3, 4, 5]
922        );
923
924        // Delete
925        lsm.delete(key.clone())?;
926
927        // Verify deleted
928        let retrieved = lsm.get(&key)?;
929        assert!(retrieved.is_none());
930
931        std::fs::remove_dir_all(&dir).ok();
932        Ok(())
933    }
934
935    #[test]
936    fn test_lsm_tree_multiple_keys() -> Result<()> {
937        let dir = env::temp_dir().join("test_lsm_multiple");
938        std::fs::create_dir_all(&dir).ok();
939
940        let lsm = LsmTree::new(&dir)?;
941
942        // Write multiple keys
943        for i in 0..10 {
944            let key = Key::from_str(&format!("key_{:03}", i));
945            let value = CipherBlob::new(vec![i as u8; 100]);
946            lsm.put(key, value)?;
947        }
948
949        // Read back
950        for i in 0..10 {
951            let key = Key::from_str(&format!("key_{:03}", i));
952            let value = lsm.get(&key)?;
953            assert!(value.is_some());
954            assert_eq!(value.expect("Value should exist").as_bytes()[0], i as u8);
955        }
956
957        std::fs::remove_dir_all(&dir).ok();
958        Ok(())
959    }
960
961    #[test]
962    fn test_lsm_tree_range_scan() -> Result<()> {
963        let dir = env::temp_dir().join("test_lsm_range");
964        std::fs::create_dir_all(&dir).ok();
965
966        let lsm = LsmTree::new(&dir)?;
967
968        // Write keys
969        for i in 0..20 {
970            let key = Key::from_str(&format!("key_{:03}", i));
971            let value = CipherBlob::new(vec![i as u8; 50]);
972            lsm.put(key, value)?;
973        }
974
975        // Range scan
976        let start = Key::from_str("key_005");
977        let end = Key::from_str("key_015");
978        let results = lsm.range(&start, &end)?;
979
980        assert!(results.len() >= 10);
981
982        std::fs::remove_dir_all(&dir).ok();
983        Ok(())
984    }
985
986    #[test]
987    fn test_lsm_tree_stats() -> Result<()> {
988        let dir = env::temp_dir().join("test_lsm_stats");
989        std::fs::create_dir_all(&dir).ok();
990
991        let lsm = LsmTree::new(&dir)?;
992
993        // Write some data
994        for i in 0..5 {
995            let key = Key::from_str(&format!("key_{}", i));
996            let value = CipherBlob::new(vec![i as u8; 100]);
997            lsm.put(key, value)?;
998        }
999
1000        let stats = lsm.stats();
1001        assert!(stats.memtable_size > 0);
1002        assert_eq!(stats.num_levels, 7); // Default max levels
1003
1004        std::fs::remove_dir_all(&dir).ok();
1005        Ok(())
1006    }
1007
1008    #[test]
1009    fn test_lsm_tree_compaction_trigger() -> Result<()> {
1010        let dir = env::temp_dir().join("test_lsm_compaction");
1011        std::fs::create_dir_all(&dir).ok();
1012
1013        let mut config = LsmTreeConfig {
1014            data_dir: dir.clone(),
1015            ..Default::default()
1016        };
1017        config.memtable_config.max_size_bytes = 1024; // Small memtable to trigger flushes
1018        config.l0_compaction_threshold = 2; // Low threshold for testing
1019
1020        let lsm = LsmTree::with_config(config)?;
1021
1022        // Write enough data to trigger multiple flushes and compaction
1023        for i in 0..100 {
1024            let key = Key::from_str(&format!("key_{:04}", i));
1025            let value = CipherBlob::new(vec![i as u8; 200]);
1026            lsm.put(key, value)?;
1027        }
1028
1029        // Check stats
1030        let stats = lsm.stats();
1031        assert!(
1032            stats.compaction_stats.compactions_completed > 0
1033                || !stats.levels[0].sstables.is_empty()
1034        );
1035
1036        std::fs::remove_dir_all(&dir).ok();
1037        Ok(())
1038    }
1039
1040    #[test]
1041    fn test_lsm_tree_compaction_stats() -> Result<()> {
1042        let dir = env::temp_dir().join("test_lsm_compaction_stats");
1043        std::fs::create_dir_all(&dir).ok();
1044
1045        let mut config = LsmTreeConfig {
1046            data_dir: dir.clone(),
1047            ..Default::default()
1048        };
1049        config.memtable_config.max_size_bytes = 512;
1050
1051        let lsm = LsmTree::with_config(config)?;
1052
1053        // Write data
1054        for i in 0..50 {
1055            let key = Key::from_str(&format!("key_{:04}", i));
1056            let value = CipherBlob::new(vec![i as u8; 100]);
1057            lsm.put(key, value)?;
1058        }
1059
1060        let stats = lsm.stats();
1061        // Compaction may have occurred due to small memtable size
1062        // Just verify stats structure is available (stats are unsigned, so always >= 0)
1063        let _ = stats.compaction_stats.keys_processed;
1064        let _ = stats.compaction_stats.tombstones_removed;
1065
1066        std::fs::remove_dir_all(&dir).ok();
1067        Ok(())
1068    }
1069
1070    #[test]
1071    fn test_lsm_tree_level_organization() -> Result<()> {
1072        let dir = env::temp_dir().join("test_lsm_levels");
1073        std::fs::create_dir_all(&dir).ok();
1074
1075        let mut config = LsmTreeConfig {
1076            data_dir: dir.clone(),
1077            ..Default::default()
1078        };
1079        config.memtable_config.max_size_bytes = 1024;
1080
1081        let lsm = LsmTree::with_config(config)?;
1082
1083        // Write data to populate levels
1084        for i in 0..200 {
1085            let key = Key::from_str(&format!("key_{:05}", i));
1086            let value = CipherBlob::new(vec![i as u8; 150]);
1087            lsm.put(key, value)?;
1088        }
1089
1090        // Verify all data is still readable (regardless of which level)
1091        for i in 0..200 {
1092            let key = Key::from_str(&format!("key_{:05}", i));
1093            let value = lsm.get(&key)?;
1094            assert!(value.is_some());
1095        }
1096
1097        // Check that data exists somewhere in the levels
1098        let stats = lsm.stats();
1099        let total_sstables: usize = stats.levels.iter().map(|l| l.sstables.len()).sum();
1100        assert!(total_sstables > 0 || stats.memtable_size > 0);
1101
1102        std::fs::remove_dir_all(&dir).ok();
1103        Ok(())
1104    }
1105
1106    #[test]
1107    fn test_lsm_tree_bloom_filter_negative_lookups() -> Result<()> {
1108        let dir = env::temp_dir().join("test_lsm_bloom");
1109        std::fs::create_dir_all(&dir).ok();
1110
1111        let mut config = LsmTreeConfig {
1112            data_dir: dir.clone(),
1113            ..Default::default()
1114        };
1115        config.memtable_config.max_size_bytes = 512; // Small to trigger flushes
1116
1117        let lsm = LsmTree::with_config(config)?;
1118
1119        // Write keys that will be flushed to SSTables
1120        for i in 0..100 {
1121            let key = Key::from_str(&format!("exists_{:04}", i));
1122            let value = CipherBlob::new(vec![i as u8; 100]);
1123            lsm.put(key, value)?;
1124        }
1125
1126        // Query for keys that exist (should find them)
1127        for i in 0..100 {
1128            let key = Key::from_str(&format!("exists_{:04}", i));
1129            let result = lsm.get(&key)?;
1130            assert!(result.is_some());
1131        }
1132
1133        // Query for keys that don't exist (bloom filter should help avoid disk reads)
1134        for i in 0..100 {
1135            let key = Key::from_str(&format!("notexists_{:04}", i));
1136            let result = lsm.get(&key)?;
1137            assert!(result.is_none());
1138        }
1139
1140        std::fs::remove_dir_all(&dir).ok();
1141        Ok(())
1142    }
1143
1144    // ===== WiscKey Value Separation Tests =====
1145
1146    #[test]
1147    fn test_lsm_tree_vlog_basic() -> Result<()> {
1148        let dir = env::temp_dir().join("test_lsm_vlog_basic");
1149        std::fs::create_dir_all(&dir).ok();
1150
1151        let config = LsmTreeConfig {
1152            data_dir: dir.clone(),
1153            wal_dir: dir.join("wal"),
1154            value_log_config: Some(ValueLogConfig {
1155                vlog_dir: dir.join("vlog"),
1156                max_file_size: 1024 * 1024, // 1MB
1157                value_threshold: 1024,      // 1KB
1158                sync_on_write: false,
1159                gc_threshold: 0.5,
1160            }),
1161            ..Default::default()
1162        };
1163
1164        let lsm = LsmTree::with_config(config)?;
1165
1166        // Small value (< 1KB) - stored inline
1167        let small_key = Key::from_str("small_key");
1168        let small_value = CipherBlob::new(vec![1u8; 512]);
1169        lsm.put(small_key.clone(), small_value.clone())?;
1170
1171        // Large value (> 1KB) - stored in vLog
1172        let large_key = Key::from_str("large_key");
1173        let large_value = CipherBlob::new(vec![2u8; 2048]);
1174        lsm.put(large_key.clone(), large_value.clone())?;
1175
1176        // Verify both values can be retrieved
1177        let retrieved_small = lsm.get(&small_key)?;
1178        assert!(retrieved_small.is_some());
1179        assert_eq!(
1180            retrieved_small
1181                .expect("Small value should be retrievable")
1182                .as_bytes(),
1183            &vec![1u8; 512]
1184        );
1185
1186        let retrieved_large = lsm.get(&large_key)?;
1187        assert!(retrieved_large.is_some());
1188        assert_eq!(
1189            retrieved_large
1190                .expect("Large value should be retrievable")
1191                .as_bytes(),
1192            &vec![2u8; 2048]
1193        );
1194
1195        std::fs::remove_dir_all(&dir).ok();
1196        Ok(())
1197    }
1198
1199    #[test]
1200    fn test_lsm_tree_vlog_multiple_large_values() -> Result<()> {
1201        let dir = env::temp_dir().join("test_lsm_vlog_multiple");
1202        std::fs::create_dir_all(&dir).ok();
1203
1204        let config = LsmTreeConfig {
1205            data_dir: dir.clone(),
1206            wal_dir: dir.join("wal"),
1207            value_log_config: Some(ValueLogConfig {
1208                vlog_dir: dir.join("vlog"),
1209                max_file_size: 1024 * 1024,
1210                value_threshold: 1024,
1211                sync_on_write: false,
1212                gc_threshold: 0.5,
1213            }),
1214            ..Default::default()
1215        };
1216
1217        let lsm = LsmTree::with_config(config)?;
1218
1219        // Write 20 large values
1220        for i in 0..20 {
1221            let key = Key::from_str(&format!("large_key_{:02}", i));
1222            let value = CipherBlob::new(vec![i as u8; 2048]);
1223            lsm.put(key, value)?;
1224        }
1225
1226        // Read back and verify
1227        for i in 0..20 {
1228            let key = Key::from_str(&format!("large_key_{:02}", i));
1229            let value = lsm.get(&key)?;
1230            assert!(value.is_some());
1231            let retrieved = value.expect("Value should exist");
1232            assert_eq!(retrieved.as_bytes()[0], i as u8);
1233            assert_eq!(retrieved.as_bytes().len(), 2048);
1234        }
1235
1236        std::fs::remove_dir_all(&dir).ok();
1237        Ok(())
1238    }
1239
1240    #[test]
1241    fn test_lsm_tree_vlog_with_flush() -> Result<()> {
1242        let dir = env::temp_dir().join("test_lsm_vlog_flush");
1243        std::fs::create_dir_all(&dir).ok();
1244
1245        let mut config = LsmTreeConfig {
1246            data_dir: dir.clone(),
1247            wal_dir: dir.join("wal"),
1248            value_log_config: Some(ValueLogConfig {
1249                vlog_dir: dir.join("vlog"),
1250                max_file_size: 1024 * 1024,
1251                value_threshold: 1024,
1252                sync_on_write: false,
1253                gc_threshold: 0.5,
1254            }),
1255            ..Default::default()
1256        };
1257        config.memtable_config.max_size_bytes = 4096; // Small memtable to trigger flushes
1258
1259        let lsm = LsmTree::with_config(config)?;
1260
1261        // Write enough data to trigger memtable flush
1262        for i in 0..50 {
1263            let key = Key::from_str(&format!("key_{:03}", i));
1264            let value = CipherBlob::new(vec![i as u8; 1500]); // > 1KB, stored in vLog
1265            lsm.put(key, value)?;
1266        }
1267
1268        // Verify all data is readable after flush
1269        for i in 0..50 {
1270            let key = Key::from_str(&format!("key_{:03}", i));
1271            let value = lsm.get(&key)?;
1272            assert!(value.is_some());
1273            let retrieved = value.expect("Value should exist");
1274            assert_eq!(retrieved.as_bytes()[0], i as u8);
1275        }
1276
1277        std::fs::remove_dir_all(&dir).ok();
1278        Ok(())
1279    }
1280
1281    #[test]
1282    fn test_lsm_tree_vlog_disabled() -> Result<()> {
1283        let dir = env::temp_dir().join("test_lsm_vlog_disabled");
1284        std::fs::create_dir_all(&dir).ok();
1285
1286        let config = LsmTreeConfig {
1287            data_dir: dir.clone(),
1288            value_log_config: None, // vLog disabled
1289            ..Default::default()
1290        };
1291
1292        let lsm = LsmTree::with_config(config)?;
1293
1294        // Large value should still work (stored inline)
1295        let key = Key::from_str("large_key");
1296        let value = CipherBlob::new(vec![42u8; 5000]);
1297        lsm.put(key.clone(), value.clone())?;
1298
1299        let retrieved = lsm.get(&key)?;
1300        assert!(retrieved.is_some());
1301        assert_eq!(
1302            retrieved
1303                .expect("Value should be retrievable after put")
1304                .as_bytes(),
1305            &vec![42u8; 5000]
1306        );
1307
1308        std::fs::remove_dir_all(&dir).ok();
1309        Ok(())
1310    }
1311
1312    #[test]
1313    fn test_lsm_tree_vlog_update() -> Result<()> {
1314        let dir = env::temp_dir().join("test_lsm_vlog_update");
1315        std::fs::create_dir_all(&dir).ok();
1316
1317        let config = LsmTreeConfig {
1318            data_dir: dir.clone(),
1319            wal_dir: dir.join("wal"),
1320            value_log_config: Some(ValueLogConfig {
1321                vlog_dir: dir.join("vlog"),
1322                max_file_size: 1024 * 1024,
1323                value_threshold: 1024,
1324                sync_on_write: false,
1325                gc_threshold: 0.5,
1326            }),
1327            ..Default::default()
1328        };
1329
1330        let lsm = LsmTree::with_config(config)?;
1331
1332        let key = Key::from_str("update_key");
1333
1334        // Write initial large value
1335        let value1 = CipherBlob::new(vec![1u8; 2048]);
1336        lsm.put(key.clone(), value1)?;
1337
1338        // Update with new large value
1339        let value2 = CipherBlob::new(vec![2u8; 2048]);
1340        lsm.put(key.clone(), value2)?;
1341
1342        // Verify latest value is retrieved
1343        let retrieved = lsm.get(&key)?;
1344        assert!(retrieved.is_some());
1345        assert_eq!(
1346            retrieved
1347                .expect("Value should be retrievable after put")
1348                .as_bytes()[0],
1349            2u8
1350        );
1351
1352        std::fs::remove_dir_all(&dir).ok();
1353        Ok(())
1354    }
1355
1356    #[test]
1357    fn test_lsm_tree_vlog_delete() -> Result<()> {
1358        let dir = env::temp_dir().join("test_lsm_vlog_delete");
1359        std::fs::create_dir_all(&dir).ok();
1360
1361        let config = LsmTreeConfig {
1362            data_dir: dir.clone(),
1363            wal_dir: dir.join("wal"),
1364            value_log_config: Some(ValueLogConfig {
1365                vlog_dir: dir.join("vlog"),
1366                max_file_size: 1024 * 1024,
1367                value_threshold: 1024,
1368                sync_on_write: false,
1369                gc_threshold: 0.5,
1370            }),
1371            ..Default::default()
1372        };
1373
1374        let lsm = LsmTree::with_config(config)?;
1375
1376        let key = Key::from_str("delete_key");
1377
1378        // Write large value
1379        let value = CipherBlob::new(vec![42u8; 2048]);
1380        lsm.put(key.clone(), value)?;
1381
1382        // Verify it exists
1383        assert!(lsm.get(&key)?.is_some());
1384
1385        // Delete it
1386        lsm.delete(key.clone())?;
1387
1388        // Verify it's gone
1389        assert!(lsm.get(&key)?.is_none());
1390
1391        std::fs::remove_dir_all(&dir).ok();
1392        Ok(())
1393    }
1394
1395    #[test]
1396    fn test_lsm_tree_value_pointer_encoding() -> Result<()> {
1397        // Test ValuePointer encoding/decoding
1398        let pointer = ValuePointer {
1399            file_id: 123,
1400            offset: 456789,
1401            length: 2048,
1402            checksum: 0xDEADBEEF,
1403        };
1404
1405        // Encode
1406        let encoded = LsmTree::encode_value_pointer(&pointer);
1407
1408        // Check it's marked as a pointer
1409        assert!(LsmTree::is_value_pointer(&encoded));
1410
1411        // Decode
1412        let decoded = LsmTree::decode_value_pointer(&encoded)?;
1413
1414        // Verify
1415        assert_eq!(decoded.file_id, 123);
1416        assert_eq!(decoded.offset, 456789);
1417        assert_eq!(decoded.length, 2048);
1418        assert_eq!(decoded.checksum, 0xDEADBEEF);
1419
1420        // Test non-pointer value
1421        let regular_value = CipherBlob::new(vec![1, 2, 3, 4, 5]);
1422        assert!(!LsmTree::is_value_pointer(&regular_value));
1423
1424        Ok(())
1425    }
1426}