coordinode-lsm-tree 4.4.0

A K.I.S.S. implementation of log-structured merge trees (LSM-trees/LSMTs) — CoordiNode fork
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
// Copyright (c) 2024-present, fjall-rs
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)

//! Active tombstone sets for tracking range tombstones during iteration.
//!
//! During forward or reverse scans, range tombstones must be activated when
//! the scan enters their range and expired when it leaves. These sets use
//! a seqno multiset (`BTreeMap<SeqNo, u32>`) for O(log t) max-seqno queries,
//! and a comparator-ordered expiry queue for deterministic retirement.
//!
//! A unique monotonic `id` on each expiry entry ensures total ordering even
//! when multiple tombstones share the same boundary key.

use crate::{SeqNo, UserKey, comparator::SharedComparator, range_tombstone::RangeTombstone};
use std::collections::BTreeMap;

/// Tracks active range tombstones during forward iteration.
///
/// Tombstones are activated when the scan reaches their `start` key, and
/// expired when the scan reaches or passes their `end` key.
///
/// Uses a sorted vector keyed by `(end desc, id asc)` in comparator order,
/// with the expiring-soonest tombstone kept at the tail for cheap `last()`.
///
/// The set never inspects a tombstone's `start`: `activate` queues whatever
/// visible tombstone it is handed, so deciding *when* to activate is left to
/// the caller.
pub struct ActiveTombstoneSet {
    /// Key ordering used both to position entries in `pending_expiry` and to
    /// compare a queued `end` key against the scan position in `expire_until`.
    comparator: SharedComparator,
    /// Multiset of the seqnos of all currently active tombstones: each seqno
    /// maps to the number of active tombstones carrying it. An entry is
    /// removed as soon as its count drops to zero, so the largest key is the
    /// highest active seqno.
    seqno_counts: BTreeMap<SeqNo, u32>,
    /// Expiry queue of `(end, id, seqno)` entries, one per active tombstone.
    // A comparator-ordered Vec keeps expiry deterministic for custom
    // comparators; overlap cardinality is typically small, so O(t) inserts are
    // an acceptable tradeoff for a compact structure with cheap tail expiry.
    pending_expiry: Vec<(UserKey, u64, SeqNo)>,
    /// Id assigned to the next activated tombstone. It is incremented on every
    /// successful activation, which keeps the ids in `pending_expiry` unique.
    next_id: u64,
}

