Skip to main content

lsm_tree/
verify.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2024-present, fjall-rs
3// Copyright (c) 2026-present, Structured World Foundation
4
5use crate::path::{Path, PathBuf};
6use crate::{checksum::Checksum, coding::Decode, io, table::TableId, table::block::Header};
7#[cfg(not(feature = "std"))]
8use alloc::{boxed::Box, string::String, vec::Vec};
9
/// One integrity problem discovered while verifying files on disk.
///
/// The full-file integrity path (hashing entire files by path) goes straight
/// through `std::fs` and is therefore only available with `std`. Under
/// `no_std`, verification is block-level over the injected
/// [`Fs`](crate::fs::Fs) backend (see [`verify_block_checksums`]).
#[cfg(feature = "std")]
#[derive(Debug)]
#[non_exhaustive]
pub enum IntegrityError {
    /// The full-file checksum of an SST table does not match.
    SstFileCorrupted {
        /// ID of the affected table
        table_id: TableId,
        /// Location of the corrupted file
        path: PathBuf,
        /// Checksum recorded in the manifest
        expected: Checksum,
        /// Checksum calculated from the bytes on disk
        got: Checksum,
    },

    /// The full-file checksum of a blob file does not match.
    BlobFileCorrupted {
        /// ID of the affected blob file
        blob_file_id: u64,
        /// Location of the corrupted file
        path: PathBuf,
        /// Checksum recorded in the manifest
        expected: Checksum,
        /// Checksum calculated from the bytes on disk
        got: Checksum,
    },

    /// A file could not be read during verification because of an I/O error.
    IoError {
        /// Location of the file that failed to read
        path: PathBuf,
        /// The I/O error that was raised
        error: io::Error,
    },
}
51
#[cfg(feature = "std")]
impl core::fmt::Display for IntegrityError {
    /// Renders a one-line, human-readable description of the error,
    /// including the affected file's path.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        match self {
            Self::SstFileCorrupted {
                table_id,
                path,
                expected,
                got,
            } => f.write_fmt(format_args!(
                "SST table {table_id} corrupted at {}: expected {expected}, got {got}",
                path.display()
            )),
            Self::BlobFileCorrupted {
                blob_file_id,
                path,
                expected,
                got,
            } => f.write_fmt(format_args!(
                "blob file {blob_file_id} corrupted at {}: expected {expected}, got {got}",
                path.display()
            )),
            Self::IoError { path, error } => f.write_fmt(format_args!(
                "I/O error reading {}: {}",
                path.display(),
                error
            )),
        }
    }
}
82
#[cfg(feature = "std")]
impl core::error::Error for IntegrityError {
    /// Exposes the wrapped I/O error for [`IntegrityError::IoError`]; the
    /// checksum-mismatch variants have no underlying cause.
    fn source(&self) -> Option<&(dyn core::error::Error + 'static)> {
        if let Self::IoError { error, .. } = self {
            Some(error)
        } else {
            None
        }
    }
}
92
/// Outcome of an integrity verification scan.
///
/// Both `sst_files_checked` and `blob_files_checked` count files that were
/// *attempted*, so a file that failed with an I/O error is still counted.
/// Callers can therefore reconcile the totals against the manifest even if
/// some files could not be read.
#[cfg(feature = "std")]
#[derive(Debug)]
#[non_exhaustive]
pub struct IntegrityReport {
    /// How many SST table files were checked (I/O failures included).
    pub sst_files_checked: usize,

    /// How many blob files were checked (I/O failures included).
    pub blob_files_checked: usize,

    /// Every integrity error the scan came across.
    pub errors: Vec<IntegrityError>,
}
112
#[cfg(feature = "std")]
impl IntegrityReport {
    /// Returns `true` when the scan recorded no errors at all.
    #[must_use]
    pub fn is_ok(&self) -> bool {
        let Self { errors, .. } = self;
        errors.is_empty()
    }

