Skip to main content

lsm_tree/blob_tree/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2024-present, fjall-rs
3// Copyright (c) 2026-present, Structured World Foundation
4
5mod gc;
6pub mod handle;
7pub mod ingest;
8
9#[doc(hidden)]
10pub use gc::{FragmentationEntry, FragmentationMap};
11
12use crate::path::{Path, PathBuf};
13use crate::tree::inner::{FlushGuard, VersionsWriteGuard};
14use crate::{
15    Cache, Config, Memtable, ScanSinceEvent, SeqNo, TableId, TreeId, UserKey, UserValue,
16    abstract_tree::{AbstractTree, RangeItem},
17    coding::Decode,
18    iter_guard::{IterGuard, IterGuardImpl},
19    table::Table,
20    tree::inner::MemtableId,
21    value::InternalValue,
22    version::Version,
23    vlog::{Accessor, BlobFile, BlobFileWriter},
24};
25use alloc::sync::Arc;
26#[cfg(not(feature = "std"))]
27use alloc::{boxed::Box, string::ToString, vec::Vec};
28use core::ops::RangeBounds;
29use handle::BlobIndirection;
30
/// Iterator value guard
///
/// Holds one raw entry read from the index tree together with the tree handle
/// and the [`Version`] snapshot it was read from, so a blob indirection in the
/// entry is only resolved when the caller actually asks for the value.
pub struct Guard {
    /// Tree handle; supplies the blobs folder, block cache, and KV-separation
    /// options needed to resolve a blob indirection.
    tree: crate::BlobTree,
    /// Version snapshot whose blob files an indirection is resolved against.
    version: Version,
    /// The raw index-tree entry, or the error produced while reading it.
    kv: crate::Result<InternalValue>,
}
37
38impl IterGuard for Guard {
39    fn into_inner_if(
40        self,
41        pred: impl Fn(&UserKey) -> bool,
42    ) -> crate::Result<(UserKey, Option<UserValue>)> {
43        let kv = self.kv?;
44
45        if pred(&kv.key.user_key) {
46            resolve_value_handle(
47                self.tree.id(),
48                self.tree.blobs_folder.as_path(),
49                &self.tree.index.config.cache,
50                &self.version,
51                kv,
52                #[cfg(zstd_any)]
53                self.tree
54                    .index
55                    .config
56                    .kv_separation_opts
57                    .as_ref()
58                    .and_then(|o| o.zstd_dictionary.as_deref()),
59            )
60            .map(|(k, v)| (k, Some(v)))
61        } else {
62            Ok((kv.key.user_key, None))
63        }
64    }
65
66    fn key(self) -> crate::Result<UserKey> {
67        self.kv.map(|kv| kv.key.user_key)
68    }
69
70    fn size(self) -> crate::Result<u32> {
71        let kv = self.kv?;
72
73        if kv.key.value_type.is_indirection() {
74            let mut cursor = crate::io::Cursor::new(kv.value);
75            Ok(BlobIndirection::decode_from(&mut cursor)?.size)
76        } else {
77            #[expect(clippy::cast_possible_truncation, reason = "values are u32 max length")]
78            Ok(kv.value.len() as u32)
79        }
80    }
81
82    fn into_inner(self) -> crate::Result<(UserKey, UserValue)> {
83        resolve_value_handle(
84            self.tree.id(),
85            self.tree.blobs_folder.as_path(),
86            &self.tree.index.config.cache,
87            &self.version,
88            self.kv?,
89            #[cfg(zstd_any)]
90            self.tree
91                .index
92                .config
93                .kv_separation_opts
94                .as_ref()
95                .and_then(|o| o.zstd_dictionary.as_deref()),
96        )
97    }
98}
99
100fn resolve_value_handle(
101    tree_id: TreeId,
102    blobs_folder: &Path,
103    cache: &Cache,
104    version: &Version,
105    item: InternalValue,
106    #[cfg(zstd_any)] zstd_dictionary: Option<&crate::compression::ZstdDictionary>,
107) -> RangeItem {
108    if item.key.value_type.is_indirection() {
109        let mut cursor = crate::io::Cursor::new(item.value);
110        let vptr = BlobIndirection::decode_from(&mut cursor)?;
111
112        // Resolve indirection using value log
113        let accessor = {
114            let a = Accessor::new(&version.blob_files);
115            #[cfg(zstd_any)]
116            let a = a.with_dict(zstd_dictionary);
117            a
118        };
119
120        match accessor.get(
121            tree_id,
122            blobs_folder,
123            &item.key.user_key,
124            &vptr.vhandle,
125            cache,
126        ) {
127            Ok(Some(v)) => {
128                let k = item.key.user_key;
129                Ok((k, v))
130            }
131            Ok(None) => {
132                panic!(
133                    "value handle ({:?} => {:?}) did not match any blob - this is a bug; version={}",
134                    item.key.user_key,
135                    vptr.vhandle,
136                    version.id(),
137                );
138            }
139            Err(e) => Err(e),
140        }
141    } else {
142        let k = item.key.user_key;
143        let v = item.value;
144        Ok((k, v))
145    }
146}
147
/// A key-value-separated log-structured merge tree
///
/// This tree is a composite structure, consisting of an
/// index tree (LSM-tree) and a log-structured value log
/// to reduce write amplification.
///
/// On flush, values at or above the configured separation threshold are
/// written to blob files and the index tree stores a [`BlobIndirection`]
/// pointing at them; smaller values stay inline in the index tree.
#[derive(Clone)]
pub struct BlobTree {
    /// Index tree that holds value handles or small inline values
    #[doc(hidden)]
    pub index: crate::Tree,

