lsm_tree/tree/mod.rs

// Copyright (c) 2024-present, fjall-rs
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)

pub mod inner;

use crate::{
    coding::{Decode, Encode},
    compaction::CompactionStrategy,
    config::Config,
    descriptor_table::FileDescriptorTable,
    level_manifest::LevelManifest,
    manifest::Manifest,
    memtable::Memtable,
    segment::{
        block_index::{full_index::FullBlockIndex, BlockIndexImpl},
        meta::TableType,
        Segment, SegmentInner,
    },
    value::InternalValue,
    version::Version,
    AbstractTree, BlockCache, KvPair, SegmentId, SeqNo, Snapshot, UserKey, UserValue, ValueType,
};
use inner::{MemtableId, SealedMemtables, TreeId, TreeInner};
use std::{
    io::Cursor,
    ops::RangeBounds,
    path::Path,
    sync::{
        atomic::{AtomicBool, AtomicU64},
        Arc, RwLock, RwLockReadGuard, RwLockWriteGuard,
    },
};

fn ignore_tombstone_value(item: InternalValue) -> Option<InternalValue> {
    if item.is_tombstone() {
        None
    } else {
        Some(item)
    }
}

/// A log-structured merge tree (LSM-tree/LSMT)
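///
/// # Examples
///
/// A minimal usage sketch, assuming the tree is opened through the crate's
/// `Config` builder (the path and error handling are illustrative only):
///
/// ```ignore
/// use lsm_tree::{AbstractTree, Config};
///
/// let tree = Config::new("/tmp/example_tree").open()?;
/// tree.insert("hello", "world", /* seqno */ 0);
/// assert!(tree.get("hello", None)?.is_some());
/// ```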
#[derive(Clone)]
pub struct Tree(#[doc(hidden)] pub Arc<TreeInner>);

impl std::ops::Deref for Tree {
    type Target = TreeInner;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl AbstractTree for Tree {
    #[doc(hidden)]
    fn major_compact(&self, target_size: u64, seqno_threshold: SeqNo) -> crate::Result<()> {
        let strategy = Arc::new(crate::compaction::major::Strategy::new(target_size));

        // IMPORTANT: Write lock so we can be the only compaction going on
        let _lock = self
            .0
            .major_compaction_lock
            .write()
            .expect("lock is poisoned");

        log::info!("Starting major compaction");
        self.inner_compact(strategy, seqno_threshold)
    }

    fn l0_run_count(&self) -> usize {
        let lock = self.levels.read().expect("lock is poisoned");

        let first_level = lock
            .levels
            .first()
            .expect("first level should always exist");

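        // A disjoint first level acts as a single sorted run; otherwise every
        // L0 segment counts as its own run.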
        if first_level.is_disjoint {
            1
        } else {
            // TODO: in the future, there will be a Vec<Run> per Level
            // TODO: so this will need to change,
            // TODO: but then we also don't need the manual is_disjoint check
            first_level.segments.len()
        }
    }

    fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: Option<SeqNo>) -> crate::Result<Option<u32>> {
        Ok(self.get(key, seqno)?.map(|x| x.len() as u32))
    }

    fn bloom_filter_size(&self) -> usize {
        self.levels
            .read()
            .expect("lock is poisoned")
            .iter()
            .map(super::segment::Segment::bloom_filter_size)
            .sum()
    }

    fn sealed_memtable_count(&self) -> usize {
        self.sealed_memtables
            .read()
            .expect("lock is poisoned")
            .len()
    }

    fn verify(&self) -> crate::Result<usize> {
        // NOTE: Lock memtable to prevent any tampering with disk segments
        let _lock = self.lock_active_memtable();

        let mut sum = 0;

        let level_manifest = self.levels.read().expect("lock is poisoned");

        for level in &level_manifest.levels {
            for segment in &level.segments {
                sum += segment.verify()?;
            }
        }

        Ok(sum)
    }

    fn keys(
        &self,
        seqno: Option<SeqNo>,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = crate::Result<UserKey>> + 'static> {
        Box::new(self.create_iter(seqno, index).map(|x| x.map(|(k, _)| k)))
    }

    fn values(
        &self,
        seqno: Option<SeqNo>,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = crate::Result<UserValue>> + 'static> {
        Box::new(self.create_iter(seqno, index).map(|x| x.map(|(_, v)| v)))
    }

    fn flush_memtable(
        &self,
        segment_id: SegmentId,
        memtable: &Arc<Memtable>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<Option<Segment>> {
        use crate::{
            compaction::stream::CompactionStream,
            file::SEGMENTS_FOLDER,
            segment::writer::{Options, Writer},
        };

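        // Flush path: drain the memtable through a `CompactionStream` (which applies
        // the `seqno_threshold`) into a segment writer, then finalize the segment
        // via `consume_writer`.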
        let start = std::time::Instant::now();

        let folder = self.config.path.join(SEGMENTS_FOLDER);
        log::debug!("writing segment to {folder:?}");

        let mut segment_writer = Writer::new(Options {
            segment_id,
            folder,
            data_block_size: self.config.data_block_size,
            index_block_size: self.config.index_block_size,
        })?
        .use_compression(self.config.compression);

        {
            use crate::segment::writer::BloomConstructionPolicy;

            if self.config.bloom_bits_per_key >= 0 {
                segment_writer =
                    segment_writer.use_bloom_policy(BloomConstructionPolicy::FpRate(0.00001));
            } else {
                segment_writer =
                    segment_writer.use_bloom_policy(BloomConstructionPolicy::BitsPerKey(0));
            }
        }

        let iter = memtable.iter().map(Ok);
        let compaction_filter = CompactionStream::new(iter, seqno_threshold);

        for item in compaction_filter {
            segment_writer.write(item?)?;
        }

        let result = self.consume_writer(segment_id, segment_writer)?;

        log::debug!("Flushed memtable {segment_id:?} in {:?}", start.elapsed());

        Ok(result)
    }

    fn register_segments(&self, segments: &[Segment]) -> crate::Result<()> {
        // NOTE: Mind lock order L -> M -> S
        log::trace!("register: Acquiring levels manifest write lock");
        let mut original_levels = self.levels.write().expect("lock is poisoned");
        log::trace!("register: Acquired levels manifest write lock");

        // NOTE: Mind lock order L -> M -> S
        log::trace!("register: Acquiring sealed memtables write lock");
        let mut sealed_memtables = self.sealed_memtables.write().expect("lock is poisoned");
        log::trace!("register: Acquired sealed memtables write lock");

        original_levels.atomic_swap(|recipe| {
            for segment in segments.iter().cloned() {
                recipe
                    .first_mut()
                    .expect("first level should exist")
                    .insert(segment);
            }
        })?;

        // eprintln!("{original_levels}");

        for segment in segments {
            log::trace!("releasing sealed memtable {}", segment.id());
            sealed_memtables.remove(segment.id());
        }

        Ok(())
    }

    fn lock_active_memtable(&self) -> RwLockWriteGuard<'_, Arc<Memtable>> {
        self.active_memtable.write().expect("lock is poisoned")
    }

    fn clear_active_memtable(&self) {
        *self.active_memtable.write().expect("lock is poisoned") = Arc::new(Memtable::default());
    }

    fn set_active_memtable(&self, memtable: Memtable) {
        let mut memtable_lock = self.active_memtable.write().expect("lock is poisoned");
        *memtable_lock = Arc::new(memtable);
    }

    fn add_sealed_memtable(&self, id: MemtableId, memtable: Arc<Memtable>) {
        let mut memtable_lock = self.sealed_memtables.write().expect("lock is poisoned");
        memtable_lock.add(id, memtable);
    }

    fn compact(
        &self,
        strategy: Arc<dyn CompactionStrategy>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        // NOTE: Read lock major compaction lock
        // That way, if a major compaction is running, we cannot proceed
        // But in general, parallel (non-major) compactions can occur
        let _lock = self
            .0
            .major_compaction_lock
            .read()
            .expect("lock is poisoned");

        self.inner_compact(strategy, seqno_threshold)
    }

    fn get_next_segment_id(&self) -> SegmentId {
        self.0.get_next_segment_id()
    }

    fn tree_config(&self) -> &Config {
        &self.config
    }

    fn active_memtable_size(&self) -> u32 {
        use std::sync::atomic::Ordering::Acquire;

        self.active_memtable
            .read()
            .expect("lock is poisoned")
            .approximate_size
            .load(Acquire)
    }

    fn tree_type(&self) -> crate::TreeType {
        crate::TreeType::Standard
    }

    fn rotate_memtable(&self) -> Option<(MemtableId, Arc<Memtable>)> {
        log::trace!("rotate: acquiring active memtable write lock");
        let mut active_memtable = self.lock_active_memtable();

        log::trace!("rotate: acquiring sealed memtables write lock");
        let mut sealed_memtables = self.lock_sealed_memtables();

        if active_memtable.is_empty() {
            return None;
        }

        let yanked_memtable = std::mem::take(&mut *active_memtable);

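        // The sealed memtable borrows its ID from the segment ID counter, so the
        // segment it is eventually flushed into can share that ID
        // (see `flush_active_memtable`).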
        let tmp_memtable_id = self.get_next_segment_id();
        sealed_memtables.add(tmp_memtable_id, yanked_memtable.clone());

        log::trace!("rotate: added memtable id={tmp_memtable_id} to sealed memtables");

        Some((tmp_memtable_id, yanked_memtable))
    }

    fn segment_count(&self) -> usize {
        self.levels.read().expect("lock is poisoned").len()
    }

    fn level_segment_count(&self, idx: usize) -> Option<usize> {
        self.levels
            .read()
            .expect("lock is poisoned")
            .levels
            .get(idx)
            .map(|x| x.len())
    }

    #[allow(clippy::significant_drop_tightening)]
    fn approximate_len(&self) -> usize {
        // NOTE: Mind lock order L -> M -> S
        let levels = self.levels.read().expect("lock is poisoned");
        let memtable = self.active_memtable.read().expect("lock is poisoned");
        let sealed = self.sealed_memtables.read().expect("lock is poisoned");

        let segments_item_count = levels.iter().map(|x| x.metadata.item_count).sum::<u64>();
        let memtable_count = memtable.len() as u64;
        let sealed_count = sealed.iter().map(|(_, mt)| mt.len()).sum::<usize>() as u64;

        (memtable_count + sealed_count + segments_item_count)
            .try_into()
            .expect("should not be too large")
    }

    fn disk_space(&self) -> u64 {
        let levels = self.levels.read().expect("lock is poisoned");
        levels.iter().map(|x| x.metadata.file_size).sum()
    }

    fn get_highest_memtable_seqno(&self) -> Option<SeqNo> {
        let active = self
            .active_memtable
            .read()
            .expect("lock is poisoned")
            .get_highest_seqno();

        let sealed = self
            .sealed_memtables
            .read()
            .expect("lock is poisoned")
            .iter()
            .map(|(_, table)| table.get_highest_seqno())
            .max()
            .flatten();

        active.max(sealed)
    }

    fn get_highest_persisted_seqno(&self) -> Option<SeqNo> {
        let levels = self.levels.read().expect("lock is poisoned");
        levels
            .iter()
            .map(super::segment::Segment::get_highest_seqno)
            .max()
    }

    fn snapshot(&self, seqno: SeqNo) -> Snapshot {
        use crate::AnyTree::Standard;

        Snapshot::new(Standard(self.clone()), seqno)
    }

    fn get<K: AsRef<[u8]>>(
        &self,
        key: K,
        seqno: Option<SeqNo>,
    ) -> crate::Result<Option<UserValue>> {
        Ok(self
            .get_internal_entry(key.as_ref(), seqno)?
            .map(|x| x.value))
    }

    fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
        &self,
        range: R,
        seqno: Option<SeqNo>,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static> {
        Box::new(self.create_range(&range, seqno, index))
    }

    fn prefix<K: AsRef<[u8]>>(
        &self,
        prefix: K,
        seqno: Option<SeqNo>,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static> {
        Box::new(self.create_prefix(prefix, seqno, index))
    }

    fn insert<K: Into<UserKey>, V: Into<UserValue>>(
        &self,
        key: K,
        value: V,
        seqno: SeqNo,
    ) -> (u32, u32) {
        let value = InternalValue::from_components(key, value, seqno, ValueType::Value);
        self.append_entry(value)
    }

    fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u32, u32) {
        let value = InternalValue::new_tombstone(key, seqno);
        self.append_entry(value)
    }

    fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u32, u32) {
        let value = InternalValue::new_weak_tombstone(key, seqno);
        self.append_entry(value)
    }
}

impl Tree {
    /// Opens an LSM-tree in the given directory.
    ///
    /// Recovers the previous state, including the persisted configuration,
    /// if the folder was previously occupied by an LSM-tree.
    /// Otherwise, a new tree is initialized with the given config.
    ///
    /// After recovering a previous state, use [`Tree::set_active_memtable`]
    /// to fill the memtable with data from a write-ahead log for full durability.
    ///
    /// # Errors
    ///
    /// Returns an error if an I/O error occurs.
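    ///
    /// # Examples
    ///
    /// A hedged sketch of the recovery flow described above; `replay_wal` is a
    /// hypothetical, user-provided helper yielding [`InternalValue`]s:
    ///
    /// ```ignore
    /// let tree = Tree::open(config)?;
    ///
    /// // Re-fill the memtable from a write-ahead log for full durability.
    /// let memtable = Memtable::default();
    /// for entry in replay_wal()? {
    ///     memtable.insert(entry);
    /// }
    /// tree.set_active_memtable(memtable);
    /// ```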
    pub(crate) fn open(config: Config) -> crate::Result<Self> {
        use crate::file::MANIFEST_FILE;

        log::debug!("Opening LSM-tree at {:?}", config.path);

        // Check for old version
        if config.path.join("version").try_exists()? {
            return Err(crate::Error::InvalidVersion(Version::V1));
        }

        let tree = if config.path.join(MANIFEST_FILE).try_exists()? {
            Self::recover(config)
        } else {
            Self::create_new(config)
        }?;

        Ok(tree)
    }

    pub(crate) fn read_lock_active_memtable(&self) -> RwLockReadGuard<'_, Arc<Memtable>> {
        self.active_memtable.read().expect("lock is poisoned")
    }

    pub(crate) fn consume_writer(
        &self,
        segment_id: SegmentId,
        mut writer: crate::segment::writer::Writer,
    ) -> crate::Result<Option<Segment>> {
        let segment_folder = writer.opts.folder.clone();
        let segment_file_path = segment_folder.join(segment_id.to_string());

        let Some(trailer) = writer.finish()? else {
            return Ok(None);
        };

        log::debug!("Finalized segment write at {segment_folder:?}");

        let block_index =
            FullBlockIndex::from_file(&segment_file_path, &trailer.metadata, &trailer.offsets)?;
        let block_index = Arc::new(BlockIndexImpl::Full(block_index));

        let created_segment: Segment = SegmentInner {
            path: segment_file_path.to_path_buf(),

            tree_id: self.id,

            metadata: trailer.metadata,
            offsets: trailer.offsets,

            descriptor_table: self.config.descriptor_table.clone(),
            block_index,
            block_cache: self.config.block_cache.clone(),

            bloom_filter: Segment::load_bloom(&segment_file_path, trailer.offsets.bloom_ptr)?,

            is_deleted: AtomicBool::default(),
        }
        .into();

        self.config
            .descriptor_table
            .insert(segment_file_path, created_segment.global_id());

        log::debug!("Flushed segment to {segment_folder:?}");

        Ok(Some(created_segment))
    }

    /// Synchronously flushes the active memtable to a disk segment.
    ///
    /// The function may return `Ok(None)` if, under concurrent workloads, the memtable
    /// ends up being empty before the flush is set up.
    ///
    /// On success, the result contains the newly written disk segment, if any.
    ///
    /// # Errors
    ///
    /// Will return `Err` if an I/O error occurs.
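    ///
    /// # Examples
    ///
    /// A minimal sketch; the `0` is forwarded as the seqno threshold to the
    /// flush's compaction stream (see [`AbstractTree::flush_memtable`]):
    ///
    /// ```ignore
    /// if let Some(segment) = tree.flush_active_memtable(0)? {
    ///     log::info!("flushed segment {}", segment.id());
    /// }
    /// ```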
    #[doc(hidden)]
    pub fn flush_active_memtable(&self, seqno_threshold: SeqNo) -> crate::Result<Option<Segment>> {
        log::debug!("Flushing active memtable");

        let Some((segment_id, yanked_memtable)) = self.rotate_memtable() else {
            return Ok(None);
        };

        let Some(segment) = self.flush_memtable(segment_id, &yanked_memtable, seqno_threshold)?
        else {
            return Ok(None);
        };
        self.register_segments(&[segment.clone()])?;

        Ok(Some(segment))
    }

    /// Returns `true` if there are some segments that are being compacted.
    #[doc(hidden)]
    #[must_use]
    pub fn is_compacting(&self) -> bool {
        let levels = self.levels.read().expect("lock is poisoned");
        levels.is_compacting()
    }

    /// Write-locks the sealed memtables for exclusive access
    fn lock_sealed_memtables(&self) -> RwLockWriteGuard<'_, SealedMemtables> {
        self.sealed_memtables.write().expect("lock is poisoned")
    }

    // TODO: maybe not needed anyway
    /// Used for [`BlobTree`] lookup
    pub(crate) fn get_internal_entry_with_memtable(
        &self,
        memtable_lock: &Memtable,
        key: &[u8],
        seqno: Option<SeqNo>,
    ) -> crate::Result<Option<InternalValue>> {
        if let Some(entry) = memtable_lock.get(key, seqno) {
            return Ok(ignore_tombstone_value(entry));
        };

        // Now look in sealed memtables
        if let Some(entry) = self.get_internal_entry_from_sealed_memtables(key, seqno) {
            return Ok(ignore_tombstone_value(entry));
        }

        self.get_internal_entry_from_segments(key, seqno)
    }

    fn get_internal_entry_from_sealed_memtables(
        &self,
        key: &[u8],
        seqno: Option<SeqNo>,
    ) -> Option<InternalValue> {
        let memtable_lock = self.sealed_memtables.read().expect("lock is poisoned");

        for (_, memtable) in memtable_lock.iter().rev() {
            if let Some(entry) = memtable.get(key, seqno) {
                return Some(entry);
            }
        }

        None
    }

    fn get_internal_entry_from_segments(
        &self,
        key: &[u8],
        seqno: Option<SeqNo>,
    ) -> crate::Result<Option<InternalValue>> {
        // NOTE: Create key hash for hash sharing
        // https://fjall-rs.github.io/post/bloom-filter-hash-sharing/
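        // The hash is computed once up-front and reused for every segment's
        // bloom filter check below.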
        let key_hash = crate::bloom::BloomFilter::get_hash(key);

        let level_manifest = self.levels.read().expect("lock is poisoned");

        for level in &level_manifest.levels {
            // NOTE: Based on benchmarking, binary search is only worth it with ~4 segments
            if level.len() >= 4 {
                if let Some(level) = level.as_disjoint() {
                    if let Some(segment) = level.get_segment_containing_key(key) {
                        if let Some(item) = segment.get(key, seqno, key_hash)? {
                            return Ok(ignore_tombstone_value(item));
                        }
                    }

                    // NOTE: Go to next level
                    continue;
                }
            }

            // NOTE: Fallback to linear search
            for segment in &level.segments {
                if !segment.is_key_in_key_range(key) {
                    continue;
                }

                if let Some(item) = segment.get(key, seqno, key_hash)? {
                    return Ok(ignore_tombstone_value(item));
                }
            }
        }

        Ok(None)
    }

    #[doc(hidden)]
    pub fn get_internal_entry(
        &self,
        key: &[u8],
        seqno: Option<SeqNo>,
    ) -> crate::Result<Option<InternalValue>> {
        // TODO: consolidate memtable & sealed behind single RwLock
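        // Point reads follow LSM order: active memtable first, then sealed
        // memtables, then disk segments (newest data wins).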

        let memtable_lock = self.active_memtable.read().expect("lock is poisoned");

        if let Some(entry) = memtable_lock.get(key, seqno) {
            return Ok(ignore_tombstone_value(entry));
        };

        drop(memtable_lock);

        // Now look in sealed memtables
        if let Some(entry) = self.get_internal_entry_from_sealed_memtables(key, seqno) {
            return Ok(ignore_tombstone_value(entry));
        }

        // Now look in segments... this may involve disk I/O
        self.get_internal_entry_from_segments(key, seqno)
    }

    fn inner_compact(
        &self,
        strategy: Arc<dyn CompactionStrategy>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        use crate::compaction::worker::{do_compaction, Options};

        let mut opts = Options::from_tree(self, strategy);
        opts.eviction_seqno = seqno_threshold;

        do_compaction(&opts)?;

        log::debug!("Compaction run over");

        Ok(())
    }

    #[doc(hidden)]
    #[must_use]
    pub fn create_iter(
        &self,
        seqno: Option<SeqNo>,
        ephemeral: Option<Arc<Memtable>>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        self.create_range::<UserKey, _>(&.., seqno, ephemeral)
    }

    #[doc(hidden)]
    pub fn create_internal_range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
        &'a self,
        range: &'a R,
        seqno: Option<SeqNo>,
        ephemeral: Option<Arc<Memtable>>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<InternalValue>> + 'static {
        use crate::range::{IterState, TreeIter};
        use std::ops::Bound::{self, Excluded, Included, Unbounded};

        let lo: Bound<UserKey> = match range.start_bound() {
            Included(x) => Included(x.as_ref().into()),
            Excluded(x) => Excluded(x.as_ref().into()),
            Unbounded => Unbounded,
        };

        let hi: Bound<UserKey> = match range.end_bound() {
            Included(x) => Included(x.as_ref().into()),
            Excluded(x) => Excluded(x.as_ref().into()),
            Unbounded => Unbounded,
        };

        let bounds: (Bound<UserKey>, Bound<UserKey>) = (lo, hi);

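        // NOTE: guardian provides owned (Arc-based) read guards, so the level manifest
        // guard can be handed to the iterator below instead of being tied to `&self`.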
        log::trace!("range read: acquiring levels manifest read lock");
        // NOTE: Mind lock order L -> M -> S
        let level_manifest =
            guardian::ArcRwLockReadGuardian::take(self.levels.clone()).expect("lock is poisoned");
        log::trace!("range read: acquired level manifest read lock");

        log::trace!("range read: acquiring active memtable read lock");
        let active = guardian::ArcRwLockReadGuardian::take(self.active_memtable.clone())
            .expect("lock is poisoned");
        log::trace!("range read: acquired active memtable read lock");

        log::trace!("range read: acquiring sealed memtable read lock");
        let sealed = guardian::ArcRwLockReadGuardian::take(self.sealed_memtables.clone())
            .expect("lock is poisoned");
        log::trace!("range read: acquired sealed memtable read lock");

        let iter_state = IterState {
            active: active.clone(),
            sealed: sealed.iter().map(|(_, mt)| mt.clone()).collect(),
            ephemeral,
            levels: level_manifest.levels.clone(),
        };

        TreeIter::create_range(iter_state, bounds, seqno, level_manifest)
    }

    #[doc(hidden)]
    pub fn create_range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
        &'a self,
        range: &'a R,
        seqno: Option<SeqNo>,
        ephemeral: Option<Arc<Memtable>>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        self.create_internal_range(range, seqno, ephemeral)
            .map(|item| match item {
                Ok(kv) => Ok((kv.key.user_key, kv.value)),
                Err(e) => Err(e),
            })
    }

    #[doc(hidden)]
    pub fn create_prefix<'a, K: AsRef<[u8]> + 'a>(
        &'a self,
        prefix: K,
        seqno: Option<SeqNo>,
        ephemeral: Option<Arc<Memtable>>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        use crate::range::prefix_to_range;

        let range = prefix_to_range(prefix.as_ref());
        self.create_range(&range, seqno, ephemeral)
    }

    /// Adds an item to the active memtable.
    ///
    /// Returns the added item's size and new size of the memtable.
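    ///
    /// # Examples
    ///
    /// A minimal sketch, using the same value constructor as [`AbstractTree::insert`]:
    ///
    /// ```ignore
    /// let value = InternalValue::from_components("key", "value", /* seqno */ 0, ValueType::Value);
    /// let (item_size, memtable_size) = tree.append_entry(value);
    /// ```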
    #[doc(hidden)]
    #[must_use]
    pub fn append_entry(&self, value: InternalValue) -> (u32, u32) {
        let memtable_lock = self.active_memtable.read().expect("lock is poisoned");
        memtable_lock.insert(value)
    }

    /// Recovers the previous state by loading the level manifest and segments.
    ///
    /// # Errors
    ///
    /// Returns an error if an I/O error occurs.
    fn recover(mut config: Config) -> crate::Result<Self> {
        use crate::{file::MANIFEST_FILE, stop_signal::StopSignal};
        use inner::get_next_tree_id;

        log::info!("Recovering LSM-tree at {:?}", config.path);

        let bytes = std::fs::read(config.path.join(MANIFEST_FILE))?;
        let mut bytes = Cursor::new(bytes);
        let manifest = Manifest::decode_from(&mut bytes)?;

        if manifest.version != Version::V2 {
            return Err(crate::Error::InvalidVersion(manifest.version));
        }

        // IMPORTANT: Restore persisted config
        config.level_count = manifest.level_count;
        config.table_type = manifest.table_type;
        config.tree_type = manifest.tree_type;

        let tree_id = get_next_tree_id();

        let mut levels = Self::recover_levels(
            &config.path,
            tree_id,
            &config.block_cache,
            &config.descriptor_table,
        )?;
        levels.update_metadata();

        let highest_segment_id = levels.iter().map(Segment::id).max().unwrap_or_default();

        let inner = TreeInner {
            id: tree_id,
            segment_id_counter: Arc::new(AtomicU64::new(highest_segment_id + 1)),
            active_memtable: Arc::default(),
            sealed_memtables: Arc::default(),
            levels: Arc::new(RwLock::new(levels)),
            stop_signal: StopSignal::default(),
            config,
            major_compaction_lock: RwLock::default(),
        };

        Ok(Self(Arc::new(inner)))
    }

    /// Creates a new LSM-tree in a directory.
    fn create_new(config: Config) -> crate::Result<Self> {
        use crate::file::{fsync_directory, MANIFEST_FILE, SEGMENTS_FOLDER};
        use std::fs::{create_dir_all, File};

        let path = config.path.clone();
        log::trace!("Creating LSM-tree at {path:?}");

        create_dir_all(&path)?;

        let manifest_path = path.join(MANIFEST_FILE);
        assert!(!manifest_path.try_exists()?);

        let segment_folder_path = path.join(SEGMENTS_FOLDER);
        create_dir_all(&segment_folder_path)?;

        // NOTE: Lastly, write and fsync the manifest, which contains the version
        // -> the LSM-tree is fully initialized
        let mut file = File::create(manifest_path)?;
        Manifest {
            version: Version::V2,
            level_count: config.level_count,
            tree_type: config.tree_type,
            table_type: TableType::Block,
        }
        .encode_into(&mut file)?;
        file.sync_all()?;

        // IMPORTANT: fsync folders on Unix
        fsync_directory(&segment_folder_path)?;
        fsync_directory(&path)?;

        let inner = TreeInner::create_new(config)?;
        Ok(Self(Arc::new(inner)))
    }

    /// Recovers the level manifest, loading all segments from disk.
    fn recover_levels<P: AsRef<Path>>(
        tree_path: P,
        tree_id: TreeId,
        block_cache: &Arc<BlockCache>,
        descriptor_table: &Arc<FileDescriptorTable>,
    ) -> crate::Result<LevelManifest> {
        use crate::{
            file::fsync_directory,
            file::{LEVELS_MANIFEST_FILE, SEGMENTS_FOLDER},
            SegmentId,
        };

        let tree_path = tree_path.as_ref();

        let level_manifest_path = tree_path.join(LEVELS_MANIFEST_FILE);
        log::info!("Recovering manifest at {level_manifest_path:?}");

        let segment_id_map = LevelManifest::recover_ids(&level_manifest_path)?;
        let cnt = segment_id_map.len();

        log::debug!("Recovering {cnt} disk segments from {tree_path:?}");

        let progress_mod = match cnt {
            _ if cnt <= 20 => 1,
            _ if cnt <= 100 => 10,
            _ => 100,
        };

        let mut segments = vec![];

        let segment_base_folder = tree_path.join(SEGMENTS_FOLDER);

        if !segment_base_folder.try_exists()? {
            std::fs::create_dir_all(&segment_base_folder)?;
            fsync_directory(&segment_base_folder)?;
        }

        for (idx, dirent) in std::fs::read_dir(&segment_base_folder)?.enumerate() {
            let dirent = dirent?;

            let file_name = dirent.file_name();

            if file_name == ".DS_Store" {
                continue;
            }

            let segment_file_name = file_name.to_str().ok_or_else(|| {
                log::error!("invalid segment file name {file_name:?}");
                crate::Error::Unrecoverable
            })?;

            let segment_file_path = dirent.path();
            assert!(!segment_file_path.is_dir());

            if segment_file_name.starts_with("tmp_") {
                log::debug!("Deleting unfinished segment: {segment_file_path:?}",);
                std::fs::remove_file(&segment_file_path)?;
                continue;
            }

            log::debug!("Recovering segment from {segment_file_path:?}");

            let segment_id = segment_file_name.parse::<SegmentId>().map_err(|e| {
                log::error!("invalid segment file name {segment_file_name:?}: {e:?}");
                crate::Error::Unrecoverable
            })?;

            if let Some(&level_idx) = segment_id_map.get(&segment_id) {
                let segment = Segment::recover(
                    &segment_file_path,
                    tree_id,
                    block_cache.clone(),
                    descriptor_table.clone(),
                    level_idx == 0 || level_idx == 1,
                )?;

                descriptor_table.insert(&segment_file_path, segment.global_id());

                segments.push(segment);
                log::debug!("Recovered segment from {segment_file_path:?}");

                if idx % progress_mod == 0 {
                    log::debug!("Recovered {idx}/{cnt} disk segments");
                }
            } else {
                log::debug!("Deleting unfinished segment: {segment_file_path:?}",);
                std::fs::remove_file(&segment_file_path)?;
            }
        }

        if segments.len() < cnt {
            log::error!(
                "Recovered fewer segments than expected: {:?}",
                segment_id_map.keys(),
            );
            return Err(crate::Error::Unrecoverable);
        }

        log::debug!("Successfully recovered {} segments", segments.len());

        LevelManifest::recover(&level_manifest_path, segments)
    }
}