Skip to main content

broadcaster_ssmr/
lib.rs

//! A Broadcaster (ssmr = single sender, multi receiver) is a channel where one sender sends duplicate data to multiple receivers.
//! The data is buffered at the receiver (up to the `buffer_size` passed to `Broadcaster::subscribe`) and can be pulled with `recv` or `try_recv`.
//! All receivers get sent a copy of the data; if a receiver's buffer is full, `Broadcaster::send()` blocks until space becomes available.
4
5
6use std::collections::VecDeque;
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::{Arc, Condvar, Mutex, Weak};
9use std::time::Duration;
10
11pub struct Broadcaster<T: Clone> {
12    children: Mutex<Vec<Weak<Receiver<T>>>>, // inside a mutex to it can be subscribed to from multiple threads 'concurrently'
13}
14impl<T: Clone> Broadcaster<T> {
15    pub const fn new() -> Self {
16        Self {
17            children: Mutex::new(vec![]),
18        }
19    }
20
21    pub fn subscribe(&self, buffer_size: usize) -> Arc<Receiver<T>> {
22        let mut c = self.children.lock().unwrap();
23        let receiver = Arc::new(Receiver::new(buffer_size));
24        c.push(Arc::downgrade(&receiver));
25        receiver
26    }
27
28    /// alias for self.subscribe(usize::MAX)
29    pub fn subscribe_unbound(&self) {
30        self.subscribe(usize::MAX);
31    }
32
33    /// returns the amount of receivers the data was sent to
34    pub fn send(&self, data: T) -> usize {
35        let mut c = self.children.lock().unwrap();
36        c.retain(|r| {
37            if let Some(r) = r.upgrade() {
38                r.push(data.clone());
39                true
40            } else {
41                false
42            }
43        });
44
45        c.len()
46    }
47
48    /// if it timeouts the data is never sent to select receiver
49    /// returns the amount of receivers the data was sent to
50    pub fn send_timeout(&self, data: T, timeout: Duration) -> Result<usize, &'static str> {
51        let mut c = self.children.lock().unwrap();
52        let mut res = Ok(());
53        c.retain(|r| {
54            if let Some(r) = r.upgrade() {
55                if let Err(e) = r.push_timeout(data.clone(), timeout) {
56                    res = Err(e);
57                }
58                true
59            } else {
60                false
61            }
62        });
63
64        match res {
65            Ok(_) => {
66                Ok(c.len())
67            }
68            Err(e) => {
69                Err(e)
70            }
71        }
72    }
73}
74impl<T: Clone> Drop for Broadcaster<T> {
75    fn drop(&mut self) {
76        let children = self.children.lock().unwrap();
77        for c in children.iter() {
78            if let Some(r) = c.upgrade() {
79                r.sender_alive.store(false, Ordering::SeqCst);
80            }
81        }
82    }
83}
84
/// The receiving half of a broadcast subscription: a bounded FIFO buffer that the
/// sender pushes into and the owner pulls from.
pub struct Receiver<T> {
    /// Values that were pushed but not yet received, oldest first.
    buffer: Mutex<VecDeque<T>>,
    /// Notified after a value has been pushed; blocking receives wait on it.
    condvar: Condvar,
    /// Notified after values have been removed; blocked pushes wait on it.
    space_available: Condvar,
    /// Maximum number of buffered values (always at least 1).
    buffer_size: usize,
    /// Set to `false` when the broadcaster is dropped.
    sender_alive: AtomicBool,
}

impl<T> Receiver<T> {
    /// Creates an empty receiver that buffers up to `size` values.
    ///
    /// A `size` of 0 is treated as 1: a buffer without capacity could never accept a
    /// value, so every push would block forever (or always time out).
    fn new(size: usize) -> Self {
        Self {
            buffer: Mutex::new(VecDeque::new()),
            condvar: Condvar::new(),
            space_available: Condvar::new(),
            buffer_size: size.max(1),
            sender_alive: AtomicBool::new(true),
        }
    }

    /// Appends `val` to the buffer, blocking while the buffer is full.
    ///
    /// # Panics
    ///
    /// Panics if the buffer mutex is poisoned.
    pub(crate) fn push(&self, val: T) {
        let guard = self.buffer.lock().unwrap();
        let mut buffer = self
            .space_available
            .wait_while(guard, |buffer| buffer.len() >= self.buffer_size)
            .unwrap();
        buffer.push_back(val);
        self.condvar.notify_one();
    }

    /// Appends `val` to the buffer, waiting at most `timeout` for free space.
    ///
    /// # Errors
    ///
    /// Returns `Err("Timeout")` if the buffer was still full after `timeout`; `val` is
    /// dropped in that case.
    ///
    /// # Panics
    ///
    /// Panics if the buffer mutex is poisoned.
    pub(crate) fn push_timeout(&self, val: T, timeout: Duration) -> Result<(), &'static str> {
        let guard = self.buffer.lock().unwrap();
        let (mut buffer, res) = self
            .space_available
            .wait_timeout_while(guard, timeout, |buffer| buffer.len() >= self.buffer_size)
            .unwrap();

        if res.timed_out() {
            return Err("Timeout");
        }

        buffer.push_back(val);
        self.condvar.notify_one();

