1use std::collections::VecDeque;
9use std::fmt;
10
/// Errors reported by replay-buffer operations.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ReplayError {
    /// The replay buffer is full.
    BufferFull,
    /// A seek target was rejected; the string carries a human-readable reason.
    InvalidPosition(String),
    /// No buffered event has the requested sequence id.
    SequenceNotFound(u64),
    /// No buffered event has a timestamp at or after the requested one.
    TimestampNotFound(u64),
}
25
26impl fmt::Display for ReplayError {
27 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
28 match self {
29 Self::BufferFull => write!(f, "replay buffer is full"),
30 Self::InvalidPosition(msg) => write!(f, "invalid seek position: {msg}"),
31 Self::SequenceNotFound(id) => write!(f, "sequence id {id} not found in buffer"),
32 Self::TimestampNotFound(ts) => write!(f, "no event at or after timestamp {ts}"),
33 }
34 }
35}
36
37impl std::error::Error for ReplayError {}
38
/// A single event held in the replay buffer.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StreamEvent {
    /// Identifier matched exactly by `SeekPosition::SequenceId`.
    pub sequence_id: u64,
    /// Event time compared by `SeekPosition::Timestamp`; the unit is not fixed by this type.
    pub timestamp: u64,
    /// Partition number the event is tagged with.
    pub partition: u32,
    /// Optional key attached to the event.
    pub key: Option<String>,
    /// Raw event body; its length is what the buffer counts as replayed bytes.
    pub payload: Vec<u8>,
}
55
56impl StreamEvent {
57 pub fn new(
59 sequence_id: u64,
60 timestamp: u64,
61 partition: u32,
62 key: Option<String>,
63 payload: Vec<u8>,
64 ) -> Self {
65 Self {
66 sequence_id,
67 timestamp,
68 partition,
69 key,
70 payload,
71 }
72 }
73
74 pub fn payload_bytes(&self) -> usize {
76 self.payload.len()
77 }
78}
79
/// Target for `ReplayBuffer::seek`.
///
/// Every variant carries only plain integers, so the type is `Copy` and can be
/// passed around or reused after a seek without cloning.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SeekPosition {
    /// The first buffered event (cursor index 0).
    Beginning,
    /// One past the last buffered event, so nothing is left to read.
    End,
    /// The first buffered event whose `sequence_id` equals the given value.
    SequenceId(u64),
    /// The first buffered event whose `timestamp` is greater than or equal to the given value.
    Timestamp(u64),
    /// An absolute cursor index; values up to and including the buffer length are accepted.
    Offset(usize),
}
94
/// Point-in-time snapshot of a replay buffer's counters.
///
/// The struct is four plain counters, so it is `Copy`; `Default` yields the
/// all-zero snapshot.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct ReplayStats {
    /// Number of events held in the buffer when the snapshot was taken.
    pub total_events: usize,
    /// Running count of events handed out by reads.
    pub replayed_events: usize,
    /// Cursor index of the next event to be read.
    pub current_position: usize,
    /// Running sum of payload lengths of the events handed out by reads.
    pub bytes_replayed: usize,
}
107
108pub struct ReplayBuffer {
116 events: VecDeque<StreamEvent>,
117 max_capacity: usize,
118 cursor: usize,
120 replayed_events: usize,
121 bytes_replayed: usize,
122}
123
124impl ReplayBuffer {
125 pub fn new(max_capacity: usize) -> Self {
129 let effective_capacity = if max_capacity == 0 {
130 usize::MAX
131 } else {
132 max_capacity
133 };
134 Self {
135 events: VecDeque::new(),
136 max_capacity: effective_capacity,
137 cursor: 0,
138 replayed_events: 0,
139 bytes_replayed: 0,
140 }
141 }
142
143 pub fn append(&mut self, event: StreamEvent) -> Result<(), ReplayError> {
148 if self.events.len() >= self.max_capacity {
149 self.events.pop_front();
151 if self.cursor > 0 {
152 self.cursor -= 1;
153 }
154 }
155 self.events.push_back(event);
156 Ok(())
157 }
158
159 pub fn seek(&mut self, pos: SeekPosition) -> Result<usize, ReplayError> {
163 let new_cursor = match pos {
164 SeekPosition::Beginning => 0,
165 SeekPosition::End => self.events.len(),
166 SeekPosition::SequenceId(id) => self
167 .events
168 .iter()
169 .position(|e| e.sequence_id == id)
170 .ok_or(ReplayError::SequenceNotFound(id))?,
171 SeekPosition::Timestamp(ts) => self
172 .events
173 .iter()
174 .position(|e| e.timestamp >= ts)
175 .ok_or(ReplayError::TimestampNotFound(ts))?,
176 SeekPosition::Offset(off) => {
177 if off > self.events.len() {
178 return Err(ReplayError::InvalidPosition(format!(
179 "offset {off} exceeds buffer length {}",
180 self.events.len()
181 )));
182 }
183 off
184 }
185 };
186 self.cursor = new_cursor;
187 Ok(self.cursor)
188 }
189
190 pub fn read_next(&mut self) -> Option<&StreamEvent> {
194 if self.cursor >= self.events.len() {
195 return None;
196 }
197 let payload_len = self.events[self.cursor].payload.len();
199 let idx = self.cursor;
200 self.cursor += 1;
201 self.replayed_events += 1;
202 self.bytes_replayed += payload_len;
203 self.events.get(idx)
204 }
205
206 pub fn read_batch(&mut self, count: usize) -> Vec<&StreamEvent> {
209 let available = self.events.len().saturating_sub(self.cursor);
210 let to_read = count.min(available);
211 let start = self.cursor;
212 self.cursor += to_read;
213
214 let mut bytes = 0usize;
216 for i in start..self.cursor {
217 if let Some(e) = self.events.get(i) {
218 bytes += e.payload.len();
219 }
220 }
221 self.replayed_events += to_read;
222 self.bytes_replayed += bytes;
223
224 (start..self.cursor)
226 .filter_map(|i| self.events.get(i))
227 .collect()
228 }
229
230 pub fn reset(&mut self) {
232 self.cursor = 0;
233 }
234
235 pub fn events_remaining(&self) -> usize {
237 self.events.len().saturating_sub(self.cursor)
238 }
239
240 pub fn stats(&self) -> ReplayStats {
242 ReplayStats {
243 total_events: self.events.len(),
244 replayed_events: self.replayed_events,
245 current_position: self.cursor,
246 bytes_replayed: self.bytes_replayed,
247 }
248 }
249
250 pub fn len(&self) -> usize {
252 self.events.len()
253 }
254
255 pub fn is_empty(&self) -> bool {
257 self.events.is_empty()
258 }
259}
260
#[cfg(test)]
mod tests {
    // Unit tests for `StreamEvent`, `ReplayBuffer` and `ReplayError`.
    use super::*;

