coordinode-lsm-tree 5.6.0

Embedded LSM-tree storage engine: BuRR filters, zstd dictionary compression, MVCC, range tombstones, merge operators, K/V separation, AES-256-GCM at rest.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2026-present, Structured World Foundation

//! Positional delete-bitmap for columnar segments.
//!
//! A [`DeleteBitmap`] marks, by row position, which rows of a columnar segment
//! are deleted. It is a pure membership set: MVCC reconciliation happens at
//! materialization time (a row is added only once its deleting tombstone is
//! visible to every live snapshot, i.e. its seqno is below the compaction
//! watermark), so the read path is a plain `contains` check with no per-row
//! seqno to interpret.
//!
//! # Layout
//!
//! Rows are grouped into fixed [`CHUNK_ROWS`]-row chunks (a bulk delete of a
//! whole chunk is therefore O(1), not O(rows)). Each non-empty chunk is stored
//! either as a sorted `Container::Sparse` array of in-chunk offsets, or, once
//! it holds more than `SPARSE_MAX` rows, as a `Container::Dense` bitset.
//! This mirrors a Roaring bitmap's array/bitset containers but at a fixed
//! 2048-row granularity tuned to the columnar block size. Empty chunks are not
//! stored.
//!
//! # no-std
//!
//! Build, query, union, and the serialized [`DeleteBitmap::encode`] /
//! [`DeleteBitmap::decode`] round-trip are all `core` + `alloc` only, so an
//! embedded or WASM reader can apply deletes without `std`.
//!
//! # Examples
//!
//! ```
//! use lsm_tree::table::delete_bitmap::DeleteBitmap;
//!
//! let mut dv = DeleteBitmap::new();
//! dv.insert(5);
//! dv.insert(5000);
//! assert!(dv.contains(5));
//! assert!(!dv.contains(6));
//!
//! let bytes = dv.encode();
//! let decoded = DeleteBitmap::decode(&bytes).unwrap();
//! assert_eq!(decoded, dv);
//! ```

use crate::{Error, Result};
use alloc::boxed::Box;
use alloc::vec::Vec;

/// Number of row positions covered by one chunk (2048). A bulk delete that
/// spans a full chunk costs O(1) bitmap work.
pub const CHUNK_ROWS: u32 = 1 << 11;

/// Number of `u64` words backing one dense chunk bitset (`CHUNK_ROWS / 64`).
const WORDS_PER_CHUNK: usize = (CHUNK_ROWS / u64::BITS) as usize;

/// Largest cardinality kept in the sparse representation.
///
/// A dense chunk always costs `WORDS_PER_CHUNK` words of 8 bytes, while each
/// sparse offset costs one 2-byte `u16`. Past this many rows the bitset is
/// both the smaller and the faster representation.
const SPARSE_MAX: usize =
    WORDS_PER_CHUNK * core::mem::size_of::<u64>() / core::mem::size_of::<u16>();

/// Serialized container tag: sorted `u16` offset list follows.
const KIND_SPARSE: u8 = 0;
/// Serialized container tag: `WORDS_PER_CHUNK` little-endian `u64` words follow.
const KIND_DENSE: u8 = 1;

/// Maps a row position to `(chunk index, offset within that chunk)`.
///
/// The offset is a remainder modulo `CHUNK_ROWS`, so it is always below 2048
/// and narrowing it to `u16` cannot lose bits.
fn split(row: u32) -> (u32, u16) {
    let chunk = row / CHUNK_ROWS;
    let offset = (row % CHUNK_ROWS) as u16;
    (chunk, offset)
}

/// The deleted offsets inside one chunk, held in whichever encoding is more
/// compact for the chunk's current cardinality.
#[derive(Debug, Clone, PartialEq, Eq)]
enum Container {
    /// Strictly ascending (hence duplicate-free) offsets, each `< CHUNK_ROWS`.
    Sparse(Vec<u16>),
    /// `CHUNK_ROWS`-bit bitset: bit `b` of word `w` marks offset `w * 64 + b`.
    Dense(Box<[u64; WORDS_PER_CHUNK]>),
}

