lsm_tree/tree/mod.rs

// Copyright (c) 2024-present, fjall-rs
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)

pub mod inner;

use crate::{
    coding::{Decode, Encode},
    compaction::{stream::CompactionStream, CompactionStrategy},
    config::Config,
    descriptor_table::FileDescriptorTable,
    level_manifest::LevelManifest,
    manifest::Manifest,
    memtable::Memtable,
    range::{prefix_to_range, MemtableLockGuard, TreeIter},
    segment::{
        block_index::{full_index::FullBlockIndex, BlockIndexImpl},
        meta::TableType,
        Segment, SegmentInner,
    },
    stop_signal::StopSignal,
    value::InternalValue,
    version::Version,
    AbstractTree, BlockCache, KvPair, SegmentId, SeqNo, Snapshot, UserKey, UserValue, ValueType,
};
use inner::{MemtableId, SealedMemtables, TreeId, TreeInner};
use std::{
    io::Cursor,
    ops::RangeBounds,
    path::Path,
    sync::{atomic::AtomicU64, Arc, RwLock, RwLockReadGuard, RwLockWriteGuard},
};

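/// Maps tombstones to `None` so that deleted keys are reported as missing to the caller.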
fn ignore_tombstone_value(item: InternalValue) -> Option<InternalValue> {
    if item.is_tombstone() {
        None
    } else {
        Some(item)
    }
}

/// A log-structured merge tree (LSM-tree/LSMT)
#[derive(Clone)]
pub struct Tree(#[doc(hidden)] pub Arc<TreeInner>);

impl std::ops::Deref for Tree {
    type Target = TreeInner;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl AbstractTree for Tree {
    fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: Option<SeqNo>) -> crate::Result<Option<u32>> {
        Ok(self.get(key, seqno)?.map(|x| x.len() as u32))
    }

    fn bloom_filter_size(&self) -> usize {
        self.levels
            .read()
            .expect("lock is poisoned")
            .iter()
            .map(|x| x.bloom_filter_size())
            .sum()
    }

    fn sealed_memtable_count(&self) -> usize {
        self.sealed_memtables
            .read()
            .expect("lock is poisoned")
            .len()
    }

    fn is_first_level_disjoint(&self) -> bool {
        self.levels
            .read()
            .expect("lock is poisoned")
            .levels
            .first()
            .expect("first level should exist")
            .is_disjoint
    }

    fn verify(&self) -> crate::Result<usize> {
        // NOTE: Lock memtable to prevent any tampering with disk segments
        let _lock = self.lock_active_memtable();

        let mut sum = 0;

        let level_manifest = self.levels.read().expect("lock is poisoned");

        for level in &level_manifest.levels {
            for segment in &level.segments {
                sum += segment.verify()?;
            }
        }

        Ok(sum)
    }

    fn keys(
        &self,
        seqno: Option<SeqNo>,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = crate::Result<UserKey>> + 'static> {
        Box::new(self.create_iter(seqno, index).map(|x| x.map(|(k, _)| k)))
    }

    fn values(
        &self,
        seqno: Option<SeqNo>,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = crate::Result<UserValue>> + 'static> {
        Box::new(self.create_iter(seqno, index).map(|x| x.map(|(_, v)| v)))
    }

    fn flush_memtable(
        &self,
        segment_id: SegmentId,
        memtable: &Arc<Memtable>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<Option<Segment>> {
        use crate::{
            file::SEGMENTS_FOLDER,
            segment::writer::{Options, Writer},
        };

        let start = std::time::Instant::now();

        let folder = self.config.path.join(SEGMENTS_FOLDER);
        log::debug!("writing segment to {folder:?}");

        let mut segment_writer = Writer::new(Options {
            segment_id,
            folder,
            data_block_size: self.config.data_block_size,
            index_block_size: self.config.index_block_size,
        })?
        .use_compression(self.config.compression);

        {
            use crate::segment::writer::BloomConstructionPolicy;

            if self.config.bloom_bits_per_key >= 0 {
                segment_writer =
                    segment_writer.use_bloom_policy(BloomConstructionPolicy::FpRate(0.00001));
            } else {
                segment_writer =
                    segment_writer.use_bloom_policy(BloomConstructionPolicy::BitsPerKey(0));
            }
        }

        let iter = memtable.iter().map(Ok);
        let compaction_filter = CompactionStream::new(iter, seqno_threshold);

        for item in compaction_filter {
            segment_writer.write(item?)?;
        }

        let result = self.consume_writer(segment_id, segment_writer)?;

        log::debug!("Flushed memtable {segment_id:?} in {:?}", start.elapsed());

        Ok(result)
    }

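    // Atomically inserts the freshly flushed segments into the first level, then
    // drops the corresponding sealed memtables. The lock-order note below
    // (L -> M -> S: levels, then active memtable, then sealed memtables) must be
    // upheld by every code path that takes more than one of these locks.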
    fn register_segments(&self, segments: &[Segment]) -> crate::Result<()> {
        // NOTE: Mind lock order L -> M -> S
        log::trace!("Acquiring levels manifest write lock");
        let mut original_levels = self.levels.write().expect("lock is poisoned");

        // NOTE: Mind lock order L -> M -> S
        log::trace!("Acquiring sealed memtables write lock");
        let mut sealed_memtables = self.sealed_memtables.write().expect("lock is poisoned");

        original_levels.atomic_swap(|recipe| {
            for segment in segments.iter().cloned() {
                recipe
                    .first_mut()
                    .expect("first level should exist")
                    .insert(segment);
            }
        })?;

        // eprintln!("{original_levels}");

        for segment in segments {
            log::trace!("releasing sealed memtable {}", segment.id());
            sealed_memtables.remove(segment.id());
        }

        Ok(())
    }

    fn lock_active_memtable(&self) -> RwLockWriteGuard<'_, Memtable> {
        self.active_memtable.write().expect("lock is poisoned")
    }

    fn set_active_memtable(&self, memtable: Memtable) {
        let mut memtable_lock = self.active_memtable.write().expect("lock is poisoned");
        *memtable_lock = memtable;
    }

    fn add_sealed_memtable(&self, id: MemtableId, memtable: Arc<Memtable>) {
        let mut memtable_lock = self.sealed_memtables.write().expect("lock is poisoned");
        memtable_lock.add(id, memtable);
    }

    fn compact(
        &self,
        strategy: Arc<dyn CompactionStrategy>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        use crate::compaction::worker::{do_compaction, Options};

        let mut opts = Options::from_tree(self, strategy);
        opts.eviction_seqno = seqno_threshold;
        do_compaction(&opts)?;

        log::debug!("lsm-tree: compaction run over");

        Ok(())
    }

    fn get_next_segment_id(&self) -> SegmentId {
        self.0.get_next_segment_id()
    }

    fn tree_config(&self) -> &Config {
        &self.config
    }

    fn active_memtable_size(&self) -> u32 {
        use std::sync::atomic::Ordering::Acquire;

        self.active_memtable
            .read()
            .expect("lock is poisoned")
            .approximate_size
            .load(Acquire)
    }

    fn tree_type(&self) -> crate::TreeType {
        crate::TreeType::Standard
    }

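    // Seals the current active memtable: swaps in a fresh, empty memtable and
    // registers the old one under a new segment ID so a flush can pick it up.
    // Returns `None` if the active memtable is empty.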
    fn rotate_memtable(&self) -> Option<(MemtableId, Arc<Memtable>)> {
        log::trace!("rotate: acquiring active memtable write lock");
        let mut active_memtable = self.lock_active_memtable();

        log::trace!("rotate: acquiring sealed memtables write lock");
        let mut sealed_memtables = self.lock_sealed_memtables();

        if active_memtable.is_empty() {
            return None;
        }

        let yanked_memtable = std::mem::take(&mut *active_memtable);
        let yanked_memtable = Arc::new(yanked_memtable);

        let tmp_memtable_id = self.get_next_segment_id();
        sealed_memtables.add(tmp_memtable_id, yanked_memtable.clone());

        log::trace!("rotate: added memtable id={tmp_memtable_id} to sealed memtables");

        Some((tmp_memtable_id, yanked_memtable))
    }

    fn segment_count(&self) -> usize {
        self.levels.read().expect("lock is poisoned").len()
    }

    fn first_level_segment_count(&self) -> usize {
        self.levels
            .read()
            .expect("lock is poisoned")
            .first_level_segment_count()
    }

    #[allow(clippy::significant_drop_tightening)]
    fn approximate_len(&self) -> usize {
        // NOTE: Mind lock order L -> M -> S
        let levels = self.levels.read().expect("lock is poisoned");
        let memtable = self.active_memtable.read().expect("lock is poisoned");
        let sealed = self.sealed_memtables.read().expect("lock is poisoned");

        let segments_item_count = levels.iter().map(|x| x.metadata.item_count).sum::<u64>();
        let memtable_count = memtable.len() as u64;
        let sealed_count = sealed.iter().map(|(_, mt)| mt.len()).sum::<usize>() as u64;

        (memtable_count + sealed_count + segments_item_count)
            .try_into()
            .expect("should not be too large")
    }

    fn disk_space(&self) -> u64 {
        let levels = self.levels.read().expect("lock is poisoned");
        levels.iter().map(|x| x.metadata.file_size).sum()
    }

    fn get_highest_memtable_seqno(&self) -> Option<SeqNo> {
        let active = self
            .active_memtable
            .read()
            .expect("lock is poisoned")
            .get_highest_seqno();

        let sealed = self
            .sealed_memtables
            .read()
            .expect("Lock is poisoned")
            .iter()
            .map(|(_, table)| table.get_highest_seqno())
            .max()
            .flatten();

        active.max(sealed)
    }

    fn get_highest_persisted_seqno(&self) -> Option<SeqNo> {
        let levels = self.levels.read().expect("lock is poisoned");
        levels
            .iter()
            .map(super::segment::Segment::get_highest_seqno)
            .max()
    }

    fn snapshot(&self, seqno: SeqNo) -> Snapshot {
        use crate::AnyTree::Standard;

        Snapshot::new(Standard(self.clone()), seqno)
    }

    fn get<K: AsRef<[u8]>>(
        &self,
        key: K,
        seqno: Option<SeqNo>,
    ) -> crate::Result<Option<UserValue>> {
        Ok(self.get_internal_entry(key, seqno)?.map(|x| x.value))
    }

    fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
        &self,
        range: R,
        seqno: Option<SeqNo>,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static> {
        Box::new(self.create_range(&range, seqno, index))
    }

    fn prefix<K: AsRef<[u8]>>(
        &self,
        prefix: K,
        seqno: Option<SeqNo>,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static> {
        Box::new(self.create_prefix(prefix, seqno, index))
    }

    fn insert<K: Into<UserKey>, V: Into<UserValue>>(
        &self,
        key: K,
        value: V,
        seqno: SeqNo,
    ) -> (u32, u32) {
        let value = InternalValue::from_components(key, value, seqno, ValueType::Value);
        self.append_entry(value)
    }

    fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u32, u32) {
        let value = InternalValue::new_tombstone(key, seqno);
        self.append_entry(value)
    }

    fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u32, u32) {
        let value = InternalValue::new_weak_tombstone(key, seqno);
        self.append_entry(value)
    }
}

impl Tree {
    /// Opens an LSM-tree in the given directory.
    ///
    /// Will recover previous state if the folder was previously
    /// occupied by an LSM-tree, including the previous configuration.
    /// If not, a new tree will be initialized with the given config.
    ///
    /// After recovering a previous state, use [`Tree::set_active_memtable`]
    /// to fill the memtable with data from a write-ahead log for full durability.
    ///
    /// # Errors
    ///
    /// Returns an error if an I/O error occurs.
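    ///
    /// # Examples
    ///
    /// Illustrative sketch (not compiled); it assumes the tree is opened through the
    /// crate's public `Config` builder, which forwards to this function:
    ///
    /// ```ignore
    /// let tree = Config::new("/tmp/my_tree").open()?;
    /// tree.insert("key", "value", 0);
    /// assert!(tree.get("key", None)?.is_some());
    /// ```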
    pub(crate) fn open(config: Config) -> crate::Result<Self> {
        use crate::file::MANIFEST_FILE;

        log::debug!("Opening LSM-tree at {:?}", config.path);

        // Check for old version
        if config.path.join("version").try_exists()? {
            return Err(crate::Error::InvalidVersion(Version::V1));
        }

        let tree = if config.path.join(MANIFEST_FILE).try_exists()? {
            Self::recover(config)
        } else {
            Self::create_new(config)
        }?;

        Ok(tree)
    }

    pub(crate) fn read_lock_active_memtable(&self) -> RwLockReadGuard<'_, Memtable> {
        self.active_memtable.read().expect("lock is poisoned")
    }

    // TODO: Expose as a public function. However, this is currently somewhat
    // unsafe to expose: major compaction needs ALL segments, but right now it
    // just takes as many as it can, which may leave the LSM inconsistent.
    // TODO: There should also be a function to partially compact levels and individual segments

    /// Performs major compaction, blocking the caller until it's done.
    ///
    /// # Errors
    ///
    /// Will return `Err` if an IO error occurs.
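    ///
    /// # Examples
    ///
    /// Illustrative sketch (not compiled); `tree` and `seqno_threshold` are assumed
    /// to come from the surrounding application:
    ///
    /// ```ignore
    /// // `target_size` is forwarded to the major compaction strategy;
    /// // 64 MiB is just a placeholder value.
    /// tree.major_compact(64 * 1_024 * 1_024, seqno_threshold)?;
    /// ```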
    #[doc(hidden)]
    pub fn major_compact(&self, target_size: u64, seqno_threshold: SeqNo) -> crate::Result<()> {
        log::info!("Starting major compaction");
        let strategy = Arc::new(crate::compaction::major::Strategy::new(target_size));
        self.compact(strategy, seqno_threshold)
    }

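    // Finalizes a segment writer: writes the trailer, loads the block index and
    // bloom filter from the finished file, registers the file in the descriptor
    // table and returns the resulting `Segment` (or `None` if nothing was written).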
    pub(crate) fn consume_writer(
        &self,
        segment_id: SegmentId,
        mut writer: crate::segment::writer::Writer,
    ) -> crate::Result<Option<Segment>> {
        let segment_folder = writer.opts.folder.clone();
        let segment_file_path = segment_folder.join(segment_id.to_string());

        let Some(trailer) = writer.finish()? else {
            return Ok(None);
        };

        log::debug!("Finalized segment write at {segment_folder:?}");

        let block_index =
            FullBlockIndex::from_file(&segment_file_path, &trailer.metadata, &trailer.offsets)?;
        let block_index = Arc::new(BlockIndexImpl::Full(block_index));

        let created_segment: Segment = SegmentInner {
            tree_id: self.id,

            metadata: trailer.metadata,
            offsets: trailer.offsets,

            descriptor_table: self.config.descriptor_table.clone(),
            block_index,
            block_cache: self.config.block_cache.clone(),

            bloom_filter: Segment::load_bloom(&segment_file_path, trailer.offsets.bloom_ptr)?,
        }
        .into();

        self.config
            .descriptor_table
            .insert(segment_file_path, created_segment.global_id());

        log::debug!("Flushed segment to {segment_folder:?}");

        Ok(Some(created_segment))
    }

    /// Synchronously flushes the active memtable to a disk segment.
    ///
    /// May return `Ok(None)` if, during concurrent workloads, the memtable
    /// ends up being empty before the flush is performed.
    ///
    /// On success, returns the newly created disk segment, if any.
    ///
    /// # Errors
    ///
    /// Will return `Err` if an IO error occurs.
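    ///
    /// # Examples
    ///
    /// Illustrative sketch (not compiled); `tree` and `seqno_threshold` are assumed
    /// to come from the surrounding application:
    ///
    /// ```ignore
    /// // Persist the active memtable; `None` means there was nothing to flush.
    /// if let Some(segment) = tree.flush_active_memtable(seqno_threshold)? {
    ///     log::info!("flushed segment {}", segment.id());
    /// }
    /// ```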
    #[doc(hidden)]
    pub fn flush_active_memtable(&self, seqno_threshold: SeqNo) -> crate::Result<Option<Segment>> {
        log::debug!("Flushing active memtable");

        let Some((segment_id, yanked_memtable)) = self.rotate_memtable() else {
            return Ok(None);
        };

        let Some(segment) = self.flush_memtable(segment_id, &yanked_memtable, seqno_threshold)?
        else {
            return Ok(None);
        };
        self.register_segments(&[segment.clone()])?;

        Ok(Some(segment))
    }

    /// Returns `true` if there are some segments that are being compacted.
    #[doc(hidden)]
    #[must_use]
    pub fn is_compacting(&self) -> bool {
        let levels = self.levels.read().expect("lock is poisoned");
        levels.is_compacting()
    }

    /// Write-locks the sealed memtables for exclusive access
    fn lock_sealed_memtables(&self) -> RwLockWriteGuard<'_, SealedMemtables> {
        self.sealed_memtables.write().expect("lock is poisoned")
    }

    /// Used for [`BlobTree`] lookup
    pub(crate) fn get_internal_entry_with_lock<K: AsRef<[u8]>>(
        &self,
        memtable_lock: &RwLockWriteGuard<'_, Memtable>,
        key: K,
        seqno: Option<SeqNo>,
    ) -> crate::Result<Option<InternalValue>> {
        if let Some(entry) = memtable_lock.get(&key, seqno) {
            return Ok(ignore_tombstone_value(entry));
        };

        // Now look in sealed memtables
        if let Some(entry) = self.get_internal_entry_from_sealed_memtables(&key, seqno) {
            return Ok(ignore_tombstone_value(entry));
        }

        self.get_internal_entry_from_segments(key, seqno)
    }

    fn get_internal_entry_from_sealed_memtables<K: AsRef<[u8]>>(
        &self,
        key: K,
        seqno: Option<SeqNo>,
    ) -> Option<InternalValue> {
        let memtable_lock = self.sealed_memtables.read().expect("lock is poisoned");

        for (_, memtable) in memtable_lock.iter().rev() {
            if let Some(entry) = memtable.get(&key, seqno) {
                return Some(entry);
            }
        }

        None
    }

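    /// Point lookup across the on-disk segments, from the shallowest (newest) level downwards.
    ///
    /// The key hash is computed once and shared across all bloom filter probes
    /// (see https://fjall-rs.github.io/post/bloom-filter-hash-sharing/). Disjoint
    /// levels with enough segments are searched via `get_segment_containing_key`;
    /// otherwise the level is scanned linearly.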
    fn get_internal_entry_from_segments<K: AsRef<[u8]>>(
        &self,
        key: K,
        seqno: Option<SeqNo>,
    ) -> crate::Result<Option<InternalValue>> {
        // NOTE: Create key hash for hash sharing
        // https://fjall-rs.github.io/post/bloom-filter-hash-sharing/
        let key_hash = crate::bloom::BloomFilter::get_hash(key.as_ref());

        let level_manifest = self.levels.read().expect("lock is poisoned");

        for level in &level_manifest.levels {
            // NOTE: Based on benchmarking, binary search is only worth it with ~4 segments
            if level.len() >= 4 {
                if let Some(level) = level.as_disjoint() {
                    // TODO: unit test in disjoint level:
                    // [a:5, a:4] [a:3, b:5]
                    // ^
                    // snapshot read a:3!!!

                    if let Some(segment) = level.get_segment_containing_key(&key) {
                        let maybe_item = segment.get(&key, seqno, key_hash)?;

                        if let Some(item) = maybe_item {
                            return Ok(ignore_tombstone_value(item));
                        }
                    }

                    // NOTE: Go to next level
                    continue;
                }
            }

            // NOTE: Fallback to linear search
            for segment in &level.segments {
                let maybe_item = segment.get(&key, seqno, key_hash)?;

                if let Some(item) = maybe_item {
                    return Ok(ignore_tombstone_value(item));
                }
            }
        }

        Ok(None)
    }

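    /// Point lookup for a key, following the usual LSM read path:
    /// the active memtable first, then the sealed memtables (newest first),
    /// and finally the disk segments (which may incur I/O).
    /// Tombstones are mapped to `None`.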
    #[doc(hidden)]
    pub fn get_internal_entry<K: AsRef<[u8]>>(
        &self,
        key: K,
        seqno: Option<SeqNo>,
    ) -> crate::Result<Option<InternalValue>> {
        // TODO: consolidate memtable & sealed behind single RwLock

        let memtable_lock = self.active_memtable.read().expect("lock is poisoned");

        if let Some(entry) = memtable_lock.get(&key, seqno) {
            return Ok(ignore_tombstone_value(entry));
        };

        drop(memtable_lock);

        // Now look in sealed memtables
        if let Some(entry) = self.get_internal_entry_from_sealed_memtables(&key, seqno) {
            return Ok(ignore_tombstone_value(entry));
        }

        // Now look in segments... this may involve disk I/O
        self.get_internal_entry_from_segments(key, seqno)
    }

    #[doc(hidden)]
    #[must_use]
    pub fn create_iter(
        &self,
        seqno: Option<SeqNo>,
        ephemeral: Option<Arc<Memtable>>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        self.create_range::<UserKey, _>(&.., seqno, ephemeral)
    }

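    /// Builds a double-ended iterator over the given key range.
    ///
    /// Read guards for the level manifest, the active memtable and the sealed
    /// memtables are taken in the L -> M -> S order and handed to the iterator,
    /// so the tree structure stays stable for the iterator's lifetime.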
    #[doc(hidden)]
    pub fn create_internal_range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
        &'a self,
        range: &'a R,
        seqno: Option<SeqNo>,
        ephemeral: Option<Arc<Memtable>>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<InternalValue>> + 'static {
        use std::ops::Bound::{self, Excluded, Included, Unbounded};

        let lo: Bound<UserKey> = match range.start_bound() {
            Included(x) => Included(x.as_ref().into()),
            Excluded(x) => Excluded(x.as_ref().into()),
            Unbounded => Unbounded,
        };

        let hi: Bound<UserKey> = match range.end_bound() {
            Included(x) => Included(x.as_ref().into()),
            Excluded(x) => Excluded(x.as_ref().into()),
            Unbounded => Unbounded,
        };

        let bounds: (Bound<UserKey>, Bound<UserKey>) = (lo, hi);

        // NOTE: Mind lock order L -> M -> S
        let level_manifest_lock =
            guardian::ArcRwLockReadGuardian::take(self.levels.clone()).expect("lock is poisoned");

        let active = guardian::ArcRwLockReadGuardian::take(self.active_memtable.clone())
            .expect("lock is poisoned");

        let sealed = guardian::ArcRwLockReadGuardian::take(self.sealed_memtables.clone())
            .expect("lock is poisoned");

        TreeIter::create_range(
            MemtableLockGuard {
                active,
                sealed,
                ephemeral,
            },
            bounds,
            seqno,
            level_manifest_lock,
        )
    }

    #[doc(hidden)]
    pub fn create_range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
        &'a self,
        range: &'a R,
        seqno: Option<SeqNo>,
        ephemeral: Option<Arc<Memtable>>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        self.create_internal_range(range, seqno, ephemeral)
            .map(|item| match item {
                Ok(kv) => Ok((kv.key.user_key, kv.value)),
                Err(e) => Err(e),
            })
    }

    #[doc(hidden)]
    pub fn create_prefix<'a, K: AsRef<[u8]> + 'a>(
        &'a self,
        prefix: K,
        seqno: Option<SeqNo>,
        ephemeral: Option<Arc<Memtable>>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        let range = prefix_to_range(prefix.as_ref());
        self.create_range(&range, seqno, ephemeral)
    }

    /// Adds an item to the active memtable.
    ///
    /// Returns the added item's size and new size of the memtable.
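    ///
    /// # Examples
    ///
    /// Illustrative sketch (not compiled); `tree` and `seqno` are assumed to come
    /// from the surrounding application:
    ///
    /// ```ignore
    /// let value = InternalValue::from_components("key", "value", seqno, ValueType::Value);
    /// let (item_size, memtable_size) = tree.append_entry(value);
    /// ```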
    #[doc(hidden)]
    #[must_use]
    pub fn append_entry(&self, value: InternalValue) -> (u32, u32) {
        let memtable_lock = self.active_memtable.read().expect("lock is poisoned");
        memtable_lock.insert(value)
    }

    /// Recovers the previous state by loading the level manifest and segments.
    ///
    /// # Errors
    ///
    /// Returns an error if an I/O error occurs.
    fn recover(mut config: Config) -> crate::Result<Self> {
        use crate::file::MANIFEST_FILE;
        use inner::get_next_tree_id;

        log::info!("Recovering LSM-tree at {:?}", config.path);

        let bytes = std::fs::read(config.path.join(MANIFEST_FILE))?;
        let mut bytes = Cursor::new(bytes);
        let manifest = Manifest::decode_from(&mut bytes)?;

        if manifest.version != Version::V2 {
            return Err(crate::Error::InvalidVersion(manifest.version));
        }

        // IMPORTANT: Restore persisted config
        config.level_count = manifest.level_count;
        config.table_type = manifest.table_type;
        config.tree_type = manifest.tree_type;

        let tree_id = get_next_tree_id();

        let mut levels = Self::recover_levels(
            &config.path,
            tree_id,
            &config.block_cache,
            &config.descriptor_table,
        )?;
        levels.update_metadata();

        let highest_segment_id = levels.iter().map(Segment::id).max().unwrap_or_default();

        let inner = TreeInner {
            id: tree_id,
            segment_id_counter: Arc::new(AtomicU64::new(highest_segment_id + 1)),
            active_memtable: Arc::default(),
            sealed_memtables: Arc::default(),
            levels: Arc::new(RwLock::new(levels)),
            stop_signal: StopSignal::default(),
            config,
        };

        Ok(Self(Arc::new(inner)))
    }

    /// Creates a new LSM-tree in a directory.
    fn create_new(config: Config) -> crate::Result<Self> {
        use crate::file::{fsync_directory, MANIFEST_FILE, SEGMENTS_FOLDER};
        use std::fs::{create_dir_all, File};

        let path = config.path.clone();
        log::trace!("Creating LSM-tree at {path:?}");

        create_dir_all(&path)?;

        let manifest_path = path.join(MANIFEST_FILE);
        assert!(!manifest_path.try_exists()?);

        let segment_folder_path = path.join(SEGMENTS_FOLDER);
        create_dir_all(&segment_folder_path)?;

        // NOTE: Lastly, write + fsync the manifest, which contains the version
        // -> the LSM is fully initialized
        let mut file = File::create(manifest_path)?;
        Manifest {
            version: Version::V2,
            level_count: config.level_count,
            tree_type: config.tree_type,
            table_type: TableType::Block,
        }
        .encode_into(&mut file)?;
        file.sync_all()?;

        // IMPORTANT: fsync folders on Unix
        fsync_directory(&segment_folder_path)?;
        fsync_directory(&path)?;

        let inner = TreeInner::create_new(config)?;
        Ok(Self(Arc::new(inner)))
    }

    /// Recovers the level manifest, loading all segments from disk.
    fn recover_levels<P: AsRef<Path>>(
        tree_path: P,
        tree_id: TreeId,
        block_cache: &Arc<BlockCache>,
        descriptor_table: &Arc<FileDescriptorTable>,
    ) -> crate::Result<LevelManifest> {
        use crate::{
            file::fsync_directory,
            file::{LEVELS_MANIFEST_FILE, SEGMENTS_FOLDER},
            SegmentId,
        };

        let tree_path = tree_path.as_ref();

        let level_manifest_path = tree_path.join(LEVELS_MANIFEST_FILE);
        log::info!("Recovering manifest at {level_manifest_path:?}");

        let segment_id_map = LevelManifest::recover_ids(&level_manifest_path)?;
        let cnt = segment_id_map.len();

        log::debug!("Recovering {cnt} disk segments from {tree_path:?}");

        let progress_mod = match cnt {
            _ if cnt <= 20 => 1,
            _ if cnt <= 100 => 10,
            _ => 100,
        };

        let mut segments = vec![];

        let segment_base_folder = tree_path.join(SEGMENTS_FOLDER);

        if !segment_base_folder.try_exists()? {
            std::fs::create_dir_all(&segment_base_folder)?;
            fsync_directory(&segment_base_folder)?;
        }

        for (idx, dirent) in std::fs::read_dir(&segment_base_folder)?.enumerate() {
            let dirent = dirent?;

            let file_name = dirent.file_name();

            if file_name == ".DS_Store" {
                continue;
            }

            let segment_file_name = file_name.to_str().ok_or_else(|| {
                log::error!("invalid segment file name {file_name:?}");
                crate::Error::Unrecoverable
            })?;

            let segment_file_path = dirent.path();
            assert!(!segment_file_path.is_dir());

            if segment_file_name.starts_with("tmp_") {
                log::debug!("Deleting unfinished segment: {segment_file_path:?}");
                std::fs::remove_file(&segment_file_path)?;
                continue;
            }

            log::debug!("Recovering segment from {segment_file_path:?}");

            let segment_id = segment_file_name.parse::<SegmentId>().map_err(|e| {
                log::error!("invalid segment file name {segment_file_name:?}: {e:?}");
                crate::Error::Unrecoverable
            })?;

            if let Some(&level_idx) = segment_id_map.get(&segment_id) {
                let segment = Segment::recover(
                    &segment_file_path,
                    tree_id,
                    block_cache.clone(),
                    descriptor_table.clone(),
                    level_idx == 0 || level_idx == 1,
                )?;

                descriptor_table.insert(&segment_file_path, segment.global_id());

                segments.push(segment);
                log::debug!("Recovered segment from {segment_file_path:?}");

                if idx % progress_mod == 0 {
                    log::debug!("Recovered {idx}/{cnt} disk segments");
                }
            } else {
                log::debug!("Deleting unreferenced segment: {segment_file_path:?}");
                std::fs::remove_file(&segment_file_path)?;
            }
        }

        if segments.len() < cnt {
            log::error!(
                "Recovered fewer segments than expected: {:?}",
                segment_id_map.keys(),
            );
            return Err(crate::Error::Unrecoverable);
        }

        log::debug!("Successfully recovered {} segments", segments.len());

        LevelManifest::recover(&level_manifest_path, segments)
    }
}