lsm_tree/blob_tree/mod.rs

// Copyright (c) 2024-present, fjall-rs
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)

mod cache;
mod compression;
mod gc;
pub mod index;
pub mod value;

use crate::{
    coding::{Decode, Encode},
    compaction::stream::CompactionStream,
    file::BLOBS_FOLDER,
    r#abstract::{AbstractTree, RangeItem},
    tree::inner::MemtableId,
    value::InternalValue,
    Config, KvPair, Memtable, Segment, SegmentId, SeqNo, Snapshot, UserKey, UserValue,
};
use cache::MyBlobCache;
use compression::MyCompressor;
use gc::{reader::GcReader, writer::GcWriter};
use index::IndexTree;
use std::{
    io::Cursor,
    ops::{RangeBounds, RangeFull},
    sync::{atomic::AtomicUsize, Arc},
};
use value::MaybeInlineValue;
use value_log::ValueLog;

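/// Resolves a value coming out of the index tree: inline values are passed
/// through unchanged, while indirect handles are looked up in the value log.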
fn resolve_value_handle(vlog: &ValueLog<MyBlobCache, MyCompressor>, item: RangeItem) -> RangeItem {
    use MaybeInlineValue::{Indirect, Inline};

    match item {
        Ok((key, value)) => {
            let mut cursor = Cursor::new(value);

            match MaybeInlineValue::decode_from(&mut cursor)? {
                Inline(bytes) => Ok((key, bytes)),
                Indirect { vhandle, .. } => {
                    // Resolve indirection using value log
                    match vlog.get(&vhandle) {
                        Ok(Some(bytes)) => Ok((key, bytes)),
                        Err(e) => Err(e.into()),
                        _ => {
                            panic!("value handle ({:?} => {vhandle:?}) did not match any blob - this is a bug", String::from_utf8_lossy(&key))
                        }
                    }
                }
            }
        }
        Err(e) => Err(e),
    }
}

/// A key-value-separated log-structured merge tree
///
/// This tree is a composite structure, consisting of an
/// index tree (LSM-tree) and a log-structured value log
/// to reduce write amplification.
///
/// See <https://docs.rs/value-log> for more information.
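///
/// Values smaller than the configured separation threshold are stored inline
/// in the index tree; larger values are written to the value log and the index
/// tree only keeps an indirection (a value handle plus the value size).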
#[derive(Clone)]
pub struct BlobTree {
    /// Index tree that holds value handles or small inline values
    #[doc(hidden)]
    pub index: IndexTree,

    /// Log-structured value-log that stores large values
    #[doc(hidden)]
    pub blobs: ValueLog<MyBlobCache, MyCompressor>,

    // TODO: maybe replace this with a nonce system
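    /// Counts flushed segments that have not yet been registered in the index
    /// tree; GC scans wait until this is zero so they never observe a
    /// half-committed flush.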
    #[doc(hidden)]
    pub pending_segments: Arc<AtomicUsize>,
}

impl BlobTree {
    pub(crate) fn open(config: Config) -> crate::Result<Self> {
        let path = &config.path;

        let vlog_path = path.join(BLOBS_FOLDER);
        let vlog_cfg =
            value_log::Config::<MyBlobCache, MyCompressor>::new(MyBlobCache(config.cache.clone()))
                .segment_size_bytes(config.blob_file_target_size)
                .compression(MyCompressor(config.blob_compression));

        let index: IndexTree = config.open()?.into();

        Ok(Self {
            index,
            blobs: ValueLog::open(vlog_path, vlog_cfg)?,
            pending_segments: Arc::new(AtomicUsize::new(0)),
        })
    }

    /// Scans the index tree, collecting statistics about
    /// value log fragmentation
    #[doc(hidden)]
    pub fn gc_scan_stats(
        &self,
        seqno: SeqNo,
        gc_watermark: SeqNo,
    ) -> crate::Result<crate::gc::Report> {
        use std::io::{Error as IoError, ErrorKind as IoErrorKind};
        use MaybeInlineValue::{Indirect, Inline};

        while self
            .pending_segments
            .load(std::sync::atomic::Ordering::Acquire)
            > 0
        {
            // IMPORTANT: Busy wait until all in-flight segments are committed
            // to the tree
        }

        // IMPORTANT: Lock + snapshot the memtable to avoid read skew and to
        // prevent the memtable from being tampered with while we scan
        let _memtable_lock = self.index.read_lock_active_memtable();

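        // Re-check under the memtable read lock: a flush may have queued a
        // segment between the first check above and acquiring the lock.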
        while self
            .pending_segments
            .load(std::sync::atomic::Ordering::Acquire)
            > 0
        {
            // IMPORTANT: Busy wait again until all in-flight segments are committed
            // to the tree
        }

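        // Walk the entire index at `seqno`; only indirections are forwarded to
        // the value log's stats scan below, inline values are skipped.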
        let iter = self
            .index
            .create_internal_range::<&[u8], RangeFull>(&.., Some(seqno), None);

        // Stores the max seqno of every blob file
        let mut seqno_map = crate::HashMap::<SegmentId, SeqNo>::default();

        let result = self
            .blobs
            .scan_for_stats(iter.filter_map(|kv| {
                let Ok(kv) = kv else {
                    return Some(Err(IoError::new(
                        IoErrorKind::Other,
                        "Failed to load KV pair from index tree",
                    )));
                };

                let mut cursor = Cursor::new(kv.value);
                let value = match MaybeInlineValue::decode_from(&mut cursor) {
                    Ok(v) => v,
                    Err(e) => return Some(Err(IoError::new(IoErrorKind::Other, e.to_string()))),
                };

                match value {
                    Indirect { vhandle, size } => {
                        seqno_map
                            .entry(vhandle.segment_id)
                            .and_modify(|x| *x = (*x).max(kv.key.seqno))
                            .or_insert(kv.key.seqno);

                        Some(Ok((vhandle, size)))
                    }
                    Inline(_) => None,
                }
            }))
            .map_err(Into::into);

        let mut lock = self
            .blobs
            .manifest
            .segments
            .write()
            .expect("lock is poisoned");

        // IMPORTANT: We are overwriting the staleness of blob files
        // that contain an item that is still covered by the GC watermark,
        // so snapshots cannot accidentally lose data
        //
        // TODO: 3.0.0 this should be dealt with in value-log 2.0 (make it MVCC aware)
        for (blob_file_id, max_seqno) in seqno_map {
            if gc_watermark <= max_seqno {
                if let Some(blob_file) = lock.get_mut(&blob_file_id) {
                    blob_file.gc_stats.set_stale_items(0);
                    blob_file.gc_stats.set_stale_bytes(0);
                }
            }
        }

        result
    }

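    // A typical GC pass (sketch only, orchestration is left to the caller):
    // collect fragmentation statistics first, then let a `value_log` GC
    // strategy rewrite fragmented blob files and drop stale ones, e.g.:
    //
    //   let _report = tree.gc_scan_stats(read_seqno, gc_watermark)?;
    //   let _freed = tree.apply_gc_strategy(&strategy, write_seqno)?;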
    pub fn apply_gc_strategy(
        &self,
        strategy: &impl value_log::GcStrategy<MyBlobCache, MyCompressor>,
        seqno: SeqNo,
    ) -> crate::Result<u64> {
        // IMPORTANT: Write-lock the memtable to avoid read skew
        let memtable_lock = self.index.lock_active_memtable();

        self.blobs.apply_gc_strategy(
            strategy,
            &GcReader::new(&self.index, &memtable_lock),
            GcWriter::new(seqno, &memtable_lock),
        )?;

        // NOTE: We still hold the memtable lock, so we can't call gc_drop_stale
        // because that would lock recursively
        self.blobs.drop_stale_segments().map_err(Into::into)
    }

    /// Drops all stale blob segment files
    #[doc(hidden)]
    pub fn gc_drop_stale(&self) -> crate::Result<u64> {
        // IMPORTANT: Write-lock the memtable to avoid read skew
        let _lock = self.index.lock_active_memtable();

        self.blobs.drop_stale_segments().map_err(Into::into)
    }

    #[doc(hidden)]
    pub fn flush_active_memtable(&self, eviction_seqno: SeqNo) -> crate::Result<Option<Segment>> {
        let Some((segment_id, yanked_memtable)) = self.index.rotate_memtable() else {
            return Ok(None);
        };

        let Some(segment) = self.flush_memtable(segment_id, &yanked_memtable, eviction_seqno)?
        else {
            return Ok(None);
        };
        self.register_segments(&[segment.clone()])?;

        Ok(Some(segment))
    }
}

impl AbstractTree for BlobTree {
    fn ingest(&self, iter: impl Iterator<Item = (UserKey, UserValue)>) -> crate::Result<()> {
        use crate::tree::ingest::Ingestion;
        use std::time::Instant;

        // NOTE: Lock the active memtable so nothing else can run while we are bulk loading
        let lock = self.lock_active_memtable();
        assert!(
            lock.is_empty(),
            "can only perform bulk_ingest on empty trees",
        );

        let mut segment_writer = Ingestion::new(&self.index)?;
        let mut blob_writer = self.blobs.get_writer()?;

        let start = Instant::now();
        let mut count = 0;
        let mut last_key = None;

        for (key, value) in iter {
            if let Some(last_key) = &last_key {
                assert!(
                    key > last_key,
                    "next key in bulk ingest was not greater than last key",
                );
            }
            last_key = Some(key.clone());

            // NOTE: Values are 32-bit max
            #[allow(clippy::cast_possible_truncation)]
            let value_size = value.len() as u32;

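            // Key-value separation: values at or above the separation threshold
            // go to the value log and only an indirection (handle + size) is
            // written to the index tree; smaller values are stored inline.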
            if value_size >= self.index.config.blob_file_separation_threshold {
                let vhandle = blob_writer.get_next_value_handle();

                let indirection = MaybeInlineValue::Indirect {
                    vhandle,
                    size: value_size,
                };
                // TODO: use Slice::with_size
                let mut serialized_indirection = vec![];
                indirection.encode_into(&mut serialized_indirection)?;

                segment_writer.write(key.clone(), serialized_indirection.into())?;

                blob_writer.write(&key, value)?;
            } else {
                // TODO: use Slice::with_size
                let direct = MaybeInlineValue::Inline(value);
                let serialized_direct = direct.encode_into_vec();
                segment_writer.write(key, serialized_direct.into())?;
            }

            count += 1;
        }

        self.blobs.register_writer(blob_writer)?;
        segment_writer.finish()?;

        log::info!("Ingested {count} items in {:?}", start.elapsed());

        Ok(())
    }

    fn major_compact(&self, target_size: u64, seqno_threshold: SeqNo) -> crate::Result<()> {
        self.index.major_compact(target_size, seqno_threshold)
    }

    fn clear_active_memtable(&self) {
        self.index.clear_active_memtable();
    }

    fn l0_run_count(&self) -> usize {
        self.index.l0_run_count()
    }

    fn blob_file_count(&self) -> usize {
        self.blobs.segment_count()
    }

    // NOTE: We skip reading from the value log
    // because the vHandles already store the value size
    fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: Option<SeqNo>) -> crate::Result<Option<u32>> {
        let vhandle = self.index.get_vhandle(key.as_ref(), seqno)?;

        Ok(vhandle.map(|x| match x {
            MaybeInlineValue::Inline(v) => v.len() as u32,

            // NOTE: We skip reading from the value log
            // because the indirections already store the value size
            MaybeInlineValue::Indirect { size, .. } => size,
        }))
    }

    fn bloom_filter_size(&self) -> usize {
        self.index.bloom_filter_size()
    }

    fn sealed_memtable_count(&self) -> usize {
        self.index.sealed_memtable_count()
    }

    #[doc(hidden)]
    fn verify(&self) -> crate::Result<usize> {
        let index_tree_sum = self.index.verify()?;
        let vlog_sum = self.blobs.verify()?;
        Ok(index_tree_sum + vlog_sum)
    }

    fn keys(
        &self,
        seqno: Option<SeqNo>,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = crate::Result<UserKey>> + 'static> {
        self.index.keys(seqno, index)
    }

    fn values(
        &self,
        seqno: Option<SeqNo>,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = crate::Result<UserValue>> + 'static> {
        Box::new(self.iter(seqno, index).map(|x| x.map(|(_, v)| v)))
    }

    fn flush_memtable(
        &self,
        segment_id: SegmentId,
        memtable: &Arc<Memtable>,
        eviction_seqno: SeqNo,
    ) -> crate::Result<Option<Segment>> {
        use crate::{
            file::SEGMENTS_FOLDER,
            segment::writer::{Options, Writer as SegmentWriter},
        };
        use value::MaybeInlineValue;

        let lsm_segment_folder = self.index.config.path.join(SEGMENTS_FOLDER);

        log::debug!("flushing memtable & performing key-value separation");
        log::debug!("=> to LSM segments in {:?}", lsm_segment_folder);
        log::debug!("=> to blob segment at {:?}", self.blobs.path);

        let mut segment_writer = SegmentWriter::new(Options {
            segment_id,
            data_block_size: self.index.config.data_block_size,
            index_block_size: self.index.config.index_block_size,
            folder: lsm_segment_folder,
        })?
        .use_compression(self.index.config.compression);

        segment_writer = segment_writer.use_bloom_policy(
            crate::segment::writer::BloomConstructionPolicy::FpRate(0.0001),
        );
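        // Flushed segments use a fixed bloom filter false-positive rate of 0.01%.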

        let mut blob_writer = self.blobs.get_writer()?;

        let iter = memtable.iter().map(Ok);
        let compaction_filter = CompactionStream::new(iter, eviction_seqno);

        for item in compaction_filter {
            let item = item?;

            if item.is_tombstone() {
                // NOTE: We still need to add the tombstone to the index tree,
                // but no blob is written to the blob writer

                segment_writer.write(InternalValue::new(item.key, UserValue::empty()))?;
                continue;
            }

            let mut cursor = Cursor::new(item.value);

            let value = MaybeInlineValue::decode_from(&mut cursor)?;
            let value = match value {
                MaybeInlineValue::Inline(value) => value,
                indirection @ MaybeInlineValue::Indirect { .. } => {
                    // NOTE: This is a previous indirection, just write it to the index tree
                    // without writing the blob again

                    let mut serialized_indirection = vec![];
                    indirection.encode_into(&mut serialized_indirection)?;

                    segment_writer
                        .write(InternalValue::new(item.key.clone(), serialized_indirection))?;

                    continue;
                }
            };

            // NOTE: Values are 32-bit max
            #[allow(clippy::cast_possible_truncation)]
            let value_size = value.len() as u32;

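            // Same key-value separation as in `ingest`: large values go to the
            // blob writer, small ones are inlined into the LSM segment.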
            if value_size >= self.index.config.blob_file_separation_threshold {
                let vhandle = blob_writer.get_next_value_handle();

                let indirection = MaybeInlineValue::Indirect {
                    vhandle,
                    size: value_size,
                };
                // TODO: use Slice::with_size
                let mut serialized_indirection = vec![];
                indirection.encode_into(&mut serialized_indirection)?;

                segment_writer
                    .write(InternalValue::new(item.key.clone(), serialized_indirection))?;

                blob_writer.write(&item.key.user_key, value)?;
            } else {
                // TODO: use Slice::with_size
                let direct = MaybeInlineValue::Inline(value);
                let serialized_direct = direct.encode_into_vec();
                segment_writer.write(InternalValue::new(item.key, serialized_direct))?;
            }
        }

        let _memtable_lock = self.lock_active_memtable();

        log::trace!("Register blob writer into value log");
        self.blobs.register_writer(blob_writer)?;

        log::trace!("Creating LSM-tree segment {segment_id}");
        let segment = self.index.consume_writer(segment_id, segment_writer)?;

        // TODO: this can probably be solved in a nicer way
        if segment.is_some() {
            // IMPORTANT: Increment the pending count
            // so there cannot be a GC scan until the segment is registered
            self.pending_segments
                .fetch_add(1, std::sync::atomic::Ordering::Release);
        }

        Ok(segment)
    }

    fn register_segments(&self, segments: &[Segment]) -> crate::Result<()> {
        self.index.register_segments(segments)?;

        let count = self
            .pending_segments
            .load(std::sync::atomic::Ordering::Acquire);

        assert!(
            count >= segments.len(),
            "pending_segments is less than segments to register - this is a bug"
        );

        self.pending_segments
            .fetch_sub(segments.len(), std::sync::atomic::Ordering::Release);

        Ok(())
    }

    fn lock_active_memtable(&self) -> std::sync::RwLockWriteGuard<'_, Arc<Memtable>> {
        self.index.lock_active_memtable()
    }

    fn set_active_memtable(&self, memtable: Memtable) {
        self.index.set_active_memtable(memtable);
    }

    fn add_sealed_memtable(&self, id: MemtableId, memtable: Arc<Memtable>) {
        self.index.add_sealed_memtable(id, memtable);
    }

    fn compact(
        &self,
        strategy: Arc<dyn crate::compaction::CompactionStrategy>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        self.index.compact(strategy, seqno_threshold)
    }

    fn get_next_segment_id(&self) -> SegmentId {
        self.index.get_next_segment_id()
    }

    fn tree_config(&self) -> &Config {
        &self.index.config
    }

    fn get_highest_seqno(&self) -> Option<SeqNo> {
        self.index.get_highest_seqno()
    }

    fn active_memtable_size(&self) -> u32 {
        self.index.active_memtable_size()
    }

    fn tree_type(&self) -> crate::TreeType {
        crate::TreeType::Blob
    }

    fn rotate_memtable(&self) -> Option<(crate::tree::inner::MemtableId, Arc<crate::Memtable>)> {
        self.index.rotate_memtable()
    }

    fn segment_count(&self) -> usize {
        self.index.segment_count()
    }

    fn level_segment_count(&self, idx: usize) -> Option<usize> {
        self.index.level_segment_count(idx)
    }

    fn approximate_len(&self) -> usize {
        self.index.approximate_len()
    }

    // NOTE: Override the default implementation to not fetch
    // data from the value log, so we get much faster key reads
    fn contains_key<K: AsRef<[u8]>>(&self, key: K, seqno: Option<SeqNo>) -> crate::Result<bool> {
        self.index.contains_key(key, seqno)
    }

    // NOTE: Override the default implementation to not fetch
    // data from the value log, so we get much faster scans
    fn len(&self, seqno: Option<SeqNo>, index: Option<Arc<Memtable>>) -> crate::Result<usize> {
        self.index.len(seqno, index)
    }

    fn disk_space(&self) -> u64 {
        self.index.disk_space() + self.blobs.manifest.disk_space_used()
    }

    fn get_highest_memtable_seqno(&self) -> Option<SeqNo> {
        self.index.get_highest_memtable_seqno()
    }

    fn get_highest_persisted_seqno(&self) -> Option<SeqNo> {
        self.index.get_highest_persisted_seqno()
    }

    fn snapshot(&self, seqno: SeqNo) -> Snapshot {
        use crate::AnyTree::Blob;

        Snapshot::new(Blob(self.clone()), seqno)
    }

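    // NOTE: Range and prefix iterators stream items from the index tree and
    // resolve each indirection through `resolve_value_handle`, so callers
    // always receive the materialized value.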
    fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
        &self,
        range: R,
        seqno: Option<SeqNo>,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static> {
        let vlog = self.blobs.clone();
        Box::new(
            self.index
                .0
                .create_range(&range, seqno, index)
                .map(move |item| resolve_value_handle(&vlog, item)),
        )
    }

    fn prefix<K: AsRef<[u8]>>(
        &self,
        prefix: K,
        seqno: Option<SeqNo>,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static> {
        let vlog = self.blobs.clone();
        Box::new(
            self.index
                .0
                .create_prefix(prefix, seqno, index)
                .map(move |item| resolve_value_handle(&vlog, item)),
        )
    }

    fn insert<K: Into<UserKey>, V: Into<UserValue>>(
        &self,
        key: K,
        value: V,
        seqno: SeqNo,
    ) -> (u32, u32) {
        use value::MaybeInlineValue;

        // NOTE: Initially, we always write an inline value;
        // on memtable flush, values are separated into inline or indirect
        // values depending on their size
        let item = MaybeInlineValue::Inline(value.into());

        let value = item.encode_into_vec();

        self.index.insert(key, value, seqno)
    }

    fn get<K: AsRef<[u8]>>(
        &self,
        key: K,
        seqno: Option<SeqNo>,
    ) -> crate::Result<Option<crate::UserValue>> {
        use value::MaybeInlineValue::{Indirect, Inline};

        let key = key.as_ref();

        let Some(value) = self.index.get_vhandle(key, seqno)? else {
            return Ok(None);
        };

        match value {
            Inline(bytes) => Ok(Some(bytes)),
            Indirect { vhandle, .. } => {
                // Resolve indirection using value log
                match self.blobs.get(&vhandle)? {
                    Some(bytes) => Ok(Some(bytes)),
                    None => {
                        panic!("value handle ({key:?} => {vhandle:?}) did not match any blob - this is a bug")
                    }
                }
            }
        }
    }

    fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u32, u32) {
        self.index.remove(key, seqno)
    }

    fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u32, u32) {
        self.index.remove_weak(key, seqno)
    }
}