// lsm_tree/tree/mod.rs

1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5pub mod ingest;
6pub mod inner;
7pub mod sealed;
8
9use crate::{
10    compaction::{drop_range::OwnedBounds, state::CompactionState, CompactionStrategy},
11    config::Config,
12    file::CURRENT_VERSION_FILE,
13    format_version::FormatVersion,
14    iter_guard::{IterGuard, IterGuardImpl},
15    manifest::Manifest,
16    memtable::Memtable,
17    slice::Slice,
18    table::Table,
19    value::InternalValue,
20    version::{recovery::recover, SuperVersion, SuperVersions, Version},
21    vlog::BlobFile,
22    AbstractTree, Checksum, KvPair, SeqNo, SequenceNumberCounter, TableId, UserKey, UserValue,
23    ValueType,
24};
25use inner::{TreeId, TreeInner};
26use std::{
27    ops::{Bound, RangeBounds},
28    path::Path,
29    sync::{Arc, Mutex, RwLock},
30};
31
32#[cfg(feature = "metrics")]
33use crate::metrics::Metrics;
34
35/// Iterator value guard
36pub struct Guard(crate::Result<(UserKey, UserValue)>);
37
38impl IterGuard for Guard {
39    fn into_inner_if(
40        self,
41        pred: impl Fn(&UserKey) -> bool,
42    ) -> crate::Result<(UserKey, Option<UserValue>)> {
43        let (k, v) = self.0?;
44
45        if pred(&k) {
46            Ok((k, Some(v)))
47        } else {
48            Ok((k, None))
49        }
50    }
51
52    fn key(self) -> crate::Result<UserKey> {
53        self.0.map(|(k, _)| k)
54    }
55
56    fn size(self) -> crate::Result<u32> {
57        #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
58        self.into_inner().map(|(_, v)| v.len() as u32)
59    }
60
61    fn into_inner(self) -> crate::Result<(UserKey, UserValue)> {
62        self.0
63    }
64}
65
66fn ignore_tombstone_value(item: InternalValue) -> Option<InternalValue> {
67    if item.is_tombstone() {
68        None
69    } else {
70        Some(item)
71    }
72}
73
/// A log-structured merge tree (LSM-tree/LSMT)
// NOTE: Cloning is cheap - this is just an `Arc` bump; all clones share state.
#[derive(Clone)]
pub struct Tree(#[doc(hidden)] pub Arc<TreeInner>);

impl std::ops::Deref for Tree {
    type Target = TreeInner;

    // Delegate field/method access straight to the shared inner state.
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
85
impl AbstractTree for Tree {
    fn table_file_cache_size(&self) -> usize {
        // Number of descriptors currently held by the shared descriptor table
        self.config.descriptor_table.len()
    }

    fn get_version_history_lock(
        &self,
    ) -> std::sync::RwLockWriteGuard<'_, crate::version::SuperVersions> {
        self.version_history.write().expect("lock is poisoned")
    }

    fn next_table_id(&self) -> TableId {
        // NOTE(review): `get` appears to peek without advancing the counter
        // (compare `get_next_table_id` below) - confirm against
        // `SequenceNumberCounter`
        self.0.table_id_counter.get()
    }

    fn id(&self) -> TreeId {
        self.id
    }

    fn blob_file_count(&self) -> usize {
        // A standard (non-blob) tree never owns blob files
        0
    }

    /// Point lookup resolved against the snapshot identified by `seqno`.
    fn get_internal_entry(&self, key: &[u8], seqno: SeqNo) -> crate::Result<Option<InternalValue>> {
        // Pin the matching super version, then drop the lock before any I/O
        let super_version = self
            .version_history
            .read()
            .expect("lock is poisoned")
            .get_version_for_snapshot(seqno);

        Self::get_internal_entry_from_version(&super_version, key, seqno)
    }

    fn current_version(&self) -> Version {
        self.version_history
            .read()
            .expect("poisoned")
            .latest_version()
            .version
    }

    fn get_flush_lock(&self) -> std::sync::MutexGuard<'_, ()> {
        self.flush_lock.lock().expect("lock is poisoned")
    }

    #[cfg(feature = "metrics")]
    fn metrics(&self) -> &Arc<crate::Metrics> {
        &self.0.metrics
    }

    fn version_free_list_len(&self) -> usize {
        self.version_history
            .read()
            .expect("lock is poisoned")
            .free_list_len()
    }

    /// Prefix scan, wrapping each item in an [`IterGuardImpl`].
    fn prefix<K: AsRef<[u8]>>(
        &self,
        prefix: K,
        seqno: SeqNo,
        index: Option<(Arc<Memtable>, SeqNo)>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
        Box::new(
            self.create_prefix(&prefix, seqno, index)
                .map(|kv| IterGuardImpl::Standard(Guard(kv))),
        )
    }

    /// Range scan, wrapping each item in an [`IterGuardImpl`].
    fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
        &self,
        range: R,
        seqno: SeqNo,
        index: Option<(Arc<Memtable>, SeqNo)>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
        Box::new(
            self.create_range(&range, seqno, index)
                .map(|kv| IterGuardImpl::Standard(Guard(kv))),
        )
    }

    /// Returns the number of tombstones in the tree.
    fn tombstone_count(&self) -> u64 {
        self.current_version()
            .iter_tables()
            .map(Table::tombstone_count)
            .sum()
    }

    /// Returns the number of weak tombstones (single deletes) in the tree.
    fn weak_tombstone_count(&self) -> u64 {
        self.current_version()
            .iter_tables()
            .map(Table::weak_tombstone_count)
            .sum()
    }

    /// Returns the number of value entries that become reclaimable once weak tombstones can be GC'd.
    fn weak_tombstone_reclaimable_count(&self) -> u64 {
        self.current_version()
            .iter_tables()
            .map(Table::weak_tombstone_reclaimable)
            .sum()
    }

    /// Drops all entries inside `range` by running a dedicated compaction.
    fn drop_range<K: AsRef<[u8]>, R: RangeBounds<K>>(&self, range: R) -> crate::Result<()> {
        let (bounds, is_empty) = Self::range_bounds_to_owned_bounds(&range);

        // A logically empty range cannot drop anything
        if is_empty {
            return Ok(());
        }

        let strategy = Arc::new(crate::compaction::drop_range::Strategy::new(bounds));

        // IMPORTANT: Write lock so we can be the only compaction going on
        let _lock = self
            .0
            .major_compaction_lock
            .write()
            .expect("lock is poisoned");

        log::info!("Starting drop_range compaction");
        // NOTE(review): watermark 0 - presumably retains all MVCC versions
        // during this compaction; confirm against `inner_compact` semantics
        self.inner_compact(strategy, 0)
    }

    #[doc(hidden)]
    fn major_compact(&self, target_size: u64, seqno_threshold: SeqNo) -> crate::Result<()> {
        let strategy = Arc::new(crate::compaction::major::Strategy::new(target_size));

        // IMPORTANT: Write lock so we can be the only compaction going on
        let _lock = self
            .0
            .major_compaction_lock
            .write()
            .expect("lock is poisoned");

        log::info!("Starting major compaction");
        self.inner_compact(strategy, seqno_threshold)
    }

    fn l0_run_count(&self) -> usize {
        // Zero if level 0 does not exist
        self.current_version()
            .level(0)
            .map(|x| x.run_count())
            .unwrap_or_default()
    }

    /// Returns the byte length of `key`'s value, if present in the snapshot.
    fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<u32>> {
        #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
        Ok(self.get(key, seqno)?.map(|x| x.len() as u32))
    }

    /// Total filter size over all tables in the current version.
    fn filter_size(&self) -> usize {
        self.current_version()
            .iter_tables()
            .map(Table::filter_size)
            .sum()
    }

    fn pinned_filter_size(&self) -> usize {
        self.current_version()
            .iter_tables()
            .map(Table::pinned_filter_size)
            .sum()
    }

    fn pinned_block_index_size(&self) -> usize {
        self.current_version()
            .iter_tables()
            .map(Table::pinned_block_index_size)
            .sum()
    }

    fn sealed_memtable_count(&self) -> usize {
        self.version_history
            .read()
            .expect("lock is poisoned")
            .latest_version()
            .sealed_memtables
            .len()
    }

    /// Writes `stream` out as new tables destined for level 0.
    ///
    /// Returns the created tables; a standard tree never emits blob files,
    /// hence the second tuple member is always `None` here.
    fn flush_to_tables(
        &self,
        stream: impl Iterator<Item = crate::Result<InternalValue>>,
    ) -> crate::Result<Option<(Vec<Table>, Option<Vec<BlobFile>>)>> {
        use crate::{file::TABLES_FOLDER, table::multi_writer::MultiWriter};
        use std::time::Instant;

        let start = Instant::now();

        let folder = self.config.path.join(TABLES_FOLDER);

        // Flush output lands in level 0, so every policy is queried at level 0
        let data_block_size = self.config.data_block_size_policy.get(0);

        let data_block_restart_interval = self.config.data_block_restart_interval_policy.get(0);
        let index_block_restart_interval = self.config.index_block_restart_interval_policy.get(0);

        let data_block_compression = self.config.data_block_compression_policy.get(0);
        let index_block_compression = self.config.index_block_compression_policy.get(0);

        let data_block_hash_ratio = self.config.data_block_hash_ratio_policy.get(0);

        let index_partitioning = self.config.index_block_partitioning_policy.get(0);
        let filter_partitioning = self.config.filter_block_partitioning_policy.get(0);

        log::debug!(
            "Flushing memtable(s) to {}, data_block_restart_interval={data_block_restart_interval}, index_block_restart_interval={index_block_restart_interval}, data_block_size={data_block_size}, data_block_compression={data_block_compression}, index_block_compression={index_block_compression}",
            folder.display(),
        );

        // NOTE(review): 64 MiB looks like the per-table target size and the
        // trailing 0 a level hint - confirm against `MultiWriter::new`
        let mut table_writer = MultiWriter::new(
            folder.clone(),
            self.table_id_counter.clone(),
            64 * 1_024 * 1_024,
            0,
        )?
        .use_data_block_restart_interval(data_block_restart_interval)
        .use_index_block_restart_interval(index_block_restart_interval)
        .use_data_block_compression(data_block_compression)
        .use_index_block_compression(index_block_compression)
        .use_data_block_size(data_block_size)
        .use_data_block_hash_ratio(data_block_hash_ratio)
        .use_bloom_policy({
            use crate::config::FilterPolicyEntry::{Bloom, None};
            use crate::table::filter::BloomConstructionPolicy;

            // No configured filter => build with 0 bits per key
            match self.config.filter_policy.get(0) {
                Bloom(policy) => policy,
                None => BloomConstructionPolicy::BitsPerKey(0.0),
            }
        });

        if index_partitioning {
            table_writer = table_writer.use_partitioned_index();
        }
        if filter_partitioning {
            table_writer = table_writer.use_partitioned_filter();
        }

        // Drain the stream; any item error aborts the flush
        for item in stream {
            table_writer.write(item?)?;
        }

        let result = table_writer.finish()?;

        log::debug!("Flushed memtable(s) in {:?}", start.elapsed());

        let pin_filter = self.config.filter_block_pinning_policy.get(0);
        let pin_index = self.config.index_block_pinning_policy.get(0);

        // Load tables
        let tables = result
            .into_iter()
            .map(|(table_id, checksum)| -> crate::Result<Table> {
                Table::recover(
                    folder.join(table_id.to_string()),
                    checksum,
                    0,
                    self.id,
                    self.config.cache.clone(),
                    self.config.descriptor_table.clone(),
                    pin_filter,
                    pin_index,
                    #[cfg(feature = "metrics")]
                    self.metrics.clone(),
                )
            })
            .collect::<crate::Result<Vec<_>>>()?;

        Ok(Some((tables, None)))
    }

    /// Atomically publishes freshly written tables as a new L0 run and
    /// releases the sealed memtables they were flushed from.
    #[expect(clippy::significant_drop_tightening)]
    fn register_tables(
        &self,
        tables: &[Table],
        blob_files: Option<&[BlobFile]>,
        frag_map: Option<crate::blob_tree::FragmentationMap>,
        sealed_memtables_to_delete: &[crate::tree::inner::MemtableId],
        gc_watermark: SeqNo,
    ) -> crate::Result<()> {
        log::trace!(
            "Registering {} tables, {} blob files",
            tables.len(),
            blob_files.map(<[BlobFile]>::len).unwrap_or_default(),
        );

        // Lock order: compaction state first, then version history
        let mut _compaction_state = self.compaction_state.lock().expect("lock is poisoned");
        let mut version_lock = self.version_history.write().expect("lock is poisoned");

        version_lock.upgrade_version(
            &self.config.path,
            |current| {
                let mut copy = current.clone();

                // An empty fragmentation map is treated as absent
                copy.version = copy.version.with_new_l0_run(
                    tables,
                    blob_files,
                    frag_map.filter(|x| !x.is_empty()),
                );

                // Drop the sealed memtables that were just persisted
                for &table_id in sealed_memtables_to_delete {
                    log::trace!("releasing sealed memtable #{table_id}");
                    copy.sealed_memtables = Arc::new(copy.sealed_memtables.remove(table_id));
                }

                Ok(copy)
            },
            &self.config.seqno,
        )?;

        // Best-effort cleanup of versions no snapshot can see anymore
        if let Err(e) = version_lock.maintenance(&self.config.path, gc_watermark) {
            log::warn!("Version GC failed: {e:?}");
        }

        Ok(())
    }

    fn clear_active_memtable(&self) {
        // NOTE(review): this assigns through the value returned by
        // `latest_version()`; `rotate_memtable` below treats that return
        // value as an owned copy, in which case this write would be lost -
        // verify against `SuperVersions::latest_version`
        self.version_history
            .write()
            .expect("lock is poisoned")
            .latest_version()
            .active_memtable = Arc::new(Memtable::new(self.memtable_id_counter.next()));
    }

    fn set_active_memtable(&self, memtable: Memtable) {
        // NOTE(review): same concern as `clear_active_memtable` - confirm
        // the assignment reaches the live super version
        self.version_history
            .write()
            .expect("lock is poisoned")
            .latest_version()
            .active_memtable = Arc::new(memtable);
    }

    fn add_sealed_memtable(&self, memtable: Arc<Memtable>) {
        let mut version_lock = self.version_history.write().expect("lock is poisoned");
        version_lock.append_sealed_memtable(memtable);
    }

    /// Runs a (minor) compaction using `strategy`.
    fn compact(
        &self,
        strategy: Arc<dyn CompactionStrategy>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        // NOTE: Read lock major compaction lock
        // That way, if a major compaction is running, we cannot proceed
        // But in general, parallel (non-major) compactions can occur
        let _lock = self
            .0
            .major_compaction_lock
            .read()
            .expect("lock is poisoned");

        self.inner_compact(strategy, seqno_threshold)
    }

    fn get_next_table_id(&self) -> TableId {
        self.0.get_next_table_id()
    }

    fn tree_config(&self) -> &Config {
        &self.config
    }

    fn active_memtable(&self) -> Arc<Memtable> {
        self.version_history
            .read()
            .expect("lock is poisoned")
            .latest_version()
            .active_memtable
            .clone()
    }

    fn tree_type(&self) -> crate::TreeType {
        crate::TreeType::Standard
    }

    /// Seals the active memtable (if non-empty) and installs a fresh one,
    /// returning the sealed memtable for flushing.
    #[expect(clippy::significant_drop_tightening)]
    fn rotate_memtable(&self) -> Option<Arc<Memtable>> {
        let mut version_history_lock = self.version_history.write().expect("lock is poisoned");
        let super_version = version_history_lock.latest_version();

        // Nothing to rotate
        if super_version.active_memtable.is_empty() {
            return None;
        }

        let yanked_memtable = super_version.active_memtable;

        // Publish a new super version: fresh active memtable,
        // old one appended to the sealed set
        let mut copy = version_history_lock.latest_version();
        copy.seqno = self.config.seqno.next();
        copy.active_memtable = Arc::new(Memtable::new(self.memtable_id_counter.next()));
        copy.sealed_memtables =
            Arc::new(super_version.sealed_memtables.add(yanked_memtable.clone()));

        version_history_lock.append_version(copy);

        log::trace!(
            "rotate: added memtable id={} to sealed memtables",
            yanked_memtable.id,
        );

        Some(yanked_memtable)
    }

    fn table_count(&self) -> usize {
        self.current_version().table_count()
    }

    fn level_table_count(&self, idx: usize) -> Option<usize> {
        self.current_version().level(idx).map(|x| x.table_count())
    }

    /// Approximate item count: memtables + sealed memtables + table metadata.
    fn approximate_len(&self) -> usize {
        // NOTE(review): `current_version()` below re-acquires the history
        // read lock although `super_version` is already held here; consider
        // reading `super_version.version` instead - confirm no writer can
        // starve this into a same-thread deadlock
        let super_version = self
            .version_history
            .read()
            .expect("lock is poisoned")
            .latest_version();

        let tables_item_count = self
            .current_version()
            .iter_tables()
            .map(|x| x.metadata.item_count)
            .sum::<u64>();

        let memtable_count = super_version.active_memtable.len() as u64;
        let sealed_count = super_version
            .sealed_memtables
            .iter()
            .map(|mt| mt.len())
            .sum::<usize>() as u64;

        (memtable_count + sealed_count + tables_item_count)
            .try_into()
            .expect("should not be too large")
    }

    fn disk_space(&self) -> u64 {
        self.current_version()
            .iter_levels()
            .map(super::version::Level::size)
            .sum()
    }

    /// Highest seqno across the active and all sealed memtables.
    fn get_highest_memtable_seqno(&self) -> Option<SeqNo> {
        let version = self
            .version_history
            .read()
            .expect("lock is poisoned")
            .latest_version();

        let active = version.active_memtable.get_highest_seqno();

        let sealed = version
            .sealed_memtables
            .iter()
            .map(|mt| mt.get_highest_seqno())
            .max()
            .flatten();

        active.max(sealed)
    }

    /// Highest seqno that has been written to disk (across all tables).
    fn get_highest_persisted_seqno(&self) -> Option<SeqNo> {
        self.current_version()
            .iter_tables()
            .map(Table::get_highest_seqno)
            .max()
    }

    fn get<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<UserValue>> {
        Ok(self
            .get_internal_entry(key.as_ref(), seqno)?
            .map(|x| x.value))
    }

    /// Inserts a key-value pair; returns (item size, new memtable size).
    fn insert<K: Into<UserKey>, V: Into<UserValue>>(
        &self,
        key: K,
        value: V,
        seqno: SeqNo,
    ) -> (u64, u64) {
        let value = InternalValue::from_components(key, value, seqno, ValueType::Value);
        self.append_entry(value)
    }

    /// Inserts a (strong) tombstone for `key`.
    fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        let value = InternalValue::new_tombstone(key, seqno);
        self.append_entry(value)
    }

    /// Inserts a weak tombstone (single delete) for `key`.
    fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        let value = InternalValue::new_weak_tombstone(key, seqno);
        self.append_entry(value)
    }
}
583
584impl Tree {
585    #[doc(hidden)]
586    pub fn create_internal_range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
587        version: SuperVersion,
588        range: &'a R,
589        seqno: SeqNo,
590        ephemeral: Option<(Arc<Memtable>, SeqNo)>,
591    ) -> impl DoubleEndedIterator<Item = crate::Result<InternalValue>> + 'static {
592        use crate::range::{IterState, TreeIter};
593        use std::ops::Bound::{self, Excluded, Included, Unbounded};
594
595        let lo: Bound<UserKey> = match range.start_bound() {
596            Included(x) => Included(x.as_ref().into()),
597            Excluded(x) => Excluded(x.as_ref().into()),
598            Unbounded => Unbounded,
599        };
600
601        let hi: Bound<UserKey> = match range.end_bound() {
602            Included(x) => Included(x.as_ref().into()),
603            Excluded(x) => Excluded(x.as_ref().into()),
604            Unbounded => Unbounded,
605        };
606
607        let bounds: (Bound<UserKey>, Bound<UserKey>) = (lo, hi);
608
609        let iter_state = { IterState { version, ephemeral } };
610
611        TreeIter::create_range(iter_state, bounds, seqno)
612    }
613
614    pub(crate) fn get_internal_entry_from_version(
615        super_version: &SuperVersion,
616        key: &[u8],
617        seqno: SeqNo,
618    ) -> crate::Result<Option<InternalValue>> {
619        if let Some(entry) = super_version.active_memtable.get(key, seqno) {
620            return Ok(ignore_tombstone_value(entry));
621        }
622
623        // Now look in sealed memtables
624        if let Some(entry) =
625            Self::get_internal_entry_from_sealed_memtables(super_version, key, seqno)
626        {
627            return Ok(ignore_tombstone_value(entry));
628        }
629
630        // Now look in tables... this may involve disk I/O
631        Self::get_internal_entry_from_tables(&super_version.version, key, seqno)
632    }
633
634    fn get_internal_entry_from_tables(
635        version: &Version,
636        key: &[u8],
637        seqno: SeqNo,
638    ) -> crate::Result<Option<InternalValue>> {
639        // NOTE: Create key hash for hash sharing
640        // https://fjall-rs.github.io/post/bloom-filter-hash-sharing/
641        let key_hash = crate::table::filter::standard_bloom::Builder::get_hash(key);
642
643        for level in version.iter_levels() {
644            for run in level.iter() {
645                // NOTE: Based on benchmarking, binary search is only worth it with ~4 tables
646                if run.len() >= 4 {
647                    if let Some(table) = run.get_for_key(key) {
648                        if let Some(item) = table.get(key, seqno, key_hash)? {
649                            return Ok(ignore_tombstone_value(item));
650                        }
651                    }
652                } else {
653                    // NOTE: Fallback to linear search
654                    for table in run.iter() {
655                        if !table.is_key_in_key_range(key) {
656                            continue;
657                        }
658
659                        if let Some(item) = table.get(key, seqno, key_hash)? {
660                            return Ok(ignore_tombstone_value(item));
661                        }
662                    }
663                }
664            }
665        }
666
667        Ok(None)
668    }
669
670    fn get_internal_entry_from_sealed_memtables(
671        super_version: &SuperVersion,
672        key: &[u8],
673        seqno: SeqNo,
674    ) -> Option<InternalValue> {
675        for mt in super_version.sealed_memtables.iter().rev() {
676            if let Some(entry) = mt.get(key, seqno) {
677                return Some(entry);
678            }
679        }
680
681        None
682    }
683
684    pub(crate) fn get_version_for_snapshot(&self, seqno: SeqNo) -> SuperVersion {
685        self.version_history
686            .read()
687            .expect("lock is poisoned")
688            .get_version_for_snapshot(seqno)
689    }
690
691    /// Normalizes a user-provided range into owned `Bound<Slice>` values.
692    ///
693    /// Returns a tuple containing:
694    /// - the `OwnedBounds` that mirror the original bounds semantics (including
695    ///   inclusive/exclusive markers and unbounded endpoints), and
696    /// - a `bool` flag indicating whether the normalized range is logically
697    ///   empty (e.g., when the lower bound is greater than the upper bound).
698    ///
699    /// Callers can use the flag to detect empty ranges and skip further work
700    /// while still having access to the normalized bounds for non-empty cases.
701    fn range_bounds_to_owned_bounds<K: AsRef<[u8]>, R: RangeBounds<K>>(
702        range: &R,
703    ) -> (OwnedBounds, bool) {
704        use Bound::{Excluded, Included, Unbounded};
705
706        let start = match range.start_bound() {
707            Included(key) => Included(Slice::from(key.as_ref())),
708            Excluded(key) => Excluded(Slice::from(key.as_ref())),
709            Unbounded => Unbounded,
710        };
711
712        let end = match range.end_bound() {
713            Included(key) => Included(Slice::from(key.as_ref())),
714            Excluded(key) => Excluded(Slice::from(key.as_ref())),
715            Unbounded => Unbounded,
716        };
717
718        let is_empty =
719            if let (Included(lo) | Excluded(lo), Included(hi) | Excluded(hi)) = (&start, &end) {
720                lo.as_ref() > hi.as_ref()
721            } else {
722                false
723            };
724
725        (OwnedBounds { start, end }, is_empty)
726    }
727
728    /// Opens an LSM-tree in the given directory.
729    ///
730    /// Will recover previous state if the folder was previously
731    /// occupied by an LSM-tree, including the previous configuration.
732    /// If not, a new tree will be initialized with the given config.
733    ///
734    /// After recovering a previous state, use [`Tree::set_active_memtable`]
735    /// to fill the memtable with data from a write-ahead log for full durability.
736    ///
737    /// # Errors
738    ///
739    /// Returns error, if an IO error occurred.
740    pub(crate) fn open(config: Config) -> crate::Result<Self> {
741        log::debug!("Opening LSM-tree at {}", config.path.display());
742
743        // Check for old version
744        if config.path.join("version").try_exists()? {
745            log::error!("It looks like you are trying to open a V1 database - the database needs a manual migration, however a migration tool is not provided, as V1 is extremely outdated.");
746            return Err(crate::Error::InvalidVersion(FormatVersion::V1.into()));
747        }
748
749        let tree = if config.path.join(CURRENT_VERSION_FILE).try_exists()? {
750            Self::recover(config)
751        } else {
752            Self::create_new(config)
753        }?;
754
755        Ok(tree)
756    }
757
    /// Finalizes a table writer and loads the resulting table, if any.
    ///
    /// Returns `None` when the writer produced no data (nothing was written).
    pub(crate) fn consume_writer(
        &self,
        writer: crate::table::Writer,
    ) -> crate::Result<Option<Table>> {
        // Remember the path before `finish` consumes the writer
        let table_file_path = writer.path.clone();

        let Some((_, checksum)) = writer.finish()? else {
            return Ok(None);
        };

        log::debug!("Finalized table write at {}", table_file_path.display());

        // New tables start in level 0, so pinning policies are queried at level 0
        let pin_filter = self.config.filter_block_pinning_policy.get(0);
        let pin_index = self.config.index_block_pinning_policy.get(0);

        let created_table = Table::recover(
            table_file_path,
            checksum,
            0,
            self.id,
            self.config.cache.clone(),
            self.config.descriptor_table.clone(),
            pin_filter,
            pin_index,
            #[cfg(feature = "metrics")]
            self.metrics.clone(),
        )?;

        log::debug!("Flushed table to {:?}", created_table.path);

        Ok(Some(created_table))
    }
