lsm_tree/blob_tree/
mod.rs

1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5mod gc;
6pub mod handle;
7
8#[doc(hidden)]
9pub use gc::{FragmentationEntry, FragmentationMap};
10
11use crate::{
12    coding::{Decode, Encode},
13    compaction::stream::CompactionStream,
14    file::{fsync_directory, BLOBS_FOLDER},
15    iter_guard::{IterGuard, IterGuardImpl},
16    r#abstract::{AbstractTree, RangeItem},
17    table::Table,
18    tree::inner::MemtableId,
19    value::InternalValue,
20    version::Version,
21    vlog::{Accessor, BlobFile, BlobFileWriter, ValueHandle},
22    Cache, Config, DescriptorTable, Memtable, SeqNo, SequenceNumberCounter, TableId, TreeId,
23    UserKey, UserValue,
24};
25use handle::BlobIndirection;
26use std::{io::Cursor, ops::RangeBounds, path::PathBuf, sync::Arc};
27
pub struct Guard {
    // Handle to the owning tree; used to resolve blob indirections lazily
    // in `into_inner` (id, blobs folder, cache, descriptor table).
    tree: crate::BlobTree,
    // Version snapshot the entry was read from; provides the set of blob
    // files (`version.blob_files`) that value handles are resolved against.
    version: Version,
    // The raw index entry: either an inline value or an encoded
    // `BlobIndirection` (when `key.value_type.is_indirection()`).
    kv: crate::Result<InternalValue>,
}
33
34impl IterGuard for Guard {
35    fn key(self) -> crate::Result<UserKey> {
36        self.kv.map(|kv| kv.key.user_key)
37    }
38
39    fn size(self) -> crate::Result<u32> {
40        let kv = self.kv?;
41
42        if kv.key.value_type.is_indirection() {
43            let mut cursor = Cursor::new(kv.value);
44            Ok(BlobIndirection::decode_from(&mut cursor)?.size)
45        } else {
46            #[expect(clippy::cast_possible_truncation, reason = "values are u32 max length")]
47            Ok(kv.value.len() as u32)
48        }
49    }
50
51    fn into_inner(self) -> crate::Result<(UserKey, UserValue)> {
52        resolve_value_handle(
53            self.tree.id(),
54            self.tree.blobs_folder.as_path(),
55            &self.tree.index.config.cache,
56            &self.tree.index.config.descriptor_table,
57            &self.version,
58            self.kv?,
59        )
60    }
61}
62
63fn resolve_value_handle(
64    tree_id: TreeId,
65    blobs_folder: &std::path::Path,
66    cache: &Arc<Cache>,
67    descriptor_table: &Arc<DescriptorTable>,
68    version: &Version,
69    item: InternalValue,
70) -> RangeItem {
71    if item.key.value_type.is_indirection() {
72        let mut cursor = Cursor::new(item.value);
73        let vptr = BlobIndirection::decode_from(&mut cursor)?;
74
75        // Resolve indirection using value log
76        match Accessor::new(&version.blob_files).get(
77            tree_id,
78            blobs_folder,
79            &item.key.user_key,
80            &vptr.vhandle,
81            cache,
82            descriptor_table,
83        ) {
84            Ok(Some(v)) => {
85                let k = item.key.user_key;
86                Ok((k, v))
87            }
88            Ok(None) => {
89                panic!(
90                    "value handle ({:?} => {:?}) did not match any blob - this is a bug; version={}",
91                    item.key.user_key, vptr.vhandle,
92                    version.id(),
93                );
94            }
95            Err(e) => Err(e),
96        }
97    } else {
98        let k = item.key.user_key;
99        let v = item.value;
100        Ok((k, v))
101    }
102}
103
/// A key-value-separated log-structured merge tree
///
/// This tree is a composite structure, consisting of an
/// index tree (LSM-tree) and a log-structured value log
/// to reduce write amplification.
#[derive(Clone)]
pub struct BlobTree {
    /// Index tree that holds value handles or small inline values
    #[doc(hidden)]
    pub index: crate::Tree,

