lsm_tree/tree/
mod.rs

1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5pub mod ingest;
6pub mod inner;
7pub mod sealed;
8
9use crate::{
10    compaction::{drop_range::OwnedBounds, CompactionStrategy},
11    config::Config,
12    file::CURRENT_VERSION_FILE,
13    format_version::FormatVersion,
14    iter_guard::{IterGuard, IterGuardImpl},
15    manifest::Manifest,
16    memtable::Memtable,
17    slice::Slice,
18    table::Table,
19    value::InternalValue,
20    version::{recovery::recover, SuperVersion, SuperVersions, Version},
21    vlog::BlobFile,
22    AbstractTree, Cache, Checksum, KvPair, SeqNo, SequenceNumberCounter, TableId, UserKey,
23    UserValue, ValueType,
24};
25use inner::{TreeId, TreeInner};
26use std::{
27    ops::{Bound, RangeBounds},
28    path::Path,
29    sync::{Arc, Mutex, RwLock},
30};
31
32#[cfg(feature = "metrics")]
33use crate::metrics::Metrics;
34
/// Iterator value guard
///
/// Wraps the fallible result of reading one key-value pair, so that
/// decoding errors are surfaced only when the guard is consumed.
pub struct Guard(crate::Result<(UserKey, UserValue)>);
37
38impl IterGuard for Guard {
39    fn into_inner_if(
40        self,
41        pred: impl Fn(&UserKey) -> bool,
42    ) -> crate::Result<(UserKey, Option<UserValue>)> {
43        let (k, v) = self.0?;
44
45        if pred(&k) {
46            Ok((k, Some(v)))
47        } else {
48            Ok((k, None))
49        }
50    }
51
52    fn key(self) -> crate::Result<UserKey> {
53        self.0.map(|(k, _)| k)
54    }
55
56    fn size(self) -> crate::Result<u32> {
57        #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
58        self.into_inner().map(|(_, v)| v.len() as u32)
59    }
60
61    fn into_inner(self) -> crate::Result<(UserKey, UserValue)> {
62        self.0
63    }
64}
65
66fn ignore_tombstone_value(item: InternalValue) -> Option<InternalValue> {
67    if item.is_tombstone() {
68        None
69    } else {
70        Some(item)
71    }
72}
73
/// A log-structured merge tree (LSM-tree/LSMT)
///
/// Cheaply cloneable handle; all clones share the same inner state.
#[derive(Clone)]
pub struct Tree(#[doc(hidden)] pub Arc<TreeInner>);

impl std::ops::Deref for Tree {
    type Target = TreeInner;

    // Deref to the shared inner state so fields such as `config` and
    // `version_history` can be accessed directly on `Tree`.
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
85
86impl AbstractTree for Tree {
    /// Returns the number of entries in the table descriptor cache.
    fn table_file_cache_size(&self) -> usize {
        self.config.descriptor_table.len()
    }

    /// Acquires the version history write lock.
    ///
    /// # Panics
    ///
    /// Panics if the lock is poisoned.
    fn get_version_history_lock(
        &self,
    ) -> std::sync::RwLockWriteGuard<'_, crate::version::SuperVersions> {
        self.version_history.write().expect("lock is poisoned")
    }

    /// Returns the current value of the table ID counter.
    // NOTE(review): `get()` presumably peeks without advancing (cf.
    // `get_next_table_id` below, which delegates to TreeInner) — confirm
    // against SequenceNumberCounter.
    fn next_table_id(&self) -> TableId {
        self.0.table_id_counter.get()
    }

    /// Returns this tree's unique (process-wide) ID.
    fn id(&self) -> TreeId {
        self.id
    }

    /// Standard trees do not use blob files, so this is always 0.
    fn blob_file_count(&self) -> usize {
        0
    }

    /// Point-reads a key at the given snapshot, returning the raw internal entry.
    fn get_internal_entry(&self, key: &[u8], seqno: SeqNo) -> crate::Result<Option<InternalValue>> {
        // Pin the super version visible to this snapshot; the read lock is a
        // temporary and is released before the (possibly I/O-bound) lookup.
        let super_version = self
            .version_history
            .read()
            .expect("lock is poisoned")
            .get_version_for_snapshot(seqno);

        Self::get_internal_entry_from_version(&super_version, key, seqno)
    }
118
119    fn current_version(&self) -> Version {
120        self.version_history
121            .read()
122            .expect("poisoned")
123            .latest_version()
124            .version
125    }
126
    /// Acquires the flush lock, serializing memtable flushes.
    fn get_flush_lock(&self) -> std::sync::MutexGuard<'_, ()> {
        self.flush_lock.lock().expect("lock is poisoned")
    }

    #[cfg(feature = "metrics")]
    /// Returns the tree's metrics collector.
    fn metrics(&self) -> &Arc<crate::Metrics> {
        &self.0.metrics
    }

    /// Returns the number of versions in the free list awaiting cleanup.
    fn version_free_list_len(&self) -> usize {
        self.version_history
            .read()
            .expect("lock is poisoned")
            .free_list_len()
    }

    /// Returns an iterator over all KVs starting with `prefix`, at snapshot `seqno`.
    ///
    /// `index` optionally layers an ephemeral memtable (with its own seqno)
    /// on top of the tree state.
    fn prefix<K: AsRef<[u8]>>(
        &self,
        prefix: K,
        seqno: SeqNo,
        index: Option<(Arc<Memtable>, SeqNo)>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
        Box::new(
            self.create_prefix(&prefix, seqno, index)
                .map(|kv| IterGuardImpl::Standard(Guard(kv))),
        )
    }

    /// Returns an iterator over the given key range, at snapshot `seqno`.
    ///
    /// `index` optionally layers an ephemeral memtable (with its own seqno)
    /// on top of the tree state.
    fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
        &self,
        range: R,
        seqno: SeqNo,
        index: Option<(Arc<Memtable>, SeqNo)>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
        Box::new(
            self.create_range(&range, seqno, index)
                .map(|kv| IterGuardImpl::Standard(Guard(kv))),
        )
    }

    /// Returns the number of tombstones in the tree.
    ///
    /// Sums per-table counts of the current version; memtables are not included.
    fn tombstone_count(&self) -> u64 {
        self.current_version()
            .iter_tables()
            .map(Table::tombstone_count)
            .sum()
    }

    /// Returns the number of weak tombstones (single deletes) in the tree.
    fn weak_tombstone_count(&self) -> u64 {
        self.current_version()
            .iter_tables()
            .map(Table::weak_tombstone_count)
            .sum()
    }

    /// Returns the number of value entries that become reclaimable once weak tombstones can be GC'd.
    fn weak_tombstone_reclaimable_count(&self) -> u64 {
        self.current_version()
            .iter_tables()
            .map(Table::weak_tombstone_reclaimable)
            .sum()
    }
