coordinode-lsm-tree 5.6.0

Embedded LSM-tree storage engine: BuRR filters, zstd dictionary compression, MVCC, range tombstones, merge operators, K/V separation, AES-256-GCM at rest.
Documentation
// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2024-present, fjall-rs
// Copyright (c) 2026-present, Structured World Foundation

//! Active tombstone sets for tracking range tombstones during iteration.
//!
//! During forward or reverse scans, range tombstones must be activated when
//! the scan enters their range and expired when it leaves. These sets use
//! a seqno multiset (`BTreeMap<SeqNo, u32>`) for O(log t) max-seqno queries,
//! and a comparator-ordered expiry queue for deterministic retirement.
//!
//! A unique monotonic `id` on each expiry entry ensures total ordering even
//! when multiple tombstones share the same boundary key.

use crate::{SeqNo, UserKey, comparator::SharedComparator, range_tombstone::RangeTombstone};
use alloc::collections::BTreeMap;
#[cfg(not(feature = "std"))]
use alloc::vec::Vec;

/// Tracks active range tombstones during forward iteration.
///
/// Tombstones are activated when the scan reaches their `start` key, and
/// expired when the scan reaches or passes their `end` key.
///
/// Uses a sorted vector keyed by `(end desc, id asc)` in comparator order,
/// with the expiring-soonest tombstone kept at the tail for cheap `last()`.
pub struct ActiveTombstoneSet {
    comparator: SharedComparator,
    seqno_counts: BTreeMap<SeqNo, u32>,
    // A comparator-ordered Vec keeps expiry deterministic for custom
    // comparators; overlap cardinality is typically small, so O(t) inserts are
    // an acceptable tradeoff for a compact structure with cheap tail expiry.
    pending_expiry: Vec<(UserKey, u64, SeqNo)>,
    next_id: u64,
}

