1use crate::error::{AmateRSError, ErrorContext, Result};
10use crate::storage::{
11 BlockCache, BlockCacheConfig, BlockCacheKey, CachedBlock, CompactionConfig, CompactionExecutor,
12 CompactionPlanner, Memtable, MemtableConfig, SSTableConfig, SSTableReader, SSTableWriter,
13 ValueLog, ValueLogConfig, ValuePointer, Wal,
14};
15use crate::types::{CipherBlob, Key};
16use parking_lot::RwLock;
17use std::collections::BTreeMap;
18use std::path::{Path, PathBuf};
19use std::sync::Arc;
20
21#[derive(Debug, Clone)]
23pub struct SSTableMetadata {
24 pub path: PathBuf,
26 pub min_key: Key,
28 pub max_key: Key,
30 pub num_entries: usize,
32 pub file_size: u64,
34 pub level: usize,
36}
37
38#[derive(Debug, Clone)]
40pub struct LevelInfo {
41 pub level: usize,
43 pub sstables: Vec<SSTableMetadata>,
45 pub total_size: u64,
47}
48
49impl LevelInfo {
50 fn new(level: usize) -> Self {
51 Self {
52 level,
53 sstables: Vec::new(),
54 total_size: 0,
55 }
56 }
57
58 fn add_sstable(&mut self, metadata: SSTableMetadata) {
59 self.total_size += metadata.file_size;
60 self.sstables.push(metadata);
61 }
62}
63
64#[derive(Debug, Clone)]
66pub struct LsmTreeConfig {
67 pub data_dir: PathBuf,
69 pub wal_dir: PathBuf,
71 pub memtable_config: MemtableConfig,
73 pub sstable_config: SSTableConfig,
75 pub block_cache_config: BlockCacheConfig,
77 pub compaction_config: CompactionConfig,
79 pub value_log_config: Option<ValueLogConfig>,
81 pub max_levels: usize,
83 pub l0_compaction_threshold: usize,
85 pub level_size_multiplier: usize,
87}
88
89impl Default for LsmTreeConfig {
90 fn default() -> Self {
91 Self {
92 data_dir: PathBuf::from("./data"),
93 wal_dir: PathBuf::from("./wal"),
94 memtable_config: MemtableConfig::default(),
95 sstable_config: SSTableConfig::default(),
96 block_cache_config: BlockCacheConfig::default(),
97 compaction_config: CompactionConfig::default(),
98 value_log_config: None, max_levels: 7,
100 l0_compaction_threshold: 4,
101 level_size_multiplier: 10,
102 }
103 }
104}
105
/// Read-ahead settings attached to a tree via `LsmTree::with_prefetch`.
///
/// NOTE(review): within this file the values are only stored, never consulted.
#[derive(Debug, Clone)]
pub struct PrefetchConfig {
    /// Presumably the number of blocks to read ahead — confirm against the reader.
    pub read_ahead_blocks: usize,
    /// Presumably toggles `madvise` hints — confirm against the reader.
    pub use_madvise: bool,
}