190
    /// Bulk-ingests a pre-sorted stream of key-value pairs directly into tables,
    /// bypassing the memtable.
    ///
    /// All items share a single sequence number drawn from `seqno_generator`;
    /// `visible_seqno` is raised afterwards so readers can see the ingested data.
    ///
    /// # Panics
    ///
    /// Panics if the input keys are not strictly ascending.
    fn ingest(
        &self,
        iter: impl Iterator<Item = (UserKey, UserValue)>,
        seqno_generator: &SequenceNumberCounter,
        visible_seqno: &SequenceNumberCounter,
    ) -> crate::Result<()> {
        use crate::tree::ingest::Ingestion;
        use std::time::Instant;

        // One seqno for the entire ingestion batch.
        let seqno = seqno_generator.next();

        // TODO: allow ingestion always, by flushing memtable

        let mut writer = Ingestion::new(self)?.with_seqno(seqno);

        let start = Instant::now();
        let mut count = 0;
        let mut last_key = None;

        for (key, value) in iter {
            // Enforce strictly ascending key order, which the table writer relies on.
            if let Some(last_key) = &last_key {
                assert!(
                    key > last_key,
                    "next key in bulk ingest was not greater than last key, last: {last_key:?}, next: {key:?}",
                );
            }
            last_key = Some(key.clone());

            writer.write(key, value)?;

            count += 1;
        }

        writer.finish()?;

        // Publish: make the batch's seqno visible to snapshots.
        visible_seqno.fetch_max(seqno + 1);

        log::info!("Ingested {count} items in {:?}", start.elapsed());

        Ok(())
    }
232
    /// Drops all items in the given key range by running a dedicated compaction.
    ///
    /// Returns early (as a no-op) if the range is detectably empty.
    fn drop_range<K: AsRef<[u8]>, R: RangeBounds<K>>(&self, range: R) -> crate::Result<()> {
        let (bounds, is_empty) = Self::range_bounds_to_owned_bounds(&range);

        if is_empty {
            return Ok(());
        }

        let strategy = Arc::new(crate::compaction::drop_range::Strategy::new(bounds));

        // IMPORTANT: Write lock so we can be the only compaction going on
        let _lock = self
            .0
            .major_compaction_lock
            .write()
            .expect("lock is poisoned");

        log::info!("Starting drop_range compaction");
        // MVCC GC watermark of 0: drop_range does not garbage-collect versions.
        self.inner_compact(strategy, 0)
    }

    /// Runs a major compaction, rewriting tables towards `target_size`.
    ///
    /// Versions at or below `seqno_threshold` become eligible for MVCC GC.
    #[doc(hidden)]
    fn major_compact(&self, target_size: u64, seqno_threshold: SeqNo) -> crate::Result<()> {
        let strategy = Arc::new(crate::compaction::major::Strategy::new(target_size));

        // IMPORTANT: Write lock so we can be the only compaction going on
        let _lock = self
            .0
            .major_compaction_lock
            .write()
            .expect("lock is poisoned");

        log::info!("Starting major compaction");
        self.inner_compact(strategy, seqno_threshold)
    }
267
    /// Returns the number of runs in level 0 (0 if the level does not exist).
    fn l0_run_count(&self) -> usize {
        self.current_version()
            .level(0)
            .map(|x| x.run_count())
            .unwrap_or_default()
    }

    /// Returns the size in bytes of the value for `key`, if it exists.
    fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<u32>> {
        #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
        Ok(self.get(key, seqno)?.map(|x| x.len() as u32))
    }

    /// Returns the total filter size over all tables, in bytes.
    fn filter_size(&self) -> usize {
        self.current_version()
            .iter_tables()
            .map(Table::filter_size)
            .sum()
    }

    /// Returns the total size of filters pinned in memory, in bytes.
    fn pinned_filter_size(&self) -> usize {
        self.current_version()
            .iter_tables()
            .map(Table::pinned_filter_size)
            .sum()
    }

    /// Returns the total size of block indexes pinned in memory, in bytes.
    fn pinned_block_index_size(&self) -> usize {
        self.current_version()
            .iter_tables()
            .map(Table::pinned_block_index_size)
            .sum()
    }

    /// Returns the number of sealed (frozen, not yet flushed) memtables.
    fn sealed_memtable_count(&self) -> usize {
        self.version_history
            .read()
            .expect("lock is poisoned")
            .latest_version()
            .sealed_memtables
            .len()
    }
