Skip to main content

lsm_tree/tree/
mod.rs

1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5pub mod ingest;
6pub mod inner;
7pub mod sealed;
8
9use crate::{
10    compaction::{drop_range::OwnedBounds, state::CompactionState, CompactionStrategy},
11    config::Config,
12    file::CURRENT_VERSION_FILE,
13    format_version::FormatVersion,
14    iter_guard::{IterGuard, IterGuardImpl},
15    key::InternalKey,
16    manifest::Manifest,
17    memtable::Memtable,
18    slice::Slice,
19    table::Table,
20    value::InternalValue,
21    version::{recovery::recover, SuperVersion, SuperVersions, Version},
22    vlog::BlobFile,
23    AbstractTree, Checksum, KvPair, SeqNo, SequenceNumberCounter, TableId, UserKey, UserValue,
24    ValueType,
25};
26use inner::{TreeId, TreeInner};
27use std::{
28    ops::{Bound, RangeBounds},
29    path::Path,
30    sync::{Arc, Mutex, RwLock},
31};
32
33#[cfg(feature = "metrics")]
34use crate::metrics::Metrics;
35
/// Iterator value guard
///
/// Wraps one fallible key-value pair yielded by tree iterators; errors are
/// carried through and only surface when the caller unwraps the guard.
pub struct Guard(crate::Result<(UserKey, UserValue)>);
38
39impl IterGuard for Guard {
40    fn into_inner_if(
41        self,
42        pred: impl Fn(&UserKey) -> bool,
43    ) -> crate::Result<(UserKey, Option<UserValue>)> {
44        let (k, v) = self.0?;
45
46        if pred(&k) {
47            Ok((k, Some(v)))
48        } else {
49            Ok((k, None))
50        }
51    }
52
53    fn key(self) -> crate::Result<UserKey> {
54        self.0.map(|(k, _)| k)
55    }
56
57    fn size(self) -> crate::Result<u32> {
58        #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
59        self.into_inner().map(|(_, v)| v.len() as u32)
60    }
61
62    fn into_inner(self) -> crate::Result<(UserKey, UserValue)> {
63        self.0
64    }
65}
66
67fn ignore_tombstone_value(item: InternalValue) -> Option<InternalValue> {
68    if item.is_tombstone() {
69        None
70    } else {
71        Some(item)
72    }
73}
74
/// A log-structured merge tree (LSM-tree/LSMT)
///
/// Cheap to clone: this is a shared handle (`Arc`) around the actual tree
/// state, so clones refer to the same underlying tree.
#[derive(Clone)]
pub struct Tree(#[doc(hidden)] pub Arc<TreeInner>);

impl std::ops::Deref for Tree {
    type Target = TreeInner;

    // Forward field/method access straight to the inner tree state.
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
86
87impl AbstractTree for Tree {
    fn table_file_cache_size(&self) -> usize {
        // Number of entries currently held by the descriptor table, or 0 if
        // no descriptor table is configured.
        self.config
            .descriptor_table
            .as_ref()
            .map_or(0, |dt| dt.len())
    }

    fn get_version_history_lock(
        &self,
    ) -> std::sync::RwLockWriteGuard<'_, crate::version::SuperVersions> {
        // Hands the write half of the version history lock to the caller.
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        self.version_history.write().expect("lock is poisoned")
    }

    fn next_table_id(&self) -> TableId {
        // Reads the current value of the table ID counter.
        self.0.table_id_counter.get()
    }

    fn id(&self) -> TreeId {
        self.id
    }

    fn blob_file_count(&self) -> usize {
        // A standard tree never owns blob files.
        0
    }
113
    /// Debug helper: logs entries for `key` across the active memtable, the
    /// sealed memtables and candidate tables of the latest version.
    fn print_trace(&self, key: &[u8]) -> crate::Result<()> {
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        let super_version = self
            .version_history
            .read()
            .expect("lock is poisoned")
            .latest_version();

        let key = Slice::from(key);

        // Active memtable: iterate from (key, MAX seqno) onwards.
        // NOTE(review): there is no early break here, so entries for keys
        // AFTER `key` are logged as well - confirm whether that is intended.
        for kv in super_version
            .active_memtable
            .range(InternalKey::new(key.clone(), SeqNo::MAX, ValueType::Value)..)
        {
            log::info!("[Active] {kv:?}");
        }

        // Sealed memtables, newest first (same caveat as above).
        for mt in super_version.sealed_memtables.iter().rev() {
            for kv in mt.range(InternalKey::new(key.clone(), SeqNo::MAX, ValueType::Value)..) {
                log::info!("[Sealed #{}] {kv:?}", mt.id());
            }
        }

        // Tables whose runs may contain `key`.
        for table in super_version
            .version
            .iter_levels()
            .flat_map(|lvl| lvl.iter())
            .filter_map(|run| run.get_for_key(&key))
        {
            // NOTE(review): this scans from the table's FIRST entry and stops
            // at the first non-matching user key - unless `key` happens to be
            // the table's first key, nothing is logged. Looks like this should
            // seek to `key` instead; verify against `Table::range` semantics.
            for kv in table.range(..) {
                let kv = kv?;

                if kv.key.user_key != key {
                    break;
                }

                log::info!("[Table #{}] {kv:?}", table.id());
            }
        }

        Ok(())
    }
156
    fn get_internal_entry(&self, key: &[u8], seqno: SeqNo) -> crate::Result<Option<InternalValue>> {
        // Pin the super version matching the requested snapshot, then do the
        // actual (possibly I/O-bound) lookup outside the lock.
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        let super_version = self
            .version_history
            .read()
            .expect("lock is poisoned")
            .get_version_for_snapshot(seqno);

        Self::get_internal_entry_from_version(&super_version, key, seqno)
    }

    fn current_version(&self) -> Version {
        // Owned copy of the latest version (tables only, no memtables).
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        self.version_history
            .read()
            .expect("poisoned")
            .latest_version()
            .version
    }

    fn get_flush_lock(&self) -> std::sync::MutexGuard<'_, ()> {
        // Serializes memtable flushes; the guard itself carries no data.
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        self.flush_lock.lock().expect("lock is poisoned")
    }

    #[cfg(feature = "metrics")]
    fn metrics(&self) -> &Arc<crate::Metrics> {
        &self.0.metrics
    }

    fn version_free_list_len(&self) -> usize {
        // Length of the version free list (versions pending cleanup).
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        self.version_history
            .read()
            .expect("lock is poisoned")
            .free_list_len()
    }
194
    fn prefix<K: AsRef<[u8]>>(
        &self,
        prefix: K,
        seqno: SeqNo,
        index: Option<(Arc<Memtable>, SeqNo)>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
        // Wrap each raw KV result in the standard guard type.
        Box::new(
            self.create_prefix(&prefix, seqno, index)
                .map(|kv| IterGuardImpl::Standard(Guard(kv))),
        )
    }

    fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
        &self,
        range: R,
        seqno: SeqNo,
        index: Option<(Arc<Memtable>, SeqNo)>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
        // Wrap each raw KV result in the standard guard type.
        Box::new(
            self.create_range(&range, seqno, index)
                .map(|kv| IterGuardImpl::Standard(Guard(kv))),
        )
    }

    /// Returns the number of tombstones in the tree.
    ///
    /// Only counts entries in tables; memtables are not considered.
    fn tombstone_count(&self) -> u64 {
        self.current_version()
            .iter_tables()
            .map(Table::tombstone_count)
            .sum()
    }

    /// Returns the number of weak tombstones (single deletes) in the tree.
    ///
    /// Only counts entries in tables; memtables are not considered.
    fn weak_tombstone_count(&self) -> u64 {
        self.current_version()
            .iter_tables()
            .map(Table::weak_tombstone_count)
            .sum()
    }

    /// Returns the number of value entries that become reclaimable once weak tombstones can be GC'd.
    fn weak_tombstone_reclaimable_count(&self) -> u64 {
        self.current_version()
            .iter_tables()
            .map(Table::weak_tombstone_reclaimable)
            .sum()
    }