    // Path of the blob (value log) directory, created in `open` as
    // `<tree path>/BLOBS_FOLDER`; shared cheaply across clones via `Arc`.
    blobs_folder: Arc<PathBuf>,
}
117
118impl BlobTree {
119    pub(crate) fn open(config: Config) -> crate::Result<Self> {
120        let index = crate::Tree::open(config)?;
121
122        let blobs_folder = index.config.path.join(BLOBS_FOLDER);
123        std::fs::create_dir_all(&blobs_folder)?;
124        fsync_directory(&blobs_folder)?;
125
126        let blob_file_id_to_continue_with = index
127            .current_version()
128            .blob_files
129            .list_ids()
130            .max()
131            .map(|x| x + 1)
132            .unwrap_or_default();
133
134        index
135            .0
136            .blob_file_id_generator
137            .set(blob_file_id_to_continue_with);
138
139        Ok(Self {
140            index,
141            blobs_folder: Arc::new(blobs_folder),
142        })
143    }
144}
145
// Most methods below delegate straight to the inner index tree
// (`self.index`); only operations that involve the value log (blob files)
// carry extra logic.
impl AbstractTree for BlobTree {
    fn next_table_id(&self) -> TableId {
        self.index.next_table_id()
    }

    fn id(&self) -> crate::TreeId {
        self.index.id()
    }

    fn get_internal_entry(&self, key: &[u8], seqno: SeqNo) -> crate::Result<Option<InternalValue>> {
        self.index.get_internal_entry(key, seqno)
    }

    fn current_version(&self) -> Version {
        self.index.current_version()
    }

    // Rotates the active memtable, flushes it into a table (plus, possibly,
    // one blob file), and registers both atomically.
    // Returns Ok(None) when there is nothing to flush.
    fn flush_active_memtable(&self, eviction_seqno: SeqNo) -> crate::Result<Option<Table>> {
        let Some((table_id, yanked_memtable)) = self.index.rotate_memtable() else {
            return Ok(None);
        };

        let Some((table, blob_file)) =
            self.flush_memtable(table_id, &yanked_memtable, eviction_seqno)?
        else {
            return Ok(None);
        };
        self.register_tables(
            std::slice::from_ref(&table),
            blob_file.as_ref().map(std::slice::from_ref),
            None,
        )?;

        Ok(Some(table))
    }

    #[cfg(feature = "metrics")]
    fn metrics(&self) -> &Arc<crate::Metrics> {
        self.index.metrics()
    }

    fn version_free_list_len(&self) -> usize {
        self.index.version_free_list_len()
    }

    // Prefix scan over the index tree. Blob values are NOT fetched here;
    // each item is wrapped in a `Guard` which resolves the value lazily
    // only when the caller asks for it.
    fn prefix<K: AsRef<[u8]>>(
        &self,
        prefix: K,
        seqno: SeqNo,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
        use crate::range::prefix_to_range;

        let range = prefix_to_range(prefix.as_ref());
        // Pin the version eagerly so every guard shares the same snapshot.
        let version = self.index.get_version_for_snapshot(seqno).version;
        let tree = self.clone();

        Box::new(
            self.index
                .create_internal_range(&range, seqno, index)
                .map(move |kv| {
                    IterGuardImpl::Blob(Guard {
                        tree: tree.clone(),
                        version: version.clone(),
                        kv,
                    })
                }),
        )
    }

    // Range scan over the index tree; like `prefix`, value resolution is
    // deferred to the returned guards.
    fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
        &self,
        range: R,
        seqno: SeqNo,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
        // NOTE(review): unlike `prefix`, the whole snapshot handle is captured
        // and cloned per item before taking `.version` — confirm whether
        // keeping the snapshot handle alive in the closure is intentional or
        // whether this should extract `.version` up-front like `prefix` does.
        let version = self.index.get_version_for_snapshot(seqno);
        let tree = self.clone();

