lsm_tree/blob_tree/mod.rs

// Copyright (c) 2024-present, fjall-rs
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)

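//! The blob tree ([`BlobTree`]) implements key-value separation: an index
//! LSM-tree stores keys together with either small inline values or value
//! handles ("vHandles") pointing into a log-structured value log that holds
//! the large values.
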
mod compression;
mod gc;
pub mod index;
pub mod value;

use crate::{
    coding::{Decode, Encode},
    compaction::stream::CompactionStream,
    file::BLOBS_FOLDER,
    r#abstract::{AbstractTree, RangeItem},
    tree::inner::MemtableId,
    value::InternalValue,
    Config, KvPair, Memtable, Segment, SegmentId, SeqNo, Snapshot, UserKey, UserValue,
};
use compression::MyCompressor;
use gc::{reader::GcReader, writer::GcWriter};
use index::IndexTree;
use std::{
    io::Cursor,
    ops::{RangeBounds, RangeFull},
    sync::{atomic::AtomicUsize, Arc},
};
use value::MaybeInlineValue;
use value_log::ValueLog;

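/// Resolves a single range item: inline values are returned as-is, while
/// indirect values are resolved through the value log using their value handle.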
fn resolve_value_handle(vlog: &ValueLog<MyCompressor>, item: RangeItem) -> RangeItem {
    use MaybeInlineValue::{Indirect, Inline};

    match item {
        Ok((key, value)) => {
            let mut cursor = Cursor::new(value);

            match MaybeInlineValue::decode_from(&mut cursor)? {
                Inline(bytes) => Ok((key, bytes)),
                Indirect { vhandle, .. } => {
                    // Resolve indirection using value log
                    match vlog.get(&vhandle) {
                        Ok(Some(bytes)) => Ok((key, bytes)),
                        Err(e) => Err(e.into()),
                        _ => {
                            panic!("value handle ({:?} => {vhandle:?}) did not match any blob - this is a bug", String::from_utf8_lossy(&key))
                        }
                    }
                }
            }
        }
        Err(e) => Err(e),
    }
}

/// A key-value-separated log-structured merge tree
///
/// This tree is a composite structure, consisting of an
/// index tree (LSM-tree) and a log-structured value log
/// to reduce write amplification.
///
/// See <https://docs.rs/value-log> for more information.
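///
/// Writes always go into the index tree as inline values first; key-value
/// separation happens when a memtable is flushed: values at or above the
/// configured `blob_file_separation_threshold` are moved into the value log
/// and replaced by an indirection (value handle), while smaller values stay
/// inline. Point reads and range scans transparently resolve such indirections.
///
/// A minimal sketch of that flow (illustrative only; assumes an already
/// opened `BlobTree` named `tree` and glosses over error handling):
///
/// ```ignore
/// // Initially written inline into the active memtable
/// tree.insert("big", vec![0u8; 1_000_000], 0);
///
/// // On flush, the large value is separated into the value log and the
/// // index tree only keeps a value handle pointing at it
/// tree.flush_active_memtable(0)?;
///
/// // Reads resolve the handle through the value log
/// assert!(tree.get("big", None)?.is_some());
/// ```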
#[derive(Clone)]
pub struct BlobTree {
    /// Index tree that holds value handles or small inline values
    #[doc(hidden)]
    pub index: IndexTree,

    /// Log-structured value-log that stores large values
    #[doc(hidden)]
    pub blobs: ValueLog<MyCompressor>,

    // TODO: maybe replace this with a nonce system
    #[doc(hidden)]
    pub pending_segments: Arc<AtomicUsize>,
}

impl BlobTree {
    pub(crate) fn open(config: Config) -> crate::Result<Self> {
        let path = &config.path;

        let vlog_path = path.join(BLOBS_FOLDER);
        let vlog_cfg = value_log::Config::<MyCompressor>::default()
            .blob_cache(config.blob_cache.clone())
            .segment_size_bytes(config.blob_file_target_size)
            .compression(MyCompressor(config.blob_compression));

        let index: IndexTree = config.open()?.into();

        Ok(Self {
            index,
            blobs: ValueLog::open(vlog_path, vlog_cfg)?,
            pending_segments: Arc::new(AtomicUsize::new(0)),
        })
    }

    /// Scans the index tree, collecting statistics about
    /// value log fragmentation
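    ///
    /// Blob files whose newest item is at or above `gc_watermark` are reported
    /// as containing no stale data, so open snapshots cannot lose blobs they
    /// may still reference.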
    #[doc(hidden)]
    pub fn gc_scan_stats(
        &self,
        seqno: SeqNo,
        gc_watermark: SeqNo,
    ) -> crate::Result<crate::gc::Report> {
        use std::io::{Error as IoError, ErrorKind as IoErrorKind};
        use MaybeInlineValue::{Indirect, Inline};

        while self
            .pending_segments
            .load(std::sync::atomic::Ordering::Acquire)
            > 0
        {
            // IMPORTANT: Busy wait until all segments in-flight are committed
            // to the tree
        }

        // IMPORTANT: Lock + snapshot the memtable to avoid read skew and to prevent tampering with the memtable
        let _memtable_lock = self.index.read_lock_active_memtable();

        while self
            .pending_segments
            .load(std::sync::atomic::Ordering::Acquire)
            > 0
        {
            // IMPORTANT: Busy wait again until all segments in-flight are committed
            // to the tree
        }

        let iter = self
            .index
            .create_internal_range::<&[u8], RangeFull>(&.., Some(seqno), None);

        // Stores the max seqno of every blob file
        let mut seqno_map = crate::HashMap::<SegmentId, SeqNo>::default();

        let result = self
            .blobs
            .scan_for_stats(iter.filter_map(|kv| {
                let Ok(kv) = kv else {
                    return Some(Err(IoError::new(
                        IoErrorKind::Other,
                        "Failed to load KV pair from index tree",
                    )));
                };

                let mut cursor = Cursor::new(kv.value);
                let value = match MaybeInlineValue::decode_from(&mut cursor) {
                    Ok(v) => v,
                    Err(e) => return Some(Err(IoError::new(IoErrorKind::Other, e.to_string()))),
                };

                match value {
                    Indirect { vhandle, size } => {
                        seqno_map
                            .entry(vhandle.segment_id)
                            .and_modify(|x| *x = (*x).max(kv.key.seqno))
                            .or_insert(kv.key.seqno);

                        Some(Ok((vhandle, size)))
                    }
                    Inline(_) => None,
                }
            }))
            .map_err(Into::into);

        let mut lock = self
            .blobs
            .manifest
            .segments
            .write()
            .expect("lock is poisoned");

        // IMPORTANT: We are overwriting the staleness of blob files
        // that contain an item at or above the GC watermark,
        // so snapshots cannot accidentally lose data
        //
        // TODO: 3.0.0 this should be dealt with in value-log 2.0 (make it MVCC aware)
        for (blob_file_id, max_seqno) in seqno_map {
            if gc_watermark <= max_seqno {
                if let Some(blob_file) = lock.get_mut(&blob_file_id) {
                    blob_file.gc_stats.set_stale_items(0);
                    blob_file.gc_stats.set_stale_bytes(0);
                }
            }
        }

        result
    }

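    /// Applies the given GC `strategy` to the value log,
    /// then drops any blob files that have become stale.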
    pub fn apply_gc_strategy(
        &self,
        strategy: &impl value_log::GcStrategy<MyCompressor>,
        seqno: SeqNo,
    ) -> crate::Result<u64> {
        // IMPORTANT: Write lock memtable to avoid read skew
        let memtable_lock = self.index.lock_active_memtable();

        self.blobs.apply_gc_strategy(
            strategy,
            &GcReader::new(&self.index, &memtable_lock),
            GcWriter::new(seqno, &memtable_lock),
        )?;

        // NOTE: We still hold the memtable lock, so we can't use gc_drop_stale (it would lock recursively)
        self.blobs.drop_stale_segments().map_err(Into::into)
    }

    /// Drops all stale blob segment files
    #[doc(hidden)]
    pub fn gc_drop_stale(&self) -> crate::Result<u64> {
        // IMPORTANT: Write lock memtable to avoid read skew
        let _lock = self.index.lock_active_memtable();

        self.blobs.drop_stale_segments().map_err(Into::into)
    }

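    /// Rotates the active memtable, flushes it to a new disk segment
    /// (performing key-value separation), and registers that segment.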
    #[doc(hidden)]
    pub fn flush_active_memtable(&self, eviction_seqno: SeqNo) -> crate::Result<Option<Segment>> {
        let Some((segment_id, yanked_memtable)) = self.index.rotate_memtable() else {
            return Ok(None);
        };

        let Some(segment) = self.flush_memtable(segment_id, &yanked_memtable, eviction_seqno)?
        else {
            return Ok(None);
        };
        self.register_segments(&[segment.clone()])?;

        Ok(Some(segment))
    }
}

impl AbstractTree for BlobTree {
    fn major_compact(&self, target_size: u64, seqno_threshold: SeqNo) -> crate::Result<()> {
        self.index.major_compact(target_size, seqno_threshold)
    }

    fn clear_active_memtable(&self) {
        self.index.clear_active_memtable();
    }

    fn l0_run_count(&self) -> usize {
        self.index.l0_run_count()
    }

    fn blob_file_count(&self) -> usize {
        self.blobs.segment_count()
    }

    // NOTE: We skip reading from the value log
    // because the vHandles already store the value size
    fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: Option<SeqNo>) -> crate::Result<Option<u32>> {
        let vhandle = self.index.get_vhandle(key.as_ref(), seqno)?;

        Ok(vhandle.map(|x| match x {
            MaybeInlineValue::Inline(v) => v.len() as u32,

            // NOTE: We skip reading from the value log
            // because the indirections already store the value size
            MaybeInlineValue::Indirect { size, .. } => size,
        }))
    }

    fn bloom_filter_size(&self) -> usize {
        self.index.bloom_filter_size()
    }

    fn sealed_memtable_count(&self) -> usize {
        self.index.sealed_memtable_count()
    }

    #[doc(hidden)]
    fn verify(&self) -> crate::Result<usize> {
        let index_tree_sum = self.index.verify()?;
        let vlog_sum = self.blobs.verify()?;
        Ok(index_tree_sum + vlog_sum)
    }

    fn keys(
        &self,
        seqno: Option<SeqNo>,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = crate::Result<UserKey>> + 'static> {
        self.index.keys(seqno, index)
    }

    fn values(
        &self,
        seqno: Option<SeqNo>,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = crate::Result<UserValue>> + 'static> {
        Box::new(self.iter(seqno, index).map(|x| x.map(|(_, v)| v)))
    }

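    // Flushes a memtable to disk, splitting data between the LSM segment writer
    // and the blob writer:
    // - tombstones and pre-existing indirections are written to the index tree only
    // - inline values >= `blob_file_separation_threshold` go to the value log
    //   and are replaced by an indirection
    // - smaller values stay inline in the index tree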
    fn flush_memtable(
        &self,
        segment_id: SegmentId,
        memtable: &Arc<Memtable>,
        eviction_seqno: SeqNo,
    ) -> crate::Result<Option<Segment>> {
        use crate::{
            file::SEGMENTS_FOLDER,
            segment::writer::{Options, Writer as SegmentWriter},
        };
        use value::MaybeInlineValue;

        let lsm_segment_folder = self.index.config.path.join(SEGMENTS_FOLDER);

        log::debug!("flushing memtable & performing key-value separation");
        log::debug!("=> to LSM segments in {:?}", lsm_segment_folder);
        log::debug!("=> to blob segment at {:?}", self.blobs.path);

        let mut segment_writer = SegmentWriter::new(Options {
            segment_id,
            data_block_size: self.index.config.data_block_size,
            index_block_size: self.index.config.index_block_size,
            folder: lsm_segment_folder,
        })?
        .use_compression(self.index.config.compression);

        segment_writer = segment_writer.use_bloom_policy(
            crate::segment::writer::BloomConstructionPolicy::FpRate(0.0001),
        );

        let mut blob_writer = self.blobs.get_writer()?;

        let iter = memtable.iter().map(Ok);
        let compaction_filter = CompactionStream::new(iter, eviction_seqno);

        for item in compaction_filter {
            let item = item?;

            if item.is_tombstone() {
                // NOTE: We still need to add the tombstone to the index tree,
                // but no blob is written to the blob writer

                segment_writer.write(InternalValue::new(item.key, UserValue::empty()))?;
                continue;
            }

            let mut cursor = Cursor::new(item.value);

            let value = MaybeInlineValue::decode_from(&mut cursor)?;
            let value = match value {
                MaybeInlineValue::Inline(value) => value,
                indirection @ MaybeInlineValue::Indirect { .. } => {
                    // NOTE: This is a previous indirection, just write it to index tree
                    // without writing the blob again

                    let mut serialized_indirection = vec![];
                    indirection.encode_into(&mut serialized_indirection)?;

                    segment_writer
                        .write(InternalValue::new(item.key.clone(), serialized_indirection))?;

                    continue;
                }
            };

            // NOTE: Values are 32-bit max
            #[allow(clippy::cast_possible_truncation)]
            let value_size = value.len() as u32;

            if value_size >= self.index.config.blob_file_separation_threshold {
                let vhandle = blob_writer.get_next_value_handle();

                let indirection = MaybeInlineValue::Indirect {
                    vhandle,
                    size: value_size,
                };
                let mut serialized_indirection = vec![];
                indirection.encode_into(&mut serialized_indirection)?;

                segment_writer
                    .write(InternalValue::new(item.key.clone(), serialized_indirection))?;

                blob_writer.write(&item.key.user_key, value)?;
            } else {
                let direct = MaybeInlineValue::Inline(value);
                let serialized_direct = direct.encode_into_vec();
                segment_writer.write(InternalValue::new(item.key, serialized_direct))?;
            }
        }

        let _memtable_lock = self.lock_active_memtable();

        log::trace!("Register blob writer into value log");
        self.blobs.register_writer(blob_writer)?;

        log::trace!("Creating LSM-tree segment {segment_id}");
        let segment = self.index.consume_writer(segment_id, segment_writer)?;

        // TODO: this can probably be solved in a nicer way
        if segment.is_some() {
            // IMPORTANT: Increment the pending count
            // so there cannot be a GC scan now, until the segment is registered
            self.pending_segments
                .fetch_add(1, std::sync::atomic::Ordering::Release);
        }

        Ok(segment)
    }

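    // Registers the flushed segments in the index tree, then decrements the
    // pending segment counter that blocks concurrent GC scans (see `gc_scan_stats`)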
    fn register_segments(&self, segments: &[Segment]) -> crate::Result<()> {
        self.index.register_segments(segments)?;

        let count = self
            .pending_segments
            .load(std::sync::atomic::Ordering::Acquire);

        assert!(
            count >= segments.len(),
            "pending_segments is less than segments to register - this is a bug"
        );

        self.pending_segments
            .fetch_sub(segments.len(), std::sync::atomic::Ordering::Release);

        Ok(())
    }

    fn lock_active_memtable(&self) -> std::sync::RwLockWriteGuard<'_, Arc<Memtable>> {
        self.index.lock_active_memtable()
    }

    fn set_active_memtable(&self, memtable: Memtable) {
        self.index.set_active_memtable(memtable);
    }

    fn add_sealed_memtable(&self, id: MemtableId, memtable: Arc<Memtable>) {
        self.index.add_sealed_memtable(id, memtable);
    }

    fn compact(
        &self,
        strategy: Arc<dyn crate::compaction::CompactionStrategy>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        self.index.compact(strategy, seqno_threshold)
    }

    fn get_next_segment_id(&self) -> SegmentId {
        self.index.get_next_segment_id()
    }

    fn tree_config(&self) -> &Config {
        &self.index.config
    }

    fn get_highest_seqno(&self) -> Option<SeqNo> {
        self.index.get_highest_seqno()
    }

    fn active_memtable_size(&self) -> u32 {
        self.index.active_memtable_size()
    }

    fn tree_type(&self) -> crate::TreeType {
        crate::TreeType::Blob
    }

    fn rotate_memtable(&self) -> Option<(crate::tree::inner::MemtableId, Arc<crate::Memtable>)> {
        self.index.rotate_memtable()
    }

    fn segment_count(&self) -> usize {
        self.index.segment_count()
    }

    fn level_segment_count(&self, idx: usize) -> Option<usize> {
        self.index.level_segment_count(idx)
    }

    fn approximate_len(&self) -> usize {
        self.index.approximate_len()
    }

    // NOTE: Override the default implementation to not fetch
    // data from the value log, so we get much faster key reads
    fn contains_key<K: AsRef<[u8]>>(&self, key: K, seqno: Option<SeqNo>) -> crate::Result<bool> {
        self.index.contains_key(key, seqno)
    }

    // NOTE: Override the default implementation to not fetch
    // data from the value log, so we get much faster scans
    fn len(&self, seqno: Option<SeqNo>, index: Option<Arc<Memtable>>) -> crate::Result<usize> {
        self.index.len(seqno, index)
    }

    #[must_use]
    fn disk_space(&self) -> u64 {
        self.index.disk_space() + self.blobs.manifest.disk_space_used()
    }

    fn get_highest_memtable_seqno(&self) -> Option<SeqNo> {
        self.index.get_highest_memtable_seqno()
    }

    fn get_highest_persisted_seqno(&self) -> Option<SeqNo> {
        self.index.get_highest_persisted_seqno()
    }

    fn snapshot(&self, seqno: SeqNo) -> Snapshot {
        use crate::AnyTree::Blob;

        Snapshot::new(Blob(self.clone()), seqno)
    }

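    // Range scans stream items from the index tree and resolve
    // value handles lazily, item by item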
    fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
        &self,
        range: R,
        seqno: Option<SeqNo>,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static> {
        let vlog = self.blobs.clone();
        Box::new(
            self.index
                .0
                .create_range(&range, seqno, index)
                .map(move |item| resolve_value_handle(&vlog, item)),
        )
    }

    fn prefix<K: AsRef<[u8]>>(
        &self,
        prefix: K,
        seqno: Option<SeqNo>,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static> {
        let vlog = self.blobs.clone();
        Box::new(
            self.index
                .0
                .create_prefix(prefix, seqno, index)
                .map(move |item| resolve_value_handle(&vlog, item)),
        )
    }

    fn insert<K: Into<UserKey>, V: Into<UserValue>>(
        &self,
        key: K,
        value: V,
        seqno: SeqNo,
    ) -> (u32, u32) {
        use value::MaybeInlineValue;

        // NOTE: Initially, we always write an inline value
        // On memtable flush, depending on the values' sizes, they will be separated
        // into inline or indirect values
        let item = MaybeInlineValue::Inline(value.into());

        let value = item.encode_into_vec();

        self.index.insert(key, value, seqno)
    }

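    // Point read: fetches the (possibly inline) value from the index tree and,
    // if it is an indirection, resolves it through the value log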
    fn get<K: AsRef<[u8]>>(
        &self,
        key: K,
        seqno: Option<SeqNo>,
    ) -> crate::Result<Option<crate::UserValue>> {
        use value::MaybeInlineValue::{Indirect, Inline};

        let key = key.as_ref();

        let Some(value) = self.index.get_vhandle(key, seqno)? else {
            return Ok(None);
        };

        match value {
            Inline(bytes) => Ok(Some(bytes)),
            Indirect { vhandle, .. } => {
                // Resolve indirection using value log
                match self.blobs.get(&vhandle)? {
                    Some(bytes) => Ok(Some(bytes)),
                    None => {
                        panic!("value handle ({key:?} => {vhandle:?}) did not match any blob - this is a bug")
                    }
                }
            }
        }
    }

    fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u32, u32) {
        self.index.remove(key, seqno)
    }

    fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u32, u32) {
        self.index.remove_weak(key, seqno)
    }
}