309
    /// Writes a stream of internal values out as one or more new tables.
    ///
    /// All block/filter policies are evaluated at level 0, since flushed
    /// tables always land in L0. Returns the recovered [`Table`] handles
    /// (blob files are never produced by a standard tree, hence `None`).
    ///
    /// # Errors
    ///
    /// Returns error, if an IO error occurred.
    fn flush_to_tables(
        &self,
        stream: impl Iterator<Item = crate::Result<InternalValue>>,
    ) -> crate::Result<Option<(Vec<Table>, Option<Vec<BlobFile>>)>> {
        use crate::{file::TABLES_FOLDER, table::multi_writer::MultiWriter};
        use std::time::Instant;

        let start = Instant::now();

        let folder = self.config.path.join(TABLES_FOLDER);

        // Flush output goes to level 0, so sample every policy at level index 0.
        let data_block_size = self.config.data_block_size_policy.get(0);

        let data_block_restart_interval = self.config.data_block_restart_interval_policy.get(0);
        let index_block_restart_interval = self.config.index_block_restart_interval_policy.get(0);

        let data_block_compression = self.config.data_block_compression_policy.get(0);
        let index_block_compression = self.config.index_block_compression_policy.get(0);

        let data_block_hash_ratio = self.config.data_block_hash_ratio_policy.get(0);

        let index_partitioning = self.config.index_block_partitioning_policy.get(0);
        let filter_partitioning = self.config.filter_block_partitioning_policy.get(0);

        log::debug!(
            "Flushing memtable(s) to {}, data_block_restart_interval={data_block_restart_interval}, index_block_restart_interval={index_block_restart_interval}, data_block_size={data_block_size}, data_block_compression={data_block_compression}, index_block_compression={index_block_compression}",
            folder.display(),
        );

        // 64 MiB and 0 are presumably the target table size and level index —
        // TODO(review): confirm against MultiWriter::new's signature.
        let mut table_writer = MultiWriter::new(
            folder.clone(),
            self.table_id_counter.clone(),
            64 * 1_024 * 1_024,
            0,
        )?
        .use_data_block_restart_interval(data_block_restart_interval)
        .use_index_block_restart_interval(index_block_restart_interval)
        .use_data_block_compression(data_block_compression)
        .use_index_block_compression(index_block_compression)
        .use_data_block_size(data_block_size)
        .use_data_block_hash_ratio(data_block_hash_ratio)
        .use_bloom_policy({
            use crate::config::FilterPolicyEntry::{Bloom, None};
            use crate::table::filter::BloomConstructionPolicy;

            // No filter policy => 0 bits per key, i.e. effectively no bloom filter.
            match self.config.filter_policy.get(0) {
                Bloom(policy) => policy,
                None => BloomConstructionPolicy::BitsPerKey(0.0),
            }
        });

        if index_partitioning {
            table_writer = table_writer.use_partitioned_index();
        }
        if filter_partitioning {
            table_writer = table_writer.use_partitioned_filter();
        }

        for item in stream {
            table_writer.write(item?)?;
        }

        let result = table_writer.finish()?;

        log::debug!("Flushed memtable(s) in {:?}", start.elapsed());

        let pin_filter = self.config.filter_block_pinning_policy.get(0);
        let pin_index = self.config.index_block_pinning_policy.get(0);

        // Load tables
        let tables = result
            .into_iter()
            .map(|(table_id, checksum)| -> crate::Result<Table> {
                Table::recover(
                    folder.join(table_id.to_string()),
                    checksum,
                    self.id,
                    self.config.cache.clone(),
                    self.config.descriptor_table.clone(),
                    pin_filter,
                    pin_index,
                    #[cfg(feature = "metrics")]
                    self.metrics.clone(),
                )
            })
            .collect::<crate::Result<Vec<_>>>()?;

        Ok(Some((tables, None)))
    }
399
    /// Atomically publishes freshly written tables (and blob files) as a new
    /// L0 run, and releases the sealed memtables they were flushed from.
    ///
    /// Lock order: compaction state first, then version history (write).
    #[expect(clippy::significant_drop_tightening)]
    fn register_tables(
        &self,
        tables: &[Table],
        blob_files: Option<&[BlobFile]>,
        frag_map: Option<crate::blob_tree::FragmentationMap>,
        sealed_memtables_to_delete: &[crate::tree::inner::MemtableId],
        gc_watermark: SeqNo,
    ) -> crate::Result<()> {
        log::trace!(
            "Registering {} tables, {} blob files",
            tables.len(),
            blob_files.map(<[BlobFile]>::len).unwrap_or_default(),
        );

        let mut _compaction_state = self.compaction_state.lock().expect("lock is poisoned");
        let mut version_lock = self.version_history.write().expect("lock is poisoned");

        version_lock.upgrade_version(
            &self.config.path,
            |current| {
                // Copy-on-write: derive the next version from the current one.
                let mut copy = current.clone();

                copy.version = copy.version.with_new_l0_run(
                    tables,
                    blob_files,
                    // Only carry a fragmentation map if it has any entries.
                    frag_map.filter(|x| !x.is_empty()),
                );

                // The flushed memtables' data now lives in tables; drop them.
                for &table_id in sealed_memtables_to_delete {
                    log::trace!("releasing sealed memtable #{table_id}");
                    copy.sealed_memtables = Arc::new(copy.sealed_memtables.remove(table_id));
                }

                Ok(copy)
            },
            &self.config.seqno,
        )?;

        // Best-effort cleanup of old versions; failure is logged, not fatal.
        if let Err(e) = version_lock.maintenance(&self.config.path, gc_watermark) {
            log::warn!("Version GC failed: {e:?}");
        }

        Ok(())
    }