impl ActiveTombstoneSet {
    /// Builds an empty forward set that orders keys with the crate's default
    /// comparator.
    #[must_use]
    #[cfg_attr(
        not(test),
        expect(
            dead_code,
            reason = "backward-compatible default-comparator constructor"
        )
    )]
    pub fn new() -> Self {
        let comparator = crate::comparator::default_comparator();
        Self::new_with_comparator(comparator)
    }

    /// Builds an empty forward set whose tombstone end keys are ordered by
    /// `comparator`.
    #[must_use]
    pub fn new_with_comparator(comparator: SharedComparator) -> Self {
        let seqno_counts = BTreeMap::new();
        let pending_expiry = Vec::new();
        Self {
            comparator,
            seqno_counts,
            pending_expiry,
            next_id: 0,
        }
    }

    /// Adds `rt` to the active set, provided it is visible to the reader.
    ///
    /// Visibility is decided by `rt.visible_at(cutoff_seqno)` (i.e.,
    /// `rt.seqno < cutoff_seqno`); an invisible tombstone leaves the set
    /// untouched. The cutoff is passed per call because each source may read
    /// at a different one (e.g., an ephemeral memtable uses its own
    /// `index_seqno`).
    ///
    /// The same seqno may be activated more than once (for instance by
    /// different sources); every activation is counted separately in the
    /// seqno multiset and released separately on expiry.
    pub fn activate(&mut self, rt: &RangeTombstone, cutoff_seqno: SeqNo) {
        if rt.visible_at(cutoff_seqno) {
            // Ids only ever grow, so a new entry never compares equal to a
            // queued one and always lands behind queued entries that share
            // its end key.
            let entry_id = self.next_id;
            self.next_id += 1;

            let active = self.seqno_counts.entry(rt.seqno).or_insert(0);
            *active += 1;

            // The queue is ordered `(end desc, id asc)`. A queued entry stays
            // in front of the new one when its end is larger (hence the new
            // end is passed as the left operand) or, on equal ends, when its
            // id is smaller.
            let order = self.comparator.as_ref();
            let slot = self
                .pending_expiry
                .partition_point(|(queued_end, queued_id, _)| {
                    order
                        .compare(&rt.end, queued_end)
                        .then_with(|| queued_id.cmp(&entry_id))
                        .is_lt()
                });
            self.pending_expiry
                .insert(slot, (rt.end.clone(), entry_id, rt.seqno));
        }
    }

    /// Drops every active tombstone whose `end <= current_key`.
    ///
    /// Ranges are half-open (`[start, end)`), so a tombstone already stops
    /// covering keys at `end` itself; reaching `end` therefore retires it.
    ///
    /// # Panics
    ///
    /// Panics if an expiry pop has no matching activation in the seqno multiset.
    pub fn expire_until(&mut self, current_key: &[u8]) {
        loop {
            let Some((end, _, seqno)) = self.pending_expiry.last() else {
                break;
            };
            // The tail carries the smallest end key. If even that one is
            // still ahead of the scan position, nothing else can expire.
            if self.comparator.compare(end, current_key).is_gt() {
                break;
            }
            let expired_seqno = *seqno;
            self.pending_expiry.pop();

            #[expect(
                clippy::expect_used,
                reason = "expiry pop must have matching activation"
            )]
            let remaining = self
                .seqno_counts
                .get_mut(&expired_seqno)
                .expect("expiry pop must have matching activation");
            *remaining -= 1;
            // Drop exhausted seqnos so the largest key is always still active.
            if *remaining == 0 {
                self.seqno_counts.remove(&expired_seqno);
            }
        }
    }

    /// Returns the largest seqno carried by any active tombstone, or `None`
    /// when the set is empty.
    #[must_use]
    pub fn max_active_seqno(&self) -> Option<SeqNo> {
        self.seqno_counts
            .last_key_value()
            .map(|(seqno, _)| *seqno)
    }

    /// Returns `true` if a KV written at `key_seqno` is hidden by the active
    /// set, i.e. some active tombstone has a strictly higher seqno.
    #[must_use]
    pub fn is_suppressed(&self, key_seqno: SeqNo) -> bool {
        matches!(self.max_active_seqno(), Some(newest) if key_seqno < newest)
    }

    /// Activates a batch of `(tombstone, cutoff_seqno)` pairs, e.g. when
    /// positioning at a seek key.
    ///
    /// # Invariant
    ///
    /// At any iterator position the active set is meant to hold only
    /// tombstones with `start <= current_key < end` that are visible at their
    /// respective cutoff. This method only applies the visibility check (via
    /// `activate`); it neither filters by key nor expires anything. Callers
    /// are expected to supply truly overlapping tombstones and to rely on
    /// `expire_until` to enforce the `end` bound.
    #[cfg_attr(
        not(test),
        expect(dead_code, reason = "used by iterator initialization logic")
    )]
    pub fn initialize_from(
        &mut self,
        tombstones: impl IntoIterator<Item = (RangeTombstone, SeqNo)>,
    ) {
        tombstones
            .into_iter()
            .for_each(|(rt, cutoff)| self.activate(&rt, cutoff));
    }

    /// Returns `true` when no tombstone is currently active.
    #[cfg_attr(
        not(test),
        expect(dead_code, reason = "helper for callers to inspect active tombstones")
    )]
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.seqno_counts.is_empty()
    }
}

