replay_channel/
lib.rs

1use crate::receiver::Receiver;
2use crate::sender::Sender;
3use crate::shared_state::SharedState;
4use std::sync::Arc;
5use append_only_vec::AppendOnlyVec;
6
7pub mod receiver;
8pub mod sender;
9mod shared_state;
10
11/// A `ReplayChannel` provides a multi-receiver, message-passing communication channel
12/// where receivers can "catch up" by receiving all previously sent messages before
13/// continuing to receive new messages.
14///
15/// Each new `Receiver` created from a `ReplayChannel` will first receive all messages
16/// that were sent prior to its creation, ensuring that it starts with the full context.
17/// Once it has caught up, it will then receive messages as they are sent in real-time.
18///
19/// This is particularly useful in scenarios where the state history is important and
20/// late-joining receivers need to process all past messages to be properly synchronized
21/// with the current state.
22/// 
23/// ReplayChannel uses an [AppendOnlyVec](https://crates.io/crates/append-only-vec) which
24/// is a lock-free, append-only vector that allows for efficient, concurrent writes and
25/// reads. This makes it suitable for high-throughput message passing.
26///
27/// # Examples
28///
29/// Creating a `ReplayChannel` and sending messages:
30///
31/// ```rust
32/// # use tokio::runtime::Runtime; // Hidden line
33/// # use replay_channel::ReplayChannel; // This line is visible in the documentation
34/// # let rt = Runtime::new().unwrap(); // Hidden line
35/// # rt.block_on(async { // Hidden line
36/// let replay_channel = ReplayChannel::new();
37/// let sender = replay_channel.sender();
38/// sender.send("message 1");
39/// sender.send("message 2");
40///
41/// let mut receiver = replay_channel.receiver();
42/// assert_eq!(receiver.receive().await, "message 1");
43/// assert_eq!(receiver.receive().await, "message 2");
44///
45/// let mut new_receiver = replay_channel.receiver();
46/// assert_eq!(new_receiver.receive().await, "message 1");
47/// assert_eq!(new_receiver.receive().await, "message 2");
48///
49/// sender.send("message 3");
50/// assert_eq!(new_receiver.receive().await, "message 3");
51/// # }); // Hidden line
52/// ```
53pub struct ReplayChannel<T: Clone + Send + 'static> {
54    shared_state: Arc<SharedState<T>>,
55}
56
57impl<T: Clone + Send + Sync + 'static> ReplayChannel<T> {
58    pub fn new() -> Self {
59        let shared_state = Arc::new(SharedState {
60            messages: AppendOnlyVec::new(),
61            notifiers: AppendOnlyVec::new(),
62        });
63        ReplayChannel { shared_state }
64    }
65
66    pub fn sender(&self) -> Sender<T> {
67        Sender::new(Arc::clone(&self.shared_state))
68    }
69
70    pub fn receiver(&self) -> Receiver<T> {
71        Receiver::new(Arc::clone(&self.shared_state))
72    }
73}
74
#[cfg(test)]
mod tests {
    use super::*;

    /// A receiver created before any send gets the messages in send order.
    #[tokio::test]
    async fn message_sending_and_receiving() {
        let channel = ReplayChannel::new();
        let sender = channel.sender();
        let mut receiver = channel.receiver();

        sender.send(1);
        sender.send(2);

        assert_eq!(receiver.receive().await, 1);
        assert_eq!(receiver.receive().await, 2);
    }

    /// A receiver created after messages were sent still gets all of them.
    #[tokio::test]
    async fn receiver_replays_past_messages() {
        let channel = ReplayChannel::new();
        let sender = channel.sender();
        // Receivers are bound mutably, matching every other test and the crate-level
        // doc example, which all call `receive()` through a `mut` binding.
        let mut receiver1 = channel.receiver();

        // Send two messages
        sender.send(1);
        sender.send(2);

        // Receiver 1 receives the two messages
        assert_eq!(receiver1.receive().await, 1);
        assert_eq!(receiver1.receive().await, 2);

        // Receiver 2 is created and should receive the same two messages
        let mut receiver2 = channel.receiver();
        assert_eq!(receiver2.receive().await, 1);
        assert_eq!(receiver2.receive().await, 2);

        // Do not call receive() again to avoid blocking
    }

    /// Two receivers each see the full sequence, including a message sent
    /// after both have caught up.
    #[tokio::test]
    async fn multiple_receivers_real_time() {
        let channel = ReplayChannel::new();
        let sender = channel.sender();
        let mut receiver1 = channel.receiver();
        let mut receiver2 = channel.receiver();

        sender.send(1);
        sender.send(2);

        assert_eq!(receiver1.receive().await, 1);
        assert_eq!(receiver1.receive().await, 2);
        assert_eq!(receiver2.receive().await, 1);
        assert_eq!(receiver2.receive().await, 2);

        sender.send(3);
        assert_eq!(receiver1.receive().await, 3);
        assert_eq!(receiver2.receive().await, 3);
    }

    /// Messages from two different senders both reach the receiver.
    #[tokio::test]
    async fn no_lost_messages() {
        let channel = ReplayChannel::new();
        let sender1 = channel.sender();
        let sender2 = channel.sender();
        let mut receiver = channel.receiver();

        sender1.send(1);
        sender2.send(2);

        let received1 = receiver.receive().await;
        let received2 = receiver.receive().await;

        // Either arrival order is accepted; parentheses make the grouping explicit.
        assert!(
            (received1 == 1 && received2 == 2) || (received1 == 2 && received2 == 1)
        );
    }

    /// Messages from a single sender are received in the order they were sent.
    #[tokio::test]
    async fn receiver_message_order() {
        let channel = ReplayChannel::new();
        let sender = channel.sender();
        let mut receiver = channel.receiver();

        sender.send(1);
        sender.send(2);
        sender.send(3);

        assert_eq!(receiver.receive().await, 1);
        assert_eq!(receiver.receive().await, 2);
        assert_eq!(receiver.receive().await, 3);
    }

    /// Interleaving sends and receives neither skips nor repeats a message.
    #[tokio::test]
    async fn receiver_index_handling() {
        let channel = ReplayChannel::new();
        let sender = channel.sender();
        let mut receiver = channel.receiver();

        sender.send(1);
        assert_eq!(receiver.receive().await, 1);

        sender.send(2);
        sender.send(3);
        assert_eq!(receiver.receive().await, 2);
        assert_eq!(receiver.receive().await, 3);
    }
}