445
    /// Replaces the active memtable with a fresh, empty one.
    // NOTE(review): `latest_version()` appears to return an owned copy elsewhere
    // in this file (see `rotate_memtable`, which clones and re-appends); if so,
    // this assignment would mutate a discarded temporary and be lost — confirm
    // that `latest_version()` hands out a mutable reference here.
    fn clear_active_memtable(&self) {
        self.version_history
            .write()
            .expect("lock is poisoned")
            .latest_version()
            .active_memtable = Arc::new(Memtable::new(self.memtable_id_counter.next()));
    }

    /// Replaces the active memtable with `memtable` (e.g. recovered from a WAL).
    // NOTE(review): same `latest_version()` concern as `clear_active_memtable`.
    fn set_active_memtable(&self, memtable: Memtable) {
        self.version_history
            .write()
            .expect("lock is poisoned")
            .latest_version()
            .active_memtable = Arc::new(memtable);
    }

    /// Appends a memtable to the set of sealed (flush-pending) memtables.
    fn add_sealed_memtable(&self, memtable: Arc<Memtable>) {
        let mut version_lock = self.version_history.write().expect("lock is poisoned");
        version_lock.append_sealed_memtable(memtable);
    }
466
    /// Runs a (non-major) compaction using the given strategy.
    ///
    /// Versions at or below `seqno_threshold` become eligible for MVCC GC.
    fn compact(
        &self,
        strategy: Arc<dyn CompactionStrategy>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        // NOTE: Read lock major compaction lock
        // That way, if a major compaction is running, we cannot proceed
        // But in general, parallel (non-major) compactions can occur
        let _lock = self
            .0
            .major_compaction_lock
            .read()
            .expect("lock is poisoned");

        self.inner_compact(strategy, seqno_threshold)
    }

    /// Fetches and advances the table ID counter.
    fn get_next_table_id(&self) -> TableId {
        self.0.get_next_table_id()
    }

    /// Returns the tree's configuration.
    fn tree_config(&self) -> &Config {
        &self.config
    }

    /// Returns the approximate size of the active memtable in bytes.
    fn active_memtable_size(&self) -> u64 {
        use std::sync::atomic::Ordering::Acquire;

        self.version_history
            .read()
            .expect("lock is poisoned")
            .latest_version()
            .active_memtable
            .approximate_size
            .load(Acquire)
    }

    /// This is a standard (non-KV-separated) tree.
    fn tree_type(&self) -> crate::TreeType {
        crate::TreeType::Standard
    }
507
    /// Seals the active memtable, moving it into the sealed set, and installs
    /// a fresh empty active memtable.
    ///
    /// Returns the sealed memtable, or `None` if the active memtable was empty
    /// (nothing to rotate).
    #[expect(clippy::significant_drop_tightening)]
    fn rotate_memtable(&self) -> Option<Arc<Memtable>> {
        // Hold the write lock across the whole swap so no writes slip between
        // reading the current super version and appending the new one.
        let mut version_history_lock = self.version_history.write().expect("lock is poisoned");
        let super_version = version_history_lock.latest_version();

        if super_version.active_memtable.is_empty() {
            return None;
        }

        let yanked_memtable = super_version.active_memtable;

        // Build the successor super version: new seqno, empty active memtable,
        // and the yanked memtable appended to the sealed set.
        let mut copy = version_history_lock.latest_version();
        copy.seqno = self.config.seqno.next();
        copy.active_memtable = Arc::new(Memtable::new(self.memtable_id_counter.next()));
        copy.sealed_memtables =
            Arc::new(super_version.sealed_memtables.add(yanked_memtable.clone()));

        version_history_lock.append_version(copy);

        log::trace!(
            "rotate: added memtable id={} to sealed memtables",
            yanked_memtable.id,
        );

        Some(yanked_memtable)
    }
534
    /// Returns the total number of tables across all levels.
    fn table_count(&self) -> usize {
        self.current_version().table_count()
    }

    /// Returns the number of tables in level `idx`, or `None` if the level does not exist.
    fn level_table_count(&self, idx: usize) -> Option<usize> {
        self.current_version().level(idx).map(|x| x.table_count())
    }
542
543    #[expect(clippy::significant_drop_tightening)]
544    fn approximate_len(&self) -> usize {
545        let super_version = self
546            .version_history
547            .read()
548            .expect("lock is poisoned")
549            .latest_version();
550
551        let tables_item_count = self
552            .current_version()
553            .iter_tables()
554            .map(|x| x.metadata.item_count)
555            .sum::<u64>();
556
557        let memtable_count = super_version.active_memtable.len() as u64;
558        let sealed_count = super_version
559            .sealed_memtables
560            .iter()
561            .map(|mt| mt.len())
562            .sum::<usize>() as u64;
563
564        (memtable_count + sealed_count + tables_item_count)
565            .try_into()
566            .expect("should not be too large")
567    }
568
    /// Returns the on-disk size of all tables, summed over all levels.
    fn disk_space(&self) -> u64 {
        self.current_version()
            .iter_levels()
            .map(super::version::Level::size)
            .sum()
    }

    /// Returns the highest sequence number present in any (active or sealed)
    /// memtable, or `None` if all memtables are empty.
    #[expect(clippy::significant_drop_tightening)]
    fn get_highest_memtable_seqno(&self) -> Option<SeqNo> {
        let version = self
            .version_history
            .read()
            .expect("lock is poisoned")
            .latest_version();

        let active = version.active_memtable.get_highest_seqno();

        let sealed = version
            .sealed_memtables
            .iter()
            .map(|mt| mt.get_highest_seqno())
            .max()
            .flatten();

        active.max(sealed)
    }

    /// Returns the highest sequence number persisted in any table,
    /// or `None` if there are no tables.
    fn get_highest_persisted_seqno(&self) -> Option<SeqNo> {
        self.current_version()
            .iter_tables()
            .map(Table::get_highest_seqno)
            .max()
    }
