lsm_tree/blob_tree/
mod.rs

1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5mod gc;
6pub mod handle;
7
8#[doc(hidden)]
9pub use gc::{FragmentationEntry, FragmentationMap};
10
11use crate::{
12    coding::Decode,
13    file::{fsync_directory, BLOBS_FOLDER},
14    iter_guard::{IterGuard, IterGuardImpl},
15    r#abstract::{AbstractTree, RangeItem},
16    table::Table,
17    tree::inner::MemtableId,
18    value::InternalValue,
19    version::Version,
20    vlog::{Accessor, BlobFile, BlobFileWriter, ValueHandle},
21    Cache, Config, DescriptorTable, Memtable, SeqNo, SequenceNumberCounter, TableId, TreeId,
22    UserKey, UserValue,
23};
24use handle::BlobIndirection;
25use std::{
26    io::Cursor,
27    ops::RangeBounds,
28    path::PathBuf,
29    sync::{Arc, MutexGuard},
30};
31
32/// Iterator value guard
33pub struct Guard {
34    tree: crate::BlobTree,
35    version: Version,
36    kv: crate::Result<InternalValue>,
37}
38
39impl IterGuard for Guard {
40    fn into_inner_if(
41        self,
42        pred: impl Fn(&UserKey) -> bool,
43    ) -> crate::Result<(UserKey, Option<UserValue>)> {
44        let kv = self.kv?;
45
46        if pred(&kv.key.user_key) {
47            resolve_value_handle(
48                self.tree.id(),
49                self.tree.blobs_folder.as_path(),
50                &self.tree.index.config.cache,
51                &self.tree.index.config.descriptor_table,
52                &self.version,
53                kv,
54            )
55            .map(|(k, v)| (k, Some(v)))
56        } else {
57            Ok((kv.key.user_key, None))
58        }
59    }
60
61    fn key(self) -> crate::Result<UserKey> {
62        self.kv.map(|kv| kv.key.user_key)
63    }
64
65    fn size(self) -> crate::Result<u32> {
66        let kv = self.kv?;
67
68        if kv.key.value_type.is_indirection() {
69            let mut cursor = Cursor::new(kv.value);
70            Ok(BlobIndirection::decode_from(&mut cursor)?.size)
71        } else {
72            #[expect(clippy::cast_possible_truncation, reason = "values are u32 max length")]
73            Ok(kv.value.len() as u32)
74        }
75    }
76
77    fn into_inner(self) -> crate::Result<(UserKey, UserValue)> {
78        resolve_value_handle(
79            self.tree.id(),
80            self.tree.blobs_folder.as_path(),
81            &self.tree.index.config.cache,
82            &self.tree.index.config.descriptor_table,
83            &self.version,
84            self.kv?,
85        )
86    }
87}
88
89fn resolve_value_handle(
90    tree_id: TreeId,
91    blobs_folder: &std::path::Path,
92    cache: &Arc<Cache>,
93    descriptor_table: &Arc<DescriptorTable>,
94    version: &Version,
95    item: InternalValue,
96) -> RangeItem {
97    if item.key.value_type.is_indirection() {
98        let mut cursor = Cursor::new(item.value);
99        let vptr = BlobIndirection::decode_from(&mut cursor)?;
100
101        // Resolve indirection using value log
102        match Accessor::new(&version.blob_files).get(
103            tree_id,
104            blobs_folder,
105            &item.key.user_key,
106            &vptr.vhandle,
107            cache,
108            descriptor_table,
109        ) {
110            Ok(Some(v)) => {
111                let k = item.key.user_key;
112                Ok((k, v))
113            }
114            Ok(None) => {
115                panic!(
116                    "value handle ({:?} => {:?}) did not match any blob - this is a bug; version={}",
117                    item.key.user_key, vptr.vhandle,
118                    version.id(),
119                );
120            }
121            Err(e) => Err(e),
122        }
123    } else {
124        let k = item.key.user_key;
125        let v = item.value;
126        Ok((k, v))
127    }
128}
129
/// A key-value-separated log-structured merge tree
///
/// This tree is a composite structure, consisting of an
/// index tree (LSM-tree) and a log-structured value log
/// to reduce write amplification.
#[derive(Clone)]
pub struct BlobTree {
    /// Index tree that holds value handles or small inline values
    #[doc(hidden)]
    pub index: crate::Tree,

