// lsm_tree/blob_tree/mod.rs

1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5mod gc;
6pub mod handle;
7pub mod ingest;
8
9#[doc(hidden)]
10pub use gc::{FragmentationEntry, FragmentationMap};
11
12use crate::{
13    coding::Decode,
14    iter_guard::{IterGuard, IterGuardImpl},
15    r#abstract::{AbstractTree, RangeItem},
16    table::Table,
17    tree::inner::MemtableId,
18    value::InternalValue,
19    version::Version,
20    vlog::{Accessor, BlobFile, BlobFileWriter, ValueHandle},
21    Cache, Config, Memtable, SeqNo, TableId, TreeId, UserKey, UserValue,
22};
23use handle::BlobIndirection;
24use std::{
25    ops::RangeBounds,
26    path::{Path, PathBuf},
27    sync::{Arc, MutexGuard},
28};
29
/// Iterator value guard
///
/// Holds one raw entry from the index tree and defers resolving a
/// potentially key-value-separated value until the caller asks for it
/// (see `into_inner`), so key-only iteration never touches the value log.
pub struct Guard {
    // Owning tree; used for its ID, blob folder path and block cache
    // when an indirection needs to be resolved
    tree: crate::BlobTree,
    // Version (snapshot) whose blob file set is used to resolve value handles
    version: Version,
    // Raw KV pair from the index tree; either an inline value or an
    // encoded `BlobIndirection` (checked via `value_type.is_indirection()`)
    kv: crate::Result<InternalValue>,
}
36
37impl IterGuard for Guard {
38    fn into_inner_if(
39        self,
40        pred: impl Fn(&UserKey) -> bool,
41    ) -> crate::Result<(UserKey, Option<UserValue>)> {
42        let kv = self.kv?;
43
44        if pred(&kv.key.user_key) {
45            resolve_value_handle(
46                self.tree.id(),
47                self.tree.blobs_folder.as_path(),
48                &self.tree.index.config.cache,
49                &self.version,
50                kv,
51            )
52            .map(|(k, v)| (k, Some(v)))
53        } else {
54            Ok((kv.key.user_key, None))
55        }
56    }
57
58    fn key(self) -> crate::Result<UserKey> {
59        self.kv.map(|kv| kv.key.user_key)
60    }
61
62    fn size(self) -> crate::Result<u32> {
63        let kv = self.kv?;
64
65        if kv.key.value_type.is_indirection() {
66            let mut cursor = std::io::Cursor::new(kv.value);
67            Ok(BlobIndirection::decode_from(&mut cursor)?.size)
68        } else {
69            #[expect(clippy::cast_possible_truncation, reason = "values are u32 max length")]
70            Ok(kv.value.len() as u32)
71        }
72    }
73
74    fn into_inner(self) -> crate::Result<(UserKey, UserValue)> {
75        resolve_value_handle(
76            self.tree.id(),
77            self.tree.blobs_folder.as_path(),
78            &self.tree.index.config.cache,
79            &self.version,
80            self.kv?,
81        )
82    }
83}
84
85fn resolve_value_handle(
86    tree_id: TreeId,
87    blobs_folder: &Path,
88    cache: &Cache,
89    version: &Version,
90    item: InternalValue,
91) -> RangeItem {
92    if item.key.value_type.is_indirection() {
93        let mut cursor = std::io::Cursor::new(item.value);
94        let vptr = BlobIndirection::decode_from(&mut cursor)?;
95
96        // Resolve indirection using value log
97        match Accessor::new(&version.blob_files).get(
98            tree_id,
99            blobs_folder,
100            &item.key.user_key,
101            &vptr.vhandle,
102            cache,
103        ) {
104            Ok(Some(v)) => {
105                let k = item.key.user_key;
106                Ok((k, v))
107            }
108            Ok(None) => {
109                panic!(
110                    "value handle ({:?} => {:?}) did not match any blob - this is a bug; version={}",
111                    item.key.user_key, vptr.vhandle,
112                    version.id(),
113                );
114            }
115            Err(e) => Err(e),
116        }
117    } else {
118        let k = item.key.user_key;
119        let v = item.value;
120        Ok((k, v))
121    }
122}
123
/// A key-value-separated log-structured merge tree
///
/// This tree is a composite structure, consisting of an
/// index tree (LSM-tree) and a log-structured value log
/// to reduce write amplification.
#[derive(Clone)]
pub struct BlobTree {
    /// Index tree that holds value handles or small inline values
    #[doc(hidden)]
    pub index: crate::Tree,

