use std::{collections::BTreeSet, fmt::Debug, path::Path};
use crate::{
backend::{EventStore, IndexKind, RedbBackend},
entry::EventStoreEntry,
error::EventStoreError,
manifest::{RunId, RunManifest, RunStatus},
};
/// Integrity checker that audits the run stored in an [`EventStore`] backend.
///
/// All checks go through the boxed trait object, so the verifier is not tied
/// to a particular storage implementation.
pub struct Verifier {
// Store whose manifest, entries, and indices `verify` inspects.
backend: Box<dyn EventStore>,
}
/// Opaque `Debug` output: only the type name is printed, the boxed backend is not.
impl Debug for Verifier {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // `finish_non_exhaustive` marks the output as having omitted fields.
        let mut builder = formatter.debug_struct("Verifier");
        builder.finish_non_exhaustive()
    }
}
impl Verifier {
#[must_use]
pub fn new(backend: Box<dyn EventStore>) -> Self {
Self { backend }
}
pub fn open_redb(
base_dir: impl AsRef<Path>,
instance_id: &str,
run_id: &str,
) -> Result<Self, VerifyError> {
let backend =
RedbBackend::open_sealed(base_dir.as_ref().to_path_buf(), instance_id, run_id)?;
Ok(Self {
backend: Box::new(backend),
})
}
pub fn open_redb_file(path: impl AsRef<Path>) -> Result<Self, VerifyError> {
let backend = RedbBackend::open_sealed_file(path.as_ref().to_path_buf())?;
Ok(Self {
backend: Box::new(backend),
})
}
/// Borrows the store this verifier reads from.
#[must_use]
pub fn backend(&self) -> &dyn EventStore {
    &*self.backend
}
/// Runs every check and collects the results into a [`VerifyReport`].
///
/// The pass reads the manifest and high watermark, scans each sequence number
/// from 1 through the high watermark, cross-checks both order-id indices
/// against the scan, and finally compares the manifest with what was observed.
/// Integrity problems are returned as findings inside the report, not as errors.
///
/// # Errors
///
/// Returns [`VerifyError`] when the backend itself fails: reading the manifest,
/// the high watermark, or the index keys, or a row scan error other than a gap
/// or hash mismatch.
pub fn verify(&self) -> Result<VerifyReport, VerifyError> {
    let store = self.backend.as_ref();
    let manifest = store.manifest()?;
    let high_watermark = store.high_watermark()?;

    // Each stage appends to the same list, so findings stay in check order.
    let mut findings = Vec::new();
    let scan = self.scan_entries(high_watermark, &mut findings)?;
    self.cross_check_indices(&scan, &mut findings)?;
    validate_manifest(&manifest, high_watermark, &scan, &mut findings);

    let report = VerifyReport {
        run_id: manifest.run_id.clone(),
        status: manifest.status,
        high_watermark,
        entries_scanned: scan.scanned,
        findings,
    };
    Ok(report)
}
/// Reads every sequence number from 1 through `high_watermark` and classifies each row.
///
/// * A missing row (`Ok(None)` or `EventStoreError::Gap`) extends the pending gap.
/// * A hash mismatch, or a row whose embedded `seq` differs from its key, is
///   reported as a finding and recorded as corrupted.
/// * Any other row is recorded as clean and contributes to the min/max `ts_init`.
///
/// A pending gap is flushed into `findings` before the next present row is
/// handled, and once more after the loop for a trailing gap.
///
/// # Errors
///
/// Returns [`VerifyError::Backend`] for any scan error other than a gap or
/// hash mismatch; the scan stops at that row.
fn scan_entries(
    &self,
    high_watermark: u64,
    findings: &mut Vec<VerifyFinding>,
) -> Result<EntryScan, VerifyError> {
    let mut scanned: u64 = 0;
    let mut min_ts: Option<u64> = None;
    let mut max_ts: Option<u64> = None;
    let mut clean_seqs: BTreeSet<u64> = BTreeSet::new();
    let mut corrupted_seqs: BTreeSet<u64> = BTreeSet::new();
    let mut gap_cursor: Option<u64> = None;

    for seq in 1..=high_watermark {
        // `intact` is true for a clean row and false for a corrupted one;
        // missing rows leave the iteration early and are not counted.
        let intact = match self.backend.scan_seq(seq) {
            Ok(None) | Err(EventStoreError::Gap { .. }) => {
                extend_pending_gap(seq, &mut gap_cursor);
                continue;
            }
            Err(EventStoreError::HashMismatch { seq: bad }) => {
                flush_pending_gap(seq, &mut gap_cursor, findings);
                findings.push(VerifyFinding::HashMismatch { seq: bad });
                false
            }
            Err(other) => return Err(VerifyError::Backend(other)),
            Ok(Some(entry)) => {
                flush_pending_gap(seq, &mut gap_cursor, findings);
                let matches_key = entry.seq == seq;
                if matches_key {
                    record_entry(&entry, &mut min_ts, &mut max_ts);
                } else {
                    findings.push(VerifyFinding::SeqMismatch {
                        table_key: seq,
                        embedded_seq: entry.seq,
                    });
                }
                matches_key
            }
        };

        let bucket = if intact {
            &mut clean_seqs
        } else {
            &mut corrupted_seqs
        };
        bucket.insert(seq);
        scanned += 1;
    }

    // Close a gap that runs up to and including the high watermark.
    flush_pending_gap(high_watermark + 1, &mut gap_cursor, findings);

    Ok(EntryScan {
        scanned,
        min_ts,
        max_ts,
        clean_seqs,
        corrupted_seqs,
    })
}
/// Checks every key of the client-order-id and venue-order-id indices against the scan.
///
/// An index row whose stored sequence number is not a clean entry yields a
/// [`VerifyFinding::IndexDrift`]; rows that resolve to a clean entry yield nothing.
///
/// # Errors
///
/// Propagates a failure of `iter_index_keys` as a [`VerifyError`].
fn cross_check_indices(
    &self,
    scan: &EntryScan,
    findings: &mut Vec<VerifyFinding>,
) -> Result<(), VerifyError> {
    for kind in [IndexKind::ClientOrderId, IndexKind::VenueOrderId] {
        let rows = self.backend.iter_index_keys(kind)?;
        findings.extend(rows.into_iter().filter_map(|(key, stored_seq)| {
            let drift = classify_target(stored_seq, scan)?;
            Some(VerifyFinding::IndexDrift { kind, key, drift })
        }));
    }
    Ok(())
}
}
/// Summary of one pass over the entry table, shared by the later checks.
#[derive(Debug)]
struct EntryScan {
// Number of rows that were present, clean or corrupted; missing rows are not counted.
scanned: u64,
// Smallest `ts_init` among clean entries; `None` when no clean entry was seen.
min_ts: Option<u64>,
// Largest `ts_init` among clean entries; `None` when no clean entry was seen.
max_ts: Option<u64>,
// Sequence numbers whose row was read and whose embedded seq matched the key.
clean_seqs: BTreeSet<u64>,
// Sequence numbers whose row had a hash mismatch or a mismatching embedded seq.
corrupted_seqs: BTreeSet<u64>,
}
/// Folds the entry's `ts_init` into the running minimum and maximum.
///
/// Both trackers are `Some` after the call, whatever they held before.
fn record_entry(entry: &EventStoreEntry, min_ts: &mut Option<u64>, max_ts: &mut Option<u64>) {
    let ts = entry.ts_init.as_u64();
    *min_ts = match *min_ts {
        Some(lowest) => Some(lowest.min(ts)),
        None => Some(ts),
    };
    *max_ts = match *max_ts {
        Some(highest) => Some(highest.max(ts)),
        None => Some(ts),
    };
}
/// Starts a pending gap at `seq` unless one is already open.
///
/// Only the first missing sequence number is remembered; the end of the gap
/// is decided when the gap is flushed.
fn extend_pending_gap(seq: u64, gap_cursor: &mut Option<u64>) {
    // Keeps an existing start untouched and stores `seq` only into an empty cursor.
    let _ = gap_cursor.get_or_insert(seq);
}
/// Emits the pending gap, if any, as a [`VerifyFinding::Gap`] and clears the cursor.
///
/// `next_seq` is the first sequence number after the gap, so the reported
/// range is inclusive and ends at `next_seq - 1`. Does nothing when no gap is open.
fn flush_pending_gap(
    next_seq: u64,
    gap_cursor: &mut Option<u64>,
    findings: &mut Vec<VerifyFinding>,
) {
    let Some(start) = gap_cursor.take() else {
        return;
    };
    let range = GapRange {
        from: start,
        to: next_seq - 1,
    };
    findings.push(VerifyFinding::Gap { range });
}
/// Decides whether an index row pointing at `stored_seq` has drifted.
///
/// Returns `None` when the target is a clean entry, `TargetCorrupted` when the
/// scan recorded it as corrupted, and `DanglingTarget` when the scan recorded
/// it in neither set.
fn classify_target(stored_seq: u64, scan: &EntryScan) -> Option<IndexDrift> {
    if scan.clean_seqs.contains(&stored_seq) {
        return None;
    }
    let drift = if scan.corrupted_seqs.contains(&stored_seq) {
        IndexDrift::TargetCorrupted { stored_seq }
    } else {
        IndexDrift::DanglingTarget { stored_seq }
    };
    Some(drift)
}
/// Compares the manifest's summary fields with what the scan observed and
/// appends a [`VerifyFinding::ManifestMismatch`] for each disagreement.
///
/// * `high_watermark`: the manifest value must equal the store's `high_watermark`.
/// * `start_ts_init`: must not lie above the earliest clean entry's `ts_init`.
/// * `end_ts_init`: checked on sealed manifests only; must equal the largest
///   `ts_init` seen among clean entries, be present when such entries exist,
///   and be absent when none exist.
///
/// `scan.min_ts` and `scan.max_ts` cover clean entries only, so "no observed
/// timestamp" can mean either an empty table or a table whose rows are all
/// corrupted. The two cases are reported with different reasons.
fn validate_manifest(
    manifest: &RunManifest,
    high_watermark: u64,
    scan: &EntryScan,
    findings: &mut Vec<VerifyFinding>,
) {
    if manifest.high_watermark != high_watermark {
        findings.push(VerifyFinding::ManifestMismatch {
            kind: ManifestField::HighWatermark,
            reason: format!(
                "manifest high_watermark {} disagrees with durable high_watermark {high_watermark}",
                manifest.high_watermark,
            ),
        });
    }

    if let Some(min_ts) = scan.min_ts
        && manifest.start_ts_init.as_u64() > min_ts
    {
        findings.push(VerifyFinding::ManifestMismatch {
            kind: ManifestField::StartTsInit,
            reason: format!(
                "manifest start_ts_init {} sits above earliest entry ts_init {min_ts}",
                manifest.start_ts_init.as_u64(),
            ),
        });
    }

    // `end_ts_init` is only checked once the manifest reports itself sealed.
    if !manifest.is_sealed() {
        return;
    }

    let reason = match (manifest.end_ts_init.map(|t| t.as_u64()), scan.max_ts) {
        (Some(stored), Some(observed)) if stored != observed => format!(
            "manifest end_ts_init {stored} disagrees with last observed ts_init {observed}",
        ),
        (None, Some(observed)) => format!(
            "sealed manifest is missing end_ts_init while entries up to ts_init {observed} exist",
        ),
        // No row was present at all: the table really is empty.
        (Some(stored), None) if scan.scanned == 0 => format!(
            "sealed manifest carries end_ts_init {stored} despite empty entry table",
        ),
        // Rows were present but every one was corrupted, so there is no
        // trustworthy timestamp to compare against. Calling the table empty
        // here would be wrong.
        (Some(stored), None) => format!(
            "sealed manifest carries end_ts_init {stored} but none of the {} scanned entries is intact enough to confirm it",
            scan.scanned,
        ),
        // Values agree, or both sides are absent: nothing to report.
        _ => return,
    };
    findings.push(VerifyFinding::ManifestMismatch {
        kind: ManifestField::EndTsInit,
        reason,
    });
}
/// Outcome of one [`Verifier::verify`] pass.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct VerifyReport {
/// Run id copied from the manifest.
pub run_id: RunId,
/// Run status copied from the manifest.
pub status: RunStatus,
/// High watermark reported by the backend; the scan covers `1..=high_watermark`.
pub high_watermark: u64,
/// Number of rows that were present during the scan, including corrupted ones.
pub entries_scanned: u64,
/// Every problem found, in the order the checks produced them.
pub findings: Vec<VerifyFinding>,
}
impl VerifyReport {
/// Returns `true` when the pass produced no findings.
#[must_use]
pub fn is_clean(&self) -> bool {
self.findings.is_empty()
}
}
/// A single integrity problem discovered by the verifier.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum VerifyFinding {
/// The backend reported a hash mismatch while reading an entry.
HashMismatch {
/// Sequence number named by the backend's hash-mismatch error.
seq: u64,
},
/// A run of consecutive sequence numbers at or below the high watermark has no row.
Gap {
/// Inclusive range of the missing sequence numbers.
range: GapRange,
},
/// The row stored under one key carries a different sequence number.
SeqMismatch {
/// Key the row was read from.
table_key: u64,
/// Sequence number embedded in the entry found there.
embedded_seq: u64,
},
/// An index row does not point at a clean entry.
IndexDrift {
/// Index the row belongs to.
kind: IndexKind,
/// Key of the index row.
key: String,
/// How the target is broken.
drift: IndexDrift,
},
/// A manifest field disagrees with what the store or the scan shows.
ManifestMismatch {
/// Manifest field that is inconsistent.
kind: ManifestField,
/// Human-readable description including the conflicting values.
reason: String,
},
}
/// Inclusive range of missing sequence numbers.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct GapRange {
/// First missing sequence number.
pub from: u64,
/// Last missing sequence number.
pub to: u64,
}
/// Why an index row's target is not a clean entry.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum IndexDrift {
/// The target was recorded by the scan as neither clean nor corrupted.
DanglingTarget {
/// Sequence number stored in the index row.
stored_seq: u64,
},
/// The target was recorded by the scan as corrupted (hash or seq mismatch).
TargetCorrupted {
/// Sequence number stored in the index row.
stored_seq: u64,
},
}
/// Manifest field named by a [`VerifyFinding::ManifestMismatch`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ManifestField {
/// `high_watermark` differs from the value the backend reports.
HighWatermark,
/// `start_ts_init` lies above the earliest clean entry's `ts_init`.
StartTsInit,
/// `end_ts_init` of a sealed manifest is inconsistent with the scanned entries.
EndTsInit,
}
/// Failure of the verification pass itself, as opposed to a finding about the data.
#[derive(Debug, thiserror::Error)]
pub enum VerifyError {
/// The backend returned an error that the verifier does not turn into a finding.
#[error("backend access failed: {0}")]
Backend(#[from] EventStoreError),
}
#[cfg(test)]
mod tests {
use bytes::Bytes;
use indexmap::IndexMap;
use nautilus_core::UnixNanos;
use rstest::{fixture, rstest};
use ustr::Ustr;
use super::*;
use crate::{
backend::{AppendEntry, IndexKey, MemoryBackend, ScanDirection},
compute_entry_hash,
entry::Topic,
headers::Headers,
manifest::{RegisteredComponents, RunManifest, RunStatus},
};
// Builds a running, unsealed manifest with fixed placeholder metadata for `run_id`.
fn manifest(run_id: &str) -> RunManifest {
    RunManifest {
        run_id: run_id.to_owned(),
        parent_run_id: None,
        instance_id: String::from("trader-001"),
        binary_hash: String::from("deadbeef"),
        schema_version: 1,
        crate_versions: String::from("feedface"),
        feature_flags: vec![],
        adapter_versions: IndexMap::new(),
        config_hash: String::from("cafebabe"),
        registered_components: RegisteredComponents::default(),
        seed: None,
        start_ts_init: UnixNanos::from(0),
        end_ts_init: None,
        high_watermark: 0,
        status: RunStatus::Running,
    }
}
fn build_entry(seq: u64, headers: Headers, ts_init: u64) -> EventStoreEntry {
let topic: Topic = "exec.command.SubmitOrder".into();
let payload_type = Ustr::from("SubmitOrder");
let payload = Bytes::from_static(b"\x01\x02\x03\x04");
let ts_publish = UnixNanos::from(ts_init + 1);
let ts_init = UnixNanos::from(ts_init);
let hash = compute_entry_hash(
seq,
ts_init,
ts_publish,
topic.as_ref(),
payload_type.as_str(),
&payload,
&headers,
);
EventStoreEntry::new(
hash,
seq,
headers,
topic,
payload_type,
payload,
ts_init,
ts_publish,
)
}
// Wraps a header-less entry for `seq` / `ts_init` together with its index keys.
fn append_with(seq: u64, ts_init: u64, index_keys: Vec<IndexKey>) -> AppendEntry {
    let entry = build_entry(seq, Headers::empty(), ts_init);
    AppendEntry::new(entry, index_keys)
}
// Test double: delegates to a `MemoryBackend` but serves a substituted manifest
// and, optionally, a substituted high watermark.
struct ManifestOverrideBackend {
// Real store that answers every call that is not overridden.
inner: MemoryBackend,
// Manifest returned by `manifest()` instead of the inner one.
manifest_override: RunManifest,
// When set, returned by `high_watermark()` instead of the inner value.
high_watermark_override: Option<u64>,
}
impl ManifestOverrideBackend {
    // Serves `manifest_override` as the manifest; the high watermark still comes from `inner`.
    fn new(inner: MemoryBackend, manifest_override: RunManifest) -> Self {
        let high_watermark_override = None;
        Self {
            inner,
            manifest_override,
            high_watermark_override,
        }
    }

