1use std::collections::HashSet;
21use std::fs;
22use std::ops::{Bound, RangeBounds};
23use std::path::{Path, PathBuf};
24use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
25use std::sync::{Arc, Condvar, Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
26use std::thread::JoinHandle;
27
28use crate::batch::{Batch, Op};
29use crate::bloom::{self, RunFilter};
30use crate::cache::BlockCache;
31use crate::config::LsmConfig;
32use crate::durability::Durability;
33use crate::error::{Error, Result};
34use crate::manifest::{self, Manifest};
35use crate::memtable::MemTable;
36use crate::merge::Merge;
37use crate::record::Record;
38use crate::scan::Scan;
39use crate::sstable::{SsTable, SsTableWriter};
40
/// Mutable engine state, held behind `Engine::inner`'s `RwLock`.
///
/// Writers (put/delete/batch, flush, compaction install) take the write lock;
/// `get` and `scan` take the read lock only long enough to snapshot it.
#[derive(Debug)]
struct Inner {
    /// Writes not yet flushed to an on-disk run.
    memtable: MemTable,
    /// On-disk runs, newest first: a flushed run is inserted at index 0 and
    /// lookups return the first record found while walking this list.
    runs: Vec<Arc<SsTable>>,
    /// Log every write is recorded in before it is applied to `memtable`;
    /// replayed on open and rotated after each successful flush.
    durability: Durability,
}
50
/// Coordination state between writers, the background compactor, and `Drop`,
/// guarded by `Engine::compaction` and signalled through `Engine::cond`.
#[derive(Debug, Default)]
struct CompactionState {
    /// A compaction has been requested (by `signal_compaction`) and not yet
    /// picked up; cleared by the compactor when it starts a pass.
    pending: bool,
    /// The compactor is currently executing a pass.
    running: bool,
    /// Set by `Lsm::drop`; the compactor exits the next time it checks.
    shutdown: bool,
    /// Number of background passes completed, successful or not.
    generation: u64,
}
64
/// State shared (via `Arc`) between the public [`Lsm`] handle and the
/// background compaction thread.
#[derive(Debug)]
struct Engine {
    /// Database directory; runs, Bloom sidecars, the manifest and the
    /// durability log are all opened relative to it.
    dir: PathBuf,
    /// Tuning knobs (memtable capacity, compaction trigger, cache size).
    config: LsmConfig,
    /// Memtable, run list, and log; see [`Inner`].
    inner: RwLock<Inner>,
    /// Next sequence number to give a new run file; flush and compaction each
    /// reserve one with `fetch_add`, and its current value is saved in the
    /// manifest.
    next_seq: AtomicU64,
    /// Set while a compaction pass runs, so a second concurrent pass (e.g. a
    /// test's `compact_now` racing the background thread) becomes a no-op.
    compacting: AtomicBool,
    /// Request/shutdown flags shared with the compactor thread.
    compaction: Mutex<CompactionState>,
    /// Signalled whenever `compaction` changes in a way a waiter cares about.
    cond: Condvar,
    /// Most recent error from a background compaction pass.
    /// NOTE(review): written by the compactor but never read in this file.
    last_error: Mutex<Option<Error>>,
    /// Block cache shared by every open run.
    cache: Arc<BlockCache>,
}
84
/// An embedded log-structured merge-tree key/value store rooted in one directory.
///
/// Writes are recorded in a durability log and buffered in an in-memory
/// memtable. The memtable is flushed to an immutable sorted run on disk when it
/// reaches its configured capacity, or on [`Lsm::flush`]. Reads consult the
/// memtable first, then the runs from newest to oldest. Once the number of runs
/// reaches the configured compaction trigger, a background thread merges all
/// runs into one.
///
/// `Lsm` is `Send + Sync`. Dropping it stops and joins the background
/// compaction thread.
#[derive(Debug)]
pub struct Lsm {
    /// Shared engine state; the compaction thread holds another reference.
    engine: Arc<Engine>,
    /// The background compaction thread; taken and joined on drop.
    compactor: Option<JoinHandle<()>>,
}
128
129impl Lsm {
130 pub fn open(dir: impl AsRef<Path>) -> Result<Self> {
148 Self::open_with(dir, LsmConfig::default())
149 }
150
151 pub fn open_with(dir: impl AsRef<Path>, config: LsmConfig) -> Result<Self> {
165 let dir = dir.as_ref().to_path_buf();
166 fs::create_dir_all(&dir).map_err(|e| Error::io("create database directory", e))?;
167
168 let manifest = Manifest::load(&dir)?;
169 let (run_names, manifest_seq) = match manifest {
170 Some(m) => (m.runs, m.next_seq),
171 None => (Vec::new(), 0),
172 };
173 let live: HashSet<&str> = run_names.iter().map(String::as_str).collect();
174
175 let cache = BlockCache::new(config.block_cache_capacity_bytes());
177
178 let mut runs = Vec::with_capacity(run_names.len());
183 for name in &run_names {
184 let path = dir.join(name);
185 if !path.exists() {
186 return Err(Error::corruption("manifest references a missing run"));
187 }
188 let mut table = SsTable::open(&path)?;
189 table.attach_filter(RunFilter::load(&path)?);
190 table.attach_cache(Arc::clone(&cache));
191 runs.push(Arc::new(table));
192 }
193
194 let mut next_seq = manifest_seq;
197 for entry in fs::read_dir(&dir).map_err(|e| Error::io("scan database directory", e))? {
198 let entry = entry.map_err(|e| Error::io("read directory entry", e))?;
199 let name = entry.file_name().to_string_lossy().into_owned();
200 if name.ends_with(".tmp") {
201 fs::remove_file(entry.path()).map_err(|e| Error::io("remove temporary file", e))?;
202 } else if let Some(run) = name.strip_suffix(".bloom") {
203 if !live.contains(run) {
206 fs::remove_file(entry.path())
207 .map_err(|e| Error::io("remove orphan bloom sidecar", e))?;
208 }
209 } else if let Some(seq) = manifest::seq_of(&name) {
210 next_seq = next_seq.max(seq + 1);
211 if !live.contains(name.as_str()) {
212 fs::remove_file(entry.path()).map_err(|e| Error::io("remove orphan run", e))?;
213 }
214 }
215 }
216
217 let durability = Durability::open(&dir)?;
220 let mut memtable = MemTable::new();
221 durability.replay(&mut memtable)?;
222
223 let engine = Arc::new(Engine {
224 dir,
225 config,
226 inner: RwLock::new(Inner {
227 memtable,
228 runs,
229 durability,
230 }),
231 next_seq: AtomicU64::new(next_seq),
232 compacting: AtomicBool::new(false),
233 compaction: Mutex::new(CompactionState::default()),
234 cond: Condvar::new(),
235 last_error: Mutex::new(None),
236 cache,
237 });
238
239 engine.flush()?;
242
243 let compactor = {
244 let engine = Arc::clone(&engine);
245 std::thread::Builder::new()
246 .name("lsm-compactor".to_owned())
247 .spawn(move || compactor_loop(&engine))
248 .map_err(|e| Error::io("spawn compaction thread", e))?
249 };
250
251 Ok(Lsm {
252 engine,
253 compactor: Some(compactor),
254 })
255 }
256
257 pub fn put(&self, key: impl AsRef<[u8]>, value: impl AsRef<[u8]>) -> Result<()> {
271 self.engine.put(key.as_ref(), value.as_ref())
272 }
273
274 pub fn delete(&self, key: impl AsRef<[u8]>) -> Result<()> {
292 self.engine.delete(key.as_ref())
293 }
294
295 pub fn write(&self, batch: Batch) -> Result<()> {
319 self.engine.write(batch)
320 }
321
322 pub fn get(&self, key: impl AsRef<[u8]>) -> Result<Option<Vec<u8>>> {
337 self.engine.get(key.as_ref())
338 }
339
340 pub fn scan<R>(&self, range: R) -> Result<Scan>
364 where
365 R: RangeBounds<Vec<u8>>,
366 {
367 self.engine.scan(range)
368 }
369
370 pub fn flush(&self) -> Result<()> {
391 self.engine.flush()
392 }
393
394 #[cfg(test)]
397 pub(crate) fn compact_now(&self) -> Result<()> {
398 self.engine.compact_once()
399 }
400
401 #[cfg(test)]
403 pub(crate) fn run_count(&self) -> usize {
404 self.engine.read_guard().runs.len()
405 }
406
407 #[cfg(test)]
410 pub(crate) fn wait_for_idle(&self) {
411 let mut state = self
412 .engine
413 .compaction
414 .lock()
415 .unwrap_or_else(|p| p.into_inner());
416 while state.pending || state.running {
417 state = self
418 .engine
419 .cond
420 .wait(state)
421 .unwrap_or_else(|p| p.into_inner());
422 }
423 }
424}
425
426impl Drop for Lsm {
427 fn drop(&mut self) {
428 {
429 let mut state = self
430 .engine
431 .compaction
432 .lock()
433 .unwrap_or_else(|p| p.into_inner());
434 state.shutdown = true;
435 }
436 self.engine.cond.notify_all();
437 if let Some(handle) = self.compactor.take() {
438 let _ = handle.join();
439 }
440 }
441}
442
443impl Engine {
444 fn put(&self, key: &[u8], value: &[u8]) -> Result<()> {
445 let record = Record::Value(value.to_vec());
446 let mut inner = self.write_guard();
447 inner.durability.log_one(key, &record)?;
449 inner.memtable.apply(key.to_vec(), record);
450 self.maybe_flush(&mut inner)
451 }
452
453 fn delete(&self, key: &[u8]) -> Result<()> {
454 let record = Record::Tombstone;
455 let mut inner = self.write_guard();
456 inner.durability.log_one(key, &record)?;
457 inner.memtable.apply(key.to_vec(), record);
458 self.maybe_flush(&mut inner)
459 }
460
461 fn write(&self, batch: Batch) -> Result<()> {
462 let ops: Vec<(Vec<u8>, Record)> = batch
463 .into_ops()
464 .into_iter()
465 .map(|(key, op)| {
466 let record = match op {
467 Op::Put(value) => Record::Value(value),
468 Op::Delete => Record::Tombstone,
469 };
470 (key, record)
471 })
472 .collect();
473
474 let mut inner = self.write_guard();
475 inner.durability.log_batch(&ops)?;
477 for (key, record) in ops {
478 inner.memtable.apply(key, record);
479 }
480 self.maybe_flush(&mut inner)
481 }
482
483 fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
484 let runs = {
485 let inner = self.read_guard();
486 match inner.memtable.get(key) {
487 Some(Record::Value(value)) => return Ok(Some(value.clone())),
488 Some(Record::Tombstone) => return Ok(None),
489 None => inner.runs.clone(),
490 }
491 };
492 for run in &runs {
495 if !run.might_contain(key) {
496 continue;
497 }
498 match run.lookup(key)? {
499 Some(Record::Value(value)) => return Ok(Some(value)),
500 Some(Record::Tombstone) => return Ok(None),
501 None => {}
502 }
503 }
504 Ok(None)
505 }
506
507 fn scan<R>(&self, range: R) -> Result<Scan>
508 where
509 R: RangeBounds<Vec<u8>>,
510 {
511 let (mem, runs) = {
512 let inner = self.read_guard();
513 let mem: Vec<(Vec<u8>, Record)> = inner
514 .memtable
515 .iter()
516 .filter(|(k, _)| matches!(position(&range, k), Pos::In))
517 .map(|(k, r)| (k.clone(), r.clone()))
518 .collect();
519 (mem, inner.runs.clone())
520 };
521
522 let cursors = runs.iter().map(|r| r.cursor()).collect();
523 let mut out = Vec::new();
524 for item in Merge::new(mem, cursors) {
525 let (key, value) = item?;
526 match position(&range, &key) {
527 Pos::Below => {}
528 Pos::In => out.push((key, value)),
529 Pos::Above => break, }
531 }
532 Ok(Scan::new(out))
533 }
534
535 fn flush(&self) -> Result<()> {
536 let mut inner = self.write_guard();
537 if inner.memtable.is_empty() {
538 return Ok(());
539 }
540 self.flush_locked(&mut inner)
541 }
542
543 fn maybe_flush(&self, inner: &mut Inner) -> Result<()> {
545 if !inner.memtable.is_empty()
546 && inner.memtable.approx_size() >= self.config.memtable_capacity_bytes()
547 {
548 self.flush_locked(inner)?;
549 }
550 Ok(())
551 }
552
553 fn flush_locked(&self, inner: &mut Inner) -> Result<()> {
555 let entries = inner.memtable.take();
556 let seq = self.next_seq.fetch_add(1, Ordering::SeqCst);
557 let name = manifest::run_filename(seq);
558 let tmp = self.dir.join(format!("{name}.tmp"));
559 let final_path = self.dir.join(&name);
560
561 let mut writer = SsTableWriter::create(&tmp)?;
562 let mut filter = bloom::builder(entries.len());
563 for (key, record) in &entries {
564 writer.push(key, record)?;
565 filter.add(key);
566 }
567 writer.finish()?;
568 fs::rename(&tmp, &final_path).map_err(|e| Error::io("install flushed run", e))?;
569
570 let filter = filter.finish();
573 if let Some(filter) = &filter {
574 filter.write_sidecar(&final_path)?;
575 }
576 let mut table = SsTable::open(&final_path)?;
577 table.attach_filter(filter);
578 table.attach_cache(Arc::clone(&self.cache));
579
580 let run = Arc::new(table);
581 let mut new_runs = Vec::with_capacity(inner.runs.len() + 1);
582 new_runs.push(run);
583 new_runs.extend(inner.runs.iter().cloned());
584
585 let names: Vec<String> = new_runs.iter().map(|r| r.file_name()).collect();
586 Manifest::store(&self.dir, self.next_seq.load(Ordering::SeqCst), &names)?;
587 inner.runs = new_runs;
588
589 inner.durability.rotate()?;
592
593 if inner.runs.len() >= self.config.compaction_trigger_runs() {
594 self.signal_compaction();
595 }
596 Ok(())
597 }
598
599 fn compact_once(&self) -> Result<()> {
605 if self.compacting.swap(true, Ordering::AcqRel) {
606 return Ok(()); }
608 let result = self.compact_inner();
609 self.compacting.store(false, Ordering::Release);
610 result
611 }
612
613 fn compact_inner(&self) -> Result<()> {
614 let inputs: Vec<Arc<SsTable>> = {
617 let inner = self.read_guard();
618 if inner.runs.len() < 2 {
619 return Ok(());
620 }
621 inner.runs.clone()
622 };
623
624 let capacity: usize = inputs
628 .iter()
629 .map(|r| usize::try_from(r.entry_count()).unwrap_or(usize::MAX))
630 .fold(0usize, |acc, n| acc.saturating_add(n));
631
632 let seq = self.next_seq.fetch_add(1, Ordering::SeqCst);
634 let name = manifest::run_filename(seq);
635 let tmp = self.dir.join(format!("{name}.tmp"));
636 let final_path = self.dir.join(&name);
637 let mut filter = bloom::builder(capacity);
638 {
639 let mut writer = SsTableWriter::create(&tmp)?;
640 let cursors = inputs.iter().map(|r| r.cursor()).collect();
641 for item in Merge::new(Vec::new(), cursors) {
644 let (key, value) = item?;
645 writer.push(&key, &Record::Value(value))?;
646 filter.add(&key);
647 }
648 writer.finish()?;
649 }
650 fs::rename(&tmp, &final_path).map_err(|e| Error::io("install compacted run", e))?;
651
652 let filter = filter.finish();
653 if let Some(filter) = &filter {
654 filter.write_sidecar(&final_path)?;
655 }
656 let mut output = SsTable::open(&final_path)?;
657 output.attach_filter(filter);
658 output.attach_cache(Arc::clone(&self.cache));
659 let output = Arc::new(output);
660
661 {
664 let mut inner = self.write_guard();
665 let mut new_runs: Vec<Arc<SsTable>> = inner
666 .runs
667 .iter()
668 .filter(|r| !inputs.iter().any(|i| Arc::ptr_eq(i, r)))
669 .cloned()
670 .collect();
671 new_runs.push(Arc::clone(&output));
672
673 let names: Vec<String> = new_runs.iter().map(|r| r.file_name()).collect();
674 Manifest::store(&self.dir, self.next_seq.load(Ordering::SeqCst), &names)?;
677 for input in &inputs {
678 input.mark_obsolete();
679 }
680 inner.runs = new_runs;
681 }
682 drop(inputs);
684 Ok(())
685 }
686
687 fn signal_compaction(&self) {
689 let mut state = self.compaction.lock().unwrap_or_else(|p| p.into_inner());
690 state.pending = true;
691 self.cond.notify_all();
692 }
693
694 fn read_guard(&self) -> RwLockReadGuard<'_, Inner> {
695 self.inner.read().unwrap_or_else(|p| p.into_inner())
696 }
697
698 fn write_guard(&self) -> RwLockWriteGuard<'_, Inner> {
699 self.inner.write().unwrap_or_else(|p| p.into_inner())
700 }
701}
702
703fn compactor_loop(engine: &Engine) {
705 loop {
706 {
707 let mut state = engine.compaction.lock().unwrap_or_else(|p| p.into_inner());
708 while !state.pending && !state.shutdown {
709 state = engine.cond.wait(state).unwrap_or_else(|p| p.into_inner());
710 }
711 if state.shutdown {
712 return;
713 }
714 state.pending = false;
715 state.running = true;
716 }
717
718 let result = engine.compact_once();
719
720 {
721 let mut state = engine.compaction.lock().unwrap_or_else(|p| p.into_inner());
722 state.running = false;
723 state.generation += 1;
724 if let Err(err) = result {
725 *engine.last_error.lock().unwrap_or_else(|p| p.into_inner()) = Some(err);
726 }
727 engine.cond.notify_all();
728 }
729 }
730}
731
/// Where a key falls relative to a range.
enum Pos {
    /// The key sorts before the range's start bound.
    Below,
    /// The key satisfies both bounds.
    In,
    /// The key satisfies the start bound but sorts past the end bound.
    Above,
}