    /// Folder in which the tree's blob (value log) files are stored
    blobs_folder: Arc<PathBuf>,
}
143
144impl BlobTree {
145    pub(crate) fn open(config: Config) -> crate::Result<Self> {
146        let index = crate::Tree::open(config)?;
147
148        let blobs_folder = index.config.path.join(BLOBS_FOLDER);
149        std::fs::create_dir_all(&blobs_folder)?;
150        fsync_directory(&blobs_folder)?;
151
152        let blob_file_id_to_continue_with = index
153            .current_version()
154            .blob_files
155            .list_ids()
156            .max()
157            .map(|x| x + 1)
158            .unwrap_or_default();
159
160        index
161            .0
162            .blob_file_id_counter
163            .set(blob_file_id_to_continue_with);
164
165        Ok(Self {
166            index,
167            blobs_folder: Arc::new(blobs_folder),
168        })
169    }
170}
171
// Most trait methods delegate directly to the inner `index` LSM-tree.
// Only value-touching operations (point reads, scans, flush, ingest) are
// blob-aware: values at or above the separation threshold are written to
// blob files, and the index tree stores a `BlobIndirection` in their place.
impl AbstractTree for BlobTree {
    fn table_file_cache_size(&self) -> usize {
        self.index.table_file_cache_size()
    }