    /// Folder holding this tree's blob files (the tree path joined with
    /// `BLOBS_FOLDER`, set up in `open`); wrapped in `Arc` so clones of the
    /// tree share it.
    blobs_folder: Arc<PathBuf>,
}
161
162impl BlobTree {
163    /// Physical footprint of the stale blob files a full compaction would
164    /// relocate: the linked, stale, non-dead subset across every live table (the
165    /// SAME set the merge gate budgets via `pick_blob_files_to_rewrite`), summed
166    /// by on-disk size. Zero when KV separation is unconfigured.
167    fn full_compaction_blob_need(&self, version: &crate::version::Version) -> crate::Result<u64> {
168        let Some(blob_opts) = &self.index.config.kv_separation_opts else {
169            return Ok(0);
170        };
171        let all_tables: crate::HashSet<TableId> = version.iter_tables().map(Table::id).collect();
172        crate::compaction::worker::pick_blob_files_to_rewrite(&all_tables, version, blob_opts)?
173            .iter()
174            .try_fold(0u64, |acc, bf| bf.physical_size().map(|size| acc + size))
175    }
176
177    pub(crate) fn open(config: Config) -> crate::Result<Self> {
178        use crate::file::{BLOBS_FOLDER, fsync_directory};
179
180        let index = crate::Tree::open(config)?;
181
182        let blobs_folder = index.config.path.join(BLOBS_FOLDER);
183        (*index.config.fs).create_dir_all(&blobs_folder)?;
184        fsync_directory(&blobs_folder, &*index.config.fs, index.config.sync_mode)?;
185
186        let blob_file_id_to_continue_with = index
187            .current_version()
188            .blob_files
189            .list_ids()
190            .max()
191            .map(|x| x + 1)
192            .unwrap_or_default();
193
194        index
195            .0
196            .blob_file_id_counter
197            .set(blob_file_id_to_continue_with);
198
199        Ok(Self {
200            index,
201            blobs_folder: Arc::new(blobs_folder),
202        })
203    }
204
205    /// Resolves a single key against a pre-acquired [`SuperVersion`](crate::version::SuperVersion).
206    fn resolve_key(
207        &self,
208        super_version: &crate::version::SuperVersion,
209        key: &[u8],
210        seqno: SeqNo,
211    ) -> crate::Result<Option<UserValue>> {
212        let Some(item) = crate::Tree::get_internal_entry_from_version(
213            super_version,
214            key,
215            seqno,
216            self.index.config.comparator.as_ref(),
217        )?
218        else {
219            return Ok(None);
220        };
221
222        let (_, v) = resolve_value_handle(
223            self.id(),
224            self.blobs_folder.as_path(),
225            &self.index.config.cache,
226            &super_version.version,
227            item,
228            #[cfg(zstd_any)]
229            self.index
230                .config
231                .kv_separation_opts
232                .as_ref()
233                .and_then(|o| o.zstd_dictionary.as_deref()),
234        )?;
235
236        Ok(Some(v))
237    }
238
239    /// Iterate change events with `seqno >= target_seqno`, resolving
240    /// KV-separated values.
241    ///
242    /// Same change-data-capture contract as [`Tree::scan_since_seqno`](crate::Tree::scan_since_seqno),
243    /// but a blob-indirected value is resolved from its blob file into a
244    /// [`ScanSinceEvent::Insert`] carrying the real value, so a downstream
245    /// consumer can replicate without access to the source's blob files.
246    /// Block-skip, seqno ordering, and tombstone handling are identical to the
247    /// standard-tree path (the same shared aggregation backs both).
248    ///
249    /// # Panics
250    ///
251    /// Panics if the internal version-history lock is poisoned.
252    ///
253    /// # Errors
254    ///
255    /// Returns `Err` if reading the index, a data block, or a referenced blob
256    /// fails.
257    pub fn scan_since_seqno(
258        &self,
259        target_seqno: SeqNo,
260    ) -> crate::Result<impl Iterator<Item = ScanSinceEvent> + use<>> {
261        self.index
262            .scan_since_seqno_with(target_seqno, true, |version, entry| {
263                let seqno = entry.key.seqno;
264                let (key, value) = resolve_value_handle(
265                    self.id(),
266                    self.blobs_folder.as_path(),
267                    &self.index.config.cache,
268                    version,
269                    entry,
270                    #[cfg(zstd_any)]
271                    self.index
272                        .config
273                        .kv_separation_opts
274                        .as_ref()
275                        .and_then(|o| o.zstd_dictionary.as_deref()),
276                )?;
277                Ok(ScanSinceEvent::Insert { key, value, seqno })
278            })
279    }
280}
281
// Implements the crate-private `sealed::Sealed` marker for `BlobTree`
// (presumably a supertrait of `AbstractTree` that keeps outside crates from
// implementing it — confirm in `abstract_tree`).
impl crate::abstract_tree::sealed::Sealed for BlobTree {}
283
284/// Maps a raw merge-pipeline item into a KV-separated iterator guard that
285/// resolves the blob handle lazily against `version`.
286fn blob_guard(
287    tree: &crate::BlobTree,
288    version: &Version,
289    item: crate::Result<InternalValue>,
290) -> IterGuardImpl {
291    IterGuardImpl::Blob(Guard {
292        tree: tree.clone(),
293        version: version.clone(),
294        kv: item,
295    })
296}
297
/// Wraps a [`SeekableTreeIter`](crate::range::SeekableTreeIter) over the index
/// tree so a KV-separated tree can expose it as a [`SeekableGuardIter`](crate::iter_guard::SeekableGuardIter).
struct BlobSeekable {
    /// Seekable iterator over the index tree's raw entries.
    inner: crate::range::SeekableTreeIter,
    /// Tree handle cloned into every emitted guard.
    tree: crate::BlobTree,
    /// Version obtained from `inner.version()` when this wrapper is built;
    /// every emitted guard resolves blob handles against it.
    version: Version,
}
305
306impl Iterator for BlobSeekable {
307    type Item = IterGuardImpl;
308
309    fn next(&mut self) -> Option<Self::Item> {
310        self.inner
311            .next()
312            .map(|item| blob_guard(&self.tree, &self.version, item))
313    }
314}
315
316impl DoubleEndedIterator for BlobSeekable {
317    fn next_back(&mut self) -> Option<Self::Item> {
318        self.inner
319            .next_back()
320            .map(|item| blob_guard(&self.tree, &self.version, item))
321    }
322}
323
// Every method forwards straight to the index-tree iterator: seeking and key
// peeking never need the value log.
impl crate::iter_guard::SeekableGuardIter for BlobSeekable {
    /// Forwards to the inner iterator's `seek_to`.
    fn seek_to(&mut self, key: &[u8]) {
        self.inner.seek_to(key);
    }