    // Builder-style setter that also forces the reported high watermark to `hwm`.
    fn with_high_watermark(self, hwm: u64) -> Self {
        Self {
            high_watermark_override: Some(hwm),
            ..self
        }
    }
}
// Delegates everything to `inner` except `manifest` and, when overridden, `high_watermark`.
impl EventStore for ManifestOverrideBackend {
    fn open_run(&mut self, manifest: RunManifest) -> Result<(), EventStoreError> {
        self.inner.open_run(manifest)
    }

    fn append_batch(&mut self, batch: &[AppendEntry]) -> Result<u64, EventStoreError> {
        self.inner.append_batch(batch)
    }

    fn scan_range(
        &self,
        from: u64,
        to: u64,
        direction: ScanDirection,
    ) -> Result<Vec<EventStoreEntry>, EventStoreError> {
        self.inner.scan_range(from, to, direction)
    }

    fn scan_seq(&self, seq: u64) -> Result<Option<EventStoreEntry>, EventStoreError> {
        self.inner.scan_seq(seq)
    }

    fn lookup(&self, kind: IndexKind, key: &str) -> Result<Option<u64>, EventStoreError> {
        self.inner.lookup(kind, key)
    }

    fn iter_index_keys(&self, kind: IndexKind) -> Result<Vec<(String, u64)>, EventStoreError> {
        self.inner.iter_index_keys(kind)
    }