    /// Builds an event without a key.
    fn make_event(seq: u64, ts: u64, partition: u32, payload: &[u8]) -> StreamEvent {
        StreamEvent::new(seq, ts, partition, None, payload.to_vec())
    }

    /// Builds an event on partition 0 carrying the given key.
    fn make_keyed_event(seq: u64, ts: u64, key: &str, payload: &[u8]) -> StreamEvent {
        StreamEvent::new(seq, ts, 0, Some(key.to_string()), payload.to_vec())
    }

    // --- StreamEvent ---

    #[test]
    fn test_stream_event_new() {
        let e = make_event(1, 1000, 0, b"hello");
        assert_eq!(e.sequence_id, 1);
        assert_eq!(e.timestamp, 1000);
        assert_eq!(e.partition, 0);
        assert_eq!(e.payload, b"hello");
        assert!(e.key.is_none());
    }

    #[test]
    fn test_stream_event_with_key() {
        let e = make_keyed_event(2, 2000, "my-key", b"data");
        assert_eq!(e.key, Some("my-key".to_string()));
    }

    #[test]
    fn test_stream_event_payload_bytes() {
        let e = make_event(1, 0, 0, b"hello world");
        assert_eq!(e.payload_bytes(), 11);
    }

    #[test]
    fn test_stream_event_empty_payload() {
        let e = make_event(1, 0, 0, b"");
        assert_eq!(e.payload_bytes(), 0);
    }

