1use bus::{Bus, BusReader};
32use sod::{MutService, RetryError, Retryable};
33use std::{
34 convert::Infallible,
35 sync::mpsc::{RecvError, TryRecvError},
36};
37
38pub struct BusBroadcaster<T> {
40 bus: Bus<T>,
41}
42impl<T> BusBroadcaster<T> {
43 pub fn new(bus: Bus<T>) -> Self {
45 Self { bus }
46 }
47 pub fn with_len(len: usize) -> Self {
49 Self { bus: Bus::new(len) }
50 }
51 pub fn bus<'a>(&'a mut self) -> &'a mut Bus<T> {
53 &mut self.bus
54 }
55 pub fn create_receiver(&mut self) -> BusReceiver<T> {
57 BusReceiver::new(self.bus.add_rx())
58 }
59 pub fn create_try_receiver(&mut self) -> BusTryReceiver<T> {
61 BusTryReceiver::new(self.bus.add_rx())
62 }
63}
64impl<T> MutService for BusBroadcaster<T> {
65 type Input = T;
66 type Output = ();
67 type Error = Infallible;
68 fn process(&mut self, input: T) -> Result<Self::Output, Self::Error> {
69 Ok(self.bus.broadcast(input))
70 }
71}
72
73pub struct BusTryBroadcaster<T> {
75 bus: Bus<T>,
76}
77impl<T> BusTryBroadcaster<T> {
78 pub fn new(bus: Bus<T>) -> Self {
80 Self { bus }
81 }
82 pub fn with_len(len: usize) -> Self {
84 Self { bus: Bus::new(len) }
85 }
86 pub fn bus<'a>(&'a mut self) -> &'a mut Bus<T> {
88 &mut self.bus
89 }
90 pub fn create_receiver(&mut self) -> BusReceiver<T> {
92 BusReceiver::new(self.bus.add_rx())
93 }
94 pub fn create_try_receiver(&mut self) -> BusTryReceiver<T> {
96 BusTryReceiver::new(self.bus.add_rx())
97 }
98}
99impl<T> MutService for BusTryBroadcaster<T> {
100 type Input = T;
101 type Output = ();
102 type Error = T;
103 fn process(&mut self, input: T) -> Result<Self::Output, Self::Error> {
104 self.bus.try_broadcast(input)
105 }
106}
107impl<T> Retryable<T, T> for BusTryBroadcaster<T> {
108 fn parse_retry(&self, err: T) -> Result<T, RetryError<T>> {
109 Ok(err)
110 }
111}
112
113pub struct BusReceiver<T> {
115 reader: BusReader<T>,
116}
117impl<T> BusReceiver<T> {
118 pub fn new(reader: BusReader<T>) -> Self {
119 Self { reader }
120 }
121}
122impl<T: Clone + Sync> MutService for BusReceiver<T> {
123 type Input = ();
124 type Output = T;
125 type Error = RecvError;
126 fn process(&mut self, _: ()) -> Result<Self::Output, Self::Error> {
127 self.reader.recv()
128 }
129}
130
131pub struct BusTryReceiver<T> {
133 reader: BusReader<T>,
134}
135impl<T> BusTryReceiver<T> {
136 pub fn new(reader: BusReader<T>) -> Self {
137 Self { reader }
138 }
139}
140impl<T: Clone + Sync> MutService for BusTryReceiver<T> {
141 type Input = ();
142 type Output = T;
143 type Error = TryRecvError;
144 fn process(&mut self, _: ()) -> Result<Self::Output, Self::Error> {
145 self.reader.try_recv()
146 }
147}
148impl<T: Clone + Sync> Retryable<(), TryRecvError> for BusTryReceiver<T> {
149 fn parse_retry(&self, err: TryRecvError) -> Result<(), RetryError<TryRecvError>> {
150 match err {
151 TryRecvError::Disconnected => Err(RetryError::ServiceError(TryRecvError::Disconnected)),
152 TryRecvError::Empty => Ok(()),
153 }
154 }
155}
156
#[cfg(test)]
mod tests {
    use super::*;

    /// Three values sent through a `BusBroadcaster` arrive, in order, at both
    /// `BusReceiver`s created before the broadcast.
    #[test]
    fn blocking() {
        let mut broadcaster = BusBroadcaster::new(Bus::new(1024));
        let mut reader1 = broadcaster.create_receiver();
        let mut reader2 = broadcaster.create_receiver();

        for value in 1..=3 {
            broadcaster.process(value).unwrap();
        }

        for expected in 1..=3 {
            assert_eq!(reader1.process(()).unwrap(), expected);
        }
        for expected in 1..=3 {
            assert_eq!(reader2.process(()).unwrap(), expected);
        }
    }

    /// Three values sent through a `BusTryBroadcaster` arrive, in order, at both
    /// `BusTryReceiver`s; afterwards the second reader reports `TryRecvError::Empty`.
    #[test]
    fn non_blocking() {
        let mut broadcaster = BusTryBroadcaster::new(Bus::new(1024));
        let mut reader1 = broadcaster.create_try_receiver();
        let mut reader2 = broadcaster.create_try_receiver();

        for value in 1..=3 {
            broadcaster.process(value).unwrap();
        }

        for expected in 1..=3 {
            assert_eq!(reader1.process(()).unwrap(), expected);
        }
        for expected in 1..=3 {
            assert_eq!(reader2.process(()).unwrap(), expected);
        }

        // Everything has been consumed, so a further poll finds nothing.
        assert_eq!(reader2.process(()), Err(TryRecvError::Empty));
    }
}