impl Container {
    /// Reports whether in-chunk offset `off` is present in this container.
    fn contains(&self, off: u16) -> bool {
        match self {
            Self::Dense(words) => {
                let idx = usize::from(off);
                // Checked lookup: an offset past the bitset reads as "absent"
                // instead of panicking.
                words
                    .get(idx / 64)
                    .is_some_and(|&word| ((word >> (idx % 64)) & 1) != 0)
            }
            // Sparse offsets are kept sorted, so membership is a binary search.
            Self::Sparse(offs) => offs.binary_search(&off).is_ok(),
        }
    }

    /// Adds in-chunk offset `off`, returning `true` only if it was absent.
    ///
    /// A sparse container that grows past `SPARSE_MAX` entries is replaced by
    /// the equivalent dense bitset.
    fn insert(&mut self, off: u16) -> bool {
        let offs = match self {
            Self::Dense(words) => {
                let idx = usize::from(off);
                let mask = 1u64 << (idx % 64);
                // Callers pass `off < CHUNK_ROWS`, so the word exists; the checked
                // lookup keeps this write path panic-free even if one does not.
                return words.get_mut(idx / 64).is_some_and(|word| {
                    let absent = (*word & mask) == 0;
                    *word |= mask;
                    absent
                });
            }
            Self::Sparse(offs) => offs,
        };
        let Err(slot) = offs.binary_search(&off) else {
            // Already present.
            return false;
        };
        offs.insert(slot, off);
        if offs.len() > SPARSE_MAX {
            *self = Self::densify(offs);
        }
        true
    }

    /// Builds a dense bitset holding exactly the offsets in `offs`.
    fn densify(offs: &[u16]) -> Self {
        let mut bitset = Box::new([0u64; WORDS_PER_CHUNK]);
        for idx in offs.iter().copied().map(usize::from) {
            // Offsets outside the chunk are skipped rather than panicking.
            if let Some(word) = bitset.get_mut(idx / 64) {
                *word |= 1u64 << (idx % 64);
            }
        }
        Self::Dense(bitset)
    }

    /// Number of offsets stored in this container.
    #[expect(
        clippy::cast_possible_truncation,
        reason = "a chunk holds at most CHUNK_ROWS (2048) distinct offsets, well within u32"
    )]
    fn cardinality(&self) -> u32 {
        match self {
            Self::Dense(words) => words.iter().copied().map(u64::count_ones).sum(),
            Self::Sparse(offs) => offs.len() as u32,
        }
    }

    /// Calls `f` once per stored offset, in ascending order.
    #[expect(
        clippy::cast_possible_truncation,
        reason = "w < 32 and b < 64, so w*64 + b < 2048 fits u16"
    )]
    fn for_each<F: FnMut(u16)>(&self, mut f: F) {
        match self {
            Self::Dense(words) => {
                for (w, &word) in words.iter().enumerate() {
                    let base = w * 64;
                    let mut pending = word;
                    while pending != 0 {
                        let b = pending.trailing_zeros() as usize;
                        // base + b < CHUNK_ROWS (2048), so the narrowing is lossless.
                        f((base + b) as u16);
                        // Clear the lowest set bit that was just reported.
                        pending &= pending - 1;
                    }
                }
            }
            Self::Sparse(offs) => {
                for &o in offs {
                    f(o);
                }
            }
        }
    }
}

/// Positional delete set for the rows of a single columnar segment.
///
/// The [module documentation](self) describes the chunk layout and the MVCC
/// semantics.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct DeleteBitmap {
    /// Only chunks with at least one deleted row appear here, ordered by
    /// ascending chunk index so lookups can binary-search.
    chunks: Vec<(u32, Container)>,
}

impl DeleteBitmap {
    /// Creates an empty delete set (no rows deleted).
    #[must_use]
    pub fn new() -> Self {
        Self { chunks: Vec::new() }
    }