790
791    /// Returns `true` if there are some tables that are being compacted.
792    #[doc(hidden)]
793    #[must_use]
794    pub fn is_compacting(&self) -> bool {
795        !self
796            .compaction_state
797            .lock()
798            .expect("lock is poisoned")
799            .hidden_set()
800            .is_empty()
801    }
802
803    fn inner_compact(
804        &self,
805        strategy: Arc<dyn CompactionStrategy>,
806        mvcc_gc_watermark: SeqNo,
807    ) -> crate::Result<()> {
808        use crate::compaction::worker::{do_compaction, Options};
809
810        let mut opts = Options::from_tree(self, strategy);
811        opts.mvcc_gc_watermark = mvcc_gc_watermark;
812
813        do_compaction(&opts)?;
814
815        log::debug!("Compaction run over");
816
817        Ok(())
818    }
819
    /// Full-tree iterator for snapshot `seqno`; shorthand for an unbounded range.
    #[doc(hidden)]
    #[must_use]
    pub fn create_iter(
        &self,
        seqno: SeqNo,
        ephemeral: Option<(Arc<Memtable>, SeqNo)>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        self.create_range::<UserKey, _>(&.., seqno, ephemeral)
    }
829
830    #[doc(hidden)]
831    pub fn create_range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
832        &self,
833        range: &'a R,
834        seqno: SeqNo,
835        ephemeral: Option<(Arc<Memtable>, SeqNo)>,
836    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
837        let version_history_lock = self.version_history.read().expect("lock is poisoned");
838        let super_version = version_history_lock.get_version_for_snapshot(seqno);
839
840        Self::create_internal_range(super_version, range, seqno, ephemeral).map(|item| match item {
841            Ok(kv) => Ok((kv.key.user_key, kv.value)),
842            Err(e) => Err(e),
843        })
844    }
845
846    #[doc(hidden)]
847    pub fn create_prefix<'a, K: AsRef<[u8]> + 'a>(
848        &self,
849        prefix: K,
850        seqno: SeqNo,
851        ephemeral: Option<(Arc<Memtable>, SeqNo)>,
852    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
853        use crate::range::prefix_to_range;
854
855        let range = prefix_to_range(prefix.as_ref());
856        self.create_range(&range, seqno, ephemeral)
857    }
858
859    /// Adds an item to the active memtable.
860    ///
861    /// Returns the added item's size and new size of the memtable.
862    #[doc(hidden)]
863    #[must_use]
864    pub fn append_entry(&self, value: InternalValue) -> (u64, u64) {
865        self.version_history
866            .read()
867            .expect("lock is poisoned")
868            .latest_version()
869            .active_memtable
870            .insert(value)
871    }
872
    /// Recovers previous state, by loading the level manifest, tables and blob files.
    ///
    /// # Errors
    ///
    /// Returns error, if an IO error occurred.
    fn recover(mut config: Config) -> crate::Result<Self> {
        use crate::stop_signal::StopSignal;
        use inner::get_next_tree_id;

        log::info!("Recovering LSM-tree at {}", config.path.display());

        // Process-wide unique id for this tree instance
        let tree_id = get_next_tree_id();

        #[cfg(feature = "metrics")]
        let metrics = Arc::new(Metrics::default());

        // Rebuild the version (levels + tables) from on-disk state
        let version = Self::recover_levels(
            &config.path,
            tree_id,
            &config,
            #[cfg(feature = "metrics")]
            &metrics,
        )?;

        {
            // The manifest is stored per version, in a file named "v{id}"
            let manifest_path = config.path.join(format!("v{}", version.id()));
            let reader = sfa::Reader::new(&manifest_path)?;
            let manifest = Manifest::decode_from(&manifest_path, &reader)?;

            // Only the current (V3) on-disk format can be recovered
            if manifest.version != FormatVersion::V3 {
                return Err(crate::Error::InvalidVersion(manifest.version.into()));
            }

            // Refuse to open a standard tree as a blob tree (and vice versa)
            let requested_tree_type = match config.kv_separation_opts {
                Some(_) => crate::TreeType::Blob,
                None => crate::TreeType::Standard,
            };

            if version.tree_type() != requested_tree_type {
                log::error!(
                    "Tried to open a {requested_tree_type:?}Tree, but the existing tree is of type {:?}Tree. This indicates a misconfiguration or corruption.",
                    version.tree_type(),
                );
                return Err(crate::Error::Unrecoverable);
            }

            // IMPORTANT: Restore persisted config
            config.level_count = manifest.level_count;
        }

        // Continue table numbering after the highest recovered table id
        let highest_table_id = version
            .iter_tables()
            .map(Table::id)
            .max()
            .unwrap_or_default();

        let inner = TreeInner {
            id: tree_id,
            memtable_id_counter: SequenceNumberCounter::default(),
            table_id_counter: SequenceNumberCounter::new(highest_table_id + 1),
            blob_file_id_counter: SequenceNumberCounter::default(),
            version_history: Arc::new(RwLock::new(SuperVersions::new(version))),
            stop_signal: StopSignal::default(),
            config,
            major_compaction_lock: RwLock::default(),
            flush_lock: Mutex::default(),
            compaction_state: Arc::new(Mutex::new(CompactionState::default())),

            #[cfg(feature = "metrics")]
            metrics,
        };

        Ok(Self(Arc::new(inner)))
    }
