Skip to main content

oxirs_stream/
replay_buffer.rs

1//! Event replay buffer with seek and position tracking.
2//!
3//! Provides an in-memory ring buffer for stream events that supports
4//! random access via sequence IDs, timestamps, or byte offsets. Useful
5//! for replaying historical events to late-joining consumers or during
6//! failure recovery.
7
8use std::collections::VecDeque;
9use std::fmt;
10
11// ── Error ────────────────────────────────────────────────────────────────────
12
13/// Errors that can occur during replay buffer operations.
14#[derive(Debug, Clone, PartialEq, Eq)]
15pub enum ReplayError {
16    /// Buffer is at maximum capacity and `append` was called without eviction enabled.
17    BufferFull,
18    /// Seek position is structurally invalid (e.g. offset beyond end).
19    InvalidPosition(String),
20    /// No event with the requested sequence ID exists in the buffer.
21    SequenceNotFound(u64),
22    /// No event with a timestamp >= the requested value exists in the buffer.
23    TimestampNotFound(u64),
24}
25
26impl fmt::Display for ReplayError {
27    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
28        match self {
29            Self::BufferFull => write!(f, "replay buffer is full"),
30            Self::InvalidPosition(msg) => write!(f, "invalid seek position: {msg}"),
31            Self::SequenceNotFound(id) => write!(f, "sequence id {id} not found in buffer"),
32            Self::TimestampNotFound(ts) => write!(f, "no event at or after timestamp {ts}"),
33        }
34    }
35}
36
37impl std::error::Error for ReplayError {}
38
39// ── Core types ────────────────────────────────────────────────────────────────
40
41/// A single event stored in the replay buffer.
42#[derive(Debug, Clone, PartialEq, Eq)]
43pub struct StreamEvent {
44    /// Monotonically increasing sequence identifier.
45    pub sequence_id: u64,
46    /// Unix epoch milliseconds at event creation.
47    pub timestamp: u64,
48    /// Partition this event belongs to.
49    pub partition: u32,
50    /// Optional routing / deduplication key.
51    pub key: Option<String>,
52    /// Raw event payload.
53    pub payload: Vec<u8>,
54}
55
56impl StreamEvent {
57    /// Create a new `StreamEvent`.
58    pub fn new(
59        sequence_id: u64,
60        timestamp: u64,
61        partition: u32,
62        key: Option<String>,
63        payload: Vec<u8>,
64    ) -> Self {
65        Self {
66            sequence_id,
67            timestamp,
68            partition,
69            key,
70            payload,
71        }
72    }
73
74    /// Total bytes occupied by the payload.
75    pub fn payload_bytes(&self) -> usize {
76        self.payload.len()
77    }
78}
79
80/// Describes where a seek operation should move the cursor.
81#[derive(Debug, Clone, PartialEq, Eq)]
82pub enum SeekPosition {
83    /// Move to the very first event in the buffer.
84    Beginning,
85    /// Move to one past the last event (EOF).
86    End,
87    /// Move to the event with this exact sequence ID.
88    SequenceId(u64),
89    /// Move to the first event whose `timestamp >=` the supplied value.
90    Timestamp(u64),
91    /// Move to an absolute buffer offset (0-based index).
92    Offset(usize),
93}
94
95/// Snapshot of replay buffer statistics.
96#[derive(Debug, Clone, PartialEq, Eq)]
97pub struct ReplayStats {
98    /// Total events currently held in the buffer.
99    pub total_events: usize,
100    /// Number of events that have been delivered by `read_next` / `read_batch`.
101    pub replayed_events: usize,
102    /// Current cursor position (0-based index into the buffer).
103    pub current_position: usize,
104    /// Sum of all payload bytes delivered so far.
105    pub bytes_replayed: usize,
106}
107
108// ── ReplayBuffer ─────────────────────────────────────────────────────────────
109
110/// An in-memory event replay buffer.
111///
112/// Events are appended sequentially. When the buffer exceeds `max_capacity`
113/// the oldest event is evicted automatically. A cursor tracks the current
114/// read position; seek / read operations advance it.
115pub struct ReplayBuffer {
116    events: VecDeque<StreamEvent>,
117    max_capacity: usize,
118    /// Index of the next event to read (logical position inside `events`).
119    cursor: usize,
120    replayed_events: usize,
121    bytes_replayed: usize,
122}
123
124impl ReplayBuffer {
125    /// Create a new `ReplayBuffer` with the given maximum capacity.
126    ///
127    /// When `max_capacity` is 0 the buffer has no effective limit (usize::MAX).
128    pub fn new(max_capacity: usize) -> Self {
129        let effective_capacity = if max_capacity == 0 {
130            usize::MAX
131        } else {
132            max_capacity
133        };
134        Self {
135            events: VecDeque::new(),
136            max_capacity: effective_capacity,
137            cursor: 0,
138            replayed_events: 0,
139            bytes_replayed: 0,
140        }
141    }
142
143    /// Append a new event to the buffer.
144    ///
145    /// If the buffer is at `max_capacity` the oldest event is evicted and the
146    /// cursor is adjusted so it never points past the start.
147    pub fn append(&mut self, event: StreamEvent) -> Result<(), ReplayError> {
148        if self.events.len() >= self.max_capacity {
149            // Evict the oldest event; adjust cursor if it was pointing at it.
150            self.events.pop_front();
151            if self.cursor > 0 {
152                self.cursor -= 1;
153            }
154        }
155        self.events.push_back(event);
156        Ok(())
157    }
158
159    /// Move the read cursor to `pos`.
160    ///
161    /// Returns the new cursor position on success.
162    pub fn seek(&mut self, pos: SeekPosition) -> Result<usize, ReplayError> {
163        let new_cursor = match pos {
164            SeekPosition::Beginning => 0,
165            SeekPosition::End => self.events.len(),
166            SeekPosition::SequenceId(id) => self
167                .events
168                .iter()
169                .position(|e| e.sequence_id == id)
170                .ok_or(ReplayError::SequenceNotFound(id))?,
171            SeekPosition::Timestamp(ts) => self
172                .events
173                .iter()
174                .position(|e| e.timestamp >= ts)
175                .ok_or(ReplayError::TimestampNotFound(ts))?,
176            SeekPosition::Offset(off) => {
177                if off > self.events.len() {
178                    return Err(ReplayError::InvalidPosition(format!(
179                        "offset {off} exceeds buffer length {}",
180                        self.events.len()
181                    )));
182                }
183                off
184            }
185        };
186        self.cursor = new_cursor;
187        Ok(self.cursor)
188    }
189
190    /// Read the event at the current cursor position and advance the cursor.
191    ///
192    /// Returns `None` when the cursor is past the last event.
193    pub fn read_next(&mut self) -> Option<&StreamEvent> {
194        if self.cursor >= self.events.len() {
195            return None;
196        }
197        // Capture payload length before any mutation.
198        let payload_len = self.events[self.cursor].payload.len();
199        let idx = self.cursor;
200        self.cursor += 1;
201        self.replayed_events += 1;
202        self.bytes_replayed += payload_len;
203        self.events.get(idx)
204    }
205
206    /// Read up to `count` events starting at the current cursor, advancing
207    /// the cursor past each returned event.
208    pub fn read_batch(&mut self, count: usize) -> Vec<&StreamEvent> {
209        let available = self.events.len().saturating_sub(self.cursor);
210        let to_read = count.min(available);
211        let start = self.cursor;
212        self.cursor += to_read;
213
214        // Accumulate payload stats.
215        let mut bytes = 0usize;
216        for i in start..self.cursor {
217            if let Some(e) = self.events.get(i) {
218                bytes += e.payload.len();
219            }
220        }
221        self.replayed_events += to_read;
222        self.bytes_replayed += bytes;
223
224        // Collect references from the range.
225        (start..self.cursor)
226            .filter_map(|i| self.events.get(i))
227            .collect()
228    }
229
230    /// Reset the cursor to the beginning of the buffer.
231    pub fn reset(&mut self) {
232        self.cursor = 0;
233    }
234
235    /// Number of events remaining from the cursor to the end of the buffer.
236    pub fn events_remaining(&self) -> usize {
237        self.events.len().saturating_sub(self.cursor)
238    }
239
240    /// Snapshot of current buffer statistics.
241    pub fn stats(&self) -> ReplayStats {
242        ReplayStats {
243            total_events: self.events.len(),
244            replayed_events: self.replayed_events,
245            current_position: self.cursor,
246            bytes_replayed: self.bytes_replayed,
247        }
248    }
249
250    /// Total number of events currently stored in the buffer.
251    pub fn len(&self) -> usize {
252        self.events.len()
253    }
254
255    /// Returns `true` if the buffer contains no events.
256    pub fn is_empty(&self) -> bool {
257        self.events.is_empty()
258    }
259}
260
261// ── Tests ─────────────────────────────────────────────────────────────────────
262
263#[cfg(test)]
264mod tests {
265    use super::*;
266
267    fn make_event(seq: u64, ts: u64, partition: u32, payload: &[u8]) -> StreamEvent {
268        StreamEvent::new(seq, ts, partition, None, payload.to_vec())
269    }
270
271    fn make_keyed_event(seq: u64, ts: u64, key: &str, payload: &[u8]) -> StreamEvent {
272        StreamEvent::new(seq, ts, 0, Some(key.to_string()), payload.to_vec())
273    }
274
275    // ── StreamEvent basics ──────────────────────────────────────────────────
276
277    #[test]
278    fn test_stream_event_new() {
279        let e = make_event(1, 1000, 0, b"hello");
280        assert_eq!(e.sequence_id, 1);
281        assert_eq!(e.timestamp, 1000);
282        assert_eq!(e.partition, 0);
283        assert_eq!(e.payload, b"hello");
284        assert!(e.key.is_none());
285    }
286
287    #[test]
288    fn test_stream_event_with_key() {
289        let e = make_keyed_event(2, 2000, "my-key", b"data");
290        assert_eq!(e.key, Some("my-key".to_string()));
291    }
292
293    #[test]
294    fn test_stream_event_payload_bytes() {
295        let e = make_event(1, 0, 0, b"hello world");
296        assert_eq!(e.payload_bytes(), 11);
297    }
298
299    #[test]
300    fn test_stream_event_empty_payload() {
301        let e = make_event(1, 0, 0, b"");
302        assert_eq!(e.payload_bytes(), 0);
303    }
304
305    // ── ReplayBuffer construction ───────────────────────────────────────────
306
307    #[test]
308    fn test_new_buffer_is_empty() {
309        let buf = ReplayBuffer::new(100);
310        assert!(buf.is_empty());
311        assert_eq!(buf.len(), 0);
312    }
313
314    #[test]
315    fn test_zero_capacity_means_unlimited() {
316        let buf = ReplayBuffer::new(0);
317        assert_eq!(buf.max_capacity, usize::MAX);
318    }
319
320    // ── append ─────────────────────────────────────────────────────────────
321
322    #[test]
323    fn test_append_single_event() {
324        let mut buf = ReplayBuffer::new(10);
325        buf.append(make_event(1, 100, 0, b"a")).expect("append ok");
326        assert_eq!(buf.len(), 1);
327        assert!(!buf.is_empty());
328    }
329
330    #[test]
331    fn test_append_multiple_events() {
332        let mut buf = ReplayBuffer::new(10);
333        for i in 0..5u64 {
334            buf.append(make_event(i, i * 100, 0, b"x"))
335                .expect("append ok");
336        }
337        assert_eq!(buf.len(), 5);
338    }
339
340    #[test]
341    fn test_append_evicts_oldest_on_full() {
342        let mut buf = ReplayBuffer::new(3);
343        buf.append(make_event(1, 100, 0, b"a")).expect("ok");
344        buf.append(make_event(2, 200, 0, b"b")).expect("ok");
345        buf.append(make_event(3, 300, 0, b"c")).expect("ok");
346        // Buffer full — appending seq 4 should evict seq 1.
347        buf.append(make_event(4, 400, 0, b"d")).expect("ok");
348        assert_eq!(buf.len(), 3);
349        // seq 1 should no longer be present.
350        let ids: Vec<u64> = buf.events.iter().map(|e| e.sequence_id).collect();
351        assert_eq!(ids, vec![2, 3, 4]);
352    }
353
354    #[test]
355    fn test_append_eviction_adjusts_cursor() {
356        let mut buf = ReplayBuffer::new(3);
357        buf.append(make_event(1, 100, 0, b"a")).expect("ok");
358        buf.append(make_event(2, 200, 0, b"b")).expect("ok");
359        buf.append(make_event(3, 300, 0, b"c")).expect("ok");
360        // Advance cursor to position 2.
361        buf.seek(SeekPosition::Offset(2)).expect("seek ok");
362        assert_eq!(buf.cursor, 2);
363        // Eviction should move cursor back by 1.
364        buf.append(make_event(4, 400, 0, b"d")).expect("ok");
365        assert_eq!(buf.cursor, 1);
366    }
367
368    // ── read_next ──────────────────────────────────────────────────────────
369
370    #[test]
371    fn test_read_next_returns_events_in_order() {
372        let mut buf = ReplayBuffer::new(10);
373        for i in 1u64..=3 {
374            buf.append(make_event(i, i * 10, 0, b"x")).expect("ok");
375        }
376        let e1 = buf.read_next().expect("has event");
377        assert_eq!(e1.sequence_id, 1);
378        let e2 = buf.read_next().expect("has event");
379        assert_eq!(e2.sequence_id, 2);
380        let e3 = buf.read_next().expect("has event");
381        assert_eq!(e3.sequence_id, 3);
382        assert!(buf.read_next().is_none());
383    }
384
385    #[test]
386    fn test_read_next_advances_cursor() {
387        let mut buf = ReplayBuffer::new(10);
388        buf.append(make_event(1, 0, 0, b"a")).expect("ok");
389        assert_eq!(buf.cursor, 0);
390        buf.read_next();
391        assert_eq!(buf.cursor, 1);
392    }
393
394    #[test]
395    fn test_read_next_updates_stats() {
396        let mut buf = ReplayBuffer::new(10);
397        buf.append(make_event(1, 0, 0, b"abc")).expect("ok"); // 3 bytes
398        buf.read_next();
399        let stats = buf.stats();
400        assert_eq!(stats.replayed_events, 1);
401        assert_eq!(stats.bytes_replayed, 3);
402    }
403
404    #[test]
405    fn test_read_next_empty_buffer_returns_none() {
406        let mut buf = ReplayBuffer::new(10);
407        assert!(buf.read_next().is_none());
408    }
409
410    // ── read_batch ─────────────────────────────────────────────────────────
411
412    #[test]
413    fn test_read_batch_reads_requested_count() {
414        let mut buf = ReplayBuffer::new(20);
415        for i in 1u64..=10 {
416            buf.append(make_event(i, i, 0, b"y")).expect("ok");
417        }
418        let batch = buf.read_batch(5);
419        assert_eq!(batch.len(), 5);
420        assert_eq!(batch[0].sequence_id, 1);
421        assert_eq!(batch[4].sequence_id, 5);
422    }
423
424    #[test]
425    fn test_read_batch_clamps_to_available() {
426        let mut buf = ReplayBuffer::new(10);
427        for i in 1u64..=3 {
428            buf.append(make_event(i, i, 0, b"z")).expect("ok");
429        }
430        let batch = buf.read_batch(100);
431        assert_eq!(batch.len(), 3);
432    }
433
434    #[test]
435    fn test_read_batch_advances_cursor() {
436        let mut buf = ReplayBuffer::new(10);
437        for i in 0u64..5 {
438            buf.append(make_event(i, i, 0, b"q")).expect("ok");
439        }
440        buf.read_batch(3);
441        assert_eq!(buf.cursor, 3);
442    }
443
444    #[test]
445    fn test_read_batch_updates_stats() {
446        let mut buf = ReplayBuffer::new(10);
447        buf.append(make_event(1, 0, 0, b"ab")).expect("ok"); // 2 bytes
448        buf.append(make_event(2, 1, 0, b"cde")).expect("ok"); // 3 bytes
449        buf.read_batch(2);
450        let s = buf.stats();
451        assert_eq!(s.replayed_events, 2);
452        assert_eq!(s.bytes_replayed, 5);
453    }
454
455    #[test]
456    fn test_read_batch_empty_buffer() {
457        let mut buf = ReplayBuffer::new(10);
458        let batch = buf.read_batch(5);
459        assert!(batch.is_empty());
460    }
461
462    // ── seek ───────────────────────────────────────────────────────────────
463
464    #[test]
465    fn test_seek_beginning() {
466        let mut buf = ReplayBuffer::new(10);
467        for i in 1u64..=5 {
468            buf.append(make_event(i, i * 10, 0, b"x")).expect("ok");
469        }
470        buf.read_batch(3);
471        assert_eq!(buf.cursor, 3);
472        let pos = buf.seek(SeekPosition::Beginning).expect("seek ok");
473        assert_eq!(pos, 0);
474        assert_eq!(buf.cursor, 0);
475    }
476
477    #[test]
478    fn test_seek_end() {
479        let mut buf = ReplayBuffer::new(10);
480        for i in 1u64..=5 {
481            buf.append(make_event(i, i * 10, 0, b"x")).expect("ok");
482        }
483        let pos = buf.seek(SeekPosition::End).expect("seek ok");
484        assert_eq!(pos, 5);
485        assert!(buf.read_next().is_none());
486    }
487
488    #[test]
489    fn test_seek_sequence_id_found() {
490        let mut buf = ReplayBuffer::new(10);
491        for i in 10u64..=14 {
492            buf.append(make_event(i, i, 0, b"x")).expect("ok");
493        }
494        let pos = buf.seek(SeekPosition::SequenceId(12)).expect("seek ok");
495        // seq 12 is at index 2 (10→0, 11→1, 12→2).
496        assert_eq!(pos, 2);
497        let e = buf.read_next().expect("event");
498        assert_eq!(e.sequence_id, 12);
499    }
500
501    #[test]
502    fn test_seek_sequence_id_not_found() {
503        let mut buf = ReplayBuffer::new(10);
504        buf.append(make_event(1, 0, 0, b"x")).expect("ok");
505        let err = buf.seek(SeekPosition::SequenceId(99)).unwrap_err();
506        assert_eq!(err, ReplayError::SequenceNotFound(99));
507    }
508
509    #[test]
510    fn test_seek_timestamp_found() {
511        let mut buf = ReplayBuffer::new(10);
512        for i in 0u64..5 {
513            buf.append(make_event(i, i * 100, 0, b"x")).expect("ok");
514        }
515        // Timestamps: 0, 100, 200, 300, 400
516        let pos = buf.seek(SeekPosition::Timestamp(200)).expect("seek ok");
517        assert_eq!(pos, 2); // event with ts=200 is at index 2.
518        let e = buf.read_next().expect("event");
519        assert_eq!(e.timestamp, 200);
520    }
521
522    #[test]
523    fn test_seek_timestamp_not_found() {
524        let mut buf = ReplayBuffer::new(10);
525        buf.append(make_event(1, 50, 0, b"x")).expect("ok");
526        let err = buf.seek(SeekPosition::Timestamp(9999)).unwrap_err();
527        assert_eq!(err, ReplayError::TimestampNotFound(9999));
528    }
529
530    #[test]
531    fn test_seek_offset_valid() {
532        let mut buf = ReplayBuffer::new(10);
533        for i in 0u64..5 {
534            buf.append(make_event(i, i, 0, b"x")).expect("ok");
535        }
536        let pos = buf.seek(SeekPosition::Offset(3)).expect("seek ok");
537        assert_eq!(pos, 3);
538        let e = buf.read_next().expect("event");
539        assert_eq!(e.sequence_id, 3);
540    }
541
542    #[test]
543    fn test_seek_offset_exactly_at_end() {
544        let mut buf = ReplayBuffer::new(10);
545        for i in 0u64..3 {
546            buf.append(make_event(i, i, 0, b"x")).expect("ok");
547        }
548        let pos = buf.seek(SeekPosition::Offset(3)).expect("seek ok");
549        assert_eq!(pos, 3);
550    }
551
552    #[test]
553    fn test_seek_offset_beyond_end_errors() {
554        let mut buf = ReplayBuffer::new(10);
555        buf.append(make_event(1, 0, 0, b"x")).expect("ok");
556        let err = buf.seek(SeekPosition::Offset(100)).unwrap_err();
557        matches!(err, ReplayError::InvalidPosition(_));
558    }
559
560    // ── reset ──────────────────────────────────────────────────────────────
561
562    #[test]
563    fn test_reset_moves_cursor_to_zero() {
564        let mut buf = ReplayBuffer::new(10);
565        for i in 1u64..=5 {
566            buf.append(make_event(i, i, 0, b"x")).expect("ok");
567        }
568        buf.read_batch(4);
569        assert_eq!(buf.cursor, 4);
570        buf.reset();
571        assert_eq!(buf.cursor, 0);
572    }
573
574    #[test]
575    fn test_reset_allows_rereading() {
576        let mut buf = ReplayBuffer::new(10);
577        buf.append(make_event(1, 0, 0, b"hello")).expect("ok");
578        let e1 = buf.read_next().expect("first read");
579        assert_eq!(e1.sequence_id, 1);
580        buf.reset();
581        let e2 = buf.read_next().expect("second read after reset");
582        assert_eq!(e2.sequence_id, 1);
583    }
584
585    // ── events_remaining ───────────────────────────────────────────────────
586
587    #[test]
588    fn test_events_remaining_full_buffer() {
589        let mut buf = ReplayBuffer::new(10);
590        for i in 0u64..5 {
591            buf.append(make_event(i, i, 0, b"x")).expect("ok");
592        }
593        assert_eq!(buf.events_remaining(), 5);
594    }
595
596    #[test]
597    fn test_events_remaining_after_reads() {
598        let mut buf = ReplayBuffer::new(10);
599        for i in 0u64..5 {
600            buf.append(make_event(i, i, 0, b"x")).expect("ok");
601        }
602        buf.read_batch(3);
603        assert_eq!(buf.events_remaining(), 2);
604    }
605
606    #[test]
607    fn test_events_remaining_at_end() {
608        let mut buf = ReplayBuffer::new(10);
609        buf.append(make_event(1, 0, 0, b"x")).expect("ok");
610        buf.seek(SeekPosition::End).expect("seek ok");
611        assert_eq!(buf.events_remaining(), 0);
612    }
613
614    // ── stats ──────────────────────────────────────────────────────────────
615
616    #[test]
617    fn test_stats_initial_state() {
618        let buf = ReplayBuffer::new(10);
619        let s = buf.stats();
620        assert_eq!(s.total_events, 0);
621        assert_eq!(s.replayed_events, 0);
622        assert_eq!(s.current_position, 0);
623        assert_eq!(s.bytes_replayed, 0);
624    }
625
626    #[test]
627    fn test_stats_after_appends_and_reads() {
628        let mut buf = ReplayBuffer::new(10);
629        buf.append(make_event(1, 0, 0, b"ab")).expect("ok"); // 2 bytes
630        buf.append(make_event(2, 1, 0, b"cde")).expect("ok"); // 3 bytes
631        buf.read_next(); // reads seq 1
632        let s = buf.stats();
633        assert_eq!(s.total_events, 2);
634        assert_eq!(s.replayed_events, 1);
635        assert_eq!(s.current_position, 1);
636        assert_eq!(s.bytes_replayed, 2);
637    }
638
639    #[test]
640    fn test_stats_total_events_decreases_on_eviction() {
641        let mut buf = ReplayBuffer::new(2);
642        buf.append(make_event(1, 0, 0, b"x")).expect("ok");
643        buf.append(make_event(2, 1, 0, b"x")).expect("ok");
644        // Evict seq 1 by adding seq 3.
645        buf.append(make_event(3, 2, 0, b"x")).expect("ok");
646        let s = buf.stats();
647        assert_eq!(s.total_events, 2); // 2 and 3
648    }
649
650    // ── capacity eviction ──────────────────────────────────────────────────
651
652    #[test]
653    fn test_capacity_one_always_keeps_latest() {
654        let mut buf = ReplayBuffer::new(1);
655        for i in 1u64..=5 {
656            buf.append(make_event(i, i, 0, b"x")).expect("ok");
657        }
658        assert_eq!(buf.len(), 1);
659        let e = buf.read_next().expect("event");
660        assert_eq!(e.sequence_id, 5);
661    }
662
663    #[test]
664    fn test_capacity_eviction_fifo_order() {
665        let mut buf = ReplayBuffer::new(3);
666        for i in 1u64..=6 {
667            buf.append(make_event(i, i, 0, b"x")).expect("ok");
668        }
669        // After 6 appends with capacity 3, should have seq 4, 5, 6.
670        let ids: Vec<u64> = buf.events.iter().map(|e| e.sequence_id).collect();
671        assert_eq!(ids, vec![4, 5, 6]);
672    }
673
674    // ── ReplayError Display ────────────────────────────────────────────────
675
676    #[test]
677    fn test_replay_error_display_buffer_full() {
678        let e = ReplayError::BufferFull;
679        assert!(e.to_string().contains("full"));
680    }
681
682    #[test]
683    fn test_replay_error_display_invalid_position() {
684        let e = ReplayError::InvalidPosition("offset 100".to_string());
685        assert!(e.to_string().contains("offset 100"));
686    }
687
688    #[test]
689    fn test_replay_error_display_sequence_not_found() {
690        let e = ReplayError::SequenceNotFound(42);
691        assert!(e.to_string().contains("42"));
692    }
693
694    #[test]
695    fn test_replay_error_display_timestamp_not_found() {
696        let e = ReplayError::TimestampNotFound(9999);
697        assert!(e.to_string().contains("9999"));
698    }
699
700    // ── seek_timestamp boundary ────────────────────────────────────────────
701
702    #[test]
703    fn test_seek_timestamp_exact_match() {
704        let mut buf = ReplayBuffer::new(10);
705        buf.append(make_event(1, 100, 0, b"a")).expect("ok");
706        buf.append(make_event(2, 200, 0, b"b")).expect("ok");
707        buf.append(make_event(3, 300, 0, b"c")).expect("ok");
708        let pos = buf.seek(SeekPosition::Timestamp(100)).expect("ok");
709        assert_eq!(pos, 0);
710    }
711
712    #[test]
713    fn test_seek_timestamp_between_events() {
714        let mut buf = ReplayBuffer::new(10);
715        buf.append(make_event(1, 100, 0, b"a")).expect("ok");
716        buf.append(make_event(2, 300, 0, b"b")).expect("ok");
717        // Seek ts=200 should land at event with ts=300 (index 1).
718        let pos = buf.seek(SeekPosition::Timestamp(200)).expect("ok");
719        assert_eq!(pos, 1);
720    }
721
722    // ── partition filtering ────────────────────────────────────────────────
723
724    #[test]
725    fn test_events_on_different_partitions() {
726        let mut buf = ReplayBuffer::new(10);
727        buf.append(make_event(1, 0, 0, b"part0")).expect("ok");
728        buf.append(make_event(2, 1, 1, b"part1")).expect("ok");
729        buf.append(make_event(3, 2, 0, b"part0_again")).expect("ok");
730        assert_eq!(buf.len(), 3);
731        let e = buf.read_next().expect("event");
732        assert_eq!(e.partition, 0);
733        let e = buf.read_next().expect("event");
734        assert_eq!(e.partition, 1);
735    }
736
737    // ── concurrent seek-and-read patterns ─────────────────────────────────
738
739    #[test]
740    fn test_seek_read_seek_read_pattern() {
741        let mut buf = ReplayBuffer::new(20);
742        for i in 1u64..=10 {
743            let label = format!("event-{i}");
744            buf.append(make_event(i, i * 10, 0, label.as_bytes()))
745                .expect("ok");
746        }
747        // Read first 3.
748        let batch1 = buf.read_batch(3);
749        assert_eq!(batch1.len(), 3);
750        // Seek back to seq 5.
751        buf.seek(SeekPosition::SequenceId(5)).expect("ok");
752        let e = buf.read_next().expect("event");
753        assert_eq!(e.sequence_id, 5);
754    }
755}