lsm_tree/blob_tree/
mod.rs

1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5mod gc;
6pub mod handle;
7
8#[doc(hidden)]
9pub use gc::{FragmentationEntry, FragmentationMap};
10
11use crate::{
12    coding::Decode,
13    file::{fsync_directory, BLOBS_FOLDER},
14    iter_guard::{IterGuard, IterGuardImpl},
15    r#abstract::{AbstractTree, RangeItem},
16    table::Table,
17    tree::inner::MemtableId,
18    value::InternalValue,
19    version::Version,
20    vlog::{Accessor, BlobFile, BlobFileWriter, ValueHandle},
21    Cache, Config, DescriptorTable, Memtable, SeqNo, SequenceNumberCounter, TableId, TreeId,
22    UserKey, UserValue,
23};
24use handle::BlobIndirection;
25use std::{
26    io::Cursor,
27    ops::RangeBounds,
28    path::PathBuf,
29    sync::{Arc, MutexGuard},
30};
31
/// Lazily-resolved iterator item for a [`BlobTree`] range/prefix scan.
///
/// Holds the raw index entry (`kv`) plus everything needed to resolve a
/// possible blob indirection on demand: the owning tree and the pinned
/// [`Version`] the entry was read from, so the referenced blob file cannot
/// be dropped while the guard is alive.
pub struct Guard {
    // Owning tree; provides tree ID, blob folder, cache and descriptor table.
    tree: crate::BlobTree,
    // Version snapshot the entry was read from (pins its blob files).
    version: Version,
    // Raw index entry; may be an inline value or an encoded BlobIndirection.
    kv: crate::Result<InternalValue>,
}
37
38impl IterGuard for Guard {
39    fn into_inner_if(
40        self,
41        pred: impl Fn(&UserKey) -> bool,
42    ) -> crate::Result<Option<crate::KvPair>> {
43        let kv = self.kv?;
44
45        if pred(&kv.key.user_key) {
46            resolve_value_handle(
47                self.tree.id(),
48                self.tree.blobs_folder.as_path(),
49                &self.tree.index.config.cache,
50                &self.tree.index.config.descriptor_table,
51                &self.version,
52                kv,
53            )
54            .map(Some)
55        } else {
56            Ok(None)
57        }
58    }
59
60    fn key(self) -> crate::Result<UserKey> {
61        self.kv.map(|kv| kv.key.user_key)
62    }
63
64    fn size(self) -> crate::Result<u32> {
65        let kv = self.kv?;
66
67        if kv.key.value_type.is_indirection() {
68            let mut cursor = Cursor::new(kv.value);
69            Ok(BlobIndirection::decode_from(&mut cursor)?.size)
70        } else {
71            #[expect(clippy::cast_possible_truncation, reason = "values are u32 max length")]
72            Ok(kv.value.len() as u32)
73        }
74    }
75
76    fn into_inner(self) -> crate::Result<(UserKey, UserValue)> {
77        resolve_value_handle(
78            self.tree.id(),
79            self.tree.blobs_folder.as_path(),
80            &self.tree.index.config.cache,
81            &self.tree.index.config.descriptor_table,
82            &self.version,
83            self.kv?,
84        )
85    }
86}
87
88fn resolve_value_handle(
89    tree_id: TreeId,
90    blobs_folder: &std::path::Path,
91    cache: &Arc<Cache>,
92    descriptor_table: &Arc<DescriptorTable>,
93    version: &Version,
94    item: InternalValue,
95) -> RangeItem {
96    if item.key.value_type.is_indirection() {
97        let mut cursor = Cursor::new(item.value);
98        let vptr = BlobIndirection::decode_from(&mut cursor)?;
99
100        // Resolve indirection using value log
101        match Accessor::new(&version.blob_files).get(
102            tree_id,
103            blobs_folder,
104            &item.key.user_key,
105            &vptr.vhandle,
106            cache,
107            descriptor_table,
108        ) {
109            Ok(Some(v)) => {
110                let k = item.key.user_key;
111                Ok((k, v))
112            }
113            Ok(None) => {
114                panic!(
115                    "value handle ({:?} => {:?}) did not match any blob - this is a bug; version={}",
116                    item.key.user_key, vptr.vhandle,
117                    version.id(),
118                );
119            }
120            Err(e) => Err(e),
121        }
122    } else {
123        let k = item.key.user_key;
124        let v = item.value;
125        Ok((k, v))
126    }
127}
128
129/// A key-value-separated log-structured merge tree
130///
131/// This tree is a composite structure, consisting of an
132/// index tree (LSM-tree) and a log-structured value log
133/// to reduce write amplification.
#[derive(Clone)]
pub struct BlobTree {
    /// Index tree that holds value handles or small inline values
    #[doc(hidden)]
    pub index: crate::Tree,