impl Default for PrefetchConfig {
    /// Four read-ahead blocks, `madvise` disabled.
    fn default() -> Self {
        PrefetchConfig {
            use_madvise: false,
            read_ahead_blocks: 4,
        }
    }
}
124
125pub struct LsmTree {
127 config: LsmTreeConfig,
129 memtable: Arc<Memtable>,
131 immutable_memtable: Arc<RwLock<Option<Arc<Memtable>>>>,
133 wal: Arc<RwLock<Wal>>,
135 value_log: Option<Arc<ValueLog>>,
137 levels: Arc<RwLock<Vec<LevelInfo>>>,
139 block_cache: Arc<BlockCache>,
141 next_sstable_id: Arc<RwLock<u64>>,
143 compaction_planner: CompactionPlanner,
145 compaction_executor: Arc<RwLock<CompactionExecutor>>,
147 prefetch_config: PrefetchConfig,
149}
150
151impl LsmTree {
152 pub fn new<P: AsRef<Path>>(data_dir: P) -> Result<Self> {
154 let config = LsmTreeConfig {
155 data_dir: data_dir.as_ref().to_path_buf(),
156 ..Default::default()
157 };
158 Self::with_config(config)
159 }
160
161 pub fn with_config(config: LsmTreeConfig) -> Result<Self> {
163 std::fs::create_dir_all(&config.data_dir).map_err(|e| {
165 AmateRSError::StorageIntegrity(ErrorContext::new(format!(
166 "Failed to create data directory: {}",
167 e
168 )))
169 })?;
170
171 std::fs::create_dir_all(&config.wal_dir).map_err(|e| {
172 AmateRSError::StorageIntegrity(ErrorContext::new(format!(
173 "Failed to create WAL directory: {}",
174 e
175 )))
176 })?;
177
178 let wal_path = config.wal_dir.join("wal.log");
180 let wal = Wal::create(wal_path)?;
181
182 let memtable = Memtable::with_config(config.memtable_config.clone());
184
185 let mut levels = Vec::with_capacity(config.max_levels);
187 for i in 0..config.max_levels {
188 levels.push(LevelInfo::new(i));
189 }
190
191 let block_cache = BlockCache::with_config(config.block_cache_config.clone());
193
194 let compaction_planner = CompactionPlanner::new(config.compaction_config.clone());
196 let compaction_executor = CompactionExecutor::new(config.sstable_config.clone());
197
198 let value_log = if let Some(ref vlog_config) = config.value_log_config {
200 Some(Arc::new(ValueLog::with_config(vlog_config.clone())?))
201 } else {
202 None
203 };
204
205 let mut lsm = Self {
206 config,
207 memtable: Arc::new(memtable),
208 immutable_memtable: Arc::new(RwLock::new(None)),
209 wal: Arc::new(RwLock::new(wal)),
210 value_log,
211 levels: Arc::new(RwLock::new(levels)),
212 block_cache: Arc::new(block_cache),
213 next_sstable_id: Arc::new(RwLock::new(0)),
214 compaction_planner,
215 compaction_executor: Arc::new(RwLock::new(compaction_executor)),
216 prefetch_config: PrefetchConfig::default(),
217 };
218
219 lsm.recover_sstables()?;
221
222 Ok(lsm)
223 }
224
225 fn recover_sstables(&mut self) -> Result<()> {
227 use std::fs;
228
229 let entries = fs::read_dir(&self.config.data_dir).map_err(|e| {
231 AmateRSError::IoError(ErrorContext::new(format!(
232 "Failed to read data directory: {}",
233 e
234 )))
235 })?;
236
237 let mut sstables_by_level: BTreeMap<usize, Vec<SSTableMetadata>> = BTreeMap::new();
238 let mut max_id = 0u64;
239
240 for entry in entries {
241 let entry = entry.map_err(|e| {
242 AmateRSError::IoError(ErrorContext::new(format!(
243 "Failed to read directory entry: {}",
244 e
245 )))
246 })?;
247
248 let path = entry.path();
249 let filename = match path.file_name().and_then(|n| n.to_str()) {
250 Some(name) => name,
251 None => continue,
252 };
253
254 if filename.starts_with('L') && filename.ends_with(".sst") {
256 let parts: Vec<&str> = filename[1..].trim_end_matches(".sst").split('_').collect();
257 if parts.len() == 2 {
258 if let (Ok(level), Ok(id)) =
259 (parts[0].parse::<usize>(), parts[1].parse::<u64>())
260 {
261 if id > max_id {
263 max_id = id;
264 }
265
266 let reader = SSTableReader::open(&path)?;
268 let (min_key, max_key, num_entries) = reader.metadata()?;
269
270 let file_size = std::fs::metadata(&path)
271 .map_err(|e| {
272 AmateRSError::IoError(ErrorContext::new(format!(
273 "Failed to get file size: {}",
274 e
275 )))
276 })?
277 .len();
278
279 let metadata = SSTableMetadata {
280 path: path.clone(),
281 min_key,
282 max_key,
283 num_entries,
284 file_size,
285 level,
286 };
287
288 sstables_by_level.entry(level).or_default().push(metadata);
289 }
290 }
291 }
292 }
293
294 let mut levels = self.levels.write();
296 for (level, mut sstables) in sstables_by_level {
297 if level < levels.len() {
298 if level > 0 {
300 sstables.sort_by(|a, b| a.min_key.cmp(&b.min_key));
301 }
302
303 for metadata in sstables {
304 levels[level].add_sstable(metadata);
305 }
306 }
307 }
308 drop(levels);
309
310 *self.next_sstable_id.write() = max_id + 1;
312
313 Ok(())
314 }
315
316 pub fn put(&self, key: Key, value: CipherBlob) -> Result<()> {
318 let stored_value = if let Some(ref vlog) = self.value_log {
320 if vlog.should_separate(&value) {
321 let pointer = vlog.append(key.clone(), value)?;
323 vlog.flush()?;
324
325 Self::encode_value_pointer(&pointer)
327 } else {
328 value
330 }
331 } else {
332 value
334 };
335
336 {
338 let mut wal = self.wal.write();
339 wal.put(key.clone(), stored_value.clone())?;
340 }
341
342 self.memtable.put(key, stored_value)?;
344
345 if self.memtable.should_flush() {
347 self.try_flush_memtable()?;
348 }
349
350 Ok(())
351 }
352
353 pub fn get(&self, key: &Key) -> Result<Option<CipherBlob>> {
355 if let Some(value) = self.memtable.get(key)? {
357 return self.resolve_value(value);
358 }
359
360 {
362 let immutable = self.immutable_memtable.read();
363 if let Some(ref memtable) = *immutable {
364 if let Some(value) = memtable.get(key)? {
365 return self.resolve_value(value);
366 }
367 }
368 }
369
370 let levels = self.levels.read();
372 for level_info in levels.iter() {
373 if let Some(value) = self.search_level(level_info, key)? {
374 return self.resolve_value(value);
375 }
376 }
377
378 Ok(None)
379 }
380
381 fn resolve_value(&self, value: CipherBlob) -> Result<Option<CipherBlob>> {
383 if value.as_bytes().is_empty() {
385 return Ok(None);
386 }
387
388 if Self::is_value_pointer(&value) {
389 if let Some(ref vlog) = self.value_log {
390 let pointer = Self::decode_value_pointer(&value)?;
391 let actual_value = vlog.read(&pointer)?;
392 Ok(Some(actual_value))
393 } else {
394 Err(AmateRSError::StorageIntegrity(ErrorContext::new(
395 "Found value pointer but vLog is not configured".to_string(),
396 )))
397 }
398 } else {
399 Ok(Some(value))
400 }
401 }
402
403 pub fn delete(&self, key: Key) -> Result<()> {
405 {
407 let mut wal = self.wal.write();
408 wal.delete(key.clone())?;
409 }
410
411 self.memtable.delete(key)?;
413
414 if self.memtable.should_flush() {
416 self.try_flush_memtable()?;
417 }
418
419 Ok(())
420 }
421
422 pub fn range(&self, start: &Key, end: &Key) -> Result<Vec<(Key, CipherBlob)>> {
424 let mut results = BTreeMap::new();
425
426 let levels = self.levels.read();
428 for level_info in levels.iter().rev() {
429 let level_results = self.range_scan_level(level_info, start, end)?;
430 for (k, v) in level_results {
431 results.entry(k).or_insert(v);
432 }
433 }
434
435 {
437 let immutable = self.immutable_memtable.read();
438 if let Some(ref memtable) = *immutable {
439 for (k, v) in memtable.range(start, end) {
440 results.insert(k, v);
441 }
442 }
443 }
444
445 for (k, v) in self.memtable.range(start, end) {
447 results.insert(k, v);
448 }
449
450 Ok(results.into_iter().collect())
451 }
452
453 fn search_level(&self, level_info: &LevelInfo, key: &Key) -> Result<Option<CipherBlob>> {
455 if level_info.level == 0 {
457 for metadata in level_info.sstables.iter().rev() {
459 if key >= &metadata.min_key && key <= &metadata.max_key {
460 if let Some(value) = self.read_from_sstable(&metadata.path, key)? {
461 return Ok(Some(value));
462 }
463 }
464 }
465 } else {
466 let idx = level_info.sstables.binary_search_by(|metadata| {
468 if key < &metadata.min_key {
469 std::cmp::Ordering::Greater
470 } else if key > &metadata.max_key {
471 std::cmp::Ordering::Less
472 } else {
473 std::cmp::Ordering::Equal
474 }
475 });
476
477 if let Ok(idx) = idx {
478 let metadata = &level_info.sstables[idx];
479 if let Some(value) = self.read_from_sstable(&metadata.path, key)? {
480 return Ok(Some(value));
481 }
482 }
483 }
484
485 Ok(None)
486 }
487
488 fn range_scan_level(
490 &self,
491 level_info: &LevelInfo,
492 start: &Key,
493 end: &Key,
494 ) -> Result<Vec<(Key, CipherBlob)>> {
495 let mut results = Vec::new();
496
497 for metadata in &level_info.sstables {
498 if &metadata.max_key < start || &metadata.min_key > end {
500 continue;
501 }
502
503 let reader = SSTableReader::open(&metadata.path)?;
505 let entries = reader.iter()?;
506
507 for (k, v) in entries {
508 if &k >= start && &k < end {
510 results.push((k, v));
511 }
512 }
513 }
514
515 Ok(results)
516 }
517
518 fn read_from_sstable(&self, path: &Path, key: &Key) -> Result<Option<CipherBlob>> {
520 let reader = SSTableReader::open(path)?;
521 reader.get(key)
522 }
523
524 fn try_flush_memtable(&self) -> Result<()> {
526 {
528 let immutable = self.immutable_memtable.read();
529 if immutable.is_some() {
530 return Ok(());
532 }
533 }
534
535 {
537 let mut immutable = self.immutable_memtable.write();
538 if immutable.is_some() {
539 return Ok(());
540 }
541
542 let old_memtable = Arc::clone(&self.memtable);
544 let new_memtable = Memtable::with_config(self.config.memtable_config.clone());
545
546 *immutable = Some(old_memtable);
549 }
550
551 self.flush_immutable_memtable()?;
553
554 Ok(())
555 }
556
557 fn flush_immutable_memtable(&self) -> Result<()> {
559 let _span = tracing::debug_span!("amaters.storage.flush").entered();
560 let memtable = {
561 let mut immutable = self.immutable_memtable.write();
562 immutable.take()
563 };
564
565 if let Some(memtable) = memtable {
566 let sstable_id = {
568 let mut next_id = self.next_sstable_id.write();
569 let id = *next_id;
570 *next_id += 1;
571 id
572 };
573
574 let sstable_path = self
575 .config
576 .data_dir
577 .join(format!("L0_{:08}.sst", sstable_id));
578
579 let mut writer = SSTableWriter::new(&sstable_path, self.config.sstable_config.clone())?;
581
582 let entries = memtable.entries();
583 let mut min_key = None;
584 let mut max_key = None;
585 let mut num_entries = 0;
586
587 for (key, value_opt) in entries {
588 let value = value_opt.unwrap_or_else(|| CipherBlob::new(Vec::new()));
591
592 if min_key.is_none() {
593 min_key = Some(key.clone());
594 }
595 max_key = Some(key.clone());
596 writer.add(key, value)?;
597 num_entries += 1;
598 }
599
600 writer.finish()?;
601
602 let file_size = std::fs::metadata(&sstable_path)
604 .map_err(|e| {
605 AmateRSError::StorageIntegrity(ErrorContext::new(format!(
606 "Failed to get SSTable size: {}",
607 e
608 )))
609 })?
610 .len();
611
612 if let (Some(min_key), Some(max_key)) = (min_key, max_key) {
614 let metadata = SSTableMetadata {
615 path: sstable_path,
616 min_key,
617 max_key,
618 num_entries,
619 file_size,
620 level: 0,
621 };
622
623 let mut levels = self.levels.write();
624 levels[0].add_sstable(metadata);
625 }
626
627 self.trigger_compaction()?;
629 }
630
631 Ok(())
632 }
633
634 fn trigger_compaction(&self) -> Result<()> {
636 let levels = self.levels.read();
637
638 let l0_count = levels[0].sstables.len();
640 if self.compaction_planner.needs_l0_compaction(l0_count) {
641 drop(levels); return self.compact_l0_to_l1();
643 }
644
645 for level_info in levels.iter() {
647 if level_info.level > 0
648 && self
649 .compaction_planner
650 .needs_level_compaction(level_info.level, level_info.total_size)
651 {
652 let source_level = level_info.level;
653 drop(levels); return self.compact_level(source_level);
655 }
656 }
657
658 Ok(())
659 }
660
661 fn compact_l0_to_l1(&self) -> Result<()> {
663 let (source_sstables, target_sstables) = {
664 let levels = self.levels.read();
665 let source = levels[0].sstables.clone();
666 let target = if levels.len() > 1 {
667 levels[1].sstables.clone()
668 } else {
669 Vec::new()
670 };
671 (source, target)
672 };
673
674 if let Some(task) =
675 self.compaction_planner
676 .plan_compaction(0, source_sstables, target_sstables)
677 {
678 self.execute_compaction_task(task)?;
679 }
680
681 Ok(())
682 }
683
684 fn compact_level(&self, source_level: usize) -> Result<()> {
686 let (source_sstables, target_sstables) = {
687 let levels = self.levels.read();
688 if source_level >= levels.len() {
689 return Ok(());
690 }
691
692 let source = levels[source_level].sstables.clone();
693 let target = if source_level + 1 < levels.len() {
694 levels[source_level + 1].sstables.clone()
695 } else {
696 Vec::new()
697 };
698 (source, target)
699 };
700
701 if let Some(task) =
702 self.compaction_planner
703 .plan_compaction(source_level, source_sstables, target_sstables)
704 {
705 self.execute_compaction_task(task)?;
706 }
707
708 Ok(())
709 }
710
711 fn execute_compaction_task(&self, task: crate::storage::CompactionTask) -> Result<()> {
713 let output_sstables = {
715 let mut executor = self.compaction_executor.write();
716 let mut next_id = self.next_sstable_id.write();
717 executor.execute_compaction(task.clone(), &self.config.data_dir, &mut next_id)?
718 };
719
720 let mut levels = self.levels.write();
722
723 levels[task.source_level]
725 .sstables
726 .retain(|s| !task.source_sstables.iter().any(|ts| ts.path == s.path));
727 levels[task.source_level].total_size = levels[task.source_level]
728 .sstables
729 .iter()
730 .map(|s| s.file_size)
731 .sum();
732
733 if task.target_level < levels.len() {
735 levels[task.target_level]
736 .sstables
737 .retain(|s| !task.target_sstables.iter().any(|ts| ts.path == s.path));
738 levels[task.target_level].total_size = levels[task.target_level]
739 .sstables
740 .iter()
741 .map(|s| s.file_size)
742 .sum();
743
744 for sstable in output_sstables {
746 levels[task.target_level].add_sstable(sstable);
747 }
748 }
749
750 drop(levels);
751
752 for sstable in task.source_sstables.iter().chain(&task.target_sstables) {
754 std::fs::remove_file(&sstable.path).ok();
755 }
756
757 Ok(())
758 }
759
760 pub fn level_info(&self, level: usize) -> Option<LevelInfo> {
762 let levels = self.levels.read();
763 if level < levels.len() {
764 Some(levels[level].clone())
765 } else {
766 None
767 }
768 }
769
770 pub fn all_levels_info(&self) -> Vec<LevelInfo> {
772 self.levels.read().clone()
773 }
774
775 pub fn with_prefetch(mut self, config: PrefetchConfig) -> Self {
780 self.prefetch_config = config;
781 self
782 }
783
784 pub fn stats(&self) -> LsmTreeStats {
786 let levels = self.levels.read();
787 let cache_stats = self.block_cache.stats();
788 let compaction_stats = self.compaction_executor.read().stats_snapshot();
789
790 LsmTreeStats {
791 memtable_size: self.memtable.size_bytes(),
792 num_levels: levels.len(),
793 levels: levels.clone(),
794 cache_hit_rate: cache_stats.hit_rate(),
795 cache_size: cache_stats.size_bytes,
796 compaction_stats,
797 }
798 }
799
800 pub fn keys(&self) -> Result<Vec<Key>> {
802 let mut key_set = std::collections::BTreeSet::new();
803
804 for (key, value_opt) in self.memtable.entries() {
806 if value_opt.is_some() {
807 key_set.insert(key);
808 }
809 }
810
811 {
813 let immutable = self.immutable_memtable.read();
814 if let Some(ref memtable) = *immutable {
815 for (key, value_opt) in memtable.entries() {
816 if value_opt.is_some() {
817 key_set.insert(key);
818 }
819 }
820 }
821 }
822
823 let levels = self.levels.read();
825 for level_info in levels.iter() {
826 for metadata in &level_info.sstables {
827 let reader = SSTableReader::open(&metadata.path)?;
828 let entries = reader.iter()?;
829 for (key, _) in entries {
830 key_set.insert(key);
831 }
832 }
833 }
834
835 Ok(key_set.into_iter().collect())
836 }
837
838 pub fn flush(&self) -> Result<()> {
840 let _span = tracing::debug_span!("amaters.storage.flush_explicit").entered();
841 if self.memtable.size_bytes() > 0 {
843 self.try_flush_memtable()?;
844 }
845
846 self.flush_immutable_memtable()?;
848
849 {
851 let mut wal = self.wal.write();
852 wal.flush()?;
853 }
854
855 if let Some(ref vlog) = self.value_log {
857 vlog.flush()?;
858 }
859
860 Ok(())
861 }
862
863 pub fn close(&self) -> Result<()> {
865 self.flush()?;
868 Ok(())
869 }
870
871 fn encode_value_pointer(pointer: &ValuePointer) -> CipherBlob {
875 const MAGIC: &[u8] = b"VPTR"; let pointer_bytes = pointer.encode();
877
878 let mut bytes = Vec::with_capacity(MAGIC.len() + pointer_bytes.len());
879 bytes.extend_from_slice(MAGIC);
880 bytes.extend_from_slice(&pointer_bytes);
881
882 CipherBlob::new(bytes)
883 }
884
885 fn decode_value_pointer(blob: &CipherBlob) -> Result<ValuePointer> {
887 const MAGIC: &[u8] = b"VPTR";
888 let bytes = blob.as_bytes();
889
890 if bytes.len() < MAGIC.len() {
891 return Err(AmateRSError::StorageIntegrity(ErrorContext::new(
892 "Invalid value pointer: too short".to_string(),
893 )));
894 }
895
896 if &bytes[..MAGIC.len()] != MAGIC {
897 return Err(AmateRSError::StorageIntegrity(ErrorContext::new(
898 "Invalid value pointer: bad magic".to_string(),
899 )));
900 }
901
902 ValuePointer::decode(&bytes[MAGIC.len()..])
903 }
904
905 fn is_value_pointer(blob: &CipherBlob) -> bool {
907 const MAGIC: &[u8] = b"VPTR";
908 let bytes = blob.as_bytes();
909 bytes.len() >= MAGIC.len() && &bytes[..MAGIC.len()] == MAGIC
910 }
911}
912
913#[derive(Debug, Clone)]
915pub struct LsmTreeStats {
916 pub memtable_size: usize,
918 pub num_levels: usize,
920 pub levels: Vec<LevelInfo>,
922 pub cache_hit_rate: f64,
924 pub cache_size: usize,
926 pub compaction_stats: crate::storage::CompactionStatsSnapshot,
928}
929
#[cfg(test)]
mod tests {
    use super::*;
    use std::env;

