lsm_tree/tree/
mod.rs

1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5pub mod ingest;
6pub mod inner;
7pub mod sealed;
8
9use crate::{
10    compaction::{drop_range::OwnedBounds, state::CompactionState, CompactionStrategy},
11    config::Config,
12    file::CURRENT_VERSION_FILE,
13    format_version::FormatVersion,
14    iter_guard::{IterGuard, IterGuardImpl},
15    manifest::Manifest,
16    memtable::Memtable,
17    slice::Slice,
18    table::Table,
19    value::InternalValue,
20    version::{recovery::recover, SuperVersion, SuperVersions, Version},
21    vlog::BlobFile,
22    AbstractTree, Checksum, KvPair, SeqNo, SequenceNumberCounter, TableId, UserKey, UserValue,
23    ValueType,
24};
25use inner::{TreeId, TreeInner};
26use std::{
27    ops::{Bound, RangeBounds},
28    path::Path,
29    sync::{Arc, Mutex, RwLock},
30};
31
32#[cfg(feature = "metrics")]
33use crate::metrics::Metrics;
34
/// Iterator value guard
///
/// Wraps a fallible `(key, value)` pair so iterator consumers can decide
/// lazily (via [`IterGuard`]) whether to materialize the key, the value
/// size, or the full pair.
pub struct Guard(crate::Result<(UserKey, UserValue)>);
37
38impl IterGuard for Guard {
39    fn into_inner_if(
40        self,
41        pred: impl Fn(&UserKey) -> bool,
42    ) -> crate::Result<(UserKey, Option<UserValue>)> {
43        let (k, v) = self.0?;
44
45        if pred(&k) {
46            Ok((k, Some(v)))
47        } else {
48            Ok((k, None))
49        }
50    }
51
52    fn key(self) -> crate::Result<UserKey> {
53        self.0.map(|(k, _)| k)
54    }
55
56    fn size(self) -> crate::Result<u32> {
57        #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
58        self.into_inner().map(|(_, v)| v.len() as u32)
59    }
60
61    fn into_inner(self) -> crate::Result<(UserKey, UserValue)> {
62        self.0
63    }
64}
65
66fn ignore_tombstone_value(item: InternalValue) -> Option<InternalValue> {
67    if item.is_tombstone() {
68        None
69    } else {
70        Some(item)
71    }
72}
73
/// A log-structured merge tree (LSM-tree/LSMT)
///
/// Cheap to clone: this is a reference-counted handle to the shared
/// [`TreeInner`] state.
#[derive(Clone)]
pub struct Tree(#[doc(hidden)] pub Arc<TreeInner>);
77
// Dereferences to the shared inner state so `TreeInner` fields and methods
// can be used directly on a `Tree` handle.
impl std::ops::Deref for Tree {
    type Target = TreeInner;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
85
86impl AbstractTree for Tree {
    fn table_file_cache_size(&self) -> usize {
        // Number of entries currently held in the shared descriptor table.
        self.config.descriptor_table.len()
    }
90
    fn get_version_history_lock(
        &self,
    ) -> std::sync::RwLockWriteGuard<'_, crate::version::SuperVersions> {
        // Grants exclusive access to the version history; blocks all
        // concurrent readers and writers until the guard is dropped.
        self.version_history.write().expect("lock is poisoned")
    }
96
    fn next_table_id(&self) -> TableId {
        // NOTE(review): this *peeks* at the counter (`get`) without
        // advancing it, while `get_next_table_id` below delegates to
        // `TreeInner` — confirm the naming asymmetry is intentional.
        self.0.table_id_counter.get()
    }
100
    fn id(&self) -> TreeId {
        // Identifier of this tree instance (assigned when the tree is
        // created/recovered).
        self.id
    }
104
    fn blob_file_count(&self) -> usize {
        // A standard (non-blob) tree never owns blob files.
        0
    }
108
109    fn get_internal_entry(&self, key: &[u8], seqno: SeqNo) -> crate::Result<Option<InternalValue>> {
110        let super_version = self
111            .version_history
112            .read()
113            .expect("lock is poisoned")
114            .get_version_for_snapshot(seqno);
115
116        Self::get_internal_entry_from_version(&super_version, key, seqno)
117    }
118
    fn current_version(&self) -> Version {
        // Returns the most recent version; the value is detached from the
        // lock, so it stays valid after the read guard is released.
        self.version_history
            .read()
            .expect("poisoned")
            .latest_version()
            .version
    }
126
    fn get_flush_lock(&self) -> std::sync::MutexGuard<'_, ()> {
        // Serializes flushes; holders are the only flush in progress.
        self.flush_lock.lock().expect("lock is poisoned")
    }
130
    #[cfg(feature = "metrics")]
    fn metrics(&self) -> &Arc<crate::Metrics> {
        // Shared metrics collector (compiled only with the `metrics` feature).
        &self.0.metrics
    }
135
    fn version_free_list_len(&self) -> usize {
        // Number of obsolete versions waiting to be garbage collected.
        self.version_history
            .read()
            .expect("lock is poisoned")
            .free_list_len()
    }
142
143    fn prefix<K: AsRef<[u8]>>(
144        &self,
145        prefix: K,
146        seqno: SeqNo,
147        index: Option<(Arc<Memtable>, SeqNo)>,
148    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
149        Box::new(
150            self.create_prefix(&prefix, seqno, index)
151                .map(|kv| IterGuardImpl::Standard(Guard(kv))),
152        )
153    }
154
155    fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
156        &self,
157        range: R,
158        seqno: SeqNo,
159        index: Option<(Arc<Memtable>, SeqNo)>,
160    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
161        Box::new(
162            self.create_range(&range, seqno, index)
163                .map(|kv| IterGuardImpl::Standard(Guard(kv))),
164        )
165    }
166
    /// Returns the number of tombstones in the tree.
    fn tombstone_count(&self) -> u64 {
        // Sums the per-table tombstone counters of the current version.
        self.current_version()
            .iter_tables()
            .map(Table::tombstone_count)
            .sum()
    }
