lsm_tree/tree/
mod.rs

1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5pub mod ingest;
6pub mod inner;
7pub mod sealed;
8
9use crate::{
10    compaction::{drop_range::OwnedBounds, state::CompactionState, CompactionStrategy},
11    config::Config,
12    file::CURRENT_VERSION_FILE,
13    format_version::FormatVersion,
14    iter_guard::{IterGuard, IterGuardImpl},
15    manifest::Manifest,
16    memtable::Memtable,
17    slice::Slice,
18    table::Table,
19    value::InternalValue,
20    version::{recovery::recover, SuperVersion, SuperVersions, Version},
21    vlog::BlobFile,
22    AbstractTree, Checksum, KvPair, SeqNo, SequenceNumberCounter, TableId, UserKey, UserValue,
23    ValueType,
24};
25use inner::{TreeId, TreeInner};
26use std::{
27    ops::{Bound, RangeBounds},
28    path::Path,
29    sync::{Arc, Mutex, RwLock},
30};
31
32#[cfg(feature = "metrics")]
33use crate::metrics::Metrics;
34
/// Iterator value guard
///
/// Wraps the `Result` of a key-value lookup; the [`IterGuard`] accessors
/// decide how much of the pair (key only, value size, full pair) to expose.
pub struct Guard(crate::Result<(UserKey, UserValue)>);
37
38impl IterGuard for Guard {
39    fn into_inner_if(
40        self,
41        pred: impl Fn(&UserKey) -> bool,
42    ) -> crate::Result<(UserKey, Option<UserValue>)> {
43        let (k, v) = self.0?;
44
45        if pred(&k) {
46            Ok((k, Some(v)))
47        } else {
48            Ok((k, None))
49        }
50    }
51
52    fn key(self) -> crate::Result<UserKey> {
53        self.0.map(|(k, _)| k)
54    }
55
56    fn size(self) -> crate::Result<u32> {
57        #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
58        self.into_inner().map(|(_, v)| v.len() as u32)
59    }
60
61    fn into_inner(self) -> crate::Result<(UserKey, UserValue)> {
62        self.0
63    }
64}
65
66fn ignore_tombstone_value(item: InternalValue) -> Option<InternalValue> {
67    if item.is_tombstone() {
68        None
69    } else {
70        Some(item)
71    }
72}
73
/// A log-structured merge tree (LSM-tree/LSMT)
///
/// Cheaply cloneable handle; all shared state lives in the
/// reference-counted [`TreeInner`].
#[derive(Clone)]
pub struct Tree(#[doc(hidden)] pub Arc<TreeInner>);
77
impl std::ops::Deref for Tree {
    type Target = TreeInner;

    // Deref to the shared inner state so fields like `config` and
    // `version_history` can be accessed directly on `Tree`.
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
85
impl AbstractTree for Tree {
    /// Returns the number of entries in the table file descriptor cache.
    fn table_file_cache_size(&self) -> usize {
        self.config.descriptor_table.len()
    }

    /// Acquires the write lock over the version history (super versions).
    fn get_version_history_lock(
        &self,
    ) -> std::sync::RwLockWriteGuard<'_, crate::version::SuperVersions> {
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        self.version_history.write().expect("lock is poisoned")
    }

    /// Peeks at the next table ID without advancing the counter.
    fn next_table_id(&self) -> TableId {
        self.0.table_id_counter.get()
    }

    /// Returns this tree's ID.
    fn id(&self) -> TreeId {
        self.id
    }

    /// A standard (non-blob) tree never owns blob files.
    fn blob_file_count(&self) -> usize {
        0
    }

    fn get_internal_entry(&self, key: &[u8], seqno: SeqNo) -> crate::Result<Option<InternalValue>> {
        // Pin the super version that is visible at the given snapshot seqno
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        let super_version = self
            .version_history
            .read()
            .expect("lock is poisoned")
            .get_version_for_snapshot(seqno);

        Self::get_internal_entry_from_version(&super_version, key, seqno)
    }

    /// Returns the latest (current) on-disk version.
    fn current_version(&self) -> Version {
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        self.version_history
            .read()
            .expect("poisoned")
            .latest_version()
            .version
    }

    /// Acquires the mutex that serializes memtable flushes.
    fn get_flush_lock(&self) -> std::sync::MutexGuard<'_, ()> {
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        self.flush_lock.lock().expect("lock is poisoned")
    }

    #[cfg(feature = "metrics")]
    fn metrics(&self) -> &Arc<crate::Metrics> {
        &self.0.metrics
    }

    /// Returns the number of versions queued in the free list.
    fn version_free_list_len(&self) -> usize {
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        self.version_history
            .read()
            .expect("lock is poisoned")
            .free_list_len()
    }

    /// Returns a guarded iterator over all items matching `prefix`.
    fn prefix<K: AsRef<[u8]>>(
        &self,
        prefix: K,
        seqno: SeqNo,
        index: Option<(Arc<Memtable>, SeqNo)>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
        Box::new(
            self.create_prefix(&prefix, seqno, index)
                .map(|kv| IterGuardImpl::Standard(Guard(kv))),
        )
    }

    /// Returns a guarded iterator over all items in `range`.
    fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
        &self,
        range: R,
        seqno: SeqNo,
        index: Option<(Arc<Memtable>, SeqNo)>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
        Box::new(
            self.create_range(&range, seqno, index)
                .map(|kv| IterGuardImpl::Standard(Guard(kv))),
        )
    }

    /// Returns the number of tombstones in the tree.
    fn tombstone_count(&self) -> u64 {
        self.current_version()
            .iter_tables()
            .map(Table::tombstone_count)
            .sum()
    }

    /// Returns the number of weak tombstones (single deletes) in the tree.
    fn weak_tombstone_count(&self) -> u64 {
        self.current_version()
            .iter_tables()
            .map(Table::weak_tombstone_count)
            .sum()
    }

    /// Returns the number of value entries that become reclaimable once weak tombstones can be GC'd.
    fn weak_tombstone_reclaimable_count(&self) -> u64 {
        self.current_version()
            .iter_tables()
            .map(Table::weak_tombstone_reclaimable)
            .sum()
    }

    /// Drops all items in the given key range via a dedicated compaction.
    fn drop_range<K: AsRef<[u8]>, R: RangeBounds<K>>(&self, range: R) -> crate::Result<()> {
        let (bounds, is_empty) = Self::range_bounds_to_owned_bounds(&range);

        // A logically empty range cannot contain anything to drop
        if is_empty {
            return Ok(());
        }

        let strategy = Arc::new(crate::compaction::drop_range::Strategy::new(bounds));

        // IMPORTANT: Write lock so we can be the only compaction going on
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        let _lock = self
            .0
            .major_compaction_lock
            .write()
            .expect("lock is poisoned");

        log::info!("Starting drop_range compaction");
        self.inner_compact(strategy, 0)
    }

    /// Removes all data: swaps in fresh empty memtables and a new empty version.
    fn clear(&self) -> crate::Result<()> {
        let mut versions = self.get_version_history_lock();

        versions.upgrade_version(
            &self.config.path,
            |v| {
                let mut copy = v.clone();
                copy.active_memtable = Arc::new(Memtable::new(self.memtable_id_counter.next()));
                copy.sealed_memtables = Arc::default();
                // Bump the version ID so the empty state is persisted as a new version
                copy.version = Version::new(v.version.id() + 1, self.tree_type());
                Ok(copy)
            },
            &self.config.seqno,
            &self.config.visible_seqno,
        )
    }

    #[doc(hidden)]
    fn major_compact(&self, target_size: u64, seqno_threshold: SeqNo) -> crate::Result<()> {
        let strategy = Arc::new(crate::compaction::major::Strategy::new(target_size));

        // IMPORTANT: Write lock so we can be the only compaction going on
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        let _lock = self
            .0
            .major_compaction_lock
            .write()
            .expect("lock is poisoned");

        log::info!("Starting major compaction");
        self.inner_compact(strategy, seqno_threshold)
    }

    /// Returns the number of runs in level 0 (0 if the level does not exist).
    fn l0_run_count(&self) -> usize {
        self.current_version()
            .level(0)
            .map(|x| x.run_count())
            .unwrap_or_default()
    }

    /// Returns the byte length of the value for `key`, if one exists.
    fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<u32>> {
        #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
        Ok(self.get(key, seqno)?.map(|x| x.len() as u32))
    }

    /// Returns the total filter size over all tables, in bytes.
    fn filter_size(&self) -> u64 {
        self.current_version()
            .iter_tables()
            .map(Table::filter_size)
            .map(u64::from)
            .sum()
    }

    /// Returns the total size of filters pinned in memory, in bytes.
    fn pinned_filter_size(&self) -> usize {
        self.current_version()
            .iter_tables()
            .map(Table::pinned_filter_size)
            .sum()
    }

    /// Returns the total size of block indexes pinned in memory, in bytes.
    fn pinned_block_index_size(&self) -> usize {
        self.current_version()
            .iter_tables()
            .map(Table::pinned_block_index_size)
            .sum()
    }

    /// Returns the number of sealed (immutable, not yet flushed) memtables.
    fn sealed_memtable_count(&self) -> usize {
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        self.version_history
            .read()
            .expect("lock is poisoned")
            .latest_version()
            .sealed_memtables
            .len()
    }

    /// Writes the given item stream out as new tables on disk.
    ///
    /// Returns the recovered [`Table`] handles; the blob-file component is
    /// always `None` for a standard tree.
    fn flush_to_tables(
        &self,
        stream: impl Iterator<Item = crate::Result<InternalValue>>,
    ) -> crate::Result<Option<(Vec<Table>, Option<Vec<BlobFile>>)>> {
        use crate::{file::TABLES_FOLDER, table::multi_writer::MultiWriter};
        use std::time::Instant;

        let start = Instant::now();

        let folder = self.config.path.join(TABLES_FOLDER);

        // All block policies are evaluated at level index 0 (flush output)
        let data_block_size = self.config.data_block_size_policy.get(0);

        let data_block_restart_interval = self.config.data_block_restart_interval_policy.get(0);
        let index_block_restart_interval = self.config.index_block_restart_interval_policy.get(0);

        let data_block_compression = self.config.data_block_compression_policy.get(0);
        let index_block_compression = self.config.index_block_compression_policy.get(0);

        let data_block_hash_ratio = self.config.data_block_hash_ratio_policy.get(0);

        let index_partitioning = self.config.index_block_partitioning_policy.get(0);
        let filter_partitioning = self.config.filter_block_partitioning_policy.get(0);

        log::debug!(
            "Flushing memtable(s) to {}, data_block_restart_interval={data_block_restart_interval}, index_block_restart_interval={index_block_restart_interval}, data_block_size={data_block_size}, data_block_compression={data_block_compression}, index_block_compression={index_block_compression}",
            folder.display(),
        );

        let mut table_writer = MultiWriter::new(
            folder.clone(),
            self.table_id_counter.clone(),
            64 * 1_024 * 1_024,
            0,
        )?
        .use_data_block_restart_interval(data_block_restart_interval)
        .use_index_block_restart_interval(index_block_restart_interval)
        .use_data_block_compression(data_block_compression)
        .use_index_block_compression(index_block_compression)
        .use_data_block_size(data_block_size)
        .use_data_block_hash_ratio(data_block_hash_ratio)
        .use_bloom_policy({
            use crate::config::FilterPolicyEntry::{Bloom, None};
            use crate::table::filter::BloomConstructionPolicy;

            // A "no filter" policy is expressed as a bloom filter with
            // zero bits per key
            match self.config.filter_policy.get(0) {
                Bloom(policy) => policy,
                None => BloomConstructionPolicy::BitsPerKey(0.0),
            }
        });

        if index_partitioning {
            table_writer = table_writer.use_partitioned_index();
        }
        if filter_partitioning {
            table_writer = table_writer.use_partitioned_filter();
        }

        for item in stream {
            table_writer.write(item?)?;
        }

        let result = table_writer.finish()?;

        log::debug!("Flushed memtable(s) in {:?}", start.elapsed());

        let pin_filter = self.config.filter_block_pinning_policy.get(0);
        let pin_index = self.config.index_block_pinning_policy.get(0);

        // Load tables
        let tables = result
            .into_iter()
            .map(|(table_id, checksum)| -> crate::Result<Table> {
                Table::recover(
                    folder.join(table_id.to_string()),
                    checksum,
                    0,
                    self.id,
                    self.config.cache.clone(),
                    self.config.descriptor_table.clone(),
                    pin_filter,
                    pin_index,
                    #[cfg(feature = "metrics")]
                    self.metrics.clone(),
                )
            })
            .collect::<crate::Result<Vec<_>>>()?;

        Ok(Some((tables, None)))
    }

    /// Registers freshly written tables (and optionally blob files) in a new
    /// version, and releases the sealed memtables they were flushed from.
    #[expect(clippy::significant_drop_tightening)]
    fn register_tables(
        &self,
        tables: &[Table],
        blob_files: Option<&[BlobFile]>,
        frag_map: Option<crate::blob_tree::FragmentationMap>,
        sealed_memtables_to_delete: &[crate::tree::inner::MemtableId],
        gc_watermark: SeqNo,
    ) -> crate::Result<()> {
        log::trace!(
            "Registering {} tables, {} blob files",
            tables.len(),
            blob_files.map(<[BlobFile]>::len).unwrap_or_default(),
        );

        // Lock order: compaction state first, then version history
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        let mut _compaction_state = self.compaction_state.lock().expect("lock is poisoned");
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        let mut version_lock = self.version_history.write().expect("lock is poisoned");

        version_lock.upgrade_version(
            &self.config.path,
            |current| {
                let mut copy = current.clone();

                // New tables become a new run in level 0
                copy.version = copy.version.with_new_l0_run(
                    tables,
                    blob_files,
                    frag_map.filter(|x| !x.is_empty()),
                );

                // The flushed memtables are now durable on disk and can be released
                for &table_id in sealed_memtables_to_delete {
                    log::trace!("releasing sealed memtable #{table_id}");
                    copy.sealed_memtables = Arc::new(copy.sealed_memtables.remove(table_id));
                }

                Ok(copy)
            },
            &self.config.seqno,
            &self.config.visible_seqno,
        )?;

        // Best-effort: a failed version GC does not fail the registration
        if let Err(e) = version_lock.maintenance(&self.config.path, gc_watermark) {
            log::warn!("Version GC failed: {e:?}");
        }

        Ok(())
    }

    /// Discards the active memtable (and sealed memtables) without flushing.
    fn clear_active_memtable(&self) {
        use crate::tree::sealed::SealedMemtables;

        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        let mut version_history_lock = self.version_history.write().expect("lock is poisoned");
        let super_version = version_history_lock.latest_version();

        // Nothing to clear
        if super_version.active_memtable.is_empty() {
            return;
        }

        let mut copy = version_history_lock.latest_version();
        copy.active_memtable = Arc::new(Memtable::new(self.memtable_id_counter.next()));
        copy.sealed_memtables = Arc::new(SealedMemtables::default());

        // Rotate does not modify the memtable, so it cannot break snapshots
        copy.seqno = super_version.seqno;

        version_history_lock.replace_latest_version(copy);

        log::trace!("cleared active memtable");
    }

    /// Runs a compaction using the given strategy.
    fn compact(
        &self,
        strategy: Arc<dyn CompactionStrategy>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        // NOTE: Read lock major compaction lock
        // That way, if a major compaction is running, we cannot proceed
        // But in general, parallel (non-major) compactions can occur
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        let _lock = self
            .0
            .major_compaction_lock
            .read()
            .expect("lock is poisoned");

        self.inner_compact(strategy, seqno_threshold)
    }

    /// Returns (and consumes) the next table ID.
    fn get_next_table_id(&self) -> TableId {
        self.0.get_next_table_id()
    }

    /// Returns this tree's configuration.
    fn tree_config(&self) -> &Config {
        &self.config
    }

    /// Returns a handle to the currently active memtable.
    fn active_memtable(&self) -> Arc<Memtable> {
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        self.version_history
            .read()
            .expect("lock is poisoned")
            .latest_version()
            .active_memtable
    }

    fn tree_type(&self) -> crate::TreeType {
        crate::TreeType::Standard
    }

    /// Seals the active memtable (if non-empty), replacing it with a fresh one,
    /// and returns the sealed memtable for flushing.
    #[expect(clippy::significant_drop_tightening)]
    fn rotate_memtable(&self) -> Option<Arc<Memtable>> {
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        let mut version_history_lock = self.version_history.write().expect("lock is poisoned");
        let super_version = version_history_lock.latest_version();

        // An empty memtable does not need to be rotated
        if super_version.active_memtable.is_empty() {
            return None;
        }

        let yanked_memtable = super_version.active_memtable;

        let mut copy = version_history_lock.latest_version();
        copy.active_memtable = Arc::new(Memtable::new(self.memtable_id_counter.next()));
        copy.sealed_memtables =
            Arc::new(super_version.sealed_memtables.add(yanked_memtable.clone()));

        // Rotate does not modify the memtable so it cannot break snapshots
        copy.seqno = super_version.seqno;

        version_history_lock.replace_latest_version(copy);

        log::trace!(
            "rotate: added memtable id={} to sealed memtables",
            yanked_memtable.id,
        );

        Some(yanked_memtable)
    }

    /// Returns the number of tables in the current version.
    fn table_count(&self) -> usize {
        self.current_version().table_count()
    }

    /// Returns the number of tables in the given level, if that level exists.
    fn level_table_count(&self, idx: usize) -> Option<usize> {
        self.current_version().level(idx).map(|x| x.table_count())
    }

    /// Approximates the item count by summing memtable lengths and per-table
    /// item counts; overlapping MVCC versions may be counted multiple times.
    fn approximate_len(&self) -> usize {
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        let super_version = self
            .version_history
            .read()
            .expect("lock is poisoned")
            .latest_version();

        let tables_item_count = self
            .current_version()
            .iter_tables()
            .map(|x| x.metadata.item_count)
            .sum::<u64>();

        let memtable_count = super_version.active_memtable.len() as u64;
        let sealed_count = super_version
            .sealed_memtables
            .iter()
            .map(|mt| mt.len())
            .sum::<usize>() as u64;

        #[expect(clippy::expect_used, reason = "result should fit into usize")]
        (memtable_count + sealed_count + tables_item_count)
            .try_into()
            .expect("approximate_len too large for usize")
    }

    /// Returns the total on-disk size of all levels, in bytes.
    fn disk_space(&self) -> u64 {
        self.current_version()
            .iter_levels()
            .map(super::version::Level::size)
            .sum()
    }

    /// Returns the highest seqno held in any (active or sealed) memtable.
    fn get_highest_memtable_seqno(&self) -> Option<SeqNo> {
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        let version = self
            .version_history
            .read()
            .expect("lock is poisoned")
            .latest_version();

        let active = version.active_memtable.get_highest_seqno();

        let sealed = version
            .sealed_memtables
            .iter()
            .map(|mt| mt.get_highest_seqno())
            .max()
            .flatten();

        active.max(sealed)
    }

    /// Returns the highest seqno persisted in any table on disk.
    fn get_highest_persisted_seqno(&self) -> Option<SeqNo> {
        self.current_version()
            .iter_tables()
            .map(Table::get_highest_seqno)
            .max()
    }

    /// Point lookup of the user value for `key` at snapshot `seqno`.
    fn get<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<UserValue>> {
        Ok(self
            .get_internal_entry(key.as_ref(), seqno)?
            .map(|x| x.value))
    }

    /// Inserts a key-value pair at the given seqno.
    ///
    /// Returns the written item's size and the new memtable size.
    fn insert<K: Into<UserKey>, V: Into<UserValue>>(
        &self,
        key: K,
        value: V,
        seqno: SeqNo,
    ) -> (u64, u64) {
        let value = InternalValue::from_components(key, value, seqno, ValueType::Value);
        self.append_entry(value)
    }

    /// Removes a key by writing a (regular) tombstone at the given seqno.
    fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        let value = InternalValue::new_tombstone(key, seqno);
        self.append_entry(value)
    }

    /// Removes a key by writing a weak (single-delete) tombstone at the given seqno.
    fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        let value = InternalValue::new_weak_tombstone(key, seqno);
        self.append_entry(value)
    }
}
621
622impl Tree {
623    #[doc(hidden)]
624    pub fn create_internal_range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
625        version: SuperVersion,
626        range: &'a R,
627        seqno: SeqNo,
628        ephemeral: Option<(Arc<Memtable>, SeqNo)>,
629    ) -> impl DoubleEndedIterator<Item = crate::Result<InternalValue>> + 'static {
630        use crate::range::{IterState, TreeIter};
631        use std::ops::Bound::{self, Excluded, Included, Unbounded};
632
633        let lo: Bound<UserKey> = match range.start_bound() {
634            Included(x) => Included(x.as_ref().into()),
635            Excluded(x) => Excluded(x.as_ref().into()),
636            Unbounded => Unbounded,
637        };
638
639        let hi: Bound<UserKey> = match range.end_bound() {
640            Included(x) => Included(x.as_ref().into()),
641            Excluded(x) => Excluded(x.as_ref().into()),
642            Unbounded => Unbounded,
643        };
644
645        let bounds: (Bound<UserKey>, Bound<UserKey>) = (lo, hi);
646
647        let iter_state = { IterState { version, ephemeral } };
648
649        TreeIter::create_range(iter_state, bounds, seqno)
650    }
651
652    pub(crate) fn get_internal_entry_from_version(
653        super_version: &SuperVersion,
654        key: &[u8],
655        seqno: SeqNo,
656    ) -> crate::Result<Option<InternalValue>> {
657        if let Some(entry) = super_version.active_memtable.get(key, seqno) {
658            return Ok(ignore_tombstone_value(entry));
659        }
660
661        // Now look in sealed memtables
662        if let Some(entry) =
663            Self::get_internal_entry_from_sealed_memtables(super_version, key, seqno)
664        {
665            return Ok(ignore_tombstone_value(entry));
666        }
667
668        // Now look in tables... this may involve disk I/O
669        Self::get_internal_entry_from_tables(&super_version.version, key, seqno)
670    }
671
672    fn get_internal_entry_from_tables(
673        version: &Version,
674        key: &[u8],
675        seqno: SeqNo,
676    ) -> crate::Result<Option<InternalValue>> {
677        // NOTE: Create key hash for hash sharing
678        // https://fjall-rs.github.io/post/bloom-filter-hash-sharing/
679        let key_hash = crate::table::filter::standard_bloom::Builder::get_hash(key);
680
681        for table in version
682            .iter_levels()
683            .flat_map(|lvl| lvl.iter())
684            .filter_map(|run| run.get_for_key(key))
685        {
686            if let Some(item) = table.get(key, seqno, key_hash)? {
687                return Ok(ignore_tombstone_value(item));
688            }
689        }
690
691        Ok(None)
692    }
693
694    fn get_internal_entry_from_sealed_memtables(
695        super_version: &SuperVersion,
696        key: &[u8],
697        seqno: SeqNo,
698    ) -> Option<InternalValue> {
699        for mt in super_version.sealed_memtables.iter().rev() {
700            if let Some(entry) = mt.get(key, seqno) {
701                return Some(entry);
702            }
703        }
704
705        None
706    }
707
708    pub(crate) fn get_version_for_snapshot(&self, seqno: SeqNo) -> SuperVersion {
709        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
710        self.version_history
711            .read()
712            .expect("lock is poisoned")
713            .get_version_for_snapshot(seqno)
714    }
715
716    /// Normalizes a user-provided range into owned `Bound<Slice>` values.
717    ///
718    /// Returns a tuple containing:
719    /// - the `OwnedBounds` that mirror the original bounds semantics (including
720    ///   inclusive/exclusive markers and unbounded endpoints), and
721    /// - a `bool` flag indicating whether the normalized range is logically
722    ///   empty (e.g., when the lower bound is greater than the upper bound).
723    ///
724    /// Callers can use the flag to detect empty ranges and skip further work
725    /// while still having access to the normalized bounds for non-empty cases.
726    fn range_bounds_to_owned_bounds<K: AsRef<[u8]>, R: RangeBounds<K>>(
727        range: &R,
728    ) -> (OwnedBounds, bool) {
729        use Bound::{Excluded, Included, Unbounded};
730
731        let start = match range.start_bound() {
732            Included(key) => Included(Slice::from(key.as_ref())),
733            Excluded(key) => Excluded(Slice::from(key.as_ref())),
734            Unbounded => Unbounded,
735        };
736
737        let end = match range.end_bound() {
738            Included(key) => Included(Slice::from(key.as_ref())),
739            Excluded(key) => Excluded(Slice::from(key.as_ref())),
740            Unbounded => Unbounded,
741        };
742
743        let is_empty =
744            if let (Included(lo) | Excluded(lo), Included(hi) | Excluded(hi)) = (&start, &end) {
745                lo.as_ref() > hi.as_ref()
746            } else {
747                false
748            };
749
750        (OwnedBounds { start, end }, is_empty)
751    }
752
753    /// Opens an LSM-tree in the given directory.
754    ///
755    /// Will recover previous state if the folder was previously
756    /// occupied by an LSM-tree, including the previous configuration.
757    /// If not, a new tree will be initialized with the given config.
758    ///
759    /// After recovering a previous state, use [`Tree::set_active_memtable`]
760    /// to fill the memtable with data from a write-ahead log for full durability.
761    ///
762    /// # Errors
763    ///
764    /// Returns error, if an IO error occurred.
765    pub(crate) fn open(config: Config) -> crate::Result<Self> {
766        log::debug!("Opening LSM-tree at {}", config.path.display());
767
768        // Check for old version
769        if config.path.join("version").try_exists()? {
770            log::error!("It looks like you are trying to open a V1 database - the database needs a manual migration, however a migration tool is not provided, as V1 is extremely outdated.");
771            return Err(crate::Error::InvalidVersion(FormatVersion::V1.into()));
772        }
773
774        let tree = if config.path.join(CURRENT_VERSION_FILE).try_exists()? {
775            Self::recover(config)
776        } else {
777            Self::create_new(config)
778        }?;
779
780        Ok(tree)
781    }
782
783    /// Returns `true` if there are some tables that are being compacted.
784    #[doc(hidden)]
785    #[must_use]
786    pub fn is_compacting(&self) -> bool {
787        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
788        !self
789            .compaction_state
790            .lock()
791            .expect("lock is poisoned")
792            .hidden_set()
793            .is_empty()
794    }
795
796    fn inner_compact(
797        &self,
798        strategy: Arc<dyn CompactionStrategy>,
799        mvcc_gc_watermark: SeqNo,
800    ) -> crate::Result<()> {
801        use crate::compaction::worker::{do_compaction, Options};
802
803        let mut opts = Options::from_tree(self, strategy);
804        opts.mvcc_gc_watermark = mvcc_gc_watermark;
805
806        do_compaction(&opts)?;
807
808        log::debug!("Compaction run over");
809
810        Ok(())
811    }
812
    #[doc(hidden)]
    #[must_use]
    pub fn create_iter(
        &self,
        seqno: SeqNo,
        ephemeral: Option<(Arc<Memtable>, SeqNo)>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        // A full iterator is just a range scan over everything (`..`)
        self.create_range::<UserKey, _>(&.., seqno, ephemeral)
    }