/// Tracks active range tombstones during reverse iteration.
///
/// During reverse scans, tombstones are activated when the scan reaches
/// a key < `end` (strict `>`: `rt.end > current_key`), and expired when
/// `current_key < rt.start`.
///
/// Uses a sorted vector keyed by `(start asc, id asc)` in comparator order,
/// with the expiring-soonest tombstone kept at the tail for cheap `last()`.
///
/// The set never inspects a tombstone's `end`: `activate` queues whatever
/// visible tombstone it is handed, so deciding *when* to activate is left to
/// the caller.
pub struct ActiveTombstoneSetReverse {
    /// Key ordering used both to position entries in `pending_expiry` and to
    /// compare the scan position against a queued `start` key in
    /// `expire_until`.
    comparator: SharedComparator,
    /// Multiset of the seqnos of all currently active tombstones: each seqno
    /// maps to the number of active tombstones carrying it. An entry is
    /// removed as soon as its count drops to zero, so the largest key is the
    /// highest active seqno.
    seqno_counts: BTreeMap<SeqNo, u32>,
    /// Expiry queue of `(start, id, seqno)` entries, one per active
    /// tombstone. The largest `start` sits at the tail because it is the
    /// first one a descending scan moves below.
    pending_expiry: Vec<(UserKey, u64, SeqNo)>,
    /// Id assigned to the next activated tombstone. It is incremented on every
    /// successful activation, which keeps the ids in `pending_expiry` unique.
    next_id: u64,
}