    /// Scratch directory for one test.
    ///
    /// The path includes the process id, stale contents are wiped on creation,
    /// and the directory is removed on drop — also when a test fails or returns
    /// early through `?`. Every configuration built from it keeps the WAL and
    /// value log inside the scratch directory, so tests running in parallel no
    /// longer share the CWD-relative default `./wal/wal.log`.
    struct TestDir {
        path: PathBuf,
    }

    impl TestDir {
        fn new(name: &str) -> Self {
            let path = env::temp_dir().join(format!("{}_{}", name, std::process::id()));
            // Leftover SSTables from an aborted run would be recovered as live data.
            std::fs::remove_dir_all(&path).ok();
            std::fs::create_dir_all(&path).expect("failed to create test directory");
            Self { path }
        }

        /// Default configuration rooted in this directory, without a value log.
        fn config(&self) -> LsmTreeConfig {
            LsmTreeConfig {
                data_dir: self.path.clone(),
                wal_dir: self.path.join("wal"),
                ..Default::default()
            }
        }

        /// Like `config`, plus a value log that separates values above 1 KiB.
        fn config_with_vlog(&self) -> LsmTreeConfig {
            LsmTreeConfig {
                value_log_config: Some(ValueLogConfig {
                    vlog_dir: self.path.join("vlog"),
                    max_file_size: 1024 * 1024,
                    value_threshold: 1024,
                    sync_on_write: false,
                    gc_threshold: 0.5,
                }),
                ..self.config()
            }
        }
    }

