coordinode-lsm-tree 5.3.0

Embedded LSM-tree storage engine: BuRR filters, zstd dictionary compression, MVCC, range tombstones, merge operators, K/V separation, AES-256-GCM at rest.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
// SPDX-License-Identifier: Apache-2.0
// Copyright (c) 2026-present, Structured World Foundation

//! Compaction strategy that rewrites SSTs flagged for ECC self-healing.
//!
//! A block read that recovers its payload from Page-ECC parity returns correct
//! bytes but leaves the fault on disk (SSTs are immutable, so the block cannot
//! be patched). The read path records the owning SST in the tree's
//! [`HealHints`]; this strategy claims one such SST per pass and emits a
//! single-table [`Merge`](super::Choice::Merge) back into its own level. The
//! merge re-reads the block (correcting it once more on the way in) and writes a
//! fresh SST with newly computed parity, so subsequent reads need no correction.
//!
//! Run it repeatedly (from the compaction loop, leader-only in a clustered
//! deployment) until [`HealHints::is_empty`] reports the queue drained.

use super::{Choice, CompactionStrategy, Input as CompactionInput};
use crate::{
    HashSet, compaction::state::CompactionState, config::Config, heal_hints::HealHints,
    version::Version,
};
use alloc::sync::Arc;

/// Identifier this strategy reports through [`CompactionStrategy::get_name`];
/// useful for matching heal passes in logs and metrics.
pub const NAME: &str = "EccHealCompaction";

/// Compaction strategy that heals at most one ECC-flagged SST per invocation.
///
/// A heal rewrites the flagged table back into its own level, replacing the
/// block that needed parity correction with freshly encoded data and parity.
/// The flagged tables come from a [`HealHints`] queue shared with the tree;
/// obtain that handle from [`Tree::heal_hints`](crate::Tree::heal_hints).
pub struct Strategy {
    /// Upper bound on the table size of the rewritten output run.
    target_size: u64,
    /// Shared queue of SSTs awaiting a parity-clearing rewrite.
    hints: Arc<HealHints>,
}

impl Strategy {
    /// Creates a heal strategy that drains the shared `hints` queue.
    ///
    /// `target_size` bounds the table size of the rewritten output run. Pass
    /// the level's target size, or [`u64::MAX`] to keep every healed SST as a
    /// single table.
    #[must_use]
    pub fn new(hints: Arc<HealHints>, target_size: u64) -> Self {
        Self { target_size, hints }
    }
}

impl CompactionStrategy for Strategy {
    fn get_name(&self) -> &'static str {
        NAME
    }

    /// Claims the first flagged SST that can be rewritten right now.
    ///
    /// Hints are drained in the order [`HealHints::pop`] yields them:
    /// - an id no longer present in `version` (compacted away since it was
    ///   flagged) is dropped, because nothing is left to heal for it;
    /// - an id whose table is hidden by another running compaction is set
    ///   aside and re-queued before returning, so a later pass retries it;
    /// - the first id that is present and free becomes a single-table
    ///   [`Choice::Merge`] back into its own level.
    ///
    /// Busy ids are re-queued only after the scan ends. One long-running
    /// compaction therefore cannot block healing of other flagged SSTs, and
    /// the loop never re-pops an id it has just put back.
    fn choose(&self, version: &Version, _cfg: &Config, state: &CompactionState) -> Choice {
        // Ids skipped in this pass because another compaction owns their table.
        let mut busy = alloc::vec::Vec::new();

        let choice = loop {
            let Some(global_id) = self.hints.pop() else {
                // Queue exhausted without finding a healable table.
                break Choice::DoNothing;
            };
            let table_id = global_id.table_id();

            let Some(level_idx) = version
                .iter_levels()
                .position(|level| level.list_ids().contains(&table_id))
            else {
                // Gone — nothing left to heal for this id.
                continue;
            };

            if state.hidden_set().is_hidden(table_id) {
                // Busy in another compaction; keep scanning and retry it on a
                // later pass (re-queued below, after the scan).
                busy.push(global_id);
                continue;
            }

            #[expect(
                clippy::cast_possible_truncation,
                reason = "level index is bounded by level_count, which is a u8"
            )]
            let level = level_idx as u8;

            // Rewrite the single table into its own level; the merge re-reads
            // (and re-corrects) the faulty block and emits fresh parity.
            break Choice::Merge(CompactionInput {
                table_ids: core::iter::once(table_id).collect::<HashSet<_>>(),
                dest_level: level,
                canonical_level: level,
                target_size: self.target_size,
            });
        };

        // Hand busy ids back so they are healed once their compaction finishes.
        for global_id in busy {
            self.hints.record(global_id);
        }

        choice
    }
}