    /// Returns `true` if no rows are marked deleted.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.chunks.is_empty()
    }

    /// Returns the number of deleted rows.
    #[must_use]
    pub fn len(&self) -> u64 {
        self.chunks
            .iter()
            .map(|(_, c)| u64::from(c.cardinality()))
            .sum()
    }

    /// Marks row `row` deleted. Returns `true` if it was not already deleted.
    pub fn insert(&mut self, row: u32) -> bool {
        let (chunk, off) = split(row);
        match self.chunks.binary_search_by_key(&chunk, |(c, _)| *c) {
            Ok(pos) => self.chunks.get_mut(pos).is_some_and(|(_, c)| c.insert(off)),
            Err(pos) => {
                self.chunks
                    .insert(pos, (chunk, Container::Sparse(alloc::vec![off])));
                true
            }
        }
    }

    /// Returns `true` if row `row` is marked deleted.
    #[must_use]
    pub fn contains(&self, row: u32) -> bool {
        let (chunk, off) = split(row);
        match self.chunks.binary_search_by_key(&chunk, |(c, _)| *c) {
            Ok(pos) => self.chunks.get(pos).is_some_and(|(_, c)| c.contains(off)),
            Err(_) => false,
        }
    }

    /// Returns `true` if any row in chunk `chunk_index` is deleted. Lets a scan
    /// skip an entire untouched block in O(log chunks) without per-row checks.
    #[must_use]
    pub fn chunk_has_deletes(&self, chunk_index: u32) -> bool {
        self.chunks
            .binary_search_by_key(&chunk_index, |(c, _)| *c)
            .is_ok()
    }

    /// Unions `other` into `self` (merge-on-write: the result marks a row
    /// deleted iff either side did).
    pub fn union(&mut self, other: &Self) {
        for (chunk, container) in &other.chunks {
            match self.chunks.binary_search_by_key(chunk, |(c, _)| *c) {
                Ok(pos) => {
                    if let Some((_, dst)) = self.chunks.get_mut(pos) {
                        container.for_each(|off| {
                            dst.insert(off);
                        });
                    }
                }
                Err(pos) => self.chunks.insert(pos, (*chunk, container.clone())),
            }
        }
    }

    /// Iterates the deleted row positions in ascending order.
    pub fn iter(&self) -> impl Iterator<Item = u32> + '_ {
        self.chunks.iter().flat_map(|(chunk, container)| {
            let base = chunk * CHUNK_ROWS;
            let mut rows = Vec::with_capacity(container.cardinality() as usize);
            container.for_each(|off| rows.push(base + u32::from(off)));
            rows.into_iter()
        })
    }

    /// Serializes to the portable on-disk form (`core` + `alloc` decodable).
    ///
    /// Layout: `u32` non-empty-chunk count, then per chunk `u32` index, `u8`
    /// kind, and the container payload (sparse: `u16` count + offsets; dense:
    /// [`WORDS_PER_CHUNK`] little-endian `u64` words).
    #[must_use]
    #[expect(
        clippy::cast_possible_truncation,
        reason = "chunk count <= row_count/2048 fits u32; a chunk holds <= 2048 offsets, fitting u16"
    )]
    pub fn encode(&self) -> Vec<u8> {
        let mut out = Vec::new();
        out.extend_from_slice(&(self.chunks.len() as u32).to_le_bytes());
        for (chunk, container) in &self.chunks {
            out.extend_from_slice(&chunk.to_le_bytes());
            match container {
                Container::Sparse(offs) => {
                    out.push(KIND_SPARSE);
                    out.extend_from_slice(&(offs.len() as u16).to_le_bytes());
                    for &off in offs {
                        out.extend_from_slice(&off.to_le_bytes());
                    }
                }
                Container::Dense(words) => {
                    out.push(KIND_DENSE);
                    for &word in words.iter() {
                        out.extend_from_slice(&word.to_le_bytes());
                    }
                }
            }
        }
        out
    }

    /// Decodes a buffer produced by [`DeleteBitmap::encode`].
    ///
    /// # Errors
    ///
    /// Returns [`Error::InvalidHeader`] on a truncated buffer, an unknown
    /// container kind, an out-of-range offset, or chunk indices that are not
    /// strictly ascending.
    pub fn decode(bytes: &[u8]) -> Result<Self> {
        let mut cur = Cursor::new(bytes);
        let chunk_count = usize::try_from(cur.u32()?)
            .map_err(|_| Error::InvalidHeader("delete_bitmap: chunk count does not fit usize"))?;
        // Bound the preallocation by the smallest a chunk can be (index u32 +
        // kind u8 + sparse count u16 = 7 bytes), so a corrupt header cannot
        // request a huge `Vec` before the loop fails on a truncated buffer.
        if chunk_count > cur.remaining_len() / 7 {
            return Err(Error::InvalidHeader(
                "delete_bitmap: chunk count exceeds encoded payload",
            ));
        }
        let mut chunks = Vec::with_capacity(chunk_count);
        let mut last_chunk: Option<u32> = None;
        for _ in 0..chunk_count {
            let chunk = cur.u32()?;
            // The chunk's base row is `chunk * CHUNK_ROWS`; reject an index whose
            // base would overflow the u32 position space the mask addresses.
            if chunk > u32::MAX / CHUNK_ROWS {
                return Err(Error::InvalidHeader(
                    "delete_bitmap: chunk index out of row-position range",
                ));
            }
            if last_chunk.is_some_and(|p| chunk <= p) {
                return Err(Error::InvalidHeader(
                    "delete_bitmap: chunk indices not strictly ascending",
                ));
            }
            last_chunk = Some(chunk);
            let container = match cur.u8()? {
                KIND_SPARSE => {
                    let count = cur.u16()? as usize;
                    if count == 0 || count > CHUNK_ROWS as usize {
                        return Err(Error::InvalidHeader(
                            "delete_bitmap: sparse count out of range",
                        ));
                    }
                    let mut offs = Vec::with_capacity(count);
                    let mut last: Option<u16> = None;
                    for _ in 0..count {
                        let off = cur.u16()?;
                        if u32::from(off) >= CHUNK_ROWS {
                            return Err(Error::InvalidHeader(
                                "delete_bitmap: sparse offset out of range",
                            ));
                        }
                        if last.is_some_and(|p| off <= p) {
                            return Err(Error::InvalidHeader(
                                "delete_bitmap: sparse offsets not strictly ascending",
                            ));
                        }
                        last = Some(off);
                        offs.push(off);
                    }
                    Container::Sparse(offs)
                }
                KIND_DENSE => {
                    let mut words = Box::new([0u64; WORDS_PER_CHUNK]);
                    for word in words.iter_mut() {
                        *word = cur.u64()?;
                    }
                    Container::Dense(words)
                }
                _ => {
                    return Err(Error::InvalidHeader(
                        "delete_bitmap: unknown container kind",
                    ));
                }
            };
            // `encode` never emits an empty container; an empty one is malformed.
            if container.cardinality() == 0 {
                return Err(Error::InvalidHeader("delete_bitmap: empty container"));
            }
            chunks.push((chunk, container));
        }
        // The section must decode exactly, with nothing left over.
        if cur.remaining_len() != 0 {
            return Err(Error::InvalidHeader(
                "delete_bitmap: trailing bytes after chunks",
            ));
        }
        Ok(Self { chunks })
    }
}