    /// Forwards to the inner iterator's `seek_to_for_prev`.
    fn seek_to_for_prev(&mut self, key: &[u8]) {
        self.inner.seek_to_for_prev(key);
    }

    fn peek_key(&mut self) -> Option<crate::Result<crate::UserKey>> {
        // The key lives in the index tree (blob separation only moves the value),
        // so the inner seekable's peek already yields the right user key.
        self.inner.peek_key()
    }
}
339
340impl AbstractTree for BlobTree {
341    #[cfg(feature = "std")]
342    fn create_checkpoint(
343        &self,
344        target_path: &crate::path::Path,
345    ) -> crate::Result<crate::CheckpointInfo> {
346        crate::checkpoint::run_checkpoint(
347            self,
348            &crate::checkpoint::CheckpointParams {
349                target_root: target_path,
350                target_fs: &self.index.config.fs,
351                src_root: &self.index.config.path,
352                src_fs: &self.index.config.fs,
353                deletion_pause: &self.index.deletion_pause,
354                visible_seqno: &self.index.config.visible_seqno,
355                include_blobs: true,
356                runtime_config: self.index.0.runtime_config.load_full(),
357                encryption: self.index.0.config.encryption.clone(),
358            },
359        )
360    }
361
    /// Delegates to the index tree, which holds every key.
    fn print_trace(&self, key: &[u8]) -> crate::Result<()> {
        self.index.print_trace(key)
    }
365
    /// Delegates to the index tree; only its tables are SST files.
    fn table_file_cache_size(&self) -> usize {
        self.index.table_file_cache_size()
    }
369
    /// Returns the index tree's version-history write guard; the index tree's
    /// version history is also this tree's.
    fn get_version_history_lock(&self) -> VersionsWriteGuard<'_> {
        self.index.get_version_history_lock()
    }
373
    /// Delegates to the index tree, which owns the table ID counter.
    fn next_table_id(&self) -> TableId {
        self.index.next_table_id()
    }
377
    /// The tree ID is the index tree's ID; blob files are written and resolved
    /// under this same ID.
    fn id(&self) -> crate::TreeId {
        self.index.id()
    }
381
    /// Returns the raw index-tree entry; a blob indirection is NOT resolved here.
    fn get_internal_entry(&self, key: &[u8], seqno: SeqNo) -> crate::Result<Option<InternalValue>> {
        self.index.get_internal_entry(key, seqno)
    }
385
    /// The index tree's current version, which tracks both its tables and this
    /// tree's blob files.
    fn current_version(&self) -> Version {
        self.index.current_version()
    }
389
    /// Computes storage statistics for this KV-separated tree.
    ///
    /// Starts from the footprint of a single version snapshot (blob files
    /// included) and adds the stale-blob bytes a full compaction would
    /// relocate. When storage admission is enabled and capacity is known, a
    /// `Healthy` status is refined to full- or tight-compaction availability
    /// using the same two-layer space check the compaction gate applies. A
    /// read-only index tree always reports `ReadOnlyOutOfSpace`.
    ///
    /// # Errors
    ///
    /// Returns `Err` if computing the footprint or sizing a stale blob file fails.
    fn storage_stats(&self) -> crate::Result<crate::StorageStats> {
        // Forward the index tree's compaction state (the default impl would
        // always report idle), and mark value bytes as NOT user values: large
        // values are KV-separated into blob files, so the SST records only
        // indirection pointers. One version snapshot is reused for both the
        // footprint and the blob-headroom sum below: a second `current_version()`
        // could race a concurrent flush / compaction and mix two snapshots.
        let version = self.current_version();
        let mut stats = crate::storage_stats::compute_storage_stats(
            &version,
            self.index.is_compacting(),
            false,
        )?;
        // Capacity is disk-aware and driven by the index tree's runtime config;
        // its footprint basis (`compute_used_bytes`) already stats blob files, so
        // `used_bytes` here is the blob-inclusive figure capacity is measured
        // against.
        let (capacity, available, compaction_possible) =
            self.index.admission_capacity(stats.used_bytes);
        stats.capacity_bytes = capacity;
        stats.available_bytes = available;
        stats.compaction_possible = compaction_possible;
        // A full compaction of a blob tree also relocates STALE blob files, which
        // the merge gate budgets via `pick_blob_files_to_rewrite` (linked, stale,
        // non-dead). Estimate the same subset across every table — NOT the whole
        // live blob footprint, which would overstate the need (large non-stale
        // blobs are not rewritten) and report tight while the gate would admit
        // the merge. Fold it into the gauge figure.
        let blob_need = self.full_compaction_blob_need(&version)?;
        stats.full_compaction_bytes += blob_need;
        // Surface full-vs-tight compaction availability through the SAME two-layer
        // check the compaction space gate enforces (logical quota + physical free
        // per destination volume), so the status matches what the gate admits.
        if self.index.storage_admission_enabled()
            && capacity.is_some()
            && stats.status == crate::StorageStatus::Healthy
        {
            // SST output lands in the LAST configured level's volume
            // (`level_count - 1`), stale blob relocation in the primary blobs
            // volume; the demand is bounded by the largest level's size. The
            // per-volume gate (not `available >= full_compaction_bytes` against
            // the min-volume free) keeps the status from reporting tight when the
            // SST and blob outputs each fit their own volume — see the gate's
            // two-layer model.
            let sst_need = crate::storage_stats::full_compaction_demand_bytes(&version);
            // `saturating_sub`: `level_count >= 1` always (the clamp only guards a
            // degenerate zero-level config) → the last level index.
            let sst_dest_level = self.index.config.level_count.saturating_sub(1);
            let quota_headroom = self.index.quota_headroom(stats.used_bytes);
            let full_fits = crate::compaction::worker::space_fits_two_layer(
                &self.index.config,
                quota_headroom,
                sst_need,
                sst_dest_level,
                blob_need,
            );
            stats.status = if full_fits {
                crate::StorageStatus::FullCompactionAvailable
            } else {
                crate::StorageStatus::TightCompactionAvailable
            };
        }
        // Admission is driven by the index tree's runtime config / footprint;
        // a closed gate is the operator-actionable state (see the standard
        // tree's override for the precedence rationale).
        if self.index.is_read_only() {
            stats.status = crate::StorageStatus::ReadOnlyOutOfSpace;
        }
        Ok(stats)
    }