242
    /// Drops all tables fully contained in the given key range via a
    /// special compaction.
    fn drop_range<K: AsRef<[u8]>, R: RangeBounds<K>>(&self, range: R) -> crate::Result<()> {
        // Convert the generic range into owned bounds; bail out early when
        // the range cannot contain any key.
        let (bounds, is_empty) = Self::range_bounds_to_owned_bounds(&range);

        if is_empty {
            return Ok(());
        }

        let strategy = Arc::new(crate::compaction::drop_range::Strategy::new(bounds));

        // IMPORTANT: Write lock so we can be the only compaction going on
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        let _lock = self
            .0
            .major_compaction_lock
            .write()
            .expect("lock is poisoned");

        log::info!("Starting drop_range compaction");
        self.inner_compact(strategy, 0)
    }

    fn clear(&self) -> crate::Result<()> {
        // Atomically swap in a fresh active memtable, no sealed memtables
        // and an empty successor version, effectively clearing the tree.
        let mut versions = self.get_version_history_lock();

        versions.upgrade_version(
            &self.config.path,
            |v| {
                let mut copy = v.clone();
                copy.active_memtable = Arc::new(Memtable::new(self.memtable_id_counter.next()));
                copy.sealed_memtables = Arc::default();
                copy.version = Version::new(v.version.id() + 1, self.tree_type());
                Ok(copy)
            },
            &self.config.seqno,
            &self.config.visible_seqno,
        )
    }

    #[doc(hidden)]
    fn major_compact(&self, target_size: u64, seqno_threshold: SeqNo) -> crate::Result<()> {
        let strategy = Arc::new(crate::compaction::major::Strategy::new(target_size));

        // IMPORTANT: Write lock so we can be the only compaction going on
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        let _lock = self
            .0
            .major_compaction_lock
            .write()
            .expect("lock is poisoned");

        log::info!("Starting major compaction");
        self.inner_compact(strategy, seqno_threshold)
    }
