lsm_tree/blob_tree/mod.rs

// Copyright (c) 2024-present, fjall-rs
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)

mod gc;
pub mod index;
pub mod value;

use crate::{
    coding::{Decode, Encode},
    compaction::stream::CompactionStream,
    file::{fsync_directory, BLOBS_FOLDER},
    iter_guard::{IterGuard, IterGuardImpl},
    r#abstract::{AbstractTree, RangeItem},
    segment::Segment,
    tree::inner::MemtableId,
    value::InternalValue,
    vlog::{Accessor, BlobFile, BlobFileId, BlobFileWriter, ValueHandle, ValueLog},
    Config, Memtable, SegmentId, SeqNo, SequenceNumberCounter, UserKey, UserValue,
};
use gc::{reader::GcReader, writer::GcWriter};
use index::IndexTree;
use std::{
    collections::BTreeMap,
    io::Cursor,
    ops::{RangeBounds, RangeFull},
    path::PathBuf,
    sync::{
        atomic::{AtomicU64, AtomicUsize},
        Arc,
    },
};
use value::MaybeInlineValue;

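/// Iterator guard over a raw index-tree item: holds the tree, a snapshot of the
/// value log (blob file map), and the not-yet-resolved key-value result.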
pub struct Guard<'a>(
    &'a BlobTree,
    Arc<BTreeMap<BlobFileId, BlobFile>>,
    crate::Result<(UserKey, UserValue)>,
);

impl IterGuard for Guard<'_> {
    fn key(self) -> crate::Result<UserKey> {
        self.2.map(|(k, _)| k)
    }

    fn size(self) -> crate::Result<u32> {
        use MaybeInlineValue::{Indirect, Inline};

        let value = self.2?.1;
        let mut cursor = Cursor::new(value);

        Ok(match MaybeInlineValue::decode_from(&mut cursor)? {
            // NOTE: We know LSM-tree values are 32 bits in length max
            #[allow(clippy::cast_possible_truncation)]
            Inline(bytes) => bytes.len() as u32,

            // NOTE: No need to resolve vHandle, because the size is already stored
            Indirect { size, .. } => size,
        })
    }

    fn into_inner(self) -> crate::Result<(UserKey, UserValue)> {
        resolve_value_handle(self.0, &self.1, self.2)
    }
}

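/// Resolves a raw index-tree item: inline values are returned as-is, while
/// indirect values are looked up in the value log through their value handle.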
fn resolve_value_handle(
    tree: &BlobTree,
    vlog: &BTreeMap<BlobFileId, BlobFile>,
    item: RangeItem,
) -> RangeItem {
    use MaybeInlineValue::{Indirect, Inline};

    match item {
        Ok((key, value)) => {
            let mut cursor = Cursor::new(value);

            match MaybeInlineValue::decode_from(&mut cursor)? {
                Inline(bytes) => Ok((key, bytes)),
                Indirect { vhandle, .. } => {
                    // Resolve indirection using value log
                    match Accessor::new(vlog).get(
                        &tree.blobs_folder,
                        &key,
                        &vhandle,
                        &tree.index.config.cache,
                        &tree.index.config.descriptor_table,
                    ) {
                        Ok(Some(bytes)) => Ok((key, bytes)),
                        Err(e) => Err(e),
                        _ => {
                            panic!("value handle ({:?} => {vhandle:?}) did not match any blob - this is a bug", String::from_utf8_lossy(&key))
                        }
                    }
                }
            }
        }
        Err(e) => Err(e),
    }
}

/// A key-value-separated log-structured merge tree
///
/// This tree is a composite structure, consisting of an
/// index tree (LSM-tree) and a log-structured value log
/// to reduce write amplification.
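///
/// A minimal usage sketch (illustrative only, not a compiled doctest;
/// `Config` construction, seqno management and error handling are elided
/// and may differ from the real API surface):
///
/// ```ignore
/// // Writes always go into the memtable as inline values; key-value
/// // separation happens on flush, based on the separation threshold.
/// let tree = BlobTree::open(config)?;
/// tree.insert("big-key", vec![0u8; 1_000_000], seqno);
/// tree.flush_active_memtable(0)?;
///
/// // Reads transparently resolve the indirection through the value log.
/// let value = tree.get("big-key", read_seqno)?;
/// ```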
#[derive(Clone)]
pub struct BlobTree {
    /// Index tree that holds value handles or small inline values
    #[doc(hidden)]
    pub index: IndexTree,

    blobs_folder: PathBuf,

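    /// Number of segments that have been flushed but not yet registered in the
    /// index tree; GC scans must wait until this drops back to zero.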
    // TODO: maybe replace this with a nonce system
    #[doc(hidden)]
    pub pending_segments: Arc<AtomicUsize>,

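    /// Monotonically increasing counter that hands out IDs for newly written blob files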
    blob_file_id_generator: SequenceNumberCounter,
}

impl BlobTree {
    pub(crate) fn open(config: Config) -> crate::Result<Self> {
        // let path = &config.path;

        // let vlog_path = path.join(BLOBS_FOLDER);
        // let vlog_cfg =
        //     crate::vlog::Config::new(config.cache.clone(), config.descriptor_table.clone())
        //         .blob_file_size_bytes(config.blob_file_target_size)
        //         .compression(config.blob_compression);

        let index: IndexTree = config.open()?.into();

        let blobs_folder = index.config.path.join(BLOBS_FOLDER);
        std::fs::create_dir_all(&blobs_folder)?;
        fsync_directory(&blobs_folder)?;

        let blob_file_id_to_continue_with = index
            .manifest
            .read()
            .expect("lock is poisoned")
            .current_version()
            .value_log
            .values()
            .map(BlobFile::id)
            .max()
            .map(|x| x + 1)
            .unwrap_or_default();

        Ok(Self {
            index,
            blobs_folder,
            pending_segments: Arc::new(AtomicUsize::new(0)),
            blob_file_id_generator: SequenceNumberCounter::new(blob_file_id_to_continue_with),
        })
    }

    #[must_use]
    pub fn space_amp(&self) -> f32 {
        todo!()
    }

    /// Consumes a [`BlobFileWriter`], returning the resulting [`BlobFile`] handles.
    ///
    /// # Note
    ///
    /// The blob files are **not** added to the value log immediately.
    ///
    /// # Errors
    ///
    /// Will return `Err` if an IO error occurs.
    fn consume_blob_file_writer(writer: BlobFileWriter) -> crate::Result<Vec<BlobFile>> {
        use crate::vlog::blob_file::{GcStats, Inner as BlobFileInner, Metadata};

        let writers = writer.finish()?;

        let mut blob_files = Vec::with_capacity(writers.len());

        for writer in writers {
            if writer.item_count == 0 {
                log::debug!(
                    "Blob file writer at {} has written no data, deleting empty blob file",
                    writer.path.display(),
                );
                if let Err(e) = std::fs::remove_file(&writer.path) {
                    log::warn!(
                        "Could not delete empty blob file at {}: {e:?}",
                        writer.path.display(),
                    );
                }
                continue;
            }

            let blob_file_id = writer.blob_file_id;

            blob_files.push(BlobFile(Arc::new(BlobFileInner {
                id: blob_file_id,
                path: writer.path,
                meta: Metadata {
                    item_count: writer.item_count,
                    compressed_bytes: writer.written_blob_bytes,
                    total_uncompressed_bytes: writer.uncompressed_bytes,

                    // NOTE: We are checking for 0 items above
                    // so first and last key need to exist
                    #[allow(clippy::expect_used)]
                    key_range: crate::KeyRange::new((
                        writer
                            .first_key
                            .clone()
                            .expect("should have written at least 1 item"),
                        writer
                            .last_key
                            .clone()
                            .expect("should have written at least 1 item"),
                    )),
                },
                gc_stats: GcStats::default(),
            })));

            log::debug!(
                "Created blob file #{blob_file_id:?} ({} items, {} userdata bytes)",
                writer.item_count,
                writer.uncompressed_bytes,
            );
        }

        Ok(blob_files)
    }

    /// Scans the index tree, collecting statistics about value log fragmentation.
    #[doc(hidden)]
    pub fn gc_scan_stats(
        &self,
        seqno: SeqNo,
        gc_watermark: SeqNo,
    ) -> crate::Result<crate::gc::Report> {
        use std::io::Error as IoError;
        use MaybeInlineValue::{Indirect, Inline};

        todo!()

        // while self
        //     .pending_segments
        //     .load(std::sync::atomic::Ordering::Acquire)
        //     > 0
        // {
        //     // IMPORTANT: Busy wait until all segments in-flight are committed
        //     // to the tree
        // }

        // // IMPORTANT: Lock + snapshot memtable to avoid read skew and to prevent tampering with the memtable
        // let _memtable_lock = self.index.read_lock_active_memtable();

        // while self
        //     .pending_segments
        //     .load(std::sync::atomic::Ordering::Acquire)
        //     > 0
        // {
        //     // IMPORTANT: Busy wait again until all segments in-flight are committed
        //     // to the tree
        // }

        // let iter = self
        //     .index
        //     .create_internal_range::<&[u8], RangeFull>(&.., seqno, None);

        // // Stores the max seqno of every blob file
        // let mut seqno_map = crate::HashMap::<SegmentId, SeqNo>::default();

        // let result = self.blobs.scan_for_stats(iter.filter_map(|kv| {
        //     let Ok(kv) = kv else {
        //         return Some(Err(IoError::other(
        //             "Failed to load KV pair from index tree",
        //         )));
        //     };

        //     let mut cursor = Cursor::new(kv.value);
        //     let value = match MaybeInlineValue::decode_from(&mut cursor) {
        //         Ok(v) => v,
        //         Err(e) => return Some(Err(IoError::other(e.to_string()))),
        //     };

        //     match value {
        //         Indirect { vhandle, size } => {
        //             seqno_map
        //                 .entry(vhandle.blob_file_id)
        //                 .and_modify(|x| *x = (*x).max(kv.key.seqno))
        //                 .or_insert(kv.key.seqno);

        //             Some(Ok((vhandle, size)))
        //         }
        //         Inline(_) => None,
        //     }
        // }));

        // // TODO:

        // // let mut lock = self
        // //     .blobs
        // //     .manifest
        // //     .blob_files
        // //     .write()
        // //     .expect("lock is poisoned");

        // // // IMPORTANT: We are overwriting the staleness of blob files
        // // // that contain an item that is still contained in the GC watermark
        // // // so snapshots cannot accidentally lose data
        // // //
        // // // TODO: 3.0.0 this should be dealt with in value-log 2.0 (make it MVCC aware)
        // // for (blob_file_id, max_seqno) in seqno_map {
        // //     if gc_watermark <= max_seqno {
        // //         if let Some(blob_file) = lock.get_mut(&blob_file_id) {
        // //             blob_file.gc_stats.set_stale_items(0);
        // //             blob_file.gc_stats.set_stale_bytes(0);
        // //         }
        // //     }
        // // }

        // result
    }

    pub fn apply_gc_strategy(
        &self,
        strategy: &impl crate::vlog::GcStrategy,
        seqno: SeqNo,
    ) -> crate::Result<u64> {
        todo!()

        // // IMPORTANT: Write lock memtable to avoid read skew
        // let memtable_lock = self.index.lock_active_memtable();

        // self.blobs.apply_gc_strategy(
        //     strategy,
        //     &GcReader::new(&self.index, &memtable_lock),
        //     GcWriter::new(seqno, &memtable_lock),
        // )?;

        // // NOTE: We still hold the memtable lock, so we can't call gc_drop_stale (it would lock recursively)
        // self.blobs.drop_stale_blob_files()
    }

    /// Drops all stale blob files
    #[doc(hidden)]
    pub fn gc_drop_stale(&self) -> crate::Result<u64> {
        todo!()

        // // IMPORTANT: Write lock memtable to avoid read skew
        // let _lock = self.index.lock_active_memtable();

        // self.blobs.drop_stale_blob_files()
    }

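    /// Rotates the active memtable, flushes it into a disk segment (performing
    /// key-value separation), and registers the resulting segment and blob file.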
    #[doc(hidden)]
    pub fn flush_active_memtable(&self, eviction_seqno: SeqNo) -> crate::Result<Option<Segment>> {
        let Some((segment_id, yanked_memtable)) = self.index.rotate_memtable() else {
            return Ok(None);
        };

        let Some((segment, blob_file)) =
            self.flush_memtable(segment_id, &yanked_memtable, eviction_seqno)?
        else {
            return Ok(None);
        };
        self.register_segments(
            std::slice::from_ref(&segment),
            blob_file.as_ref().map(std::slice::from_ref),
            eviction_seqno,
        )?;

        Ok(Some(segment))
    }
}

impl AbstractTree for BlobTree {
    #[cfg(feature = "metrics")]
    fn metrics(&self) -> &Arc<crate::Metrics> {
        self.index.metrics()
    }

    fn version_free_list_len(&self) -> usize {
        self.index.version_free_list_len()
    }

    fn prefix<K: AsRef<[u8]>>(
        &self,
        prefix: K,
        seqno: SeqNo,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl<'_>> + '_> {
        let version = self
            .index
            .manifest
            .read()
            .expect("lock is poisoned")
            .current_version()
            .clone();

        // TODO: PERF: ugly Arc clone
        Box::new(
            self.index
                .0
                .create_prefix(&prefix, seqno, index)
                .map(move |kv| IterGuardImpl::Blob(Guard(self, version.value_log.clone(), kv))),
        )
    }

    fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
        &self,
        range: R,
        seqno: SeqNo,
        index: Option<Arc<Memtable>>,
    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl<'_>> + '_> {
        let version = self
            .index
            .manifest
            .read()
            .expect("lock is poisoned")
            .current_version()
            .clone();

        // TODO: PERF: ugly Arc clone
        Box::new(
            self.index
                .0
                .create_range(&range, seqno, index)
                .map(move |kv| IterGuardImpl::Blob(Guard(self, version.value_log.clone(), kv))),
        )
    }

    fn tombstone_count(&self) -> u64 {
        self.index.tombstone_count()
    }

    fn drop_range(&self, key_range: crate::KeyRange) -> crate::Result<()> {
        self.index.drop_range(key_range)
    }

    fn ingest(
        &self,
        iter: impl Iterator<Item = (UserKey, UserValue)>,
        seqno_generator: &SequenceNumberCounter,
        visible_seqno: &SequenceNumberCounter,
    ) -> crate::Result<()> {
        use crate::tree::ingest::Ingestion;
        use std::time::Instant;

        // TODO: take curr seqno for ingest, HOWEVER
        // TODO: we need to take the next seqno AFTER locking the memtable

        todo!();

        // // NOTE: Lock active memtable so nothing else can be going on while we are bulk loading
        // let lock = self.lock_active_memtable();
        // assert!(
        //     lock.is_empty(),
        //     "can only perform bulk_ingest on empty trees",
        // );

        // let mut segment_writer = Ingestion::new(&self.index)?;
        // let mut blob_writer = self.blobs.get_writer()?;

        // let start = Instant::now();
        // let mut count = 0;
        // let mut last_key = None;

        // for (key, value) in iter {
        //     if let Some(last_key) = &last_key {
        //         assert!(
        //             key > last_key,
        //             "next key in bulk ingest was not greater than last key",
        //         );
        //     }
        //     last_key = Some(key.clone());

        //     // NOTE: Values are 32-bit max
        //     #[allow(clippy::cast_possible_truncation)]
        //     let value_size = value.len() as u32;

        //     if value_size >= self.index.config.blob_file_separation_threshold {
        //         let vhandle = blob_writer.get_next_value_handle();

        //         let indirection = MaybeInlineValue::Indirect {
        //             vhandle,
        //             size: value_size,
        //         };
        //         // TODO: use Slice::with_size
        //         let mut serialized_indirection = vec![];
        //         indirection.encode_into(&mut serialized_indirection)?;

        //         segment_writer.write(key.clone(), serialized_indirection.into())?;

        //         blob_writer.write(&key, value)?;
        //     } else {
        //         // TODO: use Slice::with_size
        //         let direct = MaybeInlineValue::Inline(value);
        //         let serialized_direct = direct.encode_into_vec();
        //         segment_writer.write(key, serialized_direct.into())?;
        //     }

        //     count += 1;
        // }

        // // TODO: add to manifest + unit test
        // // self.blobs.register_writer(blob_writer)?;
        // // segment_writer.finish()?;

        // log::info!("Ingested {count} items in {:?}", start.elapsed());

        Ok(())
    }

    fn major_compact(&self, target_size: u64, seqno_threshold: SeqNo) -> crate::Result<()> {
        self.index.major_compact(target_size, seqno_threshold)
    }

    fn clear_active_memtable(&self) {
        self.index.clear_active_memtable();
    }

    fn l0_run_count(&self) -> usize {
        self.index.l0_run_count()
    }

    fn blob_file_count(&self) -> usize {
        self.index
            .manifest
            .read()
            .expect("lock is poisoned")
            .current_version()
            .blob_file_count()
    }

    // NOTE: We skip reading from the value log
    // because the vHandles already store the value size
    fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<u32>> {
        let vhandle = self.index.get_vhandle(key.as_ref(), seqno)?;

        Ok(vhandle.map(|x| match x {
            // NOTE: Values are u32 length max
            #[allow(clippy::cast_possible_truncation)]
            MaybeInlineValue::Inline(v) => v.len() as u32,

            // NOTE: We skip reading from the value log
            // because the indirections already store the value size
            MaybeInlineValue::Indirect { size, .. } => size,
        }))
    }

    fn filter_size(&self) -> usize {
        self.index.filter_size()
    }

    fn pinned_filter_size(&self) -> usize {
        self.index.pinned_filter_size()
    }

    fn pinned_block_index_size(&self) -> usize {
        self.index.pinned_block_index_size()
    }

    fn sealed_memtable_count(&self) -> usize {
        self.index.sealed_memtable_count()
    }

    fn flush_memtable(
        &self,
        segment_id: SegmentId,
        memtable: &Arc<Memtable>,
        eviction_seqno: SeqNo,
    ) -> crate::Result<Option<(Segment, Option<BlobFile>)>> {
        use crate::{file::SEGMENTS_FOLDER, segment::Writer as SegmentWriter};
        use value::MaybeInlineValue;

        let lsm_segment_folder = self.index.config.path.join(SEGMENTS_FOLDER);

        log::debug!("Flushing memtable & performing key-value separation");
        log::debug!("=> to LSM segments in {}", lsm_segment_folder.display());
        // log::debug!("=> to blob segment at {}", self.blobs.path.display());

        let mut segment_writer = SegmentWriter::new(
            lsm_segment_folder.join(segment_id.to_string()),
            segment_id,
            /* Options {
                segment_id,
                data_block_size: self.index.config.data_block_size,
                index_block_size: self.index.config.index_block_size,
                folder: lsm_segment_folder,
            } */
        )?
        .use_data_block_compression(self.index.config.data_block_compression_policy.get(0));
        // TODO: monkey
        /* segment_writer = segment_writer.use_bloom_policy(
            crate::segment::writer::BloomConstructionPolicy::FpRate(0.0001),
        ); */

        let mut blob_writer = BlobFileWriter::new(
            self.blob_file_id_generator.clone(),
            u64::MAX,
            self.index.config.path.join(BLOBS_FOLDER),
        )?;
        // TODO: select compression

        // let mut blob_writer = self.blobs.get_writer()?.use_target_size(u64::MAX);

        let iter = memtable.iter().map(Ok);
        let compaction_filter = CompactionStream::new(iter, eviction_seqno);

        for item in compaction_filter {
            let item = item?;

            if item.is_tombstone() {
                // NOTE: We still need to add the tombstone to the index tree,
                // but nothing is written to the blob writer
                segment_writer.write(InternalValue::new(item.key, UserValue::empty()))?;
                continue;
            }

            let mut cursor = Cursor::new(item.value);

            let value = MaybeInlineValue::decode_from(&mut cursor)?;
            let value = match value {
                MaybeInlineValue::Inline(value) => value,
                indirection @ MaybeInlineValue::Indirect { .. } => {
                    // NOTE: This is a previous indirection, just write it to index tree
                    // without writing the blob again

                    let mut serialized_indirection = vec![];
                    indirection.encode_into(&mut serialized_indirection)?;

                    segment_writer
                        .write(InternalValue::new(item.key.clone(), serialized_indirection))?;

                    continue;
                }
            };

            // NOTE: Values are 32-bit max
            #[allow(clippy::cast_possible_truncation)]
            let value_size = value.len() as u32;

            if value_size >= self.index.config.blob_file_separation_threshold {
                let offset = blob_writer.offset();
                let blob_file_id = blob_writer.blob_file_id();
                let on_disk_size = blob_writer.write(&item.key.user_key, value)?;

                let indirection = MaybeInlineValue::Indirect {
                    vhandle: ValueHandle {
                        blob_file_id,
                        offset,
                        on_disk_size,
                    },
                    size: value_size,
                };
                // TODO: use Slice::with_size
                let mut serialized_indirection = vec![];
                indirection.encode_into(&mut serialized_indirection)?;

                segment_writer
                    .write(InternalValue::new(item.key.clone(), serialized_indirection))?;
            } else {
                // TODO: use Slice::with_size
                let direct = MaybeInlineValue::Inline(value);
                let serialized_direct = direct.encode_into_vec();
                segment_writer.write(InternalValue::new(item.key, serialized_direct))?;
            }
        }

        // let _memtable_lock = self.lock_active_memtable();

        // TODO: 3.0.0: add to vlog atomically together with the segment (that way, we don't need the pending_segments monkey patch)
        log::trace!("Creating blob file");
        let blob_files = Self::consume_blob_file_writer(blob_writer)?;
        assert!(blob_files.len() <= 1);
        let blob_file = blob_files.into_iter().next();

        log::trace!("Creating LSM-tree segment {segment_id}");
        let segment = self.index.consume_writer(segment_writer)?;

        // TODO: this can probably be solved in a nicer way
        if segment.is_some() {
            // IMPORTANT: Increment the pending count
            // so there cannot be a GC scan now, until the segment is registered
            self.pending_segments
                .fetch_add(1, std::sync::atomic::Ordering::Release);
        }

        Ok(segment.map(|segment| (segment, blob_file)))
    }

    fn register_segments(
        &self,
        segments: &[Segment],
        blob_files: Option<&[BlobFile]>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        self.index
            .register_segments(segments, blob_files, seqno_threshold)?;

        let count = self
            .pending_segments
            .load(std::sync::atomic::Ordering::Acquire);

        assert!(
            count >= segments.len(),
            "pending_segments is less than segments to register - this is a bug"
        );

        self.pending_segments
            .fetch_sub(segments.len(), std::sync::atomic::Ordering::Release);

        Ok(())
    }

    fn lock_active_memtable(&self) -> std::sync::RwLockWriteGuard<'_, Arc<Memtable>> {
        self.index.lock_active_memtable()
    }

    fn set_active_memtable(&self, memtable: Memtable) {
        self.index.set_active_memtable(memtable);
    }

    fn add_sealed_memtable(&self, id: MemtableId, memtable: Arc<Memtable>) {
        self.index.add_sealed_memtable(id, memtable);
    }

    fn compact(
        &self,
        strategy: Arc<dyn crate::compaction::CompactionStrategy>,
        seqno_threshold: SeqNo,
    ) -> crate::Result<()> {
        self.index.compact(strategy, seqno_threshold)
    }

    fn get_next_segment_id(&self) -> SegmentId {
        self.index.get_next_segment_id()
    }

    fn tree_config(&self) -> &Config {
        &self.index.config
    }

    fn get_highest_seqno(&self) -> Option<SeqNo> {
        self.index.get_highest_seqno()
    }

    fn active_memtable_size(&self) -> u64 {
        self.index.active_memtable_size()
    }

    fn tree_type(&self) -> crate::TreeType {
        crate::TreeType::Blob
    }

    fn rotate_memtable(&self) -> Option<(crate::tree::inner::MemtableId, Arc<crate::Memtable>)> {
        self.index.rotate_memtable()
    }

    fn segment_count(&self) -> usize {
        self.index.segment_count()
    }

    fn level_segment_count(&self, idx: usize) -> Option<usize> {
        self.index.level_segment_count(idx)
    }

    fn approximate_len(&self) -> usize {
        self.index.approximate_len()
    }

    // NOTE: Override the default implementation to not fetch
    // data from the value log, so we get much faster key reads
    fn is_empty(&self, seqno: SeqNo, index: Option<Arc<Memtable>>) -> crate::Result<bool> {
        self.index.is_empty(seqno, index)
    }

    // NOTE: Override the default implementation to not fetch
    // data from the value log, so we get much faster key reads
    fn contains_key<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<bool> {
        self.index.contains_key(key, seqno)
    }

    // NOTE: Override the default implementation to not fetch
    // data from the value log, so we get much faster scans
    fn len(&self, seqno: SeqNo, index: Option<Arc<Memtable>>) -> crate::Result<usize> {
        self.index.len(seqno, index)
    }

    fn disk_space(&self) -> u64 {
        let lock = self.index.manifest.read().expect("lock is poisoned");
        let version = lock.current_version();
        let vlog = crate::vlog::Accessor::new(&version.value_log);
        self.index.disk_space() + vlog.disk_space()
    }

    fn get_highest_memtable_seqno(&self) -> Option<SeqNo> {
        self.index.get_highest_memtable_seqno()
    }

    fn get_highest_persisted_seqno(&self) -> Option<SeqNo> {
        self.index.get_highest_persisted_seqno()
    }

    fn insert<K: Into<UserKey>, V: Into<UserValue>>(
        &self,
        key: K,
        value: V,
        seqno: SeqNo,
    ) -> (u64, u64) {
        use value::MaybeInlineValue;

        // TODO: let's store a struct in memtables instead
        // TODO: that stores slice + is_user_value
        // TODO: then we can avoid alloc + memcpy here
        // TODO: benchmark for very large values

        // NOTE: Initially, we always write an inline value
        // On memtable flush, depending on the values' sizes, they will be separated
        // into inline or indirect values
        let item = MaybeInlineValue::Inline(value.into());

        let value = item.encode_into_vec();

        self.index.insert(key, value, seqno)
    }

    fn get<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<crate::UserValue>> {
        use value::MaybeInlineValue::{Indirect, Inline};

        let key = key.as_ref();

        // TODO: refactor memtable, sealed memtables, manifest lock to be a single lock (SuperVersion kind of)
        // TODO: then, try to reduce the lock access to 1, because we are accessing it twice (index.get, and then vhandle resolving...)

        let Some(value) = self.index.get_vhandle(key, seqno)? else {
            return Ok(None);
        };

        match value {
            Inline(bytes) => Ok(Some(bytes)),
            Indirect { vhandle, .. } => {
                let lock = self.index.manifest.read().expect("lock is poisoned");
                let vlog = crate::vlog::Accessor::new(&lock.current_version().value_log);

                // Resolve indirection using value log
                match vlog.get(
                    &self.blobs_folder,
                    key,
                    &vhandle,
                    &self.index.config.cache,
                    &self.index.config.descriptor_table,
                )? {
                    Some(v) => Ok(Some(v)),
                    None => {
                        panic!("value handle ({key:?} => {vhandle:?}) did not match any blob - this is a bug")
                    }
                }
            }
        }
    }

    fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        self.index.remove(key, seqno)
    }

    fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
        self.index.remove_weak(key, seqno)
    }
}