Skip to main content

nautilus_event_store/reader/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Reader for the event store.
17//!
18//! The reader composes over the locked [`EventStore`] trait. It owns one backend instance,
19//! exposes range scans as a chunked iterator, single-`seq` lookups, secondary-index
20//! lookups, and manifest access. The backend the reader receives may be a still-open run
21//! (the writer's backend) or a sealed run produced via
22//! [`crate::backend::RedbBackend::open_sealed`].
23//!
24//! Run iteration across a base directory lives on the redb backend itself
25//! ([`crate::backend::RedbBackend::list_runs`]) because it depends on the on-disk file
26//! layout; the in-memory backend has no analog and reads its single open run in place.
27
28use std::{collections::VecDeque, fmt::Debug};
29
30use crate::{
31    backend::{EventStore, IndexKind, ScanDirection},
32    entry::EventStoreEntry,
33    error::EventStoreError,
34    manifest::RunManifest,
35    snapshot::SnapshotAnchor,
36};
37
/// Number of entries each chunked `scan_range` call materializes when the caller does
/// not choose a chunk size.
///
/// The value keeps the live working set of a multi-million-entry forensics scan
/// bounded while still amortizing the per-call transaction overhead. Workloads that
/// prefer different bounds pass their own size to
/// [`EventStoreReader::scan_range_chunked`].
pub const DEFAULT_SCAN_CHUNK_SIZE: u64 = 1 << 10;
44
45/// Replay bounds derived from the latest cache snapshot anchor.
46#[derive(Clone, Debug, PartialEq, Eq)]
47pub struct SnapshotReplayPlan {
48    /// Latest cache snapshot anchor, or `None` when restore must replay from the start.
49    pub anchor: Option<SnapshotAnchor>,
50    /// First event-store seq to replay after the cache snapshot restore.
51    pub from_seq: u64,
52    /// Current durable high-watermark for the run.
53    pub to_seq: u64,
54}
55
56impl SnapshotReplayPlan {
57    /// Returns whether there are no entries to replay for this plan.
58    #[must_use]
59    pub const fn is_empty(&self) -> bool {
60        self.from_seq > self.to_seq
61    }
62}
63
64/// Read-only handle over an [`EventStore`] backend.
65///
66/// The reader is the canonical entry point for read-only replay, audit, and verifier
67/// scans: it never mutates the backend (no `append_batch` surface) and it tolerates
68/// running and sealed backends uniformly.
69#[derive(Debug)]
70pub struct EventStoreReader<B> {
71    backend: B,
72}
73
74impl<B: EventStore> EventStoreReader<B> {
75    /// Wraps `backend` for read-only access.
76    #[must_use]
77    pub const fn new(backend: B) -> Self {
78        Self { backend }
79    }
80
81    /// Returns the manifest of the open run.
82    ///
83    /// # Errors
84    ///
85    /// Returns [`EventStoreError::Backend`] when no run is open.
86    pub fn manifest(&self) -> Result<RunManifest, EventStoreError> {
87        self.backend.manifest()
88    }
89
90    /// Returns the largest `seq` durably acknowledged for the open run.
91    ///
92    /// # Errors
93    ///
94    /// Returns [`EventStoreError::Backend`] when no run is open.
95    pub fn high_watermark(&self) -> Result<u64, EventStoreError> {
96        self.backend.high_watermark()
97    }
98
99    /// Reads a single entry by `seq`.
100    ///
101    /// Returns `None` when `seq == 0` or `seq` exceeds the current high-watermark.
102    ///
103    /// # Errors
104    ///
105    /// Returns [`EventStoreError::HashMismatch`] when the recomputed entry hash diverges
106    /// from the stored value, [`EventStoreError::Gap`] when the row is missing inside
107    /// the high-watermark, and [`EventStoreError::Backend`] for unclassified backend
108    /// failures.
109    pub fn scan_seq(&self, seq: u64) -> Result<Option<EventStoreEntry>, EventStoreError> {
110        self.backend.scan_seq(seq)
111    }
112
113    /// Looks up the first `seq` recorded under the given index key.
114    ///
115    /// # Errors
116    ///
117    /// Returns [`EventStoreError::Backend`] for unclassified backend failures.
118    pub fn lookup(&self, kind: IndexKind, key: &str) -> Result<Option<u64>, EventStoreError> {
119        self.backend.lookup(kind, key)
120    }
121
122    /// Returns the latest snapshot anchor for the run.
123    ///
124    /// Returns `Ok(None)` when no snapshot anchor has been recorded yet.
125    ///
126    /// # Errors
127    ///
128    /// Returns [`EventStoreError::Backend`] when no run is open or the backend does not
129    /// support snapshot anchors, and [`EventStoreError::Corrupted`] when a stored anchor
130    /// cannot decode.
131    pub fn latest_snapshot_anchor(&self) -> Result<Option<SnapshotAnchor>, EventStoreError> {
132        self.backend.latest_snapshot_anchor()
133    }
134
135    /// Builds the restore replay bounds from the latest snapshot anchor.
136    ///
137    /// Restore callers fetch and validate the cache-owned snapshot blob first, then
138    /// replay entries in `[from_seq, to_seq]`. When an anchor exists, `from_seq` is
139    /// `anchor.high_watermark + 1`; without an anchor, restore replays from seq `1`.
140    ///
141    /// # Errors
142    ///
143    /// Returns [`EventStoreError::Backend`] when no run is open or the backend does not
144    /// support snapshot anchors, and [`EventStoreError::Corrupted`] when the stored
145    /// anchor points past the durable high-watermark.
146    pub fn snapshot_replay_plan(&self) -> Result<SnapshotReplayPlan, EventStoreError> {
147        let anchor = self.latest_snapshot_anchor()?;
148        let to_seq = self.high_watermark()?;
149        let from_seq = match anchor.as_ref() {
150            Some(anchor) if anchor.high_watermark > to_seq => {
151                return Err(EventStoreError::Corrupted(format!(
152                    "snapshot anchor high_watermark {} exceeds durable high_watermark {to_seq}",
153                    anchor.high_watermark,
154                )));
155            }
156            Some(anchor) => anchor.high_watermark.saturating_add(1),
157            None => 1,
158        };
159
160        Ok(SnapshotReplayPlan {
161            anchor,
162            from_seq,
163            to_seq,
164        })
165    }
166
167    /// Scans the forward replay tail after the latest snapshot anchor.
168    ///
169    /// This pairs [`Self::snapshot_replay_plan`] with the actual event iterator used by
170    /// restore: entries start at `anchor.high_watermark + 1` when an anchor exists, or
171    /// at seq `1` when no cache snapshot has been anchored.
172    ///
173    /// # Errors
174    ///
175    /// See [`Self::snapshot_replay_plan`].
176    pub fn scan_snapshot_replay_tail(
177        &self,
178    ) -> Result<(SnapshotReplayPlan, RangeScan<'_>), EventStoreError> {
179        let plan = self.snapshot_replay_plan()?;
180        let scan = self.scan_range(plan.from_seq, plan.to_seq, ScanDirection::Forward);
181        Ok((plan, scan))
182    }
183
184    /// Scans entries by `seq` over the inclusive range `[from, to]`.
185    ///
186    /// The returned iterator pulls [`DEFAULT_SCAN_CHUNK_SIZE`] entries at a time from the
187    /// backend so a multi-million-entry forensics scan keeps the working set bounded
188    /// while still amortizing per-transaction overhead. The iterator yields one entry
189    /// per call; backend errors surface as `Some(Err(...))` and terminate the scan.
190    #[must_use]
191    pub fn scan_range(&self, from: u64, to: u64, direction: ScanDirection) -> RangeScan<'_> {
192        RangeScan::new(&self.backend, from, to, direction, DEFAULT_SCAN_CHUNK_SIZE)
193    }
194
195    /// Variant of [`Self::scan_range`] with a caller-chosen chunk size.
196    ///
197    /// `chunk_size == 0` is normalized to `1`; the reader never asks the backend for a
198    /// degenerate empty window because the chunk window is the only progress signal the
199    /// iterator advances on.
200    #[must_use]
201    pub fn scan_range_chunked(
202        &self,
203        from: u64,
204        to: u64,
205        direction: ScanDirection,
206        chunk_size: u64,
207    ) -> RangeScan<'_> {
208        RangeScan::new(&self.backend, from, to, direction, chunk_size.max(1))
209    }
210
211    /// Returns the underlying backend, consuming the reader.
212    #[must_use]
213    pub fn into_inner(self) -> B {
214        self.backend
215    }
216
217    /// Returns a reference to the underlying backend.
218    #[must_use]
219    pub const fn backend(&self) -> &B {
220        &self.backend
221    }
222}
223
224/// Lazy iterator over a `seq` range, materialized in chunks.
225///
226/// Created by [`EventStoreReader::scan_range`] and
227/// [`EventStoreReader::scan_range_chunked`]. The iterator owns no transaction lifetime:
228/// each chunk opens a fresh [`EventStore::scan_range`] call against the backend, so a
229/// long-running scan is not held open against a writer's commit cadence.
230pub struct RangeScan<'a> {
231    backend: &'a dyn EventStore,
232    direction: ScanDirection,
233    chunk_size: u64,
234    cursor: u64,
235    end: u64,
236    buffer: VecDeque<EventStoreEntry>,
237    has_more: bool,
238    // Reverse scans must clamp their starting cursor against the durable
239    // high-watermark on the first fetch. Without that step, a `to` value
240    // above the watermark (a forensics caller passing an open upper bound,
241    // or simply `to = u64::MAX`) makes the first chunk lie wholly above
242    // the durable rows; the backend clips that chunk to an empty Vec and
243    // the iterator would terminate before reading the rows below the
244    // watermark. Forward scans are not affected: an empty forward chunk
245    // genuinely means we have walked past the watermark.
246    reverse_clamped: bool,
247}
248
249impl Debug for RangeScan<'_> {
250    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
251        f.debug_struct(stringify!(RangeScan))
252            .field("direction", &self.direction)
253            .field("chunk_size", &self.chunk_size)
254            .field("cursor", &self.cursor)
255            .field("end", &self.end)
256            .field("buffered", &self.buffer.len())
257            .field("has_more", &self.has_more)
258            .field("reverse_clamped", &self.reverse_clamped)
259            .finish()
260    }
261}
262
263impl<'a> RangeScan<'a> {
264    fn new(
265        backend: &'a dyn EventStore,
266        from: u64,
267        to: u64,
268        direction: ScanDirection,
269        chunk_size: u64,
270    ) -> Self {
271        // Mirror the backend's empty-range conventions so the iterator never makes a
272        // first call into the backend for a degenerate window. `from == 0` is reserved
273        // (seq is 1-based); `from > to` is an empty range.
274        let valid = from != 0 && from <= to;
275        let (cursor, end) = if valid {
276            match direction {
277                ScanDirection::Forward => (from, to),
278                ScanDirection::Reverse => (to, from),
279            }
280        } else {
281            (0, 0)
282        };
283
284        Self {
285            backend,
286            direction,
287            chunk_size: chunk_size.max(1),
288            cursor,
289            end,
290            buffer: VecDeque::new(),
291            has_more: valid,
292            reverse_clamped: false,
293        }
294    }
295
296    fn fetch_chunk(&mut self) -> Option<Result<(), EventStoreError>> {
297        if !self.has_more {
298            return None;
299        }
300
301        if matches!(self.direction, ScanDirection::Reverse) && !self.reverse_clamped {
302            match self.backend.high_watermark() {
303                Ok(hwm) => {
304                    if hwm == 0 || hwm < self.end {
305                        self.has_more = false;
306                        return Some(Ok(()));
307                    }
308                    self.cursor = self.cursor.min(hwm);
309                    self.reverse_clamped = true;
310                }
311                Err(e) => {
312                    self.has_more = false;
313                    return Some(Err(e));
314                }
315            }
316        }
317        let (chunk_lo, chunk_hi) = match self.direction {
318            ScanDirection::Forward => {
319                let lo = self.cursor;
320                let hi = lo
321                    .saturating_add(self.chunk_size)
322                    .saturating_sub(1)
323                    .min(self.end);
324                (lo, hi)
325            }
326            ScanDirection::Reverse => {
327                let hi = self.cursor;
328                let lo = hi
329                    .saturating_sub(self.chunk_size.saturating_sub(1))
330                    .max(self.end);
331                (lo, hi)
332            }
333        };
334
335        match self.backend.scan_range(chunk_lo, chunk_hi, self.direction) {
336            Ok(entries) => {
337                if entries.is_empty() {
338                    // The backend clipped the window to its high-watermark or the run is
339                    // shorter than the requested range; either way no further chunks
340                    // will yield rows.
341                    self.has_more = false;
342                    return Some(Ok(()));
343                }
344
345                match self.direction {
346                    ScanDirection::Forward => {
347                        if chunk_hi >= self.end {
348                            self.has_more = false;
349                        } else {
350                            self.cursor = chunk_hi + 1;
351                        }
352                    }
353                    ScanDirection::Reverse => {
354                        if chunk_lo <= self.end {
355                            self.has_more = false;
356                        } else {
357                            self.cursor = chunk_lo - 1;
358                        }
359                    }
360                }
361                self.buffer.extend(entries);
362                Some(Ok(()))
363            }
364            Err(e) => {
365                self.has_more = false;
366                Some(Err(e))
367            }
368        }
369    }
370}
371
372impl Iterator for RangeScan<'_> {
373    type Item = Result<EventStoreEntry, EventStoreError>;
374
375    fn next(&mut self) -> Option<Self::Item> {
376        loop {
377            if let Some(entry) = self.buffer.pop_front() {
378                return Some(Ok(entry));
379            }
380
381            match self.fetch_chunk() {
382                Some(Ok(())) => {}
383                Some(Err(e)) => return Some(Err(e)),
384                None => return None,
385            }
386        }
387    }
388}
389
390#[cfg(test)]
391mod tests {
392    use bytes::Bytes;
393    use indexmap::IndexMap;
394    use nautilus_core::UnixNanos;
395    use rstest::{fixture, rstest};
396    use ustr::Ustr;
397
398    use super::*;
399    use crate::{
400        backend::{AppendEntry, IndexKey, MemoryBackend},
401        compute_entry_hash,
402        entry::Topic,
403        headers::Headers,
404        manifest::{RegisteredComponents, RunManifest, RunStatus},
405    };
406
407    fn manifest(run_id: &str) -> RunManifest {
408        RunManifest {
409            run_id: run_id.to_string(),
410            parent_run_id: None,
411            instance_id: "trader-001".to_string(),
412            binary_hash: "deadbeef".to_string(),
413            schema_version: 1,
414            crate_versions: "feedface".to_string(),
415            feature_flags: Vec::new(),
416            adapter_versions: IndexMap::new(),
417            config_hash: "cafebabe".to_string(),
418            registered_components: RegisteredComponents::default(),
419            seed: None,
420            start_ts_init: UnixNanos::from(0),
421            end_ts_init: None,
422            high_watermark: 0,
423            status: RunStatus::Running,
424        }
425    }
426
427    fn build_entry(seq: u64, ts_init: u64) -> EventStoreEntry {
428        let topic: Topic = "exec.command.SubmitOrder".into();
429        let payload_type = Ustr::from("SubmitOrder");
430        let payload = Bytes::from_static(b"\x01\x02\x03\x04");
431        let headers = Headers::empty();
432        let ts_publish = UnixNanos::from(ts_init + 1);
433        let ts_init = UnixNanos::from(ts_init);
434        let hash = compute_entry_hash(
435            seq,
436            ts_init,
437            ts_publish,
438            topic.as_ref(),
439            payload_type.as_str(),
440            &payload,
441            &headers,
442        );
443
444        EventStoreEntry::new(
445            hash,
446            seq,
447            headers,
448            topic,
449            payload_type,
450            payload,
451            ts_init,
452            ts_publish,
453        )
454    }
455
456    fn append_with(seq: u64, ts_init: u64, index_keys: Vec<IndexKey>) -> AppendEntry {
457        AppendEntry::new(build_entry(seq, ts_init), index_keys)
458    }
459
460    fn populated(count: u64) -> EventStoreReader<MemoryBackend> {
461        let mut backend = MemoryBackend::new();
462        backend.open_run(manifest("run-reader")).expect("open run");
463        let batch: Vec<AppendEntry> = (1..=count)
464            .map(|seq| append_with(seq, 100 + seq, Vec::new()))
465            .collect();
466        backend.append_batch(&batch).expect("append");
467        EventStoreReader::new(backend)
468    }
469
470    #[derive(Debug)]
471    struct AnchorPastWatermarkBackend;
472
473    impl EventStore for AnchorPastWatermarkBackend {
474        fn open_run(&mut self, _manifest: RunManifest) -> Result<(), EventStoreError> {
475            Ok(())
476        }
477
478        fn append_batch(&mut self, _entries: &[AppendEntry]) -> Result<u64, EventStoreError> {
479            Ok(1)
480        }
481
482        fn scan_range(
483            &self,
484            _from: u64,
485            _to: u64,
486            _direction: ScanDirection,
487        ) -> Result<Vec<EventStoreEntry>, EventStoreError> {
488            Ok(Vec::new())
489        }
490
491        fn scan_seq(&self, _seq: u64) -> Result<Option<EventStoreEntry>, EventStoreError> {
492            Ok(None)
493        }
494
495        fn lookup(&self, _kind: IndexKind, _key: &str) -> Result<Option<u64>, EventStoreError> {
496            Ok(None)
497        }
498
499        fn iter_index_keys(&self, _kind: IndexKind) -> Result<Vec<(String, u64)>, EventStoreError> {
500            Ok(Vec::new())
501        }
502
503        fn record_snapshot_anchor(
504            &mut self,
505            _anchor: SnapshotAnchor,
506        ) -> Result<(), EventStoreError> {
507            Ok(())
508        }
509
510        fn latest_snapshot_anchor(&self) -> Result<Option<SnapshotAnchor>, EventStoreError> {
511            Ok(Some(SnapshotAnchor::new(
512                2,
513                "cache://snapshots/run-reader/2",
514                "blake3:abc",
515            )))
516        }
517
518        fn seal(&mut self, _status: RunStatus) -> Result<(), EventStoreError> {
519            Ok(())
520        }
521
522        fn manifest(&self) -> Result<RunManifest, EventStoreError> {
523            Ok(manifest("run-anchor-past-watermark"))
524        }
525
526        fn high_watermark(&self) -> Result<u64, EventStoreError> {
527            Ok(1)
528        }
529    }
530
531    #[fixture]
532    fn reader_with_three() -> EventStoreReader<MemoryBackend> {
533        populated(3)
534    }
535
536    #[rstest]
537    fn manifest_delegates_to_backend(reader_with_three: EventStoreReader<MemoryBackend>) {
538        let m = reader_with_three.manifest().expect("manifest");
539
540        assert_eq!(m.run_id, "run-reader");
541        assert_eq!(m.high_watermark, 3);
542    }
543
544    #[rstest]
545    fn high_watermark_delegates_to_backend(reader_with_three: EventStoreReader<MemoryBackend>) {
546        assert_eq!(reader_with_three.high_watermark().expect("hwm"), 3);
547    }
548
549    #[rstest]
550    fn latest_snapshot_anchor_delegates_to_backend() {
551        let mut backend = MemoryBackend::new();
552        backend.open_run(manifest("run-anchor")).expect("open run");
553        backend
554            .append_batch(&[append_with(1, 101, Vec::new())])
555            .expect("append");
556        let anchor = SnapshotAnchor::new(1, "cache://snapshots/run-anchor/1", "blake3:abc");
557        backend
558            .record_snapshot_anchor(anchor.clone())
559            .expect("record anchor");
560        let reader = EventStoreReader::new(backend);
561
562        assert_eq!(
563            reader.latest_snapshot_anchor().expect("latest anchor"),
564            Some(anchor),
565        );
566    }
567
568    #[rstest]
569    fn snapshot_replay_plan_without_anchor_replays_from_start(
570        reader_with_three: EventStoreReader<MemoryBackend>,
571    ) {
572        let plan = reader_with_three
573            .snapshot_replay_plan()
574            .expect("snapshot replay plan");
575
576        assert_eq!(
577            plan,
578            SnapshotReplayPlan {
579                anchor: None,
580                from_seq: 1,
581                to_seq: 3,
582            },
583        );
584        assert!(!plan.is_empty());
585    }
586
587    #[rstest]
588    fn snapshot_replay_plan_with_anchor_starts_after_anchor_watermark() {
589        let mut backend = MemoryBackend::new();
590        backend.open_run(manifest("run-anchor")).expect("open run");
591        backend
592            .append_batch(&[
593                append_with(1, 101, Vec::new()),
594                append_with(2, 102, Vec::new()),
595                append_with(3, 103, Vec::new()),
596            ])
597            .expect("append");
598        let anchor = SnapshotAnchor::new(2, "cache://snapshots/run-anchor/2", "blake3:abc");
599        backend
600            .record_snapshot_anchor(anchor.clone())
601            .expect("record anchor");
602        let reader = EventStoreReader::new(backend);
603
604        let plan = reader.snapshot_replay_plan().expect("snapshot replay plan");
605
606        assert_eq!(
607            plan,
608            SnapshotReplayPlan {
609                anchor: Some(anchor),
610                from_seq: 3,
611                to_seq: 3,
612            },
613        );
614        assert!(!plan.is_empty());
615    }
616
617    #[rstest]
618    fn snapshot_replay_plan_rejects_anchor_past_watermark() {
619        let reader = EventStoreReader::new(AnchorPastWatermarkBackend);
620        let err = reader
621            .snapshot_replay_plan()
622            .expect_err("anchor past watermark must fail");
623
624        match err {
625            EventStoreError::Corrupted(msg) => {
626                assert!(
627                    msg.contains("exceeds durable high_watermark"),
628                    "msg was: {msg}",
629                );
630            }
631            other => panic!("expected Corrupted, was {other:?}"),
632        }
633    }
634
635    #[rstest]
636    fn scan_snapshot_replay_tail_yields_entries_after_anchor() {
637        let mut backend = MemoryBackend::new();
638        backend.open_run(manifest("run-anchor")).expect("open run");
639        backend
640            .append_batch(&[
641                append_with(1, 101, Vec::new()),
642                append_with(2, 102, Vec::new()),
643                append_with(3, 103, Vec::new()),
644            ])
645            .expect("append");
646        backend
647            .record_snapshot_anchor(SnapshotAnchor::new(
648                2,
649                "cache://snapshots/run-anchor/2",
650                "blake3:abc",
651            ))
652            .expect("record anchor");
653        let reader = EventStoreReader::new(backend);
654
655        let (plan, scan) = reader
656            .scan_snapshot_replay_tail()
657            .expect("snapshot replay tail");
658        let seqs: Vec<_> = scan.map(|entry| entry.expect("entry").seq).collect();
659
660        assert_eq!(plan.from_seq, 3);
661        assert_eq!(seqs, vec![3]);
662    }
663
664    #[rstest]
665    fn scan_snapshot_replay_tail_is_empty_when_anchor_matches_watermark() {
666        let mut backend = MemoryBackend::new();
667        backend.open_run(manifest("run-anchor")).expect("open run");
668        backend
669            .append_batch(&[
670                append_with(1, 101, Vec::new()),
671                append_with(2, 102, Vec::new()),
672            ])
673            .expect("append");
674        backend
675            .record_snapshot_anchor(SnapshotAnchor::new(
676                2,
677                "cache://snapshots/run-anchor/2",
678                "blake3:abc",
679            ))
680            .expect("record anchor");
681        let reader = EventStoreReader::new(backend);
682
683        let (plan, scan) = reader
684            .scan_snapshot_replay_tail()
685            .expect("snapshot replay tail");
686        let seqs: Vec<_> = scan.map(|entry| entry.expect("entry").seq).collect();
687
688        assert_eq!(plan.from_seq, 3);
689        assert_eq!(plan.to_seq, 2);
690        assert!(plan.is_empty());
691        assert!(seqs.is_empty());
692    }
693
694    #[rstest]
695    fn scan_seq_returns_committed_entry(reader_with_three: EventStoreReader<MemoryBackend>) {
696        let entry = reader_with_three
697            .scan_seq(2)
698            .expect("scan")
699            .expect("present");
700
701        assert_eq!(entry.seq, 2);
702        assert_eq!(entry.ts_init, UnixNanos::from(102));
703    }
704
705    #[rstest]
706    fn scan_seq_returns_none_outside_watermark(reader_with_three: EventStoreReader<MemoryBackend>) {
707        assert!(reader_with_three.scan_seq(0).expect("scan").is_none());
708        assert!(reader_with_three.scan_seq(99).expect("scan").is_none());
709    }
710
711    #[rstest]
712    fn lookup_finds_recorded_index_key() {
713        let mut backend = MemoryBackend::new();
714        backend.open_run(manifest("run-lookup")).expect("open run");
715        backend
716            .append_batch(&[
717                AppendEntry::new(
718                    build_entry(1, 100),
719                    vec![IndexKey::new(IndexKind::ClientOrderId, "O-1".to_string())],
720                ),
721                AppendEntry::new(
722                    build_entry(2, 101),
723                    vec![IndexKey::new(IndexKind::VenueOrderId, "V-1".to_string())],
724                ),
725            ])
726            .expect("append");
727        let reader = EventStoreReader::new(backend);
728
729        assert_eq!(
730            reader
731                .lookup(IndexKind::ClientOrderId, "O-1")
732                .expect("lookup"),
733            Some(1),
734        );
735        assert_eq!(
736            reader
737                .lookup(IndexKind::VenueOrderId, "V-1")
738                .expect("lookup"),
739            Some(2),
740        );
741        assert!(
742            reader
743                .lookup(IndexKind::ClientOrderId, "missing")
744                .expect("lookup")
745                .is_none(),
746        );
747    }
748
749    #[rstest]
750    fn scan_range_forward_yields_entries_in_order(
751        reader_with_three: EventStoreReader<MemoryBackend>,
752    ) {
753        let seqs: Vec<u64> = reader_with_three
754            .scan_range(1, 3, ScanDirection::Forward)
755            .map(|r| r.expect("entry").seq)
756            .collect();
757
758        assert_eq!(seqs, vec![1, 2, 3]);
759    }
760
761    #[rstest]
762    fn scan_range_reverse_yields_entries_in_reverse(
763        reader_with_three: EventStoreReader<MemoryBackend>,
764    ) {
765        let seqs: Vec<u64> = reader_with_three
766            .scan_range(1, 3, ScanDirection::Reverse)
767            .map(|r| r.expect("entry").seq)
768            .collect();
769
770        assert_eq!(seqs, vec![3, 2, 1]);
771    }
772
773    #[rstest]
774    fn scan_range_window_clips_to_request() {
775        let reader = populated(10);
776
777        let seqs: Vec<u64> = reader
778            .scan_range(4, 7, ScanDirection::Forward)
779            .map(|r| r.expect("entry").seq)
780            .collect();
781
782        assert_eq!(seqs, vec![4, 5, 6, 7]);
783    }
784
785    #[rstest]
786    fn scan_range_chunked_forward_walks_full_range() {
787        // Seven entries with a chunk size of 2 forces the iterator to make four
788        // backend calls (sizes 2, 2, 2, 1) and stitch them into a single forward
789        // sequence without skipping seq 1, 4, or 7.
790        let reader = populated(7);
791
792        let seqs: Vec<u64> = reader
793            .scan_range_chunked(1, 7, ScanDirection::Forward, 2)
794            .map(|r| r.expect("entry").seq)
795            .collect();
796
797        assert_eq!(seqs, vec![1, 2, 3, 4, 5, 6, 7]);
798    }
799
800    #[rstest]
801    fn scan_range_chunked_reverse_walks_full_range() {
802        // Mirror of the forward case: chunk size 2 over seven entries must yield
803        // descending [7, 6, 5, 4, 3, 2, 1] without dropping the chunk-boundary seqs.
804        let reader = populated(7);
805
806        let seqs: Vec<u64> = reader
807            .scan_range_chunked(1, 7, ScanDirection::Reverse, 2)
808            .map(|r| r.expect("entry").seq)
809            .collect();
810
811        assert_eq!(seqs, vec![7, 6, 5, 4, 3, 2, 1]);
812    }
813
814    #[rstest]
815    fn scan_range_clips_to_high_watermark() {
816        // Requesting beyond the watermark must terminate without a Gap error: the
817        // backend reports the empty tail by returning an empty Vec, and the iterator
818        // honors that as end-of-stream.
819        let reader = populated(3);
820
821        let seqs: Vec<u64> = reader
822            .scan_range_chunked(1, 99, ScanDirection::Forward, 2)
823            .map(|r| r.expect("entry").seq)
824            .collect();
825
826        assert_eq!(seqs, vec![1, 2, 3]);
827    }
828
829    #[rstest]
830    fn scan_range_reverse_clips_to_high_watermark() {
831        // Reverse scans whose `to` sits more than one chunk above the high-watermark
832        // must still walk the rows below the watermark. Without the up-front cursor
833        // clamp, the backend would clip the first chunk to an empty Vec (entirely
834        // above the watermark) and the iterator would terminate before yielding any
835        // entry: regression coverage for the open `to` forensics call site.
836        let reader = populated(3);
837
838        let seqs: Vec<u64> = reader
839            .scan_range_chunked(1, 99, ScanDirection::Reverse, 2)
840            .map(|r| r.expect("entry").seq)
841            .collect();
842
843        assert_eq!(seqs, vec![3, 2, 1]);
844    }
845
846    #[rstest]
847    fn scan_range_reverse_with_to_at_u64_max() {
848        // Belt-and-braces: even an open upper bound at `u64::MAX` must yield the
849        // durable rows. Demonstrates the clamp guards against pathological inputs
850        // (a defensive caller, a debug REPL, or a max-sentinel) without spinning.
851        let reader = populated(3);
852
853        let seqs: Vec<u64> = reader
854            .scan_range_chunked(1, u64::MAX, ScanDirection::Reverse, 1)
855            .map(|r| r.expect("entry").seq)
856            .collect();
857
858        assert_eq!(seqs, vec![3, 2, 1]);
859    }
860
861    #[rstest]
862    fn scan_range_reverse_above_watermark_yields_nothing() {
863        // The reverse range sits entirely above the high-watermark; the iterator
864        // must terminate cleanly with zero entries instead of stepping forever.
865        let reader = populated(3);
866
867        let seqs: Vec<u64> = reader
868            .scan_range(10, 20, ScanDirection::Reverse)
869            .map(|r| r.expect("entry").seq)
870            .collect();
871
872        assert!(seqs.is_empty(), "seqs was: {seqs:?}");
873    }
874
875    #[rstest]
876    #[case::inverted(5, 1, ScanDirection::Forward)]
877    #[case::zero_from(0, 5, ScanDirection::Forward)]
878    #[case::inverted_reverse(5, 1, ScanDirection::Reverse)]
879    fn scan_range_empty_bounds_yield_no_entries(
880        #[case] from: u64,
881        #[case] to: u64,
882        #[case] direction: ScanDirection,
883    ) {
884        let reader = populated(3);
885
886        let seqs: Vec<u64> = reader
887            .scan_range(from, to, direction)
888            .map(|r| r.expect("entry").seq)
889            .collect();
890
891        assert!(seqs.is_empty(), "seqs was: {seqs:?}");
892    }
893
894    #[rstest]
895    fn scan_range_propagates_hash_mismatch_error() {
896        // The MemoryBackend's tampered-payload path returns HashMismatch on scan; the
897        // iterator must surface that error and then stop yielding rather than mask the
898        // failure as end-of-stream.
899        let mut backend = MemoryBackend::new();
900        backend.open_run(manifest("run-tamper")).expect("open run");
901        let mut tampered = build_entry(1, 100);
902        tampered.payload = Bytes::from_static(b"\xFF\xFF");
903        backend
904            .append_batch(&[AppendEntry::without_indices(tampered)])
905            .expect("append");
906        let reader = EventStoreReader::new(backend);
907
908        let mut iter = reader.scan_range(1, 1, ScanDirection::Forward);
909        let first = iter.next().expect("first item");
910
911        match first {
912            Err(EventStoreError::HashMismatch { seq: 1 }) => {}
913            other => panic!("expected HashMismatch, was {other:?}"),
914        }
915        assert!(
916            iter.next().is_none(),
917            "iterator must terminate after surfacing the error",
918        );
919    }
920
921    #[rstest]
922    fn scan_range_chunk_size_zero_normalizes_to_one() {
923        // A zero chunk size must not deadlock: it normalizes to 1 so progress is
924        // guaranteed even under a pathological caller.
925        let reader = populated(3);
926
927        let seqs: Vec<u64> = reader
928            .scan_range_chunked(1, 3, ScanDirection::Forward, 0)
929            .map(|r| r.expect("entry").seq)
930            .collect();
931
932        assert_eq!(seqs, vec![1, 2, 3]);
933    }
934
935    #[rstest]
936    fn into_inner_returns_backend(reader_with_three: EventStoreReader<MemoryBackend>) {
937        let backend = reader_with_three.into_inner();
938
939        assert_eq!(backend.high_watermark().expect("hwm"), 3);
940    }
941
942    #[rstest]
943    fn lookup_uses_distinct_kinds() {
944        // Same string under two IndexKinds must return None for the kind that didn't
945        // record it; the reader's lookup path must not collapse kinds.
946        let mut backend = MemoryBackend::new();
947        backend.open_run(manifest("run-kinds")).expect("open run");
948        let shared_key = "shared-key".to_string();
949        backend
950            .append_batch(&[AppendEntry::new(
951                build_entry(1, 100),
952                vec![IndexKey::new(IndexKind::ClientOrderId, shared_key.clone())],
953            )])
954            .expect("append");
955        let reader = EventStoreReader::new(backend);
956
957        assert_eq!(
958            reader
959                .lookup(IndexKind::ClientOrderId, &shared_key)
960                .expect("lookup"),
961            Some(1),
962        );
963        assert!(
964            reader
965                .lookup(IndexKind::VenueOrderId, &shared_key)
966                .expect("lookup")
967                .is_none(),
968        );
969    }
970}