Skip to main content

lsm_tree/tree/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2024-present, fjall-rs
3// Copyright (c) 2026-present, Structured World Foundation
4
5pub mod ingest;
6pub mod inner;
7pub mod sealed;
8
9use crate::path::Path;
10use crate::{
11    AbstractTree, Checksum, KvPair, SeqNo, SequenceNumberCounter, TableId, UserKey, UserValue,
12    ValueType,
13    compaction::{CompactionStrategy, drop_range::OwnedBounds, state::CompactionState},
14    config::Config,
15    format_version::FormatVersion,
16    fs::Fs,
17    iter_guard::{IterGuard, IterGuardImpl},
18    key::InternalKey,
19    manifest::Manifest,
20    memtable::Memtable,
21    range_tombstone::RangeTombstone,
22    scan_since::ScanSinceEvent,
23    slice::Slice,
24    table::Table,
25    value::InternalValue,
26    version::{SuperVersion, SuperVersions, Version, recovery::recover},
27    vlog::BlobFile,
28};
29use alloc::sync::Arc;
30#[cfg(not(feature = "std"))]
31use alloc::{boxed::Box, string::ToString, vec::Vec};
32use core::ops::{Bound, RangeBounds};
33use inner::{FlushGuard, TreeId, TreeInner, VersionsWriteGuard};
34// no-std: spin mirrors parking_lot's Mutex/RwLock API without an allocator.
35// parking_lot wins on the std hot path, so keep it for std.
36#[cfg(feature = "std")]
37use parking_lot::{Mutex, RwLock};
38#[cfg(not(feature = "std"))]
39use spin::{Mutex, RwLock};
40
41#[cfg(feature = "metrics")]
42use crate::metrics::Metrics;
43
/// Lower bound of the reserved headroom band kept free by storage admission
/// (see [`Tree::compute_write_admission`]).
///
/// Even when the active memtable is empty, the admission gate keeps at least
/// this many bytes below the budget. That leaves room for the next writes and
/// for a compaction that reclaims space. Equals 1 MiB.
pub const MIN_RESERVED_HEADROOM: u64 = 1 << 20;
49
/// Lifetime of a cached disk-free sample before the admission gate probes
/// again.
///
/// This caps how stale the physical free-space figure can get when another
/// process fills the filesystem between flushes. It also avoids issuing a
/// `statfs`/`statvfs` syscall on every gated write. Equals one second.
const ADMISSION_DISK_FREE_TTL: core::time::Duration = core::time::Duration::from_millis(1_000);
55
56/// Iterator value guard
57pub struct Guard(crate::Result<(UserKey, UserValue)>);
58
59impl IterGuard for Guard {
60    fn into_inner_if(
61        self,
62        pred: impl Fn(&UserKey) -> bool,
63    ) -> crate::Result<(UserKey, Option<UserValue>)> {
64        let (k, v) = self.0?;
65
66        if pred(&k) {
67            Ok((k, Some(v)))
68        } else {
69            Ok((k, None))
70        }
71    }
72
73    fn key(self) -> crate::Result<UserKey> {
74        self.0.map(|(k, _)| k)
75    }
76
77    fn size(self) -> crate::Result<u32> {
78        #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
79        self.into_inner().map(|(_, v)| v.len() as u32)
80    }
81
82    fn into_inner(self) -> crate::Result<(UserKey, UserValue)> {
83        self.0
84    }
85}
86
87/// Trait for monomorphized table point-read results.
88///
89/// Allows `find_in_tables` to operate generically over `InternalValue` (for
90/// `get`) and `(InternalValue, Block)` (for `get_pinned`), generating optimal
91/// code for each path without runtime dispatch or extra refcount overhead.
92trait TablePointLookup: Sized {
93    fn lookup(
94        table: &Table,
95        key: &[u8],
96        seqno: SeqNo,
97        key_hash: u64,
98    ) -> crate::Result<Option<Self>>;
99    fn entry_seqno(&self) -> SeqNo;
100    fn filter_tombstone(self) -> Option<Self>;
101}
102
103/// Lookup result for standard `get()` — entry only, no block retained.
104type TableEntry = InternalValue;
105
106/// One covered key in a batched run resolution: `(input index, key hash,
107/// resolved item)`. Aliased to keep `resolve_run_batched`'s return readable.
108type CoveredKey = (usize, u64, Option<InternalValue>);
109
110/// `(miss_keys, duplicates)` from [`Tree::dedup_sorted_miss_keys`]: `miss_keys`
111/// is `(key_index, bloom_hash)` for the strictly-sorted-unique batched resolver,
112/// `duplicates` is `(duplicate_index, representative_index)` for the fan-out.
113type DedupedMissKeys = (Vec<(usize, u64)>, Vec<(usize, usize)>);
114
115/// The outcome of resolving a key batch against one run (see `resolve_run_batched`).
116struct RunResolve {
117    /// Covered, non-skipped keys with their resolved item, in input order.
118    covered: Vec<CoveredKey>,
119    /// Keys this run does not cover, in input order, for the next run or level.
120    not_covered: Vec<(usize, u64)>,
121}
122
123/// One data block the chunked `multi_get` resolver will read (see
124/// `resolve_level_chunked`): the block, the SST it lives in (`table` + its `file`
125/// handle), the table-local read seqno, whether it needs the special load path
126/// (Page-ECC / columnar), and the ORIGINAL key indices that fall in this block.
127struct BlockTask<'a> {
128    table: &'a crate::Table,
129    file: Arc<dyn crate::fs::FsFile>,
130    handle: crate::table::BlockHandle,
131    table_seqno: SeqNo,
132    special: bool,
133    keys: Vec<usize>,
134}
135
136impl TablePointLookup for TableEntry {
137    fn lookup(
138        table: &Table,
139        key: &[u8],
140        seqno: SeqNo,
141        key_hash: u64,
142    ) -> crate::Result<Option<Self>> {
143        table.get(key, seqno, key_hash)
144    }
145
146    fn entry_seqno(&self) -> SeqNo {
147        self.key.seqno
148    }
149
150    fn filter_tombstone(self) -> Option<Self> {
151        ignore_tombstone_value(self)
152    }
153}
154
155/// Lookup result for `get_pinned()` — entry + block for zero-copy pinning.
156type TableEntryWithBlock = (InternalValue, crate::table::Block);
157
158impl TablePointLookup for TableEntryWithBlock {
159    fn lookup(
160        table: &Table,
161        key: &[u8],
162        seqno: SeqNo,
163        key_hash: u64,
164    ) -> crate::Result<Option<Self>> {
165        table.get_with_block(key, seqno, key_hash)
166    }
167
168    fn entry_seqno(&self) -> SeqNo {
169        self.0.key.seqno
170    }
171
172    fn filter_tombstone(self) -> Option<Self> {
173        ignore_tombstone_value(self.0).map(|iv| (iv, self.1))
174    }
175}
176
177/// Lookup result for the value-returning `get()` path: `(value_type, seqno,
178/// value)`, no key reconstruction (the caller has the needle).
179type TableValue = (ValueType, SeqNo, crate::Slice);
180
181impl TablePointLookup for TableValue {
182    fn lookup(
183        table: &Table,
184        key: &[u8],
185        seqno: SeqNo,
186        key_hash: u64,
187    ) -> crate::Result<Option<Self>> {
188        table.get_value(key, seqno, key_hash)
189    }
190
191    fn entry_seqno(&self) -> SeqNo {
192        self.1
193    }
194
195    fn filter_tombstone(self) -> Option<Self> {
196        if self.0.is_tombstone() {
197            None
198        } else {
199            Some(self)
200        }
201    }
202}
203
204fn ignore_tombstone_value(item: InternalValue) -> Option<InternalValue> {
205    if item.is_tombstone() {
206        None
207    } else {
208        Some(item)
209    }
210}
211
212/// A log-structured merge tree (LSM-tree/LSMT)
213#[derive(Clone)]
214pub struct Tree(#[doc(hidden)] pub Arc<TreeInner>);
215
216impl core::ops::Deref for Tree {
217    type Target = TreeInner;
218
219    fn deref(&self) -> &Self::Target {
220        &self.0
221    }
222}
223
224impl crate::abstract_tree::sealed::Sealed for Tree {}
225
226/// Maps a raw merge-pipeline item into a standard-tree iterator guard.
227fn standard_guard(item: crate::Result<InternalValue>) -> IterGuardImpl {
228    IterGuardImpl::Standard(Guard(item.map(|iv| (iv.key.user_key, iv.value))))
229}
230
231/// Extract owned user-key bounds from any range.
232#[expect(
233    clippy::redundant_pub_crate,
234    reason = "reached from blob_tree as crate::tree::range_to_user_bounds"
235)]
236pub(crate) fn range_to_user_bounds<K: AsRef<[u8]>, R: RangeBounds<K>>(
237    range: &R,
238) -> (Bound<UserKey>, Bound<UserKey>) {
239    use core::ops::Bound::{Excluded, Included, Unbounded};
240    let lo = match range.start_bound() {
241        Included(x) => Included(x.as_ref().into()),
242        Excluded(x) => Excluded(x.as_ref().into()),
243        Unbounded => Unbounded,
244    };
245    let hi = match range.end_bound() {
246        Included(x) => Included(x.as_ref().into()),
247        Excluded(x) => Excluded(x.as_ref().into()),
248        Unbounded => Unbounded,
249    };
250    (lo, hi)
251}
252
253/// Wraps a [`SeekableTreeIter`](crate::range::SeekableTreeIter) so a standard
254/// tree can expose it as a [`SeekableGuardIter`](crate::iter_guard::SeekableGuardIter).
255struct StandardSeekable {
256    inner: crate::range::SeekableTreeIter,
257}
258
259impl Iterator for StandardSeekable {
260    type Item = IterGuardImpl;
261
262    fn next(&mut self) -> Option<Self::Item> {
263        self.inner.next().map(standard_guard)
264    }
265}
266
267impl DoubleEndedIterator for StandardSeekable {
268    fn next_back(&mut self) -> Option<Self::Item> {
269        self.inner.next_back().map(standard_guard)
270    }
271}
272
273impl crate::iter_guard::SeekableGuardIter for StandardSeekable {
274    fn seek_to(&mut self, key: &[u8]) {
275        self.inner.seek_to(key);
276    }
277
278    fn seek_to_for_prev(&mut self, key: &[u8]) {
279        self.inner.seek_to_for_prev(key);
280    }
281
282    fn peek_key(&mut self) -> Option<crate::Result<crate::UserKey>> {
283        self.inner.peek_key()
284    }
285}
286
287impl AbstractTree for Tree {
288    fn table_file_cache_size(&self) -> usize {
289        self.config
290            .descriptor_table
291            .as_ref()
292            .map_or(0, |dt| dt.len())
293    }
294
295    fn get_version_history_lock(&self) -> VersionsWriteGuard<'_> {
296        self.version_history.write()
297    }
298
299    fn next_table_id(&self) -> TableId {
300        self.0.table_id_counter.get()
301    }
302
303    fn id(&self) -> TreeId {
304        self.id
305    }
306
307    fn blob_file_count(&self) -> usize {
308        0
309    }
310
311    #[cfg(feature = "std")]
312    fn create_checkpoint(
313        &self,
314        target_path: &crate::path::Path,
315    ) -> crate::Result<crate::CheckpointInfo> {
316        crate::checkpoint::run_checkpoint(
317            self,
318            &crate::checkpoint::CheckpointParams {
319                target_root: target_path,
320                target_fs: &self.config.fs,
321                src_root: &self.config.path,
322                src_fs: &self.config.fs,
323                deletion_pause: &self.deletion_pause,
324                visible_seqno: &self.config.visible_seqno,
325                include_blobs: false,
326                runtime_config: self.0.runtime_config.load_full(),
327                encryption: self.0.config.encryption.clone(),
328            },
329        )
330    }
331
332    fn print_trace(&self, key: &[u8]) -> crate::Result<()> {
333        let super_version = self.version_history.read().latest_version();
334
335        let key = Slice::from(key);
336
337        for kv in super_version.active_memtable.range_internal((
338            Bound::Included(InternalKey::new(key.clone(), SeqNo::MAX, ValueType::Value)),
339            Bound::Unbounded,
340        )) {
341            log::info!("[Active] {kv:?}");
342        }
343
344        for mt in super_version.sealed_memtables.iter().rev() {
345            for kv in mt.range_internal((
346                Bound::Included(InternalKey::new(key.clone(), SeqNo::MAX, ValueType::Value)),
347                Bound::Unbounded,
348            )) {
349                log::info!("[Sealed #{}] {kv:?}", mt.id());
350            }
351        }
352
353        for table in super_version
354            .version
355            .iter_levels()
356            .flat_map(|lvl| lvl.iter())
357            .filter_map(|run| run.get_for_key_cmp(&key, self.config.comparator.as_ref()))
358        {
359            for kv in table.range(..) {
360                let kv = kv?;
361
362                if kv.key.user_key != key {
363                    break;
364                }
365
366                log::info!("[Table #{}] {kv:?}", table.id());
367            }
368        }
369
370        Ok(())
371    }
372
373    fn get_internal_entry(&self, key: &[u8], seqno: SeqNo) -> crate::Result<Option<InternalValue>> {
374        // Lock-free fast path: when reading at or beyond the latest installed
375        // version (always the case for MAX_SEQNO, and the common case), the
376        // mirrored latest SuperVersion is exactly what `get_version_for_snapshot`
377        // would return (it yields the latest iff `latest.seqno < seqno`), so
378        // load it without taking the history RwLock or cloning a deque entry.
379        // Recent inserts stay visible because they mutate the shared
380        // `active_memtable` behind a stable Arc; the back only changes on
381        // flush / compaction, which refresh this mirror under the write lock.
382        //
383        // std-only: the mirror needs `arc-swap` (not no_std). Under no-std we
384        // skip straight to the history RwLock path below.
385        #[cfg(feature = "std")]
386        {
387            let latest = self.latest_super_version.load();
388            if seqno > latest.seqno {
389                return Self::get_internal_entry_from_version(
390                    &latest,
391                    key,
392                    seqno,
393                    self.config.comparator.as_ref(),
394                );
395            }
396        }
397
398        // Historical snapshot read (seqno <= latest.seqno): consult the locked
399        // version history for the correct point-in-time SuperVersion.
400        let super_version = self.version_history.read().get_version_for_snapshot(seqno);
401
402        Self::get_internal_entry_from_version(
403            &super_version,
404            key,
405            seqno,
406            self.config.comparator.as_ref(),
407        )
408    }
409
410    fn current_version(&self) -> Version {
411        self.version_history.read().latest_version().version
412    }
413
414    fn storage_stats(&self) -> crate::Result<crate::StorageStats> {
415        // One version snapshot reused for the footprint and the full-compaction
416        // estimate below: a second `current_version()` could race a concurrent
417        // flush / compaction and mix two snapshots.
418        let version = self.current_version();
419        // Standard tree: SST values ARE user values (no KV separation).
420        let mut stats =
421            crate::storage_stats::compute_storage_stats(&version, self.is_compacting(), true)?;
422        // Fill the disk-aware capacity figures (quota + free-space probe) the
423        // version-only computation can't know.
424        let (capacity, available, compaction_possible) = self.admission_capacity(stats.used_bytes);
425        stats.capacity_bytes = capacity;
426        stats.available_bytes = available;
427        stats.compaction_possible = compaction_possible;
428        // When admission gating is active and a compaction is not already
429        // running, surface whether a full compaction has working room through the
430        // SAME two-layer check the compaction space gate enforces (logical quota +
431        // physical free per destination volume), so the reported status matches
432        // what the gate will admit. With gating off the gate never runs, so the
433        // status stays `Healthy` even though the backend can report a finite
434        // capacity.
435        if self.storage_admission_enabled()
436            && capacity.is_some()
437            && stats.status == crate::StorageStatus::Healthy
438        {
439            // A full compaction's transient output is bounded by the largest
440            // level's on-disk size, but it LANDS in the last configured level's
441            // volume (`level_count - 1`), which under tiered routing can be a
442            // different filesystem than the largest level. A standard tree has no
443            // blob relocation. Using the per-volume gate (not `available >=
444            // full_compaction_bytes` against the min-volume free) keeps the status
445            // from reporting tight when a routed merge would actually be admitted.
446            let sst_need = crate::storage_stats::full_compaction_demand_bytes(&version);
447            // `saturating_sub`: `level_count >= 1` always, so this is the last
448            // level index; the clamp only guards a degenerate zero-level config.
449            let sst_dest_level = self.0.config.level_count.saturating_sub(1);
450            let quota_headroom = self.quota_headroom(stats.used_bytes);
451            let full_fits = crate::compaction::worker::space_fits_two_layer(
452                &self.0.config,
453                quota_headroom,
454                sst_need,
455                sst_dest_level,
456                0,
457            );
458            stats.status = if full_fits {
459                crate::StorageStatus::FullCompactionAvailable
460            } else {
461                crate::StorageStatus::TightCompactionAvailable
462            };
463        }
464        // A closed admission gate is the operator-actionable state, so it takes
465        // precedence over the others (a read-only tree may well be compacting to
466        // reclaim space).
467        if self.is_read_only() {
468            stats.status = crate::StorageStatus::ReadOnlyOutOfSpace;
469        }
470        Ok(stats)
471    }
472
473    fn write_admission(&self) -> crate::Result<()> {
474        self.compute_write_admission()
475    }
476
477    fn write_backpressure(
478        &self,
479        strategy: &dyn crate::compaction::CompactionStrategy,
480    ) -> crate::Backpressure {
481        // Copy the thresholds out (BackpressureThresholds is Copy) so the
482        // arc-swap guard drops immediately; the off check short-circuits before
483        // touching the version, keeping the disabled path free.
484        let thresholds = self.0.runtime_config.load().backpressure;
485        if thresholds.is_off() {
486            return crate::Backpressure::None;
487        }
488        let version = self.current_version();
489        // L0 is the first level; its table (file) count is the count-trigger
490        // signal, matching the leveled `choose` trigger and the L0 term of
491        // `pending_compaction_bytes`.
492        let l0_count = version
493            .iter_levels()
494            .next()
495            .map_or(0, |level| level.table_count());
496        let pending = strategy.pending_compaction_bytes(&version);
497        crate::Backpressure::compute(l0_count, pending, &thresholds)
498    }
499
500    fn get_flush_lock(&self) -> FlushGuard<'_> {
501        self.flush_lock.lock()
502    }
503
504    #[cfg(feature = "metrics")]
505    fn metrics(&self) -> &Arc<crate::Metrics> {
506        &self.0.metrics
507    }
508
509    #[cfg(feature = "metrics")]
510    fn cache_stats(&self) -> crate::CacheStats {
511        let cache = &self.0.config.cache;
512        self.metrics().cache_stats(cache.size(), cache.capacity())
513    }
514
515    fn version_free_list_len(&self) -> usize {
516        self.version_history.read().free_list_len()
517    }
518
519    fn prefix<K: AsRef<[u8]>>(
520        &self,
521        prefix: K,
522        seqno: SeqNo,
523        index: Option<(Arc<Memtable>, SeqNo)>,
524    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
525        Box::new(
526            self.create_prefix(&prefix, seqno, index)
527                .map(|kv| IterGuardImpl::Standard(Guard(kv))),
528        )
529    }
530
531    fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
532        &self,
533        range: R,
534        seqno: SeqNo,
535        index: Option<(Arc<Memtable>, SeqNo)>,
536    ) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
537        Box::new(
538            self.create_range(&range, seqno, index)
539                .map(|kv| IterGuardImpl::Standard(Guard(kv))),
540        )
541    }
542
543    fn range_seekable<K: AsRef<[u8]>, R: RangeBounds<K>>(
544        &self,
545        range: R,
546        seqno: SeqNo,
547        index: Option<(Arc<Memtable>, SeqNo)>,
548    ) -> Box<dyn crate::iter_guard::SeekableGuardIter + 'static> {
549        let (lo, hi) = range_to_user_bounds(&range);
550        let inner = self.create_seekable_range_bounds(lo, hi, seqno, index);
551        Box::new(StandardSeekable { inner })
552    }
553
554    fn batch_range_scan<K: AsRef<[u8]>, R: RangeBounds<K> + 'static, I: IntoIterator<Item = R>>(
555        &self,
556        intervals: I,
557        seqno: SeqNo,
558        index: Option<(Arc<Memtable>, SeqNo)>,
559    ) -> Box<dyn Iterator<Item = IterGuardImpl> + Send + 'static>
560    where
561        I::IntoIter: Send + 'static,
562    {
563        // Open the seekable iterator over the whole keyspace once; each interval
564        // is served by repositioning it (single per-SST setup, amortized).
565        let inner =
566            self.create_seekable_range_bounds(Bound::Unbounded, Bound::Unbounded, seqno, index);
567        let intervals = intervals.into_iter().map(|r| range_to_user_bounds(&r));
568        Box::new(crate::range::BatchRangeScan::new(inner, intervals).map(standard_guard))
569    }
570
571    /// Returns the number of tombstones in the tree.
572    fn tombstone_count(&self) -> u64 {
573        self.current_version()
574            .iter_tables()
575            .map(Table::tombstone_count)
576            .sum()
577    }
578
579    /// Returns the number of weak tombstones (single deletes) in the tree.
580    fn weak_tombstone_count(&self) -> u64 {
581        self.current_version()
582            .iter_tables()
583            .map(Table::weak_tombstone_count)
584            .sum()
585    }
586
587    /// Returns the number of value entries that become reclaimable once weak tombstones can be GC'd.
588    fn weak_tombstone_reclaimable_count(&self) -> u64 {
589        self.current_version()
590            .iter_tables()
591            .map(Table::weak_tombstone_reclaimable)
592            .sum()
593    }
594
595    fn drop_range<K: AsRef<[u8]>, R: RangeBounds<K>>(&self, range: R) -> crate::Result<()> {
596        let (bounds, is_empty) = Self::range_bounds_to_owned_bounds(&range);
597
598        if is_empty {
599            return Ok(());
600        }
601
602        let strategy = Arc::new(crate::compaction::drop_range::Strategy::new(bounds));
603
604        // IMPORTANT: Write lock so we can be the only compaction going on
605        let _lock = self.0.major_compaction_lock.write();
606
607        log::info!("Starting drop_range compaction");
608        self.inner_compact(strategy, 0)?;
609        Ok(())
610    }
611
612    fn clear(&self) -> crate::Result<()> {
613        let config = self.tree_config();
614        let mut versions = self.get_version_history_lock();
615
616        // Pre-clear snapshot: every table + blob file it references becomes
617        // garbage the moment the new empty version is installed.
618        let prior = versions.latest_version();
619
620        versions.upgrade_version(
621            &config.path,
622            |v| {
623                let mut copy = v.clone();
624                copy.active_memtable = Arc::new(Memtable::new(
625                    self.memtable_id_counter.next(),
626                    self.config.comparator.clone(),
627                ));
628                copy.sealed_memtables = Arc::default();
629                copy.version = Version::new(v.version.id() + 1, self.tree_type());
630                Ok(copy)
631            },
632            &config.seqno,
633            &config.visible_seqno,
634            &*config.fs,
635            self.0.runtime_config.load_full(),
636            self.0.config.encryption.clone(),
637        )?;
638
639        // Release the history's hold on the now-obsolete versions; only the new
640        // empty version remains. `prior` still holds them, so nothing reaches
641        // refcount zero yet.
642        versions.drain_obsolete_to_latest();
643        drop(versions); // release the version-history lock before any fs work
644
645        // Mark every obsolete table / blob file deleted so the file is
646        // reclaimed (Inner::Drop) once its last reference is released. A
647        // concurrent reader still holding the pre-clear snapshot keeps its own
648        // clone alive, deferring physical deletion until it finishes — the
649        // version-history Arc refcount is the MVCC guard, so reclaim never
650        // races a live read. Tables with no other live reference are reclaimed
651        // as `prior` drops at the end of this call.
652        for table in prior.version.iter_tables() {
653            table.mark_as_deleted();
654        }
655        for blob_file in prior.version.blob_files.iter() {
656            blob_file.mark_as_deleted();
657        }
658
659        Ok(())
660    }
661
662    #[doc(hidden)]
663    fn major_compact(
664        &self,
665        target_size: u64,
666        seqno_threshold: SeqNo,
667    ) -> crate::Result<crate::compaction::CompactionResult> {
668        let strategy = Arc::new(crate::compaction::major::Strategy::new(target_size));
669
670        // IMPORTANT: Write lock so we can be the only compaction going on
671        let _lock = self.0.major_compaction_lock.write();
672
673        log::info!("Starting major compaction");
674        self.inner_compact(strategy, seqno_threshold)
675    }
676
677    fn l0_run_count(&self) -> usize {
678        self.current_version()
679            .level(0)
680            .map(|x| x.run_count())
681            .unwrap_or_default()
682    }
683
684    fn size_of<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<u32>> {
685        #[expect(clippy::cast_possible_truncation, reason = "values are u32 length max")]
686        Ok(self.get(key, seqno)?.map(|x| x.len() as u32))
687    }
688
689    fn filter_size(&self) -> u64 {
690        self.current_version()
691            .iter_tables()
692            .map(Table::filter_size)
693            .map(u64::from)
694            .sum()
695    }
696
697    fn pinned_filter_size(&self) -> usize {
698        self.current_version()
699            .iter_tables()
700            .map(Table::pinned_filter_size)
701            .sum()
702    }
703
704    fn pinned_block_index_size(&self) -> usize {
705        self.current_version()
706            .iter_tables()
707            .map(Table::pinned_block_index_size)
708            .sum()
709    }
710
711    fn sealed_memtable_count(&self) -> usize {
712        self.version_history
713            .read()
714            .latest_version()
715            .sealed_memtables
716            .len()
717    }
718
719    fn flush_to_tables_with_rt(
720        &self,
721        stream: impl Iterator<Item = crate::Result<InternalValue>>,
722        range_tombstones: Vec<crate::range_tombstone::RangeTombstone>,
723    ) -> crate::Result<Option<(Vec<Table>, Option<Vec<BlobFile>>)>> {
724        use crate::table::multi_writer::MultiWriter;
725        use crate::time::Instant;
726
727        let start = Instant::now();
728
729        let (folder, level_fs) = self.config.tables_folder_for_level(0);
730
731        let data_block_size = self.config.data_block_size_policy.get(0);
732
733        let data_block_restart_interval = self.config.data_block_restart_interval_policy.get(0);
734        let index_block_restart_interval = self.config.index_block_restart_interval_policy.get(0);
735
736        let data_block_compression = self.config.data_block_compression_policy.get(0);
737        let index_block_compression = self.config.index_block_compression_policy.get(0);
738
739        let data_block_hash_ratio = self.config.data_block_hash_ratio_policy.get(0);
740
741        let index_partitioning = self.config.index_block_partitioning_policy.get(0);
742        let filter_partitioning = self.config.filter_block_partitioning_policy.get(0);
743
744        // One runtime-config snapshot for the whole flush writer setup. The
745        // index spill threshold, `seqno_in_index`, and the per-KV checksum
746        // policy are all live (toggleable via `update_runtime_config`); reading
747        // `load_full()` per field could straddle a concurrent update and mix two
748        // snapshots into one SST. Compaction is the migration mechanism, so a
749        // toggle takes effect on the next flush / compaction.
750        let rc = self.0.runtime_config.load_full();
751
752        log::debug!(
753            "Flushing memtable(s) to {}, data_block_restart_interval={data_block_restart_interval}, index_block_restart_interval={index_block_restart_interval}, data_block_size={data_block_size}, data_block_compression={data_block_compression:?}, index_block_compression={index_block_compression:?}",
754            folder.display(),
755        );
756
757        let mut table_writer = MultiWriter::new(
758            folder.clone(),
759            self.table_id_counter.clone(),
760            64 * 1_024 * 1_024,
761            0,
762            level_fs.clone(),
763        )?
764        .set_comparator(self.config.comparator.clone())
765        .use_data_block_restart_interval(data_block_restart_interval)
766        .use_index_block_restart_interval(index_block_restart_interval)
767        .use_data_block_compression(data_block_compression)
768        .use_index_block_compression(index_block_compression)
769        .use_data_block_size(data_block_size)
770        .use_data_block_hash_ratio(data_block_hash_ratio)
771        .use_bloom_policy({
772            use crate::config::FilterPolicyEntry::{Bloom, None};
773            use crate::table::filter::BloomConstructionPolicy;
774
775            match self.config.filter_policy.get(0) {
776                Bloom(policy) => policy,
777                None => BloomConstructionPolicy::BitsPerKey(0.0),
778            }
779        });
780
781        if index_partitioning {
782            // Size-adaptive: single-level index for small SSTs (where pinning
783            // the whole index is cheap and a two-level lookup is pure overhead),
784            // spilling to a partitioned index only once the index grows past the
785            // threshold. Recovers the point-read cost of an unconditional
786            // two-level index on small/medium SSTs.
787            table_writer = table_writer.use_adaptive_index(rc.index_partition_spill_threshold);
788        }
789        if filter_partitioning {
790            table_writer = table_writer.use_partitioned_filter();
791        }
792
793        table_writer = table_writer.use_prefix_extractor(self.config.prefix_extractor.clone());
794        table_writer = table_writer.use_encryption(self.config.encryption.clone());
795        // ECC scheme from the live runtime snapshot (same as `seqno_in_index`
796        // / `kv_checksums` below), so a flush after a scheme change writes the
797        // SST with the current scheme rather than the startup one.
798        table_writer = table_writer.use_page_ecc(self.config.page_ecc, rc.ecc_scheme);
799        table_writer = table_writer.use_sync_mode(self.config.sync_mode);
800
801        table_writer = table_writer.use_seqno_in_index(rc.seqno_in_index);
802        table_writer = table_writer.use_zone_map(rc.zone_map);
803        table_writer = table_writer.use_columnar(rc.columnar);
804        table_writer = table_writer.use_disable_cow_on_sst(rc.disable_cow_on_sst_files);
805        // `Off` (default) emits no per-KV footer and leaves the data-block
806        // payload encoding unchanged (the V5 header carries a block_flags byte
807        // and the meta block a descriptor key regardless, so the on-disk bytes
808        // are not identical to a pre-V5 table).
809        table_writer = table_writer.use_kv_checksums(rc.kv_checksums, rc.kv_checksum_algo);
810        // Flush writes level 0; resolve that level's locator policy entry.
811        table_writer = table_writer.use_locator(self.config.locator_policy.get(0));
812
813        #[cfg(zstd_any)]
814        {
815            table_writer = table_writer.use_zstd_dictionary(self.config.zstd_dictionary.clone());
816        }
817
818        // Set range tombstones BEFORE writing KV items so that if MultiWriter
819        // rotates to a new table during the write loop, earlier tables already
820        // carry the RT metadata.
821        table_writer.set_range_tombstones(range_tombstones);
822
823        for item in stream {
824            table_writer.write(item?)?;
825        }
826
827        let result = table_writer.finish()?;
828
829        log::debug!("Flushed memtable(s) in {:?}", start.elapsed());
830
831        let pin_filter = self.config.filter_block_pinning_policy.get(0);
832        let pin_index = self.config.index_block_pinning_policy.get(0);
833
834        // Load tables
835        let tables = result
836            .into_iter()
837            .map(|(table_id, checksum)| -> crate::Result<Table> {
838                Table::recover(
839                    folder.join(table_id.to_string()),
840                    checksum,
841                    0,
842                    self.id,
843                    table_id,
844                    self.config.cache.clone(),
845                    self.config.descriptor_table.clone(),
846                    level_fs.clone(),
847                    pin_filter,
848                    pin_index,
849                    self.config.encryption.clone(),
850                    #[cfg(zstd_any)]
851                    self.config.zstd_dictionary.clone(),
852                    self.config.comparator.clone(),
853                    #[cfg(feature = "metrics")]
854                    self.metrics.clone(),
855                )
856            })
857            .collect::<crate::Result<Vec<_>>>()?;
858
859        // Return Some even when tables is empty (RT-only flush): the caller
860        // (AbstractTree::flush) handles empty tables by re-inserting RTs into
861        // the active memtable and still needs to delete sealed memtables.
862        Ok(Some((tables, None)))
863    }
864
    /// Publishes freshly written tables (and optional blob files) as a new L0
    /// run in a new version, and releases the listed sealed memtables.
    ///
    /// Steps:
    /// 1. Every table gets the tree-wide deletion pause and the heal hints
    ///    (plus the background deleter with `std`). Every blob file gets the
    ///    deletion pause (plus the background deleter with `std`).
    /// 2. The compaction-state lock and then the version write lock are taken.
    ///    A new version is derived from the current one via `with_new_l0_run`,
    ///    with the listed sealed memtables removed, and handed to
    ///    `upgrade_version`.
    /// 3. Version GC (`maintenance`) runs with `gc_watermark`. A GC failure
    ///    is only logged, not returned.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `upgrade_version` (including any error
    /// returned from the transform closure).
    #[expect(clippy::significant_drop_tightening)]
    fn register_tables(
        &self,
        tables: &[Table],
        blob_files: Option<&[BlobFile]>,
        frag_map: Option<crate::blob_tree::FragmentationMap>,
        sealed_memtables_to_delete: &[crate::tree::inner::MemtableId],
        gc_watermark: SeqNo,
    ) -> crate::Result<()> {
        log::trace!(
            "Registering {} tables, {} blob files",
            tables.len(),
            blob_files.map(<[BlobFile]>::len).unwrap_or_default(),
        );

        // Wire the tree-wide deletion pause into every fresh table / blob
        // file so an in-flight checkpoint defers their cleanup if they
        // later get marked `is_deleted` by compaction.
        for table in tables {
            table.install_deletion_pause(Arc::clone(&self.deletion_pause));
            #[cfg(feature = "std")]
            table.install_background_deleter(Arc::clone(&self.background_deleter));
            table.install_heal_hints(Arc::clone(&self.heal_hints));
        }
        if let Some(bfs) = blob_files {
            for bf in bfs {
                bf.install_deletion_pause(Arc::clone(&self.deletion_pause));
                #[cfg(feature = "std")]
                bf.install_background_deleter(Arc::clone(&self.background_deleter));
            }
        }

        // The compaction-state guard is never read. It is held only so this
        // registration is serialized with whoever else takes that lock.
        // NOTE(review): the acquisition order here (compaction state, then
        // version history) presumably mirrors the compaction path to avoid
        // lock-order inversion. Confirm before reordering.
        let mut _compaction_state = self.compaction_state.lock();
        let mut version_lock = self.version_history.write();

        version_lock.upgrade_version(
            &self.config.path,
            |current| {
                let mut copy = current.clone();

                let ctx = crate::version::TransformContext::new(self.config.comparator.as_ref());
                // An empty fragmentation map is passed on as `None`.
                copy.version = copy.version.with_new_l0_run(
                    tables,
                    blob_files,
                    frag_map.filter(|x| !x.is_empty()),
                    &ctx,
                );

                // The loop variable is a memtable ID (despite its name): each
                // listed sealed memtable is dropped from the new version.
                for &table_id in sealed_memtables_to_delete {
                    log::trace!("releasing sealed memtable #{table_id}");
                    copy.sealed_memtables = Arc::new(copy.sealed_memtables.remove(table_id));
                }

                Ok(copy)
            },
            &self.config.seqno,
            &self.config.visible_seqno,
            &*self.config.fs,
            self.0.runtime_config.load_full(),
            self.0.config.encryption.clone(),
        )?;

        // Best-effort: a GC failure does not fail the registration, which has
        // already been published above.
        if let Err(e) = version_lock.maintenance(&self.config.path, gc_watermark, &*self.config.fs)
        {
            log::warn!("Version GC failed: {e:?}");
        }

        Ok(())
    }