    /// Folder containing the tree's blob files; wrapped in `Arc` so the
    /// cheap `Clone` derive does not copy the path buffer
    blobs_folder: Arc<PathBuf>,
}
137
138impl BlobTree {
139    pub(crate) fn open(config: Config) -> crate::Result<Self> {
140        use crate::file::{fsync_directory, BLOBS_FOLDER};
141
142        let index = crate::Tree::open(config)?;
143
144        let blobs_folder = index.config.path.join(BLOBS_FOLDER);
145        std::fs::create_dir_all(&blobs_folder)?;
146        fsync_directory(&blobs_folder)?;
147
148        let blob_file_id_to_continue_with = index
149            .current_version()
150            .blob_files
151            .list_ids()
152            .max()
153            .map(|x| x + 1)
154            .unwrap_or_default();
155
156        index
157            .0
158            .blob_file_id_counter
159            .set(blob_file_id_to_continue_with);
160
161        Ok(Self {
162            index,
163            blobs_folder: Arc::new(blobs_folder),
164        })
165    }
166}
167
// Most operations simply delegate to the inner index tree; only reads that
// may hit a separated value (get / iteration) and flushing (which performs
// the actual key-value separation) add blob-specific logic.
impl AbstractTree for BlobTree {
    fn print_trace(&self, key: &[u8]) -> crate::Result<()> {
        self.index.print_trace(key)
    }

    fn table_file_cache_size(&self) -> usize {
        self.index.table_file_cache_size()
    }