957
958    /// Creates a new LSM-tree in a directory.
959    fn create_new(config: Config) -> crate::Result<Self> {
960        use crate::file::{fsync_directory, TABLES_FOLDER};
961        use std::fs::create_dir_all;
962
963        let path = config.path.clone();
964        log::trace!("Creating LSM-tree at {}", path.display());
965
966        create_dir_all(&path)?;
967
968        let table_folder_path = path.join(TABLES_FOLDER);
969        create_dir_all(&table_folder_path)?;
970
971        // // Create manifest
972        // {
973        //     let mut writer = sfa::Writer::new_at_path(manifest_path)?;
974
975        //     Manifest {
976        //         version: FormatVersion::V3,
977        //         level_count: config.level_count,
978        //         tree_type: if config.kv_separation_opts.is_some() {
979        //             TreeType::Blob
980        //         } else {
981        //             TreeType::Standard
982        //         },
983        //     }
984        //     .encode_into(&mut writer)?;
985
986        //     writer.finish()?;
987        // }
988
989        // IMPORTANT: fsync folders on Unix
990        fsync_directory(&table_folder_path)?;
991        fsync_directory(&path)?;
992
993        let inner = TreeInner::create_new(config)?;
994        Ok(Self(Arc::new(inner)))
995    }
996
    /// Recovers the level manifest, loading all tables from disk.
    ///
    /// Scans the tables folder, recovering every table referenced by the
    /// recovered version metadata and collecting unreferenced ("orphaned")
    /// table and blob files, which are deleted — but only after the latest
    /// version has been fully reconstructed.
    ///
    /// # Errors
    ///
    /// Returns error, if an IO error occurred, a table file name is invalid,
    /// or fewer tables than expected could be recovered.
    fn recover_levels<P: AsRef<Path>>(
        tree_path: P,
        tree_id: TreeId,
        config: &Config,
        #[cfg(feature = "metrics")] metrics: &Arc<Metrics>,
    ) -> crate::Result<Version> {
        use crate::{file::fsync_directory, file::TABLES_FOLDER, TableId};

        let tree_path = tree_path.as_ref();

        let recovery = recover(tree_path)?;

        // Map table ID -> (level index, checksum, global seqno) for every table
        // referenced by the recovered version, so the directory scan below can
        // tell referenced tables apart from orphans
        let table_map = {
            let mut result: crate::HashMap<TableId, (u8 /* Level index */, Checksum, SeqNo)> =
                crate::HashMap::default();

            for (level_idx, table_ids) in recovery.table_ids.iter().enumerate() {
                for run in table_ids {
                    for table in run {
                        #[expect(
                            clippy::expect_used,
                            reason = "there are always less than 256 levels"
                        )]
                        result.insert(
                            table.id,
                            (
                                level_idx
                                    .try_into()
                                    .expect("there are less than 256 levels"),
                                table.checksum,
                                table.global_seqno,
                            ),
                        );
                    }
                }
            }

            result
        };

        let cnt = table_map.len();

        log::debug!("Recovering {cnt} tables from {}", tree_path.display());

        // Throttle progress logging: log every table for small trees,
        // every 10th/100th for larger ones
        let progress_mod = match cnt {
            _ if cnt <= 20 => 1,
            _ if cnt <= 100 => 10,
            _ => 100,
        };

        let mut tables = vec![];

        let table_base_folder = tree_path.join(TABLES_FOLDER);

        // Make sure the tables folder exists (and is durable on Unix)
        if !table_base_folder.try_exists()? {
            std::fs::create_dir_all(&table_base_folder)?;
            fsync_directory(&table_base_folder)?;
        }

        // Table files not referenced by the recovered version; deleted at the end
        let mut orphaned_tables = vec![];

        for (idx, dirent) in std::fs::read_dir(&table_base_folder)?.enumerate() {
            let dirent = dirent?;
            let file_name = dirent.file_name();

            // Skip macOS metadata files
            // https://en.wikipedia.org/wiki/.DS_Store
            if file_name == ".DS_Store" {
                continue;
            }

            // https://en.wikipedia.org/wiki/AppleSingle_and_AppleDouble_formats
            if file_name.to_string_lossy().starts_with("._") {
                continue;
            }

            // Table file names must be valid UTF-8 (they are numeric IDs)
            let table_file_name = file_name.to_str().ok_or_else(|| {
                log::error!("invalid table file name {}", file_name.display());
                crate::Error::Unrecoverable
            })?;

            let table_file_path = dirent.path();
            assert!(!table_file_path.is_dir());

            log::debug!("Recovering table from {}", table_file_path.display());

            // The file name is the table's numeric ID
            let table_id = table_file_name.parse::<TableId>().map_err(|e| {
                log::error!("invalid table file name {table_file_name:?}: {e:?}");
                crate::Error::Unrecoverable
            })?;

            if let Some(&(level_idx, checksum, global_seqno)) = table_map.get(&table_id) {
                // Block pinning policies are configured per level
                let pin_filter = config.filter_block_pinning_policy.get(level_idx.into());
                let pin_index = config.index_block_pinning_policy.get(level_idx.into());

                let table = Table::recover(
                    table_file_path,
                    checksum,
                    global_seqno,
                    tree_id,
                    config.cache.clone(),
                    config.descriptor_table.clone(),
                    pin_filter,
                    pin_index,
                    #[cfg(feature = "metrics")]
                    metrics.clone(),
                )?;

                log::debug!("Recovered table from {:?}", table.path);

                tables.push(table);

                if idx % progress_mod == 0 {
                    log::debug!("Recovered {idx}/{cnt} tables");
                }
            } else {
                // Not referenced by the recovered version -> orphan
                orphaned_tables.push(table_file_path);
            }
        }

        // Every table referenced by the version must exist on disk
        if tables.len() < cnt {
            log::error!(
                "Recovered less tables than expected: {:?}",
                table_map.keys(),
            );
            return Err(crate::Error::Unrecoverable);
        }

        log::debug!("Successfully recovered {} tables", tables.len());

        let (blob_files, orphaned_blob_files) = crate::vlog::recover_blob_files(
            &tree_path.join(crate::file::BLOBS_FOLDER),
            &recovery.blob_file_ids,
        )?;

        let version = Version::from_recovery(recovery, &tables, &blob_files)?;

        // NOTE: Cleanup old versions
        // But only after we definitely recovered the latest version
        Self::cleanup_orphaned_version(tree_path, version.id())?;

        for table_path in orphaned_tables {
            log::debug!("Deleting orphaned table {}", table_path.display());
            std::fs::remove_file(&table_path)?;
        }

        for blob_file_path in orphaned_blob_files {
            log::debug!("Deleting orphaned blob file {}", blob_file_path.display());
            std::fs::remove_file(&blob_file_path)?;
        }

        Ok(version)
    }
1150
1151    fn cleanup_orphaned_version(
1152        path: &Path,
1153        latest_version_id: crate::version::VersionId,
1154    ) -> crate::Result<()> {
1155        let version_str = format!("v{latest_version_id}");
1156
1157        for file in std::fs::read_dir(path)? {
1158            let dirent = file?;
1159
1160            if dirent.file_type()?.is_dir() {
1161                continue;
1162            }
1163
1164            let name = dirent.file_name();
1165
1166            if name.to_string_lossy().starts_with('v') && *name != *version_str {
1167                log::trace!("Cleanup orphaned version {}", name.display());
1168                std::fs::remove_file(dirent.path())?;
1169            }
1170        }
1171
1172        Ok(())
1173    }
1174}