lsm_tree/tree/mod.rs

// Copyright (c) 2024-present, fjall-rs
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)

pub mod ingest;
pub mod inner;
mod sealed;

use crate::{
    blob_tree::FragmentationMap,
    coding::{Decode, Encode},
    compaction::{drop_range::OwnedBounds, state::CompactionState, CompactionStrategy},
    config::Config,
    file::BLOBS_FOLDER,
    format_version::FormatVersion,
    iter_guard::{IterGuard, IterGuardImpl},
    manifest::Manifest,
    memtable::Memtable,
    segment::Segment,
    slice::Slice,
    tree::inner::SuperVersion,
    value::InternalValue,
    version::{recovery::recover_ids, Version, VersionId},
    vlog::BlobFile,
    AbstractTree, Cache, DescriptorTable, KvPair, SegmentId, SeqNo, SequenceNumberCounter,
    TreeType, UserKey, UserValue, ValueType,
};
use inner::{MemtableId, TreeId, TreeInner};
use std::{
    io::Cursor,
    ops::{Bound, RangeBounds},
    path::Path,
    sync::{atomic::AtomicU64, Arc, Mutex, RwLock},
};

#[cfg(feature = "metrics")]
use crate::metrics::Metrics;

pub struct Guard(crate::Result<(UserKey, UserValue)>);

impl IterGuard for Guard {
    fn key(self) -> crate::Result<UserKey> {
        self.0.map(|(k, _)| k)
    }

    fn size(self) -> crate::Result<u32> {
        // NOTE: We know LSM-tree values are 32 bits in length max
        #[allow(clippy::cast_possible_truncation)]
        self.into_inner().map(|(_, v)| v.len() as u32)
    }

    fn into_inner(self) -> crate::Result<(UserKey, UserValue)> {
        self.0
    }
}

fn ignore_tombstone_value(item: InternalValue) -> Option<InternalValue> {
    if item.is_tombstone() {
        None
    } else {
        Some(item)
    }
}

/// A log-structured merge tree (LSM-tree/LSMT)
#[derive(Clone)]
pub struct Tree(#[doc(hidden)] pub Arc<TreeInner>);

impl std::ops::Deref for Tree {
    type Target = TreeInner;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl AbstractTree for Tree {
    fn next_table_id(&self) -> SegmentId {
        self.0
            .segment_id_counter
            .load(std::sync::atomic::Ordering::Relaxed)
    }

    fn id(&self) -> TreeId {
        self.id
    }

    fn get_internal_entry(&self, key: &[u8], seqno: SeqNo) -> crate::Result<Option<InternalValue>> {
        #[allow(clippy::significant_drop_tightening)]
        let version_lock = self.super_version.read().expect("lock is poisoned");

        if let Some(entry) = version_lock.active_memtable.get(key, seqno) {
            return Ok(ignore_tombstone_value(entry));
        }

        // Now look in sealed memtables
        if let Some(entry) =
            self.get_internal_entry_from_sealed_memtables(&version_lock, key, seqno)
        {
            return Ok(ignore_tombstone_value(entry));
        }

        // Now look in segments... this may involve disk I/O
        self.get_internal_entry_from_segments(&version_lock, key, seqno)
    }

    fn current_version(&self) -> Version {
        self.super_version.read().expect("poisoned").version.clone()
    }

    fn flush_active_memtable(&self, seqno_threshold: SeqNo) -> crate::Result<Option<Segment>> {
        log::debug!("Flushing active memtable");

        let Some((segment_id, yanked_memtable)) = self.rotate_memtable() else {
            return Ok(None);
        };

        let Some((segment, _)) =
            self.flush_memtable(segment_id, &yanked_memtable, seqno_threshold)?
        else {
            return Ok(None);
        };
        self.register_segments(std::slice::from_ref(&segment), None, None, seqno_threshold)?;

        Ok(Some(segment))
    }

    #[cfg(feature = "metrics")]
    fn metrics(&self) -> &Arc<crate::Metrics> {
        &self.0.metrics
    }

    fn version_free_list_len(&self) -> usize {
        self.compaction_state
            .lock()
            .expect("lock is poisoned")
            .version_free_list_len()
    }

    fn prefix<K: AsRef<[u8]>>(
        &self,
        prefix: K,
        seqno: SeqNo,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl<'_>> + '_> {
        Box::new(
            self.create_prefix(&prefix, seqno, index)
                .map(|kv| IterGuardImpl::Standard(Guard(kv))),
        )
    }

    fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
        &self,
        range: R,
        seqno: SeqNo,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl<'_>> + '_> {
        Box::new(
            self.create_range(&range, seqno, index)
                .map(|kv| IterGuardImpl::Standard(Guard(kv))),
        )
    }

    // TODO: doctest
    fn tombstone_count(&self) -> u64 {
        self.current_version()
            .iter_segments()
            .map(Segment::tombstone_count)
            .sum()
    }

    fn ingest(
        &self,
        iter: impl Iterator<Item = (UserKey, UserValue)>,
        seqno_generator: &SequenceNumberCounter,
        visible_seqno: &SequenceNumberCounter,
    ) -> crate::Result<()> {
        use crate::tree::ingest::Ingestion;
        use std::time::Instant;

        // // TODO: 3.0.0 ... hmmmm
        // let global_lock = self.super_version.write().expect("lock is poisoned");

        let seqno = seqno_generator.next();

        // TODO: allow ingestion always, by flushing memtable
        // assert!(
        //     global_lock.active_memtable.is_empty(),
        //     "can only perform bulk ingestion with empty memtable(s)",
        // );
        // assert!(
        //     global_lock.sealed_memtables.len() == 0,
        //     "can only perform bulk ingestion with empty memtable(s)",
        // );

        let mut writer = Ingestion::new(self)?.with_seqno(seqno);

        let start = Instant::now();
        let mut count = 0;
        let mut last_key = None;

        #[allow(clippy::explicit_counter_loop)]
        for (key, value) in iter {
            if let Some(last_key) = &last_key {
                assert!(
                    key > last_key,
                    "next key in bulk ingest was not greater than last key",
                );
            }
            last_key = Some(key.clone());

            writer.write(key, value)?;

            count += 1;
        }

        writer.finish()?;

        visible_seqno.fetch_max(seqno + 1);

        log::info!("Ingested {count} items in {:?}", start.elapsed());

        Ok(())
    }

    fn drop_range<K: AsRef<[u8]>, R: RangeBounds<K>>(&self, range: R) -> crate::Result<()> {
        let (bounds, is_empty) = Self::range_bounds_to_owned_bounds(&range);

        if is_empty {
            return Ok(());
        }

        let strategy = Arc::new(crate::compaction::drop_range::Strategy::new(bounds));

        // IMPORTANT: Write lock so we can be the only compaction going on
        let _lock = self
            .0
            .major_compaction_lock
            .write()
            .expect("lock is poisoned");

        log::info!("Starting drop_range compaction");
        self.inner_compact(strategy, 0)
    }

    #[doc(hidden)]
    fn major_compact(&self, target_size: u64, seqno_threshold: SeqNo) -> crate::Result<()> {
        let strategy = Arc::new(crate::compaction::major::Strategy::new(target_size));

        // IMPORTANT: Write lock so we can be the only compaction going on
        let _lock = self
            .0
            .major_compaction_lock
            .write()
            .expect("lock is poisoned");

        log::info!("Starting major compaction");
        self.inner_compact(strategy, seqno_threshold)
    }

    fn l0_run_count(&self) -> usize {
        self.current_version()
            .level(0)
            .map(|x| x.run_count())
            .unwrap_or_default()
    }

    fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<u32>> {
        // NOTE: We know that values are u32 max
        #[allow(clippy::cast_possible_truncation)]
        Ok(self.get(key, seqno)?.map(|x| x.len() as u32))
    }

    fn filter_size(&self) -> usize {
        self.current_version()
            .iter_segments()
            .map(Segment::filter_size)
            .sum()
    }

    fn pinned_filter_size(&self) -> usize {
        self.current_version()
            .iter_segments()
            .map(Segment::pinned_filter_size)
            .sum()
    }

    fn pinned_block_index_size(&self) -> usize {
        self.current_version()
            .iter_segments()
            .map(Segment::pinned_block_index_size)
            .sum()
    }

    fn sealed_memtable_count(&self) -> usize {
        self.super_version
            .read()
            .expect("lock is poisoned")
            .sealed_memtables
            .len()
    }

    /* fn verify(&self) -> crate::Result<usize> {
        // NOTE: Lock memtable to prevent any tampering with disk segments
        let _lock = self.lock_active_memtable();

        let mut sum = 0;

        let level_manifest = self.levels.read().expect("lock is poisoned");

        for level in &level_manifest.levels {
            for segment in &level.segments {
                sum += segment.verify()?;
            }
        }

        Ok(sum)
    } */

    fn flush_memtable(
        &self,
        segment_id: SegmentId,
        memtable: &Arc<Memtable>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<Option<(Segment, Option<BlobFile>)>> {
        use crate::{compaction::stream::CompactionStream, file::SEGMENTS_FOLDER, segment::Writer};
        use std::time::Instant;

        let start = Instant::now();

        let folder = self.config.path.join(SEGMENTS_FOLDER);
        let segment_file_path = folder.join(segment_id.to_string());

        let data_block_size = self.config.data_block_size_policy.get(0);
        let index_block_size = self.config.index_block_size_policy.get(0);

        let data_block_restart_interval = self.config.data_block_restart_interval_policy.get(0);
        let index_block_restart_interval = self.config.index_block_restart_interval_policy.get(0);

        let data_block_compression = self.config.data_block_compression_policy.get(0);
        let index_block_compression = self.config.index_block_compression_policy.get(0);

        let data_block_hash_ratio = self.config.data_block_hash_ratio_policy.get(0);

        log::debug!(
            "Flushing segment to {}, data_block_restart_interval={data_block_restart_interval}, index_block_restart_interval={index_block_restart_interval}, data_block_size={data_block_size}, index_block_size={index_block_size}, data_block_compression={data_block_compression}, index_block_compression={index_block_compression}",
            segment_file_path.display(),
        );

        let mut segment_writer = Writer::new(segment_file_path, segment_id)?
            .use_data_block_restart_interval(data_block_restart_interval)
            .use_index_block_restart_interval(index_block_restart_interval)
            .use_data_block_compression(data_block_compression)
            .use_index_block_compression(index_block_compression)
            .use_data_block_size(data_block_size)
            .use_index_block_size(index_block_size)
            .use_data_block_hash_ratio(data_block_hash_ratio)
            .use_bloom_policy({
                use crate::config::FilterPolicyEntry::{Bloom, None};
                use crate::segment::filter::BloomConstructionPolicy;

                match self.config.filter_policy.get(0) {
                    Bloom(policy) => policy,
                    None => BloomConstructionPolicy::BitsPerKey(0.0),
                }
            });

        let iter = memtable.iter().map(Ok);
        let compaction_filter = CompactionStream::new(iter, seqno_threshold);

        for item in compaction_filter {
            segment_writer.write(item?)?;
        }

        let result = self.consume_writer(segment_writer)?;

        log::debug!("Flushed memtable {segment_id:?} in {:?}", start.elapsed());

        Ok(result.map(|segment| (segment, None)))
    }

    fn register_segments(
        &self,
        segments: &[Segment],
        blob_files: Option<&[BlobFile]>,
        frag_map: Option<FragmentationMap>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        log::trace!(
            "Registering {} segments, {} blob files",
            segments.len(),
            blob_files.map(<[BlobFile]>::len).unwrap_or_default(),
        );

        let mut compaction_state = self.compaction_state.lock().expect("lock is poisoned");
        let mut super_version = self.super_version.write().expect("lock is poisoned");

        compaction_state.upgrade_version(
            &mut super_version,
            |version| {
                Ok(version.with_new_l0_run(
                    segments,
                    blob_files,
                    frag_map.filter(|x| !x.is_empty()),
                ))
            },
            seqno_threshold,
        )?;

        for segment in segments {
            log::trace!("releasing sealed memtable {}", segment.id());

            super_version.sealed_memtables =
                Arc::new(super_version.sealed_memtables.remove(segment.id()));
        }

        Ok(())
    }

    fn clear_active_memtable(&self) {
        self.super_version
            .write()
            .expect("lock is poisoned")
            .active_memtable = Arc::new(Memtable::default());
    }

    fn set_active_memtable(&self, memtable: Memtable) {
        let mut version_lock = self.super_version.write().expect("lock is poisoned");
        version_lock.active_memtable = Arc::new(memtable);
    }

    fn add_sealed_memtable(&self, id: MemtableId, memtable: Arc<Memtable>) {
        let mut version_lock = self.super_version.write().expect("lock is poisoned");
        version_lock.sealed_memtables = Arc::new(version_lock.sealed_memtables.add(id, memtable));
    }

    fn compact(
        &self,
        strategy: Arc<dyn CompactionStrategy>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        // NOTE: Read lock major compaction lock
        // That way, if a major compaction is running, we cannot proceed
        // But in general, parallel (non-major) compactions can occur
        let _lock = self
            .0
            .major_compaction_lock
            .read()
            .expect("lock is poisoned");

        self.inner_compact(strategy, seqno_threshold)
    }

    fn get_next_segment_id(&self) -> SegmentId {
        self.0.get_next_segment_id()
    }

    fn tree_config(&self) -> &Config {
        &self.config
    }

    fn active_memtable_size(&self) -> u64 {
        use std::sync::atomic::Ordering::Acquire;

        self.super_version
            .read()
            .expect("lock is poisoned")
            .active_memtable
            .approximate_size
            .load(Acquire)
    }

    fn tree_type(&self) -> crate::TreeType {
        crate::TreeType::Standard
    }

    fn rotate_memtable(&self) -> Option<(MemtableId, Arc<Memtable>)> {
        let mut version_lock = self.super_version.write().expect("lock is poisoned");

        if version_lock.active_memtable.is_empty() {
            return None;
        }

        let yanked_memtable = std::mem::take(&mut version_lock.active_memtable);

        let tmp_memtable_id = self.get_next_segment_id();

        version_lock.sealed_memtables = Arc::new(
            version_lock
                .sealed_memtables
                .add(tmp_memtable_id, yanked_memtable.clone()),
        );

        log::trace!("rotate: added memtable id={tmp_memtable_id} to sealed memtables");

        Some((tmp_memtable_id, yanked_memtable))
    }

    fn segment_count(&self) -> usize {
        self.current_version().segment_count()
    }

    fn level_segment_count(&self, idx: usize) -> Option<usize> {
        self.current_version().level(idx).map(|x| x.segment_count())
    }

    #[allow(clippy::significant_drop_tightening)]
    fn approximate_len(&self) -> usize {
        let version = self.super_version.read().expect("lock is poisoned");

        let segments_item_count = self
            .current_version()
            .iter_segments()
            .map(|x| x.metadata.item_count)
            .sum::<u64>();

        let memtable_count = version.active_memtable.len() as u64;
        let sealed_count = version
            .sealed_memtables
            .iter()
            .map(|(_, mt)| mt.len())
            .sum::<usize>() as u64;

        (memtable_count + sealed_count + segments_item_count)
            .try_into()
            .expect("should not be too large")
    }

    fn disk_space(&self) -> u64 {
        self.current_version()
            .iter_levels()
            .map(super::version::Level::size)
            .sum()
    }

    fn get_highest_memtable_seqno(&self) -> Option<SeqNo> {
        let version = self.super_version.read().expect("lock is poisoned");

        let active = version.active_memtable.get_highest_seqno();

        let sealed = version
            .sealed_memtables
            .iter()
            .map(|(_, table)| table.get_highest_seqno())
            .max()
            .flatten();

        active.max(sealed)
    }

    fn get_highest_persisted_seqno(&self) -> Option<SeqNo> {
        self.current_version()
            .iter_segments()
            .map(Segment::get_highest_seqno)
            .max()
    }

    fn get<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<UserValue>> {
        Ok(self
            .get_internal_entry(key.as_ref(), seqno)?
            .map(|x| x.value))
    }

    fn insert<K: Into<UserKey>, V: Into<UserValue>>(
        &self,
        key: K,
        value: V,
        seqno: SeqNo,
    ) -> (u64, u64) {
        let value = InternalValue::from_components(key, value, seqno, ValueType::Value);
        self.append_entry(value)
    }

    fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        let value = InternalValue::new_tombstone(key, seqno);
        self.append_entry(value)
    }

    fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        let value = InternalValue::new_weak_tombstone(key, seqno);
        self.append_entry(value)
    }
}

impl Tree {
    /// Normalizes a user-provided range into owned `Bound<Slice>` values.
    ///
    /// Returns a tuple containing:
    /// - the `OwnedBounds` that mirror the original bounds semantics (including
    ///   inclusive/exclusive markers and unbounded endpoints), and
    /// - a `bool` flag indicating whether the normalized range is logically
    ///   empty (e.g., when the lower bound is greater than the upper bound).
    ///
    /// Callers can use the flag to detect empty ranges and skip further work
    /// while still having access to the normalized bounds for non-empty cases.
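    ///
    /// For example, `"a".."m"` normalizes to `Included("a")..Excluded("m")` and is
    /// reported as non-empty, while `"z".."a"` is reported as empty.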
    fn range_bounds_to_owned_bounds<K: AsRef<[u8]>, R: RangeBounds<K>>(
        range: &R,
    ) -> (OwnedBounds, bool) {
        use Bound::{Excluded, Included, Unbounded};

        let start = match range.start_bound() {
            Included(key) => Included(Slice::from(key.as_ref())),
            Excluded(key) => Excluded(Slice::from(key.as_ref())),
            Unbounded => Unbounded,
        };

        let end = match range.end_bound() {
            Included(key) => Included(Slice::from(key.as_ref())),
            Excluded(key) => Excluded(Slice::from(key.as_ref())),
            Unbounded => Unbounded,
        };

        let is_empty =
            if let (Included(lo) | Excluded(lo), Included(hi) | Excluded(hi)) = (&start, &end) {
                lo.as_ref() > hi.as_ref()
            } else {
                false
            };

        (OwnedBounds { start, end }, is_empty)
    }

    /// Opens an LSM-tree in the given directory.
    ///
    /// Will recover previous state if the folder was previously
    /// occupied by an LSM-tree, including the previous configuration.
    /// If not, a new tree will be initialized with the given config.
    ///
    /// After recovering a previous state, use [`Tree::set_active_memtable`]
    /// to fill the memtable with data from a write-ahead log for full durability.
    ///
    /// # Errors
    ///
    /// Returns an error if an I/O error occurs.
    pub(crate) fn open(config: Config) -> crate::Result<Self> {
        use crate::file::MANIFEST_FILE;

        log::debug!("Opening LSM-tree at {}", config.path.display());

        // Check for old version
        if config.path.join("version").try_exists()? {
            return Err(crate::Error::InvalidVersion(FormatVersion::V1));
        }

        let tree = if config.path.join(MANIFEST_FILE).try_exists()? {
            Self::recover(config)
        } else {
            Self::create_new(config)
        }?;

        Ok(tree)
    }

    pub(crate) fn consume_writer(
        &self,
        writer: crate::segment::Writer,
    ) -> crate::Result<Option<Segment>> {
        let segment_file_path = writer.path.clone();

        let Some(_) = writer.finish()? else {
            return Ok(None);
        };

        log::debug!("Finalized segment write at {}", segment_file_path.display());

        let pin_filter = self.config.filter_block_pinning_policy.get(0);
        let pin_index = self.config.filter_block_pinning_policy.get(0);

        let created_segment = Segment::recover(
            segment_file_path,
            self.id,
            self.config.cache.clone(),
            self.config.descriptor_table.clone(),
            pin_filter,
            pin_index,
            #[cfg(feature = "metrics")]
            self.metrics.clone(),
        )?;

        log::debug!("Flushed segment to {:?}", created_segment.path);

        Ok(Some(created_segment))
    }

    /// Returns `true` if any segments are currently being compacted.
    #[doc(hidden)]
    #[must_use]
    pub fn is_compacting(&self) -> bool {
        !self
            .compaction_state
            .lock()
            .expect("lock is poisoned")
            .hidden_set()
            .is_empty()
    }

    fn get_internal_entry_from_sealed_memtables(
        &self,
        super_version: &SuperVersion,
        key: &[u8],
        seqno: SeqNo,
    ) -> Option<InternalValue> {
        for (_, memtable) in super_version.sealed_memtables.iter().rev() {
            if let Some(entry) = memtable.get(key, seqno) {
                return Some(entry);
            }
        }

        None
    }

    fn get_internal_entry_from_segments(
        &self,
        super_version: &SuperVersion,
        key: &[u8],
        seqno: SeqNo,
    ) -> crate::Result<Option<InternalValue>> {
        // NOTE: Create key hash for hash sharing
        // https://fjall-rs.github.io/post/bloom-filter-hash-sharing/
        let key_hash = crate::segment::filter::standard_bloom::Builder::get_hash(key);
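        // The hash is computed once here and reused for every segment probe below,
        // so each segment's filter check does not need to rehash the key.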

        for level in super_version.version.iter_levels() {
            for run in level.iter() {
                // NOTE: Based on benchmarking, binary search is only worth it with ~4 segments
                if run.len() >= 4 {
                    if let Some(segment) = run.get_for_key(key) {
                        if let Some(item) = segment.get(key, seqno, key_hash)? {
                            return Ok(ignore_tombstone_value(item));
                        }
                    }
                } else {
                    // NOTE: Fallback to linear search
                    for segment in run.iter() {
                        if !segment.is_key_in_key_range(key) {
                            continue;
                        }

                        if let Some(item) = segment.get(key, seqno, key_hash)? {
                            return Ok(ignore_tombstone_value(item));
                        }
                    }
                }
            }
        }

        Ok(None)
    }

    fn inner_compact(
        &self,
        strategy: Arc<dyn CompactionStrategy>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        use crate::compaction::worker::{do_compaction, Options};

        let mut opts = Options::from_tree(self, strategy);
        opts.eviction_seqno = seqno_threshold;

        do_compaction(&opts)?;

        log::debug!("Compaction run over");

        Ok(())
    }

    #[doc(hidden)]
    #[must_use]
    pub fn create_iter(
        &self,
        seqno: SeqNo,
        ephemeral: Option<Arc<Memtable>>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        self.create_range::<UserKey, _>(&.., seqno, ephemeral)
    }

    #[doc(hidden)]
    pub fn create_internal_range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
        &'a self,
        range: &'a R,
        seqno: SeqNo,
        ephemeral: Option<Arc<Memtable>>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<InternalValue>> + 'static {
        use crate::range::{IterState, TreeIter};
        use std::ops::Bound::{self, Excluded, Included, Unbounded};

        let lo: Bound<UserKey> = match range.start_bound() {
            Included(x) => Included(x.as_ref().into()),
            Excluded(x) => Excluded(x.as_ref().into()),
            Unbounded => Unbounded,
        };

        let hi: Bound<UserKey> = match range.end_bound() {
            Included(x) => Included(x.as_ref().into()),
            Excluded(x) => Excluded(x.as_ref().into()),
            Unbounded => Unbounded,
        };

        let bounds: (Bound<UserKey>, Bound<UserKey>) = (lo, hi);

        let super_version = self.super_version.write().expect("lock is poisoned");

        let iter_state = {
            let active = &super_version.active_memtable;
            let sealed = &super_version.sealed_memtables;

            IterState {
                active: active.clone(),
                sealed: sealed.iter().map(|(_, mt)| mt.clone()).collect(),
                ephemeral,
                version: super_version.version.clone(),
            }
        };

        TreeIter::create_range(iter_state, bounds, seqno, &super_version.version)
    }

    #[doc(hidden)]
    pub fn create_range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
        &self,
        range: &'a R,
        seqno: SeqNo,
        ephemeral: Option<Arc<Memtable>>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        self.create_internal_range(range, seqno, ephemeral)
            .map(|item| match item {
                Ok(kv) => Ok((kv.key.user_key, kv.value)),
                Err(e) => Err(e),
            })
    }

    #[doc(hidden)]
    pub fn create_prefix<'a, K: AsRef<[u8]> + 'a>(
        &self,
        prefix: K,
        seqno: SeqNo,
        ephemeral: Option<Arc<Memtable>>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        use crate::range::prefix_to_range;

        let range = prefix_to_range(prefix.as_ref());
        self.create_range(&range, seqno, ephemeral)
    }

    /// Adds an item to the active memtable.
    ///
    /// Returns the added item's size and the new size of the memtable.
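    ///
    /// Illustrative sketch (not a doctest): build an [`InternalValue`] via
    /// `InternalValue::from_components(key, value, seqno, ValueType::Value)` and pass it here;
    /// the returned tuple is `(added_item_size, new_memtable_size)`.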
    #[doc(hidden)]
    #[must_use]
    pub fn append_entry(&self, value: InternalValue) -> (u64, u64) {
        self.super_version
            .read()
            .expect("lock is poisoned")
            .active_memtable
            .insert(value)
    }

    /// Recovers the previous state by loading the level manifest and segments.
    ///
    /// # Errors
    ///
    /// Returns an error if an I/O error occurs.
    fn recover(mut config: Config) -> crate::Result<Self> {
        use crate::{file::MANIFEST_FILE, stop_signal::StopSignal};
        use inner::get_next_tree_id;

        log::info!("Recovering LSM-tree at {}", config.path.display());

        let bytes = std::fs::read(config.path.join(MANIFEST_FILE))?;
        let mut bytes = Cursor::new(bytes);
        let manifest = Manifest::decode_from(&mut bytes)?;

        if manifest.version != FormatVersion::V3 {
            return Err(crate::Error::InvalidVersion(manifest.version));
        }

        // IMPORTANT: Restore persisted config
        config.level_count = manifest.level_count;

        let tree_id = get_next_tree_id();

        #[cfg(feature = "metrics")]
        let metrics = Arc::new(Metrics::default());

        let version = Self::recover_levels(
            &config.path,
            tree_id,
            &config.cache,
            &config.descriptor_table,
            #[cfg(feature = "metrics")]
            &metrics,
        )?;

        let highest_segment_id = version
            .iter_segments()
            .map(Segment::id)
            .max()
            .unwrap_or_default();

        let path = config.path.clone();

        let inner = TreeInner {
            id: tree_id,
            segment_id_counter: Arc::new(AtomicU64::new(highest_segment_id + 1)),
            blob_file_id_generator: SequenceNumberCounter::default(),
            super_version: Arc::new(RwLock::new(SuperVersion {
                active_memtable: Arc::default(),
                sealed_memtables: Arc::default(),
                version,
            })),
            stop_signal: StopSignal::default(),
            config,
            major_compaction_lock: RwLock::default(),
            compaction_state: Arc::new(Mutex::new(CompactionState::new(path))),

            #[cfg(feature = "metrics")]
            metrics,
        };

        Ok(Self(Arc::new(inner)))
    }

    /// Creates a new LSM-tree in a directory.
    fn create_new(config: Config) -> crate::Result<Self> {
        use crate::file::{fsync_directory, MANIFEST_FILE, SEGMENTS_FOLDER};
        use std::fs::{create_dir_all, File};

        let path = config.path.clone();
        log::trace!("Creating LSM-tree at {}", path.display());

        create_dir_all(&path)?;

        let manifest_path = path.join(MANIFEST_FILE);
        assert!(!manifest_path.try_exists()?);

        let segment_folder_path = path.join(SEGMENTS_FOLDER);
        create_dir_all(&segment_folder_path)?;

        // NOTE: Lastly, write and fsync the manifest, which contains the format version
        // -> the LSM-tree is fully initialized
        let mut file = File::create_new(manifest_path)?;
        Manifest {
            version: FormatVersion::V3,
            level_count: config.level_count,
            tree_type: if config.kv_separation_opts.is_some() {
                TreeType::Blob
            } else {
                TreeType::Standard
            },
        }
        .encode_into(&mut file)?;
        file.sync_all()?;

        // IMPORTANT: fsync folders on Unix
        fsync_directory(&segment_folder_path)?;
        fsync_directory(&path)?;

        let inner = TreeInner::create_new(config)?;
        Ok(Self(Arc::new(inner)))
    }

    /// Recovers the level manifest, loading all tables from disk.
    fn recover_levels<P: AsRef<Path>>(
        tree_path: P,
        tree_id: TreeId,
        cache: &Arc<Cache>,
        descriptor_table: &Arc<DescriptorTable>,
        #[cfg(feature = "metrics")] metrics: &Arc<Metrics>,
    ) -> crate::Result<Version> {
        use crate::{file::fsync_directory, file::SEGMENTS_FOLDER, SegmentId};

        let tree_path = tree_path.as_ref();

        let recovery = recover_ids(tree_path)?;

        let table_id_map = {
            let mut result: crate::HashMap<SegmentId, u8 /* Level index */> =
                crate::HashMap::default();

            for (level_idx, table_ids) in recovery.segment_ids.iter().enumerate() {
                for run in table_ids {
                    for table_id in run {
                        // NOTE: We know there are always fewer than 256 levels
                        #[allow(clippy::expect_used)]
                        result.insert(
                            *table_id,
                            level_idx
                                .try_into()
                                .expect("there are fewer than 256 levels"),
                        );
                    }
                }
            }

            result
        };

        let cnt = table_id_map.len();

        log::debug!("Recovering {cnt} tables from {}", tree_path.display());

        let progress_mod = match cnt {
            _ if cnt <= 20 => 1,
            _ if cnt <= 100 => 10,
            _ => 100,
        };

        let mut tables = vec![];

        let table_base_folder = tree_path.join(SEGMENTS_FOLDER);

        if !table_base_folder.try_exists()? {
            std::fs::create_dir_all(&table_base_folder)?;
            fsync_directory(&table_base_folder)?;
        }

        let mut orphaned_tables = vec![];

        for (idx, dirent) in std::fs::read_dir(&table_base_folder)?.enumerate() {
            let dirent = dirent?;
            let file_name = dirent.file_name();

            // https://en.wikipedia.org/wiki/.DS_Store
            if file_name == ".DS_Store" {
                continue;
            }

            // https://en.wikipedia.org/wiki/AppleSingle_and_AppleDouble_formats
            if file_name.to_string_lossy().starts_with("._") {
                continue;
            }

            let table_file_name = file_name.to_str().ok_or_else(|| {
                log::error!("invalid table file name {}", file_name.display());
                crate::Error::Unrecoverable
            })?;

            let table_file_path = dirent.path();
            assert!(!table_file_path.is_dir());

            log::debug!("Recovering table from {}", table_file_path.display());

            let table_id = table_file_name.parse::<SegmentId>().map_err(|e| {
                log::error!("invalid table file name {table_file_name:?}: {e:?}");
                crate::Error::Unrecoverable
            })?;

            if let Some(&level_idx) = table_id_map.get(&table_id) {
                let table = Segment::recover(
                    table_file_path,
                    tree_id,
                    cache.clone(),
                    descriptor_table.clone(),
                    level_idx <= 1, // TODO: look at configuration
                    level_idx <= 2, // TODO: look at configuration
                    #[cfg(feature = "metrics")]
                    metrics.clone(),
                )?;

                log::debug!("Recovered table from {:?}", table.path);

                tables.push(table);

                if idx % progress_mod == 0 {
                    log::debug!("Recovered {idx}/{cnt} tables");
                }
            } else {
                orphaned_tables.push(table_file_path);
            }
        }

        if tables.len() < cnt {
            log::error!(
                "Recovered fewer tables than expected: {:?}",
                table_id_map.keys(),
            );
            return Err(crate::Error::Unrecoverable);
        }

        log::debug!("Successfully recovered {} tables", tables.len());

        let blob_files = crate::vlog::recover_blob_files(
            &tree_path.join(BLOBS_FOLDER),
            &recovery.blob_file_ids,
        )?;

        let version = Version::from_recovery(recovery, &tables, &blob_files)?;

        // NOTE: Cleanup old versions
        // But only after we definitely recovered the latest version
        Self::cleanup_orphaned_version(tree_path, version.id())?;

        for table_path in orphaned_tables {
            log::debug!("Deleting orphaned table {}", table_path.display());
            std::fs::remove_file(&table_path)?;
        }

        // TODO: remove orphaned blob files as well -> unit test

        Ok(version)
    }

    fn cleanup_orphaned_version(path: &Path, latest_version_id: VersionId) -> crate::Result<()> {
        let version_str = format!("v{latest_version_id}");

        for file in std::fs::read_dir(path)? {
            let dirent = file?;

            if dirent.file_type()?.is_dir() {
                continue;
            }

            let name = dirent.file_name();

            if name.to_string_lossy().starts_with('v') && *name != *version_str {
                log::trace!("Cleanup orphaned version {}", name.display());
                std::fs::remove_file(dirent.path())?;
            }
        }

        Ok(())
    }
}