lsm_tree/tree/mod.rs

// Copyright (c) 2024-present, fjall-rs
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)

pub(crate) mod ingest;
pub mod inner;

use crate::{
    cache::Cache,
    coding::{Decode, Encode},
    compaction::CompactionStrategy,
    config::Config,
    descriptor_table::FileDescriptorTable,
    level_manifest::LevelManifest,
    manifest::Manifest,
    memtable::Memtable,
    segment::{
        block_index::{full_index::FullBlockIndex, BlockIndexImpl},
        meta::TableType,
        Segment, SegmentInner,
    },
    value::InternalValue,
    version::Version,
    AbstractTree, KvPair, SegmentId, SeqNo, Snapshot, UserKey, UserValue, ValueType,
};
use inner::{MemtableId, SealedMemtables, TreeId, TreeInner};
use std::{
    io::Cursor,
    ops::RangeBounds,
    path::Path,
    sync::atomic::AtomicBool,
    sync::{atomic::AtomicU64, Arc, RwLock, RwLockReadGuard, RwLockWriteGuard},
};

fn ignore_tombstone_value(item: InternalValue) -> Option<InternalValue> {
    if item.is_tombstone() {
        None
    } else {
        Some(item)
    }
}

/// A log-structured merge tree (LSM-tree/LSMT)
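///
/// Cloning the handle is cheap: it only clones the shared `Arc<TreeInner>`.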
#[derive(Clone)]
pub struct Tree(#[doc(hidden)] pub Arc<TreeInner>);

impl std::ops::Deref for Tree {
    type Target = TreeInner;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl AbstractTree for Tree {
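    // Bulk-loads a pre-sorted stream of key-value pairs directly into segments,
    // bypassing the memtable. The tree must be empty and keys must arrive in
    // strictly ascending order (both are checked by the asserts below).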
    fn ingest(&self, iter: impl Iterator<Item = (UserKey, UserValue)>) -> crate::Result<()> {
        use crate::tree::ingest::Ingestion;
        use std::time::Instant;

        // NOTE: Lock active memtable so nothing else can be going on while we are bulk loading
        let lock = self.lock_active_memtable();
        assert!(
            lock.is_empty(),
            "can only perform bulk_ingest on empty trees",
        );

        let mut writer = Ingestion::new(self)?;

        let start = Instant::now();
        let mut count = 0;
        let mut last_key = None;

        for (key, value) in iter {
            if let Some(last_key) = &last_key {
                assert!(
                    key > last_key,
                    "next key in bulk ingest was not greater than last key",
                );
            }
            last_key = Some(key.clone());

            writer.write(key, value)?;

            count += 1;
        }

        writer.finish()?;

        log::info!("Ingested {count} items in {:?}", start.elapsed());

        Ok(())
    }

    #[doc(hidden)]
    fn major_compact(&self, target_size: u64, seqno_threshold: SeqNo) -> crate::Result<()> {
        let strategy = Arc::new(crate::compaction::major::Strategy::new(target_size));

        // IMPORTANT: Write lock so we can be the only compaction going on
        let _lock = self
            .0
            .major_compaction_lock
            .write()
            .expect("lock is poisoned");

        log::info!("Starting major compaction");
        self.inner_compact(strategy, seqno_threshold)
    }

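    // Counts the sorted runs in level 0: a disjoint level forms a single run,
    // otherwise every segment is treated as its own run.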
    fn l0_run_count(&self) -> usize {
        let lock = self.levels.read().expect("lock is poisoned");

        let first_level = lock
            .levels
            .first()
            .expect("first level should always exist");

        if first_level.is_disjoint {
            1
        } else {
            // TODO: in the future, there will be a Vec<Run> per Level
            // TODO: so this will need to change,
            // TODO: but then we also don't need the manual is_disjoint check
            first_level.segments.len()
        }
    }

    fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: Option<SeqNo>) -> crate::Result<Option<u32>> {
        Ok(self.get(key, seqno)?.map(|x| x.len() as u32))
    }

    fn bloom_filter_size(&self) -> usize {
        self.levels
            .read()
            .expect("lock is poisoned")
            .iter()
            .map(super::segment::Segment::bloom_filter_size)
            .sum()
    }

    fn sealed_memtable_count(&self) -> usize {
        self.sealed_memtables
            .read()
            .expect("lock is poisoned")
            .len()
    }

    fn verify(&self) -> crate::Result<usize> {
        // NOTE: Lock memtable to prevent any tampering with disk segments
        let _lock = self.lock_active_memtable();

        let mut sum = 0;

        let level_manifest = self.levels.read().expect("lock is poisoned");

        for level in &level_manifest.levels {
            for segment in &level.segments {
                sum += segment.verify()?;
            }
        }

        Ok(sum)
    }

    fn keys(
        &self,
        seqno: Option<SeqNo>,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = crate::Result<UserKey>> + 'static> {
        Box::new(self.create_iter(seqno, index).map(|x| x.map(|(k, _)| k)))
    }

    fn values(
        &self,
        seqno: Option<SeqNo>,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = crate::Result<UserValue>> + 'static> {
        Box::new(self.create_iter(seqno, index).map(|x| x.map(|(_, v)| v)))
    }

    fn flush_memtable(
        &self,
        segment_id: SegmentId,
        memtable: &Arc<Memtable>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<Option<Segment>> {
        use crate::{
            compaction::stream::CompactionStream,
            file::SEGMENTS_FOLDER,
            segment::writer::{Options, Writer},
        };
        use std::time::Instant;

        let start = Instant::now();

        let folder = self.config.path.join(SEGMENTS_FOLDER);
        log::debug!("writing segment to {folder:?}");

        let mut segment_writer = Writer::new(Options {
            segment_id,
            folder,
            data_block_size: self.config.data_block_size,
            index_block_size: self.config.index_block_size,
        })?
        .use_compression(self.config.compression);

        {
            use crate::segment::writer::BloomConstructionPolicy;

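            // NOTE: A negative `bloom_bits_per_key` means bloom filters are disabled, so we
            // construct an (effectively empty) filter with 0 bits per key; otherwise flushed
            // segments get a filter with a fixed false-positive rate.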
            if self.config.bloom_bits_per_key >= 0 {
                segment_writer =
                    segment_writer.use_bloom_policy(BloomConstructionPolicy::FpRate(0.00001));
            } else {
                segment_writer =
                    segment_writer.use_bloom_policy(BloomConstructionPolicy::BitsPerKey(0));
            }
        }

        let iter = memtable.iter().map(Ok);
        let compaction_filter = CompactionStream::new(iter, seqno_threshold);

        for item in compaction_filter {
            segment_writer.write(item?)?;
        }

        let result = self.consume_writer(segment_id, segment_writer)?;

        log::debug!("Flushed memtable {segment_id:?} in {:?}", start.elapsed());

        Ok(result)
    }

    fn register_segments(&self, segments: &[Segment]) -> crate::Result<()> {
        // NOTE: Mind lock order L -> M -> S
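        //       (L = level manifest, M = active memtable, S = sealed memtables)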
        log::trace!("register: Acquiring levels manifest write lock");
        let mut original_levels = self.levels.write().expect("lock is poisoned");
        log::trace!("register: Acquired levels manifest write lock");

        // NOTE: Mind lock order L -> M -> S
        log::trace!("register: Acquiring sealed memtables write lock");
        let mut sealed_memtables = self.sealed_memtables.write().expect("lock is poisoned");
        log::trace!("register: Acquired sealed memtables write lock");

        original_levels.atomic_swap(|recipe| {
            for segment in segments.iter().cloned() {
                recipe
                    .first_mut()
                    .expect("first level should exist")
                    .insert(segment);
            }
        })?;

        // eprintln!("{original_levels}");

        for segment in segments {
            log::trace!("releasing sealed memtable {}", segment.id());
            sealed_memtables.remove(segment.id());
        }

        Ok(())
    }

    fn lock_active_memtable(&self) -> RwLockWriteGuard<'_, Arc<Memtable>> {
        self.active_memtable.write().expect("lock is poisoned")
    }

    fn clear_active_memtable(&self) {
        *self.active_memtable.write().expect("lock is poisoned") = Arc::new(Memtable::default());
    }

    fn set_active_memtable(&self, memtable: Memtable) {
        let mut memtable_lock = self.active_memtable.write().expect("lock is poisoned");
        *memtable_lock = Arc::new(memtable);
    }

    fn add_sealed_memtable(&self, id: MemtableId, memtable: Arc<Memtable>) {
        let mut memtable_lock = self.sealed_memtables.write().expect("lock is poisoned");
        memtable_lock.add(id, memtable);
    }

    fn compact(
        &self,
        strategy: Arc<dyn CompactionStrategy>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        // NOTE: Read lock major compaction lock
        // That way, if a major compaction is running, we cannot proceed
        // But in general, parallel (non-major) compactions can occur
        let _lock = self
            .0
            .major_compaction_lock
            .read()
            .expect("lock is poisoned");

        self.inner_compact(strategy, seqno_threshold)
    }

    fn get_next_segment_id(&self) -> SegmentId {
        self.0.get_next_segment_id()
    }

    fn tree_config(&self) -> &Config {
        &self.config
    }

    fn active_memtable_size(&self) -> u32 {
        use std::sync::atomic::Ordering::Acquire;

        self.active_memtable
            .read()
            .expect("lock is poisoned")
            .approximate_size
            .load(Acquire)
    }

    fn tree_type(&self) -> crate::TreeType {
        crate::TreeType::Standard
    }

    fn rotate_memtable(&self) -> Option<(MemtableId, Arc<Memtable>)> {
        log::trace!("rotate: acquiring active memtable write lock");
        let mut active_memtable = self.lock_active_memtable();

        log::trace!("rotate: acquiring sealed memtables write lock");
        let mut sealed_memtables = self.lock_sealed_memtables();

        if active_memtable.is_empty() {
            return None;
        }

        let yanked_memtable = std::mem::take(&mut *active_memtable);

        let tmp_memtable_id = self.get_next_segment_id();
        sealed_memtables.add(tmp_memtable_id, yanked_memtable.clone());

        log::trace!("rotate: added memtable id={tmp_memtable_id} to sealed memtables");

        Some((tmp_memtable_id, yanked_memtable))
    }

    fn segment_count(&self) -> usize {
        self.levels.read().expect("lock is poisoned").len()
    }

    fn level_segment_count(&self, idx: usize) -> Option<usize> {
        self.levels
            .read()
            .expect("lock is poisoned")
            .levels
            .get(idx)
            .map(|x| x.len())
    }

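    // NOTE: The count is approximate because the item counts of segments and
    // memtables include tombstones and stale versions of keys.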
    #[allow(clippy::significant_drop_tightening)]
    fn approximate_len(&self) -> usize {
        // NOTE: Mind lock order L -> M -> S
        let levels = self.levels.read().expect("lock is poisoned");
        let memtable = self.active_memtable.read().expect("lock is poisoned");
        let sealed = self.sealed_memtables.read().expect("lock is poisoned");

        let segments_item_count = levels.iter().map(|x| x.metadata.item_count).sum::<u64>();
        let memtable_count = memtable.len() as u64;
        let sealed_count = sealed.iter().map(|(_, mt)| mt.len()).sum::<usize>() as u64;

        (memtable_count + sealed_count + segments_item_count)
            .try_into()
            .expect("should not be too large")
    }

    fn disk_space(&self) -> u64 {
        let levels = self.levels.read().expect("lock is poisoned");
        levels.iter().map(|x| x.metadata.file_size).sum()
    }

    fn get_highest_memtable_seqno(&self) -> Option<SeqNo> {
        let active = self
            .active_memtable
            .read()
            .expect("lock is poisoned")
            .get_highest_seqno();

        let sealed = self
            .sealed_memtables
            .read()
            .expect("lock is poisoned")
            .iter()
            .map(|(_, table)| table.get_highest_seqno())
            .max()
            .flatten();

        active.max(sealed)
    }

    fn get_highest_persisted_seqno(&self) -> Option<SeqNo> {
        let levels = self.levels.read().expect("lock is poisoned");
        levels
            .iter()
            .map(super::segment::Segment::get_highest_seqno)
            .max()
    }

    fn snapshot(&self, seqno: SeqNo) -> Snapshot {
        use crate::AnyTree::Standard;

        Snapshot::new(Standard(self.clone()), seqno)
    }

    fn get<K: AsRef<[u8]>>(
        &self,
        key: K,
        seqno: Option<SeqNo>,
    ) -> crate::Result<Option<UserValue>> {
        Ok(self
            .get_internal_entry(key.as_ref(), seqno)?
            .map(|x| x.value))
    }

    fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
        &self,
        range: R,
        seqno: Option<SeqNo>,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static> {
        Box::new(self.create_range(&range, seqno, index))
    }

    fn prefix<K: AsRef<[u8]>>(
        &self,
        prefix: K,
        seqno: Option<SeqNo>,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static> {
        Box::new(self.create_prefix(prefix, seqno, index))
    }

    fn insert<K: Into<UserKey>, V: Into<UserValue>>(
        &self,
        key: K,
        value: V,
        seqno: SeqNo,
    ) -> (u32, u32) {
        let value = InternalValue::from_components(key, value, seqno, ValueType::Value);
        self.append_entry(value)
    }

    fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u32, u32) {
        let value = InternalValue::new_tombstone(key, seqno);
        self.append_entry(value)
    }

    fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u32, u32) {
        let value = InternalValue::new_weak_tombstone(key, seqno);
        self.append_entry(value)
    }
}

impl Tree {
    /// Opens an LSM-tree in the given directory.
    ///
    /// Will recover previous state if the folder was previously
    /// occupied by an LSM-tree, including the previous configuration.
    /// If not, a new tree will be initialized with the given config.
    ///
    /// After recovering a previous state, use [`Tree::set_active_memtable`]
    /// to fill the memtable with data from a write-ahead log for full durability.
    ///
    /// # Errors
    ///
    /// Returns an error if an IO error occurs.
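    ///
    /// ```ignore
    /// // Sketch (not a doctest); assumes a `Config::new(path)` constructor.
    /// let tree = Tree::open(Config::new("/tmp/my_tree"))?;
    /// ```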
    pub(crate) fn open(config: Config) -> crate::Result<Self> {
        use crate::file::MANIFEST_FILE;

        log::debug!("Opening LSM-tree at {:?}", config.path);

        // Check for old version
        if config.path.join("version").try_exists()? {
            return Err(crate::Error::InvalidVersion(Version::V1));
        }

        let tree = if config.path.join(MANIFEST_FILE).try_exists()? {
            Self::recover(config)
        } else {
            Self::create_new(config)
        }?;

        Ok(tree)
    }

    pub(crate) fn read_lock_active_memtable(&self) -> RwLockReadGuard<'_, Arc<Memtable>> {
        self.active_memtable.read().expect("lock is poisoned")
    }

    pub(crate) fn consume_writer(
        &self,
        segment_id: SegmentId,
        mut writer: crate::segment::writer::Writer,
    ) -> crate::Result<Option<Segment>> {
        let segment_folder = writer.opts.folder.clone();
        let segment_file_path = segment_folder.join(segment_id.to_string());

        let Some(trailer) = writer.finish()? else {
            return Ok(None);
        };

        log::debug!("Finalized segment write at {segment_folder:?}");

        let block_index =
            FullBlockIndex::from_file(&segment_file_path, &trailer.metadata, &trailer.offsets)?;
        let block_index = Arc::new(BlockIndexImpl::Full(block_index));

        let created_segment: Segment = SegmentInner {
            path: segment_file_path.to_path_buf(),

            tree_id: self.id,

            metadata: trailer.metadata,
            offsets: trailer.offsets,

            descriptor_table: self.config.descriptor_table.clone(),
            block_index,
            cache: self.config.cache.clone(),

            bloom_filter: Segment::load_bloom(&segment_file_path, trailer.offsets.bloom_ptr)?,

            is_deleted: AtomicBool::default(),
        }
        .into();

        self.config
            .descriptor_table
            .insert(segment_file_path, created_segment.global_id());

        log::debug!("Flushed segment to {segment_folder:?}");

        Ok(Some(created_segment))
    }

    /// Synchronously flushes the active memtable to a disk segment.
    ///
    /// The function may return `Ok(None)` if, during concurrent workloads, the memtable
    /// ends up being empty before the flush is set up.
    ///
    /// On success, the result contains the newly created disk segment.
    ///
    /// # Errors
    ///
    /// Will return `Err` if an IO error occurs.
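    ///
    /// ```ignore
    /// // Sketch (not a doctest): flush the active memtable, using 0 as the
    /// // GC/eviction seqno threshold.
    /// let maybe_segment = tree.flush_active_memtable(0)?;
    /// ```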
    #[doc(hidden)]
    pub fn flush_active_memtable(&self, seqno_threshold: SeqNo) -> crate::Result<Option<Segment>> {
        log::debug!("Flushing active memtable");

        let Some((segment_id, yanked_memtable)) = self.rotate_memtable() else {
            return Ok(None);
        };

        let Some(segment) = self.flush_memtable(segment_id, &yanked_memtable, seqno_threshold)?
        else {
            return Ok(None);
        };
        self.register_segments(&[segment.clone()])?;

        Ok(Some(segment))
    }

    /// Returns `true` if there are some segments that are being compacted.
    #[doc(hidden)]
    #[must_use]
    pub fn is_compacting(&self) -> bool {
        let levels = self.levels.read().expect("lock is poisoned");
        levels.is_compacting()
    }

    /// Write-locks the sealed memtables for exclusive access
    fn lock_sealed_memtables(&self) -> RwLockWriteGuard<'_, SealedMemtables> {
        self.sealed_memtables.write().expect("lock is poisoned")
    }

    // TODO: maybe not needed anyway
    /// Used for [`BlobTree`] lookup
    pub(crate) fn get_internal_entry_with_memtable(
        &self,
        memtable_lock: &Memtable,
        key: &[u8],
        seqno: Option<SeqNo>,
    ) -> crate::Result<Option<InternalValue>> {
        if let Some(entry) = memtable_lock.get(key, seqno) {
            return Ok(ignore_tombstone_value(entry));
        };

        // Now look in sealed memtables
        if let Some(entry) = self.get_internal_entry_from_sealed_memtables(key, seqno) {
            return Ok(ignore_tombstone_value(entry));
        }

        self.get_internal_entry_from_segments(key, seqno)
    }

    fn get_internal_entry_from_sealed_memtables(
        &self,
        key: &[u8],
        seqno: Option<SeqNo>,
    ) -> Option<InternalValue> {
        let memtable_lock = self.sealed_memtables.read().expect("lock is poisoned");

        for (_, memtable) in memtable_lock.iter().rev() {
            if let Some(entry) = memtable.get(key, seqno) {
                return Some(entry);
            }
        }

        None
    }

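    // Searches the disk segments level by level. Within a sufficiently large
    // disjoint level, the single segment that could contain the key is located
    // directly; otherwise, every segment whose key range covers the key is probed.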
    fn get_internal_entry_from_segments(
        &self,
        key: &[u8],
        seqno: Option<SeqNo>,
    ) -> crate::Result<Option<InternalValue>> {
        // NOTE: Create key hash for hash sharing
        // https://fjall-rs.github.io/post/bloom-filter-hash-sharing/
        let key_hash = crate::bloom::BloomFilter::get_hash(key);

        let level_manifest = self.levels.read().expect("lock is poisoned");

        for level in &level_manifest.levels {
            // NOTE: Based on benchmarking, binary search is only worth it with ~4 segments
            if level.len() >= 4 {
                if let Some(level) = level.as_disjoint() {
                    if let Some(segment) = level.get_segment_containing_key(key) {
                        if let Some(item) = segment.get(key, seqno, key_hash)? {
                            return Ok(ignore_tombstone_value(item));
                        }
                    }

                    // NOTE: Go to next level
                    continue;
                }
            }

            // NOTE: Fallback to linear search
            for segment in &level.segments {
                if !segment.is_key_in_key_range(key) {
                    continue;
                }

                if let Some(item) = segment.get(key, seqno, key_hash)? {
                    return Ok(ignore_tombstone_value(item));
                }
            }
        }

        Ok(None)
    }

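    /// Point lookup through the full read path: the active memtable first,
    /// then the sealed memtables, and finally the disk segments.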
    #[doc(hidden)]
    pub fn get_internal_entry(
        &self,
        key: &[u8],
        seqno: Option<SeqNo>,
    ) -> crate::Result<Option<InternalValue>> {
        // TODO: consolidate memtable & sealed behind single RwLock

        let memtable_lock = self.active_memtable.read().expect("lock is poisoned");

        if let Some(entry) = memtable_lock.get(key, seqno) {
            return Ok(ignore_tombstone_value(entry));
        };

        drop(memtable_lock);

        // Now look in sealed memtables
        if let Some(entry) = self.get_internal_entry_from_sealed_memtables(key, seqno) {
            return Ok(ignore_tombstone_value(entry));
        }

        // Now look in segments... this may involve disk I/O
        self.get_internal_entry_from_segments(key, seqno)
    }

    fn inner_compact(
        &self,
        strategy: Arc<dyn CompactionStrategy>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        use crate::compaction::worker::{do_compaction, Options};

        let mut opts = Options::from_tree(self, strategy);
        opts.eviction_seqno = seqno_threshold;

        do_compaction(&opts)?;

        log::debug!("Compaction run over");

        Ok(())
    }

    #[doc(hidden)]
    #[must_use]
    pub fn create_iter(
        &self,
        seqno: Option<SeqNo>,
        ephemeral: Option<Arc<Memtable>>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        self.create_range::<UserKey, _>(&.., seqno, ephemeral)
    }

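    /// Creates a raw range iterator over internal values.
    ///
    /// The iterator takes read locks (via `guardian`) on the level manifest and
    /// memtables while being constructed, so the set of segments and memtables it
    /// reads from cannot be swapped out underneath it.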
    #[doc(hidden)]
    pub fn create_internal_range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
        &'a self,
        range: &'a R,
        seqno: Option<SeqNo>,
        ephemeral: Option<Arc<Memtable>>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<InternalValue>> + 'static {
        use crate::range::{IterState, TreeIter};
        use std::ops::Bound::{self, Excluded, Included, Unbounded};

        let lo: Bound<UserKey> = match range.start_bound() {
            Included(x) => Included(x.as_ref().into()),
            Excluded(x) => Excluded(x.as_ref().into()),
            Unbounded => Unbounded,
        };

        let hi: Bound<UserKey> = match range.end_bound() {
            Included(x) => Included(x.as_ref().into()),
            Excluded(x) => Excluded(x.as_ref().into()),
            Unbounded => Unbounded,
        };

        let bounds: (Bound<UserKey>, Bound<UserKey>) = (lo, hi);

        log::trace!("range read: acquiring levels manifest read lock");
        // NOTE: Mind lock order L -> M -> S
        let level_manifest =
            guardian::ArcRwLockReadGuardian::take(self.levels.clone()).expect("lock is poisoned");
        log::trace!("range read: acquired level manifest read lock");

        log::trace!("range read: acquiring active memtable read lock");
        let active = guardian::ArcRwLockReadGuardian::take(self.active_memtable.clone())
            .expect("lock is poisoned");
        log::trace!("range read: acquired active memtable read lock");

        log::trace!("range read: acquiring sealed memtable read lock");
        let sealed = guardian::ArcRwLockReadGuardian::take(self.sealed_memtables.clone())
            .expect("lock is poisoned");
        log::trace!("range read: acquired sealed memtable read lock");

        let iter_state = IterState {
            active: active.clone(),
            sealed: sealed.iter().map(|(_, mt)| mt.clone()).collect(),
            ephemeral,
            levels: level_manifest.levels.clone(),
        };

        TreeIter::create_range(iter_state, bounds, seqno, level_manifest)
    }

    #[doc(hidden)]
    pub fn create_range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
        &'a self,
        range: &'a R,
        seqno: Option<SeqNo>,
        ephemeral: Option<Arc<Memtable>>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        self.create_internal_range(range, seqno, ephemeral)
            .map(|item| match item {
                Ok(kv) => Ok((kv.key.user_key, kv.value)),
                Err(e) => Err(e),
            })
    }

    #[doc(hidden)]
    pub fn create_prefix<'a, K: AsRef<[u8]> + 'a>(
        &'a self,
        prefix: K,
        seqno: Option<SeqNo>,
        ephemeral: Option<Arc<Memtable>>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        use crate::range::prefix_to_range;

        let range = prefix_to_range(prefix.as_ref());
        self.create_range(&range, seqno, ephemeral)
    }

    /// Adds an item to the active memtable.
    ///
    /// Returns the added item's size and new size of the memtable.
    #[doc(hidden)]
    #[must_use]
    pub fn append_entry(&self, value: InternalValue) -> (u32, u32) {
        let memtable_lock = self.active_memtable.read().expect("lock is poisoned");
        memtable_lock.insert(value)
    }

    /// Recovers previous state by loading the level manifest and segments.
    ///
    /// # Errors
    ///
    /// Returns an error if an IO error occurs.
    fn recover(mut config: Config) -> crate::Result<Self> {
        use crate::{file::MANIFEST_FILE, stop_signal::StopSignal};
        use inner::get_next_tree_id;

        log::info!("Recovering LSM-tree at {:?}", config.path);

        let bytes = std::fs::read(config.path.join(MANIFEST_FILE))?;
        let mut bytes = Cursor::new(bytes);
        let manifest = Manifest::decode_from(&mut bytes)?;

        if manifest.version != Version::V2 {
            return Err(crate::Error::InvalidVersion(manifest.version));
        }

        // IMPORTANT: Restore persisted config
        config.level_count = manifest.level_count;
        config.table_type = manifest.table_type;
        config.tree_type = manifest.tree_type;

        let tree_id = get_next_tree_id();

        let mut levels = Self::recover_levels(
            &config.path,
            tree_id,
            &config.cache,
            &config.descriptor_table,
        )?;
        levels.update_metadata();

        let highest_segment_id = levels.iter().map(Segment::id).max().unwrap_or_default();

        let inner = TreeInner {
            id: tree_id,
            segment_id_counter: Arc::new(AtomicU64::new(highest_segment_id + 1)),
            active_memtable: Arc::default(),
            sealed_memtables: Arc::default(),
            levels: Arc::new(RwLock::new(levels)),
            stop_signal: StopSignal::default(),
            config,
            major_compaction_lock: RwLock::default(),
        };

        Ok(Self(Arc::new(inner)))
    }

    /// Creates a new LSM-tree in a directory.
    fn create_new(config: Config) -> crate::Result<Self> {
        use crate::file::{fsync_directory, MANIFEST_FILE, SEGMENTS_FOLDER};
        use std::fs::{create_dir_all, File};

        let path = config.path.clone();
        log::trace!("Creating LSM-tree at {path:?}");

        create_dir_all(&path)?;

        let manifest_path = path.join(MANIFEST_FILE);
        assert!(!manifest_path.try_exists()?);

        let segment_folder_path = path.join(SEGMENTS_FOLDER);
        create_dir_all(&segment_folder_path)?;

        // NOTE: Lastly, write + fsync the manifest, which contains the version
        // -> the LSM-tree is fully initialized
        let mut file = File::create(manifest_path)?;
        Manifest {
            version: Version::V2,
            level_count: config.level_count,
            tree_type: config.tree_type,
            table_type: TableType::Block,
        }
        .encode_into(&mut file)?;
        file.sync_all()?;

        // IMPORTANT: fsync folders on Unix
        fsync_directory(&segment_folder_path)?;
        fsync_directory(&path)?;

        let inner = TreeInner::create_new(config)?;
        Ok(Self(Arc::new(inner)))
    }

    /// Recovers the level manifest, loading all segments from disk.
    fn recover_levels<P: AsRef<Path>>(
        tree_path: P,
        tree_id: TreeId,
        cache: &Arc<Cache>,
        descriptor_table: &Arc<FileDescriptorTable>,
    ) -> crate::Result<LevelManifest> {
        use crate::{
            file::fsync_directory,
            file::{LEVELS_MANIFEST_FILE, SEGMENTS_FOLDER},
            SegmentId,
        };

        let tree_path = tree_path.as_ref();

        let level_manifest_path = tree_path.join(LEVELS_MANIFEST_FILE);
        log::info!("Recovering manifest at {level_manifest_path:?}");

        let segment_id_map = LevelManifest::recover_ids(&level_manifest_path)?;
        let cnt = segment_id_map.len();

        log::debug!("Recovering {cnt} disk segments from {tree_path:?}");

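        // Only log progress for every 1/10/100 recovered segments, depending on
        // how many there are in total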
        let progress_mod = match cnt {
            _ if cnt <= 20 => 1,
            _ if cnt <= 100 => 10,
            _ => 100,
        };

        let mut segments = vec![];

        let segment_base_folder = tree_path.join(SEGMENTS_FOLDER);

        if !segment_base_folder.try_exists()? {
            std::fs::create_dir_all(&segment_base_folder)?;
            fsync_directory(&segment_base_folder)?;
        }

        for (idx, dirent) in std::fs::read_dir(&segment_base_folder)?.enumerate() {
            let dirent = dirent?;

            let file_name = dirent.file_name();

            if file_name == ".DS_Store" {
                continue;
            }

            let segment_file_name = file_name.to_str().ok_or_else(|| {
                log::error!("invalid segment file name {file_name:?}");
                crate::Error::Unrecoverable
            })?;

            let segment_file_path = dirent.path();
            assert!(!segment_file_path.is_dir());

            log::debug!("Recovering segment from {segment_file_path:?}");

            let segment_id = segment_file_name.parse::<SegmentId>().map_err(|e| {
                log::error!("invalid segment file name {segment_file_name:?}: {e:?}");
                crate::Error::Unrecoverable
            })?;

            if let Some(&level_idx) = segment_id_map.get(&segment_id) {
                let segment = Segment::recover(
                    &segment_file_path,
                    tree_id,
                    cache.clone(),
                    descriptor_table.clone(),
                    level_idx == 0 || level_idx == 1,
                )?;

                descriptor_table.insert(&segment_file_path, segment.global_id());

                segments.push(segment);
                log::debug!("Recovered segment from {segment_file_path:?}");

                if idx % progress_mod == 0 {
                    log::debug!("Recovered {idx}/{cnt} disk segments");
                }
            } else {
                log::debug!("Deleting unfinished segment: {segment_file_path:?}");
                std::fs::remove_file(&segment_file_path)?;
            }
        }

        if segments.len() < cnt {
            log::error!(
                "Recovered fewer segments than expected: {:?}",
                segment_id_map.keys(),
            );
            return Err(crate::Error::Unrecoverable);
        }

        log::debug!("Successfully recovered {} segments", segments.len());

        LevelManifest::recover(&level_manifest_path, segments)
    }
}