amaters_core/storage/
lsm_tree.rs

1//! LSM-Tree (Log-Structured Merge-Tree) implementation
2//!
3//! Multi-level persistent storage engine integrating:
4//! - Memtable (in-memory writes)
5//! - WAL (durability)
6//! - SSTables (persistent storage)
7//! - Block Cache (read optimization)
8
9use crate::error::{AmateRSError, ErrorContext, Result};
10use crate::storage::{
11    BlockCache, BlockCacheConfig, BlockCacheKey, CachedBlock, CompactionConfig, CompactionExecutor,
12    CompactionPlanner, Memtable, MemtableConfig, SSTableConfig, SSTableReader, SSTableWriter,
13    ValueLog, ValueLogConfig, ValuePointer, Wal,
14};
15use crate::types::{CipherBlob, Key};
16use parking_lot::RwLock;
17use std::collections::BTreeMap;
18use std::path::{Path, PathBuf};
19use std::sync::Arc;
20
/// SSTable metadata
///
/// In-memory descriptor for one SSTable file. The key bounds are what level
/// searches and range scans compare against to decide whether a file needs
/// to be opened at all.
#[derive(Debug, Clone)]
pub struct SSTableMetadata {
    /// SSTable file path
    pub path: PathBuf,
    /// Minimum key in the SSTable (treated as inclusive by lookups)
    pub min_key: Key,
    /// Maximum key in the SSTable (treated as inclusive by lookups)
    pub max_key: Key,
    /// Number of entries (tombstones written during a flush are counted too)
    pub num_entries: usize,
    /// File size in bytes
    pub file_size: u64,
    /// Level in the LSM-Tree
    pub level: usize,
}
37
/// Level information
///
/// Bookkeeping for a single level: the SSTables it currently holds and the
/// sum of their file sizes.
#[derive(Debug, Clone)]
pub struct LevelInfo {
    /// Level number (0 = L0, 1 = L1, etc.)
    pub level: usize,
    /// SSTables in this level
    pub sstables: Vec<SSTableMetadata>,
    /// Total size in bytes (sum of `file_size` over `sstables`)
    pub total_size: u64,
}
48
49impl LevelInfo {
50    fn new(level: usize) -> Self {
51        Self {
52            level,
53            sstables: Vec::new(),
54            total_size: 0,
55        }
56    }
57
58    fn add_sstable(&mut self, metadata: SSTableMetadata) {
59        self.total_size += metadata.file_size;
60        self.sstables.push(metadata);
61    }
62}
63
/// LSM-Tree configuration
///
/// Bundles the per-component configurations together with the directories
/// the engine writes to.
#[derive(Debug, Clone)]
pub struct LsmTreeConfig {
    /// Directory for storing SSTables
    pub data_dir: PathBuf,
    /// WAL directory (the log file itself is `wal.log` inside it)
    pub wal_dir: PathBuf,
    /// Memtable configuration
    pub memtable_config: MemtableConfig,
    /// SSTable configuration
    pub sstable_config: SSTableConfig,
    /// Block cache configuration
    pub block_cache_config: BlockCacheConfig,
    /// Compaction configuration
    pub compaction_config: CompactionConfig,
    /// Value log configuration (optional, for WiscKey value separation)
    pub value_log_config: Option<ValueLogConfig>,
    /// Maximum number of levels (default: 7)
    pub max_levels: usize,
    /// L0 size threshold for compaction (default: 4 SSTables)
    // NOTE(review): not read by any code visible in this section; compaction
    // decisions go through the planner built from `compaction_config`.
    pub l0_compaction_threshold: usize,
    /// Level size multiplier (default: 10x per level)
    // NOTE(review): likewise not read by any code visible in this section.
    pub level_size_multiplier: usize,
}
88
89impl Default for LsmTreeConfig {
90    fn default() -> Self {
91        Self {
92            data_dir: PathBuf::from("./data"),
93            wal_dir: PathBuf::from("./wal"),
94            memtable_config: MemtableConfig::default(),
95            sstable_config: SSTableConfig::default(),
96            block_cache_config: BlockCacheConfig::default(),
97            compaction_config: CompactionConfig::default(),
98            value_log_config: None, // Disabled by default for backward compatibility
99            max_levels: 7,
100            l0_compaction_threshold: 4,
101            level_size_multiplier: 10,
102        }
103    }
104}
105
/// LSM-Tree storage engine
///
/// Writes go to the WAL and the memtable; a full memtable is written out as
/// an L0 SSTable, and SSTables are merged into deeper levels by compaction.
/// Reads consult the memtable, then the immutable memtable, then the levels
/// from L0 downwards.
pub struct LsmTree {
    /// Configuration
    config: LsmTreeConfig,
    /// Current memtable (active writes)
    // NOTE(review): held as a plain `Arc` and never reassigned by the methods
    // visible here, so a flush does not replace it with a fresh memtable.
    memtable: Arc<Memtable>,
    /// Immutable memtable being flushed (if any)
    immutable_memtable: Arc<RwLock<Option<Arc<Memtable>>>>,
    /// Write-ahead log
    wal: Arc<RwLock<Wal>>,
    /// Value log for large values (WiscKey)
    value_log: Option<Arc<ValueLog>>,
    /// Levels (L0, L1, L2, ...)
    levels: Arc<RwLock<Vec<LevelInfo>>>,
    /// Block cache for SSTable blocks
    // NOTE(review): the methods visible here only query it for statistics.
    block_cache: Arc<BlockCache>,
    /// Next SSTable ID
    next_sstable_id: Arc<RwLock<u64>>,
    /// Compaction planner
    compaction_planner: CompactionPlanner,
    /// Compaction executor
    compaction_executor: Arc<RwLock<CompactionExecutor>>,
}
129
130impl LsmTree {
131    /// Create a new LSM-Tree with default configuration
132    pub fn new<P: AsRef<Path>>(data_dir: P) -> Result<Self> {
133        let config = LsmTreeConfig {
134            data_dir: data_dir.as_ref().to_path_buf(),
135            ..Default::default()
136        };
137        Self::with_config(config)
138    }
139
140    /// Create a new LSM-Tree with custom configuration
141    pub fn with_config(config: LsmTreeConfig) -> Result<Self> {
142        // Create directories
143        std::fs::create_dir_all(&config.data_dir).map_err(|e| {
144            AmateRSError::StorageIntegrity(ErrorContext::new(format!(
145                "Failed to create data directory: {}",
146                e
147            )))
148        })?;
149
150        std::fs::create_dir_all(&config.wal_dir).map_err(|e| {
151            AmateRSError::StorageIntegrity(ErrorContext::new(format!(
152                "Failed to create WAL directory: {}",
153                e
154            )))
155        })?;
156
157        // Initialize WAL
158        let wal_path = config.wal_dir.join("wal.log");
159        let wal = Wal::create(wal_path)?;
160
161        // Initialize memtable
162        let memtable = Memtable::with_config(config.memtable_config.clone());
163
164        // Initialize levels
165        let mut levels = Vec::with_capacity(config.max_levels);
166        for i in 0..config.max_levels {
167            levels.push(LevelInfo::new(i));
168        }
169
170        // Initialize block cache
171        let block_cache = BlockCache::with_config(config.block_cache_config.clone());
172
173        // Initialize compaction
174        let compaction_planner = CompactionPlanner::new(config.compaction_config.clone());
175        let compaction_executor = CompactionExecutor::new(config.sstable_config.clone());
176
177        // Initialize value log if configured
178        let value_log = if let Some(ref vlog_config) = config.value_log_config {
179            Some(Arc::new(ValueLog::with_config(vlog_config.clone())?))
180        } else {
181            None
182        };
183
184        let mut lsm = Self {
185            config,
186            memtable: Arc::new(memtable),
187            immutable_memtable: Arc::new(RwLock::new(None)),
188            wal: Arc::new(RwLock::new(wal)),
189            value_log,
190            levels: Arc::new(RwLock::new(levels)),
191            block_cache: Arc::new(block_cache),
192            next_sstable_id: Arc::new(RwLock::new(0)),
193            compaction_planner,
194            compaction_executor: Arc::new(RwLock::new(compaction_executor)),
195        };
196
197        // Recover existing SSTables from disk
198        lsm.recover_sstables()?;
199
200        Ok(lsm)
201    }
202
203    /// Recover existing SSTables from the data directory
204    fn recover_sstables(&mut self) -> Result<()> {
205        use std::fs;
206
207        // Scan data directory for SSTable files
208        let entries = fs::read_dir(&self.config.data_dir).map_err(|e| {
209            AmateRSError::IoError(ErrorContext::new(format!(
210                "Failed to read data directory: {}",
211                e
212            )))
213        })?;
214
215        let mut sstables_by_level: BTreeMap<usize, Vec<SSTableMetadata>> = BTreeMap::new();
216        let mut max_id = 0u64;
217
218        for entry in entries {
219            let entry = entry.map_err(|e| {
220                AmateRSError::IoError(ErrorContext::new(format!(
221                    "Failed to read directory entry: {}",
222                    e
223                )))
224            })?;
225
226            let path = entry.path();
227            let filename = match path.file_name().and_then(|n| n.to_str()) {
228                Some(name) => name,
229                None => continue,
230            };
231
232            // Parse filename format: L{level}_{id}.sst
233            if filename.starts_with('L') && filename.ends_with(".sst") {
234                let parts: Vec<&str> = filename[1..].trim_end_matches(".sst").split('_').collect();
235                if parts.len() == 2 {
236                    if let (Ok(level), Ok(id)) =
237                        (parts[0].parse::<usize>(), parts[1].parse::<u64>())
238                    {
239                        // Update max ID
240                        if id > max_id {
241                            max_id = id;
242                        }
243
244                        // Read SSTable metadata
245                        let reader = SSTableReader::open(&path)?;
246                        let (min_key, max_key, num_entries) = reader.metadata()?;
247
248                        let file_size = std::fs::metadata(&path)
249                            .map_err(|e| {
250                                AmateRSError::IoError(ErrorContext::new(format!(
251                                    "Failed to get file size: {}",
252                                    e
253                                )))
254                            })?
255                            .len();
256
257                        let metadata = SSTableMetadata {
258                            path: path.clone(),
259                            min_key,
260                            max_key,
261                            num_entries,
262                            file_size,
263                            level,
264                        };
265
266                        sstables_by_level.entry(level).or_default().push(metadata);
267                    }
268                }
269            }
270        }
271
272        // Add recovered SSTables to levels
273        let mut levels = self.levels.write();
274        for (level, mut sstables) in sstables_by_level {
275            if level < levels.len() {
276                // Sort SSTables by min_key for non-L0 levels
277                if level > 0 {
278                    sstables.sort_by(|a, b| a.min_key.cmp(&b.min_key));
279                }
280
281                for metadata in sstables {
282                    levels[level].add_sstable(metadata);
283                }
284            }
285        }
286        drop(levels);
287
288        // Set next SSTable ID
289        *self.next_sstable_id.write() = max_id + 1;
290
291        Ok(())
292    }
293
294    /// Put a key-value pair
295    pub fn put(&self, key: Key, value: CipherBlob) -> Result<()> {
296        // Check if value should be separated to vLog
297        let stored_value = if let Some(ref vlog) = self.value_log {
298            if vlog.should_separate(&value) {
299                // Store in vLog and get pointer
300                let pointer = vlog.append(key.clone(), value)?;
301                vlog.flush()?;
302
303                // Encode pointer as CipherBlob with "VPTR" magic prefix
304                Self::encode_value_pointer(&pointer)
305            } else {
306                // Store value directly
307                value
308            }
309        } else {
310            // No vLog configured, store value directly
311            value
312        };
313
314        // Write to WAL first (durability)
315        {
316            let mut wal = self.wal.write();
317            wal.put(key.clone(), stored_value.clone())?;
318        }
319
320        // Write to memtable
321        self.memtable.put(key, stored_value)?;
322
323        // Check if memtable should be flushed
324        if self.memtable.should_flush() {
325            self.try_flush_memtable()?;
326        }
327
328        Ok(())
329    }
330
331    /// Get a value by key
332    pub fn get(&self, key: &Key) -> Result<Option<CipherBlob>> {
333        // Check memtable first (most recent data)
334        if let Some(value) = self.memtable.get(key)? {
335            return self.resolve_value(value);
336        }
337
338        // Check immutable memtable if exists
339        {
340            let immutable = self.immutable_memtable.read();
341            if let Some(ref memtable) = *immutable {
342                if let Some(value) = memtable.get(key)? {
343                    return self.resolve_value(value);
344                }
345            }
346        }
347
348        // Search through levels (L0 to Ln)
349        let levels = self.levels.read();
350        for level_info in levels.iter() {
351            if let Some(value) = self.search_level(level_info, key)? {
352                return self.resolve_value(value);
353            }
354        }
355
356        Ok(None)
357    }
358
359    /// Resolve a value: if it's a ValuePointer, read from vLog; otherwise return as-is
360    fn resolve_value(&self, value: CipherBlob) -> Result<Option<CipherBlob>> {
361        // Check for tombstone (zero-length blob)
362        if value.as_bytes().is_empty() {
363            return Ok(None);
364        }
365
366        if Self::is_value_pointer(&value) {
367            if let Some(ref vlog) = self.value_log {
368                let pointer = Self::decode_value_pointer(&value)?;
369                let actual_value = vlog.read(&pointer)?;
370                Ok(Some(actual_value))
371            } else {
372                Err(AmateRSError::StorageIntegrity(ErrorContext::new(
373                    "Found value pointer but vLog is not configured".to_string(),
374                )))
375            }
376        } else {
377            Ok(Some(value))
378        }
379    }
380
381    /// Delete a key
382    pub fn delete(&self, key: Key) -> Result<()> {
383        // Write tombstone to WAL
384        {
385            let mut wal = self.wal.write();
386            wal.delete(key.clone())?;
387        }
388
389        // Write tombstone to memtable
390        self.memtable.delete(key)?;
391
392        // Check if memtable should be flushed
393        if self.memtable.should_flush() {
394            self.try_flush_memtable()?;
395        }
396
397        Ok(())
398    }
399
400    /// Range scan
401    pub fn range(&self, start: &Key, end: &Key) -> Result<Vec<(Key, CipherBlob)>> {
402        let mut results = BTreeMap::new();
403
404        // Collect from all levels (newer data overwrites older)
405        let levels = self.levels.read();
406        for level_info in levels.iter().rev() {
407            let level_results = self.range_scan_level(level_info, start, end)?;
408            for (k, v) in level_results {
409                results.entry(k).or_insert(v);
410            }
411        }
412
413        // Check immutable memtable
414        {
415            let immutable = self.immutable_memtable.read();
416            if let Some(ref memtable) = *immutable {
417                for (k, v) in memtable.range(start, end) {
418                    results.insert(k, v);
419                }
420            }
421        }
422
423        // Check memtable (most recent)
424        for (k, v) in self.memtable.range(start, end) {
425            results.insert(k, v);
426        }
427
428        Ok(results.into_iter().collect())
429    }
430
431    /// Search a specific level for a key
432    fn search_level(&self, level_info: &LevelInfo, key: &Key) -> Result<Option<CipherBlob>> {
433        // For L0, check all SSTables (may have overlapping ranges)
434        if level_info.level == 0 {
435            // Check newest first
436            for metadata in level_info.sstables.iter().rev() {
437                if key >= &metadata.min_key && key <= &metadata.max_key {
438                    if let Some(value) = self.read_from_sstable(&metadata.path, key)? {
439                        return Ok(Some(value));
440                    }
441                }
442            }
443        } else {
444            // For L1+, SSTables are non-overlapping, use binary search
445            let idx = level_info.sstables.binary_search_by(|metadata| {
446                if key < &metadata.min_key {
447                    std::cmp::Ordering::Greater
448                } else if key > &metadata.max_key {
449                    std::cmp::Ordering::Less
450                } else {
451                    std::cmp::Ordering::Equal
452                }
453            });
454
455            if let Ok(idx) = idx {
456                let metadata = &level_info.sstables[idx];
457                if let Some(value) = self.read_from_sstable(&metadata.path, key)? {
458                    return Ok(Some(value));
459                }
460            }
461        }
462
463        Ok(None)
464    }
465
466    /// Range scan a specific level
467    fn range_scan_level(
468        &self,
469        level_info: &LevelInfo,
470        start: &Key,
471        end: &Key,
472    ) -> Result<Vec<(Key, CipherBlob)>> {
473        let mut results = Vec::new();
474
475        for metadata in &level_info.sstables {
476            // Skip if SSTable range doesn't overlap with query range
477            if &metadata.max_key < start || &metadata.min_key > end {
478                continue;
479            }
480
481            // Read from SSTable
482            let reader = SSTableReader::open(&metadata.path)?;
483            let entries = reader.iter()?;
484
485            for (k, v) in entries {
486                // Range is [start, end) - start inclusive, end exclusive
487                if &k >= start && &k < end {
488                    results.push((k, v));
489                }
490            }
491        }
492
493        Ok(results)
494    }
495
496    /// Read a key from an SSTable with block cache
497    fn read_from_sstable(&self, path: &Path, key: &Key) -> Result<Option<CipherBlob>> {
498        let reader = SSTableReader::open(path)?;
499        reader.get(key)
500    }
501
502    /// Try to flush memtable to L0
503    fn try_flush_memtable(&self) -> Result<()> {
504        // Check if already flushing
505        {
506            let immutable = self.immutable_memtable.read();
507            if immutable.is_some() {
508                // Already flushing, skip
509                return Ok(());
510            }
511        }
512
513        // Swap memtable to immutable
514        {
515            let mut immutable = self.immutable_memtable.write();
516            if immutable.is_some() {
517                return Ok(());
518            }
519
520            // Swap current memtable with a new one
521            let old_memtable = Arc::clone(&self.memtable);
522            let new_memtable = Memtable::with_config(self.config.memtable_config.clone());
523
524            // Note: In a real implementation, we'd use Arc::make_mut or similar
525            // For now, this is a simplified version
526            *immutable = Some(old_memtable);
527        }
528
529        // Flush immutable memtable to SSTable
530        self.flush_immutable_memtable()?;
531
532        Ok(())
533    }
534
535    /// Flush immutable memtable to L0 SSTable
536    fn flush_immutable_memtable(&self) -> Result<()> {
537        let memtable = {
538            let mut immutable = self.immutable_memtable.write();
539            immutable.take()
540        };
541
542        if let Some(memtable) = memtable {
543            // Generate SSTable path
544            let sstable_id = {
545                let mut next_id = self.next_sstable_id.write();
546                let id = *next_id;
547                *next_id += 1;
548                id
549            };
550
551            let sstable_path = self
552                .config
553                .data_dir
554                .join(format!("L0_{:08}.sst", sstable_id));
555
556            // Write SSTable
557            let mut writer = SSTableWriter::new(&sstable_path, self.config.sstable_config.clone())?;
558
559            let entries = memtable.entries();
560            let mut min_key = None;
561            let mut max_key = None;
562            let mut num_entries = 0;
563
564            for (key, value_opt) in entries {
565                // Write both values and tombstones to SSTable
566                // Tombstones are written as zero-length blobs
567                let value = value_opt.unwrap_or_else(|| CipherBlob::new(Vec::new()));
568
569                if min_key.is_none() {
570                    min_key = Some(key.clone());
571                }
572                max_key = Some(key.clone());
573                writer.add(key, value)?;
574                num_entries += 1;
575            }
576
577            writer.finish()?;
578
579            // Get file size
580            let file_size = std::fs::metadata(&sstable_path)
581                .map_err(|e| {
582                    AmateRSError::StorageIntegrity(ErrorContext::new(format!(
583                        "Failed to get SSTable size: {}",
584                        e
585                    )))
586                })?
587                .len();
588
589            // Add to L0
590            if let (Some(min_key), Some(max_key)) = (min_key, max_key) {
591                let metadata = SSTableMetadata {
592                    path: sstable_path,
593                    min_key,
594                    max_key,
595                    num_entries,
596                    file_size,
597                    level: 0,
598                };
599
600                let mut levels = self.levels.write();
601                levels[0].add_sstable(metadata);
602            }
603
604            // Trigger compaction if L0 threshold exceeded
605            self.trigger_compaction()?;
606        }
607
608        Ok(())
609    }
610
611    /// Trigger compaction if needed
612    fn trigger_compaction(&self) -> Result<()> {
613        let levels = self.levels.read();
614
615        // Check L0 compaction threshold
616        let l0_count = levels[0].sstables.len();
617        if self.compaction_planner.needs_l0_compaction(l0_count) {
618            drop(levels); // Release read lock before compaction
619            return self.compact_l0_to_l1();
620        }
621
622        // Check other levels for size-based compaction
623        for level_info in levels.iter() {
624            if level_info.level > 0
625                && self
626                    .compaction_planner
627                    .needs_level_compaction(level_info.level, level_info.total_size)
628            {
629                let source_level = level_info.level;
630                drop(levels); // Release read lock
631                return self.compact_level(source_level);
632            }
633        }
634
635        Ok(())
636    }
637
638    /// Compact L0 to L1
639    fn compact_l0_to_l1(&self) -> Result<()> {
640        let (source_sstables, target_sstables) = {
641            let levels = self.levels.read();
642            let source = levels[0].sstables.clone();
643            let target = if levels.len() > 1 {
644                levels[1].sstables.clone()
645            } else {
646                Vec::new()
647            };
648            (source, target)
649        };
650
651        if let Some(task) =
652            self.compaction_planner
653                .plan_compaction(0, source_sstables, target_sstables)
654        {
655            self.execute_compaction_task(task)?;
656        }
657
658        Ok(())
659    }
660
661    /// Compact a specific level
662    fn compact_level(&self, source_level: usize) -> Result<()> {
663        let (source_sstables, target_sstables) = {
664            let levels = self.levels.read();
665            if source_level >= levels.len() {
666                return Ok(());
667            }
668
669            let source = levels[source_level].sstables.clone();
670            let target = if source_level + 1 < levels.len() {
671                levels[source_level + 1].sstables.clone()
672            } else {
673                Vec::new()
674            };
675            (source, target)
676        };
677
678        if let Some(task) =
679            self.compaction_planner
680                .plan_compaction(source_level, source_sstables, target_sstables)
681        {
682            self.execute_compaction_task(task)?;
683        }
684
685        Ok(())
686    }
687
688    /// Execute a compaction task
689    fn execute_compaction_task(&self, task: crate::storage::CompactionTask) -> Result<()> {
690        // Execute compaction
691        let output_sstables = {
692            let mut executor = self.compaction_executor.write();
693            let mut next_id = self.next_sstable_id.write();
694            executor.execute_compaction(task.clone(), &self.config.data_dir, &mut next_id)?
695        };
696
697        // Update levels: remove old SSTables, add new ones
698        let mut levels = self.levels.write();
699
700        // Remove source SSTables
701        levels[task.source_level]
702            .sstables
703            .retain(|s| !task.source_sstables.iter().any(|ts| ts.path == s.path));
704        levels[task.source_level].total_size = levels[task.source_level]
705            .sstables
706            .iter()
707            .map(|s| s.file_size)
708            .sum();
709
710        // Remove target SSTables
711        if task.target_level < levels.len() {
712            levels[task.target_level]
713                .sstables
714                .retain(|s| !task.target_sstables.iter().any(|ts| ts.path == s.path));
715            levels[task.target_level].total_size = levels[task.target_level]
716                .sstables
717                .iter()
718                .map(|s| s.file_size)
719                .sum();
720
721            // Add new SSTables
722            for sstable in output_sstables {
723                levels[task.target_level].add_sstable(sstable);
724            }
725        }
726
727        drop(levels);
728
729        // Delete old SSTable files
730        for sstable in task.source_sstables.iter().chain(&task.target_sstables) {
731            std::fs::remove_file(&sstable.path).ok();
732        }
733
734        Ok(())
735    }
736
737    /// Get level information
738    pub fn level_info(&self, level: usize) -> Option<LevelInfo> {
739        let levels = self.levels.read();
740        if level < levels.len() {
741            Some(levels[level].clone())
742        } else {
743            None
744        }
745    }
746
747    /// Get all levels information
748    pub fn all_levels_info(&self) -> Vec<LevelInfo> {
749        self.levels.read().clone()
750    }
751
752    /// Get statistics
753    pub fn stats(&self) -> LsmTreeStats {
754        let levels = self.levels.read();
755        let cache_stats = self.block_cache.stats();
756        let compaction_stats = self.compaction_executor.read().stats().clone();
757
758        LsmTreeStats {
759            memtable_size: self.memtable.size_bytes(),
760            num_levels: levels.len(),
761            levels: levels.clone(),
762            cache_hit_rate: cache_stats.hit_rate(),
763            cache_size: cache_stats.size_bytes,
764            compaction_stats,
765        }
766    }
767
768    /// Get all keys from the LSM-Tree
769    pub fn keys(&self) -> Result<Vec<Key>> {
770        let mut key_set = std::collections::BTreeSet::new();
771
772        // Collect from memtable
773        for (key, value_opt) in self.memtable.entries() {
774            if value_opt.is_some() {
775                key_set.insert(key);
776            }
777        }
778
779        // Collect from immutable memtable
780        {
781            let immutable = self.immutable_memtable.read();
782            if let Some(ref memtable) = *immutable {
783                for (key, value_opt) in memtable.entries() {
784                    if value_opt.is_some() {
785                        key_set.insert(key);
786                    }
787                }
788            }
789        }
790
791        // Collect from all levels
792        let levels = self.levels.read();
793        for level_info in levels.iter() {
794            for metadata in &level_info.sstables {
795                let reader = SSTableReader::open(&metadata.path)?;
796                let entries = reader.iter()?;
797                for (key, _) in entries {
798                    key_set.insert(key);
799                }
800            }
801        }
802
803        Ok(key_set.into_iter().collect())
804    }
805
806    /// Flush all pending writes to disk
807    pub fn flush(&self) -> Result<()> {
808        // Flush memtable if it has data
809        if self.memtable.size_bytes() > 0 {
810            self.try_flush_memtable()?;
811        }
812
813        // Flush immutable memtable if exists
814        self.flush_immutable_memtable()?;
815
816        // Flush WAL
817        {
818            let mut wal = self.wal.write();
819            wal.flush()?;
820        }
821
822        // Flush value log if configured
823        if let Some(ref vlog) = self.value_log {
824            vlog.flush()?;
825        }
826
827        Ok(())
828    }
829
830    /// Close the LSM-Tree gracefully
831    pub fn close(&self) -> Result<()> {
832        // Flush all pending writes
833        // File handles will be closed when the structures are dropped
834        self.flush()?;
835        Ok(())
836    }
837
838    // ===== ValuePointer encoding/decoding helpers =====
839
840    /// Encode a ValuePointer as a CipherBlob with "VPTR" magic prefix
841    fn encode_value_pointer(pointer: &ValuePointer) -> CipherBlob {
842        const MAGIC: &[u8] = b"VPTR"; // 4 bytes
843        let pointer_bytes = pointer.encode();
844
845        let mut bytes = Vec::with_capacity(MAGIC.len() + pointer_bytes.len());
846        bytes.extend_from_slice(MAGIC);
847        bytes.extend_from_slice(&pointer_bytes);
848
849        CipherBlob::new(bytes)
850    }
851
852    /// Decode a CipherBlob back to a ValuePointer
853    fn decode_value_pointer(blob: &CipherBlob) -> Result<ValuePointer> {
854        const MAGIC: &[u8] = b"VPTR";
855        let bytes = blob.as_bytes();
856
857        if bytes.len() < MAGIC.len() {
858            return Err(AmateRSError::StorageIntegrity(ErrorContext::new(
859                "Invalid value pointer: too short".to_string(),
860            )));
861        }
862
863        if &bytes[..MAGIC.len()] != MAGIC {
864            return Err(AmateRSError::StorageIntegrity(ErrorContext::new(
865                "Invalid value pointer: bad magic".to_string(),
866            )));
867        }
868
869        ValuePointer::decode(&bytes[MAGIC.len()..])
870    }
871
872    /// Check if a CipherBlob contains a ValuePointer
873    fn is_value_pointer(blob: &CipherBlob) -> bool {
874        const MAGIC: &[u8] = b"VPTR";
875        let bytes = blob.as_bytes();
876        bytes.len() >= MAGIC.len() && &bytes[..MAGIC.len()] == MAGIC
877    }
878}
879
/// LSM-Tree statistics
///
/// Point-in-time snapshot of the engine's memtable, level, cache and
/// compaction figures.
#[derive(Debug, Clone)]
pub struct LsmTreeStats {
    /// Current memtable size in bytes (active memtable only)
    pub memtable_size: usize,
    /// Number of levels
    pub num_levels: usize,
    /// Level information (a copy taken when the snapshot was built)
    pub levels: Vec<LevelInfo>,
    /// Block cache hit rate
    pub cache_hit_rate: f64,
    /// Block cache size in bytes
    pub cache_size: usize,
    /// Compaction statistics
    pub compaction_stats: crate::storage::CompactionStats,
}
896
897#[cfg(test)]
898mod tests {
899    use super::*;
900    use std::env;
901
/// Round-trips a single key through put, get and delete.
#[test]
fn test_lsm_tree_basic_operations() -> Result<()> {
    let dir = env::temp_dir().join("test_lsm_basic");
    // The directory name is fixed, so a previous run that failed before its
    // cleanup may have left files behind; wipe it so this run starts empty.
    std::fs::remove_dir_all(&dir).ok();
    std::fs::create_dir_all(&dir).ok();

    let lsm = LsmTree::new(&dir)?;

    // Put
    let key = Key::from_str("test_key");
    lsm.put(key.clone(), CipherBlob::new(vec![1, 2, 3, 4, 5]))?;

    // Get: the stored bytes must come back unchanged
    let retrieved = lsm.get(&key)?;
    assert_eq!(
        retrieved
            .expect("Value should be retrievable after put")
            .as_bytes(),
        &[1, 2, 3, 4, 5]
    );

    // Delete
    lsm.delete(key.clone())?;

    // Verify deleted
    assert!(lsm.get(&key)?.is_none());

    // Best-effort cleanup; a failure to remove the directory is not a test failure.
    std::fs::remove_dir_all(&dir).ok();
    Ok(())
}
934
/// Writes ten keys and checks that each one reads back with its own payload.
#[test]
fn test_lsm_tree_multiple_keys() -> Result<()> {
    let dir = env::temp_dir().join("test_lsm_multiple");
    // The directory name is fixed, so a previous run that failed before its
    // cleanup may have left files behind; wipe it so this run starts empty.
    std::fs::remove_dir_all(&dir).ok();
    std::fs::create_dir_all(&dir).ok();

    let lsm = LsmTree::new(&dir)?;

    // Write multiple keys, each with a payload filled with its own index
    for i in 0..10 {
        let key = Key::from_str(&format!("key_{:03}", i));
        lsm.put(key, CipherBlob::new(vec![i as u8; 100]))?;
    }

    // Read back and compare the whole payload (length and every byte),
    // not just its first byte
    for i in 0..10 {
        let key = Key::from_str(&format!("key_{:03}", i));
        let value = lsm.get(&key)?.expect("Value should exist");
        assert_eq!(value.as_bytes(), &vec![i as u8; 100]);
    }

    // Best-effort cleanup; a failure to remove the directory is not a test failure.
    std::fs::remove_dir_all(&dir).ok();
    Ok(())
}
960
/// Writes twenty keys and checks that a range scan returns only the
/// requested window.
#[test]
fn test_lsm_tree_range_scan() -> Result<()> {
    let dir = env::temp_dir().join("test_lsm_range");
    // The directory name is fixed, so a previous run that failed before its
    // cleanup may have left files behind; wipe it so this run starts empty.
    std::fs::remove_dir_all(&dir).ok();
    std::fs::create_dir_all(&dir).ok();

    let lsm = LsmTree::new(&dir)?;

    // Write keys key_000 ..= key_019
    for i in 0..20 {
        let key = Key::from_str(&format!("key_{:03}", i));
        lsm.put(key, CipherBlob::new(vec![i as u8; 50]))?;
    }

    // Range scan
    let start = Key::from_str("key_005");
    let end = Key::from_str("key_015");
    let results = lsm.range(&start, &end)?;

    // key_005 ..= key_015 covers 11 of the written keys. The lower bound of
    // 10 is kept from the original test so that an exclusive end bound is
    // still accepted; the upper bound rejects a scan that leaks keys from
    // outside the requested window.
    let found = results.len();
    assert!(found >= 10, "range scan returned only {} entries", found);
    assert!(
        found <= 11,
        "range scan returned {} entries, more than the 11 keys inside the bounds",
        found
    );

    // Best-effort cleanup; a failure to remove the directory is not a test failure.
    std::fs::remove_dir_all(&dir).ok();
    Ok(())
}
985
/// Checks that `stats()` reflects written data and the configured level count.
#[test]
fn test_lsm_tree_stats() -> Result<()> {
    let dir = env::temp_dir().join("test_lsm_stats");
    // The directory name is fixed, so a previous run that failed before its
    // cleanup may have left files behind; wipe it so this run starts empty.
    std::fs::remove_dir_all(&dir).ok();
    std::fs::create_dir_all(&dir).ok();

    let lsm = LsmTree::new(&dir)?;

    // Write some data
    for i in 0..5 {
        let key = Key::from_str(&format!("key_{}", i));
        lsm.put(key, CipherBlob::new(vec![i as u8; 100]))?;
    }

    let stats = lsm.stats();
    assert!(stats.memtable_size > 0);
    assert_eq!(stats.num_levels, 7); // Default max levels

    // Best-effort cleanup; a failure to remove the directory is not a test failure.
    std::fs::remove_dir_all(&dir).ok();
    Ok(())
}
1007
/// Uses a tiny memtable and a low L0 threshold, then checks that writes
/// produced either a compaction or at least one L0 SSTable.
#[test]
fn test_lsm_tree_compaction_trigger() -> Result<()> {
    let dir = env::temp_dir().join("test_lsm_compaction");
    // The directory name is fixed, so a previous run that failed before its
    // cleanup may have left files behind; wipe it so this run starts empty.
    std::fs::remove_dir_all(&dir).ok();
    std::fs::create_dir_all(&dir).ok();

    // NOTE(review): wal_dir keeps its default here - confirm it does not
    // point at a location shared with other tests.
    let mut config = LsmTreeConfig {
        data_dir: dir.clone(),
        l0_compaction_threshold: 2, // Low threshold for testing
        ..Default::default()
    };
    config.memtable_config.max_size_bytes = 1024; // Small memtable to trigger flushes

    let lsm = LsmTree::with_config(config)?;

    // Write enough data to trigger multiple flushes and compaction
    for i in 0..100 {
        let key = Key::from_str(&format!("key_{:04}", i));
        lsm.put(key, CipherBlob::new(vec![i as u8; 200]))?;
    }

    // Either a compaction already ran, or flushed SSTables are waiting in L0
    let stats = lsm.stats();
    assert!(
        stats.compaction_stats.total_compactions > 0 || !stats.levels[0].sstables.is_empty(),
        "expected at least one compaction or one L0 SSTable after the writes"
    );

    // Best-effort cleanup; a failure to remove the directory is not a test failure.
    std::fs::remove_dir_all(&dir).ok();
    Ok(())
}
1038
/// Smoke test: writes data with a tiny memtable and reads the compaction
/// statistics fields without asserting on their values.
#[test]
fn test_lsm_tree_compaction_stats() -> Result<()> {
    let dir = env::temp_dir().join("test_lsm_compaction_stats");
    // The directory name is fixed, so a previous run that failed before its
    // cleanup may have left files behind; wipe it so this run starts empty.
    std::fs::remove_dir_all(&dir).ok();
    std::fs::create_dir_all(&dir).ok();

    // NOTE(review): wal_dir keeps its default here - confirm it does not
    // point at a location shared with other tests.
    let mut config = LsmTreeConfig {
        data_dir: dir.clone(),
        ..Default::default()
    };
    config.memtable_config.max_size_bytes = 512;

    let lsm = LsmTree::with_config(config)?;

    // Write data
    for i in 0..50 {
        let key = Key::from_str(&format!("key_{:04}", i));
        lsm.put(key, CipherBlob::new(vec![i as u8; 100]))?;
    }

    let stats = lsm.stats();
    // Compaction may or may not have run by now, so no value is asserted;
    // touching the fields only checks that the statistics are exposed.
    let _ = stats.compaction_stats.keys_processed;
    let _ = stats.compaction_stats.tombstones_removed;

    // Best-effort cleanup; a failure to remove the directory is not a test failure.
    std::fs::remove_dir_all(&dir).ok();
    Ok(())
}
1068
/// Writes 200 keys with a small memtable and checks that every key stays
/// readable and that the data is accounted for in the stats.
#[test]
fn test_lsm_tree_level_organization() -> Result<()> {
    let dir = env::temp_dir().join("test_lsm_levels");
    // The directory name is fixed, so a previous run that failed before its
    // cleanup may have left files behind; wipe it so this run starts empty.
    std::fs::remove_dir_all(&dir).ok();
    std::fs::create_dir_all(&dir).ok();

    // NOTE(review): wal_dir keeps its default here - confirm it does not
    // point at a location shared with other tests.
    let mut config = LsmTreeConfig {
        data_dir: dir.clone(),
        ..Default::default()
    };
    config.memtable_config.max_size_bytes = 1024;

    let lsm = LsmTree::with_config(config)?;

    // Write data to populate levels
    for i in 0..200 {
        let key = Key::from_str(&format!("key_{:05}", i));
        lsm.put(key, CipherBlob::new(vec![i as u8; 150]))?;
    }

    // Verify all data is still readable (regardless of which level)
    for i in 0..200 {
        let key = Key::from_str(&format!("key_{:05}", i));
        assert!(lsm.get(&key)?.is_some(), "key_{:05} is missing", i);
    }

    // Check that data exists somewhere: in an SSTable or still in the memtable
    let stats = lsm.stats();
    let total_sstables: usize = stats.levels.iter().map(|l| l.sstables.len()).sum();
    assert!(total_sstables > 0 || stats.memtable_size > 0);

    // Best-effort cleanup; a failure to remove the directory is not a test failure.
    std::fs::remove_dir_all(&dir).ok();
    Ok(())
}
1104
/// Checks that written keys are found and never-written keys are reported
/// as absent once data has been pushed out of a small memtable.
#[test]
fn test_lsm_tree_bloom_filter_negative_lookups() -> Result<()> {
    let dir = env::temp_dir().join("test_lsm_bloom");
    // The directory name is fixed, so a previous run that failed before its
    // cleanup may have left files behind; wipe it so this run starts empty.
    std::fs::remove_dir_all(&dir).ok();
    std::fs::create_dir_all(&dir).ok();

    // NOTE(review): wal_dir keeps its default here - confirm it does not
    // point at a location shared with other tests.
    let mut config = LsmTreeConfig {
        data_dir: dir.clone(),
        ..Default::default()
    };
    config.memtable_config.max_size_bytes = 512; // Small to trigger flushes

    let lsm = LsmTree::with_config(config)?;

    // Write keys that will be flushed to SSTables
    for i in 0..100 {
        let key = Key::from_str(&format!("exists_{:04}", i));
        lsm.put(key, CipherBlob::new(vec![i as u8; 100]))?;
    }

    // Query for keys that exist (should find them)
    for i in 0..100 {
        let key = Key::from_str(&format!("exists_{:04}", i));
        assert!(lsm.get(&key)?.is_some());
    }

    // Query for keys that don't exist (bloom filter should help avoid disk reads)
    for i in 0..100 {
        let key = Key::from_str(&format!("notexists_{:04}", i));
        assert!(lsm.get(&key)?.is_none());
    }

    // Best-effort cleanup; a failure to remove the directory is not a test failure.
    std::fs::remove_dir_all(&dir).ok();
    Ok(())
}
1142
1143    // ===== WiscKey Value Separation Tests =====
1144
/// With the value log enabled, stores one value below and one above the
/// configured threshold and checks that both read back unchanged.
#[test]
fn test_lsm_tree_vlog_basic() -> Result<()> {
    let dir = env::temp_dir().join("test_lsm_vlog_basic");
    // The directory name is fixed, so a previous run that failed before its
    // cleanup may have left files behind; wipe it so this run starts empty.
    std::fs::remove_dir_all(&dir).ok();
    std::fs::create_dir_all(&dir).ok();

    let config = LsmTreeConfig {
        data_dir: dir.clone(),
        wal_dir: dir.join("wal"),
        value_log_config: Some(ValueLogConfig {
            vlog_dir: dir.join("vlog"),
            max_file_size: 1024 * 1024, // 1MB
            value_threshold: 1024,      // 1KB
            sync_on_write: false,
            gc_threshold: 0.5,
        }),
        ..Default::default()
    };

    let lsm = LsmTree::with_config(config)?;

    // Small value: 512 bytes, below value_threshold
    let small_key = Key::from_str("small_key");
    let small_value = CipherBlob::new(vec![1u8; 512]);
    lsm.put(small_key.clone(), small_value.clone())?;

    // Large value: 2048 bytes, above value_threshold
    let large_key = Key::from_str("large_key");
    let large_value = CipherBlob::new(vec![2u8; 2048]);
    lsm.put(large_key.clone(), large_value.clone())?;

    // Both values must read back byte-for-byte as they were written
    let retrieved_small = lsm
        .get(&small_key)?
        .expect("Small value should be retrievable");
    assert_eq!(retrieved_small.as_bytes(), small_value.as_bytes());

    let retrieved_large = lsm
        .get(&large_key)?
        .expect("Large value should be retrievable");
    assert_eq!(retrieved_large.as_bytes(), large_value.as_bytes());

    // Best-effort cleanup; a failure to remove the directory is not a test failure.
    std::fs::remove_dir_all(&dir).ok();
    Ok(())
}
1197
/// With the value log enabled, writes twenty values above the threshold and
/// checks that each one reads back intact.
#[test]
fn test_lsm_tree_vlog_multiple_large_values() -> Result<()> {
    let dir = env::temp_dir().join("test_lsm_vlog_multiple");
    // The directory name is fixed, so a previous run that failed before its
    // cleanup may have left files behind; wipe it so this run starts empty.
    std::fs::remove_dir_all(&dir).ok();
    std::fs::create_dir_all(&dir).ok();

    let config = LsmTreeConfig {
        data_dir: dir.clone(),
        wal_dir: dir.join("wal"),
        value_log_config: Some(ValueLogConfig {
            vlog_dir: dir.join("vlog"),
            max_file_size: 1024 * 1024,
            value_threshold: 1024,
            sync_on_write: false,
            gc_threshold: 0.5,
        }),
        ..Default::default()
    };

    let lsm = LsmTree::with_config(config)?;

    // Write 20 large values
    for i in 0..20 {
        let key = Key::from_str(&format!("large_key_{:02}", i));
        lsm.put(key, CipherBlob::new(vec![i as u8; 2048]))?;
    }

    // Read back and verify; one comparison covers the length and every byte
    for i in 0..20 {
        let key = Key::from_str(&format!("large_key_{:02}", i));
        let retrieved = lsm.get(&key)?.expect("Value should exist");
        assert_eq!(retrieved.as_bytes(), &vec![i as u8; 2048]);
    }

    // Best-effort cleanup; a failure to remove the directory is not a test failure.
    std::fs::remove_dir_all(&dir).ok();
    Ok(())
}
1238
/// With the value log enabled and a small memtable, writes fifty values
/// above the threshold and checks that all of them stay readable.
#[test]
fn test_lsm_tree_vlog_with_flush() -> Result<()> {
    let dir = env::temp_dir().join("test_lsm_vlog_flush");
    // The directory name is fixed, so a previous run that failed before its
    // cleanup may have left files behind; wipe it so this run starts empty.
    std::fs::remove_dir_all(&dir).ok();
    std::fs::create_dir_all(&dir).ok();

    let mut config = LsmTreeConfig {
        data_dir: dir.clone(),
        wal_dir: dir.join("wal"),
        value_log_config: Some(ValueLogConfig {
            vlog_dir: dir.join("vlog"),
            max_file_size: 1024 * 1024,
            value_threshold: 1024,
            sync_on_write: false,
            gc_threshold: 0.5,
        }),
        ..Default::default()
    };
    config.memtable_config.max_size_bytes = 4096; // Small memtable to trigger flushes

    let lsm = LsmTree::with_config(config)?;

    // Write enough data to trigger memtable flush; 1500 bytes is above
    // value_threshold
    for i in 0..50 {
        let key = Key::from_str(&format!("key_{:03}", i));
        lsm.put(key, CipherBlob::new(vec![i as u8; 1500]))?;
    }

    // Verify all data is readable after flush, comparing the whole payload
    // rather than only its first byte
    for i in 0..50 {
        let key = Key::from_str(&format!("key_{:03}", i));
        let retrieved = lsm.get(&key)?.expect("Value should exist");
        assert_eq!(retrieved.as_bytes(), &vec![i as u8; 1500]);
    }

    // Best-effort cleanup; a failure to remove the directory is not a test failure.
    std::fs::remove_dir_all(&dir).ok();
    Ok(())
}
1279
/// With the value log disabled, a 5000-byte value must still round-trip.
#[test]
fn test_lsm_tree_vlog_disabled() -> Result<()> {
    let dir = env::temp_dir().join("test_lsm_vlog_disabled");
    // The directory name is fixed, so a previous run that failed before its
    // cleanup may have left files behind; wipe it so this run starts empty.
    std::fs::remove_dir_all(&dir).ok();
    std::fs::create_dir_all(&dir).ok();

    // NOTE(review): wal_dir keeps its default here - confirm it does not
    // point at a location shared with other tests.
    let config = LsmTreeConfig {
        data_dir: dir.clone(),
        value_log_config: None, // vLog disabled
        ..Default::default()
    };

    let lsm = LsmTree::with_config(config)?;

    // Large value should still work without a value log
    let key = Key::from_str("large_key");
    let value = CipherBlob::new(vec![42u8; 5000]);
    lsm.put(key.clone(), value.clone())?;

    // The bytes read back must match the blob that was written
    let retrieved = lsm
        .get(&key)?
        .expect("Value should be retrievable after put");
    assert_eq!(retrieved.as_bytes(), value.as_bytes());

    // Best-effort cleanup; a failure to remove the directory is not a test failure.
    std::fs::remove_dir_all(&dir).ok();
    Ok(())
}
1310
/// With the value log enabled, overwrites a large value and checks that the
/// newer one is returned.
#[test]
fn test_lsm_tree_vlog_update() -> Result<()> {
    let dir = env::temp_dir().join("test_lsm_vlog_update");
    // The directory name is fixed, so a previous run that failed before its
    // cleanup may have left files behind; wipe it so this run starts empty.
    std::fs::remove_dir_all(&dir).ok();
    std::fs::create_dir_all(&dir).ok();

    let config = LsmTreeConfig {
        data_dir: dir.clone(),
        wal_dir: dir.join("wal"),
        value_log_config: Some(ValueLogConfig {
            vlog_dir: dir.join("vlog"),
            max_file_size: 1024 * 1024,
            value_threshold: 1024,
            sync_on_write: false,
            gc_threshold: 0.5,
        }),
        ..Default::default()
    };

    let lsm = LsmTree::with_config(config)?;

    let key = Key::from_str("update_key");

    // Write initial large value
    lsm.put(key.clone(), CipherBlob::new(vec![1u8; 2048]))?;

    // Update with new large value
    lsm.put(key.clone(), CipherBlob::new(vec![2u8; 2048]))?;

    // Verify the latest value is retrieved, comparing the whole payload
    // rather than only its first byte
    let retrieved = lsm
        .get(&key)?
        .expect("Value should be retrievable after put");
    assert_eq!(retrieved.as_bytes(), &vec![2u8; 2048]);

    // Best-effort cleanup; a failure to remove the directory is not a test failure.
    std::fs::remove_dir_all(&dir).ok();
    Ok(())
}
1354
/// With the value log enabled, a deleted large value must no longer be
/// returned by `get`.
#[test]
fn test_lsm_tree_vlog_delete() -> Result<()> {
    let dir = env::temp_dir().join("test_lsm_vlog_delete");
    // The directory name is fixed, so a previous run that failed before its
    // cleanup may have left files behind; wipe it so this run starts empty.
    std::fs::remove_dir_all(&dir).ok();
    std::fs::create_dir_all(&dir).ok();

    let config = LsmTreeConfig {
        data_dir: dir.clone(),
        wal_dir: dir.join("wal"),
        value_log_config: Some(ValueLogConfig {
            vlog_dir: dir.join("vlog"),
            max_file_size: 1024 * 1024,
            value_threshold: 1024,
            sync_on_write: false,
            gc_threshold: 0.5,
        }),
        ..Default::default()
    };

    let lsm = LsmTree::with_config(config)?;

    let key = Key::from_str("delete_key");

    // Write large value
    lsm.put(key.clone(), CipherBlob::new(vec![42u8; 2048]))?;

    // Verify it exists
    assert!(lsm.get(&key)?.is_some());

    // Delete it
    lsm.delete(key.clone())?;

    // Verify it's gone
    assert!(lsm.get(&key)?.is_none());

    // Best-effort cleanup; a failure to remove the directory is not a test failure.
    std::fs::remove_dir_all(&dir).ok();
    Ok(())
}
1393
/// Round-trips a `ValuePointer` through its blob encoding and checks that an
/// ordinary blob is not classified as a pointer.
#[test]
fn test_lsm_tree_value_pointer_encoding() -> Result<()> {
    // Pointer with easily recognisable field values
    let original = ValuePointer {
        file_id: 123,
        offset: 456789,
        length: 2048,
        checksum: 0xDEADBEEF,
    };

    // Encoding must yield a blob that is recognised as a pointer
    let blob = LsmTree::encode_value_pointer(&original);
    assert!(LsmTree::is_value_pointer(&blob));

    // Decoding must reproduce every field of the pointer that was encoded
    let round_tripped = LsmTree::decode_value_pointer(&blob)?;
    assert_eq!(round_tripped.file_id, original.file_id);
    assert_eq!(round_tripped.offset, original.offset);
    assert_eq!(round_tripped.length, original.length);
    assert_eq!(round_tripped.checksum, original.checksum);

    // A plain payload must not be mistaken for a pointer
    let plain = CipherBlob::new(vec![1, 2, 3, 4, 5]);
    assert!(!LsmTree::is_value_pointer(&plain));

    Ok(())
}
1425}