    /// Combined count of checked files: SST tables plus blob files.
    #[must_use]
    pub fn files_checked(&self) -> usize {
        let Self {
            sst_files_checked,
            blob_files_checked,
            ..
        } = self;
        sst_files_checked + blob_files_checked
    }
}
127
/// Hashes a file with streaming XXH3 (128-bit), reading it in fixed-size
/// chunks so the whole file never has to sit in memory at once.
///
/// `pub(crate)` so [`crate::salvage`] can stamp the salvaged-source open with
/// the file's current digest (the source may be corrupt, so its digest is
/// whatever bytes are on disk; per-block checksums catch the actual damage).
///
/// # Errors
///
/// Returns the I/O error from opening the file or from any read other than
/// an `Interrupted` one (those are retried).
#[cfg(feature = "std")]
pub(crate) fn stream_checksum(path: &std::path::Path) -> std::io::Result<Checksum> {
    use std::io::{ErrorKind, Read};

    /// Size of the scratch buffer handed to each `read` call.
    const CHUNK_SIZE: usize = 64 * 1024;

    let mut file = std::fs::File::open(path)?;
    let mut state = xxhash_rust::xxh3::Xxh3Default::new();
    let mut scratch = vec![0u8; CHUNK_SIZE];

    loop {
        match file.read(&mut scratch) {
            // A zero-length read ends the stream: finalize the digest.
            Ok(0) => return Ok(Checksum::from_raw(state.digest128())),
            Ok(filled) => {
                // `Read::read` guarantees `filled <= scratch.len()`, so the
                // `.get(..filled)` lookup always yields `Some`. It is used in
                // place of direct slicing to satisfy the crate-wide
                // `#[deny(clippy::indexing_slicing)]` lint.
                if let Some(chunk) = scratch.get(..filled) {
                    state.update(chunk);
                }
            }
            // An interrupted read carries no data; simply try again.
            Err(e) if e.kind() == ErrorKind::Interrupted => {}
            Err(e) => return Err(e),
        }
    }
}
160
/// Checks the full-file checksum of every SST and blob file in the tree.
///
/// The content of each file is read back from disk, hashed with XXHash-3
/// (128-bit), and the digest is compared with the checksum recorded in the
/// version manifest.
///
/// This detects silent bit-rot, partial writes, and other on-disk corruption.
///
/// Failures for individual files (an unreadable file, a checksum mismatch)
/// are gathered in [`IntegrityReport::errors`]; the scan never stops early.
#[cfg(feature = "std")]
#[must_use]
pub fn verify_integrity(tree: &impl crate::AbstractTree) -> IntegrityReport {
    let version = tree.current_version();

    let mut report = IntegrityReport {
        sst_files_checked: 0,
        blob_files_checked: 0,
        errors: Vec::new(),
    };

    // Pass 1: SST table files.
    for table in version.iter_tables() {
        let expected = table.checksum();

        let finding = match stream_checksum(&*table.path) {
            Ok(got) if got == expected => None,
            Ok(got) => Some(IntegrityError::SstFileCorrupted {
                table_id: table.id(),
                path: (*table.path).clone(),
                expected,
                got,
            }),
            Err(e) => Some(IntegrityError::IoError {
                path: (*table.path).clone(),
                error: e.into(),
            }),
        };

        // `Option` is an iterator of zero or one items.
        report.errors.extend(finding);
        // Counted whether or not the file verified (or was even readable).
        report.sst_files_checked += 1;
    }

    // Pass 2: blob files.
    for blob_file in version.blob_files.iter() {
        let path = blob_file.path();
        let expected = blob_file.checksum();

        let finding = match stream_checksum(path) {
            Ok(got) if got == expected => None,
            Ok(got) => Some(IntegrityError::BlobFileCorrupted {
                blob_file_id: blob_file.id(),
                path: path.to_path_buf(),
                expected,
                got,
            }),
            Err(e) => Some(IntegrityError::IoError {
                path: path.to_path_buf(),
                error: e.into(),
            }),
        };

        report.errors.extend(finding);
        report.blob_files_checked += 1;
    }

    report
}
235
236// ── Block-level scrub ─────────────────────────────────────────────────────
237// `verify_integrity` above hashes each SST as one opaque byte stream and
238// compares the digest to the per-file checksum stored in the manifest. That
239// catches whole-file corruption but identifies the bad region only at file
240// granularity. The functions below walk every block inside every SST and
241// verify per-block XXH3 against the value embedded in each block's own
242// header, so a corrupt block can be reported with its exact `(file, offset)`
243// without re-running the manifest-level scan.
244
245/// Per-block verification error.
246#[derive(Debug)]
247#[non_exhaustive]
248pub enum BlockVerifyError {
249    /// SST file could not be opened or its trailer parsed.
250    SstFileUnreadable {
251        /// Table ID.
252        table_id: TableId,
253        /// Path to the SST file.
254        path: PathBuf,
255        /// Underlying I/O / format error.
256        error: io::Error,
257    },
258
259    /// A block header at the given offset failed to parse — either
260    /// XXH3 mismatch on the header itself, or invalid magic bytes /
261    /// length fields that point at on-disk corruption.
262    HeaderCorrupted {
263        /// Table ID.
264        table_id: TableId,
265        /// Path to the SST file.
266        path: PathBuf,
267        /// File offset where the corrupt header was read from.
268        offset: u64,
269        /// Short description of the failure surfaced by header decoding.
270        reason: String,
271    },
272
273    /// A block's data XXH3 did not match the value stored in its header.
274    /// Indicates bit-rot or torn write on the block payload.
275    DataCorrupted {
276        /// Table ID.
277        table_id: TableId,
278        /// Path to the SST file.
279        path: PathBuf,
280        /// File offset where the block header sits (the data follows it).
281        offset: u64,
282        /// Length of the on-disk data segment, in bytes.
283        data_length: u32,
284        /// Checksum stored in the block header.
285        expected: Checksum,
286        /// Checksum computed from the on-disk bytes.
287        got: Checksum,
288    },
289
290    /// The block header was successfully decoded (its own XXH3
291    /// matched) but the subsequent fixed-length read of the data
292    /// segment failed at the filesystem layer — truncated file,
293    /// unexpected EOF, transient I/O error. Distinct from
294    /// `HeaderCorrupted` because the header itself was clean: the
295    /// failure is on the bytes that should follow it.
296    DataReadError {
297        /// Table ID.
298        table_id: TableId,
299        /// Path to the SST file.
300        path: PathBuf,
301        /// File offset where the (clean) header sits; the read for
302        /// its data segment started at `offset + Header::header_len(block_type)`.
303        offset: u64,
304        /// Length the (clean) header advertised for the data segment.
305        data_length: u32,
306        /// Underlying I/O error from the failed data-segment read.
307        /// Kept as `std::io::Error` (matching `SstFileUnreadable`) so
308        /// `ErrorKind` / OS code stay available to callers and so
309        /// `Error::source()` produces a coherent chain.
310        error: io::Error,
311    },
312
313    /// SFA TOC-level corruption: a named section's length / position
314    /// fields are inconsistent (overflow on addition), or seeking to
315    /// its declared start offset fails before any block is read.
316    /// Distinct from `HeaderCorrupted` (which is per-block) so
317    /// callers can tell "the section catalogue itself is bad" apart
318    /// from "block N inside an otherwise-walkable section is bad" —
319    /// e.g. a `TocCorrupted` makes the whole section unreachable,
320    /// while a `HeaderCorrupted` only stops that section's walk.
321    TocCorrupted {
322        /// Table ID.
323        table_id: TableId,
324        /// Path to the SST file.
325        path: PathBuf,
326        /// Section name from the TOC entry (e.g. `b"data"`,
327        /// `b"tli"`). Stored verbatim, not lossy-decoded, because
328        /// SFA section names are byte strings.
329        section_name: Vec<u8>,
330        /// File offset where the section *would* start per the TOC
331        /// entry. Useful for forensics even when the start is
332        /// unreachable.
333        section_offset: u64,
334        /// Short description of the failure (overflow on
335        /// start+length, seek error, etc.).
336        reason: String,
337    },
338}
339
340impl core::fmt::Display for BlockVerifyError {
341    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
342        match self {
343            Self::SstFileUnreadable {
344                table_id,
345                path,
346                error,
347            } => write!(
348                f,
349                "SST table {table_id} at {} could not be opened/parsed: {error}",
350                path.display(),
351            ),
352            Self::HeaderCorrupted {
353                table_id,
354                path,
355                offset,
356                reason,
357            } => write!(
358                f,
359                "SST table {table_id} at {}: block header at offset {offset} is corrupt ({reason})",
360                path.display(),
361            ),
362            Self::DataCorrupted {
363                table_id,
364                path,
365                offset,
366                data_length,
367                expected,
368                got,
369            } => write!(
370                f,
371                "SST table {table_id} at {}: block at offset {offset} ({data_length} bytes) data \
372                 checksum mismatch, expected {expected}, got {got}",
373                path.display(),
374            ),
375            Self::DataReadError {
376                table_id,
377                path,
378                offset,
379                data_length,
380                error,
381            } => write!(
382                f,
383                "SST table {table_id} at {}: failed to read {data_length}-byte data segment for \
384                 block at offset {offset}: {error}",
385                path.display(),
386            ),
387            Self::TocCorrupted {
388                table_id,
389                path,
390                section_name,
391                section_offset,
392                reason,
393            } => write!(
394                f,
395                "SST table {table_id} at {}: TOC section {:?} at offset {section_offset} is \
396                 unreachable ({reason})",
397                path.display(),
398                String::from_utf8_lossy(section_name),
399            ),
400        }
401    }
402}
403
404impl core::error::Error for BlockVerifyError {
405    fn source(&self) -> Option<&(dyn core::error::Error + 'static)> {
406        match self {
407            Self::SstFileUnreadable { error, .. } | Self::DataReadError { error, .. } => {
408                Some(error)
409            }
410            _ => None,
411        }
412    }
413}
414
415/// A non-fatal finding from a scrub run: the data is intact, but something
416/// about a table could not be fully checked.
417#[derive(Debug)]
418#[non_exhaustive]
419pub enum BlockVerifyWarning {
420    /// The table's `descriptor#page_ecc` decodes to an ECC scheme this build
421    /// cannot apply (an unimplemented scheme, page granularity, an unknown
422    /// kind, or a non-canonical descriptor). Block payloads still verify by
423    /// their own checksums, but the parity trailer length is not derivable
424    /// from a scheme, so the sequential block walk cannot size it and ECC
425    /// verification was skipped for this table. Recompaction re-stamps the
426    /// table with a supported scheme.
427    UnrecognizedEcc {
428        /// Table the warning applies to.
429        table_id: TableId,
430        /// On-disk path of the SST.
431        path: PathBuf,
432    },
433}
434
435/// Aggregated result of a per-block scrub run.
436#[derive(Debug, Default)]
437#[non_exhaustive]
438pub struct BlockVerifyReport {
439    /// Number of SST table files visited (one per scan).
440    pub sst_files_scanned: usize,
441    /// Total blocks successfully header-read across all SSTs. Includes
442    /// blocks where the data checksum subsequently failed.
443    pub blocks_scanned: usize,
444    /// Per-block errors collected during the scan. The scan always
445    /// runs to completion across all SSTs even if individual blocks
446    /// or whole files are corrupt.
447    pub errors: Vec<BlockVerifyError>,
448    /// Non-fatal findings: data verified, but ECC could not be checked for
449    /// some tables (unrecognized scheme — recompaction recommended). Distinct
450    /// from `errors`: warnings do NOT make [`Self::is_ok`] false.
451    pub warnings: Vec<BlockVerifyWarning>,
452}
453
454impl BlockVerifyReport {
455    /// `true` if every block in every SST verified clean. Warnings (e.g. an
456    /// unrecognized ECC scheme whose data still checksum-verified) do NOT
457    /// make this false — only real corruption (`errors`) does.
458    #[must_use]
459    pub fn is_ok(&self) -> bool {
460        self.errors.is_empty()
461    }
462
463    /// `true` if the scrub produced any non-fatal warning.
464    #[must_use]
465    pub fn has_warnings(&self) -> bool {
466        !self.warnings.is_empty()
467    }
468}
469
/// Tuning knobs for the block-checksum scrubber
/// ([`verify_block_checksums_with`] / [`AbstractTree::verify_checksum_with`](crate::AbstractTree::verify_checksum_with)).
#[derive(Clone, Debug)]
pub struct VerifyOptions {
    /// How many SSTs are scanned at the same time. The value is clamped to
    /// `>= 1` and to the table count. With `1` (the default) tables are
    /// scanned sequentially in table order and no thread is spawned. Scans
    /// of different SSTs are independent of each other (each opens its own
    /// file through the table's `Fs` handle), so they parallelize cleanly.
    pub parallelism: usize,

