lsm_tree/blob_tree/mod.rs

// Copyright (c) 2024-present, fjall-rs
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)

mod gc;
pub mod handle;

#[doc(hidden)]
pub use gc::{FragmentationEntry, FragmentationMap};

use crate::{
    coding::{Decode, Encode},
    compaction::stream::CompactionStream,
    file::{fsync_directory, BLOBS_FOLDER},
    iter_guard::{IterGuard, IterGuardImpl},
    r#abstract::{AbstractTree, RangeItem},
    segment::Segment,
    tree::inner::MemtableId,
    value::InternalValue,
    version::Version,
    vlog::{Accessor, BlobFile, BlobFileWriter, ValueHandle},
    Config, Memtable, SegmentId, SeqNo, SequenceNumberCounter, UserKey, UserValue,
};
use handle::BlobIndirection;
use std::{io::Cursor, ops::RangeBounds, path::PathBuf, sync::Arc};

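/// Guard returned by the range and prefix iterators.
///
/// Keys and value sizes can be read without touching the value log;
/// the blob itself is only fetched once the guard is consumed via `into_inner`.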
pub struct Guard<'a> {
    blob_tree: &'a BlobTree,
    version: Version,
    kv: crate::Result<InternalValue>,
}

impl IterGuard for Guard<'_> {
    fn key(self) -> crate::Result<UserKey> {
        self.kv.map(|kv| kv.key.user_key)
    }

    fn size(self) -> crate::Result<u32> {
        let kv = self.kv?;

        if kv.key.value_type.is_indirection() {
            let mut cursor = Cursor::new(kv.value);
            Ok(BlobIndirection::decode_from(&mut cursor)?.size)
        } else {
            // NOTE: We know that value lengths always fit into a u32
            #[allow(clippy::cast_possible_truncation)]
            Ok(kv.value.len() as u32)
        }
    }

    fn into_inner(self) -> crate::Result<(UserKey, UserValue)> {
        resolve_value_handle(self.blob_tree, &self.version, self.kv?)
    }
}

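/// Resolves an internal entry into a user-facing key-value pair.
///
/// If the entry is a blob indirection, its value handle is decoded and the blob
/// is fetched from the value log of the given version; otherwise the inline
/// value is returned as-is. A handle that does not match any blob is a bug and panics.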
fn resolve_value_handle(tree: &BlobTree, version: &Version, item: InternalValue) -> RangeItem {
    if item.key.value_type.is_indirection() {
        let mut cursor = Cursor::new(item.value);
        let vptr = BlobIndirection::decode_from(&mut cursor)?;

        // Resolve indirection using value log
        match Accessor::new(&version.value_log).get(
            tree.id(),
            &tree.blobs_folder,
            &item.key.user_key,
            &vptr.vhandle,
            &tree.index.config.cache,
            &tree.index.config.descriptor_table,
        ) {
            Ok(Some(v)) => {
                let k = item.key.user_key;
                Ok((k, v))
            }
            Ok(None) => {
                panic!(
                    "value handle ({:?} => {:?}) did not match any blob - this is a bug; version={}",
                    item.key.user_key, vptr.vhandle,
                    version.id(),
                );
            }
            Err(e) => Err(e),
        }
    } else {
        let k = item.key.user_key;
        let v = item.value;
        Ok((k, v))
    }
}

/// A key-value-separated log-structured merge tree
///
/// This tree is a composite structure: an index tree (LSM-tree) stores small
/// values inline or value handles pointing into a log-structured value log
/// that holds the large values, which reduces write amplification.
#[derive(Clone)]
pub struct BlobTree {
    /// Index tree that holds value handles or small inline values
    #[doc(hidden)]
    pub index: crate::Tree,

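    /// Folder that contains the tree's blob files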
    blobs_folder: PathBuf,
}

impl BlobTree {
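    /// Opens a blob tree using the given config.
    ///
    /// Creates and fsyncs the blobs folder if needed, and restores the
    /// blob file ID counter from the current version.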
    pub(crate) fn open(config: Config) -> crate::Result<Self> {
        let index = crate::Tree::open(config)?;

        let blobs_folder = index.config.path.join(BLOBS_FOLDER);
        std::fs::create_dir_all(&blobs_folder)?;
        fsync_directory(&blobs_folder)?;

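        // Continue blob file IDs after the highest blob file ID
        // referenced by the current version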
        let blob_file_id_to_continue_with = index
            .current_version()
            .value_log
            .values()
            .map(BlobFile::id)
            .max()
            .map(|x| x + 1)
            .unwrap_or_default();

        index
            .0
            .blob_file_id_generator
            .set(blob_file_id_to_continue_with);

        Ok(Self {
            index,
            blobs_folder,
        })
    }
}

impl AbstractTree for BlobTree {
    fn next_table_id(&self) -> SegmentId {
        self.index.next_table_id()
    }

    fn id(&self) -> crate::TreeId {
        self.index.id()
    }

    fn get_internal_entry(&self, key: &[u8], seqno: SeqNo) -> crate::Result<Option<InternalValue>> {
        self.index.get_internal_entry(key, seqno)
    }

    fn current_version(&self) -> Version {
        self.index.current_version()
    }

    fn flush_active_memtable(&self, eviction_seqno: SeqNo) -> crate::Result<Option<Segment>> {
        let Some((segment_id, yanked_memtable)) = self.index.rotate_memtable() else {
            return Ok(None);
        };

        let Some((segment, blob_file)) =
            self.flush_memtable(segment_id, &yanked_memtable, eviction_seqno)?
        else {
            return Ok(None);
        };

        self.register_segments(
            std::slice::from_ref(&segment),
            blob_file.as_ref().map(std::slice::from_ref),
            None,
            eviction_seqno,
        )?;

        Ok(Some(segment))
    }

    #[cfg(feature = "metrics")]
    fn metrics(&self) -> &Arc<crate::Metrics> {
        self.index.metrics()
    }

    fn version_free_list_len(&self) -> usize {
        self.index.version_free_list_len()
    }

    fn prefix<K: AsRef<[u8]>>(
        &self,
        prefix: K,
        seqno: SeqNo,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl<'_>> + '_> {
        use crate::range::prefix_to_range;

        let range = prefix_to_range(prefix.as_ref());

        let version = self.current_version();

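        // Wrap each entry in a guard; a blob indirection is only resolved
        // when the caller actually asks for the value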
        Box::new(
            self.index
                .create_internal_range(&range, seqno, index)
                .map(move |kv| {
                    IterGuardImpl::Blob(Guard {
                        blob_tree: self,
                        version: version.clone(), // TODO: PERF: ugly Arc clone
                        kv,
                    })
                }),
        )
    }

    fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
        &self,
        range: R,
        seqno: SeqNo,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl<'_>> + '_> {
        let version = self.current_version();

        // TODO: PERF: ugly Arc clone
        Box::new(
            self.index
                .create_internal_range(&range, seqno, index)
                .map(move |kv| {
                    IterGuardImpl::Blob(Guard {
                        blob_tree: self,
                        version: version.clone(), // TODO: PERF: ugly Arc clone
                        kv,
                    })
                }),
        )
    }

    fn tombstone_count(&self) -> u64 {
        self.index.tombstone_count()
    }

    fn drop_range<K: AsRef<[u8]>, R: RangeBounds<K>>(&self, range: R) -> crate::Result<()> {
        self.index.drop_range(range)
    }

    fn ingest(
        &self,
        iter: impl Iterator<Item = (UserKey, UserValue)>,
        seqno_generator: &SequenceNumberCounter,
        visible_seqno: &SequenceNumberCounter,
    ) -> crate::Result<()> {
        use crate::tree::ingest::Ingestion;
        use std::time::Instant;

        // TODO: take curr seqno for ingest, HOWEVER
        // TODO: we need to take the next seqno AFTER locking the memtable

        todo!();

        // // NOTE: Lock active memtable so nothing else can be going on while we are bulk loading
        // let lock = self.lock_active_memtable();
        // assert!(
        //     lock.is_empty(),
        //     "can only perform bulk_ingest on empty trees",
        // );

        // let mut segment_writer = Ingestion::new(&self.index)?.with_seqno(seqno);
        // let mut blob_writer = self.blobs.get_writer()?;

        // let start = Instant::now();
        // let mut count = 0;
        // let mut last_key = None;

        // for (key, value) in iter {
        //     if let Some(last_key) = &last_key {
        //         assert!(
        //             key > last_key,
        //             "next key in bulk ingest was not greater than last key",
        //         );
        //     }
        //     last_key = Some(key.clone());

        //     // NOTE: Values are 32-bit max
        //     #[allow(clippy::cast_possible_truncation)]
        //     let value_size = value.len() as u32;

        //     if value_size >= self.index.config.blob_file_separation_threshold {
        //         let vhandle = blob_writer.get_next_value_handle();

        //         let indirection = MaybeInlineValue::Indirect {
        //             vhandle,
        //             size: value_size,
        //         };
        //         // TODO: use Slice::with_size
        //         let mut serialized_indirection = vec![];
        //         indirection.encode_into(&mut serialized_indirection)?;

        //         segment_writer.write(key.clone(), serialized_indirection.into())?;

        //         blob_writer.write(&key, value)?;
        //     } else {
        //         // TODO: use Slice::with_size
        //         let direct = MaybeInlineValue::Inline(value);
        //         let serialized_direct = direct.encode_into_vec();
        //         segment_writer.write(key, serialized_direct.into())?;
        //     }

        //     count += 1;
        // }

        // // TODO: add to manifest + unit test
        // // self.blobs.register_writer(blob_writer)?;
        // // segment_writer.finish()?;

        // TODO: increase visible seqno

        // log::info!("Ingested {count} items in {:?}", start.elapsed());

        Ok(())
    }

    fn major_compact(&self, target_size: u64, seqno_threshold: SeqNo) -> crate::Result<()> {
        self.index.major_compact(target_size, seqno_threshold)
    }

    fn clear_active_memtable(&self) {
        self.index.clear_active_memtable();
    }

    fn l0_run_count(&self) -> usize {
        self.index.l0_run_count()
    }

    fn blob_file_count(&self) -> usize {
        self.current_version().blob_file_count()
    }

    // NOTE: We skip reading from the value log
    // because the vHandles already store the value size
    fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<u32>> {
        let Some(item) = self.index.get_internal_entry(key.as_ref(), seqno)? else {
            return Ok(None);
        };

        Ok(Some(if item.key.value_type.is_indirection() {
            let mut cursor = Cursor::new(item.value);
            let vptr = BlobIndirection::decode_from(&mut cursor)?;
            vptr.size
        } else {
            // NOTE: Value lengths always fit into a u32
            #[allow(clippy::cast_possible_truncation)]
            {
                item.value.len() as u32
            }
        }))
    }

    fn stale_blob_bytes(&self) -> u64 {
        self.current_version().gc_stats().stale_bytes()
    }

    fn filter_size(&self) -> usize {
        self.index.filter_size()
    }

    fn pinned_filter_size(&self) -> usize {
        self.index.pinned_filter_size()
    }

    fn pinned_block_index_size(&self) -> usize {
        self.index.pinned_block_index_size()
    }

    fn sealed_memtable_count(&self) -> usize {
        self.index.sealed_memtable_count()
    }

    fn flush_memtable(
        &self,
        segment_id: SegmentId,
        memtable: &Arc<Memtable>,
        eviction_seqno: SeqNo,
    ) -> crate::Result<Option<(Segment, Option<BlobFile>)>> {
        use crate::{file::SEGMENTS_FOLDER, segment::Writer as SegmentWriter};

        let lsm_segment_folder = self.index.config.path.join(SEGMENTS_FOLDER);

        log::debug!("Flushing memtable & performing key-value separation");
        log::debug!("=> to LSM table in {}", lsm_segment_folder.display());
        log::debug!("=> to blob file at {}", self.blobs_folder.display());

        let mut segment_writer =
            SegmentWriter::new(lsm_segment_folder.join(segment_id.to_string()), segment_id)?
                // TODO: apply other policies
                .use_data_block_compression(self.index.config.data_block_compression_policy.get(0))
                .use_bloom_policy({
                    use crate::config::FilterPolicyEntry::{Bloom, None};
                    use crate::segment::filter::BloomConstructionPolicy;

                    match self.index.config.filter_policy.get(0) {
                        Bloom(policy) => policy,
                        None => BloomConstructionPolicy::BitsPerKey(0.0),
                    }
                });

        let mut blob_writer = BlobFileWriter::new(
            self.index.0.blob_file_id_generator.clone(),
            u64::MAX, // TODO: actually use target size? but be sure to link to table correctly
            self.index.config.path.join(BLOBS_FOLDER),
        )?
        .use_compression(
            self.index
                .config
                .kv_separation_opts
                .as_ref()
                .expect("blob options should exist")
                .compression,
        );

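        // Run the memtable through the compaction stream, which may drop
        // versions that are no longer visible at the eviction seqno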
        let iter = memtable.iter().map(Ok);
        let compaction_filter = CompactionStream::new(iter, eviction_seqno);

        let mut blob_bytes_referenced = 0;
        let mut blobs_referenced_count = 0;

        let separation_threshold = self
            .index
            .config
            .kv_separation_opts
            .as_ref()
            .expect("kv separation options should exist")
            .separation_threshold;

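        // Key-value separation: values at or above the separation threshold are
        // written to the blob file and replaced by an indirection in the index
        // tree; smaller values are stored inline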
        for item in compaction_filter {
            let item = item?;

            if item.is_tombstone() {
                // NOTE: The tombstone still needs to be written to the index tree,
                // but nothing goes into the blob writer
                segment_writer.write(InternalValue::new(item.key, UserValue::empty()))?;
                continue;
            }

            let value = item.value;

            // NOTE: Values are 32-bit max
            #[allow(clippy::cast_possible_truncation)]
            let value_size = value.len() as u32;

            if value_size >= separation_threshold {
                let offset = blob_writer.offset();
                let blob_file_id = blob_writer.blob_file_id();
                let on_disk_size = blob_writer.write(&item.key.user_key, item.key.seqno, &value)?;

                let indirection = BlobIndirection {
                    vhandle: ValueHandle {
                        blob_file_id,
                        offset,
                        on_disk_size,
                    },
                    size: value_size,
                };

                segment_writer.write({
                    let mut vptr =
                        InternalValue::new(item.key.clone(), indirection.encode_into_vec());
                    vptr.key.value_type = crate::ValueType::Indirection;
                    vptr
                })?;

                blob_bytes_referenced += u64::from(value_size);
                blobs_referenced_count += 1;
            } else {
                segment_writer.write(InternalValue::new(item.key, value))?;
            }
        }

        log::trace!("Creating blob file");
        let blob_files = blob_writer.finish()?;
        assert!(blob_files.len() <= 1);
        let blob_file = blob_files.into_iter().next();

        log::trace!("Creating LSM-tree segment {segment_id}");

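        // Record how many blob bytes/blobs of the blob file are referenced
        // by this segment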
        if blob_bytes_referenced > 0 {
            if let Some(blob_file) = &blob_file {
                segment_writer.link_blob_file(
                    blob_file.id(),
                    blob_bytes_referenced,
                    blobs_referenced_count,
                );
            }
        }

        let segment = self.index.consume_writer(segment_writer)?;

        Ok(segment.map(|segment| (segment, blob_file)))
    }

    fn register_segments(
        &self,
        segments: &[Segment],
        blob_files: Option<&[BlobFile]>,
        frag_map: Option<FragmentationMap>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        self.index
            .register_segments(segments, blob_files, frag_map, seqno_threshold)
    }

    fn set_active_memtable(&self, memtable: Memtable) {
        self.index.set_active_memtable(memtable);
    }

    fn add_sealed_memtable(&self, id: MemtableId, memtable: Arc<Memtable>) {
        self.index.add_sealed_memtable(id, memtable);
    }

    fn compact(
        &self,
        strategy: Arc<dyn crate::compaction::CompactionStrategy>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        self.index.compact(strategy, seqno_threshold)
    }

    fn get_next_segment_id(&self) -> SegmentId {
        self.index.get_next_segment_id()
    }

    fn tree_config(&self) -> &Config {
        &self.index.config
    }

    fn get_highest_seqno(&self) -> Option<SeqNo> {
        self.index.get_highest_seqno()
    }

    fn active_memtable_size(&self) -> u64 {
        self.index.active_memtable_size()
    }

    fn tree_type(&self) -> crate::TreeType {
        crate::TreeType::Blob
    }

    fn rotate_memtable(&self) -> Option<(crate::tree::inner::MemtableId, Arc<crate::Memtable>)> {
        self.index.rotate_memtable()
    }

    fn segment_count(&self) -> usize {
        self.index.segment_count()
    }

    fn level_segment_count(&self, idx: usize) -> Option<usize> {
        self.index.level_segment_count(idx)
    }

    fn approximate_len(&self) -> usize {
        self.index.approximate_len()
    }

    // NOTE: Override the default implementation to not fetch
    // data from the value log, so we get much faster key reads
    fn is_empty(&self, seqno: SeqNo, index: Option<Arc<Memtable>>) -> crate::Result<bool> {
        self.index.is_empty(seqno, index)
    }

    // NOTE: Override the default implementation to not fetch
    // data from the value log, so we get much faster key reads
    fn contains_key<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<bool> {
        self.index.contains_key(key, seqno)
    }

    // NOTE: Override the default implementation to not fetch
    // data from the value log, so we get much faster scans
    fn len(&self, seqno: SeqNo, index: Option<Arc<Memtable>>) -> crate::Result<usize> {
        self.index.len(seqno, index)
    }

    fn disk_space(&self) -> u64 {
        let version = self.current_version();
        let vlog = crate::vlog::Accessor::new(&version.value_log);
        self.index.disk_space() + vlog.disk_space()
    }

    fn get_highest_memtable_seqno(&self) -> Option<SeqNo> {
        self.index.get_highest_memtable_seqno()
    }

    fn get_highest_persisted_seqno(&self) -> Option<SeqNo> {
        self.index.get_highest_persisted_seqno()
    }

    fn insert<K: Into<UserKey>, V: Into<UserValue>>(
        &self,
        key: K,
        value: V,
        seqno: SeqNo,
    ) -> (u64, u64) {
        self.index.insert(key, value.into(), seqno)
    }

    fn get<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<crate::UserValue>> {
        let key = key.as_ref();

        // TODO: refactor memtable, sealed memtables, manifest lock to be a single lock (SuperVersion kind of)
        // TODO: then, try to reduce the lock access to 1, because we are accessing it twice (index.get, and then vhandle resolving...)

        let Some(item) = self.index.get_internal_entry(key, seqno)? else {
            return Ok(None);
        };

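        // Resolve a possible blob indirection against the current version's value log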
        let version = self.current_version();
        let (_, v) = resolve_value_handle(self, &version, item)?;

        Ok(Some(v))
    }

    fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        self.index.remove(key, seqno)
    }

    fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        self.index.remove_weak(key, seqno)
    }
}