coordinode-lsm-tree 5.4.0

Embedded LSM-tree storage engine: BuRR filters, zstd dictionary compression, MVCC, range tombstones, merge operators, K/V separation, AES-256-GCM at rest.
Documentation
// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2026-present, Structured World Foundation

//! Per-SST seqno-bounds section: a parallel `(data-block offset -> [seqno_min,
//! seqno_max])` map that powers the `scan_since_seqno` block-skip WITHOUT
//! bloating the index entries.
//!
//! The bounds used to live inline in each index entry (markers 2/3), which made
//! every point-read index probe step over two extra varints and enlarged the
//! index block (measurable point-read cost). Moving them into this optional
//! parallel section keeps the index entries at their legacy size: a point read
//! never touches the bounds, while a seqno-scoped scan looks the bounds up by
//! data-block file offset and skips blocks whose `[min, max]` cannot overlap the
//! target window.
//!
//! Optional + format-gated (emitted only when `seqno_in_index` is on), so a
//! table written without it carries zero extra bytes. Offset-keyed (not ordinal)
//! so a lookup cannot silently mis-map if block iteration order ever shifts.
//!
//! # Wire layout
//!
//! ```text
//! [count: u32 LE]
//! count × [ block_offset: u64 LE | seqno_min: u64 LE | seqno_max: u64 LE ]
//! ```
//!
//! Entries are strictly ascending by `block_offset` (write order == file order),
//! enabling a binary-search lookup.

#[cfg(not(feature = "std"))]
use alloc::vec::Vec;

use super::BlockOffset;
use crate::SeqNo;

/// Serialize the per-data-block seqno bounds into the `seqno_bounds` section.
///
/// Appends `[count: u32 LE]` to `out`, followed by one
/// `[block_offset: u64 LE | seqno_min: u64 LE | seqno_max: u64 LE]` record per
/// element of `bounds`. `bounds[i]` is `(block_offset, (seqno_min, seqno_max))`
/// for one data block, in write (ascending-offset) order.
///
/// Debug builds assert both write-side invariants the decoder enforces at open
/// time: offsets strictly ascending, and `seqno_min <= seqno_max` per block.
///
/// # Errors
///
/// Returns [`crate::Error::InvalidHeader`] if the block count does not fit the
/// `u32` section-header width. Validating at encode time fails fast at the
/// source rather than deferring detection to the decoder. The count is checked
/// before anything is written, so `out` is left untouched on error.
pub fn encode_seqno_bounds(
    out: &mut Vec<u8>,
    bounds: &[(BlockOffset, (SeqNo, SeqNo))],
) -> crate::Result<()> {
    /// Wire size of the `count` header.
    const HEADER_SIZE: usize = core::mem::size_of::<u32>();
    /// Wire size of one record: three little-endian `u64`s.
    const ENTRY_SIZE: usize = 3 * core::mem::size_of::<u64>();

    let count =
        u32::try_from(bounds.len()).map_err(|_| crate::Error::InvalidHeader("SeqnoBounds"))?;

    // Write-side invariant: the decoder rejects a section whose offsets are not
    // strictly ascending, so surface a writer bug here (debug builds) instead
    // of at the next table open.
    debug_assert!(
        bounds
            .windows(2)
            .all(|pair| matches!(pair, [(a, _), (b, _)] if a.0 < b.0)),
        "seqno bounds offsets not strictly ascending"
    );

    // The encoded size is known exactly, so grow the buffer once instead of
    // letting the per-field appends below reallocate incrementally.
    out.reserve(HEADER_SIZE.saturating_add(bounds.len().saturating_mul(ENTRY_SIZE)));

    out.extend_from_slice(&count.to_le_bytes());
    for (offset, (seqno_min, seqno_max)) in bounds {
        // Write-side invariant: a block's bounds must be ordered. Asserted here
        // (debug builds) so a writer bug surfaces at its source, in addition to
        // the decoder's runtime check that catches it at open time.
        debug_assert!(
            seqno_min <= seqno_max,
            "seqno bounds inverted: min {seqno_min} > max {seqno_max}"
        );
        out.extend_from_slice(&offset.0.to_le_bytes());
        out.extend_from_slice(&seqno_min.to_le_bytes());
        out.extend_from_slice(&seqno_max.to_le_bytes());
    }
    Ok(())
}

