thread_broadcaster/
lib.rs1use std::{
55 sync::{Arc, Mutex},
56 thread,
57};
58
59use crossbeam_channel::{unbounded, Receiver, Sender};
60
61pub struct BroadcastListener<T> {
63 pub channel: Receiver<T>,
64}
65
66impl<T> BroadcastListener<T> {
67 pub fn register_broadcast_listener(broadcaster: Sender<Sender<T>>) -> BroadcastListener<T> {
68 let (s, r) = unbounded::<T>();
69 broadcaster.send(s.clone()).unwrap();
70 BroadcastListener { channel: r }
71 }
72}
73
74pub struct Controller<T> {
76 data: Arc<Mutex<Vec<Sender<T>>>>,
77}
78
79impl<T> Controller<T>
80where
81 T: std::marker::Send + Clone,
82{
83 pub fn broadcast(&self, data: T) {
84 tracing::debug_span!("broadcasting data");
85 let mut map = self.data.lock().unwrap();
86 for x in map.iter_mut() {
87 let new_data = data.clone();
88 x.send(new_data).unwrap();
89 }
90 }
91}
92
93pub struct Broadcaster<T> {
95 sender: Sender<Sender<T>>,
96 reciver: Receiver<Sender<T>>,
97 data: Arc<Mutex<Vec<Sender<T>>>>,
98}
99
100impl<T> Broadcaster<T>
101where
102 T: std::marker::Send + Clone + 'static,
103{
104 pub fn new() -> (Controller<T>, Sender<Sender<T>>) {
105 let (s, r) = unbounded::<crossbeam_channel::Sender<T>>();
106 let broadcaster = Broadcaster {
107 sender: s.clone(),
108 reciver: r,
109 data: Arc::new(Mutex::new(vec![])),
110 };
111 let tc = Controller {
112 data: Arc::clone(&broadcaster.data),
113 };
114 thread::spawn(move || {
115 tracing::debug_span!("starting registration loop");
116 broadcaster.registration_loop();
117 });
118 (tc, s)
119 }
120
121 pub fn broadcaster(self) -> Sender<Sender<T>> {
122 self.sender.clone()
123 }
124
125 fn registration_loop(&self) {
126 let r = self.reciver.clone();
127 thread::scope(|s| {
128 s.spawn(move || {
129 for msg in r.iter() {
130 tracing::debug_span!("got a registration from listener");
131 let mut map = self.data.lock().unwrap();
132 map.push(msg);
133 }
134 });
135 });
136 }
137}
138
139#[cfg(test)]
140mod tests {
141 #[derive(Clone)]
142 pub struct Test {
143 pub id: String,
144 }
145
146 use core::time;
147
148 use super::*;
149
150 #[test]
151 fn single_listener() {
152 let (b, s) = Broadcaster::<Test>::new();
153 let listener = BroadcastListener::register_broadcast_listener(s);
154 let obj = Test {
155 id: "test broadcast".to_string(),
156 };
157 thread::spawn(move || {
158 thread::sleep(time::Duration::from_secs(1));
159 b.broadcast(obj);
160 });
161 assert_eq!(listener.channel.recv().unwrap().id, "test broadcast")
162 }
163
164 #[test]
165 fn broadcast_two_listener() {
166 let (b, s) = Broadcaster::<Test>::new();
167 let ls2 = s.clone();
168 let listener = BroadcastListener::register_broadcast_listener(s);
169 let listener2 = BroadcastListener::register_broadcast_listener(ls2);
170 let results = Arc::new(Mutex::new(Vec::<String>::new()));
171 let comparator = Arc::new(Mutex::new(vec![
172 "test broadcast".to_string(),
173 "test broadcast".to_string(),
174 ]));
175 let ar1 = Arc::clone(&results);
176 let ar2 = Arc::clone(&results);
177 let obj = Test {
178 id: "test broadcast".to_string(),
179 };
180 let t1 = thread::spawn(move || {
181 let data = listener.channel.recv();
182 ar1.lock().unwrap().push(data.unwrap().id);
183 });
184 let t2 = thread::spawn(move || {
185 let data = listener2.channel.recv();
186 ar2.lock().unwrap().push(data.unwrap().id);
187 });
188 thread::spawn(move || {
189 thread::sleep(time::Duration::from_secs(1));
190 b.broadcast(obj);
191 });
192 let _ = t1.join().unwrap();
193 let _ = t2.join().unwrap();
194 assert_eq!(*comparator.lock().unwrap(), *results.lock().unwrap());
195 }
196}