934
935    fn clear_active_memtable(&self) {
936        use crate::tree::sealed::SealedMemtables;
937
938        let mut version_history_lock = self.version_history.write();
939        let super_version = version_history_lock.latest_version();
940
941        if super_version.active_memtable.is_empty() {
942            return;
943        }
944
945        let mut copy = version_history_lock.latest_version();
946        copy.active_memtable = Arc::new(Memtable::new(
947            self.memtable_id_counter.next(),
948            self.config.comparator.clone(),
949        ));
950        copy.sealed_memtables = Arc::new(SealedMemtables::default());
951
952        // Rotate does not modify the memtable, so it cannot break snapshots
953        copy.seqno = super_version.seqno;
954
955        version_history_lock.replace_latest_version(copy);
956
957        log::trace!("cleared active memtable");
958    }
959
960    fn compact(
961        &self,
962        strategy: Arc<dyn CompactionStrategy>,
963        seqno_threshold: SeqNo,
964    ) -> crate::Result<crate::compaction::CompactionResult> {
965        // NOTE: Read lock major compaction lock
966        // That way, if a major compaction is running, we cannot proceed
967        // But in general, parallel (non-major) compactions can occur
968        let _lock = self.0.major_compaction_lock.read();
969
970        self.inner_compact(strategy, seqno_threshold)
971    }
972
973    fn get_next_table_id(&self) -> TableId {
974        self.0.get_next_table_id()
975    }
976
977    fn tree_config(&self) -> &Config {
978        &self.config
979    }
980
981    fn active_memtable(&self) -> Arc<Memtable> {
982        self.version_history.read().latest_version().active_memtable
983    }
984
985    #[expect(clippy::significant_drop_tightening)]
986    fn rotate_memtable(&self) -> Option<Arc<Memtable>> {
987        let mut version_history_lock = self.version_history.write();
988        let super_version = version_history_lock.latest_version();
989
990        if super_version.active_memtable.is_empty() {
991            return None;
992        }
993
994        let yanked_memtable = super_version.active_memtable;
995
996        let mut copy = version_history_lock.latest_version();
997        copy.active_memtable = Arc::new(Memtable::new(
998            self.memtable_id_counter.next(),
999            self.config.comparator.clone(),
1000        ));
1001        copy.sealed_memtables =
1002            Arc::new(super_version.sealed_memtables.add(yanked_memtable.clone()));
1003
1004        // Rotate does not modify the memtable so it cannot break snapshots
1005        copy.seqno = super_version.seqno;
1006
1007        version_history_lock.replace_latest_version(copy);
1008
1009        log::trace!(
1010            "rotate: added memtable id={} to sealed memtables",
1011            yanked_memtable.id,
1012        );
1013
1014        Some(yanked_memtable)
1015    }
1016
1017    fn table_count(&self) -> usize {
1018        self.current_version().table_count()
1019    }
1020
1021    fn level_table_count(&self, idx: usize) -> Option<usize> {
1022        self.current_version().level(idx).map(|x| x.table_count())
1023    }
1024
1025    fn approximate_len(&self) -> usize {
1026        let super_version = self.version_history.read().latest_version();
1027
1028        let tables_item_count = self
1029            .current_version()
1030            .iter_tables()
1031            .map(|x| x.metadata.item_count)
1032            .sum::<u64>();
1033
1034        let memtable_count = super_version.active_memtable.len() as u64;
1035        let sealed_count = super_version
1036            .sealed_memtables
1037            .iter()
1038            .map(|mt| mt.len())
1039            .sum::<usize>() as u64;
1040
1041        #[expect(clippy::expect_used, reason = "result should fit into usize")]
1042        (memtable_count + sealed_count + tables_item_count)
1043            .try_into()
1044            .expect("approximate_len too large for usize")
1045    }
1046
1047    fn disk_space(&self) -> u64 {
1048        self.current_version()
1049            .iter_levels()
1050            .map(super::version::Level::size)
1051            .sum()
1052    }
1053
    /// Estimates the bytes and key count covered by `range`, as seen by a read
    /// at snapshot `seqno`, without reading any data blocks.
    ///
    /// SST contribution: for each table in the snapshot's version whose key
    /// range overlaps `range`, the in-range byte span is derived from the
    /// data-block offsets found via the block index (block granularity). The
    /// table's `item_count` and referenced blob bytes are then apportioned by
    /// `span / data_end`. A non-empty span always counts at least one key.
    ///
    /// Memtable contribution: for the active and each sealed memtable, the
    /// in-range entries whose seqno is below `seqno` are counted. The
    /// memtable's `size()` is apportioned by `count / len`.
    ///
    /// All accumulations saturate at `u64::MAX`.
    ///
    /// # Errors
    ///
    /// Propagates errors from iterating a table's block index and from
    /// `referenced_blob_bytes()`.
    fn approximate_range_stats<K: AsRef<[u8]>, R: core::ops::RangeBounds<K>>(
        &self,
        range: R,
        seqno: SeqNo,
    ) -> crate::Result<crate::ApproximateRangeStats> {
        use crate::table::block_index::BlockIndex;
        use core::ops::Bound;

        // Borrow the caller's bounds as byte slices.
        let lo: Bound<&[u8]> = match range.start_bound() {
            Bound::Included(k) => Bound::Included(k.as_ref()),
            Bound::Excluded(k) => Bound::Excluded(k.as_ref()),
            Bound::Unbounded => Bound::Unbounded,
        };
        let hi: Bound<&[u8]> = match range.end_bound() {
            Bound::Included(k) => Bound::Included(k.as_ref()),
            Bound::Excluded(k) => Bound::Excluded(k.as_ref()),
            Bound::Unbounded => Bound::Unbounded,
        };
        let bounds = (lo, hi);

        let mut bytes: u64 = 0;
        let mut key_count: u64 = 0;

        // Use ONE snapshot at the requested seqno for both the SST and memtable
        // contributions, so the estimate reflects the same visibility as a read
        // at `seqno` (no entries newer than the snapshot, and a consistent set of
        // tables + memtables even during a concurrent flush / compaction).
        let comparator = self.config.comparator.as_ref();
        let super_version = self.version_history.read().get_version_for_snapshot(seqno);

        // SST contribution: interpolate data-block offsets at the boundaries
        // (block granularity), no data-block reads. For a KV-separated SST the
        // referenced blob bytes are apportioned by the same in-range fraction.
        for table in super_version.version.iter_tables() {
            // Comparator-aware overlap: a custom user comparator orders keys
            // differently from raw bytes, so use the same comparison the read
            // path does instead of default byte ordering.
            if !table
                .metadata
                .key_range
                .overlaps_with_bounds_cmp(&bounds, comparator)
            {
                continue;
            }
            // The block index is keyed by the table-LOCAL seqno; a bulk-ingested
            // table carries a non-zero global seqno, so translate the snapshot
            // seqno the same way the read path does before seeking it. A snapshot
            // below the table's base means the table postdates it and contributes
            // nothing to the estimate, so skip it (`checked_sub` yields `None`).
            let Some(table_seqno) = seqno.checked_sub(table.global_seqno()) else {
                continue;
            };

            // data_end = the data section's byte extent = last data block's end.
            let Some(last) = table.block_index.iter().next_back() else {
                continue;
            };
            let last = last?;
            let data_end = *last.offset() + u64::from(last.size());
            if data_end == 0 {
                continue;
            }

            // The data block that would contain `key`, as (start, end) byte
            // offsets, or `None` when `key` is past the last block. The full
            // extent is returned so the lower bound counts from the block start
            // and the upper bound INCLUDES it (a range inside a single block must
            // not collapse to zero bytes).
            let block_span = |key: &[u8]| -> crate::Result<Option<(u64, u64)>> {
                let Some(mut iter) = table.block_index.forward_reader(key, table_seqno) else {
                    return Ok(None);
                };
                let Some(handle) = iter.next() else {
                    return Ok(None);
                };
                let h = handle?;
                let start = *h.offset();
                Ok(Some((start, (start + u64::from(h.size())).min(data_end))))
            };
            // A lower bound past the last block maps to `data_end`, which
            // makes the span below empty.
            let off_lo = match lo {
                Bound::Included(k) | Bound::Excluded(k) => {
                    block_span(k)?.map_or(data_end, |(start, _)| start)
                }
                Bound::Unbounded => 0,
            };
            // Tight-space restriction: a restricted table view serves only keys
            // at or above its lower bound, with the punched-out prefix served by
            // the replacement table. Raise the lower offset to that bound so the
            // prefix is not double-counted (matching how scans skip it).
            let off_lo = match table.restrict_lower_bound() {
                Some(rb) => {
                    off_lo.max(block_span(rb.as_ref())?.map_or(data_end, |(start, _)| start))
                }
                None => off_lo,
            };
            // An upper bound past the last block covers the rest of the data
            // section.
            let off_hi = match hi {
                Bound::Included(k) | Bound::Excluded(k) => {
                    block_span(k)?.map_or(data_end, |(_, end)| end)
                }
                Bound::Unbounded => data_end,
            };
            let idx_bytes = off_hi.saturating_sub(off_lo);
            if idx_bytes == 0 {
                continue;
            }

            // fraction = idx_bytes / data_end, in u128 to avoid overflow. For a
            // standard tree `idx_bytes` already includes the inline values. For a
            // KV-separated SST it covers only the key + pointer bytes, so the
            // SST's referenced blob bytes (recorded per-SST at both flush and
            // compaction) are apportioned by the same in-range fraction; blob
            // files are not key-indexed, so this fraction is the finest estimate
            // possible without reading data blocks.
            let num = u128::from(idx_bytes);
            let den = u128::from(data_end);
            let blob_bytes = table.referenced_blob_bytes()?;
            let sst_blob = u64::try_from(u128::from(blob_bytes) * num / den).unwrap_or(u64::MAX);
            // Round up to at least one entry: a non-empty byte span over a
            // non-empty SST always covers at least one row, so a narrow range
            // never reports bytes with a zero key count.
            let in_range_entries = u64::try_from(u128::from(table.metadata.item_count) * num / den)
                .unwrap_or(u64::MAX)
                .max(1);
            bytes = bytes.saturating_add(idx_bytes).saturating_add(sst_blob);
            key_count = key_count.saturating_add(in_range_entries);
        }

        // Memtable contribution: the in-range fraction of each memtable's
        // approximate size. Built from the SAME snapshot and the SAME
        // `range_internal` + internal-key bounds the read path uses (range.rs),
        // so the counted slice matches what a read at `seqno` would traverse.
        // NOTE(review): the seqno choices below assume internal keys sort by
        // user key ascending, then seqno descending, so (k, SeqNo::MAX) is the
        // first entry of `k` and (k, 0) the last. Confirm against InternalKey's Ord.
        let mt_range = (
            match lo {
                Bound::Included(k) => {
                    Bound::Included(InternalKey::new(k, SeqNo::MAX, crate::ValueType::Tombstone))
                }
                Bound::Excluded(k) => {
                    Bound::Excluded(InternalKey::new(k, 0, crate::ValueType::Tombstone))
                }
                Bound::Unbounded => Bound::Unbounded,
            },
            match hi {
                Bound::Included(k) => {
                    Bound::Included(InternalKey::new(k, 0, crate::ValueType::Value))
                }
                Bound::Excluded(k) => {
                    Bound::Excluded(InternalKey::new(k, SeqNo::MAX, crate::ValueType::Value))
                }
                Bound::Unbounded => Bound::Unbounded,
            },
        );
        // Returns (apportioned bytes, visible in-range entry count) for one memtable.
        let estimate = |mt: &crate::Memtable| -> (u64, u64) {
            let total = mt.len() as u64;
            if total == 0 {
                return (0, 0);
            }
            // Count only entries visible at the snapshot (the same seqno cutoff
            // reads apply), so the estimate excludes writes newer than `seqno`.
            let count = mt
                .range_internal(mt_range.clone())
                .filter(|kv| kv.key.seqno < seqno)
                .count() as u64;
            if count == 0 {
                return (0, 0);
            }
            let mt_bytes =
                u64::try_from(u128::from(mt.size()) * u128::from(count) / u128::from(total))
                    .unwrap_or(u64::MAX);
            (mt_bytes, count)
        };
        let (b, c) = estimate(&super_version.active_memtable);
        bytes = bytes.saturating_add(b);
        key_count = key_count.saturating_add(c);
        for mt in super_version.sealed_memtables.iter() {
            let (b, c) = estimate(mt);
            bytes = bytes.saturating_add(b);
            key_count = key_count.saturating_add(c);
        }

        Ok(crate::ApproximateRangeStats { bytes, key_count })
    }