296
    fn l0_run_count(&self) -> usize {
        // Number of runs in level 0, or 0 if the level does not exist.
        self.current_version()
            .level(0)
            .map(|x| x.run_count())
            .unwrap_or_default()
    }

    fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<u32>> {
        // Implemented as a full point read; only the value length is kept.
        #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
        Ok(self.get(key, seqno)?.map(|x| x.len() as u32))
    }

    fn filter_size(&self) -> u64 {
        // Sum of filter sizes over all tables.
        self.current_version()
            .iter_tables()
            .map(Table::filter_size)
            .map(u64::from)
            .sum()
    }

    fn pinned_filter_size(&self) -> usize {
        // Sum of pinned (in-memory) filter sizes over all tables.
        self.current_version()
            .iter_tables()
            .map(Table::pinned_filter_size)
            .sum()
    }

    fn pinned_block_index_size(&self) -> usize {
        // Sum of pinned (in-memory) block index sizes over all tables.
        self.current_version()
            .iter_tables()
            .map(Table::pinned_block_index_size)
            .sum()
    }

    fn sealed_memtable_count(&self) -> usize {
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        self.version_history
            .read()
            .expect("lock is poisoned")
            .latest_version()
            .sealed_memtables
            .len()
    }
340
    /// Writes the given stream of internal KV items out as one or more tables.
    ///
    /// Table parameters (block size, compression, filter policy, ...) are
    /// taken from the level-0 entry of the respective config policies, since
    /// flushed tables land in level 0.
    ///
    /// Returns the recovered `Table` handles; the blob file slot is always
    /// `None` for a standard tree.
    fn flush_to_tables(
        &self,
        stream: impl Iterator<Item = crate::Result<InternalValue>>,
    ) -> crate::Result<Option<(Vec<Table>, Option<Vec<BlobFile>>)>> {
        use crate::{file::TABLES_FOLDER, table::multi_writer::MultiWriter};
        use std::time::Instant;

        let start = Instant::now();

        let folder = self.config.path.join(TABLES_FOLDER);

        // All policies are queried for level 0.
        let data_block_size = self.config.data_block_size_policy.get(0);

        let data_block_restart_interval = self.config.data_block_restart_interval_policy.get(0);
        let index_block_restart_interval = self.config.index_block_restart_interval_policy.get(0);

        let data_block_compression = self.config.data_block_compression_policy.get(0);
        let index_block_compression = self.config.index_block_compression_policy.get(0);

        let data_block_hash_ratio = self.config.data_block_hash_ratio_policy.get(0);

        let index_partitioning = self.config.index_block_partitioning_policy.get(0);
        let filter_partitioning = self.config.filter_block_partitioning_policy.get(0);

        log::debug!(
            "Flushing memtable(s) to {}, data_block_restart_interval={data_block_restart_interval}, index_block_restart_interval={index_block_restart_interval}, data_block_size={data_block_size}, data_block_compression={data_block_compression:?}, index_block_compression={index_block_compression:?}",
            folder.display(),
        );

        // NOTE(review): 64 MiB appears to be the table split size here -
        // confirm against `MultiWriter::new`'s parameter meaning.
        let mut table_writer = MultiWriter::new(
            folder.clone(),
            self.table_id_counter.clone(),
            64 * 1_024 * 1_024,
            0,
        )?
        .use_data_block_restart_interval(data_block_restart_interval)
        .use_index_block_restart_interval(index_block_restart_interval)
        .use_data_block_compression(data_block_compression)
        .use_index_block_compression(index_block_compression)
        .use_data_block_size(data_block_size)
        .use_data_block_hash_ratio(data_block_hash_ratio)
        .use_bloom_policy({
            use crate::config::FilterPolicyEntry::{Bloom, None};
            use crate::table::filter::BloomConstructionPolicy;

            // "No filter" is expressed as a 0-bits-per-key bloom policy.
            match self.config.filter_policy.get(0) {
                Bloom(policy) => policy,
                None => BloomConstructionPolicy::BitsPerKey(0.0),
            }
        });

        if index_partitioning {
            table_writer = table_writer.use_partitioned_index();
        }
        if filter_partitioning {
            table_writer = table_writer.use_partitioned_filter();
        }

        // Drain the stream; any error (from the stream or the writer) aborts.
        for item in stream {
            table_writer.write(item?)?;
        }

        let result = table_writer.finish()?;

        log::debug!("Flushed memtable(s) in {:?}", start.elapsed());

        let pin_filter = self.config.filter_block_pinning_policy.get(0);
        let pin_index = self.config.index_block_pinning_policy.get(0);

        // Load tables
        let tables = result
            .into_iter()
            .map(|(table_id, checksum)| -> crate::Result<Table> {
                Table::recover(
                    folder.join(table_id.to_string()),
                    checksum,
                    0,
                    self.id,
                    self.config.cache.clone(),
                    self.config.descriptor_table.clone(),
                    pin_filter,
                    pin_index,
                    #[cfg(feature = "metrics")]
                    self.metrics.clone(),
                )
            })
            .collect::<crate::Result<Vec<_>>>()?;

        Ok(Some((tables, None)))
    }
