lsm_tree/tree/mod.rs

// Copyright (c) 2024-present, fjall-rs
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)

pub mod ingest;
pub mod inner;
mod sealed;

use crate::{
    blob_tree::FragmentationMap,
    compaction::{drop_range::OwnedBounds, state::CompactionState, CompactionStrategy},
    config::Config,
    file::BLOBS_FOLDER,
    format_version::FormatVersion,
    iter_guard::{IterGuard, IterGuardImpl},
    manifest::Manifest,
    memtable::Memtable,
    slice::Slice,
    table::Table,
    tree::inner::SuperVersion,
    value::InternalValue,
    version::{recovery::recover, Version, VersionId},
    vlog::BlobFile,
    AbstractTree, Cache, Checksum, DescriptorTable, KvPair, SeqNo, SequenceNumberCounter, TableId,
    TreeType, UserKey, UserValue, ValueType,
};
use inner::{MemtableId, TreeId, TreeInner};
use std::{
    ops::{Bound, RangeBounds},
    path::Path,
    sync::{atomic::AtomicU64, Arc, Mutex, RwLock},
};

#[cfg(feature = "metrics")]
use crate::metrics::Metrics;

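/// Guard around a key-value pair (or an iteration error) yielded by tree iterators.
///
/// Accessor methods consume the guard and propagate the error, if any.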
pub struct Guard(crate::Result<(UserKey, UserValue)>);

impl IterGuard for Guard {
    fn key(self) -> crate::Result<UserKey> {
        self.0.map(|(k, _)| k)
    }

    fn size(self) -> crate::Result<u32> {
        #[expect(clippy::cast_possible_truncation, reason = "value lengths always fit in u32")]
        self.into_inner().map(|(_, v)| v.len() as u32)
    }

    fn into_inner(self) -> crate::Result<(UserKey, UserValue)> {
        self.0
    }
}

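/// Maps tombstones to `None` so point reads report deleted keys as absent.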
fn ignore_tombstone_value(item: InternalValue) -> Option<InternalValue> {
    if item.is_tombstone() {
        None
    } else {
        Some(item)
    }
}

/// A log-structured merge tree (LSM-tree/LSMT)
#[derive(Clone)]
pub struct Tree(#[doc(hidden)] pub Arc<TreeInner>);

impl std::ops::Deref for Tree {
    type Target = TreeInner;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl AbstractTree for Tree {
    fn next_table_id(&self) -> TableId {
        self.0
            .table_id_counter
            .load(std::sync::atomic::Ordering::Relaxed)
    }

    fn id(&self) -> TreeId {
        self.id
    }

    #[expect(clippy::significant_drop_tightening)]
    fn get_internal_entry(&self, key: &[u8], seqno: SeqNo) -> crate::Result<Option<InternalValue>> {
        let version_lock = self.super_version.read().expect("lock is poisoned");

        if let Some(entry) = version_lock.active_memtable.get(key, seqno) {
            return Ok(ignore_tombstone_value(entry));
        }

        // Now look in sealed memtables
        if let Some(entry) =
            Self::get_internal_entry_from_sealed_memtables(&version_lock, key, seqno)
        {
            return Ok(ignore_tombstone_value(entry));
        }

        // Now look in tables... this may involve disk I/O
        Self::get_internal_entry_from_tables(&version_lock, key, seqno)
    }

    fn current_version(&self) -> Version {
        self.super_version.read().expect("lock is poisoned").version.clone()
    }

    fn flush_active_memtable(&self, seqno_threshold: SeqNo) -> crate::Result<Option<Table>> {
        log::debug!("Flushing active memtable");

        let Some((table_id, yanked_memtable)) = self.rotate_memtable() else {
            return Ok(None);
        };

        let Some((table, _)) = self.flush_memtable(table_id, &yanked_memtable, seqno_threshold)?
        else {
            return Ok(None);
        };
        self.register_tables(std::slice::from_ref(&table), None, None, seqno_threshold)?;

        Ok(Some(table))
    }

    #[cfg(feature = "metrics")]
    fn metrics(&self) -> &Arc<crate::Metrics> {
        &self.0.metrics
    }

    fn version_free_list_len(&self) -> usize {
        self.compaction_state
            .lock()
            .expect("lock is poisoned")
            .version_free_list_len()
    }

    fn prefix<K: AsRef<[u8]>>(
        &self,
        prefix: K,
        seqno: SeqNo,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl<'_>> + '_> {
        Box::new(
            self.create_prefix(&prefix, seqno, index)
                .map(|kv| IterGuardImpl::Standard(Guard(kv))),
        )
    }

    fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
        &self,
        range: R,
        seqno: SeqNo,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl<'_>> + '_> {
        Box::new(
            self.create_range(&range, seqno, index)
                .map(|kv| IterGuardImpl::Standard(Guard(kv))),
        )
    }

    /// Returns the number of tombstones in the tree.
    fn tombstone_count(&self) -> u64 {
        self.current_version()
            .iter_tables()
            .map(Table::tombstone_count)
            .sum()
    }

    /// Returns the number of weak tombstones (single deletes) in the tree.
    fn weak_tombstone_count(&self) -> u64 {
        self.current_version()
            .iter_tables()
            .map(Table::weak_tombstone_count)
            .sum()
    }

    /// Returns the number of value entries that become reclaimable once weak tombstones can be GC'd.
    fn weak_tombstone_reclaimable_count(&self) -> u64 {
        self.current_version()
            .iter_tables()
            .map(Table::weak_tombstone_reclaimable)
            .sum()
    }

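    // Bulk ingestion expects keys in strictly ascending order; a hypothetical
    // call site (counter names are illustrative):
    //
    //     let seqnos = SequenceNumberCounter::default();
    //     let visible = SequenceNumberCounter::default();
    //     tree.ingest(sorted_pairs.into_iter(), &seqnos, &visible)?;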
    fn ingest(
        &self,
        iter: impl Iterator<Item = (UserKey, UserValue)>,
        seqno_generator: &SequenceNumberCounter,
        visible_seqno: &SequenceNumberCounter,
    ) -> crate::Result<()> {
        use crate::tree::ingest::Ingestion;
        use std::time::Instant;

        // // TODO: 3.0.0 ... hmmmm
        // let global_lock = self.super_version.write().expect("lock is poisoned");

        let seqno = seqno_generator.next();

        // TODO: allow ingestion always, by flushing memtable
        // assert!(
        //     global_lock.active_memtable.is_empty(),
        //     "can only perform bulk ingestion with empty memtable(s)",
        // );
        // assert!(
        //     global_lock.sealed_memtables.len() == 0,
        //     "can only perform bulk ingestion with empty memtable(s)",
        // );

        let mut writer = Ingestion::new(self)?.with_seqno(seqno);

        let start = Instant::now();
        let mut count = 0;
        let mut last_key = None;

        #[expect(clippy::explicit_counter_loop)]
        for (key, value) in iter {
            if let Some(last_key) = &last_key {
                assert!(
                    key > last_key,
                    "next key in bulk ingest was not greater than last key, last: {last_key:?}, next: {key:?}",
                );
            }
            last_key = Some(key.clone());

            writer.write(key, value)?;

            count += 1;
        }

        writer.finish()?;

        visible_seqno.fetch_max(seqno + 1);

        log::info!("Ingested {count} items in {:?}", start.elapsed());

        Ok(())
    }

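    // Drops a whole key range in one dedicated compaction; a usage sketch
    // (bounds are illustrative):
    //
    //     tree.drop_range("a".."k")?;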
    fn drop_range<K: AsRef<[u8]>, R: RangeBounds<K>>(&self, range: R) -> crate::Result<()> {
        let (bounds, is_empty) = Self::range_bounds_to_owned_bounds(&range);

        if is_empty {
            return Ok(());
        }

        let strategy = Arc::new(crate::compaction::drop_range::Strategy::new(bounds));

        // IMPORTANT: Take the write lock so we are the only running compaction
        let _lock = self
            .0
            .major_compaction_lock
            .write()
            .expect("lock is poisoned");

        log::info!("Starting drop_range compaction");
        self.inner_compact(strategy, 0)
    }

    #[doc(hidden)]
    fn major_compact(&self, target_size: u64, seqno_threshold: SeqNo) -> crate::Result<()> {
        let strategy = Arc::new(crate::compaction::major::Strategy::new(target_size));

        // IMPORTANT: Take the write lock so we are the only running compaction
        let _lock = self
            .0
            .major_compaction_lock
            .write()
            .expect("lock is poisoned");

        log::info!("Starting major compaction");
        self.inner_compact(strategy, seqno_threshold)
    }

    fn l0_run_count(&self) -> usize {
        self.current_version()
            .level(0)
            .map(|x| x.run_count())
            .unwrap_or_default()
    }

    fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<u32>> {
        #[expect(clippy::cast_possible_truncation, reason = "value lengths always fit in u32")]
        Ok(self.get(key, seqno)?.map(|x| x.len() as u32))
    }

    fn filter_size(&self) -> usize {
        self.current_version()
            .iter_tables()
            .map(Table::filter_size)
            .sum()
    }

    fn pinned_filter_size(&self) -> usize {
        self.current_version()
            .iter_tables()
            .map(Table::pinned_filter_size)
            .sum()
    }

    fn pinned_block_index_size(&self) -> usize {
        self.current_version()
            .iter_tables()
            .map(Table::pinned_block_index_size)
            .sum()
    }

    fn sealed_memtable_count(&self) -> usize {
        self.super_version
            .read()
            .expect("lock is poisoned")
            .sealed_memtables
            .len()
    }

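    /// Writes a sealed memtable out as a new table file.
    ///
    /// The memtable is streamed through a compaction filter so that versions
    /// which are no longer visible at `seqno_threshold` can be dropped;
    /// returns `None` if nothing remains to write.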
    fn flush_memtable(
        &self,
        table_id: TableId,
        memtable: &Arc<Memtable>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<Option<(Table, Option<BlobFile>)>> {
        use crate::{compaction::stream::CompactionStream, file::TABLES_FOLDER, table::Writer};
        use std::time::Instant;

        let start = Instant::now();

        let folder = self.config.path.join(TABLES_FOLDER);
        let table_file_path = folder.join(table_id.to_string());

        let data_block_size = self.config.data_block_size_policy.get(0);

        let data_block_restart_interval = self.config.data_block_restart_interval_policy.get(0);
        let index_block_restart_interval = self.config.index_block_restart_interval_policy.get(0);

        let data_block_compression = self.config.data_block_compression_policy.get(0);
        let index_block_compression = self.config.index_block_compression_policy.get(0);

        let data_block_hash_ratio = self.config.data_block_hash_ratio_policy.get(0);

        let index_partitioning = self.config.index_block_partitioning_policy.get(0);
        let filter_partitioning = self.config.filter_block_partitioning_policy.get(0);

        log::debug!(
            "Flushing table to {}, data_block_restart_interval={data_block_restart_interval}, index_block_restart_interval={index_block_restart_interval}, data_block_size={data_block_size}, data_block_compression={data_block_compression}, index_block_compression={index_block_compression}",
            table_file_path.display(),
        );

        let mut table_writer = Writer::new(table_file_path, table_id, 0)?
            .use_data_block_restart_interval(data_block_restart_interval)
            .use_index_block_restart_interval(index_block_restart_interval)
            .use_data_block_compression(data_block_compression)
            .use_index_block_compression(index_block_compression)
            .use_data_block_size(data_block_size)
            .use_data_block_hash_ratio(data_block_hash_ratio)
            .use_bloom_policy({
                use crate::config::FilterPolicyEntry::{Bloom, None};
                use crate::table::filter::BloomConstructionPolicy;

                match self.config.filter_policy.get(0) {
                    Bloom(policy) => policy,
                    None => BloomConstructionPolicy::BitsPerKey(0.0),
                }
            });

        if index_partitioning {
            table_writer = table_writer.use_partitioned_index();
        }
        if filter_partitioning {
            table_writer = table_writer.use_partitioned_filter();
        }

        let iter = memtable.iter().map(Ok);
        let compaction_filter = CompactionStream::new(iter, seqno_threshold);

        for item in compaction_filter {
            table_writer.write(item?)?;
        }

        let result = self.consume_writer(table_writer)?;

        log::debug!("Flushed memtable {table_id:?} in {:?}", start.elapsed());

        Ok(result.map(|table| (table, None)))
    }

    #[expect(clippy::significant_drop_tightening)]
    fn register_tables(
        &self,
        tables: &[Table],
        blob_files: Option<&[BlobFile]>,
        frag_map: Option<FragmentationMap>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        log::trace!(
            "Registering {} tables, {} blob files",
            tables.len(),
            blob_files.map(<[BlobFile]>::len).unwrap_or_default(),
        );

        let mut compaction_state = self.compaction_state.lock().expect("lock is poisoned");
        let mut super_version = self.super_version.write().expect("lock is poisoned");

        compaction_state.upgrade_version(
            &mut super_version,
            |version| {
                Ok(version.with_new_l0_run(tables, blob_files, frag_map.filter(|x| !x.is_empty())))
            },
            seqno_threshold,
        )?;

        for table in tables {
            log::trace!("releasing sealed memtable {}", table.id());

            super_version.sealed_memtables =
                Arc::new(super_version.sealed_memtables.remove(table.id()));
        }

        Ok(())
    }

    fn clear_active_memtable(&self) {
        self.super_version
            .write()
            .expect("lock is poisoned")
            .active_memtable = Arc::new(Memtable::default());
    }

    fn set_active_memtable(&self, memtable: Memtable) {
        let mut version_lock = self.super_version.write().expect("lock is poisoned");
        version_lock.active_memtable = Arc::new(memtable);
    }

    fn add_sealed_memtable(&self, id: MemtableId, memtable: Arc<Memtable>) {
        let mut version_lock = self.super_version.write().expect("lock is poisoned");
        version_lock.sealed_memtables = Arc::new(version_lock.sealed_memtables.add(id, memtable));
    }

    fn compact(
        &self,
        strategy: Arc<dyn CompactionStrategy>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        // NOTE: Take a read lock on the major compaction lock;
        // if a major compaction is running, we cannot proceed,
        // but parallel (non-major) compactions can still occur
        let _lock = self
            .0
            .major_compaction_lock
            .read()
            .expect("lock is poisoned");

        self.inner_compact(strategy, seqno_threshold)
    }

    fn get_next_table_id(&self) -> TableId {
        self.0.get_next_table_id()
    }

    fn tree_config(&self) -> &Config {
        &self.config
    }

    fn active_memtable_size(&self) -> u64 {
        use std::sync::atomic::Ordering::Acquire;

        self.super_version
            .read()
            .expect("lock is poisoned")
            .active_memtable
            .approximate_size
            .load(Acquire)
    }

    fn tree_type(&self) -> crate::TreeType {
        crate::TreeType::Standard
    }

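    /// Seals the active memtable and replaces it with a fresh, empty one.
    ///
    /// Returns the sealed memtable and the ID it was registered under,
    /// or `None` if the active memtable was empty.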
    #[expect(clippy::significant_drop_tightening)]
    fn rotate_memtable(&self) -> Option<(MemtableId, Arc<Memtable>)> {
        let mut version_lock = self.super_version.write().expect("lock is poisoned");

        if version_lock.active_memtable.is_empty() {
            return None;
        }

        let yanked_memtable = std::mem::take(&mut version_lock.active_memtable);

        let tmp_memtable_id = self.get_next_table_id();

        version_lock.sealed_memtables = Arc::new(
            version_lock
                .sealed_memtables
                .add(tmp_memtable_id, yanked_memtable.clone()),
        );

        log::trace!("rotate: added memtable id={tmp_memtable_id} to sealed memtables");

        Some((tmp_memtable_id, yanked_memtable))
    }

    fn table_count(&self) -> usize {
        self.current_version().table_count()
    }

    fn level_table_count(&self, idx: usize) -> Option<usize> {
        self.current_version().level(idx).map(|x| x.table_count())
    }

    #[expect(clippy::significant_drop_tightening)]
    fn approximate_len(&self) -> usize {
        let version = self.super_version.read().expect("lock is poisoned");

        // NOTE: Use the version from the lock guard we already hold,
        // instead of acquiring the read lock a second time
        let tables_item_count = version
            .version
            .iter_tables()
            .map(|x| x.metadata.item_count)
            .sum::<u64>();
        let memtable_count = version.active_memtable.len() as u64;
        let sealed_count = version
            .sealed_memtables
            .iter()
            .map(|(_, mt)| mt.len())
            .sum::<usize>() as u64;

        (memtable_count + sealed_count + tables_item_count)
            .try_into()
            .expect("should not be too large")
    }

    fn disk_space(&self) -> u64 {
        self.current_version()
            .iter_levels()
            .map(super::version::Level::size)
            .sum()
    }

    #[expect(clippy::significant_drop_tightening)]
    fn get_highest_memtable_seqno(&self) -> Option<SeqNo> {
        let version = self.super_version.read().expect("lock is poisoned");

        let active = version.active_memtable.get_highest_seqno();

        let sealed = version
            .sealed_memtables
            .iter()
            .map(|(_, table)| table.get_highest_seqno())
            .max()
            .flatten();

        active.max(sealed)
    }

    fn get_highest_persisted_seqno(&self) -> Option<SeqNo> {
        self.current_version()
            .iter_tables()
            .map(Table::get_highest_seqno)
            .max()
    }

    fn get<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<UserValue>> {
        Ok(self
            .get_internal_entry(key.as_ref(), seqno)?
            .map(|x| x.value))
    }

    fn insert<K: Into<UserKey>, V: Into<UserValue>>(
        &self,
        key: K,
        value: V,
        seqno: SeqNo,
    ) -> (u64, u64) {
        let value = InternalValue::from_components(key, value, seqno, ValueType::Value);
        self.append_entry(value)
    }

    fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        let value = InternalValue::new_tombstone(key, seqno);
        self.append_entry(value)
    }

    fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        let value = InternalValue::new_weak_tombstone(key, seqno);
        self.append_entry(value)
    }
}

impl Tree {
    /// Normalizes a user-provided range into owned `Bound<Slice>` values.
    ///
    /// Returns a tuple containing:
    /// - the `OwnedBounds` that mirror the original bounds semantics (including
    ///   inclusive/exclusive markers and unbounded endpoints), and
    /// - a `bool` flag indicating whether the normalized range is logically
    ///   empty (e.g., when the lower bound is greater than the upper bound).
    ///
    /// Callers can use the flag to detect empty ranges and skip further work
    /// while still having access to the normalized bounds for non-empty cases.
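    ///
    /// A sketch of the intended behavior (keys are illustrative):
    ///
    /// ```ignore
    /// let (_, is_empty) = Tree::range_bounds_to_owned_bounds(&("a".."z"));
    /// assert!(!is_empty);
    ///
    /// // Lower bound above upper bound => logically empty
    /// let (_, is_empty) = Tree::range_bounds_to_owned_bounds(&("z".."a"));
    /// assert!(is_empty);
    /// ```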
    fn range_bounds_to_owned_bounds<K: AsRef<[u8]>, R: RangeBounds<K>>(
        range: &R,
    ) -> (OwnedBounds, bool) {
        use Bound::{Excluded, Included, Unbounded};

        let start = match range.start_bound() {
            Included(key) => Included(Slice::from(key.as_ref())),
            Excluded(key) => Excluded(Slice::from(key.as_ref())),
            Unbounded => Unbounded,
        };

        let end = match range.end_bound() {
            Included(key) => Included(Slice::from(key.as_ref())),
            Excluded(key) => Excluded(Slice::from(key.as_ref())),
            Unbounded => Unbounded,
        };

        let is_empty =
            if let (Included(lo) | Excluded(lo), Included(hi) | Excluded(hi)) = (&start, &end) {
                lo.as_ref() > hi.as_ref()
            } else {
                false
            };

        (OwnedBounds { start, end }, is_empty)
    }

    /// Opens an LSM-tree in the given directory.
    ///
    /// Recovers the previous state if the folder was previously occupied
    /// by an LSM-tree, including its persisted configuration.
    /// Otherwise, a new tree is initialized with the given config.
    ///
    /// After recovering a previous state, use [`Tree::set_active_memtable`]
    /// to fill the memtable with data from a write-ahead log for full durability.
    ///
    /// # Errors
    ///
    /// Returns an error if an I/O error occurs.
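    ///
    /// A minimal sketch (crate-internal API; the path is illustrative):
    ///
    /// ```ignore
    /// let tree = Tree::open(Config::new("/tmp/my-tree"))?;
    /// ```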
    pub(crate) fn open(config: Config) -> crate::Result<Self> {
        use crate::file::MANIFEST_FILE;

        log::debug!("Opening LSM-tree at {}", config.path.display());

        // Check for old version
        if config.path.join("version").try_exists()? {
            return Err(crate::Error::InvalidVersion(FormatVersion::V1));
        }

        let tree = if config.path.join(MANIFEST_FILE).try_exists()? {
            Self::recover(config)
        } else {
            Self::create_new(config)
        }?;

        Ok(tree)
    }

    pub(crate) fn consume_writer(
        &self,
        writer: crate::table::Writer,
    ) -> crate::Result<Option<Table>> {
        let table_file_path = writer.path.clone();

        let Some((_, checksum)) = writer.finish()? else {
            return Ok(None);
        };

        log::debug!("Finalized table write at {}", table_file_path.display());

        let pin_filter = self.config.filter_block_pinning_policy.get(0);
        let pin_index = self.config.index_block_pinning_policy.get(0);

        let created_table = Table::recover(
            table_file_path,
            checksum,
            self.id,
            self.config.cache.clone(),
            self.config.descriptor_table.clone(),
            pin_filter,
            pin_index,
            #[cfg(feature = "metrics")]
            self.metrics.clone(),
        )?;

        log::debug!("Flushed table to {:?}", created_table.path);

        Ok(Some(created_table))
    }

    /// Returns `true` if any tables are currently being compacted.
    #[doc(hidden)]
    #[must_use]
    pub fn is_compacting(&self) -> bool {
        !self
            .compaction_state
            .lock()
            .expect("lock is poisoned")
            .hidden_set()
            .is_empty()
    }

    fn get_internal_entry_from_sealed_memtables(
        super_version: &SuperVersion,
        key: &[u8],
        seqno: SeqNo,
    ) -> Option<InternalValue> {
        for (_, memtable) in super_version.sealed_memtables.iter().rev() {
            if let Some(entry) = memtable.get(key, seqno) {
                return Some(entry);
            }
        }

        None
    }

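    /// Point lookup across the on-disk tables, walking levels from L0 downward.
    ///
    /// The key hash for the table filters is computed once up front and shared
    /// across all filter probes, so the key is not rehashed per table.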
    fn get_internal_entry_from_tables(
        super_version: &SuperVersion,
        key: &[u8],
        seqno: SeqNo,
    ) -> crate::Result<Option<InternalValue>> {
        // NOTE: Create key hash for hash sharing
        // https://fjall-rs.github.io/post/bloom-filter-hash-sharing/
        let key_hash = crate::table::filter::standard_bloom::Builder::get_hash(key);

        for level in super_version.version.iter_levels() {
            for run in level.iter() {
                // NOTE: Based on benchmarking, binary search is only worth it with at least ~4 tables
                if run.len() >= 4 {
                    if let Some(table) = run.get_for_key(key) {
                        if let Some(item) = table.get(key, seqno, key_hash)? {
                            return Ok(ignore_tombstone_value(item));
                        }
                    }
                } else {
                    // NOTE: Fallback to linear search
                    for table in run.iter() {
                        if !table.is_key_in_key_range(key) {
                            continue;
                        }

                        if let Some(item) = table.get(key, seqno, key_hash)? {
                            return Ok(ignore_tombstone_value(item));
                        }
                    }
                }
            }
        }

        Ok(None)
    }

    fn inner_compact(
        &self,
        strategy: Arc<dyn CompactionStrategy>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        use crate::compaction::worker::{do_compaction, Options};

        let mut opts = Options::from_tree(self, strategy);
        opts.eviction_seqno = seqno_threshold;

        do_compaction(&opts)?;

        log::debug!("Compaction run over");

        Ok(())
    }

    #[doc(hidden)]
    #[must_use]
    pub fn create_iter(
        &self,
        seqno: SeqNo,
        ephemeral: Option<Arc<Memtable>>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        self.create_range::<UserKey, _>(&.., seqno, ephemeral)
    }

    #[doc(hidden)]
    pub fn create_internal_range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
        &'a self,
        range: &'a R,
        seqno: SeqNo,
        ephemeral: Option<Arc<Memtable>>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<InternalValue>> + 'static {
        use crate::range::{IterState, TreeIter};
        use std::ops::Bound::{self, Excluded, Included, Unbounded};

        let lo: Bound<UserKey> = match range.start_bound() {
            Included(x) => Included(x.as_ref().into()),
            Excluded(x) => Excluded(x.as_ref().into()),
            Unbounded => Unbounded,
        };

        let hi: Bound<UserKey> = match range.end_bound() {
            Included(x) => Included(x.as_ref().into()),
            Excluded(x) => Excluded(x.as_ref().into()),
            Unbounded => Unbounded,
        };

        let bounds: (Bound<UserKey>, Bound<UserKey>) = (lo, hi);

        let super_version = self.super_version.write().expect("lock is poisoned");

        let iter_state = {
            let active = &super_version.active_memtable;
            let sealed = &super_version.sealed_memtables;

            IterState {
                active: active.clone(),
                sealed: sealed.iter().map(|(_, mt)| mt.clone()).collect(),
                ephemeral,
                version: super_version.version.clone(),
            }
        };

        TreeIter::create_range(iter_state, bounds, seqno, &super_version.version)
    }

    #[doc(hidden)]
    pub fn create_range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
        &self,
        range: &'a R,
        seqno: SeqNo,
        ephemeral: Option<Arc<Memtable>>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        self.create_internal_range(range, seqno, ephemeral)
            .map(|item| match item {
                Ok(kv) => Ok((kv.key.user_key, kv.value)),
                Err(e) => Err(e),
            })
    }

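    /// Iterates over all KV pairs whose key starts with the given prefix.
    ///
    /// A usage sketch (prefix and sequence number are illustrative):
    ///
    /// ```ignore
    /// for kv in tree.create_prefix("user:", SeqNo::MAX, None) {
    ///     let (key, value) = kv?;
    ///     // ...
    /// }
    /// ```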
    #[doc(hidden)]
    pub fn create_prefix<'a, K: AsRef<[u8]> + 'a>(
        &self,
        prefix: K,
        seqno: SeqNo,
        ephemeral: Option<Arc<Memtable>>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        use crate::range::prefix_to_range;

        let range = prefix_to_range(prefix.as_ref());
        self.create_range(&range, seqno, ephemeral)
    }

    /// Adds an item to the active memtable.
    ///
    /// Returns the added item's size and the new size of the memtable.
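    ///
    /// A usage sketch (key, value, and seqno are illustrative):
    ///
    /// ```ignore
    /// let value = InternalValue::from_components("key", "value", 0, ValueType::Value);
    /// let (item_size, memtable_size) = tree.append_entry(value);
    /// ```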
    #[doc(hidden)]
    #[must_use]
    pub fn append_entry(&self, value: InternalValue) -> (u64, u64) {
        self.super_version
            .read()
            .expect("lock is poisoned")
            .active_memtable
            .insert(value)
    }

    /// Recovers the previous state by loading the level manifest, tables, and blob files.
    ///
    /// # Errors
    ///
    /// Returns an error if an I/O error occurs.
    fn recover(mut config: Config) -> crate::Result<Self> {
        use crate::{file::MANIFEST_FILE, stop_signal::StopSignal};
        use inner::get_next_tree_id;

        log::info!("Recovering LSM-tree at {}", config.path.display());

        let manifest = {
            let manifest_path = config.path.join(MANIFEST_FILE);
            let reader = sfa::Reader::new(&manifest_path)?;
            Manifest::decode_from(&manifest_path, &reader)?
        };

        if manifest.version != FormatVersion::V3 {
            return Err(crate::Error::InvalidVersion(manifest.version));
        }

        // IMPORTANT: Restore persisted config
        config.level_count = manifest.level_count;

        let tree_id = get_next_tree_id();

        #[cfg(feature = "metrics")]
        let metrics = Arc::new(Metrics::default());

        let version = Self::recover_levels(
            &config.path,
            tree_id,
            &config.cache,
            &config.descriptor_table,
            #[cfg(feature = "metrics")]
            &metrics,
        )?;

        let highest_table_id = version
            .iter_tables()
            .map(Table::id)
            .max()
            .unwrap_or_default();

        let path = config.path.clone();

        let inner = TreeInner {
            id: tree_id,
            table_id_counter: Arc::new(AtomicU64::new(highest_table_id + 1)),
            blob_file_id_generator: SequenceNumberCounter::default(),
            super_version: Arc::new(RwLock::new(SuperVersion {
                active_memtable: Arc::default(),
                sealed_memtables: Arc::default(),
                version,
            })),
            stop_signal: StopSignal::default(),
            config,
            major_compaction_lock: RwLock::default(),
            compaction_state: Arc::new(Mutex::new(CompactionState::new(path))),

            #[cfg(feature = "metrics")]
            metrics,
        };

        Ok(Self(Arc::new(inner)))
    }

    /// Creates a new LSM-tree in a directory.
    fn create_new(config: Config) -> crate::Result<Self> {
        use crate::file::{fsync_directory, MANIFEST_FILE, TABLES_FOLDER};
        use std::fs::create_dir_all;

        let path = config.path.clone();
        log::trace!("Creating LSM-tree at {}", path.display());

        create_dir_all(&path)?;

        let manifest_path = path.join(MANIFEST_FILE);
        assert!(!manifest_path.try_exists()?);

        let table_folder_path = path.join(TABLES_FOLDER);
        create_dir_all(&table_folder_path)?;

        // Create manifest
        {
            let mut writer = sfa::Writer::new_at_path(manifest_path)?;

            Manifest {
                version: FormatVersion::V3,
                level_count: config.level_count,
                tree_type: if config.kv_separation_opts.is_some() {
                    TreeType::Blob
                } else {
                    TreeType::Standard
                },
            }
            .encode_into(&mut writer)?;

            writer.finish()?;
        }

        // IMPORTANT: fsync folders on Unix
        fsync_directory(&table_folder_path)?;
        fsync_directory(&path)?;

        let inner = TreeInner::create_new(config)?;
        Ok(Self(Arc::new(inner)))
    }

    /// Recovers the level manifest, loading all tables from disk.
    fn recover_levels<P: AsRef<Path>>(
        tree_path: P,
        tree_id: TreeId,
        cache: &Arc<Cache>,
        descriptor_table: &Arc<DescriptorTable>,
        #[cfg(feature = "metrics")] metrics: &Arc<Metrics>,
    ) -> crate::Result<Version> {
        use crate::{file::fsync_directory, file::TABLES_FOLDER, TableId};

        let tree_path = tree_path.as_ref();

        let recovery = recover(tree_path)?;

        let table_map = {
            let mut result: crate::HashMap<TableId, (u8 /* Level index */, Checksum)> =
                crate::HashMap::default();

            for (level_idx, table_ids) in recovery.table_ids.iter().enumerate() {
                for run in table_ids {
                    for &(table_id, checksum) in run {
                        #[expect(
                            clippy::expect_used,
                            reason = "there are always fewer than 256 levels"
                        )]
                        result.insert(
                            table_id,
                            (
                                level_idx
                                    .try_into()
                                    .expect("there are fewer than 256 levels"),
                                checksum,
                            ),
                        );
                    }
                }
            }

            result
        };

        let cnt = table_map.len();

        log::debug!("Recovering {cnt} tables from {}", tree_path.display());

        let progress_mod = match cnt {
            _ if cnt <= 20 => 1,
            _ if cnt <= 100 => 10,
            _ => 100,
        };

        let mut tables = vec![];

        let table_base_folder = tree_path.join(TABLES_FOLDER);

        if !table_base_folder.try_exists()? {
            std::fs::create_dir_all(&table_base_folder)?;
            fsync_directory(&table_base_folder)?;
        }

        let mut orphaned_tables = vec![];

        for (idx, dirent) in std::fs::read_dir(&table_base_folder)?.enumerate() {
            let dirent = dirent?;
            let file_name = dirent.file_name();

            // https://en.wikipedia.org/wiki/.DS_Store
            if file_name == ".DS_Store" {
                continue;
            }

            // https://en.wikipedia.org/wiki/AppleSingle_and_AppleDouble_formats
            if file_name.to_string_lossy().starts_with("._") {
                continue;
            }

            let table_file_name = file_name.to_str().ok_or_else(|| {
                log::error!("invalid table file name {}", file_name.display());
                crate::Error::Unrecoverable
            })?;

            let table_file_path = dirent.path();
            assert!(!table_file_path.is_dir());

            log::debug!("Recovering table from {}", table_file_path.display());

            let table_id = table_file_name.parse::<TableId>().map_err(|e| {
                log::error!("invalid table file name {table_file_name:?}: {e:?}");
                crate::Error::Unrecoverable
            })?;

            if let Some(&(level_idx, checksum)) = table_map.get(&table_id) {
                let table = Table::recover(
                    table_file_path,
                    checksum,
                    tree_id,
                    cache.clone(),
                    descriptor_table.clone(),
                    level_idx <= 1, // TODO: look at configuration
                    level_idx <= 2, // TODO: look at configuration
                    #[cfg(feature = "metrics")]
                    metrics.clone(),
                )?;

                log::debug!("Recovered table from {:?}", table.path);

                tables.push(table);

                if idx % progress_mod == 0 {
                    log::debug!("Recovered {idx}/{cnt} tables");
                }
            } else {
                orphaned_tables.push(table_file_path);
            }
        }

        if tables.len() < cnt {
            log::error!(
                "Recovered fewer tables than expected: {:?}",
                table_map.keys(),
            );
            return Err(crate::Error::Unrecoverable);
        }

        log::debug!("Successfully recovered {} tables", tables.len());

        let (blob_files, orphaned_blob_files) = crate::vlog::recover_blob_files(
            &tree_path.join(BLOBS_FOLDER),
            &recovery.blob_file_ids,
        )?;

        let version = Version::from_recovery(recovery, &tables, &blob_files)?;

        // NOTE: Cleanup old versions
        // But only after we definitely recovered the latest version
        Self::cleanup_orphaned_version(tree_path, version.id())?;

        for table_path in orphaned_tables {
            log::debug!("Deleting orphaned table {}", table_path.display());
            std::fs::remove_file(&table_path)?;
        }

        for blob_file_path in orphaned_blob_files {
            log::debug!("Deleting orphaned blob file {}", blob_file_path.display());
            std::fs::remove_file(&blob_file_path)?;
        }

        Ok(version)
    }

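    /// Removes stale version files (`v{id}`), keeping only the latest one.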
    fn cleanup_orphaned_version(path: &Path, latest_version_id: VersionId) -> crate::Result<()> {
        let version_str = format!("v{latest_version_id}");

        for file in std::fs::read_dir(path)? {
            let dirent = file?;

            if dirent.file_type()?.is_dir() {
                continue;
            }

            let name = dirent.file_name();

            if name.to_string_lossy().starts_with('v') && *name != *version_str {
                log::trace!("Cleanup orphaned version {}", name.display());
                std::fs::remove_file(dirent.path())?;
            }
        }

        Ok(())
    }
}