    impl Drop for TestDir {
        fn drop(&mut self) {
            std::fs::remove_dir_all(&self.path).ok();
        }
    }

    #[test]
    fn test_lsm_tree_basic_operations() -> Result<()> {
        let dir = TestDir::new("test_lsm_basic");
        let lsm = LsmTree::with_config(dir.config())?;

        let key = Key::from_str("test_key");
        lsm.put(key.clone(), CipherBlob::new(vec![1, 2, 3, 4, 5]))?;

        let retrieved = lsm.get(&key)?;
        assert!(retrieved.is_some());
        assert_eq!(
            retrieved
                .expect("Value should be retrievable after put")
                .as_bytes(),
            &[1, 2, 3, 4, 5]
        );

        lsm.delete(key.clone())?;
        assert!(lsm.get(&key)?.is_none());

        Ok(())
    }

    #[test]
    fn test_lsm_tree_multiple_keys() -> Result<()> {
        let dir = TestDir::new("test_lsm_multiple");
        let lsm = LsmTree::with_config(dir.config())?;

        for i in 0..10 {
            let key = Key::from_str(&format!("key_{:03}", i));
            lsm.put(key, CipherBlob::new(vec![i as u8; 100]))?;
        }

        for i in 0..10 {
            let key = Key::from_str(&format!("key_{:03}", i));
            let value = lsm.get(&key)?;
            assert!(value.is_some());
            assert_eq!(value.expect("Value should exist").as_bytes()[0], i as u8);
        }

        Ok(())
    }