431
432    #[expect(clippy::significant_drop_tightening)]
433    fn register_tables(
434        &self,
435        tables: &[Table],
436        blob_files: Option<&[BlobFile]>,
437        frag_map: Option<crate::blob_tree::FragmentationMap>,
438        sealed_memtables_to_delete: &[crate::tree::inner::MemtableId],
439        gc_watermark: SeqNo,
440    ) -> crate::Result<()> {
441        log::trace!(
442            "Registering {} tables, {} blob files",
443            tables.len(),
444            blob_files.map(<[BlobFile]>::len).unwrap_or_default(),
445        );
446
447        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
448        let mut _compaction_state = self.compaction_state.lock().expect("lock is poisoned");
449        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
450        let mut version_lock = self.version_history.write().expect("lock is poisoned");
451
452        version_lock.upgrade_version(
453            &self.config.path,
454            |current| {
455                let mut copy = current.clone();
456
457                copy.version = copy.version.with_new_l0_run(
458                    tables,
459                    blob_files,
460                    frag_map.filter(|x| !x.is_empty()),
461                );
462
463                for &table_id in sealed_memtables_to_delete {
464                    log::trace!("releasing sealed memtable #{table_id}");
465                    copy.sealed_memtables = Arc::new(copy.sealed_memtables.remove(table_id));
466                }
467
468                Ok(copy)
469            },
470            &self.config.seqno,
471            &self.config.visible_seqno,
472        )?;
473
474        if let Err(e) = version_lock.maintenance(&self.config.path, gc_watermark) {
475            log::warn!("Version GC failed: {e:?}");
476        }
477
478        Ok(())
479    }
480
    fn clear_active_memtable(&self) {
        use crate::tree::sealed::SealedMemtables;

        // Replace the active memtable (and any sealed ones) with fresh empty
        // state; the on-disk version is left untouched.
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        let mut version_history_lock = self.version_history.write().expect("lock is poisoned");
        let super_version = version_history_lock.latest_version();

        // Nothing to do if the active memtable is already empty.
        if super_version.active_memtable.is_empty() {
            return;
        }

        let mut copy = version_history_lock.latest_version();
        copy.active_memtable = Arc::new(Memtable::new(self.memtable_id_counter.next()));
        copy.sealed_memtables = Arc::new(SealedMemtables::default());

        // Rotate does not modify the memtable, so it cannot break snapshots
        copy.seqno = super_version.seqno;

        version_history_lock.replace_latest_version(copy);

        log::trace!("cleared active memtable");
    }

    fn compact(
        &self,
        strategy: Arc<dyn CompactionStrategy>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        // NOTE: Read lock major compaction lock
        // That way, if a major compaction is running, we cannot proceed
        // But in general, parallel (non-major) compactions can occur
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        let _lock = self
            .0
            .major_compaction_lock
            .read()
            .expect("lock is poisoned");

        self.inner_compact(strategy, seqno_threshold)
    }

    fn get_next_table_id(&self) -> TableId {
        self.0.get_next_table_id()
    }

    fn tree_config(&self) -> &Config {
        &self.config
    }

    fn active_memtable(&self) -> Arc<Memtable> {
        // Hands out a shared handle to the current active memtable.
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        self.version_history
            .read()
            .expect("lock is poisoned")
            .latest_version()
            .active_memtable
    }

    fn tree_type(&self) -> crate::TreeType {
        crate::TreeType::Standard
    }
542
    /// Seals the current active memtable and replaces it with a fresh one.
    ///
    /// Returns the sealed memtable, or `None` if the active memtable was
    /// empty (nothing to rotate).
    #[expect(clippy::significant_drop_tightening)]
    fn rotate_memtable(&self) -> Option<Arc<Memtable>> {
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        let mut version_history_lock = self.version_history.write().expect("lock is poisoned");
        let super_version = version_history_lock.latest_version();

        if super_version.active_memtable.is_empty() {
            return None;
        }

        let yanked_memtable = super_version.active_memtable;

        // Successor super version: empty active memtable, and the old one
        // appended to the sealed set.
        let mut copy = version_history_lock.latest_version();
        copy.active_memtable = Arc::new(Memtable::new(self.memtable_id_counter.next()));
        copy.sealed_memtables =
            Arc::new(super_version.sealed_memtables.add(yanked_memtable.clone()));

        // Rotate does not modify the memtable so it cannot break snapshots
        copy.seqno = super_version.seqno;

        version_history_lock.replace_latest_version(copy);

        log::trace!(
            "rotate: added memtable id={} to sealed memtables",
            yanked_memtable.id,
        );

        Some(yanked_memtable)
    }

    fn table_count(&self) -> usize {
        self.current_version().table_count()
    }

    fn level_table_count(&self, idx: usize) -> Option<usize> {
        // `None` if the level index is out of bounds.
        self.current_version().level(idx).map(|x| x.table_count())
    }