    // --- Construction ---

    #[test]
    fn test_new_buffer_is_empty() {
        let buf = ReplayBuffer::new(100);
        assert!(buf.is_empty());
        assert_eq!(buf.len(), 0);
    }

    #[test]
    fn test_zero_capacity_means_unlimited() {
        let buf = ReplayBuffer::new(0);
        assert_eq!(buf.max_capacity, usize::MAX);
    }

    // --- append ---

    #[test]
    fn test_append_single_event() {
        let mut buf = ReplayBuffer::new(10);
        buf.append(make_event(1, 100, 0, b"a")).expect("append ok");
        assert_eq!(buf.len(), 1);
        assert!(!buf.is_empty());
    }

    #[test]
    fn test_append_multiple_events() {
        let mut buf = ReplayBuffer::new(10);
        for i in 0..5u64 {
            buf.append(make_event(i, i * 100, 0, b"x"))
                .expect("append ok");
        }
        assert_eq!(buf.len(), 5);
    }

    #[test]
    fn test_append_evicts_oldest_on_full() {
        let mut buf = ReplayBuffer::new(3);
        buf.append(make_event(1, 100, 0, b"a")).expect("ok");
        buf.append(make_event(2, 200, 0, b"b")).expect("ok");
        buf.append(make_event(3, 300, 0, b"c")).expect("ok");
        buf.append(make_event(4, 400, 0, b"d")).expect("ok");
        assert_eq!(buf.len(), 3);
        let ids: Vec<u64> = buf.events.iter().map(|e| e.sequence_id).collect();
        assert_eq!(ids, vec![2, 3, 4]);
    }

    #[test]
    fn test_append_eviction_adjusts_cursor() {
        let mut buf = ReplayBuffer::new(3);
        buf.append(make_event(1, 100, 0, b"a")).expect("ok");
        buf.append(make_event(2, 200, 0, b"b")).expect("ok");
        buf.append(make_event(3, 300, 0, b"c")).expect("ok");
        buf.seek(SeekPosition::Offset(2)).expect("seek ok");
        assert_eq!(buf.cursor, 2);
        buf.append(make_event(4, 400, 0, b"d")).expect("ok");
        assert_eq!(buf.cursor, 1);
    }

    // --- read_next ---

    #[test]
    fn test_read_next_returns_events_in_order() {
        let mut buf = ReplayBuffer::new(10);
        for i in 1u64..=3 {
            buf.append(make_event(i, i * 10, 0, b"x")).expect("ok");
        }
        let e1 = buf.read_next().expect("has event");
        assert_eq!(e1.sequence_id, 1);
        let e2 = buf.read_next().expect("has event");
        assert_eq!(e2.sequence_id, 2);
        let e3 = buf.read_next().expect("has event");
        assert_eq!(e3.sequence_id, 3);
        assert!(buf.read_next().is_none());
    }

    #[test]
    fn test_read_next_advances_cursor() {
        let mut buf = ReplayBuffer::new(10);
        buf.append(make_event(1, 0, 0, b"a")).expect("ok");
        assert_eq!(buf.cursor, 0);
        buf.read_next();
        assert_eq!(buf.cursor, 1);
    }

    #[test]
    fn test_read_next_updates_stats() {
        let mut buf = ReplayBuffer::new(10);
        buf.append(make_event(1, 0, 0, b"abc")).expect("ok");
        buf.read_next();
        let stats = buf.stats();
        assert_eq!(stats.replayed_events, 1);
        assert_eq!(stats.bytes_replayed, 3);
    }

    #[test]
    fn test_read_next_empty_buffer_returns_none() {
        let mut buf = ReplayBuffer::new(10);
        assert!(buf.read_next().is_none());
    }

    // --- read_batch ---