602
603    fn get<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<UserValue>> {
604        Ok(self
605            .get_internal_entry(key.as_ref(), seqno)?
606            .map(|x| x.value))
607    }
608
609    fn insert<K: Into<UserKey>, V: Into<UserValue>>(
610        &self,
611        key: K,
612        value: V,
613        seqno: SeqNo,
614    ) -> (u64, u64) {
615        let value = InternalValue::from_components(key, value, seqno, ValueType::Value);
616        self.append_entry(value)
617    }
618
619    fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
620        let value = InternalValue::new_tombstone(key, seqno);
621        self.append_entry(value)
622    }
623
624    fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
625        let value = InternalValue::new_weak_tombstone(key, seqno);
626        self.append_entry(value)
627    }
628}
629
630impl Tree {
    #[doc(hidden)]
    /// Builds a merged range iterator over a whole super version (memtables +
    /// tables), at snapshot `seqno`, with an optional ephemeral memtable on top.
    ///
    /// Takes `version` by value and converts the borrowed range bounds into
    /// owned keys so that the returned iterator is `'static`.
    pub fn create_internal_range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
        version: SuperVersion,
        range: &'a R,
        seqno: SeqNo,
        ephemeral: Option<(Arc<Memtable>, SeqNo)>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<InternalValue>> + 'static {
        use crate::range::{IterState, TreeIter};
        use std::ops::Bound::{self, Excluded, Included, Unbounded};

        let lo: Bound<UserKey> = match range.start_bound() {
            Included(x) => Included(x.as_ref().into()),
            Excluded(x) => Excluded(x.as_ref().into()),
            Unbounded => Unbounded,
        };

        let hi: Bound<UserKey> = match range.end_bound() {
            Included(x) => Included(x.as_ref().into()),
            Excluded(x) => Excluded(x.as_ref().into()),
            Unbounded => Unbounded,
        };

        let bounds: (Bound<UserKey>, Bound<UserKey>) = (lo, hi);

        let iter_state = { IterState { version, ephemeral } };

        TreeIter::create_range(iter_state, bounds, seqno)
    }
659
    /// Point-reads `key` from a pinned super version.
    ///
    /// Lookup order (newest data first): active memtable, then sealed
    /// memtables, then on-disk tables. Tombstoned entries resolve to `None`.
    pub(crate) fn get_internal_entry_from_version(
        super_version: &SuperVersion,
        key: &[u8],
        seqno: SeqNo,
    ) -> crate::Result<Option<InternalValue>> {
        if let Some(entry) = super_version.active_memtable.get(key, seqno) {
            return Ok(ignore_tombstone_value(entry));
        }

        // Now look in sealed memtables
        if let Some(entry) =
            Self::get_internal_entry_from_sealed_memtables(super_version, key, seqno)
        {
            return Ok(ignore_tombstone_value(entry));
        }

        // Now look in tables... this may involve disk I/O
        Self::get_internal_entry_from_tables(&super_version.version, key, seqno)
    }
679
    /// Point-reads `key` from the on-disk tables of a version.
    ///
    /// Visits levels top-down; within a run, the first table whose key range
    /// contains the key is probed. Returns on the first (newest) hit.
    fn get_internal_entry_from_tables(
        version: &Version,
        key: &[u8],
        seqno: SeqNo,
    ) -> crate::Result<Option<InternalValue>> {
        // NOTE: Create key hash for hash sharing
        // https://fjall-rs.github.io/post/bloom-filter-hash-sharing/
        let key_hash = crate::table::filter::standard_bloom::Builder::get_hash(key);

        for level in version.iter_levels() {
            for run in level.iter() {
                // NOTE: Based on benchmarking, binary search is only worth it with ~4 tables
                if run.len() >= 4 {
                    if let Some(table) = run.get_for_key(key) {
                        if let Some(item) = table.get(key, seqno, key_hash)? {
                            return Ok(ignore_tombstone_value(item));
                        }
                    }
                } else {
                    // NOTE: Fallback to linear search
                    for table in run.iter() {
                        if !table.is_key_in_key_range(key) {
                            continue;
                        }

                        if let Some(item) = table.get(key, seqno, key_hash)? {
                            return Ok(ignore_tombstone_value(item));
                        }
                    }
                }
            }
        }

        Ok(None)
    }
715
716    fn get_internal_entry_from_sealed_memtables(
717        super_version: &SuperVersion,
718        key: &[u8],
719        seqno: SeqNo,
720    ) -> Option<InternalValue> {
721        for mt in super_version.sealed_memtables.iter().rev() {
722            if let Some(entry) = mt.get(key, seqno) {
723                return Some(entry);
724            }
725        }
726
727        None
728    }
729
    /// Returns the super version that snapshot `seqno` should read from.
    pub(crate) fn get_version_for_snapshot(&self, seqno: SeqNo) -> SuperVersion {
        self.version_history
            .read()
            .expect("lock is poisoned")
            .get_version_for_snapshot(seqno)
    }
