lsm_tree/blob_tree/mod.rs

// Copyright (c) 2024-present, fjall-rs
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)

mod cache;
mod compression;
mod gc;
pub mod index;
pub mod value;

use crate::{
    coding::{Decode, Encode},
    compaction::stream::CompactionStream,
    file::BLOBS_FOLDER,
    r#abstract::{AbstractTree, RangeItem},
    tree::inner::MemtableId,
    value::InternalValue,
    Config, KvPair, Memtable, Segment, SegmentId, SeqNo, Snapshot, UserKey, UserValue,
};
use cache::MyBlobCache;
use compression::MyCompressor;
use gc::{reader::GcReader, writer::GcWriter};
use index::IndexTree;
use std::{
    io::Cursor,
    ops::{RangeBounds, RangeFull},
    sync::{atomic::AtomicUsize, Arc},
};
use value::MaybeInlineValue;
use value_log::ValueLog;

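/// Resolves a single range item from the index tree: inline values are
/// returned as-is, while indirections are followed into the value log to
/// load the actual blob.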
fn resolve_value_handle(vlog: &ValueLog<MyBlobCache, MyCompressor>, item: RangeItem) -> RangeItem {
    use MaybeInlineValue::{Indirect, Inline};

    match item {
        Ok((key, value)) => {
            let mut cursor = Cursor::new(value);

            match MaybeInlineValue::decode_from(&mut cursor)? {
                Inline(bytes) => Ok((key, bytes)),
                Indirect { vhandle, .. } => {
                    // Resolve indirection using value log
                    match vlog.get(&vhandle) {
                        Ok(Some(bytes)) => Ok((key, bytes)),
                        Err(e) => Err(e.into()),
                        _ => {
                            panic!("value handle ({:?} => {vhandle:?}) did not match any blob - this is a bug", String::from_utf8_lossy(&key))
                        }
                    }
                }
            }
        }
        Err(e) => Err(e),
    }
}

/// A key-value-separated log-structured merge tree
///
/// This tree is a composite structure, consisting of an
/// index tree (LSM-tree) and a log-structured value log
/// to reduce write amplification.
///
/// See <https://docs.rs/value-log> for more information.
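///
/// Values are stored in the index tree as `MaybeInlineValue`s: small values
/// are kept inline, while larger values are written to the value log and only
/// a value handle (plus the value size) is kept in the index tree. A point
/// read conceptually resolves like this (illustrative sketch only, not a
/// doctest; see `BlobTree::get` below for the real implementation):
///
/// ```ignore
/// match index.get_vhandle(key, seqno)? {
///     Some(MaybeInlineValue::Inline(bytes)) => Some(bytes),
///     Some(MaybeInlineValue::Indirect { vhandle, .. }) => blobs.get(&vhandle)?,
///     None => None,
/// }
/// ```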
#[derive(Clone)]
pub struct BlobTree {
    /// Index tree that holds value handles or small inline values
    #[doc(hidden)]
    pub index: IndexTree,

    /// Log-structured value-log that stores large values
    #[doc(hidden)]
    pub blobs: ValueLog<MyBlobCache, MyCompressor>,

    // TODO: maybe replace this with a nonce system
    #[doc(hidden)]
    pub pending_segments: Arc<AtomicUsize>,
}

impl BlobTree {
    pub(crate) fn open(config: Config) -> crate::Result<Self> {
        let path = &config.path;

        let vlog_path = path.join(BLOBS_FOLDER);
        let vlog_cfg =
            value_log::Config::<MyBlobCache, MyCompressor>::new(MyBlobCache(config.cache.clone()))
                .segment_size_bytes(config.blob_file_target_size)
                .compression(match config.blob_compression {
                    crate::CompressionType::None => None,

                    #[cfg(any(feature = "lz4", feature = "miniz"))]
                    c => Some(MyCompressor(c)),
                });

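        // NOTE: The index tree is a regular LSM-tree; its values are encoded
        // `MaybeInlineValue`s (either inline bytes or a handle into the value log)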
        let index: IndexTree = config.open()?.into();

        Ok(Self {
            index,
            blobs: ValueLog::open(vlog_path, vlog_cfg)?,
            pending_segments: Arc::new(AtomicUsize::new(0)),
        })
    }

    /// Scans the index tree, collecting statistics about
    /// value log fragmentation
    #[doc(hidden)]
    pub fn gc_scan_stats(
        &self,
        seqno: SeqNo,
        gc_watermark: SeqNo,
    ) -> crate::Result<crate::gc::Report> {
        use std::io::{Error as IoError, ErrorKind as IoErrorKind};
        use MaybeInlineValue::{Indirect, Inline};

        while self
            .pending_segments
            .load(std::sync::atomic::Ordering::Acquire)
            > 0
        {
            // IMPORTANT: Busy wait until all segments in-flight are committed
            // to the tree
        }

        // IMPORTANT: Lock + snapshot memtable to avoid read skew and to prevent tampering with the memtable
        let _memtable_lock = self.index.read_lock_active_memtable();

        while self
            .pending_segments
            .load(std::sync::atomic::Ordering::Acquire)
            > 0
        {
            // IMPORTANT: Busy wait again until all segments in-flight are committed
            // to the tree
        }

        let iter = self
            .index
            .create_internal_range::<&[u8], RangeFull>(&.., Some(seqno), None);

        // Stores the max seqno of every blob file
        let mut seqno_map = crate::HashMap::<SegmentId, SeqNo>::default();

        let result = self
            .blobs
            .scan_for_stats(iter.filter_map(|kv| {
                let Ok(kv) = kv else {
                    return Some(Err(IoError::new(
                        IoErrorKind::Other,
                        "Failed to load KV pair from index tree",
                    )));
                };

                let mut cursor = Cursor::new(kv.value);
                let value = match MaybeInlineValue::decode_from(&mut cursor) {
                    Ok(v) => v,
                    Err(e) => return Some(Err(IoError::new(IoErrorKind::Other, e.to_string()))),
                };

                match value {
                    Indirect { vhandle, size } => {
                        seqno_map
                            .entry(vhandle.segment_id)
                            .and_modify(|x| *x = (*x).max(kv.key.seqno))
                            .or_insert(kv.key.seqno);

                        Some(Ok((vhandle, size)))
                    }
                    Inline(_) => None,
                }
            }))
            .map_err(Into::into);

        let mut lock = self
            .blobs
            .manifest
            .segments
            .write()
            .expect("lock is poisoned");

        // IMPORTANT: We are overwriting the staleness of blob files
        // that contain an item at or above the GC watermark,
        // so snapshots cannot accidentally lose data
        //
        // TODO: 3.0.0 this should be dealt with in value-log 2.0 (make it MVCC aware)
        for (blob_file_id, max_seqno) in seqno_map {
            if gc_watermark <= max_seqno {
                if let Some(blob_file) = lock.get_mut(&blob_file_id) {
                    blob_file.gc_stats.set_stale_items(0);
                    blob_file.gc_stats.set_stale_bytes(0);
                }
            }
        }

        result
    }

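    /// Applies the given garbage collection strategy to the value log,
    /// rewriting fragmented blob files, and then drops blob files that
    /// have become fully stale.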
    pub fn apply_gc_strategy(
        &self,
        strategy: &impl value_log::GcStrategy<MyBlobCache, MyCompressor>,
        seqno: SeqNo,
    ) -> crate::Result<u64> {
        // IMPORTANT: Write lock memtable to avoid read skew
        let memtable_lock = self.index.lock_active_memtable();

        self.blobs.apply_gc_strategy(
            strategy,
            &GcReader::new(&self.index, &memtable_lock),
            GcWriter::new(seqno, &memtable_lock),
        )?;

        // NOTE: We still hold the memtable lock, so we can't use gc_drop_stale, as that would lock recursively
        self.blobs.drop_stale_segments().map_err(Into::into)
    }

    /// Drops all stale blob segment files
    #[doc(hidden)]
    pub fn gc_drop_stale(&self) -> crate::Result<u64> {
        // IMPORTANT: Write lock memtable to avoid read skew
        let _lock = self.index.lock_active_memtable();

        self.blobs.drop_stale_segments().map_err(Into::into)
    }

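    /// Rotates the active memtable and flushes it to a new disk segment
    /// (performing key-value separation), then registers that segment.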
    #[doc(hidden)]
    pub fn flush_active_memtable(&self, eviction_seqno: SeqNo) -> crate::Result<Option<Segment>> {
        let Some((segment_id, yanked_memtable)) = self.index.rotate_memtable() else {
            return Ok(None);
        };

        let Some(segment) = self.flush_memtable(segment_id, &yanked_memtable, eviction_seqno)?
        else {
            return Ok(None);
        };
        self.register_segments(&[segment.clone()])?;

        Ok(Some(segment))
    }
}

impl AbstractTree for BlobTree {
    fn ingest(&self, iter: impl Iterator<Item = (UserKey, UserValue)>) -> crate::Result<()> {
        use crate::tree::ingest::Ingestion;
        use std::time::Instant;

        // NOTE: Lock active memtable so nothing else can be going on while we are bulk loading
        let lock = self.lock_active_memtable();
        assert!(
            lock.is_empty(),
            "can only perform bulk_ingest on empty trees",
        );

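        // Two writers: one for the index tree segments, one for the value log blob files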
        let mut segment_writer = Ingestion::new(&self.index)?;
        let mut blob_writer = self.blobs.get_writer()?;

        let start = Instant::now();
        let mut count = 0;
        let mut last_key = None;

        for (key, value) in iter {
            if let Some(last_key) = &last_key {
                assert!(
                    key > last_key,
                    "next key in bulk ingest was not greater than last key",
                );
            }
            last_key = Some(key.clone());

            // NOTE: Values are 32-bit max
            #[allow(clippy::cast_possible_truncation)]
            let value_size = value.len() as u32;

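            // Key-value separation: values at or above the separation threshold go to
            // the value log and only an indirection (value handle + size) is stored in
            // the index tree; smaller values are stored inline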
            if value_size >= self.index.config.blob_file_separation_threshold {
                let vhandle = blob_writer.get_next_value_handle();

                let indirection = MaybeInlineValue::Indirect {
                    vhandle,
                    size: value_size,
                };
                // TODO: use Slice::with_size
                let mut serialized_indirection = vec![];
                indirection.encode_into(&mut serialized_indirection)?;

                segment_writer.write(key.clone(), serialized_indirection.into())?;

                blob_writer.write(&key, value)?;
            } else {
                // TODO: use Slice::with_size
                let direct = MaybeInlineValue::Inline(value);
                let serialized_direct = direct.encode_into_vec();
                segment_writer.write(key, serialized_direct.into())?;
            }

            count += 1;
        }

        self.blobs.register_writer(blob_writer)?;
        segment_writer.finish()?;

        log::info!("Ingested {count} items in {:?}", start.elapsed());

        Ok(())
    }

    fn major_compact(&self, target_size: u64, seqno_threshold: SeqNo) -> crate::Result<()> {
        self.index.major_compact(target_size, seqno_threshold)
    }

    fn clear_active_memtable(&self) {
        self.index.clear_active_memtable();
    }

    fn l0_run_count(&self) -> usize {
        self.index.l0_run_count()
    }

    fn blob_file_count(&self) -> usize {
        self.blobs.segment_count()
    }

    // NOTE: We skip reading from the value log
    // because the vHandles already store the value size
    fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: Option<SeqNo>) -> crate::Result<Option<u32>> {
        let vhandle = self.index.get_vhandle(key.as_ref(), seqno)?;

        Ok(vhandle.map(|x| match x {
            MaybeInlineValue::Inline(v) => v.len() as u32,

            // NOTE: We skip reading from the value log
            // because the indirections already store the value size
            MaybeInlineValue::Indirect { size, .. } => size,
        }))
    }

    fn bloom_filter_size(&self) -> usize {
        self.index.bloom_filter_size()
    }

    fn sealed_memtable_count(&self) -> usize {
        self.index.sealed_memtable_count()
    }

    #[doc(hidden)]
    fn verify(&self) -> crate::Result<usize> {
        let index_tree_sum = self.index.verify()?;
        let vlog_sum = self.blobs.verify()?;
        Ok(index_tree_sum + vlog_sum)
    }

    fn keys(
        &self,
        seqno: Option<SeqNo>,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = crate::Result<UserKey>> + 'static> {
        self.index.keys(seqno, index)
    }

    fn values(
        &self,
        seqno: Option<SeqNo>,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = crate::Result<UserValue>> + 'static> {
        Box::new(self.iter(seqno, index).map(|x| x.map(|(_, v)| v)))
    }

    fn flush_memtable(
        &self,
        segment_id: SegmentId,
        memtable: &Arc<Memtable>,
        eviction_seqno: SeqNo,
    ) -> crate::Result<Option<Segment>> {
        use crate::{
            file::SEGMENTS_FOLDER,
            segment::writer::{Options, Writer as SegmentWriter},
        };
        use value::MaybeInlineValue;

        let lsm_segment_folder = self.index.config.path.join(SEGMENTS_FOLDER);

        log::debug!("flushing memtable & performing key-value separation");
        log::debug!("=> to LSM segments in {:?}", lsm_segment_folder);
        log::debug!("=> to blob segment at {:?}", self.blobs.path);

        let mut segment_writer = SegmentWriter::new(Options {
            segment_id,
            data_block_size: self.index.config.data_block_size,
            index_block_size: self.index.config.index_block_size,
            folder: lsm_segment_folder,
        })?
        .use_compression(self.index.config.compression);

        segment_writer = segment_writer.use_bloom_policy(
            crate::segment::writer::BloomConstructionPolicy::FpRate(0.0001),
        );

        let mut blob_writer = self.blobs.get_writer()?;

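        // Stream the sealed memtable through the compaction stream, which
        // evicts unneeded old value versions below `eviction_seqno`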
        let iter = memtable.iter().map(Ok);
        let compaction_filter = CompactionStream::new(iter, eviction_seqno);

        for item in compaction_filter {
            let item = item?;

            if item.is_tombstone() {
                // NOTE: Still need to add the tombstone to the index tree,
                // but no blob is written to the blob writer

                segment_writer.write(InternalValue::new(item.key, UserValue::empty()))?;
                continue;
            }

            let mut cursor = Cursor::new(item.value);

            let value = MaybeInlineValue::decode_from(&mut cursor)?;
            let value = match value {
                MaybeInlineValue::Inline(value) => value,
                indirection @ MaybeInlineValue::Indirect { .. } => {
                    // NOTE: This is a previous indirection, just write it to index tree
                    // without writing the blob again

                    let mut serialized_indirection = vec![];
                    indirection.encode_into(&mut serialized_indirection)?;

                    segment_writer
                        .write(InternalValue::new(item.key.clone(), serialized_indirection))?;

                    continue;
                }
            };

            // NOTE: Values are 32-bit max
            #[allow(clippy::cast_possible_truncation)]
            let value_size = value.len() as u32;

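            // Same key-value separation as in `ingest`: large values are written to
            // the value log, small values stay inline in the index tree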
            if value_size >= self.index.config.blob_file_separation_threshold {
                let vhandle = blob_writer.get_next_value_handle();

                let indirection = MaybeInlineValue::Indirect {
                    vhandle,
                    size: value_size,
                };
                // TODO: use Slice::with_size
                let mut serialized_indirection = vec![];
                indirection.encode_into(&mut serialized_indirection)?;

                segment_writer
                    .write(InternalValue::new(item.key.clone(), serialized_indirection))?;

                blob_writer.write(&item.key.user_key, value)?;
            } else {
                // TODO: use Slice::with_size
                let direct = MaybeInlineValue::Inline(value);
                let serialized_direct = direct.encode_into_vec();
                segment_writer.write(InternalValue::new(item.key, serialized_direct))?;
            }
        }

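        // NOTE: Take the active memtable write lock so that registering the blob
        // writer and bumping the pending segment count below cannot interleave with
        // a concurrent GC scan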
        let _memtable_lock = self.lock_active_memtable();

        log::trace!("Register blob writer into value log");
        self.blobs.register_writer(blob_writer)?;

        log::trace!("Creating LSM-tree segment {segment_id}");
        let segment = self.index.consume_writer(segment_id, segment_writer)?;

        // TODO: this can probably be solved in a nicer way
        if segment.is_some() {
            // IMPORTANT: Increment the pending count
            // so there cannot be a GC scan now, until the segment is registered
            self.pending_segments
                .fetch_add(1, std::sync::atomic::Ordering::Release);
        }

        Ok(segment)
    }

    fn register_segments(&self, segments: &[Segment]) -> crate::Result<()> {
        self.index.register_segments(segments)?;

        let count = self
            .pending_segments
            .load(std::sync::atomic::Ordering::Acquire);

        assert!(
            count >= segments.len(),
            "pending_segments is less than segments to register - this is a bug"
        );

        self.pending_segments
            .fetch_sub(segments.len(), std::sync::atomic::Ordering::Release);

        Ok(())
    }

    fn lock_active_memtable(&self) -> std::sync::RwLockWriteGuard<'_, Arc<Memtable>> {
        self.index.lock_active_memtable()
    }

    fn set_active_memtable(&self, memtable: Memtable) {
        self.index.set_active_memtable(memtable);
    }

    fn add_sealed_memtable(&self, id: MemtableId, memtable: Arc<Memtable>) {
        self.index.add_sealed_memtable(id, memtable);
    }

    fn compact(
        &self,
        strategy: Arc<dyn crate::compaction::CompactionStrategy>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        self.index.compact(strategy, seqno_threshold)
    }

    fn get_next_segment_id(&self) -> SegmentId {
        self.index.get_next_segment_id()
    }

    fn tree_config(&self) -> &Config {
        &self.index.config
    }

    fn get_highest_seqno(&self) -> Option<SeqNo> {
        self.index.get_highest_seqno()
    }

    fn active_memtable_size(&self) -> u32 {
        self.index.active_memtable_size()
    }

    fn tree_type(&self) -> crate::TreeType {
        crate::TreeType::Blob
    }

    fn rotate_memtable(&self) -> Option<(crate::tree::inner::MemtableId, Arc<crate::Memtable>)> {
        self.index.rotate_memtable()
    }

    fn segment_count(&self) -> usize {
        self.index.segment_count()
    }

    fn level_segment_count(&self, idx: usize) -> Option<usize> {
        self.index.level_segment_count(idx)
    }

    fn approximate_len(&self) -> usize {
        self.index.approximate_len()
    }

    // NOTE: Override the default implementation to not fetch
    // data from the value log, so we get much faster key reads
    fn contains_key<K: AsRef<[u8]>>(&self, key: K, seqno: Option<SeqNo>) -> crate::Result<bool> {
        self.index.contains_key(key, seqno)
    }

    // NOTE: Override the default implementation to not fetch
    // data from the value log, so we get much faster scans
    fn len(&self, seqno: Option<SeqNo>, index: Option<Arc<Memtable>>) -> crate::Result<usize> {
        self.index.len(seqno, index)
    }

    fn disk_space(&self) -> u64 {
        self.index.disk_space() + self.blobs.manifest.disk_space_used()
    }

    fn get_highest_memtable_seqno(&self) -> Option<SeqNo> {
        self.index.get_highest_memtable_seqno()
    }

    fn get_highest_persisted_seqno(&self) -> Option<SeqNo> {
        self.index.get_highest_persisted_seqno()
    }

    fn snapshot(&self, seqno: SeqNo) -> Snapshot {
        use crate::AnyTree::Blob;

        Snapshot::new(Blob(self.clone()), seqno)
    }

    fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
        &self,
        range: R,
        seqno: Option<SeqNo>,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static> {
        let vlog = self.blobs.clone();
        Box::new(
            self.index
                .0
                .create_range(&range, seqno, index)
                .map(move |item| resolve_value_handle(&vlog, item)),
        )
    }

    fn prefix<K: AsRef<[u8]>>(
        &self,
        prefix: K,
        seqno: Option<SeqNo>,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static> {
        let vlog = self.blobs.clone();
        Box::new(
            self.index
                .0
                .create_prefix(prefix, seqno, index)
                .map(move |item| resolve_value_handle(&vlog, item)),
        )
    }

    fn insert<K: Into<UserKey>, V: Into<UserValue>>(
        &self,
        key: K,
        value: V,
        seqno: SeqNo,
    ) -> (u32, u32) {
        use value::MaybeInlineValue;

        // NOTE: Initially, we always write an inline value
        // On memtable flush, depending on the values' sizes, they will be separated
        // into inline or indirect values
        let item = MaybeInlineValue::Inline(value.into());

        let value = item.encode_into_vec();

        self.index.insert(key, value, seqno)
    }

    fn get<K: AsRef<[u8]>>(
        &self,
        key: K,
        seqno: Option<SeqNo>,
    ) -> crate::Result<Option<crate::UserValue>> {
        use value::MaybeInlineValue::{Indirect, Inline};

        let key = key.as_ref();

        let Some(value) = self.index.get_vhandle(key, seqno)? else {
            return Ok(None);
        };

        match value {
            Inline(bytes) => Ok(Some(bytes)),
            Indirect { vhandle, .. } => {
                // Resolve indirection using value log
                match self.blobs.get(&vhandle)? {
                    Some(bytes) => Ok(Some(bytes)),
                    None => {
                        panic!("value handle ({key:?} => {vhandle:?}) did not match any blob - this is a bug")
                    }
                }
            }
        }
    }

    fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u32, u32) {
        self.index.remove(key, seqno)
    }

    fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u32, u32) {
        self.index.remove_weak(key, seqno)
    }
}