580
581    fn approximate_len(&self) -> usize {
582        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
583        let super_version = self
584            .version_history
585            .read()
586            .expect("lock is poisoned")
587            .latest_version();
588
589        let tables_item_count = self
590            .current_version()
591            .iter_tables()
592            .map(|x| x.metadata.item_count)
593            .sum::<u64>();
594
595        let memtable_count = super_version.active_memtable.len() as u64;
596        let sealed_count = super_version
597            .sealed_memtables
598            .iter()
599            .map(|mt| mt.len())
600            .sum::<usize>() as u64;
601
602        #[expect(clippy::expect_used, reason = "result should fit into usize")]
603        (memtable_count + sealed_count + tables_item_count)
604            .try_into()
605            .expect("approximate_len too large for usize")
606    }
607
    fn disk_space(&self) -> u64 {
        // Sum of all level sizes in the current version.
        self.current_version()
            .iter_levels()
            .map(super::version::Level::size)
            .sum()
    }

    fn get_highest_memtable_seqno(&self) -> Option<SeqNo> {
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        let version = self
            .version_history
            .read()
            .expect("lock is poisoned")
            .latest_version();

        let active = version.active_memtable.get_highest_seqno();

        // Highest seqno over all sealed memtables (None if there are none).
        let sealed = version
            .sealed_memtables
            .iter()
            .map(|mt| mt.get_highest_seqno())
            .max()
            .flatten();

        active.max(sealed)
    }

    fn get_highest_persisted_seqno(&self) -> Option<SeqNo> {
        // Highest seqno found in any table (i.e. already flushed to disk).
        self.current_version()
            .iter_tables()
            .map(Table::get_highest_seqno)
            .max()
    }
641
642    fn get<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<UserValue>> {
643        Ok(self
644            .get_internal_entry(key.as_ref(), seqno)?
645            .map(|x| x.value))
646    }
647
648    fn insert<K: Into<UserKey>, V: Into<UserValue>>(
649        &self,
650        key: K,
651        value: V,
652        seqno: SeqNo,
653    ) -> (u64, u64) {
654        let value = InternalValue::from_components(key, value, seqno, ValueType::Value);
655        self.append_entry(value)
656    }
657
658    fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
659        let value = InternalValue::new_tombstone(key, seqno);
660        self.append_entry(value)
661    }
662
663    fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
664        let value = InternalValue::new_weak_tombstone(key, seqno);
665        self.append_entry(value)
666    }
667}
668
669impl Tree {
    /// Builds a double-ended iterator over internal values for the given
    /// range, against a pinned super version (plus an optional ephemeral
    /// memtable, e.g. an uncommitted write batch index).
    #[doc(hidden)]
    pub fn create_internal_range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
        version: SuperVersion,
        range: &'a R,
        seqno: SeqNo,
        ephemeral: Option<(Arc<Memtable>, SeqNo)>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<InternalValue>> + 'static {
        use crate::range::{IterState, TreeIter};
        use std::ops::Bound::{self, Excluded, Included, Unbounded};

        // Turn the borrowed, generic bounds into owned `UserKey` bounds so
        // the returned iterator can be 'static.
        let lo: Bound<UserKey> = match range.start_bound() {
            Included(x) => Included(x.as_ref().into()),
            Excluded(x) => Excluded(x.as_ref().into()),
            Unbounded => Unbounded,
        };

        let hi: Bound<UserKey> = match range.end_bound() {
            Included(x) => Included(x.as_ref().into()),
            Excluded(x) => Excluded(x.as_ref().into()),
            Unbounded => Unbounded,
        };

        let bounds: (Bound<UserKey>, Bound<UserKey>) = (lo, hi);

        let iter_state = { IterState { version, ephemeral } };

        TreeIter::create_range(iter_state, bounds, seqno)
    }

    /// Point lookup over a pinned super version: active memtable first, then
    /// sealed memtables, then tables. Tombstones are mapped to `None`.
    pub(crate) fn get_internal_entry_from_version(
        super_version: &SuperVersion,
        key: &[u8],
        seqno: SeqNo,
    ) -> crate::Result<Option<InternalValue>> {
        if let Some(entry) = super_version.active_memtable.get(key, seqno) {
            return Ok(ignore_tombstone_value(entry));
        }

        // Now look in sealed memtables
        if let Some(entry) =
            Self::get_internal_entry_from_sealed_memtables(super_version, key, seqno)
        {
            return Ok(ignore_tombstone_value(entry));
        }

        // Now look in tables... this may involve disk I/O
        Self::get_internal_entry_from_tables(&super_version.version, key, seqno)
    }

    fn get_internal_entry_from_tables(
        version: &Version,
        key: &[u8],
        seqno: SeqNo,
    ) -> crate::Result<Option<InternalValue>> {
        // NOTE: Create key hash for hash sharing
        // https://fjall-rs.github.io/post/bloom-filter-hash-sharing/
        let key_hash = crate::table::filter::standard_bloom::Builder::get_hash(key);

        // Levels are iterated top-down, so the first hit is the newest entry
        // visible at `seqno`.
        for table in version
            .iter_levels()
            .flat_map(|lvl| lvl.iter())
            .filter_map(|run| run.get_for_key(key))
        {
            if let Some(item) = table.get(key, seqno, key_hash)? {
                return Ok(ignore_tombstone_value(item));
            }
        }

        Ok(None)
    }
740
741    fn get_internal_entry_from_sealed_memtables(
742        super_version: &SuperVersion,
743        key: &[u8],
744        seqno: SeqNo,
745    ) -> Option<InternalValue> {
746        for mt in super_version.sealed_memtables.iter().rev() {
747            if let Some(entry) = mt.get(key, seqno) {
748                return Some(entry);
749            }
750        }
751
752        None
753    }
754
    /// Returns the super version that a reader at snapshot `seqno` must use.
    pub(crate) fn get_version_for_snapshot(&self, seqno: SeqNo) -> SuperVersion {
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        self.version_history
            .read()
            .expect("lock is poisoned")
            .get_version_for_snapshot(seqno)
    }