        Box::new(
            self.index
                .create_internal_range(&range, seqno, index)
                .map(move |kv| {
                    IterGuardImpl::Blob(Guard {
                        tree: tree.clone(),
                        version: version.clone().version,
                        kv,
                    })
                }),
        )
    }

    fn tombstone_count(&self) -> u64 {
        self.index.tombstone_count()
    }

    fn weak_tombstone_count(&self) -> u64 {
        self.index.weak_tombstone_count()
    }

    fn weak_tombstone_reclaimable_count(&self) -> u64 {
        self.index.weak_tombstone_reclaimable_count()
    }

    fn drop_range<K: AsRef<[u8]>, R: RangeBounds<K>>(&self, range: R) -> crate::Result<()> {
        self.index.drop_range(range)
    }

    // Bulk-loads a pre-sorted stream of key-value pairs under a single
    // sequence number, performing key-value separation on the fly, then
    // moves the resulting tables straight to the last level.
    //
    // Panics (via `expect`) if KV-separation options are not configured,
    // and asserts that the input keys are strictly ascending.
    fn ingest(
        &self,
        iter: impl Iterator<Item = (UserKey, UserValue)>,
        seqno_generator: &SequenceNumberCounter,
        visible_seqno: &SequenceNumberCounter,
    ) -> crate::Result<()> {
        use crate::{compaction::MoveDown, tree::ingest::Ingestion};
        use std::time::Instant;

        // One seqno for the entire ingestion batch.
        let seqno = seqno_generator.next();

        // Blob files are rolled over once they reach this target size.
        let blob_file_size = self
            .index
            .config
            .kv_separation_opts
            .as_ref()
            .expect("kv separation options should exist")
            .file_target_size;

        let mut table_writer = Ingestion::new(&self.index)?.with_seqno(seqno);
        let mut blob_writer = BlobFileWriter::new(
            self.index.0.blob_file_id_generator.clone(),
            blob_file_size,
            self.index.config.path.join(BLOBS_FOLDER),
        )?
        .use_compression(
            self.index
                .config
                .kv_separation_opts
                .as_ref()
                .expect("blob options should exist")
                .compression,
        );

        let start = Instant::now();
        let mut count = 0;
        let mut last_key = None;

        // Values at least this large are written to the value log;
        // smaller ones stay inline in the index tree.
        let separation_threshold = self
            .index
            .config
            .kv_separation_opts
            .as_ref()
            .expect("kv separation options should exist")
            .separation_threshold;

        for (key, value) in iter {
            // Enforce strictly ascending input order, which the ingestion
            // writer relies on.
            if let Some(last_key) = &last_key {
                assert!(
                    key > last_key,
                    "next key in bulk ingest was not greater than last key",
                );
            }
            last_key = Some(key.clone());

            #[expect(clippy::cast_possible_truncation, reason = "values are 32-bit max")]
            let value_size = value.len() as u32;

            if value_size >= separation_threshold {
                // Large value: write it to the blob file and store an
                // indirection (value handle + size) in the index tree.
                let offset = blob_writer.offset();
                let blob_file_id = blob_writer.blob_file_id();
                let on_disk_size = blob_writer.write(&key, seqno, &value)?;

                let indirection = BlobIndirection {
                    vhandle: ValueHandle {
                        blob_file_id,
                        offset,
                        on_disk_size,
                    },
                    size: value_size,
                };

                table_writer.write_indirection(key, indirection)?;
            } else {
                table_writer.write(key, value)?;
            }

            count += 1;
        }

        let blob_files = blob_writer.finish()?;
        let results = table_writer.writer.finish()?;

        // NOTE(review): `pin_index` reads `filter_block_pinning_policy`, same
        // as `pin_filter` — this looks like a copy-paste; confirm whether an
        // index-block pinning policy should be used here instead.
        let pin_filter = self.index.config.filter_block_pinning_policy.get(0);
        let pin_index = self.index.config.filter_block_pinning_policy.get(0);

        // Re-open the freshly written tables so they can be registered.
        let created_tables = results
            .into_iter()
            .map(|(table_id, checksum)| -> crate::Result<Table> {
                Table::recover(
                    self.index
                        .config
                        .path
                        .join(crate::file::TABLES_FOLDER)
                        .join(table_id.to_string()),
                    checksum,
                    self.index.id,
                    self.index.config.cache.clone(),
                    self.index.config.descriptor_table.clone(),
                    pin_filter,
                    pin_index,
                    #[cfg(feature = "metrics")]
                    self.index.metrics.clone(),
                )
            })
            .collect::<crate::Result<Vec<_>>>()?;

        self.register_tables(&created_tables, Some(&blob_files), None)?;

        // Push the ingested run from level 0 down to the last level.
        let last_level_idx = self.index.config.level_count - 1;

        self.compact(Arc::new(MoveDown(0, last_level_idx)), 0)?;

        // Make the ingested items visible to readers.
        visible_seqno.fetch_max(seqno + 1);

        log::info!("Ingested {count} items in {:?}", start.elapsed());

        Ok(())
    }

    fn major_compact(&self, target_size: u64, seqno_threshold: SeqNo) -> crate::Result<()> {
        self.index.major_compact(target_size, seqno_threshold)
    }

    fn clear_active_memtable(&self) {
        self.index.clear_active_memtable();
    }

    fn l0_run_count(&self) -> usize {
        self.index.l0_run_count()
    }

    fn blob_file_count(&self) -> usize {
        self.current_version().blob_file_count()
    }

    // NOTE: We skip reading from the value log
    // because the vHandles already store the value size
    fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<u32>> {
        let Some(item) = self.index.get_internal_entry(key.as_ref(), seqno)? else {
            return Ok(None);
        };

        Ok(Some(if item.key.value_type.is_indirection() {
            // Separated value: decode the handle and use its stored size.
            let mut cursor = Cursor::new(item.value);
            let vptr = BlobIndirection::decode_from(&mut cursor)?;
            vptr.size
        } else {
            #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
            {
                item.value.len() as u32
            }
        }))
    }

    fn stale_blob_bytes(&self) -> u64 {
        self.current_version().gc_stats().stale_bytes()
    }

    fn filter_size(&self) -> usize {
        self.index.filter_size()
    }

    fn pinned_filter_size(&self) -> usize {
        self.index.pinned_filter_size()
    }

    fn pinned_block_index_size(&self) -> usize {
        self.index.pinned_block_index_size()
    }

    fn sealed_memtable_count(&self) -> usize {
        self.index.sealed_memtable_count()
    }

    // Writes a memtable out as one table plus at most one blob file,
    // performing key-value separation: values >= the separation threshold go
    // to the blob file, smaller values (and tombstones) stay in the table.
    //
    // Returns Ok(None) when the compaction stream produced no table
    // (e.g. everything was evicted).
    fn flush_memtable(
        &self,
        table_id: TableId,
        memtable: &Arc<Memtable>,
        eviction_seqno: SeqNo,
    ) -> crate::Result<Option<(Table, Option<BlobFile>)>> {
        use crate::{file::TABLES_FOLDER, table::Writer as TableWriter};

        let table_folder = self.index.config.path.join(TABLES_FOLDER);

        log::debug!("Flushing memtable & performing key-value separation");
        log::debug!("=> to table in {}", table_folder.display());
        log::debug!("=> to blob file at {}", self.blobs_folder.display());

        let mut table_writer =
            TableWriter::new(table_folder.join(table_id.to_string()), table_id, 0)?
                // TODO: apply other policies
                .use_data_block_compression(self.index.config.data_block_compression_policy.get(0))
                .use_bloom_policy({
                    use crate::config::FilterPolicyEntry::{Bloom, None};
                    use crate::table::filter::BloomConstructionPolicy;

                    match self.index.config.filter_policy.get(0) {
                        Bloom(policy) => policy,
                        // No filter configured: 0 bits per key disables it.
                        None => BloomConstructionPolicy::BitsPerKey(0.0),
                    }
                });

        // Target size u64::MAX: the writer never rolls over, so a flush
        // produces at most one blob file (asserted below).
        let mut blob_writer = BlobFileWriter::new(
            self.index.0.blob_file_id_generator.clone(),
            u64::MAX,
            self.index.config.path.join(BLOBS_FOLDER),
        )?
        .use_compression(
            self.index
                .config
                .kv_separation_opts
                .as_ref()
                .expect("blob options should exist")
                .compression,
        );

        // Run the memtable through the compaction stream to drop items that
        // are obsolete as of `eviction_seqno`.
        let iter = memtable.iter().map(Ok);
        let compaction_stream = CompactionStream::new(iter, eviction_seqno);

        // Stats for linking the blob file to the table (GC bookkeeping).
        let mut blob_bytes_referenced = 0;
        let mut blob_on_disk_bytes_referenced = 0;
        let mut blobs_referenced_count = 0;

        let separation_threshold = self
            .index
            .config
            .kv_separation_opts
            .as_ref()
            .expect("kv separation options should exist")
            .separation_threshold;

        for item in compaction_stream {
            let item = item?;

            if item.is_tombstone() {
                // NOTE: Still need to add tombstone to index tree
                // But no blob to blob writer
                table_writer.write(InternalValue::new(item.key, UserValue::empty()))?;
                continue;
            }

            let value = item.value;

            #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
            let value_size = value.len() as u32;

            if value_size >= separation_threshold {
                // Large value: goes to the blob file; the table stores an
                // encoded indirection marked with the Indirection value type.
                let offset = blob_writer.offset();
                let blob_file_id = blob_writer.blob_file_id();
                let on_disk_size = blob_writer.write(&item.key.user_key, item.key.seqno, &value)?;

                let indirection = BlobIndirection {
                    vhandle: ValueHandle {
                        blob_file_id,
                        offset,
                        on_disk_size,
                    },
                    size: value_size,
                };

                table_writer.write({
                    let mut vptr =
                        InternalValue::new(item.key.clone(), indirection.encode_into_vec());
                    vptr.key.value_type = crate::ValueType::Indirection;
                    vptr
                })?;

                blob_bytes_referenced += u64::from(value_size);
                blob_on_disk_bytes_referenced += u64::from(on_disk_size);
                blobs_referenced_count += 1;
            } else {
                table_writer.write(InternalValue::new(item.key, value))?;
            }
        }

        log::trace!("Creating blob file");
        let blob_files = blob_writer.finish()?;
        assert!(blob_files.len() <= 1);
        let blob_file = blob_files.into_iter().next();

        log::trace!("Creating LSM-tree table {table_id}");

        // Record how much of the blob file this table references, so GC can
        // later compute fragmentation.
        if blob_bytes_referenced > 0 {
            if let Some(blob_file) = &blob_file {
                table_writer.link_blob_file(
                    blob_file.id(),
                    blobs_referenced_count,
                    blob_bytes_referenced,
                    blob_on_disk_bytes_referenced,
                );
            }
        }

        let table = self.index.consume_writer(table_writer)?;

        Ok(table.map(|table| (table, blob_file)))
    }

    fn register_tables(
        &self,
        tables: &[Table],
        blob_files: Option<&[BlobFile]>,
        frag_map: Option<FragmentationMap>,
    ) -> crate::Result<()> {
        self.index.register_tables(tables, blob_files, frag_map)
    }

    fn set_active_memtable(&self, memtable: Memtable) {
        self.index.set_active_memtable(memtable);
    }

    fn add_sealed_memtable(&self, id: MemtableId, memtable: Arc<Memtable>) {
        self.index.add_sealed_memtable(id, memtable);
    }

    fn compact(
        &self,
        strategy: Arc<dyn crate::compaction::CompactionStrategy>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        self.index.compact(strategy, seqno_threshold)
    }

    fn get_next_table_id(&self) -> TableId {
        self.index.get_next_table_id()
    }

    fn tree_config(&self) -> &Config {
        &self.index.config
    }

    fn get_highest_seqno(&self) -> Option<SeqNo> {
        self.index.get_highest_seqno()
    }

    fn active_memtable_size(&self) -> u64 {
        self.index.active_memtable_size()
    }

    fn tree_type(&self) -> crate::TreeType {
        crate::TreeType::Blob
    }

    fn rotate_memtable(&self) -> Option<(crate::tree::inner::MemtableId, Arc<crate::Memtable>)> {
        self.index.rotate_memtable()
    }

    fn table_count(&self) -> usize {
        self.index.table_count()
    }

    fn level_table_count(&self, idx: usize) -> Option<usize> {
        self.index.level_table_count(idx)
    }

    fn approximate_len(&self) -> usize {
        self.index.approximate_len()
    }

    // NOTE: Override the default implementation to not fetch
    // data from the value log, so we get much faster key reads
    fn is_empty(&self, seqno: SeqNo, index: Option<Arc<Memtable>>) -> crate::Result<bool> {
        self.index.is_empty(seqno, index)
    }

    // NOTE: Override the default implementation to not fetch
    // data from the value log, so we get much faster key reads
    fn contains_key<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<bool> {
        self.index.contains_key(key, seqno)
    }

    // NOTE: Override the default implementation to not fetch
    // data from the value log, so we get much faster scans
    fn len(&self, seqno: SeqNo, index: Option<Arc<Memtable>>) -> crate::Result<usize> {
        self.index.len(seqno, index)
    }

    // Index tree size plus the on-disk size of all blob files.
    fn disk_space(&self) -> u64 {
        let version = self.current_version();
        self.index.disk_space() + version.blob_files.on_disk_size()
    }

    fn get_highest_memtable_seqno(&self) -> Option<SeqNo> {
        self.index.get_highest_memtable_seqno()
    }

    fn get_highest_persisted_seqno(&self) -> Option<SeqNo> {
        self.index.get_highest_persisted_seqno()
    }

    fn insert<K: Into<UserKey>, V: Into<UserValue>>(
        &self,
        key: K,
        value: V,
        seqno: SeqNo,
    ) -> (u64, u64) {
        self.index.insert(key, value.into(), seqno)
    }

    // Point read: looks up the index entry, then resolves it (following a
    // blob indirection if present) against the version snapshot for `seqno`.
    fn get<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<crate::UserValue>> {
        let key = key.as_ref();

        let Some(item) = self.index.get_internal_entry(key, seqno)? else {
            return Ok(None);
        };

        let version = self.index.get_version_for_snapshot(seqno).version;

        let (_, v) = resolve_value_handle(
            self.id(),
            self.blobs_folder.as_path(),
            &self.index.config.cache,
            &self.index.config.descriptor_table,
            &version,
            item,
        )?;

        Ok(Some(v))
    }

    fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        self.index.remove(key, seqno)
    }

    fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        self.index.remove_weak(key, seqno)
    }
}