    #[test]
    fn test_lsm_tree_range_scan() -> Result<()> {
        let dir = TestDir::new("test_lsm_range");
        let lsm = LsmTree::with_config(dir.config())?;

        for i in 0..20 {
            let key = Key::from_str(&format!("key_{:03}", i));
            lsm.put(key, CipherBlob::new(vec![i as u8; 50]))?;
        }

        let start = Key::from_str("key_005");
        let end = Key::from_str("key_015");
        let results = lsm.range(&start, &end)?;

        assert!(results.len() >= 10);

        Ok(())
    }

    #[test]
    fn test_lsm_tree_stats() -> Result<()> {
        let dir = TestDir::new("test_lsm_stats");
        let lsm = LsmTree::with_config(dir.config())?;

        for i in 0..5 {
            let key = Key::from_str(&format!("key_{}", i));
            lsm.put(key, CipherBlob::new(vec![i as u8; 100]))?;
        }

        let stats = lsm.stats();
        assert!(stats.memtable_size > 0);
        assert_eq!(stats.num_levels, 7);

        Ok(())
    }

    #[test]
    fn test_lsm_tree_compaction_trigger() -> Result<()> {
        let dir = TestDir::new("test_lsm_compaction");

        let mut config = dir.config();
        config.memtable_config.max_size_bytes = 1024;
        config.l0_compaction_threshold = 2;
        let lsm = LsmTree::with_config(config)?;

        for i in 0..100 {
            let key = Key::from_str(&format!("key_{:04}", i));
            lsm.put(key, CipherBlob::new(vec![i as u8; 200]))?;
        }

        let stats = lsm.stats();
        assert!(
            stats.compaction_stats.compactions_completed > 0
                || !stats.levels[0].sstables.is_empty()
        );

        Ok(())
    }