    fn seal(&mut self, status: RunStatus) -> Result<(), EventStoreError> {
        self.inner.seal(status)
    }

    // Always answers with the substituted manifest.
    fn manifest(&self) -> Result<RunManifest, EventStoreError> {
        let substituted = self.manifest_override.clone();
        Ok(substituted)
    }

    // The forced value wins; otherwise the inner store is asked.
    fn high_watermark(&self) -> Result<u64, EventStoreError> {
        match self.high_watermark_override {
            Some(forced) => Ok(forced),
            None => self.inner.high_watermark(),
        }
    }
}
// Fixture: an in-memory backend with a run already opened.
#[fixture]
fn open_backend() -> MemoryBackend {
    let mut store = MemoryBackend::new();
    let opened = store.open_run(manifest("1700000000-aaaa1111"));
    opened.expect("open run");
    store
}
fn verifier_for(backend: MemoryBackend) -> Verifier {
Verifier::new(Box::new(backend))
}
// A sealed run of three intact entries verifies without findings.
#[rstest]
fn clean_run_reports_no_findings(mut open_backend: MemoryBackend) {
    let batch = [
        append_with(1, 10, Vec::new()),
        append_with(2, 11, Vec::new()),
        append_with(3, 12, Vec::new()),
    ];
    open_backend.append_batch(&batch).expect("append");
    open_backend.seal(RunStatus::Ended).expect("seal");

    let verifier = verifier_for(open_backend);
    let report = verifier.verify().expect("verify");

    assert!(report.is_clean(), "findings was: {:?}", report.findings);
    assert_eq!(report.findings.len(), 0);
    assert_eq!(report.high_watermark, 3);
    assert_eq!(report.entries_scanned, 3);
    assert_eq!(report.status, RunStatus::Ended);
}
// A sealed run without any entry is clean and scans nothing.
#[rstest]
fn empty_run_reports_no_findings(mut open_backend: MemoryBackend) {
    open_backend.seal(RunStatus::Ended).expect("seal");

    let verifier = verifier_for(open_backend);
    let report = verifier.verify().expect("verify");

    assert!(report.is_clean(), "findings was: {:?}", report.findings);
    assert_eq!(report.entries_scanned, 0);
}
// A tampered payload at seq 2 is reported, and the scan still covers all three rows.
#[rstest]
fn hash_mismatch_surfaces_per_seq(mut open_backend: MemoryBackend) {
    let first = [append_with(1, 10, Vec::new())];
    open_backend.append_batch(&first).expect("append");

    // Change the payload after the hash was computed so the stored hash no longer matches.
    let mut tampered = build_entry(2, Headers::empty(), 11);
    tampered.payload = Bytes::from_static(b"\xFF");
    let second = [AppendEntry::without_indices(tampered)];
    open_backend.append_batch(&second).expect("append");

    let third = [append_with(3, 12, Vec::new())];
    open_backend.append_batch(&third).expect("append");

    let report = verifier_for(open_backend).verify().expect("verify");

    let expected = VerifyFinding::HashMismatch { seq: 2 };
    assert!(
        report.findings.contains(&expected),
        "findings was: {:?}",
        report.findings,
    );
    assert_eq!(report.entries_scanned, 3);
    assert_eq!(report.high_watermark, 3);
}
// Every tampered row is reported, in sequence order, not only the first one.
#[rstest]
fn multiple_hash_mismatches_all_surface(mut open_backend: MemoryBackend) {
    for seq in 1..=4u64 {
        let mut entry = build_entry(seq, Headers::empty(), 10 + seq);
        if matches!(seq, 2 | 4) {
            entry.payload = Bytes::from_static(b"\xFF");
        }
        let batch = [AppendEntry::without_indices(entry)];
        open_backend.append_batch(&batch).expect("append");
    }

    let report = verifier_for(open_backend).verify().expect("verify");

    let mut mismatch_seqs: Vec<u64> = Vec::new();
    for finding in &report.findings {
        if let VerifyFinding::HashMismatch { seq } = finding {
            mismatch_seqs.push(*seq);
        }
    }
    assert_eq!(mismatch_seqs, vec![2, 4]);
}
// An index key that points at an intact entry produces no drift finding.
#[rstest]
fn client_order_id_index_clean_when_target_resolves(mut open_backend: MemoryBackend) {
    let index_keys = vec![IndexKey::new(IndexKind::ClientOrderId, "O-1".to_string())];
    let indexed = AppendEntry::new(build_entry(1, Headers::empty(), 10), index_keys);
    open_backend.append_batch(&[indexed]).expect("append");
    open_backend.seal(RunStatus::Ended).expect("seal");

    let report = verifier_for(open_backend).verify().expect("verify");

    assert!(report.is_clean(), "findings was: {:?}", report.findings);
}
// For either index kind, a key pointing at a tampered entry is reported as `TargetCorrupted`.
#[rstest]
#[case::client_order_id(IndexKind::ClientOrderId)]
#[case::venue_order_id(IndexKind::VenueOrderId)]
fn entity_index_target_corrupted_drift(
    mut open_backend: MemoryBackend,
    #[case] kind: IndexKind,
) {
    let mut tampered = build_entry(1, Headers::empty(), 10);
    tampered.payload = Bytes::from_static(b"\xFF");
    let index_keys = vec![IndexKey::new(kind, "K-1".to_string())];
    open_backend
        .append_batch(&[AppendEntry::new(tampered, index_keys)])
        .expect("append");

    let report = verifier_for(open_backend).verify().expect("verify");

    let expected = IndexDrift::TargetCorrupted { stored_seq: 1 };
    let reported = report.findings.iter().any(|finding| match finding {
        VerifyFinding::IndexDrift {
            kind: found, drift, ..
        } => *found == kind && *drift == expected,
        _ => false,
    });
    assert!(reported, "findings was: {:?}", report.findings);
}
// Returns the reason of the first `ManifestMismatch` for `target`.
// Panics with the full findings list when there is none.
fn find_manifest_mismatch(findings: &[VerifyFinding], target: ManifestField) -> &str {
    for finding in findings {
        match finding {
            VerifyFinding::ManifestMismatch { kind, reason } if *kind == target => {
                return reason.as_str();
            }
            _ => {}
        }
    }
    panic!("expected ManifestMismatch({target:?}), findings was: {findings:?}")
}
// A manifest high watermark that differs from the store's is reported with both values.
#[rstest]
fn manifest_high_watermark_drift() {
    let mut store = MemoryBackend::new();
    store.open_run(manifest("run-hwm")).expect("open run");
    let batch = [append_with(1, 10, Vec::new())];
    store.append_batch(&batch).expect("append");
    store.seal(RunStatus::Ended).expect("seal");

    // Advertise a watermark the store never reached.
    let mut stale = store.manifest().expect("manifest");
    stale.high_watermark = 99;

    let verifier = Verifier::new(Box::new(ManifestOverrideBackend::new(store, stale)));
    let report = verifier.verify().expect("verify");

    let reason = find_manifest_mismatch(&report.findings, ManifestField::HighWatermark);
    assert!(reason.contains("99"), "reason was: {reason}");
    assert!(reason.contains('1'), "reason was: {reason}");
}
// A sealed manifest whose `end_ts_init` differs from the largest entry timestamp is reported.
#[rstest]
fn manifest_end_ts_init_drift_when_sealed() {
    let mut store = MemoryBackend::new();
    store.open_run(manifest("run-end-ts")).expect("open run");
    let batch = [
        append_with(1, 10, Vec::new()),
        append_with(2, 25, Vec::new()),
    ];
    store.append_batch(&batch).expect("append");
    store.seal(RunStatus::Ended).expect("seal");

    // The entries end at 25; the manifest is made to claim 99.
    let mut drifted = store.manifest().expect("manifest");
    drifted.end_ts_init = Some(UnixNanos::from(99));

    let verifier = Verifier::new(Box::new(ManifestOverrideBackend::new(store, drifted)));
    let report = verifier.verify().expect("verify");

    let reason = find_manifest_mismatch(&report.findings, ManifestField::EndTsInit);
    assert!(reason.contains("99"), "reason was: {reason}");
    assert!(reason.contains("25"), "reason was: {reason}");
}
// A sealed manifest without `end_ts_init` is reported when the run has entries.
#[rstest]
fn manifest_end_ts_init_missing_when_sealed_with_entries() {
    let mut store = MemoryBackend::new();
    store
        .open_run(manifest("run-end-ts-missing"))
        .expect("open run");
    let batch = [append_with(1, 42, Vec::new())];
    store.append_batch(&batch).expect("append");
    store.seal(RunStatus::Ended).expect("seal");

    // Drop the end timestamp from the manifest the verifier will see.
    let mut drifted = store.manifest().expect("manifest");
    drifted.end_ts_init = None;

    let verifier = Verifier::new(Box::new(ManifestOverrideBackend::new(store, drifted)));
    let report = verifier.verify().expect("verify");

    let reason = find_manifest_mismatch(&report.findings, ManifestField::EndTsInit);
    assert!(reason.contains("missing"), "reason was: {reason}");
    assert!(reason.contains("42"), "reason was: {reason}");
}
// A sealed, entry-less run whose manifest still carries `end_ts_init` is reported.
#[rstest]
fn manifest_end_ts_init_set_on_sealed_empty_run() {
    let mut store = MemoryBackend::new();
    store
        .open_run(manifest("run-end-ts-empty"))
        .expect("open run");
    store.seal(RunStatus::Ended).expect("seal");

    // No entry exists, yet the manifest is made to claim an end timestamp.
    let mut drifted = store.manifest().expect("manifest");
    drifted.end_ts_init = Some(UnixNanos::from(77));

    let verifier = Verifier::new(Box::new(ManifestOverrideBackend::new(store, drifted)));
    let report = verifier.verify().expect("verify");

    let reason = find_manifest_mismatch(&report.findings, ManifestField::EndTsInit);
    assert!(reason.contains("77"), "reason was: {reason}");
    assert!(reason.contains("empty"), "reason was: {reason}");
}
// A manifest `start_ts_init` later than the earliest entry timestamp is reported.
#[rstest]
fn manifest_start_ts_init_drift() {
    let mut store = MemoryBackend::new();
    store.open_run(manifest("run-start-ts")).expect("open run");
    let batch = [
        append_with(1, 10, Vec::new()),
        append_with(2, 25, Vec::new()),
    ];
    store.append_batch(&batch).expect("append");
    store.seal(RunStatus::Ended).expect("seal");

    // The earliest entry is at 10; the manifest is made to claim a start of 50.
    let mut drifted = store.manifest().expect("manifest");
    drifted.start_ts_init = UnixNanos::from(50);

    let verifier = Verifier::new(Box::new(ManifestOverrideBackend::new(store, drifted)));
    let report = verifier.verify().expect("verify");

    let reason = find_manifest_mismatch(&report.findings, ManifestField::StartTsInit);
    assert!(reason.contains("50"), "reason was: {reason}");
    assert!(reason.contains("10"), "reason was: {reason}");
}
// Rows missing between the last entry and the high watermark form one trailing gap.
#[rstest]
fn trailing_gap_surfaces_when_last_seqs_missing() {
    let mut store = MemoryBackend::new();
    store
        .open_run(manifest("run-trailing-gap"))
        .expect("open run");
    let batch = [
        append_with(1, 10, Vec::new()),
        append_with(2, 11, Vec::new()),
        append_with(3, 12, Vec::new()),
    ];
    store.append_batch(&batch).expect("append");
    store.seal(RunStatus::Ended).expect("seal");

    // Manifest and backend both claim watermark 5 while only seqs 1..=3 exist.
    let mut drifted = store.manifest().expect("manifest");
    drifted.high_watermark = 5;
    let backend = ManifestOverrideBackend::new(store, drifted).with_high_watermark(5);

    let report = Verifier::new(Box::new(backend)).verify().expect("verify");

    let mut gaps: Vec<GapRange> = Vec::new();
    for finding in &report.findings {
        if let VerifyFinding::Gap { range } = finding {
            gaps.push(*range);
        }
    }
    assert_eq!(gaps, vec![GapRange { from: 4, to: 5 }]);
    assert_eq!(report.entries_scanned, 3);
    assert_eq!(report.high_watermark, 5);
}
// Test double: delegates to a `MemoryBackend` but answers `scan_seq(target_key)`
// with a substituted entry.
struct SeqRewriteBackend {
// Real store that answers every call that is not intercepted.
inner: MemoryBackend,
// Sequence number whose `scan_seq` result is replaced.
target_key: u64,
// Entry returned for `target_key` instead of the stored one.
substitute: EventStoreEntry,
}
// Delegates everything to `inner` except `scan_seq` for the intercepted key.
impl EventStore for SeqRewriteBackend {
    fn open_run(&mut self, manifest: RunManifest) -> Result<(), EventStoreError> {
        self.inner.open_run(manifest)
    }

