1use crate::error::{AmateRSError, ErrorContext, Result};
10use crate::storage::{
11 BlockCache, BlockCacheConfig, BlockCacheKey, CachedBlock, CompactionConfig, CompactionExecutor,
12 CompactionPlanner, Memtable, MemtableConfig, SSTableConfig, SSTableReader, SSTableWriter,
13 ValueLog, ValueLogConfig, ValuePointer, Wal,
14};
15use crate::types::{CipherBlob, Key};
16use parking_lot::RwLock;
17use std::collections::BTreeMap;
18use std::path::{Path, PathBuf};
19use std::sync::Arc;
20
/// Descriptor of a single SSTable file tracked by the LSM-tree.
#[derive(Debug, Clone)]
pub struct SSTableMetadata {
    /// Location of the SSTable file on disk.
    pub path: PathBuf,
    /// Smallest key in the table; lookups treat the bound as inclusive.
    pub min_key: Key,
    /// Largest key in the table; lookups treat the bound as inclusive.
    pub max_key: Key,
    /// Number of entries recorded for the table.
    pub num_entries: usize,
    /// Size of the file in bytes, taken from filesystem metadata.
    pub file_size: u64,
    /// Index of the level this table belongs to (memtable flushes land in level 0).
    pub level: usize,
}
37
/// Bookkeeping for one level of the tree: its tables and their combined size.
#[derive(Debug, Clone)]
pub struct LevelInfo {
    /// Index of this level (0 is where memtable flushes are registered).
    pub level: usize,
    /// Tables currently registered in this level.
    pub sstables: Vec<SSTableMetadata>,
    /// Sum of `file_size` over `sstables`, in bytes.
    pub total_size: u64,
}
48
49impl LevelInfo {
50 fn new(level: usize) -> Self {
51 Self {
52 level,
53 sstables: Vec::new(),
54 total_size: 0,
55 }
56 }
57
58 fn add_sstable(&mut self, metadata: SSTableMetadata) {
59 self.total_size += metadata.file_size;
60 self.sstables.push(metadata);
61 }
62}
63
/// Configuration used to construct an `LsmTree`.
#[derive(Debug, Clone)]
pub struct LsmTreeConfig {
    /// Directory where SSTable files are written and from which they are recovered.
    pub data_dir: PathBuf,
    /// Directory that holds the write-ahead log file (`wal.log`).
    pub wal_dir: PathBuf,
    /// Settings for the in-memory table.
    pub memtable_config: MemtableConfig,
    /// Settings passed to SSTable writers and to the compaction executor.
    pub sstable_config: SSTableConfig,
    /// Settings for the block cache.
    pub block_cache_config: BlockCacheConfig,
    /// Settings for the compaction planner.
    pub compaction_config: CompactionConfig,
    /// Value-log settings; `None` disables value separation.
    pub value_log_config: Option<ValueLogConfig>,
    /// Number of levels allocated when the tree is created.
    pub max_levels: usize,
    /// NOTE(review): not read in this file; presumably the level-0 table count that
    /// should trigger compaction. Confirm against `CompactionPlanner`.
    pub l0_compaction_threshold: usize,
    /// NOTE(review): not read in this file; presumably the size ratio between
    /// adjacent levels. Confirm against `CompactionPlanner`.
    pub level_size_multiplier: usize,
}
88
impl Default for LsmTreeConfig {
    /// Returns a configuration with relative `./data` and `./wal` directories,
    /// default sub-configurations, no value log, and seven levels.
    fn default() -> Self {
        Self {
            // Both paths are relative to the process working directory.
            data_dir: PathBuf::from("./data"),
            wal_dir: PathBuf::from("./wal"),
            memtable_config: MemtableConfig::default(),
            sstable_config: SSTableConfig::default(),
            block_cache_config: BlockCacheConfig::default(),
            compaction_config: CompactionConfig::default(),
            // Value separation is off unless a value-log configuration is supplied.
            value_log_config: None,
            max_levels: 7,
            l0_compaction_threshold: 4,
            level_size_multiplier: 10,
        }
    }
}
105
/// LSM-tree storage engine: a write-ahead log and memtable in front of levelled
/// SSTables, with an optional value log for large values.
pub struct LsmTree {
    /// Configuration the tree was built with.
    config: LsmTreeConfig,
    /// Active in-memory table that receives writes.
    memtable: Arc<Memtable>,
    /// Memtable handed over for flushing, if a flush is in progress.
    immutable_memtable: Arc<RwLock<Option<Arc<Memtable>>>>,
    /// Write-ahead log; every put and delete is appended here before the memtable.
    wal: Arc<RwLock<Wal>>,
    /// Value log for separated values; `None` when value separation is disabled.
    value_log: Option<Arc<ValueLog>>,
    /// Per-level table bookkeeping, indexed by level number.
    levels: Arc<RwLock<Vec<LevelInfo>>>,
    /// Block cache; in this file it is only queried for statistics.
    block_cache: Arc<BlockCache>,
    /// Next numeric id to use when naming a new SSTable file.
    next_sstable_id: Arc<RwLock<u64>>,
    /// Decides whether a level needs compaction and plans the task.
    compaction_planner: CompactionPlanner,
    /// Runs planned compaction tasks and accumulates compaction statistics.
    compaction_executor: Arc<RwLock<CompactionExecutor>>,
}
129
130impl LsmTree {
131 pub fn new<P: AsRef<Path>>(data_dir: P) -> Result<Self> {
133 let config = LsmTreeConfig {
134 data_dir: data_dir.as_ref().to_path_buf(),
135 ..Default::default()
136 };
137 Self::with_config(config)
138 }
139
140 pub fn with_config(config: LsmTreeConfig) -> Result<Self> {
142 std::fs::create_dir_all(&config.data_dir).map_err(|e| {
144 AmateRSError::StorageIntegrity(ErrorContext::new(format!(
145 "Failed to create data directory: {}",
146 e
147 )))
148 })?;
149
150 std::fs::create_dir_all(&config.wal_dir).map_err(|e| {
151 AmateRSError::StorageIntegrity(ErrorContext::new(format!(
152 "Failed to create WAL directory: {}",
153 e
154 )))
155 })?;
156
157 let wal_path = config.wal_dir.join("wal.log");
159 let wal = Wal::create(wal_path)?;
160
161 let memtable = Memtable::with_config(config.memtable_config.clone());
163
164 let mut levels = Vec::with_capacity(config.max_levels);
166 for i in 0..config.max_levels {
167 levels.push(LevelInfo::new(i));
168 }
169
170 let block_cache = BlockCache::with_config(config.block_cache_config.clone());
172
173 let compaction_planner = CompactionPlanner::new(config.compaction_config.clone());
175 let compaction_executor = CompactionExecutor::new(config.sstable_config.clone());
176
177 let value_log = if let Some(ref vlog_config) = config.value_log_config {
179 Some(Arc::new(ValueLog::with_config(vlog_config.clone())?))
180 } else {
181 None
182 };
183
184 let mut lsm = Self {
185 config,
186 memtable: Arc::new(memtable),
187 immutable_memtable: Arc::new(RwLock::new(None)),
188 wal: Arc::new(RwLock::new(wal)),
189 value_log,
190 levels: Arc::new(RwLock::new(levels)),
191 block_cache: Arc::new(block_cache),
192 next_sstable_id: Arc::new(RwLock::new(0)),
193 compaction_planner,
194 compaction_executor: Arc::new(RwLock::new(compaction_executor)),
195 };
196
197 lsm.recover_sstables()?;
199
200 Ok(lsm)
201 }
202
203 fn recover_sstables(&mut self) -> Result<()> {
205 use std::fs;
206
207 let entries = fs::read_dir(&self.config.data_dir).map_err(|e| {
209 AmateRSError::IoError(ErrorContext::new(format!(
210 "Failed to read data directory: {}",
211 e
212 )))
213 })?;
214
215 let mut sstables_by_level: BTreeMap<usize, Vec<SSTableMetadata>> = BTreeMap::new();
216 let mut max_id = 0u64;
217
218 for entry in entries {
219 let entry = entry.map_err(|e| {
220 AmateRSError::IoError(ErrorContext::new(format!(
221 "Failed to read directory entry: {}",
222 e
223 )))
224 })?;
225
226 let path = entry.path();
227 let filename = match path.file_name().and_then(|n| n.to_str()) {
228 Some(name) => name,
229 None => continue,
230 };
231
232 if filename.starts_with('L') && filename.ends_with(".sst") {
234 let parts: Vec<&str> = filename[1..].trim_end_matches(".sst").split('_').collect();
235 if parts.len() == 2 {
236 if let (Ok(level), Ok(id)) =
237 (parts[0].parse::<usize>(), parts[1].parse::<u64>())
238 {
239 if id > max_id {
241 max_id = id;
242 }
243
244 let reader = SSTableReader::open(&path)?;
246 let (min_key, max_key, num_entries) = reader.metadata()?;
247
248 let file_size = std::fs::metadata(&path)
249 .map_err(|e| {
250 AmateRSError::IoError(ErrorContext::new(format!(
251 "Failed to get file size: {}",
252 e
253 )))
254 })?
255 .len();
256
257 let metadata = SSTableMetadata {
258 path: path.clone(),
259 min_key,
260 max_key,
261 num_entries,
262 file_size,
263 level,
264 };
265
266 sstables_by_level.entry(level).or_default().push(metadata);
267 }
268 }
269 }
270 }
271
272 let mut levels = self.levels.write();
274 for (level, mut sstables) in sstables_by_level {
275 if level < levels.len() {
276 if level > 0 {
278 sstables.sort_by(|a, b| a.min_key.cmp(&b.min_key));
279 }
280
281 for metadata in sstables {
282 levels[level].add_sstable(metadata);
283 }
284 }
285 }
286 drop(levels);
287
288 *self.next_sstable_id.write() = max_id + 1;
290
291 Ok(())
292 }
293
294 pub fn put(&self, key: Key, value: CipherBlob) -> Result<()> {
296 let stored_value = if let Some(ref vlog) = self.value_log {
298 if vlog.should_separate(&value) {
299 let pointer = vlog.append(key.clone(), value)?;
301 vlog.flush()?;
302
303 Self::encode_value_pointer(&pointer)
305 } else {
306 value
308 }
309 } else {
310 value
312 };
313
314 {
316 let mut wal = self.wal.write();
317 wal.put(key.clone(), stored_value.clone())?;
318 }
319
320 self.memtable.put(key, stored_value)?;
322
323 if self.memtable.should_flush() {
325 self.try_flush_memtable()?;
326 }
327
328 Ok(())
329 }
330
331 pub fn get(&self, key: &Key) -> Result<Option<CipherBlob>> {
333 if let Some(value) = self.memtable.get(key)? {
335 return self.resolve_value(value);
336 }
337
338 {
340 let immutable = self.immutable_memtable.read();
341 if let Some(ref memtable) = *immutable {
342 if let Some(value) = memtable.get(key)? {
343 return self.resolve_value(value);
344 }
345 }
346 }
347
348 let levels = self.levels.read();
350 for level_info in levels.iter() {
351 if let Some(value) = self.search_level(level_info, key)? {
352 return self.resolve_value(value);
353 }
354 }
355
356 Ok(None)
357 }
358
359 fn resolve_value(&self, value: CipherBlob) -> Result<Option<CipherBlob>> {
361 if value.as_bytes().is_empty() {
363 return Ok(None);
364 }
365
366 if Self::is_value_pointer(&value) {
367 if let Some(ref vlog) = self.value_log {
368 let pointer = Self::decode_value_pointer(&value)?;
369 let actual_value = vlog.read(&pointer)?;
370 Ok(Some(actual_value))
371 } else {
372 Err(AmateRSError::StorageIntegrity(ErrorContext::new(
373 "Found value pointer but vLog is not configured".to_string(),
374 )))
375 }
376 } else {
377 Ok(Some(value))
378 }
379 }
380
381 pub fn delete(&self, key: Key) -> Result<()> {
383 {
385 let mut wal = self.wal.write();
386 wal.delete(key.clone())?;
387 }
388
389 self.memtable.delete(key)?;
391
392 if self.memtable.should_flush() {
394 self.try_flush_memtable()?;
395 }
396
397 Ok(())
398 }
399
400 pub fn range(&self, start: &Key, end: &Key) -> Result<Vec<(Key, CipherBlob)>> {
402 let mut results = BTreeMap::new();
403
404 let levels = self.levels.read();
406 for level_info in levels.iter().rev() {
407 let level_results = self.range_scan_level(level_info, start, end)?;
408 for (k, v) in level_results {
409 results.entry(k).or_insert(v);
410 }
411 }
412
413 {
415 let immutable = self.immutable_memtable.read();
416 if let Some(ref memtable) = *immutable {
417 for (k, v) in memtable.range(start, end) {
418 results.insert(k, v);
419 }
420 }
421 }
422
423 for (k, v) in self.memtable.range(start, end) {
425 results.insert(k, v);
426 }
427
428 Ok(results.into_iter().collect())
429 }
430
431 fn search_level(&self, level_info: &LevelInfo, key: &Key) -> Result<Option<CipherBlob>> {
433 if level_info.level == 0 {
435 for metadata in level_info.sstables.iter().rev() {
437 if key >= &metadata.min_key && key <= &metadata.max_key {
438 if let Some(value) = self.read_from_sstable(&metadata.path, key)? {
439 return Ok(Some(value));
440 }
441 }
442 }
443 } else {
444 let idx = level_info.sstables.binary_search_by(|metadata| {
446 if key < &metadata.min_key {
447 std::cmp::Ordering::Greater
448 } else if key > &metadata.max_key {
449 std::cmp::Ordering::Less
450 } else {
451 std::cmp::Ordering::Equal
452 }
453 });
454
455 if let Ok(idx) = idx {
456 let metadata = &level_info.sstables[idx];
457 if let Some(value) = self.read_from_sstable(&metadata.path, key)? {
458 return Ok(Some(value));
459 }
460 }
461 }
462
463 Ok(None)
464 }
465
466 fn range_scan_level(
468 &self,
469 level_info: &LevelInfo,
470 start: &Key,
471 end: &Key,
472 ) -> Result<Vec<(Key, CipherBlob)>> {
473 let mut results = Vec::new();
474
475 for metadata in &level_info.sstables {
476 if &metadata.max_key < start || &metadata.min_key > end {
478 continue;
479 }
480
481 let reader = SSTableReader::open(&metadata.path)?;
483 let entries = reader.iter()?;
484
485 for (k, v) in entries {
486 if &k >= start && &k < end {
488 results.push((k, v));
489 }
490 }
491 }
492
493 Ok(results)
494 }
495
496 fn read_from_sstable(&self, path: &Path, key: &Key) -> Result<Option<CipherBlob>> {
498 let reader = SSTableReader::open(path)?;
499 reader.get(key)
500 }
501
502 fn try_flush_memtable(&self) -> Result<()> {
504 {
506 let immutable = self.immutable_memtable.read();
507 if immutable.is_some() {
508 return Ok(());
510 }
511 }
512
513 {
515 let mut immutable = self.immutable_memtable.write();
516 if immutable.is_some() {
517 return Ok(());
518 }
519
520 let old_memtable = Arc::clone(&self.memtable);
522 let new_memtable = Memtable::with_config(self.config.memtable_config.clone());
523
524 *immutable = Some(old_memtable);
527 }
528
529 self.flush_immutable_memtable()?;
531
532 Ok(())
533 }
534
535 fn flush_immutable_memtable(&self) -> Result<()> {
537 let memtable = {
538 let mut immutable = self.immutable_memtable.write();
539 immutable.take()
540 };
541
542 if let Some(memtable) = memtable {
543 let sstable_id = {
545 let mut next_id = self.next_sstable_id.write();
546 let id = *next_id;
547 *next_id += 1;
548 id
549 };
550
551 let sstable_path = self
552 .config
553 .data_dir
554 .join(format!("L0_{:08}.sst", sstable_id));
555
556 let mut writer = SSTableWriter::new(&sstable_path, self.config.sstable_config.clone())?;
558
559 let entries = memtable.entries();
560 let mut min_key = None;
561 let mut max_key = None;
562 let mut num_entries = 0;
563
564 for (key, value_opt) in entries {
565 let value = value_opt.unwrap_or_else(|| CipherBlob::new(Vec::new()));
568
569 if min_key.is_none() {
570 min_key = Some(key.clone());
571 }
572 max_key = Some(key.clone());
573 writer.add(key, value)?;
574 num_entries += 1;
575 }
576
577 writer.finish()?;
578
579 let file_size = std::fs::metadata(&sstable_path)
581 .map_err(|e| {
582 AmateRSError::StorageIntegrity(ErrorContext::new(format!(
583 "Failed to get SSTable size: {}",
584 e
585 )))
586 })?
587 .len();
588
589 if let (Some(min_key), Some(max_key)) = (min_key, max_key) {
591 let metadata = SSTableMetadata {
592 path: sstable_path,
593 min_key,
594 max_key,
595 num_entries,
596 file_size,
597 level: 0,
598 };
599
600 let mut levels = self.levels.write();
601 levels[0].add_sstable(metadata);
602 }
603
604 self.trigger_compaction()?;
606 }
607
608 Ok(())
609 }
610
611 fn trigger_compaction(&self) -> Result<()> {
613 let levels = self.levels.read();
614
615 let l0_count = levels[0].sstables.len();
617 if self.compaction_planner.needs_l0_compaction(l0_count) {
618 drop(levels); return self.compact_l0_to_l1();
620 }
621
622 for level_info in levels.iter() {
624 if level_info.level > 0
625 && self
626 .compaction_planner
627 .needs_level_compaction(level_info.level, level_info.total_size)
628 {
629 let source_level = level_info.level;
630 drop(levels); return self.compact_level(source_level);
632 }
633 }
634
635 Ok(())
636 }
637
638 fn compact_l0_to_l1(&self) -> Result<()> {
640 let (source_sstables, target_sstables) = {
641 let levels = self.levels.read();
642 let source = levels[0].sstables.clone();
643 let target = if levels.len() > 1 {
644 levels[1].sstables.clone()
645 } else {
646 Vec::new()
647 };
648 (source, target)
649 };
650
651 if let Some(task) =
652 self.compaction_planner
653 .plan_compaction(0, source_sstables, target_sstables)
654 {
655 self.execute_compaction_task(task)?;
656 }
657
658 Ok(())
659 }
660
661 fn compact_level(&self, source_level: usize) -> Result<()> {
663 let (source_sstables, target_sstables) = {
664 let levels = self.levels.read();
665 if source_level >= levels.len() {
666 return Ok(());
667 }
668
669 let source = levels[source_level].sstables.clone();
670 let target = if source_level + 1 < levels.len() {
671 levels[source_level + 1].sstables.clone()
672 } else {
673 Vec::new()
674 };
675 (source, target)
676 };
677
678 if let Some(task) =
679 self.compaction_planner
680 .plan_compaction(source_level, source_sstables, target_sstables)
681 {
682 self.execute_compaction_task(task)?;
683 }
684
685 Ok(())
686 }
687
688 fn execute_compaction_task(&self, task: crate::storage::CompactionTask) -> Result<()> {
690 let output_sstables = {
692 let mut executor = self.compaction_executor.write();
693 let mut next_id = self.next_sstable_id.write();
694 executor.execute_compaction(task.clone(), &self.config.data_dir, &mut next_id)?
695 };
696
697 let mut levels = self.levels.write();
699
700 levels[task.source_level]
702 .sstables
703 .retain(|s| !task.source_sstables.iter().any(|ts| ts.path == s.path));
704 levels[task.source_level].total_size = levels[task.source_level]
705 .sstables
706 .iter()
707 .map(|s| s.file_size)
708 .sum();
709
710 if task.target_level < levels.len() {
712 levels[task.target_level]
713 .sstables
714 .retain(|s| !task.target_sstables.iter().any(|ts| ts.path == s.path));
715 levels[task.target_level].total_size = levels[task.target_level]
716 .sstables
717 .iter()
718 .map(|s| s.file_size)
719 .sum();
720
721 for sstable in output_sstables {
723 levels[task.target_level].add_sstable(sstable);
724 }
725 }
726
727 drop(levels);
728
729 for sstable in task.source_sstables.iter().chain(&task.target_sstables) {
731 std::fs::remove_file(&sstable.path).ok();
732 }
733
734 Ok(())
735 }
736
737 pub fn level_info(&self, level: usize) -> Option<LevelInfo> {
739 let levels = self.levels.read();
740 if level < levels.len() {
741 Some(levels[level].clone())
742 } else {
743 None
744 }
745 }
746
747 pub fn all_levels_info(&self) -> Vec<LevelInfo> {
749 self.levels.read().clone()
750 }
751
752 pub fn stats(&self) -> LsmTreeStats {
754 let levels = self.levels.read();
755 let cache_stats = self.block_cache.stats();
756 let compaction_stats = self.compaction_executor.read().stats().clone();
757
758 LsmTreeStats {
759 memtable_size: self.memtable.size_bytes(),
760 num_levels: levels.len(),
761 levels: levels.clone(),
762 cache_hit_rate: cache_stats.hit_rate(),
763 cache_size: cache_stats.size_bytes,
764 compaction_stats,
765 }
766 }
767
768 pub fn keys(&self) -> Result<Vec<Key>> {
770 let mut key_set = std::collections::BTreeSet::new();
771
772 for (key, value_opt) in self.memtable.entries() {
774 if value_opt.is_some() {
775 key_set.insert(key);
776 }
777 }
778
779 {
781 let immutable = self.immutable_memtable.read();
782 if let Some(ref memtable) = *immutable {
783 for (key, value_opt) in memtable.entries() {
784 if value_opt.is_some() {
785 key_set.insert(key);
786 }
787 }
788 }
789 }
790
791 let levels = self.levels.read();
793 for level_info in levels.iter() {
794 for metadata in &level_info.sstables {
795 let reader = SSTableReader::open(&metadata.path)?;
796 let entries = reader.iter()?;
797 for (key, _) in entries {
798 key_set.insert(key);
799 }
800 }
801 }
802
803 Ok(key_set.into_iter().collect())
804 }
805
806 pub fn flush(&self) -> Result<()> {
808 if self.memtable.size_bytes() > 0 {
810 self.try_flush_memtable()?;
811 }
812
813 self.flush_immutable_memtable()?;
815
816 {
818 let mut wal = self.wal.write();
819 wal.flush()?;
820 }
821
822 if let Some(ref vlog) = self.value_log {
824 vlog.flush()?;
825 }
826
827 Ok(())
828 }
829
830 pub fn close(&self) -> Result<()> {
832 self.flush()?;
835 Ok(())
836 }
837
838 fn encode_value_pointer(pointer: &ValuePointer) -> CipherBlob {
842 const MAGIC: &[u8] = b"VPTR"; let pointer_bytes = pointer.encode();
844
845 let mut bytes = Vec::with_capacity(MAGIC.len() + pointer_bytes.len());
846 bytes.extend_from_slice(MAGIC);
847 bytes.extend_from_slice(&pointer_bytes);
848
849 CipherBlob::new(bytes)
850 }
851
852 fn decode_value_pointer(blob: &CipherBlob) -> Result<ValuePointer> {
854 const MAGIC: &[u8] = b"VPTR";
855 let bytes = blob.as_bytes();
856
857 if bytes.len() < MAGIC.len() {
858 return Err(AmateRSError::StorageIntegrity(ErrorContext::new(
859 "Invalid value pointer: too short".to_string(),
860 )));
861 }
862
863 if &bytes[..MAGIC.len()] != MAGIC {
864 return Err(AmateRSError::StorageIntegrity(ErrorContext::new(
865 "Invalid value pointer: bad magic".to_string(),
866 )));
867 }
868
869 ValuePointer::decode(&bytes[MAGIC.len()..])
870 }
871
872 fn is_value_pointer(blob: &CipherBlob) -> bool {
874 const MAGIC: &[u8] = b"VPTR";
875 let bytes = blob.as_bytes();
876 bytes.len() >= MAGIC.len() && &bytes[..MAGIC.len()] == MAGIC
877 }
878}
879
/// Snapshot of tree statistics returned by `LsmTree::stats`.
#[derive(Debug, Clone)]
pub struct LsmTreeStats {
    /// Size reported by the active memtable, in bytes.
    pub memtable_size: usize,
    /// Number of levels in the tree.
    pub num_levels: usize,
    /// Copy of the per-level bookkeeping at the time of the snapshot.
    pub levels: Vec<LevelInfo>,
    /// Hit rate reported by the block cache.
    pub cache_hit_rate: f64,
    /// Size reported by the block cache, in bytes.
    pub cache_size: usize,
    /// Statistics accumulated by the compaction executor.
    pub compaction_stats: crate::storage::CompactionStats,
}
896
897#[cfg(test)]
898mod tests {
899 use super::*;
900 use std::env;
901
    /// Puts one key, reads it back, deletes it, and checks the key is then absent.
    #[test]
    fn test_lsm_tree_basic_operations() -> Result<()> {
        let dir = env::temp_dir().join("test_lsm_basic");
        std::fs::create_dir_all(&dir).ok();

        let lsm = LsmTree::new(&dir)?;

        // Write a value.
        let key = Key::from_str("test_key");
        let value = CipherBlob::new(vec![1, 2, 3, 4, 5]);
        lsm.put(key.clone(), value.clone())?;

        // The value must be readable immediately.
        let retrieved = lsm.get(&key)?;
        assert!(retrieved.is_some());
        assert_eq!(
            retrieved
                .expect("Value should be retrievable after put")
                .as_bytes(),
            &[1, 2, 3, 4, 5]
        );

        // Delete the key.
        lsm.delete(key.clone())?;

        // The key must no longer be visible.
        let retrieved = lsm.get(&key)?;
        assert!(retrieved.is_none());

        std::fs::remove_dir_all(&dir).ok();
        Ok(())
    }
934
    /// Writes ten keys and checks each one reads back with its own fill byte.
    #[test]
    fn test_lsm_tree_multiple_keys() -> Result<()> {
        let dir = env::temp_dir().join("test_lsm_multiple");
        std::fs::create_dir_all(&dir).ok();

        let lsm = LsmTree::new(&dir)?;

        // Insert ten keys, each with a 100-byte value filled with its index.
        for i in 0..10 {
            let key = Key::from_str(&format!("key_{:03}", i));
            let value = CipherBlob::new(vec![i as u8; 100]);
            lsm.put(key, value)?;
        }

        // Every key must be present and carry the expected first byte.
        for i in 0..10 {
            let key = Key::from_str(&format!("key_{:03}", i));
            let value = lsm.get(&key)?;
            assert!(value.is_some());
            assert_eq!(value.expect("Value should exist").as_bytes()[0], i as u8);
        }

        std::fs::remove_dir_all(&dir).ok();
        Ok(())
    }
960
    /// Writes twenty keys and checks a range scan over `key_005..key_015` returns
    /// at least ten entries.
    #[test]
    fn test_lsm_tree_range_scan() -> Result<()> {
        let dir = env::temp_dir().join("test_lsm_range");
        std::fs::create_dir_all(&dir).ok();

        let lsm = LsmTree::new(&dir)?;

        // Insert keys key_000 through key_019.
        for i in 0..20 {
            let key = Key::from_str(&format!("key_{:03}", i));
            let value = CipherBlob::new(vec![i as u8; 50]);
            lsm.put(key, value)?;
        }

        // Scan a sub-range of the inserted keys.
        let start = Key::from_str("key_005");
        let end = Key::from_str("key_015");
        let results = lsm.range(&start, &end)?;

        assert!(results.len() >= 10);

        std::fs::remove_dir_all(&dir).ok();
        Ok(())
    }
985
    /// Checks that statistics report a non-empty memtable and the default of
    /// seven levels after a few writes.
    #[test]
    fn test_lsm_tree_stats() -> Result<()> {
        let dir = env::temp_dir().join("test_lsm_stats");
        std::fs::create_dir_all(&dir).ok();

        let lsm = LsmTree::new(&dir)?;

        // Write a little data so the memtable is not empty.
        for i in 0..5 {
            let key = Key::from_str(&format!("key_{}", i));
            let value = CipherBlob::new(vec![i as u8; 100]);
            lsm.put(key, value)?;
        }

        let stats = lsm.stats();
        assert!(stats.memtable_size > 0);
        // Seven is the default `max_levels`.
        assert_eq!(stats.num_levels, 7);

        std::fs::remove_dir_all(&dir).ok();
        Ok(())
    }
1007
    /// Uses a small memtable limit and writes enough data to force flushes, then
    /// checks that either a compaction ran or level 0 holds at least one table.
    #[test]
    fn test_lsm_tree_compaction_trigger() -> Result<()> {
        let dir = env::temp_dir().join("test_lsm_compaction");
        std::fs::create_dir_all(&dir).ok();

        let mut config = LsmTreeConfig {
            data_dir: dir.clone(),
            ..Default::default()
        };
        // Small limits so the writes below exceed the memtable size quickly.
        config.memtable_config.max_size_bytes = 1024;
        config.l0_compaction_threshold = 2;

        let lsm = LsmTree::with_config(config)?;

        // 100 values of 200 bytes each, far more than the memtable limit.
        for i in 0..100 {
            let key = Key::from_str(&format!("key_{:04}", i));
            let value = CipherBlob::new(vec![i as u8; 200]);
            lsm.put(key, value)?;
        }

        let stats = lsm.stats();
        // Either outcome shows that data left the memtable.
        assert!(
            stats.compaction_stats.total_compactions > 0 || !stats.levels[0].sstables.is_empty()
        );

        std::fs::remove_dir_all(&dir).ok();
        Ok(())
    }
1038
    /// Writes data under a small memtable limit and checks that the compaction
    /// statistics fields can be read; no particular values are asserted.
    #[test]
    fn test_lsm_tree_compaction_stats() -> Result<()> {
        let dir = env::temp_dir().join("test_lsm_compaction_stats");
        std::fs::create_dir_all(&dir).ok();

        let mut config = LsmTreeConfig {
            data_dir: dir.clone(),
            ..Default::default()
        };
        config.memtable_config.max_size_bytes = 512;

        let lsm = LsmTree::with_config(config)?;

        // 50 values of 100 bytes each, well above the memtable limit.
        for i in 0..50 {
            let key = Key::from_str(&format!("key_{:04}", i));
            let value = CipherBlob::new(vec![i as u8; 100]);
            lsm.put(key, value)?;
        }

        let stats = lsm.stats();
        // Only field access is exercised here.
        let _ = stats.compaction_stats.keys_processed;
        let _ = stats.compaction_stats.tombstones_removed;

        std::fs::remove_dir_all(&dir).ok();
        Ok(())
    }
1068
    /// Writes 200 keys under a small memtable limit, checks all of them remain
    /// readable, and checks that data is held in SSTables or the memtable.
    #[test]
    fn test_lsm_tree_level_organization() -> Result<()> {
        let dir = env::temp_dir().join("test_lsm_levels");
        std::fs::create_dir_all(&dir).ok();

        let mut config = LsmTreeConfig {
            data_dir: dir.clone(),
            ..Default::default()
        };
        config.memtable_config.max_size_bytes = 1024;

        let lsm = LsmTree::with_config(config)?;

        // 200 values of 150 bytes each, far more than the memtable limit.
        for i in 0..200 {
            let key = Key::from_str(&format!("key_{:05}", i));
            let value = CipherBlob::new(vec![i as u8; 150]);
            lsm.put(key, value)?;
        }

        // Every key must still be readable.
        for i in 0..200 {
            let key = Key::from_str(&format!("key_{:05}", i));
            let value = lsm.get(&key)?;
            assert!(value.is_some());
        }

        let stats = lsm.stats();
        // Count the tables across all levels.
        let total_sstables: usize = stats.levels.iter().map(|l| l.sstables.len()).sum();
        assert!(total_sstables > 0 || stats.memtable_size > 0);

        std::fs::remove_dir_all(&dir).ok();
        Ok(())
    }
1104
    /// Writes 100 keys under a small memtable limit, then checks that they are all
    /// found and that 100 keys which were never written are not found.
    #[test]
    fn test_lsm_tree_bloom_filter_negative_lookups() -> Result<()> {
        let dir = env::temp_dir().join("test_lsm_bloom");
        std::fs::create_dir_all(&dir).ok();

        let mut config = LsmTreeConfig {
            data_dir: dir.clone(),
            ..Default::default()
        };
        // Small limit so the writes below exceed the memtable size.
        config.memtable_config.max_size_bytes = 512;

        let lsm = LsmTree::with_config(config)?;

        // Insert the keys that are expected to exist.
        for i in 0..100 {
            let key = Key::from_str(&format!("exists_{:04}", i));
            let value = CipherBlob::new(vec![i as u8; 100]);
            lsm.put(key, value)?;
        }

        // Positive lookups.
        for i in 0..100 {
            let key = Key::from_str(&format!("exists_{:04}", i));
            let result = lsm.get(&key)?;
            assert!(result.is_some());
        }

        // Negative lookups for keys that were never written.
        for i in 0..100 {
            let key = Key::from_str(&format!("notexists_{:04}", i));
            let result = lsm.get(&key)?;
            assert!(result.is_none());
        }

        std::fs::remove_dir_all(&dir).ok();
        Ok(())
    }
1142
    /// With a value log configured at a 1024-byte threshold, writes one 512-byte
    /// and one 2048-byte value and checks both read back unchanged.
    #[test]
    fn test_lsm_tree_vlog_basic() -> Result<()> {
        let dir = env::temp_dir().join("test_lsm_vlog_basic");
        std::fs::create_dir_all(&dir).ok();

        let config = LsmTreeConfig {
            data_dir: dir.clone(),
            wal_dir: dir.join("wal"),
            value_log_config: Some(ValueLogConfig {
                vlog_dir: dir.join("vlog"),
                max_file_size: 1024 * 1024,
                value_threshold: 1024,
                sync_on_write: false,
                gc_threshold: 0.5,
            }),
            ..Default::default()
        };

        let lsm = LsmTree::with_config(config)?;

        // A value below the configured threshold.
        let small_key = Key::from_str("small_key");
        let small_value = CipherBlob::new(vec![1u8; 512]);
        lsm.put(small_key.clone(), small_value.clone())?;

        // A value above the configured threshold.
        let large_key = Key::from_str("large_key");
        let large_value = CipherBlob::new(vec![2u8; 2048]);
        lsm.put(large_key.clone(), large_value.clone())?;

        // Both values must come back byte for byte.
        let retrieved_small = lsm.get(&small_key)?;
        assert!(retrieved_small.is_some());
        assert_eq!(
            retrieved_small
                .expect("Small value should be retrievable")
                .as_bytes(),
            &vec![1u8; 512]
        );

        let retrieved_large = lsm.get(&large_key)?;
        assert!(retrieved_large.is_some());
        assert_eq!(
            retrieved_large
                .expect("Large value should be retrievable")
                .as_bytes(),
            &vec![2u8; 2048]
        );

        std::fs::remove_dir_all(&dir).ok();
        Ok(())
    }
1197
    /// With a value log configured, writes twenty 2048-byte values and checks each
    /// reads back with the right fill byte and length.
    #[test]
    fn test_lsm_tree_vlog_multiple_large_values() -> Result<()> {
        let dir = env::temp_dir().join("test_lsm_vlog_multiple");
        std::fs::create_dir_all(&dir).ok();

        let config = LsmTreeConfig {
            data_dir: dir.clone(),
            wal_dir: dir.join("wal"),
            value_log_config: Some(ValueLogConfig {
                vlog_dir: dir.join("vlog"),
                max_file_size: 1024 * 1024,
                value_threshold: 1024,
                sync_on_write: false,
                gc_threshold: 0.5,
            }),
            ..Default::default()
        };

        let lsm = LsmTree::with_config(config)?;

        // Every value is above the configured threshold.
        for i in 0..20 {
            let key = Key::from_str(&format!("large_key_{:02}", i));
            let value = CipherBlob::new(vec![i as u8; 2048]);
            lsm.put(key, value)?;
        }

        // Each value must come back with its own fill byte and full length.
        for i in 0..20 {
            let key = Key::from_str(&format!("large_key_{:02}", i));
            let value = lsm.get(&key)?;
            assert!(value.is_some());
            let retrieved = value.expect("Value should exist");
            assert_eq!(retrieved.as_bytes()[0], i as u8);
            assert_eq!(retrieved.as_bytes().len(), 2048);
        }

        std::fs::remove_dir_all(&dir).ok();
        Ok(())
    }
1238
    /// With a value log and a 4096-byte memtable limit, writes fifty 1500-byte
    /// values and checks each reads back with the right fill byte.
    #[test]
    fn test_lsm_tree_vlog_with_flush() -> Result<()> {
        let dir = env::temp_dir().join("test_lsm_vlog_flush");
        std::fs::create_dir_all(&dir).ok();

        let mut config = LsmTreeConfig {
            data_dir: dir.clone(),
            wal_dir: dir.join("wal"),
            value_log_config: Some(ValueLogConfig {
                vlog_dir: dir.join("vlog"),
                max_file_size: 1024 * 1024,
                value_threshold: 1024,
                sync_on_write: false,
                gc_threshold: 0.5,
            }),
            ..Default::default()
        };
        // Small memtable limit for this test.
        config.memtable_config.max_size_bytes = 4096;

        let lsm = LsmTree::with_config(config)?;

        for i in 0..50 {
            let key = Key::from_str(&format!("key_{:03}", i));
            // 1500 bytes is above the configured value threshold.
            let value = CipherBlob::new(vec![i as u8; 1500]);
            lsm.put(key, value)?;
        }

        // Every value must still be readable.
        for i in 0..50 {
            let key = Key::from_str(&format!("key_{:03}", i));
            let value = lsm.get(&key)?;
            assert!(value.is_some());
            let retrieved = value.expect("Value should exist");
            assert_eq!(retrieved.as_bytes()[0], i as u8);
        }

        std::fs::remove_dir_all(&dir).ok();
        Ok(())
    }
1279
    /// With no value log configured, writes a 5000-byte value and checks it reads
    /// back unchanged.
    #[test]
    fn test_lsm_tree_vlog_disabled() -> Result<()> {
        let dir = env::temp_dir().join("test_lsm_vlog_disabled");
        std::fs::create_dir_all(&dir).ok();

        let config = LsmTreeConfig {
            data_dir: dir.clone(),
            // Value separation is explicitly off.
            value_log_config: None,
            ..Default::default()
        };

        let lsm = LsmTree::with_config(config)?;

        let key = Key::from_str("large_key");
        let value = CipherBlob::new(vec![42u8; 5000]);
        lsm.put(key.clone(), value.clone())?;

        let retrieved = lsm.get(&key)?;
        assert!(retrieved.is_some());
        assert_eq!(
            retrieved
                .expect("Value should be retrievable after put")
                .as_bytes(),
            &vec![42u8; 5000]
        );

        std::fs::remove_dir_all(&dir).ok();
        Ok(())
    }
1310
    /// With a value log configured, overwrites a 2048-byte value and checks the
    /// second value is the one returned.
    #[test]
    fn test_lsm_tree_vlog_update() -> Result<()> {
        let dir = env::temp_dir().join("test_lsm_vlog_update");
        std::fs::create_dir_all(&dir).ok();

        let config = LsmTreeConfig {
            data_dir: dir.clone(),
            wal_dir: dir.join("wal"),
            value_log_config: Some(ValueLogConfig {
                vlog_dir: dir.join("vlog"),
                max_file_size: 1024 * 1024,
                value_threshold: 1024,
                sync_on_write: false,
                gc_threshold: 0.5,
            }),
            ..Default::default()
        };

        let lsm = LsmTree::with_config(config)?;

        let key = Key::from_str("update_key");

        // First version.
        let value1 = CipherBlob::new(vec![1u8; 2048]);
        lsm.put(key.clone(), value1)?;

        // Second version under the same key.
        let value2 = CipherBlob::new(vec![2u8; 2048]);
        lsm.put(key.clone(), value2)?;

        // The read must return the second version.
        let retrieved = lsm.get(&key)?;
        assert!(retrieved.is_some());
        assert_eq!(
            retrieved
                .expect("Value should be retrievable after put")
                .as_bytes()[0],
            2u8
        );

        std::fs::remove_dir_all(&dir).ok();
        Ok(())
    }
1354
    /// With a value log configured, writes a 2048-byte value, deletes the key, and
    /// checks the key is then absent.
    #[test]
    fn test_lsm_tree_vlog_delete() -> Result<()> {
        let dir = env::temp_dir().join("test_lsm_vlog_delete");
        std::fs::create_dir_all(&dir).ok();

        let config = LsmTreeConfig {
            data_dir: dir.clone(),
            wal_dir: dir.join("wal"),
            value_log_config: Some(ValueLogConfig {
                vlog_dir: dir.join("vlog"),
                max_file_size: 1024 * 1024,
                value_threshold: 1024,
                sync_on_write: false,
                gc_threshold: 0.5,
            }),
            ..Default::default()
        };

        let lsm = LsmTree::with_config(config)?;

        let key = Key::from_str("delete_key");

        // Write a value above the configured threshold.
        let value = CipherBlob::new(vec![42u8; 2048]);
        lsm.put(key.clone(), value)?;

        // It must be visible before the delete.
        assert!(lsm.get(&key)?.is_some());

        lsm.delete(key.clone())?;

        // And gone afterwards.
        assert!(lsm.get(&key)?.is_none());

        std::fs::remove_dir_all(&dir).ok();
        Ok(())
    }
1393
1394 #[test]
1395 fn test_lsm_tree_value_pointer_encoding() -> Result<()> {
1396 let pointer = ValuePointer {
1398 file_id: 123,
1399 offset: 456789,
1400 length: 2048,
1401 checksum: 0xDEADBEEF,
1402 };
1403
1404 let encoded = LsmTree::encode_value_pointer(&pointer);
1406
1407 assert!(LsmTree::is_value_pointer(&encoded));
1409
1410 let decoded = LsmTree::decode_value_pointer(&encoded)?;
1412
1413 assert_eq!(decoded.file_id, 123);
1415 assert_eq!(decoded.offset, 456789);
1416 assert_eq!(decoded.length, 2048);
1417 assert_eq!(decoded.checksum, 0xDEADBEEF);
1418
1419 let regular_value = CipherBlob::new(vec![1, 2, 3, 4, 5]);
1421 assert!(!LsmTree::is_value_pointer(®ular_value));
1422
1423 Ok(())
1424 }
1425}