impl ActiveTombstoneSet {
    /// Creates a new forward active tombstone set.
    #[must_use]
    #[cfg_attr(
        not(test),
        expect(
            dead_code,
            reason = "backward-compatible default-comparator constructor"
        )
    )]
    pub fn new() -> Self {
        Self::new_with_comparator(crate::comparator::default_comparator())
    }

    /// Creates a new forward active tombstone set with the given comparator.
    #[must_use]
    pub fn new_with_comparator(comparator: SharedComparator) -> Self {
        Self {
            comparator,
            seqno_counts: BTreeMap::new(),
            pending_expiry: Vec::new(),
            next_id: 0,
        }
    }

    /// Resets the set to empty in place, reusing the backing `BTreeMap` /
    /// `Vec` storage instead of dropping it. Equivalent to replacing with a
    /// fresh [`Self::new_with_comparator`] but allocation-preserving: a
    /// seek-then-iterate-then-reseek loop that activated tombstones keeps the
    /// node / buffer capacity for the next pass.
    pub fn clear(&mut self) {
        self.seqno_counts.clear();
        self.pending_expiry.clear();
        self.next_id = 0;
    }

    /// Activates a range tombstone, adding it to the active set.
    ///
    /// The tombstone is only activated if it is visible at `cutoff_seqno`
    /// (i.e., `rt.seqno < cutoff_seqno`). Each source may supply a different
    /// cutoff (e.g., ephemeral memtable uses its own `index_seqno`).
    /// Duplicate activations (same seqno from different sources) are handled
    /// correctly via multiset accounting.
    pub fn activate(&mut self, rt: &RangeTombstone, cutoff_seqno: SeqNo) {
        if !rt.visible_at(cutoff_seqno) {
            return;
        }
        let id = self.next_id;
        self.next_id += 1;
        *self.seqno_counts.entry(rt.seqno).or_insert(0) += 1;
        let end = rt.end.clone();
        let seqno = rt.seqno;
        let comparator = self.comparator.as_ref();
        let insert_idx = self
            .pending_expiry
            .binary_search_by(|(existing_end, existing_id, _)| {
                // `binary_search_by` uses the closure to position the new
                // target within the existing slice order. Because this Vec is
                // intentionally sorted by `(end desc, id asc)`, we compare the
                // target `(end, id)` against the existing probe here.
                // Swapping the arguments would search as if the slice were in
                // ascending comparator order and would break the tested expiry
                // invariant that the earliest tombstone stays at `last()`.
                comparator
                    .compare(&end, existing_end)
                    .then_with(|| existing_id.cmp(&id))
            })
            .unwrap_or_else(|idx| idx);
        self.pending_expiry.insert(insert_idx, (end, id, seqno));
    }

    /// Expires tombstones whose `end <= current_key`.
    ///
    /// In the half-open convention `[start, end)`, a tombstone stops covering
    /// keys at `end`. So when `current_key >= end`, the tombstone no longer
    /// applies and is removed from the active set.
    ///
    /// # Panics
    ///
    /// Panics if an expiry pop has no matching activation in the seqno multiset.
    pub fn expire_until(&mut self, current_key: &[u8]) {
        while let Some((end, _, seqno)) = self.pending_expiry.last() {
            if self.comparator.compare(end, current_key) == core::cmp::Ordering::Greater {
                break;
            }
            let seqno = *seqno;
            self.pending_expiry.pop();
            #[expect(
                clippy::expect_used,
                reason = "expiry pop must have matching activation"
            )]
            let count = self
                .seqno_counts
                .get_mut(&seqno)
                .expect("expiry pop must have matching activation");
            *count -= 1;
            if *count == 0 {
                self.seqno_counts.remove(&seqno);
            }
        }
    }

    /// Returns the highest seqno among all active tombstones, or `None` if
    /// no tombstones are active.
    #[must_use]
    pub fn max_active_seqno(&self) -> Option<SeqNo> {
        self.seqno_counts.keys().next_back().copied()
    }

    /// Returns `true` if a KV with the given seqno is suppressed by any
    /// active tombstone (i.e., there exists an active tombstone with a
    /// higher seqno).
    #[must_use]
    pub fn is_suppressed(&self, key_seqno: SeqNo) -> bool {
        self.max_active_seqno().is_some_and(|max| key_seqno < max)
    }

    /// Bulk-activates tombstones at a seek position.
    ///
    /// # Invariant
    ///
    /// At any iterator position, the active set contains only tombstones
    /// where `start <= current_key < end` (and visible at their respective
    /// `cutoff_seqno`). Seek prefill must collect truly overlapping tombstones
    /// (`start <= key < end`); `expire_until` immediately enforces the
    /// `end` bound.
    #[cfg_attr(
        not(test),
        expect(dead_code, reason = "used by iterator initialization logic")
    )]
    pub fn initialize_from(
        &mut self,
        tombstones: impl IntoIterator<Item = (RangeTombstone, SeqNo)>,
    ) {
        for (rt, cutoff) in tombstones {
            self.activate(&rt, cutoff);
        }
    }

    /// Returns `true` if there are no active tombstones.
    #[cfg_attr(
        not(test),
        expect(dead_code, reason = "helper for callers to inspect active tombstones")
    )]
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.seqno_counts.is_empty()
    }
}

/// Tracks active range tombstones during reverse iteration.
///
/// During reverse scans, tombstones are activated when the scan reaches
/// a key < `end` (strict `>`: `rt.end > current_key`), and expired when
/// `current_key < rt.start`.
///
/// Uses a sorted vector keyed by `(start asc, id asc)` in comparator order,
/// with the expiring-soonest tombstone kept at the tail for cheap `last()`.
pub struct ActiveTombstoneSetReverse {
    comparator: SharedComparator,
    seqno_counts: BTreeMap<SeqNo, u32>,
    pending_expiry: Vec<(UserKey, u64, SeqNo)>,
    next_id: u64,
}