174
    /// Returns the number of weak tombstones (single deletes) in the tree.
    fn weak_tombstone_count(&self) -> u64 {
        // Sums the per-table weak-tombstone counters of the current version.
        self.current_version()
            .iter_tables()
            .map(Table::weak_tombstone_count)
            .sum()
    }
182
    /// Returns the number of value entries that become reclaimable once weak tombstones can be GC'd.
    fn weak_tombstone_reclaimable_count(&self) -> u64 {
        // Sums the per-table reclaimable counters of the current version.
        self.current_version()
            .iter_tables()
            .map(Table::weak_tombstone_reclaimable)
            .sum()
    }
190
    fn drop_range<K: AsRef<[u8]>, R: RangeBounds<K>>(&self, range: R) -> crate::Result<()> {
        // Normalize the user range into owned bounds; the flag tells us if
        // the range is logically empty.
        let (bounds, is_empty) = Self::range_bounds_to_owned_bounds(&range);

        if is_empty {
            // An empty range cannot match anything — skip the compaction.
            return Ok(());
        }

        let strategy = Arc::new(crate::compaction::drop_range::Strategy::new(bounds));

        // IMPORTANT: Write lock so we can be the only compaction going on
        let _lock = self
            .0
            .major_compaction_lock
            .write()
            .expect("lock is poisoned");

        log::info!("Starting drop_range compaction");
        // seqno threshold 0 — presumably drop_range needs no MVCC GC
        // watermark; TODO confirm against `inner_compact` semantics.
        self.inner_compact(strategy, 0)
    }
210
    #[doc(hidden)]
    fn major_compact(&self, target_size: u64, seqno_threshold: SeqNo) -> crate::Result<()> {
        let strategy = Arc::new(crate::compaction::major::Strategy::new(target_size));

        // IMPORTANT: Write lock so we can be the only compaction going on
        let _lock = self
            .0
            .major_compaction_lock
            .write()
            .expect("lock is poisoned");

        log::info!("Starting major compaction");
        self.inner_compact(strategy, seqno_threshold)
    }
225
226    fn l0_run_count(&self) -> usize {
227        self.current_version()
228            .level(0)
229            .map(|x| x.run_count())
230            .unwrap_or_default()
231    }
232
233    fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<u32>> {
234        #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
235        Ok(self.get(key, seqno)?.map(|x| x.len() as u32))
236    }
237
    fn filter_size(&self) -> usize {
        // Total size of all filter blocks across the current version's tables.
        self.current_version()
            .iter_tables()
            .map(Table::filter_size)
            .sum()
    }
244
    fn pinned_filter_size(&self) -> usize {
        // Total size of filter blocks pinned in memory.
        self.current_version()
            .iter_tables()
            .map(Table::pinned_filter_size)
            .sum()
    }
251
    fn pinned_block_index_size(&self) -> usize {
        // Total size of block indexes pinned in memory.
        self.current_version()
            .iter_tables()
            .map(Table::pinned_block_index_size)
            .sum()
    }
258
    fn sealed_memtable_count(&self) -> usize {
        // Number of memtables that were rotated out but not yet flushed.
        self.version_history
            .read()
            .expect("lock is poisoned")
            .latest_version()
            .sealed_memtables
            .len()
    }