    #[test]
    fn test_read_batch_reads_requested_count() {
        let mut buf = ReplayBuffer::new(20);
        for i in 1u64..=10 {
            buf.append(make_event(i, i, 0, b"y")).expect("ok");
        }
        let batch = buf.read_batch(5);
        assert_eq!(batch.len(), 5);
        assert_eq!(batch[0].sequence_id, 1);
        assert_eq!(batch[4].sequence_id, 5);
    }

    #[test]
    fn test_read_batch_clamps_to_available() {
        let mut buf = ReplayBuffer::new(10);
        for i in 1u64..=3 {
            buf.append(make_event(i, i, 0, b"z")).expect("ok");
        }
        let batch = buf.read_batch(100);
        assert_eq!(batch.len(), 3);
    }

    #[test]
    fn test_read_batch_advances_cursor() {
        let mut buf = ReplayBuffer::new(10);
        for i in 0u64..5 {
            buf.append(make_event(i, i, 0, b"q")).expect("ok");
        }
        buf.read_batch(3);
        assert_eq!(buf.cursor, 3);
    }

    #[test]
    fn test_read_batch_updates_stats() {
        let mut buf = ReplayBuffer::new(10);
        buf.append(make_event(1, 0, 0, b"ab")).expect("ok");
        buf.append(make_event(2, 1, 0, b"cde")).expect("ok");
        buf.read_batch(2);
        let s = buf.stats();
        assert_eq!(s.replayed_events, 2);
        assert_eq!(s.bytes_replayed, 5);
    }

    #[test]
    fn test_read_batch_empty_buffer() {
        let mut buf = ReplayBuffer::new(10);
        let batch = buf.read_batch(5);
        assert!(batch.is_empty());
    }

    // --- seek ---

    #[test]
    fn test_seek_beginning() {
        let mut buf = ReplayBuffer::new(10);
        for i in 1u64..=5 {
            buf.append(make_event(i, i * 10, 0, b"x")).expect("ok");
        }
        buf.read_batch(3);
        assert_eq!(buf.cursor, 3);
        let pos = buf.seek(SeekPosition::Beginning).expect("seek ok");
        assert_eq!(pos, 0);
        assert_eq!(buf.cursor, 0);
    }

    #[test]
    fn test_seek_end() {
        let mut buf = ReplayBuffer::new(10);
        for i in 1u64..=5 {
            buf.append(make_event(i, i * 10, 0, b"x")).expect("ok");
        }
        let pos = buf.seek(SeekPosition::End).expect("seek ok");
        assert_eq!(pos, 5);
        assert!(buf.read_next().is_none());
    }

    #[test]
    fn test_seek_sequence_id_found() {
        let mut buf = ReplayBuffer::new(10);
        for i in 10u64..=14 {
            buf.append(make_event(i, i, 0, b"x")).expect("ok");
        }
        let pos = buf.seek(SeekPosition::SequenceId(12)).expect("seek ok");
        assert_eq!(pos, 2);
        let e = buf.read_next().expect("event");
        assert_eq!(e.sequence_id, 12);
    }

    #[test]
    fn test_seek_sequence_id_not_found() {
        let mut buf = ReplayBuffer::new(10);
        buf.append(make_event(1, 0, 0, b"x")).expect("ok");
        let err = buf.seek(SeekPosition::SequenceId(99)).unwrap_err();
        assert_eq!(err, ReplayError::SequenceNotFound(99));
    }

    #[test]
    fn test_seek_timestamp_found() {
        let mut buf = ReplayBuffer::new(10);
        for i in 0u64..5 {
            buf.append(make_event(i, i * 100, 0, b"x")).expect("ok");
        }
        let pos = buf.seek(SeekPosition::Timestamp(200)).expect("seek ok");
        assert_eq!(pos, 2);
        let e = buf.read_next().expect("event");
        assert_eq!(e.timestamp, 200);
    }

    #[test]
    fn test_seek_timestamp_not_found() {
        let mut buf = ReplayBuffer::new(10);
        buf.append(make_event(1, 50, 0, b"x")).expect("ok");
        let err = buf.seek(SeekPosition::Timestamp(9999)).unwrap_err();
        assert_eq!(err, ReplayError::TimestampNotFound(9999));
    }

