lsm_tree/tree/
mod.rs

1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5pub mod ingest;
6pub mod inner;
7pub mod sealed;
8
9use crate::{
10    blob_tree::FragmentationMap,
11    compaction::{drop_range::OwnedBounds, state::CompactionState, CompactionStrategy},
12    config::Config,
13    file::BLOBS_FOLDER,
14    format_version::FormatVersion,
15    iter_guard::{IterGuard, IterGuardImpl},
16    manifest::Manifest,
17    memtable::Memtable,
18    slice::Slice,
19    table::Table,
20    value::InternalValue,
21    version::{recovery::recover, SuperVersion, SuperVersions, Version, VersionId},
22    vlog::BlobFile,
23    AbstractTree, Cache, Checksum, DescriptorTable, KvPair, SeqNo, SequenceNumberCounter, TableId,
24    TreeType, UserKey, UserValue, ValueType,
25};
26use inner::{MemtableId, TreeId, TreeInner};
27use std::{
28    ops::{Bound, RangeBounds},
29    path::Path,
30    sync::{Arc, Mutex, RwLock},
31};
32
33#[cfg(feature = "metrics")]
34use crate::metrics::Metrics;
35
/// Wraps the result of a key-value lookup for lazy consumption by iterator users.
pub struct Guard(crate::Result<(UserKey, UserValue)>);
37
38impl IterGuard for Guard {
39    fn key(self) -> crate::Result<UserKey> {
40        self.0.map(|(k, _)| k)
41    }
42
43    fn size(self) -> crate::Result<u32> {
44        #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
45        self.into_inner().map(|(_, v)| v.len() as u32)
46    }
47
48    fn into_inner(self) -> crate::Result<(UserKey, UserValue)> {
49        self.0
50    }
51}
52
53fn ignore_tombstone_value(item: InternalValue) -> Option<InternalValue> {
54    if item.is_tombstone() {
55        None
56    } else {
57        Some(item)
58    }
59}
60
/// A log-structured merge tree (LSM-tree/LSMT)
///
/// Cheaply cloneable: all clones share the same [`TreeInner`] through an `Arc`.
#[derive(Clone)]
pub struct Tree(#[doc(hidden)] pub Arc<TreeInner>);
64
impl std::ops::Deref for Tree {
    type Target = TreeInner;

    // Deref to the shared inner state, so `TreeInner` fields and methods
    // can be accessed directly on a `Tree`.
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
72
73impl AbstractTree for Tree {
74    fn next_table_id(&self) -> TableId {
75        self.0.table_id_counter.get()
76    }
77
78    fn id(&self) -> TreeId {
79        self.id
80    }
81
    // Point lookup: consults the active memtable, then sealed memtables,
    // then on-disk tables, honoring the MVCC snapshot `seqno`.
    #[expect(clippy::significant_drop_tightening)]
    fn get_internal_entry(&self, key: &[u8], seqno: SeqNo) -> crate::Result<Option<InternalValue>> {
        // Pin the super version matching the snapshot so the whole lookup
        // sees one consistent set of memtables and tables
        let version_history_lock = self.version_history.read().expect("lock is poisoned");
        let super_version = version_history_lock.get_version_for_snapshot(seqno);

        // Check the active memtable first (most recent data, no I/O)
        if let Some(entry) = super_version.active_memtable.get(key, seqno) {
            return Ok(ignore_tombstone_value(entry));
        }

        // Now look in sealed memtables
        if let Some(entry) =
            Self::get_internal_entry_from_sealed_memtables(&super_version, key, seqno)
        {
            return Ok(ignore_tombstone_value(entry));
        }

        // Now look in tables... this may involve disk I/O
        self.get_internal_entry_from_tables(&super_version.version, key, seqno)
    }
101
102    fn current_version(&self) -> Version {
103        self.version_history
104            .read()
105            .expect("poisoned")
106            .latest_version()
107            .version
108    }
109
110    fn flush_active_memtable(&self, seqno_threshold: SeqNo) -> crate::Result<Option<Table>> {
111        log::debug!("Flushing active memtable");
112
113        let Some((table_id, yanked_memtable)) = self.rotate_memtable() else {
114            return Ok(None);
115        };
116
117        let Some((table, _)) = self.flush_memtable(table_id, &yanked_memtable, seqno_threshold)?
118        else {
119            return Ok(None);
120        };
121        self.register_tables(std::slice::from_ref(&table), None, None)?;
122
123        Ok(Some(table))
124    }
125
    // Exposes the tree's metrics collector (only with the `metrics` feature).
    #[cfg(feature = "metrics")]
    fn metrics(&self) -> &Arc<crate::Metrics> {
        &self.0.metrics
    }
130
131    fn version_free_list_len(&self) -> usize {
132        self.version_history
133            .read()
134            .expect("lock is poisoned")
135            .free_list_len()
136    }
137
138    fn prefix<K: AsRef<[u8]>>(
139        &self,
140        prefix: K,
141        seqno: SeqNo,
142        index: Option<Arc<Memtable>>,
143    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
144        Box::new(
145            self.create_prefix(&prefix, seqno, index)
146                .map(|kv| IterGuardImpl::Standard(Guard(kv))),
147        )
148    }
149
150    fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
151        &self,
152        range: R,
153        seqno: SeqNo,
154        index: Option<Arc<Memtable>>,
155    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
156        Box::new(
157            self.create_range(&range, seqno, index)
158                .map(|kv| IterGuardImpl::Standard(Guard(kv))),
159        )
160    }
161
162    /// Returns the number of tombstones in the tree.
163    fn tombstone_count(&self) -> u64 {
164        self.current_version()
165            .iter_tables()
166            .map(Table::tombstone_count)
167            .sum()
168    }
169
170    /// Returns the number of weak tombstones (single deletes) in the tree.
171    fn weak_tombstone_count(&self) -> u64 {
172        self.current_version()
173            .iter_tables()
174            .map(Table::weak_tombstone_count)
175            .sum()
176    }
177
178    /// Returns the number of value entries that become reclaimable once weak tombstones can be GC'd.
179    fn weak_tombstone_reclaimable_count(&self) -> u64 {
180        self.current_version()
181            .iter_tables()
182            .map(Table::weak_tombstone_reclaimable)
183            .sum()
184    }
185
    /// Bulk-loads pre-sorted key-value pairs directly into tables,
    /// bypassing the memtable.
    ///
    /// All items are written under a single sequence number; afterwards
    /// `visible_seqno` is advanced so the ingested data becomes visible.
    ///
    /// # Panics
    ///
    /// Panics if the input iterator is not strictly ascending by key.
    fn ingest(
        &self,
        iter: impl Iterator<Item = (UserKey, UserValue)>,
        seqno_generator: &SequenceNumberCounter,
        visible_seqno: &SequenceNumberCounter,
    ) -> crate::Result<()> {
        use crate::tree::ingest::Ingestion;
        use std::time::Instant;

        let seqno = seqno_generator.next();

        // TODO: allow ingestion always, by flushing memtable

        let mut writer = Ingestion::new(self)?.with_seqno(seqno);

        let start = Instant::now();
        let mut count = 0;
        let mut last_key = None;

        for (key, value) in iter {
            // Enforce strictly ascending input order
            if let Some(last_key) = &last_key {
                assert!(
                    key > last_key,
                    "next key in bulk ingest was not greater than last key, last: {last_key:?}, next: {key:?}",
                );
            }
            last_key = Some(key.clone());

            writer.write(key, value)?;

            count += 1;
        }

        writer.finish()?;

        // Publish: make the ingested seqno visible to readers
        visible_seqno.fetch_max(seqno + 1);

        log::info!("Ingested {count} items in {:?}", start.elapsed());

        Ok(())
    }
227
228    fn drop_range<K: AsRef<[u8]>, R: RangeBounds<K>>(&self, range: R) -> crate::Result<()> {
229        let (bounds, is_empty) = Self::range_bounds_to_owned_bounds(&range);
230
231        if is_empty {
232            return Ok(());
233        }
234
235        let strategy = Arc::new(crate::compaction::drop_range::Strategy::new(bounds));
236
237        // IMPORTANT: Write lock so we can be the only compaction going on
238        let _lock = self
239            .0
240            .major_compaction_lock
241            .write()
242            .expect("lock is poisoned");
243
244        log::info!("Starting drop_range compaction");
245        self.inner_compact(strategy, 0)
246    }
247
248    #[doc(hidden)]
249    fn major_compact(&self, target_size: u64, seqno_threshold: SeqNo) -> crate::Result<()> {
250        let strategy = Arc::new(crate::compaction::major::Strategy::new(target_size));
251
252        // IMPORTANT: Write lock so we can be the only compaction going on
253        let _lock = self
254            .0
255            .major_compaction_lock
256            .write()
257            .expect("lock is poisoned");
258
259        log::info!("Starting major compaction");
260        self.inner_compact(strategy, seqno_threshold)
261    }
262
263    fn l0_run_count(&self) -> usize {
264        self.current_version()
265            .level(0)
266            .map(|x| x.run_count())
267            .unwrap_or_default()
268    }
269
270    fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<u32>> {
271        #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
272        Ok(self.get(key, seqno)?.map(|x| x.len() as u32))
273    }
274
275    fn filter_size(&self) -> usize {
276        self.current_version()
277            .iter_tables()
278            .map(Table::filter_size)
279            .sum()
280    }
281
282    fn pinned_filter_size(&self) -> usize {
283        self.current_version()
284            .iter_tables()
285            .map(Table::pinned_filter_size)
286            .sum()
287    }
288
289    fn pinned_block_index_size(&self) -> usize {
290        self.current_version()
291            .iter_tables()
292            .map(Table::pinned_block_index_size)
293            .sum()
294    }
295
296    fn sealed_memtable_count(&self) -> usize {
297        self.version_history
298            .read()
299            .expect("lock is poisoned")
300            .latest_version()
301            .sealed_memtables
302            .len()
303    }
304
    /// Writes the given (sealed) memtable to a new on-disk table.
    ///
    /// Returns `None` if the compaction stream filtered away every item,
    /// in which case no table was created.
    ///
    /// # Errors
    ///
    /// Returns error, if an IO error occurred.
    fn flush_memtable(
        &self,
        table_id: TableId,
        memtable: &Arc<Memtable>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<Option<(Table, Option<BlobFile>)>> {
        use crate::{compaction::stream::CompactionStream, file::TABLES_FOLDER, table::Writer};
        use std::time::Instant;

        let start = Instant::now();

        let folder = self.config.path.join(TABLES_FOLDER);
        let table_file_path = folder.join(table_id.to_string());

        // NOTE: All policies are queried at index 0, since a flushed
        // memtable always becomes a level-0 table
        let data_block_size = self.config.data_block_size_policy.get(0);

        let data_block_restart_interval = self.config.data_block_restart_interval_policy.get(0);
        let index_block_restart_interval = self.config.index_block_restart_interval_policy.get(0);

        let data_block_compression = self.config.data_block_compression_policy.get(0);
        let index_block_compression = self.config.index_block_compression_policy.get(0);

        let data_block_hash_ratio = self.config.data_block_hash_ratio_policy.get(0);

        let index_partitioning = self.config.index_block_partitioning_policy.get(0);
        let filter_partitioning = self.config.filter_block_partitioning_policy.get(0);

        log::debug!(
            "Flushing table to {}, data_block_restart_interval={data_block_restart_interval}, index_block_restart_interval={index_block_restart_interval}, data_block_size={data_block_size}, data_block_compression={data_block_compression}, index_block_compression={index_block_compression}",
            table_file_path.display(),
        );

        let mut table_writer = Writer::new(table_file_path, table_id, 0)?
            .use_data_block_restart_interval(data_block_restart_interval)
            .use_index_block_restart_interval(index_block_restart_interval)
            .use_data_block_compression(data_block_compression)
            .use_index_block_compression(index_block_compression)
            .use_data_block_size(data_block_size)
            .use_data_block_hash_ratio(data_block_hash_ratio)
            .use_bloom_policy({
                use crate::config::FilterPolicyEntry::{Bloom, None};
                use crate::table::filter::BloomConstructionPolicy;

                // A `None` filter policy is expressed as a bloom filter
                // with zero bits per key
                match self.config.filter_policy.get(0) {
                    Bloom(policy) => policy,
                    None => BloomConstructionPolicy::BitsPerKey(0.0),
                }
            });

        if index_partitioning {
            table_writer = table_writer.use_partitioned_index();
        }
        if filter_partitioning {
            table_writer = table_writer.use_partitioned_filter();
        }

        // Run the memtable items through the compaction stream
        // (drops versions at or below `seqno_threshold` — see `inner_compact`,
        // where the same parameter is used as the MVCC GC watermark)
        let iter = memtable.iter().map(Ok);
        let compaction_filter = CompactionStream::new(iter, seqno_threshold);

        for item in compaction_filter {
            table_writer.write(item?)?;
        }

        let result = self.consume_writer(table_writer)?;

        log::debug!("Flushed memtable {table_id:?} in {:?}", start.elapsed());

        // A standard-tree flush never produces a blob file
        Ok(result.map(|table| (table, None)))
    }
374
    /// Atomically installs freshly written tables (and optional blob files)
    /// as a new L0 run, and drops the sealed memtables they were flushed from.
    #[expect(clippy::significant_drop_tightening)]
    fn register_tables(
        &self,
        tables: &[Table],
        blob_files: Option<&[BlobFile]>,
        frag_map: Option<FragmentationMap>,
    ) -> crate::Result<()> {
        log::trace!(
            "Registering {} tables, {} blob files",
            tables.len(),
            blob_files.map(<[BlobFile]>::len).unwrap_or_default(),
        );

        // NOTE: Lock order: compaction state first, then version history
        let mut _compaction_state = self.compaction_state.lock().expect("lock is poisoned");
        let mut version_lock = self.version_history.write().expect("lock is poisoned");

        version_lock.upgrade_version(
            &self.config.path,
            |current| {
                let mut copy = current.clone();

                copy.version = copy.version.with_new_l0_run(
                    tables,
                    blob_files,
                    // Only carry over a fragmentation map that has entries
                    frag_map.filter(|x| !x.is_empty()),
                );

                // The data of each flushed table is now durable, so its
                // source memtable (same ID) can be released
                for table in tables {
                    log::trace!("releasing sealed memtable {}", table.id());
                    copy.sealed_memtables = Arc::new(copy.sealed_memtables.remove(table.id()));
                }

                Ok(copy)
            },
            &self.config.seqno,
        )?;

        Ok(())
    }
414
    // Replaces the active memtable with a fresh, empty one.
    //
    // NOTE(review): `latest_version()` appears to return an owned
    // `SuperVersion` copy (it is moved out of in `rotate_memtable` and
    // outlives the dropped guard in `approximate_len`), so this field
    // assignment may write into a discarded temporary instead of the
    // stored version — verify the new memtable is actually installed.
    fn clear_active_memtable(&self) {
        self.version_history
            .write()
            .expect("lock is poisoned")
            .latest_version()
            .active_memtable = Arc::new(Memtable::default());
    }
422
    // Installs `memtable` as the active memtable (e.g. after WAL recovery).
    //
    // NOTE(review): as with `clear_active_memtable`, `latest_version()`
    // appears to return an owned copy, so this assignment may mutate a
    // discarded temporary rather than the stored version — verify.
    fn set_active_memtable(&self, memtable: Memtable) {
        self.version_history
            .write()
            .expect("lock is poisoned")
            .latest_version()
            .active_memtable = Arc::new(memtable);
    }
430
431    fn add_sealed_memtable(&self, id: MemtableId, memtable: Arc<Memtable>) {
432        let mut version_lock = self.version_history.write().expect("lock is poisoned");
433        version_lock.append_sealed_memtable(id, memtable);
434    }
435
436    fn compact(
437        &self,
438        strategy: Arc<dyn CompactionStrategy>,
439        seqno_threshold: SeqNo,
440    ) -> crate::Result<()> {
441        // NOTE: Read lock major compaction lock
442        // That way, if a major compaction is running, we cannot proceed
443        // But in general, parallel (non-major) compactions can occur
444        let _lock = self
445            .0
446            .major_compaction_lock
447            .read()
448            .expect("lock is poisoned");
449
450        self.inner_compact(strategy, seqno_threshold)
451    }
452
453    fn get_next_table_id(&self) -> TableId {
454        self.0.get_next_table_id()
455    }
456
457    fn tree_config(&self) -> &Config {
458        &self.config
459    }
460
461    fn active_memtable_size(&self) -> u64 {
462        use std::sync::atomic::Ordering::Acquire;
463
464        self.version_history
465            .read()
466            .expect("lock is poisoned")
467            .latest_version()
468            .active_memtable
469            .approximate_size
470            .load(Acquire)
471    }
472
473    fn tree_type(&self) -> crate::TreeType {
474        crate::TreeType::Standard
475    }
476
477    #[expect(clippy::significant_drop_tightening)]
478    fn rotate_memtable(&self) -> Option<(MemtableId, Arc<Memtable>)> {
479        let mut version_history_lock = self.version_history.write().expect("lock is poisoned");
480        let super_version = version_history_lock.latest_version();
481
482        if super_version.active_memtable.is_empty() {
483            return None;
484        }
485
486        let yanked_memtable = super_version.active_memtable;
487        let tmp_memtable_id = self.get_next_table_id();
488
489        let mut copy = version_history_lock.latest_version();
490        copy.seqno = self.config.seqno.next();
491        copy.active_memtable = Arc::new(Memtable::default());
492        copy.sealed_memtables = Arc::new(
493            super_version
494                .sealed_memtables
495                .add(tmp_memtable_id, yanked_memtable.clone()),
496        );
497
498        version_history_lock.append_version(copy);
499
500        log::trace!("rotate: added memtable id={tmp_memtable_id} to sealed memtables");
501
502        Some((tmp_memtable_id, yanked_memtable))
503    }
504
505    fn table_count(&self) -> usize {
506        self.current_version().table_count()
507    }
508
509    fn level_table_count(&self, idx: usize) -> Option<usize> {
510        self.current_version().level(idx).map(|x| x.table_count())
511    }
512
513    #[expect(clippy::significant_drop_tightening)]
514    fn approximate_len(&self) -> usize {
515        let super_version = self
516            .version_history
517            .read()
518            .expect("lock is poisoned")
519            .latest_version();
520
521        let tables_item_count = self
522            .current_version()
523            .iter_tables()
524            .map(|x| x.metadata.item_count)
525            .sum::<u64>();
526
527        let memtable_count = super_version.active_memtable.len() as u64;
528        let sealed_count = super_version
529            .sealed_memtables
530            .iter()
531            .map(|(_, mt)| mt.len())
532            .sum::<usize>() as u64;
533
534        (memtable_count + sealed_count + tables_item_count)
535            .try_into()
536            .expect("should not be too large")
537    }
538
539    fn disk_space(&self) -> u64 {
540        self.current_version()
541            .iter_levels()
542            .map(super::version::Level::size)
543            .sum()
544    }
545
546    #[expect(clippy::significant_drop_tightening)]
547    fn get_highest_memtable_seqno(&self) -> Option<SeqNo> {
548        let version = self
549            .version_history
550            .read()
551            .expect("lock is poisoned")
552            .latest_version();
553
554        let active = version.active_memtable.get_highest_seqno();
555
556        let sealed = version
557            .sealed_memtables
558            .iter()
559            .map(|(_, table)| table.get_highest_seqno())
560            .max()
561            .flatten();
562
563        active.max(sealed)
564    }
565
566    fn get_highest_persisted_seqno(&self) -> Option<SeqNo> {
567        self.current_version()
568            .iter_tables()
569            .map(Table::get_highest_seqno)
570            .max()
571    }
572
573    fn get<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<UserValue>> {
574        Ok(self
575            .get_internal_entry(key.as_ref(), seqno)?
576            .map(|x| x.value))
577    }
578
579    fn insert<K: Into<UserKey>, V: Into<UserValue>>(
580        &self,
581        key: K,
582        value: V,
583        seqno: SeqNo,
584    ) -> (u64, u64) {
585        let value = InternalValue::from_components(key, value, seqno, ValueType::Value);
586        self.append_entry(value)
587    }
588
589    fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
590        let value = InternalValue::new_tombstone(key, seqno);
591        self.append_entry(value)
592    }
593
594    fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
595        let value = InternalValue::new_weak_tombstone(key, seqno);
596        self.append_entry(value)
597    }
598}
599
600impl Tree {
601    pub(crate) fn get_version_for_snapshot(&self, seqno: SeqNo) -> SuperVersion {
602        self.version_history
603            .read()
604            .expect("lock is poisoned")
605            .get_version_for_snapshot(seqno)
606    }
607
608    /// Normalizes a user-provided range into owned `Bound<Slice>` values.
609    ///
610    /// Returns a tuple containing:
611    /// - the `OwnedBounds` that mirror the original bounds semantics (including
612    ///   inclusive/exclusive markers and unbounded endpoints), and
613    /// - a `bool` flag indicating whether the normalized range is logically
614    ///   empty (e.g., when the lower bound is greater than the upper bound).
615    ///
616    /// Callers can use the flag to detect empty ranges and skip further work
617    /// while still having access to the normalized bounds for non-empty cases.
618    fn range_bounds_to_owned_bounds<K: AsRef<[u8]>, R: RangeBounds<K>>(
619        range: &R,
620    ) -> (OwnedBounds, bool) {
621        use Bound::{Excluded, Included, Unbounded};
622
623        let start = match range.start_bound() {
624            Included(key) => Included(Slice::from(key.as_ref())),
625            Excluded(key) => Excluded(Slice::from(key.as_ref())),
626            Unbounded => Unbounded,
627        };
628
629        let end = match range.end_bound() {
630            Included(key) => Included(Slice::from(key.as_ref())),
631            Excluded(key) => Excluded(Slice::from(key.as_ref())),
632            Unbounded => Unbounded,
633        };
634
635        let is_empty =
636            if let (Included(lo) | Excluded(lo), Included(hi) | Excluded(hi)) = (&start, &end) {
637                lo.as_ref() > hi.as_ref()
638            } else {
639                false
640            };
641
642        (OwnedBounds { start, end }, is_empty)
643    }
644
    /// Opens an LSM-tree in the given directory.
    ///
    /// Will recover previous state if the folder was previously
    /// occupied by an LSM-tree, including the previous configuration.
    /// If not, a new tree will be initialized with the given config.
    ///
    /// After recovering a previous state, use [`Tree::set_active_memtable`]
    /// to fill the memtable with data from a write-ahead log for full durability.
    ///
    /// # Errors
    ///
    /// Returns error, if an IO error occurred.
    pub(crate) fn open(config: Config) -> crate::Result<Self> {
        use crate::file::MANIFEST_FILE;

        log::debug!("Opening LSM-tree at {}", config.path.display());

        // Check for old version
        // (presumably a V1-era marker file — its presence means an
        // unmigratable database)
        if config.path.join("version").try_exists()? {
            log::error!("It looks like you are trying to open a V1 database - the database needs a manual migration, however a migration tool is not provided, as V1 is extremely outdated.");
            return Err(crate::Error::InvalidVersion(FormatVersion::V1.into()));
        }

        // An existing manifest means there is previous state to recover
        let tree = if config.path.join(MANIFEST_FILE).try_exists()? {
            Self::recover(config)
        } else {
            Self::create_new(config)
        }?;

        Ok(tree)
    }
676
677    pub(crate) fn consume_writer(
678        &self,
679        writer: crate::table::Writer,
680    ) -> crate::Result<Option<Table>> {
681        let table_file_path = writer.path.clone();
682
683        let Some((_, checksum)) = writer.finish()? else {
684            return Ok(None);
685        };
686
687        log::debug!("Finalized table write at {}", table_file_path.display());
688
689        let pin_filter = self.config.filter_block_pinning_policy.get(0);
690        let pin_index = self.config.filter_block_pinning_policy.get(0);
691
692        let created_table = Table::recover(
693            table_file_path,
694            checksum,
695            self.id,
696            self.config.cache.clone(),
697            self.config.descriptor_table.clone(),
698            pin_filter,
699            pin_index,
700            #[cfg(feature = "metrics")]
701            self.metrics.clone(),
702        )?;
703
704        log::debug!("Flushed table to {:?}", created_table.path);
705
706        Ok(Some(created_table))
707    }
708
709    /// Returns `true` if there are some tables that are being compacted.
710    #[doc(hidden)]
711    #[must_use]
712    pub fn is_compacting(&self) -> bool {
713        !self
714            .compaction_state
715            .lock()
716            .expect("lock is poisoned")
717            .hidden_set()
718            .is_empty()
719    }
720
721    fn get_internal_entry_from_sealed_memtables(
722        super_version: &SuperVersion,
723        key: &[u8],
724        seqno: SeqNo,
725    ) -> Option<InternalValue> {
726        for (_, memtable) in super_version.sealed_memtables.iter().rev() {
727            if let Some(entry) = memtable.get(key, seqno) {
728                return Some(entry);
729            }
730        }
731
732        None
733    }
734
    /// Looks up `key` in the on-disk tables.
    ///
    /// May involve disk I/O. Returns `Ok(None)` if the key does not exist
    /// or its latest visible version is a tombstone.
    fn get_internal_entry_from_tables(
        &self,
        version: &Version,
        key: &[u8],
        seqno: SeqNo,
    ) -> crate::Result<Option<InternalValue>> {
        // NOTE: Create key hash for hash sharing
        // https://fjall-rs.github.io/post/bloom-filter-hash-sharing/
        let key_hash = crate::table::filter::standard_bloom::Builder::get_hash(key);

        for level in version.iter_levels() {
            for run in level.iter() {
                // NOTE: Based on benchmarking, binary search is only worth it with ~4 tables
                if run.len() >= 4 {
                    // Keyed lookup: at most one candidate table in the run
                    if let Some(table) = run.get_for_key(key) {
                        if let Some(item) = table.get(key, seqno, key_hash)? {
                            return Ok(ignore_tombstone_value(item));
                        }
                    }
                } else {
                    // NOTE: Fallback to linear search
                    for table in run.iter() {
                        // Skip tables whose key range cannot contain the key
                        if !table.is_key_in_key_range(key) {
                            continue;
                        }

                        if let Some(item) = table.get(key, seqno, key_hash)? {
                            return Ok(ignore_tombstone_value(item));
                        }
                    }
                }
            }
        }

        Ok(None)
    }
771
772    fn inner_compact(
773        &self,
774        strategy: Arc<dyn CompactionStrategy>,
775        mvcc_gc_watermark: SeqNo,
776    ) -> crate::Result<()> {
777        use crate::compaction::worker::{do_compaction, Options};
778
779        let mut opts = Options::from_tree(self, strategy);
780        opts.mvcc_gc_watermark = mvcc_gc_watermark;
781
782        do_compaction(&opts)?;
783
784        log::debug!("Compaction run over");
785
786        Ok(())
787    }
788
    /// Creates an iterator over the whole keyspace at snapshot `seqno`.
    #[doc(hidden)]
    #[must_use]
    pub fn create_iter(
        &self,
        seqno: SeqNo,
        ephemeral: Option<Arc<Memtable>>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        // `..` = unbounded range over all keys
        self.create_range::<UserKey, _>(&.., seqno, ephemeral)
    }
798
799    #[doc(hidden)]
800    pub fn create_internal_range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
801        &'a self,
802        range: &'a R,
803        seqno: SeqNo,
804        ephemeral: Option<Arc<Memtable>>,
805    ) -> impl DoubleEndedIterator<Item = crate::Result<InternalValue>> + 'static {
806        use crate::range::{IterState, TreeIter};
807        use std::ops::Bound::{self, Excluded, Included, Unbounded};
808
809        let lo: Bound<UserKey> = match range.start_bound() {
810            Included(x) => Included(x.as_ref().into()),
811            Excluded(x) => Excluded(x.as_ref().into()),
812            Unbounded => Unbounded,
813        };
814
815        let hi: Bound<UserKey> = match range.end_bound() {
816            Included(x) => Included(x.as_ref().into()),
817            Excluded(x) => Excluded(x.as_ref().into()),
818            Unbounded => Unbounded,
819        };
820
821        let bounds: (Bound<UserKey>, Bound<UserKey>) = (lo, hi);
822
823        let version = self.get_version_for_snapshot(seqno);
824
825        let iter_state = { IterState { version, ephemeral } };
826
827        TreeIter::create_range(iter_state, bounds, seqno)
828    }
829
830    #[doc(hidden)]
831    pub fn create_range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
832        &self,
833        range: &'a R,
834        seqno: SeqNo,
835        ephemeral: Option<Arc<Memtable>>,
836    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
837        self.create_internal_range(range, seqno, ephemeral)
838            .map(|item| match item {
839                Ok(kv) => Ok((kv.key.user_key, kv.value)),
840                Err(e) => Err(e),
841            })
842    }
843
844    #[doc(hidden)]
845    pub fn create_prefix<'a, K: AsRef<[u8]> + 'a>(
846        &self,
847        prefix: K,
848        seqno: SeqNo,
849        ephemeral: Option<Arc<Memtable>>,
850    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
851        use crate::range::prefix_to_range;
852
853        let range = prefix_to_range(prefix.as_ref());
854        self.create_range(&range, seqno, ephemeral)
855    }
856
    /// Adds an item to the active memtable.
    ///
    /// Returns the added item's size and new size of the memtable.
    //
    // NOTE(review): `latest_version()` returns a copy, but the active
    // memtable is shared behind an `Arc`, so inserting through the copy
    // presumably writes to the same memtable readers see — verify.
    #[doc(hidden)]
    #[must_use]
    pub fn append_entry(&self, value: InternalValue) -> (u64, u64) {
        self.version_history
            .read()
            .expect("lock is poisoned")
            .latest_version()
            .active_memtable
            .insert(value)
    }
