lsm_tree/blob_tree/mod.rs

// Copyright (c) 2024-present, fjall-rs
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)

mod gc;
pub mod handle;

#[doc(hidden)]
pub use gc::{FragmentationEntry, FragmentationMap};

use crate::{
    coding::{Decode, Encode},
    compaction::stream::CompactionStream,
    file::{fsync_directory, BLOBS_FOLDER},
    iter_guard::{IterGuard, IterGuardImpl},
    r#abstract::{AbstractTree, RangeItem},
    table::Table,
    tree::inner::MemtableId,
    value::InternalValue,
    version::Version,
    vlog::{Accessor, BlobFile, BlobFileWriter, ValueHandle},
    Config, Memtable, SeqNo, SequenceNumberCounter, TableId, UserKey, UserValue,
};
use handle::BlobIndirection;
use std::{io::Cursor, ops::RangeBounds, path::PathBuf, sync::Arc};

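/// Guard returned by blob tree iterators.
///
/// Wraps the raw index entry and only resolves the blob indirection
/// when the value is actually requested via `into_inner`, so key-only
/// and size-only scans never touch the blob files.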
pub struct Guard<'a> {
    blob_tree: &'a BlobTree,
    version: Version,
    kv: crate::Result<InternalValue>,
}

impl IterGuard for Guard<'_> {
    fn key(self) -> crate::Result<UserKey> {
        self.kv.map(|kv| kv.key.user_key)
    }

    fn size(self) -> crate::Result<u32> {
        let kv = self.kv?;

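        // For separated values, the user value size is stored in the
        // indirection itself, so no blob file read is needed here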
        if kv.key.value_type.is_indirection() {
            let mut cursor = Cursor::new(kv.value);
            Ok(BlobIndirection::decode_from(&mut cursor)?.size)
        } else {
            #[expect(clippy::cast_possible_truncation, reason = "values are u32 max length")]
            Ok(kv.value.len() as u32)
        }
    }

    fn into_inner(self) -> crate::Result<(UserKey, UserValue)> {
        resolve_value_handle(self.blob_tree, &self.version, self.kv?)
    }
}

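/// Resolves an index entry into a user key-value pair.
///
/// Inline values are returned directly; indirection entries are decoded
/// into a `BlobIndirection` and the value is fetched from the blob files
/// of the given version.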
fn resolve_value_handle(tree: &BlobTree, version: &Version, item: InternalValue) -> RangeItem {
    if item.key.value_type.is_indirection() {
        let mut cursor = Cursor::new(item.value);
        let vptr = BlobIndirection::decode_from(&mut cursor)?;

        // Resolve indirection using value log
        match Accessor::new(&version.blob_files).get(
            tree.id(),
            &tree.blobs_folder,
            &item.key.user_key,
            &vptr.vhandle,
            &tree.index.config.cache,
            &tree.index.config.descriptor_table,
        ) {
            Ok(Some(v)) => {
                let k = item.key.user_key;
                Ok((k, v))
            }
            Ok(None) => {
                panic!(
                    "value handle ({:?} => {:?}) did not match any blob - this is a bug; version={}",
                    item.key.user_key,
                    vptr.vhandle,
                    version.id(),
                );
            }
            Err(e) => Err(e),
        }
    } else {
        let k = item.key.user_key;
        let v = item.value;
        Ok((k, v))
    }
}

/// A key-value-separated log-structured merge tree
///
/// This tree is a composite structure, consisting of an
/// index tree (LSM-tree) and a log-structured value log
/// to reduce write amplification.
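///
/// Values whose size reaches the configured separation threshold are
/// written to blob files in the value log, while the index tree stores a
/// small `BlobIndirection` (a value handle plus the value size) in their
/// place; smaller values are stored inline in the index tree:
///
/// ```text
/// // conceptual sketch of key-value separation
/// index tree:  k1 -> "small value" (inline)
///              k2 -> BlobIndirection { vhandle, size }   // points into a blob file
/// blob file:   ... | k2 -> "large value" | ...
/// ```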
#[derive(Clone)]
pub struct BlobTree {
    /// Index tree that holds value handles or small inline values
    #[doc(hidden)]
    pub index: crate::Tree,

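    /// Folder that contains the blob files (value log)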
    blobs_folder: PathBuf,
}

impl BlobTree {
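    /// Opens (or recovers) a blob tree from the given configuration,
    /// creating the blob folder if it does not exist yet.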
    pub(crate) fn open(config: Config) -> crate::Result<Self> {
        let index = crate::Tree::open(config)?;

        let blobs_folder = index.config.path.join(BLOBS_FOLDER);
        std::fs::create_dir_all(&blobs_folder)?;
        fsync_directory(&blobs_folder)?;

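        // Continue numbering blob files after the highest ID found in the
        // current version, so newly written blob files cannot collide with
        // existing ones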
        let blob_file_id_to_continue_with = index
            .current_version()
            .blob_files
            .list_ids()
            .max()
            .map(|x| x + 1)
            .unwrap_or_default();

        index
            .0
            .blob_file_id_generator
            .set(blob_file_id_to_continue_with);

        Ok(Self {
            index,
            blobs_folder,
        })
    }
}

impl AbstractTree for BlobTree {
    fn next_table_id(&self) -> TableId {
        self.index.next_table_id()
    }

    fn id(&self) -> crate::TreeId {
        self.index.id()
    }

    fn get_internal_entry(&self, key: &[u8], seqno: SeqNo) -> crate::Result<Option<InternalValue>> {
        self.index.get_internal_entry(key, seqno)
    }

    fn current_version(&self) -> Version {
        self.index.current_version()
    }

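    // Rotates the active memtable, flushes it to a table (and at most one
    // blob file), and registers the result in the current version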
    fn flush_active_memtable(&self, eviction_seqno: SeqNo) -> crate::Result<Option<Table>> {
        let Some((table_id, yanked_memtable)) = self.index.rotate_memtable() else {
            return Ok(None);
        };

        let Some((table, blob_file)) =
            self.flush_memtable(table_id, &yanked_memtable, eviction_seqno)?
        else {
            return Ok(None);
        };
        self.register_tables(
            std::slice::from_ref(&table),
            blob_file.as_ref().map(std::slice::from_ref),
            None,
            eviction_seqno,
        )?;

        Ok(Some(table))
    }

    #[cfg(feature = "metrics")]
    fn metrics(&self) -> &Arc<crate::Metrics> {
        self.index.metrics()
    }

    fn version_free_list_len(&self) -> usize {
        self.index.version_free_list_len()
    }

    fn prefix<K: AsRef<[u8]>>(
        &self,
        prefix: K,
        seqno: SeqNo,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl<'_>> + '_> {
        use crate::range::prefix_to_range;

        let range = prefix_to_range(prefix.as_ref());

        let version = self.current_version();

        Box::new(
            self.index
                .create_internal_range(&range, seqno, index)
                .map(move |kv| {
                    IterGuardImpl::Blob(Guard {
                        blob_tree: self,
                        version: version.clone(), // TODO: PERF: ugly Arc clone
                        kv,
                    })
                }),
        )
    }

    fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
        &self,
        range: R,
        seqno: SeqNo,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl<'_>> + '_> {
        let version = self.current_version();

        // TODO: PERF: ugly Arc clone
        Box::new(
            self.index
                .create_internal_range(&range, seqno, index)
                .map(move |kv| {
                    IterGuardImpl::Blob(Guard {
                        blob_tree: self,
                        version: version.clone(), // TODO: PERF: ugly Arc clone
                        kv,
                    })
                }),
        )
    }

    fn tombstone_count(&self) -> u64 {
        self.index.tombstone_count()
    }

    fn weak_tombstone_count(&self) -> u64 {
        self.index.weak_tombstone_count()
    }

    fn weak_tombstone_reclaimable_count(&self) -> u64 {
        self.index.weak_tombstone_reclaimable_count()
    }

    fn drop_range<K: AsRef<[u8]>, R: RangeBounds<K>>(&self, range: R) -> crate::Result<()> {
        self.index.drop_range(range)
    }

    fn ingest(
        &self,
        iter: impl Iterator<Item = (UserKey, UserValue)>,
        seqno_generator: &SequenceNumberCounter,
        visible_seqno: &SequenceNumberCounter,
    ) -> crate::Result<()> {
        use crate::tree::ingest::Ingestion;
        use std::time::Instant;

        // TODO: take curr seqno for ingest, HOWEVER
        // TODO: we need to take the next seqno AFTER locking the memtable

        todo!();

        // // NOTE: Lock active memtable so nothing else can be going on while we are bulk loading
        // let lock = self.lock_active_memtable();
        // assert!(
        //     lock.is_empty(),
        //     "can only perform bulk_ingest on empty trees",
        // );

        // let mut segment_writer = Ingestion::new(&self.index)?.with_seqno(seqno);
        // let mut blob_writer = self.blobs.get_writer()?;

        // let start = Instant::now();
        // let mut count = 0;
        // let mut last_key = None;

        // for (key, value) in iter {
        //     if let Some(last_key) = &last_key {
        //         assert!(
        //             key > last_key,
        //             "next key in bulk ingest was not greater than last key",
        //         );
        //     }
        //     last_key = Some(key.clone());

        //     // NOTE: Values are 32-bit max
        //     #[allow(clippy::cast_possible_truncation)]
        //     let value_size = value.len() as u32;

        //     if value_size >= self.index.config.blob_file_separation_threshold {
        //         let vhandle = blob_writer.get_next_value_handle();

        //         let indirection = MaybeInlineValue::Indirect {
        //             vhandle,
        //             size: value_size,
        //         };
        //         // TODO: use Slice::with_size
        //         let mut serialized_indirection = vec![];
        //         indirection.encode_into(&mut serialized_indirection)?;

        //         segment_writer.write(key.clone(), serialized_indirection.into())?;

        //         blob_writer.write(&key, value)?;
        //     } else {
        //         // TODO: use Slice::with_size
        //         let direct = MaybeInlineValue::Inline(value);
        //         let serialized_direct = direct.encode_into_vec();
        //         segment_writer.write(key, serialized_direct.into())?;
        //     }

        //     count += 1;
        // }

        // // TODO: add to manifest + unit test
        // // self.blobs.register_writer(blob_writer)?;
        // // segment_writer.finish()?;

        // TODO: increase visible seqno

        // log::info!("Ingested {count} items in {:?}", start.elapsed());

        Ok(())
    }

    fn major_compact(&self, target_size: u64, seqno_threshold: SeqNo) -> crate::Result<()> {
        self.index.major_compact(target_size, seqno_threshold)
    }

    fn clear_active_memtable(&self) {
        self.index.clear_active_memtable();
    }

    fn l0_run_count(&self) -> usize {
        self.index.l0_run_count()
    }

    fn blob_file_count(&self) -> usize {
        self.current_version().blob_file_count()
    }

    // NOTE: We skip reading from the value log
    // because the vHandles already store the value size
    fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<u32>> {
        let Some(item) = self.index.get_internal_entry(key.as_ref(), seqno)? else {
            return Ok(None);
        };

        Ok(Some(if item.key.value_type.is_indirection() {
            let mut cursor = Cursor::new(item.value);
            let vptr = BlobIndirection::decode_from(&mut cursor)?;
            vptr.size
        } else {
            #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
            {
                item.value.len() as u32
            }
        }))
    }

    fn stale_blob_bytes(&self) -> u64 {
        self.current_version().gc_stats().stale_bytes()
    }

    fn filter_size(&self) -> usize {
        self.index.filter_size()
    }

    fn pinned_filter_size(&self) -> usize {
        self.index.pinned_filter_size()
    }

    fn pinned_block_index_size(&self) -> usize {
        self.index.pinned_block_index_size()
    }

    fn sealed_memtable_count(&self) -> usize {
        self.index.sealed_memtable_count()
    }

    fn flush_memtable(
        &self,
        table_id: TableId,
        memtable: &Arc<Memtable>,
        eviction_seqno: SeqNo,
    ) -> crate::Result<Option<(Table, Option<BlobFile>)>> {
        use crate::{file::TABLES_FOLDER, table::Writer as TableWriter};

        let table_folder = self.index.config.path.join(TABLES_FOLDER);

        log::debug!("Flushing memtable & performing key-value separation");
        log::debug!("=> to table in {}", table_folder.display());
        log::debug!("=> to blob file at {}", self.blobs_folder.display());

        let mut table_writer =
            TableWriter::new(table_folder.join(table_id.to_string()), table_id, 0)?
                // TODO: apply other policies
                .use_data_block_compression(self.index.config.data_block_compression_policy.get(0))
                .use_bloom_policy({
                    use crate::config::FilterPolicyEntry::{Bloom, None};
                    use crate::table::filter::BloomConstructionPolicy;

                    match self.index.config.filter_policy.get(0) {
                        Bloom(policy) => policy,
                        None => BloomConstructionPolicy::BitsPerKey(0.0),
                    }
                });

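        // Blob file writer for key-value separated (large) values;
        // a single flush produces at most one blob file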
        let mut blob_writer = BlobFileWriter::new(
            self.index.0.blob_file_id_generator.clone(),
            u64::MAX, // TODO: actually use target size? but be sure to link to table correctly
            self.index.config.path.join(BLOBS_FOLDER),
        )?
        .use_compression(
            self.index
                .config
                .kv_separation_opts
                .as_ref()
                .expect("blob options should exist")
                .compression,
        );

        let iter = memtable.iter().map(Ok);
        let compaction_stream = CompactionStream::new(iter, eviction_seqno);

        let mut blob_bytes_referenced = 0;
        let mut blob_on_disk_bytes_referenced = 0;
        let mut blobs_referenced_count = 0;

        let separation_threshold = self
            .index
            .config
            .kv_separation_opts
            .as_ref()
            .expect("kv separation options should exist")
            .separation_threshold;

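        // Key-value separation: values at or above the separation threshold
        // are written to the blob file and the table stores only a small
        // indirection; smaller values are inlined into the table directly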
        for item in compaction_stream {
            let item = item?;

            if item.is_tombstone() {
                // NOTE: Still need to add the tombstone to the index tree,
                // but nothing is written to the blob writer
                table_writer.write(InternalValue::new(item.key, UserValue::empty()))?;
                continue;
            }

            let value = item.value;

            #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
            let value_size = value.len() as u32;

            if value_size >= separation_threshold {
                let offset = blob_writer.offset();
                let blob_file_id = blob_writer.blob_file_id();
                let on_disk_size = blob_writer.write(&item.key.user_key, item.key.seqno, &value)?;

                let indirection = BlobIndirection {
                    vhandle: ValueHandle {
                        blob_file_id,
                        offset,
                        on_disk_size,
                    },
                    size: value_size,
                };

                table_writer.write({
                    let mut vptr =
                        InternalValue::new(item.key.clone(), indirection.encode_into_vec());
                    vptr.key.value_type = crate::ValueType::Indirection;
                    vptr
                })?;

                blob_bytes_referenced += u64::from(value_size);
                blob_on_disk_bytes_referenced += u64::from(on_disk_size);
                blobs_referenced_count += 1;
            } else {
                table_writer.write(InternalValue::new(item.key, value))?;
            }
        }

        log::trace!("Creating blob file");
        let blob_files = blob_writer.finish()?;
        assert!(blob_files.len() <= 1);
        let blob_file = blob_files.into_iter().next();

        log::trace!("Creating LSM-tree table {table_id}");

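        // If any values were separated out, link the resulting blob file to
        // the table, recording how many entries and bytes the table references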
        if blob_bytes_referenced > 0 {
            if let Some(blob_file) = &blob_file {
                table_writer.link_blob_file(
                    blob_file.id(),
                    blobs_referenced_count,
                    blob_bytes_referenced,
                    blob_on_disk_bytes_referenced,
                );
            }
        }

        let table = self.index.consume_writer(table_writer)?;

        Ok(table.map(|table| (table, blob_file)))
    }

    fn register_tables(
        &self,
        tables: &[Table],
        blob_files: Option<&[BlobFile]>,
        frag_map: Option<FragmentationMap>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        self.index
            .register_tables(tables, blob_files, frag_map, seqno_threshold)
    }

    fn set_active_memtable(&self, memtable: Memtable) {
        self.index.set_active_memtable(memtable);
    }

    fn add_sealed_memtable(&self, id: MemtableId, memtable: Arc<Memtable>) {
        self.index.add_sealed_memtable(id, memtable);
    }

    fn compact(
        &self,
        strategy: Arc<dyn crate::compaction::CompactionStrategy>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        self.index.compact(strategy, seqno_threshold)
    }

    fn get_next_table_id(&self) -> TableId {
        self.index.get_next_table_id()
    }

    fn tree_config(&self) -> &Config {
        &self.index.config
    }

    fn get_highest_seqno(&self) -> Option<SeqNo> {
        self.index.get_highest_seqno()
    }

    fn active_memtable_size(&self) -> u64 {
        self.index.active_memtable_size()
    }

    fn tree_type(&self) -> crate::TreeType {
        crate::TreeType::Blob
    }

    fn rotate_memtable(&self) -> Option<(crate::tree::inner::MemtableId, Arc<crate::Memtable>)> {
        self.index.rotate_memtable()
    }

    fn table_count(&self) -> usize {
        self.index.table_count()
    }

    fn level_table_count(&self, idx: usize) -> Option<usize> {
        self.index.level_table_count(idx)
    }

    fn approximate_len(&self) -> usize {
        self.index.approximate_len()
    }

    // NOTE: Override the default implementation to not fetch
    // data from the value log, so we get much faster key reads
    fn is_empty(&self, seqno: SeqNo, index: Option<Arc<Memtable>>) -> crate::Result<bool> {
        self.index.is_empty(seqno, index)
    }

    // NOTE: Override the default implementation to not fetch
    // data from the value log, so we get much faster key reads
    fn contains_key<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<bool> {
        self.index.contains_key(key, seqno)
    }

    // NOTE: Override the default implementation to not fetch
    // data from the value log, so we get much faster scans
    fn len(&self, seqno: SeqNo, index: Option<Arc<Memtable>>) -> crate::Result<usize> {
        self.index.len(seqno, index)
    }

    fn disk_space(&self) -> u64 {
        let version = self.current_version();
        self.index.disk_space() + version.blob_files.on_disk_size()
    }

    fn get_highest_memtable_seqno(&self) -> Option<SeqNo> {
        self.index.get_highest_memtable_seqno()
    }

    fn get_highest_persisted_seqno(&self) -> Option<SeqNo> {
        self.index.get_highest_persisted_seqno()
    }

    fn insert<K: Into<UserKey>, V: Into<UserValue>>(
        &self,
        key: K,
        value: V,
        seqno: SeqNo,
    ) -> (u64, u64) {
        self.index.insert(key, value.into(), seqno)
    }

    fn get<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<crate::UserValue>> {
        let key = key.as_ref();

        // TODO: refactor memtable, sealed memtables, manifest lock to be a single lock (SuperVersion kind of)
        // TODO: then, try to reduce the lock access to 1, because we are accessing it twice (index.get, and then vhandle resolving...)

        let Some(item) = self.index.get_internal_entry(key, seqno)? else {
            return Ok(None);
        };

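        // Resolve the (possibly indirect) value against the blob files
        // of the current version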
        let version = self.current_version();
        let (_, v) = resolve_value_handle(self, &version, item)?;

        Ok(Some(v))
    }

    fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        self.index.remove(key, seqno)
    }

    fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        self.index.remove_weak(key, seqno)
    }
}