    /// Minimum time a worker waits between finishing one SST and starting
    /// the next, which caps the I/O pressure a scrub puts on a production
    /// box. `None` (the default) runs at full speed.
    pub throttle: Option<core::time::Duration>,
}

impl Default for VerifyOptions {
    /// Sequential scan (`parallelism == 1`) with no throttle.
    fn default() -> Self {
        Self {
            throttle: None,
            parallelism: 1,
        }
    }
}

impl VerifyOptions {
    /// Builder: sets how many SSTs are scanned concurrently.
    #[must_use]
    pub const fn parallelism(self, workers: usize) -> Self {
        Self {
            parallelism: workers,
            ..self
        }
    }

    /// Builder: sets the delay each worker inserts between SSTs.
    #[must_use]
    pub const fn throttle(self, delay: core::time::Duration) -> Self {
        Self {
            throttle: Some(delay),
            ..self
        }
    }
}
510
511/// Merges a per-SST partial report into an accumulator.
512fn merge_report(dst: &mut BlockVerifyReport, src: BlockVerifyReport) {
513    dst.sst_files_scanned += src.sst_files_scanned;
514    dst.blocks_scanned += src.blocks_scanned;
515    dst.errors.extend(src.errors);
516    dst.warnings.extend(src.warnings);
517}
518
519/// Scans one SST and returns a partial report (`sst_files_scanned == 1`).
520///
521/// Self-contained per table: opens the file through the table's own `Fs`
522/// handle, sizes encryption overhead and ECC params from the table's
523/// descriptor, so it can run on its own worker thread without shared state.
524fn scan_one_table(table: &crate::table::Table) -> BlockVerifyReport {
525    let mut report = BlockVerifyReport {
526        sst_files_scanned: 1,
527        ..BlockVerifyReport::default()
528    };
529    let path: &Path = &table.path;
530    let table_id = table.id();
531
532    // Tables whose ECC descriptor decodes to a scheme this build can't apply
533    // can't have their SST-block parity trailers sized (the length isn't
534    // derivable without the scheme), so those sections are skipped with a
535    // warning rather than mis-walked. The self-describing `meta` / `meta_mid`
536    // sections are still walked (parity sized from their own `block_flags`),
537    // so corruption there is NOT downgraded. The per-block read path still
538    // serves the data (framed by data_length, checksum-verified), hence a
539    // warning, not an error.
540    let ecc_unrecognized = table.metadata.ecc_unrecognized;
541    if ecc_unrecognized {
542        log::warn!(
543            "table {table_id} at {}: unrecognized ECC scheme — skipping the \
544             ECC-dependent block sections; recompact to re-stamp with a \
545             supported scheme",
546            path.display(),
547        );
548        report.warnings.push(BlockVerifyWarning::UnrecognizedEcc {
549            table_id,
550            path: path.to_path_buf(),
551        });
552    }
553
554    // Use each Table's own `Fs` handle (StdFs, MemFs, IoUring, …).
555    // Encryption overhead is per-table (different keys / AEAD suites can attach
556    // to different SSTs), so feed each table's `max_overhead()` separately.
557    let max_enc_overhead = table.encryption.as_ref().map_or(0u32, |e| e.max_overhead());
558    match scan_sst_blocks(
559        &*table.fs,
560        path,
561        table_id,
562        max_enc_overhead,
563        table.metadata.ecc_params,
564        ecc_unrecognized,
565    ) {
566        Ok(per_file) => {
567            report.blocks_scanned += per_file.blocks_scanned;
568            report.errors.extend(per_file.errors);
569        }
570        Err(error) => {
571            report.errors.push(BlockVerifyError::SstFileUnreadable {
572                table_id,
573                path: path.to_path_buf(),
574                error,
575            });
576        }
577    }
578    report
579}
580
581/// Walks every block in every SST referenced by the tree's current
582/// version and verifies each block's XXH3 checksum.
583///
584/// Pipeline per SST:
585///
586/// 1. Open the file and parse the SFA trailer to obtain the TOC.
587/// 2. For each TOC section, skip if its name is in `RAW_FORMAT_SECTIONS`
588///    (those payloads are not `Header`-prefixed and are covered by the
589///    SFA-trailer checksum). Otherwise seek to the section's start
590///    offset and walk it as a contiguous block region in
591///    `[start, start + length)`.
592/// 3. Inside each block region, decode each block's `Header` (which
593///    validates the header's own XXH3), read the data segment, and
594///    compare a fresh XXH3 over the data against `header.checksum`.
595///    Advance by `Header::header_len(block_type) + data_length` until the
596///    section end. A corrupt header inside a section stops that
597///    section's walk and is reported; the next section is still walked.
598///
599/// This is the read-side scrub primitive: it catches the same bit-rot
600/// signal a live read would surface, ahead of time, with per-block
601/// `(file, offset)` granularity. Decompression and decryption errors
602/// are out of scope here — those depend on per-level/per-block context
603/// (compression policy, encryption key, dictionary) that the scrub
604/// path does not need to reach checksum-level corruption.
605#[must_use]
606pub fn verify_block_checksums(tree: &impl crate::AbstractTree) -> BlockVerifyReport {
607    verify_block_checksums_with(tree, &VerifyOptions::default())
608}
609
610/// Like [`verify_block_checksums`] but with configurable parallelism and
611/// throttle (see [`VerifyOptions`]).
612///
613/// With `parallelism == 1` (default) SSTs are scanned sequentially in table
614/// order. With `> 1`, up to that many worker threads pull SSTs from a shared
615/// cursor and scan them concurrently (each scan is independent — its own file
616/// handle through the table's `Fs`), then their partial reports are merged.
617/// Parallel runs report the same findings as a sequential run; only the order
618/// of `errors` / `warnings` may differ. `throttle` makes each worker pause
619/// between SSTs so a scrub does not saturate production I/O.
620#[must_use]
621pub fn verify_block_checksums_with(
622    tree: &impl crate::AbstractTree,
623    options: &VerifyOptions,
624) -> BlockVerifyReport {
625    let version = tree.current_version();
626    let tables: Vec<crate::table::Table> = version.iter_tables().cloned().collect();
627
628    // `parallelism` + `throttle` only drive the std thread-fan-out + sleep below.
629    #[cfg(not(feature = "std"))]
630    let _ = options;
631
632    // Parallel scan (std only): up to `parallelism` worker threads pull SSTs from
633    // a shared cursor and scan them concurrently. A `no_std` build has no
634    // threads, so it always takes the serial path below.
635    #[cfg(feature = "std")]
636    {
637        let workers = options.parallelism.max(1).min(tables.len().max(1));
638        if workers > 1 {
639            let cursor = core::sync::atomic::AtomicUsize::new(0);
640            let partials = std::thread::scope(|scope| {
641                let handles: Vec<_> = (0..workers)
642                    .map(|_| {
643                        scope.spawn(|| {
644                            let mut local = BlockVerifyReport::default();
645                            let mut idx =
646                                cursor.fetch_add(1, core::sync::atomic::Ordering::Relaxed);
647                            while let Some(table) = tables.get(idx) {
648                                merge_report(&mut local, scan_one_table(table));
649                                // Claim the next SST first; only pause if this
650                                // worker actually has another table to scan.
651                                idx = cursor.fetch_add(1, core::sync::atomic::Ordering::Relaxed);
652                                if tables.get(idx).is_some()
653                                    && let Some(delay) = options.throttle
654                                {
655                                    std::thread::sleep(delay);
656                                }
657                            }
658                            local
659                        })
660                    })
661                    .collect();
662                handles
663                    .into_iter()
664                    .map(|handle| match handle.join() {
665                        Ok(local) => local,
666                        // A scrub worker panicking is a bug, not a corruption
667                        // finding — propagate rather than drop its SSTs.
668                        Err(payload) => std::panic::resume_unwind(payload),
669                    })
670                    .collect::<Vec<_>>()
671            });
672
673            let mut report = BlockVerifyReport::default();
674            for partial in partials {
675                merge_report(&mut report, partial);
676            }
677            return report;
678        }
679    }
680
681    // Serial scan: every `no_std` build, and `std` with `parallelism <= 1`. Scans
682    // SSTs in deterministic table order, each over its own `Fs` handle.
683    let mut report = BlockVerifyReport::default();
684    for (idx, table) in tables.iter().enumerate() {
685        merge_report(&mut report, scan_one_table(table));
686        // Inter-SST throttle (std only — `no_std` has no sleep primitive). Skip
687        // after the final table so a finished scrub returns promptly instead of
688        // waiting one extra throttle interval.
689        #[cfg(feature = "std")]
690        if idx + 1 < tables.len()
691            && let Some(delay) = options.throttle
692        {
693            std::thread::sleep(delay);
694        }
695        #[cfg(not(feature = "std"))]
696        let _ = idx;
697    }
698    report
699}
700
701/// Verifies the per-KV checksum footer of every data block across all SST
702/// tables in the tree (the paranoid / scrub integrity path).
703///
704/// Footer presence is a per-SST property read from each table's descriptor
705/// (`ParsedMeta::kv_checksum_algo`), not a per-block header flag — SST data
706/// blocks omit the `block_flags` byte. A table whose descriptor reports no
707/// footers is skipped wholesale.
708///
709/// This is stronger than [`verify_block_checksums`]: for footer-bearing
710/// tables it decodes each block and recomputes every entry's logical-content
711/// digest, localising which entry diverged rather than only flagging the
712/// block. Tables written without per-KV footers carry no per-KV digests and
713/// are covered by [`verify_block_checksums`] only.
714///
715/// Returns the first error encountered (`ChecksumMismatch` on a per-entry
716/// digest disagreement, or an I/O / decode error). `Ok(())` means every
717/// per-KV-checked table verified. A tree written entirely with
718/// `kv_checksums = Off` has no footer-bearing tables, so this is a no-op
719/// returning `Ok(())`.
720///
721/// # Errors
722///
723/// Propagates [`crate::Error::ChecksumMismatch`] on a detected per-entry
724/// corruption, or any I/O / decode error from loading a block.
725pub fn verify_kv_checksums(tree: &impl crate::AbstractTree) -> crate::Result<()> {
726    let version = tree.current_version();
727    for table in version.iter_tables() {
728        table.verify_kv_checksums()?;
729    }
730    Ok(())
731}
732
/// Out-of-band counterpart of [`verify_block_checksums`].
///
/// Scrubs a single SST file straight from a filesystem path; neither a live
/// `Tree` nor the version manifest is required. The intended users are
/// offline diagnostic tools (`tools/sst-dump verify`, `repair_db`,
/// forensics CLIs) that work on one file in isolation, for instance when
/// the manifest itself is corrupt or the surrounding tree directory has
/// been moved.
///
/// The file is read through [`StdFs`](crate::fs::StdFs), the only `Fs`
/// backend that makes sense for an out-of-band tool (`MemFs` / `IoUring`
/// trees never produce files at real filesystem paths), and error reports
/// are stamped with `table_id = 0`. Downstream filtering / logging on the
/// caller's side should identify the file by path, not by table id.
///
/// AEAD overhead is conservatively taken to be zero, since out-of-band
/// tools do not carry the per-table encryption provider from which the real
/// `max_overhead()` could be recovered. As a consequence, encrypted SSTs
/// near the 256 MiB plaintext ceiling may false-flag as
/// [`BlockVerifyError::HeaderCorrupted`]. Block sizes are typically a few
/// KiB in practice, so this only matters for artificially constructed huge
/// blocks; encrypted-aware verification should go through
/// [`verify_block_checksums`] on a live tree.
///
/// The returned [`BlockVerifyReport`] always has `sst_files_scanned == 1`,
/// together with the per-block errors collected during the walk.
#[cfg(feature = "std")]
#[must_use]
pub fn verify_sst_file(path: &std::path::Path) -> BlockVerifyReport {
    let fs = crate::fs::StdFs;
    verify_sst_file_with_fs(&fs, path)
}
765
/// Filesystem-generic worker behind [`verify_sst_file`]: block-verifies the
/// single SST at `path`, reading it through `fs`.
///
/// Kept `pub(crate)` so `repair` can block-verify an SST on the tree's own
/// `Fs` before deciding whether to salvage it, instead of assuming `StdFs`.
///
/// The returned report always has `sst_files_scanned == 1`. Errors are
/// stamped with `table_id = 0` because no table id is known on this path.
#[cfg(feature = "std")]
pub(crate) fn verify_sst_file_with_fs(
    fs: &dyn crate::fs::Fs,
    path: &std::path::Path,
) -> BlockVerifyReport {
    let mut report = BlockVerifyReport {
        sst_files_scanned: 1,
        ..BlockVerifyReport::default()
    };

    // Every file-level failure on this path is reported the same way; only
    // the wrapped error differs.
    let unreadable = |error: io::Error| BlockVerifyError::SstFileUnreadable {
        table_id: 0,
        path: path.to_path_buf(),
        error,
    };

    // Step 1 — learn the per-SST ECC state from the meta block. SST blocks
    // omit the block_flags byte, so parity-trailer presence and shard layout
    // can only come from that descriptor. An undeterminable state must NOT be
    // treated as "ECC disabled": walking an ECC-bearing SST without skipping
    // its parity trailers mis-aligns the scan and reports spurious corruption.
    let state = match read_ecc_params_out_of_band(fs, path) {
        Ok(Some(state)) => state,
        // File + trailer readable, but neither meta block decodes (corrupt
        // meta, or an encrypted SST with no key out-of-band): surface the
        // indeterminacy and skip the walk entirely.
        Ok(None) => {
            report.errors.push(unreadable(io::Error::new(
                io::ErrorKind::InvalidData,
                "could not decode the SST meta block to determine the ECC scheme \
                 (corrupt meta, or an encrypted SST with no key out-of-band); \
                 skipping the block walk — use verify_block_checksums on a live \
                 tree for ECC-aware verification",
            )));
            return report;
        }
        // Genuine file-open / SFA-trailer failure: keep the underlying error
        // instead of folding it into the "undeterminable" message above.
        Err(error) => {
            report.errors.push(unreadable(error.into()));
            return report;
        }
    };

    // Step 2 — translate the state into the walker's inputs.
    let (ecc, ecc_unrecognized) = match state {
        ScrubEcc::Off => (None, false),
        ScrubEcc::Scheme(params) => (Some(params), false),
        // The descriptor names a scheme this build can't apply, so the
        // SST-block trailer length isn't derivable and those sections get
        // skipped during the walk. The self-describing `meta` / `meta_mid`
        // sections still size parity from `block_flags`, so corruption there
        // is NOT downgraded. Warn and carry on rather than dropping the scrub.
        ScrubEcc::Unrecognized => {
            log::warn!(
                "{}: unrecognized ECC scheme — skipping the ECC-dependent block \
                 sections; recompact to re-stamp with a supported scheme",
                path.display(),
            );
            report.warnings.push(BlockVerifyWarning::UnrecognizedEcc {
                table_id: 0,
                path: path.to_path_buf(),
            });
            (None, true)
        }
    };

    // Step 3 — walk the blocks. AEAD overhead is passed as zero on this path.
    match scan_sst_blocks(fs, path, 0, 0, ecc, ecc_unrecognized) {
        Ok(PerFileScan {
            blocks_scanned,
            errors,
        }) => {
            report.blocks_scanned = blocks_scanned;
            report.errors = errors;
        }
        Err(error) => report.errors.push(unreadable(error)),
    }

    report
}
854
/// Per-SST ECC state as seen by the out-of-band scrub.
///
/// Produced by `read_ecc_params_out_of_band` from the SST's meta descriptor
/// and consumed by `verify_sst_file_with_fs`, which maps it onto the
/// `ecc` / `ecc_unrecognized` arguments of `scan_sst_blocks`.
#[cfg(feature = "std")]
enum ScrubEcc {
    /// ECC off — no parity trailer to skip.
    Off,
    /// A recognized + applicable scheme — the walk sizes each SST block's
    /// parity trailer with it so the trailer can be skipped. The scrub does
    /// not check the parity bytes themselves.
    Scheme(crate::table::block::EccParams),
    /// An ECC scheme this build can't apply (unimplemented / unknown /
    /// non-canonical). The trailer length of SST blocks isn't derivable, so
    /// the ECC-dependent sections are skipped with a warning; the
    /// self-describing `meta` / `meta_mid` sections are still walked.
    Unrecognized,
}
867
/// Best-effort lookup of the per-SST ECC state for the out-of-band scrub,
/// which has no live `Table` to consult and must read the SST's meta
/// descriptor itself.
///
/// # Returns
///
/// * `Ok(Some(state))` — a meta block decoded. The authoritative tail `meta`
///   section is tried first; if it is missing or undecodable, the early
///   `meta_mid` mirror (emitted by the writer so one bad meta block can't lose
///   the descriptor) is tried next.
/// * `Ok(None)` — the file and SFA trailer are readable but NEITHER meta block
///   decodes (both corrupt, or an encrypted SST whose key the out-of-band tool
///   doesn't have). The scheme is genuinely UNDETERMINABLE.
///
/// # Errors
///
/// Returns `Err` when the file can't be opened or its SFA trailer can't be
/// parsed.
///
/// # Caller contract
///
/// `Ok(None)` MUST NOT be read as "ECC disabled": walking an ECC-bearing SST
/// without skipping the parity trailers mis-aligns the block scan and reports
/// spurious corruption. The caller skips the walk and surfaces the
/// indeterminacy instead.
#[cfg(feature = "std")]
fn read_ecc_params_out_of_band(
    fs: &dyn crate::fs::Fs,
    path: &std::path::Path,
) -> std::io::Result<Option<ScrubEcc>> {
    let mut probe = fs.open(path, &crate::fs::FsOpenOptions::new().read(true))?;
    let sfa_reader = crate::sfa::Reader::from_reader(&mut probe).map_err(|sfa_err| {
        std::io::Error::new(std::io::ErrorKind::InvalidData, sfa_err)
    })?;
    let toc = sfa_reader.toc();

    // Candidate sections in priority order: the authoritative tail copy, then
    // its early mirror. The first one that decodes decides the state.
    let state = [b"meta".as_slice(), b"meta_mid".as_slice()]
        .iter()
        .copied()
        .find_map(|name| {
            // Absent section, or one too large for a block handle: try the
            // next candidate.
            let entry = toc.section(name)?;
            let (pos, len) = (entry.pos(), entry.len());
            let size = u32::try_from(len).ok()?;
            let handle = crate::table::BlockHandle::new(crate::table::BlockOffset(pos), size);

            // table_id is moot here: this scrub path reads unencrypted meta
            // (encryption = None), so the AAD identity is unused.
            let meta = crate::table::meta::ParsedMeta::load_with_handle(
                probe.as_ref(),
                &handle,
                None,
                None,
            )
            .ok()?;

            Some(match (meta.ecc_unrecognized, meta.ecc_params) {
                (true, _) => ScrubEcc::Unrecognized,
                (false, Some(params)) => ScrubEcc::Scheme(params),
                (false, None) => ScrubEcc::Off,
            })
        });

    Ok(state)
}
920
/// Outcome of walking one SST with `scan_sst_blocks`.
struct PerFileScan {
    /// Number of blocks whose header decoded successfully during the walk.
    /// A block is counted as soon as its header decodes, so blocks whose data
    /// read or data checksum subsequently failed are included.
    blocks_scanned: usize,
    /// Per-block and per-section problems collected during the walk.
    errors: Vec<BlockVerifyError>,
}
925
926/// Walks every block of one SST. Returns `Err` only on file-open or
927/// SFA trailer-parse failure (those make the whole walk impossible).
928/// Per-block AND per-section errors — corrupt block headers, mismatched
929/// data checksums, post-header data-read failures, and TOC sections we
930/// cannot seek to — all land inside `PerFileScan::errors` and never
931/// cause an early return; the walker proceeds to the next section so
932/// one bad TOC entry cannot mask corruption in the others.
933fn scan_sst_blocks(
934    fs: &dyn crate::fs::Fs,
935    path: &Path,
936    table_id: TableId,
937    max_enc_overhead: u32,
938    ecc: Option<crate::table::block::EccParams>,
939    ecc_unrecognized: bool,
940) -> io::Result<PerFileScan> {
941    use io::BufReader;
942    #[cfg(not(feature = "std"))]
943    use io::{Seek, SeekFrom};
944    #[cfg(feature = "std")]
945    use std::io::{Seek, SeekFrom};
946
947    let mut file = fs.open(path, &crate::fs::FsOpenOptions::new().read(true))?;
948
949    // The SFA trailer + TOC live at the tail of the file.
950    // crate::sfa::Reader::from_reader leaves the cursor at an undefined
951    // offset; each per-section walk below explicitly seeks to the
952    // section's `pos()` first so the unknown post-trailer position
953    // doesn't matter.
954    // Capture the sfa error's Debug form in the message. crate::io::Error is
955    // message-only (no source chain) so it stays portable on no_std; the `{:?}`
956    // repr keeps the original variant (InvalidHeader / InvalidVersion /
957    // ChecksumMismatch / underlying Io) visible for downstream diagnostics, just
958    // as a string rather than a downcastable `Error::source()`.
959    let sfa_reader = crate::sfa::Reader::from_reader(&mut file)
960        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, alloc::format!("{e:?}")))?;
961    let toc = sfa_reader.toc();
962    // SFA TOC layout for an SST. The writer opens the file and
963    // immediately calls `crate::sfa::Writer::start("data")`, so the first
964    // TOC entry is named (not unnamed) and covers the data-block
965    // region. Other named sections, in writer order:
966    //
967    //   - `data`              : block-format (data blocks)
968    //   - `index`             : block-format (partitioned index leaf
969    //                           blocks; absent for full-index tables,
970    //                           emitted before `tli` by
971    //                           `PartitionedIndexWriter::finish`)
972    //   - `tli`               : block-format (top-level index, both
973    //                           full and partitioned variants)
974    //   - `filter`            : block-format (filter blocks)
975    //   - `filter_tli`        : block-format (top-level filter for
976    //                           partitioned filters; absent for full
977    //                           filters, emitted after `filter` by
978    //                           `PartitionedFilterWriter::finish`)
979    //   - `range_tombstones`  : block-format (optional)
980    //   - `meta_mid`          : block-format (early mirror of `meta`)
981    //   - `linked_blob_files` : RAW length-prefixed list of u64s
982    //   - `table_version`     : RAW single byte
983    //   - `meta_separator`    : RAW 4 KiB zero padding
984    //   - `tli_tail`          : block-format (tail mirror of `tli`)
985    //   - `meta`              : block-format (metadata, authoritative)
986    //
987    // Block-format sections are walked block-by-block (each block
988    // prefixed with the standard `Header`). Raw-format sections are
989    // skipped — their integrity is covered by the SFA-trailer
990    // checksum verified at table-open time. New section names default
991    // to "walk" (must be added to `RAW_FORMAT_SECTIONS` if they're
992    // raw), so a forgotten-to-handle section fails loud rather than
993    // silently passing a corruption.
994
995    let mut reader = BufReader::with_capacity(64 * 1024, file);
996    let mut blocks_scanned: usize = 0;
997    let mut errors: Vec<BlockVerifyError> = Vec::new();
998    // One reusable data buffer across the whole SST — sized up via
999    // `resize` per block instead of a fresh `vec![0u8; N]` allocation
1000    // each iteration. On large trees this turns thousands of malloc
1001    // calls into a single growing allocation that settles at the
1002    // largest block size seen.
1003    let mut data_buf: Vec<u8> = Vec::new();
1004
1005    for entry in toc.iter() {
1006        if RAW_FORMAT_SECTIONS.contains(&entry.name()) {
1007            continue;
1008        }
1009        let start = entry.pos();
1010        // `checked_add` (not `saturating_add`) so a corrupted or
1011        // forged TOC length cannot silently collapse to `u64::MAX`
1012        // and let the walk treat the whole address space as one
1013        // section. On overflow we surface the section as a
1014        // file-level `TocCorrupted` and skip walking it — the other
1015        // (still-walkable) sections of the same SST are honoured.
1016        // `TocCorrupted` rather than `HeaderCorrupted` because the
1017        // failure is at the section-catalogue layer, not inside any
1018        // individual block.
1019        let Some(end) = start.checked_add(entry.len()) else {
1020            errors.push(BlockVerifyError::TocCorrupted {
1021                table_id,
1022                path: path.to_path_buf(),
1023                section_name: entry.name().to_vec(),
1024                section_offset: start,
1025                reason: format!(
1026                    "section length {} overflows u64 when added to start offset {start}",
1027                    entry.len(),
1028                ),
1029            });
1030            continue;
1031        };
1032        // Mid-walk seek failure: don't propagate as a file-level Err
1033        // (that would discard everything already scanned and report
1034        // the whole SST as unreadable, which contradicts the
1035        // function's contract). Surface as a `TocCorrupted` for this
1036        // section and skip walking it; subsequent sections still run.
1037        // Again `TocCorrupted` (not `HeaderCorrupted`): we never even
1038        // reached a block to decode its header.
1039        if let Err(e) = reader.seek(SeekFrom::Start(start)) {
1040            errors.push(BlockVerifyError::TocCorrupted {
1041                table_id,
1042                path: path.to_path_buf(),
1043                section_name: entry.name().to_vec(),
1044                section_offset: start,
1045                reason: format!("seek to section start failed: {e}"),
1046            });
1047            continue;
1048        }
1049        let mut ctx = WalkCtx {
1050            reader: &mut reader,
1051            table_id,
1052            path,
1053            data_buf: &mut data_buf,
1054            blocks_scanned: &mut blocks_scanned,
1055            errors: &mut errors,
1056            max_data_length: block_data_length_cap(max_enc_overhead),
1057            ecc,
1058            ecc_unrecognized,
1059        };
1060        walk_block_region(&mut ctx, start, end);
1061    }
1062
1063    Ok(PerFileScan {
1064        blocks_scanned,
1065        errors,
1066    })
1067}
1068
/// SFA TOC section names whose payload is NOT a sequence of `Block`s
/// (i.e. NOT prefixed with the standard `Header`).
///
/// `scan_sst_blocks` compares each TOC entry's name against this list and
/// skips the matching sections — their integrity is covered by the
/// SFA-trailer checksum verified at table-open time. Every other section
/// (`data` / `tli` / `tli_tail` / `index` / `filter_tli` / `filter` /
/// `range_tombstones` / `meta` / `meta_mid`) is a `Header`-prefixed
/// block run and gets walked. See `scan_sst_blocks` for the full
/// section catalogue and the writer-side source of truth.
///
/// `meta_separator` is the 4 KiB zero-padding section the writer
/// emits between the MID and TAIL meta blocks so a single bad
/// filesystem sector cannot take out both copies — it carries no
/// blocks and must be skipped here, otherwise the walker would try
/// to decode zeros as a `Header` and report a spurious
/// `HeaderCorrupted` on every clean SST.
const RAW_FORMAT_SECTIONS: &[&[u8]] = &[b"linked_blob_files", b"table_version", b"meta_separator"];
1085
/// Largest plaintext on-disk data segment a single block may declare:
/// 256 MiB, mirroring `table::block::MAX_DECOMPRESSION_SIZE`.
///
/// Encrypted blocks may legitimately exceed this by up to the AEAD provider's
/// `max_overhead()`; `block_data_length_cap` folds that overhead in to get the
/// cap actually enforced during a walk.
const MAX_BLOCK_DATA_LENGTH: u64 = 256 << 20;

