1mod atomic_file;
194mod checksum;
195mod checksum_handle;
196mod session_lock;
197mod snapshot;
198mod sstable;
199mod compaction;
200mod merkle;
201mod monoidal;
202mod io_backend; use std::path::{Path, PathBuf};
205use std::collections::BTreeMap;
206use std::sync::{Arc, RwLock};
207use serde::{Serialize, Deserialize};
208use sstable::{SsTableWriter, SsTableHandle, RunNumber};
209use compaction::Compactor;
210use atomic_file::fsync_directory;
211use session_lock::SessionLock;
212use io_backend::IoBackend;
213
214pub use merkle::{IncrementalMerkleTree, MerkleProof, MerkleRoot, MerkleLeaf, Direction, Hash, MerkleDiff, MerkleSnapshot};
216pub use monoidal::{Monoidal, MonoidalLsmTree, MonoidalSnapshot};
217pub use snapshot::{PersistentSnapshot, SnapshotMetadata, SnapshotRun};
218
/// Crate-wide result type; all fallible operations in this module return it.
pub type Result<T> = std::result::Result<T, Error>;

/// Errors produced by the LSM tree and its on-disk components.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// Underlying filesystem failure.
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),

    /// Failed to (de)serialize a value.
    #[error("Serialization error: {0}")]
    Serialization(String),

    /// On-disk data failed a checksum or structural validation.
    #[error("Corruption detected: {0}")]
    Corruption(String),

    /// The requested operation is not valid in the current state.
    #[error("Invalid operation: {0}")]
    InvalidOperation(String),

    /// Another process already holds the session lock for this tree.
    #[error("Session locked: {0}")]
    SessionLocked(String),

    /// Bincode encode/decode failure.
    #[error("Bincode error: {0}")]
    Bincode(#[from] bincode::Error),
}
241
242#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
245pub struct Key(Vec<u8>);
246
247impl Key {
248 pub fn from(bytes: impl AsRef<[u8]>) -> Self {
249 Key(bytes.as_ref().to_vec())
250 }
251
252 #[allow(clippy::should_implement_trait)]
253 pub fn as_ref(&self) -> &[u8] {
254 &self.0
255 }
256}
257
258impl AsRef<[u8]> for Key {
259 fn as_ref(&self) -> &[u8] {
260 &self.0
261 }
262}
263
264#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
265pub struct Value(Vec<u8>);
266
267impl Value {
268 pub fn from(bytes: impl AsRef<[u8]>) -> Self {
269 Value(bytes.as_ref().to_vec())
270 }
271}
272
273impl AsRef<[u8]> for Value {
274 fn as_ref(&self) -> &[u8] {
275 &self.0
276 }
277}
278
/// Tuning knobs for the LSM tree.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct LsmConfig {
    /// Flush the active memtable once it reaches this many bytes.
    pub memtable_size: usize,
    /// Cap on sealed memtables held in memory awaiting flush.
    pub max_immutable_memtables: usize,
    /// Capacity (bytes) of the SSTable block cache.
    pub block_cache_size: usize,

    /// How runs are merged across levels.
    pub compaction_strategy: CompactionStrategy,
    /// Worker threads for compaction (not read in the code visible here).
    pub compaction_threads: usize,
    /// Number of level-0 runs that triggers a compaction.
    pub level0_compaction_trigger: usize,

    /// Bloom filter sizing, bits per key.
    pub bloom_filter_bits_per_key: usize,
    /// Target bloom filter false-positive rate.
    pub bloom_filter_fp_rate: f64,

    /// Cap on retained snapshots per wallet.
    pub max_snapshots_per_wallet: usize,
    /// Interval between automatic snapshots.
    pub snapshot_interval: std::time::Duration,

    /// Target size (bytes) of a single SSTable run.
    pub sstable_size: usize,
    /// Block size (bytes) used inside SSTables.
    pub sstable_block_size: usize,
    // Not serialized: a deserialized config gets `IoBackend::default()`.
    #[serde(skip)]
    pub io_backend: IoBackend,
}
311
impl Default for LsmConfig {
    /// Defaults: 64 MiB memtables, hybrid compaction (tiered on top,
    /// leveled below, switching at level 2), ~1% bloom false-positive
    /// rate, snapshots every 10 minutes.
    fn default() -> Self {
        Self {
            memtable_size: 64 * 1024 * 1024,
            max_immutable_memtables: 2,
            block_cache_size: 256 * 1024 * 1024,

            // Tiered for the write-heavy upper levels, leveled for the
            // read-heavy lower ones.
            compaction_strategy: CompactionStrategy::Hybrid {
                l0_strategy: Box::new(CompactionStrategy::Tiered {
                    size_ratio: 4.0,
                    min_merge_width: 4,
                    max_merge_width: 10,
                }),
                ln_strategy: Box::new(CompactionStrategy::Leveled {
                    size_ratio: 10.0,
                    max_level: 7,
                }),
                transition_level: 2,
            },
            compaction_threads: 2,
            level0_compaction_trigger: 4,

            bloom_filter_bits_per_key: 10,
            bloom_filter_fp_rate: 0.01,

            max_snapshots_per_wallet: 10,
            snapshot_interval: std::time::Duration::from_secs(600),

            sstable_size: 64 * 1024 * 1024,
            sstable_block_size: 4096,

            // Skipped during serialization (see `#[serde(skip)]` on the field).
            io_backend: IoBackend::default(),
        }
    }
}
347
348pub use compaction::CompactionStrategy;
350
/// In-memory write buffer: an ordered map from key to value, where `None`
/// marks a tombstone (a pending delete that must mask older on-disk data).
struct MemTable {
    data: BTreeMap<Key, Option<Value>>,
    // Approximate resident size: key bytes for every resident key plus
    // value bytes for live (non-tombstone) values.
    size_bytes: usize,
    // Sequence number the tree had when this memtable was created.
    sequence_number: u64,
}
359
impl MemTable {
    /// Creates an empty memtable stamped with the given sequence number.
    fn new(sequence_number: u64) -> Self {
        Self {
            data: BTreeMap::new(),
            size_bytes: 0,
            sequence_number,
        }
    }