    fn get_version_history_lock(
        &self,
    ) -> std::sync::RwLockWriteGuard<'_, crate::version::SuperVersions> {
        self.index.get_version_history_lock()
    }

    fn next_table_id(&self) -> TableId {
        self.index.next_table_id()
    }

    fn id(&self) -> crate::TreeId {
        self.index.id()
    }

    fn get_internal_entry(&self, key: &[u8], seqno: SeqNo) -> crate::Result<Option<InternalValue>> {
        self.index.get_internal_entry(key, seqno)
    }

    fn current_version(&self) -> Version {
        self.index.current_version()
    }

    #[cfg(feature = "metrics")]
    fn metrics(&self) -> &Arc<crate::Metrics> {
        self.index.metrics()
    }

    fn version_free_list_len(&self) -> usize {
        self.index.version_free_list_len()
    }

    // Prefix iteration: entries come from the index tree; each one is wrapped
    // in a Guard that lazily resolves blob indirections on access.
    fn prefix<K: AsRef<[u8]>>(
        &self,
        prefix: K,
        seqno: SeqNo,
        index: Option<(Arc<Memtable>, SeqNo)>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
        use crate::range::prefix_to_range;

        // Pin the version for this snapshot so blob lookups stay consistent
        let super_version = self.index.get_version_for_snapshot(seqno);
        let tree = self.clone();

        let range = prefix_to_range(prefix.as_ref());

        Box::new(
            crate::Tree::create_internal_range(super_version.clone(), &range, seqno, index).map(
                move |kv| {
                    IterGuardImpl::Blob(Guard {
                        tree: tree.clone(),
                        version: super_version.version.clone(),
                        kv,
                    })
                },
            ),
        )
    }

    // Range iteration: same lazy-resolution strategy as `prefix`.
    fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
        &self,
        range: R,
        seqno: SeqNo,
        index: Option<(Arc<Memtable>, SeqNo)>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
        let super_version = self.index.get_version_for_snapshot(seqno);
        let tree = self.clone();

        Box::new(
            crate::Tree::create_internal_range(super_version.clone(), &range, seqno, index).map(
                move |kv| {
                    IterGuardImpl::Blob(Guard {
                        tree: tree.clone(),
                        version: super_version.version.clone(),
                        kv,
                    })
                },
            ),
        )
    }

    fn tombstone_count(&self) -> u64 {
        self.index.tombstone_count()
    }

    fn weak_tombstone_count(&self) -> u64 {
        self.index.weak_tombstone_count()
    }

    fn weak_tombstone_reclaimable_count(&self) -> u64 {
        self.index.weak_tombstone_reclaimable_count()
    }

    fn drop_range<K: AsRef<[u8]>, R: RangeBounds<K>>(&self, range: R) -> crate::Result<()> {
        self.index.drop_range(range)
    }

    fn clear(&self) -> crate::Result<()> {
        self.index.clear()
    }

    fn major_compact(&self, target_size: u64, seqno_threshold: SeqNo) -> crate::Result<()> {
        self.index.major_compact(target_size, seqno_threshold)
    }

    fn clear_active_memtable(&self) {
        self.index.clear_active_memtable();
    }

    fn l0_run_count(&self) -> usize {
        self.index.l0_run_count()
    }

    fn blob_file_count(&self) -> usize {
        self.current_version().blob_file_count()
    }

    // NOTE: We skip reading from the value log
    // because the vHandles already store the value size
    fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<u32>> {
        let Some(item) = self.index.get_internal_entry(key.as_ref(), seqno)? else {
            return Ok(None);
        };

        Ok(Some(if item.key.value_type.is_indirection() {
            // Separated value: decode the indirection and read its stored size
            let mut cursor = std::io::Cursor::new(item.value);
            let vptr = BlobIndirection::decode_from(&mut cursor)?;
            vptr.size
        } else {
            // Inline value: its length is the size
            #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
            {
                item.value.len() as u32
            }
        }))
    }

    fn stale_blob_bytes(&self) -> u64 {
        self.current_version().gc_stats().stale_bytes()
    }

    fn filter_size(&self) -> u64 {
        self.index.filter_size()
    }

    fn pinned_filter_size(&self) -> usize {
        self.index.pinned_filter_size()
    }

    fn pinned_block_index_size(&self) -> usize {
        self.index.pinned_block_index_size()
    }

    fn sealed_memtable_count(&self) -> usize {
        self.index.sealed_memtable_count()
    }

    fn get_flush_lock(&self) -> MutexGuard<'_, ()> {
        self.index.get_flush_lock()
    }

    /// Flushes a stream of internal values into new tables, performing
    /// key-value separation: values at or above the configured separation
    /// threshold are written to blob files and replaced in the index tree
    /// by an encoded `BlobIndirection`; smaller values stay inline.
    fn flush_to_tables(
        &self,
        stream: impl Iterator<Item = crate::Result<InternalValue>>,
    ) -> crate::Result<Option<(Vec<Table>, Option<Vec<BlobFile>>)>> {
        use crate::{
            coding::Encode, file::BLOBS_FOLDER, file::TABLES_FOLDER,
            table::multi_writer::MultiWriter,
        };

        let start = std::time::Instant::now();

        let table_folder = self.index.config.path.join(TABLES_FOLDER);

        // All policies are resolved at index 0 — NOTE(review): presumably
        // because flushed tables always land in level 0; confirm against
        // the policy types.
        let data_block_size = self.index.config.data_block_size_policy.get(0);

        let data_block_restart_interval =
            self.index.config.data_block_restart_interval_policy.get(0);
        let index_block_restart_interval =
            self.index.config.index_block_restart_interval_policy.get(0);

        let data_block_compression = self.index.config.data_block_compression_policy.get(0);
        let index_block_compression = self.index.config.index_block_compression_policy.get(0);

        let data_block_hash_ratio = self.index.config.data_block_hash_ratio_policy.get(0);

        let index_partitioning = self.index.config.index_block_partitioning_policy.get(0);
        let filter_partitioning = self.index.config.filter_block_partitioning_policy.get(0);

        log::debug!("Flushing memtable(s) and performing key-value separation, data_block_restart_interval={data_block_restart_interval}, index_block_restart_interval={index_block_restart_interval}, data_block_size={data_block_size}, data_block_compression={data_block_compression:?}, index_block_compression={index_block_compression:?}");
        log::debug!("=> to table(s) in {}", table_folder.display());
        log::debug!("=> to blob file(s) at {}", self.blobs_folder.display());

        let mut table_writer = MultiWriter::new(
            table_folder.clone(),
            self.index.table_id_counter.clone(),
            // NOTE(review): 64 MiB appears to be the table target size here;
            // confirm the parameter meaning against MultiWriter::new
            64 * 1_024 * 1_024,
            0,
        )?
        .use_data_block_restart_interval(data_block_restart_interval)
        .use_index_block_restart_interval(index_block_restart_interval)
        .use_data_block_compression(data_block_compression)
        .use_index_block_compression(index_block_compression)
        .use_data_block_size(data_block_size)
        .use_data_block_hash_ratio(data_block_hash_ratio)
        .use_bloom_policy({
            use crate::config::FilterPolicyEntry::{Bloom, None};
            use crate::table::filter::BloomConstructionPolicy;

            // A "None" filter policy is expressed as a bloom filter with
            // 0 bits per key
            match self.index.config.filter_policy.get(0) {
                Bloom(policy) => policy,
                None => BloomConstructionPolicy::BitsPerKey(0.0),
            }
        });

        if index_partitioning {
            table_writer = table_writer.use_partitioned_index();
        }
        if filter_partitioning {
            table_writer = table_writer.use_partitioned_filter();
        }

        #[expect(
            clippy::expect_used,
            reason = "cannot create blob tree without defining kv separation options"
        )]
        let kv_opts = self
            .index
            .config
            .kv_separation_opts
            .as_ref()
            .expect("kv separation options should exist");

        let mut blob_writer = BlobFileWriter::new(
            self.index.0.blob_file_id_counter.clone(),
            self.index.config.path.join(BLOBS_FOLDER),
            self.id(),
            self.index.config.descriptor_table.clone(),
        )?
        .use_target_size(kv_opts.file_target_size)
        .use_compression(kv_opts.compression);

        let separation_threshold = kv_opts.separation_threshold;

        for item in stream {
            let item = item?;

            if item.is_tombstone() {
                // NOTE: Still need to add tombstone to index tree
                // But no blob to blob writer
                table_writer.write(InternalValue::new(item.key, UserValue::empty()))?;
                continue;
            }

            let value = item.value;

            #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
            let value_size = value.len() as u32;

            if value_size >= separation_threshold {
                // Capture offset and file ID *before* writing, so the handle
                // points at the start of this value
                let offset = blob_writer.offset();
                let blob_file_id = blob_writer.blob_file_id();
                let on_disk_size = blob_writer.write(&item.key.user_key, item.key.seqno, &value)?;

                let indirection = BlobIndirection {
                    vhandle: ValueHandle {
                        blob_file_id,
                        offset,
                        on_disk_size,
                    },
                    size: value_size,
                };

                // Store the encoded indirection in the index tree, tagged so
                // readers know to resolve it through the value log
                table_writer.write({
                    let mut vptr =
                        InternalValue::new(item.key.clone(), indirection.encode_into_vec());
                    vptr.key.value_type = crate::ValueType::Indirection;
                    vptr
                })?;

                table_writer.register_blob(indirection);
            } else {
                // Small value: keep it inline in the index tree
                table_writer.write(InternalValue::new(item.key, value))?;
            }
        }

        let blob_files = blob_writer.finish()?;

        let result = table_writer.finish()?;

        log::debug!("Flushed memtable(s) in {:?}", start.elapsed());

        let pin_filter = self.index.config.filter_block_pinning_policy.get(0);
        let pin_index = self.index.config.index_block_pinning_policy.get(0);

        // Load tables
        let tables = result
            .into_iter()
            .map(|(table_id, checksum)| -> crate::Result<Table> {
                Table::recover(
                    table_folder.join(table_id.to_string()),
                    checksum,
                    0,
                    self.index.id,
                    self.index.config.cache.clone(),
                    self.index.config.descriptor_table.clone(),
                    pin_filter,
                    pin_index,
                    #[cfg(feature = "metrics")]
                    self.index.metrics.clone(),
                )
            })
            .collect::<crate::Result<Vec<_>>>()?;

        Ok(Some((tables, Some(blob_files))))
    }

    fn register_tables(
        &self,
        tables: &[Table],
        blob_files: Option<&[BlobFile]>,
        frag_map: Option<FragmentationMap>,
        sealed_memtables_to_delete: &[MemtableId],
        gc_watermark: SeqNo,
    ) -> crate::Result<()> {
        self.index.register_tables(
            tables,
            blob_files,
            frag_map,
            sealed_memtables_to_delete,
            gc_watermark,
        )
    }

    fn compact(
        &self,
        strategy: Arc<dyn crate::compaction::CompactionStrategy>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        self.index.compact(strategy, seqno_threshold)
    }

    fn get_next_table_id(&self) -> TableId {
        self.index.get_next_table_id()
    }

    fn tree_config(&self) -> &Config {
        &self.index.config
    }

    fn get_highest_seqno(&self) -> Option<SeqNo> {
        self.index.get_highest_seqno()
    }

    fn active_memtable(&self) -> Arc<Memtable> {
        self.index.active_memtable()
    }

    fn tree_type(&self) -> crate::TreeType {
        crate::TreeType::Blob
    }

    fn rotate_memtable(&self) -> Option<Arc<Memtable>> {
        self.index.rotate_memtable()
    }

    fn table_count(&self) -> usize {
        self.index.table_count()
    }

    fn level_table_count(&self, idx: usize) -> Option<usize> {
        self.index.level_table_count(idx)
    }

    fn approximate_len(&self) -> usize {
        self.index.approximate_len()
    }

    // NOTE: Override the default implementation to not fetch
    // data from the value log, so we get much faster key reads
    fn is_empty(&self, seqno: SeqNo, index: Option<(Arc<Memtable>, SeqNo)>) -> crate::Result<bool> {
        self.index.is_empty(seqno, index)
    }

    // NOTE: Override the default implementation to not fetch
    // data from the value log, so we get much faster key reads
    fn contains_key<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<bool> {
        self.index.contains_key(key, seqno)
    }

    // NOTE: Override the default implementation to not fetch
    // data from the value log, so we get much faster scans
    fn len(&self, seqno: SeqNo, index: Option<(Arc<Memtable>, SeqNo)>) -> crate::Result<usize> {
        self.index.len(seqno, index)
    }

    fn disk_space(&self) -> u64 {
        // Index tree size plus the on-disk size of all blob files
        let version = self.current_version();
        self.index.disk_space() + version.blob_files.on_disk_size()
    }

    fn get_highest_memtable_seqno(&self) -> Option<SeqNo> {
        self.index.get_highest_memtable_seqno()
    }

    fn get_highest_persisted_seqno(&self) -> Option<SeqNo> {
        self.index.get_highest_persisted_seqno()
    }

    fn insert<K: Into<UserKey>, V: Into<UserValue>>(
        &self,
        key: K,
        value: V,
        seqno: SeqNo,
    ) -> (u64, u64) {
        // Writes go straight to the index tree; separation happens at flush time
        self.index.insert(key, value.into(), seqno)
    }

    /// Point read: fetches the entry from the index tree, then resolves a
    /// blob indirection (if any) against the pinned snapshot version.
    fn get<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<crate::UserValue>> {
        let key = key.as_ref();

        #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
        let super_version = self
            .index
            .version_history
            .read()
            .expect("lock is poisoned")
            .get_version_for_snapshot(seqno);

        let Some(item) = crate::Tree::get_internal_entry_from_version(&super_version, key, seqno)?
        else {
            return Ok(None);
        };

        let (_, v) = resolve_value_handle(
            self.id(),
            self.blobs_folder.as_path(),
            &self.index.config.cache,
            &super_version.version,
            item,
        )?;

        Ok(Some(v))
    }

    fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        self.index.remove(key, seqno)
    }

    fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        self.index.remove_weak(key, seqno)
    }
}