460
    /// Checks whether a write may proceed, using the index tree's admission gate.
    ///
    /// # Errors
    ///
    /// Propagates whatever the index tree's admission check returns.
    fn write_admission(&self) -> crate::Result<()> {
        // Admission state lives on the index tree (which holds the runtime
        // config). The forward is blob-aware: the index tree's version IS this
        // blob tree's version (current_version() delegates here), and the gate's
        // footprint basis (`storage_stats::compute_used_bytes`) stats live
        // tables AND blob files, so blob bytes count toward the budget.
        self.index.write_admission()
    }
469
    /// Reports write backpressure for `strategy`, computed by the index tree.
    fn write_backpressure(
        &self,
        strategy: &dyn crate::compaction::CompactionStrategy,
    ) -> crate::Backpressure {
        // Same delegation as write_admission: the index tree holds the runtime
        // config and its version is this blob tree's version, so the L0-count
        // and pending-compaction-bytes signals are computed on the right state.
        self.index.write_backpressure(strategy)
    }
479
    /// Cache statistics, delegated to the index tree (only with the `metrics` feature).
    #[cfg(feature = "metrics")]
    fn cache_stats(&self) -> crate::CacheStats {
        self.index.cache_stats()
    }
484
    /// The index tree's metrics handle (only with the `metrics` feature).
    #[cfg(feature = "metrics")]
    fn metrics(&self) -> &Arc<crate::Metrics> {
        self.index.metrics()
    }
489
    /// Delegates to the index tree, which owns the version history.
    fn version_free_list_len(&self) -> usize {
        self.index.version_free_list_len()
    }
493
494    fn prefix<K: AsRef<[u8]>>(
495        &self,
496        prefix: K,
497        seqno: SeqNo,
498        index: Option<(Arc<Memtable>, SeqNo)>,
499    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
500        use crate::prefix::compute_prefix_hash;
501        use crate::range::prefix_to_range;
502
503        let prefix_bytes = prefix.as_ref();
504
505        let prefix_hash =
506            compute_prefix_hash(self.index.config.prefix_extractor.as_ref(), prefix_bytes);
507
508        let super_version = self.index.get_version_for_snapshot(seqno);
509        let tree = self.clone();
510
511        let range = prefix_to_range(prefix_bytes);
512
513        Box::new(
514            crate::Tree::create_internal_range_with_prefix_hash(
515                super_version.clone(),
516                &range,
517                seqno,
518                index,
519                None, // BlobTree does not use merge operators for prefix scans
520                self.index.config.comparator.clone(),
521                prefix_hash,
522            )
523            .map(move |kv| {
524                IterGuardImpl::Blob(Guard {
525                    tree: tree.clone(),
526                    version: super_version.version.clone(),
527                    kv,
528                })
529            }),
530        )
531    }
532
533    fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
534        &self,
535        range: R,
536        seqno: SeqNo,
537        index: Option<(Arc<Memtable>, SeqNo)>,
538    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
539        let super_version = self.index.get_version_for_snapshot(seqno);
540        let tree = self.clone();
541
542        Box::new(
543            crate::Tree::create_internal_range(
544                super_version.clone(),
545                &range,
546                seqno,
547                index,
548                None,
549                self.index.config.comparator.clone(),
550            )
551            .map(move |kv| {
552                IterGuardImpl::Blob(Guard {
553                    tree: tree.clone(),
554                    version: super_version.version.clone(),
555                    kv,
556                })
557            }),
558        )
559    }
560
561    fn range_seekable<K: AsRef<[u8]>, R: RangeBounds<K>>(
562        &self,
563        range: R,
564        seqno: SeqNo,
565        index: Option<(Arc<Memtable>, SeqNo)>,
566    ) -> Box<dyn crate::iter_guard::SeekableGuardIter + 'static> {
567        let (lo, hi) = crate::tree::range_to_user_bounds(&range);
568        let inner = self
569            .index
570            .create_seekable_range_bounds(lo, hi, seqno, index);
571        let version = inner.version();
572        Box::new(BlobSeekable {
573            inner,
574            tree: self.clone(),
575            version,
576        })
577    }
578
579    fn batch_range_scan<K: AsRef<[u8]>, R: RangeBounds<K> + 'static, I: IntoIterator<Item = R>>(
580        &self,
581        intervals: I,
582        seqno: SeqNo,
583        index: Option<(Arc<Memtable>, SeqNo)>,
584    ) -> Box<dyn Iterator<Item = IterGuardImpl> + Send + 'static>
585    where
586        I::IntoIter: Send + 'static,
587    {
588        let inner = self.index.create_seekable_range_bounds(
589            core::ops::Bound::Unbounded,
590            core::ops::Bound::Unbounded,
591            seqno,
592            index,
593        );
594        let version = inner.version();
595        let tree = self.clone();
596        let intervals = intervals
597            .into_iter()
598            .map(|r| crate::tree::range_to_user_bounds(&r));
599        Box::new(
600            crate::range::BatchRangeScan::new(inner, intervals)
601                .map(move |item| blob_guard(&tree, &version, item)),
602        )
603    }
604
    /// Delegates to the index tree, where tombstones are stored.
    fn tombstone_count(&self) -> u64 {
        self.index.tombstone_count()
    }
608
    /// Delegates to the index tree.
    fn weak_tombstone_count(&self) -> u64 {
        self.index.weak_tombstone_count()
    }
612
    /// Delegates to the index tree.
    fn weak_tombstone_reclaimable_count(&self) -> u64 {
        self.index.weak_tombstone_reclaimable_count()
    }