822
823    #[doc(hidden)]
824    pub fn create_range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
825        &self,
826        range: &'a R,
827        seqno: SeqNo,
828        ephemeral: Option<(Arc<Memtable>, SeqNo)>,
829    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
830        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
831        let super_version = self
832            .version_history
833            .read()
834            .expect("lock is poisoned")
835            .get_version_for_snapshot(seqno);
836
837        Self::create_internal_range(super_version, range, seqno, ephemeral).map(|item| match item {
838            Ok(kv) => Ok((kv.key.user_key, kv.value)),
839            Err(e) => Err(e),
840        })
841    }
842
    #[doc(hidden)]
    pub fn create_prefix<'a, K: AsRef<[u8]> + 'a>(
        &self,
        prefix: K,
        seqno: SeqNo,
        ephemeral: Option<(Arc<Memtable>, SeqNo)>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        use crate::range::prefix_to_range;

        // Convert the prefix into the equivalent key range, then range-scan
        let range = prefix_to_range(prefix.as_ref());
        self.create_range(&range, seqno, ephemeral)
    }
855
856    /// Adds an item to the active memtable.
857    ///
858    /// Returns the added item's size and new size of the memtable.
859    #[doc(hidden)]
860    #[must_use]
861    pub fn append_entry(&self, value: InternalValue) -> (u64, u64) {
862        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
863        self.version_history
864            .read()
865            .expect("lock is poisoned")
866            .latest_version()
867            .active_memtable
868            .insert(value)
869    }
870
871    /// Recovers previous state, by loading the level manifest, tables and blob files.
872    ///
873    /// # Errors
874    ///
875    /// Returns error, if an IO error occurred.
876    fn recover(mut config: Config) -> crate::Result<Self> {
877        use crate::stop_signal::StopSignal;
878        use inner::get_next_tree_id;
879
880        log::info!("Recovering LSM-tree at {}", config.path.display());
881
882        // let manifest = {
883        //     let manifest_path = config.path.join(MANIFEST_FILE);
884        //     let reader = sfa::Reader::new(&manifest_path)?;
885        //     Manifest::decode_from(&manifest_path, &reader)?
886        // };
887
888        // if manifest.version != FormatVersion::V3 {
889        //     return Err(crate::Error::InvalidVersion(manifest.version.into()));
890        // }
891
892        let tree_id = get_next_tree_id();
893
894        #[cfg(feature = "metrics")]
895        let metrics = Arc::new(Metrics::default());
896
897        let version = Self::recover_levels(
898            &config.path,
899            tree_id,
900            &config,
901            #[cfg(feature = "metrics")]
902            &metrics,
903        )?;
904
905        {
906            let manifest_path = config.path.join(format!("v{}", version.id()));
907            let reader = sfa::Reader::new(&manifest_path)?;
908            let manifest = Manifest::decode_from(&manifest_path, &reader)?;
909
910            if manifest.version != FormatVersion::V3 {
911                return Err(crate::Error::InvalidVersion(manifest.version.into()));
912            }
913
914            let requested_tree_type = match config.kv_separation_opts {
915                Some(_) => crate::TreeType::Blob,
916                None => crate::TreeType::Standard,
917            };
918
919            if version.tree_type() != requested_tree_type {
920                log::error!(
921                    "Tried to open a {requested_tree_type:?}Tree, but the existing tree is of type {:?}Tree. This indicates a misconfiguration or corruption.",
922                    version.tree_type(),
923                );
924                return Err(crate::Error::Unrecoverable);
925            }
926
927            // IMPORTANT: Restore persisted config
928            config.level_count = manifest.level_count;
929        }
930
931        let highest_table_id = version
932            .iter_tables()
933            .map(Table::id)
934            .max()
935            .unwrap_or_default();
936
937        let inner = TreeInner {
938            id: tree_id,
939            memtable_id_counter: SequenceNumberCounter::default(),
940            table_id_counter: SequenceNumberCounter::new(highest_table_id + 1),
941            blob_file_id_counter: SequenceNumberCounter::default(),
942            version_history: Arc::new(RwLock::new(SuperVersions::new(version))),
943            stop_signal: StopSignal::default(),
944            config: Arc::new(config),
945            major_compaction_lock: RwLock::default(),
946            flush_lock: Mutex::default(),
947            compaction_state: Arc::new(Mutex::new(CompactionState::default())),
948
949            #[cfg(feature = "metrics")]
950            metrics,
951        };
952
953        Ok(Self(Arc::new(inner)))
954    }
955
956    /// Creates a new LSM-tree in a directory.
957    fn create_new(config: Config) -> crate::Result<Self> {
958        use crate::file::{fsync_directory, TABLES_FOLDER};
959        use std::fs::create_dir_all;
960
961        let path = config.path.clone();
962        log::trace!("Creating LSM-tree at {}", path.display());
963
964        create_dir_all(&path)?;
965
966        let table_folder_path = path.join(TABLES_FOLDER);
967        create_dir_all(&table_folder_path)?;
968
969        // IMPORTANT: fsync folders on Unix
970        fsync_directory(&table_folder_path)?;
971        fsync_directory(&path)?;
972
973        let inner = TreeInner::create_new(config)?;
974        Ok(Self(Arc::new(inner)))
975    }
976
    /// Recovers the level manifest, loading all tables from disk.
    ///
    /// Builds a lookup of expected tables from the recovered manifest, then scans
    /// the tables folder: files listed in the manifest are opened as [`Table`]s,
    /// unknown files are collected as orphans. Orphaned tables, blob files and
    /// stale version files are deleted only AFTER the latest version has been
    /// fully recovered, so a crash mid-recovery never loses needed files.
    fn recover_levels<P: AsRef<Path>>(
        tree_path: P,
        tree_id: TreeId,
        config: &Config,
        #[cfg(feature = "metrics")] metrics: &Arc<Metrics>,
    ) -> crate::Result<Version> {
        use crate::{file::fsync_directory, file::TABLES_FOLDER, TableId};

        let tree_path = tree_path.as_ref();

        let recovery = recover(tree_path)?;

        // Map of every table ID the manifest expects -> (level index, checksum, global seqno),
        // flattened from the per-level, per-run structure of the recovery data
        let table_map = {
            let mut result: crate::HashMap<TableId, (u8 /* Level index */, Checksum, SeqNo)> =
                crate::HashMap::default();

            for (level_idx, table_ids) in recovery.table_ids.iter().enumerate() {
                for run in table_ids {
                    for table in run {
                        #[expect(
                            clippy::expect_used,
                            reason = "there are always less than 256 levels"
                        )]
                        result.insert(
                            table.id,
                            (
                                level_idx
                                    .try_into()
                                    .expect("there are less than 256 levels"),
                                table.checksum,
                                table.global_seqno,
                            ),
                        );
                    }
                }
            }

            result
        };

        let cnt = table_map.len();

        log::debug!("Recovering {cnt} tables from {}", tree_path.display());

        // Log progress every N entries, scaled to the total table count
        let progress_mod = match cnt {
            _ if cnt <= 20 => 1,
            _ if cnt <= 100 => 10,
            _ => 100,
        };

        let mut tables = vec![];

        let table_base_folder = tree_path.join(TABLES_FOLDER);

        // Tables folder may be missing (e.g. fresh or partially created tree)
        if !table_base_folder.try_exists()? {
            std::fs::create_dir_all(&table_base_folder)?;
            fsync_directory(&table_base_folder)?;
        }

        // Files on disk that the manifest does not know about; removed at the end
        let mut orphaned_tables = vec![];

        for (idx, dirent) in std::fs::read_dir(&table_base_folder)?.enumerate() {
            let dirent = dirent?;
            let file_name = dirent.file_name();

            // https://en.wikipedia.org/wiki/.DS_Store
            if file_name == ".DS_Store" {
                continue;
            }

            // https://en.wikipedia.org/wiki/AppleSingle_and_AppleDouble_formats
            if file_name.to_string_lossy().starts_with("._") {
                continue;
            }

            // Table files are named after their numeric table ID
            let table_file_name = file_name.to_str().ok_or_else(|| {
                log::error!("invalid table file name {}", file_name.display());
                crate::Error::Unrecoverable
            })?;

            let table_file_path = dirent.path();
            assert!(!table_file_path.is_dir());

            let table_id = table_file_name.parse::<TableId>().map_err(|e| {
                log::error!("invalid table file name {table_file_name:?}: {e:?}");
                crate::Error::Unrecoverable
            })?;

            if let Some(&(level_idx, checksum, global_seqno)) = table_map.get(&table_id) {
                // Block pinning policies are configured per level
                let pin_filter = config.filter_block_pinning_policy.get(level_idx.into());
                let pin_index = config.index_block_pinning_policy.get(level_idx.into());

                let table = Table::recover(
                    table_file_path,
                    checksum,
                    global_seqno,
                    tree_id,
                    config.cache.clone(),
                    config.descriptor_table.clone(),
                    pin_filter,
                    pin_index,
                    #[cfg(feature = "metrics")]
                    metrics.clone(),
                )?;

                tables.push(table);

                if idx % progress_mod == 0 {
                    log::debug!("Recovered {idx}/{cnt} tables");
                }
            } else {
                // Not referenced by the manifest -> scheduled for deletion below
                orphaned_tables.push(table_file_path);
            }
        }

        // A table listed in the manifest is missing on disk -> unrecoverable
        if tables.len() < cnt {
            log::error!(
                "Recovered less tables than expected: {:?}",
                table_map.keys(),
            );
            return Err(crate::Error::Unrecoverable);
        }

        log::debug!("Successfully recovered {} tables", tables.len());

        let (blob_files, orphaned_blob_files) = crate::vlog::recover_blob_files(
            &tree_path.join(crate::file::BLOBS_FOLDER),
            &recovery.blob_file_ids,
        )?;

        let version = Version::from_recovery(recovery, &tables, &blob_files)?;

        // NOTE: Cleanup old versions
        // But only after we definitely recovered the latest version
        Self::cleanup_orphaned_version(tree_path, version.id())?;

        for table_path in orphaned_tables {
            log::debug!("Deleting orphaned table {}", table_path.display());
            std::fs::remove_file(&table_path)?;
        }

        for blob_file_path in orphaned_blob_files {
            log::debug!("Deleting orphaned blob file {}", blob_file_path.display());
            std::fs::remove_file(&blob_file_path)?;
        }

        Ok(version)
    }
1126
1127    fn cleanup_orphaned_version(
1128        path: &Path,
1129        latest_version_id: crate::version::VersionId,
1130    ) -> crate::Result<()> {
1131        let version_str = format!("v{latest_version_id}");
1132
1133        for file in std::fs::read_dir(path)? {
1134            let dirent = file?;
1135
1136            if dirent.file_type()?.is_dir() {
1137                continue;
1138            }
1139
1140            let name = dirent.file_name();
1141
1142            if name.to_string_lossy().starts_with('v') && *name != *version_str {
1143                log::trace!("Cleanup orphaned version {}", name.display());
1144                std::fs::remove_file(dirent.path())?;
1145            }
1146        }
1147
1148        Ok(())
1149    }
1150}