736
737    /// Normalizes a user-provided range into owned `Bound<Slice>` values.
738    ///
739    /// Returns a tuple containing:
740    /// - the `OwnedBounds` that mirror the original bounds semantics (including
741    ///   inclusive/exclusive markers and unbounded endpoints), and
742    /// - a `bool` flag indicating whether the normalized range is logically
743    ///   empty (e.g., when the lower bound is greater than the upper bound).
744    ///
745    /// Callers can use the flag to detect empty ranges and skip further work
746    /// while still having access to the normalized bounds for non-empty cases.
747    fn range_bounds_to_owned_bounds<K: AsRef<[u8]>, R: RangeBounds<K>>(
748        range: &R,
749    ) -> (OwnedBounds, bool) {
750        use Bound::{Excluded, Included, Unbounded};
751
752        let start = match range.start_bound() {
753            Included(key) => Included(Slice::from(key.as_ref())),
754            Excluded(key) => Excluded(Slice::from(key.as_ref())),
755            Unbounded => Unbounded,
756        };
757
758        let end = match range.end_bound() {
759            Included(key) => Included(Slice::from(key.as_ref())),
760            Excluded(key) => Excluded(Slice::from(key.as_ref())),
761            Unbounded => Unbounded,
762        };
763
764        let is_empty =
765            if let (Included(lo) | Excluded(lo), Included(hi) | Excluded(hi)) = (&start, &end) {
766                lo.as_ref() > hi.as_ref()
767            } else {
768                false
769            };
770
771        (OwnedBounds { start, end }, is_empty)
772    }
773
774    /// Opens an LSM-tree in the given directory.
775    ///
776    /// Will recover previous state if the folder was previously
777    /// occupied by an LSM-tree, including the previous configuration.
778    /// If not, a new tree will be initialized with the given config.
779    ///
780    /// After recovering a previous state, use [`Tree::set_active_memtable`]
781    /// to fill the memtable with data from a write-ahead log for full durability.
782    ///
783    /// # Errors
784    ///
785    /// Returns error, if an IO error occurred.
786    pub(crate) fn open(config: Config) -> crate::Result<Self> {
787        log::debug!("Opening LSM-tree at {}", config.path.display());
788
789        // Check for old version
790        if config.path.join("version").try_exists()? {
791            log::error!("It looks like you are trying to open a V1 database - the database needs a manual migration, however a migration tool is not provided, as V1 is extremely outdated.");
792            return Err(crate::Error::InvalidVersion(FormatVersion::V1.into()));
793        }
794
795        let tree = if config.path.join(CURRENT_VERSION_FILE).try_exists()? {
796            Self::recover(config)
797        } else {
798            Self::create_new(config)
799        }?;
800
801        Ok(tree)
802    }
803
    /// Returns `true` if there are some tables that are being compacted.
    ///
    /// Checks whether the compaction state's hidden set (tables currently
    /// claimed by a compaction) is non-empty.
    #[doc(hidden)]
    #[must_use]
    pub fn is_compacting(&self) -> bool {
        !self
            .compaction_state
            .lock()
            .expect("lock is poisoned")
            .hidden_set()
            .is_empty()
    }
815
    /// Runs one compaction with the given strategy.
    ///
    /// `mvcc_gc_watermark` is the highest seqno whose superseded versions may
    /// be garbage-collected during the compaction.
    fn inner_compact(
        &self,
        strategy: Arc<dyn CompactionStrategy>,
        mvcc_gc_watermark: SeqNo,
    ) -> crate::Result<()> {
        use crate::compaction::worker::{do_compaction, Options};

        let mut opts = Options::from_tree(self, strategy);
        opts.mvcc_gc_watermark = mvcc_gc_watermark;

        do_compaction(&opts)?;

        log::debug!("Compaction run over");

        Ok(())
    }
832
    #[doc(hidden)]
    #[must_use]
    /// Creates an iterator over the whole tree at snapshot `seqno`.
    pub fn create_iter(
        &self,
        seqno: SeqNo,
        ephemeral: Option<(Arc<Memtable>, SeqNo)>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        // Full scan = unbounded range.
        self.create_range::<UserKey, _>(&.., seqno, ephemeral)
    }

    #[doc(hidden)]
    /// Creates a range iterator over user key-value pairs at snapshot `seqno`.
    ///
    /// The version history read lock is only held while pinning the super
    /// version; iteration itself runs lock-free on that snapshot.
    pub fn create_range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
        &self,
        range: &'a R,
        seqno: SeqNo,
        ephemeral: Option<(Arc<Memtable>, SeqNo)>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        let version_history_lock = self.version_history.read().expect("lock is poisoned");
        let super_version = version_history_lock.get_version_for_snapshot(seqno);

        // Strip the internal key metadata, exposing only (user key, value).
        Self::create_internal_range(super_version, range, seqno, ephemeral).map(|item| match item {
            Ok(kv) => Ok((kv.key.user_key, kv.value)),
            Err(e) => Err(e),
        })
    }

    #[doc(hidden)]
    /// Creates an iterator over all keys starting with `prefix`, at snapshot `seqno`.
    pub fn create_prefix<'a, K: AsRef<[u8]> + 'a>(
        &self,
        prefix: K,
        seqno: SeqNo,
        ephemeral: Option<(Arc<Memtable>, SeqNo)>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        use crate::range::prefix_to_range;

        // A prefix scan is just a range scan over [prefix, successor-of-prefix).
        let range = prefix_to_range(prefix.as_ref());
        self.create_range(&range, seqno, ephemeral)
    }