762
763    /// Normalizes a user-provided range into owned `Bound<Slice>` values.
764    ///
765    /// Returns a tuple containing:
766    /// - the `OwnedBounds` that mirror the original bounds semantics (including
767    ///   inclusive/exclusive markers and unbounded endpoints), and
768    /// - a `bool` flag indicating whether the normalized range is logically
769    ///   empty (e.g., when the lower bound is greater than the upper bound).
770    ///
771    /// Callers can use the flag to detect empty ranges and skip further work
772    /// while still having access to the normalized bounds for non-empty cases.
773    fn range_bounds_to_owned_bounds<K: AsRef<[u8]>, R: RangeBounds<K>>(
774        range: &R,
775    ) -> (OwnedBounds, bool) {
776        use Bound::{Excluded, Included, Unbounded};
777
778        let start = match range.start_bound() {
779            Included(key) => Included(Slice::from(key.as_ref())),
780            Excluded(key) => Excluded(Slice::from(key.as_ref())),
781            Unbounded => Unbounded,
782        };
783
784        let end = match range.end_bound() {
785            Included(key) => Included(Slice::from(key.as_ref())),
786            Excluded(key) => Excluded(Slice::from(key.as_ref())),
787            Unbounded => Unbounded,
788        };
789
790        let is_empty =
791            if let (Included(lo) | Excluded(lo), Included(hi) | Excluded(hi)) = (&start, &end) {
792                lo.as_ref() > hi.as_ref()
793            } else {
794                false
795            };
796
797        (OwnedBounds { start, end }, is_empty)
798    }
799
    /// Opens an LSM-tree in the given directory.
    ///
    /// Will recover previous state if the folder was previously
    /// occupied by an LSM-tree, including the previous configuration.
    /// If not, a new tree will be initialized with the given config.
    ///
    /// After recovering a previous state, use [`Tree::set_active_memtable`]
    /// to fill the memtable with data from a write-ahead log for full durability.
    ///
    /// # Errors
    ///
    /// Returns error, if an IO error occurred.
    pub(crate) fn open(config: Config) -> crate::Result<Self> {
        log::debug!("Opening LSM-tree at {}", config.path.display());

        // Check for old version
        // (a V1 database is detected by its legacy "version" marker file)
        if config.path.join("version").try_exists()? {
            log::error!("It looks like you are trying to open a V1 database - the database needs a manual migration, however a migration tool is not provided, as V1 is extremely outdated.");
            return Err(crate::Error::InvalidVersion(FormatVersion::V1.into()));
        }

        // The presence of the current version file decides between recovery
        // and fresh initialization.
        let tree = if config.path.join(CURRENT_VERSION_FILE).try_exists()? {
            Self::recover(config)
        } else {
            Self::create_new(config)
        }?;

        Ok(tree)
    }

    /// Returns `true` if there are some tables that are being compacted.
    #[doc(hidden)]
    #[must_use]
    pub fn is_compacting(&self) -> bool {
        // Tables currently part of a compaction are tracked in the hidden set.
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        !self
            .compaction_state
            .lock()
            .expect("lock is poisoned")
            .hidden_set()
            .is_empty()
    }

    /// Runs the given compaction strategy once.
    fn inner_compact(
        &self,
        strategy: Arc<dyn CompactionStrategy>,
        mvcc_gc_watermark: SeqNo,
    ) -> crate::Result<()> {
        use crate::compaction::worker::{do_compaction, Options};

        let mut opts = Options::from_tree(self, strategy);
        opts.mvcc_gc_watermark = mvcc_gc_watermark;

        do_compaction(&opts)?;

        log::debug!("Compaction run over");

        Ok(())
    }

    /// Full-tree iterator at the given snapshot (an unbounded range scan).
    #[doc(hidden)]
    #[must_use]
    pub fn create_iter(
        &self,
        seqno: SeqNo,
        ephemeral: Option<(Arc<Memtable>, SeqNo)>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        self.create_range::<UserKey, _>(&.., seqno, ephemeral)
    }
869
870    #[doc(hidden)]
871    pub fn create_range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
872        &self,
873        range: &'a R,
874        seqno: SeqNo,
875        ephemeral: Option<(Arc<Memtable>, SeqNo)>,
876    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
877        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
878        let super_version = self
879            .version_history
880            .read()
881            .expect("lock is poisoned")
882            .get_version_for_snapshot(seqno);
883
884        Self::create_internal_range(super_version, range, seqno, ephemeral).map(|item| match item {
885            Ok(kv) => Ok((kv.key.user_key, kv.value)),
886            Err(e) => Err(e),
887        })
888    }
889
    /// Prefix scan at the given snapshot, implemented as a range scan over
    /// the key range covered by the prefix.
    #[doc(hidden)]
    pub fn create_prefix<'a, K: AsRef<[u8]> + 'a>(
        &self,
        prefix: K,
        seqno: SeqNo,
        ephemeral: Option<(Arc<Memtable>, SeqNo)>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        use crate::range::prefix_to_range;

        let range = prefix_to_range(prefix.as_ref());
        self.create_range(&range, seqno, ephemeral)
    }

    /// Adds an item to the active memtable.
    ///
    /// Returns the added item's size and new size of the memtable.
    #[doc(hidden)]
    #[must_use]
    pub fn append_entry(&self, value: InternalValue) -> (u64, u64) {
        // Only a read lock is needed: the memtable handles its own interior
        // synchronization for inserts.
        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        self.version_history
            .read()
            .expect("lock is poisoned")
            .latest_version()
            .active_memtable
            .insert(value)
    }
