lsm_tree/blob_tree/mod.rs

// Copyright (c) 2024-present, fjall-rs
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)

mod cache;
mod compression;
mod gc;
pub mod index;
pub mod value;

use crate::{
    coding::{Decode, Encode},
    compaction::stream::CompactionStream,
    file::BLOBS_FOLDER,
    r#abstract::{AbstractTree, RangeItem},
    tree::inner::MemtableId,
    value::InternalValue,
    Config, KvPair, Memtable, Segment, SegmentId, SeqNo, Snapshot, UserKey, UserValue,
};
use cache::MyBlobCache;
use compression::MyCompressor;
use gc::{reader::GcReader, writer::GcWriter};
use index::IndexTree;
use std::{
    io::Cursor,
    ops::{RangeBounds, RangeFull},
    sync::{atomic::AtomicUsize, Arc},
};
use value::MaybeInlineValue;
use value_log::ValueLog;
fn resolve_value_handle(vlog: &ValueLog<MyBlobCache, MyCompressor>, item: RangeItem) -> RangeItem {
    use MaybeInlineValue::{Indirect, Inline};

    match item {
        Ok((key, value)) => {
            let mut cursor = Cursor::new(value);

            match MaybeInlineValue::decode_from(&mut cursor)? {
                Inline(bytes) => Ok((key, bytes)),
                Indirect { vhandle, .. } => {
                    // Resolve indirection using value log
                    match vlog.get(&vhandle) {
                        Ok(Some(bytes)) => Ok((key, bytes)),
                        Err(e) => Err(e.into()),
                        _ => {
                            panic!("value handle ({:?} => {vhandle:?}) did not match any blob - this is a bug", String::from_utf8_lossy(&key))
                        }
                    }
                }
            }
        }
        Err(e) => Err(e),
    }
}

/// A key-value-separated log-structured merge tree
///
/// This tree is a composite structure, consisting of an
/// index tree (LSM-tree) and a log-structured value log
/// to reduce write amplification.
///
/// See <https://docs.rs/value-log> for more information.
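///
/// A rough usage sketch (illustrative only: `open` is crate-internal and
/// trees are normally constructed through [`Config`]):
///
/// ```ignore
/// // Hypothetical setup; the path and sizes are placeholders.
/// let tree = BlobTree::open(Config::new("/path/to/db"))?;
///
/// // Writes always go to the index tree as inline values first.
/// tree.insert("big", vec![0u8; 1_000_000], 0);
///
/// // On memtable flush, large values are moved into the value log and the
/// // index tree keeps only a small value handle; reads resolve that handle.
/// assert_eq!(tree.get("big", None)?.map(|v| v.len()), Some(1_000_000));
/// ```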
#[derive(Clone)]
pub struct BlobTree {
    /// Index tree that holds value handles or small inline values
    #[doc(hidden)]
    pub index: IndexTree,

    /// Log-structured value-log that stores large values
    #[doc(hidden)]
    pub blobs: ValueLog<MyBlobCache, MyCompressor>,

    // TODO: maybe replace this with a nonce system
    #[doc(hidden)]
    pub pending_segments: Arc<AtomicUsize>,
}

impl BlobTree {
    pub(crate) fn open(config: Config) -> crate::Result<Self> {
        let path = &config.path;

        let vlog_path = path.join(BLOBS_FOLDER);
        let vlog_cfg =
            value_log::Config::<MyBlobCache, MyCompressor>::new(MyBlobCache(config.cache.clone()))
                .segment_size_bytes(config.blob_file_target_size)
                .compression(match config.blob_compression {
                    crate::CompressionType::None => None,

                    #[cfg(any(feature = "lz4", feature = "miniz"))]
                    c => Some(MyCompressor(c)),
                });

        let index: IndexTree = config.open()?.into();

        Ok(Self {
            index,
            blobs: ValueLog::open(vlog_path, vlog_cfg)?,
            pending_segments: Arc::new(AtomicUsize::new(0)),
        })
    }

    /// Scans the index tree, collecting statistics about
    /// value log fragmentation
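    ///
    /// Blob files that still contain an item with a seqno at or above `gc_watermark`
    /// get their stale counters reset, so data that may still be visible to snapshots
    /// is never considered for garbage collection.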
    #[doc(hidden)]
    pub fn gc_scan_stats(
        &self,
        seqno: SeqNo,
        gc_watermark: SeqNo,
    ) -> crate::Result<crate::gc::Report> {
        use std::io::Error as IoError;
        use MaybeInlineValue::{Indirect, Inline};

        while self
            .pending_segments
            .load(std::sync::atomic::Ordering::Acquire)
            > 0
        {
            // IMPORTANT: Busy-wait until all in-flight segments are committed
            // to the tree
        }

        // IMPORTANT: Lock + snapshot the memtable to avoid read skew and prevent tampering with it
        let _memtable_lock = self.index.read_lock_active_memtable();

        while self
            .pending_segments
            .load(std::sync::atomic::Ordering::Acquire)
            > 0
        {
            // IMPORTANT: Busy-wait again until all in-flight segments are committed
            // to the tree
        }
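        // NOTE: The pending counter is checked both before and after taking the
        // memtable lock, so a flush that was still in flight while the lock was
        // being acquired is fully registered before the scan starts.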

        let iter = self
            .index
            .create_internal_range::<&[u8], RangeFull>(&.., Some(seqno), None);

        // Stores the max seqno of every blob file
        let mut seqno_map = crate::HashMap::<SegmentId, SeqNo>::default();

        let result = self
            .blobs
            .scan_for_stats(iter.filter_map(|kv| {
                let Ok(kv) = kv else {
                    return Some(Err(IoError::other(
                        "Failed to load KV pair from index tree",
                    )));
                };

                let mut cursor = Cursor::new(kv.value);
                let value = match MaybeInlineValue::decode_from(&mut cursor) {
                    Ok(v) => v,
                    Err(e) => return Some(Err(IoError::other(e.to_string()))),
                };

                match value {
                    Indirect { vhandle, size } => {
                        seqno_map
                            .entry(vhandle.segment_id)
                            .and_modify(|x| *x = (*x).max(kv.key.seqno))
                            .or_insert(kv.key.seqno);

                        Some(Ok((vhandle, size)))
                    }
                    Inline(_) => None,
                }
            }))
            .map_err(Into::into);

        let mut lock = self
            .blobs
            .manifest
            .segments
            .write()
            .expect("lock is poisoned");

        // IMPORTANT: We reset the staleness stats of blob files
        // that still contain an item at or above the GC watermark,
        // so snapshots cannot accidentally lose data
        //
        // TODO: 3.0.0 this should be dealt with in value-log 2.0 (make it MVCC aware)
        for (blob_file_id, max_seqno) in seqno_map {
            if gc_watermark <= max_seqno {
                if let Some(blob_file) = lock.get_mut(&blob_file_id) {
                    blob_file.gc_stats.set_stale_items(0);
                    blob_file.gc_stats.set_stale_bytes(0);
                }
            }
        }

        result
    }

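    /// Applies a garbage collection strategy to the value log, rewriting blobs
    /// that are still referenced by the index tree, and then drops any blob
    /// files that have become fully stale.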
    pub fn apply_gc_strategy(
        &self,
        strategy: &impl value_log::GcStrategy<MyBlobCache, MyCompressor>,
        seqno: SeqNo,
    ) -> crate::Result<u64> {
        // IMPORTANT: Write-lock the memtable to avoid read skew
        let memtable_lock = self.index.lock_active_memtable();

        self.blobs.apply_gc_strategy(
            strategy,
            &GcReader::new(&self.index, &memtable_lock),
            GcWriter::new(seqno, &memtable_lock),
        )?;

        // NOTE: We still hold the memtable lock, so we cannot call gc_drop_stale here (it would lock recursively)
        self.blobs.drop_stale_segments().map_err(Into::into)
    }

    /// Drops all stale blob segment files
    #[doc(hidden)]
    pub fn gc_drop_stale(&self) -> crate::Result<u64> {
        // IMPORTANT: Write-lock the memtable to avoid read skew
        let _lock = self.index.lock_active_memtable();

        self.blobs.drop_stale_segments().map_err(Into::into)
    }

    #[doc(hidden)]
    pub fn flush_active_memtable(&self, eviction_seqno: SeqNo) -> crate::Result<Option<Segment>> {
        let Some((segment_id, yanked_memtable)) = self.index.rotate_memtable() else {
            return Ok(None);
        };

        let Some(segment) = self.flush_memtable(segment_id, &yanked_memtable, eviction_seqno)?
        else {
            return Ok(None);
        };
        self.register_segments(&[segment.clone()])?;

        Ok(Some(segment))
    }
}

impl AbstractTree for BlobTree {
    fn ingest(&self, iter: impl Iterator<Item = (UserKey, UserValue)>) -> crate::Result<()> {
        use crate::tree::ingest::Ingestion;
        use std::time::Instant;

        // NOTE: Lock the active memtable so nothing else can interfere while we are bulk loading
        let lock = self.lock_active_memtable();
        assert!(
            lock.is_empty(),
            "can only perform bulk_ingest on empty trees",
        );

        let mut segment_writer = Ingestion::new(&self.index)?;
        let mut blob_writer = self.blobs.get_writer()?;

        let start = Instant::now();
        let mut count = 0;
        let mut last_key = None;

        for (key, value) in iter {
            if let Some(last_key) = &last_key {
                assert!(
                    key > last_key,
                    "next key in bulk ingest was not greater than last key",
                );
            }
            last_key = Some(key.clone());

            // NOTE: Values are 32-bit max
            #[allow(clippy::cast_possible_truncation)]
            let value_size = value.len() as u32;

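            // Example (hypothetical 4 KiB threshold): a 1 MiB value is written to the
            // blob writer and only a small indirection (vhandle + size) goes into the
            // index tree, while a 100 B value is stored inline in the LSM segment.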
            if value_size >= self.index.config.blob_file_separation_threshold {
                let vhandle = blob_writer.get_next_value_handle();

                let indirection = MaybeInlineValue::Indirect {
                    vhandle,
                    size: value_size,
                };
                // TODO: use Slice::with_size
                let mut serialized_indirection = vec![];
                indirection.encode_into(&mut serialized_indirection)?;

                segment_writer.write(key.clone(), serialized_indirection.into())?;

                blob_writer.write(&key, value)?;
            } else {
                // TODO: use Slice::with_size
                let direct = MaybeInlineValue::Inline(value);
                let serialized_direct = direct.encode_into_vec();
                segment_writer.write(key, serialized_direct.into())?;
            }

            count += 1;
        }

        self.blobs.register_writer(blob_writer)?;
        segment_writer.finish()?;

        log::info!("Ingested {count} items in {:?}", start.elapsed());

        Ok(())
    }

    fn major_compact(&self, target_size: u64, seqno_threshold: SeqNo) -> crate::Result<()> {
        self.index.major_compact(target_size, seqno_threshold)
    }

    fn clear_active_memtable(&self) {
        self.index.clear_active_memtable();
    }

    fn l0_run_count(&self) -> usize {
        self.index.l0_run_count()
    }

    fn blob_file_count(&self) -> usize {
        self.blobs.segment_count()
    }

    // NOTE: We skip reading from the value log
    // because the vHandles already store the value size
    fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: Option<SeqNo>) -> crate::Result<Option<u32>> {
        let vhandle = self.index.get_vhandle(key.as_ref(), seqno)?;

        Ok(vhandle.map(|x| match x {
            MaybeInlineValue::Inline(v) => v.len() as u32,

            // NOTE: We skip reading from the value log
            // because the indirections already store the value size
            MaybeInlineValue::Indirect { size, .. } => size,
        }))
    }

    fn bloom_filter_size(&self) -> usize {
        self.index.bloom_filter_size()
    }

    fn sealed_memtable_count(&self) -> usize {
        self.index.sealed_memtable_count()
    }

    #[doc(hidden)]
    fn verify(&self) -> crate::Result<usize> {
        let index_tree_sum = self.index.verify()?;
        let vlog_sum = self.blobs.verify()?;
        Ok(index_tree_sum + vlog_sum)
    }

    fn keys(
        &self,
        seqno: Option<SeqNo>,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = crate::Result<UserKey>> + 'static> {
        self.index.keys(seqno, index)
    }

    fn values(
        &self,
        seqno: Option<SeqNo>,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = crate::Result<UserValue>> + 'static> {
        Box::new(self.iter(seqno, index).map(|x| x.map(|(_, v)| v)))
    }

    fn flush_memtable(
        &self,
        segment_id: SegmentId,
        memtable: &Arc<Memtable>,
        eviction_seqno: SeqNo,
    ) -> crate::Result<Option<Segment>> {
        use crate::{
            file::SEGMENTS_FOLDER,
            segment::writer::{Options, Writer as SegmentWriter},
        };
        use value::MaybeInlineValue;

        let lsm_segment_folder = self.index.config.path.join(SEGMENTS_FOLDER);

        log::debug!("flushing memtable & performing key-value separation");
        log::debug!("=> to LSM segments in {lsm_segment_folder:?}");
        log::debug!("=> to blob segment at {:?}", self.blobs.path);

        let mut segment_writer = SegmentWriter::new(Options {
            segment_id,
            data_block_size: self.index.config.data_block_size,
            index_block_size: self.index.config.index_block_size,
            folder: lsm_segment_folder,
        })?
        .use_compression(self.index.config.compression);

        segment_writer = segment_writer.use_bloom_policy(
            crate::segment::writer::BloomConstructionPolicy::FpRate(0.0001),
        );

        let mut blob_writer = self.blobs.get_writer()?;

        let iter = memtable.iter().map(Ok);
        let compaction_filter = CompactionStream::new(iter, eviction_seqno);

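        // Each memtable item falls into one of three cases:
        //   1. tombstone       => written to the index tree only
        //   2. old indirection => re-written to the index tree as-is (the blob already exists)
        //   3. inline value    => separated into the blob file if it reaches the
        //                         separation threshold, otherwise kept inline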
        for item in compaction_filter {
            let item = item?;

            if item.is_tombstone() {
                // NOTE: We still need to add the tombstone to the index tree,
                // but nothing is written to the blob writer

                segment_writer.write(InternalValue::new(item.key, UserValue::empty()))?;
                continue;
            }

            let mut cursor = Cursor::new(item.value);

            let value = MaybeInlineValue::decode_from(&mut cursor)?;
            let value = match value {
                MaybeInlineValue::Inline(value) => value,
                indirection @ MaybeInlineValue::Indirect { .. } => {
                    // NOTE: This is a previous indirection, just write it to the index tree
                    // without writing the blob again

                    let mut serialized_indirection = vec![];
                    indirection.encode_into(&mut serialized_indirection)?;

                    segment_writer
                        .write(InternalValue::new(item.key.clone(), serialized_indirection))?;

                    continue;
                }
            };

            // NOTE: Values are 32-bit max
            #[allow(clippy::cast_possible_truncation)]
            let value_size = value.len() as u32;

            if value_size >= self.index.config.blob_file_separation_threshold {
                let vhandle = blob_writer.get_next_value_handle();

                let indirection = MaybeInlineValue::Indirect {
                    vhandle,
                    size: value_size,
                };
                // TODO: use Slice::with_size
                let mut serialized_indirection = vec![];
                indirection.encode_into(&mut serialized_indirection)?;

                segment_writer
                    .write(InternalValue::new(item.key.clone(), serialized_indirection))?;

                blob_writer.write(&item.key.user_key, value)?;
            } else {
                // TODO: use Slice::with_size
                let direct = MaybeInlineValue::Inline(value);
                let serialized_direct = direct.encode_into_vec();
                segment_writer.write(InternalValue::new(item.key, serialized_direct))?;
            }
        }

        let _memtable_lock = self.lock_active_memtable();

        log::trace!("Register blob writer into value log");
        self.blobs.register_writer(blob_writer)?;

        log::trace!("Creating LSM-tree segment {segment_id}");
        let segment = self.index.consume_writer(segment_id, segment_writer)?;

        // TODO: this can probably be solved in a nicer way
        if segment.is_some() {
            // IMPORTANT: Increment the pending count
            // so no GC scan can run until the segment is registered
            self.pending_segments
                .fetch_add(1, std::sync::atomic::Ordering::Release);
        }

        Ok(segment)
    }

    fn register_segments(&self, segments: &[Segment]) -> crate::Result<()> {
        self.index.register_segments(segments)?;

        let count = self
            .pending_segments
            .load(std::sync::atomic::Ordering::Acquire);

        assert!(
            count >= segments.len(),
            "pending_segments is less than segments to register - this is a bug"
        );

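        // NOTE: Decrement only after the index tree has registered the segments,
        // so a GC scan can never observe blob indirections that are not yet
        // visible in the tree.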
        self.pending_segments
            .fetch_sub(segments.len(), std::sync::atomic::Ordering::Release);

        Ok(())
    }

    fn lock_active_memtable(&self) -> std::sync::RwLockWriteGuard<'_, Arc<Memtable>> {
        self.index.lock_active_memtable()
    }

    fn set_active_memtable(&self, memtable: Memtable) {
        self.index.set_active_memtable(memtable);
    }

    fn add_sealed_memtable(&self, id: MemtableId, memtable: Arc<Memtable>) {
        self.index.add_sealed_memtable(id, memtable);
    }

    fn compact(
        &self,
        strategy: Arc<dyn crate::compaction::CompactionStrategy>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        self.index.compact(strategy, seqno_threshold)
    }

    fn get_next_segment_id(&self) -> SegmentId {
        self.index.get_next_segment_id()
    }

    fn tree_config(&self) -> &Config {
        &self.index.config
    }

    fn get_highest_seqno(&self) -> Option<SeqNo> {
        self.index.get_highest_seqno()
    }

    fn active_memtable_size(&self) -> u32 {
        self.index.active_memtable_size()
    }

    fn tree_type(&self) -> crate::TreeType {
        crate::TreeType::Blob
    }

    fn rotate_memtable(&self) -> Option<(crate::tree::inner::MemtableId, Arc<crate::Memtable>)> {
        self.index.rotate_memtable()
    }

    fn segment_count(&self) -> usize {
        self.index.segment_count()
    }

    fn level_segment_count(&self, idx: usize) -> Option<usize> {
        self.index.level_segment_count(idx)
    }

    fn approximate_len(&self) -> usize {
        self.index.approximate_len()
    }

    // NOTE: Override the default implementation to not fetch
    // data from the value log, so we get much faster key reads
    fn contains_key<K: AsRef<[u8]>>(&self, key: K, seqno: Option<SeqNo>) -> crate::Result<bool> {
        self.index.contains_key(key, seqno)
    }

    // NOTE: Override the default implementation to not fetch
    // data from the value log, so we get much faster scans
    fn len(&self, seqno: Option<SeqNo>, index: Option<Arc<Memtable>>) -> crate::Result<usize> {
        self.index.len(seqno, index)
    }

    fn disk_space(&self) -> u64 {
        self.index.disk_space() + self.blobs.manifest.disk_space_used()
    }

    fn get_highest_memtable_seqno(&self) -> Option<SeqNo> {
        self.index.get_highest_memtable_seqno()
    }

    fn get_highest_persisted_seqno(&self) -> Option<SeqNo> {
        self.index.get_highest_persisted_seqno()
    }

    fn snapshot(&self, seqno: SeqNo) -> Snapshot {
        use crate::AnyTree::Blob;

        Snapshot::new(Blob(self.clone()), seqno)
    }

    fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
        &self,
        range: R,
        seqno: Option<SeqNo>,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static> {
        let vlog = self.blobs.clone();
        Box::new(
            self.index
                .0
                .create_range(&range, seqno, index)
                .map(move |item| resolve_value_handle(&vlog, item)),
        )
    }

    fn prefix<K: AsRef<[u8]>>(
        &self,
        prefix: K,
        seqno: Option<SeqNo>,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static> {
        let vlog = self.blobs.clone();
        Box::new(
            self.index
                .0
                .create_prefix(prefix, seqno, index)
                .map(move |item| resolve_value_handle(&vlog, item)),
        )
    }

    fn insert<K: Into<UserKey>, V: Into<UserValue>>(
        &self,
        key: K,
        value: V,
        seqno: SeqNo,
    ) -> (u32, u32) {
        use value::MaybeInlineValue;

        // NOTE: Initially, we always write an inline value
        // On memtable flush, depending on the values' sizes, they will be separated
        // into inline or indirect values
        let item = MaybeInlineValue::Inline(value.into());

        let value = item.encode_into_vec();

        self.index.insert(key, value, seqno)
    }

    fn get<K: AsRef<[u8]>>(
        &self,
        key: K,
        seqno: Option<SeqNo>,
    ) -> crate::Result<Option<crate::UserValue>> {
        use value::MaybeInlineValue::{Indirect, Inline};

        let key = key.as_ref();

        let Some(value) = self.index.get_vhandle(key, seqno)? else {
            return Ok(None);
        };

        match value {
            Inline(bytes) => Ok(Some(bytes)),
            Indirect { vhandle, .. } => {
                // Resolve indirection using value log
                match self.blobs.get(&vhandle)? {
                    Some(bytes) => Ok(Some(bytes)),
                    None => {
                        panic!("value handle ({key:?} => {vhandle:?}) did not match any blob - this is a bug")
                    }
                }
            }
        }
    }

    fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u32, u32) {
        self.index.remove(key, seqno)
    }

    fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u32, u32) {
        self.index.remove_weak(key, seqno)
    }
}