1235
1236    fn approximate_range_cardinality<K: AsRef<[u8]>, R: core::ops::RangeBounds<K>>(
1237        &self,
1238        range: R,
1239        seqno: SeqNo,
1240    ) -> crate::Result<crate::RangeCardinality> {
1241        use crate::table::block_index::BlockIndex;
1242        use core::cmp::Ordering;
1243        use core::ops::Bound;
1244
1245        let lo: Bound<&[u8]> = match range.start_bound() {
1246            Bound::Included(k) => Bound::Included(k.as_ref()),
1247            Bound::Excluded(k) => Bound::Excluded(k.as_ref()),
1248            Bound::Unbounded => Bound::Unbounded,
1249        };
1250        let hi: Bound<&[u8]> = match range.end_bound() {
1251            Bound::Included(k) => Bound::Included(k.as_ref()),
1252            Bound::Excluded(k) => Bound::Excluded(k.as_ref()),
1253            Bound::Unbounded => Bound::Unbounded,
1254        };
1255        let bounds = (lo, hi);
1256        let comparator = self.config.comparator.as_ref();
1257        let super_version = self.version_history.read().get_version_for_snapshot(seqno);
1258
1259        let mut rows: u64 = 0;
1260        let mut total_rows: u64 = 0;
1261
1262        for table in super_version.version.iter_tables() {
1263            total_rows = total_rows.saturating_add(table.metadata.item_count);
1264            if !table
1265                .metadata
1266                .key_range
1267                .overlaps_with_bounds_cmp(&bounds, comparator)
1268            {
1269                continue;
1270            }
1271            // A snapshot below the table's base means the table postdates it and
1272            // contributes nothing here, so skip it (`checked_sub` yields `None`).
1273            let Some(table_seqno) = seqno.checked_sub(table.global_seqno()) else {
1274                continue;
1275            };
1276            // Honor a tight-space restricted view: keys below
1277            // `restrict_lower_bound()` are the punched-out prefix served by the
1278            // replacement table, so raise this table's effective lower bound to
1279            // it (mirrors approximate_range_stats) and never charge that prefix.
1280            let eff_lo = effective_lower_bound(
1281                lo,
1282                table.restrict_lower_bound().map(AsRef::as_ref),
1283                comparator,
1284            );
1285            let zone_map = &table.zone_map;
1286            if !zone_map.is_empty() {
1287                // Zone map present: sum the per-block row counts of blocks whose
1288                // key range overlaps the query. A block is past the range once its
1289                // minimum key is above the upper bound; the boundary block at the
1290                // effective lower bound is counted in full (block granularity). A
1291                // range that lands in a key-space gap legitimately yields zero, so
1292                // this path is authoritative and never falls back to the byte fraction.
1293                let reader = match eff_lo {
1294                    Bound::Included(k) | Bound::Excluded(k) => {
1295                        table.block_index.forward_reader(k, table_seqno)
1296                    }
1297                    Bound::Unbounded => Some(table.block_index.iter()),
1298                };
1299                if let Some(reader) = reader {
1300                    for handle in reader {
1301                        let handle = handle?;
1302                        let Some(col) = zone_map
1303                            .columns_for(*handle.offset())
1304                            .and_then(|c| c.first())
1305                        else {
1306                            continue;
1307                        };
1308                        let above_hi = match hi {
1309                            Bound::Included(hk) => {
1310                                comparator.compare(&col.min, hk) == Ordering::Greater
1311                            }
1312                            Bound::Excluded(hk) => {
1313                                comparator.compare(&col.min, hk) != Ordering::Less
1314                            }
1315                            Bound::Unbounded => false,
1316                        };
1317                        if above_hi {
1318                            break;
1319                        }
1320                        rows = rows.saturating_add(u64::from(col.row_count));
1321                    }
1322                }
1323            } else if let Some(last) = table.block_index.iter().next_back() {
1324                // No zone map: apportion item_count by the in-range
1325                // data-block byte fraction, mirroring approximate_range_stats.
1326                let last = last?;
1327                let data_end = *last.offset() + u64::from(last.size());
1328                if data_end > 0 {
1329                    let off = |key: &[u8], end: bool| -> crate::Result<u64> {
1330                        match table.block_index.forward_reader(key, table_seqno) {
1331                            Some(mut it) => match it.next() {
1332                                Some(h) => {
1333                                    let h = h?;
1334                                    Ok(if end {
1335                                        (*h.offset() + u64::from(h.size())).min(data_end)
1336                                    } else {
1337                                        *h.offset()
1338                                    })
1339                                }
1340                                None => Ok(data_end),
1341                            },
1342                            None => Ok(data_end),
1343                        }
1344                    };
1345                    let off_lo = match eff_lo {
1346                        Bound::Included(k) | Bound::Excluded(k) => off(k, false)?,
1347                        Bound::Unbounded => 0,
1348                    };
1349                    let off_hi = match hi {
1350                        Bound::Included(k) | Bound::Excluded(k) => off(k, true)?,
1351                        Bound::Unbounded => data_end,
1352                    };
1353                    let idx_bytes = off_hi.saturating_sub(off_lo);
1354                    if idx_bytes > 0 {
1355                        let est = u64::try_from(
1356                            u128::from(table.metadata.item_count) * u128::from(idx_bytes)
1357                                / u128::from(data_end),
1358                        )
1359                        .unwrap_or(u64::MAX)
1360                        .max(1);
1361                        rows = rows.saturating_add(est);
1362                    }
1363                }
1364            }
1365        }
1366
1367        // Memtables: count the in-range, snapshot-visible entries and add them to
1368        // both the matched rows and the total (matching the SST accounting).
1369        let mt_range = (
1370            match lo {
1371                Bound::Included(k) => {
1372                    Bound::Included(InternalKey::new(k, SeqNo::MAX, crate::ValueType::Tombstone))
1373                }
1374                Bound::Excluded(k) => {
1375                    Bound::Excluded(InternalKey::new(k, 0, crate::ValueType::Tombstone))
1376                }
1377                Bound::Unbounded => Bound::Unbounded,
1378            },
1379            match hi {
1380                Bound::Included(k) => {
1381                    Bound::Included(InternalKey::new(k, 0, crate::ValueType::Value))
1382                }
1383                Bound::Excluded(k) => {
1384                    Bound::Excluded(InternalKey::new(k, SeqNo::MAX, crate::ValueType::Value))
1385                }
1386                Bound::Unbounded => Bound::Unbounded,
1387            },
1388        );
1389        let mut add_memtable = |mt: &crate::Memtable| {
1390            total_rows = total_rows.saturating_add(mt.len() as u64);
1391            let in_range = mt
1392                .range_internal(mt_range.clone())
1393                .filter(|kv| kv.key.seqno < seqno)
1394                .count() as u64;
1395            rows = rows.saturating_add(in_range);
1396        };
1397        add_memtable(&super_version.active_memtable);
1398        for mt in super_version.sealed_memtables.iter() {
1399            add_memtable(mt);
1400        }
1401
1402        // selectivity is an approximate ratio; u64 row counts are well within
1403        // f64's exact-integer range (2^52) for any realistic table.
1404        #[expect(
1405            clippy::cast_precision_loss,
1406            reason = "row counts never approach 2^52; the ratio is approximate anyway"
1407        )]
1408        let selectivity = if total_rows == 0 {
1409            0.0
1410        } else {
1411            (rows.min(total_rows) as f64) / (total_rows as f64)
1412        };
1413        Ok(crate::RangeCardinality { rows, selectivity })
1414    }
1415
1416    fn get_highest_memtable_seqno(&self) -> Option<SeqNo> {
1417        let version = self.version_history.read().latest_version();
1418
1419        let active = version.active_memtable.get_highest_seqno();
1420
1421        let sealed = version
1422            .sealed_memtables
1423            .iter()
1424            .map(|mt| mt.get_highest_seqno())
1425            .max()
1426            .flatten();
1427
1428        active.max(sealed)
1429    }
1430
1431    fn get_highest_persisted_seqno(&self) -> Option<SeqNo> {
1432        self.current_version()
1433            .iter_tables()
1434            .map(Table::get_highest_seqno)
1435            .max()
1436    }
1437
1438    fn get<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<UserValue>> {
1439        let key = key.as_ref();
1440
1441        let super_version = self.version_history.read().get_version_for_snapshot(seqno);
1442
1443        Self::resolve_or_passthrough(
1444            &super_version,
1445            key,
1446            seqno,
1447            self.config.merge_operator.as_ref(),
1448            self.config.comparator.as_ref(),
1449        )
1450    }
1451
1452    fn get_pinned<K: AsRef<[u8]>>(
1453        &self,
1454        key: K,
1455        seqno: SeqNo,
1456    ) -> crate::Result<Option<crate::PinnableSlice>> {
1457        let key = key.as_ref();
1458
1459        let super_version = self.version_history.read().get_version_for_snapshot(seqno);
1460
1461        Self::resolve_or_passthrough_pinned(
1462            &super_version,
1463            key,
1464            seqno,
1465            self.config.merge_operator.as_ref(),
1466            self.config.comparator.as_ref(),
1467        )
1468    }
1469
1470    #[expect(
1471        clippy::indexing_slicing,
1472        reason = "indices are generated from 0..n range, always in bounds"
1473    )]
1474    fn multi_get<K: AsRef<[u8]>>(
1475        &self,
1476        keys: impl IntoIterator<Item = K>,
1477        seqno: SeqNo,
1478    ) -> crate::Result<Vec<Option<UserValue>>> {
1479        let super_version = self.get_version_for_snapshot(seqno);
1480        let comparator = self.config.comparator.as_ref();
1481        let merge_operator = self.config.merge_operator.as_ref();
1482
1483        // Collect keys up front; bloom hashes computed lazily in Phase 2
1484        let keys: Vec<_> = keys.into_iter().collect();
1485        let n = keys.len();
1486        if n == 0 {
1487            return Ok(Vec::new());
1488        }
1489
1490        // For small batches, use the simple per-key path
1491        if n <= 2 {
1492            return keys
1493                .iter()
1494                .map(|key| {
1495                    Self::resolve_or_passthrough(
1496                        &super_version,
1497                        key.as_ref(),
1498                        seqno,
1499                        merge_operator,
1500                        comparator,
1501                    )
1502                })
1503                .collect();
1504        }
1505
1506        // Phase 1: Check active + sealed memtables (unsorted — memtable lookup
1507        // is O(log n) per key regardless of order, skip sort+hash overhead for
1508        // memtable-only batches).
1509        let mut internal_entries: Vec<Option<InternalValue>> = vec![None; n];
1510        let mut remaining: Vec<usize> = Vec::with_capacity(n);
1511
1512        for idx in 0..n {
1513            let key = keys[idx].as_ref();
1514
1515            // Active memtable
1516            if let Some(entry) = super_version.active_memtable.get(key, seqno) {
1517                internal_entries[idx] = Some(entry);
1518                continue;
1519            }
1520
1521            // Sealed memtables (newest first)
1522            if let Some(entry) =
1523                Self::get_internal_entry_from_sealed_memtables(&super_version, key, seqno)
1524            {
1525                internal_entries[idx] = Some(entry);
1526                continue;
1527            }
1528
1529            remaining.push(idx);
1530        }
1531
1532        // Phase 2: Sort remaining keys + compute bloom hashes only if needed
1533        // (memtable-only batches skip this entirely).
1534        if !remaining.is_empty() {
1535            remaining.sort_by(|&a, &b| comparator.compare(keys[a].as_ref(), keys[b].as_ref()));
1536
1537            // De-duplicate equal query keys (the batched on-disk path requires
1538            // strictly-sorted-unique input) and resolve the misses. Shared with
1539            // the BlobTree path via these helpers so the two cannot drift.
1540            let (miss_keys, duplicates) =
1541                Self::dedup_sorted_miss_keys(&remaining, &keys, comparator);
1542
1543            Self::batch_get_from_tables(
1544                &super_version.version,
1545                &keys,
1546                miss_keys,
1547                seqno,
1548                comparator,
1549                &*self.config.fs,
1550                &mut internal_entries,
1551            )?;
1552
1553            Self::fan_out_duplicates(&duplicates, &mut internal_entries);
1554        }
1555
1556        // Phase 3: Resolve entries (tombstones, RT suppression, merge operands)
1557        let mut results = vec![None; n];
1558        for idx in 0..n {
1559            let entry = internal_entries[idx].take();
1560            results[idx] = Self::resolve_entry(
1561                &super_version,
1562                keys[idx].as_ref(),
1563                entry,
1564                seqno,
1565                merge_operator,
1566                comparator,
1567            )?;
1568        }
1569
1570        Ok(results)
1571    }
1572
1573    fn apply_batch(&self, batch: crate::WriteBatch, seqno: SeqNo) -> crate::Result<(u64, u64)> {
1574        if batch.is_empty() {
1575            return Ok((0, self.active_memtable().size()));
1576        }
1577        Ok(self.append_batch(batch.materialize(seqno)?))
1578    }
1579
1580    fn insert<K: Into<UserKey>, V: Into<UserValue>>(
1581        &self,
1582        key: K,
1583        value: V,
1584        seqno: SeqNo,
1585    ) -> (u64, u64) {
1586        let value = InternalValue::from_components(key, value, seqno, ValueType::Value);
1587        self.append_entry(value)
1588    }
1589
1590    fn merge<K: Into<UserKey>, V: Into<UserValue>>(
1591        &self,
1592        key: K,
1593        operand: V,
1594        seqno: SeqNo,
1595    ) -> (u64, u64) {
1596        let value = InternalValue::new_merge_operand(key, operand, seqno);
1597        self.append_entry(value)
1598    }
1599
1600    fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
1601        let value = InternalValue::new_tombstone(key, seqno);
1602        self.append_entry(value)
1603    }
1604
1605    fn remove_weak<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
1606        let value = InternalValue::new_weak_tombstone(key, seqno);
1607        self.append_entry(value)
1608    }
1609
1610    fn remove_range<K: Into<UserKey>>(&self, start: K, end: K, seqno: SeqNo) -> u64 {
1611        let memtable = Arc::clone(&self.version_history.read().latest_version().active_memtable);
1612
1613        memtable.insert_range_tombstone(start.into(), end.into(), seqno)
1614    }
1615}
1616
1617impl Tree {
1618    /// Maps a raw internal entry to its change-data-capture event, routing
1619    /// `Indirection` (KV-separated) values through `resolve_indirection`.
1620    ///
1621    /// A standard tree never stores `Indirection` and supplies a resolver that
1622    /// errors; the blob-tree scan path supplies one that reads the blob and
1623    /// returns an [`ScanSinceEvent::Insert`].
1624    fn map_event<F>(
1625        entry: InternalValue,
1626        version: &Version,
1627        resolve_indirection: &F,
1628    ) -> crate::Result<ScanSinceEvent>
1629    where
1630        F: Fn(&Version, InternalValue) -> crate::Result<ScanSinceEvent>,
1631    {
1632        if entry.key.value_type == ValueType::Indirection {
1633            return resolve_indirection(version, entry);
1634        }
1635        let seqno = entry.key.seqno;
1636        let key = entry.key.user_key;
1637        Ok(match entry.key.value_type {
1638            ValueType::Value => ScanSinceEvent::Insert {
1639                key,
1640                value: entry.value,
1641                seqno,
1642            },
1643            ValueType::MergeOperand => ScanSinceEvent::MergeOperand {
1644                key,
1645                operand: entry.value,
1646                seqno,
1647            },
1648            ValueType::Tombstone | ValueType::WeakTombstone => {
1649                ScanSinceEvent::PointTombstone { key, seqno }
1650            }
1651            ValueType::Indirection => unreachable!("Indirection handled above"),
1652        })
1653    }
1654
1655    /// Shared CDC aggregation behind [`Self::scan_since_seqno`] and the
1656    /// blob-tree scan path: gathers qualifying entries (`seqno >= target`) plus
1657    /// range tombstones from the active + sealed memtables and every SST (with
1658    /// block-skip), maps each entry to a [`ScanSinceEvent`] — routing
1659    /// `Indirection` values through `resolve_indirection` against the same
1660    /// version snapshot — and returns them in increasing seqno order.
1661    ///
1662    /// # Panics
1663    ///
1664    /// Panics if the internal version-history lock is poisoned.
1665    ///
1666    /// # Errors
1667    ///
1668    /// Returns `Err` if reading the index or a data block fails, or if
1669    /// `resolve_indirection` errors.
1670    pub(crate) fn scan_since_seqno_with<F>(
1671        &self,
1672        target_seqno: SeqNo,
1673        block_skip: bool,
1674        resolve_indirection: F,
1675    ) -> crate::Result<alloc::vec::IntoIter<ScanSinceEvent>>
1676    where
1677        F: Fn(&Version, InternalValue) -> crate::Result<ScanSinceEvent>,
1678    {
1679        let super_version = self.version_history.read().latest_version();
1680        let version = &super_version.version;
1681
1682        // Stable upper watermark, captured once before walking any source: the
1683        // highest seqno present across every source at scan start (in global
1684        // coordinates — `Table::get_highest_seqno` already adds the offset).
1685        // The active memtable is shared and mutable, so without this cap a
1686        // write committed mid-scan could leak in and break the "consistent
1687        // snapshot of changes in [target, watermark]" contract. The seqno
1688        // counter is not a reliable bound here because callers may assign
1689        // seqnos explicitly without advancing it, so derive it from the data.
1690        let end_seqno = {
1691            let active = super_version.active_memtable.get_highest_seqno();
1692            let sealed = super_version
1693                .sealed_memtables
1694                .iter()
1695                .map(|mt| mt.get_highest_seqno())
1696                .max()
1697                .flatten();
1698            let tables = version.iter_tables().map(Table::get_highest_seqno).max();
1699            active.max(sealed).max(tables)
1700        };
1701        // No entries anywhere ⇒ nothing qualifies, regardless of target.
1702        let Some(end_seqno) = end_seqno else {
1703            return Ok(Vec::new().into_iter());
1704        };
1705
1706        let mut events: Vec<ScanSinceEvent> = Vec::new();
1707
1708        // Point entries: active + sealed memtables, then SSTs (block-skip).
1709        for entry in super_version.active_memtable.iter() {
1710            if entry.key.seqno >= target_seqno && entry.key.seqno <= end_seqno {
1711                events.push(Self::map_event(entry, version, &resolve_indirection)?);
1712            }
1713        }
1714        for memtable in super_version.sealed_memtables.iter() {
1715            for entry in memtable.iter() {
1716                if entry.key.seqno >= target_seqno && entry.key.seqno <= end_seqno {
1717                    events.push(Self::map_event(entry, version, &resolve_indirection)?);
1718                }
1719            }
1720        }
1721        for table in version.iter_tables() {
1722            // `scan_seqno_range` upper bound is exclusive; `end_seqno` is the
1723            // inclusive max, so add one (saturating for the MAX edge).
1724            for entry in
1725                table.scan_seqno_range(target_seqno, end_seqno.saturating_add(1), block_skip)?
1726            {
1727                events.push(Self::map_event(entry, version, &resolve_indirection)?);
1728            }
1729        }
1730
1731        // Range tombstones from the same sources, carrying their own seqno.
1732        let mut push_range_tombstone = |rt: &RangeTombstone| {
1733            if rt.seqno >= target_seqno && rt.seqno <= end_seqno {
1734                events.push(ScanSinceEvent::RangeTombstone {
1735                    start_key: rt.start.clone(),
1736                    end_key: rt.end.clone(),
1737                    seqno: rt.seqno,
1738                });
1739            }
1740        };
1741        for rt in super_version.active_memtable.range_tombstones_sorted() {
1742            push_range_tombstone(&rt);
1743        }
1744        for memtable in super_version.sealed_memtables.iter() {
1745            for rt in memtable.range_tombstones_sorted() {
1746                push_range_tombstone(&rt);
1747            }
1748        }
1749        for table in version.iter_tables() {
1750            for rt in table.range_tombstones() {
1751                push_range_tombstone(rt);
1752            }
1753        }
1754
1755        // Replay order is increasing seqno across every source.
1756        events.sort_by_key(ScanSinceEvent::seqno);
1757
1758        Ok(events.into_iter())
1759    }
1760
1761    /// Iterate change events with `seqno >= target_seqno`.
1762    ///
1763    /// Returns every change committed at or after `target_seqno` as a stream
1764    /// of [`ScanSinceEvent`]s in increasing seqno order. This is the canonical
1765    /// change-data-capture primitive: a downstream consumer (replica, Kafka
1766    /// connector, Debezium-style pipeline) replays the events in order to
1767    /// reconstruct the source's history. Superseded versions are not collapsed
1768    /// (a key written three times after the target yields three events).
1769    ///
1770    /// # Block-skip
1771    ///
1772    /// On SSTs written with the `seqno_bounds` section (`seqno_in_index`), data
1773    /// blocks whose bounds cannot overlap the target window are skipped without
1774    /// being read; SSTs without the section are read and filtered per entry, so
1775    /// mixed trees are handled transparently.
1776    ///
1777    /// # KV-separation
1778    ///
1779    /// Standard trees never store blob-indirected values. On the inner tree of
1780    /// a KV-separated (blob) tree this returns an `Err` for indirected entries:
1781    /// blob resolution into [`ScanSinceEvent::Insert`] is provided by the
1782    /// blob-tree scan path, which owns the blob files.
1783    ///
1784    /// # Corruption resilience
1785    ///
1786    /// The per-block seqno-bounds used for skipping live in the optional
1787    /// `seqno_bounds` SST section, a Block covered by XXH3-128 (+ optional Page
1788    /// ECC) and verified when it is loaded at open, plus a decode that rejects
1789    /// non-ascending offsets and inverted bounds, so a corrupted bound is caught
1790    /// rather than trusted. Even in the impossible case of a fault bypassing
1791    /// those checks, a bad bound can only cause a *missed* record, never a wrong
1792    /// one. Callers who want defense against that hypothetical can use
1793    /// [`Self::scan_since_seqno_full_scan`], which reads every block (slower, no
1794    /// skip).
1795    ///
1796    /// # Panics
1797    ///
1798    /// Panics if the internal version-history lock is poisoned.
1799    ///
1800    /// # Errors
1801    ///
1802    /// Returns `Err` if reading the index or a data block fails, or if an entry
1803    /// is a KV-separated value (see above).
1804    pub fn scan_since_seqno(
1805        &self,
1806        target_seqno: SeqNo,
1807    ) -> crate::Result<impl Iterator<Item = ScanSinceEvent> + use<>> {
1808        // A standard tree never stores blob-indirected values; the resolver
1809        // errors so an indirected entry (only reachable via a blob tree's inner
1810        // index) surfaces as a clear error rather than a wrong event.
1811        self.scan_since_seqno_with(target_seqno, true, |_version, _entry| {
1812            Err(crate::Error::FeatureUnsupported(
1813                "scan_since_seqno on KV-separated values requires the blob-tree scan path",
1814            ))
1815        })
1816    }
1817
1818    /// Paranoid variant of [`Self::scan_since_seqno`] that disables the
1819    /// per-block seqno-bounds skip: every data block is read and filtered per
1820    /// entry, even on seqno-indexed SSTs.
1821    ///
1822    /// # When to use
1823    ///
1824    /// The fast [`Self::scan_since_seqno`] trusts each block's recorded
1825    /// `[seqno_min, seqno_max]` to skip blocks that cannot hold a qualifying
1826    /// record. Those bounds live in the `seqno_bounds` SST section, a Block
1827    /// covered by XXH3-128 (and optional Page ECC) and verified at open, so
1828    /// on-disk corruption is caught, not silently trusted. This method exists
1829    /// for callers who
1830    /// want defense even against a fault that somehow bypassed those checks: a
1831    /// corrupted `seqno_max` can only ever cause a *missed* record (never a
1832    /// wrong one), and a full scan cannot miss. It is slower (no skip), so
1833    /// prefer [`Self::scan_since_seqno`] unless you specifically need this
1834    /// guarantee.
1835    ///
1836    /// # Panics
1837    ///
1838    /// Panics if the internal version-history lock is poisoned.
1839    ///
1840    /// # Errors
1841    ///
1842    /// Same as [`Self::scan_since_seqno`].
1843    pub fn scan_since_seqno_full_scan(
1844        &self,
1845        target_seqno: SeqNo,
1846    ) -> crate::Result<impl Iterator<Item = ScanSinceEvent> + use<>> {
1847        self.scan_since_seqno_with(target_seqno, false, |_version, _entry| {
1848            Err(crate::Error::FeatureUnsupported(
1849                "scan_since_seqno on KV-separated values requires the blob-tree scan path",
1850            ))
1851        })
1852    }
1853
1854    /// Update the live [`crate::runtime_config::RuntimeConfig`].
1855    ///
1856    /// Mutator runs on a clone of the current snapshot; the new snapshot
1857    /// is then atomically swapped in. Subsequent calls to
1858    /// [`Self::runtime_config`] observe the new snapshot.
1859    ///
1860    /// ## Current scope
1861    ///
1862    /// This API ships the snapshot + atomic-swap mechanism. No write
1863    /// path in the current tree consults `runtime_config` yet — that
1864    /// wiring lands with the V5-batch format features (manifest
1865    /// hardening, per-KV protection, scan-since-seqno) which extend
1866    /// [`RuntimeConfig`](crate::runtime_config::RuntimeConfig) with
1867    /// their own fields and read it at block write / manifest commit /
1868    /// compaction boundaries.
1869    ///
1870    /// ## Designed semantics (effective once wired by V5 features)
1871    ///
1872    /// - Subsequent write paths load the new snapshot lockless on their
1873    ///   next operation.
1874    /// - Existing on-disk data remains in its original format and reads
1875    ///   transparently — every block / manifest is self-describing via
1876    ///   its own header.
1877    /// - Compaction acts as the live-migration mechanism: source blocks
1878    ///   are rewritten per the current snapshot over subsequent cycles,
1879    ///   so all data converges to the current settings without
1880    ///   stop-the-world coordination.
1881    ///
1882    /// ## Concurrency
1883    ///
1884    /// **Reader atomicity:** concurrent readers observe either the old
1885    /// or the new snapshot, never a torn intermediate state.
1886    ///
1887    /// **Writer semantics: last-writer-wins.** Two `update` calls racing
1888    /// from the same starting snapshot will have the second `store`
1889    /// overwrite the first — the first writer's mutation is lost. There
1890    /// is no CAS / RCU merge. Callers that need lost-update avoidance
1891    /// (e.g. two threads concurrently toggling different fields) MUST
1892    /// serialize their `update_runtime_config` calls, typically via a
1893    /// `Mutex` around the call site.
1894    /// # Errors
1895    ///
1896    /// Returns [`crate::Error::PageEccUnsupported`] when the mutator
1897    /// leaves `page_ecc = true` on a binary built without the
1898    /// `page_ecc` cargo feature. The live snapshot stays at its
1899    /// pre-mutation value on error.
1900    pub fn update_runtime_config<F>(&self, mutator: F) -> crate::Result<()>
1901    where
1902        F: FnOnce(&mut crate::runtime_config::RuntimeConfig),
1903    {
1904        // Route through the validating handle path so an invalid
1905        // mutation (currently: `page_ecc = true` on a non-`page_ecc`
1906        // build) is rejected at update time, not silently swallowed
1907        // at the next manifest write.
1908        // Capture this update's `auto_heal` inside the mutation so the read-path
1909        // heal gate reflects exactly the config THIS call commits, rather than a
1910        // separate `load_full()` that could observe a different concurrent
1911        // update's value. Concurrent `update_runtime_config` calls must be
1912        // serialized by the caller (see the last-writer-wins note above); under
1913        // that contract the gate and the committed config stay in sync. On a
1914        // validation error `try_update` does not commit and `?` returns before
1915        // the gate is touched, so it keeps tracking the unchanged config.
1916        let mut auto_heal = false;
1917        self.0.runtime_config.try_update(|c| {
1918            mutator(c);
1919            auto_heal = c.auto_heal;
1920        })?;
1921        self.0.heal_hints.set_enabled(auto_heal);
1922        // Drop the cached admission footprint so the next check re-probes
1923        // disk-free: an operator who just raised the budget (or freed disk)
1924        // should see it promptly, not at the next flush.
1925        *self.0.admission_used_cache.lock() = None;
1926        Ok(())
1927    }
1928
    /// Returns the currently published runtime-config snapshot.
    ///
    /// This is a cheap atomic load, safe to call on hot paths. The returned
    /// `Arc` is a detached snapshot: a later [`Self::update_runtime_config`]
    /// swaps in a new snapshot rather than mutating this one, so call again to
    /// observe updates.
    #[must_use]
    pub fn runtime_config(&self) -> Arc<crate::runtime_config::RuntimeConfig> {
        self.0.runtime_config.load_full()
    }
1935
    /// Shared handle to this tree's ECC heal-hint queue.
    ///
    /// A read that recovers a block from Page-ECC parity records the owning SST
    /// here, when the on-disk fault is confirmed persistent. To rewrite the
    /// flagged SSTs clean, pass the handle to
    /// [`compaction::EccHeal`](crate::compaction::EccHeal) and run that strategy
    /// via [`Tree::compact`](crate::AbstractTree::compact). In a clustered
    /// deployment, run this on the leader only. Check
    /// [`HealHints::is_empty`](crate::heal_hints::HealHints::is_empty) to skip
    /// the pass when nothing is queued.
    ///
    /// Every call returns a new `Arc` pointing at the same queue, so one
    /// fetched handle can be reused instead of calling this repeatedly.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use lsm_tree::{AbstractTree, AnyTree, Config, SequenceNumberCounter, compaction::EccHeal};
    /// use std::sync::Arc;
    /// # fn main() -> lsm_tree::Result<()> {
    /// let AnyTree::Standard(tree) = Config::new(
    ///     "/tmp/db",
    ///     SequenceNumberCounter::default(),
    ///     SequenceNumberCounter::default(),
    /// )
    /// .open()?
    /// else {
    ///     return Ok(());
    /// };
    ///
    /// // Opt into rewrite scheduling; reads that recover a block from parity now
    /// // flag its SST for healing.
    /// tree.update_runtime_config(|c| c.auto_heal = true)?;
    ///
    /// // Drain the queue, rewriting each flagged SST clean (leader-only in a
    /// // clustered deployment). The strategy shares the same queue handle.
    /// let hints = tree.heal_hints();
    /// while !hints.is_empty() {
    ///     tree.compact(Arc::new(EccHeal::new(Arc::clone(&hints), u64::MAX)), 0)?;
    /// }
    /// # Ok(())
    /// # }
    /// ```
    #[must_use]
    pub fn heal_hints(&self) -> Arc<crate::heal_hints::HealHints> {
        Arc::clone(&self.0.heal_hints)
    }
1979
1980    /// Shared point-read logic for `get()` and `multi_get()`: finds the newest
1981    /// entry, applies merge resolution or RT suppression, and returns the value.
1982    fn resolve_or_passthrough(
1983        super_version: &SuperVersion,
1984        key: &[u8],
1985        seqno: SeqNo,
1986        merge_operator: Option<&Arc<dyn crate::merge_operator::MergeOperator>>,
1987        comparator: &dyn crate::comparator::UserComparator,
1988    ) -> crate::Result<Option<UserValue>> {
1989        let entry = Self::get_value(super_version, key, seqno, comparator)?;
1990
1991        match entry {
1992            Some((ValueType::MergeOperand, entry_seqno, value)) => {
1993                if let Some(merge_op) = merge_operator {
1994                    // Build a bloom-filtered single-key iterator pipeline that
1995                    // reuses MvccStream for merge/RT/Indirection resolution,
1996                    // eliminating the previous hand-rolled merge collection.
1997                    Self::resolve_merge_via_pipeline(
1998                        super_version.clone(),
1999                        key,
2000                        seqno,
2001                        Arc::clone(merge_op),
2002                    )
2003                } else if Self::is_suppressed_by_range_tombstones(
2004                    super_version,
2005                    key,
2006                    entry_seqno,
2007                    seqno,
2008                    comparator,
2009                ) {
2010                    Ok(None)
2011                } else {
2012                    Ok(Some(value))
2013                }
2014            }
2015            Some((_, _, value)) => Ok(Some(value)),
2016            None => Ok(None),
2017        }
2018    }
2019
2020    /// Shared post-lookup resolution for `get_pinned` and `multi_get`:
2021    /// tombstone filter, range-tombstone suppression, merge operand resolution.
2022    /// Returns `None` if entry is tombstoned or suppressed.
2023    fn resolve_pinned_entry(
2024        super_version: &SuperVersion,
2025        key: &[u8],
2026        entry: InternalValue,
2027        seqno: SeqNo,
2028        merge_operator: Option<&Arc<dyn crate::merge_operator::MergeOperator>>,
2029        comparator: &dyn crate::comparator::UserComparator,
2030        wrap: impl FnOnce(UserValue) -> crate::PinnableSlice,
2031    ) -> crate::Result<Option<crate::PinnableSlice>> {
2032        use crate::PinnableSlice;
2033
2034        let Some(entry) = ignore_tombstone_value(entry) else {
2035            return Ok(None);
2036        };
2037        if Self::is_suppressed_by_range_tombstones(
2038            super_version,
2039            key,
2040            entry.key.seqno,
2041            seqno,
2042            comparator,
2043        ) {
2044            return Ok(None);
2045        }
2046        if entry.key.value_type == ValueType::MergeOperand
2047            && let Some(merge_op) = merge_operator
2048        {
2049            // Merge resolution always produces Owned (pipeline result).
2050            return Self::resolve_merge_via_pipeline(
2051                super_version.clone(),
2052                key,
2053                seqno,
2054                Arc::clone(merge_op),
2055            )
2056            .map(|opt| opt.map(PinnableSlice::owned));
2057        }
2058        Ok(Some(wrap(entry.value)))
2059    }
2060
2061    /// Like [`Tree::resolve_or_passthrough`], but returns a [`PinnableSlice`](crate::PinnableSlice)
2062    /// that may keep the decompressed block buffer alive.
2063    fn resolve_or_passthrough_pinned(
2064        super_version: &SuperVersion,
2065        key: &[u8],
2066        seqno: SeqNo,
2067        merge_operator: Option<&Arc<dyn crate::merge_operator::MergeOperator>>,
2068        comparator: &dyn crate::comparator::UserComparator,
2069    ) -> crate::Result<Option<crate::PinnableSlice>> {
2070        use crate::PinnableSlice;
2071
2072        // Check memtables first — always Owned
2073        if let Some(entry) = super_version.active_memtable.get(key, seqno) {
2074            return Self::resolve_pinned_entry(
2075                super_version,
2076                key,
2077                entry,
2078                seqno,
2079                merge_operator,
2080                comparator,
2081                PinnableSlice::owned,
2082            );
2083        }
2084
2085        // Sealed memtables — always Owned
2086        if let Some(entry) =
2087            Self::get_internal_entry_from_sealed_memtables(super_version, key, seqno)
2088        {
2089            return Self::resolve_pinned_entry(
2090                super_version,
2091                key,
2092                entry,
2093                seqno,
2094                merge_operator,
2095                comparator,
2096                PinnableSlice::owned,
2097            );
2098        }
2099
2100        // Tables — Pinned (value shares decompressed block buffer)
2101        let key_hash = crate::hash::hash64(key);
2102
2103        if let Some((entry, block)) = Self::get_internal_entry_with_block_from_tables(
2104            &super_version.version,
2105            key,
2106            seqno,
2107            key_hash,
2108            comparator,
2109        )? {
2110            return Self::resolve_pinned_entry(
2111                super_version,
2112                key,
2113                entry,
2114                seqno,
2115                merge_operator,
2116                comparator,
2117                |value| PinnableSlice::pinned(block, value),
2118            );
2119        }
2120
2121        Ok(None)
2122    }
2123
    /// Like [`Tree::get_internal_entry_from_tables`], but also returns the
    /// data block the entry was decoded from, so the caller can hand out a
    /// zero-copy [`PinnableSlice`](crate::PinnableSlice) that keeps it alive.
    ///
    /// `key_hash` is the caller-computed hash of `key`;
    /// `resolve_or_passthrough_pinned` passes `crate::hash::hash64(key)`.
    /// NOTE(review): presumably consumed by per-table filters inside
    /// `find_in_tables`; confirm there.
    fn get_internal_entry_with_block_from_tables(
        version: &Version,
        key: &[u8],
        seqno: SeqNo,
        key_hash: u64,
        comparator: &dyn crate::comparator::UserComparator,
    ) -> crate::Result<Option<(InternalValue, crate::table::Block)>> {
        Self::find_in_tables::<TableEntryWithBlock>(version, key, seqno, key_hash, comparator)
    }
2135
2136    /// Resolves merge operands for a point read via a bloom-filtered iterator pipeline.
2137    ///
2138    /// Builds a single-key range (`key..=key`) with bloom pre-filtering, wraps
2139    /// all sources in `Merger → MvccStream`, and takes the first result. This
2140    /// reuses the unified merge/RT/Indirection resolution logic from `MvccStream`
2141    /// instead of duplicating it in a hand-rolled collection loop.
2142    ///
2143    /// Bloom pre-filtering can reject many disk tables at the filter level,
2144    /// which typically improves point-read performance on deep LSM trees.
2145    pub(crate) fn resolve_merge_via_pipeline(
2146        version: SuperVersion,
2147        key: &[u8],
2148        seqno: SeqNo,
2149        merge_operator: Arc<dyn crate::merge_operator::MergeOperator>,
2150    ) -> crate::Result<Option<UserValue>> {
2151        use crate::range::{IterState, TreeIter};
2152
2153        let key_hash = crate::hash::hash64(key);
2154        // NOTE: Slice::from(&[u8]) copies the key (small, typically < 100 bytes).
2155        // This runs once per merge resolution, not per-table — cost is negligible
2156        // compared to the I/O saved by partition-aware bloom filtering.
2157        let bloom_key = crate::Slice::from(key);
2158        let comparator = version.active_memtable.comparator.clone();
2159
2160        let iter_state = IterState {
2161            version,
2162            ephemeral: None,
2163            merge_operator: Some(merge_operator),
2164            comparator,
2165            prefix_hash: None,
2166            key_hash: Some(key_hash),
2167            bloom_key: Some(bloom_key),
2168            #[cfg(feature = "metrics")]
2169            metrics: None,
2170        };
2171
2172        // Point-read fast path: skips eager RT collection, sort+dedup, table-skip,
2173        // and RangeTombstoneFilter wrapper. MvccStream handles merge-internal RT
2174        // suppression; a post-merge linear RT check catches the rest.
2175        let mut iter = TreeIter::create_range_point(iter_state, key, seqno);
2176
2177        match iter.next() {
2178            Some(Ok(entry)) => Ok(Some(entry.value)),
2179            Some(Err(e)) => Err(e),
2180            None => Ok(None),
2181        }
2182    }
2183
    /// Builds an iterator of internal entries over `range` as visible at
    /// `seqno`, merged across the sources of `version`.
    ///
    /// Thin wrapper: forwards every argument to
    /// [`Tree::create_internal_range_with_prefix_hash`] with no prefix hash,
    /// so no prefix-bloom table skipping is attempted. The returned iterator
    /// is `'static`; the range bounds are converted to owned keys before
    /// iteration starts.
    ///
    /// NOTE(review): `ephemeral` presumably layers an extra memtable (with its
    /// own seqno) on top of `version`; confirm against `IterState`.
    #[doc(hidden)]
    pub fn create_internal_range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
        version: SuperVersion,
        range: &'a R,
        seqno: SeqNo,
        ephemeral: Option<(Arc<Memtable>, SeqNo)>,
        merge_operator: Option<Arc<dyn crate::merge_operator::MergeOperator>>,
        comparator: crate::comparator::SharedComparator,
    ) -> impl DoubleEndedIterator<Item = crate::Result<InternalValue>> + 'static {
        Self::create_internal_range_with_prefix_hash(
            version,
            range,
            seqno,
            ephemeral,
            merge_operator,
            comparator,
            None,
        )
    }