    #[test]
    fn test_seek_offset_valid() {
        let mut buf = ReplayBuffer::new(10);
        for i in 0u64..5 {
            buf.append(make_event(i, i, 0, b"x")).expect("ok");
        }
        let pos = buf.seek(SeekPosition::Offset(3)).expect("seek ok");
        assert_eq!(pos, 3);
        let e = buf.read_next().expect("event");
        assert_eq!(e.sequence_id, 3);
    }

    #[test]
    fn test_seek_offset_exactly_at_end() {
        let mut buf = ReplayBuffer::new(10);
        for i in 0u64..3 {
            buf.append(make_event(i, i, 0, b"x")).expect("ok");
        }
        let pos = buf.seek(SeekPosition::Offset(3)).expect("seek ok");
        assert_eq!(pos, 3);
    }

    #[test]
    fn test_seek_offset_beyond_end_errors() {
        let mut buf = ReplayBuffer::new(10);
        buf.append(make_event(1, 0, 0, b"x")).expect("ok");
        let err = buf.seek(SeekPosition::Offset(100)).unwrap_err();
        // `matches!` only yields a bool; without `assert!` this test checked nothing.
        assert!(matches!(err, ReplayError::InvalidPosition(_)));
    }

    // --- reset ---

    #[test]
    fn test_reset_moves_cursor_to_zero() {
        let mut buf = ReplayBuffer::new(10);
        for i in 1u64..=5 {
            buf.append(make_event(i, i, 0, b"x")).expect("ok");
        }
        buf.read_batch(4);
        assert_eq!(buf.cursor, 4);
        buf.reset();
        assert_eq!(buf.cursor, 0);
    }

    #[test]
    fn test_reset_allows_rereading() {
        let mut buf = ReplayBuffer::new(10);
        buf.append(make_event(1, 0, 0, b"hello")).expect("ok");
        let e1 = buf.read_next().expect("first read");
        assert_eq!(e1.sequence_id, 1);
        buf.reset();
        let e2 = buf.read_next().expect("second read after reset");
        assert_eq!(e2.sequence_id, 1);
    }

    // --- events_remaining ---

    #[test]
    fn test_events_remaining_full_buffer() {
        let mut buf = ReplayBuffer::new(10);
        for i in 0u64..5 {
            buf.append(make_event(i, i, 0, b"x")).expect("ok");
        }
        assert_eq!(buf.events_remaining(), 5);
    }

    #[test]
    fn test_events_remaining_after_reads() {
        let mut buf = ReplayBuffer::new(10);
        for i in 0u64..5 {
            buf.append(make_event(i, i, 0, b"x")).expect("ok");
        }
        buf.read_batch(3);
        assert_eq!(buf.events_remaining(), 2);
    }

    #[test]
    fn test_events_remaining_at_end() {
        let mut buf = ReplayBuffer::new(10);
        buf.append(make_event(1, 0, 0, b"x")).expect("ok");
        buf.seek(SeekPosition::End).expect("seek ok");
        assert_eq!(buf.events_remaining(), 0);
    }

    // --- stats ---

    #[test]
    fn test_stats_initial_state() {
        let buf = ReplayBuffer::new(10);
        let s = buf.stats();
        assert_eq!(s.total_events, 0);
        assert_eq!(s.replayed_events, 0);
        assert_eq!(s.current_position, 0);
        assert_eq!(s.bytes_replayed, 0);
    }

    #[test]
    fn test_stats_after_appends_and_reads() {
        let mut buf = ReplayBuffer::new(10);
        buf.append(make_event(1, 0, 0, b"ab")).expect("ok");
        buf.append(make_event(2, 1, 0, b"cde")).expect("ok");
        buf.read_next();
        let s = buf.stats();
        assert_eq!(s.total_events, 2);
        assert_eq!(s.replayed_events, 1);
        assert_eq!(s.current_position, 1);
        assert_eq!(s.bytes_replayed, 2);
    }