#[cfg(test)]
mod strategy_tests {
    use crate::{AbstractTree, AnyTree, Config, GlobalTableId, SequenceNumberCounter};
    use alloc::sync::Arc;

    /// A hint naming a table that has since left the tree must be discarded.
    /// The strategy drains the stale id and picks no work, instead of emitting
    /// a merge for a table that no longer exists.
    #[test]
    fn ecc_heal_drops_ids_no_longer_in_the_tree() -> crate::Result<()> {
        let tmp = tempfile::tempdir()?;
        let opened = Config::new(
            tmp.path(),
            SequenceNumberCounter::default(),
            SequenceNumberCounter::default(),
        )
        .open()?;
        let AnyTree::Standard(tree) = opened else {
            unreachable!("standard tree configured (no kv separation)");
        };

        // One real table on disk, so the version is not empty.
        tree.insert("k", "v", 1);
        tree.flush_active_memtable(1)?;

        // Queue a table id that this tree never allocated.
        let missing_id = 999_999;
        let hints = tree.heal_hints();
        hints.record(GlobalTableId::from((tree.id(), missing_id)));
        assert!(!hints.is_empty());

        let strategy = Arc::new(super::Strategy::new(Arc::clone(&hints), u64::MAX));
        let result = tree.compact(strategy, 0)?;
        assert!(
            hints.is_empty(),
            "a stale id must be drained, not left queued; got {result:?}",
        );
        Ok(())
    }
}

// End-to-end ECC self-heal tests. Each test writes an SST, flips bytes inside
// its first data block on disk, reopens the tree, and checks the read-path
// recovery and healing bookkeeping. Compiled only with `page_ecc`; individual
// tests are further gated on `metrics` / `zstd`.
#[cfg(all(test, feature = "page_ecc"))]
mod tests {
    use crate::{
        AbstractTree,
        Config,
        MAX_SEQNO,
        SequenceNumberCounter,
        runtime_config::EccScheme,
        // `BlockIndex` is imported only for its `.iter()` method on
        // `table.block_index` (a trait method); `as _` keeps it in scope for
        // method resolution without binding the unused type name.
        table::{block::Header, block_index::BlockIndex as _},
    };
    use alloc::sync::Arc;