616
    /// Delegates to the index tree's `drop_range`.
    ///
    /// NOTE(review): only the index tree is called here; confirm that blob
    /// files referenced solely by the dropped tables are reclaimed elsewhere.
    fn drop_range<K: AsRef<[u8]>, R: RangeBounds<K>>(&self, range: R) -> crate::Result<()> {
        self.index.drop_range(range)
    }
620
    /// Removes all data from the tree.
    ///
    /// While holding the version-history lock, installs a new version with a
    /// fresh empty active memtable, no sealed memtables, and an empty
    /// `Version`. After releasing the lock, every table and blob file of the
    /// pre-clear version is marked deleted. Physical removal is deferred until
    /// the last reference (for example a reader's snapshot) is released.
    ///
    /// # Errors
    ///
    /// Returns `Err` if installing the new version fails; in that case nothing
    /// is marked deleted.
    fn clear(&self) -> crate::Result<()> {
        let config = self.tree_config();
        let mut versions = self.get_version_history_lock();

        // Pre-clear snapshot: its tables AND blob files all become garbage once
        // the new empty version is installed.
        let prior = versions.latest_version();

        versions.upgrade_version(
            &config.path,
            |v| {
                let mut copy = v.clone();
                copy.active_memtable = Arc::new(Memtable::new(
                    self.index.memtable_id_counter.next(),
                    config.comparator.clone(),
                ));
                copy.sealed_memtables = Arc::default();
                copy.version = Version::new(v.version.id() + 1, self.tree_type());
                Ok(copy)
            },
            &config.seqno,
            &config.visible_seqno,
            &*config.fs,
            self.index.0.runtime_config.load_full(),
            self.index.0.config.encryption.clone(),
        )?;

        // Same MVCC-safe reclaim as the standard tree, plus the blob files:
        // mark every obsolete table / blob file deleted, drop the history's
        // hold, and let Inner::Drop reclaim each once its last reference is
        // released (a live reader's snapshot clone defers deletion).
        versions.drain_obsolete_to_latest();
        drop(versions);

        for table in prior.version.iter_tables() {
            table.mark_as_deleted();
        }
        for blob_file in prior.version.blob_files.iter() {
            blob_file.mark_as_deleted();
        }

        Ok(())
    }
664
    /// Delegates to the index tree's major compaction.
    fn major_compact(
        &self,
        target_size: u64,
        seqno_threshold: SeqNo,
    ) -> crate::Result<crate::compaction::CompactionResult> {
        self.index.major_compact(target_size, seqno_threshold)
    }
672
    /// Delegates to the index tree, which owns the memtables.
    fn clear_active_memtable(&self) {
        self.index.clear_active_memtable();
    }
676
    /// Delegates to the index tree.
    fn l0_run_count(&self) -> usize {
        self.index.l0_run_count()
    }
680
    /// Number of blob files referenced by the current version.
    fn blob_file_count(&self) -> usize {
        self.current_version().blob_file_count()
    }
684
685    // NOTE: We skip reading from the value log
686    // because the vHandles already store the value size
687    fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<u32>> {
688        let Some(item) = self.index.get_internal_entry(key.as_ref(), seqno)? else {
689            return Ok(None);
690        };
691
692        Ok(Some(if item.key.value_type.is_indirection() {
693            let mut cursor = crate::io::Cursor::new(item.value);
694            let vptr = BlobIndirection::decode_from(&mut cursor)?;
695            vptr.size
696        } else {
697            #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
698            {
699                item.value.len() as u32
700            }
701        }))
702    }
703
    /// Stale blob bytes as reported by the current version's GC statistics.
    fn stale_blob_bytes(&self) -> u64 {
        self.current_version().gc_stats().stale_bytes()
    }
707
    /// Delegates to the index tree; filters exist only on its tables.
    fn filter_size(&self) -> u64 {
        self.index.filter_size()
    }
711
    /// Delegates to the index tree.
    fn pinned_filter_size(&self) -> usize {
        self.index.pinned_filter_size()
    }
715
    /// Delegates to the index tree.
    fn pinned_block_index_size(&self) -> usize {
        self.index.pinned_block_index_size()
    }
719
    /// Delegates to the index tree, which owns the memtables.
    fn sealed_memtable_count(&self) -> usize {
        self.index.sealed_memtable_count()
    }
723
    /// Returns the index tree's flush guard; the index and this tree share it.
    fn get_flush_lock(&self) -> FlushGuard<'_> {
        self.index.get_flush_lock()
    }