870
    /// Recovers previous state, by loading the level manifest, tables and blob files.
    ///
    /// # Errors
    ///
    /// Returns error, if an IO error occurred.
    fn recover(mut config: Config) -> crate::Result<Self> {
        use crate::{file::MANIFEST_FILE, stop_signal::StopSignal};
        use inner::get_next_tree_id;

        log::info!("Recovering LSM-tree at {}", config.path.display());

        // Read and decode the manifest to learn the on-disk format version
        let manifest = {
            let manifest_path = config.path.join(MANIFEST_FILE);
            let reader = sfa::Reader::new(&manifest_path)?;
            Manifest::decode_from(&manifest_path, &reader)?
        };

        // Only the V3 format can be opened directly
        if manifest.version != FormatVersion::V3 {
            if manifest.version == FormatVersion::V2 {
                log::error!("It looks like you are trying to open a V2 database - the database needs a manual migration, a tool is available at <TODO: 3.0.0 LINK>.");
            }
            if manifest.version as u8 > 3 {
                log::error!("It looks like you are trying to open a database from the future. Are you a time traveller?");
            }
            return Err(crate::Error::InvalidVersion(manifest.version.into()));
        }

        // IMPORTANT: Restore persisted config
        config.level_count = manifest.level_count;

        let tree_id = get_next_tree_id();

        #[cfg(feature = "metrics")]
        let metrics = Arc::new(Metrics::default());

        // Rebuild the version (levels + tables) from what is on disk
        let version = Self::recover_levels(
            &config.path,
            tree_id,
            &config.cache,
            &config.descriptor_table,
            #[cfg(feature = "metrics")]
            &metrics,
        )?;

        // Continue numbering tables after the highest recovered table ID
        let highest_table_id = version
            .iter_tables()
            .map(Table::id)
            .max()
            .unwrap_or_default();

        let inner = TreeInner {
            id: tree_id,
            table_id_counter: SequenceNumberCounter::new(highest_table_id + 1),
            blob_file_id_generator: SequenceNumberCounter::default(),
            version_history: Arc::new(RwLock::new(SuperVersions::new(version))),
            stop_signal: StopSignal::default(),
            config,
            major_compaction_lock: RwLock::default(),
            compaction_state: Arc::new(Mutex::new(CompactionState::default())),

            #[cfg(feature = "metrics")]
            metrics,
        };

        Ok(Self(Arc::new(inner)))
    }