    // Folder in which blob files are stored; created and fsynced in `open`.
    // Shared via Arc so cloning the tree stays cheap.
    blobs_folder: Arc<PathBuf>,
}
142
143impl BlobTree {
144    pub(crate) fn open(config: Config) -> crate::Result<Self> {
145        let index = crate::Tree::open(config)?;
146
147        let blobs_folder = index.config.path.join(BLOBS_FOLDER);
148        std::fs::create_dir_all(&blobs_folder)?;
149        fsync_directory(&blobs_folder)?;
150
151        let blob_file_id_to_continue_with = index
152            .current_version()
153            .blob_files
154            .list_ids()
155            .max()
156            .map(|x| x + 1)
157            .unwrap_or_default();
158
159        index
160            .0
161            .blob_file_id_counter
162            .set(blob_file_id_to_continue_with);
163
164        Ok(Self {
165            index,
166            blobs_folder: Arc::new(blobs_folder),
167        })
168    }
169}
170
171impl AbstractTree for BlobTree {
172    fn get_version_history_lock(
173        &self,
174    ) -> std::sync::RwLockWriteGuard<'_, crate::version::SuperVersions> {
175        self.index.get_version_history_lock()
176    }
177
178    fn next_table_id(&self) -> TableId {
179        self.index.next_table_id()
180    }
181
182    fn id(&self) -> crate::TreeId {
183        self.index.id()
184    }
185
186    fn get_internal_entry(&self, key: &[u8], seqno: SeqNo) -> crate::Result<Option<InternalValue>> {
187        self.index.get_internal_entry(key, seqno)
188    }
189
190    fn current_version(&self) -> Version {
191        self.index.current_version()
192    }
193
194    #[cfg(feature = "metrics")]
195    fn metrics(&self) -> &Arc<crate::Metrics> {
196        self.index.metrics()
197    }
198
199    fn version_free_list_len(&self) -> usize {
200        self.index.version_free_list_len()
201    }
202
203    fn prefix<K: AsRef<[u8]>>(
204        &self,
205        prefix: K,
206        seqno: SeqNo,
207        index: Option<Arc<Memtable>>,
208    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
209        use crate::range::prefix_to_range;
210
211        let range = prefix_to_range(prefix.as_ref());
212        let version = self.index.get_version_for_snapshot(seqno).version;
213        let tree = self.clone();
214
215        Box::new(
216            self.index
217                .create_internal_range(&range, seqno, index)
218                .map(move |kv| {
219                    IterGuardImpl::Blob(Guard {
220                        tree: tree.clone(),
221                        version: version.clone(),
222                        kv,
223                    })
224                }),
225        )
226    }
227
228    fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
229        &self,
230        range: R,
231        seqno: SeqNo,
232        index: Option<Arc<Memtable>>,
233    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
234        let version = self.index.get_version_for_snapshot(seqno);
235        let tree = self.clone();
236
237        Box::new(
238            self.index
239                .create_internal_range(&range, seqno, index)
240                .map(move |kv| {
241                    IterGuardImpl::Blob(Guard {
242                        tree: tree.clone(),
243                        version: version.clone().version,
244                        kv,
245                    })
246                }),
247        )
248    }
249
250    fn tombstone_count(&self) -> u64 {
251        self.index.tombstone_count()
252    }
253
254    fn weak_tombstone_count(&self) -> u64 {
255        self.index.weak_tombstone_count()
256    }
257
258    fn weak_tombstone_reclaimable_count(&self) -> u64 {
259        self.index.weak_tombstone_reclaimable_count()
260    }
261
262    fn drop_range<K: AsRef<[u8]>, R: RangeBounds<K>>(&self, range: R) -> crate::Result<()> {
263        self.index.drop_range(range)
264    }
265
266    fn ingest(
267        &self,
268        iter: impl Iterator<Item = (UserKey, UserValue)>,
269        seqno_generator: &SequenceNumberCounter,
270        visible_seqno: &SequenceNumberCounter,
271    ) -> crate::Result<()> {
272        use crate::tree::ingest::Ingestion;
273        use std::time::Instant;
274
275        let seqno = seqno_generator.next();
276
277        let blob_file_size = self
278            .index
279            .config
280            .kv_separation_opts
281            .as_ref()
282            .expect("kv separation options should exist")
283            .file_target_size;
284
285        let mut table_writer = Ingestion::new(&self.index)?.with_seqno(seqno);
286        let mut blob_writer = BlobFileWriter::new(
287            self.index.0.blob_file_id_counter.clone(),
288            blob_file_size,
289            self.index.config.path.join(BLOBS_FOLDER),
290        )?
291        .use_compression(
292            self.index
293                .config
294                .kv_separation_opts
295                .as_ref()
296                .expect("blob options should exist")
297                .compression,
298        );
299
300        let start = Instant::now();
301        let mut count = 0;
302        let mut last_key = None;
303
304        let separation_threshold = self
305            .index
306            .config
307            .kv_separation_opts
308            .as_ref()
309            .expect("kv separation options should exist")
310            .separation_threshold;
311
312        for (key, value) in iter {
313            if let Some(last_key) = &last_key {
314                assert!(
315                    key > last_key,
316                    "next key in bulk ingest was not greater than last key",
317                );
318            }
319            last_key = Some(key.clone());
320
321            #[expect(clippy::cast_possible_truncation, reason = "values are 32-bit max")]
322            let value_size = value.len() as u32;
323
324            if value_size >= separation_threshold {
325                let offset = blob_writer.offset();
326                let blob_file_id = blob_writer.blob_file_id();
327                let on_disk_size = blob_writer.write(&key, seqno, &value)?;
328
329                let indirection = BlobIndirection {
330                    vhandle: ValueHandle {
331                        blob_file_id,
332                        offset,
333                        on_disk_size,
334                    },
335                    size: value_size,
336                };
337
338                table_writer.write_indirection(key, indirection)?;
339            } else {
340                table_writer.write(key, value)?;
341            }
342
343            count += 1;
344        }
345
346        let blob_files = blob_writer.finish()?;
347        let results = table_writer.writer.finish()?;
348
349        let created_tables = results
350            .into_iter()
351            .map(|(table_id, checksum)| -> crate::Result<Table> {
352                Table::recover(
353                    self.index
354                        .config
355                        .path
356                        .join(crate::file::TABLES_FOLDER)
357                        .join(table_id.to_string()),
358                    checksum,
359                    self.index.id,
360                    self.index.config.cache.clone(),
361                    self.index.config.descriptor_table.clone(),
362                    false,
363                    false,
364                    #[cfg(feature = "metrics")]
365                    self.index.metrics.clone(),
366                )
367            })
368            .collect::<crate::Result<Vec<_>>>()?;
369
370        self.register_tables(&created_tables, Some(&blob_files), None, &[], 0)?;
371
372        visible_seqno.fetch_max(seqno + 1);
373
374        log::info!("Ingested {count} items in {:?}", start.elapsed());
375
376        Ok(())
377    }
378
379    fn major_compact(&self, target_size: u64, seqno_threshold: SeqNo) -> crate::Result<()> {
380        self.index.major_compact(target_size, seqno_threshold)
381    }
382
383    fn clear_active_memtable(&self) {
384        self.index.clear_active_memtable();
385    }
386
387    fn l0_run_count(&self) -> usize {
388        self.index.l0_run_count()
389    }
390
391    fn blob_file_count(&self) -> usize {
392        self.current_version().blob_file_count()
393    }
394
395    // NOTE: We skip reading from the value log
396    // because the vHandles already store the value size
397    fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<u32>> {
398        let Some(item) = self.index.get_internal_entry(key.as_ref(), seqno)? else {
399            return Ok(None);
400        };
401
402        Ok(Some(if item.key.value_type.is_indirection() {
403            let mut cursor = Cursor::new(item.value);
404            let vptr = BlobIndirection::decode_from(&mut cursor)?;
405            vptr.size
406        } else {
407            #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
408            {
409                item.value.len() as u32
410            }
411        }))
412    }
413
414    fn stale_blob_bytes(&self) -> u64 {
415        self.current_version().gc_stats().stale_bytes()
416    }
417
418    fn filter_size(&self) -> usize {
419        self.index.filter_size()
420    }
421
422    fn pinned_filter_size(&self) -> usize {
423        self.index.pinned_filter_size()
424    }
425
426    fn pinned_block_index_size(&self) -> usize {
427        self.index.pinned_block_index_size()
428    }
429
430    fn sealed_memtable_count(&self) -> usize {
431        self.index.sealed_memtable_count()
432    }
433
434    fn get_flush_lock(&self) -> MutexGuard<'_, ()> {
435        self.index.get_flush_lock()
436    }
437
438    fn flush_to_tables(
439        &self,
440        stream: impl Iterator<Item = crate::Result<InternalValue>>,
441    ) -> crate::Result<Option<(Vec<Table>, Option<Vec<BlobFile>>)>> {
442        use crate::{coding::Encode, file::TABLES_FOLDER, table::multi_writer::MultiWriter};
443
444        let start = std::time::Instant::now();
445
446        let table_folder = self.index.config.path.join(TABLES_FOLDER);
447
448        let data_block_size = self.index.config.data_block_size_policy.get(0);
449
450        let data_block_restart_interval =
451            self.index.config.data_block_restart_interval_policy.get(0);
452        let index_block_restart_interval =
453            self.index.config.index_block_restart_interval_policy.get(0);
454
455        let data_block_compression = self.index.config.data_block_compression_policy.get(0);
456        let index_block_compression = self.index.config.index_block_compression_policy.get(0);
457
458        let data_block_hash_ratio = self.index.config.data_block_hash_ratio_policy.get(0);
459
460        let index_partitioning = self.index.config.index_block_partitioning_policy.get(0);
461        let filter_partitioning = self.index.config.filter_block_partitioning_policy.get(0);
462
463        log::debug!("Flushing memtable(s) and performing key-value separation, data_block_restart_interval={data_block_restart_interval}, index_block_restart_interval={index_block_restart_interval}, data_block_size={data_block_size}, data_block_compression={data_block_compression}, index_block_compression={index_block_compression}");
464        log::debug!("=> to table(s) in {}", table_folder.display());
465        log::debug!("=> to blob file(s) at {}", self.blobs_folder.display());
466
467        let mut table_writer = MultiWriter::new(
468            table_folder.clone(),
469            self.index.table_id_counter.clone(),
470            64_000_000,
471            0,
472        )?
473        .use_data_block_restart_interval(data_block_restart_interval)
474        .use_index_block_restart_interval(index_block_restart_interval)
475        .use_data_block_compression(data_block_compression)
476        .use_index_block_compression(index_block_compression)
477        .use_data_block_size(data_block_size)
478        .use_data_block_hash_ratio(data_block_hash_ratio)
479        .use_bloom_policy({
480            use crate::config::FilterPolicyEntry::{Bloom, None};
481            use crate::table::filter::BloomConstructionPolicy;
482
483            match self.index.config.filter_policy.get(0) {
484                Bloom(policy) => policy,
485                None => BloomConstructionPolicy::BitsPerKey(0.0),
486            }
487        });
488
489        if index_partitioning {
490            table_writer = table_writer.use_partitioned_index();
491        }
492        if filter_partitioning {
493            table_writer = table_writer.use_partitioned_filter();
494        }
495
496        let kv_opts = self
497            .index
498            .config
499            .kv_separation_opts
500            .as_ref()
501            .expect("kv separation options should exist");
502
503        let mut blob_writer = BlobFileWriter::new(
504            self.index.0.blob_file_id_counter.clone(),
505            kv_opts.file_target_size,
506            self.index.config.path.join(BLOBS_FOLDER),
507        )?
508        .use_compression(
509            self.index
510                .config
511                .kv_separation_opts
512                .as_ref()
513                .expect("blob options should exist")
514                .compression,
515        );
516
517        let separation_threshold = kv_opts.separation_threshold;
518
519        for item in stream {
520            let item = item?;
521
522            if item.is_tombstone() {
523                // NOTE: Still need to add tombstone to index tree
524                // But no blob to blob writer
525                table_writer.write(InternalValue::new(item.key, UserValue::empty()))?;
526                continue;
527            }
528
529            let value = item.value;
530
531            #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
532            let value_size = value.len() as u32;
533
534            if value_size >= separation_threshold {
535                let offset = blob_writer.offset();
536                let blob_file_id = blob_writer.blob_file_id();
537                let on_disk_size = blob_writer.write(&item.key.user_key, item.key.seqno, &value)?;
538
539                let indirection = BlobIndirection {
540                    vhandle: ValueHandle {
541                        blob_file_id,
542                        offset,
543                        on_disk_size,
544                    },
545                    size: value_size,
546                };
547
548                table_writer.write({
549                    let mut vptr =
550                        InternalValue::new(item.key.clone(), indirection.encode_into_vec());
551                    vptr.key.value_type = crate::ValueType::Indirection;
552                    vptr
553                })?;
554
555                table_writer.register_blob(indirection);
556            } else {
557                table_writer.write(InternalValue::new(item.key, value))?;
558            }
559        }
560
561        let blob_files = blob_writer.finish()?;
562
563        let result = table_writer.finish()?;
564
565        log::debug!("Flushed memtable(s) in {:?}", start.elapsed());
566
567        let pin_filter = self.index.config.filter_block_pinning_policy.get(0);
568        let pin_index = self.index.config.index_block_pinning_policy.get(0);
569
570        // Load tables
571        let tables = result
572            .into_iter()
573            .map(|(table_id, checksum)| -> crate::Result<Table> {
574                Table::recover(
575                    table_folder.join(table_id.to_string()),
576                    checksum,
577                    self.index.id,
578                    self.index.config.cache.clone(),
579                    self.index.config.descriptor_table.clone(),
580                    pin_filter,
581                    pin_index,
582                    #[cfg(feature = "metrics")]
583                    self.index.metrics.clone(),
584                )
585            })
586            .collect::<crate::Result<Vec<_>>>()?;
587
588        Ok(Some((tables, Some(blob_files))))
589    }
590
591    fn register_tables(
592        &self,
593        tables: &[Table],
594        blob_files: Option<&[BlobFile]>,
595        frag_map: Option<FragmentationMap>,
596        sealed_memtables_to_delete: &[MemtableId],
597        gc_watermark: SeqNo,
598    ) -> crate::Result<()> {
599        self.index.register_tables(
600            tables,
601            blob_files,
602            frag_map,
603            sealed_memtables_to_delete,
604            gc_watermark,
605        )
606    }
607
608    fn set_active_memtable(&self, memtable: Memtable) {
609        self.index.set_active_memtable(memtable);
610    }
611
612    fn add_sealed_memtable(&self, id: MemtableId, memtable: Arc<Memtable>) {
613        self.index.add_sealed_memtable(id, memtable);
614    }
615
616    fn compact(
617        &self,
618        strategy: Arc<dyn crate::compaction::CompactionStrategy>,
619        seqno_threshold: SeqNo,
620    ) -> crate::Result<()> {
621        self.index.compact(strategy, seqno_threshold)
622    }
623
624    fn get_next_table_id(&self) -> TableId {
625        self.index.get_next_table_id()
626    }
627
628    fn tree_config(&self) -> &Config {
629        &self.index.config
630    }
631
632    fn get_highest_seqno(&self) -> Option<SeqNo> {
633        self.index.get_highest_seqno()
634    }
635
636    fn active_memtable_size(&self) -> u64 {
637        self.index.active_memtable_size()
638    }
639
640    fn tree_type(&self) -> crate::TreeType {
641        crate::TreeType::Blob
642    }
643
644    fn rotate_memtable(&self) -> Option<Arc<Memtable>> {
645        self.index.rotate_memtable()
646    }
647
648    fn table_count(&self) -> usize {
649        self.index.table_count()
650    }
651
652    fn level_table_count(&self, idx: usize) -> Option<usize> {
653        self.index.level_table_count(idx)
654    }
655
656    fn approximate_len(&self) -> usize {
657        self.index.approximate_len()
658    }
659
660    // NOTE: Override the default implementation to not fetch
661    // data from the value log, so we get much faster key reads
662    fn is_empty(&self, seqno: SeqNo, index: Option<Arc<Memtable>>) -> crate::Result<bool> {
663        self.index.is_empty(seqno, index)
664    }
665
666    // NOTE: Override the default implementation to not fetch
667    // data from the value log, so we get much faster key reads
668    fn contains_key<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<bool> {
669        self.index.contains_key(key, seqno)
670    }
671
672    // NOTE: Override the default implementation to not fetch
673    // data from the value log, so we get much faster scans
674    fn len(&self, seqno: SeqNo, index: Option<Arc<Memtable>>) -> crate::Result<usize> {
675        self.index.len(seqno, index)
676    }
677
678    fn disk_space(&self) -> u64 {
679        let version = self.current_version();
680        self.index.disk_space() + version.blob_files.on_disk_size()
681    }
682
683    fn get_highest_memtable_seqno(&self) -> Option<SeqNo> {
684        self.index.get_highest_memtable_seqno()
685    }
686
687    fn get_highest_persisted_seqno(&self) -> Option<SeqNo> {
688        self.index.get_highest_persisted_seqno()
689    }
690
691    fn insert<K: Into<UserKey>, V: Into<UserValue>>(
692        &self,
693        key: K,
694        value: V,
695        seqno: SeqNo,
696    ) -> (u64, u64) {
697        self.index.insert(key, value.into(), seqno)
698    }
699
700    fn get<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<crate::UserValue>> {
701        let key = key.as_ref();
702
703        let Some(item) = self.index.get_internal_entry(key, seqno)? else {
704            return Ok(None);
705        };
706
707        let version = self.index.get_version_for_snapshot(seqno).version;
708
709        let (_, v) = resolve_value_handle(
710            self.id(),
711            self.blobs_folder.as_path(),
712            &self.index.config.cache,
713            &self.index.config.descriptor_table,
714            &version,
715            item,
716        )?;
717
718        Ok(Some(v))
719    }
720
721    fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
722        self.index.remove(key, seqno)
723    }
724
725    fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
726        self.index.remove_weak(key, seqno)
727    }
728}