    /// Inserts or overwrites `key` with `value`, keeping `size_bytes` in sync.
    fn insert(&mut self, key: Key, value: Value) {
        let key_size = key.0.len();
        let value_size = value.0.len();

        if let Some(old_value) = self.data.get(&key) {
            // Key already resident: retire the old value's bytes (a
            // tombstone contributes none). The key's own bytes stay counted.
            if let Some(v) = old_value {
                self.size_bytes -= v.0.len();
            }
        } else {
            // New key: its bytes become resident.
            self.size_bytes += key_size;
        }

        self.size_bytes += value_size;
        self.data.insert(key, Some(value));
    }

    /// Records a delete for `key` as a tombstone (`None`) so the deletion
    /// also masks any older value in the SSTable levels.
    fn delete(&mut self, key: Key) {
        let key_size = key.0.len();

        if let Some(old_value) = self.data.get(&key) {
            // Replacing a live value with a tombstone frees the value bytes.
            if let Some(v) = old_value {
                self.size_bytes -= v.0.len();
            }
        } else {
            // A tombstone for a key not present here still costs key bytes.
            self.size_bytes += key_size;
        }

        self.data.insert(key, None);
    }

    /// Looks up `key`. Returns `Some(None)` for a tombstone; `None` means
    /// this memtable has never seen the key.
    fn get(&self, key: &Key) -> Option<&Option<Value>> {
        self.data.get(key)
    }

    /// Approximate resident size in bytes (keys + live values).
    fn size_bytes(&self) -> usize {
        self.size_bytes
    }

    /// True if no entries (including tombstones) are recorded.
    fn is_empty(&self) -> bool {
        self.data.is_empty()
    }

    /// Iterates all entries in key order, tombstones included.
    fn iter(&self) -> impl Iterator<Item = (&Key, &Option<Value>)> {
        self.data.iter()
    }

    /// Iterates entries with keys in the inclusive range `[from, to]`.
    /// An inverted range yields nothing, guarding the `BTreeMap::range`
    /// panic on start > end.
    fn range<'a>(&'a self, from: &Key, to: &Key) -> impl Iterator<Item = (&'a Key, &'a Option<Value>)> + 'a {
        if from > to {
            return Box::new(std::iter::empty()) as Box<dyn Iterator<Item = (&'a Key, &'a Option<Value>)> + 'a>;
        }
        Box::new(self.data.range(from..=to))
    }
}
425
/// A log-structured merge tree: an in-memory memtable layered over
/// immutable, leveled on-disk SSTable runs.
pub struct LsmTree {
    // Root directory (holds `active/` and `snapshots/`).
    path: PathBuf,
    // Directory containing the live SSTable files.
    active_dir: PathBuf,
    #[allow(dead_code)]
    // Directory containing persisted snapshots.
    snapshots_dir: PathBuf,
    config: LsmConfig,

    // Held for the tree's lifetime so no second process opens it.
    _session_lock: SessionLock,

    // Active write buffer.
    memtable: Arc<RwLock<MemTable>>,
    // Sealed memtables (searched newest-first in `get`; never populated in
    // the code visible here).
    immutable_memtables: Arc<RwLock<Vec<Arc<MemTable>>>>,