        Ok(())
    }

    /// Removes and returns the oldest buffered value, blocking until one is available.
    ///
    /// This does not look at [`Receiver::sender_alive`]: if the buffer is empty and
    /// nothing is pushed any more, the call blocks indefinitely.
    ///
    /// # Panics
    ///
    /// Panics if the buffer mutex is poisoned.
    pub fn recv(&self) -> T {
        let guard = self.buffer.lock().unwrap();
        let mut buffer = self
            .condvar
            .wait_while(guard, |buffer| buffer.is_empty())
            .unwrap();
        let val = buffer
            .pop_front()
            .expect("buffer is non-empty after waiting");
        self.space_available.notify_one();
        val
    }

    /// Removes and returns all buffered values (oldest first), blocking until at least
    /// one is available.
    ///
    /// # Panics
    ///
    /// Panics if the buffer mutex is poisoned.
    pub fn recv_all(&self) -> Vec<T> {
        let guard = self.buffer.lock().unwrap();
        let mut buffer = self
            .condvar
            .wait_while(guard, |buffer| buffer.is_empty())
            .unwrap();
        let out: Vec<T> = buffer.drain(..).collect();
        self.space_available.notify_one();
        out
    }

    /// Removes and returns the oldest buffered value, or `None` if the buffer is empty.
    /// Never blocks waiting for data.
    ///
    /// # Panics
    ///
    /// Panics if the buffer mutex is poisoned.
    pub fn try_recv(&self) -> Option<T> {
        let mut buffer = self.buffer.lock().unwrap();
        let out = buffer.pop_front()?;
        self.space_available.notify_one();
        Some(out)
    }

    /// Removes and returns all buffered values (oldest first), or `None` if the buffer
    /// is empty. Never blocks waiting for data.
    ///
    /// # Panics
    ///
    /// Panics if the buffer mutex is poisoned.
    pub fn try_recv_all(&self) -> Option<Vec<T>> {
        let mut buffer = self.buffer.lock().unwrap();
        if buffer.is_empty() {
            return None;
        }

        let out: Vec<T> = buffer.drain(..).collect();
        self.space_available.notify_one();
        Some(out)
    }

    /// Returns `false` once the broadcaster this receiver was subscribed to has been
    /// dropped. Values that are already buffered can still be received afterwards.
    pub fn sender_alive(&self) -> bool {
        // Same ordering as the store performed when the broadcaster is dropped.
        self.sender_alive.load(Ordering::SeqCst)
    }
}
187
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread::{sleep, spawn};

    /// A single broadcast value reaches every subscriber.
    #[test]
    fn easy() {
        let broadcaster = Broadcaster::new();
        let handles: Vec<_> = (0..3)
            .map(|_| {
                let rv = broadcaster.subscribe(5);
                spawn(move || assert_eq!(rv.recv(), 3))
            })
            .collect();

        assert_eq!(broadcaster.send(3), 3);

        // Join instead of sleeping and polling `is_finished()`: a panicked thread also
        // counts as finished, so only `join` surfaces a failed assertion.
        for handle in handles {
            handle.join().unwrap();
        }
    }

    /// A receiver whose buffer stays full makes `send_timeout` report a timeout.
    #[test]
    fn timeout() {
        let sender = Broadcaster::new();

        let r1 = sender.subscribe(1);
        let one = spawn(move || {
            assert_eq!(r1.recv(), 3);
            assert_eq!(r1.recv(), 3);
        });

        // The second receiver is kept alive but never drained, so its one-slot buffer
        // is still full when the second value is sent.
        let r2 = sender.subscribe(1);
        let two = spawn(move || {
            let _local = r2;
            sleep(Duration::from_millis(500));
        });

        sender.send(3);
        assert_eq!(
            Err("Timeout"),
            sender.send_timeout(3, Duration::from_millis(200))
        );

        one.join().unwrap();
        two.join().unwrap();
    }

    /// Receivers that keep draining their buffers see every value in order.
    #[test]
    fn clearing() {
        let broadcaster = Broadcaster::new();
        let handles: Vec<_> = (0..5)
            .map(|_| {
                let rv = broadcaster.subscribe(3);
                spawn(move || {
                    for i in 0..10 {
                        assert_eq!(rv.recv(), i);
                    }
                })
            })
            .collect();

        for i in 0..10 {
            broadcaster
                .send_timeout(i, Duration::from_millis(50))
                .unwrap();
        }

        for handle in handles {
            handle.join().unwrap();
        }
    }

    /// A dropped receiver does not block or break later sends.
    #[test]
    fn removed_receivers() {
        let broadcaster = Broadcaster::new();

        let r1 = broadcaster.subscribe(1);

        let r2 = broadcaster.subscribe(1);
        drop(r2);

        broadcaster.send(3);
        r1.recv();
        broadcaster
            .send_timeout(4, Duration::from_millis(500))
            .unwrap();

        assert_eq!(r1.recv(), 4);
    }

    /// Dropping the broadcaster flips `sender_alive` on its receivers.
    #[test]
    fn removed_sender() {
        let broadcaster: Broadcaster<()> = Broadcaster::new();
        let r = broadcaster.subscribe(1);
        assert!(r.sender_alive());
        drop(broadcaster);
        assert!(!r.sender_alive());
    }

    /// A broadcaster can live in a `static` and be used from several threads.
    #[test]
    fn stat_ic() {
        static INFO_BOT: Broadcaster<()> = Broadcaster::new();

        let one = spawn(|| {
            let r = INFO_BOT.subscribe(usize::MAX);
            r.recv()
        });

        // Keep sending until the spawned thread has subscribed and received a value.
        while INFO_BOT.send(()) == 0 {
            std::thread::yield_now();
        }

        one.join().unwrap();
    }
}