    #[test]
    fn test_lsm_tree_compaction_stats() -> Result<()> {
        let dir = TestDir::new("test_lsm_compaction_stats");

        let mut config = dir.config();
        config.memtable_config.max_size_bytes = 512;
        let lsm = LsmTree::with_config(config)?;

        for i in 0..50 {
            let key = Key::from_str(&format!("key_{:04}", i));
            lsm.put(key, CipherBlob::new(vec![i as u8; 100]))?;
        }

        // Only checks that the counters are exposed.
        let stats = lsm.stats();
        let _ = stats.compaction_stats.keys_processed;
        let _ = stats.compaction_stats.tombstones_removed;

        Ok(())
    }

    #[test]
    fn test_lsm_tree_level_organization() -> Result<()> {
        let dir = TestDir::new("test_lsm_levels");

        let mut config = dir.config();
        config.memtable_config.max_size_bytes = 1024;
        let lsm = LsmTree::with_config(config)?;

        for i in 0..200 {
            let key = Key::from_str(&format!("key_{:05}", i));
            lsm.put(key, CipherBlob::new(vec![i as u8; 150]))?;
        }

        for i in 0..200 {
            let key = Key::from_str(&format!("key_{:05}", i));
            assert!(lsm.get(&key)?.is_some());
        }

        let stats = lsm.stats();
        let total_sstables: usize = stats.levels.iter().map(|l| l.sstables.len()).sum();
        assert!(total_sstables > 0 || stats.memtable_size > 0);

        Ok(())
    }