/// Decoded `seqno_bounds` section: a lookup from a data block's file offset to
/// its `[seqno_min, seqno_max]`.
///
/// The `Default` value is an empty map, which is what a table without a
/// `seqno_bounds` section is represented by. On an empty map every lookup
/// returns `None`; per the module design, the caller's scan then falls back to
/// a full per-entry filter instead of skipping blocks.
#[derive(Debug, Default, Clone)]
pub struct SeqnoBoundsMap {
    /// `(block_offset, (seqno_min, seqno_max))`, sorted strictly ascending by
    /// offset so a lookup can binary-search on the offset. The decoder rejects
    /// any section that violates this ordering.
    entries: Vec<(u64, (SeqNo, SeqNo))>,
}

impl SeqnoBoundsMap {
    /// Decode a `seqno_bounds` section payload.
    ///
    /// # Errors
    ///
    /// Returns [`crate::Error::InvalidHeader`] if the payload is truncated, the
    /// entries are not strictly ascending by offset, or any entry has
    /// `seqno_min > seqno_max` (a corrupt section must surface rather than
    /// silently mis-skip a block).
    pub fn decode(bytes: &[u8]) -> crate::Result<Self> {
        const ERR: crate::Error = crate::Error::InvalidHeader("SeqnoBounds");

        fn take<'a>(r: &mut &'a [u8], n: usize) -> Option<&'a [u8]> {
            if r.len() < n {
                return None;
            }
            let (head, tail) = r.split_at(n);
            *r = tail;
            Some(head)
        }
        fn read_u32(r: &mut &[u8]) -> Option<u32> {
            let b: [u8; 4] = take(r, 4)?.try_into().ok()?;
            Some(u32::from_le_bytes(b))
        }
        fn read_u64(r: &mut &[u8]) -> Option<u64> {
            let b: [u8; 8] = take(r, 8)?.try_into().ok()?;
            Some(u64::from_le_bytes(b))
        }

        // Each entry is exactly three u64s on the wire.
        const ENTRY_SIZE: usize = 3 * core::mem::size_of::<u64>();

        let mut r = bytes;
        let count = read_u32(&mut r).ok_or(ERR)?;
        // Reject a corrupt count BEFORE the speculative allocation below: a
        // count claiming more entries than the remaining payload can hold is
        // invalid, and validating it up front bounds `with_capacity` to the
        // real payload size (so a corrupt count cannot trigger a multi-GB
        // pre-allocation / OOM instead of a clean `InvalidHeader`).
        match (count as usize).checked_mul(ENTRY_SIZE) {
            Some(needed) if needed <= r.len() => {}
            _ => return Err(ERR),
        }
        let mut entries = Vec::with_capacity(count as usize);
        let mut prev: Option<u64> = None;
        for _ in 0..count {
            let offset = read_u64(&mut r).ok_or(ERR)?;
            if prev.is_some_and(|p| offset <= p) {
                return Err(ERR);
            }
            prev = Some(offset);
            let seqno_min = read_u64(&mut r).ok_or(ERR)?;
            let seqno_max = read_u64(&mut r).ok_or(ERR)?;
            if seqno_min > seqno_max {
                return Err(ERR);
            }
            entries.push((offset, (seqno_min, seqno_max)));
        }
        // The section must contain exactly `count` entries and nothing more:
        // leftover bytes mean a wrong count or a corrupt / padded section, so
        // reject them rather than silently accept a mis-sized parse.
        if !r.is_empty() {
            return Err(ERR);
        }
        Ok(Self { entries })
    }

    /// The `[seqno_min, seqno_max]` for the data block at `offset`, or `None` if
    /// the block was not recorded (legacy table without seqno bounds).
    #[must_use]
    pub fn bounds_for(&self, offset: u64) -> Option<(SeqNo, SeqNo)> {
        let idx = self
            .entries
            .binary_search_by_key(&offset, |(o, _)| *o)
            .ok()?;
        self.entries.get(idx).map(|(_, b)| *b)
    }

    /// Whether any per-block bounds are recorded.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }

    /// Number of recorded data blocks.
    #[must_use]
    pub fn len(&self) -> usize {
        self.entries.len()
    }
}

