lsm_tree/tree/mod.rs

// Copyright (c) 2024-present, fjall-rs
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)

pub mod ingest;
pub mod inner;

use crate::{
    coding::{Decode, Encode},
    compaction::CompactionStrategy,
    config::Config,
    file::BLOBS_FOLDER,
    format_version::FormatVersion,
    iter_guard::{IterGuard, IterGuardImpl},
    level_manifest::LevelManifest,
    manifest::Manifest,
    memtable::Memtable,
    segment::Segment,
    value::InternalValue,
    vlog::BlobFile,
    AbstractTree, Cache, DescriptorTable, KvPair, SegmentId, SeqNo, SequenceNumberCounter, UserKey,
    UserValue, ValueType,
};
use inner::{MemtableId, SealedMemtables, TreeId, TreeInner};
use std::{
    io::Cursor,
    ops::RangeBounds,
    path::Path,
    sync::{atomic::AtomicU64, Arc, RwLock, RwLockReadGuard, RwLockWriteGuard},
};

#[cfg(feature = "metrics")]
use crate::metrics::Metrics;

pub struct Guard(crate::Result<(UserKey, UserValue)>);

impl IterGuard for Guard {
    fn key(self) -> crate::Result<UserKey> {
        self.0.map(|(k, _)| k)
    }

    fn size(self) -> crate::Result<u32> {
        // NOTE: We know LSM-tree value lengths fit into 32 bits
        #[allow(clippy::cast_possible_truncation)]
        self.into_inner().map(|(_, v)| v.len() as u32)
    }

    fn into_inner(self) -> crate::Result<(UserKey, UserValue)> {
        self.0
    }
}

fn ignore_tombstone_value(item: InternalValue) -> Option<InternalValue> {
    if item.is_tombstone() {
        None
    } else {
        Some(item)
    }
}

/// A log-structured merge tree (LSM-tree/LSMT)
#[derive(Clone)]
pub struct Tree(#[doc(hidden)] pub Arc<TreeInner>);

impl std::ops::Deref for Tree {
    type Target = TreeInner;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl AbstractTree for Tree {
    #[cfg(feature = "metrics")]
    fn metrics(&self) -> &Arc<crate::Metrics> {
        &self.0.metrics
    }

    fn version_free_list_len(&self) -> usize {
        self.manifest
            .read()
            .expect("lock is poisoned")
            .version_free_list
            .len()
    }

    fn prefix<K: AsRef<[u8]>>(
        &self,
        prefix: K,
        seqno: SeqNo,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl<'_>> + '_> {
        Box::new(
            self.create_prefix(&prefix, seqno, index)
                .map(|kv| IterGuardImpl::Standard(Guard(kv))),
        )
    }

    fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
        &self,
        range: R,
        seqno: SeqNo,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl<'_>> + '_> {
        Box::new(
            self.create_range(&range, seqno, index)
                .map(|kv| IterGuardImpl::Standard(Guard(kv))),
        )
    }

    // TODO: doctest
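    // A rough sketch of what the missing doctest could exercise (hedged: it assumes
    // the crate's `Config::new(path).open()` entry point and is not compiled here):
    //
    //     let tree = Config::new(folder).open()?;
    //     tree.insert("a", "value", 0);
    //     tree.remove("a", 1);
    //     tree.flush_active_memtable(0)?;
    //     assert_eq!(1, tree.tombstone_count());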
    fn tombstone_count(&self) -> u64 {
        self.manifest
            .read()
            .expect("lock is poisoned")
            .current_version()
            .iter_segments()
            .map(Segment::tombstone_count)
            .sum()
    }

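    // Rough call shape for bulk ingestion of strictly ascending, pre-sorted pairs
    // (hedged sketch: constructing the counters via `SequenceNumberCounter::default()`
    // is an assumption, not taken from this file):
    //
    //     let seqno = SequenceNumberCounter::default();
    //     let visible = SequenceNumberCounter::default();
    //     tree.ingest(sorted_pairs.into_iter(), &seqno, &visible)?;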
    fn ingest(
        &self,
        iter: impl Iterator<Item = (UserKey, UserValue)>,
        seqno_generator: &SequenceNumberCounter,
        visible_seqno: &SequenceNumberCounter,
    ) -> crate::Result<()> {
        use crate::tree::ingest::Ingestion;
        use std::time::Instant;

        // NOTE: Lock active memtable so nothing else can be going on while we are bulk loading
        let memtable_lock = self.lock_active_memtable();

        let seqno = seqno_generator.next();

        // TODO: allow ingestion always, by flushing memtable
        assert!(
            memtable_lock.is_empty(),
            "can only perform bulk ingestion with empty memtable",
        );

        let mut writer = Ingestion::new(self)?.with_seqno(seqno);

        let start = Instant::now();
        let mut count = 0;
        let mut last_key = None;

        for (key, value) in iter {
            if let Some(last_key) = &last_key {
                assert!(
                    key > last_key,
                    "next key in bulk ingest was not greater than last key",
                );
            }
            last_key = Some(key.clone());

            writer.write(key, value)?;

            count += 1;
        }

        writer.finish()?;

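        // Publish the ingestion seqno: bump the visible seqno to at least `seqno + 1`
        // so readers taking new snapshots can see the freshly ingested items.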
        visible_seqno.fetch_max(seqno + 1);

        log::info!("Ingested {count} items in {:?}", start.elapsed());

        Ok(())
    }

    // TODO: change API to RangeBounds<K>
    fn drop_range(&self, key_range: crate::KeyRange) -> crate::Result<()> {
        let strategy = Arc::new(crate::compaction::drop_range::Strategy::new(key_range));

        // IMPORTANT: Write lock so we can be the only compaction going on
        let _lock = self
            .0
            .major_compaction_lock
            .write()
            .expect("lock is poisoned");

        log::info!("Starting drop_range compaction");
        self.inner_compact(strategy, 0)
    }

    #[doc(hidden)]
    fn major_compact(&self, target_size: u64, seqno_threshold: SeqNo) -> crate::Result<()> {
        let strategy = Arc::new(crate::compaction::major::Strategy::new(target_size));

        // IMPORTANT: Write lock so we can be the only compaction going on
        let _lock = self
            .0
            .major_compaction_lock
            .write()
            .expect("lock is poisoned");

        log::info!("Starting major compaction");
        self.inner_compact(strategy, seqno_threshold)
    }

    fn l0_run_count(&self) -> usize {
        self.manifest
            .read()
            .expect("lock is poisoned")
            .current_version()
            .level(0)
            .map(|x| x.run_count())
            .unwrap_or_default()
    }

    fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<u32>> {
        // NOTE: We know that value lengths fit into u32
        #[allow(clippy::cast_possible_truncation)]
        Ok(self.get(key, seqno)?.map(|x| x.len() as u32))
    }

    fn filter_size(&self) -> usize {
        self.manifest
            .read()
            .expect("lock is poisoned")
            .current_version()
            .iter_segments()
            .map(Segment::filter_size)
            .sum()
    }

    fn pinned_filter_size(&self) -> usize {
        self.manifest
            .read()
            .expect("lock is poisoned")
            .current_version()
            .iter_segments()
            .map(Segment::pinned_filter_size)
            .sum()
    }

    fn pinned_block_index_size(&self) -> usize {
        self.manifest
            .read()
            .expect("lock is poisoned")
            .current_version()
            .iter_segments()
            .map(Segment::pinned_block_index_size)
            .sum()
    }

    fn sealed_memtable_count(&self) -> usize {
        self.sealed_memtables
            .read()
            .expect("lock is poisoned")
            .len()
    }

    /* fn verify(&self) -> crate::Result<usize> {
        // NOTE: Lock memtable to prevent any tampering with disk segments
        let _lock = self.lock_active_memtable();

        let mut sum = 0;

        let level_manifest = self.levels.read().expect("lock is poisoned");

        for level in &level_manifest.levels {
            for segment in &level.segments {
                sum += segment.verify()?;
            }
        }

        Ok(sum)
    } */

    fn flush_memtable(
        &self,
        segment_id: SegmentId,
        memtable: &Arc<Memtable>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<Option<(Segment, Option<BlobFile>)>> {
        use crate::{compaction::stream::CompactionStream, file::SEGMENTS_FOLDER, segment::Writer};
        use std::time::Instant;

        let start = Instant::now();

        let folder = self.config.path.join(SEGMENTS_FOLDER);
        let segment_file_path = folder.join(segment_id.to_string());

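        // Flushed segments always enter level 0, so all per-level policies are queried at index 0.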
        let data_block_size = self.config.data_block_size_policy.get(0);
        let index_block_size = self.config.index_block_size_policy.get(0);

        let data_block_restart_interval = self.config.data_block_restart_interval_policy.get(0);
        let index_block_restart_interval = self.config.index_block_restart_interval_policy.get(0);

        let data_block_compression = self.config.data_block_compression_policy.get(0);
        let index_block_compression = self.config.index_block_compression_policy.get(0);

        let data_block_hash_ratio = self.config.data_block_hash_ratio_policy.get(0);

        log::debug!(
            "Flushing segment to {}, data_block_restart_interval={data_block_restart_interval}, index_block_restart_interval={index_block_restart_interval}, data_block_size={data_block_size}, index_block_size={index_block_size}, data_block_compression={data_block_compression}, index_block_compression={index_block_compression}",
            segment_file_path.display(),
        );

        let mut segment_writer = Writer::new(segment_file_path, segment_id)?
            .use_data_block_restart_interval(data_block_restart_interval)
            .use_index_block_restart_interval(index_block_restart_interval)
            .use_data_block_compression(data_block_compression)
            .use_index_block_compression(index_block_compression)
            .use_data_block_size(data_block_size)
            .use_index_block_size(index_block_size)
            .use_data_block_hash_ratio(data_block_hash_ratio)
            .use_bloom_policy({
                use crate::config::FilterPolicyEntry::{Bloom, None};
                use crate::segment::filter::BloomConstructionPolicy;

                match self.config.filter_policy.get(0) {
                    Bloom(policy) => policy,
                    None => BloomConstructionPolicy::BitsPerKey(0.0),
                }
            });

        let iter = memtable.iter().map(Ok);
        let compaction_filter = CompactionStream::new(iter, seqno_threshold);

        for item in compaction_filter {
            segment_writer.write(item?)?;
        }

        let result = self.consume_writer(segment_writer)?;

        log::debug!("Flushed memtable {segment_id:?} in {:?}", start.elapsed());

        Ok(result.map(|segment| (segment, None)))
    }

    fn register_segments(
        &self,
        segments: &[Segment],
        blob_files: Option<&[BlobFile]>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        log::trace!(
            "Registering {} segments, {} blob files",
            segments.len(),
            blob_files.map(<[BlobFile]>::len).unwrap_or_default(),
        );

        // NOTE: Mind lock order L -> M -> S
        log::trace!("register: Acquiring levels manifest write lock");
        let mut manifest = self.manifest.write().expect("lock is poisoned");
        log::trace!("register: Acquired levels manifest write lock");

        // NOTE: Mind lock order L -> M -> S
        log::trace!("register: Acquiring sealed memtables write lock");
        let mut sealed_memtables = self.sealed_memtables.write().expect("lock is poisoned");
        log::trace!("register: Acquired sealed memtables write lock");

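        // Atomically install a new manifest version whose level 0 gains the freshly
        // flushed segments as a new run.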
        manifest.atomic_swap(
            |version| version.with_new_l0_run(segments, blob_files),
            seqno_threshold,
        )?;

        for segment in segments {
            log::trace!("releasing sealed memtable {}", segment.id());
            sealed_memtables.remove(segment.id());
        }

        Ok(())
    }

    fn lock_active_memtable(&self) -> RwLockWriteGuard<'_, Arc<Memtable>> {
        self.active_memtable.write().expect("lock is poisoned")
    }

    fn clear_active_memtable(&self) {
        *self.active_memtable.write().expect("lock is poisoned") = Arc::new(Memtable::default());
    }

    fn set_active_memtable(&self, memtable: Memtable) {
        let mut memtable_lock = self.active_memtable.write().expect("lock is poisoned");
        *memtable_lock = Arc::new(memtable);
    }

    fn add_sealed_memtable(&self, id: MemtableId, memtable: Arc<Memtable>) {
        let mut memtable_lock = self.sealed_memtables.write().expect("lock is poisoned");
        memtable_lock.add(id, memtable);
    }

    fn compact(
        &self,
        strategy: Arc<dyn CompactionStrategy>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        // NOTE: Take a read lock on the major compaction lock;
        // if a major compaction is running (it holds the write lock), we cannot proceed,
        // but parallel (non-major) compactions can still occur
        let _lock = self
            .0
            .major_compaction_lock
            .read()
            .expect("lock is poisoned");

        self.inner_compact(strategy, seqno_threshold)
    }

    fn get_next_segment_id(&self) -> SegmentId {
        self.0.get_next_segment_id()
    }

    fn tree_config(&self) -> &Config {
        &self.config
    }

    fn active_memtable_size(&self) -> u64 {
        use std::sync::atomic::Ordering::Acquire;

        self.active_memtable
            .read()
            .expect("lock is poisoned")
            .approximate_size
            .load(Acquire)
    }

    fn tree_type(&self) -> crate::TreeType {
        crate::TreeType::Standard
    }

    fn rotate_memtable(&self) -> Option<(MemtableId, Arc<Memtable>)> {
        log::trace!("rotate: acquiring active memtable write lock");
        let mut active_memtable = self.lock_active_memtable();

        log::trace!("rotate: acquiring sealed memtables write lock");
        let mut sealed_memtables = self.lock_sealed_memtables();

        if active_memtable.is_empty() {
            return None;
        }

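        // Swap the active memtable for a fresh, empty one; the old memtable is
        // sealed below under the id of the segment it will eventually be flushed to.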
        let yanked_memtable = std::mem::take(&mut *active_memtable);

        let tmp_memtable_id = self.get_next_segment_id();
        sealed_memtables.add(tmp_memtable_id, yanked_memtable.clone());

        log::trace!("rotate: added memtable id={tmp_memtable_id} to sealed memtables");

        Some((tmp_memtable_id, yanked_memtable))
    }

    fn segment_count(&self) -> usize {
        self.manifest
            .read()
            .expect("lock is poisoned")
            .current_version()
            .segment_count()
    }

    fn level_segment_count(&self, idx: usize) -> Option<usize> {
        self.manifest
            .read()
            .expect("lock is poisoned")
            .current_version()
            .level(idx)
            .map(|x| x.segment_count())
    }

    #[allow(clippy::significant_drop_tightening)]
    fn approximate_len(&self) -> usize {
        // NOTE: Mind lock order L -> M -> S
        let manifest = self.manifest.read().expect("lock is poisoned");
        let memtable = self.active_memtable.read().expect("lock is poisoned");
        let sealed = self.sealed_memtables.read().expect("lock is poisoned");

        let segments_item_count = manifest
            .current_version()
            .iter_segments()
            .map(|x| x.metadata.item_count)
            .sum::<u64>();

        let memtable_count = memtable.len() as u64;
        let sealed_count = sealed.iter().map(|(_, mt)| mt.len()).sum::<usize>() as u64;

        (memtable_count + sealed_count + segments_item_count)
            .try_into()
            .expect("should not be too large")
    }

    fn disk_space(&self) -> u64 {
        self.manifest
            .read()
            .expect("lock is poisoned")
            .current_version()
            .iter_levels()
            .map(super::version::Level::size)
            .sum()
    }

    fn get_highest_memtable_seqno(&self) -> Option<SeqNo> {
        let active = self
            .active_memtable
            .read()
            .expect("lock is poisoned")
            .get_highest_seqno();

        let sealed = self
            .sealed_memtables
            .read()
            .expect("lock is poisoned")
            .iter()
            .map(|(_, table)| table.get_highest_seqno())
            .max()
            .flatten();

        active.max(sealed)
    }

    fn get_highest_persisted_seqno(&self) -> Option<SeqNo> {
        self.manifest
            .read()
            .expect("lock is poisoned")
            .current_version()
            .iter_segments()
            .map(Segment::get_highest_seqno)
            .max()
    }

    fn get<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<UserValue>> {
        Ok(self
            .get_internal_entry(key.as_ref(), seqno)?
            .map(|x| x.value))
    }

    fn insert<K: Into<UserKey>, V: Into<UserValue>>(
        &self,
        key: K,
        value: V,
        seqno: SeqNo,
    ) -> (u64, u64) {
        let value = InternalValue::from_components(key, value, seqno, ValueType::Value);
        self.append_entry(value)
    }

    fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        let value = InternalValue::new_tombstone(key, seqno);
        self.append_entry(value)
    }

    fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        let value = InternalValue::new_weak_tombstone(key, seqno);
        self.append_entry(value)
    }
}

impl Tree {
    /// Opens an LSM-tree in the given directory.
    ///
    /// Will recover previous state if the folder was previously
    /// occupied by an LSM-tree, including the previous configuration.
    /// If not, a new tree will be initialized with the given config.
    ///
    /// After recovering a previous state, use [`Tree::set_active_memtable`]
    /// to fill the memtable with data from a write-ahead log for full durability.
    ///
    /// # Errors
    ///
    /// Returns an error if an IO error occurs.
    pub(crate) fn open(config: Config) -> crate::Result<Self> {
        use crate::file::MANIFEST_FILE;

        log::debug!("Opening LSM-tree at {}", config.path.display());

        // Check for old version
        if config.path.join("version").try_exists()? {
            return Err(crate::Error::InvalidVersion(FormatVersion::V1));
        }

        let tree = if config.path.join(MANIFEST_FILE).try_exists()? {
            Self::recover(config)
        } else {
            Self::create_new(config)
        }?;

        Ok(tree)
    }

    pub(crate) fn read_lock_active_memtable(&self) -> RwLockReadGuard<'_, Arc<Memtable>> {
        self.active_memtable.read().expect("lock is poisoned")
    }

    pub(crate) fn consume_writer(
        &self,
        writer: crate::segment::Writer,
    ) -> crate::Result<Option<Segment>> {
        let segment_file_path = writer.path.clone();

        let Some(_) = writer.finish()? else {
            return Ok(None);
        };

        log::debug!("Finalized segment write at {}", segment_file_path.display());

        /* let block_index =
            FullBlockIndex::from_file(&segment_file_path, &trailer.metadata, &trailer.offsets)?;
        let block_index = Arc::new(BlockIndexImpl::Full(block_index));

        let created_segment: Segment = SegmentInner {
            path: segment_file_path.clone(),

            tree_id: self.id,

            metadata: trailer.metadata,
            offsets: trailer.offsets,

            descriptor_table: self.config.descriptor_table.clone(),
            block_index,
            cache: self.config.cache.clone(),

            bloom_filter: Segment::load_bloom(&segment_file_path, trailer.offsets.bloom_ptr)?,

            is_deleted: AtomicBool::default(),
        }
        .into(); */

        /* self.config
        .descriptor_table
        .insert(segment_file_path, created_segment.global_id()); */

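        // NOTE: Index block pinning currently reuses the filter block pinning policy.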
        let pin_filter = self.config.filter_block_pinning_policy.get(0);
        let pin_index = self.config.filter_block_pinning_policy.get(0);

        let created_segment = Segment::recover(
            segment_file_path,
            self.id,
            self.config.cache.clone(),
            self.config.descriptor_table.clone(),
            pin_filter,
            pin_index,
            #[cfg(feature = "metrics")]
            self.metrics.clone(),
        )?;

        log::debug!("Flushed segment to {:?}", created_segment.path);

        Ok(Some(created_segment))
    }

    /// Synchronously flushes the active memtable to a disk segment.
    ///
    /// The function may return `Ok(None)` if, during concurrent workloads, the memtable
    /// ends up being empty before the flush is set up.
    ///
    /// On success, the result contains the flushed [`Segment`].
    ///
    /// # Errors
    ///
    /// Will return `Err` if an IO error occurs.
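    ///
    /// A rough usage sketch (assuming the crate's `Config::new(path).open()` entry
    /// point; marked `ignore` since it is only a sketch):
    ///
    /// ```ignore
    /// let tree = Config::new(folder).open()?;
    /// tree.insert("key", "value", 0);
    /// let segment = tree.flush_active_memtable(0)?;
    /// assert!(segment.is_some());
    /// ```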
    #[doc(hidden)]
    pub fn flush_active_memtable(&self, seqno_threshold: SeqNo) -> crate::Result<Option<Segment>> {
        log::debug!("Flushing active memtable");

        let Some((segment_id, yanked_memtable)) = self.rotate_memtable() else {
            return Ok(None);
        };

        let Some((segment, _)) =
            self.flush_memtable(segment_id, &yanked_memtable, seqno_threshold)?
        else {
            return Ok(None);
        };
        self.register_segments(std::slice::from_ref(&segment), None, seqno_threshold)?;

        Ok(Some(segment))
    }

    /// Returns `true` if any segments are currently being compacted.
    #[doc(hidden)]
    #[must_use]
    pub fn is_compacting(&self) -> bool {
        self.manifest
            .read()
            .expect("lock is poisoned")
            .is_compacting()
    }

    /// Write-locks the sealed memtables for exclusive access
    fn lock_sealed_memtables(&self) -> RwLockWriteGuard<'_, SealedMemtables> {
        self.sealed_memtables.write().expect("lock is poisoned")
    }

    // TODO: maybe not needed anyway
    /// Used for [`BlobTree`] lookup
    pub(crate) fn get_internal_entry_with_memtable(
        &self,
        memtable_lock: &Memtable,
        key: &[u8],
        seqno: SeqNo,
    ) -> crate::Result<Option<InternalValue>> {
        if let Some(entry) = memtable_lock.get(key, seqno) {
            return Ok(ignore_tombstone_value(entry));
        }

        // Now look in sealed memtables
        if let Some(entry) = self.get_internal_entry_from_sealed_memtables(key, seqno) {
            return Ok(ignore_tombstone_value(entry));
        }

        self.get_internal_entry_from_segments(key, seqno)
    }

    fn get_internal_entry_from_sealed_memtables(
        &self,
        key: &[u8],
        seqno: SeqNo,
    ) -> Option<InternalValue> {
        let memtable_lock = self.sealed_memtables.read().expect("lock is poisoned");

        for (_, memtable) in memtable_lock.iter().rev() {
            if let Some(entry) = memtable.get(key, seqno) {
                return Some(entry);
            }
        }

        None
    }

    fn get_internal_entry_from_segments(
        &self,
        key: &[u8],
        seqno: SeqNo,
    ) -> crate::Result<Option<InternalValue>> {
        // NOTE: Create key hash for hash sharing
        // https://fjall-rs.github.io/post/bloom-filter-hash-sharing/
        let key_hash = crate::segment::filter::standard_bloom::Builder::get_hash(key);

        let manifest = self.manifest.read().expect("lock is poisoned");

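        // Levels are scanned from newest (L0) to oldest; the first hit visible at
        // `seqno` wins, since more recent levels shadow older data for the same key.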
        for level in manifest.current_version().iter_levels() {
            for run in level.iter() {
                // NOTE: Based on benchmarking, binary search is only worth it with ~4 segments
                if run.len() >= 4 {
                    if let Some(segment) = run.get_for_key(key) {
                        if let Some(item) = segment.get(key, seqno, key_hash)? {
                            return Ok(ignore_tombstone_value(item));
                        }
                    }
                } else {
                    // NOTE: Fallback to linear search
                    for segment in run.iter() {
                        if !segment.is_key_in_key_range(key) {
                            continue;
                        }

                        if let Some(item) = segment.get(key, seqno, key_hash)? {
                            return Ok(ignore_tombstone_value(item));
                        }
                    }
                }
            }
        }

        Ok(None)
    }

    #[doc(hidden)]
    pub fn get_internal_entry(
        &self,
        key: &[u8],
        seqno: SeqNo,
    ) -> crate::Result<Option<InternalValue>> {
        // TODO: consolidate memtable & sealed behind single RwLock

        let memtable_lock = self.active_memtable.read().expect("lock is poisoned");

        if let Some(entry) = memtable_lock.get(key, seqno) {
            return Ok(ignore_tombstone_value(entry));
        }

        drop(memtable_lock);

        // Now look in sealed memtables
        if let Some(entry) = self.get_internal_entry_from_sealed_memtables(key, seqno) {
            return Ok(ignore_tombstone_value(entry));
        }

        // Now look in segments... this may involve disk I/O
        self.get_internal_entry_from_segments(key, seqno)
    }

    fn inner_compact(
        &self,
        strategy: Arc<dyn CompactionStrategy>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        use crate::compaction::worker::{do_compaction, Options};

        let mut opts = Options::from_tree(self, strategy);
        opts.eviction_seqno = seqno_threshold;

        do_compaction(&opts)?;

        log::debug!("Compaction run over");

        Ok(())
    }

    #[doc(hidden)]
    #[must_use]
    pub fn create_iter(
        &self,
        seqno: SeqNo,
        ephemeral: Option<Arc<Memtable>>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        self.create_range::<UserKey, _>(&.., seqno, ephemeral)
    }

    #[doc(hidden)]
    pub fn create_internal_range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
        &'a self,
        range: &'a R,
        seqno: SeqNo,
        ephemeral: Option<Arc<Memtable>>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<InternalValue>> + 'static {
        use crate::range::{IterState, TreeIter};
        use std::ops::Bound::{self, Excluded, Included, Unbounded};

        let lo: Bound<UserKey> = match range.start_bound() {
            Included(x) => Included(x.as_ref().into()),
            Excluded(x) => Excluded(x.as_ref().into()),
            Unbounded => Unbounded,
        };

        let hi: Bound<UserKey> = match range.end_bound() {
            Included(x) => Included(x.as_ref().into()),
            Excluded(x) => Excluded(x.as_ref().into()),
            Unbounded => Unbounded,
        };

        let bounds: (Bound<UserKey>, Bound<UserKey>) = (lo, hi);

        // NOTE: Mind lock order L -> M -> S
        log::trace!("range read: acquiring read locks");

        let manifest = self.manifest.read().expect("lock is poisoned");

        let iter_state = {
            let active = self.active_memtable.read().expect("lock is poisoned");
            let sealed = &self.sealed_memtables.read().expect("lock is poisoned");

            IterState {
                active: active.clone(),
                sealed: sealed.iter().map(|(_, mt)| mt.clone()).collect(),
                ephemeral,
                version: manifest.current_version().clone(),
            }
        };

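        // The iterator state holds clones of the active, sealed and ephemeral memtables
        // plus the current version, so it does not depend on the read locks above
        // staying held.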
        TreeIter::create_range(iter_state, bounds, seqno, &manifest)
    }

    #[doc(hidden)]
    pub fn create_range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
        &self,
        range: &'a R,
        seqno: SeqNo,
        ephemeral: Option<Arc<Memtable>>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        self.create_internal_range(range, seqno, ephemeral)
            .map(|item| match item {
                Ok(kv) => Ok((kv.key.user_key, kv.value)),
                Err(e) => Err(e),
            })
    }

    #[doc(hidden)]
    pub fn create_prefix<'a, K: AsRef<[u8]> + 'a>(
        &self,
        prefix: K,
        seqno: SeqNo,
        ephemeral: Option<Arc<Memtable>>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        use crate::range::prefix_to_range;

        let range = prefix_to_range(prefix.as_ref());
        self.create_range(&range, seqno, ephemeral)
    }

    /// Adds an item to the active memtable.
    ///
    /// Returns the added item's size and new size of the memtable.
    #[doc(hidden)]
    #[must_use]
    pub fn append_entry(&self, value: InternalValue) -> (u64, u64) {
        let memtable_lock = self.active_memtable.read().expect("lock is poisoned");
        memtable_lock.insert(value)
    }

    /// Recovers the previous state by loading the level manifest and segments.
    ///
    /// # Errors
    ///
    /// Returns an error if an IO error occurs.
    fn recover(mut config: Config) -> crate::Result<Self> {
        use crate::{file::MANIFEST_FILE, stop_signal::StopSignal};
        use inner::get_next_tree_id;

        log::info!("Recovering LSM-tree at {}", config.path.display());

        let bytes = std::fs::read(config.path.join(MANIFEST_FILE))?;
        let mut bytes = Cursor::new(bytes);
        let manifest = Manifest::decode_from(&mut bytes)?;

        if manifest.version != FormatVersion::V3 {
            return Err(crate::Error::InvalidVersion(manifest.version));
        }

        // IMPORTANT: Restore persisted config
        config.level_count = manifest.level_count;
        config.tree_type = manifest.tree_type;

        let tree_id = get_next_tree_id();

        #[cfg(feature = "metrics")]
        let metrics = Arc::new(Metrics::default());

        let levels = Self::recover_levels(
            &config.path,
            tree_id,
            &config.cache,
            &config.descriptor_table,
            #[cfg(feature = "metrics")]
            &metrics,
        )?;

        let highest_segment_id = levels.iter().map(Segment::id).max().unwrap_or_default();

        let inner = TreeInner {
            id: tree_id,
            segment_id_counter: Arc::new(AtomicU64::new(highest_segment_id + 1)),
            active_memtable: Arc::default(),
            sealed_memtables: Arc::default(),
            manifest: Arc::new(RwLock::new(levels)),
            stop_signal: StopSignal::default(),
            config,
            major_compaction_lock: RwLock::default(),
            #[cfg(feature = "metrics")]
            metrics,
        };

        Ok(Self(Arc::new(inner)))
    }

    /// Creates a new LSM-tree in a directory.
    fn create_new(config: Config) -> crate::Result<Self> {
        use crate::file::{fsync_directory, MANIFEST_FILE, SEGMENTS_FOLDER};
        use std::fs::{create_dir_all, File};

        let path = config.path.clone();
        log::trace!("Creating LSM-tree at {}", path.display());

        create_dir_all(&path)?;

        let manifest_path = path.join(MANIFEST_FILE);
        assert!(!manifest_path.try_exists()?);

        let segment_folder_path = path.join(SEGMENTS_FOLDER);
        create_dir_all(&segment_folder_path)?;

        // NOTE: Lastly, write and fsync the manifest, which contains the format version
        // -> the LSM-tree is fully initialized
        let mut file = File::create_new(manifest_path)?;
        Manifest {
            version: FormatVersion::V3,
            level_count: config.level_count,
            tree_type: config.tree_type,
            // table_type: TableType::Block,
        }
        .encode_into(&mut file)?;
        file.sync_all()?;

        // IMPORTANT: fsync folders on Unix
        fsync_directory(&segment_folder_path)?;
        fsync_directory(&path)?;

        let inner = TreeInner::create_new(config)?;
        Ok(Self(Arc::new(inner)))
    }

    /// Recovers the level manifest, loading all segments from disk.
    fn recover_levels<P: AsRef<Path>>(
        tree_path: P,
        tree_id: TreeId,
        cache: &Arc<Cache>,
        descriptor_table: &Arc<DescriptorTable>,
        #[cfg(feature = "metrics")] metrics: &Arc<Metrics>,
    ) -> crate::Result<LevelManifest> {
        use crate::{file::fsync_directory, file::SEGMENTS_FOLDER, SegmentId};

        let tree_path = tree_path.as_ref();

        let recovery = LevelManifest::recover_ids(tree_path)?;

        let segment_id_map = {
            let mut result: crate::HashMap<SegmentId, u8 /* Level index */> =
                crate::HashMap::default();

            for (level_idx, segment_ids) in recovery.segment_ids.iter().enumerate() {
                for run in segment_ids {
                    for segment_id in run {
                        // NOTE: We know there are always fewer than 256 levels
                        #[allow(clippy::expect_used)]
                        result.insert(
                            *segment_id,
                            level_idx
                                .try_into()
                                .expect("there are fewer than 256 levels"),
                        );
                    }
                }
            }

            result
        };

        let cnt = segment_id_map.len();

        log::debug!(
            "Recovering {cnt} disk segments from {}",
            tree_path.display(),
        );

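        // Throttle progress logging: report every segment for small trees,
        // every 10th for medium ones, and every 100th beyond that.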
        let progress_mod = match cnt {
            _ if cnt <= 20 => 1,
            _ if cnt <= 100 => 10,
            _ => 100,
        };

        let mut segments = vec![];

        let segment_base_folder = tree_path.join(SEGMENTS_FOLDER);

        if !segment_base_folder.try_exists()? {
            std::fs::create_dir_all(&segment_base_folder)?;
            fsync_directory(&segment_base_folder)?;
        }

        for (idx, dirent) in std::fs::read_dir(&segment_base_folder)?.enumerate() {
            let dirent = dirent?;
            let file_name = dirent.file_name();

            // https://en.wikipedia.org/wiki/.DS_Store
            if file_name == ".DS_Store" {
                continue;
            }

            // https://en.wikipedia.org/wiki/AppleSingle_and_AppleDouble_formats
            if file_name.to_string_lossy().starts_with("._") {
                continue;
            }

            let segment_file_name = file_name.to_str().ok_or_else(|| {
                log::error!("invalid segment file name {file_name:?}");
                crate::Error::Unrecoverable
            })?;

            let segment_file_path = dirent.path();
            assert!(!segment_file_path.is_dir());

            log::debug!("Recovering segment from {}", segment_file_path.display());

            let segment_id = segment_file_name.parse::<SegmentId>().map_err(|e| {
                log::error!("invalid segment file name {segment_file_name:?}: {e:?}");
                crate::Error::Unrecoverable
            })?;

            if let Some(&level_idx) = segment_id_map.get(&segment_id) {
                let segment = Segment::recover(
                    segment_file_path,
                    tree_id,
                    cache.clone(),
                    descriptor_table.clone(),
                    level_idx <= 1, // TODO: look at configuration
                    level_idx <= 2, // TODO: look at configuration
                    #[cfg(feature = "metrics")]
                    metrics.clone(),
                )?;

                log::debug!("Recovered segment from {:?}", segment.path);

                segments.push(segment);

                if idx % progress_mod == 0 {
                    log::debug!("Recovered {idx}/{cnt} disk segments");
                }
            } else {
                log::debug!(
                    "Deleting unfinished segment: {}",
                    segment_file_path.display(),
                );
                std::fs::remove_file(&segment_file_path)?;
            }
        }

        if segments.len() < cnt {
            log::error!(
                "Recovered fewer segments than expected: {:?}",
                segment_id_map.keys(),
            );
            return Err(crate::Error::Unrecoverable);
        }

        log::debug!("Successfully recovered {} segments", segments.len());

        let blob_files = crate::vlog::recover_blob_files(
            &tree_path.join(BLOBS_FOLDER),
            &recovery.blob_file_ids,
        )?;

        LevelManifest::recover(tree_path, &recovery, &segments, &blob_files)
    }
}