lsm_tree/blob_tree/mod.rs

// Copyright (c) 2024-present, fjall-rs
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)

mod gc;
pub mod handle;

#[doc(hidden)]
pub use gc::{FragmentationEntry, FragmentationMap};

use crate::{
    coding::{Decode, Encode},
    compaction::stream::CompactionStream,
    file::{fsync_directory, BLOBS_FOLDER},
    iter_guard::{IterGuard, IterGuardImpl},
    r#abstract::{AbstractTree, RangeItem},
    table::Table,
    tree::inner::MemtableId,
    value::InternalValue,
    version::Version,
    vlog::{Accessor, BlobFile, BlobFileWriter, ValueHandle},
    Config, Memtable, SeqNo, SequenceNumberCounter, TableId, UserKey, UserValue,
};
use handle::BlobIndirection;
use std::{io::Cursor, ops::RangeBounds, path::PathBuf, sync::Arc};

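/// Iterator guard over a single index entry.
///
/// Resolution of blob indirections is deferred until `into_inner` is called,
/// so `key` and `size` never have to read from the value log.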
pub struct Guard<'a> {
    blob_tree: &'a BlobTree,
    version: Version,
    kv: crate::Result<InternalValue>,
}

impl IterGuard for Guard<'_> {
    fn key(self) -> crate::Result<UserKey> {
        self.kv.map(|kv| kv.key.user_key)
    }

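    /// Returns the value size in bytes.
    ///
    /// For key-value-separated entries, the size is read from the stored
    /// indirection, so the blob itself is never fetched.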
    fn size(self) -> crate::Result<u32> {
        let kv = self.kv?;

        if kv.key.value_type.is_indirection() {
            let mut cursor = Cursor::new(kv.value);
            Ok(BlobIndirection::decode_from(&mut cursor)?.size)
        } else {
            // NOTE: Values are at most u32::MAX in length
            #[allow(clippy::cast_possible_truncation)]
            Ok(kv.value.len() as u32)
        }
    }

    fn into_inner(self) -> crate::Result<(UserKey, UserValue)> {
        resolve_value_handle(self.blob_tree, &self.version, self.kv?)
    }
}

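/// Resolves an index entry into the user-facing key-value pair.
///
/// Inline values are returned as-is; for indirections, the value handle is
/// decoded and the blob is fetched from the value log.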
fn resolve_value_handle(tree: &BlobTree, version: &Version, item: InternalValue) -> RangeItem {
    if item.key.value_type.is_indirection() {
        let mut cursor = Cursor::new(item.value);
        let vptr = BlobIndirection::decode_from(&mut cursor)?;

        // Resolve indirection using value log
        match Accessor::new(&version.blob_files).get(
            tree.id(),
            &tree.blobs_folder,
            &item.key.user_key,
            &vptr.vhandle,
            &tree.index.config.cache,
            &tree.index.config.descriptor_table,
        ) {
            Ok(Some(v)) => {
                let k = item.key.user_key;
                Ok((k, v))
            }
            Ok(None) => {
                panic!(
                    "value handle ({:?} => {:?}) did not match any blob - this is a bug; version={}",
                    item.key.user_key, vptr.vhandle,
                    version.id(),
                );
            }
            Err(e) => Err(e),
        }
    } else {
        let k = item.key.user_key;
        let v = item.value;
        Ok((k, v))
    }
}

/// A key-value-separated log-structured merge tree
///
/// This tree is a composite structure, consisting of an
/// index tree (LSM-tree) and a log-structured value log
/// to reduce write amplification.
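///
/// Values at or above the configured separation threshold are written to
/// blob files and the index tree only stores a small indirection (blob file
/// ID, offset and size) in their place; smaller values are stored inline.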
#[derive(Clone)]
pub struct BlobTree {
    /// Index tree that holds value handles or small inline values
    #[doc(hidden)]
    pub index: crate::Tree,

    blobs_folder: PathBuf,
}

impl BlobTree {
    pub(crate) fn open(config: Config) -> crate::Result<Self> {
        let index = crate::Tree::open(config)?;

        let blobs_folder = index.config.path.join(BLOBS_FOLDER);
        std::fs::create_dir_all(&blobs_folder)?;
        fsync_directory(&blobs_folder)?;

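        // Continue blob file IDs after the highest ID referenced by the
        // current version, so newly written blob files cannot collide with
        // existing ones.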
        let blob_file_id_to_continue_with = index
            .current_version()
            .blob_files
            .list_ids()
            .max()
            .map(|x| x + 1)
            .unwrap_or_default();

        index
            .0
            .blob_file_id_generator
            .set(blob_file_id_to_continue_with);

        Ok(Self {
            index,
            blobs_folder,
        })
    }
}

impl AbstractTree for BlobTree {
    fn next_table_id(&self) -> TableId {
        self.index.next_table_id()
    }

    fn id(&self) -> crate::TreeId {
        self.index.id()
    }

    fn get_internal_entry(&self, key: &[u8], seqno: SeqNo) -> crate::Result<Option<InternalValue>> {
        self.index.get_internal_entry(key, seqno)
    }

    fn current_version(&self) -> Version {
        self.index.current_version()
    }

    fn flush_active_memtable(&self, eviction_seqno: SeqNo) -> crate::Result<Option<Table>> {
        let Some((table_id, yanked_memtable)) = self.index.rotate_memtable() else {
            return Ok(None);
        };

        let Some((table, blob_file)) =
            self.flush_memtable(table_id, &yanked_memtable, eviction_seqno)?
        else {
            return Ok(None);
        };
        self.register_tables(
            std::slice::from_ref(&table),
            blob_file.as_ref().map(std::slice::from_ref),
            None,
            eviction_seqno,
        )?;

        Ok(Some(table))
    }

    #[cfg(feature = "metrics")]
    fn metrics(&self) -> &Arc<crate::Metrics> {
        self.index.metrics()
    }

    fn version_free_list_len(&self) -> usize {
        self.index.version_free_list_len()
    }

    fn prefix<K: AsRef<[u8]>>(
        &self,
        prefix: K,
        seqno: SeqNo,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl<'_>> + '_> {
        use crate::range::prefix_to_range;

        let range = prefix_to_range(prefix.as_ref());

        let version = self.current_version();

        Box::new(
            self.index
                .create_internal_range(&range, seqno, index)
                .map(move |kv| {
                    IterGuardImpl::Blob(Guard {
                        blob_tree: self,
                        version: version.clone(), // TODO: PERF: ugly Arc clone
                        kv,
                    })
                }),
        )
    }

    fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
        &self,
        range: R,
        seqno: SeqNo,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl<'_>> + '_> {
        let version = self.current_version();

        // TODO: PERF: ugly Arc clone
        Box::new(
            self.index
                .create_internal_range(&range, seqno, index)
                .map(move |kv| {
                    IterGuardImpl::Blob(Guard {
                        blob_tree: self,
                        version: version.clone(), // TODO: PERF: ugly Arc clone
                        kv,
                    })
                }),
        )
    }

    fn tombstone_count(&self) -> u64 {
        self.index.tombstone_count()
    }

    fn weak_tombstone_count(&self) -> u64 {
        self.index.weak_tombstone_count()
    }

    fn weak_tombstone_reclaimable_count(&self) -> u64 {
        self.index.weak_tombstone_reclaimable_count()
    }

    fn drop_range<K: AsRef<[u8]>, R: RangeBounds<K>>(&self, range: R) -> crate::Result<()> {
        self.index.drop_range(range)
    }

    fn ingest(
        &self,
        iter: impl Iterator<Item = (UserKey, UserValue)>,
        seqno_generator: &SequenceNumberCounter,
        visible_seqno: &SequenceNumberCounter,
    ) -> crate::Result<()> {
        use crate::tree::ingest::Ingestion;
        use std::time::Instant;

        // TODO: take curr seqno for ingest, HOWEVER
        // TODO: we need to take the next seqno AFTER locking the memtable

        todo!();

        // // NOTE: Lock active memtable so nothing else can be going on while we are bulk loading
        // let lock = self.lock_active_memtable();
        // assert!(
        //     lock.is_empty(),
        //     "can only perform bulk_ingest on empty trees",
        // );

        // let mut segment_writer = Ingestion::new(&self.index)?.with_seqno(seqno);
        // let mut blob_writer = self.blobs.get_writer()?;

        // let start = Instant::now();
        // let mut count = 0;
        // let mut last_key = None;

        // for (key, value) in iter {
        //     if let Some(last_key) = &last_key {
        //         assert!(
        //             key > last_key,
        //             "next key in bulk ingest was not greater than last key",
        //         );
        //     }
        //     last_key = Some(key.clone());

        //     // NOTE: Values are 32-bit max
        //     #[allow(clippy::cast_possible_truncation)]
        //     let value_size = value.len() as u32;

        //     if value_size >= self.index.config.blob_file_separation_threshold {
        //         let vhandle = blob_writer.get_next_value_handle();

        //         let indirection = MaybeInlineValue::Indirect {
        //             vhandle,
        //             size: value_size,
        //         };
        //         // TODO: use Slice::with_size
        //         let mut serialized_indirection = vec![];
        //         indirection.encode_into(&mut serialized_indirection)?;

        //         segment_writer.write(key.clone(), serialized_indirection.into())?;

        //         blob_writer.write(&key, value)?;
        //     } else {
        //         // TODO: use Slice::with_size
        //         let direct = MaybeInlineValue::Inline(value);
        //         let serialized_direct = direct.encode_into_vec();
        //         segment_writer.write(key, serialized_direct.into())?;
        //     }

        //     count += 1;
        // }

        // // TODO: add to manifest + unit test
        // // self.blobs.register_writer(blob_writer)?;
        // // segment_writer.finish()?;

        // TODO: increase visible seqno

        // log::info!("Ingested {count} items in {:?}", start.elapsed());

        Ok(())
    }

    fn major_compact(&self, target_size: u64, seqno_threshold: SeqNo) -> crate::Result<()> {
        self.index.major_compact(target_size, seqno_threshold)
    }

    fn clear_active_memtable(&self) {
        self.index.clear_active_memtable();
    }

    fn l0_run_count(&self) -> usize {
        self.index.l0_run_count()
    }

    fn blob_file_count(&self) -> usize {
        self.current_version().blob_file_count()
    }

    // NOTE: We skip reading from the value log
    // because the vHandles already store the value size
    fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<u32>> {
        let Some(item) = self.index.get_internal_entry(key.as_ref(), seqno)? else {
            return Ok(None);
        };

        Ok(Some(if item.key.value_type.is_indirection() {
            let mut cursor = Cursor::new(item.value);
            let vptr = BlobIndirection::decode_from(&mut cursor)?;
            vptr.size
        } else {
            // NOTE: Values are at most u32::MAX in length
            #[allow(clippy::cast_possible_truncation)]
            {
                item.value.len() as u32
            }
        }))
    }

    fn stale_blob_bytes(&self) -> u64 {
        self.current_version().gc_stats().stale_bytes()
    }

    fn filter_size(&self) -> usize {
        self.index.filter_size()
    }

    fn pinned_filter_size(&self) -> usize {
        self.index.pinned_filter_size()
    }

    fn pinned_block_index_size(&self) -> usize {
        self.index.pinned_block_index_size()
    }

    fn sealed_memtable_count(&self) -> usize {
        self.index.sealed_memtable_count()
    }

    fn flush_memtable(
        &self,
        table_id: TableId,
        memtable: &Arc<Memtable>,
        eviction_seqno: SeqNo,
    ) -> crate::Result<Option<(Table, Option<BlobFile>)>> {
        use crate::{file::TABLES_FOLDER, table::Writer as TableWriter};

        let table_folder = self.index.config.path.join(TABLES_FOLDER);

        log::debug!("Flushing memtable & performing key-value separation");
        log::debug!("=> to table in {}", table_folder.display());
        log::debug!("=> to blob file at {}", self.blobs_folder.display());

        let mut table_writer = TableWriter::new(table_folder.join(table_id.to_string()), table_id)?
            // TODO: apply other policies
            .use_data_block_compression(self.index.config.data_block_compression_policy.get(0))
            .use_bloom_policy({
                use crate::config::FilterPolicyEntry::{Bloom, None};
                use crate::table::filter::BloomConstructionPolicy;

                match self.index.config.filter_policy.get(0) {
                    Bloom(policy) => policy,
                    None => BloomConstructionPolicy::BitsPerKey(0.0),
                }
            });

        let mut blob_writer = BlobFileWriter::new(
            self.index.0.blob_file_id_generator.clone(),
            u64::MAX, // TODO: actually use target size? but be sure to link to table correctly
            self.index.config.path.join(BLOBS_FOLDER),
        )?
        .use_compression(
            self.index
                .config
                .kv_separation_opts
                .as_ref()
                .expect("blob options should exist")
                .compression,
        );

        let iter = memtable.iter().map(Ok);
        let compaction_stream = CompactionStream::new(iter, eviction_seqno);

        let mut blob_bytes_referenced = 0;
        let mut blob_on_disk_bytes_referenced = 0;
        let mut blobs_referenced_count = 0;

        let separation_threshold = self
            .index
            .config
            .kv_separation_opts
            .as_ref()
            .expect("kv separation options should exist")
            .separation_threshold;

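        // Key-value separation: values at or above the separation threshold go
        // to the blob file and are replaced by an indirection in the table;
        // smaller values (and tombstones) are kept in the table itself.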
        for item in compaction_stream {
            let item = item?;

            if item.is_tombstone() {
                // NOTE: Tombstones still need to be written to the index tree,
                // but nothing is written to the blob writer
                table_writer.write(InternalValue::new(item.key, UserValue::empty()))?;
                continue;
            }

            let value = item.value;

            // NOTE: Values are 32-bit max
            #[allow(clippy::cast_possible_truncation)]
            let value_size = value.len() as u32;

            if value_size >= separation_threshold {
                let offset = blob_writer.offset();
                let blob_file_id = blob_writer.blob_file_id();
                let on_disk_size = blob_writer.write(&item.key.user_key, item.key.seqno, &value)?;

                let indirection = BlobIndirection {
                    vhandle: ValueHandle {
                        blob_file_id,
                        offset,
                        on_disk_size,
                    },
                    size: value_size,
                };

                table_writer.write({
                    let mut vptr =
                        InternalValue::new(item.key.clone(), indirection.encode_into_vec());
                    vptr.key.value_type = crate::ValueType::Indirection;
                    vptr
                })?;

                blob_bytes_referenced += u64::from(value_size);
                blob_on_disk_bytes_referenced += u64::from(on_disk_size);
                blobs_referenced_count += 1;
            } else {
                table_writer.write(InternalValue::new(item.key, value))?;
            }
        }

        log::trace!("Creating blob file");
        let blob_files = blob_writer.finish()?;
        assert!(blob_files.len() <= 1);
        let blob_file = blob_files.into_iter().next();

        log::trace!("Creating LSM-tree table {table_id}");

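        // If any blobs were separated out, link the table to the blob file so
        // the amount of data it references can be tracked.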
        if blob_bytes_referenced > 0 {
            if let Some(blob_file) = &blob_file {
                table_writer.link_blob_file(
                    blob_file.id(),
                    blobs_referenced_count,
                    blob_bytes_referenced,
                    blob_on_disk_bytes_referenced,
                );
            }
        }

        let table = self.index.consume_writer(table_writer)?;

        Ok(table.map(|table| (table, blob_file)))
    }

    fn register_tables(
        &self,
        tables: &[Table],
        blob_files: Option<&[BlobFile]>,
        frag_map: Option<FragmentationMap>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        self.index
            .register_tables(tables, blob_files, frag_map, seqno_threshold)
    }

    fn set_active_memtable(&self, memtable: Memtable) {
        self.index.set_active_memtable(memtable);
    }

    fn add_sealed_memtable(&self, id: MemtableId, memtable: Arc<Memtable>) {
        self.index.add_sealed_memtable(id, memtable);
    }

    fn compact(
        &self,
        strategy: Arc<dyn crate::compaction::CompactionStrategy>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        self.index.compact(strategy, seqno_threshold)
    }

    fn get_next_table_id(&self) -> TableId {
        self.index.get_next_table_id()
    }

    fn tree_config(&self) -> &Config {
        &self.index.config
    }

    fn get_highest_seqno(&self) -> Option<SeqNo> {
        self.index.get_highest_seqno()
    }

    fn active_memtable_size(&self) -> u64 {
        self.index.active_memtable_size()
    }

    fn tree_type(&self) -> crate::TreeType {
        crate::TreeType::Blob
    }

    fn rotate_memtable(&self) -> Option<(crate::tree::inner::MemtableId, Arc<crate::Memtable>)> {
        self.index.rotate_memtable()
    }

    fn table_count(&self) -> usize {
        self.index.table_count()
    }

    fn level_table_count(&self, idx: usize) -> Option<usize> {
        self.index.level_table_count(idx)
    }

    fn approximate_len(&self) -> usize {
        self.index.approximate_len()
    }

    // NOTE: Override the default implementation to not fetch
    // data from the value log, so we get much faster key reads
    fn is_empty(&self, seqno: SeqNo, index: Option<Arc<Memtable>>) -> crate::Result<bool> {
        self.index.is_empty(seqno, index)
    }

    // NOTE: Override the default implementation to not fetch
    // data from the value log, so we get much faster key reads
    fn contains_key<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<bool> {
        self.index.contains_key(key, seqno)
    }

    // NOTE: Override the default implementation to not fetch
    // data from the value log, so we get much faster scans
    fn len(&self, seqno: SeqNo, index: Option<Arc<Memtable>>) -> crate::Result<usize> {
        self.index.len(seqno, index)
    }

    fn disk_space(&self) -> u64 {
        let version = self.current_version();
        self.index.disk_space() + version.blob_files.on_disk_size()
    }

    fn get_highest_memtable_seqno(&self) -> Option<SeqNo> {
        self.index.get_highest_memtable_seqno()
    }

    fn get_highest_persisted_seqno(&self) -> Option<SeqNo> {
        self.index.get_highest_persisted_seqno()
    }

    fn insert<K: Into<UserKey>, V: Into<UserValue>>(
        &self,
        key: K,
        value: V,
        seqno: SeqNo,
    ) -> (u64, u64) {
        self.index.insert(key, value.into(), seqno)
    }

    fn get<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<crate::UserValue>> {
        let key = key.as_ref();

        // TODO: refactor memtable, sealed memtables, manifest lock to be a single lock (SuperVersion kind of)
        // TODO: then, try to reduce the lock access to 1, because we are accessing it twice (index.get, and then vhandle resolving...)

        let Some(item) = self.index.get_internal_entry(key, seqno)? else {
            return Ok(None);
        };

        let version = self.current_version();
        let (_, v) = resolve_value_handle(self, &version, item)?;

        Ok(Some(v))
    }

    fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        self.index.remove(key, seqno)
    }

    fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        self.index.remove_weak(key, seqno)
    }
}