    fn get_version_history_lock(
        &self,
    ) -> std::sync::RwLockWriteGuard<'_, crate::version::SuperVersions> {
        self.index.get_version_history_lock()
    }

    fn next_table_id(&self) -> TableId {
        self.index.next_table_id()
    }

    fn id(&self) -> crate::TreeId {
        self.index.id()
    }

    fn get_internal_entry(&self, key: &[u8], seqno: SeqNo) -> crate::Result<Option<InternalValue>> {
        self.index.get_internal_entry(key, seqno)
    }

    fn current_version(&self) -> Version {
        self.index.current_version()
    }

    #[cfg(feature = "metrics")]
    fn metrics(&self) -> &Arc<crate::Metrics> {
        self.index.metrics()
    }

    fn version_free_list_len(&self) -> usize {
        self.index.version_free_list_len()
    }

    fn prefix<K: AsRef<[u8]>>(
        &self,
        prefix: K,
        seqno: SeqNo,
        index: Option<(Arc<Memtable>, SeqNo)>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
        use crate::range::prefix_to_range;

        // Pin the version for this snapshot; each Guard keeps a clone of it
        // so value handles can be resolved against a consistent set of blob files
        let super_version = self.index.get_version_for_snapshot(seqno);
        let tree = self.clone();

        let range = prefix_to_range(prefix.as_ref());

        // Wrap every raw entry in a Guard, which resolves the value through
        // the value log lazily (only when the caller asks for it)
        Box::new(
            crate::Tree::create_internal_range(super_version.clone(), &range, seqno, index).map(
                move |kv| {
                    IterGuardImpl::Blob(Guard {
                        tree: tree.clone(),
                        version: super_version.version.clone(),
                        kv,
                    })
                },
            ),
        )
    }

    fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
        &self,
        range: R,
        seqno: SeqNo,
        index: Option<(Arc<Memtable>, SeqNo)>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
        // Same scheme as `prefix`: pin the version, defer value resolution to Guard
        let super_version = self.index.get_version_for_snapshot(seqno);
        let tree = self.clone();

        Box::new(
            crate::Tree::create_internal_range(super_version.clone(), &range, seqno, index).map(
                move |kv| {
                    IterGuardImpl::Blob(Guard {
                        tree: tree.clone(),
                        version: super_version.version.clone(),
                        kv,
                    })
                },
            ),
        )
    }

    fn tombstone_count(&self) -> u64 {
        self.index.tombstone_count()
    }

    fn weak_tombstone_count(&self) -> u64 {
        self.index.weak_tombstone_count()
    }

    fn weak_tombstone_reclaimable_count(&self) -> u64 {
        self.index.weak_tombstone_reclaimable_count()
    }

    fn drop_range<K: AsRef<[u8]>, R: RangeBounds<K>>(&self, range: R) -> crate::Result<()> {
        self.index.drop_range(range)
    }

    /// Bulk-loads pre-sorted `(key, value)` pairs, performing key-value
    /// separation on the fly: large values go to blob files, small values
    /// are stored inline in the newly written tables.
    fn ingest(
        &self,
        iter: impl Iterator<Item = (UserKey, UserValue)>,
        seqno_generator: &SequenceNumberCounter,
        visible_seqno: &SequenceNumberCounter,
    ) -> crate::Result<()> {
        use crate::tree::ingest::Ingestion;
        use std::time::Instant;

        // All ingested items share one sequence number
        let seqno = seqno_generator.next();

        let blob_file_size = self
            .index
            .config
            .kv_separation_opts
            .as_ref()
            .expect("kv separation options should exist")
            .file_target_size;

        let mut table_writer = Ingestion::new(&self.index)?.with_seqno(seqno);
        let mut blob_writer = BlobFileWriter::new(
            self.index.0.blob_file_id_counter.clone(),
            blob_file_size,
            self.index.config.path.join(BLOBS_FOLDER),
        )?
        .use_compression(
            self.index
                .config
                .kv_separation_opts
                .as_ref()
                .expect("blob options should exist")
                .compression,
        );

        let start = Instant::now();
        let mut count = 0;
        let mut last_key = None;

        let separation_threshold = self
            .index
            .config
            .kv_separation_opts
            .as_ref()
            .expect("kv separation options should exist")
            .separation_threshold;

        for (key, value) in iter {
            // Input must be strictly sorted; duplicate or out-of-order keys
            // would corrupt the written tables
            if let Some(last_key) = &last_key {
                assert!(
                    key > last_key,
                    "next key in bulk ingest was not greater than last key",
                );
            }
            last_key = Some(key.clone());

            #[expect(clippy::cast_possible_truncation, reason = "values are 32-bit max")]
            let value_size = value.len() as u32;

            if value_size >= separation_threshold {
                // Large value: write it to the blob file and store only an
                // indirection (handle + size) in the index tree.
                // NOTE: offset and file ID must be captured BEFORE the write
                let offset = blob_writer.offset();
                let blob_file_id = blob_writer.blob_file_id();
                let on_disk_size = blob_writer.write(&key, seqno, &value)?;

                let indirection = BlobIndirection {
                    vhandle: ValueHandle {
                        blob_file_id,
                        offset,
                        on_disk_size,
                    },
                    size: value_size,
                };

                table_writer.write_indirection(key, indirection)?;
            } else {
                // Small value: store inline in the index tree
                table_writer.write(key, value)?;
            }

            count += 1;
        }

        let blob_files = blob_writer.finish()?;
        let results = table_writer.writer.finish()?;

        // Re-open the freshly written tables so they can be registered
        let created_tables = results
            .into_iter()
            .map(|(table_id, checksum)| -> crate::Result<Table> {
                Table::recover(
                    self.index
                        .config
                        .path
                        .join(crate::file::TABLES_FOLDER)
                        .join(table_id.to_string()),
                    checksum,
                    self.index.id,
                    self.index.config.cache.clone(),
                    self.index.config.descriptor_table.clone(),
                    false,
                    false,
                    #[cfg(feature = "metrics")]
                    self.index.metrics.clone(),
                )
            })
            .collect::<crate::Result<Vec<_>>>()?;

        self.register_tables(&created_tables, Some(&blob_files), None, &[], 0)?;

        // Make the ingested items visible to readers
        visible_seqno.fetch_max(seqno + 1);

        log::info!("Ingested {count} items in {:?}", start.elapsed());

        Ok(())
    }

    fn major_compact(&self, target_size: u64, seqno_threshold: SeqNo) -> crate::Result<()> {
        self.index.major_compact(target_size, seqno_threshold)
    }

    fn clear_active_memtable(&self) {
        self.index.clear_active_memtable();
    }

    fn l0_run_count(&self) -> usize {
        self.index.l0_run_count()
    }

    fn blob_file_count(&self) -> usize {
        self.current_version().blob_file_count()
    }

    // NOTE: We skip reading from the value log
    // because the vHandles already store the value size
    fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<u32>> {
        let Some(item) = self.index.get_internal_entry(key.as_ref(), seqno)? else {
            return Ok(None);
        };

        Ok(Some(if item.key.value_type.is_indirection() {
            // Size is embedded in the serialized indirection
            let mut cursor = Cursor::new(item.value);
            let vptr = BlobIndirection::decode_from(&mut cursor)?;
            vptr.size
        } else {
            #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
            {
                item.value.len() as u32
            }
        }))
    }

    fn stale_blob_bytes(&self) -> u64 {
        self.current_version().gc_stats().stale_bytes()
    }

    fn filter_size(&self) -> usize {
        self.index.filter_size()
    }

    fn pinned_filter_size(&self) -> usize {
        self.index.pinned_filter_size()
    }

    fn pinned_block_index_size(&self) -> usize {
        self.index.pinned_block_index_size()
    }

    fn sealed_memtable_count(&self) -> usize {
        self.index.sealed_memtable_count()
    }

    fn get_flush_lock(&self) -> MutexGuard<'_, ()> {
        self.index.get_flush_lock()
    }

    /// Flushes a memtable stream to new tables, performing key-value
    /// separation: large values are written to blob files, and indirection
    /// entries are written to the tables instead.
    fn flush_to_tables(
        &self,
        stream: impl Iterator<Item = crate::Result<InternalValue>>,
    ) -> crate::Result<Option<(Vec<Table>, Option<Vec<BlobFile>>)>> {
        use crate::{coding::Encode, file::TABLES_FOLDER, table::multi_writer::MultiWriter};

        let start = std::time::Instant::now();

        let table_folder = self.index.config.path.join(TABLES_FOLDER);

        // Flushes always produce tables for level 0, so all policies are
        // queried with index 0
        let data_block_size = self.index.config.data_block_size_policy.get(0);

        let data_block_restart_interval =
            self.index.config.data_block_restart_interval_policy.get(0);
        let index_block_restart_interval =
            self.index.config.index_block_restart_interval_policy.get(0);

        let data_block_compression = self.index.config.data_block_compression_policy.get(0);
        let index_block_compression = self.index.config.index_block_compression_policy.get(0);

        let data_block_hash_ratio = self.index.config.data_block_hash_ratio_policy.get(0);

        let index_partitioning = self.index.config.index_block_partitioning_policy.get(0);
        let filter_partitioning = self.index.config.filter_block_partitioning_policy.get(0);

        log::debug!("Flushing memtable(s) and performing key-value separation, data_block_restart_interval={data_block_restart_interval}, index_block_restart_interval={index_block_restart_interval}, data_block_size={data_block_size}, data_block_compression={data_block_compression}, index_block_compression={index_block_compression}");
        log::debug!("=> to table(s) in {}", table_folder.display());
        log::debug!("=> to blob file(s) at {}", self.blobs_folder.display());

        let mut table_writer = MultiWriter::new(
            table_folder.clone(),
            self.index.table_id_counter.clone(),
            64 * 1_024 * 1_024,
            0,
        )?
        .use_data_block_restart_interval(data_block_restart_interval)
        .use_index_block_restart_interval(index_block_restart_interval)
        .use_data_block_compression(data_block_compression)
        .use_index_block_compression(index_block_compression)
        .use_data_block_size(data_block_size)
        .use_data_block_hash_ratio(data_block_hash_ratio)
        .use_bloom_policy({
            use crate::config::FilterPolicyEntry::{Bloom, None};
            use crate::table::filter::BloomConstructionPolicy;

            match self.index.config.filter_policy.get(0) {
                Bloom(policy) => policy,
                // NOTE: 0.0 bits per key effectively disables the filter
                None => BloomConstructionPolicy::BitsPerKey(0.0),
            }
        });

        if index_partitioning {
            table_writer = table_writer.use_partitioned_index();
        }
        if filter_partitioning {
            table_writer = table_writer.use_partitioned_filter();
        }

        let kv_opts = self
            .index
            .config
            .kv_separation_opts
            .as_ref()
            .expect("kv separation options should exist");

        let mut blob_writer = BlobFileWriter::new(
            self.index.0.blob_file_id_counter.clone(),
            kv_opts.file_target_size,
            self.index.config.path.join(BLOBS_FOLDER),
        )?
        .use_compression(
            self.index
                .config
                .kv_separation_opts
                .as_ref()
                .expect("blob options should exist")
                .compression,
        );

        let separation_threshold = kv_opts.separation_threshold;

        for item in stream {
            let item = item?;

            if item.is_tombstone() {
                // NOTE: Still need to add tombstone to index tree
                // But no blob to blob writer
                table_writer.write(InternalValue::new(item.key, UserValue::empty()))?;
                continue;
            }

            let value = item.value;

            #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
            let value_size = value.len() as u32;

            if value_size >= separation_threshold {
                // Large value: write to blob file, keep only an indirection.
                // NOTE: offset and file ID must be captured BEFORE the write
                let offset = blob_writer.offset();
                let blob_file_id = blob_writer.blob_file_id();
                let on_disk_size = blob_writer.write(&item.key.user_key, item.key.seqno, &value)?;

                let indirection = BlobIndirection {
                    vhandle: ValueHandle {
                        blob_file_id,
                        offset,
                        on_disk_size,
                    },
                    size: value_size,
                };

                // Write the serialized indirection under the original key,
                // marked with the Indirection value type
                table_writer.write({
                    let mut vptr =
                        InternalValue::new(item.key.clone(), indirection.encode_into_vec());
                    vptr.key.value_type = crate::ValueType::Indirection;
                    vptr
                })?;

                table_writer.register_blob(indirection);
            } else {
                // Small value: store inline
                table_writer.write(InternalValue::new(item.key, value))?;
            }
        }

        let blob_files = blob_writer.finish()?;

        let result = table_writer.finish()?;

        log::debug!("Flushed memtable(s) in {:?}", start.elapsed());

        let pin_filter = self.index.config.filter_block_pinning_policy.get(0);
        let pin_index = self.index.config.index_block_pinning_policy.get(0);

        // Load tables
        let tables = result
            .into_iter()
            .map(|(table_id, checksum)| -> crate::Result<Table> {
                Table::recover(
                    table_folder.join(table_id.to_string()),
                    checksum,
                    self.index.id,
                    self.index.config.cache.clone(),
                    self.index.config.descriptor_table.clone(),
                    pin_filter,
                    pin_index,
                    #[cfg(feature = "metrics")]
                    self.index.metrics.clone(),
                )
            })
            .collect::<crate::Result<Vec<_>>>()?;

        Ok(Some((tables, Some(blob_files))))
    }

    fn register_tables(
        &self,
        tables: &[Table],
        blob_files: Option<&[BlobFile]>,
        frag_map: Option<FragmentationMap>,
        sealed_memtables_to_delete: &[MemtableId],
        gc_watermark: SeqNo,
    ) -> crate::Result<()> {
        self.index.register_tables(
            tables,
            blob_files,
            frag_map,
            sealed_memtables_to_delete,
            gc_watermark,
        )
    }

    fn set_active_memtable(&self, memtable: Memtable) {
        self.index.set_active_memtable(memtable);
    }

    fn add_sealed_memtable(&self, memtable: Arc<Memtable>) {
        self.index.add_sealed_memtable(memtable);
    }

    fn compact(
        &self,
        strategy: Arc<dyn crate::compaction::CompactionStrategy>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        self.index.compact(strategy, seqno_threshold)
    }

    fn get_next_table_id(&self) -> TableId {
        self.index.get_next_table_id()
    }

    fn tree_config(&self) -> &Config {
        &self.index.config
    }

    fn get_highest_seqno(&self) -> Option<SeqNo> {
        self.index.get_highest_seqno()
    }

    fn active_memtable_size(&self) -> u64 {
        self.index.active_memtable_size()
    }

    fn tree_type(&self) -> crate::TreeType {
        crate::TreeType::Blob
    }

    fn rotate_memtable(&self) -> Option<Arc<Memtable>> {
        self.index.rotate_memtable()
    }

    fn table_count(&self) -> usize {
        self.index.table_count()
    }

    fn level_table_count(&self, idx: usize) -> Option<usize> {
        self.index.level_table_count(idx)
    }

    fn approximate_len(&self) -> usize {
        self.index.approximate_len()
    }

    // NOTE: Override the default implementation to not fetch
    // data from the value log, so we get much faster key reads
    fn is_empty(&self, seqno: SeqNo, index: Option<(Arc<Memtable>, SeqNo)>) -> crate::Result<bool> {
        self.index.is_empty(seqno, index)
    }

    // NOTE: Override the default implementation to not fetch
    // data from the value log, so we get much faster key reads
    fn contains_key<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<bool> {
        self.index.contains_key(key, seqno)
    }

    // NOTE: Override the default implementation to not fetch
    // data from the value log, so we get much faster scans
    fn len(&self, seqno: SeqNo, index: Option<(Arc<Memtable>, SeqNo)>) -> crate::Result<usize> {
        self.index.len(seqno, index)
    }

    fn disk_space(&self) -> u64 {
        // Index tables plus the blob files of the current version
        let version = self.current_version();
        self.index.disk_space() + version.blob_files.on_disk_size()
    }

    fn get_highest_memtable_seqno(&self) -> Option<SeqNo> {
        self.index.get_highest_memtable_seqno()
    }

    fn get_highest_persisted_seqno(&self) -> Option<SeqNo> {
        self.index.get_highest_persisted_seqno()
    }

    fn insert<K: Into<UserKey>, V: Into<UserValue>>(
        &self,
        key: K,
        value: V,
        seqno: SeqNo,
    ) -> (u64, u64) {
        // NOTE(review): key-value separation happens at flush time, not here —
        // inserts always go through the index tree's memtable
        self.index.insert(key, value.into(), seqno)
    }

    /// Point read: fetches the entry from the index tree, then resolves a
    /// potential blob indirection against the same pinned version.
    fn get<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<crate::UserValue>> {
        let key = key.as_ref();

        let super_version = self
            .index
            .version_history
            .read()
            .expect("lock is poisoned")
            .get_version_for_snapshot(seqno);

        let Some(item) = crate::Tree::get_internal_entry_from_version(&super_version, key, seqno)?
        else {
            return Ok(None);
        };

        let (_, v) = resolve_value_handle(
            self.id(),
            self.blobs_folder.as_path(),
            &self.index.config.cache,
            &self.index.config.descriptor_table,
            &super_version.version,
            item,
        )?;

        Ok(Some(v))
    }

    fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        self.index.remove(key, seqno)
    }

    fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        self.index.remove_weak(key, seqno)
    }
}