917
918    /// Recovers previous state, by loading the level manifest, tables and blob files.
919    ///
920    /// # Errors
921    ///
922    /// Returns error, if an IO error occurred.
923    fn recover(mut config: Config) -> crate::Result<Self> {
924        use crate::stop_signal::StopSignal;
925        use inner::get_next_tree_id;
926
927        log::info!("Recovering LSM-tree at {}", config.path.display());
928
929        // let manifest = {
930        //     let manifest_path = config.path.join(MANIFEST_FILE);
931        //     let reader = sfa::Reader::new(&manifest_path)?;
932        //     Manifest::decode_from(&manifest_path, &reader)?
933        // };
934
935        // if manifest.version != FormatVersion::V3 {
936        //     return Err(crate::Error::InvalidVersion(manifest.version.into()));
937        // }
938
939        let tree_id = get_next_tree_id();
940
941        #[cfg(feature = "metrics")]
942        let metrics = Arc::new(Metrics::default());
943
944        let version = Self::recover_levels(
945            &config.path,
946            tree_id,
947            &config,
948            #[cfg(feature = "metrics")]
949            &metrics,
950        )?;
951
952        {
953            let manifest_path = config.path.join(format!("v{}", version.id()));
954            let reader = sfa::Reader::new(&manifest_path)?;
955            let manifest = Manifest::decode_from(&manifest_path, &reader)?;
956
957            if manifest.version != FormatVersion::V3 {
958                return Err(crate::Error::InvalidVersion(manifest.version.into()));
959            }
960
961            let requested_tree_type = match config.kv_separation_opts {
962                Some(_) => crate::TreeType::Blob,
963                None => crate::TreeType::Standard,
964            };
965
966            if version.tree_type() != requested_tree_type {
967                log::error!(
968                    "Tried to open a {requested_tree_type:?}Tree, but the existing tree is of type {:?}Tree. This indicates a misconfiguration or corruption.",
969                    version.tree_type(),
970                );
971                return Err(crate::Error::Unrecoverable);
972            }
973
974            // IMPORTANT: Restore persisted config
975            config.level_count = manifest.level_count;
976        }
977
978        let highest_table_id = version
979            .iter_tables()
980            .map(Table::id)
981            .max()
982            .unwrap_or_default();
983
984        let inner = TreeInner {
985            id: tree_id,
986            memtable_id_counter: SequenceNumberCounter::default(),
987            table_id_counter: SequenceNumberCounter::new(highest_table_id + 1),
988            blob_file_id_counter: SequenceNumberCounter::default(),
989            version_history: Arc::new(RwLock::new(SuperVersions::new(version))),
990            stop_signal: StopSignal::default(),
991            config: Arc::new(config),
992            major_compaction_lock: RwLock::default(),
993            flush_lock: Mutex::default(),
994            compaction_state: Arc::new(Mutex::new(CompactionState::default())),
995
996            #[cfg(feature = "metrics")]
997            metrics,
998        };
999
1000        Ok(Self(Arc::new(inner)))
1001    }
1002
1003    /// Creates a new LSM-tree in a directory.
1004    fn create_new(config: Config) -> crate::Result<Self> {
1005        use crate::file::{fsync_directory, TABLES_FOLDER};
1006        use std::fs::create_dir_all;
1007
1008        let path = config.path.clone();
1009        log::trace!("Creating LSM-tree at {}", path.display());
1010
1011        create_dir_all(&path)?;
1012
1013        let table_folder_path = path.join(TABLES_FOLDER);
1014        create_dir_all(&table_folder_path)?;
1015
1016        // IMPORTANT: fsync folders on Unix
1017        fsync_directory(&table_folder_path)?;
1018        fsync_directory(&path)?;
1019
1020        let inner = TreeInner::create_new(config)?;
1021        Ok(Self(Arc::new(inner)))
1022    }
1023
    /// Recovers the level manifest, loading all tables from disk.
    ///
    /// Reads the recovery data for the latest version, opens every table file
    /// it references, recovers blob files, and finally deletes orphaned
    /// (unreferenced) table and blob files plus stale version manifests.
    ///
    /// # Errors
    ///
    /// Returns error, if an IO error occurred, if a table file name cannot be
    /// parsed, or if fewer tables were found on disk than the version expects.
    fn recover_levels<P: AsRef<Path>>(
        tree_path: P,
        tree_id: TreeId,
        config: &Config,
        #[cfg(feature = "metrics")] metrics: &Arc<Metrics>,
    ) -> crate::Result<Version> {
        use crate::{file::fsync_directory, file::TABLES_FOLDER, TableId};

        let tree_path = tree_path.as_ref();

        let recovery = recover(tree_path)?;

        // Flatten the per-level, per-run layout of the recovery data into a
        // lookup table: table ID -> (level index, checksum, global seqno).
        let table_map = {
            let mut result: crate::HashMap<TableId, (u8 /* Level index */, Checksum, SeqNo)> =
                crate::HashMap::default();

            for (level_idx, table_ids) in recovery.table_ids.iter().enumerate() {
                for run in table_ids {
                    for table in run {
                        #[expect(
                            clippy::expect_used,
                            reason = "there are always less than 256 levels"
                        )]
                        result.insert(
                            table.id,
                            (
                                level_idx
                                    .try_into()
                                    .expect("there are less than 256 levels"),
                                table.checksum,
                                table.global_seqno,
                            ),
                        );
                    }
                }
            }

            result
        };

        let cnt = table_map.len();

        log::debug!("Recovering {cnt} tables from {}", tree_path.display());

        // Log progress every Nth table, coarser for larger table counts.
        let progress_mod = match cnt {
            _ if cnt <= 20 => 1,
            _ if cnt <= 100 => 10,
            _ => 100,
        };

        let mut tables = vec![];

        let table_base_folder = tree_path.join(TABLES_FOLDER);

        // The tables folder may be missing (e.g. empty tree) — create it.
        if !table_base_folder.try_exists()? {
            std::fs::create_dir_all(&table_base_folder)?;
            fsync_directory(&table_base_folder)?;
        }

        // Table files on disk that the recovered version does not reference;
        // they are deleted only after recovery has definitely succeeded.
        let mut orphaned_tables = vec![];

        for (idx, dirent) in std::fs::read_dir(&table_base_folder)?.enumerate() {
            let dirent = dirent?;
            let file_name = dirent.file_name();

            // https://en.wikipedia.org/wiki/.DS_Store
            if file_name == ".DS_Store" {
                continue;
            }

            // https://en.wikipedia.org/wiki/AppleSingle_and_AppleDouble_formats
            if file_name.to_string_lossy().starts_with("._") {
                continue;
            }

            // Table file names must be valid UTF-8 ...
            let table_file_name = file_name.to_str().ok_or_else(|| {
                log::error!("invalid table file name {}", file_name.display());
                crate::Error::Unrecoverable
            })?;

            let table_file_path = dirent.path();
            assert!(!table_file_path.is_dir());

            // ... and parse as a numeric table ID.
            let table_id = table_file_name.parse::<TableId>().map_err(|e| {
                log::error!("invalid table file name {table_file_name:?}: {e:?}");
                crate::Error::Unrecoverable
            })?;

            if let Some(&(level_idx, checksum, global_seqno)) = table_map.get(&table_id) {
                // Pinning policies are configured per level.
                let pin_filter = config.filter_block_pinning_policy.get(level_idx.into());
                let pin_index = config.index_block_pinning_policy.get(level_idx.into());

                let table = Table::recover(
                    table_file_path,
                    checksum,
                    global_seqno,
                    tree_id,
                    config.cache.clone(),
                    config.descriptor_table.clone(),
                    pin_filter,
                    pin_index,
                    #[cfg(feature = "metrics")]
                    metrics.clone(),
                )?;

                tables.push(table);

                if idx % progress_mod == 0 {
                    log::debug!("Recovered {idx}/{cnt} tables");
                }
            } else {
                // Not referenced by the recovered version: schedule for deletion.
                orphaned_tables.push(table_file_path);
            }
        }

        // Every table referenced by the version must exist on disk.
        if tables.len() < cnt {
            log::error!(
                "Recovered less tables than expected: {:?}",
                table_map.keys(),
            );
            return Err(crate::Error::Unrecoverable);
        }

        log::debug!("Successfully recovered {} tables", tables.len());

        let (blob_files, orphaned_blob_files) = crate::vlog::recover_blob_files(
            &tree_path.join(crate::file::BLOBS_FOLDER),
            &recovery.blob_file_ids,
            tree_id,
            config.descriptor_table.as_ref(),
        )?;

        let version = Version::from_recovery(recovery, &tables, &blob_files)?;

        // NOTE: Cleanup old versions
        // But only after we definitely recovered the latest version
        Self::cleanup_orphaned_version(tree_path, version.id())?;

        for table_path in orphaned_tables {
            log::debug!("Deleting orphaned table {}", table_path.display());
            std::fs::remove_file(&table_path)?;
        }

        for blob_file_path in orphaned_blob_files {
            log::debug!("Deleting orphaned blob file {}", blob_file_path.display());
            std::fs::remove_file(&blob_file_path)?;
        }

        Ok(version)
    }
1175
1176    fn cleanup_orphaned_version(
1177        path: &Path,
1178        latest_version_id: crate::version::VersionId,
1179    ) -> crate::Result<()> {
1180        let version_str = format!("v{latest_version_id}");
1181
1182        for file in std::fs::read_dir(path)? {
1183            let dirent = file?;
1184
1185            if dirent.file_type()?.is_dir() {
1186                continue;
1187            }
1188
1189            let name = dirent.file_name();
1190
1191            if name.to_string_lossy().starts_with('v') && *name != *version_str {
1192                log::trace!("Cleanup orphaned version {}", name.display());
1193                std::fs::remove_file(dirent.path())?;
1194            }
1195        }
1196
1197        Ok(())
1198    }
1199}