thread_broadcaster/
lib.rs

1/*
2* Hemanth Krishna (DarthBenro008), Jun 2023
3*/
4
5/*!
6# thread-broadcaster
Thread Broadcaster is a Single-Producer Multi-Consumer (SPMC) library that enables the sending of notifications between threads.
8Unlike most Multi-Producer Multi-Consumer (MPMC) implementations, Thread Broadcaster ensures that all listeners receive the data, rather than just the first one.
9
10## Example
11
12```rust
13use core::time;
14use std::thread;
15
16use thread_broadcaster::{BroadcastListener, Broadcaster};
17
18fn main() {
19    let (b, s) = Broadcaster::<String>::new();
20    let s2 = s.clone();
21    thread::spawn(move || {
22        let ls1 = BroadcastListener::register_broadcast_listener(s);
23        for msg in ls1.channel {
24            println!(
25                "got broadcast with data: {} on thread {:#?}",
26                msg,
27                thread::current().id()
28            );
29        }
30    });
31    thread::spawn(move || {
32        let ls2 = BroadcastListener::register_broadcast_listener(s2);
33        for msg in ls2.channel {
34            println!(
35                "got broadcast with data: {} on thread {:#?}",
36                msg,
37                thread::current().id()
38            );
39        }
40    });
41    thread::spawn(move || {
42        // we wait for registration
43        thread::sleep(time::Duration::from_secs(1));
44        b.broadcast("something to broadcast".to_string());
45        // we wait for listeners to pickup before being dropped
46        thread::sleep(time::Duration::from_secs(2));
47    })
48    .join()
49    .unwrap();
50}
51```
52*/
53
54use std::{
55    sync::{Arc, Mutex},
56    thread,
57};
58
59use crossbeam_channel::{unbounded, Receiver, Sender};
60
61/// Responsible for registring new listeners to the broadcaster and to recieve data
62pub struct BroadcastListener<T> {
63    pub channel: Receiver<T>,
64}
65
66impl<T> BroadcastListener<T> {
67    pub fn register_broadcast_listener(broadcaster: Sender<Sender<T>>) -> BroadcastListener<T> {
68        let (s, r) = unbounded::<T>();
69        broadcaster.send(s.clone()).unwrap();
70        BroadcastListener { channel: r }
71    }
72}
73
74/// Returned objet on creation of thread-broadcaster responsible to broadcast data to threads
75pub struct Controller<T> {
76    data: Arc<Mutex<Vec<Sender<T>>>>,
77}
78
79impl<T> Controller<T>
80where
81    T: std::marker::Send + Clone,
82{
83    pub fn broadcast(&self, data: T) {
84        tracing::debug_span!("broadcasting data");
85        let mut map = self.data.lock().unwrap();
86        for x in map.iter_mut() {
87            let new_data = data.clone();
88            x.send(new_data).unwrap();
89        }
90    }
91}
92
93/// Allows to create a thread-broadcaster
94pub struct Broadcaster<T> {
95    sender: Sender<Sender<T>>,
96    reciver: Receiver<Sender<T>>,
97    data: Arc<Mutex<Vec<Sender<T>>>>,
98}
99
100impl<T> Broadcaster<T>
101where
102    T: std::marker::Send + Clone + 'static,
103{
104    pub fn new() -> (Controller<T>, Sender<Sender<T>>) {
105        let (s, r) = unbounded::<crossbeam_channel::Sender<T>>();
106        let broadcaster = Broadcaster {
107            sender: s.clone(),
108            reciver: r,
109            data: Arc::new(Mutex::new(vec![])),
110        };
111        let tc = Controller {
112            data: Arc::clone(&broadcaster.data),
113        };
114        thread::spawn(move || {
115            tracing::debug_span!("starting registration loop");
116            broadcaster.registration_loop();
117        });
118        (tc, s)
119    }
120
121    pub fn broadcaster(self) -> Sender<Sender<T>> {
122        self.sender.clone()
123    }
124
125    fn registration_loop(&self) {
126        let r = self.reciver.clone();
127        thread::scope(|s| {
128            s.spawn(move || {
129                for msg in r.iter() {
130                    tracing::debug_span!("got a registration from listener");
131                    let mut map = self.data.lock().unwrap();
132                    map.push(msg);
133                }
134            });
135        });
136    }
137}
138
#[cfg(test)]
mod tests {
    use core::time;

    use super::*;

    /// Minimal cloneable payload used to exercise the broadcaster.
    #[derive(Clone)]
    pub struct Test {
        pub id: String,
    }

    /// A single registered listener receives the broadcast value.
    #[test]
    fn single_listener() {
        let (controller, registrar) = Broadcaster::<Test>::new();
        let listener = BroadcastListener::register_broadcast_listener(registrar);
        let payload = Test {
            id: String::from("test broadcast"),
        };
        thread::spawn(move || {
            // Registration is processed on another thread; wait before sending.
            thread::sleep(time::Duration::from_secs(1));
            controller.broadcast(payload);
        });
        let received = listener.channel.recv().unwrap();
        assert_eq!(received.id, "test broadcast");
    }

    /// Two registered listeners each receive their own copy of the value.
    #[test]
    fn broadcast_two_listener() {
        let (controller, registrar) = Broadcaster::<Test>::new();
        let first = BroadcastListener::register_broadcast_listener(registrar.clone());
        let second = BroadcastListener::register_broadcast_listener(registrar);
        let payload = Test {
            id: String::from("test broadcast"),
        };

        // Each worker blocks on its listener and records the id it received.
        let results = Arc::new(Mutex::new(Vec::<String>::new()));
        let workers: Vec<_> = vec![first, second]
            .into_iter()
            .map(|listener| {
                let sink = Arc::clone(&results);
                thread::spawn(move || {
                    let message = listener.channel.recv();
                    sink.lock().unwrap().push(message.unwrap().id);
                })
            })
            .collect();

        thread::spawn(move || {
            // Registration is processed on another thread; wait before sending.
            thread::sleep(time::Duration::from_secs(1));
            controller.broadcast(payload);
        });

        for worker in workers {
            worker.join().unwrap();
        }

        let expected = vec!["test broadcast".to_string(), "test broadcast".to_string()];
        assert_eq!(expected, *results.lock().unwrap());
    }
}