/// Forward-only reader that decodes little-endian integers from a byte slice,
/// checking bounds on every read.
struct Cursor<'a> {
    /// The bytes being decoded.
    buf: &'a [u8],
    /// Index of the next unread byte.
    pos: usize,
}

impl<'a> Cursor<'a> {
    fn new(buf: &'a [u8]) -> Self {
        Self { buf, pos: 0 }
    }

    /// Bytes not yet consumed. `pos` never exceeds `buf.len()` (every read goes
    /// through bounds-checked `take`), so the floor at 0 only guards against a
    /// future bug rather than a reachable underflow.
    fn remaining_len(&self) -> usize {
        self.buf.len().saturating_sub(self.pos)
    }

    fn take<const N: usize>(&mut self) -> Result<[u8; N]> {
        let end = self
            .pos
            .checked_add(N)
            .ok_or(Error::InvalidHeader("delete_bitmap: length overflow"))?;
        let arr = self
            .buf
            .get(self.pos..end)
            .and_then(|s| <[u8; N]>::try_from(s).ok())
            .ok_or(Error::InvalidHeader("delete_bitmap: truncated buffer"))?;
        self.pos = end;
        Ok(arr)
    }

    fn u8(&mut self) -> Result<u8> {
        let [b] = self.take()?;
        Ok(b)
    }

    fn u16(&mut self) -> Result<u16> {
        Ok(u16::from_le_bytes(self.take()?))
    }

    fn u32(&mut self) -> Result<u32> {
        Ok(u32::from_le_bytes(self.take()?))
    }

    fn u64(&mut self) -> Result<u64> {
        Ok(u64::from_le_bytes(self.take()?))
    }
}

// Unit tests are declared out-of-line (`mod tests;`), so their bodies live in a
// separate source file and are compiled only under `cfg(test)`. As a child
// module they can reach this file's private items (e.g. `Container`, `chunks`).
#[cfg(test)]
#[expect(
    clippy::unwrap_used,
    clippy::indexing_slicing,
    clippy::cast_possible_truncation,
    reason = "test assertions over fixed in-test buffers"
)]
mod tests;