use bytes::Bytes;
use parking_lot::Mutex;
use std::collections::VecDeque;
use std::sync::atomic::{AtomicU64, Ordering};
use tokio::sync::mpsc;
pub struct EventReplayBuffer {
frames: Mutex<VecDeque<(u64, Bytes)>>,
next_seq: AtomicU64,
capacity: usize,
subscribers: Mutex<Vec<mpsc::UnboundedSender<Bytes>>>,
}
impl EventReplayBuffer {
pub fn new(capacity: usize) -> Self {
Self {
frames: Mutex::new(VecDeque::with_capacity(capacity)),
next_seq: AtomicU64::new(1),
capacity,
subscribers: Mutex::new(Vec::new()),
}
}
pub fn push_json(&self, json: &str) -> (u64, Bytes) {
let seq = self.next_seq.fetch_add(1, Ordering::Relaxed);
let frame = Bytes::from(format!("id: {seq}\ndata: {json}\n\n"));
{
let mut frames = self.frames.lock();
frames.push_back((seq, frame.clone()));
while frames.len() > self.capacity {
frames.pop_front();
}
}
{
let mut subs = self.subscribers.lock();
subs.retain(|tx| tx.send(frame.clone()).is_ok());
}
(seq, frame)
}
pub fn replay_after(&self, last_seen_seq: u64) -> Vec<Bytes> {
self.frames
.lock()
.iter()
.filter(|(seq, _)| *seq > last_seen_seq)
.map(|(_, frame)| frame.clone())
.collect()
}
pub fn subscribe(&self) -> mpsc::UnboundedReceiver<Bytes> {
let (tx, rx) = mpsc::unbounded_channel();
self.subscribers.lock().push(tx);
rx
}
pub fn subscribe_after(
&self,
last_seen_seq: u64,
) -> (Vec<Bytes>, mpsc::UnboundedReceiver<Bytes>) {
let (tx, rx) = mpsc::unbounded_channel();
let frames = self.frames.lock();
let mut subs = self.subscribers.lock();
let replayed: Vec<Bytes> = frames
.iter()
.filter(|(seq, _)| *seq > last_seen_seq)
.map(|(_, frame)| frame.clone())
.collect();
subs.push(tx);
(replayed, rx)
}
pub fn close_subscribers(&self) {
self.subscribers.lock().clear();
}
pub fn current_seq(&self) -> u64 {
self.next_seq.load(Ordering::Relaxed).saturating_sub(1)
}
pub fn len(&self) -> usize {
self.frames.lock().len()
}
pub fn is_empty(&self) -> bool {
self.frames.lock().is_empty()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn push_and_replay() {
let buf = EventReplayBuffer::new(10);
buf.push_json(r#"{"a":1}"#);
buf.push_json(r#"{"a":2}"#);
buf.push_json(r#"{"a":3}"#);
let replayed = buf.replay_after(1);
assert_eq!(replayed.len(), 2);
assert!(String::from_utf8_lossy(&replayed[0]).contains("id: 2\n"));
assert!(String::from_utf8_lossy(&replayed[1]).contains("id: 3\n"));
}
#[test]
fn replay_all_from_zero() {
let buf = EventReplayBuffer::new(10);
buf.push_json(r#"{"x":1}"#);
buf.push_json(r#"{"x":2}"#);
let replayed = buf.replay_after(0);
assert_eq!(replayed.len(), 2);
}
#[test]
fn replay_empty_buffer() {
let buf = EventReplayBuffer::new(10);
let replayed = buf.replay_after(0);
assert!(replayed.is_empty());
}
#[test]
fn replay_future_seq() {
let buf = EventReplayBuffer::new(10);
buf.push_json(r#"{"a":1}"#);
buf.push_json(r#"{"a":2}"#);
buf.push_json(r#"{"a":3}"#);
let replayed = buf.replay_after(999);
assert!(replayed.is_empty());
}
#[test]
fn ring_buffer_eviction() {
let buf = EventReplayBuffer::new(3);
for i in 1..=5 {
buf.push_json(&format!(r#"{{"n":{i}}}"#));
}
assert_eq!(buf.len(), 3);
let replayed = buf.replay_after(0);
assert_eq!(replayed.len(), 3);
assert!(String::from_utf8_lossy(&replayed[0]).contains("id: 3\n"));
assert!(String::from_utf8_lossy(&replayed[2]).contains("id: 5\n"));
}
#[test]
fn current_seq_starts_at_zero() {
let buf = EventReplayBuffer::new(10);
assert_eq!(buf.current_seq(), 0);
}
#[test]
fn current_seq_increments() {
let buf = EventReplayBuffer::new(10);
buf.push_json("{}");
assert_eq!(buf.current_seq(), 1);
buf.push_json("{}");
assert_eq!(buf.current_seq(), 2);
}
#[tokio::test]
async fn subscriber_receives_new_frames() {
let buf = EventReplayBuffer::new(10);
let mut rx = buf.subscribe();
buf.push_json(r#"{"event":"hello"}"#);
let frame = rx.try_recv().unwrap();
let s = String::from_utf8_lossy(&frame);
assert!(s.contains("id: 1\n"));
assert!(s.contains(r#"{"event":"hello"}"#));
}
#[tokio::test]
async fn subscriber_gets_only_new_frames() {
let buf = EventReplayBuffer::new(10);
buf.push_json(r#"{"n":1}"#);
buf.push_json(r#"{"n":2}"#);
let mut rx = buf.subscribe();
buf.push_json(r#"{"n":3}"#);
let frame = rx.try_recv().unwrap();
assert!(String::from_utf8_lossy(&frame).contains("id: 3\n"));
assert!(rx.try_recv().is_err());
}
#[test]
fn dead_subscriber_cleaned_up() {
let buf = EventReplayBuffer::new(10);
let rx = buf.subscribe();
drop(rx);
buf.push_json("{}");
assert_eq!(buf.current_seq(), 1);
}
#[tokio::test]
async fn concurrent_push() {
use std::sync::Arc;
let buf = Arc::new(EventReplayBuffer::new(2000));
let mut handles = Vec::new();
for _ in 0..10 {
let buf = Arc::clone(&buf);
handles.push(tokio::spawn(async move {
for _ in 0..100 {
buf.push_json("{}");
}
}));
}
for h in handles {
h.await.unwrap();
}
assert_eq!(buf.current_seq(), 1000);
}
#[tokio::test]
async fn subscribe_after_no_duplicates_no_gaps() {
let buf = EventReplayBuffer::new(100);
buf.push_json(r#"{"n":1}"#);
buf.push_json(r#"{"n":2}"#);
buf.push_json(r#"{"n":3}"#);
let (replayed, mut live_rx) = buf.subscribe_after(1);
assert_eq!(replayed.len(), 2);
assert!(String::from_utf8_lossy(&replayed[0]).contains("id: 2\n"));
assert!(String::from_utf8_lossy(&replayed[1]).contains("id: 3\n"));
buf.push_json(r#"{"n":4}"#);
let frame = live_rx.try_recv().unwrap();
assert!(String::from_utf8_lossy(&frame).contains("id: 4\n"));
assert!(live_rx.try_recv().is_err());
}
#[tokio::test]
async fn subscribe_after_zero_replays_all() {
let buf = EventReplayBuffer::new(100);
buf.push_json("{}");
buf.push_json("{}");
let (replayed, _rx) = buf.subscribe_after(0);
assert_eq!(replayed.len(), 2);
}
#[tokio::test]
async fn close_subscribers_terminates_live_stream() {
let buf = EventReplayBuffer::new(100);
let mut rx = buf.subscribe();
buf.push_json("{}");
assert!(rx.try_recv().is_ok());
buf.close_subscribers();
buf.push_json("{}");
assert!(rx.recv().await.is_none());
}
#[tokio::test]
async fn close_subscribers_then_new_subscribe() {
let buf = EventReplayBuffer::new(100);
let mut rx1 = buf.subscribe();
buf.close_subscribers();
assert!(rx1.recv().await.is_none());
let mut rx2 = buf.subscribe();
buf.push_json("{}");
assert!(rx2.try_recv().is_ok());
}
}