1use crate::receiver::Receiver;
2use crate::sender::Sender;
3use crate::shared_state::SharedState;
4use std::sync::Arc;
5use append_only_vec::AppendOnlyVec;
6
7pub mod receiver;
8pub mod sender;
9mod shared_state;
10
11pub struct ReplayChannel<T: Clone + Send + 'static> {
54 shared_state: Arc<SharedState<T>>,
55}
56
57impl<T: Clone + Send + Sync + 'static> ReplayChannel<T> {
58 pub fn new() -> Self {
59 let shared_state = Arc::new(SharedState {
60 messages: AppendOnlyVec::new(),
61 notifiers: AppendOnlyVec::new(),
62 });
63 ReplayChannel { shared_state }
64 }
65
66 pub fn sender(&self) -> Sender<T> {
67 Sender::new(Arc::clone(&self.shared_state))
68 }
69
70 pub fn receiver(&self) -> Receiver<T> {
71 Receiver::new(Arc::clone(&self.shared_state))
72 }
73}
74
#[cfg(test)]
mod tests {
    use super::*;

    /// Two messages sent on one sender arrive at one receiver in send order.
    #[tokio::test]
    async fn message_sending_and_receiving() {
        let channel = ReplayChannel::new();
        let sender = channel.sender();
        let mut receiver = channel.receiver();

        sender.send(1);
        sender.send(2);

        assert_eq!(receiver.receive().await, 1);
        assert_eq!(receiver.receive().await, 2);
    }

    /// A receiver created after messages were sent (and already consumed by
    /// another receiver) still observes those messages from the beginning.
    #[tokio::test]
    async fn receiver_replays_past_messages() {
        let channel = ReplayChannel::new();
        let sender = channel.sender();
        // Bound as `mut` like the receivers in every other test here.
        let mut receiver1 = channel.receiver();

        sender.send(1);
        sender.send(2);

        assert_eq!(receiver1.receive().await, 1);
        assert_eq!(receiver1.receive().await, 2);

        // Created only after both messages were sent and read by `receiver1`.
        let mut receiver2 = channel.receiver();
        assert_eq!(receiver2.receive().await, 1);
        assert_eq!(receiver2.receive().await, 2);
    }

    /// Two receivers each see every message, including one sent after both
    /// have caught up.
    #[tokio::test]
    async fn multiple_receivers_real_time() {
        let channel = ReplayChannel::new();
        let sender = channel.sender();
        let mut receiver1 = channel.receiver();
        let mut receiver2 = channel.receiver();

        sender.send(1);
        sender.send(2);

        assert_eq!(receiver1.receive().await, 1);
        assert_eq!(receiver1.receive().await, 2);
        assert_eq!(receiver2.receive().await, 1);
        assert_eq!(receiver2.receive().await, 2);

        sender.send(3);
        assert_eq!(receiver1.receive().await, 3);
        assert_eq!(receiver2.receive().await, 3);
    }

    /// Messages from two different senders both reach the receiver.
    #[tokio::test]
    async fn no_lost_messages() {
        let channel = ReplayChannel::new();
        let sender1 = channel.sender();
        let sender2 = channel.sender();
        let mut receiver = channel.receiver();

        sender1.send(1);
        sender2.send(2);

        let received1 = receiver.receive().await;
        let received2 = receiver.receive().await;

        // Either arrival order is accepted; only loss or duplication fails.
        assert!(matches!((received1, received2), (1, 2) | (2, 1)));
    }

    /// Three messages from one sender are received in the order they were sent.
    #[tokio::test]
    async fn receiver_message_order() {
        let channel = ReplayChannel::new();
        let sender = channel.sender();
        let mut receiver = channel.receiver();

        sender.send(1);
        sender.send(2);
        sender.send(3);

        assert_eq!(receiver.receive().await, 1);
        assert_eq!(receiver.receive().await, 2);
        assert_eq!(receiver.receive().await, 3);
    }

    /// Interleaving sends and receives does not skip or repeat messages.
    #[tokio::test]
    async fn receiver_index_handling() {
        let channel = ReplayChannel::new();
        let sender = channel.sender();
        let mut receiver = channel.receiver();

        sender.send(1);
        assert_eq!(receiver.receive().await, 1);

        sender.send(2);
        sender.send(3);
        assert_eq!(receiver.receive().await, 2);
        assert_eq!(receiver.receive().await, 3);
    }
}