lsm_tree/tree/mod.rs

// Copyright (c) 2024-present, fjall-rs
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)

pub mod ingest;
pub mod inner;
pub mod sealed;

use crate::{
    blob_tree::FragmentationMap,
    compaction::{drop_range::OwnedBounds, state::CompactionState, CompactionStrategy},
    config::Config,
    file::BLOBS_FOLDER,
    format_version::FormatVersion,
    iter_guard::{IterGuard, IterGuardImpl},
    manifest::Manifest,
    memtable::Memtable,
    slice::Slice,
    table::{multi_writer::MultiWriter, Table},
    value::InternalValue,
    version::{recovery::recover, SuperVersion, SuperVersions, Version, VersionId},
    vlog::BlobFile,
    AbstractTree, Cache, Checksum, DescriptorTable, KvPair, SeqNo, SequenceNumberCounter, TableId,
    TreeType, UserKey, UserValue, ValueType,
};
use inner::{MemtableId, TreeId, TreeInner};
use std::{
    ops::{Bound, RangeBounds},
    path::Path,
    sync::{Arc, Mutex, MutexGuard, RwLock},
};

#[cfg(feature = "metrics")]
use crate::metrics::Metrics;

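/// Iterator guard for the standard LSM-tree, wrapping an already-resolved
/// key-value pair result.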
pub struct Guard(crate::Result<(UserKey, UserValue)>);

impl IterGuard for Guard {
    fn into_inner_if(self, pred: impl Fn(&UserKey) -> bool) -> crate::Result<Option<KvPair>> {
        let (k, v) = self.0?;

        if pred(&k) {
            Ok(Some((k, v)))
        } else {
            Ok(None)
        }
    }

    fn key(self) -> crate::Result<UserKey> {
        self.0.map(|(k, _)| k)
    }

    fn size(self) -> crate::Result<u32> {
        #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
        self.into_inner().map(|(_, v)| v.len() as u32)
    }

    fn into_inner(self) -> crate::Result<(UserKey, UserValue)> {
        self.0
    }
}

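/// Maps tombstones to `None` so deleted keys read as absent.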
fn ignore_tombstone_value(item: InternalValue) -> Option<InternalValue> {
    if item.is_tombstone() {
        None
    } else {
        Some(item)
    }
}

/// A log-structured merge tree (LSM-tree/LSMT)
#[derive(Clone)]
pub struct Tree(#[doc(hidden)] pub Arc<TreeInner>);

impl std::ops::Deref for Tree {
    type Target = TreeInner;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl AbstractTree for Tree {
    fn get_version_history_lock(
        &self,
    ) -> std::sync::RwLockWriteGuard<'_, crate::version::SuperVersions> {
        self.version_history.write().expect("lock is poisoned")
    }

    fn next_table_id(&self) -> TableId {
        self.0.table_id_counter.get()
    }

    fn id(&self) -> TreeId {
        self.id
    }

    fn blob_file_count(&self) -> usize {
        0
    }

    #[expect(clippy::significant_drop_tightening)]
    fn get_internal_entry(&self, key: &[u8], seqno: SeqNo) -> crate::Result<Option<InternalValue>> {
        let version_history_lock = self.version_history.read().expect("lock is poisoned");
        let super_version = version_history_lock.get_version_for_snapshot(seqno);

        if let Some(entry) = super_version.active_memtable.get(key, seqno) {
            return Ok(ignore_tombstone_value(entry));
        }

        // Now look in sealed memtables
        if let Some(entry) =
            Self::get_internal_entry_from_sealed_memtables(&super_version, key, seqno)
        {
            return Ok(ignore_tombstone_value(entry));
        }

        // Now look in tables... this may involve disk I/O
        self.get_internal_entry_from_tables(&super_version.version, key, seqno)
    }

    fn current_version(&self) -> Version {
        self.version_history
            .read()
            .expect("poisoned")
            .latest_version()
            .version
    }

    fn get_flush_lock(&self) -> MutexGuard<'_, ()> {
        self.flush_lock.lock().expect("lock is poisoned")
    }

    #[cfg(feature = "metrics")]
    fn metrics(&self) -> &Arc<crate::Metrics> {
        &self.0.metrics
    }

    fn version_free_list_len(&self) -> usize {
        self.version_history
            .read()
            .expect("lock is poisoned")
            .free_list_len()
    }

    fn prefix<K: AsRef<[u8]>>(
        &self,
        prefix: K,
        seqno: SeqNo,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
        Box::new(
            self.create_prefix(&prefix, seqno, index)
                .map(|kv| IterGuardImpl::Standard(Guard(kv))),
        )
    }

    fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
        &self,
        range: R,
        seqno: SeqNo,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
        Box::new(
            self.create_range(&range, seqno, index)
                .map(|kv| IterGuardImpl::Standard(Guard(kv))),
        )
    }

    /// Returns the number of tombstones in the tree.
    fn tombstone_count(&self) -> u64 {
        self.current_version()
            .iter_tables()
            .map(Table::tombstone_count)
            .sum()
    }

    /// Returns the number of weak tombstones (single deletes) in the tree.
    fn weak_tombstone_count(&self) -> u64 {
        self.current_version()
            .iter_tables()
            .map(Table::weak_tombstone_count)
            .sum()
    }

    /// Returns the number of value entries that become reclaimable once weak tombstones can be GC'd.
    fn weak_tombstone_reclaimable_count(&self) -> u64 {
        self.current_version()
            .iter_tables()
            .map(Table::weak_tombstone_reclaimable)
            .sum()
    }

    fn ingest(
        &self,
        iter: impl Iterator<Item = (UserKey, UserValue)>,
        seqno_generator: &SequenceNumberCounter,
        visible_seqno: &SequenceNumberCounter,
    ) -> crate::Result<()> {
        use crate::tree::ingest::Ingestion;
        use std::time::Instant;

        let seqno = seqno_generator.next();

        // TODO: allow ingestion always, by flushing memtable

        let mut writer = Ingestion::new(self)?.with_seqno(seqno);

        let start = Instant::now();
        let mut count = 0;
        let mut last_key = None;

        for (key, value) in iter {
            if let Some(last_key) = &last_key {
                assert!(
                    key > last_key,
                    "next key in bulk ingest was not greater than last key, last: {last_key:?}, next: {key:?}",
                );
            }
            last_key = Some(key.clone());

            writer.write(key, value)?;

            count += 1;
        }

        writer.finish()?;

        visible_seqno.fetch_max(seqno + 1);

        log::info!("Ingested {count} items in {:?}", start.elapsed());

        Ok(())
    }

    fn drop_range<K: AsRef<[u8]>, R: RangeBounds<K>>(&self, range: R) -> crate::Result<()> {
        let (bounds, is_empty) = Self::range_bounds_to_owned_bounds(&range);

        if is_empty {
            return Ok(());
        }

        let strategy = Arc::new(crate::compaction::drop_range::Strategy::new(bounds));

        // IMPORTANT: Take the write lock so that we are the only compaction running
        let _lock = self
            .0
            .major_compaction_lock
            .write()
            .expect("lock is poisoned");

        log::info!("Starting drop_range compaction");
        self.inner_compact(strategy, 0)
    }

    #[doc(hidden)]
    fn major_compact(&self, target_size: u64, seqno_threshold: SeqNo) -> crate::Result<()> {
        let strategy = Arc::new(crate::compaction::major::Strategy::new(target_size));

        // IMPORTANT: Take the write lock so that we are the only compaction running
        let _lock = self
            .0
            .major_compaction_lock
            .write()
            .expect("lock is poisoned");

        log::info!("Starting major compaction");
        self.inner_compact(strategy, seqno_threshold)
    }

    fn l0_run_count(&self) -> usize {
        self.current_version()
            .level(0)
            .map(|x| x.run_count())
            .unwrap_or_default()
    }

    fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<u32>> {
        #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
        Ok(self.get(key, seqno)?.map(|x| x.len() as u32))
    }

    fn filter_size(&self) -> usize {
        self.current_version()
            .iter_tables()
            .map(Table::filter_size)
            .sum()
    }

    fn pinned_filter_size(&self) -> usize {
        self.current_version()
            .iter_tables()
            .map(Table::pinned_filter_size)
            .sum()
    }

    fn pinned_block_index_size(&self) -> usize {
        self.current_version()
            .iter_tables()
            .map(Table::pinned_block_index_size)
            .sum()
    }

    fn sealed_memtable_count(&self) -> usize {
        self.version_history
            .read()
            .expect("lock is poisoned")
            .latest_version()
            .sealed_memtables
            .len()
    }

    fn flush_to_tables(
        &self,
        stream: impl Iterator<Item = crate::Result<InternalValue>>,
    ) -> crate::Result<Option<(Vec<Table>, Option<Vec<BlobFile>>)>> {
        use crate::file::TABLES_FOLDER;
        use std::time::Instant;

        let start = Instant::now();

        let folder = self.config.path.join(TABLES_FOLDER);

        let data_block_size = self.config.data_block_size_policy.get(0);

        let data_block_restart_interval = self.config.data_block_restart_interval_policy.get(0);
        let index_block_restart_interval = self.config.index_block_restart_interval_policy.get(0);

        let data_block_compression = self.config.data_block_compression_policy.get(0);
        let index_block_compression = self.config.index_block_compression_policy.get(0);

        let data_block_hash_ratio = self.config.data_block_hash_ratio_policy.get(0);

        let index_partitioning = self.config.index_block_partitioning_policy.get(0);
        let filter_partitioning = self.config.filter_block_partitioning_policy.get(0);

        log::debug!(
            "Flushing memtable(s) to {}, data_block_restart_interval={data_block_restart_interval}, index_block_restart_interval={index_block_restart_interval}, data_block_size={data_block_size}, data_block_compression={data_block_compression}, index_block_compression={index_block_compression}",
            folder.display(),
        );

        let mut table_writer = MultiWriter::new(
            folder.clone(),
            self.table_id_counter.clone(),
            64 * 1_024 * 1_024,
            0,
        )?
        .use_data_block_restart_interval(data_block_restart_interval)
        .use_index_block_restart_interval(index_block_restart_interval)
        .use_data_block_compression(data_block_compression)
        .use_index_block_compression(index_block_compression)
        .use_data_block_size(data_block_size)
        .use_data_block_hash_ratio(data_block_hash_ratio)
        .use_bloom_policy({
            use crate::config::FilterPolicyEntry::{Bloom, None};
            use crate::table::filter::BloomConstructionPolicy;

            match self.config.filter_policy.get(0) {
                Bloom(policy) => policy,
                None => BloomConstructionPolicy::BitsPerKey(0.0),
            }
        });

        if index_partitioning {
            table_writer = table_writer.use_partitioned_index();
        }
        if filter_partitioning {
            table_writer = table_writer.use_partitioned_filter();
        }

        for item in stream {
            table_writer.write(item?)?;
        }

        let result = table_writer.finish()?;

        log::debug!("Flushed memtable(s) in {:?}", start.elapsed());

        let pin_filter = self.config.filter_block_pinning_policy.get(0);
        let pin_index = self.config.index_block_pinning_policy.get(0);

        // Load tables
        let tables = result
            .into_iter()
            .map(|(table_id, checksum)| -> crate::Result<Table> {
                Table::recover(
                    folder.join(table_id.to_string()),
                    checksum,
                    self.id,
                    self.config.cache.clone(),
                    self.config.descriptor_table.clone(),
                    pin_filter,
                    pin_index,
                    #[cfg(feature = "metrics")]
                    self.metrics.clone(),
                )
            })
            .collect::<crate::Result<Vec<_>>>()?;

        Ok(Some((tables, None)))
    }

    #[expect(clippy::significant_drop_tightening)]
    fn register_tables(
        &self,
        tables: &[Table],
        blob_files: Option<&[BlobFile]>,
        frag_map: Option<FragmentationMap>,
        sealed_memtables_to_delete: &[MemtableId],
        gc_watermark: SeqNo,
    ) -> crate::Result<()> {
        log::trace!(
            "Registering {} tables, {} blob files",
            tables.len(),
            blob_files.map(<[BlobFile]>::len).unwrap_or_default(),
        );

        let mut _compaction_state = self.compaction_state.lock().expect("lock is poisoned");
        let mut version_lock = self.version_history.write().expect("lock is poisoned");

        version_lock.upgrade_version(
            &self.config.path,
            |current| {
                let mut copy = current.clone();

                copy.version = copy.version.with_new_l0_run(
                    tables,
                    blob_files,
                    frag_map.filter(|x| !x.is_empty()),
                );

                for &table_id in sealed_memtables_to_delete {
                    log::trace!("releasing sealed memtable #{table_id}");
                    copy.sealed_memtables = Arc::new(copy.sealed_memtables.remove(table_id));
                }

                Ok(copy)
            },
            &self.config.seqno,
        )?;

        if let Err(e) = version_lock.maintenance(&self.config.path, gc_watermark) {
            log::warn!("Version GC failed: {e:?}");
        }

        Ok(())
    }

    fn clear_active_memtable(&self) {
        self.version_history
            .write()
            .expect("lock is poisoned")
            .latest_version()
            .active_memtable = Arc::new(Memtable::default());
    }

    fn set_active_memtable(&self, memtable: Memtable) {
        self.version_history
            .write()
            .expect("lock is poisoned")
            .latest_version()
            .active_memtable = Arc::new(memtable);
    }

    fn add_sealed_memtable(&self, id: MemtableId, memtable: Arc<Memtable>) {
        let mut version_lock = self.version_history.write().expect("lock is poisoned");
        version_lock.append_sealed_memtable(id, memtable);
    }

    fn compact(
        &self,
        strategy: Arc<dyn CompactionStrategy>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        // NOTE: Take the major compaction lock as a read lock:
        // if a major compaction is running, we must wait,
        // but parallel (non-major) compactions may still proceed
        let _lock = self
            .0
            .major_compaction_lock
            .read()
            .expect("lock is poisoned");

        self.inner_compact(strategy, seqno_threshold)
    }

    fn get_next_table_id(&self) -> TableId {
        self.0.get_next_table_id()
    }

    fn tree_config(&self) -> &Config {
        &self.config
    }

    fn active_memtable_size(&self) -> u64 {
        use std::sync::atomic::Ordering::Acquire;

        self.version_history
            .read()
            .expect("lock is poisoned")
            .latest_version()
            .active_memtable
            .approximate_size
            .load(Acquire)
    }

    fn tree_type(&self) -> crate::TreeType {
        crate::TreeType::Standard
    }

    #[expect(clippy::significant_drop_tightening)]
    fn rotate_memtable(&self) -> Option<Arc<Memtable>> {
        let mut version_history_lock = self.version_history.write().expect("lock is poisoned");
        let super_version = version_history_lock.latest_version();

        if super_version.active_memtable.is_empty() {
            return None;
        }

        let yanked_memtable = super_version.active_memtable;
        let memtable_id = self.0.memtable_id_counter.next();

        let mut copy = version_history_lock.latest_version();
        copy.seqno = self.config.seqno.next();
        copy.active_memtable = Arc::new(Memtable::default());
        copy.sealed_memtables = Arc::new(
            super_version
                .sealed_memtables
                .add(memtable_id, yanked_memtable.clone()),
        );

        version_history_lock.append_version(copy);

        log::trace!("rotate: added memtable id={memtable_id} to sealed memtables");

        Some(yanked_memtable)
    }

    fn table_count(&self) -> usize {
        self.current_version().table_count()
    }

    fn level_table_count(&self, idx: usize) -> Option<usize> {
        self.current_version().level(idx).map(|x| x.table_count())
    }

    #[expect(clippy::significant_drop_tightening)]
    fn approximate_len(&self) -> usize {
        let super_version = self
            .version_history
            .read()
            .expect("lock is poisoned")
            .latest_version();

        let tables_item_count = self
            .current_version()
            .iter_tables()
            .map(|x| x.metadata.item_count)
            .sum::<u64>();

        let memtable_count = super_version.active_memtable.len() as u64;
        let sealed_count = super_version
            .sealed_memtables
            .iter()
            .map(|(_, mt)| mt.len())
            .sum::<usize>() as u64;

        (memtable_count + sealed_count + tables_item_count)
            .try_into()
            .expect("should not be too large")
    }

    fn disk_space(&self) -> u64 {
        self.current_version()
            .iter_levels()
            .map(super::version::Level::size)
            .sum()
    }

    #[expect(clippy::significant_drop_tightening)]
    fn get_highest_memtable_seqno(&self) -> Option<SeqNo> {
        let version = self
            .version_history
            .read()
            .expect("lock is poisoned")
            .latest_version();

        let active = version.active_memtable.get_highest_seqno();

        let sealed = version
            .sealed_memtables
            .iter()
            .map(|(_, table)| table.get_highest_seqno())
            .max()
            .flatten();

        active.max(sealed)
    }

    fn get_highest_persisted_seqno(&self) -> Option<SeqNo> {
        self.current_version()
            .iter_tables()
            .map(Table::get_highest_seqno)
            .max()
    }

    fn get<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<UserValue>> {
        Ok(self
            .get_internal_entry(key.as_ref(), seqno)?
            .map(|x| x.value))
    }

    fn insert<K: Into<UserKey>, V: Into<UserValue>>(
        &self,
        key: K,
        value: V,
        seqno: SeqNo,
    ) -> (u64, u64) {
        let value = InternalValue::from_components(key, value, seqno, ValueType::Value);
        self.append_entry(value)
    }

    fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        let value = InternalValue::new_tombstone(key, seqno);
        self.append_entry(value)
    }

    fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        let value = InternalValue::new_weak_tombstone(key, seqno);
        self.append_entry(value)
    }
}

impl Tree {
    pub(crate) fn get_version_for_snapshot(&self, seqno: SeqNo) -> SuperVersion {
        self.version_history
            .read()
            .expect("lock is poisoned")
            .get_version_for_snapshot(seqno)
    }

    /// Normalizes a user-provided range into owned `Bound<Slice>` values.
    ///
    /// Returns a tuple containing:
    /// - the `OwnedBounds` that mirror the original bounds semantics (including
    ///   inclusive/exclusive markers and unbounded endpoints), and
    /// - a `bool` flag indicating whether the normalized range is logically
    ///   empty (e.g., when the lower bound is greater than the upper bound).
    ///
    /// Callers can use the flag to detect empty ranges and skip further work
    /// while still having access to the normalized bounds for non-empty cases.
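    ///
    /// A minimal sketch of the mapping (illustrative keys only):
    ///
    /// ```text
    /// "a"..="z" -> (Included("a"), Included("z")), is_empty = false
    /// "z".."a"  -> (Included("z"), Excluded("a")), is_empty = true
    /// ..        -> (Unbounded, Unbounded),         is_empty = false
    /// ```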
    fn range_bounds_to_owned_bounds<K: AsRef<[u8]>, R: RangeBounds<K>>(
        range: &R,
    ) -> (OwnedBounds, bool) {
        use Bound::{Excluded, Included, Unbounded};

        let start = match range.start_bound() {
            Included(key) => Included(Slice::from(key.as_ref())),
            Excluded(key) => Excluded(Slice::from(key.as_ref())),
            Unbounded => Unbounded,
        };

        let end = match range.end_bound() {
            Included(key) => Included(Slice::from(key.as_ref())),
            Excluded(key) => Excluded(Slice::from(key.as_ref())),
            Unbounded => Unbounded,
        };

        let is_empty =
            if let (Included(lo) | Excluded(lo), Included(hi) | Excluded(hi)) = (&start, &end) {
                lo.as_ref() > hi.as_ref()
            } else {
                false
            };

        (OwnedBounds { start, end }, is_empty)
    }

    /// Opens an LSM-tree in the given directory.
    ///
    /// Will recover the previous state if the folder was previously
    /// occupied by an LSM-tree, including the previous configuration.
    /// If not, a new tree is initialized with the given config.
    ///
    /// After recovering a previous state, use [`Tree::set_active_memtable`]
    /// to fill the memtable with data from a write-ahead log for full durability.
    ///
    /// # Errors
    ///
    /// Returns an error if an I/O error occurs.
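    ///
    /// A minimal sketch (crate-internal API; assumes `Config::new` takes the
    /// target path):
    ///
    /// ```ignore
    /// let tree = Tree::open(Config::new("/tmp/my-lsm-tree"))?;
    /// assert_eq!(0, tree.table_count());
    /// ```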
    pub(crate) fn open(config: Config) -> crate::Result<Self> {
        use crate::file::MANIFEST_FILE;

        log::debug!("Opening LSM-tree at {}", config.path.display());

        // Check for old version
        if config.path.join("version").try_exists()? {
            log::error!("It looks like you are trying to open a V1 database. The database needs a manual migration; however, no migration tool is provided, as V1 is extremely outdated.");
            return Err(crate::Error::InvalidVersion(FormatVersion::V1.into()));
        }

        let tree = if config.path.join(MANIFEST_FILE).try_exists()? {
            Self::recover(config)
        } else {
            Self::create_new(config)
        }?;

        Ok(tree)
    }

    /// Returns `true` if any tables are currently being compacted.
    #[doc(hidden)]
    #[must_use]
    pub fn is_compacting(&self) -> bool {
        !self
            .compaction_state
            .lock()
            .expect("lock is poisoned")
            .hidden_set()
            .is_empty()
    }

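    /// Point lookup through the sealed memtables, scanning from newest to oldest.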
    fn get_internal_entry_from_sealed_memtables(
        super_version: &SuperVersion,
        key: &[u8],
        seqno: SeqNo,
    ) -> Option<InternalValue> {
        for (_, memtable) in super_version.sealed_memtables.iter().rev() {
            if let Some(entry) = memtable.get(key, seqno) {
                return Some(entry);
            }
        }

        None
    }

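    /// Point lookup through the persisted tables, walking levels and runs in order;
    /// the first match wins (tombstones are mapped to `None`).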
    fn get_internal_entry_from_tables(
        &self,
        version: &Version,
        key: &[u8],
        seqno: SeqNo,
    ) -> crate::Result<Option<InternalValue>> {
        // NOTE: Create key hash for hash sharing
        // https://fjall-rs.github.io/post/bloom-filter-hash-sharing/
        let key_hash = crate::table::filter::standard_bloom::Builder::get_hash(key);

        for level in version.iter_levels() {
            for run in level.iter() {
                // NOTE: Based on benchmarking, binary search is only worth it with ~4 or more tables
                if run.len() >= 4 {
                    if let Some(table) = run.get_for_key(key) {
                        if let Some(item) = table.get(key, seqno, key_hash)? {
                            return Ok(ignore_tombstone_value(item));
                        }
                    }
                } else {
                    // NOTE: Fall back to linear search
                    for table in run.iter() {
                        if !table.is_key_in_key_range(key) {
                            continue;
                        }

                        if let Some(item) = table.get(key, seqno, key_hash)? {
                            return Ok(ignore_tombstone_value(item));
                        }
                    }
                }
            }
        }

        Ok(None)
    }

    fn inner_compact(
        &self,
        strategy: Arc<dyn CompactionStrategy>,
        mvcc_gc_watermark: SeqNo,
    ) -> crate::Result<()> {
        use crate::compaction::worker::{do_compaction, Options};

        let mut opts = Options::from_tree(self, strategy);
        opts.mvcc_gc_watermark = mvcc_gc_watermark;

        do_compaction(&opts)?;

        log::debug!("Compaction run over");

        Ok(())
    }

    #[doc(hidden)]
    #[must_use]
    pub fn create_iter(
        &self,
        seqno: SeqNo,
        ephemeral: Option<Arc<Memtable>>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        self.create_range::<UserKey, _>(&.., seqno, ephemeral)
    }

    #[doc(hidden)]
    pub fn create_internal_range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
        &'a self,
        range: &'a R,
        seqno: SeqNo,
        ephemeral: Option<Arc<Memtable>>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<InternalValue>> + 'static {
        use crate::range::{IterState, TreeIter};
        use std::ops::Bound::{self, Excluded, Included, Unbounded};

        let lo: Bound<UserKey> = match range.start_bound() {
            Included(x) => Included(x.as_ref().into()),
            Excluded(x) => Excluded(x.as_ref().into()),
            Unbounded => Unbounded,
        };

        let hi: Bound<UserKey> = match range.end_bound() {
            Included(x) => Included(x.as_ref().into()),
            Excluded(x) => Excluded(x.as_ref().into()),
            Unbounded => Unbounded,
        };

        let bounds: (Bound<UserKey>, Bound<UserKey>) = (lo, hi);

        let version = self.get_version_for_snapshot(seqno);

        let iter_state = IterState { version, ephemeral };

        TreeIter::create_range(iter_state, bounds, seqno)
    }

    #[doc(hidden)]
    pub fn create_range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
        &self,
        range: &'a R,
        seqno: SeqNo,
        ephemeral: Option<Arc<Memtable>>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        self.create_internal_range(range, seqno, ephemeral)
            .map(|item| match item {
                Ok(kv) => Ok((kv.key.user_key, kv.value)),
                Err(e) => Err(e),
            })
    }

    #[doc(hidden)]
    pub fn create_prefix<'a, K: AsRef<[u8]> + 'a>(
        &self,
        prefix: K,
        seqno: SeqNo,
        ephemeral: Option<Arc<Memtable>>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        use crate::range::prefix_to_range;

        let range = prefix_to_range(prefix.as_ref());
        self.create_range(&range, seqno, ephemeral)
    }

    /// Adds an item to the active memtable.
    ///
    /// Returns the added item's size and the new size of the memtable.
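    ///
    /// A minimal sketch (hypothetical key and value; `from_components` is used
    /// the same way by `insert` above):
    ///
    /// ```ignore
    /// let value = InternalValue::from_components("key", "value", seqno, ValueType::Value);
    /// let (item_size, memtable_size) = tree.append_entry(value);
    /// assert!(memtable_size >= item_size);
    /// ```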
    #[doc(hidden)]
    #[must_use]
    pub fn append_entry(&self, value: InternalValue) -> (u64, u64) {
        self.version_history
            .read()
            .expect("lock is poisoned")
            .latest_version()
            .active_memtable
            .insert(value)
    }

    /// Recovers the previous state by loading the level manifest, tables, and blob files.
    ///
    /// # Errors
    ///
    /// Returns an error if an I/O error occurs.
    fn recover(mut config: Config) -> crate::Result<Self> {
        use crate::{file::MANIFEST_FILE, stop_signal::StopSignal};
        use inner::get_next_tree_id;

        log::info!("Recovering LSM-tree at {}", config.path.display());

        let manifest = {
            let manifest_path = config.path.join(MANIFEST_FILE);
            let reader = sfa::Reader::new(&manifest_path)?;
            Manifest::decode_from(&manifest_path, &reader)?
        };

        if manifest.version != FormatVersion::V3 {
            if manifest.version == FormatVersion::V2 {
                log::error!("It looks like you are trying to open a V2 database. The database needs a manual migration; a migration tool is available at <TODO: 3.0.0 LINK>.");
            }
            if manifest.version as u8 > 3 {
                log::error!("It looks like you are trying to open a database from the future. Are you a time traveller?");
            }
            return Err(crate::Error::InvalidVersion(manifest.version.into()));
        }

        // IMPORTANT: Restore persisted config
        config.level_count = manifest.level_count;

        let tree_id = get_next_tree_id();

        #[cfg(feature = "metrics")]
        let metrics = Arc::new(Metrics::default());

        let version = Self::recover_levels(
            &config.path,
            tree_id,
            &config.cache,
            &config.descriptor_table,
            #[cfg(feature = "metrics")]
            &metrics,
        )?;

        let highest_table_id = version
            .iter_tables()
            .map(Table::id)
            .max()
            .unwrap_or_default();

        let inner = TreeInner {
            id: tree_id,
            memtable_id_counter: SequenceNumberCounter::default(),
            table_id_counter: SequenceNumberCounter::new(highest_table_id + 1),
            blob_file_id_counter: SequenceNumberCounter::default(),
            version_history: Arc::new(RwLock::new(SuperVersions::new(version))),
            stop_signal: StopSignal::default(),
            config,
            major_compaction_lock: RwLock::default(),
            flush_lock: Mutex::default(),
            compaction_state: Arc::new(Mutex::new(CompactionState::default())),

            #[cfg(feature = "metrics")]
            metrics,
        };

        Ok(Self(Arc::new(inner)))
    }

    /// Creates a new LSM-tree in a directory.
    fn create_new(config: Config) -> crate::Result<Self> {
        use crate::file::{fsync_directory, MANIFEST_FILE, TABLES_FOLDER};
        use std::fs::create_dir_all;

        let path = config.path.clone();
        log::trace!("Creating LSM-tree at {}", path.display());

        create_dir_all(&path)?;

        let manifest_path = path.join(MANIFEST_FILE);
        assert!(!manifest_path.try_exists()?);

        let table_folder_path = path.join(TABLES_FOLDER);
        create_dir_all(&table_folder_path)?;

        // Create manifest
        {
            let mut writer = sfa::Writer::new_at_path(manifest_path)?;

            Manifest {
                version: FormatVersion::V3,
                level_count: config.level_count,
                tree_type: if config.kv_separation_opts.is_some() {
                    TreeType::Blob
                } else {
                    TreeType::Standard
                },
            }
            .encode_into(&mut writer)?;

            writer.finish()?;
        }

        // IMPORTANT: fsync folders on Unix
        fsync_directory(&table_folder_path)?;
        fsync_directory(&path)?;

        let inner = TreeInner::create_new(config)?;
        Ok(Self(Arc::new(inner)))
    }

    /// Recovers the level manifest, loading all tables from disk.
    fn recover_levels<P: AsRef<Path>>(
        tree_path: P,
        tree_id: TreeId,
        cache: &Arc<Cache>,
        descriptor_table: &Arc<DescriptorTable>,
        #[cfg(feature = "metrics")] metrics: &Arc<Metrics>,
    ) -> crate::Result<Version> {
        use crate::{file::fsync_directory, file::TABLES_FOLDER, TableId};

        let tree_path = tree_path.as_ref();

        let recovery = recover(tree_path)?;

        let table_map = {
            let mut result: crate::HashMap<TableId, (u8 /* Level index */, Checksum)> =
                crate::HashMap::default();

            for (level_idx, table_ids) in recovery.table_ids.iter().enumerate() {
                for run in table_ids {
                    for &(table_id, checksum) in run {
                        #[expect(
                            clippy::expect_used,
                            reason = "there are always fewer than 256 levels"
                        )]
                        result.insert(
                            table_id,
                            (
                                level_idx
                                    .try_into()
                                    .expect("there are fewer than 256 levels"),
                                checksum,
                            ),
                        );
                    }
                }
            }

            result
        };

        let cnt = table_map.len();

        log::debug!("Recovering {cnt} tables from {}", tree_path.display());

        let progress_mod = match cnt {
            _ if cnt <= 20 => 1,
            _ if cnt <= 100 => 10,
            _ => 100,
        };

        let mut tables = vec![];

        let table_base_folder = tree_path.join(TABLES_FOLDER);

        if !table_base_folder.try_exists()? {
            std::fs::create_dir_all(&table_base_folder)?;
            fsync_directory(&table_base_folder)?;
        }

        let mut orphaned_tables = vec![];

        for (idx, dirent) in std::fs::read_dir(&table_base_folder)?.enumerate() {
            let dirent = dirent?;
            let file_name = dirent.file_name();

            // https://en.wikipedia.org/wiki/.DS_Store
            if file_name == ".DS_Store" {
                continue;
            }

            // https://en.wikipedia.org/wiki/AppleSingle_and_AppleDouble_formats
            if file_name.to_string_lossy().starts_with("._") {
                continue;
            }

            let table_file_name = file_name.to_str().ok_or_else(|| {
                log::error!("invalid table file name {}", file_name.display());
                crate::Error::Unrecoverable
            })?;

            let table_file_path = dirent.path();
            assert!(!table_file_path.is_dir());

            log::debug!("Recovering table from {}", table_file_path.display());

            let table_id = table_file_name.parse::<TableId>().map_err(|e| {
                log::error!("invalid table file name {table_file_name:?}: {e:?}");
                crate::Error::Unrecoverable
            })?;

            if let Some(&(level_idx, checksum)) = table_map.get(&table_id) {
                let table = Table::recover(
                    table_file_path,
                    checksum,
                    tree_id,
                    cache.clone(),
                    descriptor_table.clone(),
                    level_idx <= 1, // TODO: look at configuration
                    level_idx <= 2, // TODO: look at configuration
                    #[cfg(feature = "metrics")]
                    metrics.clone(),
                )?;

                log::debug!("Recovered table from {:?}", table.path);

                tables.push(table);

                if idx % progress_mod == 0 {
                    log::debug!("Recovered {idx}/{cnt} tables");
                }
            } else {
                orphaned_tables.push(table_file_path);
            }
        }

        if tables.len() < cnt {
            log::error!(
                "Recovered fewer tables than expected: {:?}",
                table_map.keys(),
            );
            return Err(crate::Error::Unrecoverable);
        }

        log::debug!("Successfully recovered {} tables", tables.len());

        let (blob_files, orphaned_blob_files) = crate::vlog::recover_blob_files(
            &tree_path.join(BLOBS_FOLDER),
            &recovery.blob_file_ids,
        )?;

        let version = Version::from_recovery(recovery, &tables, &blob_files)?;

        // NOTE: Clean up old versions,
        // but only after we have definitely recovered the latest version
        Self::cleanup_orphaned_version(tree_path, version.id())?;

        for table_path in orphaned_tables {
            log::debug!("Deleting orphaned table {}", table_path.display());
            std::fs::remove_file(&table_path)?;
        }

        for blob_file_path in orphaned_blob_files {
            log::debug!("Deleting orphaned blob file {}", blob_file_path.display());
            std::fs::remove_file(&blob_file_path)?;
        }

        Ok(version)
    }

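    /// Removes version manifest files (named `v{id}`) in the tree folder that
    /// do not belong to the given latest version.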
    fn cleanup_orphaned_version(path: &Path, latest_version_id: VersionId) -> crate::Result<()> {
        let version_str = format!("v{latest_version_id}");

        for file in std::fs::read_dir(path)? {
            let dirent = file?;

            if dirent.file_type()?.is_dir() {
                continue;
            }

            let name = dirent.file_name();

            if name.to_string_lossy().starts_with('v') && *name != *version_str {
                log::trace!("Cleanup orphaned version {}", name.display());
                std::fs::remove_file(dirent.path())?;
            }
        }

        Ok(())
    }
}