267
    /// Writes an item stream out as one or more tables on disk.
    ///
    /// Returns the recovered [`Table`] handles; the blob-file slot of the
    /// returned tuple is always `None` for a standard tree.
    ///
    /// # Errors
    ///
    /// Returns error, if an IO error occurred.
    fn flush_to_tables(
        &self,
        stream: impl Iterator<Item = crate::Result<InternalValue>>,
    ) -> crate::Result<Option<(Vec<Table>, Option<Vec<BlobFile>>)>> {
        use crate::{file::TABLES_FOLDER, table::multi_writer::MultiWriter};
        use std::time::Instant;

        let start = Instant::now();

        let folder = self.config.path.join(TABLES_FOLDER);

        // Flush output always lands in level 0, so every per-level policy
        // is queried with level index 0.
        let data_block_size = self.config.data_block_size_policy.get(0);

        let data_block_restart_interval = self.config.data_block_restart_interval_policy.get(0);
        let index_block_restart_interval = self.config.index_block_restart_interval_policy.get(0);

        let data_block_compression = self.config.data_block_compression_policy.get(0);
        let index_block_compression = self.config.index_block_compression_policy.get(0);

        let data_block_hash_ratio = self.config.data_block_hash_ratio_policy.get(0);

        let index_partitioning = self.config.index_block_partitioning_policy.get(0);
        let filter_partitioning = self.config.filter_block_partitioning_policy.get(0);

        log::debug!(
            "Flushing memtable(s) to {}, data_block_restart_interval={data_block_restart_interval}, index_block_restart_interval={index_block_restart_interval}, data_block_size={data_block_size}, data_block_compression={data_block_compression}, index_block_compression={index_block_compression}",
            folder.display(),
        );

        // 64 MiB target table size; the multi-writer rolls over to a new
        // table file when this threshold is reached.
        let mut table_writer = MultiWriter::new(
            folder.clone(),
            self.table_id_counter.clone(),
            64 * 1_024 * 1_024,
            0,
        )?
        .use_data_block_restart_interval(data_block_restart_interval)
        .use_index_block_restart_interval(index_block_restart_interval)
        .use_data_block_compression(data_block_compression)
        .use_index_block_compression(index_block_compression)
        .use_data_block_size(data_block_size)
        .use_data_block_hash_ratio(data_block_hash_ratio)
        .use_bloom_policy({
            use crate::config::FilterPolicyEntry::{Bloom, None};
            use crate::table::filter::BloomConstructionPolicy;

            // A disabled filter policy is expressed as 0.0 bits per key.
            match self.config.filter_policy.get(0) {
                Bloom(policy) => policy,
                None => BloomConstructionPolicy::BitsPerKey(0.0),
            }
        });

        if index_partitioning {
            table_writer = table_writer.use_partitioned_index();
        }
        if filter_partitioning {
            table_writer = table_writer.use_partitioned_filter();
        }

        // Drain the input stream into the writer; any item error aborts.
        for item in stream {
            table_writer.write(item?)?;
        }

        let result = table_writer.finish()?;

        log::debug!("Flushed memtable(s) in {:?}", start.elapsed());

        let pin_filter = self.config.filter_block_pinning_policy.get(0);
        let pin_index = self.config.index_block_pinning_policy.get(0);

        // Load tables
        let tables = result
            .into_iter()
            .map(|(table_id, checksum)| -> crate::Result<Table> {
                Table::recover(
                    folder.join(table_id.to_string()),
                    checksum,
                    0,
                    self.id,
                    self.config.cache.clone(),
                    self.config.descriptor_table.clone(),
                    pin_filter,
                    pin_index,
                    #[cfg(feature = "metrics")]
                    self.metrics.clone(),
                )
            })
            .collect::<crate::Result<Vec<_>>>()?;

        Ok(Some((tables, None)))
    }
358
    /// Atomically installs freshly written tables (and optionally blob
    /// files) as a new L0 run, releasing the sealed memtables they came
    /// from, then runs version GC up to `gc_watermark`.
    #[expect(clippy::significant_drop_tightening)]
    fn register_tables(
        &self,
        tables: &[Table],
        blob_files: Option<&[BlobFile]>,
        frag_map: Option<crate::blob_tree::FragmentationMap>,
        sealed_memtables_to_delete: &[crate::tree::inner::MemtableId],
        gc_watermark: SeqNo,
    ) -> crate::Result<()> {
        log::trace!(
            "Registering {} tables, {} blob files",
            tables.len(),
            blob_files.map(<[BlobFile]>::len).unwrap_or_default(),
        );

        // NOTE(review): lock order is compaction state, then version
        // history — presumably this order is shared by other call sites to
        // avoid deadlock; confirm before changing.
        let mut _compaction_state = self.compaction_state.lock().expect("lock is poisoned");
        let mut version_lock = self.version_history.write().expect("lock is poisoned");

        version_lock.upgrade_version(
            &self.config.path,
            |current| {
                let mut copy = current.clone();

                // New tables form a fresh run in level 0; an empty
                // fragmentation map is treated as absent.
                copy.version = copy.version.with_new_l0_run(
                    tables,
                    blob_files,
                    frag_map.filter(|x| !x.is_empty()),
                );

                for &table_id in sealed_memtables_to_delete {
                    log::trace!("releasing sealed memtable #{table_id}");
                    copy.sealed_memtables = Arc::new(copy.sealed_memtables.remove(table_id));
                }

                Ok(copy)
            },
            &self.config.seqno,
        )?;

        // GC failure is non-fatal: obsolete versions are retried later.
        if let Err(e) = version_lock.maintenance(&self.config.path, gc_watermark) {
            log::warn!("Version GC failed: {e:?}");
        }

        Ok(())
    }
404
    /// Discards the active memtable and all sealed memtables by installing
    /// a new version with fresh, empty memtables. No-op when the active
    /// memtable is already empty.
    fn clear_active_memtable(&self) {
        use crate::tree::sealed::SealedMemtables;

        let mut version_history_lock = self.version_history.write().expect("lock is poisoned");
        let super_version = version_history_lock.latest_version();

        if super_version.active_memtable.is_empty() {
            return;
        }

        let mut copy = version_history_lock.latest_version();
        copy.active_memtable = Arc::new(Memtable::new(self.memtable_id_counter.next()));
        copy.sealed_memtables = Arc::new(SealedMemtables::default());

        // Rotate does not modify the memtable, so it cannot break snapshots
        copy.seqno = super_version.seqno;

        version_history_lock.append_version(copy);

        log::trace!("cleared active memtable");
    }
426
    fn add_sealed_memtable(&self, memtable: Arc<Memtable>) {
        // Appends the memtable to the sealed set of the latest version.
        let mut version_lock = self.version_history.write().expect("lock is poisoned");
        version_lock.append_sealed_memtable(memtable);
    }