    // Monotonic sequence number used to order snapshots.
    sequence_number: Arc<RwLock<u64>>,

    // Next run number to assign to a freshly written SSTable.
    next_run_number: Arc<RwLock<RunNumber>>,

    // SSTable handles per level; level 0 receives fresh flushes.
    levels: Arc<RwLock<Vec<Vec<SsTableHandle>>>>,

    // Highest level index (`levels` has `max_level + 1` entries).
    max_level: u8,

    // Compaction planner/executor.
    compactor: Arc<Compactor>,
}
460
461impl LsmTree {
    /// Opens (or creates) the LSM tree rooted at `path`.
    ///
    /// Acquires the per-directory session lock, ensures the `active/` and
    /// `snapshots/` subdirectories exist, then rebuilds the level structure
    /// by scanning `active/` for `*.keyops` SSTable runs.
    ///
    /// # Errors
    /// `Error::SessionLocked` if another process holds the lock;
    /// `Error::Io` for filesystem failures.
    pub fn open(path: impl AsRef<Path>, config: LsmConfig) -> Result<Self> {
        let path = path.as_ref().to_path_buf();

        std::fs::create_dir_all(&path)?;

        // Exclusive access: only one process may hold the tree open.
        let session_lock = SessionLock::acquire(&path)
            .map_err(|e| Error::SessionLocked(e.to_string()))?;

        let active_dir = path.join("active");
        std::fs::create_dir_all(&active_dir)?;

        let snapshots_dir = path.join("snapshots");
        std::fs::create_dir_all(&snapshots_dir)?;

        // Persist the directory entries themselves before relying on them.
        fsync_directory(&path)?;
        fsync_directory(&active_dir)?;
        fsync_directory(&snapshots_dir)?;

        let sequence_number = 0u64;
        let memtable = MemTable::new(sequence_number);

        // Recover existing SSTables: every `<run>.keyops` file names a run.
        let mut all_sstables = Vec::new();
        let mut max_run_number = 0u64;

        for entry in std::fs::read_dir(&active_dir)? {
            let entry = entry?;
            let path = entry.path();

            if let Some(ext) = path.extension().and_then(|s| s.to_str()) {
                if ext == "keyops" {
                    if let Some(stem) = path.file_stem().and_then(|s| s.to_str()) {
                        if let Ok(run_num) = stem.parse::<RunNumber>() {
                            max_run_number = max_run_number.max(run_num);

                            // A run that fails to open is skipped (logged),
                            // not fatal — the rest of the tree stays usable.
                            match SsTableHandle::open(&active_dir, run_num) {
                                Ok(handle) => all_sstables.push(handle),
                                Err(e) => eprintln!("Failed to load SSTable run {}: {}", run_num, e),
                            }
                        }
                    }
                }
            }
        }

        let next_run_number = max_run_number + 1;

        // Bucket the recovered runs by their persisted level.
        let max_level = 6;
        let mut levels: Vec<Vec<SsTableHandle>> = (0..=max_level).map(|_| Vec::new()).collect();

        for handle in all_sstables {
            let level = handle.level as usize;
            if level <= max_level as usize {
                levels[level].push(handle);
            } else {
                eprintln!("Warning: SSTable with level {} exceeds max_level {}", level, max_level);
            }
        }

        // Keep each level ordered by smallest key.
        for level in &mut levels {
            level.sort_by(|a, b| a.min_key.cmp(&b.min_key));
        }

        let compactor = Arc::new(Compactor::new());

        Ok(Self {
            active_dir,
            snapshots_dir,
            path,
            config,
            _session_lock: session_lock,
            memtable: Arc::new(RwLock::new(memtable)),
            immutable_memtables: Arc::new(RwLock::new(Vec::new())),
            sequence_number: Arc::new(RwLock::new(sequence_number)),
            next_run_number: Arc::new(RwLock::new(next_run_number)),
            levels: Arc::new(RwLock::new(levels)),
            max_level,
            compactor,
        })
    }
559
    /// Restores the tree's `active/` directory from a named snapshot and
    /// opens the result.
    ///
    /// DESTRUCTIVE: every file currently in `active/` is deleted, then the
    /// snapshot's SSTable files are hard-linked back in. The restored tree
    /// resumes with the config and sequence number stored in the snapshot
    /// metadata.
    ///
    /// # Errors
    /// Fails if the snapshot cannot be loaded, the session lock is held,
    /// or any run listed in the snapshot is missing/corrupt.
    pub fn open_snapshot(path: impl AsRef<Path>, snapshot_name: &str) -> Result<Self> {
        let path = path.as_ref().to_path_buf();

        // Load metadata first; it carries the config and sequence number.
        let snapshot = snapshot::PersistentSnapshot::load(&path, snapshot_name)?;
        let config = snapshot.metadata.config.clone();
        let sequence_number = snapshot.metadata.sequence_number;

        let session_lock = SessionLock::acquire(&path)
            .map_err(|e| Error::SessionLocked(e.to_string()))?;

        let active_dir = path.join("active");
        std::fs::create_dir_all(&active_dir)?;

        let snapshots_dir = path.join("snapshots");
        std::fs::create_dir_all(&snapshots_dir)?;

        fsync_directory(&path)?;
        fsync_directory(&snapshots_dir)?;

        let memtable = MemTable::new(sequence_number);

        let snapshot_dir = snapshots_dir.join(snapshot_name);

        // Clear the current active files; the snapshot replaces them.
        for entry in std::fs::read_dir(&active_dir)? {
            let entry = entry?;
            if entry.file_type()?.is_file() {
                std::fs::remove_file(entry.path())?;
            }
        }

        // Hard-link each run's component files from the snapshot directory
        // into `active/` (cheap, and the snapshot copy stays intact).
        const SSTABLE_EXTS: [&str; 5] = ["keyops", "blobs", "filter", "index", "checksums"];
        for run in &snapshot.metadata.runs {
            let prefix = format!("{:05}", run.run_number);
            for ext in SSTABLE_EXTS {
                let src = snapshot_dir.join(format!("{}.{}", prefix, ext));
                let dst = active_dir.join(format!("{}.{}", prefix, ext));
                std::fs::hard_link(&src, &dst).map_err(Error::Io)?;
            }
        }
        fsync_directory(&active_dir)?;

        let mut all_sstables = Vec::new();
        let mut max_run_number = 0u64;

        // Open exactly the runs the metadata lists; any failure here means
        // the snapshot itself is unusable, so it is fatal.
        for run in &snapshot.metadata.runs {
            match SsTableHandle::open(&active_dir, run.run_number) {
                Ok(handle) => {
                    all_sstables.push(handle);
                    max_run_number = max_run_number.max(run.run_number);
                }
                Err(e) => {
                    return Err(Error::InvalidOperation(
                        format!(
                            "Failed to load SSTable run {} from snapshot '{}' at {}:\n {}\n\nThis snapshot may be corrupted. \
                            Consider deleting it and using a previous snapshot.",
                            run.run_number,
                            snapshot_name,
                            active_dir.display(),
                            e
                        )
                    ));
                }
            }
        }

        let next_run_number = max_run_number + 1;

        // Bucket the restored runs by their persisted level.
        let max_level = 6;
        let mut levels: Vec<Vec<SsTableHandle>> = (0..=max_level).map(|_| Vec::new()).collect();

        for handle in all_sstables {
            let level = handle.level as usize;
            if level <= max_level as usize {
                levels[level].push(handle);
            } else {
                eprintln!("Warning: SSTable with level {} exceeds max_level {}", level, max_level);
            }
        }

        // Keep each level ordered by smallest key.
        for level in &mut levels {
            level.sort_by(|a, b| a.min_key.cmp(&b.min_key));
        }

        let compactor = Arc::new(Compactor::new());

        Ok(Self {
            active_dir,
            snapshots_dir,
            path,
            config,
            _session_lock: session_lock,
            memtable: Arc::new(RwLock::new(memtable)),
            immutable_memtables: Arc::new(RwLock::new(Vec::new())),
            sequence_number: Arc::new(RwLock::new(sequence_number)),
            next_run_number: Arc::new(RwLock::new(next_run_number)),
            levels: Arc::new(RwLock::new(levels)),
            max_level,
            compactor,
        })
    }
700
701 pub fn insert(&mut self, key: &Key, value: &Value) -> Result<()> {
702 {
707 let mut memtable = self.memtable.write().unwrap();
708 memtable.insert(key.clone(), value.clone());
709
710 if memtable.size_bytes() >= self.config.memtable_size {
712 drop(memtable); self.flush_memtable()?;
714 }
715 }
716
717 Ok(())
718 }
719
    /// Looks up `key`, newest data first: active memtable, then sealed
    /// memtables (newest first), then SSTable levels from level 0 down.
    ///
    /// A memtable tombstone (`Some(None)`) resolves to `Ok(None)`.
    /// NOTE(review): when an SSTable covers the key but `SsTableHandle::get`
    /// returns `None`, the search falls through to older runs — correct only
    /// if on-disk tombstones are surfaced some other way; confirm against
    /// the `sstable` module.
    pub fn get(&self, key: &Key) -> Result<Option<Value>> {
        {
            let memtable = self.memtable.read().unwrap();
            if let Some(value_opt) = memtable.get(key) {
                return Ok(value_opt.clone());
            }
        }

        {
            let immutables = self.immutable_memtables.read().unwrap();
            for imm in immutables.iter().rev() {
                if let Some(value_opt) = imm.get(key) {
                    return Ok(value_opt.clone());
                }
            }
        }

        {
            let levels = self.levels.read().unwrap();
            for level in levels.iter() {
                // Within a level, probe the most recently written run first.
                let mut sorted_sstables: Vec<&crate::sstable::SsTableHandle> = level.iter().collect();
                sorted_sstables.sort_by_key(|b| std::cmp::Reverse(b.run_number()));

                for sstable in sorted_sstables {
                    // Only probe runs whose key range can contain `key`.
                    if key >= &sstable.min_key && key <= &sstable.max_key {
                        if let Some(value) = sstable.get(key)? {
                            return Ok(Some(value));
                        }
                    }
                }
            }
        }

        Ok(None)
    }
760
761 pub fn delete(&mut self, key: &Key) -> Result<()> {
762 {
767 let mut memtable = self.memtable.write().unwrap();
768 memtable.delete(key.clone());
769
770 if memtable.size_bytes() >= self.config.memtable_size {
772 drop(memtable);
773 self.flush_memtable()?;
774 }
775 }
776
777 Ok(())
778 }
779
780 pub fn insert_batch(&mut self, entries: impl IntoIterator<Item = (Key, Value)>) -> Result<()> {
786 let entries_vec: Vec<(Key, Value)> = entries.into_iter().collect();
788
789 if entries_vec.is_empty() {
790 return Ok(());
791 }
792
793 {
795 let mut memtable = self.memtable.write().unwrap();
796 for (key, value) in entries_vec {
797 memtable.insert(key, value);
798 }
799
800 if memtable.size_bytes() >= self.config.memtable_size {
802 drop(memtable); self.flush_memtable()?;
804 }
805 }
806
807 Ok(())
808 }
809
810 pub fn get_batch(&self, keys: impl IntoIterator<Item = Key>) -> Result<Vec<Option<Value>>> {
813 let keys_vec: Vec<Key> = keys.into_iter().collect();
814 let mut results = Vec::with_capacity(keys_vec.len());
815
816 for key in keys_vec {
817 results.push(self.get(&key)?);
818 }
819
820 Ok(results)
821 }
822
823 pub fn delete_batch(&mut self, keys: impl IntoIterator<Item = Key>) -> Result<()> {
827 let keys_vec: Vec<Key> = keys.into_iter().collect();
828
829 if keys_vec.is_empty() {
830 return Ok(());
831 }
832
833 {
835 let mut memtable = self.memtable.write().unwrap();
836 for key in keys_vec {
837 memtable.delete(key);
838 }
839
840 if memtable.size_bytes() >= self.config.memtable_size {
842 drop(memtable); self.flush_memtable()?;
844 }
845 }
846
847 Ok(())
848 }
849
    /// Returns all live entries with keys in the inclusive range `[from, to]`.
    ///
    /// Sources are merged oldest-to-newest (deepest level first, then sealed
    /// memtables, then the active memtable) into a `BTreeMap`, so a newer
    /// write or tombstone always overrides older data; tombstones are then
    /// filtered out of the final result.
    pub fn range(&self, from: &Key, to: &Key) -> RangeIter {
        let mut entries: BTreeMap<Key, Option<Value>> = BTreeMap::new();

        {
            let levels = self.levels.read().unwrap();
            // Deepest (oldest) level first so later inserts overwrite.
            for level in levels.iter().rev() {
                // Within a level, apply older runs first for the same reason.
                let mut sorted_sstables: Vec<&crate::sstable::SsTableHandle> = level.iter().collect();
                sorted_sstables.sort_by_key(|a| a.run_number());

                for sstable in sorted_sstables {
                    match sstable.range_with_tombstones(from, to) {
                        Ok(sstable_entries) => {
                            for (k, v) in sstable_entries {
                                entries.insert(k, v);
                            }
                        }
                        Err(e) => {
                            // Best effort: a damaged run is skipped, not fatal.
                            eprintln!("Error reading from SSTable: {}", e);
                        }
                    }
                }
            }
        }

        {
            let immutables = self.immutable_memtables.read().unwrap();
            for imm in immutables.iter() {
                for (k, v) in imm.range(from, to) {
                    entries.insert(k.clone(), v.clone());
                }
            }
        }

        {
            let memtable = self.memtable.read().unwrap();
            for (k, v) in memtable.range(from, to) {
                entries.insert(k.clone(), v.clone());
            }
        }

        // Drop tombstones; only live values are yielded.
        let results: Vec<_> = entries
            .into_iter()
            .filter_map(|(k, v)| v.map(|val| (k, val)))
            .collect();

        RangeIter {
            entries: results,
            index: 0,
        }
    }
911
912 pub fn scan_prefix(&self, prefix: &[u8]) -> RangeIter {
913 let mut end_bytes = prefix.to_vec();
915 if let Some(last) = end_bytes.last_mut() {
916 if *last == 0xFF {
917 end_bytes.push(0x00);
918 } else {
919 *last += 1;
920 }
921 } else {
922 end_bytes = vec![0xFF; 20];
924 }
925
926 self.range(&Key::from(prefix), &Key::from(&end_bytes))
927 }
928
    /// Iterates every live entry in key order.
    ///
    /// NOTE(review): the upper bound is a 256-byte 0xFF key, so any key
    /// ordered above it (e.g. more than 256 bytes of 0xFF) would be
    /// missed — acceptable only if keys are bounded; confirm.
    pub fn iter(&self) -> RangeIter {
        self.range(&Key::from(b""), &Key::from([0xFF; 256]))
    }
932
    /// Currently a no-op: nothing buffered at this layer requires syncing
    /// in the code visible here (memtable flushes go through
    /// `flush_memtable`).
    pub fn flush(&self) -> Result<()> {
        Ok(())
    }
937
    /// Swaps out the active memtable and writes its contents to a new
    /// level-0 SSTable run; may trigger a compaction if level 0 fills up.
    fn flush_memtable(&mut self) -> Result<()> {
        // Replace the active memtable with a fresh one under the write lock.
        let old_memtable = {
            let mut memtable = self.memtable.write().unwrap();
            let seq = *self.sequence_number.read().unwrap();
            let new_memtable = MemTable::new(seq);
            std::mem::replace(&mut *memtable, new_memtable)
        };

        if old_memtable.is_empty() {
            return Ok(());
        }

        // Claim a fresh, monotonically increasing run number.
        let run_number = {
            let mut run_num = self.next_run_number.write().unwrap();
            let current = *run_num;
            *run_num += 1;
            current
        };

        let mut writer = SsTableWriter::new(&self.active_dir, run_number)?;

        // Tombstones are written out too so they keep masking older runs.
        for (key, value_opt) in old_memtable.iter() {
            writer.add(key.clone(), value_opt.clone())?;
        }

        // Freshly flushed runs always land in level 0.
        let handle = writer.finish(0)?;
        {
            let mut levels = self.levels.write().unwrap();
            levels[0].push(handle);

            if levels[0].len() >= self.config.level0_compaction_trigger {
                // Release the lock before compacting; compact() re-acquires.
                drop(levels);
                self.compact()?;
            }
        }

        Ok(())
    }
984
    /// Runs at most one compaction step chosen by the compactor, merging
    /// runs from a source level into its target level.
    pub fn compact(&mut self) -> Result<()> {
        // Plan against a snapshot of the level structure so the lock is not
        // held across the (slow) merge itself.
        let levels_snapshot = self.levels.read().unwrap().clone();

        let job = match self.compactor.select_level_compaction(&levels_snapshot, self.max_level, 4) {
            Some(job) => job,
            None => {
                // Nothing worth compacting right now.
                return Ok(());
            }
        };

        let source_level = job.source_level as usize;
        let target_level = job.target_level as usize;

        // Claim a fresh run number for the merged output.
        let run_number = {
            let mut run_num = self.next_run_number.write().unwrap();
            let current = *run_num;
            *run_num += 1;
            current
        };

        let source_runs = levels_snapshot[source_level].clone();
        let result = self.compactor.compact_levels(
            job,
            &source_runs,
            &self.active_dir,
            run_number,
            self.max_level,
        )?;

        // Splice the result back into the live level structure.
        {
            let mut levels = self.levels.write().unwrap();

            // Remove consumed inputs highest-index first so earlier removals
            // do not shift the indices of later ones.
            let mut to_remove = result.inputs_to_remove.clone();
            to_remove.sort_by(|a, b| b.cmp(a));
            for idx in to_remove {
                if idx < levels[source_level].len() {
                    let _removed = levels[source_level].remove(idx);
                }
            }

            if let Some(output) = result.output {
                if target_level == self.max_level as usize {
                    // The bottom level holds a single fully-merged run.
                    levels[target_level].clear();
                    levels[target_level].push(output);
                } else {
                    levels[target_level].push(output);
                }
            }
        }

        Ok(())
    }
1059
    /// Merges every SSTable in the tree into a single run at the bottom
    /// level (a full/major compaction).
    pub fn compact_all(&mut self) -> Result<()> {
        let all_sstables: Vec<SsTableHandle> = {
            let levels = self.levels.read().unwrap();
            levels.iter().flat_map(|level| level.clone()).collect()
        };

        if all_sstables.is_empty() {
            return Ok(());
        }

        // Every run is an input to the single merge job.
        let all_indices: Vec<usize> = (0..all_sstables.len()).collect();
        let job = compaction::CompactionJob {
            inputs: all_indices,
            strategy: self.config.compaction_strategy.clone(),
        };

        // Claim a fresh run number for the merged output.
        let run_number = {
            let mut run_num = self.next_run_number.write().unwrap();
            let current = *run_num;
            *run_num += 1;
            current
        };

        let result = self.compactor.compact(job, &all_sstables, &self.active_dir, run_number)?;

        {
            let mut levels = self.levels.write().unwrap();

            // All previous runs are superseded by the merged output.
            for level in levels.iter_mut() {
                level.clear();
            }

            if let Some(output) = result.output {
                levels[self.max_level as usize].push(output);
            }
        }

        Ok(())
    }
1107
    /// Placeholder: background compaction is not implemented; compaction
    /// currently runs synchronously via `compact` / `compact_all`.
    pub fn trigger_background_compaction(&self) {
    }
1113
    /// Placeholder: no background compaction exists to wait for.
    pub fn wait_for_compaction(&self) {
    }
1119
    /// Captures an in-memory, point-in-time view of the tree.
    ///
    /// All four read guards are held simultaneously while the state is
    /// copied, so the captured components are mutually consistent.
    pub fn snapshot(&self) -> LsmSnapshot {
        let memtable = self.memtable.read().unwrap();
        let immutables = self.immutable_memtables.read().unwrap();
        let levels = self.levels.read().unwrap();
        let seq = *self.sequence_number.read().unwrap();

        LsmSnapshot {
            // Deep-copies the active memtable; the rest are handle clones.
            memtable: Arc::new((*memtable).clone()),
            immutable_memtables: immutables.clone(),
            levels: levels.clone(),
            sequence_number: seq,
        }
    }
1133
1134 pub fn rollback(&mut self, snapshot: LsmSnapshot) -> Result<()> {
1135 let current_seq = *self.sequence_number.read().unwrap();
1137 if snapshot.sequence_number > current_seq {
1138 return Err(Error::InvalidOperation(
1139 "Cannot rollback to future snapshot".to_string()
1140 ));
1141 }
1142
1143 *self.memtable.write().unwrap() = (*snapshot.memtable).clone();
1145 *self.immutable_memtables.write().unwrap() = snapshot.immutable_memtables;
1146 *self.levels.write().unwrap() = snapshot.levels;
1147 *self.sequence_number.write().unwrap() = snapshot.sequence_number;
1148
1149 Ok(())
1150 }
1151
1152 pub fn disk_usage(&self) -> Result<u64> {
1153 let mut total = 0u64;
1154
1155 let levels = self.levels.read().unwrap();
1157 for level in levels.iter() {
1158 for sstable in level.iter() {
1159 if let Ok(metadata) = std::fs::metadata(sstable.path()) {
1160 total += metadata.len();
1161 }
1162 }
1163 }
1164
1165 Ok(total)
1166 }
1167
    /// Reports current in-memory and on-disk structural counters.
    pub fn get_stats(&self) -> Result<LsmStats> {
        let memtable = self.memtable.read().unwrap();
        let immutables = self.immutable_memtables.read().unwrap();
        let levels = self.levels.read().unwrap();

        let total_sstables: usize = levels.iter().map(|level| level.len()).sum();

        Ok(LsmStats {
            memtable_size_bytes: memtable.size_bytes() as u64,
            immutable_memtables_count: immutables.len(),
            l0_sstables_count: levels[0].len(),
            total_sstables_count: total_sstables,
            // Not instrumented yet; always reported as zero.
            compactions_running: 0,
            bloom_filter_false_positives: 0,
        })
    }
1184
    /// Persists a named, labeled snapshot of the current tree state.
    ///
    /// Flushes the active memtable first so the snapshot is fully backed
    /// by on-disk runs, then records every live SSTable.
    pub fn save_snapshot(&mut self, name: &str, label: &str) -> Result<()> {
        self.flush_memtable()?;

        let sequence_number = *self.sequence_number.read().unwrap();
        let all_sstables: Vec<SsTableHandle> = {
            let levels = self.levels.read().unwrap();
            levels.iter().flat_map(|level| level.clone()).collect()
        };

        PersistentSnapshot::create(
            &self.path,
            name,
            label,
            &all_sstables,
            sequence_number,
            &self.config,
        )?;

        Ok(())
    }
1212
    /// Lists the names of all snapshots stored under this tree's root.
    pub fn list_snapshots(&self) -> Result<Vec<String>> {
        snapshot::list_snapshots(&self.path)
    }
1217
    /// Deletes the named snapshot and its files from disk.
    pub fn delete_snapshot(&self, name: &str) -> Result<()> {
        let snapshot = PersistentSnapshot::load(&self.path, name)?;
        snapshot.delete()
            .map_err(Error::Io)
    }
1224}
1225
// Manual impl (equivalent to a derive): deep-copies the entry map and
// carries over the size and sequence counters.
impl Clone for MemTable {
    fn clone(&self) -> Self {
        Self {
            data: self.data.clone(),
            size_bytes: self.size_bytes,
            sequence_number: self.sequence_number,
        }
    }
}
1236
/// Owning iterator over the live `(Key, Value)` results of a range scan;
/// entries are fully merged and sorted at construction time.
pub struct RangeIter {
    entries: Vec<(Key, Value)>,
    // Cursor into `entries`: the next element to yield.
    index: usize,
}
1243
1244impl Iterator for RangeIter {
1245 type Item = (Key, Value);
1246
1247 fn next(&mut self) -> Option<Self::Item> {
1248 if self.index < self.entries.len() {
1249 let item = self.entries[self.index].clone();
1250 self.index += 1;
1251 Some(item)
1252 } else {
1253 None
1254 }
1255 }
1256}
1257
// Manual impl (equivalent to a derive): duplicates the buffered entries
// and the current cursor position.
impl Clone for RangeIter {
    fn clone(&self) -> Self {
        Self {
            entries: self.entries.clone(),
            index: self.index,
        }
    }
}
1266
/// Point-in-time, in-memory view of the tree captured by
/// [`LsmTree::snapshot`]. Holds its own copies/handles of the memtables
/// and level structure, so it stays readable while the live tree mutates.
#[derive(Clone)]
pub struct LsmSnapshot {
    // Deep copy of the active memtable at capture time.
    memtable: Arc<MemTable>,
    // Sealed memtables at capture time.
    immutable_memtables: Vec<Arc<MemTable>>,
    // SSTable handles per level at capture time.
    levels: Vec<Vec<SsTableHandle>>,
    // Sequence number at capture time.
    sequence_number: u64,
}
1276
1277impl LsmSnapshot {
1278 pub fn sequence_number(&self) -> u64 {
1279 self.sequence_number
1280 }
1281
1282 pub fn get(&self, key: &Key) -> Result<Option<Value>> {
1283 if let Some(value_opt) = self.memtable.get(key) {
1285 return Ok(value_opt.clone());
1286 }
1287
1288 for imm in self.immutable_memtables.iter().rev() {
1290 if let Some(value_opt) = imm.get(key) {
1291 return Ok(value_opt.clone());
1292 }
1293 }
1294
1295 for level in &self.levels {
1297 for sstable in level.iter().rev() {
1298 if key >= &sstable.min_key && key <= &sstable.max_key {
1299 if let Some(value) = sstable.get(key)? {
1300 return Ok(Some(value));
1301 }
1302 }
1303 }
1304 }
1305
1306 Ok(None)
1307 }
1308
1309 pub fn iter(&self) -> RangeIter {
1310 let mut entries: BTreeMap<Key, Option<Value>> = BTreeMap::new();
1311
1312 for level in self.levels.iter().rev() {
1314 for sstable in level {
1315 if let Ok(sstable_entries) = sstable.range(&Key::from(b""), &Key::from([0xFF; 256])) {
1316 for (k, v) in sstable_entries {
1317 entries.entry(k).or_insert(v);
1318 }
1319 }
1320 }
1321 }
1322
1323 for imm in &self.immutable_memtables {
1325 for (k, v) in imm.iter() {
1326 entries.insert(k.clone(), v.clone());
1327 }
1328 }
1329
1330 for (k, v) in self.memtable.iter() {
1332 entries.insert(k.clone(), v.clone());
1333 }
1334
1335 let results: Vec<_> = entries
1337 .into_iter()
1338 .filter_map(|(k, v)| v.map(|val| (k, val)))
1339 .collect();
1340
1341 RangeIter {
1342 entries: results,
1343 index: 0,
1344 }
1345 }
1346}
1347
/// Point-in-time counters reported by [`LsmTree::get_stats`].
#[derive(Clone, Debug)]
pub struct LsmStats {
    /// Approximate bytes held by the active memtable.
    pub memtable_size_bytes: u64,
    /// Sealed memtables awaiting flush.
    pub immutable_memtables_count: usize,
    /// Runs currently in level 0.
    pub l0_sstables_count: usize,
    /// Runs across all levels.
    pub total_sstables_count: usize,
    /// Currently always 0 (compaction runs synchronously).
    pub compactions_running: usize,
    /// Currently always 0 (not instrumented).
    pub bloom_filter_false_positives: u64,
}
1357
1358