    #[test]
    fn test_lsm_tree_bloom_filter_negative_lookups() -> Result<()> {
        let dir = TestDir::new("test_lsm_bloom");

        let mut config = dir.config();
        config.memtable_config.max_size_bytes = 512;
        let lsm = LsmTree::with_config(config)?;

        for i in 0..100 {
            let key = Key::from_str(&format!("exists_{:04}", i));
            lsm.put(key, CipherBlob::new(vec![i as u8; 100]))?;
        }

        for i in 0..100 {
            let key = Key::from_str(&format!("exists_{:04}", i));
            assert!(lsm.get(&key)?.is_some());
        }

        for i in 0..100 {
            let key = Key::from_str(&format!("notexists_{:04}", i));
            assert!(lsm.get(&key)?.is_none());
        }

        Ok(())
    }

    #[test]
    fn test_lsm_tree_vlog_basic() -> Result<()> {
        let dir = TestDir::new("test_lsm_vlog_basic");
        let lsm = LsmTree::with_config(dir.config_with_vlog())?;

        // Below the 1 KiB threshold: stored inline.
        let small_key = Key::from_str("small_key");
        lsm.put(small_key.clone(), CipherBlob::new(vec![1u8; 512]))?;

        // Above the threshold: stored in the value log.
        let large_key = Key::from_str("large_key");
        lsm.put(large_key.clone(), CipherBlob::new(vec![2u8; 2048]))?;

        let retrieved_small = lsm.get(&small_key)?;
        assert!(retrieved_small.is_some());
        assert_eq!(
            retrieved_small
                .expect("Small value should be retrievable")
                .as_bytes(),
            &vec![1u8; 512]
        );

        let retrieved_large = lsm.get(&large_key)?;
        assert!(retrieved_large.is_some());
        assert_eq!(
            retrieved_large
                .expect("Large value should be retrievable")
                .as_bytes(),
            &vec![2u8; 2048]
        );

        Ok(())
    }

    #[test]
    fn test_lsm_tree_vlog_multiple_large_values() -> Result<()> {
        let dir = TestDir::new("test_lsm_vlog_multiple");
        let lsm = LsmTree::with_config(dir.config_with_vlog())?;

        for i in 0..20 {
            let key = Key::from_str(&format!("large_key_{:02}", i));
            lsm.put(key, CipherBlob::new(vec![i as u8; 2048]))?;
        }

        for i in 0..20 {
            let key = Key::from_str(&format!("large_key_{:02}", i));
            let value = lsm.get(&key)?;
            assert!(value.is_some());
            let retrieved = value.expect("Value should exist");
            assert_eq!(retrieved.as_bytes()[0], i as u8);
            assert_eq!(retrieved.as_bytes().len(), 2048);
        }

        Ok(())
    }

