// lsm_tree/blob_tree/mod.rs

// Copyright (c) 2024-present, fjall-rs
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)
4
mod gc;
pub mod handle;
pub mod ingest;

#[doc(hidden)]
pub use gc::{FragmentationEntry, FragmentationMap};

use crate::{
    abstract_tree::{AbstractTree, RangeItem},
    coding::Decode,
    iter_guard::{IterGuard, IterGuardImpl},
    table::Table,
    tree::inner::MemtableId,
    value::InternalValue,
    version::Version,
    vlog::{Accessor, BlobFile, BlobFileWriter},
    Cache, Config, Memtable, SeqNo, TableId, TreeId, UserKey, UserValue,
};
use handle::BlobIndirection;
use std::{
    ops::RangeBounds,
    path::{Path, PathBuf},
    sync::{Arc, MutexGuard},
};
29
30/// Iterator value guard
31pub struct Guard {
32    tree: crate::BlobTree,
33    version: Version,
34    kv: crate::Result<InternalValue>,
35}
36
37impl IterGuard for Guard {
38    fn into_inner_if(
39        self,
40        pred: impl Fn(&UserKey) -> bool,
41    ) -> crate::Result<(UserKey, Option<UserValue>)> {
42        let kv = self.kv?;
43
44        if pred(&kv.key.user_key) {
45            resolve_value_handle(
46                self.tree.id(),
47                self.tree.blobs_folder.as_path(),
48                &self.tree.index.config.cache,
49                &self.version,
50                kv,
51            )
52            .map(|(k, v)| (k, Some(v)))
53        } else {
54            Ok((kv.key.user_key, None))
55        }
56    }
57
58    fn key(self) -> crate::Result<UserKey> {
59        self.kv.map(|kv| kv.key.user_key)
60    }
61
62    fn size(self) -> crate::Result<u32> {
63        let kv = self.kv?;
64
65        if kv.key.value_type.is_indirection() {
66            let mut cursor = std::io::Cursor::new(kv.value);
67            Ok(BlobIndirection::decode_from(&mut cursor)?.size)
68        } else {
69            #[expect(clippy::cast_possible_truncation, reason = "values are u32 max length")]
70            Ok(kv.value.len() as u32)
71        }
72    }
73
74    fn into_inner(self) -> crate::Result<(UserKey, UserValue)> {
75        resolve_value_handle(
76            self.tree.id(),
77            self.tree.blobs_folder.as_path(),
78            &self.tree.index.config.cache,
79            &self.version,
80            self.kv?,
81        )
82    }
83}
84
85fn resolve_value_handle(
86    tree_id: TreeId,
87    blobs_folder: &Path,
88    cache: &Cache,
89    version: &Version,
90    item: InternalValue,
91) -> RangeItem {
92    if item.key.value_type.is_indirection() {
93        let mut cursor = std::io::Cursor::new(item.value);
94        let vptr = BlobIndirection::decode_from(&mut cursor)?;
95
96        // Resolve indirection using value log
97        match Accessor::new(&version.blob_files).get(
98            tree_id,
99            blobs_folder,
100            &item.key.user_key,
101            &vptr.vhandle,
102            cache,
103        ) {
104            Ok(Some(v)) => {
105                let k = item.key.user_key;
106                Ok((k, v))
107            }
108            Ok(None) => {
109                panic!(
110                    "value handle ({:?} => {:?}) did not match any blob - this is a bug; version={}",
111                    item.key.user_key, vptr.vhandle,
112                    version.id(),
113                );
114            }
115            Err(e) => Err(e),
116        }
117    } else {
118        let k = item.key.user_key;
119        let v = item.value;
120        Ok((k, v))
121    }
122}
123
124/// A key-value-separated log-structured merge tree
125///
126/// This tree is a composite structure, consisting of an
127/// index tree (LSM-tree) and a log-structured value log
128/// to reduce write amplification.
129#[derive(Clone)]
130pub struct BlobTree {
131    /// Index tree that holds value handles or small inline values
132    #[doc(hidden)]
133    pub index: crate::Tree,
134
135    blobs_folder: Arc<PathBuf>,
136}
137
138impl BlobTree {
139    pub(crate) fn open(config: Config) -> crate::Result<Self> {
140        use crate::file::{fsync_directory, BLOBS_FOLDER};
141        use crate::fs::Fs;
142
143        let index = crate::Tree::open(config)?;
144
145        let blobs_folder = index.config.path.join(BLOBS_FOLDER);
146        (*index.config.fs).create_dir_all(&blobs_folder)?;
147        fsync_directory(&blobs_folder, &*index.config.fs)?;
148
149        let blob_file_id_to_continue_with = index
150            .current_version()
151            .blob_files
152            .list_ids()
153            .max()
154            .map(|x| x + 1)
155            .unwrap_or_default();
156
157        index
158            .0
159            .blob_file_id_counter
160            .set(blob_file_id_to_continue_with);
161
162        Ok(Self {
163            index,
164            blobs_folder: Arc::new(blobs_folder),
165        })
166    }
167
168    /// Resolves a single key against a pre-acquired [`SuperVersion`].
169    fn resolve_key(
170        &self,
171        super_version: &crate::version::SuperVersion,
172        key: &[u8],
173        seqno: SeqNo,
174    ) -> crate::Result<Option<UserValue>> {
175        let Some(item) = crate::Tree::get_internal_entry_from_version(
176            super_version,
177            key,
178            seqno,
179            self.index.config.comparator.as_ref(),
180        )?
181        else {
182            return Ok(None);
183        };
184
185        let (_, v) = resolve_value_handle(
186            self.id(),
187            self.blobs_folder.as_path(),
188            &self.index.config.cache,
189            &super_version.version,
190            item,
191        )?;
192
193        Ok(Some(v))
194    }
195}
196
197impl crate::abstract_tree::sealed::Sealed for BlobTree {}
198
199impl AbstractTree for BlobTree {
200    fn print_trace(&self, key: &[u8]) -> crate::Result<()> {
201        self.index.print_trace(key)
202    }
203
204    fn table_file_cache_size(&self) -> usize {
205        self.index.table_file_cache_size()
206    }
207
208    fn get_version_history_lock(
209        &self,
210    ) -> std::sync::RwLockWriteGuard<'_, crate::version::SuperVersions> {
211        self.index.get_version_history_lock()
212    }
213
214    fn next_table_id(&self) -> TableId {
215        self.index.next_table_id()
216    }
217
218    fn id(&self) -> crate::TreeId {
219        self.index.id()
220    }
221
222    fn get_internal_entry(&self, key: &[u8], seqno: SeqNo) -> crate::Result<Option<InternalValue>> {
223        self.index.get_internal_entry(key, seqno)
224    }
225
226    fn current_version(&self) -> Version {
227        self.index.current_version()
228    }
229
230    #[cfg(feature = "metrics")]
231    fn metrics(&self) -> &Arc<crate::Metrics> {
232        self.index.metrics()
233    }
234
235    fn version_free_list_len(&self) -> usize {
236        self.index.version_free_list_len()
237    }
238
239    fn prefix<K: AsRef<[u8]>>(
240        &self,
241        prefix: K,
242        seqno: SeqNo,
243        index: Option<(Arc<Memtable>, SeqNo)>,
244    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
245        use crate::prefix::compute_prefix_hash;
246        use crate::range::prefix_to_range;
247
248        let prefix_bytes = prefix.as_ref();
249
250        let prefix_hash =
251            compute_prefix_hash(self.index.config.prefix_extractor.as_ref(), prefix_bytes);
252
253        let super_version = self.index.get_version_for_snapshot(seqno);
254        let tree = self.clone();
255
256        let range = prefix_to_range(prefix_bytes);
257
258        Box::new(
259            crate::Tree::create_internal_range_with_prefix_hash(
260                super_version.clone(),
261                &range,
262                seqno,
263                index,
264                None, // BlobTree does not use merge operators for prefix scans
265                self.index.config.comparator.clone(),
266                prefix_hash,
267            )
268            .map(move |kv| {
269                IterGuardImpl::Blob(Guard {
270                    tree: tree.clone(),
271                    version: super_version.version.clone(),
272                    kv,
273                })
274            }),
275        )
276    }
277
278    fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
279        &self,
280        range: R,
281        seqno: SeqNo,
282        index: Option<(Arc<Memtable>, SeqNo)>,
283    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
284        let super_version = self.index.get_version_for_snapshot(seqno);
285        let tree = self.clone();
286
287        Box::new(
288            crate::Tree::create_internal_range(
289                super_version.clone(),
290                &range,
291                seqno,
292                index,
293                None,
294                self.index.config.comparator.clone(),
295            )
296            .map(move |kv| {
297                IterGuardImpl::Blob(Guard {
298                    tree: tree.clone(),
299                    version: super_version.version.clone(),
300                    kv,
301                })
302            }),
303        )
304    }
305
306    fn tombstone_count(&self) -> u64 {
307        self.index.tombstone_count()
308    }
309
310    fn weak_tombstone_count(&self) -> u64 {
311        self.index.weak_tombstone_count()
312    }
313
314    fn weak_tombstone_reclaimable_count(&self) -> u64 {
315        self.index.weak_tombstone_reclaimable_count()
316    }
317
318    fn drop_range<K: AsRef<[u8]>, R: RangeBounds<K>>(&self, range: R) -> crate::Result<()> {
319        self.index.drop_range(range)
320    }
321
322    fn clear(&self) -> crate::Result<()> {
323        self.index.clear()
324    }
325
326    fn major_compact(
327        &self,
328        target_size: u64,
329        seqno_threshold: SeqNo,
330    ) -> crate::Result<crate::compaction::CompactionResult> {
331        self.index.major_compact(target_size, seqno_threshold)
332    }
333
334    fn clear_active_memtable(&self) {
335        self.index.clear_active_memtable();
336    }
337
338    fn l0_run_count(&self) -> usize {
339        self.index.l0_run_count()
340    }
341
342    fn blob_file_count(&self) -> usize {
343        self.current_version().blob_file_count()
344    }
345
346    // NOTE: We skip reading from the value log
347    // because the vHandles already store the value size
348    fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<u32>> {
349        let Some(item) = self.index.get_internal_entry(key.as_ref(), seqno)? else {
350            return Ok(None);
351        };
352
353        Ok(Some(if item.key.value_type.is_indirection() {
354            let mut cursor = std::io::Cursor::new(item.value);
355            let vptr = BlobIndirection::decode_from(&mut cursor)?;
356            vptr.size
357        } else {
358            #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
359            {
360                item.value.len() as u32
361            }
362        }))
363    }
364
365    fn stale_blob_bytes(&self) -> u64 {
366        self.current_version().gc_stats().stale_bytes()
367    }
368
369    fn filter_size(&self) -> u64 {
370        self.index.filter_size()
371    }
372
373    fn pinned_filter_size(&self) -> usize {
374        self.index.pinned_filter_size()
375    }
376
377    fn pinned_block_index_size(&self) -> usize {
378        self.index.pinned_block_index_size()
379    }
380
381    fn sealed_memtable_count(&self) -> usize {
382        self.index.sealed_memtable_count()
383    }
384
385    fn get_flush_lock(&self) -> MutexGuard<'_, ()> {
386        self.index.get_flush_lock()
387    }
388
389    #[expect(clippy::too_many_lines, reason = "flush logic is inherently complex")]
390    fn flush_to_tables_with_rt(
391        &self,
392        stream: impl Iterator<Item = crate::Result<InternalValue>>,
393        range_tombstones: Vec<crate::range_tombstone::RangeTombstone>,
394    ) -> crate::Result<Option<(Vec<Table>, Option<Vec<BlobFile>>)>> {
395        use crate::{
396            coding::Encode, file::BLOBS_FOLDER, file::TABLES_FOLDER,
397            table::multi_writer::MultiWriter,
398        };
399
400        let start = std::time::Instant::now();
401
402        let table_folder = self.index.config.path.join(TABLES_FOLDER);
403
404        let data_block_size = self.index.config.data_block_size_policy.get(0);
405
406        let data_block_restart_interval =
407            self.index.config.data_block_restart_interval_policy.get(0);
408        let index_block_restart_interval =
409            self.index.config.index_block_restart_interval_policy.get(0);
410
411        let data_block_compression = self.index.config.data_block_compression_policy.get(0);
412        let index_block_compression = self.index.config.index_block_compression_policy.get(0);
413
414        let data_block_hash_ratio = self.index.config.data_block_hash_ratio_policy.get(0);
415
416        let index_partitioning = self.index.config.index_block_partitioning_policy.get(0);
417        let filter_partitioning = self.index.config.filter_block_partitioning_policy.get(0);
418
419        log::debug!("Flushing memtable(s) and performing key-value separation, data_block_restart_interval={data_block_restart_interval}, index_block_restart_interval={index_block_restart_interval}, data_block_size={data_block_size}, data_block_compression={data_block_compression:?}, index_block_compression={index_block_compression:?}");
420        log::debug!("=> to table(s) in {}", table_folder.display());
421        log::debug!("=> to blob file(s) at {}", self.blobs_folder.display());
422
423        let mut table_writer = MultiWriter::new(
424            table_folder.clone(),
425            self.index.table_id_counter.clone(),
426            64 * 1_024 * 1_024,
427            0,
428            self.index.config.fs.clone(),
429        )?
430        .use_data_block_restart_interval(data_block_restart_interval)
431        .use_index_block_restart_interval(index_block_restart_interval)
432        .use_data_block_compression(data_block_compression)
433        .use_index_block_compression(index_block_compression)
434        .use_data_block_size(data_block_size)
435        .use_data_block_hash_ratio(data_block_hash_ratio)
436        .use_bloom_policy({
437            use crate::config::FilterPolicyEntry::{Bloom, None};
438            use crate::table::filter::BloomConstructionPolicy;
439
440            match self.index.config.filter_policy.get(0) {
441                Bloom(policy) => policy,
442                None => BloomConstructionPolicy::BitsPerKey(0.0),
443            }
444        });
445
446        if index_partitioning {
447            table_writer = table_writer.use_partitioned_index();
448        }
449        if filter_partitioning {
450            table_writer = table_writer.use_partitioned_filter();
451        }
452
453        table_writer =
454            table_writer.use_prefix_extractor(self.index.config.prefix_extractor.clone());
455        table_writer = table_writer.use_encryption(self.index.config.encryption.clone());
456
457        #[cfg(feature = "zstd")]
458        {
459            table_writer =
460                table_writer.use_zstd_dictionary(self.index.config.zstd_dictionary.clone());
461        }
462
463        #[expect(
464            clippy::expect_used,
465            reason = "cannot create blob tree without defining kv separation options"
466        )]
467        let kv_opts = self
468            .index
469            .config
470            .kv_separation_opts
471            .as_ref()
472            .expect("kv separation options should exist");
473
474        let mut blob_writer = BlobFileWriter::new(
475            self.index.0.blob_file_id_counter.clone(),
476            self.index.config.path.join(BLOBS_FOLDER),
477            self.id(),
478            self.index.config.descriptor_table.clone(),
479            self.index.config.fs.clone(),
480        )?
481        .use_target_size(kv_opts.file_target_size)
482        .use_compression(kv_opts.compression);
483
484        let separation_threshold = kv_opts.separation_threshold;
485
486        // Set range tombstones BEFORE writing KV items so that if MultiWriter
487        // rotates to a new table during the write loop, earlier tables already
488        // carry the RT metadata.
489        table_writer.set_range_tombstones(range_tombstones);
490
491        for item in stream {
492            let item = item?;
493
494            if item.is_tombstone() {
495                // NOTE: Still need to add tombstone to index tree
496                // But no blob to blob writer
497                table_writer.write(InternalValue::new(item.key, UserValue::empty()))?;
498                continue;
499            }
500
501            let value = item.value;
502
503            #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
504            let value_size = value.len() as u32;
505
506            if value_size >= separation_threshold {
507                let vhandle = blob_writer.write(&item.key.user_key, item.key.seqno, &value)?;
508
509                let indirection = BlobIndirection {
510                    vhandle,
511                    size: value_size,
512                };
513
514                table_writer.write({
515                    let mut vptr =
516                        InternalValue::new(item.key.clone(), indirection.encode_into_vec());
517                    vptr.key.value_type = crate::ValueType::Indirection;
518                    vptr
519                })?;
520
521                table_writer.register_blob(indirection);
522            } else {
523                table_writer.write(InternalValue::new(item.key, value))?;
524            }
525        }
526
527        let blob_files = blob_writer.finish()?;
528
529        let result = table_writer.finish()?;
530
531        log::debug!("Flushed memtable(s) in {:?}", start.elapsed());
532
533        let pin_filter = self.index.config.filter_block_pinning_policy.get(0);
534        let pin_index = self.index.config.index_block_pinning_policy.get(0);
535
536        // Load tables
537        let tables = result
538            .into_iter()
539            .map(|(table_id, checksum)| -> crate::Result<Table> {
540                Table::recover(
541                    table_folder.join(table_id.to_string()),
542                    checksum,
543                    0,
544                    self.index.id,
545                    self.index.config.cache.clone(),
546                    self.index.config.descriptor_table.clone(),
547                    pin_filter,
548                    pin_index,
549                    self.index.config.encryption.clone(),
550                    #[cfg(feature = "zstd")]
551                    self.index.config.zstd_dictionary.clone(),
552                    self.index.config.comparator.clone(),
553                    #[cfg(feature = "metrics")]
554                    self.index.metrics.clone(),
555                )
556            })
557            .collect::<crate::Result<Vec<_>>>()?;
558
559        // Return Some even when tables is empty (RT-only flush): the caller
560        // (AbstractTree::flush) handles empty tables by re-inserting RTs into
561        // the active memtable and still needs to delete sealed memtables.
562        Ok(Some((tables, Some(blob_files))))
563    }
564
565    fn register_tables(
566        &self,
567        tables: &[Table],
568        blob_files: Option<&[BlobFile]>,
569        frag_map: Option<FragmentationMap>,
570        sealed_memtables_to_delete: &[MemtableId],
571        gc_watermark: SeqNo,
572    ) -> crate::Result<()> {
573        self.index.register_tables(
574            tables,
575            blob_files,
576            frag_map,
577            sealed_memtables_to_delete,
578            gc_watermark,
579        )
580    }
581
582    fn compact(
583        &self,
584        strategy: Arc<dyn crate::compaction::CompactionStrategy>,
585        seqno_threshold: SeqNo,
586    ) -> crate::Result<crate::compaction::CompactionResult> {
587        self.index.compact(strategy, seqno_threshold)
588    }
589
590    fn get_next_table_id(&self) -> TableId {
591        self.index.get_next_table_id()
592    }
593
594    fn tree_config(&self) -> &Config {
595        &self.index.config
596    }
597
598    fn get_highest_seqno(&self) -> Option<SeqNo> {
599        self.index.get_highest_seqno()
600    }
601
602    fn active_memtable(&self) -> Arc<Memtable> {
603        self.index.active_memtable()
604    }
605
606    fn tree_type(&self) -> crate::TreeType {
607        crate::TreeType::Blob
608    }
609
610    fn rotate_memtable(&self) -> Option<Arc<Memtable>> {
611        self.index.rotate_memtable()
612    }
613
614    fn table_count(&self) -> usize {
615        self.index.table_count()
616    }
617
618    fn level_table_count(&self, idx: usize) -> Option<usize> {
619        self.index.level_table_count(idx)
620    }
621
622    fn approximate_len(&self) -> usize {
623        self.index.approximate_len()
624    }
625
626    // NOTE: Override the default implementation to not fetch
627    // data from the value log, so we get much faster key reads
628    fn is_empty(&self, seqno: SeqNo, index: Option<(Arc<Memtable>, SeqNo)>) -> crate::Result<bool> {
629        self.index.is_empty(seqno, index)
630    }
631
632    // NOTE: Override the default implementation to not fetch
633    // data from the value log, so we get much faster key reads
634    fn contains_key<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<bool> {
635        self.index.contains_key(key, seqno)
636    }
637
638    // NOTE: Override the default implementation to delegate directly
639    // to the index tree, avoiding extra iterator/guard overhead for
640    // prefix checks
641    fn contains_prefix<K: AsRef<[u8]>>(
642        &self,
643        prefix: K,
644        seqno: SeqNo,
645        index: Option<(Arc<Memtable>, SeqNo)>,
646    ) -> crate::Result<bool> {
647        self.index.contains_prefix(prefix, seqno, index)
648    }
649
650    // NOTE: Override the default implementation to not fetch
651    // data from the value log, so we get much faster scans
652    fn len(&self, seqno: SeqNo, index: Option<(Arc<Memtable>, SeqNo)>) -> crate::Result<usize> {
653        self.index.len(seqno, index)
654    }
655
656    fn disk_space(&self) -> u64 {
657        let version = self.current_version();
658        self.index.disk_space() + version.blob_files.on_disk_size()
659    }
660
661    fn get_highest_memtable_seqno(&self) -> Option<SeqNo> {
662        self.index.get_highest_memtable_seqno()
663    }
664
665    fn get_highest_persisted_seqno(&self) -> Option<SeqNo> {
666        self.index.get_highest_persisted_seqno()
667    }
668
669    fn insert<K: Into<UserKey>, V: Into<UserValue>>(
670        &self,
671        key: K,
672        value: V,
673        seqno: SeqNo,
674    ) -> (u64, u64) {
675        self.index.insert(key, value.into(), seqno)
676    }
677
678    fn get<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<crate::UserValue>> {
679        let super_version = self.index.get_version_for_snapshot(seqno);
680        self.resolve_key(&super_version, key.as_ref(), seqno)
681    }
682
683    fn multi_get<K: AsRef<[u8]>>(
684        &self,
685        keys: impl IntoIterator<Item = K>,
686        seqno: SeqNo,
687    ) -> crate::Result<Vec<Option<crate::UserValue>>> {
688        let super_version = self.index.get_version_for_snapshot(seqno);
689
690        keys.into_iter()
691            .map(|key| self.resolve_key(&super_version, key.as_ref(), seqno))
692            .collect()
693    }
694
695    fn merge<K: Into<UserKey>, V: Into<UserValue>>(
696        &self,
697        key: K,
698        operand: V,
699        seqno: SeqNo,
700    ) -> (u64, u64) {
701        self.index.merge(key, operand, seqno)
702    }
703
704    fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
705        self.index.remove(key, seqno)
706    }
707
708    fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
709        self.index.remove_weak(key, seqno)
710    }
711
712    fn remove_range<K: Into<UserKey>>(&self, start: K, end: K, seqno: SeqNo) -> u64 {
713        self.index.remove_range(start, end, seqno)
714    }
715}