lsm_tree/tree/mod.rs

// Copyright (c) 2024-present, fjall-rs
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)

pub mod ingest;
pub mod inner;
mod sealed;

use crate::{
    blob_tree::FragmentationMap,
    compaction::{drop_range::OwnedBounds, state::CompactionState, CompactionStrategy},
    config::Config,
    file::BLOBS_FOLDER,
    format_version::FormatVersion,
    iter_guard::{IterGuard, IterGuardImpl},
    manifest::Manifest,
    memtable::Memtable,
    slice::Slice,
    table::Table,
    tree::inner::SuperVersion,
    value::InternalValue,
    version::{recovery::recover, Version, VersionId},
    vlog::BlobFile,
    AbstractTree, Cache, Checksum, DescriptorTable, KvPair, SeqNo, SequenceNumberCounter, TableId,
    TreeType, UserKey, UserValue, ValueType,
};
use inner::{MemtableId, TreeId, TreeInner};
use std::{
    ops::{Bound, RangeBounds},
    path::Path,
    sync::{atomic::AtomicU64, Arc, Mutex, RwLock},
};

#[cfg(feature = "metrics")]
use crate::metrics::Metrics;

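/// Wraps the result of looking up a key-value pair, so iterator consumers can
/// extract just the key, just the value size, or the whole pair.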
pub struct Guard(crate::Result<(UserKey, UserValue)>);

impl IterGuard for Guard {
    fn key(self) -> crate::Result<UserKey> {
        self.0.map(|(k, _)| k)
    }

    fn size(self) -> crate::Result<u32> {
        // NOTE: We know LSM-tree values are 32 bits in length max
        #[allow(clippy::cast_possible_truncation)]
        self.into_inner().map(|(_, v)| v.len() as u32)
    }

    fn into_inner(self) -> crate::Result<(UserKey, UserValue)> {
        self.0
    }
}

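/// Maps tombstones to `None` so deleted keys do not surface in user-facing reads.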
fn ignore_tombstone_value(item: InternalValue) -> Option<InternalValue> {
    if item.is_tombstone() {
        None
    } else {
        Some(item)
    }
}

/// A log-structured merge tree (LSM-tree/LSMT)
#[derive(Clone)]
pub struct Tree(#[doc(hidden)] pub Arc<TreeInner>);

impl std::ops::Deref for Tree {
    type Target = TreeInner;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl AbstractTree for Tree {
    fn next_table_id(&self) -> TableId {
        self.0
            .table_id_counter
            .load(std::sync::atomic::Ordering::Relaxed)
    }

    fn id(&self) -> TreeId {
        self.id
    }

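    // Point lookups follow the LSM read path: the active memtable first, then the
    // sealed memtables (newest to oldest), and finally the on-disk tables.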
    #[allow(clippy::significant_drop_tightening)]
    fn get_internal_entry(&self, key: &[u8], seqno: SeqNo) -> crate::Result<Option<InternalValue>> {
        let version_lock = self.super_version.read().expect("lock is poisoned");

        if let Some(entry) = version_lock.active_memtable.get(key, seqno) {
            return Ok(ignore_tombstone_value(entry));
        }

        // Now look in sealed memtables
        if let Some(entry) =
            Self::get_internal_entry_from_sealed_memtables(&version_lock, key, seqno)
        {
            return Ok(ignore_tombstone_value(entry));
        }

        // Now look in tables... this may involve disk I/O
        Self::get_internal_entry_from_tables(&version_lock, key, seqno)
    }

    fn current_version(&self) -> Version {
        self.super_version.read().expect("poisoned").version.clone()
    }

    fn flush_active_memtable(&self, seqno_threshold: SeqNo) -> crate::Result<Option<Table>> {
        log::debug!("Flushing active memtable");

        let Some((table_id, yanked_memtable)) = self.rotate_memtable() else {
            return Ok(None);
        };

        let Some((table, _)) = self.flush_memtable(table_id, &yanked_memtable, seqno_threshold)?
        else {
            return Ok(None);
        };
        self.register_tables(std::slice::from_ref(&table), None, None, seqno_threshold)?;

        Ok(Some(table))
    }

    #[cfg(feature = "metrics")]
    fn metrics(&self) -> &Arc<crate::Metrics> {
        &self.0.metrics
    }

    fn version_free_list_len(&self) -> usize {
        self.compaction_state
            .lock()
            .expect("lock is poisoned")
            .version_free_list_len()
    }

    fn prefix<K: AsRef<[u8]>>(
        &self,
        prefix: K,
        seqno: SeqNo,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl<'_>> + '_> {
        Box::new(
            self.create_prefix(&prefix, seqno, index)
                .map(|kv| IterGuardImpl::Standard(Guard(kv))),
        )
    }

    fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
        &self,
        range: R,
        seqno: SeqNo,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl<'_>> + '_> {
        Box::new(
            self.create_range(&range, seqno, index)
                .map(|kv| IterGuardImpl::Standard(Guard(kv))),
        )
    }

    /// Returns the number of tombstones in the tree.
    fn tombstone_count(&self) -> u64 {
        self.current_version()
            .iter_tables()
            .map(Table::tombstone_count)
            .sum()
    }

    /// Returns the number of weak tombstones (single deletes) in the tree.
    fn weak_tombstone_count(&self) -> u64 {
        self.current_version()
            .iter_tables()
            .map(Table::weak_tombstone_count)
            .sum()
    }

    /// Returns the number of value entries that become reclaimable once weak tombstones can be GC'd.
    fn weak_tombstone_reclaimable_count(&self) -> u64 {
        self.current_version()
            .iter_tables()
            .map(Table::weak_tombstone_reclaimable)
            .sum()
    }

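    /// Bulk ingestion: writes a pre-sorted stream of key-value pairs under a single
    /// sequence number; keys must be supplied in strictly ascending order.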
    fn ingest(
        &self,
        iter: impl Iterator<Item = (UserKey, UserValue)>,
        seqno_generator: &SequenceNumberCounter,
        visible_seqno: &SequenceNumberCounter,
    ) -> crate::Result<()> {
        use crate::tree::ingest::Ingestion;
        use std::time::Instant;

        // // TODO: 3.0.0 ... hmmmm
        // let global_lock = self.super_version.write().expect("lock is poisoned");

        let seqno = seqno_generator.next();

        // TODO: allow ingestion always, by flushing memtable
        // assert!(
        //     global_lock.active_memtable.is_empty(),
        //     "can only perform bulk ingestion with empty memtable(s)",
        // );
        // assert!(
        //     global_lock.sealed_memtables.len() == 0,
        //     "can only perform bulk ingestion with empty memtable(s)",
        // );

        let mut writer = Ingestion::new(self)?.with_seqno(seqno);

        let start = Instant::now();
        let mut count = 0;
        let mut last_key = None;

        #[allow(clippy::explicit_counter_loop)]
        for (key, value) in iter {
            if let Some(last_key) = &last_key {
                assert!(
                    key > last_key,
                    "next key in bulk ingest was not greater than last key, last: {last_key:?}, next: {key:?}",
                );
            }
            last_key = Some(key.clone());

            writer.write(key, value)?;

            count += 1;
        }

        writer.finish()?;

        visible_seqno.fetch_max(seqno + 1);

        log::info!("Ingested {count} items in {:?}", start.elapsed());

        Ok(())
    }

    fn drop_range<K: AsRef<[u8]>, R: RangeBounds<K>>(&self, range: R) -> crate::Result<()> {
        let (bounds, is_empty) = Self::range_bounds_to_owned_bounds(&range);

        if is_empty {
            return Ok(());
        }

        let strategy = Arc::new(crate::compaction::drop_range::Strategy::new(bounds));

        // IMPORTANT: Write lock so we can be the only compaction going on
        let _lock = self
            .0
            .major_compaction_lock
            .write()
            .expect("lock is poisoned");

        log::info!("Starting drop_range compaction");
        self.inner_compact(strategy, 0)
    }

    #[doc(hidden)]
    fn major_compact(&self, target_size: u64, seqno_threshold: SeqNo) -> crate::Result<()> {
        let strategy = Arc::new(crate::compaction::major::Strategy::new(target_size));

        // IMPORTANT: Write lock so we can be the only compaction going on
        let _lock = self
            .0
            .major_compaction_lock
            .write()
            .expect("lock is poisoned");

        log::info!("Starting major compaction");
        self.inner_compact(strategy, seqno_threshold)
    }

    fn l0_run_count(&self) -> usize {
        self.current_version()
            .level(0)
            .map(|x| x.run_count())
            .unwrap_or_default()
    }

    fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<u32>> {
        // NOTE: We know value lengths never exceed u32::MAX
        #[allow(clippy::cast_possible_truncation)]
        Ok(self.get(key, seqno)?.map(|x| x.len() as u32))
    }

    fn filter_size(&self) -> usize {
        self.current_version()
            .iter_tables()
            .map(Table::filter_size)
            .sum()
    }

    fn pinned_filter_size(&self) -> usize {
        self.current_version()
            .iter_tables()
            .map(Table::pinned_filter_size)
            .sum()
    }

    fn pinned_block_index_size(&self) -> usize {
        self.current_version()
            .iter_tables()
            .map(Table::pinned_block_index_size)
            .sum()
    }

    fn sealed_memtable_count(&self) -> usize {
        self.super_version
            .read()
            .expect("lock is poisoned")
            .sealed_memtables
            .len()
    }

    fn flush_memtable(
        &self,
        table_id: TableId,
        memtable: &Arc<Memtable>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<Option<(Table, Option<BlobFile>)>> {
        use crate::{compaction::stream::CompactionStream, file::TABLES_FOLDER, table::Writer};
        use std::time::Instant;

        let start = Instant::now();

        let folder = self.config.path.join(TABLES_FOLDER);
        let table_file_path = folder.join(table_id.to_string());

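        // NOTE: A flushed memtable always becomes a new level-0 run (see `register_tables`),
        // so every per-level policy below is queried at level index 0.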
        let data_block_size = self.config.data_block_size_policy.get(0);
        let index_block_size = self.config.index_block_size_policy.get(0);

        let data_block_restart_interval = self.config.data_block_restart_interval_policy.get(0);
        let index_block_restart_interval = self.config.index_block_restart_interval_policy.get(0);

        let data_block_compression = self.config.data_block_compression_policy.get(0);
        let index_block_compression = self.config.index_block_compression_policy.get(0);

        let data_block_hash_ratio = self.config.data_block_hash_ratio_policy.get(0);

        let index_partitioning = self.config.index_block_partitioning_policy.get(0);
        let filter_partitioning = self.config.filter_block_partitioning_policy.get(0);

        log::debug!(
            "Flushing table to {}, data_block_restart_interval={data_block_restart_interval}, index_block_restart_interval={index_block_restart_interval}, data_block_size={data_block_size}, index_block_size={index_block_size}, data_block_compression={data_block_compression}, index_block_compression={index_block_compression}",
            table_file_path.display(),
        );

        let mut table_writer = Writer::new(table_file_path, table_id)?
            .use_data_block_restart_interval(data_block_restart_interval)
            .use_index_block_restart_interval(index_block_restart_interval)
            .use_data_block_compression(data_block_compression)
            .use_index_block_compression(index_block_compression)
            .use_data_block_size(data_block_size)
            .use_index_block_size(index_block_size)
            .use_data_block_hash_ratio(data_block_hash_ratio)
            .use_bloom_policy({
                use crate::config::FilterPolicyEntry::{Bloom, None};
                use crate::table::filter::BloomConstructionPolicy;

                match self.config.filter_policy.get(0) {
                    Bloom(policy) => policy,
                    None => BloomConstructionPolicy::BitsPerKey(0.0),
                }
            });

        if index_partitioning {
            table_writer = table_writer.use_partitioned_index();
        }
        if filter_partitioning {
            table_writer = table_writer.use_partitioned_filter();
        }

        let iter = memtable.iter().map(Ok);
        let compaction_filter = CompactionStream::new(iter, seqno_threshold);

        for item in compaction_filter {
            table_writer.write(item?)?;
        }

        let result = self.consume_writer(table_writer)?;

        log::debug!("Flushed memtable {table_id:?} in {:?}", start.elapsed());

        Ok(result.map(|table| (table, None)))
    }

    #[allow(clippy::significant_drop_tightening)]
    fn register_tables(
        &self,
        tables: &[Table],
        blob_files: Option<&[BlobFile]>,
        frag_map: Option<FragmentationMap>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        log::trace!(
            "Registering {} tables, {} blob files",
            tables.len(),
            blob_files.map(<[BlobFile]>::len).unwrap_or_default(),
        );

        let mut compaction_state = self.compaction_state.lock().expect("lock is poisoned");
        let mut super_version = self.super_version.write().expect("lock is poisoned");

        compaction_state.upgrade_version(
            &mut super_version,
            |version| {
                Ok(version.with_new_l0_run(tables, blob_files, frag_map.filter(|x| !x.is_empty())))
            },
            seqno_threshold,
        )?;

        for table in tables {
            log::trace!("releasing sealed memtable {}", table.id());

            super_version.sealed_memtables =
                Arc::new(super_version.sealed_memtables.remove(table.id()));
        }

        Ok(())
    }

    fn clear_active_memtable(&self) {
        self.super_version
            .write()
            .expect("lock is poisoned")
            .active_memtable = Arc::new(Memtable::default());
    }

    fn set_active_memtable(&self, memtable: Memtable) {
        let mut version_lock = self.super_version.write().expect("lock is poisoned");
        version_lock.active_memtable = Arc::new(memtable);
    }

    fn add_sealed_memtable(&self, id: MemtableId, memtable: Arc<Memtable>) {
        let mut version_lock = self.super_version.write().expect("lock is poisoned");
        version_lock.sealed_memtables = Arc::new(version_lock.sealed_memtables.add(id, memtable));
    }

    fn compact(
        &self,
        strategy: Arc<dyn CompactionStrategy>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        // NOTE: Take the major compaction lock as a read lock.
        // That way we cannot proceed while a major compaction is running,
        // but parallel (non-major) compactions can still occur.
        let _lock = self
            .0
            .major_compaction_lock
            .read()
            .expect("lock is poisoned");

        self.inner_compact(strategy, seqno_threshold)
    }

    fn get_next_table_id(&self) -> TableId {
        self.0.get_next_table_id()
    }

    fn tree_config(&self) -> &Config {
        &self.config
    }

    fn active_memtable_size(&self) -> u64 {
        use std::sync::atomic::Ordering::Acquire;

        self.super_version
            .read()
            .expect("lock is poisoned")
            .active_memtable
            .approximate_size
            .load(Acquire)
    }

    fn tree_type(&self) -> crate::TreeType {
        crate::TreeType::Standard
    }

    #[allow(clippy::significant_drop_tightening)]
    fn rotate_memtable(&self) -> Option<(MemtableId, Arc<Memtable>)> {
        let mut version_lock = self.super_version.write().expect("lock is poisoned");

        if version_lock.active_memtable.is_empty() {
            return None;
        }

        let yanked_memtable = std::mem::take(&mut version_lock.active_memtable);

        let tmp_memtable_id = self.get_next_table_id();

        version_lock.sealed_memtables = Arc::new(
            version_lock
                .sealed_memtables
                .add(tmp_memtable_id, yanked_memtable.clone()),
        );

        log::trace!("rotate: added memtable id={tmp_memtable_id} to sealed memtables");

        Some((tmp_memtable_id, yanked_memtable))
    }

    fn table_count(&self) -> usize {
        self.current_version().table_count()
    }

    fn level_table_count(&self, idx: usize) -> Option<usize> {
        self.current_version().level(idx).map(|x| x.table_count())
    }

    #[allow(clippy::significant_drop_tightening)]
    fn approximate_len(&self) -> usize {
        let version = self.super_version.read().expect("lock is poisoned");

        let tables_item_count = self
            .current_version()
            .iter_tables()
            .map(|x| x.metadata.item_count)
            .sum::<u64>();

        let memtable_count = version.active_memtable.len() as u64;
        let sealed_count = version
            .sealed_memtables
            .iter()
            .map(|(_, mt)| mt.len())
            .sum::<usize>() as u64;

        (memtable_count + sealed_count + tables_item_count)
            .try_into()
            .expect("should not be too large")
    }

    fn disk_space(&self) -> u64 {
        self.current_version()
            .iter_levels()
            .map(super::version::Level::size)
            .sum()
    }

    #[allow(clippy::significant_drop_tightening)]
    fn get_highest_memtable_seqno(&self) -> Option<SeqNo> {
        let version = self.super_version.read().expect("lock is poisoned");

        let active = version.active_memtable.get_highest_seqno();

        let sealed = version
            .sealed_memtables
            .iter()
            .map(|(_, table)| table.get_highest_seqno())
            .max()
            .flatten();

        active.max(sealed)
    }

    fn get_highest_persisted_seqno(&self) -> Option<SeqNo> {
        self.current_version()
            .iter_tables()
            .map(Table::get_highest_seqno)
            .max()
    }

    fn get<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<UserValue>> {
        Ok(self
            .get_internal_entry(key.as_ref(), seqno)?
            .map(|x| x.value))
    }

    fn insert<K: Into<UserKey>, V: Into<UserValue>>(
        &self,
        key: K,
        value: V,
        seqno: SeqNo,
    ) -> (u64, u64) {
        let value = InternalValue::from_components(key, value, seqno, ValueType::Value);
        self.append_entry(value)
    }

    fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        let value = InternalValue::new_tombstone(key, seqno);
        self.append_entry(value)
    }

    fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        let value = InternalValue::new_weak_tombstone(key, seqno);
        self.append_entry(value)
    }
}

impl Tree {
    /// Normalizes a user-provided range into owned `Bound<Slice>` values.
    ///
    /// Returns a tuple containing:
    /// - the `OwnedBounds` that mirror the original bounds semantics (including
    ///   inclusive/exclusive markers and unbounded endpoints), and
    /// - a `bool` flag indicating whether the normalized range is logically
    ///   empty (e.g., when the lower bound is greater than the upper bound).
    ///
    /// Callers can use the flag to detect empty ranges and skip further work
    /// while still having access to the normalized bounds for non-empty cases.
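    ///
    /// For example, `"b".."y"` maps to `(Included("b"), Excluded("y"))` with the
    /// empty flag set to `false`, while `"y".."b"` keeps the same bound shape but
    /// yields an empty flag of `true`.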
    fn range_bounds_to_owned_bounds<K: AsRef<[u8]>, R: RangeBounds<K>>(
        range: &R,
    ) -> (OwnedBounds, bool) {
        use Bound::{Excluded, Included, Unbounded};

        let start = match range.start_bound() {
            Included(key) => Included(Slice::from(key.as_ref())),
            Excluded(key) => Excluded(Slice::from(key.as_ref())),
            Unbounded => Unbounded,
        };

        let end = match range.end_bound() {
            Included(key) => Included(Slice::from(key.as_ref())),
            Excluded(key) => Excluded(Slice::from(key.as_ref())),
            Unbounded => Unbounded,
        };

        let is_empty =
            if let (Included(lo) | Excluded(lo), Included(hi) | Excluded(hi)) = (&start, &end) {
                lo.as_ref() > hi.as_ref()
            } else {
                false
            };

        (OwnedBounds { start, end }, is_empty)
    }

    /// Opens an LSM-tree in the given directory.
    ///
    /// Will recover previous state if the folder was previously
    /// occupied by an LSM-tree, including the previous configuration.
    /// If not, a new tree will be initialized with the given config.
    ///
    /// After recovering a previous state, use [`Tree::set_active_memtable`]
    /// to fill the memtable with data from a write-ahead log for full durability.
    ///
    /// # Errors
    ///
    /// Returns an error if an I/O error occurs.
    pub(crate) fn open(config: Config) -> crate::Result<Self> {
        use crate::file::MANIFEST_FILE;

        log::debug!("Opening LSM-tree at {}", config.path.display());

        // Check for old version
        if config.path.join("version").try_exists()? {
            return Err(crate::Error::InvalidVersion(FormatVersion::V1));
        }

        let tree = if config.path.join(MANIFEST_FILE).try_exists()? {
            Self::recover(config)
        } else {
            Self::create_new(config)
        }?;

        Ok(tree)
    }

    pub(crate) fn consume_writer(
        &self,
        writer: crate::table::Writer,
    ) -> crate::Result<Option<Table>> {
        let table_file_path = writer.path.clone();

        let Some((_, checksum)) = writer.finish()? else {
            return Ok(None);
        };

        log::debug!("Finalized table write at {}", table_file_path.display());

        let pin_filter = self.config.filter_block_pinning_policy.get(0);
        let pin_index = self.config.filter_block_pinning_policy.get(0);

        let created_table = Table::recover(
            table_file_path,
            checksum,
            self.id,
            self.config.cache.clone(),
            self.config.descriptor_table.clone(),
            pin_filter,
            pin_index,
            #[cfg(feature = "metrics")]
            self.metrics.clone(),
        )?;

        log::debug!("Flushed table to {:?}", created_table.path);

        Ok(Some(created_table))
    }

    /// Returns `true` if any tables are currently being compacted.
    #[doc(hidden)]
    #[must_use]
    pub fn is_compacting(&self) -> bool {
        !self
            .compaction_state
            .lock()
            .expect("lock is poisoned")
            .hidden_set()
            .is_empty()
    }

    fn get_internal_entry_from_sealed_memtables(
        super_version: &SuperVersion,
        key: &[u8],
        seqno: SeqNo,
    ) -> Option<InternalValue> {
        for (_, memtable) in super_version.sealed_memtables.iter().rev() {
            if let Some(entry) = memtable.get(key, seqno) {
                return Some(entry);
            }
        }

        None
    }

    fn get_internal_entry_from_tables(
        super_version: &SuperVersion,
        key: &[u8],
        seqno: SeqNo,
    ) -> crate::Result<Option<InternalValue>> {
        // NOTE: Create key hash for hash sharing
        // https://fjall-rs.github.io/post/bloom-filter-hash-sharing/
        let key_hash = crate::table::filter::standard_bloom::Builder::get_hash(key);

        for level in super_version.version.iter_levels() {
            for run in level.iter() {
                // NOTE: Based on benchmarking, binary search is only worth it once a run has ~4 or more tables
                if run.len() >= 4 {
                    if let Some(table) = run.get_for_key(key) {
                        if let Some(item) = table.get(key, seqno, key_hash)? {
                            return Ok(ignore_tombstone_value(item));
                        }
                    }
                } else {
                    // NOTE: Fall back to linear search
                    for table in run.iter() {
                        if !table.is_key_in_key_range(key) {
                            continue;
                        }

                        if let Some(item) = table.get(key, seqno, key_hash)? {
                            return Ok(ignore_tombstone_value(item));
                        }
                    }
                }
            }
        }

        Ok(None)
    }

    fn inner_compact(
        &self,
        strategy: Arc<dyn CompactionStrategy>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        use crate::compaction::worker::{do_compaction, Options};

        let mut opts = Options::from_tree(self, strategy);
        opts.eviction_seqno = seqno_threshold;

        do_compaction(&opts)?;

        log::debug!("Compaction run over");

        Ok(())
    }

    #[doc(hidden)]
    #[must_use]
    pub fn create_iter(
        &self,
        seqno: SeqNo,
        ephemeral: Option<Arc<Memtable>>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        self.create_range::<UserKey, _>(&.., seqno, ephemeral)
    }

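    // NOTE: The iterator state below clones the active memtable, the sealed memtables
    // and the current version, so the returned iterator keeps reading from a consistent
    // snapshot even if the super version changes afterwards.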
    #[doc(hidden)]
    pub fn create_internal_range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
        &'a self,
        range: &'a R,
        seqno: SeqNo,
        ephemeral: Option<Arc<Memtable>>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<InternalValue>> + 'static {
        use crate::range::{IterState, TreeIter};
        use std::ops::Bound::{self, Excluded, Included, Unbounded};

        let lo: Bound<UserKey> = match range.start_bound() {
            Included(x) => Included(x.as_ref().into()),
            Excluded(x) => Excluded(x.as_ref().into()),
            Unbounded => Unbounded,
        };

        let hi: Bound<UserKey> = match range.end_bound() {
            Included(x) => Included(x.as_ref().into()),
            Excluded(x) => Excluded(x.as_ref().into()),
            Unbounded => Unbounded,
        };

        let bounds: (Bound<UserKey>, Bound<UserKey>) = (lo, hi);

        let super_version = self.super_version.write().expect("lock is poisoned");

        let iter_state = {
            let active = &super_version.active_memtable;
            let sealed = &super_version.sealed_memtables;

            IterState {
                active: active.clone(),
                sealed: sealed.iter().map(|(_, mt)| mt.clone()).collect(),
                ephemeral,
                version: super_version.version.clone(),
            }
        };

        TreeIter::create_range(iter_state, bounds, seqno, &super_version.version)
    }

    #[doc(hidden)]
    pub fn create_range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
        &self,
        range: &'a R,
        seqno: SeqNo,
        ephemeral: Option<Arc<Memtable>>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        self.create_internal_range(range, seqno, ephemeral)
            .map(|item| match item {
                Ok(kv) => Ok((kv.key.user_key, kv.value)),
                Err(e) => Err(e),
            })
    }

    #[doc(hidden)]
    pub fn create_prefix<'a, K: AsRef<[u8]> + 'a>(
        &self,
        prefix: K,
        seqno: SeqNo,
        ephemeral: Option<Arc<Memtable>>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        use crate::range::prefix_to_range;

        let range = prefix_to_range(prefix.as_ref());
        self.create_range(&range, seqno, ephemeral)
    }

    /// Adds an item to the active memtable.
    ///
    /// Returns the added item's size and the new size of the memtable.
    #[doc(hidden)]
    #[must_use]
    pub fn append_entry(&self, value: InternalValue) -> (u64, u64) {
        self.super_version
            .read()
            .expect("lock is poisoned")
            .active_memtable
            .insert(value)
    }

    /// Recovers the previous state by loading the level manifest, tables, and blob files.
    ///
    /// # Errors
    ///
    /// Returns an error if an I/O error occurs.
    fn recover(mut config: Config) -> crate::Result<Self> {
        use crate::{file::MANIFEST_FILE, stop_signal::StopSignal};
        use inner::get_next_tree_id;

        log::info!("Recovering LSM-tree at {}", config.path.display());

        let manifest = {
            let manifest_path = config.path.join(MANIFEST_FILE);
            let reader = sfa::Reader::new(&manifest_path)?;
            Manifest::decode_from(&manifest_path, &reader)?
        };

        if manifest.version != FormatVersion::V3 {
            return Err(crate::Error::InvalidVersion(manifest.version));
        }

        // IMPORTANT: Restore persisted config
        config.level_count = manifest.level_count;

        let tree_id = get_next_tree_id();

        #[cfg(feature = "metrics")]
        let metrics = Arc::new(Metrics::default());

        let version = Self::recover_levels(
            &config.path,
            tree_id,
            &config.cache,
            &config.descriptor_table,
            #[cfg(feature = "metrics")]
            &metrics,
        )?;

        let highest_table_id = version
            .iter_tables()
            .map(Table::id)
            .max()
            .unwrap_or_default();

        let path = config.path.clone();

        let inner = TreeInner {
            id: tree_id,
            table_id_counter: Arc::new(AtomicU64::new(highest_table_id + 1)),
            blob_file_id_generator: SequenceNumberCounter::default(),
            super_version: Arc::new(RwLock::new(SuperVersion {
                active_memtable: Arc::default(),
                sealed_memtables: Arc::default(),
                version,
            })),
            stop_signal: StopSignal::default(),
            config,
            major_compaction_lock: RwLock::default(),
            compaction_state: Arc::new(Mutex::new(CompactionState::new(path))),

            #[cfg(feature = "metrics")]
            metrics,
        };

        Ok(Self(Arc::new(inner)))
    }

    /// Creates a new LSM-tree in a directory.
    fn create_new(config: Config) -> crate::Result<Self> {
        use crate::file::{fsync_directory, MANIFEST_FILE, TABLES_FOLDER};
        use std::fs::create_dir_all;

        let path = config.path.clone();
        log::trace!("Creating LSM-tree at {}", path.display());

        create_dir_all(&path)?;

        let manifest_path = path.join(MANIFEST_FILE);
        assert!(!manifest_path.try_exists()?);

        let table_folder_path = path.join(TABLES_FOLDER);
        create_dir_all(&table_folder_path)?;

        // Create manifest
        {
            let mut writer = sfa::Writer::new_at_path(manifest_path)?;

            Manifest {
                version: FormatVersion::V3,
                level_count: config.level_count,
                tree_type: if config.kv_separation_opts.is_some() {
                    TreeType::Blob
                } else {
                    TreeType::Standard
                },
            }
            .encode_into(&mut writer)?;

            writer.finish()?;
        }

        // IMPORTANT: fsync folders on Unix
        fsync_directory(&table_folder_path)?;
        fsync_directory(&path)?;

        let inner = TreeInner::create_new(config)?;
        Ok(Self(Arc::new(inner)))
    }

    /// Recovers the level manifest, loading all tables from disk.
    fn recover_levels<P: AsRef<Path>>(
        tree_path: P,
        tree_id: TreeId,
        cache: &Arc<Cache>,
        descriptor_table: &Arc<DescriptorTable>,
        #[cfg(feature = "metrics")] metrics: &Arc<Metrics>,
    ) -> crate::Result<Version> {
        use crate::{file::fsync_directory, file::TABLES_FOLDER, TableId};

        let tree_path = tree_path.as_ref();

        let recovery = recover(tree_path)?;

        let table_map = {
            let mut result: crate::HashMap<TableId, (u8 /* Level index */, Checksum)> =
                crate::HashMap::default();

            for (level_idx, table_ids) in recovery.table_ids.iter().enumerate() {
                for run in table_ids {
                    for &(table_id, checksum) in run {
                        // NOTE: We know there are always fewer than 256 levels
                        #[allow(clippy::expect_used)]
                        result.insert(
                            table_id,
                            (
                                level_idx
                                    .try_into()
                                    .expect("there are less than 256 levels"),
                                checksum,
                            ),
                        );
                    }
                }
            }

            result
        };

        let cnt = table_map.len();

        log::debug!("Recovering {cnt} tables from {}", tree_path.display());

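        // Log recovery progress more sparsely the more tables there are to recover.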
        let progress_mod = match cnt {
            _ if cnt <= 20 => 1,
            _ if cnt <= 100 => 10,
            _ => 100,
        };

        let mut tables = vec![];

        let table_base_folder = tree_path.join(TABLES_FOLDER);

        if !table_base_folder.try_exists()? {
            std::fs::create_dir_all(&table_base_folder)?;
            fsync_directory(&table_base_folder)?;
        }

        let mut orphaned_tables = vec![];

        for (idx, dirent) in std::fs::read_dir(&table_base_folder)?.enumerate() {
            let dirent = dirent?;
            let file_name = dirent.file_name();

            // https://en.wikipedia.org/wiki/.DS_Store
            if file_name == ".DS_Store" {
                continue;
            }

            // https://en.wikipedia.org/wiki/AppleSingle_and_AppleDouble_formats
            if file_name.to_string_lossy().starts_with("._") {
                continue;
            }

            let table_file_name = file_name.to_str().ok_or_else(|| {
                log::error!("invalid table file name {}", file_name.display());
                crate::Error::Unrecoverable
            })?;

            let table_file_path = dirent.path();
            assert!(!table_file_path.is_dir());

            log::debug!("Recovering table from {}", table_file_path.display());

            let table_id = table_file_name.parse::<TableId>().map_err(|e| {
                log::error!("invalid table file name {table_file_name:?}: {e:?}");
                crate::Error::Unrecoverable
            })?;

            if let Some(&(level_idx, checksum)) = table_map.get(&table_id) {
                let table = Table::recover(
                    table_file_path,
                    checksum,
                    tree_id,
                    cache.clone(),
                    descriptor_table.clone(),
                    level_idx <= 1, // TODO: look at configuration
                    level_idx <= 2, // TODO: look at configuration
                    #[cfg(feature = "metrics")]
                    metrics.clone(),
                )?;

                log::debug!("Recovered table from {:?}", table.path);

                tables.push(table);

                if idx % progress_mod == 0 {
                    log::debug!("Recovered {idx}/{cnt} tables");
                }
            } else {
                orphaned_tables.push(table_file_path);
            }
        }

        if tables.len() < cnt {
            log::error!(
                "Recovered fewer tables than expected: {:?}",
                table_map.keys(),
            );
            return Err(crate::Error::Unrecoverable);
        }

        log::debug!("Successfully recovered {} tables", tables.len());

        let (blob_files, orphaned_blob_files) = crate::vlog::recover_blob_files(
            &tree_path.join(BLOBS_FOLDER),
            &recovery.blob_file_ids,
        )?;

        let version = Version::from_recovery(recovery, &tables, &blob_files)?;

        // NOTE: Cleanup old versions
        // But only after we definitely recovered the latest version
        Self::cleanup_orphaned_version(tree_path, version.id())?;

        for table_path in orphaned_tables {
            log::debug!("Deleting orphaned table {}", table_path.display());
            std::fs::remove_file(&table_path)?;
        }

        for blob_file_path in orphaned_blob_files {
            log::debug!("Deleting orphaned blob file {}", blob_file_path.display());
            std::fs::remove_file(&blob_file_path)?;
        }

        Ok(version)
    }

    fn cleanup_orphaned_version(path: &Path, latest_version_id: VersionId) -> crate::Result<()> {
        let version_str = format!("v{latest_version_id}");

        for file in std::fs::read_dir(path)? {
            let dirent = file?;

            if dirent.file_type()?.is_dir() {
                continue;
            }

            let name = dirent.file_name();

            if name.to_string_lossy().starts_with('v') && *name != *version_str {
                log::trace!("Cleanup orphaned version {}", name.display());
                std::fs::remove_file(dirent.path())?;
            }
        }

        Ok(())
    }
}