    #[test]
    fn test_lsm_tree_vlog_with_flush() -> Result<()> {
        let dir = TestDir::new("test_lsm_vlog_flush");

        let mut config = dir.config_with_vlog();
        config.memtable_config.max_size_bytes = 4096;
        let lsm = LsmTree::with_config(config)?;

        for i in 0..50 {
            let key = Key::from_str(&format!("key_{:03}", i));
            lsm.put(key, CipherBlob::new(vec![i as u8; 1500]))?;
        }

        for i in 0..50 {
            let key = Key::from_str(&format!("key_{:03}", i));
            let value = lsm.get(&key)?;
            assert!(value.is_some());
            let retrieved = value.expect("Value should exist");
            assert_eq!(retrieved.as_bytes()[0], i as u8);
        }

        Ok(())
    }

    #[test]
    fn test_lsm_tree_vlog_disabled() -> Result<()> {
        let dir = TestDir::new("test_lsm_vlog_disabled");

        let config = LsmTreeConfig {
            value_log_config: None,
            ..dir.config()
        };
        let lsm = LsmTree::with_config(config)?;

        let key = Key::from_str("large_key");
        lsm.put(key.clone(), CipherBlob::new(vec![42u8; 5000]))?;

        let retrieved = lsm.get(&key)?;
        assert!(retrieved.is_some());
        assert_eq!(
            retrieved
                .expect("Value should be retrievable after put")
                .as_bytes(),
            &vec![42u8; 5000]
        );

        Ok(())
    }

    #[test]
    fn test_lsm_tree_vlog_update() -> Result<()> {
        let dir = TestDir::new("test_lsm_vlog_update");
        let lsm = LsmTree::with_config(dir.config_with_vlog())?;

        let key = Key::from_str("update_key");
        lsm.put(key.clone(), CipherBlob::new(vec![1u8; 2048]))?;
        lsm.put(key.clone(), CipherBlob::new(vec![2u8; 2048]))?;

        let retrieved = lsm.get(&key)?;
        assert!(retrieved.is_some());
        assert_eq!(
            retrieved
                .expect("Value should be retrievable after put")
                .as_bytes()[0],
            2u8
        );

        Ok(())
    }

    #[test]
    fn test_lsm_tree_vlog_delete() -> Result<()> {
        let dir = TestDir::new("test_lsm_vlog_delete");
        let lsm = LsmTree::with_config(dir.config_with_vlog())?;

        let key = Key::from_str("delete_key");
        lsm.put(key.clone(), CipherBlob::new(vec![42u8; 2048]))?;
        assert!(lsm.get(&key)?.is_some());

        lsm.delete(key.clone())?;
        assert!(lsm.get(&key)?.is_none());

        Ok(())
    }

    #[test]
    fn test_lsm_tree_value_pointer_encoding() -> Result<()> {
        let pointer = ValuePointer {
            file_id: 123,
            offset: 456789,
            length: 2048,
            checksum: 0xDEADBEEF,
        };

        let encoded = LsmTree::encode_value_pointer(&pointer);
        assert!(LsmTree::is_value_pointer(&encoded));

        let decoded = LsmTree::decode_value_pointer(&encoded)?;
        assert_eq!(decoded.file_id, 123);
        assert_eq!(decoded.offset, 456789);
        assert_eq!(decoded.length, 2048);
        assert_eq!(decoded.checksum, 0xDEADBEEF);

        let regular_value = CipherBlob::new(vec![1, 2, 3, 4, 5]);
        assert!(!LsmTree::is_value_pointer(&regular_value));

        Ok(())
    }

    #[test]
    fn test_prefetch_config_default() {
        let cfg = PrefetchConfig::default();
        assert_eq!(cfg.read_ahead_blocks, 4);
        assert!(!cfg.use_madvise);
    }

    #[test]
    fn test_lsm_tree_with_prefetch_config() {
        let dir = TestDir::new("test_lsm_prefetch_config");

        let lsm = LsmTree::with_config(dir.config())
            .expect("LsmTree::with_config failed")
            .with_prefetch(PrefetchConfig {
                read_ahead_blocks: 8,
                use_madvise: false,
            });

        assert_eq!(lsm.prefetch_config.read_ahead_blocks, 8);
        assert!(!lsm.prefetch_config.use_madvise);
    }
}