1use std::collections::VecDeque;
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::{Arc, Condvar, Mutex, Weak};
9use std::time::Duration;
10
11pub struct Broadcaster<T: Clone> {
12 children: Mutex<Vec<Weak<Receiver<T>>>>, }
14impl<T: Clone> Broadcaster<T> {
15 pub const fn new() -> Self {
16 Self {
17 children: Mutex::new(vec![]),
18 }
19 }
20
21 pub fn subscribe(&self, buffer_size: usize) -> Arc<Receiver<T>> {
22 let mut c = self.children.lock().unwrap();
23 let receiver = Arc::new(Receiver::new(buffer_size));
24 c.push(Arc::downgrade(&receiver));
25 receiver
26 }
27
28 pub fn subscribe_unbound(&self) {
30 self.subscribe(usize::MAX);
31 }
32
33 pub fn send(&self, data: T) -> usize {
35 let mut c = self.children.lock().unwrap();
36 c.retain(|r| {
37 if let Some(r) = r.upgrade() {
38 r.push(data.clone());
39 true
40 } else {
41 false
42 }
43 });
44
45 c.len()
46 }
47
48 pub fn send_timeout(&self, data: T, timeout: Duration) -> Result<usize, &'static str> {
51 let mut c = self.children.lock().unwrap();
52 let mut res = Ok(());
53 c.retain(|r| {
54 if let Some(r) = r.upgrade() {
55 if let Err(e) = r.push_timeout(data.clone(), timeout) {
56 res = Err(e);
57 }
58 true
59 } else {
60 false
61 }
62 });
63
64 match res {
65 Ok(_) => {
66 Ok(c.len())
67 }
68 Err(e) => {
69 Err(e)
70 }
71 }
72 }
73}
74impl<T: Clone> Drop for Broadcaster<T> {
75 fn drop(&mut self) {
76 let children = self.children.lock().unwrap();
77 for c in children.iter() {
78 if let Some(r) = c.upgrade() {
79 r.sender_alive.store(false, Ordering::SeqCst);
80 }
81 }
82 }
83}
84
85pub struct Receiver<T> {
86 buffer: Mutex<VecDeque<T>>,
87 condvar: Condvar,
88 space_available: Condvar,
89 buffer_size: usize,
90 sender_alive: AtomicBool,
91}
92impl<T> Receiver<T> {
93 fn new(size: usize) -> Self {
94 Self {
95 buffer: Mutex::new(VecDeque::new()),
96 condvar: Condvar::new(),
97 space_available: Condvar::new(),
98 buffer_size: size,
99 sender_alive: AtomicBool::new(true),
100 }
101 }
102
103 pub(crate) fn push(&self, val: T) {
104 let mut buffer = self.buffer.lock().unwrap();
105 while buffer.len() >= self.buffer_size {
106 buffer = self.space_available.wait(buffer).unwrap();
107 }
108 buffer.push_back(val);
109 self.condvar.notify_one();
110 }
111
112 pub(crate) fn push_timeout(&self, val: T, timeout: Duration) -> Result<(), &'static str> {
113 let buffer = self.buffer.lock().unwrap();
114 let (mut buffer, res) = self
115 .space_available
116 .wait_timeout_while(buffer, timeout, |buffer| buffer.len() >= self.buffer_size)
117 .unwrap();
118
119 if res.timed_out() {
120 return Err("Timeout");
121 }
122
123 buffer.push_back(val);
124 self.condvar.notify_one();
125
126 Ok(())
127 }
128
129 pub fn recv(&self) -> T {
131 let mut buffer = self.buffer.lock().unwrap();
132 while buffer.is_empty() {
133 buffer = self.condvar.wait(buffer).unwrap();
134 }
135 let val = buffer.pop_front().unwrap();
136 self.space_available.notify_one();
137 val
138 }
139
140 pub fn recv_all(&self) -> Vec<T> {
142 let mut buffer = self.buffer.lock().unwrap();
143 while buffer.is_empty() {
144 buffer = self.condvar.wait(buffer).unwrap();
145 }
146 let mut out = Vec::with_capacity(buffer.len());
149 for _ in 0..buffer.len() {
150 out.push(buffer.pop_front().expect("This should not happen"))
151 }
152 self.space_available.notify_one();
153 out
154 }
155
156 pub fn try_recv(&self) -> Option<T> {
157 let mut buffer = self.buffer.lock().unwrap();
158 while buffer.is_empty() {
159 return None;
160 }
161
162 let out = buffer.pop_front().expect("wont happen");
163 self.space_available.notify_one();
164 Some(out)
165 }
166
167 pub fn try_recv_all(&self) -> Option<Vec<T>> {
168 let mut buffer = self.buffer.lock().unwrap();
169 while buffer.is_empty() {
170 return None;
172 }
173 let mut out = Vec::with_capacity(buffer.len());
176 for _ in 0..buffer.len() {
177 out.push(buffer.pop_front().expect("This should not happen"))
178 }
179 self.space_available.notify_one();
180 Some(out)
181 }
182
183 pub fn sender_alive(&self) -> bool {
184 self.sender_alive.load(Ordering::Relaxed)
185 }
186}
187
188#[cfg(test)]
189mod tests {
190 use super::*;
191 use std::thread::{sleep, spawn};
192
193 #[test]
194 fn easy() {
195 let broadcaster = Broadcaster::new();
196 let mut seen = vec![];
197 for _ in 0..3 {
198 let rv = broadcaster.subscribe(5);
199 let v = spawn(move || {
200 assert_eq!(rv.recv(), 3);
201 });
202 seen.push(v);
203 }
204
205 broadcaster.send(3);
206
207 sleep(Duration::from_millis(50));
208 for s in seen.iter() {
209 if !s.is_finished() {
210 panic!();
211 }
212 }
213 }
214
215 #[test]
216 fn timeout() {
217 let sender = Broadcaster::new();
218
219 let r1 = sender.subscribe(1);
220 let one = spawn(move || {
221 assert_eq!(r1.recv(), 3);
223 assert_eq!(r1.recv(), 3);
224 });
225
226 let r2 = sender.subscribe(1);
227 let two = spawn(move || {
228 let _local = r2;
229 sleep(Duration::from_millis(500));
230 });
231
232 sender.send(3);
233 assert_eq!(
234 Err("Timeout"),
235 sender.send_timeout(3, Duration::from_millis(200))
236 );
237
238 one.join().unwrap();
239 two.join().unwrap();
240 }
241
242 #[test]
243 fn clearing() {
244 let broadcaster = Broadcaster::new();
245 let mut seen = vec![];
246 for _ in 0..5 {
247 let rv = broadcaster.subscribe(3);
248 let s = spawn(move || {
249 for i in 0..10 {
250 assert_eq!(rv.recv(), i);
251 }
252 });
253 seen.push(s);
254 }
255
256 for i in 0..10 {
257 broadcaster
258 .send_timeout(i, Duration::from_millis(50))
259 .unwrap();
260 }
261
262 for s in seen {
263 s.join().unwrap();
264 }
265 }
266
267 #[test]
268 fn removed_receivers() {
269 let broadcaster = Broadcaster::new();
270
271 let r1 = broadcaster.subscribe(1);
272
273 let r2 = broadcaster.subscribe(1);
274 drop(r2);
275
276 broadcaster.send(3);
277 r1.recv();
278 broadcaster
279 .send_timeout(4, Duration::from_millis(500))
280 .unwrap();
281
282 assert_eq!(r1.recv(), 4);
283 }
284
285 #[test]
286 fn removed_sender() {
287 let broadcaster: Broadcaster<()> = Broadcaster::new();
288 let r = broadcaster.subscribe(1);
289 assert_eq!(r.sender_alive(), true);
290 drop(broadcaster);
291 assert_eq!(r.sender_alive(), false);
292 }
293
294 #[test]
295 fn stat_ic() {
296 static INFO_BOT: Broadcaster<()> = Broadcaster::new();
297
298 let one = spawn(|| {
299 let r = INFO_BOT.subscribe(usize::MAX);
300 r.recv()
301 });
302
303 loop {
304 let v = INFO_BOT.send(());
305 if v != 0 { break }
306 }
307
308
309 one.join().unwrap();
310 }
312}