2203
2204    /// Like [`Tree::create_internal_range`], but with an optional prefix hash
2205    /// for prefix bloom filter skipping during prefix scans.
2206    #[doc(hidden)]
2207    pub(crate) fn create_internal_range_with_prefix_hash<
2208        'a,
2209        K: AsRef<[u8]> + 'a,
2210        R: RangeBounds<K> + 'a,
2211    >(
2212        version: SuperVersion,
2213        range: &'a R,
2214        seqno: SeqNo,
2215        ephemeral: Option<(Arc<Memtable>, SeqNo)>,
2216        merge_operator: Option<Arc<dyn crate::merge_operator::MergeOperator>>,
2217        comparator: crate::comparator::SharedComparator,
2218        prefix_hash: Option<u64>,
2219    ) -> impl DoubleEndedIterator<Item = crate::Result<InternalValue>> + 'static {
2220        use crate::range::{IterState, TreeIter};
2221        use core::ops::Bound::{self, Excluded, Included, Unbounded};
2222
2223        let lo: Bound<UserKey> = match range.start_bound() {
2224            Included(x) => Included(x.as_ref().into()),
2225            Excluded(x) => Excluded(x.as_ref().into()),
2226            Unbounded => Unbounded,
2227        };
2228
2229        let hi: Bound<UserKey> = match range.end_bound() {
2230            Included(x) => Included(x.as_ref().into()),
2231            Excluded(x) => Excluded(x.as_ref().into()),
2232            Unbounded => Unbounded,
2233        };
2234
2235        let bounds: (Bound<UserKey>, Bound<UserKey>) = (lo, hi);
2236
2237        let iter_state = IterState {
2238            version,
2239            ephemeral,
2240            merge_operator,
2241            comparator,
2242            prefix_hash,
2243            key_hash: None,
2244            bloom_key: None,
2245            #[cfg(feature = "metrics")]
2246            metrics: None,
2247        };
2248
2249        TreeIter::create_range(iter_state, bounds, seqno)
2250    }
2251
2252    pub(crate) fn get_internal_entry_from_version(
2253        super_version: &SuperVersion,
2254        key: &[u8],
2255        seqno: SeqNo,
2256        comparator: &dyn crate::comparator::UserComparator,
2257    ) -> crate::Result<Option<InternalValue>> {
2258        // Search order: active → sealed → SST (newest first). A point
2259        // tombstone in a newer source is authoritative — no older source
2260        // can contain a newer value, so returning None is correct.
2261        if let Some(entry) = super_version.active_memtable.get(key, seqno) {
2262            let Some(entry) = ignore_tombstone_value(entry) else {
2263                return Ok(None);
2264            };
2265
2266            // Check if any range tombstone suppresses this entry
2267            if Self::is_suppressed_by_range_tombstones(
2268                super_version,
2269                key,
2270                entry.key.seqno,
2271                seqno,
2272                comparator,
2273            ) {
2274                return Ok(None);
2275            }
2276            return Ok(Some(entry));
2277        }
2278
2279        // Now look in sealed memtables
2280        if let Some(entry) =
2281            Self::get_internal_entry_from_sealed_memtables(super_version, key, seqno)
2282        {
2283            let Some(entry) = ignore_tombstone_value(entry) else {
2284                return Ok(None);
2285            };
2286
2287            if Self::is_suppressed_by_range_tombstones(
2288                super_version,
2289                key,
2290                entry.key.seqno,
2291                seqno,
2292                comparator,
2293            ) {
2294                return Ok(None);
2295            }
2296            return Ok(Some(entry));
2297        }
2298
2299        // Now look in tables... this may involve disk I/O
2300        let entry =
2301            Self::get_internal_entry_from_tables(&super_version.version, key, seqno, comparator)?;
2302
2303        if let Some(entry) = entry {
2304            if Self::is_suppressed_by_range_tombstones(
2305                super_version,
2306                key,
2307                entry.key.seqno,
2308                seqno,
2309                comparator,
2310            ) {
2311                return Ok(None);
2312            }
2313            return Ok(Some(entry));
2314        }
2315
2316        Ok(None)
2317    }
2318
2319    /// Value-only mirror of [`Self::get_internal_entry_from_version`].
2320    ///
2321    /// Returns `(value_type, seqno, value)` for the newest visible entry without
2322    /// reconstructing the entry key. Same search order (active -> sealed -> SST,
2323    /// newest first), tombstone filtering, and range-tombstone suppression; only
2324    /// the SST path differs, using the value-only [`TableValue`] lookup that
2325    /// skips the delta-key fusion of the full `InternalValue` path. Used by the
2326    /// value-returning `get` path, which never reads the matched key.
2327    pub(crate) fn get_value(
2328        super_version: &SuperVersion,
2329        key: &[u8],
2330        seqno: SeqNo,
2331        comparator: &dyn crate::comparator::UserComparator,
2332    ) -> crate::Result<Option<(ValueType, SeqNo, crate::Slice)>> {
2333        if let Some(entry) = super_version.active_memtable.get(key, seqno) {
2334            let Some(entry) = ignore_tombstone_value(entry) else {
2335                return Ok(None);
2336            };
2337            if Self::is_suppressed_by_range_tombstones(
2338                super_version,
2339                key,
2340                entry.key.seqno,
2341                seqno,
2342                comparator,
2343            ) {
2344                return Ok(None);
2345            }
2346            return Ok(Some((entry.key.value_type, entry.key.seqno, entry.value)));
2347        }
2348
2349        if let Some(entry) =
2350            Self::get_internal_entry_from_sealed_memtables(super_version, key, seqno)
2351        {
2352            let Some(entry) = ignore_tombstone_value(entry) else {
2353                return Ok(None);
2354            };
2355            if Self::is_suppressed_by_range_tombstones(
2356                super_version,
2357                key,
2358                entry.key.seqno,
2359                seqno,
2360                comparator,
2361            ) {
2362                return Ok(None);
2363            }
2364            return Ok(Some((entry.key.value_type, entry.key.seqno, entry.value)));
2365        }
2366
2367        let key_hash = crate::hash::hash64(key);
2368        let entry = Self::find_in_tables::<TableValue>(
2369            &super_version.version,
2370            key,
2371            seqno,
2372            key_hash,
2373            comparator,
2374        )?;
2375        if let Some((value_type, entry_seqno, value)) = entry {
2376            if Self::is_suppressed_by_range_tombstones(
2377                super_version,
2378                key,
2379                entry_seqno,
2380                seqno,
2381                comparator,
2382            ) {
2383                return Ok(None);
2384            }
2385            return Ok(Some((value_type, entry_seqno, value)));
2386        }
2387
2388        Ok(None)
2389    }
2390
2391    /// Checks if a key at `key_seqno` is suppressed by any range tombstone
2392    /// in the active memtable, sealed memtables, or SST tables, visible at `read_seqno`.
2393    pub(crate) fn is_suppressed_by_range_tombstones(
2394        super_version: &SuperVersion,
2395        key: &[u8],
2396        key_seqno: SeqNo,
2397        read_seqno: SeqNo,
2398        comparator: &dyn crate::comparator::UserComparator,
2399    ) -> bool {
2400        // Check active memtable range tombstones.
2401        // Future optimization: skip lock when memtable has no RTs (atomic count).
2402        if super_version
2403            .active_memtable
2404            .is_key_suppressed_by_range_tombstone(key, key_seqno, read_seqno)
2405        {
2406            return true;
2407        }
2408
2409        // Check sealed memtable range tombstones
2410        for mt in super_version.sealed_memtables.iter().rev() {
2411            if mt.is_key_suppressed_by_range_tombstone(key, key_seqno, read_seqno) {
2412                return true;
2413            }
2414        }
2415
2416        // Check SST table range tombstones.
2417        //
2418        // Per-table RT lists are sorted by start key (using comparator) on load,
2419        // so binary search narrows candidates to RTs with start <= key.
2420        // The key_range early reject uses the comparator so it works with
2421        // non-lexicographic orderings.
2422        for table in super_version
2423            .version
2424            .iter_levels()
2425            .flat_map(|lvl| lvl.iter())
2426            .flat_map(|run| run.iter())
2427            .filter(|t| !t.range_tombstones().is_empty())
2428            .filter(|t| {
2429                // Early reject: skip tables whose key range doesn't contain the key.
2430                let kr = &t.metadata.key_range;
2431                comparator.compare(kr.min(), key) != core::cmp::Ordering::Greater
2432                    && comparator.compare(key, kr.max()) != core::cmp::Ordering::Greater
2433            })
2434        {
2435            let rts = table.range_tombstones();
2436
2437            // Binary search: find the first RT whose start is > key (in comparator order).
2438            // All RTs before that index have start <= key and are candidates.
2439            let candidate_end = rts.partition_point(|rt| {
2440                comparator.compare(&rt.start, key) != core::cmp::Ordering::Greater
2441            });
2442
2443            for rt in rts.iter().take(candidate_end) {
2444                // Check: start <= key < end (in comparator order) AND seqno visibility.
2445                if rt.visible_at(read_seqno)
2446                    && comparator.compare(&rt.start, key) != core::cmp::Ordering::Greater
2447                    && comparator.compare(key, &rt.end) == core::cmp::Ordering::Less
2448                    && key_seqno < rt.seqno
2449                {
2450                    return true;
2451                }
2452            }
2453        }
2454
2455        false
2456    }
2457
2458    /// Resolves a single internal entry into a user value, handling tombstones,
2459    /// range tombstone suppression, and merge operand resolution.
2460    /// Resolves an entry for `multi_get`: tombstone filter, RT suppression,
2461    /// merge operand resolution. Delegates to [`Self::resolve_pinned_entry`] with
2462    /// `Owned` wrapping, then extracts the value.
2463    fn resolve_entry(
2464        super_version: &SuperVersion,
2465        key: &[u8],
2466        entry: Option<InternalValue>,
2467        seqno: SeqNo,
2468        merge_operator: Option<&Arc<dyn crate::merge_operator::MergeOperator>>,
2469        comparator: &dyn crate::comparator::UserComparator,
2470    ) -> crate::Result<Option<UserValue>> {
2471        let Some(entry) = entry else {
2472            return Ok(None);
2473        };
2474        Self::resolve_pinned_entry(
2475            super_version,
2476            key,
2477            entry,
2478            seqno,
2479            merge_operator,
2480            comparator,
2481            crate::PinnableSlice::owned,
2482        )
2483        .map(|opt| opt.map(crate::PinnableSlice::into_value))
2484    }
2485
2486    /// De-duplicates equal query keys in a comparator-sorted `remaining` index
2487    /// list, returning the `(key_index, bloom_hash)` pairs for the batched
2488    /// on-disk resolver (which requires strictly-sorted-unique input) and a
2489    /// `(duplicate_index, representative_index)` map. Pair with
2490    /// [`Self::fan_out_duplicates`] after the batch resolves.
2491    ///
2492    /// Shared by [`Self::multi_get`] and the `BlobTree` multi-get so the two
2493    /// cannot silently diverge: forwarding duplicate miss keys into the
2494    /// strictly-sorted-unique resolver was exactly the regression class this
2495    /// guards against. `remaining` must already be sorted by `comparator`.
2496    #[expect(
2497        clippy::indexing_slicing,
2498        reason = "remaining/miss_keys carry batch-local key indices < keys.len()"
2499    )]
2500    pub(crate) fn dedup_sorted_miss_keys<K: AsRef<[u8]>>(
2501        remaining: &[usize],
2502        keys: &[K],
2503        comparator: &dyn crate::comparator::UserComparator,
2504    ) -> DedupedMissKeys {
2505        let mut miss_keys: Vec<(usize, u64)> = Vec::with_capacity(remaining.len());
2506        let mut duplicates: Vec<(usize, usize)> = Vec::new();
2507        for &idx in remaining {
2508            let key = keys[idx].as_ref();
2509            match miss_keys.last() {
2510                Some(&(rep_idx, _))
2511                    if comparator.compare(keys[rep_idx].as_ref(), key)
2512                        == core::cmp::Ordering::Equal =>
2513                {
2514                    duplicates.push((idx, rep_idx));
2515                }
2516                _ => miss_keys.push((idx, crate::hash::hash64(key))),
2517            }
2518        }
2519        (miss_keys, duplicates)
2520    }
2521
2522    /// Fans each representative's resolved entry out to its duplicate positions,
2523    /// so every input slot carries the same answer the per-key path would have
2524    /// produced. Counterpart to [`Self::dedup_sorted_miss_keys`]; call after the
2525    /// batched resolver fills `internal_entries`.
2526    #[expect(
2527        clippy::indexing_slicing,
2528        reason = "duplicate/representative indices are batch-local key indices < entries.len()"
2529    )]
2530    pub(crate) fn fan_out_duplicates(
2531        duplicates: &[(usize, usize)],
2532        internal_entries: &mut [Option<InternalValue>],
2533    ) {
2534        for &(dup_idx, rep_idx) in duplicates {
2535            let resolved = internal_entries[rep_idx].clone();
2536            internal_entries[dup_idx] = resolved;
2537        }
2538    }
2539
    /// Resolves multiple point lookups against the on-disk tables of `version`,
    /// walking its levels in order and writing each hit into `results`.
    ///
    /// `miss_keys` contains `(key_index, bloom_hash)` pairs for keys not yet
    /// found, in comparator-sorted order. The L1+ branch re-sorts after merging
    /// keys back so the next level still sees comparator order. The precomputed
    /// bloom hash in each pair is reused across all table probes.
    ///
    /// Per level:
    /// - [`Self::prewarm_level_cross_sst`] first tries to warm the level's cold
    ///   data blocks in one batched read. If it reports the cold working set as
    ///   too large to warm, [`Self::resolve_level_chunked`] may resolve the whole
    ///   level from scratch buffers, and the level is then done.
    /// - Otherwise each run is resolved with [`Self::resolve_run_batched`], which
    ///   groups consecutive keys covered by the same table into one
    ///   [`Table::batch_get`] rather than probing with a per-key `Table::get`.
    ///   In L0 every run is checked and the highest-seqno hit per key wins. In
    ///   L1+ a covering run's hit settles the key, a covering miss sends it to
    ///   the next level, and an uncovered key is tried against the next run.
    ///
    /// A key leaves the working set as soon as any entry is found for it (a
    /// value or a tombstone), so deeper levels are not consulted for it. Keys
    /// with no hit in any level keep their `results` slot untouched. See `#223`
    /// for the per-SST batching work.
    ///
    /// `results` must be aligned with `keys` (same length, indexed by key index).
    /// This is checked only by `debug_assert!`.
    ///
    /// # Errors
    ///
    /// Propagates any error from [`Self::resolve_level_chunked`] or from a
    /// per-run [`Self::resolve_run_batched`] call.
    #[expect(
        clippy::indexing_slicing,
        reason = "miss_keys entries carry batch-local indices; callers must pass a results slice aligned with keys"
    )]
    pub(crate) fn batch_get_from_tables<K: AsRef<[u8]>>(
        version: &Version,
        keys: &[K],
        miss_keys: Vec<(usize, u64)>,
        seqno: SeqNo,
        comparator: &dyn crate::comparator::UserComparator,
        fs: &dyn crate::fs::Fs,
        results: &mut [Option<InternalValue>],
    ) -> crate::Result<()> {
        debug_assert_eq!(results.len(), keys.len());
        debug_assert!(miss_keys.iter().all(|&(i, _)| i < keys.len()));

        // Consume the caller's Vec directly — no allocation+copy.
        let mut still_remaining = miss_keys;

        for (level_idx, level) in version.iter_levels().enumerate() {
            if still_remaining.is_empty() {
                break;
            }

            // Warm the cold data blocks this level will read across ALL its SSTs
            // in one cross-file batched read, so the serial resolve below hits the
            // cache. On io_uring the reads coalesce into one submission and the
            // kernel fans them out across the underlying devices. When the cold
            // working set is too large to warm without thrashing the cache, this
            // signals oversize and warms nothing; the level is then resolved by
            // reading its blocks in budget-sized chunks into a scratch and
            // point-reading directly (no cache, no eviction).
            if Self::prewarm_level_cross_sst(fs, level, &still_remaining, keys, seqno, comparator)
                && Self::resolve_level_chunked(
                    fs,
                    level,
                    &mut still_remaining,
                    keys,
                    seqno,
                    comparator,
                    results,
                )?
            {
                continue;
            }

            if level_idx == 0 {
                // L0: must check ALL runs, keep highest seqno per key. Track keys
                // at the seqno ceiling (seqno + 1 == read_seqno): no other L0 run
                // can beat them, so skip them in subsequent runs. The bitmap is
                // dense over 0..keys.len().
                let mut at_ceiling = vec![false; keys.len()];

                for run in level.iter() {
                    // `at_ceiling` is read as this run's skip set (a key is visited
                    // once per run, so the updates below only affect later runs)
                    // and mutated from the returned outcomes: never both at once.
                    let resolved = Self::resolve_run_batched(
                        run,
                        &still_remaining,
                        keys,
                        seqno,
                        comparator,
                        |idx| at_ceiling[idx],
                    )?;
                    for (idx, _hash, item) in resolved.covered {
                        let Some(item) = item else { continue };
                        match &results[idx] {
                            Some(current) if current.key.seqno >= item.key.seqno => {}
                            _ => {
                                if item.key.seqno.checked_add(1) == Some(seqno) {
                                    at_ceiling[idx] = true;
                                }
                                results[idx] = Some(item);
                            }
                        }
                    }
                    // Uncovered keys stay in `still_remaining`; the retain below
                    // prunes the ones any run resolved.
                }

                // Remove found keys (both values and tombstones)
                still_remaining.retain(|&(idx, _)| results[idx].is_none());
            } else {
                // L1+ runs have non-overlapping key ranges within a level. A
                // covering run resolves a key definitively: a hit sets the result,
                // a covering miss drops it to lower levels (`covered_miss`), and an
                // uncovered key tries the next run in this level (`not_covered`).
                let mut covered_miss: Vec<(usize, u64)> = Vec::new();

                for run in level.iter() {
                    let resolved = Self::resolve_run_batched(
                        run,
                        &still_remaining,
                        keys,
                        seqno,
                        comparator,
                        |_| false,
                    )?;
                    for (idx, hash, item) in resolved.covered {
                        if let Some(item) = item {
                            results[idx] = Some(item);
                        } else {
                            // Covering run found, key absent: no other run in this
                            // level can have it. Keep for lower levels.
                            covered_miss.push((idx, hash));
                        }
                    }
                    still_remaining = resolved.not_covered;
                }

                // Merge back: keys without a covering run + keys with a covering
                // miss both proceed to lower levels. Re-sort to preserve
                // comparator order for the next level's sequential scan.
                let needs_sort = !covered_miss.is_empty();
                still_remaining.extend(covered_miss);
                if needs_sort {
                    still_remaining.sort_by(|&(a, _), &(b, _)| {
                        comparator.compare(keys[a].as_ref(), keys[b].as_ref())
                    });
                }
            }
        }

        Ok(())
    }