#[cfg(test)]
#[expect(clippy::expect_used, reason = "test code")]
mod tests {
    use super::*;

    /// Hand-build a raw section: the given `declared` count followed by each
    /// `(offset, seqno_min, seqno_max)` record as three little-endian `u64`s.
    /// Bypasses the encoder so malformed sections can be constructed.
    fn raw_section(declared: u32, records: &[(u64, u64, u64)]) -> Vec<u8> {
        let mut bytes = declared.to_le_bytes().to_vec();
        for &(offset, seqno_min, seqno_max) in records {
            for word in [offset, seqno_min, seqno_max] {
                bytes.extend_from_slice(&word.to_le_bytes());
            }
        }
        bytes
    }

    #[test]
    fn encode_decode_round_trips() {
        let recorded = [
            (BlockOffset(0), (10u64, 20u64)),
            (BlockOffset(4096), (5, 5)),
            (BlockOffset(9000), (0, 1_000_000)),
        ];
        let mut section = Vec::new();
        encode_seqno_bounds(&mut section, &recorded).expect("encode");
        let decoded = SeqnoBoundsMap::decode(&section).expect("decode");
        assert_eq!(decoded.len(), 3);

        // Recorded offsets resolve to their bounds; offsets that fall between
        // recorded blocks are not present and resolve to `None`.
        let lookups: [(u64, Option<(SeqNo, SeqNo)>); 5] = [
            (0, Some((10, 20))),
            (4096, Some((5, 5))),
            (9000, Some((0, 1_000_000))),
            (1, None),
            (5000, None),
        ];
        for (offset, expected) in lookups {
            assert_eq!(decoded.bounds_for(offset), expected);
        }
    }

    #[test]
    fn decode_empty_section_is_empty_map() {
        let mut section = Vec::new();
        encode_seqno_bounds(&mut section, &[]).expect("encode");
        let decoded = SeqnoBoundsMap::decode(&section).expect("decode empty");
        assert!(decoded.is_empty());
        assert_eq!(decoded.bounds_for(0), None);
    }

    #[test]
    fn decode_rejects_non_ascending_offsets() {
        // Two entries whose offsets go 100 -> 50 (descending) must be rejected.
        let section = raw_section(2, &[(100, 1, 2), (50, 1, 2)]);
        assert!(SeqnoBoundsMap::decode(&section).is_err());
    }

    #[test]
    fn decode_rejects_inverted_bounds() {
        // A single entry at offset 0 with min = 9 and max = 3 (max < min).
        let section = raw_section(1, &[(0, 9, 3)]);
        assert!(SeqnoBoundsMap::decode(&section).is_err());
    }

    #[test]
    fn decode_rejects_truncated_payload() {
        // The header declares one entry, but no entry bytes follow.
        let section = raw_section(1, &[]);
        assert!(SeqnoBoundsMap::decode(&section).is_err());
    }

    #[test]
    fn decode_rejects_trailing_bytes_after_last_entry() {
        // Bytes past the declared entries indicate a wrong count or a corrupt /
        // padded section; the decoder must report that rather than quietly
        // parse the section with the wrong length.
        let mut section = Vec::new();
        encode_seqno_bounds(&mut section, &[(BlockOffset(0), (1u64, 2u64))]).expect("encode");
        section.push(0xAB); // one stray trailing byte
        assert!(SeqnoBoundsMap::decode(&section).is_err());
    }

    #[test]
    fn decode_rejects_count_larger_than_payload() {
        // A corrupt count has to be refused up front, before any allocation is
        // sized by it: every entry is exactly 24 bytes, so a count claiming far
        // more entries than the payload can hold is invalid. (A truly
        // pathological multi-GB count cannot be unit-tested without risking an
        // allocator abort; this exercises the same contract with a count the
        // decoder must reject fast.)
        let mut section = raw_section(100_000, &[]); // claims 100k entries
        section.extend_from_slice(&0u64.to_le_bytes()); // ...but only 8 stray bytes
        assert!(SeqnoBoundsMap::decode(&section).is_err());
    }
}