/// Returns the `data_length` cap enforced for one scan: the plaintext limit
/// plus the table's AEAD `max_overhead()` (`0` when encryption is disabled),
/// mirroring the size validation in `Block::from_file`.
///
/// A header declaring more than this is reported as `HeaderCorrupted`
/// regardless of TOC bounds, which defends against DoS-by-allocation when the
/// block header and the enclosing TOC entry are both corrupted / forged.
fn block_data_length_cap(max_enc_overhead: u32) -> u64 {
    // u32 -> u64 is lossless, and 256 MiB + u32::MAX is far below u64::MAX,
    // so the plain add cannot overflow.
    let overhead = u64::from(max_enc_overhead);
    overhead + MAX_BLOCK_DATA_LENGTH
}
1102
/// Mutable cursor + scratch state threaded through `walk_block_region`.
///
/// Bundles the per-walk accumulators (file cursor, reused data
/// buffer, counters, error sink) into one borrow so the function
/// signature stays under clippy's argument-count cap.
///
/// `walk_block_region` walks the contiguous block range
/// `[start_offset, end_offset)`, decoding each block's header (which
/// validates the header's own XXH3) and then re-hashing the data segment
/// against `header.checksum`. It stops at the first un-parseable header
/// inside the range — that block is reported as `HeaderCorrupted` and the
/// rest of the range is skipped because subsequent offsets become
/// unrecoverable without a valid length field.
struct WalkCtx<'a> {
    /// Buffered file cursor. `scan_sst_blocks` seeks it to the section start
    /// before each walk; the walk then reads headers, payloads and parity
    /// trailers from it sequentially.
    reader: &'a mut io::BufReader<Box<dyn crate::fs::FsFile>>,
    /// Table id stamped into every error this walk reports.
    table_id: TableId,
    /// File path stamped into every error this walk reports.
    path: &'a Path,
    /// Scratch buffer for block payloads, resized to each block's
    /// `data_length` and reused across blocks.
    data_buf: &'a mut Vec<u8>,
    /// Running count of blocks whose header decoded successfully.
    blocks_scanned: &'a mut usize,
    /// Sink for the per-block errors found during the walk.
    errors: &'a mut Vec<BlockVerifyError>,
    /// Effective `data_length` cap (plaintext limit + AEAD overhead).
    /// Matches the bound `Block::from_file` applies on the read path,
    /// so the scrub does not false-flag legitimate encrypted blocks
    /// near the 256 MiB plaintext limit as `HeaderCorrupted`.
    max_data_length: u64,
    /// Per-SST Page-ECC shard layout. SST blocks (`Data` / `Index` / `Filter` /
    /// `RangeTombstone`) omit the `block_flags` byte, so their parity-trailer
    /// presence AND shard layout are NOT derivable from the header — both come
    /// from this table-wide descriptor scheme. When `Some`, each such block
    /// carries `expected_parity_len(data_length, scheme)` parity bytes after
    /// the payload that the walk must skip (sized by the scheme) to stay
    /// aligned. Meta / Manifest / `ManifestFooter` blocks keep the byte and
    /// self-describe parity via their `ECC_PARITY` bit, sized with the fixed
    /// RS(4,2) layout the writer uses for them, regardless of this field.
    ecc: Option<crate::table::block::EccParams>,
    /// `true` when the table's ECC descriptor decodes to a scheme this build
    /// can't apply. The trailer length of its SST blocks (`Data` / `Index` /
    /// `Filter` / `RangeTombstone`) isn't derivable, so those sections are
    /// skipped (the caller warns once). Self-describing sections (`meta` /
    /// `meta_mid`) still size parity from `block_flags` and ARE walked.
    ecc_unrecognized: bool,
}
1143
1144fn walk_block_region(ctx: &mut WalkCtx<'_>, start_offset: u64, end_offset: u64) {
1145    #[cfg(not(feature = "std"))]
1146    use io::Read;
1147    #[cfg(feature = "std")]
1148    use std::io::Read;
1149
1150    let mut offset = start_offset;
1151
1152    while offset < end_offset {
1153        // Confine reads to the declared section before touching
1154        // Header::decode_from. Without this pre-check, a TOC entry
1155        // whose `len` puts `end_offset` inside the first block's
1156        // header region would let `decode_from` consume up to
1157        // `header_len` bytes — reading past the section boundary
1158        // into the next section's payload, where random bytes might
1159        // happen to parse as a "valid" header and silently corrupt
1160        // the walk. Treat the under-sized tail as `HeaderCorrupted`
1161        // and stop this section's walk; subsequent sections still
1162        // run because `walk_block_region` returns rather than
1163        // bubbling the error up.
1164        let remaining_in_section = end_offset - offset;
1165        // Lower bound: the header is at least MIN_LEN (the exact length, with
1166        // or without the block_flags byte, is known only after decode).
1167        if remaining_in_section < Header::MIN_LEN as u64 {
1168            ctx.errors.push(BlockVerifyError::HeaderCorrupted {
1169                table_id: ctx.table_id,
1170                path: ctx.path.to_path_buf(),
1171                offset,
1172                reason: format!(
1173                    "section has only {remaining_in_section} bytes left at this offset, \
1174                     less than Header::MIN_LEN = {}",
1175                    Header::MIN_LEN,
1176                ),
1177            });
1178            return;
1179        }
1180        let header = match Header::decode_from(ctx.reader) {
1181            Ok(h) => h,
1182            Err(e) => {
1183                ctx.errors.push(BlockVerifyError::HeaderCorrupted {
1184                    table_id: ctx.table_id,
1185                    path: ctx.path.to_path_buf(),
1186                    offset,
1187                    reason: format!("{e:?}"),
1188                });
1189                return;
1190            }
1191        };
1192
1193        // Unrecognized-ECC table: SST blocks (no `block_flags` byte) carry a
1194        // parity trailer whose length we can't derive without the descriptor
1195        // scheme, so this section can't be walked — stop here (the caller has
1196        // already warned). Self-describing blocks (`block_flags` present) size
1197        // parity from their `ECC_PARITY` bit, so those sections still walk.
1198        // Checked before the scanned-count increment so skipped blocks aren't
1199        // tallied. Sections are homogeneous in block type, so the first block
1200        // decides the whole section.
1201        if ctx.ecc_unrecognized && !Header::has_block_flags(header.block_type) {
1202            return;
1203        }
1204
1205        // Count the block as "header-read" immediately on successful
1206        // decode — matches the BlockVerifyReport.blocks_scanned docs
1207        // ("includes blocks where the data checksum subsequently
1208        // failed"). Without this early increment, blocks that emit
1209        // DataReadError / data-length-bounds HeaderCorrupted would
1210        // be silently uncounted, contradicting the documented
1211        // semantics.
1212        // Block counter; a tree cannot hold 2^64 blocks, so a plain add cannot
1213        // overflow.
1214        *ctx.blocks_scanned += 1;
1215
1216        // Actual header length for this block (variable: SST blocks omit the
1217        // block_flags byte). Used for the section-bounds math and the offset
1218        // advance so the walk tracks what `decode_from` actually consumed.
1219        let header_len = Header::header_len(header.block_type) as u64;
1220
1221        // Page-ECC parity trailer that follows the payload on disk. Presence
1222        // depends on the block type: Meta / Manifest / ManifestFooter keep the
1223        // block_flags byte and self-describe via the ECC_PARITY bit; SST blocks
1224        // omit the byte, so parity presence is the per-SST `page_ecc` flag. The
1225        // trailer length is derived from data_length (never stored). The walk
1226        // must skip these bytes — otherwise the next iteration would read parity
1227        // as the following block's header and mis-align the whole section.
1228        // Parity-trailer scheme to skip for this block. Self-describing blocks
1229        // (Meta / Manifest / `ManifestFooter`) carry the `block_flags` byte and
1230        // are written with the fixed RS(4,2) layout; SST blocks size their
1231        // trailer from the per-SST descriptor scheme threaded in via `ctx.ecc`.
1232        let block_ecc = if Header::has_block_flags(header.block_type) {
1233            (header.block_flags & crate::table::block::header::block_flags::ECC_PARITY != 0)
1234                .then_some(crate::table::block::EccParams::RS_4_2)
1235        } else {
1236            ctx.ecc
1237        };
1238        let parity_len = block_ecc.map_or(0, |scheme| {
1239            u64::from(crate::table::block::expected_parity_len(
1240                header.data_length,
1241                scheme,
1242            ))
1243        });
1244
1245        // Validate data_length against TWO bounds before allocating
1246        // / reading:
1247        //
1248        // 1. Hard cap (MAX_BLOCK_DATA_LENGTH = 256 MiB, mirroring
1249        //    table::block::MAX_DECOMPRESSION_SIZE). Catches the case
1250        //    where BOTH the block header AND the enclosing TOC entry
1251        //    are simultaneously corrupted/forged so that `remaining`
1252        //    becomes arbitrarily large. Without this, a forged TOC
1253        //    entry with len=u64::MAX could let the section-bounds
1254        //    check pass and trigger a multi-GB Vec::resize.
1255        //
1256        // 2. Remaining bytes in this TOC section. Header::decode_from
1257        //    already verified the header's own XXH3, so a data_length
1258        //    that overruns the section bounds is either bit-flip
1259        //    corruption that happened to keep the header digest
1260        //    valid (rare but possible), or fuzz input. Honouring it
1261        //    would read past `end_offset` into the next section.
1262        //
1263        // Both bounds are reported as HeaderCorrupted — the header
1264        // was technically parseable but its length field is invalid.
1265        let data_length_u64 = u64::from(header.data_length);
1266        if data_length_u64 > ctx.max_data_length {
1267            ctx.errors.push(BlockVerifyError::HeaderCorrupted {
1268                table_id: ctx.table_id,
1269                path: ctx.path.to_path_buf(),
1270                offset,
1271                reason: format!(
1272                    "header data_length {data_length_u64} exceeds hard cap {}",
1273                    ctx.max_data_length,
1274                ),
1275            });
1276            return;
1277        }
1278        // A header whose own bytes cross the section boundary is corrupt and must
1279        // be rejected here: clamping `remaining` to zero would let a header with a
1280        // zero-length declared payload slip past the `>` check below even though
1281        // the header itself ran past the section end. Reuse the plain
1282        // `remaining_in_section` (the loop invariant `offset < end_offset` keeps
1283        // it non-negative) rather than recomputing it.
1284        if header_len > remaining_in_section {
1285            ctx.errors.push(BlockVerifyError::HeaderCorrupted {
1286                table_id: ctx.table_id,
1287                path: ctx.path.to_path_buf(),
1288                offset,
1289                reason: format!(
1290                    "block header ({header_len} bytes) extends past the section end \
1291                     ({remaining_in_section} bytes remain)",
1292                ),
1293            });
1294            return;
1295        }
1296        let remaining = remaining_in_section - header_len;
1297        // `data_length_u64` is already capped at `ctx.max_data_length` (checked
1298        // above) and `parity_len` is derived from it, so the sum is bounded well
1299        // within u64 — a plain add cannot overflow.
1300        let on_disk_payload = data_length_u64 + parity_len;
1301        if on_disk_payload > remaining {
1302            ctx.errors.push(BlockVerifyError::HeaderCorrupted {
1303                table_id: ctx.table_id,
1304                path: ctx.path.to_path_buf(),
1305                offset,
1306                reason: format!(
1307                    "header data_length {data_length_u64} + parity {parity_len} exceeds \
1308                     remaining section bytes {remaining}",
1309                ),
1310            });
1311            return;
1312        }
1313
1314        let data_length = header.data_length as usize;
1315        ctx.data_buf.resize(data_length, 0);
1316        // `as_mut_slice` returns the whole `Vec` (exactly `data_length`
1317        // bytes after the resize above) — full-slice access dodges
1318        // the crate-wide `#[deny(clippy::indexing_slicing)]`.
1319        if let Err(e) = ctx.reader.read_exact(ctx.data_buf.as_mut_slice()) {
1320            // Header was clean (XXH3 matched) but the data segment
1321            // that should follow it could not be read in full —
1322            // truncated SST, unexpected EOF, transient I/O.
1323            // Semantically distinct from HeaderCorrupted; reported
1324            // under its own variant so callers pattern-matching on
1325            // the error kind aren't surprised to find post-header
1326            // I/O failures bucketed with header-parse failures.
1327            ctx.errors.push(BlockVerifyError::DataReadError {
1328                table_id: ctx.table_id,
1329                path: ctx.path.to_path_buf(),
1330                offset,
1331                data_length: header.data_length,
1332                error: e.into(),
1333            });
1334            return;
1335        }
1336
1337        let computed = Checksum::from_raw(crate::hash::hash128(ctx.data_buf));
1338        if computed != header.checksum {
1339            ctx.errors.push(BlockVerifyError::DataCorrupted {
1340                table_id: ctx.table_id,
1341                path: ctx.path.to_path_buf(),
1342                offset,
1343                data_length: header.data_length,
1344                expected: header.checksum,
1345                got: computed,
1346            });
1347        }
1348
1349        // Consume the parity trailer (if any) so the reader cursor lands on
1350        // the next block's header. The payload checksum above already covers
1351        // correctness; parity is only consulted for ECC recovery on the live
1352        // read path, so the scrub discards it — but it MUST still skip exactly
1353        // `parity_len` bytes or the next iteration mis-reads parity as a header.
1354        if parity_len > 0 {
1355            // Discard the parity trailer so the cursor lands on the next block's
1356            // header. `crate::io` has no `copy`/`sink`, so drain exactly
1357            // `parity_len` bytes through a small scratch buffer.
1358            let mut scratch = [0u8; 512];
1359            let mut remaining = parity_len;
1360            // A short read (EOF before `parity_len`) and an underlying read error
1361            // are the same outcome for the scrub: the trailer cannot be skipped,
1362            // so collapse both into one `Err` and report a single DataReadError.
1363            let drain: io::Result<()> = loop {
1364                if remaining == 0 {
1365                    break Ok(());
1366                }
1367                let want =
1368                    usize::try_from(remaining.min(scratch.len() as u64)).unwrap_or(scratch.len());
1369                let (head, _) = scratch.split_at_mut(want);
1370                match ctx.reader.read(head) {
1371                    Ok(0) => {
1372                        break Err(io::Error::new(
1373                            io::ErrorKind::UnexpectedEof,
1374                            alloc::format!(
1375                                "parity trailer truncated: read {} of {parity_len} bytes",
1376                                parity_len - remaining
1377                            ),
1378                        ));
1379                    }
1380                    Ok(n) => remaining -= n as u64,
1381                    Err(e) => {
1382                        // EINTR is transient: retry the read rather than aborting
1383                        // the parity skip with a spurious DataReadError (matches
1384                        // the Interrupted handling in read_exact above). Convert
1385                        // first so the kind check is uniform across std/no_std.
1386                        let e: io::Error = e.into();
1387                        if e.kind() != io::ErrorKind::Interrupted {
1388                            break Err(e);
1389                        }
1390                    }
1391                }
1392            };
1393            if let Err(error) = drain {
1394                ctx.errors.push(BlockVerifyError::DataReadError {
1395                    table_id: ctx.table_id,
1396                    path: ctx.path.to_path_buf(),
1397                    offset,
1398                    data_length: header.data_length,
1399                    error,
1400                });
1401                return;
1402            }
1403        }
1404
1405        // blocks_scanned was already incremented right after a
1406        // successful Header::decode_from above — do not double-count
1407        // here.
1408        // Advance past this block. Each term is bounded (data_length capped
1409        // above, parity derived from it, header a const) and `offset` is bounded
1410        // by the section end, so the running cursor cannot overflow u64.
1411        offset += header_len + data_length_u64 + parity_len;
1412    }
1413}
1414
1415#[cfg(test)]
1416#[expect(clippy::unwrap_used, clippy::expect_used, reason = "test assertions")]
1417mod block_verify_tests;