    fn append_batch(&mut self, batch: &[AppendEntry]) -> Result<u64, EventStoreError> {
        self.inner.append_batch(batch)
    }

    fn scan_range(
        &self,
        from: u64,
        to: u64,
        direction: ScanDirection,
    ) -> Result<Vec<EventStoreEntry>, EventStoreError> {
        self.inner.scan_range(from, to, direction)
    }

    // Returns a clone of `substitute` for `target_key`; every other key is read from `inner`.
    fn scan_seq(&self, seq: u64) -> Result<Option<EventStoreEntry>, EventStoreError> {
        if seq != self.target_key {
            return self.inner.scan_seq(seq);
        }
        Ok(Some(self.substitute.clone()))
    }

    fn lookup(&self, kind: IndexKind, key: &str) -> Result<Option<u64>, EventStoreError> {
        self.inner.lookup(kind, key)
    }

    fn iter_index_keys(&self, kind: IndexKind) -> Result<Vec<(String, u64)>, EventStoreError> {
        self.inner.iter_index_keys(kind)
    }

    fn seal(&mut self, status: RunStatus) -> Result<(), EventStoreError> {
        self.inner.seal(status)
    }

    fn manifest(&self) -> Result<RunManifest, EventStoreError> {
        self.inner.manifest()
    }

