// lsm_tree/tree/mod.rs

1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5pub mod ingest;
6pub mod inner;
7pub mod sealed;
8
9use crate::{
10    compaction::{drop_range::OwnedBounds, state::CompactionState, CompactionStrategy},
11    config::Config,
12    file::CURRENT_VERSION_FILE,
13    format_version::FormatVersion,
14    iter_guard::{IterGuard, IterGuardImpl},
15    manifest::Manifest,
16    memtable::Memtable,
17    slice::Slice,
18    table::Table,
19    value::InternalValue,
20    version::{recovery::recover, SuperVersion, SuperVersions, Version},
21    vlog::BlobFile,
22    AbstractTree, Checksum, KvPair, SeqNo, SequenceNumberCounter, TableId, UserKey, UserValue,
23    ValueType,
24};
25use inner::{TreeId, TreeInner};
26use std::{
27    ops::{Bound, RangeBounds},
28    path::Path,
29    sync::{Arc, Mutex, RwLock},
30};
31
32#[cfg(feature = "metrics")]
33use crate::metrics::Metrics;
34
/// Iterator value guard
///
/// Wraps a fallible (key, value) pair yielded by tree iterators so that
/// the caller can decide whether to materialize the value, take only the
/// key, or just query its size.
pub struct Guard(crate::Result<(UserKey, UserValue)>);
37
38impl IterGuard for Guard {
39    fn into_inner_if(
40        self,
41        pred: impl Fn(&UserKey) -> bool,
42    ) -> crate::Result<(UserKey, Option<UserValue>)> {
43        let (k, v) = self.0?;
44
45        if pred(&k) {
46            Ok((k, Some(v)))
47        } else {
48            Ok((k, None))
49        }
50    }
51
52    fn key(self) -> crate::Result<UserKey> {
53        self.0.map(|(k, _)| k)
54    }
55
56    fn size(self) -> crate::Result<u32> {
57        #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
58        self.into_inner().map(|(_, v)| v.len() as u32)
59    }
60
61    fn into_inner(self) -> crate::Result<(UserKey, UserValue)> {
62        self.0
63    }
64}
65
66fn ignore_tombstone_value(item: InternalValue) -> Option<InternalValue> {
67    if item.is_tombstone() {
68        None
69    } else {
70        Some(item)
71    }
72}
73
/// A log-structured merge tree (LSM-tree/LSMT)
///
/// Cloning is cheap: all clones share the same inner state through an `Arc`.
#[derive(Clone)]
pub struct Tree(#[doc(hidden)] pub Arc<TreeInner>);
77
// Smart-pointer-style deref so `TreeInner` fields/methods can be accessed
// directly on a `Tree` handle.
impl std::ops::Deref for Tree {
    type Target = TreeInner;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
85
impl AbstractTree for Tree {
    /// Returns the number of entries in the table descriptor cache.
    fn table_file_cache_size(&self) -> usize {
        self.config.descriptor_table.len()
    }

    /// Acquires a write lock over the version history.
    fn get_version_history_lock(
        &self,
    ) -> std::sync::RwLockWriteGuard<'_, crate::version::SuperVersions> {
        self.version_history.write().expect("lock is poisoned")
    }

    /// Reads the table ID counter without advancing it (compare `get_next_table_id`).
    fn next_table_id(&self) -> TableId {
        self.0.table_id_counter.get()
    }

    /// Returns this tree's ID.
    fn id(&self) -> TreeId {
        self.id
    }

    /// A standard tree has no key-value separation, hence no blob files.
    fn blob_file_count(&self) -> usize {
        0
    }

    /// Point lookup of the internal entry visible at `seqno`.
    fn get_internal_entry(&self, key: &[u8], seqno: SeqNo) -> crate::Result<Option<InternalValue>> {
        // Resolve the super-version that the snapshot seqno should see
        let super_version = self
            .version_history
            .read()
            .expect("lock is poisoned")
            .get_version_for_snapshot(seqno);

        Self::get_internal_entry_from_version(&super_version, key, seqno)
    }

    /// Returns the latest version (disk state) of the tree.
    fn current_version(&self) -> Version {
        self.version_history
            .read()
            .expect("poisoned")
            .latest_version()
            .version
    }

    /// Acquires the flush lock, serializing memtable flushes.
    fn get_flush_lock(&self) -> std::sync::MutexGuard<'_, ()> {
        self.flush_lock.lock().expect("lock is poisoned")
    }

    #[cfg(feature = "metrics")]
    fn metrics(&self) -> &Arc<crate::Metrics> {
        &self.0.metrics
    }

    /// Number of stale versions currently queued in the free list.
    fn version_free_list_len(&self) -> usize {
        self.version_history
            .read()
            .expect("lock is poisoned")
            .free_list_len()
    }

    /// Creates a prefix iterator over items visible at `seqno`.
    fn prefix<K: AsRef<[u8]>>(
        &self,
        prefix: K,
        seqno: SeqNo,
        index: Option<(Arc<Memtable>, SeqNo)>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
        Box::new(
            self.create_prefix(&prefix, seqno, index)
                .map(|kv| IterGuardImpl::Standard(Guard(kv))),
        )
    }

    /// Creates a range iterator over items visible at `seqno`.
    fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
        &self,
        range: R,
        seqno: SeqNo,
        index: Option<(Arc<Memtable>, SeqNo)>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
        Box::new(
            self.create_range(&range, seqno, index)
                .map(|kv| IterGuardImpl::Standard(Guard(kv))),
        )
    }

    /// Returns the number of tombstones in the tree.
    fn tombstone_count(&self) -> u64 {
        self.current_version()
            .iter_tables()
            .map(Table::tombstone_count)
            .sum()
    }

    /// Returns the number of weak tombstones (single deletes) in the tree.
    fn weak_tombstone_count(&self) -> u64 {
        self.current_version()
            .iter_tables()
            .map(Table::weak_tombstone_count)
            .sum()
    }

    /// Returns the number of value entries that become reclaimable once weak tombstones can be GC'd.
    fn weak_tombstone_reclaimable_count(&self) -> u64 {
        self.current_version()
            .iter_tables()
            .map(Table::weak_tombstone_reclaimable)
            .sum()
    }

    /// Drops all items inside the given key range via a dedicated compaction.
    fn drop_range<K: AsRef<[u8]>, R: RangeBounds<K>>(&self, range: R) -> crate::Result<()> {
        let (bounds, is_empty) = Self::range_bounds_to_owned_bounds(&range);

        // A logically empty range cannot contain anything to drop
        if is_empty {
            return Ok(());
        }

        let strategy = Arc::new(crate::compaction::drop_range::Strategy::new(bounds));

        // IMPORTANT: Write lock so we can be the only compaction going on
        let _lock = self
            .0
            .major_compaction_lock
            .write()
            .expect("lock is poisoned");

        log::info!("Starting drop_range compaction");
        self.inner_compact(strategy, 0)
    }

    /// Removes all data by installing a fresh, empty version and memtables.
    fn clear(&self) -> crate::Result<()> {
        let mut versions = self.get_version_history_lock();

        versions.upgrade_version(
            &self.config.path,
            |v| {
                let mut copy = v.clone();
                copy.active_memtable = Arc::new(Memtable::new(self.memtable_id_counter.next()));
                copy.sealed_memtables = Arc::default();
                // An empty version with a bumped ID supersedes the current one
                copy.version = Version::new(v.version.id() + 1, self.tree_type());
                Ok(copy)
            },
            &self.config.seqno,
            &self.config.visible_seqno,
        )
    }

    #[doc(hidden)]
    fn major_compact(&self, target_size: u64, seqno_threshold: SeqNo) -> crate::Result<()> {
        let strategy = Arc::new(crate::compaction::major::Strategy::new(target_size));

        // IMPORTANT: Write lock so we can be the only compaction going on
        let _lock = self
            .0
            .major_compaction_lock
            .write()
            .expect("lock is poisoned");

        log::info!("Starting major compaction");
        self.inner_compact(strategy, seqno_threshold)
    }

    /// Number of runs in level 0 (0 if the level does not exist).
    fn l0_run_count(&self) -> usize {
        self.current_version()
            .level(0)
            .map(|x| x.run_count())
            .unwrap_or_default()
    }

    /// Returns the byte length of the value stored under `key`, if any.
    fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<u32>> {
        #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
        Ok(self.get(key, seqno)?.map(|x| x.len() as u32))
    }

    /// Total on-disk size of all filter blocks.
    fn filter_size(&self) -> u64 {
        self.current_version()
            .iter_tables()
            .map(Table::filter_size)
            .map(u64::from)
            .sum()
    }

    /// Total in-memory size of filter blocks pinned across all tables.
    fn pinned_filter_size(&self) -> usize {
        self.current_version()
            .iter_tables()
            .map(Table::pinned_filter_size)
            .sum()
    }

    /// Total in-memory size of block indexes pinned across all tables.
    fn pinned_block_index_size(&self) -> usize {
        self.current_version()
            .iter_tables()
            .map(Table::pinned_block_index_size)
            .sum()
    }

    /// Number of sealed (immutable, not yet flushed) memtables.
    fn sealed_memtable_count(&self) -> usize {
        self.version_history
            .read()
            .expect("lock is poisoned")
            .latest_version()
            .sealed_memtables
            .len()
    }

    /// Writes a stream of items into one or more new tables and loads them.
    ///
    /// Returns `None` when nothing was written; a standard tree never
    /// produces blob files, so the second tuple element is always `None`.
    fn flush_to_tables(
        &self,
        stream: impl Iterator<Item = crate::Result<InternalValue>>,
    ) -> crate::Result<Option<(Vec<Table>, Option<Vec<BlobFile>>)>> {
        use crate::{file::TABLES_FOLDER, table::multi_writer::MultiWriter};
        use std::time::Instant;

        let start = Instant::now();

        let folder = self.config.path.join(TABLES_FOLDER);

        // NOTE: Policies are sampled at level 0 because flush output becomes an L0 run
        let data_block_size = self.config.data_block_size_policy.get(0);

        let data_block_restart_interval = self.config.data_block_restart_interval_policy.get(0);
        let index_block_restart_interval = self.config.index_block_restart_interval_policy.get(0);

        let data_block_compression = self.config.data_block_compression_policy.get(0);
        let index_block_compression = self.config.index_block_compression_policy.get(0);

        let data_block_hash_ratio = self.config.data_block_hash_ratio_policy.get(0);

        let index_partitioning = self.config.index_block_partitioning_policy.get(0);
        let filter_partitioning = self.config.filter_block_partitioning_policy.get(0);

        log::debug!(
            "Flushing memtable(s) to {}, data_block_restart_interval={data_block_restart_interval}, index_block_restart_interval={index_block_restart_interval}, data_block_size={data_block_size}, data_block_compression={data_block_compression}, index_block_compression={index_block_compression}",
            folder.display(),
        );

        let mut table_writer = MultiWriter::new(
            folder.clone(),
            self.table_id_counter.clone(),
            64 * 1_024 * 1_024,
            0,
        )?
        .use_data_block_restart_interval(data_block_restart_interval)
        .use_index_block_restart_interval(index_block_restart_interval)
        .use_data_block_compression(data_block_compression)
        .use_index_block_compression(index_block_compression)
        .use_data_block_size(data_block_size)
        .use_data_block_hash_ratio(data_block_hash_ratio)
        .use_bloom_policy({
            use crate::config::FilterPolicyEntry::{Bloom, None};
            use crate::table::filter::BloomConstructionPolicy;

            match self.config.filter_policy.get(0) {
                Bloom(policy) => policy,
                // No filter configured -> zero bits per key
                None => BloomConstructionPolicy::BitsPerKey(0.0),
            }
        });

        if index_partitioning {
            table_writer = table_writer.use_partitioned_index();
        }
        if filter_partitioning {
            table_writer = table_writer.use_partitioned_filter();
        }

        for item in stream {
            table_writer.write(item?)?;
        }

        let result = table_writer.finish()?;

        log::debug!("Flushed memtable(s) in {:?}", start.elapsed());

        let pin_filter = self.config.filter_block_pinning_policy.get(0);
        let pin_index = self.config.index_block_pinning_policy.get(0);

        // Load tables
        let tables = result
            .into_iter()
            .map(|(table_id, checksum)| -> crate::Result<Table> {
                Table::recover(
                    folder.join(table_id.to_string()),
                    checksum,
                    0,
                    self.id,
                    self.config.cache.clone(),
                    self.config.descriptor_table.clone(),
                    pin_filter,
                    pin_index,
                    #[cfg(feature = "metrics")]
                    self.metrics.clone(),
                )
            })
            .collect::<crate::Result<Vec<_>>>()?;

        Ok(Some((tables, None)))
    }

    /// Atomically registers freshly flushed tables as a new L0 run and
    /// releases the sealed memtables they originated from.
    #[expect(clippy::significant_drop_tightening)]
    fn register_tables(
        &self,
        tables: &[Table],
        blob_files: Option<&[BlobFile]>,
        frag_map: Option<crate::blob_tree::FragmentationMap>,
        sealed_memtables_to_delete: &[crate::tree::inner::MemtableId],
        gc_watermark: SeqNo,
    ) -> crate::Result<()> {
        log::trace!(
            "Registering {} tables, {} blob files",
            tables.len(),
            blob_files.map(<[BlobFile]>::len).unwrap_or_default(),
        );

        // NOTE: compaction state is locked before the version history here;
        // keep this order consistent to avoid deadlocks
        let mut _compaction_state = self.compaction_state.lock().expect("lock is poisoned");
        let mut version_lock = self.version_history.write().expect("lock is poisoned");

        version_lock.upgrade_version(
            &self.config.path,
            |current| {
                let mut copy = current.clone();

                copy.version = copy.version.with_new_l0_run(
                    tables,
                    blob_files,
                    // Empty fragmentation maps are not worth persisting
                    frag_map.filter(|x| !x.is_empty()),
                );

                for &table_id in sealed_memtables_to_delete {
                    log::trace!("releasing sealed memtable #{table_id}");
                    copy.sealed_memtables = Arc::new(copy.sealed_memtables.remove(table_id));
                }

                Ok(copy)
            },
            &self.config.seqno,
            &self.config.visible_seqno,
        )?;

        // GC failure is non-fatal; stale versions are retried later
        if let Err(e) = version_lock.maintenance(&self.config.path, gc_watermark) {
            log::warn!("Version GC failed: {e:?}");
        }

        Ok(())
    }

    /// Discards the active memtable (and all sealed memtables) without flushing.
    fn clear_active_memtable(&self) {
        use crate::tree::sealed::SealedMemtables;

        let mut version_history_lock = self.version_history.write().expect("lock is poisoned");
        let super_version = version_history_lock.latest_version();

        // Nothing to do if there is no buffered data
        if super_version.active_memtable.is_empty() {
            return;
        }

        let mut copy = version_history_lock.latest_version();
        copy.active_memtable = Arc::new(Memtable::new(self.memtable_id_counter.next()));
        copy.sealed_memtables = Arc::new(SealedMemtables::default());

        // Rotate does not modify the memtable, so it cannot break snapshots
        copy.seqno = super_version.seqno;

        version_history_lock.replace_latest_version(copy);

        log::trace!("cleared active memtable");
    }

    /// Appends an externally built memtable to the sealed memtable list.
    fn add_sealed_memtable(&self, memtable: Arc<Memtable>) {
        let mut version_lock = self.version_history.write().expect("lock is poisoned");
        version_lock.append_sealed_memtable(memtable);
    }

    /// Runs a (non-major) compaction using the given strategy.
    fn compact(
        &self,
        strategy: Arc<dyn CompactionStrategy>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        // NOTE: Read lock major compaction lock
        // That way, if a major compaction is running, we cannot proceed
        // But in general, parallel (non-major) compactions can occur
        let _lock = self
            .0
            .major_compaction_lock
            .read()
            .expect("lock is poisoned");

        self.inner_compact(strategy, seqno_threshold)
    }

    /// Gets (and advances to) the next free table ID.
    fn get_next_table_id(&self) -> TableId {
        self.0.get_next_table_id()
    }

    /// Returns the tree's configuration.
    fn tree_config(&self) -> &Config {
        &self.config
    }

    /// Returns a handle to the currently active (writable) memtable.
    fn active_memtable(&self) -> Arc<Memtable> {
        self.version_history
            .read()
            .expect("lock is poisoned")
            .latest_version()
            .active_memtable
    }

    /// This is a standard (non-blob) tree.
    fn tree_type(&self) -> crate::TreeType {
        crate::TreeType::Standard
    }

    /// Seals the active memtable and replaces it with an empty one.
    ///
    /// Returns the sealed memtable, or `None` if the active memtable was empty.
    #[expect(clippy::significant_drop_tightening)]
    fn rotate_memtable(&self) -> Option<Arc<Memtable>> {
        let mut version_history_lock = self.version_history.write().expect("lock is poisoned");
        let super_version = version_history_lock.latest_version();

        if super_version.active_memtable.is_empty() {
            return None;
        }

        let yanked_memtable = super_version.active_memtable;

        let mut copy = version_history_lock.latest_version();
        copy.active_memtable = Arc::new(Memtable::new(self.memtable_id_counter.next()));
        copy.sealed_memtables =
            Arc::new(super_version.sealed_memtables.add(yanked_memtable.clone()));

        // Rotate does not modify the memtable so it cannot break snapshots
        copy.seqno = super_version.seqno;

        version_history_lock.replace_latest_version(copy);

        log::trace!(
            "rotate: added memtable id={} to sealed memtables",
            yanked_memtable.id,
        );

        Some(yanked_memtable)
    }

    /// Total number of tables across all levels.
    fn table_count(&self) -> usize {
        self.current_version().table_count()
    }

    /// Number of tables in the given level, or `None` if the level does not exist.
    fn level_table_count(&self, idx: usize) -> Option<usize> {
        self.current_version().level(idx).map(|x| x.table_count())
    }

    /// Approximate item count: sums memtable lengths and per-table item
    /// counts, so duplicate and deleted keys are counted multiple times.
    fn approximate_len(&self) -> usize {
        let super_version = self
            .version_history
            .read()
            .expect("lock is poisoned")
            .latest_version();

        let tables_item_count = self
            .current_version()
            .iter_tables()
            .map(|x| x.metadata.item_count)
            .sum::<u64>();

        let memtable_count = super_version.active_memtable.len() as u64;
        let sealed_count = super_version
            .sealed_memtables
            .iter()
            .map(|mt| mt.len())
            .sum::<usize>() as u64;

        // u64 -> usize; only fails on 32-bit targets with huge trees
        (memtable_count + sealed_count + tables_item_count)
            .try_into()
            .expect("should not be too large")
    }

    /// Total on-disk size of all levels.
    fn disk_space(&self) -> u64 {
        self.current_version()
            .iter_levels()
            .map(super::version::Level::size)
            .sum()
    }

    /// Highest sequence number buffered in the active or sealed memtables.
    fn get_highest_memtable_seqno(&self) -> Option<SeqNo> {
        let version = self
            .version_history
            .read()
            .expect("lock is poisoned")
            .latest_version();

        let active = version.active_memtable.get_highest_seqno();

        let sealed = version
            .sealed_memtables
            .iter()
            .map(|mt| mt.get_highest_seqno())
            .max()
            .flatten();

        active.max(sealed)
    }

    /// Highest sequence number persisted in on-disk tables.
    fn get_highest_persisted_seqno(&self) -> Option<SeqNo> {
        self.current_version()
            .iter_tables()
            .map(Table::get_highest_seqno)
            .max()
    }

    /// Point lookup of the user value visible at `seqno`.
    fn get<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<UserValue>> {
        Ok(self
            .get_internal_entry(key.as_ref(), seqno)?
            .map(|x| x.value))
    }

    /// Inserts a key-value pair; returns (item size, new memtable size).
    fn insert<K: Into<UserKey>, V: Into<UserValue>>(
        &self,
        key: K,
        value: V,
        seqno: SeqNo,
    ) -> (u64, u64) {
        let value = InternalValue::from_components(key, value, seqno, ValueType::Value);
        self.append_entry(value)
    }

    /// Inserts a tombstone for `key`; returns (item size, new memtable size).
    fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        let value = InternalValue::new_tombstone(key, seqno);
        self.append_entry(value)
    }

    /// Inserts a weak (single-delete) tombstone; returns (item size, new memtable size).
    fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        let value = InternalValue::new_weak_tombstone(key, seqno);
        self.append_entry(value)
    }
}
609
610impl Tree {
611    #[doc(hidden)]
612    pub fn create_internal_range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
613        version: SuperVersion,
614        range: &'a R,
615        seqno: SeqNo,
616        ephemeral: Option<(Arc<Memtable>, SeqNo)>,
617    ) -> impl DoubleEndedIterator<Item = crate::Result<InternalValue>> + 'static {
618        use crate::range::{IterState, TreeIter};
619        use std::ops::Bound::{self, Excluded, Included, Unbounded};
620
621        let lo: Bound<UserKey> = match range.start_bound() {
622            Included(x) => Included(x.as_ref().into()),
623            Excluded(x) => Excluded(x.as_ref().into()),
624            Unbounded => Unbounded,
625        };
626
627        let hi: Bound<UserKey> = match range.end_bound() {
628            Included(x) => Included(x.as_ref().into()),
629            Excluded(x) => Excluded(x.as_ref().into()),
630            Unbounded => Unbounded,
631        };
632
633        let bounds: (Bound<UserKey>, Bound<UserKey>) = (lo, hi);
634
635        let iter_state = { IterState { version, ephemeral } };
636
637        TreeIter::create_range(iter_state, bounds, seqno)
638    }
639
640    pub(crate) fn get_internal_entry_from_version(
641        super_version: &SuperVersion,
642        key: &[u8],
643        seqno: SeqNo,
644    ) -> crate::Result<Option<InternalValue>> {
645        if let Some(entry) = super_version.active_memtable.get(key, seqno) {
646            return Ok(ignore_tombstone_value(entry));
647        }
648
649        // Now look in sealed memtables
650        if let Some(entry) =
651            Self::get_internal_entry_from_sealed_memtables(super_version, key, seqno)
652        {
653            return Ok(ignore_tombstone_value(entry));
654        }
655
656        // Now look in tables... this may involve disk I/O
657        Self::get_internal_entry_from_tables(&super_version.version, key, seqno)
658    }
659
660    fn get_internal_entry_from_tables(
661        version: &Version,
662        key: &[u8],
663        seqno: SeqNo,
664    ) -> crate::Result<Option<InternalValue>> {
665        // NOTE: Create key hash for hash sharing
666        // https://fjall-rs.github.io/post/bloom-filter-hash-sharing/
667        let key_hash = crate::table::filter::standard_bloom::Builder::get_hash(key);
668
669        for level in version.iter_levels() {
670            for run in level.iter() {
671                // NOTE: Based on benchmarking, binary search is only worth it with ~4 tables
672                if run.len() >= 4 {
673                    if let Some(table) = run.get_for_key(key) {
674                        if let Some(item) = table.get(key, seqno, key_hash)? {
675                            return Ok(ignore_tombstone_value(item));
676                        }
677                    }
678                } else {
679                    // NOTE: Fallback to linear search
680                    for table in run.iter() {
681                        if !table.is_key_in_key_range(key) {
682                            continue;
683                        }
684
685                        if let Some(item) = table.get(key, seqno, key_hash)? {
686                            return Ok(ignore_tombstone_value(item));
687                        }
688                    }
689                }
690            }
691        }
692
693        Ok(None)
694    }
695
696    fn get_internal_entry_from_sealed_memtables(
697        super_version: &SuperVersion,
698        key: &[u8],
699        seqno: SeqNo,
700    ) -> Option<InternalValue> {
701        for mt in super_version.sealed_memtables.iter().rev() {
702            if let Some(entry) = mt.get(key, seqno) {
703                return Some(entry);
704            }
705        }
706
707        None
708    }
709
710    pub(crate) fn get_version_for_snapshot(&self, seqno: SeqNo) -> SuperVersion {
711        self.version_history
712            .read()
713            .expect("lock is poisoned")
714            .get_version_for_snapshot(seqno)
715    }
716
717    /// Normalizes a user-provided range into owned `Bound<Slice>` values.
718    ///
719    /// Returns a tuple containing:
720    /// - the `OwnedBounds` that mirror the original bounds semantics (including
721    ///   inclusive/exclusive markers and unbounded endpoints), and
722    /// - a `bool` flag indicating whether the normalized range is logically
723    ///   empty (e.g., when the lower bound is greater than the upper bound).
724    ///
725    /// Callers can use the flag to detect empty ranges and skip further work
726    /// while still having access to the normalized bounds for non-empty cases.
727    fn range_bounds_to_owned_bounds<K: AsRef<[u8]>, R: RangeBounds<K>>(
728        range: &R,
729    ) -> (OwnedBounds, bool) {
730        use Bound::{Excluded, Included, Unbounded};
731
732        let start = match range.start_bound() {
733            Included(key) => Included(Slice::from(key.as_ref())),
734            Excluded(key) => Excluded(Slice::from(key.as_ref())),
735            Unbounded => Unbounded,
736        };
737
738        let end = match range.end_bound() {
739            Included(key) => Included(Slice::from(key.as_ref())),
740            Excluded(key) => Excluded(Slice::from(key.as_ref())),
741            Unbounded => Unbounded,
742        };
743
744        let is_empty =
745            if let (Included(lo) | Excluded(lo), Included(hi) | Excluded(hi)) = (&start, &end) {
746                lo.as_ref() > hi.as_ref()
747            } else {
748                false
749            };
750
751        (OwnedBounds { start, end }, is_empty)
752    }
753
754    /// Opens an LSM-tree in the given directory.
755    ///
756    /// Will recover previous state if the folder was previously
757    /// occupied by an LSM-tree, including the previous configuration.
758    /// If not, a new tree will be initialized with the given config.
759    ///
760    /// After recovering a previous state, use [`Tree::set_active_memtable`]
761    /// to fill the memtable with data from a write-ahead log for full durability.
762    ///
763    /// # Errors
764    ///
765    /// Returns error, if an IO error occurred.
766    pub(crate) fn open(config: Config) -> crate::Result<Self> {
767        log::debug!("Opening LSM-tree at {}", config.path.display());
768
769        // Check for old version
770        if config.path.join("version").try_exists()? {
771            log::error!("It looks like you are trying to open a V1 database - the database needs a manual migration, however a migration tool is not provided, as V1 is extremely outdated.");
772            return Err(crate::Error::InvalidVersion(FormatVersion::V1.into()));
773        }
774
775        let tree = if config.path.join(CURRENT_VERSION_FILE).try_exists()? {
776            Self::recover(config)
777        } else {
778            Self::create_new(config)
779        }?;
780
781        Ok(tree)
782    }
783
    // Finalizes a table writer and loads the resulting table from disk.
    //
    // Returns `Ok(None)` when `finish` yields no table (nothing was written).
    pub(crate) fn consume_writer(
        &self,
        writer: crate::table::Writer,
    ) -> crate::Result<Option<Table>> {
        // Keep the path before `finish` consumes the writer
        let table_file_path = writer.path.clone();

        let Some((_, checksum)) = writer.finish()? else {
            return Ok(None);
        };

        log::debug!("Finalized table write at {}", table_file_path.display());

        // Pinning policies are sampled at level 0 — NOTE(review): assumes
        // the written table lands in L0; confirm against callers
        let pin_filter = self.config.filter_block_pinning_policy.get(0);
        let pin_index = self.config.index_block_pinning_policy.get(0);

        let created_table = Table::recover(
            table_file_path,
            checksum,
            0,
            self.id,
            self.config.cache.clone(),
            self.config.descriptor_table.clone(),
            pin_filter,
            pin_index,
            #[cfg(feature = "metrics")]
            self.metrics.clone(),
        )?;

        log::debug!("Flushed table to {:?}", created_table.path);

        Ok(Some(created_table))
    }