2673
2674    /// Resolves `remaining` (sorted ascending under `comparator`) against a
2675    /// single run with per-table batched gets instead of a per-key `table.get`:
2676    /// consecutive keys covered by the same table within the run share one
2677    /// [`Table::batch_get`], so co-located keys decode their data block once.
2678    /// Byte-identical to per-key resolution (the same point reads, the same
2679    /// values). `skip(idx)` omits a key (e.g. one already pinned at the L0 seqno
2680    /// ceiling, where no later run can beat it).
2681    ///
2682    /// Returns, per covered non-skipped key, `(idx, hash, resolved item)` in
2683    /// input order, plus the keys this run does not cover (also in input order)
2684    /// for the caller to pass to the next run or level.
2685    #[expect(
2686        clippy::indexing_slicing,
2687        reason = "i < remaining.len() is loop-checked; idx values are valid key indices (caller's keys/results are aligned, same as batch_get_from_tables)"
2688    )]
2689    fn resolve_run_batched<K: AsRef<[u8]>>(
2690        run: &crate::version::Run<crate::Table>,
2691        remaining: &[(usize, u64)],
2692        keys: &[K],
2693        seqno: SeqNo,
2694        comparator: &dyn crate::comparator::UserComparator,
2695        skip: impl Fn(usize) -> bool,
2696    ) -> crate::Result<RunResolve> {
2697        let mut covered: Vec<CoveredKey> = Vec::new();
2698        let mut not_covered: Vec<(usize, u64)> = Vec::new();
2699
2700        let mut i = 0;
2701        while i < remaining.len() {
2702            let (idx, hash) = remaining[i];
2703            if skip(idx) {
2704                i += 1;
2705                continue;
2706            }
2707            let key = keys[idx].as_ref();
2708            let Some(table) = run.get_for_key_cmp(key, comparator) else {
2709                not_covered.push((idx, hash));
2710                i += 1;
2711                continue;
2712            };
2713
2714            // Gather the contiguous, non-skipped keys covered by THIS table. The
2715            // input is sorted and a run's tables partition the key space, so the
2716            // keys for one table form a contiguous slice; one `batch_get` drains
2717            // them with a single block decode for co-located keys.
2718            let table_id = table.id();
2719            let mut batch: Vec<(&[u8], u64)> = Vec::new();
2720            let mut batch_keys: Vec<(usize, u64)> = Vec::new();
2721            while i < remaining.len() {
2722                let (jdx, jhash) = remaining[i];
2723                if skip(jdx) {
2724                    i += 1;
2725                    continue;
2726                }
2727                let jkey = keys[jdx].as_ref();
2728                match run.get_for_key_cmp(jkey, comparator) {
2729                    Some(t) if t.id() == table_id => {
2730                        batch.push((jkey, jhash));
2731                        batch_keys.push((jdx, jhash));
2732                        i += 1;
2733                    }
2734                    _ => break,
2735                }
2736            }
2737
2738            for ((kidx, khash), item) in batch_keys.into_iter().zip(table.batch_get(&batch, seqno)?)
2739            {
2740                covered.push((kidx, khash, item));
2741            }
2742        }
2743
2744        Ok(RunResolve {
2745            covered,
2746            not_covered,
2747        })
2748    }
2749
2750    /// Warms an entire level's COLD data blocks across ALL its SSTs in one
2751    /// cross-file batched read ([`crate::fs::Fs::read_blocks_batched`]), so the
2752    /// serial resolve walk that follows hits the cache. On `io_uring` the reads of
2753    /// many SSTs (and, on a multi-device filesystem, many physical devices)
2754    /// coalesce into one submission and overlap in flight.
2755    ///
2756    /// Purely best-effort: it never changes a query result (the resolve walk
2757    /// re-reads authoritatively), and it is size-bounded to at most half the
2758    /// shared cache so the warmed blocks survive until the walk reads them.
2759    ///
2760    /// Returns `true` when the level's cold working set EXCEEDS that half-cache
2761    /// bound: warming would thrash the cache, so nothing is warmed and the caller
2762    /// resolves the level with the chunked read-into-scratch path instead
2763    /// ([`Tree::resolve_level_chunked`]). Returns `false` when it warmed the
2764    /// blocks (or had nothing to warm), i.e. the serial resolve should run.
2765    #[expect(
2766        clippy::indexing_slicing,
2767        reason = "planned[ti] and all_buffers[k..end] indices are built from `planned` itself, so they are in range by construction"
2768    )]
2769    fn prewarm_level_cross_sst<K: AsRef<[u8]>>(
2770        fs: &dyn crate::fs::Fs,
2771        level: &crate::version::Level,
2772        remaining: &[(usize, u64)],
2773        keys: &[K],
2774        seqno: SeqNo,
2775        comparator: &dyn crate::comparator::UserComparator,
2776    ) -> bool {
2777        // Gather per-table prewarm plans across the level's runs (group remaining
2778        // keys by covering table, mirroring resolve_run_batched's walk).
2779        let mut planned: Vec<(
2780            &crate::Table,
2781            Arc<dyn crate::fs::FsFile>,
2782            Vec<crate::table::BlockHandle>,
2783        )> = Vec::new();
2784        for run in level.iter() {
2785            let mut i = 0;
2786            while i < remaining.len() {
2787                let (idx, _) = remaining[i];
2788                let key = keys[idx].as_ref();
2789                let Some(table) = run.get_for_key_cmp(key, comparator) else {
2790                    i += 1;
2791                    continue;
2792                };
2793                let table_id = table.id();
2794                let mut batch: Vec<(&[u8], u64)> = Vec::new();
2795                while i < remaining.len() {
2796                    let (jdx, jhash) = remaining[i];
2797                    let jkey = keys[jdx].as_ref();
2798                    match run.get_for_key_cmp(jkey, comparator) {
2799                        Some(t) if t.id() == table_id => {
2800                            batch.push((jkey, jhash));
2801                            i += 1;
2802                        }
2803                        _ => break,
2804                    }
2805                }
2806                if let Some((file, handles)) = table.plan_prewarm(&batch, seqno) {
2807                    planned.push((table, file, handles));
2808                }
2809            }
2810        }
2811
2812        let total_cold: usize = planned.iter().map(|(_, _, h)| h.len()).sum();
2813        if total_cold < 2 {
2814            return false;
2815        }
2816        // Eviction-avoiding bound: warm at most half the (shared) cache.
2817        let Some((first_table, _, _)) = planned.first() else {
2818            return false;
2819        };
2820        let cap = first_table.cache_capacity();
2821        let total_bytes: u64 = planned
2822            .iter()
2823            .flat_map(|(_, _, h)| h.iter().map(|x| u64::from(x.size())))
2824            .sum();
2825        if cap == 0 {
2826            return false;
2827        }
2828        if total_bytes > cap / 2 {
2829            // Cold working set too large to warm without thrash: signal the caller
2830            // to resolve this level via the chunked read-into-scratch path.
2831            return true;
2832        }
2833
2834        // One buffer per cold block, in (table, block) order.
2835        let mut all_buffers: Vec<Vec<u8>> = planned
2836            .iter()
2837            .flat_map(|(_, _, handles)| handles.iter().map(|h| vec![0u8; h.size() as usize]))
2838            .collect();
2839        // (table index, offset) per buffer, so each request borrows the right file.
2840        let flat: Vec<(usize, u64)> = planned
2841            .iter()
2842            .enumerate()
2843            .flat_map(|(ti, (_, _, handles))| handles.iter().map(move |h| (ti, *h.offset())))
2844            .collect();
2845
2846        {
2847            let mut reqs: Vec<crate::fs::BlockRead<'_>> = flat
2848                .iter()
2849                .zip(all_buffers.iter_mut())
2850                .map(|(&(ti, offset), buf)| crate::fs::BlockRead {
2851                    file: planned[ti].1.as_ref(),
2852                    offset,
2853                    buf: buf.as_mut_slice(),
2854                })
2855                .collect();
2856            // Best-effort: a batched-read failure just leaves the blocks for the
2857            // resolve walk to read normally.
2858            if fs.read_blocks_batched(&mut reqs).is_err() {
2859                return false;
2860            }
2861        }
2862
2863        // Decode each table's blocks (its contiguous slice of all_buffers).
2864        let mut k = 0;
2865        for (table, _, handles) in &planned {
2866            let end = k + handles.len();
2867            table.decode_prewarmed(handles, &all_buffers[k..end]);
2868            k = end;
2869        }
2870        false
2871    }
2872
2873    /// Plans every data block this level's SSTs will read for `remaining`,
2874    /// grouping keys by covering table per run (mirrors `resolve_run_batched`'s
2875    /// walk). Each task carries the ORIGINAL key indices (into `keys`).
2876    ///
2877    /// # Errors
2878    ///
2879    /// Propagates a table-side planning failure ([`Table::plan_block_tasks`]) so
2880    /// the resolver surfaces it instead of letting a stale lower level answer.
2881    #[expect(
2882        clippy::indexing_slicing,
2883        reason = "i < remaining.len() loop-checked; idx/jdx are valid key indices; batch_idx[pos] is in range (pos came from this table's own plan)"
2884    )]
2885    fn plan_level_block_tasks<'a, K: AsRef<[u8]>>(
2886        level: &'a crate::version::Level,
2887        remaining: &[(usize, u64)],
2888        keys: &[K],
2889        seqno: SeqNo,
2890        comparator: &dyn crate::comparator::UserComparator,
2891    ) -> crate::Result<Vec<BlockTask<'a>>> {
2892        let mut tasks: Vec<BlockTask<'a>> = Vec::new();
2893        for run in level.iter() {
2894            let mut i = 0;
2895            while i < remaining.len() {
2896                let (idx, _) = remaining[i];
2897                let key = keys[idx].as_ref();
2898                let Some(table) = run.get_for_key_cmp(key, comparator) else {
2899                    i += 1;
2900                    continue;
2901                };
2902                let table_id = table.id();
2903                let mut batch: Vec<(&[u8], u64)> = Vec::new();
2904                let mut batch_idx: Vec<usize> = Vec::new();
2905                while i < remaining.len() {
2906                    let (jdx, jhash) = remaining[i];
2907                    let jkey = keys[jdx].as_ref();
2908                    match run.get_for_key_cmp(jkey, comparator) {
2909                        Some(t) if t.id() == table_id => {
2910                            batch.push((jkey, jhash));
2911                            batch_idx.push(jdx);
2912                            i += 1;
2913                        }
2914                        _ => break,
2915                    }
2916                }
2917                if let Some((file, table_seqno, special, blocks)) =
2918                    table.plan_block_tasks(&batch, seqno)?
2919                {
2920                    for (handle, positions) in blocks {
2921                        let task_keys: Vec<usize> =
2922                            positions.iter().map(|&pos| batch_idx[pos]).collect();
2923                        tasks.push(BlockTask {
2924                            table,
2925                            file: Arc::clone(&file),
2926                            handle,
2927                            table_seqno,
2928                            special,
2929                            keys: task_keys,
2930                        });
2931                    }
2932                }
2933            }
2934        }
2935        Ok(tasks)
2936    }
2937
2938    /// Resolves an ENTIRE level by reading its blocks in chunks into a scratch and
2939    /// point-reading directly (no cache, no eviction). Called after
2940    /// [`Tree::prewarm_level_cross_sst`] signals the cold working set is too large
2941    /// to warm. Returns `Ok(true)` when it resolved the level (results updated,
2942    /// found keys dropped from `still_remaining`); `Ok(false)` when the level has
2943    /// no blocks to read for this batch (every key bloom-skips) or holds a
2944    /// Page-ECC / columnar table, in which cases the caller falls through to the
2945    /// serial resolve.
2946    #[expect(
2947        clippy::indexing_slicing,
2948        reason = "start/end stay within tasks by construction"
2949    )]
2950    fn resolve_level_chunked<K: AsRef<[u8]>>(
2951        fs: &dyn crate::fs::Fs,
2952        level: &crate::version::Level,
2953        still_remaining: &mut Vec<(usize, u64)>,
2954        keys: &[K],
2955        seqno: SeqNo,
2956        comparator: &dyn crate::comparator::UserComparator,
2957        results: &mut [Option<InternalValue>],
2958    ) -> crate::Result<bool> {
2959        let tasks = Self::plan_level_block_tasks(level, still_remaining, keys, seqno, comparator)?;
2960        let Some(first) = tasks.first() else {
2961            return Ok(false);
2962        };
2963        // A Page-ECC / columnar table covers some of these keys (only possible
2964        // when the columnar/ECC policy differs between the SSTs in this level).
2965        // The scratch decode path is row-format only, so hand the whole level to
2966        // the serial resolve, which loads those blocks through their format-aware
2967        // path. The scratch fast path stays homogeneous and row-only.
2968        if tasks.iter().any(|t| t.special) {
2969            return Ok(false);
2970        }
2971        // Read blocks in chunks of at most half the shared cache, so a chunk's
2972        // scratch never dwarfs the cache it is meant to spare. `.max(1)` keeps the
2973        // chunk loop's `end > start` guard the sole progress condition when the
2974        // cache is disabled (capacity 0).
2975        let budget = (first.table.cache_capacity() / 2).max(1);
2976
2977        let mut start = 0;
2978        while start < tasks.len() {
2979            let mut bytes = 0u64;
2980            let mut end = start;
2981            while end < tasks.len() {
2982                let sz = u64::from(tasks[end].handle.size());
2983                if end > start && bytes + sz > budget {
2984                    break;
2985                }
2986                bytes += sz;
2987                end += 1;
2988            }
2989            Self::resolve_block_task_chunk(fs, &tasks[start..end], keys, results)?;
2990            start = end;
2991        }
2992        still_remaining.retain(|&(idx, _)| results[idx].is_none());
2993        Ok(true)
2994    }
2995
2996    /// Reads one chunk of block-tasks in ONE cross-file `read_blocks_batched`,
2997    /// decodes each from its scratch buffer, and point-reads its keys, keeping the
2998    /// highest-seqno hit per key in `results`. Every task is row-format (the caller
2999    /// routes any level with a Page-ECC / columnar table to the serial resolve).
3000    #[expect(
3001        clippy::indexing_slicing,
3002        reason = "buffers is built from chunk so indices align; key indices are valid (caller's keys/results aligned)"
3003    )]
3004    fn resolve_block_task_chunk<K: AsRef<[u8]>>(
3005        fs: &dyn crate::fs::Fs,
3006        chunk: &[BlockTask<'_>],
3007        keys: &[K],
3008        results: &mut [Option<InternalValue>],
3009    ) -> crate::Result<()> {
3010        let mut buffers: Vec<Vec<u8>> = chunk
3011            .iter()
3012            .map(|t| vec![0u8; t.handle.size() as usize])
3013            .collect();
3014        {
3015            let mut reqs: Vec<crate::fs::BlockRead<'_>> = chunk
3016                .iter()
3017                .zip(buffers.iter_mut())
3018                .map(|(t, buf)| crate::fs::BlockRead {
3019                    file: t.file.as_ref(),
3020                    offset: *t.handle.offset(),
3021                    buf: buf.as_mut_slice(),
3022                })
3023                .collect();
3024            fs.read_blocks_batched(&mut reqs)?;
3025        }
3026
3027        for (task, buf) in chunk.iter().zip(buffers.iter()) {
3028            if let Some(block) = task.table.decode_data_block_from_bytes(buf)? {
3029                for &kidx in &task.keys {
3030                    if let Some(item) = task.table.point_read_translated(
3031                        &block,
3032                        keys[kidx].as_ref(),
3033                        task.table_seqno,
3034                    )? {
3035                        Self::keep_highest(results, kidx, item);
3036                    }
3037                }
3038            }
3039        }
3040        Ok(())
3041    }
3042
3043    /// Keeps the higher-seqno of an existing result and a new candidate (the L0
3044    /// newest-version-wins merge; correct for L1+ too, where each key has one
3045    /// candidate).
3046    #[expect(
3047        clippy::indexing_slicing,
3048        reason = "idx is a valid key index aligned with results"
3049    )]
3050    fn keep_highest(results: &mut [Option<InternalValue>], idx: usize, item: InternalValue) {
3051        match &results[idx] {
3052            Some(current) if current.key.seqno >= item.key.seqno => {}
3053            _ => results[idx] = Some(item),
3054        }
3055    }
3056
3057    fn get_internal_entry_from_tables(
3058        version: &Version,
3059        key: &[u8],
3060        seqno: SeqNo,
3061        comparator: &dyn crate::comparator::UserComparator,
3062    ) -> crate::Result<Option<InternalValue>> {
3063        let key_hash = crate::hash::hash64(key);
3064        Self::find_in_tables::<TableEntry>(version, key, seqno, key_hash, comparator)
3065    }
3066
    /// Generic level-walk for point reads, monomorphized over the lookup result type.
    ///
    /// L0: check ALL runs, keep highest seqno (runs may not be newest-first).
    /// L1+: at most one run contains the key — return on first match.
    /// Once a level yields a match, lower levels cannot have newer data.
    ///
    /// Any match ends the walk, even one that `filter_tombstone` turns into
    /// `None`. So a hit in an upper level hides every older version below it.
    /// NOTE(review): presumably `filter_tombstone` maps tombstones to `None`;
    /// confirm against `TablePointLookup`.
    ///
    /// # Errors
    ///
    /// Propagates the first error returned by `T::lookup`.
    fn find_in_tables<T: TablePointLookup>(
        version: &Version,
        key: &[u8],
        seqno: SeqNo,
        key_hash: u64,
        comparator: &dyn crate::comparator::UserComparator,
    ) -> crate::Result<Option<T>> {
        // Levels are visited in order starting at L0; the first level that
        // produces a match decides the result.
        for (level_idx, level) in version.iter_levels().enumerate() {
            if level_idx == 0 {
                let mut best: Option<T> = None;

                for run in level.iter() {
                    if let Some(table) = run.get_for_key_cmp(key, comparator)
                        && let Some(item) = T::lookup(table, key, seqno, key_hash)?
                    {
                        match &best {
                            // Ties keep the entry found first.
                            Some(current) if current.entry_seqno() >= item.entry_seqno() => {}
                            _ => {
                                // Short-circuit: point reads use exclusive upper bound,
                                // so the highest visible seqno is read_seqno - 1.
                                // If matched, no other L0 run can have a higher one.
                                if item.entry_seqno().checked_add(1) == Some(seqno) {
                                    return Ok(item.filter_tombstone());
                                }
                                best = Some(item);
                            }
                        }
                    }
                }

                // A hit anywhere in L0 shadows every deeper level.
                if let Some(entry) = best {
                    return Ok(entry.filter_tombstone());
                }
            } else {
                // L1+ runs have non-overlapping key ranges. Once we find the
                // covering run (get_for_key_cmp returns Some), no other run in
                // this level can contain the key — break regardless of hit/miss.
                for run in level.iter() {
                    if let Some(table) = run.get_for_key_cmp(key, comparator) {
                        if let Some(item) = T::lookup(table, key, seqno, key_hash)? {
                            return Ok(item.filter_tombstone());
                        }
                        break;
                    }
                }
            }
        }

        // No level holds a version of `key` visible at `seqno`.
        Ok(None)
    }