431
    /// Runs a (non-major) compaction using the given strategy.
    fn compact(
        &self,
        strategy: Arc<dyn CompactionStrategy>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        // NOTE: Read lock major compaction lock
        // That way, if a major compaction is running, we cannot proceed
        // But in general, parallel (non-major) compactions can occur
        let _lock = self
            .0
            .major_compaction_lock
            .read()
            .expect("lock is poisoned");

        self.inner_compact(strategy, seqno_threshold)
    }
448
    fn get_next_table_id(&self) -> TableId {
        // Delegates to `TreeInner`; see also `next_table_id` above, which
        // reads the counter directly — NOTE(review): confirm the two are
        // meant to differ.
        self.0.get_next_table_id()
    }
452
    fn tree_config(&self) -> &Config {
        // Borrow of the (immutable after open) tree configuration.
        &self.config
    }
456
    fn active_memtable(&self) -> Arc<Memtable> {
        // Handle to the currently writable memtable of the latest version.
        self.version_history
            .read()
            .expect("lock is poisoned")
            .latest_version()
            .active_memtable
    }
464
    fn tree_type(&self) -> crate::TreeType {
        // This implementation is always a standard (non-blob) tree.
        crate::TreeType::Standard
    }
468
    /// Seals the active memtable: moves it into the sealed set and installs
    /// a fresh, empty active memtable. Returns the sealed memtable, or
    /// `None` when the active memtable was empty (nothing to rotate).
    #[expect(clippy::significant_drop_tightening)]
    fn rotate_memtable(&self) -> Option<Arc<Memtable>> {
        let mut version_history_lock = self.version_history.write().expect("lock is poisoned");
        let super_version = version_history_lock.latest_version();

        if super_version.active_memtable.is_empty() {
            return None;
        }

        let yanked_memtable = super_version.active_memtable;

        let mut copy = version_history_lock.latest_version();
        copy.active_memtable = Arc::new(Memtable::new(self.memtable_id_counter.next()));
        copy.sealed_memtables =
            Arc::new(super_version.sealed_memtables.add(yanked_memtable.clone()));

        // Rotate does not modify the memtable so it cannot break snapshots
        copy.seqno = super_version.seqno;

        version_history_lock.append_version(copy);

        log::trace!(
            "rotate: added memtable id={} to sealed memtables",
            yanked_memtable.id,
        );

        Some(yanked_memtable)
    }
497
    fn table_count(&self) -> usize {
        // Total number of tables across all levels of the current version.
        self.current_version().table_count()
    }
501
502    fn level_table_count(&self, idx: usize) -> Option<usize> {
503        self.current_version().level(idx).map(|x| x.table_count())
504    }
505
506    fn approximate_len(&self) -> usize {
507        let super_version = self
508            .version_history
509            .read()
510            .expect("lock is poisoned")
511            .latest_version();
512
513        let tables_item_count = self
514            .current_version()
515            .iter_tables()
516            .map(|x| x.metadata.item_count)
517            .sum::<u64>();
518
519        let memtable_count = super_version.active_memtable.len() as u64;
520        let sealed_count = super_version
521            .sealed_memtables
522            .iter()
523            .map(|mt| mt.len())
524            .sum::<usize>() as u64;
525
526        (memtable_count + sealed_count + tables_item_count)
527            .try_into()
528            .expect("should not be too large")
529    }
530
    fn disk_space(&self) -> u64 {
        // Sum of the on-disk sizes of all levels in the current version.
        self.current_version()
            .iter_levels()
            .map(super::version::Level::size)
            .sum()
    }
537
538    fn get_highest_memtable_seqno(&self) -> Option<SeqNo> {
539        let version = self
540            .version_history
541            .read()
542            .expect("lock is poisoned")
543            .latest_version();
544
545        let active = version.active_memtable.get_highest_seqno();
546
547        let sealed = version
548            .sealed_memtables
549            .iter()
550            .map(|mt| mt.get_highest_seqno())
551            .max()
552            .flatten();
553
554        active.max(sealed)
555    }
556
    fn get_highest_persisted_seqno(&self) -> Option<SeqNo> {
        // Highest sequence number found in any on-disk table.
        self.current_version()
            .iter_tables()
            .map(Table::get_highest_seqno)
            .max()
    }
563
564    fn get<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<UserValue>> {
565        Ok(self
566            .get_internal_entry(key.as_ref(), seqno)?
567            .map(|x| x.value))
568    }
569
    /// Inserts a key-value pair at the given sequence number.
    ///
    /// Returns the added item's size and the new size of the memtable
    /// (see `append_entry`).
    fn insert<K: Into<UserKey>, V: Into<UserValue>>(
        &self,
        key: K,
        value: V,
        seqno: SeqNo,
    ) -> (u64, u64) {
        let value = InternalValue::from_components(key, value, seqno, ValueType::Value);
        self.append_entry(value)
    }
579
    /// Deletes a key by writing a (regular) tombstone.
    fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        let value = InternalValue::new_tombstone(key, seqno);
        self.append_entry(value)
    }
584
    /// Deletes a key by writing a weak tombstone (single delete).
    fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        let value = InternalValue::new_weak_tombstone(key, seqno);
        self.append_entry(value)
    }
