1use std::{collections::BTreeSet, fmt::Debug, path::Path};
41
42use crate::{
43 backend::{EventStore, IndexKind, RedbBackend},
44 entry::EventStoreEntry,
45 error::EventStoreError,
46 manifest::{RunId, RunManifest, RunStatus},
47};
48
49pub struct Verifier {
56 backend: Box<dyn EventStore>,
57}
58
59impl Debug for Verifier {
60 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61 f.debug_struct(stringify!(Verifier)).finish_non_exhaustive()
62 }
63}
64
65impl Verifier {
66 #[must_use]
68 pub fn new(backend: Box<dyn EventStore>) -> Self {
69 Self { backend }
70 }
71
72 pub fn open_redb(
84 base_dir: impl AsRef<Path>,
85 instance_id: &str,
86 run_id: &str,
87 ) -> Result<Self, VerifyError> {
88 let backend =
89 RedbBackend::open_sealed(base_dir.as_ref().to_path_buf(), instance_id, run_id)?;
90 Ok(Self {
91 backend: Box::new(backend),
92 })
93 }
94
95 pub fn open_redb_file(path: impl AsRef<Path>) -> Result<Self, VerifyError> {
106 let backend = RedbBackend::open_sealed_file(path.as_ref().to_path_buf())?;
107 Ok(Self {
108 backend: Box::new(backend),
109 })
110 }
111
112 #[must_use]
114 pub fn backend(&self) -> &dyn EventStore {
115 self.backend.as_ref()
116 }
117
118 pub fn verify(&self) -> Result<VerifyReport, VerifyError> {
131 let manifest = self.backend.manifest()?;
132 let high_watermark = self.backend.high_watermark()?;
133
134 let mut findings = Vec::new();
135 let scan = self.scan_entries(high_watermark, &mut findings)?;
136
137 self.cross_check_indices(&scan, &mut findings)?;
138 validate_manifest(&manifest, high_watermark, &scan, &mut findings);
139
140 Ok(VerifyReport {
141 run_id: manifest.run_id.clone(),
142 status: manifest.status,
143 high_watermark,
144 entries_scanned: scan.scanned,
145 findings,
146 })
147 }
148
149 fn scan_entries(
150 &self,
151 high_watermark: u64,
152 findings: &mut Vec<VerifyFinding>,
153 ) -> Result<EntryScan, VerifyError> {
154 let mut scanned: u64 = 0;
155 let mut min_ts: Option<u64> = None;
156 let mut max_ts: Option<u64> = None;
157 let mut clean_seqs: BTreeSet<u64> = BTreeSet::new();
158 let mut corrupted_seqs: BTreeSet<u64> = BTreeSet::new();
159 let mut gap_cursor: Option<u64> = None;
160
161 for seq in 1..=high_watermark {
162 match self.backend.scan_seq(seq) {
163 Ok(Some(entry)) => {
164 flush_pending_gap(seq, &mut gap_cursor, findings);
165
166 if entry.seq != seq {
173 findings.push(VerifyFinding::SeqMismatch {
174 table_key: seq,
175 embedded_seq: entry.seq,
176 });
177 corrupted_seqs.insert(seq);
178 scanned += 1;
179 continue;
180 }
181 record_entry(&entry, &mut min_ts, &mut max_ts);
182 clean_seqs.insert(seq);
183 scanned += 1;
184 }
185 Ok(None) | Err(EventStoreError::Gap { .. }) => {
186 extend_pending_gap(seq, &mut gap_cursor);
187 }
188 Err(EventStoreError::HashMismatch { seq: bad }) => {
189 flush_pending_gap(seq, &mut gap_cursor, findings);
190 findings.push(VerifyFinding::HashMismatch { seq: bad });
191 corrupted_seqs.insert(seq);
192 scanned += 1;
193 }
194 Err(other) => return Err(VerifyError::Backend(other)),
195 }
196 }
197
198 flush_pending_gap(high_watermark + 1, &mut gap_cursor, findings);
199
200 Ok(EntryScan {
201 scanned,
202 min_ts,
203 max_ts,
204 clean_seqs,
205 corrupted_seqs,
206 })
207 }
208
209 fn cross_check_indices(
210 &self,
211 scan: &EntryScan,
212 findings: &mut Vec<VerifyFinding>,
213 ) -> Result<(), VerifyError> {
214 for kind in [IndexKind::ClientOrderId, IndexKind::VenueOrderId] {
215 for (key, stored_seq) in self.backend.iter_index_keys(kind)? {
216 let drift = classify_target(stored_seq, scan);
217 if let Some(drift) = drift {
218 findings.push(VerifyFinding::IndexDrift { kind, key, drift });
219 }
220 }
221 }
222
223 Ok(())
224 }
225}
226
227#[derive(Debug)]
228struct EntryScan {
229 scanned: u64,
230 min_ts: Option<u64>,
231 max_ts: Option<u64>,
232 clean_seqs: BTreeSet<u64>,
233 corrupted_seqs: BTreeSet<u64>,
234}
235
236fn record_entry(entry: &EventStoreEntry, min_ts: &mut Option<u64>, max_ts: &mut Option<u64>) {
237 let ts = entry.ts_init.as_u64();
238 *min_ts = Some(min_ts.map_or(ts, |cur| cur.min(ts)));
239 *max_ts = Some(max_ts.map_or(ts, |cur| cur.max(ts)));
240}
241
242fn extend_pending_gap(seq: u64, gap_cursor: &mut Option<u64>) {
243 if gap_cursor.is_none() {
244 *gap_cursor = Some(seq);
245 }
246}
247
248fn flush_pending_gap(
249 next_seq: u64,
250 gap_cursor: &mut Option<u64>,
251 findings: &mut Vec<VerifyFinding>,
252) {
253 if let Some(start) = gap_cursor.take() {
254 findings.push(VerifyFinding::Gap {
255 range: GapRange {
256 from: start,
257 to: next_seq - 1,
258 },
259 });
260 }
261}
262
263fn classify_target(stored_seq: u64, scan: &EntryScan) -> Option<IndexDrift> {
264 if scan.clean_seqs.contains(&stored_seq) {
265 None
266 } else if scan.corrupted_seqs.contains(&stored_seq) {
267 Some(IndexDrift::TargetCorrupted { stored_seq })
268 } else {
269 Some(IndexDrift::DanglingTarget { stored_seq })
270 }
271}
272
273fn validate_manifest(
274 manifest: &RunManifest,
275 high_watermark: u64,
276 scan: &EntryScan,
277 findings: &mut Vec<VerifyFinding>,
278) {
279 if manifest.high_watermark != high_watermark {
280 findings.push(VerifyFinding::ManifestMismatch {
281 kind: ManifestField::HighWatermark,
282 reason: format!(
283 "manifest high_watermark {} disagrees with durable high_watermark {high_watermark}",
284 manifest.high_watermark,
285 ),
286 });
287 }
288
289 if let Some(min_ts) = scan.min_ts
290 && manifest.start_ts_init.as_u64() > min_ts
291 {
292 findings.push(VerifyFinding::ManifestMismatch {
293 kind: ManifestField::StartTsInit,
294 reason: format!(
295 "manifest start_ts_init {} sits above earliest entry ts_init {min_ts}",
296 manifest.start_ts_init.as_u64(),
297 ),
298 });
299 }
300
301 if manifest.is_sealed() {
302 match (manifest.end_ts_init.map(|t| t.as_u64()), scan.max_ts) {
303 (Some(stored), Some(observed)) if stored != observed => {
304 findings.push(VerifyFinding::ManifestMismatch {
305 kind: ManifestField::EndTsInit,
306 reason: format!(
307 "manifest end_ts_init {stored} disagrees with last observed ts_init {observed}",
308 ),
309 });
310 }
311 (None, Some(observed)) => findings.push(VerifyFinding::ManifestMismatch {
312 kind: ManifestField::EndTsInit,
313 reason: format!(
314 "sealed manifest is missing end_ts_init while entries up to ts_init {observed} exist",
315 ),
316 }),
317 (Some(stored), None) => findings.push(VerifyFinding::ManifestMismatch {
318 kind: ManifestField::EndTsInit,
319 reason: format!(
320 "sealed manifest carries end_ts_init {stored} despite empty entry table",
321 ),
322 }),
323 _ => {}
324 }
325 }
326}
327
328#[derive(Debug, Clone, PartialEq, Eq)]
334pub struct VerifyReport {
335 pub run_id: RunId,
337 pub status: RunStatus,
339 pub high_watermark: u64,
341 pub entries_scanned: u64,
343 pub findings: Vec<VerifyFinding>,
345}
346
347impl VerifyReport {
348 #[must_use]
350 pub fn is_clean(&self) -> bool {
351 self.findings.is_empty()
352 }
353}
354
355#[derive(Debug, Clone, PartialEq, Eq)]
357pub enum VerifyFinding {
358 HashMismatch {
360 seq: u64,
362 },
363 Gap {
365 range: GapRange,
367 },
368 SeqMismatch {
376 table_key: u64,
378 embedded_seq: u64,
380 },
381 IndexDrift {
384 kind: IndexKind,
386 key: String,
388 drift: IndexDrift,
390 },
391 ManifestMismatch {
394 kind: ManifestField,
396 reason: String,
398 },
399}
400
401#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
403pub struct GapRange {
404 pub from: u64,
406 pub to: u64,
408}
409
410#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
417pub enum IndexDrift {
418 DanglingTarget {
420 stored_seq: u64,
422 },
423 TargetCorrupted {
425 stored_seq: u64,
427 },
428}
429
430#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
432pub enum ManifestField {
433 HighWatermark,
435 StartTsInit,
437 EndTsInit,
440}
441
442#[derive(Debug, thiserror::Error)]
448pub enum VerifyError {
449 #[error("backend access failed: {0}")]
451 Backend(#[from] EventStoreError),
452}
453
454#[cfg(test)]
455mod tests {
456 use bytes::Bytes;
457 use indexmap::IndexMap;
458 use nautilus_core::UnixNanos;
459 use rstest::{fixture, rstest};
460 use ustr::Ustr;
461
462 use super::*;
463 use crate::{
464 backend::{AppendEntry, IndexKey, MemoryBackend, ScanDirection},
465 compute_entry_hash,
466 entry::Topic,
467 headers::Headers,
468 manifest::{RegisteredComponents, RunManifest, RunStatus},
469 };
470
471 fn manifest(run_id: &str) -> RunManifest {
472 RunManifest {
473 run_id: run_id.to_string(),
474 parent_run_id: None,
475 instance_id: "trader-001".to_string(),
476 binary_hash: "deadbeef".to_string(),
477 schema_version: 1,
478 crate_versions: "feedface".to_string(),
479 feature_flags: Vec::new(),
480 adapter_versions: IndexMap::new(),
481 config_hash: "cafebabe".to_string(),
482 registered_components: RegisteredComponents::default(),
483 seed: None,
484 start_ts_init: UnixNanos::from(0),
485 end_ts_init: None,
486 high_watermark: 0,
487 status: RunStatus::Running,
488 }
489 }
490
491 fn build_entry(seq: u64, headers: Headers, ts_init: u64) -> EventStoreEntry {
492 let topic: Topic = "exec.command.SubmitOrder".into();
493 let payload_type = Ustr::from("SubmitOrder");
494 let payload = Bytes::from_static(b"\x01\x02\x03\x04");
495 let ts_publish = UnixNanos::from(ts_init + 1);
496 let ts_init = UnixNanos::from(ts_init);
497 let hash = compute_entry_hash(
498 seq,
499 ts_init,
500 ts_publish,
501 topic.as_ref(),
502 payload_type.as_str(),
503 &payload,
504 &headers,
505 );
506
507 EventStoreEntry::new(
508 hash,
509 seq,
510 headers,
511 topic,
512 payload_type,
513 payload,
514 ts_init,
515 ts_publish,
516 )
517 }
518
519 fn append_with(seq: u64, ts_init: u64, index_keys: Vec<IndexKey>) -> AppendEntry {
520 AppendEntry::new(build_entry(seq, Headers::empty(), ts_init), index_keys)
521 }
522
523 struct ManifestOverrideBackend {
528 inner: MemoryBackend,
529 manifest_override: RunManifest,
530 high_watermark_override: Option<u64>,
531 }
532
533 impl ManifestOverrideBackend {
534 fn new(inner: MemoryBackend, manifest_override: RunManifest) -> Self {
535 Self {
536 inner,
537 manifest_override,
538 high_watermark_override: None,
539 }
540 }
541
542 fn with_high_watermark(mut self, hwm: u64) -> Self {
543 self.high_watermark_override = Some(hwm);
544 self
545 }
546 }
547
548 impl EventStore for ManifestOverrideBackend {
549 fn open_run(&mut self, m: RunManifest) -> Result<(), EventStoreError> {
550 self.inner.open_run(m)
551 }
552
553 fn append_batch(&mut self, entries: &[AppendEntry]) -> Result<u64, EventStoreError> {
554 self.inner.append_batch(entries)
555 }
556
557 fn scan_range(
558 &self,
559 from: u64,
560 to: u64,
561 direction: ScanDirection,
562 ) -> Result<Vec<EventStoreEntry>, EventStoreError> {
563 self.inner.scan_range(from, to, direction)
564 }
565
566 fn scan_seq(&self, seq: u64) -> Result<Option<EventStoreEntry>, EventStoreError> {
567 self.inner.scan_seq(seq)
568 }
569
570 fn lookup(&self, kind: IndexKind, key: &str) -> Result<Option<u64>, EventStoreError> {
571 self.inner.lookup(kind, key)
572 }
573
574 fn iter_index_keys(&self, kind: IndexKind) -> Result<Vec<(String, u64)>, EventStoreError> {
575 self.inner.iter_index_keys(kind)
576 }
577
578 fn seal(&mut self, status: RunStatus) -> Result<(), EventStoreError> {
579 self.inner.seal(status)
580 }
581
582 fn manifest(&self) -> Result<RunManifest, EventStoreError> {
583 Ok(self.manifest_override.clone())
584 }
585
586 fn high_watermark(&self) -> Result<u64, EventStoreError> {
587 if let Some(hwm) = self.high_watermark_override {
588 return Ok(hwm);
589 }
590 self.inner.high_watermark()
591 }
592 }
593
594 #[fixture]
595 fn open_backend() -> MemoryBackend {
596 let mut backend = MemoryBackend::new();
597 backend
598 .open_run(manifest("1700000000-aaaa1111"))
599 .expect("open run");
600 backend
601 }
602
603 fn verifier_for(backend: MemoryBackend) -> Verifier {
604 Verifier::new(Box::new(backend))
605 }
606
607 #[rstest]
608 fn clean_run_reports_no_findings(mut open_backend: MemoryBackend) {
609 open_backend
610 .append_batch(&[
611 append_with(1, 10, Vec::new()),
612 append_with(2, 11, Vec::new()),
613 append_with(3, 12, Vec::new()),
614 ])
615 .expect("append");
616 open_backend.seal(RunStatus::Ended).expect("seal");
617
618 let report = verifier_for(open_backend).verify().expect("verify");
619
620 assert!(report.is_clean(), "findings was: {:?}", report.findings);
624 assert_eq!(report.findings.len(), 0);
625 assert_eq!(report.high_watermark, 3);
626 assert_eq!(report.entries_scanned, 3);
627 assert_eq!(report.status, RunStatus::Ended);
628 }
629
630 #[rstest]
631 fn empty_run_reports_no_findings(mut open_backend: MemoryBackend) {
632 open_backend.seal(RunStatus::Ended).expect("seal");
633
634 let report = verifier_for(open_backend).verify().expect("verify");
635
636 assert!(report.is_clean(), "findings was: {:?}", report.findings);
637 assert_eq!(report.entries_scanned, 0);
638 }
639
640 #[rstest]
641 fn hash_mismatch_surfaces_per_seq(mut open_backend: MemoryBackend) {
642 open_backend
643 .append_batch(&[append_with(1, 10, Vec::new())])
644 .expect("append");
645 let mut tampered = build_entry(2, Headers::empty(), 11);
646 tampered.payload = Bytes::from_static(b"\xFF");
647 open_backend
648 .append_batch(&[AppendEntry::without_indices(tampered)])
649 .expect("append");
650 open_backend
651 .append_batch(&[append_with(3, 12, Vec::new())])
652 .expect("append");
653
654 let report = verifier_for(open_backend).verify().expect("verify");
655
656 assert!(
657 report
658 .findings
659 .iter()
660 .any(|f| matches!(f, VerifyFinding::HashMismatch { seq: 2 })),
661 "findings was: {:?}",
662 report.findings,
663 );
664 assert_eq!(report.entries_scanned, 3);
665 assert_eq!(report.high_watermark, 3);
666 }
667
668 #[rstest]
669 fn multiple_hash_mismatches_all_surface(mut open_backend: MemoryBackend) {
670 for seq in 1..=4u64 {
674 let mut entry = build_entry(seq, Headers::empty(), 10 + seq);
675 if seq == 2 || seq == 4 {
676 entry.payload = Bytes::from_static(b"\xFF");
677 }
678 open_backend
679 .append_batch(&[AppendEntry::without_indices(entry)])
680 .expect("append");
681 }
682
683 let report = verifier_for(open_backend).verify().expect("verify");
684
685 let mismatch_seqs: Vec<u64> = report
686 .findings
687 .iter()
688 .filter_map(|f| match f {
689 VerifyFinding::HashMismatch { seq } => Some(*seq),
690 _ => None,
691 })
692 .collect();
693 assert_eq!(mismatch_seqs, vec![2, 4]);
694 }
695
696 #[rstest]
697 fn client_order_id_index_clean_when_target_resolves(mut open_backend: MemoryBackend) {
698 open_backend
699 .append_batch(&[AppendEntry::new(
700 build_entry(1, Headers::empty(), 10),
701 vec![IndexKey::new(IndexKind::ClientOrderId, "O-1".to_string())],
702 )])
703 .expect("append");
704 open_backend.seal(RunStatus::Ended).expect("seal");
705
706 let report = verifier_for(open_backend).verify().expect("verify");
707
708 assert!(report.is_clean(), "findings was: {:?}", report.findings);
709 }
710
711 #[rstest]
712 #[case::client_order_id(IndexKind::ClientOrderId)]
713 #[case::venue_order_id(IndexKind::VenueOrderId)]
714 fn entity_index_target_corrupted_drift(
715 mut open_backend: MemoryBackend,
716 #[case] kind: IndexKind,
717 ) {
718 let mut tampered = build_entry(1, Headers::empty(), 10);
723 tampered.payload = Bytes::from_static(b"\xFF");
724 open_backend
725 .append_batch(&[AppendEntry::new(
726 tampered,
727 vec![IndexKey::new(kind, "K-1".to_string())],
728 )])
729 .expect("append");
730
731 let report = verifier_for(open_backend).verify().expect("verify");
732
733 assert!(
734 report.findings.iter().any(|f| matches!(
735 f,
736 VerifyFinding::IndexDrift {
737 kind: drift_kind,
738 drift: IndexDrift::TargetCorrupted { stored_seq: 1 },
739 ..
740 } if *drift_kind == kind
741 )),
742 "findings was: {:?}",
743 report.findings,
744 );
745 }
746
747 fn find_manifest_mismatch(findings: &[VerifyFinding], target: ManifestField) -> &str {
748 findings
749 .iter()
750 .find_map(|f| match f {
751 VerifyFinding::ManifestMismatch { kind, reason } if *kind == target => {
752 Some(reason.as_str())
753 }
754 _ => None,
755 })
756 .unwrap_or_else(|| {
757 panic!("expected ManifestMismatch({target:?}), findings was: {findings:?}")
758 })
759 }
760
761 #[rstest]
762 fn manifest_high_watermark_drift() {
763 let mut inner = MemoryBackend::new();
767 inner.open_run(manifest("run-hwm")).expect("open run");
768 inner
769 .append_batch(&[append_with(1, 10, Vec::new())])
770 .expect("append");
771 inner.seal(RunStatus::Ended).expect("seal");
772
773 let mut stale = inner.manifest().expect("manifest");
774 stale.high_watermark = 99;
775 let backend = ManifestOverrideBackend::new(inner, stale);
776
777 let report = Verifier::new(Box::new(backend)).verify().expect("verify");
778 let reason = find_manifest_mismatch(&report.findings, ManifestField::HighWatermark);
779
780 assert!(reason.contains("99"), "reason was: {reason}");
781 assert!(reason.contains('1'), "reason was: {reason}");
782 }
783
784 #[rstest]
785 fn manifest_end_ts_init_drift_when_sealed() {
786 let mut inner = MemoryBackend::new();
790 inner.open_run(manifest("run-end-ts")).expect("open run");
791 inner
792 .append_batch(&[
793 append_with(1, 10, Vec::new()),
794 append_with(2, 25, Vec::new()),
795 ])
796 .expect("append");
797 inner.seal(RunStatus::Ended).expect("seal");
798
799 let mut drifted = inner.manifest().expect("manifest");
800 drifted.end_ts_init = Some(UnixNanos::from(99));
801 let backend = ManifestOverrideBackend::new(inner, drifted);
802
803 let report = Verifier::new(Box::new(backend)).verify().expect("verify");
804 let reason = find_manifest_mismatch(&report.findings, ManifestField::EndTsInit);
805
806 assert!(reason.contains("99"), "reason was: {reason}");
807 assert!(reason.contains("25"), "reason was: {reason}");
808 }
809
810 #[rstest]
811 fn manifest_end_ts_init_missing_when_sealed_with_entries() {
812 let mut inner = MemoryBackend::new();
816 inner
817 .open_run(manifest("run-end-ts-missing"))
818 .expect("open run");
819 inner
820 .append_batch(&[append_with(1, 42, Vec::new())])
821 .expect("append");
822 inner.seal(RunStatus::Ended).expect("seal");
823
824 let mut drifted = inner.manifest().expect("manifest");
825 drifted.end_ts_init = None;
826 let backend = ManifestOverrideBackend::new(inner, drifted);
827
828 let report = Verifier::new(Box::new(backend)).verify().expect("verify");
829 let reason = find_manifest_mismatch(&report.findings, ManifestField::EndTsInit);
830
831 assert!(reason.contains("missing"), "reason was: {reason}");
832 assert!(reason.contains("42"), "reason was: {reason}");
833 }
834
835 #[rstest]
836 fn manifest_end_ts_init_set_on_sealed_empty_run() {
837 let mut inner = MemoryBackend::new();
841 inner
842 .open_run(manifest("run-end-ts-empty"))
843 .expect("open run");
844 inner.seal(RunStatus::Ended).expect("seal");
845
846 let mut drifted = inner.manifest().expect("manifest");
847 drifted.end_ts_init = Some(UnixNanos::from(77));
848 let backend = ManifestOverrideBackend::new(inner, drifted);
849
850 let report = Verifier::new(Box::new(backend)).verify().expect("verify");
851 let reason = find_manifest_mismatch(&report.findings, ManifestField::EndTsInit);
852
853 assert!(reason.contains("77"), "reason was: {reason}");
854 assert!(reason.contains("empty"), "reason was: {reason}");
855 }
856
857 #[rstest]
858 fn manifest_start_ts_init_drift() {
859 let mut inner = MemoryBackend::new();
863 inner.open_run(manifest("run-start-ts")).expect("open run");
864 inner
865 .append_batch(&[
866 append_with(1, 10, Vec::new()),
867 append_with(2, 25, Vec::new()),
868 ])
869 .expect("append");
870 inner.seal(RunStatus::Ended).expect("seal");
871
872 let mut drifted = inner.manifest().expect("manifest");
873 drifted.start_ts_init = UnixNanos::from(50);
874 let backend = ManifestOverrideBackend::new(inner, drifted);
875
876 let report = Verifier::new(Box::new(backend)).verify().expect("verify");
877 let reason = find_manifest_mismatch(&report.findings, ManifestField::StartTsInit);
878
879 assert!(reason.contains("50"), "reason was: {reason}");
880 assert!(reason.contains("10"), "reason was: {reason}");
881 }
882
883 #[rstest]
884 fn trailing_gap_surfaces_when_last_seqs_missing() {
885 let mut inner = MemoryBackend::new();
891 inner
892 .open_run(manifest("run-trailing-gap"))
893 .expect("open run");
894 inner
895 .append_batch(&[
896 append_with(1, 10, Vec::new()),
897 append_with(2, 11, Vec::new()),
898 append_with(3, 12, Vec::new()),
899 ])
900 .expect("append");
901 inner.seal(RunStatus::Ended).expect("seal");
902
903 let mut drifted = inner.manifest().expect("manifest");
904 drifted.high_watermark = 5;
905 let backend = ManifestOverrideBackend::new(inner, drifted).with_high_watermark(5);
908
909 let report = Verifier::new(Box::new(backend)).verify().expect("verify");
910
911 let gaps: Vec<GapRange> = report
912 .findings
913 .iter()
914 .filter_map(|f| match f {
915 VerifyFinding::Gap { range } => Some(*range),
916 _ => None,
917 })
918 .collect();
919 assert_eq!(gaps, vec![GapRange { from: 4, to: 5 }]);
920 assert_eq!(report.entries_scanned, 3);
921 assert_eq!(report.high_watermark, 5);
922 }
923
924 struct SeqRewriteBackend {
929 inner: MemoryBackend,
930 target_key: u64,
931 substitute: EventStoreEntry,
932 }
933
934 impl EventStore for SeqRewriteBackend {
935 fn open_run(&mut self, m: RunManifest) -> Result<(), EventStoreError> {
936 self.inner.open_run(m)
937 }
938 fn append_batch(&mut self, e: &[AppendEntry]) -> Result<u64, EventStoreError> {
939 self.inner.append_batch(e)
940 }
941 fn scan_range(
942 &self,
943 from: u64,
944 to: u64,
945 direction: ScanDirection,
946 ) -> Result<Vec<EventStoreEntry>, EventStoreError> {
947 self.inner.scan_range(from, to, direction)
948 }
949 fn scan_seq(&self, seq: u64) -> Result<Option<EventStoreEntry>, EventStoreError> {
950 if seq == self.target_key {
951 return Ok(Some(self.substitute.clone()));
952 }
953 self.inner.scan_seq(seq)
954 }
955 fn lookup(&self, kind: IndexKind, key: &str) -> Result<Option<u64>, EventStoreError> {
956 self.inner.lookup(kind, key)
957 }
958 fn iter_index_keys(&self, kind: IndexKind) -> Result<Vec<(String, u64)>, EventStoreError> {
959 self.inner.iter_index_keys(kind)
960 }
961 fn seal(&mut self, status: RunStatus) -> Result<(), EventStoreError> {
962 self.inner.seal(status)
963 }
964 fn manifest(&self) -> Result<RunManifest, EventStoreError> {
965 self.inner.manifest()
966 }
967 fn high_watermark(&self) -> Result<u64, EventStoreError> {
968 self.inner.high_watermark()
969 }
970 }
971
972 #[rstest]
973 fn seq_mismatch_surfaces_when_row_value_disagrees_with_key() {
974 let mut inner = MemoryBackend::new();
980 inner
981 .open_run(manifest("run-seq-mismatch"))
982 .expect("open run");
983 inner
984 .append_batch(&[
985 append_with(1, 10, Vec::new()),
986 append_with(2, 11, Vec::new()),
987 append_with(3, 12, Vec::new()),
988 ])
989 .expect("append");
990 inner.seal(RunStatus::Ended).expect("seal");
991
992 let substitute = build_entry(99, Headers::empty(), 11);
993 let backend = SeqRewriteBackend {
994 inner,
995 target_key: 2,
996 substitute,
997 };
998
999 let report = Verifier::new(Box::new(backend)).verify().expect("verify");
1000
1001 assert!(
1002 report.findings.iter().any(|f| matches!(
1003 f,
1004 VerifyFinding::SeqMismatch {
1005 table_key: 2,
1006 embedded_seq: 99,
1007 }
1008 )),
1009 "findings was: {:?}",
1010 report.findings,
1011 );
1012 }
1013
1014 #[rstest]
1015 fn seq_mismatch_marks_target_corrupted_for_dependent_indices() {
1016 let mut inner = MemoryBackend::new();
1021 inner
1022 .open_run(manifest("run-seq-mismatch-idx"))
1023 .expect("open run");
1024 inner
1025 .append_batch(&[
1026 append_with(1, 10, Vec::new()),
1027 AppendEntry::new(
1028 build_entry(2, Headers::empty(), 11),
1029 vec![IndexKey::new(IndexKind::ClientOrderId, "O-1".to_string())],
1030 ),
1031 ])
1032 .expect("append");
1033 inner.seal(RunStatus::Ended).expect("seal");
1034
1035 let substitute = build_entry(99, Headers::empty(), 11);
1036 let backend = SeqRewriteBackend {
1037 inner,
1038 target_key: 2,
1039 substitute,
1040 };
1041
1042 let report = Verifier::new(Box::new(backend)).verify().expect("verify");
1043
1044 assert!(
1045 report.findings.iter().any(|f| matches!(
1046 f,
1047 VerifyFinding::IndexDrift {
1048 kind: IndexKind::ClientOrderId,
1049 drift: IndexDrift::TargetCorrupted { stored_seq: 2 },
1050 ..
1051 }
1052 )),
1053 "findings was: {:?}",
1054 report.findings,
1055 );
1056 }
1057
1058 #[rstest]
1059 fn verify_propagates_no_run_open_as_error() {
1060 let backend = MemoryBackend::new();
1061 let verifier = Verifier::new(Box::new(backend));
1062
1063 let err = verifier.verify().expect_err("must fail");
1064
1065 match err {
1066 VerifyError::Backend(EventStoreError::Backend(msg)) => {
1067 assert!(msg.contains("no run open"), "msg was: {msg}");
1068 }
1069 VerifyError::Backend(other) => {
1070 panic!("expected Backend(no run open), was {other:?}")
1071 }
1072 }
1073 }
1074}