Skip to main content

lsm_tree/tree/
mod.rs

1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5pub mod ingest;
6pub mod inner;
7pub mod sealed;
8
9use crate::{
10    compaction::{drop_range::OwnedBounds, state::CompactionState, CompactionStrategy},
11    config::Config,
12    file::CURRENT_VERSION_FILE,
13    format_version::FormatVersion,
14    iter_guard::{IterGuard, IterGuardImpl},
15    key::InternalKey,
16    manifest::Manifest,
17    memtable::Memtable,
18    slice::Slice,
19    table::Table,
20    value::InternalValue,
21    version::{recovery::recover, SuperVersion, SuperVersions, Version},
22    vlog::BlobFile,
23    AbstractTree, Checksum, KvPair, SeqNo, SequenceNumberCounter, TableId, UserKey, UserValue,
24    ValueType,
25};
26use inner::{TreeId, TreeInner};
27use std::{
28    ops::{Bound, RangeBounds},
29    path::Path,
30    sync::{Arc, Mutex, RwLock},
31};
32
33#[cfg(feature = "metrics")]
34use crate::metrics::Metrics;
35
36/// Iterator value guard
37pub struct Guard(crate::Result<(UserKey, UserValue)>);
38
39impl IterGuard for Guard {
40    fn into_inner_if(
41        self,
42        pred: impl Fn(&UserKey) -> bool,
43    ) -> crate::Result<(UserKey, Option<UserValue>)> {
44        let (k, v) = self.0?;
45
46        if pred(&k) {
47            Ok((k, Some(v)))
48        } else {
49            Ok((k, None))
50        }
51    }
52
53    fn key(self) -> crate::Result<UserKey> {
54        self.0.map(|(k, _)| k)
55    }
56
57    fn size(self) -> crate::Result<u32> {
58        #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
59        self.into_inner().map(|(_, v)| v.len() as u32)
60    }
61
62    fn into_inner(self) -> crate::Result<(UserKey, UserValue)> {
63        self.0
64    }
65}
66
67fn ignore_tombstone_value(item: InternalValue) -> Option<InternalValue> {
68    if item.is_tombstone() {
69        None
70    } else {
71        Some(item)
72    }
73}
74
75/// A log-structured merge tree (LSM-tree/LSMT)
76#[derive(Clone)]
77pub struct Tree(#[doc(hidden)] pub Arc<TreeInner>);
78
79impl std::ops::Deref for Tree {
80    type Target = TreeInner;
81
82    fn deref(&self) -> &Self::Target {
83        &self.0
84    }
85}
86
87impl crate::abstract_tree::sealed::Sealed for Tree {}
88
89impl AbstractTree for Tree {
90    fn table_file_cache_size(&self) -> usize {
91        self.config
92            .descriptor_table
93            .as_ref()
94            .map_or(0, |dt| dt.len())
95    }
96
97    fn get_version_history_lock(
98        &self,
99    ) -> std::sync::RwLockWriteGuard<'_, crate::version::SuperVersions> {
100        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
101        self.version_history.write().expect("lock is poisoned")
102    }
103
104    fn next_table_id(&self) -> TableId {
105        self.0.table_id_counter.get()
106    }
107
108    fn id(&self) -> TreeId {
109        self.id
110    }
111
112    fn blob_file_count(&self) -> usize {
113        0
114    }
115
116    fn print_trace(&self, key: &[u8]) -> crate::Result<()> {
117        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
118        let super_version = self
119            .version_history
120            .read()
121            .expect("lock is poisoned")
122            .latest_version();
123
124        let key = Slice::from(key);
125
126        for kv in super_version.active_memtable.range_internal((
127            Bound::Included(InternalKey::new(key.clone(), SeqNo::MAX, ValueType::Value)),
128            Bound::Unbounded,
129        )) {
130            log::info!("[Active] {kv:?}");
131        }
132
133        for mt in super_version.sealed_memtables.iter().rev() {
134            for kv in mt.range_internal((
135                Bound::Included(InternalKey::new(key.clone(), SeqNo::MAX, ValueType::Value)),
136                Bound::Unbounded,
137            )) {
138                log::info!("[Sealed #{}] {kv:?}", mt.id());
139            }
140        }
141
142        for table in super_version
143            .version
144            .iter_levels()
145            .flat_map(|lvl| lvl.iter())
146            .filter_map(|run| run.get_for_key_cmp(&key, self.config.comparator.as_ref()))
147        {
148            for kv in table.range(..) {
149                let kv = kv?;
150
151                if kv.key.user_key != key {
152                    break;
153                }
154
155                log::info!("[Table #{}] {kv:?}", table.id());
156            }
157        }
158
159        Ok(())
160    }
161
162    fn get_internal_entry(&self, key: &[u8], seqno: SeqNo) -> crate::Result<Option<InternalValue>> {
163        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
164        let super_version = self
165            .version_history
166            .read()
167            .expect("lock is poisoned")
168            .get_version_for_snapshot(seqno);
169
170        Self::get_internal_entry_from_version(
171            &super_version,
172            key,
173            seqno,
174            self.config.comparator.as_ref(),
175        )
176    }
177
178    fn current_version(&self) -> Version {
179        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
180        self.version_history
181            .read()
182            .expect("poisoned")
183            .latest_version()
184            .version
185    }
186
187    fn get_flush_lock(&self) -> std::sync::MutexGuard<'_, ()> {
188        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
189        self.flush_lock.lock().expect("lock is poisoned")
190    }
191
192    #[cfg(feature = "metrics")]
193    fn metrics(&self) -> &Arc<crate::Metrics> {
194        &self.0.metrics
195    }
196
197    fn version_free_list_len(&self) -> usize {
198        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
199        self.version_history
200            .read()
201            .expect("lock is poisoned")
202            .free_list_len()
203    }
204
205    fn prefix<K: AsRef<[u8]>>(
206        &self,
207        prefix: K,
208        seqno: SeqNo,
209        index: Option<(Arc<Memtable>, SeqNo)>,
210    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
211        Box::new(
212            self.create_prefix(&prefix, seqno, index)
213                .map(|kv| IterGuardImpl::Standard(Guard(kv))),
214        )
215    }
216
217    fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
218        &self,
219        range: R,
220        seqno: SeqNo,
221        index: Option<(Arc<Memtable>, SeqNo)>,
222    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
223        Box::new(
224            self.create_range(&range, seqno, index)
225                .map(|kv| IterGuardImpl::Standard(Guard(kv))),
226        )
227    }
228
229    /// Returns the number of tombstones in the tree.
230    fn tombstone_count(&self) -> u64 {
231        self.current_version()
232            .iter_tables()
233            .map(Table::tombstone_count)
234            .sum()
235    }
236
237    /// Returns the number of weak tombstones (single deletes) in the tree.
238    fn weak_tombstone_count(&self) -> u64 {
239        self.current_version()
240            .iter_tables()
241            .map(Table::weak_tombstone_count)
242            .sum()
243    }
244
245    /// Returns the number of value entries that become reclaimable once weak tombstones can be GC'd.
246    fn weak_tombstone_reclaimable_count(&self) -> u64 {
247        self.current_version()
248            .iter_tables()
249            .map(Table::weak_tombstone_reclaimable)
250            .sum()
251    }
252
253    fn drop_range<K: AsRef<[u8]>, R: RangeBounds<K>>(&self, range: R) -> crate::Result<()> {
254        let (bounds, is_empty) = Self::range_bounds_to_owned_bounds(&range);
255
256        if is_empty {
257            return Ok(());
258        }
259
260        let strategy = Arc::new(crate::compaction::drop_range::Strategy::new(bounds));
261
262        // IMPORTANT: Write lock so we can be the only compaction going on
263        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
264        let _lock = self
265            .0
266            .major_compaction_lock
267            .write()
268            .expect("lock is poisoned");
269
270        log::info!("Starting drop_range compaction");
271        self.inner_compact(strategy, 0)?;
272        Ok(())
273    }
274
275    fn clear(&self) -> crate::Result<()> {
276        let mut versions = self.get_version_history_lock();
277
278        versions.upgrade_version(
279            &self.config.path,
280            |v| {
281                let mut copy = v.clone();
282                copy.active_memtable = Arc::new(Memtable::new(
283                    self.memtable_id_counter.next(),
284                    self.config.comparator.clone(),
285                ));
286                copy.sealed_memtables = Arc::default();
287                copy.version = Version::new(v.version.id() + 1, self.tree_type());
288                Ok(copy)
289            },
290            &self.config.seqno,
291            &self.config.visible_seqno,
292            &*self.config.fs,
293        )
294    }
295
296    #[doc(hidden)]
297    fn major_compact(
298        &self,
299        target_size: u64,
300        seqno_threshold: SeqNo,
301    ) -> crate::Result<crate::compaction::CompactionResult> {
302        let strategy = Arc::new(crate::compaction::major::Strategy::new(target_size));
303
304        // IMPORTANT: Write lock so we can be the only compaction going on
305        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
306        let _lock = self
307            .0
308            .major_compaction_lock
309            .write()
310            .expect("lock is poisoned");
311
312        log::info!("Starting major compaction");
313        self.inner_compact(strategy, seqno_threshold)
314    }
315
316    fn l0_run_count(&self) -> usize {
317        self.current_version()
318            .level(0)
319            .map(|x| x.run_count())
320            .unwrap_or_default()
321    }
322
323    fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<u32>> {
324        #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
325        Ok(self.get(key, seqno)?.map(|x| x.len() as u32))
326    }
327
328    fn filter_size(&self) -> u64 {
329        self.current_version()
330            .iter_tables()
331            .map(Table::filter_size)
332            .map(u64::from)
333            .sum()
334    }
335
336    fn pinned_filter_size(&self) -> usize {
337        self.current_version()
338            .iter_tables()
339            .map(Table::pinned_filter_size)
340            .sum()
341    }
342
343    fn pinned_block_index_size(&self) -> usize {
344        self.current_version()
345            .iter_tables()
346            .map(Table::pinned_block_index_size)
347            .sum()
348    }
349
350    fn sealed_memtable_count(&self) -> usize {
351        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
352        self.version_history
353            .read()
354            .expect("lock is poisoned")
355            .latest_version()
356            .sealed_memtables
357            .len()
358    }
359
360    fn flush_to_tables_with_rt(
361        &self,
362        stream: impl Iterator<Item = crate::Result<InternalValue>>,
363        range_tombstones: Vec<crate::range_tombstone::RangeTombstone>,
364    ) -> crate::Result<Option<(Vec<Table>, Option<Vec<BlobFile>>)>> {
365        use crate::{file::TABLES_FOLDER, table::multi_writer::MultiWriter};
366        use std::time::Instant;
367
368        let start = Instant::now();
369
370        let folder = self.config.path.join(TABLES_FOLDER);
371
372        let data_block_size = self.config.data_block_size_policy.get(0);
373
374        let data_block_restart_interval = self.config.data_block_restart_interval_policy.get(0);
375        let index_block_restart_interval = self.config.index_block_restart_interval_policy.get(0);
376
377        let data_block_compression = self.config.data_block_compression_policy.get(0);
378        let index_block_compression = self.config.index_block_compression_policy.get(0);
379
380        let data_block_hash_ratio = self.config.data_block_hash_ratio_policy.get(0);
381
382        let index_partitioning = self.config.index_block_partitioning_policy.get(0);
383        let filter_partitioning = self.config.filter_block_partitioning_policy.get(0);
384
385        log::debug!(
386            "Flushing memtable(s) to {}, data_block_restart_interval={data_block_restart_interval}, index_block_restart_interval={index_block_restart_interval}, data_block_size={data_block_size}, data_block_compression={data_block_compression:?}, index_block_compression={index_block_compression:?}",
387            folder.display(),
388        );
389
390        let mut table_writer = MultiWriter::new(
391            folder.clone(),
392            self.table_id_counter.clone(),
393            64 * 1_024 * 1_024,
394            0,
395            self.config.fs.clone(),
396        )?
397        .use_data_block_restart_interval(data_block_restart_interval)
398        .use_index_block_restart_interval(index_block_restart_interval)
399        .use_data_block_compression(data_block_compression)
400        .use_index_block_compression(index_block_compression)
401        .use_data_block_size(data_block_size)
402        .use_data_block_hash_ratio(data_block_hash_ratio)
403        .use_bloom_policy({
404            use crate::config::FilterPolicyEntry::{Bloom, None};
405            use crate::table::filter::BloomConstructionPolicy;
406
407            match self.config.filter_policy.get(0) {
408                Bloom(policy) => policy,
409                None => BloomConstructionPolicy::BitsPerKey(0.0),
410            }
411        });
412
413        if index_partitioning {
414            table_writer = table_writer.use_partitioned_index();
415        }
416        if filter_partitioning {
417            table_writer = table_writer.use_partitioned_filter();
418        }
419
420        table_writer = table_writer.use_prefix_extractor(self.config.prefix_extractor.clone());
421        table_writer = table_writer.use_encryption(self.config.encryption.clone());
422
423        #[cfg(feature = "zstd")]
424        {
425            table_writer = table_writer.use_zstd_dictionary(self.config.zstd_dictionary.clone());
426        }
427
428        // Set range tombstones BEFORE writing KV items so that if MultiWriter
429        // rotates to a new table during the write loop, earlier tables already
430        // carry the RT metadata.
431        table_writer.set_range_tombstones(range_tombstones);
432
433        for item in stream {
434            table_writer.write(item?)?;
435        }
436
437        let result = table_writer.finish()?;
438
439        log::debug!("Flushed memtable(s) in {:?}", start.elapsed());
440
441        let pin_filter = self.config.filter_block_pinning_policy.get(0);
442        let pin_index = self.config.index_block_pinning_policy.get(0);
443
444        // Load tables
445        let tables = result
446            .into_iter()
447            .map(|(table_id, checksum)| -> crate::Result<Table> {
448                Table::recover(
449                    folder.join(table_id.to_string()),
450                    checksum,
451                    0,
452                    self.id,
453                    self.config.cache.clone(),
454                    self.config.descriptor_table.clone(),
455                    pin_filter,
456                    pin_index,
457                    self.config.encryption.clone(),
458                    #[cfg(feature = "zstd")]
459                    self.config.zstd_dictionary.clone(),
460                    self.config.comparator.clone(),
461                    #[cfg(feature = "metrics")]
462                    self.metrics.clone(),
463                )
464            })
465            .collect::<crate::Result<Vec<_>>>()?;
466
467        // Return Some even when tables is empty (RT-only flush): the caller
468        // (AbstractTree::flush) handles empty tables by re-inserting RTs into
469        // the active memtable and still needs to delete sealed memtables.
470        Ok(Some((tables, None)))
471    }
472
473    #[expect(clippy::significant_drop_tightening)]
474    fn register_tables(
475        &self,
476        tables: &[Table],
477        blob_files: Option<&[BlobFile]>,
478        frag_map: Option<crate::blob_tree::FragmentationMap>,
479        sealed_memtables_to_delete: &[crate::tree::inner::MemtableId],
480        gc_watermark: SeqNo,
481    ) -> crate::Result<()> {
482        log::trace!(
483            "Registering {} tables, {} blob files",
484            tables.len(),
485            blob_files.map(<[BlobFile]>::len).unwrap_or_default(),
486        );
487
488        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
489        let mut _compaction_state = self.compaction_state.lock().expect("lock is poisoned");
490        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
491        let mut version_lock = self.version_history.write().expect("lock is poisoned");
492
493        version_lock.upgrade_version(
494            &self.config.path,
495            |current| {
496                let mut copy = current.clone();
497
498                let ctx = crate::version::TransformContext::new(self.config.comparator.as_ref());
499                copy.version = copy.version.with_new_l0_run(
500                    tables,
501                    blob_files,
502                    frag_map.filter(|x| !x.is_empty()),
503                    &ctx,
504                );
505
506                for &table_id in sealed_memtables_to_delete {
507                    log::trace!("releasing sealed memtable #{table_id}");
508                    copy.sealed_memtables = Arc::new(copy.sealed_memtables.remove(table_id));
509                }
510
511                Ok(copy)
512            },
513            &self.config.seqno,
514            &self.config.visible_seqno,
515            &*self.config.fs,
516        )?;
517
518        if let Err(e) = version_lock.maintenance(&self.config.path, gc_watermark) {
519            log::warn!("Version GC failed: {e:?}");
520        }
521
522        Ok(())
523    }
524
525    fn clear_active_memtable(&self) {
526        use crate::tree::sealed::SealedMemtables;
527
528        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
529        let mut version_history_lock = self.version_history.write().expect("lock is poisoned");
530        let super_version = version_history_lock.latest_version();
531
532        if super_version.active_memtable.is_empty() {
533            return;
534        }
535
536        let mut copy = version_history_lock.latest_version();
537        copy.active_memtable = Arc::new(Memtable::new(
538            self.memtable_id_counter.next(),
539            self.config.comparator.clone(),
540        ));
541        copy.sealed_memtables = Arc::new(SealedMemtables::default());
542
543        // Rotate does not modify the memtable, so it cannot break snapshots
544        copy.seqno = super_version.seqno;
545
546        version_history_lock.replace_latest_version(copy);
547
548        log::trace!("cleared active memtable");
549    }
550
551    fn compact(
552        &self,
553        strategy: Arc<dyn CompactionStrategy>,
554        seqno_threshold: SeqNo,
555    ) -> crate::Result<crate::compaction::CompactionResult> {
556        // NOTE: Read lock major compaction lock
557        // That way, if a major compaction is running, we cannot proceed
558        // But in general, parallel (non-major) compactions can occur
559        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
560        let _lock = self
561            .0
562            .major_compaction_lock
563            .read()
564            .expect("lock is poisoned");
565
566        self.inner_compact(strategy, seqno_threshold)
567    }
568
569    fn get_next_table_id(&self) -> TableId {
570        self.0.get_next_table_id()
571    }
572
573    fn tree_config(&self) -> &Config {
574        &self.config
575    }
576
577    fn active_memtable(&self) -> Arc<Memtable> {
578        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
579        self.version_history
580            .read()
581            .expect("lock is poisoned")
582            .latest_version()
583            .active_memtable
584    }
585
586    fn tree_type(&self) -> crate::TreeType {
587        crate::TreeType::Standard
588    }
589
590    #[expect(clippy::significant_drop_tightening)]
591    fn rotate_memtable(&self) -> Option<Arc<Memtable>> {
592        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
593        let mut version_history_lock = self.version_history.write().expect("lock is poisoned");
594        let super_version = version_history_lock.latest_version();
595
596        if super_version.active_memtable.is_empty() {
597            return None;
598        }
599
600        let yanked_memtable = super_version.active_memtable;
601
602        let mut copy = version_history_lock.latest_version();
603        copy.active_memtable = Arc::new(Memtable::new(
604            self.memtable_id_counter.next(),
605            self.config.comparator.clone(),
606        ));
607        copy.sealed_memtables =
608            Arc::new(super_version.sealed_memtables.add(yanked_memtable.clone()));
609
610        // Rotate does not modify the memtable so it cannot break snapshots
611        copy.seqno = super_version.seqno;
612
613        version_history_lock.replace_latest_version(copy);
614
615        log::trace!(
616            "rotate: added memtable id={} to sealed memtables",
617            yanked_memtable.id,
618        );
619
620        Some(yanked_memtable)
621    }
622
623    fn table_count(&self) -> usize {
624        self.current_version().table_count()
625    }
626
627    fn level_table_count(&self, idx: usize) -> Option<usize> {
628        self.current_version().level(idx).map(|x| x.table_count())
629    }
630
631    fn approximate_len(&self) -> usize {
632        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
633        let super_version = self
634            .version_history
635            .read()
636            .expect("lock is poisoned")
637            .latest_version();
638
639        let tables_item_count = self
640            .current_version()
641            .iter_tables()
642            .map(|x| x.metadata.item_count)
643            .sum::<u64>();
644
645        let memtable_count = super_version.active_memtable.len() as u64;
646        let sealed_count = super_version
647            .sealed_memtables
648            .iter()
649            .map(|mt| mt.len())
650            .sum::<usize>() as u64;
651
652        #[expect(clippy::expect_used, reason = "result should fit into usize")]
653        (memtable_count + sealed_count + tables_item_count)
654            .try_into()
655            .expect("approximate_len too large for usize")
656    }
657
658    fn disk_space(&self) -> u64 {
659        self.current_version()
660            .iter_levels()
661            .map(super::version::Level::size)
662            .sum()
663    }
664
665    fn get_highest_memtable_seqno(&self) -> Option<SeqNo> {
666        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
667        let version = self
668            .version_history
669            .read()
670            .expect("lock is poisoned")
671            .latest_version();
672
673        let active = version.active_memtable.get_highest_seqno();
674
675        let sealed = version
676            .sealed_memtables
677            .iter()
678            .map(|mt| mt.get_highest_seqno())
679            .max()
680            .flatten();
681
682        active.max(sealed)
683    }
684
685    fn get_highest_persisted_seqno(&self) -> Option<SeqNo> {
686        self.current_version()
687            .iter_tables()
688            .map(Table::get_highest_seqno)
689            .max()
690    }
691
692    fn get<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<UserValue>> {
693        let key = key.as_ref();
694
695        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
696        let super_version = self
697            .version_history
698            .read()
699            .expect("lock is poisoned")
700            .get_version_for_snapshot(seqno);
701
702        Self::resolve_or_passthrough(
703            &super_version,
704            key,
705            seqno,
706            self.config.merge_operator.as_ref(),
707            self.config.comparator.as_ref(),
708        )
709    }
710
711    fn multi_get<K: AsRef<[u8]>>(
712        &self,
713        keys: impl IntoIterator<Item = K>,
714        seqno: SeqNo,
715    ) -> crate::Result<Vec<Option<UserValue>>> {
716        let super_version = self.get_version_for_snapshot(seqno);
717
718        keys.into_iter()
719            .map(|key| {
720                Self::resolve_or_passthrough(
721                    &super_version,
722                    key.as_ref(),
723                    seqno,
724                    self.config.merge_operator.as_ref(),
725                    self.config.comparator.as_ref(),
726                )
727            })
728            .collect()
729    }
730
731    fn insert<K: Into<UserKey>, V: Into<UserValue>>(
732        &self,
733        key: K,
734        value: V,
735        seqno: SeqNo,
736    ) -> (u64, u64) {
737        let value = InternalValue::from_components(key, value, seqno, ValueType::Value);
738        self.append_entry(value)
739    }
740
741    fn merge<K: Into<UserKey>, V: Into<UserValue>>(
742        &self,
743        key: K,
744        operand: V,
745        seqno: SeqNo,
746    ) -> (u64, u64) {
747        let value = InternalValue::new_merge_operand(key, operand, seqno);
748        self.append_entry(value)
749    }
750
751    fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
752        let value = InternalValue::new_tombstone(key, seqno);
753        self.append_entry(value)
754    }
755
756    fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
757        let value = InternalValue::new_weak_tombstone(key, seqno);
758        self.append_entry(value)
759    }
760
761    fn remove_range<K: Into<UserKey>>(&self, start: K, end: K, seqno: SeqNo) -> u64 {
762        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
763        let memtable = Arc::clone(
764            &self
765                .version_history
766                .read()
767                .expect("lock is poisoned")
768                .latest_version()
769                .active_memtable,
770        );
771
772        memtable.insert_range_tombstone(start.into(), end.into(), seqno)
773    }
774}
775
776impl Tree {
777    /// Shared point-read logic for `get()` and `multi_get()`: finds the newest
778    /// entry, applies merge resolution or RT suppression, and returns the value.
779    fn resolve_or_passthrough(
780        super_version: &SuperVersion,
781        key: &[u8],
782        seqno: SeqNo,
783        merge_operator: Option<&Arc<dyn crate::merge_operator::MergeOperator>>,
784        comparator: &dyn crate::comparator::UserComparator,
785    ) -> crate::Result<Option<UserValue>> {
786        let entry = Self::get_internal_entry_from_version(super_version, key, seqno, comparator)?;
787
788        match entry {
789            Some(entry) if entry.key.value_type == ValueType::MergeOperand => {
790                if let Some(merge_op) = merge_operator {
791                    // Build a bloom-filtered single-key iterator pipeline that
792                    // reuses MvccStream for merge/RT/Indirection resolution,
793                    // eliminating the previous hand-rolled merge collection.
794                    Self::resolve_merge_via_pipeline(
795                        super_version.clone(),
796                        key,
797                        seqno,
798                        Arc::clone(merge_op),
799                    )
800                } else if Self::is_suppressed_by_range_tombstones(
801                    super_version,
802                    key,
803                    entry.key.seqno,
804                    seqno,
805                    comparator,
806                ) {
807                    Ok(None)
808                } else {
809                    Ok(Some(entry.value))
810                }
811            }
812            Some(entry) => Ok(Some(entry.value)),
813            None => Ok(None),
814        }
815    }
816
817    /// Resolves merge operands for a point read via a bloom-filtered iterator pipeline.
818    ///
819    /// Builds a single-key range (`key..=key`) with bloom pre-filtering, wraps
820    /// all sources in `Merger → MvccStream`, and takes the first result. This
821    /// reuses the unified merge/RT/Indirection resolution logic from `MvccStream`
822    /// instead of duplicating it in a hand-rolled collection loop.
823    ///
824    /// Bloom pre-filtering can reject many disk tables at the filter level,
825    /// which typically improves point-read performance on deep LSM trees.
826    fn resolve_merge_via_pipeline(
827        version: SuperVersion,
828        key: &[u8],
829        seqno: SeqNo,
830        merge_operator: Arc<dyn crate::merge_operator::MergeOperator>,
831    ) -> crate::Result<Option<UserValue>> {
832        use crate::range::{IterState, TreeIter};
833
834        let key_hash = crate::table::filter::standard_bloom::Builder::get_hash(key);
835        // NOTE: Slice::from(&[u8]) copies the key (small, typically < 100 bytes).
836        // This runs once per merge resolution, not per-table — cost is negligible
837        // compared to the I/O saved by partition-aware bloom filtering.
838        let bloom_key = crate::Slice::from(key);
839        let comparator = version.active_memtable.comparator.clone();
840
841        let iter_state = IterState {
842            version,
843            ephemeral: None,
844            merge_operator: Some(merge_operator),
845            comparator,
846            prefix_hash: None,
847            key_hash: Some(key_hash),
848            bloom_key: Some(bloom_key),
849            #[cfg(feature = "metrics")]
850            metrics: None,
851        };
852
853        // Point-read fast path: skips eager RT collection, sort+dedup, table-skip,
854        // and RangeTombstoneFilter wrapper. MvccStream handles merge-internal RT
855        // suppression; a post-merge linear RT check catches the rest.
856        let mut iter = TreeIter::create_range_point(iter_state, key, seqno);
857
858        match iter.next() {
859            Some(Ok(entry)) => Ok(Some(entry.value)),
860            Some(Err(e)) => Err(e),
861            None => Ok(None),
862        }
863    }
864
865    #[doc(hidden)]
866    pub fn create_internal_range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
867        version: SuperVersion,
868        range: &'a R,
869        seqno: SeqNo,
870        ephemeral: Option<(Arc<Memtable>, SeqNo)>,
871        merge_operator: Option<Arc<dyn crate::merge_operator::MergeOperator>>,
872        comparator: crate::comparator::SharedComparator,
873    ) -> impl DoubleEndedIterator<Item = crate::Result<InternalValue>> + 'static {
874        Self::create_internal_range_with_prefix_hash(
875            version,
876            range,
877            seqno,
878            ephemeral,
879            merge_operator,
880            comparator,
881            None,
882        )
883    }
884
885    /// Like [`Tree::create_internal_range`], but with an optional prefix hash
886    /// for prefix bloom filter skipping during prefix scans.
887    #[doc(hidden)]
888    pub(crate) fn create_internal_range_with_prefix_hash<
889        'a,
890        K: AsRef<[u8]> + 'a,
891        R: RangeBounds<K> + 'a,
892    >(
893        version: SuperVersion,
894        range: &'a R,
895        seqno: SeqNo,
896        ephemeral: Option<(Arc<Memtable>, SeqNo)>,
897        merge_operator: Option<Arc<dyn crate::merge_operator::MergeOperator>>,
898        comparator: crate::comparator::SharedComparator,
899        prefix_hash: Option<u64>,
900    ) -> impl DoubleEndedIterator<Item = crate::Result<InternalValue>> + 'static {
901        use crate::range::{IterState, TreeIter};
902        use std::ops::Bound::{self, Excluded, Included, Unbounded};
903
904        let lo: Bound<UserKey> = match range.start_bound() {
905            Included(x) => Included(x.as_ref().into()),
906            Excluded(x) => Excluded(x.as_ref().into()),
907            Unbounded => Unbounded,
908        };
909
910        let hi: Bound<UserKey> = match range.end_bound() {
911            Included(x) => Included(x.as_ref().into()),
912            Excluded(x) => Excluded(x.as_ref().into()),
913            Unbounded => Unbounded,
914        };
915
916        let bounds: (Bound<UserKey>, Bound<UserKey>) = (lo, hi);
917
918        let iter_state = IterState {
919            version,
920            ephemeral,
921            merge_operator,
922            comparator,
923            prefix_hash,
924            key_hash: None,
925            bloom_key: None,
926            #[cfg(feature = "metrics")]
927            metrics: None,
928        };
929
930        TreeIter::create_range(iter_state, bounds, seqno)
931    }
932
933    pub(crate) fn get_internal_entry_from_version(
934        super_version: &SuperVersion,
935        key: &[u8],
936        seqno: SeqNo,
937        comparator: &dyn crate::comparator::UserComparator,
938    ) -> crate::Result<Option<InternalValue>> {
939        // Search order: active → sealed → SST (newest first). A point
940        // tombstone in a newer source is authoritative — no older source
941        // can contain a newer value, so returning None is correct.
942        if let Some(entry) = super_version.active_memtable.get(key, seqno) {
943            let Some(entry) = ignore_tombstone_value(entry) else {
944                return Ok(None);
945            };
946
947            // Check if any range tombstone suppresses this entry
948            if Self::is_suppressed_by_range_tombstones(
949                super_version,
950                key,
951                entry.key.seqno,
952                seqno,
953                comparator,
954            ) {
955                return Ok(None);
956            }
957            return Ok(Some(entry));
958        }
959
960        // Now look in sealed memtables
961        if let Some(entry) =
962            Self::get_internal_entry_from_sealed_memtables(super_version, key, seqno)
963        {
964            let Some(entry) = ignore_tombstone_value(entry) else {
965                return Ok(None);
966            };
967
968            if Self::is_suppressed_by_range_tombstones(
969                super_version,
970                key,
971                entry.key.seqno,
972                seqno,
973                comparator,
974            ) {
975                return Ok(None);
976            }
977            return Ok(Some(entry));
978        }
979
980        // Now look in tables... this may involve disk I/O
981        let entry =
982            Self::get_internal_entry_from_tables(&super_version.version, key, seqno, comparator)?;
983
984        if let Some(entry) = entry {
985            if Self::is_suppressed_by_range_tombstones(
986                super_version,
987                key,
988                entry.key.seqno,
989                seqno,
990                comparator,
991            ) {
992                return Ok(None);
993            }
994            return Ok(Some(entry));
995        }
996
997        Ok(None)
998    }
999
1000    /// Checks if a key at `key_seqno` is suppressed by any range tombstone
1001    /// in the active memtable, sealed memtables, or SST tables, visible at `read_seqno`.
1002    fn is_suppressed_by_range_tombstones(
1003        super_version: &SuperVersion,
1004        key: &[u8],
1005        key_seqno: SeqNo,
1006        read_seqno: SeqNo,
1007        comparator: &dyn crate::comparator::UserComparator,
1008    ) -> bool {
1009        // Check active memtable range tombstones.
1010        // Future optimization: skip lock when memtable has no RTs (atomic count).
1011        if super_version
1012            .active_memtable
1013            .is_key_suppressed_by_range_tombstone(key, key_seqno, read_seqno)
1014        {
1015            return true;
1016        }
1017
1018        // Check sealed memtable range tombstones
1019        for mt in super_version.sealed_memtables.iter().rev() {
1020            if mt.is_key_suppressed_by_range_tombstone(key, key_seqno, read_seqno) {
1021                return true;
1022            }
1023        }
1024
1025        // Check SST table range tombstones.
1026        //
1027        // Per-table RT lists are sorted by start key (using comparator) on load,
1028        // so binary search narrows candidates to RTs with start <= key.
1029        // The key_range early reject uses the comparator so it works with
1030        // non-lexicographic orderings.
1031        for table in super_version
1032            .version
1033            .iter_levels()
1034            .flat_map(|lvl| lvl.iter())
1035            .flat_map(|run| run.iter())
1036            .filter(|t| !t.range_tombstones().is_empty())
1037            .filter(|t| {
1038                // Early reject: skip tables whose key range doesn't contain the key.
1039                let kr = &t.metadata.key_range;
1040                comparator.compare(kr.min(), key) != std::cmp::Ordering::Greater
1041                    && comparator.compare(key, kr.max()) != std::cmp::Ordering::Greater
1042            })
1043        {
1044            let rts = table.range_tombstones();
1045
1046            // Binary search: find the first RT whose start is > key (in comparator order).
1047            // All RTs before that index have start <= key and are candidates.
1048            let candidate_end = rts.partition_point(|rt| {
1049                comparator.compare(&rt.start, key) != std::cmp::Ordering::Greater
1050            });
1051
1052            for rt in rts.iter().take(candidate_end) {
1053                // Check: start <= key < end (in comparator order) AND seqno visibility.
1054                if rt.visible_at(read_seqno)
1055                    && comparator.compare(&rt.start, key) != std::cmp::Ordering::Greater
1056                    && comparator.compare(key, &rt.end) == std::cmp::Ordering::Less
1057                    && key_seqno < rt.seqno
1058                {
1059                    return true;
1060                }
1061            }
1062        }
1063
1064        false
1065    }
1066
1067    fn get_internal_entry_from_tables(
1068        version: &Version,
1069        key: &[u8],
1070        seqno: SeqNo,
1071        comparator: &dyn crate::comparator::UserComparator,
1072    ) -> crate::Result<Option<InternalValue>> {
1073        // NOTE: Create key hash for hash sharing
1074        // https://fjall-rs.github.io/post/bloom-filter-hash-sharing/
1075        let key_hash = crate::table::filter::standard_bloom::Builder::get_hash(key);
1076
1077        // L0: optimize_runs may merge disjoint SSTs from different temporal
1078        // epochs into the same run, so run iteration order does not guarantee
1079        // newest-first. We must check ALL runs and keep the highest seqno.
1080        //
1081        // L1+: key ranges within a level do not overlap, so at most one run
1082        // can contain the key — return on the first match.
1083        //
1084        // Once a level yields a match, lower levels cannot contain newer data,
1085        // so we stop early.
1086        for (level_idx, level) in version.iter_levels().enumerate() {
1087            if level_idx == 0 {
1088                let mut best: Option<InternalValue> = None;
1089
1090                for run in level.iter() {
1091                    if let Some(table) = run.get_for_key_cmp(key, comparator) {
1092                        if let Some(item) = table.get(key, seqno, key_hash)? {
1093                            match &best {
1094                                // >= keeps first-seen on tie. Seqno is monotonically
1095                                // unique per write; equal seqno for the same user key
1096                                // across tables is impossible in normal operation.
1097                                Some(current) if current.key.seqno >= item.key.seqno => {}
1098                                _ => {
1099                                    // Short-circuit: seqno is the read horizon, so no
1100                                    // other run in this level can have a higher one.
1101                                    if item.key.seqno == seqno {
1102                                        return Ok(ignore_tombstone_value(item));
1103                                    }
1104                                    best = Some(item);
1105                                }
1106                            }
1107                        }
1108                    }
1109                }
1110
1111                if let Some(entry) = best {
1112                    return Ok(ignore_tombstone_value(entry));
1113                }
1114            } else {
1115                for run in level.iter() {
1116                    if let Some(table) = run.get_for_key_cmp(key, comparator) {
1117                        if let Some(item) = table.get(key, seqno, key_hash)? {
1118                            return Ok(ignore_tombstone_value(item));
1119                        }
1120                    }
1121                }
1122            }
1123        }
1124
1125        Ok(None)
1126    }
1127
1128    fn get_internal_entry_from_sealed_memtables(
1129        super_version: &SuperVersion,
1130        key: &[u8],
1131        seqno: SeqNo,
1132    ) -> Option<InternalValue> {
1133        for mt in super_version.sealed_memtables.iter().rev() {
1134            if let Some(entry) = mt.get(key, seqno) {
1135                return Some(entry);
1136            }
1137        }
1138
1139        None
1140    }
1141
1142    pub(crate) fn get_version_for_snapshot(&self, seqno: SeqNo) -> SuperVersion {
1143        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
1144        self.version_history
1145            .read()
1146            .expect("lock is poisoned")
1147            .get_version_for_snapshot(seqno)
1148    }
1149
1150    /// Normalizes a user-provided range into owned `Bound<Slice>` values.
1151    ///
1152    /// Returns a tuple containing:
1153    /// - the `OwnedBounds` that mirror the original bounds semantics (including
1154    ///   inclusive/exclusive markers and unbounded endpoints), and
1155    /// - a `bool` flag indicating whether the normalized range is logically
1156    ///   empty (e.g., when the lower bound is greater than the upper bound).
1157    ///
1158    /// Callers can use the flag to detect empty ranges and skip further work
1159    /// while still having access to the normalized bounds for non-empty cases.
1160    fn range_bounds_to_owned_bounds<K: AsRef<[u8]>, R: RangeBounds<K>>(
1161        range: &R,
1162    ) -> (OwnedBounds, bool) {
1163        use Bound::{Excluded, Included, Unbounded};
1164
1165        let start = match range.start_bound() {
1166            Included(key) => Included(Slice::from(key.as_ref())),
1167            Excluded(key) => Excluded(Slice::from(key.as_ref())),
1168            Unbounded => Unbounded,
1169        };
1170
1171        let end = match range.end_bound() {
1172            Included(key) => Included(Slice::from(key.as_ref())),
1173            Excluded(key) => Excluded(Slice::from(key.as_ref())),
1174            Unbounded => Unbounded,
1175        };
1176
1177        let is_empty =
1178            if let (Included(lo) | Excluded(lo), Included(hi) | Excluded(hi)) = (&start, &end) {
1179                lo.as_ref() > hi.as_ref()
1180            } else {
1181                false
1182            };
1183
1184        (OwnedBounds { start, end }, is_empty)
1185    }
1186
1187    /// Opens an LSM-tree in the given directory.
1188    ///
1189    /// Will recover previous state if the folder was previously
1190    /// occupied by an LSM-tree, including the previous configuration.
1191    /// If not, a new tree will be initialized with the given config.
1192    ///
1193    /// After recovering a previous state, use [`Tree::set_active_memtable`]
1194    /// to fill the memtable with data from a write-ahead log for full durability.
1195    ///
1196    /// # Errors
1197    ///
1198    /// Returns error, if an IO error occurred.
1199    pub(crate) fn open(config: Config) -> crate::Result<Self> {
1200        log::debug!("Opening LSM-tree at {}", config.path.display());
1201
1202        // Check for old version
1203        if config.path.join("version").try_exists()? {
1204            log::error!("It looks like you are trying to open a V1 database - the database needs a manual migration, however a migration tool is not provided, as V1 is extremely outdated.");
1205            return Err(crate::Error::InvalidVersion(FormatVersion::V1.into()));
1206        }
1207
1208        let tree = if config.path.join(CURRENT_VERSION_FILE).try_exists()? {
1209            Self::recover(config)
1210        } else {
1211            Self::create_new(config)
1212        }?;
1213
1214        Ok(tree)
1215    }
1216
1217    /// Returns `true` if there are some tables that are being compacted.
1218    #[doc(hidden)]
1219    #[must_use]
1220    pub fn is_compacting(&self) -> bool {
1221        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
1222        !self
1223            .compaction_state
1224            .lock()
1225            .expect("lock is poisoned")
1226            .hidden_set()
1227            .is_empty()
1228    }
1229
1230    fn inner_compact(
1231        &self,
1232        strategy: Arc<dyn CompactionStrategy>,
1233        mvcc_gc_watermark: SeqNo,
1234    ) -> crate::Result<crate::compaction::CompactionResult> {
1235        use crate::compaction::worker::{do_compaction, Options};
1236
1237        let mut opts = Options::from_tree(self, strategy);
1238        opts.mvcc_gc_watermark = mvcc_gc_watermark;
1239
1240        let result = do_compaction(&opts)?;
1241
1242        log::debug!("Compaction run over");
1243
1244        Ok(result)
1245    }
1246
1247    #[doc(hidden)]
1248    #[must_use]
1249    pub fn create_iter(
1250        &self,
1251        seqno: SeqNo,
1252        ephemeral: Option<(Arc<Memtable>, SeqNo)>,
1253    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
1254        self.create_range::<UserKey, _>(&.., seqno, ephemeral)
1255    }
1256
1257    #[doc(hidden)]
1258    pub fn create_range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
1259        &self,
1260        range: &'a R,
1261        seqno: SeqNo,
1262        ephemeral: Option<(Arc<Memtable>, SeqNo)>,
1263    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
1264        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
1265        let super_version = self
1266            .version_history
1267            .read()
1268            .expect("lock is poisoned")
1269            .get_version_for_snapshot(seqno);
1270
1271        Self::create_internal_range(
1272            super_version,
1273            range,
1274            seqno,
1275            ephemeral,
1276            self.config.merge_operator.clone(),
1277            self.config.comparator.clone(),
1278        )
1279        .map(|item| match item {
1280            Ok(kv) => Ok((kv.key.user_key, kv.value)),
1281            Err(e) => Err(e),
1282        })
1283    }
1284
1285    #[doc(hidden)]
1286    pub fn create_prefix<'a, K: AsRef<[u8]> + 'a>(
1287        &self,
1288        prefix: K,
1289        seqno: SeqNo,
1290        ephemeral: Option<(Arc<Memtable>, SeqNo)>,
1291    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
1292        use crate::prefix::compute_prefix_hash;
1293        use crate::range::{prefix_to_range, IterState, TreeIter};
1294
1295        let prefix_bytes = prefix.as_ref();
1296
1297        let prefix_hash = compute_prefix_hash(self.config.prefix_extractor.as_ref(), prefix_bytes);
1298
1299        let range = prefix_to_range(prefix_bytes);
1300
1301        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
1302        let super_version = self
1303            .version_history
1304            .read()
1305            .expect("lock is poisoned")
1306            .get_version_for_snapshot(seqno);
1307
1308        let iter_state = IterState {
1309            version: super_version,
1310            ephemeral,
1311            merge_operator: self.config.merge_operator.clone(),
1312            comparator: self.config.comparator.clone(),
1313            prefix_hash,
1314            key_hash: None,
1315            bloom_key: None,
1316            #[cfg(feature = "metrics")]
1317            metrics: Some(self.0.metrics.clone()),
1318        };
1319
1320        TreeIter::create_range(iter_state, range, seqno).map(|item| match item {
1321            Ok(kv) => Ok((kv.key.user_key, kv.value)),
1322            Err(e) => Err(e),
1323        })
1324    }
1325
1326    /// Adds an item to the active memtable.
1327    ///
1328    /// Returns the added item's size and new size of the memtable.
1329    #[doc(hidden)]
1330    #[must_use]
1331    pub fn append_entry(&self, value: InternalValue) -> (u64, u64) {
1332        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
1333        self.version_history
1334            .read()
1335            .expect("lock is poisoned")
1336            .latest_version()
1337            .active_memtable
1338            .insert(value)
1339    }
1340
1341    /// Recovers previous state, by loading the level manifest, tables and blob files.
1342    ///
1343    /// # Errors
1344    ///
1345    /// Returns error, if an IO error occurred.
1346    fn recover(mut config: Config) -> crate::Result<Self> {
1347        use crate::stop_signal::StopSignal;
1348        use inner::get_next_tree_id;
1349
1350        log::info!("Recovering LSM-tree at {}", config.path.display());
1351
1352        // Validate manifest metadata (format version, comparator name)
1353        // BEFORE recover_levels, so a rejected open is side-effect free
1354        // — recover_levels loads tables and cleans up orphans.
1355        // Tree type is checked after recovery (needs the Version object).
1356        // NOTE: the version file is read twice (here for metadata, then inside
1357        // recover_levels for table/blob data). This is intentional — metadata
1358        // validation must complete before any disk-mutating recovery work.
1359        {
1360            let version_id = crate::version::recovery::get_current_version(&config.path)?;
1361            let manifest_path = config.path.join(format!("v{version_id}"));
1362            let reader = sfa::Reader::new(&manifest_path)?;
1363            let manifest = Manifest::decode_from(&manifest_path, &reader)?;
1364
1365            if !matches!(manifest.version, FormatVersion::V3 | FormatVersion::V4) {
1366                return Err(crate::Error::InvalidVersion(manifest.version.into()));
1367            }
1368
1369            let supplied_name = config.comparator.name();
1370            if manifest.comparator_name != supplied_name {
1371                log::warn!(
1372                    "Comparator mismatch: tree was created with {:?} but opened with {:?}",
1373                    manifest.comparator_name,
1374                    supplied_name,
1375                );
1376                return Err(crate::Error::ComparatorMismatch {
1377                    stored: manifest.comparator_name,
1378                    supplied: supplied_name,
1379                });
1380            }
1381
1382            // IMPORTANT: Restore persisted config
1383            config.level_count = manifest.level_count;
1384        }
1385
1386        let tree_id = get_next_tree_id();
1387
1388        #[cfg(feature = "metrics")]
1389        let metrics = Arc::new(Metrics::default());
1390
1391        let version = Self::recover_levels(
1392            &config.path,
1393            tree_id,
1394            &config,
1395            #[cfg(feature = "metrics")]
1396            &metrics,
1397        )?;
1398
1399        {
1400            let requested_tree_type = match config.kv_separation_opts {
1401                Some(_) => crate::TreeType::Blob,
1402                None => crate::TreeType::Standard,
1403            };
1404
1405            if version.tree_type() != requested_tree_type {
1406                log::error!(
1407                    "Tried to open a {requested_tree_type:?}Tree, but the existing tree is of type {:?}Tree. This indicates a misconfiguration or corruption.",
1408                    version.tree_type(),
1409                );
1410                return Err(crate::Error::Unrecoverable);
1411            }
1412        }
1413
1414        let highest_table_id = version
1415            .iter_tables()
1416            .map(Table::id)
1417            .max()
1418            .unwrap_or_default();
1419
1420        let comparator = config.comparator.clone();
1421
1422        let inner = TreeInner {
1423            id: tree_id,
1424            memtable_id_counter: SequenceNumberCounter::new(1),
1425            table_id_counter: SequenceNumberCounter::new(highest_table_id + 1),
1426            blob_file_id_counter: SequenceNumberCounter::default(),
1427            version_history: Arc::new(RwLock::new(SuperVersions::new(version, comparator))),
1428            stop_signal: StopSignal::default(),
1429            config: Arc::new(config),
1430            major_compaction_lock: RwLock::default(),
1431            flush_lock: Mutex::default(),
1432            compaction_state: Arc::new(Mutex::new(CompactionState::default())),
1433
1434            #[cfg(feature = "metrics")]
1435            metrics,
1436        };
1437
1438        Ok(Self(Arc::new(inner)))
1439    }
1440
1441    /// Creates a new LSM-tree in a directory.
1442    fn create_new(config: Config) -> crate::Result<Self> {
1443        use crate::file::{fsync_directory, TABLES_FOLDER};
1444        use crate::fs::Fs;
1445
1446        let path = config.path.clone();
1447        log::trace!("Creating LSM-tree at {}", path.display());
1448
1449        (*config.fs).create_dir_all(&path)?;
1450
1451        let table_folder_path = path.join(TABLES_FOLDER);
1452        (*config.fs).create_dir_all(&table_folder_path)?;
1453
1454        // IMPORTANT: fsync folders on Unix
1455        fsync_directory(&table_folder_path, &*config.fs)?;
1456        fsync_directory(&path, &*config.fs)?;
1457
1458        let inner = TreeInner::create_new(config)?;
1459        Ok(Self(Arc::new(inner)))
1460    }
1461
1462    /// Recovers the level manifest, loading all tables from disk.
1463    #[expect(
1464        clippy::too_many_lines,
1465        reason = "recovery logic is inherently complex"
1466    )]
1467    fn recover_levels<P: AsRef<Path>>(
1468        tree_path: P,
1469        tree_id: TreeId,
1470        config: &Config,
1471        #[cfg(feature = "metrics")] metrics: &Arc<Metrics>,
1472    ) -> crate::Result<Version> {
1473        use crate::{file::fsync_directory, file::TABLES_FOLDER, fs::Fs, TableId};
1474
1475        let tree_path = tree_path.as_ref();
1476
1477        let recovery = recover(tree_path)?;
1478
1479        let table_map = {
1480            let mut result: crate::HashMap<TableId, (u8 /* Level index */, Checksum, SeqNo)> =
1481                crate::HashMap::default();
1482
1483            for (level_idx, table_ids) in recovery.table_ids.iter().enumerate() {
1484                for run in table_ids {
1485                    for table in run {
1486                        #[expect(
1487                            clippy::expect_used,
1488                            reason = "there are always less than 256 levels"
1489                        )]
1490                        result.insert(
1491                            table.id,
1492                            (
1493                                level_idx
1494                                    .try_into()
1495                                    .expect("there are less than 256 levels"),
1496                                table.checksum,
1497                                table.global_seqno,
1498                            ),
1499                        );
1500                    }
1501                }
1502            }
1503
1504            result
1505        };
1506
1507        let cnt = table_map.len();
1508
1509        log::debug!("Recovering {cnt} tables from {}", tree_path.display());
1510
1511        let progress_mod = match cnt {
1512            _ if cnt <= 20 => 1,
1513            _ if cnt <= 100 => 10,
1514            _ => 100,
1515        };
1516
1517        let mut tables = vec![];
1518
1519        let table_base_folder = tree_path.join(TABLES_FOLDER);
1520
1521        if !config.fs.exists(&table_base_folder)? {
1522            (*config.fs).create_dir_all(&table_base_folder)?;
1523            fsync_directory(&table_base_folder, &*config.fs)?;
1524        }
1525
1526        let mut orphaned_tables = vec![];
1527
1528        for (idx, dirent) in config
1529            .fs
1530            .read_dir(&table_base_folder)?
1531            .into_iter()
1532            .enumerate()
1533        {
1534            let crate::fs::FsDirEntry {
1535                path: table_file_path,
1536                file_name,
1537                is_dir,
1538            } = dirent;
1539
1540            // https://en.wikipedia.org/wiki/.DS_Store
1541            if file_name == ".DS_Store" {
1542                continue;
1543            }
1544
1545            // https://en.wikipedia.org/wiki/AppleSingle_and_AppleDouble_formats
1546            if file_name.starts_with("._") {
1547                continue;
1548            }
1549
1550            let table_file_name = &file_name;
1551            assert!(!is_dir);
1552
1553            let table_id = table_file_name.parse::<TableId>().map_err(|e| {
1554                log::error!("invalid table file name {table_file_name:?}: {e:?}");
1555                crate::Error::Unrecoverable
1556            })?;
1557
1558            if let Some(&(level_idx, checksum, global_seqno)) = table_map.get(&table_id) {
1559                let pin_filter = config.filter_block_pinning_policy.get(level_idx.into());
1560                let pin_index = config.index_block_pinning_policy.get(level_idx.into());
1561
1562                let table = Table::recover(
1563                    table_file_path,
1564                    checksum,
1565                    global_seqno,
1566                    tree_id,
1567                    config.cache.clone(),
1568                    config.descriptor_table.clone(),
1569                    pin_filter,
1570                    pin_index,
1571                    config.encryption.clone(),
1572                    #[cfg(feature = "zstd")]
1573                    config.zstd_dictionary.clone(),
1574                    config.comparator.clone(),
1575                    #[cfg(feature = "metrics")]
1576                    metrics.clone(),
1577                )?;
1578
1579                tables.push(table);
1580
1581                if idx % progress_mod == 0 {
1582                    log::debug!("Recovered {idx}/{cnt} tables");
1583                }
1584            } else {
1585                orphaned_tables.push(table_file_path);
1586            }
1587        }
1588
1589        if tables.len() < cnt {
1590            log::error!(
1591                "Recovered less tables than expected: {:?}",
1592                table_map.keys(),
1593            );
1594            return Err(crate::Error::Unrecoverable);
1595        }
1596
1597        log::debug!("Successfully recovered {} tables", tables.len());
1598
1599        let (blob_files, orphaned_blob_files) = crate::vlog::recover_blob_files(
1600            &tree_path.join(crate::file::BLOBS_FOLDER),
1601            &recovery.blob_file_ids,
1602            tree_id,
1603            config.descriptor_table.as_ref(),
1604        )?;
1605
1606        let version = Version::from_recovery(recovery, &tables, &blob_files)?;
1607
1608        // NOTE: Cleanup old versions
1609        // But only after we definitely recovered the latest version
1610        Self::cleanup_orphaned_version(tree_path, version.id())?;
1611
1612        for table_path in orphaned_tables {
1613            log::debug!("Deleting orphaned table {}", table_path.display());
1614            std::fs::remove_file(&table_path)?;
1615        }
1616
1617        for blob_file_path in orphaned_blob_files {
1618            log::debug!("Deleting orphaned blob file {}", blob_file_path.display());
1619            std::fs::remove_file(&blob_file_path)?;
1620        }
1621
1622        Ok(version)
1623    }
1624
1625    fn cleanup_orphaned_version(
1626        path: &Path,
1627        latest_version_id: crate::version::VersionId,
1628    ) -> crate::Result<()> {
1629        let version_str = format!("v{latest_version_id}");
1630
1631        for file in std::fs::read_dir(path)? {
1632            let dirent = file?;
1633
1634            if dirent.file_type()?.is_dir() {
1635                continue;
1636            }
1637
1638            let name = dirent.file_name();
1639
1640            if name.to_string_lossy().starts_with('v') && *name != *version_str {
1641                log::trace!("Cleanup orphaned version {}", name.display());
1642                std::fs::remove_file(dirent.path())?;
1643            }
1644        }
1645
1646        Ok(())
1647    }
1648}