589}
590
591impl Tree {
592    #[doc(hidden)]
593    pub fn create_internal_range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
594        version: SuperVersion,
595        range: &'a R,
596        seqno: SeqNo,
597        ephemeral: Option<(Arc<Memtable>, SeqNo)>,
598    ) -> impl DoubleEndedIterator<Item = crate::Result<InternalValue>> + 'static {
599        use crate::range::{IterState, TreeIter};
600        use std::ops::Bound::{self, Excluded, Included, Unbounded};
601
602        let lo: Bound<UserKey> = match range.start_bound() {
603            Included(x) => Included(x.as_ref().into()),
604            Excluded(x) => Excluded(x.as_ref().into()),
605            Unbounded => Unbounded,
606        };
607
608        let hi: Bound<UserKey> = match range.end_bound() {
609            Included(x) => Included(x.as_ref().into()),
610            Excluded(x) => Excluded(x.as_ref().into()),
611            Unbounded => Unbounded,
612        };
613
614        let bounds: (Bound<UserKey>, Bound<UserKey>) = (lo, hi);
615
616        let iter_state = { IterState { version, ephemeral } };
617
618        TreeIter::create_range(iter_state, bounds, seqno)
619    }
620
    /// Point lookup through a pinned super version: checks the active
    /// memtable first, then sealed memtables, then on-disk tables.
    /// Tombstoned entries are reported as `None`.
    pub(crate) fn get_internal_entry_from_version(
        super_version: &SuperVersion,
        key: &[u8],
        seqno: SeqNo,
    ) -> crate::Result<Option<InternalValue>> {
        if let Some(entry) = super_version.active_memtable.get(key, seqno) {
            return Ok(ignore_tombstone_value(entry));
        }

        // Now look in sealed memtables
        if let Some(entry) =
            Self::get_internal_entry_from_sealed_memtables(super_version, key, seqno)
        {
            return Ok(ignore_tombstone_value(entry));
        }

        // Now look in tables... this may involve disk I/O
        Self::get_internal_entry_from_tables(&super_version.version, key, seqno)
    }
640
    /// Searches the on-disk tables of `version` for `key`, level by level.
    /// The first hit wins because levels are iterated from newest to
    /// oldest data — NOTE(review): confirm `iter_levels` ordering.
    fn get_internal_entry_from_tables(
        version: &Version,
        key: &[u8],
        seqno: SeqNo,
    ) -> crate::Result<Option<InternalValue>> {
        // NOTE: Create key hash for hash sharing
        // https://fjall-rs.github.io/post/bloom-filter-hash-sharing/
        let key_hash = crate::table::filter::standard_bloom::Builder::get_hash(key);

        for level in version.iter_levels() {
            for run in level.iter() {
                // NOTE: Based on benchmarking, binary search is only worth it with ~4 tables
                if run.len() >= 4 {
                    // Tables in a run are disjoint, so at most one can
                    // contain the key.
                    if let Some(table) = run.get_for_key(key) {
                        if let Some(item) = table.get(key, seqno, key_hash)? {
                            return Ok(ignore_tombstone_value(item));
                        }
                    }
                } else {
                    // NOTE: Fallback to linear search
                    for table in run.iter() {
                        if !table.is_key_in_key_range(key) {
                            continue;
                        }

                        if let Some(item) = table.get(key, seqno, key_hash)? {
                            return Ok(ignore_tombstone_value(item));
                        }
                    }
                }
            }
        }

        Ok(None)
    }
676
677    fn get_internal_entry_from_sealed_memtables(
678        super_version: &SuperVersion,
679        key: &[u8],
680        seqno: SeqNo,
681    ) -> Option<InternalValue> {
682        for mt in super_version.sealed_memtables.iter().rev() {
683            if let Some(entry) = mt.get(key, seqno) {
684                return Some(entry);
685            }
686        }
687
688        None
689    }
690
    /// Resolves the super version that a snapshot at `seqno` must read from.
    pub(crate) fn get_version_for_snapshot(&self, seqno: SeqNo) -> SuperVersion {
        self.version_history
            .read()
            .expect("lock is poisoned")
            .get_version_for_snapshot(seqno)
    }