3122
3123    pub(crate) fn get_internal_entry_from_sealed_memtables(
3124        super_version: &SuperVersion,
3125        key: &[u8],
3126        seqno: SeqNo,
3127    ) -> Option<InternalValue> {
3128        for mt in super_version.sealed_memtables.iter().rev() {
3129            if let Some(entry) = mt.get(key, seqno) {
3130                return Some(entry);
3131            }
3132        }
3133
3134        None
3135    }
3136
    /// Returns the super-version visible to a read at `seqno`. The version
    /// history is read-locked only for the duration of this lookup.
    pub(crate) fn get_version_for_snapshot(&self, seqno: SeqNo) -> SuperVersion {
        self.version_history.read().get_version_for_snapshot(seqno)
    }
3140
3141    /// Normalizes a user-provided range into owned `Bound<Slice>` values.
3142    ///
3143    /// Returns a tuple containing:
3144    /// - the `OwnedBounds` that mirror the original bounds semantics (including
3145    ///   inclusive/exclusive markers and unbounded endpoints), and
3146    /// - a `bool` flag indicating whether the normalized range is logically
3147    ///   empty (e.g., when the lower bound is greater than the upper bound).
3148    ///
3149    /// Callers can use the flag to detect empty ranges and skip further work
3150    /// while still having access to the normalized bounds for non-empty cases.
3151    fn range_bounds_to_owned_bounds<K: AsRef<[u8]>, R: RangeBounds<K>>(
3152        range: &R,
3153    ) -> (OwnedBounds, bool) {
3154        use Bound::{Excluded, Included, Unbounded};
3155
3156        let start = match range.start_bound() {
3157            Included(key) => Included(Slice::from(key.as_ref())),
3158            Excluded(key) => Excluded(Slice::from(key.as_ref())),
3159            Unbounded => Unbounded,
3160        };
3161
3162        let end = match range.end_bound() {
3163            Included(key) => Included(Slice::from(key.as_ref())),
3164            Excluded(key) => Excluded(Slice::from(key.as_ref())),
3165            Unbounded => Unbounded,
3166        };
3167
3168        let is_empty =
3169            if let (Included(lo) | Excluded(lo), Included(hi) | Excluded(hi)) = (&start, &end) {
3170                lo.as_ref() > hi.as_ref()
3171            } else {
3172                false
3173            };
3174
3175        (OwnedBounds { start, end }, is_empty)
3176    }
3177
3178    /// Opens an LSM-tree in the given directory.
3179    ///
3180    /// Will recover previous state if the folder was previously
3181    /// occupied by an LSM-tree, including the previous configuration.
3182    /// If not, a new tree will be initialized with the given config.
3183    ///
3184    /// After recovering a previous state, use `Tree::set_active_memtable`
3185    /// to fill the memtable with data from a write-ahead log for full durability.
3186    ///
3187    /// # Errors
3188    ///
3189    /// Returns error, if an IO error occurred.
3190    pub(crate) fn open(config: Config) -> crate::Result<Self> {
3191        log::debug!("Opening LSM-tree at {}", config.path.display());
3192
3193        // Resolve the per-tree compaction compression pool once, at open: if the
3194        // caller supplied no shared pool but asked for >1 thread, build the
3195        // default rayon-backed pool now so every compaction reuses it (building
3196        // a pool per compaction would spawn threads on each run). A caller-
3197        // supplied pool is left untouched. Shadowed under `parallel` only, so
3198        // non-parallel builds don't carry an unused `mut`.
3199        #[cfg(feature = "parallel")]
3200        let config = {
3201            let mut config = config;
3202            if config.compaction_pool.is_none() && config.compaction_threads > 1 {
3203                config.compaction_pool = Some(Arc::new(
3204                    crate::table::writer::RayonSpawner::with_threads(config.compaction_threads)?,
3205                ));
3206            }
3207            config
3208        };
3209
3210        // Gate on the `page_ecc` cargo feature: caller asked for ECC
3211        // but the build does not link the Reed-Solomon codec. We have
3212        // no way to verify or recover RS parity without the codec, so
3213        // refuse to open rather than silently downgrade integrity.
3214        // Two surfaces to check:
3215        //   - `Config::page_ecc(true)`  → SST data-block ECC
3216        //   - `Config::with_runtime_config(RuntimeConfig { page_ecc: true, .. })`
3217        //     → manifest-Block ECC (consumed by manifest_blocks::writer)
3218        // Both silently no-op without the feature; refusing here is
3219        // the only place callers see a typed error.
3220        if (config.page_ecc || config.initial_runtime_config.page_ecc)
3221            && !cfg!(feature = "page_ecc")
3222        {
3223            return Err(crate::Error::PageEccUnsupported);
3224        }
3225
3226        // Acquire the cross-process directory lock BEFORE any manifest access
3227        // (the `CURRENT` probe + `has_existing_version_state` check below, and
3228        // the recover / create paths). Acquiring it here makes `open()`
3229        // exclusive end-to-end: a concurrent opener fails fast with
3230        // `Error::Locked` instead of racing through the probe and observing a
3231        // peer's half-created directory (which would surface as the InvalidData
3232        // "half-written checkpoint" path rather than `Locked`). The `LOCK` file
3233        // needs its directory to exist, so create the root directory first
3234        // (idempotent; `create_new` re-creates the `tables/` subtree). The lock
3235        // is threaded into the constructor so it lives for the tree's lifetime.
3236        #[cfg(feature = "std")]
3237        let directory_lock = {
3238            config.fs.create_dir_all(&config.path)?;
3239            crate::config::acquire_directory_lock(&*config.fs, &config.path, config.directory_lock)?
3240        };
3241
3242        // Check for old version
3243        if config.fs.exists(&config.path.join("version"))? {
3244            log::error!(
3245                "It looks like you are trying to open a V1 database - the database needs a manual migration, however a migration tool is not provided, as V1 is extremely outdated."
3246            );
3247            return Err(crate::Error::InvalidVersion(FormatVersion::V1.into()));
3248        }
3249
3250        // Decide between recovery and fresh creation atomically by attempting
3251        // to read the CURRENT version file. This avoids a TOCTOU race that
3252        // would occur if we probed with exists() first.
3253        let tree = match crate::version::recovery::get_current_version(
3254            &config.path,
3255            &*config.fs,
3256            config.encryption.clone(),
3257        ) {
3258            Ok(_) => Self::recover(
3259                config,
3260                #[cfg(feature = "std")]
3261                directory_lock,
3262            ),
3263            Err(crate::Error::Io(e)) if e.kind() == crate::io::ErrorKind::NotFound => {
3264                // Missing CURRENT MUST coincide with a directory that
3265                // has no version artifacts; otherwise we are looking at
3266                // a half-written checkpoint (or other interrupted
3267                // sealing). Silently calling `create_new` in that case
3268                // would overwrite the partial state with an empty tree,
3269                // turning a recoverable failure into data loss.
3270                if has_existing_version_state(&config.path, &*config.fs)? {
3271                    // Return Error::Io(InvalidData, ...) rather than
3272                    // Error::Unrecoverable so callers that don't read
3273                    // logs still get a programmatic surface with the
3274                    // path and remediation hint embedded. `log::error!`
3275                    // stays for human ops who DO watch logs and want
3276                    // the full context at the moment of failure (the
3277                    // structured error is what propagates up the call
3278                    // chain; the log line records the diagnosis next
3279                    // to the timestamp).
3280                    let msg = format!(
3281                        "Tree::open: refusing to recover {} — `current` pointer is missing \
3282                         but the directory still holds version artifacts (tables/, blobs/, \
3283                         or vN). This is the on-disk signature of a half-written checkpoint \
3284                         or interrupted sealing. Remove the partial directory and retry the \
3285                         checkpoint, or restore `current` from a backup before reopening.",
3286                        config.path.display(),
3287                    );
3288                    log::error!("{msg}");
3289                    return Err(crate::Error::from(crate::io::Error::new(
3290                        crate::io::ErrorKind::InvalidData,
3291                        msg,
3292                    )));
3293                }
3294                Self::create_new(
3295                    config,
3296                    #[cfg(feature = "std")]
3297                    directory_lock,
3298                )
3299            }
3300            Err(e) => Err(e),
3301        }?;
3302
3303        Ok(tree)
3304    }
3305
    /// Returns `true` if there are some tables that are being compacted.
    ///
    /// Implemented as a non-empty check on the compaction state's hidden set,
    /// read while holding the `compaction_state` lock.
    #[doc(hidden)]
    #[must_use]
    pub fn is_compacting(&self) -> bool {
        !self.compaction_state.lock().hidden_set().is_empty()
    }