impl ActiveTombstoneSetReverse {
    /// Creates a new reverse active tombstone set.
    #[must_use]
    #[cfg_attr(
        not(test),
        expect(
            dead_code,
            reason = "backward-compatible default-comparator constructor"
        )
    )]
    pub fn new() -> Self {
        Self::new_with_comparator(crate::comparator::default_comparator())
    }

    /// Creates a new reverse active tombstone set with the given comparator.
    #[must_use]
    pub fn new_with_comparator(comparator: SharedComparator) -> Self {
        Self {
            comparator,
            seqno_counts: BTreeMap::new(),
            pending_expiry: Vec::new(),
            next_id: 0,
        }
    }

    /// Resets the set to empty in place, reusing the backing `BTreeMap` /
    /// `Vec` storage instead of dropping it (see
    /// [`ActiveTombstoneSet::clear`]).
    pub fn clear(&mut self) {
        self.seqno_counts.clear();
        self.pending_expiry.clear();
        self.next_id = 0;
    }

    /// Activates a range tombstone, adding it to the active set.
    ///
    /// The tombstone is only activated if it is visible at `cutoff_seqno`
    /// (i.e., `rt.seqno < cutoff_seqno`). Each source may supply a different
    /// cutoff (e.g., ephemeral memtable uses its own `index_seqno`).
    /// Duplicate activations (same seqno from different sources) are handled
    /// correctly via multiset accounting.
    ///
    /// For reverse iteration, activation uses strict `>`: tombstones with
    /// `rt.end > current_key` are activated. `key == end` is NOT covered
    /// (half-open).
    pub fn activate(&mut self, rt: &RangeTombstone, cutoff_seqno: SeqNo) {
        if !rt.visible_at(cutoff_seqno) {
            return;
        }
        let id = self.next_id;
        self.next_id += 1;
        *self.seqno_counts.entry(rt.seqno).or_insert(0) += 1;
        let comparator = self.comparator.as_ref();
        let pos = self
            .pending_expiry
            .binary_search_by(|(start, existing_id, _)| {
                comparator
                    .compare(start, &rt.start)
                    .then_with(|| existing_id.cmp(&id))
            })
            .unwrap_or_else(|idx| idx);
        self.pending_expiry
            .insert(pos, (rt.start.clone(), id, rt.seqno));
    }

    /// Expires tombstones whose `start > current_key`.
    ///
    /// During reverse iteration, when the scan moves to a key that is
    /// before a tombstone's start, that tombstone no longer applies.
    ///
    /// # Panics
    ///
    /// Panics if an expiry pop has no matching activation in the seqno multiset.
    pub fn expire_until(&mut self, current_key: &[u8]) {
        while let Some((start, _, seqno)) = self.pending_expiry.last() {
            if self.comparator.compare(current_key, start) == core::cmp::Ordering::Less {
                let seqno = *seqno;
                self.pending_expiry.pop();
                #[expect(
                    clippy::expect_used,
                    reason = "expiry pop must have matching activation"
                )]
                let count = self
                    .seqno_counts
                    .get_mut(&seqno)
                    .expect("expiry pop must have matching activation");
                *count -= 1;
                if *count == 0 {
                    self.seqno_counts.remove(&seqno);
                }
            } else {
                break;
            }
        }
    }

    /// Returns the highest seqno among all active tombstones, or `None` if
    /// no tombstones are active.
    #[must_use]
    pub fn max_active_seqno(&self) -> Option<SeqNo> {
        self.seqno_counts.keys().next_back().copied()
    }

    /// Returns `true` if a KV with the given seqno is suppressed by any
    /// active tombstone (i.e., there exists an active tombstone with a
    /// higher seqno).
    #[must_use]
    pub fn is_suppressed(&self, key_seqno: SeqNo) -> bool {
        self.max_active_seqno().is_some_and(|max| key_seqno < max)
    }

    /// Bulk-activates tombstones at a seek position (for reverse).
    #[cfg_attr(
        not(test),
        expect(dead_code, reason = "used by iterator initialization logic")
    )]
    pub fn initialize_from(
        &mut self,
        tombstones: impl IntoIterator<Item = (RangeTombstone, SeqNo)>,
    ) {
        for (rt, cutoff) in tombstones {
            self.activate(&rt, cutoff);
        }
    }

    /// Returns `true` if there are no active tombstones.
    #[cfg_attr(
        not(test),
        expect(dead_code, reason = "helper for callers to inspect active tombstones")
    )]
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.seqno_counts.is_empty()
    }
}

#[cfg(test)]
mod tests;