816
817    /// Returns `true` if there are some tables that are being compacted.
818    #[doc(hidden)]
819    #[must_use]
820    pub fn is_compacting(&self) -> bool {
821        !self
822            .compaction_state
823            .lock()
824            .expect("lock is poisoned")
825            .hidden_set()
826            .is_empty()
827    }
828
829    fn inner_compact(
830        &self,
831        strategy: Arc<dyn CompactionStrategy>,
832        mvcc_gc_watermark: SeqNo,
833    ) -> crate::Result<()> {
834        use crate::compaction::worker::{do_compaction, Options};
835
836        let mut opts = Options::from_tree(self, strategy);
837        opts.mvcc_gc_watermark = mvcc_gc_watermark;
838
839        do_compaction(&opts)?;
840
841        log::debug!("Compaction run over");
842
843        Ok(())
844    }
845
    // Full-tree iterator: delegates to `create_range` with an unbounded range.
    #[doc(hidden)]
    #[must_use]
    pub fn create_iter(
        &self,
        seqno: SeqNo,
        ephemeral: Option<(Arc<Memtable>, SeqNo)>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        self.create_range::<UserKey, _>(&.., seqno, ephemeral)
    }
855
856    #[doc(hidden)]
857    pub fn create_range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
858        &self,
859        range: &'a R,
860        seqno: SeqNo,
861        ephemeral: Option<(Arc<Memtable>, SeqNo)>,
862    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
863        let version_history_lock = self.version_history.read().expect("lock is poisoned");
864        let super_version = version_history_lock.get_version_for_snapshot(seqno);
865
866        Self::create_internal_range(super_version, range, seqno, ephemeral).map(|item| match item {
867            Ok(kv) => Ok((kv.key.user_key, kv.value)),
868            Err(e) => Err(e),
869        })
870    }
871
872    #[doc(hidden)]
873    pub fn create_prefix<'a, K: AsRef<[u8]> + 'a>(
874        &self,
875        prefix: K,
876        seqno: SeqNo,
877        ephemeral: Option<(Arc<Memtable>, SeqNo)>,
878    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
879        use crate::range::prefix_to_range;
880
881        let range = prefix_to_range(prefix.as_ref());
882        self.create_range(&range, seqno, ephemeral)
883    }
884
885    /// Adds an item to the active memtable.
886    ///
887    /// Returns the added item's size and new size of the memtable.
888    #[doc(hidden)]
889    #[must_use]
890    pub fn append_entry(&self, value: InternalValue) -> (u64, u64) {
891        self.version_history
892            .read()
893            .expect("lock is poisoned")
894            .latest_version()
895            .active_memtable
896            .insert(value)
897    }
898
899    /// Recovers previous state, by loading the level manifest, tables and blob files.
900    ///
901    /// # Errors
902    ///
903    /// Returns error, if an IO error occurred.
904    fn recover(mut config: Config) -> crate::Result<Self> {
905        use crate::stop_signal::StopSignal;
906        use inner::get_next_tree_id;
907
908        log::info!("Recovering LSM-tree at {}", config.path.display());
909
910        // let manifest = {
911        //     let manifest_path = config.path.join(MANIFEST_FILE);
912        //     let reader = sfa::Reader::new(&manifest_path)?;
913        //     Manifest::decode_from(&manifest_path, &reader)?
914        // };
915
916        // if manifest.version != FormatVersion::V3 {
917        //     return Err(crate::Error::InvalidVersion(manifest.version.into()));
918        // }
919
920        let tree_id = get_next_tree_id();
921
922        #[cfg(feature = "metrics")]
923        let metrics = Arc::new(Metrics::default());
924
925        let version = Self::recover_levels(
926            &config.path,
927            tree_id,
928            &config,
929            #[cfg(feature = "metrics")]
930            &metrics,
931        )?;
932
933        {
934            let manifest_path = config.path.join(format!("v{}", version.id()));
935            let reader = sfa::Reader::new(&manifest_path)?;
936            let manifest = Manifest::decode_from(&manifest_path, &reader)?;
937
938            if manifest.version != FormatVersion::V3 {
939                return Err(crate::Error::InvalidVersion(manifest.version.into()));
940            }
941
942            let requested_tree_type = match config.kv_separation_opts {
943                Some(_) => crate::TreeType::Blob,
944                None => crate::TreeType::Standard,
945            };
946
947            if version.tree_type() != requested_tree_type {
948                log::error!(
949                    "Tried to open a {requested_tree_type:?}Tree, but the existing tree is of type {:?}Tree. This indicates a misconfiguration or corruption.",
950                    version.tree_type(),
951                );
952                return Err(crate::Error::Unrecoverable);
953            }
954
955            // IMPORTANT: Restore persisted config
956            config.level_count = manifest.level_count;
957        }
958
959        let highest_table_id = version
960            .iter_tables()
961            .map(Table::id)
962            .max()
963            .unwrap_or_default();
964
965        let inner = TreeInner {
966            id: tree_id,
967            memtable_id_counter: SequenceNumberCounter::default(),
968            table_id_counter: SequenceNumberCounter::new(highest_table_id + 1),
969            blob_file_id_counter: SequenceNumberCounter::default(),
970            version_history: Arc::new(RwLock::new(SuperVersions::new(version))),
971            stop_signal: StopSignal::default(),
972            config,
973            major_compaction_lock: RwLock::default(),
974            flush_lock: Mutex::default(),
975            compaction_state: Arc::new(Mutex::new(CompactionState::default())),
976
977            #[cfg(feature = "metrics")]
978            metrics,
979        };
980
981        Ok(Self(Arc::new(inner)))
982    }
983
984    /// Creates a new LSM-tree in a directory.
985    fn create_new(config: Config) -> crate::Result<Self> {
986        use crate::file::{fsync_directory, TABLES_FOLDER};
987        use std::fs::create_dir_all;
988
989        let path = config.path.clone();
990        log::trace!("Creating LSM-tree at {}", path.display());
991
992        create_dir_all(&path)?;
993
994        let table_folder_path = path.join(TABLES_FOLDER);
995        create_dir_all(&table_folder_path)?;
996
997        // IMPORTANT: fsync folders on Unix
998        fsync_directory(&table_folder_path)?;
999        fsync_directory(&path)?;
1000
1001        let inner = TreeInner::create_new(config)?;
1002        Ok(Self(Arc::new(inner)))
1003    }
1004
    /// Recovers the level manifest, loading all tables from disk.
    ///
    /// Builds a map of expected tables from the recovered version manifest,
    /// then scans the tables folder: files referenced by the manifest are
    /// opened as tables, unreferenced files are collected as orphans and
    /// deleted afterwards. Blob files are recovered the same way, and stale
    /// version manifests are cleaned up once the latest version is safely
    /// reconstructed.
    ///
    /// # Errors
    ///
    /// Returns `Unrecoverable` if a table file has a non-numeric name or if
    /// fewer tables were found on disk than the manifest references;
    /// otherwise propagates I/O errors.
    fn recover_levels<P: AsRef<Path>>(
        tree_path: P,
        tree_id: TreeId,
        config: &Config,
        #[cfg(feature = "metrics")] metrics: &Arc<Metrics>,
    ) -> crate::Result<Version> {
        use crate::{file::fsync_directory, file::TABLES_FOLDER, TableId};

        let tree_path = tree_path.as_ref();

        // Load the persisted version state (table IDs per level, blob file IDs)
        let recovery = recover(tree_path)?;

        // Flatten the per-level, per-run table listing into a lookup table:
        // table ID -> (level index, expected checksum, global seqno)
        let table_map = {
            let mut result: crate::HashMap<TableId, (u8 /* Level index */, Checksum, SeqNo)> =
                crate::HashMap::default();

            for (level_idx, table_ids) in recovery.table_ids.iter().enumerate() {
                for run in table_ids {
                    for table in run {
                        #[expect(
                            clippy::expect_used,
                            reason = "there are always less than 256 levels"
                        )]
                        result.insert(
                            table.id,
                            (
                                level_idx
                                    .try_into()
                                    .expect("there are less than 256 levels"),
                                table.checksum,
                                table.global_seqno,
                            ),
                        );
                    }
                }
            }

            result
        };

        let cnt = table_map.len();

        log::debug!("Recovering {cnt} tables from {}", tree_path.display());

        // Throttle progress logging: log every table for small trees,
        // every 10th/100th for larger ones
        let progress_mod = match cnt {
            _ if cnt <= 20 => 1,
            _ if cnt <= 100 => 10,
            _ => 100,
        };

        let mut tables = vec![];

        let table_base_folder = tree_path.join(TABLES_FOLDER);

        // The tables folder may be missing (e.g. fresh tree); create it so the
        // directory scan below cannot fail
        if !table_base_folder.try_exists()? {
            std::fs::create_dir_all(&table_base_folder)?;
            fsync_directory(&table_base_folder)?;
        }

        // Files present on disk but not referenced by the manifest; deleted
        // only after recovery succeeds (see below)
        let mut orphaned_tables = vec![];

        for (idx, dirent) in std::fs::read_dir(&table_base_folder)?.enumerate() {
            let dirent = dirent?;
            let file_name = dirent.file_name();

            // https://en.wikipedia.org/wiki/.DS_Store
            if file_name == ".DS_Store" {
                continue;
            }

            // https://en.wikipedia.org/wiki/AppleSingle_and_AppleDouble_formats
            if file_name.to_string_lossy().starts_with("._") {
                continue;
            }

            let table_file_name = file_name.to_str().ok_or_else(|| {
                log::error!("invalid table file name {}", file_name.display());
                crate::Error::Unrecoverable
            })?;

            let table_file_path = dirent.path();
            assert!(!table_file_path.is_dir());

            log::debug!("Recovering table from {}", table_file_path.display());

            // Table files are named after their numeric table ID
            let table_id = table_file_name.parse::<TableId>().map_err(|e| {
                log::error!("invalid table file name {table_file_name:?}: {e:?}");
                crate::Error::Unrecoverable
            })?;

            if let Some(&(level_idx, checksum, global_seqno)) = table_map.get(&table_id) {
                // Pinning policies are configured per level
                let pin_filter = config.filter_block_pinning_policy.get(level_idx.into());
                let pin_index = config.index_block_pinning_policy.get(level_idx.into());

                let table = Table::recover(
                    table_file_path,
                    checksum,
                    global_seqno,
                    tree_id,
                    config.cache.clone(),
                    config.descriptor_table.clone(),
                    pin_filter,
                    pin_index,
                    #[cfg(feature = "metrics")]
                    metrics.clone(),
                )?;

                log::debug!("Recovered table from {:?}", table.path);

                tables.push(table);

                // NOTE: idx counts directory entries (including skipped and
                // orphaned files), not recovered tables, so the logged
                // "{idx}/{cnt}" is approximate
                if idx % progress_mod == 0 {
                    log::debug!("Recovered {idx}/{cnt} tables");
                }
            } else {
                // Not referenced by the manifest -> orphan, remove later
                orphaned_tables.push(table_file_path);
            }
        }

        // Every table the manifest references must exist on disk
        if tables.len() < cnt {
            log::error!(
                "Recovered less tables than expected: {:?}",
                table_map.keys(),
            );
            return Err(crate::Error::Unrecoverable);
        }

        log::debug!("Successfully recovered {} tables", tables.len());

        let (blob_files, orphaned_blob_files) = crate::vlog::recover_blob_files(
            &tree_path.join(crate::file::BLOBS_FOLDER),
            &recovery.blob_file_ids,
        )?;

        let version = Version::from_recovery(recovery, &tables, &blob_files)?;

        // NOTE: Cleanup old versions
        // But only after we definitely recovered the latest version
        Self::cleanup_orphaned_version(tree_path, version.id())?;

        for table_path in orphaned_tables {
            log::debug!("Deleting orphaned table {}", table_path.display());
            std::fs::remove_file(&table_path)?;
        }

        for blob_file_path in orphaned_blob_files {
            log::debug!("Deleting orphaned blob file {}", blob_file_path.display());
            std::fs::remove_file(&blob_file_path)?;
        }

        Ok(version)
    }
1158
1159    fn cleanup_orphaned_version(
1160        path: &Path,
1161        latest_version_id: crate::version::VersionId,
1162    ) -> crate::Result<()> {
1163        let version_str = format!("v{latest_version_id}");
1164
1165        for file in std::fs::read_dir(path)? {
1166            let dirent = file?;
1167
1168            if dirent.file_type()?.is_dir() {
1169                continue;
1170            }
1171
1172            let name = dirent.file_name();
1173
1174            if name.to_string_lossy().starts_with('v') && *name != *version_str {
1175                log::trace!("Cleanup orphaned version {}", name.display());
1176                std::fs::remove_file(dirent.path())?;
1177            }
1178        }
1179
1180        Ok(())
1181    }
1182}