3312
3313    /// Computed storage admission predicate backing
3314    /// [`AbstractTree::write_admission`].
3315    ///
3316    /// Cheap: reads in-memory size accounting only (no syscall). Returns
3317    /// `Ok(())` unless admission control is enabled AND a budget is set AND the
3318    /// live footprint plus reserved headroom exceeds it.
3319    /// Best-effort minimum free space across every filesystem this tree writes
3320    /// to: the primary data path AND each per-level route (`Config::level_routes`
3321    /// can place cold-level SSTs on separate volumes). The admission gate must
3322    /// reflect the tightest volume, since a full routed disk fails compaction /
3323    /// flush targeting it even while the primary still has room.
3324    ///
3325    /// A backend that cannot report free space (or an I/O hiccup) yields
3326    /// `u64::MAX` = "no disk pressure", so a probe failure never falsely drives
3327    /// the tree read-only.
3328    fn probe_disk_free(&self) -> u64 {
3329        self.0.config.min_available_space()
3330    }
3331
3332    /// Disk-aware capacity figures for [`AbstractTree::storage_stats`], given the
3333    /// live footprint `used`: `(capacity, available, compaction_possible)`.
3334    ///
3335    /// `capacity` is the tighter of the configured quota and the physical disk
3336    /// headroom (`free + used`) — the same effective limit
3337    /// [`Self::compute_write_admission`] gates against — reported regardless of
3338    /// whether the admission gate is enabled (introspection is always available).
3339    /// `None` capacity/available means unbounded (no quota AND the backend
3340    /// cannot report free space). `compaction_possible` is `true` when unbounded
3341    /// or when at least [`MIN_RESERVED_HEADROOM`] of working room remains.
3342    pub(crate) fn admission_capacity(&self, used: u64) -> (Option<u64>, Option<u64>, bool) {
3343        let quota = self
3344            .0
3345            .runtime_config
3346            .load()
3347            .storage_limit_bytes
3348            .unwrap_or(u64::MAX);
3349        let free = self.probe_disk_free();
3350        // `free == u64::MAX` is the "backend can't report free space" sentinel:
3351        // adding `used` would overflow, so treat capacity as quota-only (the
3352        // explicit branch avoids the overflow without masking it with saturation).
3353        // Otherwise `free + used` ≤ ~2× disk capacity and cannot overflow u64.
3354        let capacity = if free == u64::MAX {
3355            quota
3356        } else {
3357            quota.min(free + used)
3358        };
3359        if capacity == u64::MAX {
3360            return (None, None, true);
3361        }
3362        // `available = max(0, capacity - used)`: an operator quota set below the
3363        // live footprint makes `capacity < used`, and available space cannot be
3364        // negative. The clamp-to-zero IS the intended semantics here.
3365        let available = capacity.saturating_sub(used);
3366        (
3367            Some(capacity),
3368            Some(available),
3369            available >= MIN_RESERVED_HEADROOM,
3370        )
3371    }
3372
3373    /// The logical partition-quota headroom for the two-layer space model:
3374    /// `max(0, storage_limit_bytes - used)`, or `u64::MAX` when no quota is set.
3375    ///
3376    /// This is Layer 1 (volume-agnostic) of [`crate::compaction::worker::space_fits_two_layer`];
3377    /// the physical free-space probe is Layer 2. An operator quota set below the
3378    /// live footprint leaves zero headroom — the clamp-to-zero is the intended
3379    /// min-semantics, not masking.
3380    pub(crate) fn quota_headroom(&self, used: u64) -> u64 {
3381        self.0
3382            .runtime_config
3383            .load()
3384            .storage_limit_bytes
3385            .map_or(u64::MAX, |limit| limit.saturating_sub(used))
3386    }
3387
    /// Whether the opt-in storage admission gate is active (a near-full disk or
    /// configured quota can drive the tree read-only and gate compaction space).
    /// Capacity introspection figures are reported regardless; this only governs
    /// whether the gate actually enforces.
    ///
    /// Reads `storage_admission_check` from the currently loaded runtime config,
    /// so a runtime-config update takes effect on the next call.
    pub(crate) fn storage_admission_enabled(&self) -> bool {
        self.0.runtime_config.load().storage_admission_check
    }
3395
3396    #[expect(
3397        clippy::significant_drop_tightening,
3398        reason = "the admission cache lock intentionally spans the recompute \
3399                  (stat + disk-free probe) so concurrent admission checks \
3400                  coalesce on a single probe rather than each issuing a syscall"
3401    )]
3402    fn compute_write_admission(&self) -> crate::Result<()> {
3403        let rc = self.0.runtime_config.load();
3404        if !rc.storage_admission_check {
3405            return Ok(());
3406        }
3407
3408        // Take ONE coherent snapshot of the latest super-version and derive
3409        // BOTH the on-disk footprint and the pending-memtable bytes from it.
3410        // Reading them from two separate `latest_version()` loads would be a
3411        // TOCTOU bug: a flush installing a new version between the two reads
3412        // could pair an old (larger) disk usage with new (smaller) pending
3413        // bytes — or vice versa — and open the gate incorrectly.
3414        let super_version = self.version_history.read().latest_version();
3415        let vid = super_version.version.id();
3416
3417        // True physical footprint, including blob files — the SAME basis
3418        // `storage_stats()` reports, so the gate and the reported usage agree.
3419        // NOT `disk_space()` (metadata Level::size, which omits blob files and
3420        // undercounts the physical file by the meta block / footer).
3421        //
3422        // Cached so gated writes don't re-stat every live file or re-probe disk
3423        // on every call. `used_bytes` only changes when a new version is
3424        // installed (flush / compaction), so it is recomputed on a version
3425        // change. `disk_free` can change under us (another process writing the
3426        // same filesystem), so it is ALSO re-probed once its sample is older
3427        // than `ADMISSION_DISK_FREE_TTL` — bounding staleness without a syscall
3428        // per write. `update_runtime_config` resets the entry for an immediate
3429        // re-probe. The values live behind one mutex as a coherent unit (see
3430        // `TreeInner::admission_used_cache`).
3431        //
3432        // The TTL fast-path is std-only: under `no_std` there is no monotonic
3433        // clock (`crate::time::Instant::elapsed` is a zero stub), so an
3434        // elapsed-time window cannot bound staleness — a same-version sample
3435        // would otherwise look fresh forever and a filling disk would never be
3436        // re-probed. Under `no_std` the fast-path is skipped, so `disk_free` is
3437        // re-probed on every gated write (the `used` footprint stays cached by
3438        // version either way), keeping admission safe without a monotonic clock.
3439        let now = crate::time::Instant::now();
3440        let (used, disk_free) = {
3441            let mut cache = self.0.admission_used_cache.lock();
3442            match *cache {
3443                // Fresh: same version AND disk sample within the TTL. std-only —
3444                // `cfg!(feature = "std")` is `false` under `no_std`, so the guard
3445                // short-circuits there and the next arm re-probes every call.
3446                Some((cvid, used, free, at))
3447                    if cvid == vid
3448                        && cfg!(feature = "std")
3449                        && at.elapsed() < ADMISSION_DISK_FREE_TTL =>
3450                {
3451                    (used, free)
3452                }
3453                // Same version, stale disk sample: keep `used`, re-probe disk.
3454                Some((cvid, used, _, _)) if cvid == vid => {
3455                    let free = self.probe_disk_free();
3456                    *cache = Some((vid, used, free, now));
3457                    (used, free)
3458                }
3459                // New version (or unset): recompute footprint and re-probe disk.
3460                _ => {
3461                    let used = crate::storage_stats::compute_used_bytes(&super_version.version)?;
3462                    let free = self.probe_disk_free();
3463                    *cache = Some((vid, used, free, now));
3464                    (used, free)
3465                }
3466            }
3467        };
3468
3469        // Effective limit is the tighter of the configured quota and the
3470        // physical disk headroom (free + what we already occupy): the disk can
3471        // fill from other processes even below a generous quota, and a tree with
3472        // no quota at all must still stop before ENOSPC. `None` quota = unbounded
3473        // by configuration; disk-free then alone bounds it.
3474        //
3475        // `disk_free` is the MINIMUM free across every volume the tree writes to
3476        // (`probe_disk_free` mins the primary path and all `level_routes`). The
3477        // `+ used` here is NOT an accounting of one volume's usage against
3478        // another's free space — it cancels out of the disk branch of the gate:
3479        // passing requires `used + reserved <= disk_free + used`, i.e.
3480        // `reserved <= disk_free`. So a passing gate guarantees the TIGHTEST
3481        // volume alone has at least `reserved` free — a conservative per-volume
3482        // headroom, never the sum of an empty routed volume's slack plus an
3483        // unrelated full volume's occupancy. A route that drops below `reserved`
3484        // free drives the whole tree read-only, exactly so a later flush /
3485        // compaction targeting that route cannot hit ENOSPC.
3486        let quota = rc.storage_limit_bytes.unwrap_or(u64::MAX);
3487        // `disk_free == u64::MAX` is the "backend can't report" sentinel; adding
3488        // `used` would overflow, so treat the limit as quota-only (explicit
3489        // branch, no saturation masking). Otherwise `disk_free + used` ≤ ~2× disk
3490        // capacity and cannot overflow u64.
3491        let limit = if disk_free == u64::MAX {
3492            quota
3493        } else {
3494            quota.min(disk_free + used)
3495        };
3496        // Both sources unbounded → nothing to gate.
3497        if limit == u64::MAX {
3498            return Ok(());
3499        }
3500
3501        // Reserved headroom keeps the soft budget from becoming a hard wall:
3502        // enough to flush every pending memtable (plus a margin for the
3503        // index/filter/footer overhead a flush adds) so a queued flush always
3504        // fits at the limit, with a floor for compaction working space.
3505        // Internal flush / compaction are never gated, so this band is the
3506        // engine's room to reclaim.
3507        //
3508        // Count ALL pending memtable bytes in this snapshot — the active one AND
3509        // any sealed (rotated) memtables awaiting flush — not just the active
3510        // one: after a rotation the active memtable is empty but the sealed
3511        // memtable's queued flush will still consume disk, so it must be
3512        // reserved for. Memtable sizes are bounded by RAM, so the sum (and the
3513        // +1/8 overhead margin below) cannot overflow u64 → plain arithmetic.
3514        let pending_memtable_bytes: u64 = super_version.active_memtable.size()
3515            + super_version
3516                .sealed_memtables
3517                .iter()
3518                .map(|m| m.size())
3519                .sum::<u64>();
3520
3521        let reserved =
3522            (pending_memtable_bytes + pending_memtable_bytes / 8).max(MIN_RESERVED_HEADROOM);
3523        // `used` (disk) + `reserved` (RAM-bounded) cannot realistically overflow,
3524        // but keep the comparison fail-closed with checked arithmetic: any
3525        // overflow means "definitely over budget", so deny.
3526        match used.checked_add(reserved) {
3527            Some(total) if total <= limit => Ok(()),
3528            _ => Err(crate::Error::StorageFull { used, limit }),
3529        }
3530    }
3531
3532    fn inner_compact(
3533        &self,
3534        strategy: Arc<dyn CompactionStrategy>,
3535        mvcc_gc_watermark: SeqNo,
3536    ) -> crate::Result<crate::compaction::CompactionResult> {
3537        use crate::compaction::worker::{Options, do_compaction};
3538
3539        let mut opts = Options::from_tree(self, strategy);
3540        opts.mvcc_gc_watermark = mvcc_gc_watermark;
3541
3542        let result = do_compaction(&opts)?;
3543
3544        log::debug!("Compaction run over");
3545
3546        Ok(result)
3547    }
3548
    /// Creates an iterator over every key visible at `seqno`.
    ///
    /// Equivalent to `create_range` over the unbounded range `..`. The
    /// optional `ephemeral` memtable (with its accompanying seqno) is forwarded
    /// unchanged. `&..` is a promoted constant, which keeps the borrow valid
    /// for the returned `'static` iterator.
    #[doc(hidden)]
    #[must_use]
    pub fn create_iter(
        &self,
        seqno: SeqNo,
        ephemeral: Option<(Arc<Memtable>, SeqNo)>,
    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
        self.create_range::<UserKey, _>(&.., seqno, ephemeral)
    }
3558
3559    #[doc(hidden)]
3560    pub fn create_range<'a, K: AsRef<[u8]> + 'a, R: RangeBounds<K> + 'a>(
3561        &self,
3562        range: &'a R,
3563        seqno: SeqNo,
3564        ephemeral: Option<(Arc<Memtable>, SeqNo)>,
3565    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
3566        let super_version = self.version_history.read().get_version_for_snapshot(seqno);
3567
3568        Self::create_internal_range(
3569            super_version,
3570            range,
3571            seqno,
3572            ephemeral,
3573            self.config.merge_operator.clone(),
3574            self.config.comparator.clone(),
3575        )
3576        .map(|item| match item {
3577            Ok(kv) => Ok((kv.key.user_key, kv.value)),
3578            Err(e) => Err(e),
3579        })
3580    }
3581
3582    /// Build a [`SeekableTreeIter`](crate::range::SeekableTreeIter) over
3583    /// `[lo, hi)`. Source collection (Phase 1) runs once; repositions reuse it.
3584    #[doc(hidden)]
3585    #[must_use]
3586    pub fn create_seekable_range_bounds(
3587        &self,
3588        lo: Bound<UserKey>,
3589        hi: Bound<UserKey>,
3590        seqno: SeqNo,
3591        ephemeral: Option<(Arc<Memtable>, SeqNo)>,
3592    ) -> crate::range::SeekableTreeIter {
3593        use crate::range::{IterState, SeekableTreeIter};
3594
3595        let super_version = self.version_history.read().get_version_for_snapshot(seqno);
3596
3597        let iter_state = IterState {
3598            version: super_version,
3599            ephemeral,
3600            merge_operator: self.config.merge_operator.clone(),
3601            comparator: self.config.comparator.clone(),
3602            prefix_hash: None,
3603            key_hash: None,
3604            bloom_key: None,
3605            #[cfg(feature = "metrics")]
3606            metrics: Some(self.0.metrics.clone()),
3607        };
3608
3609        SeekableTreeIter::create(iter_state, lo, hi, seqno)
3610    }
3611
3612    #[doc(hidden)]
3613    pub fn create_prefix<'a, K: AsRef<[u8]> + 'a>(
3614        &self,
3615        prefix: K,
3616        seqno: SeqNo,
3617        ephemeral: Option<(Arc<Memtable>, SeqNo)>,
3618    ) -> impl DoubleEndedIterator<Item = crate::Result<KvPair>> + 'static {
3619        use crate::prefix::compute_prefix_hash;
3620        use crate::range::{IterState, TreeIter, prefix_to_range};
3621
3622        let prefix_bytes = prefix.as_ref();
3623
3624        let prefix_hash = compute_prefix_hash(self.config.prefix_extractor.as_ref(), prefix_bytes);
3625
3626        let range = prefix_to_range(prefix_bytes);
3627
3628        let super_version = self.version_history.read().get_version_for_snapshot(seqno);
3629
3630        let iter_state = IterState {
3631            version: super_version,
3632            ephemeral,
3633            merge_operator: self.config.merge_operator.clone(),
3634            comparator: self.config.comparator.clone(),
3635            prefix_hash,
3636            key_hash: None,
3637            bloom_key: None,
3638            #[cfg(feature = "metrics")]
3639            metrics: Some(self.0.metrics.clone()),
3640        };
3641
3642        TreeIter::create_range(iter_state, range, seqno).map(|item| match item {
3643            Ok(kv) => Ok((kv.key.user_key, kv.value)),
3644            Err(e) => Err(e),
3645        })
3646    }
3647
3648    /// Adds an item to the active memtable.
3649    ///
3650    /// Returns the added item's size and new size of the memtable.
3651    #[doc(hidden)]
3652    #[must_use]
3653    pub fn append_entry(&self, value: InternalValue) -> (u64, u64) {
3654        use crate::runtime_config::{KvChecksumComputePoint, KvChecksumPolicy};
3655
3656        // Per-KV residence digest (KvChecksumComputePoint::AtInsert): compute
3657        // the entry's 4-byte logical-content digest now, so a RAM bit-flip
3658        // while it sits in the memtable is caught at flush. The digest covers
3659        // the OWNED `value` and is independent of which active memtable
3660        // receives it, so computing it before taking the version-history guard
3661        // is correct (a concurrent rotation just routes the same value+digest
3662        // into the new active memtable) AND keeps the hash out of the read-lock
3663        // critical section. Reading the live snapshot is a cheap arc-swap load;
3664        // under the default `AtBlockCompile` (or `Off`) the two `matches!`
3665        // checks short-circuit and no digest is computed, so the hot insert
3666        // path is unchanged.
3667        let kv_digest = {
3668            let rc = self.0.runtime_config.load();
3669            if matches!(
3670                rc.kv_checksum_compute_point,
3671                KvChecksumComputePoint::AtInsert
3672            ) && !matches!(rc.kv_checksums, KvChecksumPolicy::Off)
3673            {
3674                crate::table::block::kv_checksum::kv_digest(&value, rc.kv_checksum_algo).map(|d| {
3675                    #[expect(
3676                        clippy::cast_possible_truncation,
3677                        reason = "AtInsert is config-validated to a 4-byte algorithm; the digest fits u32"
3678                    )]
3679                    let lo = d as u32;
3680                    (lo, rc.kv_checksum_algo)
3681                })
3682            } else {
3683                None
3684            }
3685        };
3686
3687        // The `.read()` guard is a temporary that lives until the end of this
3688        // statement, so the insert runs under the version-history read lock:
3689        // `value` + its digest land in the current active memtable atomically,
3690        // and a concurrent `rotate_memtable()` cannot seal it mid-insert.
3691        self.version_history
3692            .read()
3693            .latest_version()
3694            .active_memtable
3695            .insert_with_kv_digest(value, kv_digest)
3696    }
3697
3698    /// Adds multiple items to the active memtable in bulk.
3699    ///
3700    /// Acquires the version-history lock once and delegates to
3701    /// [`Memtable::insert_batch`] for batch size accounting.
3702    ///
3703    /// Returns the total bytes added and new size of the memtable.
3704    #[doc(hidden)]
3705    #[must_use]
3706    pub(crate) fn append_batch(&self, items: Vec<InternalValue>) -> (u64, u64) {
3707        use crate::runtime_config::{KvChecksumComputePoint, KvChecksumPolicy};
3708
3709        // Per-KV residence digest under AtInsert (see `append_entry`): pass the
3710        // algorithm so the bulk path fixes each entry's digest at insert. The
3711        // default path passes `None` and is unchanged.
3712        let kv_algo = {
3713            let rc = self.0.runtime_config.load();
3714            if matches!(
3715                rc.kv_checksum_compute_point,
3716                KvChecksumComputePoint::AtInsert
3717            ) && !matches!(rc.kv_checksums, KvChecksumPolicy::Off)
3718            {
3719                Some(rc.kv_checksum_algo)
3720            } else {
3721                None
3722            }
3723        };
3724
3725        // Hold the read guard for the entire insert to prevent rotate_memtable()
3726        // from sealing this memtable mid-batch (which could cause data loss if
3727        // a concurrent flush persists only a prefix of the batch).
3728        self.version_history
3729            .read()
3730            .latest_version()
3731            .active_memtable
3732            .insert_batch_with_kv_algo(items, kv_algo)
3733    }
3734
    /// Recovers previous state, by loading the level manifest, tables and blob files.
    ///
    /// Steps, in order:
    ///
    /// 1. Read the snapshot id that `CURRENT` references and decode the
    ///    matching `v{id}` manifest to validate the format version and the
    ///    comparator name. This runs before any disk-mutating recovery work,
    ///    so a rejected open is side-effect free.
    /// 2. Restore the persisted `level_count` into `config`.
    /// 3. Recover tables and blob files via `recover_levels`. Then check the
    ///    recovered tree type against the requested one: `Blob` when
    ///    `kv_separation_opts` is set, `Standard` otherwise.
    /// 4. Assemble the `TreeInner`, then install the shared state on the
    ///    recovered files:
    ///    - the deletion pause (plus, with `std`, the background deleter) on
    ///      every recovered table and blob file;
    ///    - the heal hints on every recovered table.
    ///
    /// # Errors
    ///
    /// Returns error, if an IO error occurred. In addition:
    ///
    /// - `Error::InvalidVersion` if the manifest format is not `V5`;
    /// - `Error::ComparatorMismatch` if the stored comparator name differs
    ///   from `config.comparator.name()`;
    /// - `Error::Unrecoverable` if the recovered tree type differs from the
    ///   requested one;
    /// - any error returned by `recover_levels`.
    #[expect(
        clippy::too_many_lines,
        reason = "Tree::recover threads the whole open sequence (CURRENT validation, \
                  Manifest decode, encryption + runtime plumbing, version recovery, \
                  TreeInner assembly) — splitting it would create helper functions whose \
                  only caller is this one site"
    )]
    fn recover(
        mut config: Config,
        // The cross-process directory lock acquired by `Tree::open` before the
        // manifest probe; held for the tree's lifetime via
        // `TreeInner::_directory_lock`.
        #[cfg(feature = "std")] directory_lock: Option<Box<dyn crate::fs::FsFile>>,
    ) -> crate::Result<Self> {
        use crate::stop_signal::StopSignal;
        use inner::get_next_tree_id;

        log::info!("Recovering LSM-tree at {}", config.path.display());

        // Validate manifest metadata (format version, comparator name) BEFORE
        // recover_levels, so a rejected open is side-effect free —
        // recover_levels loads tables and cleans up orphans. The tree type is
        // checked after recovery because it needs the Version object.
        //
        // NOTE: the version file is read twice: here for metadata, then inside
        // recover_levels for table/blob data. This is intentional, because
        // metadata validation must complete before any disk-mutating recovery
        // work.
        //
        // `snapshot_id` is the version id of the on-disk snapshot CURRENT
        // references. It is the base the edit log replays on top of; the live
        // version id can be higher (it has no `v{id}` file of its own). It is
        // threaded into the version history so the next persist appends to /
        // rotates the right snapshot's log.
        let snapshot_id = crate::version::recovery::get_current_version(
            &config.path,
            &*config.fs,
            config.encryption.clone(),
        )?;
        {
            let version_id = snapshot_id;
            let manifest_path = config.path.join(format!("v{version_id}"));
            // Open the manifest with a default runtime snapshot. ECC awareness
            // is captured per-Block via the header (`ECC_PARITY` flag), so the
            // reader does not need to know which ECC mode the writer used.
            //
            // The captured runtime here is a placeholder. Once the read path
            // needs runtime-driven decisions (e.g. checksum_algo dispatch per
            // #298), it should be seeded from Config + persisted format-version
            // fields.
            let mut archive_reader = crate::manifest_blocks::reader::ManifestArchiveReader::open(
                &manifest_path,
                &*config.fs,
                alloc::sync::Arc::new(crate::runtime_config::RuntimeConfig::default()),
                config.encryption.clone(),
            )?;
            let manifest = Manifest::decode_from(&mut archive_reader)?;

            // Only format V5 manifests are accepted by this open path.
            if !matches!(manifest.version, FormatVersion::V5) {
                return Err(crate::Error::InvalidVersion(manifest.version.into()));
            }

            // The comparator name is persisted in the manifest; opening with a
            // differently-named comparator is rejected outright.
            let supplied_name = config.comparator.name();
            if manifest.comparator_name != supplied_name {
                log::warn!(
                    "Comparator mismatch: tree was created with {:?} but opened with {:?}",
                    manifest.comparator_name,
                    supplied_name,
                );
                return Err(crate::Error::ComparatorMismatch {
                    stored: manifest.comparator_name,
                    supplied: supplied_name,
                });
            }

            // IMPORTANT: Restore persisted config
            // (the on-disk level count overrides whatever the caller passed).
            config.level_count = manifest.level_count;
        }

        let tree_id = get_next_tree_id();

        #[cfg(feature = "metrics")]
        let metrics = Arc::new(Metrics::default());

        let version = Self::recover_levels(
            &config.path,
            tree_id,
            &config,
            #[cfg(feature = "metrics")]
            &metrics,
        )?;

        // A tree opened with `kv_separation_opts` must be a Blob tree on disk,
        // and vice versa.
        {
            let requested_tree_type = match config.kv_separation_opts {
                Some(_) => crate::TreeType::Blob,
                None => crate::TreeType::Standard,
            };

            if version.tree_type() != requested_tree_type {
                log::error!(
                    "Tried to open a {requested_tree_type:?}Tree, but the existing tree is of type {:?}Tree. This indicates a misconfiguration or corruption.",
                    version.tree_type(),
                );
                return Err(crate::Error::Unrecoverable);
            }
        }

        // New table ids continue after the highest recovered one (the default
        // id when no tables were recovered), hence `highest_table_id + 1` below.
        let highest_table_id = version
            .iter_tables()
            .map(Table::id)
            .max()
            .unwrap_or_default();

        let comparator = config.comparator.clone();

        // Shared handles: installed into `TreeInner` and, below, into every
        // recovered table / blob file.
        let deletion_pause = crate::deletion_pause::DeletionPause::new_shared();
        #[cfg(feature = "std")]
        let background_deleter = Arc::new(crate::BackgroundDeleter::new(None));
        let heal_hints =
            crate::heal_hints::HealHints::new_shared(config.initial_runtime_config.auto_heal);

        // Clone the seed snapshot BEFORE moving config into the Arc
        // below — the runtime handle initializer needs it after the
        // move.
        let initial_runtime = config.initial_runtime_config.clone();
        let sync_mode = config.sync_mode;
        let super_versions = SuperVersions::new(
            version,
            &comparator,
            sync_mode,
            snapshot_id,
            config.manifest_log_rotate_bytes,
        );
        #[cfg(feature = "std")]
        let latest_super_version = super_versions.latest_handle();
        let inner = TreeInner {
            id: tree_id,
            memtable_id_counter: SequenceNumberCounter::new(1),
            table_id_counter: SequenceNumberCounter::new(highest_table_id + 1),
            // NOTE(review): starts at the default value even when blob files
            // were recovered; presumably advanced past the recovered blob file
            // ids by the caller (e.g. the blob-tree open path) — confirm.
            blob_file_id_counter: SequenceNumberCounter::default(),
            version_history: Arc::new(RwLock::new(super_versions)),
            #[cfg(feature = "std")]
            latest_super_version,
            stop_signal: StopSignal::default(),
            config: Arc::new(config),
            major_compaction_lock: RwLock::default(),
            flush_lock: Mutex::default(),
            #[cfg(feature = "std")]
            _directory_lock: directory_lock,
            compaction_state: Arc::new(Mutex::new(CompactionState::default())),
            deletion_pause: Arc::clone(&deletion_pause),
            #[cfg(feature = "std")]
            background_deleter: Arc::clone(&background_deleter),
            heal_hints: Arc::clone(&heal_hints),
            runtime_config: Arc::new(crate::runtime_config::handle::RuntimeConfigHandle::new(
                initial_runtime,
            )),
            admission_used_cache: Mutex::new(None),

            #[cfg(feature = "metrics")]
            metrics,
        };

        // Install the pause on every recovered table / blob file, so their
        // Drop impls consult it when a checkpoint is in flight. The latest
        // version is snapshotted under the read lock (the guard is a temporary
        // dropped at the end of the statement), and its tables / blob files
        // are cloned into owned collections. No lock is therefore held while
        // iterating (avoids `significant_drop_tightening`).
        let version = inner.version_history.read().latest_version().version;
        let recovered_tables: Vec<Table> = version.iter_tables().cloned().collect();
        let recovered_blobs: Vec<BlobFile> = version.blob_files.iter().cloned().collect();

        // Tables get the pause, the background deleter (std) and heal hints.
        for table in &recovered_tables {
            table.install_deletion_pause(Arc::clone(&deletion_pause));
            #[cfg(feature = "std")]
            table.install_background_deleter(Arc::clone(&background_deleter));
            table.install_heal_hints(Arc::clone(&heal_hints));
        }
        // Blob files get the pause and the background deleter (std) only.
        for blob_file in &recovered_blobs {
            blob_file.install_deletion_pause(Arc::clone(&deletion_pause));
            #[cfg(feature = "std")]
            blob_file.install_background_deleter(Arc::clone(&background_deleter));
        }

        Ok(Self(Arc::new(inner)))
    }