    /// Full self-heal cycle: a read that recovers a corrupted data block via RS
    /// parity records the SST; running [`super::Strategy`] rewrites that SST so
    /// the re-read is clean and records no further hint.
    #[test]
    fn ecc_heal_strategy_rewrites_flagged_sst_so_reread_is_clean() -> crate::Result<()> {
        let dir = tempfile::tempdir()?;

        // Write one SST whose data blocks carry RS(8,2) parity, then capture the
        // first data block's on-disk offset before dropping the tree.
        let (sst_path, corrupt_pos) = {
            let tree = Config::new(
                dir.path(),
                SequenceNumberCounter::default(),
                SequenceNumberCounter::default(),
            )
            .page_ecc(true)
            .ecc_scheme(EccScheme::ReedSolomon {
                data_shards: 8,
                parity_shards: 2,
            })
            .open()?;
            let crate::AnyTree::Standard(tree) = tree else {
                unreachable!("standard tree configured (no kv separation)");
            };
            for i in 0u64..2_000 {
                tree.insert(format!("key-{i:06}"), format!("v{i:06}"), i);
            }
            tree.flush_active_memtable(2_000)?;

            let binding = tree.version_history.read().latest_version();
            #[expect(clippy::expect_used, reason = "flush produced exactly one table")]
            let table = binding.version.iter_tables().next().expect("one table");
            // NOTE(review): assumes the block index's first entry is the first
            // data block containing "key-000000" — confirm ordering.
            #[expect(clippy::expect_used, reason = "table has at least one data block")]
            let keyed = table.block_index.iter().next().expect("a data block")?;
            // Target-conditional truncation: `as usize` only narrows on 32-bit
            // pointer widths, so `allow` (not `expect`) keeps it clean on the
            // 64-bit host where clippy frames it as a portability note.
            #[allow(
                clippy::cast_possible_truncation,
                reason = "in-file block offset fits usize; only narrows on 32-bit targets"
            )]
            let off = keyed.offset().0 as usize;
            // Aim 3 bytes past `Header::MIN_LEN`, presumably inside the block
            // payload (assumes MIN_LEN is the header's on-disk size — confirm).
            ((*table.path).clone(), off + Header::MIN_LEN + 3)
        };

        // Flip one payload byte of the first data block (RS-correctable).
        // XOR with 0x80 toggles a single bit (the byte's MSB).
        let mut bytes = std::fs::read(&sst_path)?;
        #[expect(
            clippy::indexing_slicing,
            reason = "corrupt_pos is an in-file block offset, in range for the SST bytes"
        )]
        let slot = &mut bytes[corrupt_pos];
        *slot ^= 0x80;
        std::fs::write(&sst_path, &bytes)?;

        // Reopen (fresh caches + fds) so the read hits the tampered bytes.
        let tree = Config::new(
            dir.path(),
            SequenceNumberCounter::default(),
            SequenceNumberCounter::default(),
        )
        .page_ecc(true)
        .ecc_scheme(EccScheme::ReedSolomon {
            data_shards: 8,
            parity_shards: 2,
        })
        .open()?;
        let crate::AnyTree::Standard(tree) = tree else {
            unreachable!("standard tree configured (no kv separation)");
        };
        assert!(tree.heal_hints().is_empty(), "fresh tree has no hints");

        // Opt into rewrite scheduling (default off). The gate tracks auto_heal.
        assert!(!tree.heal_hints().is_enabled(), "auto_heal defaults off");
        tree.update_runtime_config(|c| c.auto_heal = true)?;
        assert!(
            tree.heal_hints().is_enabled(),
            "auto_heal toggle syncs the gate"
        );

        // A read of a key in the corrupted block repairs it (correct value) and
        // records the SST for healing.
        #[expect(clippy::expect_used, reason = "key was inserted before flush")]
        let got = tree.get(b"key-000000", MAX_SEQNO)?.expect("key present");
        assert_eq!(&*got, b"v000000", "ECC must repair the value on read");
        assert!(
            !tree.heal_hints().is_empty(),
            "a persistent ECC correction must record a heal hint",
        );
        #[cfg(feature = "metrics")]
        {
            assert_eq!(
                tree.metrics().ecc_auto_heal_scheduled_count(),
                1,
                "the scheduled SST is counted once",
            );
            // The recovery is attributed to the RS shard path (this SST uses an
            // RS scheme), not SEC-DED, and counted once on the primary read.
            assert_eq!(
                tree.metrics().ecc_shard_recovered_count(),
                1,
                "the RS recovery is counted once",
            );
            assert_eq!(
                tree.metrics().ecc_secded_corrected_count(),
                0,
                "an RS recovery must not increment the SEC-DED counter",
            );
            assert_eq!(
                tree.metrics().ecc_recovered_count(),
                1,
                "one total recovery"
            );
        }

        // Run the heal strategy: it claims the SST and rewrites it clean.
        let result = tree.compact(
            Arc::new(super::Strategy::new(tree.heal_hints(), u64::MAX)),
            0,
        )?;
        assert!(
            tree.heal_hints().is_empty(),
            "heal compaction must drain the hint queue, got {result:?}",
        );

        // The healed SST reads clean: correct value, no fresh correction hint.
        #[expect(clippy::expect_used, reason = "key survives the rewrite")]
        let got = tree
            .get(b"key-000000", MAX_SEQNO)?
            .expect("key present after heal");
        assert_eq!(&*got, b"v000000", "healed value must still be correct");
        assert!(
            tree.heal_hints().is_empty(),
            "the rewritten SST must read clean (no further correction)",
        );

        Ok(())
    }

    /// End-to-end SEC-DED recovery through a tree read: a single-bit flip in a
    /// SEC-DED-protected SST is healed on read by the SEC-DED fast path and
    /// attributed to the SEC-DED counter (NOT the RS shard counter), proving the
    /// unified recovery metric distinguishes the two heal mechanisms. Gated on
    /// `metrics`: the counter is the whole point.
    #[cfg(feature = "metrics")]
    #[test]
    fn read_healing_single_bit_increments_secded_counter() -> crate::Result<()> {
        let dir = tempfile::tempdir()?;

        // Write one SST whose data blocks carry SEC-DED parity, capture the
        // first data block's on-disk offset.
        let (sst_path, corrupt_pos) = {
            let crate::AnyTree::Standard(tree) = Config::new(
                dir.path(),
                SequenceNumberCounter::default(),
                SequenceNumberCounter::default(),
            )
            .page_ecc(true)
            .ecc_scheme(EccScheme::Secded)
            .open()?
            else {
                unreachable!("standard tree configured (no kv separation)");
            };
            for i in 0u64..2_000 {
                tree.insert(format!("key-{i:06}"), format!("v{i:06}"), i);
            }
            tree.flush_active_memtable(2_000)?;

            let binding = tree.version_history.read().latest_version();
            #[expect(clippy::expect_used, reason = "flush produced exactly one table")]
            let table = binding.version.iter_tables().next().expect("one table");
            #[expect(clippy::expect_used, reason = "table has at least one data block")]
            let keyed = table.block_index.iter().next().expect("a data block")?;
            // Target-conditional truncation: `as usize` only narrows on 32-bit
            // pointer widths, so `allow` (not `expect`) keeps it clean on the
            // 64-bit host.
            #[allow(
                clippy::cast_possible_truncation,
                reason = "in-file block offset fits usize; only narrows on 32-bit targets"
            )]
            let off = keyed.offset().0 as usize;
            // Same target as the RS test: 3 bytes past `Header::MIN_LEN`,
            // presumably inside the block payload — confirm.
            ((*table.path).clone(), off + Header::MIN_LEN + 3)
        };

        // Flip a SINGLE bit: the SEC-DED fast path corrects one bit per word.
        let mut bytes = std::fs::read(&sst_path)?;
        {
            #[expect(
                clippy::expect_used,
                reason = "corrupt_pos is an in-file block offset, in range for the SST bytes"
            )]
            let slot = bytes.get_mut(corrupt_pos).expect("corrupt_pos in range");
            *slot ^= 0x01;
        }
        std::fs::write(&sst_path, &bytes)?;

        // Reopen (fresh caches + fds) so the read hits the tampered bytes.
        let crate::AnyTree::Standard(tree) = Config::new(
            dir.path(),
            SequenceNumberCounter::default(),
            SequenceNumberCounter::default(),
        )
        .page_ecc(true)
        .ecc_scheme(EccScheme::Secded)
        .open()?
        else {
            unreachable!("standard tree configured (no kv separation)");
        };

        // Unlike the RS test, auto_heal is left at its default here: only the
        // read-path recovery counters are asserted.
        #[expect(clippy::expect_used, reason = "key was inserted before flush")]
        let got = tree.get(b"key-000000", MAX_SEQNO)?.expect("key present");
        assert_eq!(&*got, b"v000000", "SEC-DED must heal the single-bit flip");

        assert_eq!(
            tree.metrics().ecc_secded_corrected_count(),
            1,
            "the SEC-DED heal is counted once",
        );
        assert_eq!(
            tree.metrics().ecc_shard_recovered_count(),
            0,
            "a SEC-DED heal must not increment the RS shard counter",
        );
        assert_eq!(
            tree.metrics().ecc_recovered_count(),
            1,
            "one total recovery"
        );
        Ok(())
    }

    /// The zstd partial-decode read path also schedules auto-heal: a bounded
    /// range scan that takes the partial path over a corrupted large block
    /// recovers via parity and flags the SST, just like the full load path.
    #[cfg(feature = "zstd")]
    #[test]
    fn ecc_heal_scheduled_on_partial_decode_corrected_read() -> crate::Result<()> {
        use crate::{
            CompressionType,
            config::{BlockSizePolicy, CompressionPolicy},
        };

        // NOTE(review): under plain `cargo test` all tests share one process and
        // run on parallel threads, so this process-global `set_var` can race with
        // env reads in other tests, and the variable is never unset afterwards.
        // This relies on the suite being run under nextest — confirm in CI.
        // Opt into the partial-decode path for this test process (OnceLock-cached;
        // nextest isolates each test in its own process so this does not leak).
        // SAFETY: set before any tree in this process reads the env.
        unsafe { std::env::set_var("LSM_PARTIAL_DECODE", "1") };

        let dir = tempfile::tempdir()?;
        // Both the writer and the re-opened reader must use identical ECC,
        // compression and block-size settings.
        let open = || {
            Config::new(
                dir.path(),
                SequenceNumberCounter::default(),
                SequenceNumberCounter::default(),
            )
            .page_ecc(true)
            .ecc_scheme(EccScheme::ReedSolomon {
                data_shards: 8,
                parity_shards: 2,
            })
            .data_block_compression_policy(CompressionPolicy::all(
                #[expect(clippy::expect_used, reason = "19 is a valid zstd level")]
                CompressionType::zstd(19).expect("valid level"),
            ))
            .data_block_size_policy(BlockSizePolicy::all(512 * 1024))
            .open()
        };

        // Build a large multi-inner-block zstd+ECC SST; capture the first data
        // block's on-disk offset before dropping the tree.
        let (sst_path, corrupt_pos) = {
            let crate::AnyTree::Standard(tree) = open()? else {
                unreachable!("standard tree configured");
            };
            for i in 0u64..20_000 {
                tree.insert(
                    format!("key-{i:08}"),
                    format!("value-{i:08}-padding-padding"),
                    0,
                );
            }
            tree.flush_active_memtable(0)?;

            let binding = tree.version_history.read().latest_version();
            #[expect(clippy::expect_used, reason = "flush produced exactly one table")]
            let table = binding.version.iter_tables().next().expect("one table");
            #[expect(clippy::expect_used, reason = "table has at least one data block")]
            let keyed = table.block_index.iter().next().expect("a data block")?;
            // Target-conditional truncation: `as usize` only narrows on 32-bit
            // pointer widths, so `allow` (not `expect`) keeps it clean on the
            // 64-bit host where clippy frames it as a portability note.
            #[allow(
                clippy::cast_possible_truncation,
                reason = "in-file block offset fits usize; only narrows on 32-bit targets"
            )]
            let off = keyed.offset().0 as usize;
            // 8 bytes past `Header::MIN_LEN`: presumably inside the compressed
            // frame of the first block — confirm against the block layout.
            ((*table.path).clone(), off + Header::MIN_LEN + 8)
        };

        // Flip one byte of the first block's compressed frame (RS-correctable).
        let mut bytes = std::fs::read(&sst_path)?;
        #[expect(
            clippy::indexing_slicing,
            reason = "corrupt_pos is an in-file block offset, in range for the SST bytes"
        )]
        let slot = &mut bytes[corrupt_pos];
        *slot ^= 0x01;
        std::fs::write(&sst_path, &bytes)?;

        // Reopen (fresh caches/fds), opt into healing, then run a bounded range
        // scan whose upper bound falls inside the first block so the read takes
        // the partial-decode path.
        let crate::AnyTree::Standard(tree) = open()? else {
            unreachable!("standard tree configured");
        };
        tree.update_runtime_config(|c| c.auto_heal = true)?;

        let count = tree
            .range(
                b"key-00000000".to_vec()..b"key-00000050".to_vec(),
                MAX_SEQNO,
                None,
            )
            .count();
        assert!(count > 0, "bounded range returned rows");
        // Only the scheduling side is checked here; the rewrite itself is
        // covered by `ecc_heal_strategy_rewrites_flagged_sst_so_reread_is_clean`.
        assert!(
            !tree.heal_hints().is_empty(),
            "a corrected read on the partial-decode path must schedule healing",
        );

        Ok(())
    }
}