697
698    /// Normalizes a user-provided range into owned `Bound<Slice>` values.
699    ///
700    /// Returns a tuple containing:
701    /// - the `OwnedBounds` that mirror the original bounds semantics (including
702    ///   inclusive/exclusive markers and unbounded endpoints), and
703    /// - a `bool` flag indicating whether the normalized range is logically
704    ///   empty (e.g., when the lower bound is greater than the upper bound).
705    ///
706    /// Callers can use the flag to detect empty ranges and skip further work
707    /// while still having access to the normalized bounds for non-empty cases.
708    fn range_bounds_to_owned_bounds<K: AsRef<[u8]>, R: RangeBounds<K>>(
709        range: &R,
710    ) -> (OwnedBounds, bool) {
711        use Bound::{Excluded, Included, Unbounded};
712
713        let start = match range.start_bound() {
714            Included(key) => Included(Slice::from(key.as_ref())),
715            Excluded(key) => Excluded(Slice::from(key.as_ref())),
716            Unbounded => Unbounded,
717        };
718
719        let end = match range.end_bound() {
720            Included(key) => Included(Slice::from(key.as_ref())),
721            Excluded(key) => Excluded(Slice::from(key.as_ref())),
722            Unbounded => Unbounded,
723        };
724
725        let is_empty =
726            if let (Included(lo) | Excluded(lo), Included(hi) | Excluded(hi)) = (&start, &end) {
727                lo.as_ref() > hi.as_ref()
728            } else {
729                false
730            };
731
732        (OwnedBounds { start, end }, is_empty)
733    }
734
735    /// Opens an LSM-tree in the given directory.
736    ///
737    /// Will recover previous state if the folder was previously
738    /// occupied by an LSM-tree, including the previous configuration.
739    /// If not, a new tree will be initialized with the given config.
740    ///
741    /// After recovering a previous state, use [`Tree::set_active_memtable`]
742    /// to fill the memtable with data from a write-ahead log for full durability.
743    ///
744    /// # Errors
745    ///
746    /// Returns error, if an IO error occurred.
747    pub(crate) fn open(config: Config) -> crate::Result<Self> {
748        log::debug!("Opening LSM-tree at {}", config.path.display());
749
750        // Check for old version
751        if config.path.join("version").try_exists()? {
752            log::error!("It looks like you are trying to open a V1 database - the database needs a manual migration, however a migration tool is not provided, as V1 is extremely outdated.");
753            return Err(crate::Error::InvalidVersion(FormatVersion::V1.into()));
754        }
755
756        let tree = if config.path.join(CURRENT_VERSION_FILE).try_exists()? {
757            Self::recover(config)
758        } else {
759            Self::create_new(config)
760        }?;
761
762        Ok(tree)
763    }
764
    /// Finalizes a table writer and loads the resulting table.
    ///
    /// Returns `None` when the writer produced no output (nothing was
    /// written).
    pub(crate) fn consume_writer(
        &self,
        writer: crate::table::Writer,
    ) -> crate::Result<Option<Table>> {
        let table_file_path = writer.path.clone();

        let Some((_, checksum)) = writer.finish()? else {
            return Ok(None);
        };

        log::debug!("Finalized table write at {}", table_file_path.display());

        let pin_filter = self.config.filter_block_pinning_policy.get(0);
        let pin_index = self.config.index_block_pinning_policy.get(0);

        // Re-open the freshly written file as a live table handle,
        // verifying it against the writer's checksum.
        let created_table = Table::recover(
            table_file_path,
            checksum,
            0,
            self.id,
            self.config.cache.clone(),
            self.config.descriptor_table.clone(),
            pin_filter,
            pin_index,
            #[cfg(feature = "metrics")]
            self.metrics.clone(),
        )?;

        log::debug!("Flushed table to {:?}", created_table.path);

        Ok(Some(created_table))
    }
797
798    /// Returns `true` if there are some tables that are being compacted.
799    #[doc(hidden)]
800    #[must_use]
801    pub fn is_compacting(&self) -> bool {
802        !self
803            .compaction_state
804            .lock()
805            .expect("lock is poisoned")
806            .hidden_set()
807            .is_empty()
808    }
809
    /// Runs one compaction with the given strategy and MVCC GC watermark.
    /// Callers are responsible for holding the appropriate major
    /// compaction lock (read or write).
    fn inner_compact(
        &self,
        strategy: Arc<dyn CompactionStrategy>,
        mvcc_gc_watermark: SeqNo,
    ) -> crate::Result<()> {
        use crate::compaction::worker::{do_compaction, Options};

        let mut opts = Options::from_tree(self, strategy);
        opts.mvcc_gc_watermark = mvcc_gc_watermark;

        do_compaction(&opts)?;

        log::debug!("Compaction run over");

        Ok(())
    }
826
    /// Full-range iterator over all key-value pairs visible at `seqno`.
    #[doc(hidden)]
    #[must_use]
    pub fn create_iter(
        &self,
        seqno: SeqNo,
        ephemeral: Option<(Arc<Memtable>, SeqNo)>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        // Unbounded range == whole keyspace.
        self.create_range::<UserKey, _>(&.., seqno, ephemeral)
    }
836
837    #[doc(hidden)]
838    pub fn create_range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
839        &self,
840        range: &'a R,
841        seqno: SeqNo,
842        ephemeral: Option<(Arc<Memtable>, SeqNo)>,
843    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
844        let version_history_lock = self.version_history.read().expect("lock is poisoned");
845        let super_version = version_history_lock.get_version_for_snapshot(seqno);
846
847        Self::create_internal_range(super_version, range, seqno, ephemeral).map(|item| match item {
848            Ok(kv) => Ok((kv.key.user_key, kv.value)),
849            Err(e) => Err(e),
850        })
851    }
852
    /// Prefix iterator: translates the prefix into an equivalent key range
    /// and delegates to `create_range`.
    #[doc(hidden)]
    pub fn create_prefix<'a, K: AsRef<[u8]> + 'a>(
        &self,
        prefix: K,
        seqno: SeqNo,
        ephemeral: Option<(Arc<Memtable>, SeqNo)>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        use crate::range::prefix_to_range;

        let range = prefix_to_range(prefix.as_ref());
        self.create_range(&range, seqno, ephemeral)
    }
865
    /// Adds an item to the active memtable.
    ///
    /// Returns the added item's size and new size of the memtable.
    ///
    /// Only a *read* lock on the version history is taken — presumably
    /// `Memtable::insert` is internally synchronized; confirm before
    /// relying on this for new call sites.
    #[doc(hidden)]
    #[must_use]
    pub fn append_entry(&self, value: InternalValue) -> (u64, u64) {
        self.version_history
            .read()
            .expect("lock is poisoned")
            .latest_version()
            .active_memtable
            .insert(value)
    }