3923
3924    /// Creates a new LSM-tree in a directory.
3925    fn create_new(
3926        config: Config,
3927        // The cross-process directory lock acquired by `Tree::open`, held for
3928        // the tree's lifetime.
3929        #[cfg(feature = "std")] directory_lock: Option<Box<dyn crate::fs::FsFile>>,
3930    ) -> crate::Result<Self> {
3931        use crate::file::fsync_directory;
3932
3933        let path = config.path.clone();
3934        log::trace!("Creating LSM-tree at {}", path.display());
3935
3936        let sync_mode = config.sync_mode;
3937
3938        (*config.fs).create_dir_all(&path)?;
3939
3940        // Create tables directories for all configured paths (primary + routes).
3941        // create_dir_all may create both <route> and <route>/tables.
3942        // Fsync the tables dir, its parent (route dir), AND the route's parent
3943        // to make all newly-created directory entries durable on POSIX.
3944        for (table_folder_path, folder_fs) in config.all_tables_folders() {
3945            folder_fs.create_dir_all(&table_folder_path)?;
3946            fsync_directory(&table_folder_path, &*folder_fs, sync_mode)?;
3947            if let Some(parent) = table_folder_path.parent() {
3948                fsync_directory(parent, &*folder_fs, sync_mode)?;
3949                if let Some(grandparent) = parent.parent() {
3950                    fsync_directory(grandparent, &*folder_fs, sync_mode)?;
3951                }
3952            }
3953        }
3954
3955        // IMPORTANT: fsync primary folder on Unix
3956        fsync_directory(&path, &*config.fs, sync_mode)?;
3957
3958        let inner = TreeInner::create_new(
3959            config,
3960            #[cfg(feature = "std")]
3961            directory_lock,
3962        )?;
3963        Ok(Self(Arc::new(inner)))
3964    }
3965
3966    /// Recovers the level manifest, loading all tables from disk.
3967    ///
3968    /// When [`level_routes`](Config::level_routes) is configured, all
3969    /// configured table folders are scanned so tables on different storage
3970    /// tiers are discovered correctly.
3971    #[expect(
3972        clippy::too_many_lines,
3973        reason = "recovery logic is inherently complex"
3974    )]
3975    fn recover_levels<P: AsRef<Path>>(
3976        tree_path: P,
3977        tree_id: TreeId,
3978        config: &Config,
3979        #[cfg(feature = "metrics")] metrics: &Arc<Metrics>,
3980    ) -> crate::Result<Version> {
3981        use crate::{TableId, file::fsync_directory};
3982
3983        let tree_path = tree_path.as_ref();
3984
3985        let recovery = recover(
3986            tree_path,
3987            &*config.fs,
3988            config.manifest_recovery_mode,
3989            config.encryption.clone(),
3990        )?;
3991
3992        // The on-disk snapshot CURRENT points at — the generation orphan cleanup
3993        // must preserve. Intermediate versions live only in the edit log, so the
3994        // latest version id (`version.id()`) has no `v{id}` file of its own.
3995        let snapshot_id = recovery.snapshot_id;
3996
3997        let mut table_map = {
3998            let mut result: crate::HashMap<TableId, (u8 /* Level index */, Checksum, SeqNo)> =
3999                crate::HashMap::default();
4000
4001            for (level_idx, table_ids) in recovery.table_ids.iter().enumerate() {
4002                for run in table_ids {
4003                    for table in run {
4004                        #[expect(
4005                            clippy::expect_used,
4006                            reason = "there are always less than 256 levels"
4007                        )]
4008                        result.insert(
4009                            table.id,
4010                            (
4011                                level_idx
4012                                    .try_into()
4013                                    .expect("there are less than 256 levels"),
4014                                table.checksum,
4015                                table.global_seqno,
4016                            ),
4017                        );
4018                    }
4019                }
4020            }
4021
4022            result
4023        };
4024
4025        let cnt = table_map.len();
4026
4027        log::debug!("Recovering {cnt} tables from {}", tree_path.display());
4028
4029        let progress_mod = match cnt {
4030            _ if cnt <= 20 => 1,
4031            _ if cnt <= 100 => 10,
4032            _ => 100,
4033        };
4034
4035        let mut tables = vec![];
4036        // Track recovered table IDs so duplicate sightings (via symlinks,
4037        // junctions, or case-insensitive aliases of the same directory) are
4038        // skipped rather than orphan-deleted.
4039        let mut recovered_table_ids: crate::HashSet<TableId> = crate::HashSet::default();
4040        let mut orphaned_tables: Vec<(crate::path::PathBuf, Arc<dyn crate::fs::Fs>)> = vec![];
4041
4042        // Scan all configured table folders (primary + level routes).
4043        let all_folders = config.all_tables_folders();
4044
4045        for (table_base_folder, folder_fs) in &all_folders {
4046            if !folder_fs.exists(table_base_folder)? {
4047                folder_fs.create_dir_all(table_base_folder)?;
4048                fsync_directory(table_base_folder, &**folder_fs, config.sync_mode)?;
4049                if let Some(parent) = table_base_folder.parent() {
4050                    fsync_directory(parent, &**folder_fs, config.sync_mode)?;
4051                    if let Some(grandparent) = parent.parent() {
4052                        fsync_directory(grandparent, &**folder_fs, config.sync_mode)?;
4053                    }
4054                }
4055            }
4056
4057            for dirent in folder_fs.read_dir(table_base_folder)? {
4058                let crate::fs::FsDirEntry {
4059                    path: table_file_path,
4060                    file_name,
4061                    is_dir,
4062                } = dirent;
4063
4064                // https://en.wikipedia.org/wiki/.DS_Store
4065                if file_name == ".DS_Store" {
4066                    continue;
4067                }
4068
4069                // https://en.wikipedia.org/wiki/AppleSingle_and_AppleDouble_formats
4070                if file_name.starts_with("._") {
4071                    continue;
4072                }
4073
4074                let table_file_name = &file_name;
4075                if is_dir {
4076                    log::warn!(
4077                        "Skipping unexpected directory in tables folder: {}",
4078                        table_file_path.display()
4079                    );
4080                    continue;
4081                }
4082
4083                let table_id = table_file_name.parse::<TableId>().map_err(|e| {
4084                    log::error!("invalid table file name {table_file_name:?}: {e:?}");
4085                    crate::Error::Unrecoverable
4086                })?;
4087
4088                // Remove from map to prevent duplicate recovery if the same
4089                // table file exists in multiple scanned folders.
4090                if let Some((level_idx, checksum, global_seqno)) = table_map.remove(&table_id) {
4091                    let pin_filter = config.filter_block_pinning_policy.get(level_idx.into());
4092                    let pin_index = config.index_block_pinning_policy.get(level_idx.into());
4093
4094                    let table = Table::recover(
4095                        table_file_path,
4096                        checksum,
4097                        global_seqno,
4098                        tree_id,
4099                        table_id,
4100                        config.cache.clone(),
4101                        config.descriptor_table.clone(),
4102                        folder_fs.clone(),
4103                        pin_filter,
4104                        pin_index,
4105                        config.encryption.clone(),
4106                        #[cfg(zstd_any)]
4107                        config.zstd_dictionary.clone(),
4108                        config.comparator.clone(),
4109                        #[cfg(feature = "metrics")]
4110                        metrics.clone(),
4111                    )?;
4112
4113                    tables.push(table);
4114                    recovered_table_ids.insert(table_id);
4115
4116                    if tables.len() % progress_mod == 0 {
4117                        log::debug!("Recovered {}/{cnt} tables", tables.len());
4118                    }
4119                } else if recovered_table_ids.contains(&table_id) {
4120                    // Duplicate sighting of an already-recovered manifest table
4121                    // (e.g., via symlink or case-insensitive alias). Skip it —
4122                    // do NOT treat as orphan or the live SST will be deleted.
4123                    log::warn!(
4124                        "Skipping duplicate sighting of manifest table {table_id} in {}",
4125                        table_file_path.display(),
4126                    );
4127                } else {
4128                    orphaned_tables.push((table_file_path, folder_fs.clone()));
4129                }
4130            }
4131        }
4132
4133        if tables.len() < cnt {
4134            // Route configuration is NOT persisted.  This is a best-effort
4135            // heuristic: it checks each missing table's level against the
4136            // current routes, but cannot detect same-level path changes
4137            // (e.g., L0 routed to /hot_old → /hot_new).  Persisting route
4138            // provenance per-table in the manifest would enable exact
4139            // detection but requires a format change.
4140            //
4141            // - Level IS covered by a current route → its directory was scanned
4142            //   and the file was not found → data corruption / deletion.
4143            // - Level is NOT covered → falls back to primary (always scanned).
4144            //   If the table isn't there, it was likely in a route that has
4145            //   since been removed from the config.
4146            //
4147            // Return RouteMismatch only when ALL missing tables are on levels
4148            // not covered by any current route.  If ANY missing table is on a
4149            // covered level, at least one SST was genuinely lost.
4150            if let Some(routes) = &config.level_routes {
4151                let all_missing_uncovered = table_map
4152                    .values()
4153                    .all(|(level, _, _)| !routes.iter().any(|r| r.levels.contains(level)));
4154
4155                if all_missing_uncovered {
4156                    let found = tables.len();
4157                    let missing_ids: Vec<_> = table_map.keys().collect();
4158
4159                    log::error!(
4160                        "Route mismatch: expected {cnt} tables but found {found} — \
4161                         level_routes do not cover all previously used levels. \
4162                         Missing table IDs: {missing_ids:?}",
4163                    );
4164                    return Err(crate::Error::RouteMismatch {
4165                        expected: cnt,
4166                        found,
4167                    });
4168                }
4169            }
4170
4171            log::error!(
4172                "Recovered less tables than expected: {:?}",
4173                table_map.keys(),
4174            );
4175            return Err(crate::Error::Unrecoverable);
4176        }
4177
4178        log::debug!("Successfully recovered {} tables", tables.len());
4179
4180        let (blob_files, orphaned_blob_files) = crate::vlog::recover_blob_files(
4181            &tree_path.join(crate::file::BLOBS_FOLDER),
4182            &recovery.blob_file_ids,
4183            tree_id,
4184            config.descriptor_table.as_ref(),
4185            &config.fs,
4186        )?;
4187
4188        let version = Version::from_recovery(recovery, &tables, &blob_files)?;
4189
4190        // NOTE: Cleanup old versions
4191        // But only after we definitely recovered the latest version.
4192        // Preserve the snapshot CURRENT references (and its `edits-` log) — the
4193        // latest version id has no file of its own under the incremental
4194        // manifest, so cleaning by it would delete the live snapshot.
4195        Self::cleanup_orphaned_version(tree_path, snapshot_id, &*config.fs)?;
4196
4197        for (table_path, orphan_fs) in orphaned_tables {
4198            log::debug!("Deleting orphaned table {}", table_path.display());
4199            orphan_fs.remove_file(&table_path)?;
4200        }
4201
4202        for blob_file_path in orphaned_blob_files {
4203            log::debug!("Deleting orphaned blob file {}", blob_file_path.display());
4204            (*config.fs).remove_file(&blob_file_path)?;
4205        }
4206
4207        Ok(version)
4208    }
4209
4210    /// Removes stale version files left over from a crash during version swap.
4211    ///
4212    /// # Behavior change vs pre-Fs-trait code
4213    ///
4214    /// The previous implementation used `std::fs::read_dir` + `to_string_lossy()`,
4215    /// which silently skipped non-UTF-8 filenames. `Fs::read_dir` returns
4216    /// `InvalidData` for such entries instead (see [`FsDirEntry`](crate::fs::FsDirEntry) docs), so this
4217    /// function now fails fast on non-UTF-8 names. This is intentional: version
4218    /// files are always `v{u64}` — any non-UTF-8 entry indicates filesystem
4219    /// corruption and should surface as an error rather than be silently ignored.
4220    /// Removes stale manifest files left by older generations: every `v{id}`
4221    /// snapshot except the live one (`v{snapshot_id}`) and every `edits-{id}`
4222    /// log except the live snapshot's (`edits-{snapshot_id}`). A crashed
4223    /// rotation can leak an old snapshot or its log; this sweeps them on open.
4224    /// The live snapshot and log are exactly the generation `CURRENT` points at.
4225    fn cleanup_orphaned_version(
4226        path: &Path,
4227        snapshot_id: crate::version::VersionId,
4228        fs: &dyn crate::fs::Fs,
4229    ) -> crate::Result<()> {
4230        let snapshot_str = format!("v{snapshot_id}");
4231        let log_str = format!("edits-{snapshot_id}");
4232
4233        for dirent in fs.read_dir(path)? {
4234            if dirent.is_dir {
4235                continue;
4236            }
4237
4238            let name = &dirent.file_name;
4239            let is_orphan_snapshot = name.starts_with('v') && *name != snapshot_str;
4240            let is_orphan_log = name.starts_with("edits-") && *name != log_str;
4241            if is_orphan_snapshot || is_orphan_log {
4242                log::trace!("Cleanup orphaned manifest file {name}");
4243                match fs.remove_file(&dirent.path) {
4244                    Ok(()) => {}
4245                    Err(e) if e.kind() == crate::io::ErrorKind::NotFound => {}
4246                    Err(e) => return Err(e.into()),
4247                }
4248            }
4249        }
4250
4251        Ok(())
4252    }
4253}
4254
4255/// Returns `true` if the directory contains version-related artifacts
4256/// (a `tables/` subdir, a `blobs/` subdir, or any `vN` manifest file).
4257///
4258/// Used by [`Tree::open`] to distinguish a genuinely fresh directory
4259/// (safe to `create_new`) from a half-written checkpoint or other
4260/// interrupted sealing (must error rather than silently overwrite).
4261///
4262/// A missing parent directory is treated as "no state" — `create_new`
4263/// is what creates the directory in the first place, so callers may
4264/// invoke `Tree::open` against a path that does not exist yet.
4265fn has_existing_version_state(folder: &Path, fs: &dyn Fs) -> crate::Result<bool> {
4266    if fs.exists(&folder.join(crate::file::TABLES_FOLDER))?
4267        || fs.exists(&folder.join(crate::file::BLOBS_FOLDER))?
4268    {
4269        return Ok(true);
4270    }
4271    let entries = match fs.read_dir(folder) {
4272        Ok(entries) => entries,
4273        Err(e) if e.kind() == crate::io::ErrorKind::NotFound => return Ok(false),
4274        Err(e) => return Err(e.into()),
4275    };
4276    for entry in entries {
4277        let name = &entry.file_name;
4278        if name.starts_with('v') && name.len() > 1 && name[1..].bytes().all(|c| c.is_ascii_digit())
4279        {
4280            return Ok(true);
4281        }
4282    }
4283    Ok(false)
4284}
4285
4286/// Raises a query's lower bound to a table's tight-space restriction, if any.
4287///
4288/// Keys below `restriction` are the punched-out prefix served by the
4289/// replacement table, so a range estimate must not charge them to the
4290/// restricted view. Returns `lo` unchanged when the table is unrestricted or
4291/// the restriction is at or below `lo`.
4292fn effective_lower_bound<'a>(
4293    lo: core::ops::Bound<&'a [u8]>,
4294    restriction: Option<&'a [u8]>,
4295    cmp: &dyn crate::comparator::UserComparator,
4296) -> core::ops::Bound<&'a [u8]> {
4297    use core::cmp::Ordering;
4298    use core::ops::Bound;
4299    match (lo, restriction) {
4300        (Bound::Unbounded, Some(rb)) => Bound::Included(rb),
4301        (Bound::Included(k) | Bound::Excluded(k), Some(rb))
4302            if cmp.compare(rb, k) == Ordering::Greater =>
4303        {
4304            Bound::Included(rb)
4305        }
4306        _ => lo,
4307    }
4308}
4309
4310#[cfg(test)]
4311mod cardinality_tests;
4312
4313#[cfg(all(test, feature = "metrics"))]
4314mod cache_stats_tests;