    #[test]
    fn test_stats_total_events_decreases_on_eviction() {
        let mut buf = ReplayBuffer::new(2);
        buf.append(make_event(1, 0, 0, b"x")).expect("ok");
        buf.append(make_event(2, 1, 0, b"x")).expect("ok");
        buf.append(make_event(3, 2, 0, b"x")).expect("ok");
        let s = buf.stats();
        assert_eq!(s.total_events, 2);
    }

    // --- capacity ---

    #[test]
    fn test_capacity_one_always_keeps_latest() {
        let mut buf = ReplayBuffer::new(1);
        for i in 1u64..=5 {
            buf.append(make_event(i, i, 0, b"x")).expect("ok");
        }
        assert_eq!(buf.len(), 1);
        let e = buf.read_next().expect("event");
        assert_eq!(e.sequence_id, 5);
    }

    #[test]
    fn test_capacity_eviction_fifo_order() {
        let mut buf = ReplayBuffer::new(3);
        for i in 1u64..=6 {
            buf.append(make_event(i, i, 0, b"x")).expect("ok");
        }
        let ids: Vec<u64> = buf.events.iter().map(|e| e.sequence_id).collect();
        assert_eq!(ids, vec![4, 5, 6]);
    }

    // --- ReplayError display ---

    #[test]
    fn test_replay_error_display_buffer_full() {
        let e = ReplayError::BufferFull;
        assert!(e.to_string().contains("full"));
    }

    #[test]
    fn test_replay_error_display_invalid_position() {
        let e = ReplayError::InvalidPosition("offset 100".to_string());
        assert!(e.to_string().contains("offset 100"));
    }

    #[test]
    fn test_replay_error_display_sequence_not_found() {
        let e = ReplayError::SequenceNotFound(42);
        assert!(e.to_string().contains("42"));
    }

    #[test]
    fn test_replay_error_display_timestamp_not_found() {
        let e = ReplayError::TimestampNotFound(9999);
        assert!(e.to_string().contains("9999"));
    }

    // --- timestamp seek edge cases ---

    #[test]
    fn test_seek_timestamp_exact_match() {
        let mut buf = ReplayBuffer::new(10);
        buf.append(make_event(1, 100, 0, b"a")).expect("ok");
        buf.append(make_event(2, 200, 0, b"b")).expect("ok");
        buf.append(make_event(3, 300, 0, b"c")).expect("ok");
        let pos = buf.seek(SeekPosition::Timestamp(100)).expect("ok");
        assert_eq!(pos, 0);
    }

    #[test]
    fn test_seek_timestamp_between_events() {
        let mut buf = ReplayBuffer::new(10);
        buf.append(make_event(1, 100, 0, b"a")).expect("ok");
        buf.append(make_event(2, 300, 0, b"b")).expect("ok");
        let pos = buf.seek(SeekPosition::Timestamp(200)).expect("ok");
        assert_eq!(pos, 1);
    }

    // --- partitions ---

    #[test]
    fn test_events_on_different_partitions() {
        let mut buf = ReplayBuffer::new(10);
        buf.append(make_event(1, 0, 0, b"part0")).expect("ok");
        buf.append(make_event(2, 1, 1, b"part1")).expect("ok");
        buf.append(make_event(3, 2, 0, b"part0_again")).expect("ok");
        assert_eq!(buf.len(), 3);
        let e = buf.read_next().expect("event");
        assert_eq!(e.partition, 0);
        let e = buf.read_next().expect("event");
        assert_eq!(e.partition, 1);
    }

    // --- combined usage ---

    #[test]
    fn test_seek_read_seek_read_pattern() {
        let mut buf = ReplayBuffer::new(20);
        for i in 1u64..=10 {
            let label = format!("event-{i}");
            buf.append(make_event(i, i * 10, 0, label.as_bytes()))
                .expect("ok");
        }
        let batch1 = buf.read_batch(3);
        assert_eq!(batch1.len(), 3);
        buf.seek(SeekPosition::SequenceId(5)).expect("ok");
        let e = buf.read_next().expect("event");
        assert_eq!(e.sequence_id, 5);
    }
}