879
880    /// Recovers previous state, by loading the level manifest, tables and blob files.
881    ///
882    /// # Errors
883    ///
884    /// Returns error, if an IO error occurred.
885    fn recover(mut config: Config) -> crate::Result<Self> {
886        use crate::stop_signal::StopSignal;
887        use inner::get_next_tree_id;
888
889        log::info!("Recovering LSM-tree at {}", config.path.display());
890
891        // let manifest = {
892        //     let manifest_path = config.path.join(MANIFEST_FILE);
893        //     let reader = sfa::Reader::new(&manifest_path)?;
894        //     Manifest::decode_from(&manifest_path, &reader)?
895        // };
896
897        // if manifest.version != FormatVersion::V3 {
898        //     return Err(crate::Error::InvalidVersion(manifest.version.into()));
899        // }
900
901        let tree_id = get_next_tree_id();
902
903        #[cfg(feature = "metrics")]
904        let metrics = Arc::new(Metrics::default());
905
906        let version = Self::recover_levels(
907            &config.path,
908            tree_id,
909            &config,
910            #[cfg(feature = "metrics")]
911            &metrics,
912        )?;
913
914        {
915            let manifest_path = config.path.join(format!("v{}", version.id()));
916            let reader = sfa::Reader::new(&manifest_path)?;
917            let manifest = Manifest::decode_from(&manifest_path, &reader)?;
918
919            if manifest.version != FormatVersion::V3 {
920                return Err(crate::Error::InvalidVersion(manifest.version.into()));
921            }
922
923            let requested_tree_type = match config.kv_separation_opts {
924                Some(_) => crate::TreeType::Blob,
925                None => crate::TreeType::Standard,
926            };
927
928            if version.tree_type() != requested_tree_type {
929                log::error!(
930                    "Tried to open a {requested_tree_type:?}Tree, but the existing tree is of type {:?}Tree. This indicates a misconfiguration or corruption.",
931                    version.tree_type(),
932                );
933                return Err(crate::Error::Unrecoverable);
934            }
935
936            // IMPORTANT: Restore persisted config
937            config.level_count = manifest.level_count;
938        }
939
940        let highest_table_id = version
941            .iter_tables()
942            .map(Table::id)
943            .max()
944            .unwrap_or_default();
945
946        let inner = TreeInner {
947            id: tree_id,
948            memtable_id_counter: SequenceNumberCounter::default(),
949            table_id_counter: SequenceNumberCounter::new(highest_table_id + 1),
950            blob_file_id_counter: SequenceNumberCounter::default(),
951            version_history: Arc::new(RwLock::new(SuperVersions::new(version))),
952            stop_signal: StopSignal::default(),
953            config,
954            major_compaction_lock: RwLock::default(),
955            flush_lock: Mutex::default(),
956            compaction_state: Arc::new(Mutex::new(CompactionState::default())),
957
958            #[cfg(feature = "metrics")]
959            metrics,
960        };
961
962        Ok(Self(Arc::new(inner)))
963    }
964
965    /// Creates a new LSM-tree in a directory.
966    fn create_new(config: Config) -> crate::Result<Self> {
967        use crate::file::{fsync_directory, TABLES_FOLDER};
968        use std::fs::create_dir_all;
969
970        let path = config.path.clone();
971        log::trace!("Creating LSM-tree at {}", path.display());
972
973        create_dir_all(&path)?;
974
975        let table_folder_path = path.join(TABLES_FOLDER);
976        create_dir_all(&table_folder_path)?;
977
978        // // Create manifest
979        // {
980        //     let mut writer = sfa::Writer::new_at_path(manifest_path)?;
981
982        //     Manifest {
983        //         version: FormatVersion::V3,
984        //         level_count: config.level_count,
985        //         tree_type: if config.kv_separation_opts.is_some() {
986        //             TreeType::Blob
987        //         } else {
988        //             TreeType::Standard
989        //         },
990        //     }
991        //     .encode_into(&mut writer)?;
992
993        //     writer.finish()?;
994        // }
995
996        // IMPORTANT: fsync folders on Unix
997        fsync_directory(&table_folder_path)?;
998        fsync_directory(&path)?;
999
1000        let inner = TreeInner::create_new(config)?;
1001        Ok(Self(Arc::new(inner)))
1002    }
1003
    /// Recovers the level manifest, loading all tables from disk.
    ///
    /// Scans the tables folder, re-opens every table referenced by the
    /// recovered version state, then deletes orphaned table files, orphaned
    /// blob files and outdated version files.
    ///
    /// # Errors
    ///
    /// Returns error, if an IO error occurred, a table file name is not
    /// parsable, or a table referenced by the version is missing on disk.
    fn recover_levels<P: AsRef<Path>>(
        tree_path: P,
        tree_id: TreeId,
        config: &Config,
        #[cfg(feature = "metrics")] metrics: &Arc<Metrics>,
    ) -> crate::Result<Version> {
        use crate::{file::fsync_directory, file::TABLES_FOLDER, TableId};

        let tree_path = tree_path.as_ref();

        let recovery = recover(tree_path)?;

        // Index every table ID referenced by the recovered version state,
        // mapping it to its level index plus the expected checksum and
        // global sequence number
        let table_map = {
            let mut result: crate::HashMap<TableId, (u8 /* Level index */, Checksum, SeqNo)> =
                crate::HashMap::default();

            for (level_idx, table_ids) in recovery.table_ids.iter().enumerate() {
                for run in table_ids {
                    for table in run {
                        #[expect(
                            clippy::expect_used,
                            reason = "there are always less than 256 levels"
                        )]
                        result.insert(
                            table.id,
                            (
                                level_idx
                                    .try_into()
                                    .expect("there are less than 256 levels"),
                                table.checksum,
                                table.global_seqno,
                            ),
                        );
                    }
                }
            }

            result
        };

        let cnt = table_map.len();

        log::debug!("Recovering {cnt} tables from {}", tree_path.display());

        // Log progress only every N-th table, so large trees do not spam the log
        let progress_mod = match cnt {
            _ if cnt <= 20 => 1,
            _ if cnt <= 100 => 10,
            _ => 100,
        };

        let mut tables = vec![];

        let table_base_folder = tree_path.join(TABLES_FOLDER);

        // The tables folder may be missing; create it so the scan below succeeds
        if !table_base_folder.try_exists()? {
            std::fs::create_dir_all(&table_base_folder)?;
            fsync_directory(&table_base_folder)?;
        }

        // Table files on disk that are not referenced by the recovered
        // version; deleted only after recovery fully succeeded
        let mut orphaned_tables = vec![];

        for (idx, dirent) in std::fs::read_dir(&table_base_folder)?.enumerate() {
            let dirent = dirent?;
            let file_name = dirent.file_name();

            // https://en.wikipedia.org/wiki/.DS_Store
            if file_name == ".DS_Store" {
                continue;
            }

            // https://en.wikipedia.org/wiki/AppleSingle_and_AppleDouble_formats
            if file_name.to_string_lossy().starts_with("._") {
                continue;
            }

            let table_file_name = file_name.to_str().ok_or_else(|| {
                log::error!("invalid table file name {}", file_name.display());
                crate::Error::Unrecoverable
            })?;

            let table_file_path = dirent.path();
            // NOTE(review): assumes the tables folder contains only flat files
            assert!(!table_file_path.is_dir());

            log::debug!("Recovering table from {}", table_file_path.display());

            // Table files are named after their numeric table ID
            let table_id = table_file_name.parse::<TableId>().map_err(|e| {
                log::error!("invalid table file name {table_file_name:?}: {e:?}");
                crate::Error::Unrecoverable
            })?;

            if let Some(&(level_idx, checksum, global_seqno)) = table_map.get(&table_id) {
                // Block pinning policies are configured per level
                let pin_filter = config.filter_block_pinning_policy.get(level_idx.into());
                let pin_index = config.index_block_pinning_policy.get(level_idx.into());

                let table = Table::recover(
                    table_file_path,
                    checksum,
                    global_seqno,
                    tree_id,
                    config.cache.clone(),
                    config.descriptor_table.clone(),
                    pin_filter,
                    pin_index,
                    #[cfg(feature = "metrics")]
                    metrics.clone(),
                )?;

                log::debug!("Recovered table from {:?}", table.path);

                tables.push(table);

                // NOTE: `idx` counts directory entries (including skipped ones),
                // so this is an approximate progress indicator only
                if idx % progress_mod == 0 {
                    log::debug!("Recovered {idx}/{cnt} tables");
                }
            } else {
                orphaned_tables.push(table_file_path);
            }
        }

        // Every table referenced by the version must have been found on disk
        if tables.len() < cnt {
            log::error!(
                "Recovered less tables than expected: {:?}",
                table_map.keys(),
            );
            return Err(crate::Error::Unrecoverable);
        }

        log::debug!("Successfully recovered {} tables", tables.len());

        let (blob_files, orphaned_blob_files) = crate::vlog::recover_blob_files(
            &tree_path.join(crate::file::BLOBS_FOLDER),
            &recovery.blob_file_ids,
        )?;

        let version = Version::from_recovery(recovery, &tables, &blob_files)?;

        // NOTE: Cleanup old versions
        // But only after we definitely recovered the latest version
        Self::cleanup_orphaned_version(tree_path, version.id())?;

        for table_path in orphaned_tables {
            log::debug!("Deleting orphaned table {}", table_path.display());
            std::fs::remove_file(&table_path)?;
        }

        for blob_file_path in orphaned_blob_files {
            log::debug!("Deleting orphaned blob file {}", blob_file_path.display());
            std::fs::remove_file(&blob_file_path)?;
        }

        Ok(version)
    }
1157
1158    fn cleanup_orphaned_version(
1159        path: &Path,
1160        latest_version_id: crate::version::VersionId,
1161    ) -> crate::Result<()> {
1162        let version_str = format!("v{latest_version_id}");
1163
1164        for file in std::fs::read_dir(path)? {
1165            let dirent = file?;
1166
1167            if dirent.file_type()?.is_dir() {
1168                continue;
1169            }
1170
1171            let name = dirent.file_name();
1172
1173            if name.to_string_lossy().starts_with('v') && *name != *version_str {
1174                log::trace!("Cleanup orphaned version {}", name.display());
1175                std::fs::remove_file(dirent.path())?;
1176            }
1177        }
1178
1179        Ok(())
1180    }
1181}