937
938    /// Creates a new LSM-tree in a directory.
939    fn create_new(config: Config) -> crate::Result<Self> {
940        use crate::file::{fsync_directory, MANIFEST_FILE, TABLES_FOLDER};
941        use std::fs::create_dir_all;
942
943        let path = config.path.clone();
944        log::trace!("Creating LSM-tree at {}", path.display());
945
946        create_dir_all(&path)?;
947
948        let manifest_path = path.join(MANIFEST_FILE);
949        assert!(!manifest_path.try_exists()?);
950
951        let table_folder_path = path.join(TABLES_FOLDER);
952        create_dir_all(&table_folder_path)?;
953
954        // Create manifest
955        {
956            let mut writer = sfa::Writer::new_at_path(manifest_path)?;
957
958            Manifest {
959                version: FormatVersion::V3,
960                level_count: config.level_count,
961                tree_type: if config.kv_separation_opts.is_some() {
962                    TreeType::Blob
963                } else {
964                    TreeType::Standard
965                },
966            }
967            .encode_into(&mut writer)?;
968
969            writer.finish()?;
970        }
971
972        // IMPORTANT: fsync folders on Unix
973        fsync_directory(&table_folder_path)?;
974        fsync_directory(&path)?;
975
976        let inner = TreeInner::create_new(config)?;
977        Ok(Self(Arc::new(inner)))
978    }
979
    /// Recovers the level manifest, loading all tables from disk.
    ///
    /// Returns the reconstructed [`Version`]. Table/blob files present on disk
    /// but not referenced by the recovered version are treated as orphans and
    /// deleted; referenced tables that are *missing* make recovery fail.
    fn recover_levels<P: AsRef<Path>>(
        tree_path: P,
        tree_id: TreeId,
        cache: &Arc<Cache>,
        descriptor_table: &Arc<DescriptorTable>,
        #[cfg(feature = "metrics")] metrics: &Arc<Metrics>,
    ) -> crate::Result<Version> {
        use crate::{file::fsync_directory, file::TABLES_FOLDER, TableId};

        let tree_path = tree_path.as_ref();

        let recovery = recover(tree_path)?;

        // Flatten the recovered level/run structure into a lookup:
        // table ID -> (level index, expected checksum)
        let table_map = {
            let mut result: crate::HashMap<TableId, (u8 /* Level index */, Checksum)> =
                crate::HashMap::default();

            for (level_idx, table_ids) in recovery.table_ids.iter().enumerate() {
                for run in table_ids {
                    for &(table_id, checksum) in run {
                        #[expect(
                            clippy::expect_used,
                            reason = "there are always less than 256 levels"
                        )]
                        result.insert(
                            table_id,
                            (
                                level_idx
                                    .try_into()
                                    .expect("there are less than 256 levels"),
                                checksum,
                            ),
                        );
                    }
                }
            }

            result
        };

        let cnt = table_map.len();

        log::debug!("Recovering {cnt} tables from {}", tree_path.display());

        // Throttle progress logging: log every table for small trees,
        // every 10th/100th for larger ones
        let progress_mod = match cnt {
            _ if cnt <= 20 => 1,
            _ if cnt <= 100 => 10,
            _ => 100,
        };

        let mut tables = vec![];

        let table_base_folder = tree_path.join(TABLES_FOLDER);

        // The tables folder may not exist yet (e.g. empty tree); create it
        // and fsync so the entry is durable
        if !table_base_folder.try_exists()? {
            std::fs::create_dir_all(&table_base_folder)?;
            fsync_directory(&table_base_folder)?;
        }

        // Table files on disk that the recovered version does not reference
        let mut orphaned_tables = vec![];

        for (idx, dirent) in std::fs::read_dir(&table_base_folder)?.enumerate() {
            let dirent = dirent?;
            let file_name = dirent.file_name();

            // https://en.wikipedia.org/wiki/.DS_Store
            if file_name == ".DS_Store" {
                continue;
            }

            // https://en.wikipedia.org/wiki/AppleSingle_and_AppleDouble_formats
            if file_name.to_string_lossy().starts_with("._") {
                continue;
            }

            // File names must be valid UTF-8, since they encode the table ID
            let table_file_name = file_name.to_str().ok_or_else(|| {
                log::error!("invalid table file name {}", file_name.display());
                crate::Error::Unrecoverable
            })?;

            let table_file_path = dirent.path();
            assert!(!table_file_path.is_dir());

            log::debug!("Recovering table from {}", table_file_path.display());

            let table_id = table_file_name.parse::<TableId>().map_err(|e| {
                log::error!("invalid table file name {table_file_name:?}: {e:?}");
                crate::Error::Unrecoverable
            })?;

            if let Some(&(level_idx, checksum)) = table_map.get(&table_id) {
                let table = Table::recover(
                    table_file_path,
                    checksum,
                    tree_id,
                    cache.clone(),
                    descriptor_table.clone(),
                    level_idx <= 1, // TODO: look at configuration
                    level_idx <= 2, // TODO: look at configuration
                    #[cfg(feature = "metrics")]
                    metrics.clone(),
                )?;

                log::debug!("Recovered table from {:?}", table.path);

                tables.push(table);

                if idx % progress_mod == 0 {
                    log::debug!("Recovered {idx}/{cnt} tables");
                }
            } else {
                // Not referenced by the recovered version -> delete later,
                // but only once recovery has fully succeeded
                orphaned_tables.push(table_file_path);
            }
        }

        // A referenced table file is missing from disk -> cannot recover
        if tables.len() < cnt {
            log::error!(
                "Recovered less tables than expected: {:?}",
                table_map.keys(),
            );
            return Err(crate::Error::Unrecoverable);
        }

        log::debug!("Successfully recovered {} tables", tables.len());

        let (blob_files, orphaned_blob_files) = crate::vlog::recover_blob_files(
            &tree_path.join(BLOBS_FOLDER),
            &recovery.blob_file_ids,
        )?;

        let version = Version::from_recovery(recovery, &tables, &blob_files)?;

        // NOTE: Cleanup old versions
        // But only after we definitely recovered the latest version
        Self::cleanup_orphaned_version(tree_path, version.id())?;

        for table_path in orphaned_tables {
            log::debug!("Deleting orphaned table {}", table_path.display());
            std::fs::remove_file(&table_path)?;
        }

        for blob_file_path in orphaned_blob_files {
            log::debug!("Deleting orphaned blob file {}", blob_file_path.display());
            std::fs::remove_file(&blob_file_path)?;
        }

        Ok(version)
    }
1129
1130    fn cleanup_orphaned_version(path: &Path, latest_version_id: VersionId) -> crate::Result<()> {
1131        let version_str = format!("v{latest_version_id}");
1132
1133        for file in std::fs::read_dir(path)? {
1134            let dirent = file?;
1135
1136            if dirent.file_type()?.is_dir() {
1137                continue;
1138            }
1139
1140            let name = dirent.file_name();
1141
1142            if name.to_string_lossy().starts_with('v') && *name != *version_str {
1143                log::trace!("Cleanup orphaned version {}", name.display());
1144                std::fs::remove_file(dirent.path())?;
1145            }
1146        }
1147
1148        Ok(())
1149    }
1150}