1use std::{collections::VecDeque, fmt::Debug};
29
30use crate::{
31 backend::{EventStore, IndexKind, ScanDirection},
32 entry::EventStoreEntry,
33 error::EventStoreError,
34 manifest::RunManifest,
35 snapshot::SnapshotAnchor,
36};
37
/// Number of sequence numbers requested from the backend per chunk when the
/// caller does not pick a chunk size explicitly (used by `scan_range`).
pub const DEFAULT_SCAN_CHUNK_SIZE: u64 = 1 << 10;
44
45#[derive(Clone, Debug, PartialEq, Eq)]
47pub struct SnapshotReplayPlan {
48 pub anchor: Option<SnapshotAnchor>,
50 pub from_seq: u64,
52 pub to_seq: u64,
54}
55
56impl SnapshotReplayPlan {
57 #[must_use]
59 pub const fn is_empty(&self) -> bool {
60 self.from_seq > self.to_seq
61 }
62}
63
/// Read-side facade that owns an event-store backend `B`.
///
/// The methods implemented for `B: EventStore` forward point queries to the
/// backend and build chunked range scans on top of it.
#[derive(Debug)]
pub struct EventStoreReader<B> {
    /// The wrapped backend; every read goes through it.
    backend: B,
}
73
74impl<B: EventStore> EventStoreReader<B> {
75 #[must_use]
77 pub const fn new(backend: B) -> Self {
78 Self { backend }
79 }
80
81 pub fn manifest(&self) -> Result<RunManifest, EventStoreError> {
87 self.backend.manifest()
88 }
89
90 pub fn high_watermark(&self) -> Result<u64, EventStoreError> {
96 self.backend.high_watermark()
97 }
98
99 pub fn scan_seq(&self, seq: u64) -> Result<Option<EventStoreEntry>, EventStoreError> {
110 self.backend.scan_seq(seq)
111 }
112
113 pub fn lookup(&self, kind: IndexKind, key: &str) -> Result<Option<u64>, EventStoreError> {
119 self.backend.lookup(kind, key)
120 }
121
122 pub fn latest_snapshot_anchor(&self) -> Result<Option<SnapshotAnchor>, EventStoreError> {
132 self.backend.latest_snapshot_anchor()
133 }
134
135 pub fn snapshot_replay_plan(&self) -> Result<SnapshotReplayPlan, EventStoreError> {
147 let anchor = self.latest_snapshot_anchor()?;
148 let to_seq = self.high_watermark()?;
149 let from_seq = match anchor.as_ref() {
150 Some(anchor) if anchor.high_watermark > to_seq => {
151 return Err(EventStoreError::Corrupted(format!(
152 "snapshot anchor high_watermark {} exceeds durable high_watermark {to_seq}",
153 anchor.high_watermark,
154 )));
155 }
156 Some(anchor) => anchor.high_watermark.saturating_add(1),
157 None => 1,
158 };
159
160 Ok(SnapshotReplayPlan {
161 anchor,
162 from_seq,
163 to_seq,
164 })
165 }
166
167 pub fn scan_snapshot_replay_tail(
177 &self,
178 ) -> Result<(SnapshotReplayPlan, RangeScan<'_>), EventStoreError> {
179 let plan = self.snapshot_replay_plan()?;
180 let scan = self.scan_range(plan.from_seq, plan.to_seq, ScanDirection::Forward);
181 Ok((plan, scan))
182 }
183
184 #[must_use]
191 pub fn scan_range(&self, from: u64, to: u64, direction: ScanDirection) -> RangeScan<'_> {
192 RangeScan::new(&self.backend, from, to, direction, DEFAULT_SCAN_CHUNK_SIZE)
193 }
194
195 #[must_use]
201 pub fn scan_range_chunked(
202 &self,
203 from: u64,
204 to: u64,
205 direction: ScanDirection,
206 chunk_size: u64,
207 ) -> RangeScan<'_> {
208 RangeScan::new(&self.backend, from, to, direction, chunk_size.max(1))
209 }
210
211 #[must_use]
213 pub fn into_inner(self) -> B {
214 self.backend
215 }
216
217 #[must_use]
219 pub const fn backend(&self) -> &B {
220 &self.backend
221 }
222}
223
224pub struct RangeScan<'a> {
231 backend: &'a dyn EventStore,
232 direction: ScanDirection,
233 chunk_size: u64,
234 cursor: u64,
235 end: u64,
236 buffer: VecDeque<EventStoreEntry>,
237 has_more: bool,
238 reverse_clamped: bool,
247}
248
249impl Debug for RangeScan<'_> {
250 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
251 f.debug_struct(stringify!(RangeScan))
252 .field("direction", &self.direction)
253 .field("chunk_size", &self.chunk_size)
254 .field("cursor", &self.cursor)
255 .field("end", &self.end)
256 .field("buffered", &self.buffer.len())
257 .field("has_more", &self.has_more)
258 .field("reverse_clamped", &self.reverse_clamped)
259 .finish()
260 }
261}
262
263impl<'a> RangeScan<'a> {
264 fn new(
265 backend: &'a dyn EventStore,
266 from: u64,
267 to: u64,
268 direction: ScanDirection,
269 chunk_size: u64,
270 ) -> Self {
271 let valid = from != 0 && from <= to;
275 let (cursor, end) = if valid {
276 match direction {
277 ScanDirection::Forward => (from, to),
278 ScanDirection::Reverse => (to, from),
279 }
280 } else {
281 (0, 0)
282 };
283
284 Self {
285 backend,
286 direction,
287 chunk_size: chunk_size.max(1),
288 cursor,
289 end,
290 buffer: VecDeque::new(),
291 has_more: valid,
292 reverse_clamped: false,
293 }
294 }
295
296 fn fetch_chunk(&mut self) -> Option<Result<(), EventStoreError>> {
297 if !self.has_more {
298 return None;
299 }
300
301 if matches!(self.direction, ScanDirection::Reverse) && !self.reverse_clamped {
302 match self.backend.high_watermark() {
303 Ok(hwm) => {
304 if hwm == 0 || hwm < self.end {
305 self.has_more = false;
306 return Some(Ok(()));
307 }
308 self.cursor = self.cursor.min(hwm);
309 self.reverse_clamped = true;
310 }
311 Err(e) => {
312 self.has_more = false;
313 return Some(Err(e));
314 }
315 }
316 }
317 let (chunk_lo, chunk_hi) = match self.direction {
318 ScanDirection::Forward => {
319 let lo = self.cursor;
320 let hi = lo
321 .saturating_add(self.chunk_size)
322 .saturating_sub(1)
323 .min(self.end);
324 (lo, hi)
325 }
326 ScanDirection::Reverse => {
327 let hi = self.cursor;
328 let lo = hi
329 .saturating_sub(self.chunk_size.saturating_sub(1))
330 .max(self.end);
331 (lo, hi)
332 }
333 };
334
335 match self.backend.scan_range(chunk_lo, chunk_hi, self.direction) {
336 Ok(entries) => {
337 if entries.is_empty() {
338 self.has_more = false;
342 return Some(Ok(()));
343 }
344
345 match self.direction {
346 ScanDirection::Forward => {
347 if chunk_hi >= self.end {
348 self.has_more = false;
349 } else {
350 self.cursor = chunk_hi + 1;
351 }
352 }
353 ScanDirection::Reverse => {
354 if chunk_lo <= self.end {
355 self.has_more = false;
356 } else {
357 self.cursor = chunk_lo - 1;
358 }
359 }
360 }
361 self.buffer.extend(entries);
362 Some(Ok(()))
363 }
364 Err(e) => {
365 self.has_more = false;
366 Some(Err(e))
367 }
368 }
369 }
370}
371
372impl Iterator for RangeScan<'_> {
373 type Item = Result<EventStoreEntry, EventStoreError>;
374
375 fn next(&mut self) -> Option<Self::Item> {
376 loop {
377 if let Some(entry) = self.buffer.pop_front() {
378 return Some(Ok(entry));
379 }
380
381 match self.fetch_chunk() {
382 Some(Ok(())) => {}
383 Some(Err(e)) => return Some(Err(e)),
384 None => return None,
385 }
386 }
387 }
388}
389
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use indexmap::IndexMap;
    use nautilus_core::UnixNanos;
    use rstest::{fixture, rstest};
    use ustr::Ustr;

    use super::*;
    use crate::{
        backend::{AppendEntry, IndexKey, MemoryBackend},
        compute_entry_hash,
        entry::Topic,
        headers::Headers,
        manifest::{RegisteredComponents, RunManifest, RunStatus},
    };

    /// Builds a running manifest with fixed placeholder metadata for `run_id`.
    fn manifest(run_id: &str) -> RunManifest {
        RunManifest {
            run_id: run_id.to_string(),
            parent_run_id: None,
            instance_id: "trader-001".to_string(),
            binary_hash: "deadbeef".to_string(),
            schema_version: 1,
            crate_versions: "feedface".to_string(),
            feature_flags: Vec::new(),
            adapter_versions: IndexMap::new(),
            config_hash: "cafebabe".to_string(),
            registered_components: RegisteredComponents::default(),
            seed: None,
            start_ts_init: UnixNanos::from(0),
            end_ts_init: None,
            high_watermark: 0,
            status: RunStatus::Running,
        }
    }

    /// Builds an entry at `seq` with a fixed topic and payload and a hash that
    /// matches its contents. `ts_publish` is `ts_init + 1`.
    fn build_entry(seq: u64, ts_init: u64) -> EventStoreEntry {
        let publish_ts = UnixNanos::from(ts_init + 1);
        let init_ts = UnixNanos::from(ts_init);
        let topic: Topic = "exec.command.SubmitOrder".into();
        let payload_type = Ustr::from("SubmitOrder");
        let payload = Bytes::from_static(b"\x01\x02\x03\x04");
        let headers = Headers::empty();

        let hash = compute_entry_hash(
            seq,
            init_ts,
            publish_ts,
            topic.as_ref(),
            payload_type.as_str(),
            &payload,
            &headers,
        );

        EventStoreEntry::new(
            hash,
            seq,
            headers,
            topic,
            payload_type,
            payload,
            init_ts,
            publish_ts,
        )
    }

    /// Wraps a freshly built entry together with its index keys.
    fn append_with(seq: u64, ts_init: u64, index_keys: Vec<IndexKey>) -> AppendEntry {
        let entry = build_entry(seq, ts_init);
        AppendEntry::new(entry, index_keys)
    }

    /// Opens run `run_id` on a memory backend and appends entries
    /// `1..=count` (with `ts_init = 100 + seq`, no index keys) in one batch.
    fn seeded_backend(run_id: &str, count: u64) -> MemoryBackend {
        let mut backend = MemoryBackend::new();
        backend.open_run(manifest(run_id)).expect("open run");
        let batch: Vec<AppendEntry> = (1..=count)
            .map(|seq| append_with(seq, 100 + seq, Vec::new()))
            .collect();
        backend.append_batch(&batch).expect("append");
        backend
    }

    /// Reader over run `run-reader` holding entries `1..=count`.
    fn populated(count: u64) -> EventStoreReader<MemoryBackend> {
        EventStoreReader::new(seeded_backend("run-reader", count))
    }

    /// Drains a scan into the sequence numbers it yields, panicking on the
    /// first error item.
    fn seqs(scan: RangeScan<'_>) -> Vec<u64> {
        scan.map(|item| item.expect("entry").seq).collect()
    }

    /// Stub backend whose latest snapshot anchor (watermark 2) lies past its
    /// own high watermark (1).
    #[derive(Debug)]
    struct AnchorPastWatermarkBackend;

    impl EventStore for AnchorPastWatermarkBackend {
        fn open_run(&mut self, _manifest: RunManifest) -> Result<(), EventStoreError> {
            Ok(())
        }

        fn append_batch(&mut self, _entries: &[AppendEntry]) -> Result<u64, EventStoreError> {
            Ok(1)
        }

        fn scan_range(
            &self,
            _from: u64,
            _to: u64,
            _direction: ScanDirection,
        ) -> Result<Vec<EventStoreEntry>, EventStoreError> {
            Ok(Vec::new())
        }

        fn scan_seq(&self, _seq: u64) -> Result<Option<EventStoreEntry>, EventStoreError> {
            Ok(None)
        }

        fn lookup(&self, _kind: IndexKind, _key: &str) -> Result<Option<u64>, EventStoreError> {
            Ok(None)
        }

        fn iter_index_keys(&self, _kind: IndexKind) -> Result<Vec<(String, u64)>, EventStoreError> {
            Ok(Vec::new())
        }

        fn record_snapshot_anchor(
            &mut self,
            _anchor: SnapshotAnchor,
        ) -> Result<(), EventStoreError> {
            Ok(())
        }

        fn latest_snapshot_anchor(&self) -> Result<Option<SnapshotAnchor>, EventStoreError> {
            let anchor = SnapshotAnchor::new(2, "cache://snapshots/run-reader/2", "blake3:abc");
            Ok(Some(anchor))
        }

        fn seal(&mut self, _status: RunStatus) -> Result<(), EventStoreError> {
            Ok(())
        }

        fn manifest(&self) -> Result<RunManifest, EventStoreError> {
            Ok(manifest("run-anchor-past-watermark"))
        }

        fn high_watermark(&self) -> Result<u64, EventStoreError> {
            Ok(1)
        }
    }

    #[fixture]
    fn reader_with_three() -> EventStoreReader<MemoryBackend> {
        populated(3)
    }

    #[rstest]
    fn manifest_delegates_to_backend(reader_with_three: EventStoreReader<MemoryBackend>) {
        let m = reader_with_three.manifest().expect("manifest");

        assert_eq!(m.run_id, "run-reader");
        assert_eq!(m.high_watermark, 3);
    }

    #[rstest]
    fn high_watermark_delegates_to_backend(reader_with_three: EventStoreReader<MemoryBackend>) {
        let hwm = reader_with_three.high_watermark().expect("hwm");

        assert_eq!(hwm, 3);
    }

    #[rstest]
    fn latest_snapshot_anchor_delegates_to_backend() {
        let mut backend = seeded_backend("run-anchor", 1);
        let anchor = SnapshotAnchor::new(1, "cache://snapshots/run-anchor/1", "blake3:abc");
        backend
            .record_snapshot_anchor(anchor.clone())
            .expect("record anchor");
        let reader = EventStoreReader::new(backend);

        let latest = reader.latest_snapshot_anchor().expect("latest anchor");

        assert_eq!(latest, Some(anchor));
    }

    #[rstest]
    fn snapshot_replay_plan_without_anchor_replays_from_start(
        reader_with_three: EventStoreReader<MemoryBackend>,
    ) {
        let plan = reader_with_three
            .snapshot_replay_plan()
            .expect("snapshot replay plan");

        let expected = SnapshotReplayPlan {
            anchor: None,
            from_seq: 1,
            to_seq: 3,
        };
        assert_eq!(plan, expected);
        assert!(!plan.is_empty());
    }

    #[rstest]
    fn snapshot_replay_plan_with_anchor_starts_after_anchor_watermark() {
        let mut backend = seeded_backend("run-anchor", 3);
        let anchor = SnapshotAnchor::new(2, "cache://snapshots/run-anchor/2", "blake3:abc");
        backend
            .record_snapshot_anchor(anchor.clone())
            .expect("record anchor");
        let reader = EventStoreReader::new(backend);

        let plan = reader.snapshot_replay_plan().expect("snapshot replay plan");

        let expected = SnapshotReplayPlan {
            anchor: Some(anchor),
            from_seq: 3,
            to_seq: 3,
        };
        assert_eq!(plan, expected);
        assert!(!plan.is_empty());
    }

    #[rstest]
    fn snapshot_replay_plan_rejects_anchor_past_watermark() {
        let reader = EventStoreReader::new(AnchorPastWatermarkBackend);

        let err = reader
            .snapshot_replay_plan()
            .expect_err("anchor past watermark must fail");

        match err {
            EventStoreError::Corrupted(msg) => {
                assert!(
                    msg.contains("exceeds durable high_watermark"),
                    "msg was: {msg}",
                );
            }
            other => panic!("expected Corrupted, was {other:?}"),
        }
    }

    #[rstest]
    fn scan_snapshot_replay_tail_yields_entries_after_anchor() {
        let mut backend = seeded_backend("run-anchor", 3);
        backend
            .record_snapshot_anchor(SnapshotAnchor::new(
                2,
                "cache://snapshots/run-anchor/2",
                "blake3:abc",
            ))
            .expect("record anchor");
        let reader = EventStoreReader::new(backend);

        let (plan, scan) = reader
            .scan_snapshot_replay_tail()
            .expect("snapshot replay tail");
        let yielded = seqs(scan);

        assert_eq!(plan.from_seq, 3);
        assert_eq!(yielded, vec![3]);
    }

    #[rstest]
    fn scan_snapshot_replay_tail_is_empty_when_anchor_matches_watermark() {
        let mut backend = seeded_backend("run-anchor", 2);
        backend
            .record_snapshot_anchor(SnapshotAnchor::new(
                2,
                "cache://snapshots/run-anchor/2",
                "blake3:abc",
            ))
            .expect("record anchor");
        let reader = EventStoreReader::new(backend);

        let (plan, scan) = reader
            .scan_snapshot_replay_tail()
            .expect("snapshot replay tail");
        let yielded = seqs(scan);

        assert_eq!(plan.from_seq, 3);
        assert_eq!(plan.to_seq, 2);
        assert!(plan.is_empty());
        assert!(yielded.is_empty());
    }

    #[rstest]
    fn scan_seq_returns_committed_entry(reader_with_three: EventStoreReader<MemoryBackend>) {
        let found = reader_with_three
            .scan_seq(2)
            .expect("scan")
            .expect("present");

        assert_eq!(found.seq, 2);
        assert_eq!(found.ts_init, UnixNanos::from(102));
    }

    #[rstest]
    fn scan_seq_returns_none_outside_watermark(reader_with_three: EventStoreReader<MemoryBackend>) {
        let below = reader_with_three.scan_seq(0).expect("scan");
        assert!(below.is_none());

        let above = reader_with_three.scan_seq(99).expect("scan");
        assert!(above.is_none());
    }

    #[rstest]
    fn lookup_finds_recorded_index_key() {
        let mut backend = MemoryBackend::new();
        backend.open_run(manifest("run-lookup")).expect("open run");
        let by_client_id = AppendEntry::new(
            build_entry(1, 100),
            vec![IndexKey::new(IndexKind::ClientOrderId, "O-1".to_string())],
        );
        let by_venue_id = AppendEntry::new(
            build_entry(2, 101),
            vec![IndexKey::new(IndexKind::VenueOrderId, "V-1".to_string())],
        );
        backend
            .append_batch(&[by_client_id, by_venue_id])
            .expect("append");
        let reader = EventStoreReader::new(backend);

        assert_eq!(
            reader
                .lookup(IndexKind::ClientOrderId, "O-1")
                .expect("lookup"),
            Some(1),
        );
        assert_eq!(
            reader
                .lookup(IndexKind::VenueOrderId, "V-1")
                .expect("lookup"),
            Some(2),
        );
        assert!(
            reader
                .lookup(IndexKind::ClientOrderId, "missing")
                .expect("lookup")
                .is_none(),
        );
    }

    #[rstest]
    fn scan_range_forward_yields_entries_in_order(
        reader_with_three: EventStoreReader<MemoryBackend>,
    ) {
        let yielded = seqs(reader_with_three.scan_range(1, 3, ScanDirection::Forward));

        assert_eq!(yielded, vec![1, 2, 3]);
    }

    #[rstest]
    fn scan_range_reverse_yields_entries_in_reverse(
        reader_with_three: EventStoreReader<MemoryBackend>,
    ) {
        let yielded = seqs(reader_with_three.scan_range(1, 3, ScanDirection::Reverse));

        assert_eq!(yielded, vec![3, 2, 1]);
    }

    #[rstest]
    fn scan_range_window_clips_to_request() {
        let reader = populated(10);

        let yielded = seqs(reader.scan_range(4, 7, ScanDirection::Forward));

        assert_eq!(yielded, vec![4, 5, 6, 7]);
    }

    #[rstest]
    fn scan_range_chunked_forward_walks_full_range() {
        let reader = populated(7);

        let yielded = seqs(reader.scan_range_chunked(1, 7, ScanDirection::Forward, 2));

        assert_eq!(yielded, vec![1, 2, 3, 4, 5, 6, 7]);
    }

    #[rstest]
    fn scan_range_chunked_reverse_walks_full_range() {
        let reader = populated(7);

        let yielded = seqs(reader.scan_range_chunked(1, 7, ScanDirection::Reverse, 2));

        assert_eq!(yielded, vec![7, 6, 5, 4, 3, 2, 1]);
    }

    #[rstest]
    fn scan_range_clips_to_high_watermark() {
        let reader = populated(3);

        let yielded = seqs(reader.scan_range_chunked(1, 99, ScanDirection::Forward, 2));

        assert_eq!(yielded, vec![1, 2, 3]);
    }

    #[rstest]
    fn scan_range_reverse_clips_to_high_watermark() {
        let reader = populated(3);

        let yielded = seqs(reader.scan_range_chunked(1, 99, ScanDirection::Reverse, 2));

        assert_eq!(yielded, vec![3, 2, 1]);
    }

    #[rstest]
    fn scan_range_reverse_with_to_at_u64_max() {
        let reader = populated(3);

        let yielded = seqs(reader.scan_range_chunked(1, u64::MAX, ScanDirection::Reverse, 1));

        assert_eq!(yielded, vec![3, 2, 1]);
    }

    #[rstest]
    fn scan_range_reverse_above_watermark_yields_nothing() {
        let reader = populated(3);

        let seqs: Vec<u64> = seqs(reader.scan_range(10, 20, ScanDirection::Reverse));

        assert!(seqs.is_empty(), "seqs was: {seqs:?}");
    }

    #[rstest]
    #[case::inverted(5, 1, ScanDirection::Forward)]
    #[case::zero_from(0, 5, ScanDirection::Forward)]
    #[case::inverted_reverse(5, 1, ScanDirection::Reverse)]
    fn scan_range_empty_bounds_yield_no_entries(
        #[case] from: u64,
        #[case] to: u64,
        #[case] direction: ScanDirection,
    ) {
        let reader = populated(3);

        let seqs: Vec<u64> = seqs(reader.scan_range(from, to, direction));

        assert!(seqs.is_empty(), "seqs was: {seqs:?}");
    }

    #[rstest]
    fn scan_range_propagates_hash_mismatch_error() {
        let mut backend = MemoryBackend::new();
        backend.open_run(manifest("run-tamper")).expect("open run");
        // Swap the payload after the hash was computed so the stored hash no
        // longer matches the entry contents.
        let mut tampered = build_entry(1, 100);
        tampered.payload = Bytes::from_static(b"\xFF\xFF");
        backend
            .append_batch(&[AppendEntry::without_indices(tampered)])
            .expect("append");
        let reader = EventStoreReader::new(backend);

        let mut iter = reader.scan_range(1, 1, ScanDirection::Forward);

        match iter.next().expect("first item") {
            Err(EventStoreError::HashMismatch { seq: 1 }) => {}
            other => panic!("expected HashMismatch, was {other:?}"),
        }
        assert!(
            iter.next().is_none(),
            "iterator must terminate after surfacing the error",
        );
    }

    #[rstest]
    fn scan_range_chunk_size_zero_normalizes_to_one() {
        let reader = populated(3);

        let yielded = seqs(reader.scan_range_chunked(1, 3, ScanDirection::Forward, 0));

        assert_eq!(yielded, vec![1, 2, 3]);
    }

    #[rstest]
    fn into_inner_returns_backend(reader_with_three: EventStoreReader<MemoryBackend>) {
        let backend = reader_with_three.into_inner();

        assert_eq!(backend.high_watermark().expect("hwm"), 3);
    }

    #[rstest]
    fn lookup_uses_distinct_kinds() {
        let mut backend = MemoryBackend::new();
        backend.open_run(manifest("run-kinds")).expect("open run");
        let shared_key = "shared-key".to_string();
        let indexed = AppendEntry::new(
            build_entry(1, 100),
            vec![IndexKey::new(IndexKind::ClientOrderId, shared_key.clone())],
        );
        backend.append_batch(&[indexed]).expect("append");
        let reader = EventStoreReader::new(backend);

        // The key was recorded under `ClientOrderId` only, so the same string
        // must not resolve under `VenueOrderId`.
        assert_eq!(
            reader
                .lookup(IndexKind::ClientOrderId, &shared_key)
                .expect("lookup"),
            Some(1),
        );
        assert!(
            reader
                .lookup(IndexKind::VenueOrderId, &shared_key)
                .expect("lookup")
                .is_none(),
        );
    }
}