871
    /// Adds an item to the active memtable.
    ///
    /// Returns the added item's size and new size of the memtable.
    ///
    /// A read lock suffices here: `Memtable::insert` is called through a
    /// shared reference, so the memtable handles its own synchronization.
    #[doc(hidden)]
    #[must_use]
    pub fn append_entry(&self, value: InternalValue) -> (u64, u64) {
        self.version_history
            .read()
            .expect("lock is poisoned")
            .latest_version()
            .active_memtable
            .insert(value)
    }
885
886    /// Recovers previous state, by loading the level manifest, tables and blob files.
887    ///
888    /// # Errors
889    ///
890    /// Returns error, if an IO error occurred.
891    fn recover(mut config: Config) -> crate::Result<Self> {
892        use crate::stop_signal::StopSignal;
893        use inner::get_next_tree_id;
894
895        log::info!("Recovering LSM-tree at {}", config.path.display());
896
897        // let manifest = {
898        //     let manifest_path = config.path.join(MANIFEST_FILE);
899        //     let reader = sfa::Reader::new(&manifest_path)?;
900        //     Manifest::decode_from(&manifest_path, &reader)?
901        // };
902
903        // if manifest.version != FormatVersion::V3 {
904        //     return Err(crate::Error::InvalidVersion(manifest.version.into()));
905        // }
906
907        let tree_id = get_next_tree_id();
908
909        #[cfg(feature = "metrics")]
910        let metrics = Arc::new(Metrics::default());
911
912        let version = Self::recover_levels(
913            &config.path,
914            tree_id,
915            &config.cache,
916            &config.descriptor_table,
917            #[cfg(feature = "metrics")]
918            &metrics,
919        )?;
920
921        {
922            let manifest_path = config.path.join(format!("v{}", version.id()));
923            let reader = sfa::Reader::new(&manifest_path)?;
924            let manifest = Manifest::decode_from(&manifest_path, &reader)?;
925
926            if manifest.version != FormatVersion::V3 {
927                return Err(crate::Error::InvalidVersion(manifest.version.into()));
928            }
929
930            let requested_tree_type = match config.kv_separation_opts {
931                Some(_) => crate::TreeType::Blob,
932                None => crate::TreeType::Standard,
933            };
934
935            if version.tree_type() != requested_tree_type {
936                log::error!(
937                    "Tried to open a {requested_tree_type:?}Tree, but the existing tree is of type {:?}Tree. This indicates a misconfiguration or corruption.",
938                    version.tree_type(),
939                );
940                return Err(crate::Error::Unrecoverable);
941            }
942
943            // IMPORTANT: Restore persisted config
944            config.level_count = manifest.level_count;
945        }
946
947        let highest_table_id = version
948            .iter_tables()
949            .map(Table::id)
950            .max()
951            .unwrap_or_default();
952
953        let inner = TreeInner {
954            id: tree_id,
955            memtable_id_counter: SequenceNumberCounter::default(),
956            table_id_counter: SequenceNumberCounter::new(highest_table_id + 1),
957            blob_file_id_counter: SequenceNumberCounter::default(),
958            version_history: Arc::new(RwLock::new(SuperVersions::new(version))),
959            stop_signal: StopSignal::default(),
960            config,
961            major_compaction_lock: RwLock::default(),
962            flush_lock: Mutex::default(),
963            compaction_state: Arc::new(Mutex::new(
964                crate::compaction::state::CompactionState::default(),
965            )),
966
967            #[cfg(feature = "metrics")]
968            metrics,
969        };
970
971        Ok(Self(Arc::new(inner)))
972    }
973
974    /// Creates a new LSM-tree in a directory.
975    fn create_new(config: Config) -> crate::Result<Self> {
976        use crate::file::{fsync_directory, TABLES_FOLDER};
977        use std::fs::create_dir_all;
978
979        let path = config.path.clone();
980        log::trace!("Creating LSM-tree at {}", path.display());
981
982        create_dir_all(&path)?;
983
984        let table_folder_path = path.join(TABLES_FOLDER);
985        create_dir_all(&table_folder_path)?;
986
987        // // Create manifest
988        // {
989        //     let mut writer = sfa::Writer::new_at_path(manifest_path)?;
990
991        //     Manifest {
992        //         version: FormatVersion::V3,
993        //         level_count: config.level_count,
994        //         tree_type: if config.kv_separation_opts.is_some() {
995        //             TreeType::Blob
996        //         } else {
997        //             TreeType::Standard
998        //         },
999        //     }
1000        //     .encode_into(&mut writer)?;
1001
1002        //     writer.finish()?;
1003        // }
1004
1005        // IMPORTANT: fsync folders on Unix
1006        fsync_directory(&table_folder_path)?;
1007        fsync_directory(&path)?;
1008
1009        let inner = TreeInner::create_new(config)?;
1010        Ok(Self(Arc::new(inner)))
1011    }
1012
    /// Recovers the level manifest, loading all tables from disk.
    ///
    /// Walks the tables folder, recovers every table referenced by the
    /// persisted version (verifying level placement and checksums), then
    /// — only after recovery fully succeeded — deletes orphaned table and
    /// blob files and stale version manifests.
    ///
    /// # Errors
    ///
    /// Returns error, if an IO error occurred, a table file name is
    /// malformed, or a table referenced by the version is missing on disk.
    fn recover_levels<P: AsRef<Path>>(
        tree_path: P,
        tree_id: TreeId,
        cache: &Arc<Cache>,
        descriptor_table: &Arc<crate::DescriptorTable>,
        #[cfg(feature = "metrics")] metrics: &Arc<Metrics>,
    ) -> crate::Result<Version> {
        use crate::{file::fsync_directory, file::TABLES_FOLDER, TableId};

        let tree_path = tree_path.as_ref();

        // Read back the persisted version state (table & blob file IDs per level)
        let recovery = recover(tree_path)?;

        // Flatten the per-level runs into a lookup table:
        // table ID -> (level index, expected checksum)
        let table_map = {
            let mut result: crate::HashMap<TableId, (u8 /* Level index */, Checksum)> =
                crate::HashMap::default();

            for (level_idx, table_ids) in recovery.table_ids.iter().enumerate() {
                for run in table_ids {
                    for &(table_id, checksum) in run {
                        #[expect(
                            clippy::expect_used,
                            reason = "there are always less than 256 levels"
                        )]
                        result.insert(
                            table_id,
                            (
                                level_idx
                                    .try_into()
                                    .expect("there are less than 256 levels"),
                                checksum,
                            ),
                        );
                    }
                }
            }

            result
        };

        let cnt = table_map.len();

        log::debug!("Recovering {cnt} tables from {}", tree_path.display());

        // Emit a progress log line every N tables, scaled to the total count
        let progress_mod = match cnt {
            _ if cnt <= 20 => 1,
            _ if cnt <= 100 => 10,
            _ => 100,
        };

        let mut tables = vec![];

        let table_base_folder = tree_path.join(TABLES_FOLDER);

        // The tables folder may not exist yet; create and fsync it
        if !table_base_folder.try_exists()? {
            std::fs::create_dir_all(&table_base_folder)?;
            fsync_directory(&table_base_folder)?;
        }

        // Table files on disk that are not referenced by the version;
        // collected now, deleted only after recovery fully succeeded
        let mut orphaned_tables = vec![];

        for (idx, dirent) in std::fs::read_dir(&table_base_folder)?.enumerate() {
            let dirent = dirent?;
            let file_name = dirent.file_name();

            // https://en.wikipedia.org/wiki/.DS_Store
            if file_name == ".DS_Store" {
                continue;
            }

            // https://en.wikipedia.org/wiki/AppleSingle_and_AppleDouble_formats
            if file_name.to_string_lossy().starts_with("._") {
                continue;
            }

            // Table file names are numeric IDs, so they must be valid UTF-8
            let table_file_name = file_name.to_str().ok_or_else(|| {
                log::error!("invalid table file name {}", file_name.display());
                crate::Error::Unrecoverable
            })?;

            let table_file_path = dirent.path();
            assert!(!table_file_path.is_dir());

            log::debug!("Recovering table from {}", table_file_path.display());

            let table_id = table_file_name.parse::<TableId>().map_err(|e| {
                log::error!("invalid table file name {table_file_name:?}: {e:?}");
                crate::Error::Unrecoverable
            })?;

            if let Some(&(level_idx, checksum)) = table_map.get(&table_id) {
                let table = Table::recover(
                    table_file_path,
                    checksum,
                    tree_id,
                    cache.clone(),
                    descriptor_table.clone(),
                    level_idx <= 1, // TODO: look at configuration
                    level_idx <= 2, // TODO: look at configuration
                    #[cfg(feature = "metrics")]
                    metrics.clone(),
                )?;

                log::debug!("Recovered table from {:?}", table.path);

                tables.push(table);

                // NOTE(review): `idx` counts directory entries (including
                // skipped and orphaned files), not recovered tables, so the
                // progress output can be slightly off — confirm if intended
                if idx % progress_mod == 0 {
                    log::debug!("Recovered {idx}/{cnt} tables");
                }
            } else {
                // Not referenced by the version -> schedule for deletion below
                orphaned_tables.push(table_file_path);
            }
        }

        // At least one table referenced by the version is missing on disk
        if tables.len() < cnt {
            log::error!(
                "Recovered less tables than expected: {:?}",
                table_map.keys(),
            );
            return Err(crate::Error::Unrecoverable);
        }

        log::debug!("Successfully recovered {} tables", tables.len());

        let (blob_files, orphaned_blob_files) = crate::vlog::recover_blob_files(
            &tree_path.join(crate::file::BLOBS_FOLDER),
            &recovery.blob_file_ids,
        )?;

        let version = Version::from_recovery(recovery, &tables, &blob_files)?;

        // NOTE: Cleanup old versions
        // But only after we definitely recovered the latest version
        Self::cleanup_orphaned_version(tree_path, version.id())?;

        for table_path in orphaned_tables {
            log::debug!("Deleting orphaned table {}", table_path.display());
            std::fs::remove_file(&table_path)?;
        }

        for blob_file_path in orphaned_blob_files {
            log::debug!("Deleting orphaned blob file {}", blob_file_path.display());
            std::fs::remove_file(&blob_file_path)?;
        }

        Ok(version)
    }
1162
1163    fn cleanup_orphaned_version(
1164        path: &Path,
1165        latest_version_id: crate::version::VersionId,
1166    ) -> crate::Result<()> {
1167        let version_str = format!("v{latest_version_id}");
1168
1169        for file in std::fs::read_dir(path)? {
1170            let dirent = file?;
1171
1172            if dirent.file_type()?.is_dir() {
1173                continue;
1174            }
1175
1176            let name = dirent.file_name();
1177
1178            if name.to_string_lossy().starts_with('v') && *name != *version_str {
1179                log::trace!("Cleanup orphaned version {}", name.display());
1180                std::fs::remove_file(dirent.path())?;
1181            }
1182        }
1183
1184        Ok(())
1185    }
1186}