/// Classifies `key` against `range` using byte-wise lexicographic ordering.
///
/// The start bound is checked first, so `Above` is only reported for keys that
/// also clear the start bound.
fn position<R: RangeBounds<Vec<u8>>>(range: &R, key: &[u8]) -> Pos {
    let clears_start = match range.start_bound() {
        Bound::Unbounded => true,
        Bound::Included(lo) => lo.as_slice() <= key,
        Bound::Excluded(lo) => lo.as_slice() < key,
    };
    if !clears_start {
        return Pos::Below;
    }

    let clears_end = match range.end_bound() {
        Bound::Unbounded => true,
        Bound::Included(hi) => key <= hi.as_slice(),
        Bound::Excluded(hi) => key < hi.as_slice(),
    };
    if clears_end {
        Pos::In
    } else {
        Pos::Above
    }
}
756
// Unit tests for the engine: point reads/writes, flush and compaction
// behaviour, recovery on reopen, range scans, and (behind the `bloom` feature)
// Bloom-filter and block-cache effects observed through `block_reads`.
#[cfg(test)]
// Tests panic on failure by design, so unwrap/expect are the idiomatic way to
// surface errors here.
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;

    /// Opens a fresh database whose compaction trigger can never be reached, so
    /// run counts only change through explicit `flush`/`compact_now` calls.
    /// The `TempDir` is returned so the directory outlives the database.
    fn db_no_autocompact() -> (tempfile::TempDir, Lsm) {
        let dir = tempfile::tempdir().unwrap();
        let db =
            Lsm::open_with(dir.path(), LsmConfig::new().compaction_trigger(usize::MAX)).unwrap();
        (dir, db)
    }

    /// Opens a fresh database with the default configuration.
    fn db() -> (tempfile::TempDir, Lsm) {
        let dir = tempfile::tempdir().unwrap();
        let db = Lsm::open(dir.path()).unwrap();
        (dir, db)
    }

    #[test]
    fn test_put_get_roundtrip() {
        let (_d, db) = db();
        db.put(b"k", b"v").unwrap();
        assert_eq!(db.get(b"k").unwrap(), Some(b"v".to_vec()));
    }

    #[test]
    fn test_get_absent_is_none() {
        let (_d, db) = db();
        assert_eq!(db.get(b"absent").unwrap(), None);
    }

    // The newer run must shadow the older one for the same key.
    #[test]
    fn test_overwrite_across_runs() {
        let (_d, db) = db_no_autocompact();
        db.put(b"k", b"old").unwrap();
        db.flush().unwrap();
        db.put(b"k", b"new").unwrap();
        db.flush().unwrap();
        assert_eq!(db.run_count(), 2);
        assert_eq!(db.get(b"k").unwrap(), Some(b"new".to_vec()));
    }

    // A tombstone in a newer run must hide a value in an older run.
    #[test]
    fn test_delete_masks_value_across_runs() {
        let (_d, db) = db_no_autocompact();
        db.put(b"k", b"v").unwrap();
        db.flush().unwrap();
        db.delete(b"k").unwrap();
        db.flush().unwrap();
        assert_eq!(db.get(b"k").unwrap(), None);
    }

    #[test]
    fn test_compaction_merges_to_single_run() {
        let (_d, db) = db_no_autocompact();
        // One key per run, five runs.
        for i in 0..5u32 {
            db.put(format!("k{i}").into_bytes(), format!("v{i}").into_bytes())
                .unwrap();
            db.flush().unwrap();
        }
        assert_eq!(db.run_count(), 5);
        db.compact_now().unwrap();
        assert_eq!(db.run_count(), 1);
        for i in 0..5u32 {
            assert_eq!(
                db.get(format!("k{i}").into_bytes()).unwrap(),
                Some(format!("v{i}").into_bytes())
            );
        }
    }

    #[test]
    fn test_compaction_drops_tombstones_and_keeps_latest() {
        let (_d, db) = db_no_autocompact();
        db.put(b"keep", b"1").unwrap();
        db.put(b"gone", b"x").unwrap();
        db.flush().unwrap();
        // Second run: overwrite one key, delete the other.
        db.put(b"keep", b"2").unwrap();
        db.delete(b"gone").unwrap();
        db.flush().unwrap();
        db.compact_now().unwrap();

        assert_eq!(db.run_count(), 1);
        assert_eq!(db.get(b"keep").unwrap(), Some(b"2".to_vec()));
        assert_eq!(db.get(b"gone").unwrap(), None);
        // Only the surviving key is visible to a full scan.
        assert_eq!(db.scan(..).unwrap().count(), 1);
    }

    // Runs and their order must survive a close/reopen cycle.
    #[test]
    fn test_reopen_reads_all_runs() {
        let dir = tempfile::tempdir().unwrap();
        {
            let db = Lsm::open_with(dir.path(), LsmConfig::new().compaction_trigger(usize::MAX))
                .unwrap();
            db.put(b"a", b"1").unwrap();
            db.flush().unwrap();
            db.put(b"b", b"2").unwrap();
            db.flush().unwrap();
            db.put(b"a", b"updated").unwrap();
            db.flush().unwrap();
        }
        let db = Lsm::open(dir.path()).unwrap();
        assert_eq!(db.get(b"a").unwrap(), Some(b"updated".to_vec()));
        assert_eq!(db.get(b"b").unwrap(), Some(b"2".to_vec()));
    }

    // After compaction the manifest must list only the merged run.
    #[test]
    fn test_reopen_after_compaction() {
        let dir = tempfile::tempdir().unwrap();
        {
            let db = Lsm::open_with(dir.path(), LsmConfig::new().compaction_trigger(usize::MAX))
                .unwrap();
            for i in 0..4u32 {
                db.put(format!("k{i}").into_bytes(), b"v").unwrap();
                db.flush().unwrap();
            }
            db.compact_now().unwrap();
            assert_eq!(db.run_count(), 1);
        }
        let db = Lsm::open(dir.path()).unwrap();
        assert_eq!(db.run_count(), 1);
        assert_eq!(db.scan(..).unwrap().count(), 4);
    }

    // With a trigger of 3, the background thread must keep the run count
    // bounded without losing any key.
    #[test]
    fn test_background_compaction_triggers() {
        let dir = tempfile::tempdir().unwrap();
        let db = Lsm::open_with(dir.path(), LsmConfig::new().compaction_trigger(3)).unwrap();
        for i in 0..10u32 {
            db.put(format!("k{i:02}").into_bytes(), b"v").unwrap();
            db.flush().unwrap();
        }
        db.wait_for_idle();
        assert!(db.run_count() <= 3, "run count was {}", db.run_count());
        for i in 0..10u32 {
            assert_eq!(
                db.get(format!("k{i:02}").into_bytes()).unwrap(),
                Some(b"v".to_vec())
            );
        }
    }

    // A scan must apply newest-wins and hide deleted keys across runs.
    #[test]
    fn test_scan_merges_across_runs() {
        let (_d, db) = db_no_autocompact();
        db.put(b"a", b"old-a").unwrap();
        db.put(b"c", b"3").unwrap();
        db.flush().unwrap();
        db.put(b"a", b"new-a").unwrap();
        db.put(b"b", b"2").unwrap();
        db.delete(b"c").unwrap();
        db.flush().unwrap();
        let got: Vec<_> = db.scan(..).unwrap().collect();
        assert_eq!(
            got,
            vec![
                (b"a".to_vec(), b"new-a".to_vec()),
                (b"b".to_vec(), b"2".to_vec())
            ]
        );
    }

    // Half-open range: start included, end excluded (served from the memtable).
    #[test]
    fn test_scan_bounded_range() {
        let (_d, db) = db();
        for (k, v) in [("a", "1"), ("b", "2"), ("c", "3"), ("d", "4")] {
            db.put(k.as_bytes(), v.as_bytes()).unwrap();
        }
        let got: Vec<_> = db.scan(b"b".to_vec()..b"d".to_vec()).unwrap().collect();
        assert_eq!(
            got,
            vec![
                (b"b".to_vec(), b"2".to_vec()),
                (b"c".to_vec(), b"3".to_vec())
            ]
        );
    }

    // An empty value must stay distinct from "absent" through flush and compaction.
    #[test]
    fn test_empty_value_roundtrips_through_flush() {
        let (_d, db) = db_no_autocompact();
        db.put(b"k", b"").unwrap();
        db.flush().unwrap();
        assert_eq!(db.get(b"k").unwrap(), Some(Vec::new()));
        db.compact_now().unwrap();
        assert_eq!(db.get(b"k").unwrap(), Some(Vec::new()));
    }

    // Compile-time check that `Lsm` can be shared across threads.
    #[test]
    fn test_engine_is_send_and_sync() {
        fn assert_send_sync<T: Send + Sync>() {}
        assert_send_sync::<Lsm>();
    }

    // Six runs all hold the even keys k0000..k0098. Looking up an odd key
    // (absent everywhere) should be answered by the filters alone.
    // NOTE(review): relies on none of the six filters giving a false positive
    // for "k0051".
    #[cfg(feature = "bloom")]
    #[test]
    fn test_bloom_skips_blocks_on_negative_lookup() {
        use crate::sstable::block_reads;

        let (_d, db) = db_no_autocompact();
        for run in 0..6u32 {
            for i in 0..50u32 {
                let key = format!("k{:04}", i * 2);
                db.put(key.as_bytes(), format!("r{run}").as_bytes())
                    .unwrap();
            }
            db.flush().unwrap();
        }
        assert_eq!(db.run_count(), 6);

        block_reads::reset();
        // Odd key: absent from every run.
        assert_eq!(db.get(b"k0051").unwrap(), None);
        assert_eq!(
            block_reads::count(),
            0,
            "bloom filters must let a negative lookup skip every run with no block read"
        );

        block_reads::reset();
        // Even key: present, so at least one block has to be read.
        assert!(db.get(b"k0010").unwrap().is_some());
        assert!(
            block_reads::count() >= 1,
            "a hit must read at least one block"
        );
    }

    // Each run has exactly one `.sst.bloom` sidecar, and compaction leaves only
    // the merged run's files behind. Note that `count(.., ".sst")` does not
    // match sidecars, whose names end in ".sst.bloom".
    #[cfg(feature = "bloom")]
    #[test]
    fn test_bloom_sidecars_track_runs_through_compaction() {
        let count = |dir: &std::path::Path, suffix: &str| {
            std::fs::read_dir(dir)
                .unwrap()
                .filter(|e| {
                    e.as_ref()
                        .unwrap()
                        .file_name()
                        .to_string_lossy()
                        .ends_with(suffix)
                })
                .count()
        };

        let (dir, db) = db_no_autocompact();
        for i in 0..5u32 {
            db.put(format!("k{i}").into_bytes(), b"v").unwrap();
            db.flush().unwrap();
        }
        assert_eq!(count(dir.path(), ".sst.bloom"), 5);

        db.compact_now().unwrap();
        assert_eq!(db.run_count(), 1);
        assert_eq!(count(dir.path(), ".sst"), 1);
        assert_eq!(count(dir.path(), ".sst.bloom"), 1);
        for i in 0..5u32 {
            assert_eq!(
                db.get(format!("k{i}").into_bytes()).unwrap(),
                Some(b"v".to_vec())
            );
        }
    }

    // With the default cache, the second lookup of the same key must be
    // served without any block read.
    #[cfg(feature = "bloom")]
    #[test]
    fn test_block_cache_serves_repeat_lookup() {
        use crate::sstable::block_reads;

        let (_d, db) = db();
        db.put(b"k", b"v").unwrap();
        db.flush().unwrap();

        block_reads::reset();
        assert_eq!(db.get(b"k").unwrap(), Some(b"v".to_vec()));
        assert!(block_reads::count() >= 1, "cold lookup reads its block");

        block_reads::reset();
        assert_eq!(db.get(b"k").unwrap(), Some(b"v".to_vec()));
        assert_eq!(
            block_reads::count(),
            0,
            "a repeat lookup must be served from the block cache"
        );
    }

    // With a zero-capacity cache, every lookup must go to disk.
    #[cfg(feature = "bloom")]
    #[test]
    fn test_block_cache_disabled_always_reads() {
        use crate::sstable::block_reads;

        let dir = tempfile::tempdir().unwrap();
        let db = Lsm::open_with(dir.path(), LsmConfig::new().block_cache_capacity(0)).unwrap();
        db.put(b"k", b"v").unwrap();
        db.flush().unwrap();

        for _ in 0..2 {
            block_reads::reset();
            assert_eq!(db.get(b"k").unwrap(), Some(b"v".to_vec()));
            assert!(
                block_reads::count() >= 1,
                "with the cache off, every lookup reads its block"
            );
        }
    }
}