impl ActiveTombstoneSetReverse {
    /// Builds an empty reverse set that orders keys with the crate's default
    /// comparator.
    #[must_use]
    #[cfg_attr(
        not(test),
        expect(
            dead_code,
            reason = "backward-compatible default-comparator constructor"
        )
    )]
    pub fn new() -> Self {
        let comparator = crate::comparator::default_comparator();
        Self::new_with_comparator(comparator)
    }

    /// Builds an empty reverse set whose tombstone start keys are ordered by
    /// `comparator`.
    #[must_use]
    pub fn new_with_comparator(comparator: SharedComparator) -> Self {
        let seqno_counts = BTreeMap::new();
        let pending_expiry = Vec::new();
        Self {
            comparator,
            seqno_counts,
            pending_expiry,
            next_id: 0,
        }
    }

    /// Adds `rt` to the active set, provided it is visible to the reader.
    ///
    /// Visibility is decided by `rt.visible_at(cutoff_seqno)` (i.e.,
    /// `rt.seqno < cutoff_seqno`); an invisible tombstone leaves the set
    /// untouched. The cutoff is passed per call because each source may read
    /// at a different one (e.g., an ephemeral memtable uses its own
    /// `index_seqno`).
    ///
    /// The same seqno may be activated more than once (for instance by
    /// different sources); every activation is counted separately in the
    /// seqno multiset and released separately on expiry.
    ///
    /// In a reverse scan the caller activates tombstones with
    /// `rt.end > current_key` (strict): ranges are half-open, so
    /// `key == end` is NOT covered. That check is not performed here.
    pub fn activate(&mut self, rt: &RangeTombstone, cutoff_seqno: SeqNo) {
        if rt.visible_at(cutoff_seqno) {
            // Ids only ever grow, so a new entry never compares equal to a
            // queued one and always lands behind queued entries that share
            // its start key.
            let entry_id = self.next_id;
            self.next_id += 1;

            let active = self.seqno_counts.entry(rt.seqno).or_insert(0);
            *active += 1;

            // The queue is ordered `(start asc, id asc)`: a queued entry
            // stays in front of the new one when its start is smaller or, on
            // equal starts, when its id is smaller.
            let order = self.comparator.as_ref();
            let slot = self
                .pending_expiry
                .partition_point(|(queued_start, queued_id, _)| {
                    order
                        .compare(queued_start, &rt.start)
                        .then_with(|| queued_id.cmp(&entry_id))
                        .is_lt()
                });
            self.pending_expiry
                .insert(slot, (rt.start.clone(), entry_id, rt.seqno));
        }
    }

    /// Drops every active tombstone whose `start > current_key`.
    ///
    /// A reverse scan visits keys in descending order, so once it is
    /// positioned before a tombstone's start that tombstone can no longer
    /// cover anything the scan will see. A tombstone whose start equals
    /// `current_key` stays active.
    ///
    /// # Panics
    ///
    /// Panics if an expiry pop has no matching activation in the seqno multiset.
    pub fn expire_until(&mut self, current_key: &[u8]) {
        loop {
            let Some((start, _, seqno)) = self.pending_expiry.last() else {
                break;
            };
            // The tail carries the largest start key. If the scan has not
            // moved below even that one, nothing else can expire.
            if !self.comparator.compare(current_key, start).is_lt() {
                break;
            }
            let expired_seqno = *seqno;
            self.pending_expiry.pop();

            #[expect(
                clippy::expect_used,
                reason = "expiry pop must have matching activation"
            )]
            let remaining = self
                .seqno_counts
                .get_mut(&expired_seqno)
                .expect("expiry pop must have matching activation");
            *remaining -= 1;
            // Drop exhausted seqnos so the largest key is always still active.
            if *remaining == 0 {
                self.seqno_counts.remove(&expired_seqno);
            }
        }
    }

    /// Returns the largest seqno carried by any active tombstone, or `None`
    /// when the set is empty.
    #[must_use]
    pub fn max_active_seqno(&self) -> Option<SeqNo> {
        self.seqno_counts
            .last_key_value()
            .map(|(seqno, _)| *seqno)
    }

    /// Returns `true` if a KV written at `key_seqno` is hidden by the active
    /// set, i.e. some active tombstone has a strictly higher seqno.
    #[must_use]
    pub fn is_suppressed(&self, key_seqno: SeqNo) -> bool {
        matches!(self.max_active_seqno(), Some(newest) if key_seqno < newest)
    }

    /// Activates a batch of `(tombstone, cutoff_seqno)` pairs, e.g. when
    /// positioning a reverse scan at a seek key.
    ///
    /// Only the visibility check of `activate` is applied; nothing is
    /// filtered by key or expired here.
    #[cfg_attr(
        not(test),
        expect(dead_code, reason = "used by iterator initialization logic")
    )]
    pub fn initialize_from(
        &mut self,
        tombstones: impl IntoIterator<Item = (RangeTombstone, SeqNo)>,
    ) {
        tombstones
            .into_iter()
            .for_each(|(rt, cutoff)| self.activate(&rt, cutoff));
    }

    /// Returns `true` when no tombstone is currently active.
    #[cfg_attr(
        not(test),
        expect(dead_code, reason = "helper for callers to inspect active tombstones")
    )]
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.seqno_counts.is_empty()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::UserKey;

    /// Shorthand for building the half-open tombstone `[start, end)` at `seqno`.
    fn rt(start: &[u8], end: &[u8], seqno: SeqNo) -> RangeTombstone {
        RangeTombstone::new(UserKey::from(start), UserKey::from(end), seqno)
    }

    // ──── Forward tests ────

    #[test]
    fn forward_activate_and_suppress() {
        let mut set = ActiveTombstoneSet::new();
        set.activate(&rt(b"a", b"m", 10), 100);
        // Suppression is strict: only seqnos below the tombstone's are hidden.
        assert!(set.is_suppressed(5));
        assert!(!set.is_suppressed(10));
        assert!(!set.is_suppressed(15));
    }

    #[test]
    fn forward_expire_at_end() {
        let mut set = ActiveTombstoneSet::new();
        set.activate(&rt(b"a", b"m", 10), 100);
        assert!(set.is_suppressed(5));
        set.expire_until(b"m"); // key == end, tombstone expires
        assert!(!set.is_suppressed(5));
    }

    #[test]
    fn forward_expire_past_end() {
        let mut set = ActiveTombstoneSet::new();
        set.activate(&rt(b"a", b"m", 10), 100);
        set.expire_until(b"z");
        assert!(set.is_empty());
    }

    #[test]
    fn forward_not_expired_before_end() {
        let mut set = ActiveTombstoneSet::new();
        set.activate(&rt(b"a", b"m", 10), 100);
        set.expire_until(b"l");
        assert!(set.is_suppressed(5)); // still active
    }

    #[test]
    fn forward_invisible_tombstone_not_activated() {
        let mut set = ActiveTombstoneSet::new();
        set.activate(&rt(b"a", b"m", 10), 5); // seqno 10 > cutoff 5
        assert!(!set.is_suppressed(1));
        assert!(set.is_empty());
    }

    #[test]
    fn forward_multiple_tombstones_max_seqno() {
        let mut set = ActiveTombstoneSet::new();
        set.activate(&rt(b"a", b"m", 10), 100);
        set.activate(&rt(b"b", b"n", 20), 100);
        assert_eq!(set.max_active_seqno(), Some(20));
        assert!(set.is_suppressed(15)); // 15 < 20
    }

    #[test]
    fn forward_duplicate_end_seqno_accounting() {
        let mut set = ActiveTombstoneSet::new();
        set.activate(&rt(b"a", b"m", 10), 100);
        set.activate(&rt(b"b", b"m", 10), 100);
        assert_eq!(set.max_active_seqno(), Some(10));

        // Both entries share end `m`, so a single call retires both.
        set.expire_until(b"m");
        assert_eq!(set.max_active_seqno(), None);
        assert!(set.is_empty());
    }

    #[test]
    fn forward_duplicate_seqno_partial_expiry() {
        let mut set = ActiveTombstoneSet::new();
        set.activate(&rt(b"a", b"d", 10), 100);
        set.activate(&rt(b"a", b"m", 10), 100);

        // Retiring [a,d) must only decrement the shared seqno's count; the
        // seqno stays active for as long as [a,m) does.
        set.expire_until(b"e");
        assert_eq!(set.max_active_seqno(), Some(10));
        assert!(set.is_suppressed(5));

        set.expire_until(b"m");
        assert!(set.is_empty());
    }

    #[test]
    fn forward_initialize_from() {
        let mut set = ActiveTombstoneSet::new();
        set.initialize_from(vec![(rt(b"a", b"m", 10), 100), (rt(b"b", b"z", 20), 100)]);
        assert_eq!(set.max_active_seqno(), Some(20));
    }

    #[test]
    fn forward_initialize_and_expire() {
        let mut set = ActiveTombstoneSet::new();
        set.initialize_from(vec![(rt(b"a", b"d", 10), 100), (rt(b"b", b"f", 20), 100)]);
        set.expire_until(b"e"); // expires [a,d) but not [b,f)
        assert_eq!(set.max_active_seqno(), Some(20));
        set.expire_until(b"f"); // expires [b,f)
        assert!(set.is_empty());
    }

    #[test]
    fn forward_mixed_cutoffs_activates_only_visible_rt() {
        let mut set = ActiveTombstoneSet::new();
        // RT from source with cutoff 15 — visible (10 < 15)
        set.activate(&rt(b"a", b"m", 10), 15);
        // RT from source with cutoff 5 — NOT visible (10 >= 5)
        set.activate(&rt(b"a", b"z", 10), 5);
        assert_eq!(set.max_active_seqno(), Some(10));
        assert!(!set.is_empty());

        // Expire at the first RT's end; the set should now be empty if the
        // second RT was never incorrectly activated.
        set.expire_until(b"m");
        assert!(set.is_empty());
    }

    #[test]
    fn forward_expire_narrower_tombstone_before_wider_one() {
        let mut set = ActiveTombstoneSet::new();
        // The wider tombstone is activated first, so the narrower one has to
        // be queued behind it to sit at the expiring tail.
        set.activate(&rt(b"\x00", b"\x06", 3), 100);
        set.activate(&rt(b"\x00", b"\x01", 5), 100);

        assert_eq!(set.max_active_seqno(), Some(5));
        set.expire_until(b"\x02");

        assert_eq!(set.max_active_seqno(), Some(3));
        assert!(!set.is_suppressed(4));
        assert!(set.is_suppressed(2));
    }

    // ──── Reverse tests ────

    #[test]
    fn reverse_activate_and_suppress() {
        let mut set = ActiveTombstoneSetReverse::new();
        set.activate(&rt(b"a", b"m", 10), 100);
        assert!(set.is_suppressed(5));
        assert!(!set.is_suppressed(10));
    }

    #[test]
    fn reverse_expire_before_start() {
        let mut set = ActiveTombstoneSetReverse::new();
        set.activate(&rt(b"d", b"m", 10), 100);

        set.expire_until(b"c");
        assert!(set.is_empty());
    }

    #[test]
    fn reverse_initialize_from() {
        let mut set = ActiveTombstoneSetReverse::new();
        set.initialize_from(vec![(rt(b"a", b"m", 10), 100), (rt(b"b", b"z", 20), 100)]);
        assert_eq!(set.max_active_seqno(), Some(20));
    }

    #[test]
    fn reverse_not_expired_at_start() {
        let mut set = ActiveTombstoneSetReverse::new();
        set.activate(&rt(b"d", b"m", 10), 100);

        // `start` is inclusive, so the tombstone still covers key == start.
        set.expire_until(b"d");
        assert!(set.is_suppressed(5));
    }

    #[test]
    fn reverse_invisible_tombstone_not_activated() {
        let mut set = ActiveTombstoneSetReverse::new();
        set.activate(&rt(b"a", b"m", 10), 5);
        assert!(set.is_empty());
    }

    #[test]
    fn reverse_duplicate_start_seqno_accounting() {
        let mut set = ActiveTombstoneSetReverse::new();
        set.activate(&rt(b"d", b"m", 10), 100);
        set.activate(&rt(b"d", b"n", 10), 100);
        assert_eq!(set.max_active_seqno(), Some(10));

        // Both entries share start `d`, so a single call retires both.
        set.expire_until(b"c");
        assert_eq!(set.max_active_seqno(), None);
        assert!(set.is_empty());
    }

    #[test]
    fn reverse_multiple_tombstones() {
        let mut set = ActiveTombstoneSetReverse::new();
        set.activate(&rt(b"a", b"m", 10), 100);
        set.activate(&rt(b"f", b"z", 20), 100);
        assert_eq!(set.max_active_seqno(), Some(20));

        set.expire_until(b"e");
        assert_eq!(set.max_active_seqno(), Some(10));
    }

    #[test]
    fn reverse_expire_later_start_before_earlier_one() {
        let mut set = ActiveTombstoneSetReverse::new();
        // The later-starting tombstone is activated first, so the
        // earlier-starting one has to be queued in front of it.
        set.activate(&rt(b"\x05", b"\x09", 5), 100);
        set.activate(&rt(b"\x00", b"\x09", 3), 100);

        assert_eq!(set.max_active_seqno(), Some(5));
        set.expire_until(b"\x04");

        assert_eq!(set.max_active_seqno(), Some(3));
        assert!(!set.is_suppressed(4));
        assert!(set.is_suppressed(2));
    }

    #[test]
    fn reverse_mixed_cutoffs_activates_only_visible_rt() {
        let mut set = ActiveTombstoneSetReverse::new();
        // RT from source with cutoff 15 — visible (10 < 15)
        set.activate(&rt(b"n", b"z", 10), 15);
        // RT from source with cutoff 5 — NOT visible (10 >= 5)
        set.activate(&rt(b"a", b"m", 10), 5);
        assert_eq!(set.max_active_seqno(), Some(10));

        // Advance expiry past the visible tombstone's start but not the
        // invisible one's.  If only the visible RT was activated, the set
        // should become empty.
        set.expire_until(b"l");
        assert_eq!(set.max_active_seqno(), None);
        assert!(set.is_empty());
    }
}