727
728    #[expect(clippy::too_many_lines, reason = "flush logic is inherently complex")]
729    fn flush_to_tables_with_rt(
730        &self,
731        stream: impl Iterator<Item = crate::Result<InternalValue>>,
732        range_tombstones: Vec<crate::range_tombstone::RangeTombstone>,
733    ) -> crate::Result<Option<(Vec<Table>, Option<Vec<BlobFile>>)>> {
734        use crate::{coding::Encode, file::BLOBS_FOLDER, table::multi_writer::MultiWriter};
735
736        let start = crate::time::Instant::now();
737
738        let (table_folder, level_fs) = self.index.config.tables_folder_for_level(0);
739
740        let data_block_size = self.index.config.data_block_size_policy.get(0);
741
742        let data_block_restart_interval =
743            self.index.config.data_block_restart_interval_policy.get(0);
744        let index_block_restart_interval =
745            self.index.config.index_block_restart_interval_policy.get(0);
746
747        let data_block_compression = self.index.config.data_block_compression_policy.get(0);
748        let index_block_compression = self.index.config.index_block_compression_policy.get(0);
749
750        let data_block_hash_ratio = self.index.config.data_block_hash_ratio_policy.get(0);
751
752        let index_partitioning = self.index.config.index_block_partitioning_policy.get(0);
753        let filter_partitioning = self.index.config.filter_block_partitioning_policy.get(0);
754
755        log::debug!(
756            "Flushing memtable(s) and performing key-value separation, data_block_restart_interval={data_block_restart_interval}, index_block_restart_interval={index_block_restart_interval}, data_block_size={data_block_size}, data_block_compression={data_block_compression:?}, index_block_compression={index_block_compression:?}"
757        );
758        log::debug!("=> to table(s) in {}", table_folder.display());
759        log::debug!("=> to blob file(s) at {}", self.blobs_folder.display());
760
761        let mut table_writer = MultiWriter::new(
762            table_folder.clone(),
763            self.index.table_id_counter.clone(),
764            64 * 1_024 * 1_024,
765            0,
766            level_fs.clone(),
767        )?
768        .set_comparator(self.index.config.comparator.clone())
769        .use_data_block_restart_interval(data_block_restart_interval)
770        .use_index_block_restart_interval(index_block_restart_interval)
771        .use_data_block_compression(data_block_compression)
772        .use_index_block_compression(index_block_compression)
773        .use_data_block_size(data_block_size)
774        .use_data_block_hash_ratio(data_block_hash_ratio)
775        .use_bloom_policy({
776            use crate::config::FilterPolicyEntry::{Bloom, None};
777            use crate::table::filter::BloomConstructionPolicy;
778
779            match self.index.config.filter_policy.get(0) {
780                Bloom(policy) => policy,
781                None => BloomConstructionPolicy::BitsPerKey(0.0),
782            }
783        });
784
785        // Live runtime snapshot: a blob tree's index must honor the same
786        // per-flush SST config as a standard tree's flush, not just the
787        // structural block options above.
788        let rc = self.index.0.runtime_config.load_full();
789
790        if index_partitioning {
791            // Size-adaptive index: single-level for small SSTs, spill to
792            // partitioned only past the threshold (see flush path).
793            table_writer = table_writer.use_adaptive_index(rc.index_partition_spill_threshold);
794        }
795        if filter_partitioning {
796            table_writer = table_writer.use_partitioned_filter();
797        }
798
799        table_writer =
800            table_writer.use_prefix_extractor(self.index.config.prefix_extractor.clone());
801        table_writer = table_writer.use_encryption(self.index.config.encryption.clone());
802        // Runtime-driven SST options, mirroring the standard tree's flush so a
803        // blob tree's index honors columnar, zone maps, ECC, seqno-in-index,
804        // per-KV checksums, locator policy, sync mode, and CoW disable.
805        table_writer = table_writer.use_page_ecc(self.index.config.page_ecc, rc.ecc_scheme);
806        table_writer = table_writer.use_sync_mode(self.index.config.sync_mode);
807        table_writer = table_writer.use_seqno_in_index(rc.seqno_in_index);
808        table_writer = table_writer.use_zone_map(rc.zone_map);
809        table_writer = table_writer.use_columnar(rc.columnar);
810        table_writer = table_writer.use_disable_cow_on_sst(rc.disable_cow_on_sst_files);
811        table_writer = table_writer.use_kv_checksums(rc.kv_checksums, rc.kv_checksum_algo);
812        table_writer = table_writer.use_locator(self.index.config.locator_policy.get(0));
813
814        #[cfg(zstd_any)]
815        {
816            table_writer =
817                table_writer.use_zstd_dictionary(self.index.config.zstd_dictionary.clone());
818        }
819
820        #[expect(
821            clippy::expect_used,
822            reason = "cannot create blob tree without defining kv separation options"
823        )]
824        let kv_opts = self
825            .index
826            .config
827            .kv_separation_opts
828            .as_ref()
829            .expect("kv separation options should exist");
830
831        let mut blob_writer = {
832            let w = BlobFileWriter::new(
833                self.index.0.blob_file_id_counter.clone(),
834                self.index.config.path.join(BLOBS_FOLDER),
835                self.id(),
836                self.index.config.descriptor_table.clone(),
837                self.index.config.fs.clone(),
838            )?
839            .use_target_size(kv_opts.file_target_size)
840            .use_compression(kv_opts.compression)
841            .use_sync_mode(self.index.config.sync_mode);
842            #[cfg(zstd_any)]
843            let w = w.use_zstd_dictionary(kv_opts.zstd_dictionary.clone());
844            w
845        };
846
847        let separation_threshold = kv_opts.separation_threshold;
848
849        // Set range tombstones BEFORE writing KV items so that if MultiWriter
850        // rotates to a new table during the write loop, earlier tables already
851        // carry the RT metadata.
852        table_writer.set_range_tombstones(range_tombstones);
853
854        for item in stream {
855            let item = item?;
856
857            if item.is_tombstone() {
858                // NOTE: Still need to add tombstone to index tree
859                // But no blob to blob writer
860                table_writer.write(InternalValue::new(item.key, UserValue::empty()))?;
861                continue;
862            }
863
864            let value = item.value;
865
866            #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
867            let value_size = value.len() as u32;
868
869            if value_size >= separation_threshold {
870                let vhandle = blob_writer.write(&item.key.user_key, item.key.seqno, &value)?;
871
872                let indirection = BlobIndirection {
873                    vhandle,
874                    size: value_size,
875                };
876
877                table_writer.write({
878                    let mut vptr =
879                        InternalValue::new(item.key.clone(), indirection.encode_into_vec());
880                    vptr.key.value_type = crate::ValueType::Indirection;
881                    vptr
882                })?;
883
884                table_writer.register_blob(indirection);
885            } else {
886                table_writer.write(InternalValue::new(item.key, value))?;
887            }
888        }
889
890        let blob_files = blob_writer.finish()?;
891
892        let result = table_writer.finish()?;
893
894        log::debug!("Flushed memtable(s) in {:?}", start.elapsed());
895
896        let pin_filter = self.index.config.filter_block_pinning_policy.get(0);
897        let pin_index = self.index.config.index_block_pinning_policy.get(0);
898
899        // Load tables
900        let tables = result
901            .into_iter()
902            .map(|(table_id, checksum)| -> crate::Result<Table> {
903                Table::recover(
904                    table_folder.join(table_id.to_string()),
905                    checksum,
906                    0,
907                    self.index.id,
908                    table_id,
909                    self.index.config.cache.clone(),
910                    self.index.config.descriptor_table.clone(),
911                    level_fs.clone(),
912                    pin_filter,
913                    pin_index,
914                    self.index.config.encryption.clone(),
915                    #[cfg(zstd_any)]
916                    self.index.config.zstd_dictionary.clone(),
917                    self.index.config.comparator.clone(),
918                    #[cfg(feature = "metrics")]
919                    self.index.metrics.clone(),
920                )
921            })
922            .collect::<crate::Result<Vec<_>>>()?;
923
924        // Return Some even when tables is empty (RT-only flush): the caller
925        // (AbstractTree::flush) handles empty tables by re-inserting RTs into
926        // the active memtable and still needs to delete sealed memtables.
927        Ok(Some((tables, Some(blob_files))))
928    }
929
930    fn register_tables(
931        &self,
932        tables: &[Table],
933        blob_files: Option<&[BlobFile]>,
934        frag_map: Option<FragmentationMap>,
935        sealed_memtables_to_delete: &[MemtableId],
936        gc_watermark: SeqNo,
937    ) -> crate::Result<()> {
938        self.index.register_tables(
939            tables,
940            blob_files,
941            frag_map,
942            sealed_memtables_to_delete,
943            gc_watermark,
944        )
945    }
946
947    fn compact(
948        &self,
949        strategy: Arc<dyn crate::compaction::CompactionStrategy>,
950        seqno_threshold: SeqNo,
951    ) -> crate::Result<crate::compaction::CompactionResult> {
952        self.index.compact(strategy, seqno_threshold)
953    }
954
955    fn get_next_table_id(&self) -> TableId {
956        self.index.get_next_table_id()
957    }
958
959    fn tree_config(&self) -> &Config {
960        &self.index.config
961    }
962
963    fn get_highest_seqno(&self) -> Option<SeqNo> {
964        self.index.get_highest_seqno()
965    }
966
967    fn active_memtable(&self) -> Arc<Memtable> {
968        self.index.active_memtable()
969    }
970
971    fn rotate_memtable(&self) -> Option<Arc<Memtable>> {
972        self.index.rotate_memtable()
973    }
974
975    fn table_count(&self) -> usize {
976        self.index.table_count()
977    }
978
979    fn level_table_count(&self, idx: usize) -> Option<usize> {
980        self.index.level_table_count(idx)
981    }
982
983    fn approximate_len(&self) -> usize {
984        self.index.approximate_len()
985    }
986
987    // NOTE: Override the default implementation to not fetch
988    // data from the value log, so we get much faster key reads
989    fn is_empty(&self, seqno: SeqNo, index: Option<(Arc<Memtable>, SeqNo)>) -> crate::Result<bool> {
990        self.index.is_empty(seqno, index)
991    }
992
993    // NOTE: Override the default implementation to not fetch
994    // data from the value log, so we get much faster key reads
995    fn contains_key<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<bool> {
996        self.index.contains_key(key, seqno)
997    }
998
999    // NOTE: Override the default implementation to delegate directly
1000    // to the index tree, avoiding extra iterator/guard overhead for
1001    // prefix checks
1002    fn contains_prefix<K: AsRef<[u8]>>(
1003        &self,
1004        prefix: K,
1005        seqno: SeqNo,
1006        index: Option<(Arc<Memtable>, SeqNo)>,
1007    ) -> crate::Result<bool> {
1008        self.index.contains_prefix(prefix, seqno, index)
1009    }
1010
1011    // NOTE: Override the default implementation to not fetch
1012    // data from the value log, so we get much faster scans
1013    fn len(&self, seqno: SeqNo, index: Option<(Arc<Memtable>, SeqNo)>) -> crate::Result<usize> {
1014        self.index.len(seqno, index)
1015    }
1016
1017    fn disk_space(&self) -> u64 {
1018        let version = self.current_version();
1019        self.index.disk_space() + version.blob_files.on_disk_size()
1020    }
1021
1022    fn approximate_range_stats<K: AsRef<[u8]>, R: core::ops::RangeBounds<K>>(
1023        &self,
1024        range: R,
1025        seqno: SeqNo,
1026    ) -> crate::Result<crate::ApproximateRangeStats> {
1027        // The index tree's SSTs record their referenced blob bytes per-SST (at
1028        // both flush and compaction), so its estimate already apportions the
1029        // blob bytes by the in-range fraction. Delegating keeps the
1030        // KV-separated estimate unified with the standard path.
1031        //
1032        // The figure tracks the value's CURRENT physical location: an unflushed
1033        // value still lives inline in the index memtable, so the memtable share
1034        // counts its full size there; once flushed it is separated into a blob
1035        // file and counted via the SST's referenced-blob bytes. Both reflect the
1036        // real footprint at the requested snapshot.
1037        self.index.approximate_range_stats(range, seqno)
1038    }
1039
1040    fn approximate_range_cardinality<K: AsRef<[u8]>, R: core::ops::RangeBounds<K>>(
1041        &self,
1042        range: R,
1043        seqno: SeqNo,
1044    ) -> crate::Result<crate::RangeCardinality> {
1045        // Cardinality counts rows (index entries) and their key ranges, both of
1046        // which live entirely in the index tree; blob files hold only the
1047        // separated values, so the index tree's zone map and counts are authoritative.
1048        self.index.approximate_range_cardinality(range, seqno)
1049    }
1050
1051    fn get_highest_memtable_seqno(&self) -> Option<SeqNo> {
1052        self.index.get_highest_memtable_seqno()
1053    }
1054
1055    fn get_highest_persisted_seqno(&self) -> Option<SeqNo> {
1056        self.index.get_highest_persisted_seqno()
1057    }
1058
1059    fn apply_batch(&self, batch: crate::WriteBatch, seqno: SeqNo) -> crate::Result<(u64, u64)> {
1060        self.index.apply_batch(batch, seqno)
1061    }
1062
1063    fn insert<K: Into<UserKey>, V: Into<UserValue>>(
1064        &self,
1065        key: K,
1066        value: V,
1067        seqno: SeqNo,
1068    ) -> (u64, u64) {
1069        self.index.insert(key, value.into(), seqno)
1070    }
1071
1072    fn get<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<crate::UserValue>> {
1073        let super_version = self.index.get_version_for_snapshot(seqno);
1074        self.resolve_key(&super_version, key.as_ref(), seqno)
1075    }
1076
    // Looks up many keys at snapshot `seqno` and returns one result per input
    // key, in input order (`results[idx]` corresponds to `keys[idx]`).
    //
    // A single super version is pinned for the whole batch so every key is
    // resolved against the same view. Batches of at most two keys fall back
    // to per-key `resolve_key`. Larger batches run in three phases:
    //   1. probe the active memtable, then the sealed memtables;
    //   2. sort and dedup the memtable misses, then look them up in the
    //      on-disk tables in one batched pass;
    //   3. apply tombstones, range-tombstone suppression, merge-operand
    //      resolution and blob-indirection resolution to each hit.
    //
    // # Errors
    // Propagates any error from `resolve_key`, `batch_get_from_tables`,
    // `resolve_merge_via_pipeline`, or `resolve_value_handle`.
    #[expect(
        clippy::indexing_slicing,
        reason = "indices are generated from 0..n range, always in bounds"
    )]
    fn multi_get<K: AsRef<[u8]>>(
        &self,
        keys: impl IntoIterator<Item = K>,
        seqno: SeqNo,
    ) -> crate::Result<Vec<Option<crate::UserValue>>> {
        // Materialize the keys so they can be indexed by position below.
        let keys: Vec<_> = keys.into_iter().collect();
        let n = keys.len();
        if n == 0 {
            return Ok(Vec::new());
        }

        // One super version for the whole batch.
        let super_version = self.index.get_version_for_snapshot(seqno);
        let comparator = self.index.config.comparator.as_ref();

        // For small batches, use the simple per-key path
        if n <= 2 {
            return keys
                .iter()
                .map(|key| self.resolve_key(&super_version, key.as_ref(), seqno))
                .collect();
        }

        // Phase 1: Check memtables (unsorted — defer sort+hash for SST phase)
        // The first hit wins (`continue`), so an entry found in the active
        // memtable is never overridden by a sealed-memtable entry. Keys found
        // in neither are queued in `remaining` for the on-disk phase.
        let mut internal_entries: Vec<Option<crate::value::InternalValue>> = vec![None; n];
        let mut remaining: Vec<usize> = Vec::with_capacity(n);

        for idx in 0..n {
            let key = keys[idx].as_ref();
            if let Some(entry) = super_version.active_memtable.get(key, seqno) {
                internal_entries[idx] = Some(entry);
                continue;
            }
            if let Some(entry) =
                crate::Tree::get_internal_entry_from_sealed_memtables(&super_version, key, seqno)
            {
                internal_entries[idx] = Some(entry);
                continue;
            }
            remaining.push(idx);
        }

        // Phase 2: Sort + hash only if memtable misses exist
        if !remaining.is_empty() {
            // Order the miss indices by key using the tree's comparator,
            // which is the ordering the table lookup relies on.
            remaining.sort_by(|&a, &b| comparator.compare(keys[a].as_ref(), keys[b].as_ref()));

            // Shared dedup + fan-out with `Tree::multi_get` (see those helpers):
            // the batched on-disk path needs strictly-sorted-unique input, and
            // keeping one copy of this logic is what stops the two multi-get
            // paths from drifting back apart.
            let (miss_keys, duplicates) =
                crate::Tree::dedup_sorted_miss_keys(&remaining, &keys, comparator);

            // Fills `internal_entries` for the deduplicated miss keys.
            crate::Tree::batch_get_from_tables(
                &super_version.version,
                &keys,
                miss_keys,
                seqno,
                comparator,
                &*self.index.config.fs,
                &mut internal_entries,
            )?;

            // Copy each looked-up entry to the positions of its duplicate keys.
            crate::Tree::fan_out_duplicates(&duplicates, &mut internal_entries);
        }

        // Phase 3: Resolve each entry (tombstones, RT suppression, merge, blob indirections)
        // Slots left as `None` (not found, tombstoned, or suppressed) stay `None`.
        let mut results = vec![None; n];
        for idx in 0..n {
            // `take()` moves the entry out so it can be consumed without cloning.
            if let Some(item) = internal_entries[idx].take() {
                if item.is_tombstone() {
                    continue;
                }
                // A range tombstone covering this key can hide the entry even
                // though a point entry was found.
                if crate::Tree::is_suppressed_by_range_tombstones(
                    &super_version,
                    keys[idx].as_ref(),
                    item.key.seqno,
                    seqno,
                    comparator,
                ) {
                    continue;
                }
                // Merge operand resolution. Merge operands in BlobTree are stored
                // inline (not as blob indirection), so the pipeline result is a
                // plain value. Without a merge operator, return raw operand value
                // (same as resolve_key / resolve_pinned_entry behavior).
                if item.key.value_type.is_merge_operand() {
                    if let Some(merge_op) = &self.index.config.merge_operator {
                        results[idx] = crate::Tree::resolve_merge_via_pipeline(
                            super_version.clone(),
                            keys[idx].as_ref(),
                            seqno,
                            Arc::clone(merge_op),
                        )?;
                    } else {
                        results[idx] = Some(item.value);
                    }
                    continue;
                }
                // Either an inline value or a blob indirection; resolve_value_handle
                // handles both and returns the user-visible value.
                let (_, v) = resolve_value_handle(
                    self.id(),
                    self.blobs_folder.as_path(),
                    &self.index.config.cache,
                    &super_version.version,
                    item,
                    #[cfg(zstd_any)]
                    self.index
                        .config
                        .kv_separation_opts
                        .as_ref()
                        .and_then(|o| o.zstd_dictionary.as_deref()),
                )?;
                results[idx] = Some(v);
            }
        }

        Ok(results)
    }
1198
1199    fn merge<K: Into<UserKey>, V: Into<UserValue>>(
1200        &self,
1201        key: K,
1202        operand: V,
1203        seqno: SeqNo,
1204    ) -> (u64, u64) {
1205        self.index.merge(key, operand, seqno)
1206    }
1207
1208    fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
1209        self.index.remove(key, seqno)
1210    }
1211
1212    fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
1213        self.index.remove_weak(key, seqno)
1214    }
1215
1216    fn remove_range<K: Into<UserKey>>(&self, start: K, end: K, seqno: SeqNo) -> u64 {
1217        self.index.remove_range(start, end, seqno)
1218    }
1219}