    fn high_watermark(&self) -> Result<u64, EventStoreError> {
        self.inner.high_watermark()
    }
}
// A row read under key 2 that embeds seq 99 is reported as a `SeqMismatch`.
#[rstest]
fn seq_mismatch_surfaces_when_row_value_disagrees_with_key() {
    let mut store = MemoryBackend::new();
    store
        .open_run(manifest("run-seq-mismatch"))
        .expect("open run");
    let batch = [
        append_with(1, 10, Vec::new()),
        append_with(2, 11, Vec::new()),
        append_with(3, 12, Vec::new()),
    ];
    store.append_batch(&batch).expect("append");
    store.seal(RunStatus::Ended).expect("seal");

    // Serve an entry with embedded seq 99 whenever key 2 is read.
    let backend = SeqRewriteBackend {
        inner: store,
        target_key: 2,
        substitute: build_entry(99, Headers::empty(), 11),
    };

    let report = Verifier::new(Box::new(backend)).verify().expect("verify");

    let expected = VerifyFinding::SeqMismatch {
        table_key: 2,
        embedded_seq: 99,
    };
    assert!(
        report.findings.contains(&expected),
        "findings was: {:?}",
        report.findings,
    );
}
// An index key pointing at a seq-mismatched row is reported as `TargetCorrupted`.
#[rstest]
fn seq_mismatch_marks_target_corrupted_for_dependent_indices() {
    let mut store = MemoryBackend::new();
    store
        .open_run(manifest("run-seq-mismatch-idx"))
        .expect("open run");
    let index_keys = vec![IndexKey::new(IndexKind::ClientOrderId, "O-1".to_string())];
    let batch = [
        append_with(1, 10, Vec::new()),
        AppendEntry::new(build_entry(2, Headers::empty(), 11), index_keys),
    ];
    store.append_batch(&batch).expect("append");
    store.seal(RunStatus::Ended).expect("seal");

    // The index still points at seq 2, but reading key 2 now yields embedded seq 99.
    let backend = SeqRewriteBackend {
        inner: store,
        target_key: 2,
        substitute: build_entry(99, Headers::empty(), 11),
    };

    let report = Verifier::new(Box::new(backend)).verify().expect("verify");

    let reported = report.findings.iter().any(|finding| {
        matches!(
            finding,
            VerifyFinding::IndexDrift {
                kind: IndexKind::ClientOrderId,
                drift: IndexDrift::TargetCorrupted { stored_seq: 2 },
                ..
            }
        )
    });
    assert!(reported, "findings was: {:?}", report.findings);
}
// Verifying a backend without an open run fails with the backend's own error.
#[rstest]
fn verify_propagates_no_run_open_as_error() {
    let verifier = Verifier::new(Box::new(MemoryBackend::new()));

    // `VerifyError` has a single variant, so this pattern is irrefutable.
    let VerifyError::Backend(cause) = verifier.verify().expect_err("must fail");

    match cause {
        EventStoreError::Backend(msg) => {
            assert!(msg.contains("no run open"), "msg was: {msg}");
        }
        other => panic!("expected Backend(no run open), was {other:?}"),
    }
}
}