Skip to main content

nautilus_event_store/verifier/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Off-trader verifier that proves a run file's integrity before the trader opens it.
17//!
18//! See `README.md` "Storage backend" and "Determinism contract" sections for the SPEC
19//! posture: redb 4.x does not framewise-checksum data pages, so a `zero-tail` corruption
20//! opens cleanly and panics on first read. The verifier therefore exercises every entry
21//! and every stored index pair, accumulating findings so a single run produces one
22//! actionable report rather than failing fast on the first hit. The supervisor runs the
23//! verifier in an isolated process so a bad file aborts the verifier, not trading.
24//!
25//! Scope:
26//!
27//! - Walk every `seq` over `[1, high_watermark]` and recompute [`crate::EntryHash`].
28//! - Detect gaps in the seq sequence (the SPEC's gap-detection idempotency primitive).
29//! - Validate that every `client_order_id` and `venue_order_id` stored target seq still
30//!   resolves to a clean entry; full payload-derived rebuild is deferred until the
31//!   wrapper-type encoders land.
32//! - Validate manifest invariants: `high_watermark` matches the durable last seq, the
33//!   recorded `start_ts_init` and `end_ts_init` bracket the entry stream, and a sealed
34//!   manifest's status is a terminal state.
35//!
36//! The library API stays narrow: a single [`Verifier`] type that owns a backend, a
37//! [`VerifyReport`] structured for downstream operator tooling, and a [`VerifyError`]
38//! reserved for failures that prevent the verifier from producing any report at all.
39
40use std::{collections::BTreeSet, fmt::Debug, path::Path};
41
42use crate::{
43    backend::{EventStore, IndexKind, RedbBackend},
44    entry::EventStoreEntry,
45    error::EventStoreError,
46    manifest::{RunId, RunManifest, RunStatus},
47};
48
49/// Verifier over a single open run.
50///
51/// Constructed either by passing an already-open backend ([`Verifier::new`]) or by
52/// opening a sealed redb file directly ([`Verifier::open_redb`] or
53/// [`Verifier::open_redb_file`]). The verifier never mutates the backend; it walks the
54/// entry table and the secondary indices, then emits a typed [`VerifyReport`].
55pub struct Verifier {
56    backend: Box<dyn EventStore>,
57}
58
59impl Debug for Verifier {
60    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61        f.debug_struct(stringify!(Verifier)).finish_non_exhaustive()
62    }
63}
64
65impl Verifier {
66    /// Wraps an already-open backend for read-only verification.
67    #[must_use]
68    pub fn new(backend: Box<dyn EventStore>) -> Self {
69        Self { backend }
70    }
71
72    /// Opens a sealed redb run file at `<base_dir>/<instance_id>/<run_id>.redb` and
73    /// wraps it for verification.
74    ///
75    /// Mirrors [`crate::backend::RedbBackend::open_sealed`]: only sealed files are
76    /// accepted, since opening a still-`Running` file would race with a live writer
77    /// and break the off-trader-process posture.
78    ///
79    /// # Errors
80    ///
81    /// Returns [`VerifyError::Backend`] when the underlying backend rejects the open
82    /// (file missing, run still `Running`, header corruption).
83    pub fn open_redb(
84        base_dir: impl AsRef<Path>,
85        instance_id: &str,
86        run_id: &str,
87    ) -> Result<Self, VerifyError> {
88        let backend =
89            RedbBackend::open_sealed(base_dir.as_ref().to_path_buf(), instance_id, run_id)?;
90        Ok(Self {
91            backend: Box::new(backend),
92        })
93    }
94
95    /// Opens a sealed redb run file directly by path and wraps it for verification.
96    ///
97    /// The backend uses a read-only database handle for this path. The verifier
98    /// reports findings, but it never seals or quarantines the file; a supervisor or
99    /// operator process decides that policy from the returned [`VerifyReport`].
100    ///
101    /// # Errors
102    ///
103    /// Returns [`VerifyError::Backend`] when the underlying backend rejects the open
104    /// (file missing, run still `Running`, header corruption).
105    pub fn open_redb_file(path: impl AsRef<Path>) -> Result<Self, VerifyError> {
106        let backend = RedbBackend::open_sealed_file(path.as_ref().to_path_buf())?;
107        Ok(Self {
108            backend: Box::new(backend),
109        })
110    }
111
112    /// Returns a reference to the wrapped backend.
113    #[must_use]
114    pub fn backend(&self) -> &dyn EventStore {
115        self.backend.as_ref()
116    }
117
118    /// Performs a full integrity scan of the open run and returns the typed report.
119    ///
120    /// `verify` reads the manifest, walks every `seq` in `[1, high_watermark]`,
121    /// cross-checks the stored client- and venue-order-id indices, and validates manifest
122    /// invariants. Hash mismatches, gaps, index drift, and manifest mismatches surface as
123    /// [`VerifyFinding`]s on the returned report; only failures that prevent the verifier
124    /// from producing a report at all surface as [`VerifyError`].
125    ///
126    /// # Errors
127    ///
128    /// Returns [`VerifyError::Backend`] when the backend refuses a read-side
129    /// operation (no run open, disk pressure, manifest decode failure).
130    pub fn verify(&self) -> Result<VerifyReport, VerifyError> {
131        let manifest = self.backend.manifest()?;
132        let high_watermark = self.backend.high_watermark()?;
133
134        let mut findings = Vec::new();
135        let scan = self.scan_entries(high_watermark, &mut findings)?;
136
137        self.cross_check_indices(&scan, &mut findings)?;
138        validate_manifest(&manifest, high_watermark, &scan, &mut findings);
139
140        Ok(VerifyReport {
141            run_id: manifest.run_id.clone(),
142            status: manifest.status,
143            high_watermark,
144            entries_scanned: scan.scanned,
145            findings,
146        })
147    }
148
149    fn scan_entries(
150        &self,
151        high_watermark: u64,
152        findings: &mut Vec<VerifyFinding>,
153    ) -> Result<EntryScan, VerifyError> {
154        let mut scanned: u64 = 0;
155        let mut min_ts: Option<u64> = None;
156        let mut max_ts: Option<u64> = None;
157        let mut clean_seqs: BTreeSet<u64> = BTreeSet::new();
158        let mut corrupted_seqs: BTreeSet<u64> = BTreeSet::new();
159        let mut gap_cursor: Option<u64> = None;
160
161        for seq in 1..=high_watermark {
162            match self.backend.scan_seq(seq) {
163                Ok(Some(entry)) => {
164                    flush_pending_gap(seq, &mut gap_cursor, findings);
165
166                    // The recomputed hash check inside scan_seq covers the entry
167                    // contents, but the entry's embedded seq is one of those
168                    // contents: a row whose value is moved or duplicated under a
169                    // different table key still hashes correctly. Cross-check the
170                    // table key against the embedded seq so the verifier catches
171                    // that class of corruption rather than reporting a clean run.
172                    if entry.seq != seq {
173                        findings.push(VerifyFinding::SeqMismatch {
174                            table_key: seq,
175                            embedded_seq: entry.seq,
176                        });
177                        corrupted_seqs.insert(seq);
178                        scanned += 1;
179                        continue;
180                    }
181                    record_entry(&entry, &mut min_ts, &mut max_ts);
182                    clean_seqs.insert(seq);
183                    scanned += 1;
184                }
185                Ok(None) | Err(EventStoreError::Gap { .. }) => {
186                    extend_pending_gap(seq, &mut gap_cursor);
187                }
188                Err(EventStoreError::HashMismatch { seq: bad }) => {
189                    flush_pending_gap(seq, &mut gap_cursor, findings);
190                    findings.push(VerifyFinding::HashMismatch { seq: bad });
191                    corrupted_seqs.insert(seq);
192                    scanned += 1;
193                }
194                Err(other) => return Err(VerifyError::Backend(other)),
195            }
196        }
197
198        flush_pending_gap(high_watermark + 1, &mut gap_cursor, findings);
199
200        Ok(EntryScan {
201            scanned,
202            min_ts,
203            max_ts,
204            clean_seqs,
205            corrupted_seqs,
206        })
207    }
208
209    fn cross_check_indices(
210        &self,
211        scan: &EntryScan,
212        findings: &mut Vec<VerifyFinding>,
213    ) -> Result<(), VerifyError> {
214        for kind in [IndexKind::ClientOrderId, IndexKind::VenueOrderId] {
215            for (key, stored_seq) in self.backend.iter_index_keys(kind)? {
216                let drift = classify_target(stored_seq, scan);
217                if let Some(drift) = drift {
218                    findings.push(VerifyFinding::IndexDrift { kind, key, drift });
219                }
220            }
221        }
222
223        Ok(())
224    }
225}
226
/// Summary of one pass over the entry table, shared by the later checks.
#[derive(Debug)]
struct EntryScan {
    /// Number of slots that were not gaps (clean, seq-mismatched, or hash-mismatched).
    scanned: u64,
    /// Smallest `ts_init` seen on a clean entry, or `None` when there was none.
    min_ts: Option<u64>,
    /// Largest `ts_init` seen on a clean entry, or `None` when there was none.
    max_ts: Option<u64>,
    /// Table keys whose entry passed both the hash check and the seq cross-check.
    clean_seqs: BTreeSet<u64>,
    /// Table keys whose entry failed the hash check or the seq cross-check.
    corrupted_seqs: BTreeSet<u64>,
}
235
236fn record_entry(entry: &EventStoreEntry, min_ts: &mut Option<u64>, max_ts: &mut Option<u64>) {
237    let ts = entry.ts_init.as_u64();
238    *min_ts = Some(min_ts.map_or(ts, |cur| cur.min(ts)));
239    *max_ts = Some(max_ts.map_or(ts, |cur| cur.max(ts)));
240}
241
/// Marks `seq` as missing.
///
/// If no gap is currently pending, `seq` becomes the start of a new one. If a gap is
/// already pending its start is kept, so contiguous missing slots collapse into a
/// single range.
fn extend_pending_gap(seq: u64, gap_cursor: &mut Option<u64>) {
    *gap_cursor = gap_cursor.or(Some(seq));
}
247
248fn flush_pending_gap(
249    next_seq: u64,
250    gap_cursor: &mut Option<u64>,
251    findings: &mut Vec<VerifyFinding>,
252) {
253    if let Some(start) = gap_cursor.take() {
254        findings.push(VerifyFinding::Gap {
255            range: GapRange {
256                from: start,
257                to: next_seq - 1,
258            },
259        });
260    }
261}
262
263fn classify_target(stored_seq: u64, scan: &EntryScan) -> Option<IndexDrift> {
264    if scan.clean_seqs.contains(&stored_seq) {
265        None
266    } else if scan.corrupted_seqs.contains(&stored_seq) {
267        Some(IndexDrift::TargetCorrupted { stored_seq })
268    } else {
269        Some(IndexDrift::DanglingTarget { stored_seq })
270    }
271}
272
273fn validate_manifest(
274    manifest: &RunManifest,
275    high_watermark: u64,
276    scan: &EntryScan,
277    findings: &mut Vec<VerifyFinding>,
278) {
279    if manifest.high_watermark != high_watermark {
280        findings.push(VerifyFinding::ManifestMismatch {
281            kind: ManifestField::HighWatermark,
282            reason: format!(
283                "manifest high_watermark {} disagrees with durable high_watermark {high_watermark}",
284                manifest.high_watermark,
285            ),
286        });
287    }
288
289    if let Some(min_ts) = scan.min_ts
290        && manifest.start_ts_init.as_u64() > min_ts
291    {
292        findings.push(VerifyFinding::ManifestMismatch {
293            kind: ManifestField::StartTsInit,
294            reason: format!(
295                "manifest start_ts_init {} sits above earliest entry ts_init {min_ts}",
296                manifest.start_ts_init.as_u64(),
297            ),
298        });
299    }
300
301    if manifest.is_sealed() {
302        match (manifest.end_ts_init.map(|t| t.as_u64()), scan.max_ts) {
303            (Some(stored), Some(observed)) if stored != observed => {
304                findings.push(VerifyFinding::ManifestMismatch {
305                    kind: ManifestField::EndTsInit,
306                    reason: format!(
307                        "manifest end_ts_init {stored} disagrees with last observed ts_init {observed}",
308                    ),
309                });
310            }
311            (None, Some(observed)) => findings.push(VerifyFinding::ManifestMismatch {
312                kind: ManifestField::EndTsInit,
313                reason: format!(
314                    "sealed manifest is missing end_ts_init while entries up to ts_init {observed} exist",
315                ),
316            }),
317            (Some(stored), None) => findings.push(VerifyFinding::ManifestMismatch {
318                kind: ManifestField::EndTsInit,
319                reason: format!(
320                    "sealed manifest carries end_ts_init {stored} despite empty entry table",
321                ),
322            }),
323            _ => {}
324        }
325    }
326}
327
328/// The structured report produced by [`Verifier::verify`].
329///
330/// Operators key on [`VerifyReport::is_clean`] for the binary verdict and walk
331/// [`VerifyReport::findings`] for the actionable items. The verifier never
332/// quarantines on its own: that is the supervisor's call given the report.
333#[derive(Debug, Clone, PartialEq, Eq)]
334pub struct VerifyReport {
335    /// The id of the verified run, copied from the manifest.
336    pub run_id: RunId,
337    /// The lifecycle status the run carried at verification time.
338    pub status: RunStatus,
339    /// The durable high-watermark the verifier walked up to.
340    pub high_watermark: u64,
341    /// The number of `seq` slots the verifier successfully read (clean or hash-mismatched).
342    pub entries_scanned: u64,
343    /// Every integrity finding the verifier accumulated.
344    pub findings: Vec<VerifyFinding>,
345}
346
347impl VerifyReport {
348    /// Returns `true` when the verifier accumulated no findings.
349    #[must_use]
350    pub fn is_clean(&self) -> bool {
351        self.findings.is_empty()
352    }
353}
354
355/// One actionable integrity finding from a verifier run.
356#[derive(Debug, Clone, PartialEq, Eq)]
357pub enum VerifyFinding {
358    /// The recomputed canonical hash of `seq` did not match the stored value.
359    HashMismatch {
360        /// The sequence number whose hash diverged.
361        seq: u64,
362    },
363    /// One or more contiguous `seq` slots inside the high-watermark are missing.
364    Gap {
365        /// The inclusive range of missing seqs.
366        range: GapRange,
367    },
368    /// The entry stored at table key `table_key` carries an `entry.seq` that
369    /// disagrees with the key.
370    ///
371    /// The canonical hash hashes `entry.seq` rather than the table key, so a row
372    /// whose bytes were moved or duplicated under a different key still passes the
373    /// hash check. The verifier surfaces the divergence so that class of
374    /// corruption never reads as clean.
375    SeqMismatch {
376        /// The redb table key (the slot the verifier was reading).
377        table_key: u64,
378        /// The seq embedded inside the decoded entry value.
379        embedded_seq: u64,
380    },
381    /// A stored sidecar index entry diverges from the projection rebuilt from the
382    /// entry table.
383    IndexDrift {
384        /// Which sidecar index the finding applies to.
385        kind: IndexKind,
386        /// The stringified key inside that index.
387        key: String,
388        /// The kind of drift observed.
389        drift: IndexDrift,
390    },
391    /// A manifest field disagrees with the entry table or violates a sealed-state
392    /// invariant.
393    ManifestMismatch {
394        /// Which manifest field the finding applies to.
395        kind: ManifestField,
396        /// Operator-readable explanation of the mismatch.
397        reason: String,
398    },
399}
400
/// A run of missing seqs, expressed as the inclusive interval `[from, to]`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct GapRange {
    /// Lowest seq that is missing.
    pub from: u64,
    /// Highest seq that is missing.
    pub to: u64,
}
409
/// The kind of drift observed for a sidecar index key.
///
/// Today the verifier only reports target reachability for the `client_order_id` and
/// `venue_order_id` indices because the rebuild is not yet payload-aware. Variants for
/// rebuild-vs-stored mismatches (missing from stored, divergent seq, unknown key) will
/// land when wrapper-type encoders provide a payload-derived projection.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum IndexDrift {
    /// The stored index points at a seq the entry scan did not record at all: a
    /// missing slot inside the high-watermark, or a seq outside `[1, high_watermark]`.
    DanglingTarget {
        /// The seq the stored index recorded.
        stored_seq: u64,
    },
    /// The stored index points at a seq whose entry the scan recorded as corrupted:
    /// it failed the hash check, or its embedded seq disagreed with its table key.
    TargetCorrupted {
        /// The seq the stored index recorded.
        stored_seq: u64,
    },
}
429
/// Identifies which manifest field a [`VerifyFinding::ManifestMismatch`] is about.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ManifestField {
    /// `manifest.high_watermark` differs from the durable high-watermark.
    HighWatermark,
    /// `manifest.start_ts_init` is later than the earliest observed `ts_init`.
    StartTsInit,
    /// On a sealed run, `manifest.end_ts_init` is missing, unexpected, or differs from
    /// the last observed `ts_init`.
    EndTsInit,
}
441
442/// Errors that prevent the verifier from producing a report at all.
443///
444/// Findings on a successful report cover the operator's actionable surface; this
445/// type captures the verifier's own failure modes (no run open, disk pressure on a
446/// read, manifest header damage that prevents loading the manifest).
447#[derive(Debug, thiserror::Error)]
448pub enum VerifyError {
449    /// A backend operation refused service before the verifier could produce a report.
450    #[error("backend access failed: {0}")]
451    Backend(#[from] EventStoreError),
452}
453
454#[cfg(test)]
455mod tests {
456    use bytes::Bytes;
457    use indexmap::IndexMap;
458    use nautilus_core::UnixNanos;
459    use rstest::{fixture, rstest};
460    use ustr::Ustr;
461
462    use super::*;
463    use crate::{
464        backend::{AppendEntry, IndexKey, MemoryBackend, ScanDirection},
465        compute_entry_hash,
466        entry::Topic,
467        headers::Headers,
468        manifest::{RegisteredComponents, RunManifest, RunStatus},
469    };
470
471    fn manifest(run_id: &str) -> RunManifest {
472        RunManifest {
473            run_id: run_id.to_string(),
474            parent_run_id: None,
475            instance_id: "trader-001".to_string(),
476            binary_hash: "deadbeef".to_string(),
477            schema_version: 1,
478            crate_versions: "feedface".to_string(),
479            feature_flags: Vec::new(),
480            adapter_versions: IndexMap::new(),
481            config_hash: "cafebabe".to_string(),
482            registered_components: RegisteredComponents::default(),
483            seed: None,
484            start_ts_init: UnixNanos::from(0),
485            end_ts_init: None,
486            high_watermark: 0,
487            status: RunStatus::Running,
488        }
489    }
490
491    fn build_entry(seq: u64, headers: Headers, ts_init: u64) -> EventStoreEntry {
492        let topic: Topic = "exec.command.SubmitOrder".into();
493        let payload_type = Ustr::from("SubmitOrder");
494        let payload = Bytes::from_static(b"\x01\x02\x03\x04");
495        let ts_publish = UnixNanos::from(ts_init + 1);
496        let ts_init = UnixNanos::from(ts_init);
497        let hash = compute_entry_hash(
498            seq,
499            ts_init,
500            ts_publish,
501            topic.as_ref(),
502            payload_type.as_str(),
503            &payload,
504            &headers,
505        );
506
507        EventStoreEntry::new(
508            hash,
509            seq,
510            headers,
511            topic,
512            payload_type,
513            payload,
514            ts_init,
515            ts_publish,
516        )
517    }
518
519    fn append_with(seq: u64, ts_init: u64, index_keys: Vec<IndexKey>) -> AppendEntry {
520        AppendEntry::new(build_entry(seq, Headers::empty(), ts_init), index_keys)
521    }
522
523    /// Test-only wrapper that delegates every call to an inner backend except the
524    /// manifest, which it returns verbatim, and (optionally) the high-watermark.
525    /// Lets unit tests drive manifest-mismatch and trailing-gap findings the
526    /// public `MemoryBackend` API would normalize away on seal.
527    struct ManifestOverrideBackend {
528        inner: MemoryBackend,
529        manifest_override: RunManifest,
530        high_watermark_override: Option<u64>,
531    }
532
533    impl ManifestOverrideBackend {
534        fn new(inner: MemoryBackend, manifest_override: RunManifest) -> Self {
535            Self {
536                inner,
537                manifest_override,
538                high_watermark_override: None,
539            }
540        }
541
542        fn with_high_watermark(mut self, hwm: u64) -> Self {
543            self.high_watermark_override = Some(hwm);
544            self
545        }
546    }
547
548    impl EventStore for ManifestOverrideBackend {
549        fn open_run(&mut self, m: RunManifest) -> Result<(), EventStoreError> {
550            self.inner.open_run(m)
551        }
552
553        fn append_batch(&mut self, entries: &[AppendEntry]) -> Result<u64, EventStoreError> {
554            self.inner.append_batch(entries)
555        }
556
557        fn scan_range(
558            &self,
559            from: u64,
560            to: u64,
561            direction: ScanDirection,
562        ) -> Result<Vec<EventStoreEntry>, EventStoreError> {
563            self.inner.scan_range(from, to, direction)
564        }
565
566        fn scan_seq(&self, seq: u64) -> Result<Option<EventStoreEntry>, EventStoreError> {
567            self.inner.scan_seq(seq)
568        }
569
570        fn lookup(&self, kind: IndexKind, key: &str) -> Result<Option<u64>, EventStoreError> {
571            self.inner.lookup(kind, key)
572        }
573
574        fn iter_index_keys(&self, kind: IndexKind) -> Result<Vec<(String, u64)>, EventStoreError> {
575            self.inner.iter_index_keys(kind)
576        }
577
578        fn seal(&mut self, status: RunStatus) -> Result<(), EventStoreError> {
579            self.inner.seal(status)
580        }
581
582        fn manifest(&self) -> Result<RunManifest, EventStoreError> {
583            Ok(self.manifest_override.clone())
584        }
585
586        fn high_watermark(&self) -> Result<u64, EventStoreError> {
587            if let Some(hwm) = self.high_watermark_override {
588                return Ok(hwm);
589            }
590            self.inner.high_watermark()
591        }
592    }
593
594    #[fixture]
595    fn open_backend() -> MemoryBackend {
596        let mut backend = MemoryBackend::new();
597        backend
598            .open_run(manifest("1700000000-aaaa1111"))
599            .expect("open run");
600        backend
601    }
602
603    fn verifier_for(backend: MemoryBackend) -> Verifier {
604        Verifier::new(Box::new(backend))
605    }
606
607    #[rstest]
608    fn clean_run_reports_no_findings(mut open_backend: MemoryBackend) {
609        open_backend
610            .append_batch(&[
611                append_with(1, 10, Vec::new()),
612                append_with(2, 11, Vec::new()),
613                append_with(3, 12, Vec::new()),
614            ])
615            .expect("append");
616        open_backend.seal(RunStatus::Ended).expect("seal");
617
618        let report = verifier_for(open_backend).verify().expect("verify");
619
620        // Lock the canonical clean case to zero findings exactly: any spurious
621        // additional finding must fail this test rather than slip past is_clean()
622        // matchers in the more targeted suites below.
623        assert!(report.is_clean(), "findings was: {:?}", report.findings);
624        assert_eq!(report.findings.len(), 0);
625        assert_eq!(report.high_watermark, 3);
626        assert_eq!(report.entries_scanned, 3);
627        assert_eq!(report.status, RunStatus::Ended);
628    }
629
630    #[rstest]
631    fn empty_run_reports_no_findings(mut open_backend: MemoryBackend) {
632        open_backend.seal(RunStatus::Ended).expect("seal");
633
634        let report = verifier_for(open_backend).verify().expect("verify");
635
636        assert!(report.is_clean(), "findings was: {:?}", report.findings);
637        assert_eq!(report.entries_scanned, 0);
638    }
639
640    #[rstest]
641    fn hash_mismatch_surfaces_per_seq(mut open_backend: MemoryBackend) {
642        open_backend
643            .append_batch(&[append_with(1, 10, Vec::new())])
644            .expect("append");
645        let mut tampered = build_entry(2, Headers::empty(), 11);
646        tampered.payload = Bytes::from_static(b"\xFF");
647        open_backend
648            .append_batch(&[AppendEntry::without_indices(tampered)])
649            .expect("append");
650        open_backend
651            .append_batch(&[append_with(3, 12, Vec::new())])
652            .expect("append");
653
654        let report = verifier_for(open_backend).verify().expect("verify");
655
656        assert!(
657            report
658                .findings
659                .iter()
660                .any(|f| matches!(f, VerifyFinding::HashMismatch { seq: 2 })),
661            "findings was: {:?}",
662            report.findings,
663        );
664        assert_eq!(report.entries_scanned, 3);
665        assert_eq!(report.high_watermark, 3);
666    }
667
668    #[rstest]
669    fn multiple_hash_mismatches_all_surface(mut open_backend: MemoryBackend) {
670        // Confirms the verifier walks past hash mismatches instead of bailing on the
671        // first hit: seq=2 and seq=4 are both tampered, and both must appear in the
672        // report.
673        for seq in 1..=4u64 {
674            let mut entry = build_entry(seq, Headers::empty(), 10 + seq);
675            if seq == 2 || seq == 4 {
676                entry.payload = Bytes::from_static(b"\xFF");
677            }
678            open_backend
679                .append_batch(&[AppendEntry::without_indices(entry)])
680                .expect("append");
681        }
682
683        let report = verifier_for(open_backend).verify().expect("verify");
684
685        let mismatch_seqs: Vec<u64> = report
686            .findings
687            .iter()
688            .filter_map(|f| match f {
689                VerifyFinding::HashMismatch { seq } => Some(*seq),
690                _ => None,
691            })
692            .collect();
693        assert_eq!(mismatch_seqs, vec![2, 4]);
694    }
695
696    #[rstest]
697    fn client_order_id_index_clean_when_target_resolves(mut open_backend: MemoryBackend) {
698        open_backend
699            .append_batch(&[AppendEntry::new(
700                build_entry(1, Headers::empty(), 10),
701                vec![IndexKey::new(IndexKind::ClientOrderId, "O-1".to_string())],
702            )])
703            .expect("append");
704        open_backend.seal(RunStatus::Ended).expect("seal");
705
706        let report = verifier_for(open_backend).verify().expect("verify");
707
708        assert!(report.is_clean(), "findings was: {:?}", report.findings);
709    }
710
711    #[rstest]
712    #[case::client_order_id(IndexKind::ClientOrderId)]
713    #[case::venue_order_id(IndexKind::VenueOrderId)]
714    fn entity_index_target_corrupted_drift(
715        mut open_backend: MemoryBackend,
716        #[case] kind: IndexKind,
717    ) {
718        // Stored entity-index entry points at seq=1 whose stored hash no longer
719        // matches the recomputed hash. The verifier must surface TargetCorrupted
720        // for both ClientOrderId and VenueOrderId so a drop of either kind from
721        // the cross-check loop fails this test.
722        let mut tampered = build_entry(1, Headers::empty(), 10);
723        tampered.payload = Bytes::from_static(b"\xFF");
724        open_backend
725            .append_batch(&[AppendEntry::new(
726                tampered,
727                vec![IndexKey::new(kind, "K-1".to_string())],
728            )])
729            .expect("append");
730
731        let report = verifier_for(open_backend).verify().expect("verify");
732
733        assert!(
734            report.findings.iter().any(|f| matches!(
735                f,
736                VerifyFinding::IndexDrift {
737                    kind: drift_kind,
738                    drift: IndexDrift::TargetCorrupted { stored_seq: 1 },
739                    ..
740                } if *drift_kind == kind
741            )),
742            "findings was: {:?}",
743            report.findings,
744        );
745    }
746
747    fn find_manifest_mismatch(findings: &[VerifyFinding], target: ManifestField) -> &str {
748        findings
749            .iter()
750            .find_map(|f| match f {
751                VerifyFinding::ManifestMismatch { kind, reason } if *kind == target => {
752                    Some(reason.as_str())
753                }
754                _ => None,
755            })
756            .unwrap_or_else(|| {
757                panic!("expected ManifestMismatch({target:?}), findings was: {findings:?}")
758            })
759    }
760
761    #[rstest]
762    fn manifest_high_watermark_drift() {
763        // Real durable hwm is 1, but the manifest reports 99. Verifier must surface
764        // a HighWatermark mismatch whose reason carries both values so a swap of
765        // observed and stored sides would fail this test.
766        let mut inner = MemoryBackend::new();
767        inner.open_run(manifest("run-hwm")).expect("open run");
768        inner
769            .append_batch(&[append_with(1, 10, Vec::new())])
770            .expect("append");
771        inner.seal(RunStatus::Ended).expect("seal");
772
773        let mut stale = inner.manifest().expect("manifest");
774        stale.high_watermark = 99;
775        let backend = ManifestOverrideBackend::new(inner, stale);
776
777        let report = Verifier::new(Box::new(backend)).verify().expect("verify");
778        let reason = find_manifest_mismatch(&report.findings, ManifestField::HighWatermark);
779
780        assert!(reason.contains("99"), "reason was: {reason}");
781        assert!(reason.contains('1'), "reason was: {reason}");
782    }
783
784    #[rstest]
785    fn manifest_end_ts_init_drift_when_sealed() {
786        // Real durable max ts_init is 25, but the sealed manifest's end_ts_init
787        // claims 99. The reason must surface both values; without that assertion,
788        // a min/max swap inside record_entry would still pass.
789        let mut inner = MemoryBackend::new();
790        inner.open_run(manifest("run-end-ts")).expect("open run");
791        inner
792            .append_batch(&[
793                append_with(1, 10, Vec::new()),
794                append_with(2, 25, Vec::new()),
795            ])
796            .expect("append");
797        inner.seal(RunStatus::Ended).expect("seal");
798
799        let mut drifted = inner.manifest().expect("manifest");
800        drifted.end_ts_init = Some(UnixNanos::from(99));
801        let backend = ManifestOverrideBackend::new(inner, drifted);
802
803        let report = Verifier::new(Box::new(backend)).verify().expect("verify");
804        let reason = find_manifest_mismatch(&report.findings, ManifestField::EndTsInit);
805
806        assert!(reason.contains("99"), "reason was: {reason}");
807        assert!(reason.contains("25"), "reason was: {reason}");
808    }
809
810    #[rstest]
811    fn manifest_end_ts_init_missing_when_sealed_with_entries() {
812        // Sealed manifest forgot to record end_ts_init while the entry stream is
813        // non-empty: validate_manifest's (None, Some) arm must fire and the
814        // reason must carry the observed last ts_init.
815        let mut inner = MemoryBackend::new();
816        inner
817            .open_run(manifest("run-end-ts-missing"))
818            .expect("open run");
819        inner
820            .append_batch(&[append_with(1, 42, Vec::new())])
821            .expect("append");
822        inner.seal(RunStatus::Ended).expect("seal");
823
824        let mut drifted = inner.manifest().expect("manifest");
825        drifted.end_ts_init = None;
826        let backend = ManifestOverrideBackend::new(inner, drifted);
827
828        let report = Verifier::new(Box::new(backend)).verify().expect("verify");
829        let reason = find_manifest_mismatch(&report.findings, ManifestField::EndTsInit);
830
831        assert!(reason.contains("missing"), "reason was: {reason}");
832        assert!(reason.contains("42"), "reason was: {reason}");
833    }
834
835    #[rstest]
836    fn manifest_end_ts_init_set_on_sealed_empty_run() {
837        // Sealed manifest carries end_ts_init even though the entry table is
838        // empty: validate_manifest's (Some, None) arm must fire and the reason
839        // must carry the spurious stored value.
840        let mut inner = MemoryBackend::new();
841        inner
842            .open_run(manifest("run-end-ts-empty"))
843            .expect("open run");
844        inner.seal(RunStatus::Ended).expect("seal");
845
846        let mut drifted = inner.manifest().expect("manifest");
847        drifted.end_ts_init = Some(UnixNanos::from(77));
848        let backend = ManifestOverrideBackend::new(inner, drifted);
849
850        let report = Verifier::new(Box::new(backend)).verify().expect("verify");
851        let reason = find_manifest_mismatch(&report.findings, ManifestField::EndTsInit);
852
853        assert!(reason.contains("77"), "reason was: {reason}");
854        assert!(reason.contains("empty"), "reason was: {reason}");
855    }
856
857    #[rstest]
858    fn manifest_start_ts_init_drift() {
859        // Earliest entry ts_init is 10, but the manifest's start_ts_init is 50.
860        // Reason must carry both values so a flipped comparison or wrong-side
861        // formatting fails the test.
862        let mut inner = MemoryBackend::new();
863        inner.open_run(manifest("run-start-ts")).expect("open run");
864        inner
865            .append_batch(&[
866                append_with(1, 10, Vec::new()),
867                append_with(2, 25, Vec::new()),
868            ])
869            .expect("append");
870        inner.seal(RunStatus::Ended).expect("seal");
871
872        let mut drifted = inner.manifest().expect("manifest");
873        drifted.start_ts_init = UnixNanos::from(50);
874        let backend = ManifestOverrideBackend::new(inner, drifted);
875
876        let report = Verifier::new(Box::new(backend)).verify().expect("verify");
877        let reason = find_manifest_mismatch(&report.findings, ManifestField::StartTsInit);
878
879        assert!(reason.contains("50"), "reason was: {reason}");
880        assert!(reason.contains("10"), "reason was: {reason}");
881    }
882
883    #[rstest]
884    fn trailing_gap_surfaces_when_last_seqs_missing() {
885        // Inner backend holds seqs 1..=3, but both the manifest and the advertised
886        // high-watermark claim 5. The verifier must walk to seq=5, find seqs 4-5
887        // missing, and emit a single trailing GapRange{4,5}. Removing the
888        // `flush_pending_gap(high_watermark + 1, ...)` call after the loop would
889        // drop this finding entirely.
890        let mut inner = MemoryBackend::new();
891        inner
892            .open_run(manifest("run-trailing-gap"))
893            .expect("open run");
894        inner
895            .append_batch(&[
896                append_with(1, 10, Vec::new()),
897                append_with(2, 11, Vec::new()),
898                append_with(3, 12, Vec::new()),
899            ])
900            .expect("append");
901        inner.seal(RunStatus::Ended).expect("seal");
902
903        let mut drifted = inner.manifest().expect("manifest");
904        drifted.high_watermark = 5;
905        // Advertise hwm=5 on both sides so the HighWatermark mismatch path stays
906        // quiet and the test pins only the trailing-gap behavior.
907        let backend = ManifestOverrideBackend::new(inner, drifted).with_high_watermark(5);
908
909        let report = Verifier::new(Box::new(backend)).verify().expect("verify");
910
911        let gaps: Vec<GapRange> = report
912            .findings
913            .iter()
914            .filter_map(|f| match f {
915                VerifyFinding::Gap { range } => Some(*range),
916                _ => None,
917            })
918            .collect();
919        assert_eq!(gaps, vec![GapRange { from: 4, to: 5 }]);
920        assert_eq!(report.entries_scanned, 3);
921        assert_eq!(report.high_watermark, 5);
922    }
923
924    /// Test backend that rewrites a single `scan_seq` result so the value's
925    /// embedded seq disagrees with the requested table key. Lets the unit suite
926    /// exercise the redb-only "row moved under wrong key" corruption class
927    /// without setting up a real on-disk file.
928    struct SeqRewriteBackend {
929        inner: MemoryBackend,
930        target_key: u64,
931        substitute: EventStoreEntry,
932    }
933
934    impl EventStore for SeqRewriteBackend {
935        fn open_run(&mut self, m: RunManifest) -> Result<(), EventStoreError> {
936            self.inner.open_run(m)
937        }
938        fn append_batch(&mut self, e: &[AppendEntry]) -> Result<u64, EventStoreError> {
939            self.inner.append_batch(e)
940        }
941        fn scan_range(
942            &self,
943            from: u64,
944            to: u64,
945            direction: ScanDirection,
946        ) -> Result<Vec<EventStoreEntry>, EventStoreError> {
947            self.inner.scan_range(from, to, direction)
948        }
949        fn scan_seq(&self, seq: u64) -> Result<Option<EventStoreEntry>, EventStoreError> {
950            if seq == self.target_key {
951                return Ok(Some(self.substitute.clone()));
952            }
953            self.inner.scan_seq(seq)
954        }
955        fn lookup(&self, kind: IndexKind, key: &str) -> Result<Option<u64>, EventStoreError> {
956            self.inner.lookup(kind, key)
957        }
958        fn iter_index_keys(&self, kind: IndexKind) -> Result<Vec<(String, u64)>, EventStoreError> {
959            self.inner.iter_index_keys(kind)
960        }
961        fn seal(&mut self, status: RunStatus) -> Result<(), EventStoreError> {
962            self.inner.seal(status)
963        }
964        fn manifest(&self) -> Result<RunManifest, EventStoreError> {
965            self.inner.manifest()
966        }
967        fn high_watermark(&self) -> Result<u64, EventStoreError> {
968            self.inner.high_watermark()
969        }
970    }
971
972    #[rstest]
973    fn seq_mismatch_surfaces_when_row_value_disagrees_with_key() {
974        // Row at table_key=2 holds the bytes of an entry whose embedded seq is 99.
975        // The hash recomputes correctly (because the hash covers entry.seq=99),
976        // so scan_seq returns Ok(Some(entry)) without raising HashMismatch. The
977        // verifier must catch the key/embedded-seq divergence rather than mark
978        // the slot clean.
979        let mut inner = MemoryBackend::new();
980        inner
981            .open_run(manifest("run-seq-mismatch"))
982            .expect("open run");
983        inner
984            .append_batch(&[
985                append_with(1, 10, Vec::new()),
986                append_with(2, 11, Vec::new()),
987                append_with(3, 12, Vec::new()),
988            ])
989            .expect("append");
990        inner.seal(RunStatus::Ended).expect("seal");
991
992        let substitute = build_entry(99, Headers::empty(), 11);
993        let backend = SeqRewriteBackend {
994            inner,
995            target_key: 2,
996            substitute,
997        };
998
999        let report = Verifier::new(Box::new(backend)).verify().expect("verify");
1000
1001        assert!(
1002            report.findings.iter().any(|f| matches!(
1003                f,
1004                VerifyFinding::SeqMismatch {
1005                    table_key: 2,
1006                    embedded_seq: 99,
1007                }
1008            )),
1009            "findings was: {:?}",
1010            report.findings,
1011        );
1012    }
1013
1014    #[rstest]
1015    fn seq_mismatch_marks_target_corrupted_for_dependent_indices() {
1016        // Same row corruption as above, but the stored client_order_id index
1017        // points at the rewritten slot. The slot must be classified as corrupted
1018        // so the index drift surfaces TargetCorrupted rather than silently
1019        // accepting the lookup.
1020        let mut inner = MemoryBackend::new();
1021        inner
1022            .open_run(manifest("run-seq-mismatch-idx"))
1023            .expect("open run");
1024        inner
1025            .append_batch(&[
1026                append_with(1, 10, Vec::new()),
1027                AppendEntry::new(
1028                    build_entry(2, Headers::empty(), 11),
1029                    vec![IndexKey::new(IndexKind::ClientOrderId, "O-1".to_string())],
1030                ),
1031            ])
1032            .expect("append");
1033        inner.seal(RunStatus::Ended).expect("seal");
1034
1035        let substitute = build_entry(99, Headers::empty(), 11);
1036        let backend = SeqRewriteBackend {
1037            inner,
1038            target_key: 2,
1039            substitute,
1040        };
1041
1042        let report = Verifier::new(Box::new(backend)).verify().expect("verify");
1043
1044        assert!(
1045            report.findings.iter().any(|f| matches!(
1046                f,
1047                VerifyFinding::IndexDrift {
1048                    kind: IndexKind::ClientOrderId,
1049                    drift: IndexDrift::TargetCorrupted { stored_seq: 2 },
1050                    ..
1051                }
1052            )),
1053            "findings was: {:?}",
1054            report.findings,
1055        );
1056    }
1057
1058    #[rstest]
1059    fn verify_propagates_no_run_open_as_error() {
1060        let backend = MemoryBackend::new();
1061        let verifier = Verifier::new(Box::new(backend));
1062
1063        let err = verifier.verify().expect_err("must fail");
1064
1065        match err {
1066            VerifyError::Backend(EventStoreError::Backend(msg)) => {
1067                assert!(msg.contains("no run open"), "msg was: {msg}");
1068            }
1069            VerifyError::Backend(other) => {
1070                panic!("expected Backend(no run open), was {other:?}")
1071            }
1072        }
1073    }
1074}