sod_bus/
lib.rs

1//! [`sod::MutService`] implementations to interact with the broadcast [`bus::Bus`].
2//!
3//! ## Service Impls
4//! * [`BusBroadcaster`] broadcasts to a [`bus::Bus`] and blocks until the operation is successful.
5//! * [`BusTryBroadcaster`] tries to broadcast to a [`bus::Bus`] and is able to be retried via [`sod::RetryService`] when the bus buffer is full.
6//! * [`BusReceiver`] receives from a [`bus::BusReader`], blocking until an element is received.
7//! * [`BusTryReceiver`] tries to receive from a [`bus::BusReader`] and is able to be retried via [`sod::RetryService`] when the bus is empty.
8//!
9//! ## Example
10//! ```
11//! use sod::MutService;
12//! use sod_bus::{BusBroadcaster, BusReceiver};
13//!
14//! let mut broadcaster = BusBroadcaster::with_len(1024);
15//! let mut receiver1 = broadcaster.create_receiver();
16//! let mut receiver2 = broadcaster.create_receiver();
17//!
18//! broadcaster.process(1).unwrap();
19//! broadcaster.process(2).unwrap();
20//! broadcaster.process(3).unwrap();
21//!
22//! assert_eq!(receiver1.process(()).unwrap(), 1);
23//! assert_eq!(receiver1.process(()).unwrap(), 2);
24//! assert_eq!(receiver1.process(()).unwrap(), 3);
25//!
26//! assert_eq!(receiver2.process(()).unwrap(), 1);
27//! assert_eq!(receiver2.process(()).unwrap(), 2);
28//! assert_eq!(receiver2.process(()).unwrap(), 3);
29//! ```
30
31use bus::{Bus, BusReader};
32use sod::{MutService, RetryError, Retryable};
33use std::{
34    convert::Infallible,
35    sync::mpsc::{RecvError, TryRecvError},
36};
37
38/// A blocking [`sod::MutService`] that broadcasts to an underlying [`bus::Bus`].
39pub struct BusBroadcaster<T> {
40    bus: Bus<T>,
41}
42impl<T> BusBroadcaster<T> {
43    /// encapsulate the given [`Bus`]
44    pub fn new(bus: Bus<T>) -> Self {
45        Self { bus }
46    }
47    /// create a new [`Bus`] with the given length
48    pub fn with_len(len: usize) -> Self {
49        Self { bus: Bus::new(len) }
50    }
51    /// get a mutable reference to the underlying bus, which may be used to add readers
52    pub fn bus<'a>(&'a mut self) -> &'a mut Bus<T> {
53        &mut self.bus
54    }
55    /// create a [`BusReceiver`] service from the underlying [`Bus`].
56    pub fn create_receiver(&mut self) -> BusReceiver<T> {
57        BusReceiver::new(self.bus.add_rx())
58    }
59    /// create a [`BusTryReceiver`] service from the underlying [`Bus`].
60    pub fn create_try_receiver(&mut self) -> BusTryReceiver<T> {
61        BusTryReceiver::new(self.bus.add_rx())
62    }
63}
64impl<T> MutService for BusBroadcaster<T> {
65    type Input = T;
66    type Output = ();
67    type Error = Infallible;
68    fn process(&mut self, input: T) -> Result<Self::Output, Self::Error> {
69        Ok(self.bus.broadcast(input))
70    }
71}
72
73/// A non-blocking [`sod::MutService`] that is [`Retryable`] and attempts to broadcast to an underlying [`bus::Bus`].
74pub struct BusTryBroadcaster<T> {
75    bus: Bus<T>,
76}
77impl<T> BusTryBroadcaster<T> {
78    /// encapsulate the given [`Bus`]
79    pub fn new(bus: Bus<T>) -> Self {
80        Self { bus }
81    }
82    /// create a new [`Bus`] with the given length
83    pub fn with_len(len: usize) -> Self {
84        Self { bus: Bus::new(len) }
85    }
86    /// get a mutable reference to the underlying bus, which may be used to add readers
87    pub fn bus<'a>(&'a mut self) -> &'a mut Bus<T> {
88        &mut self.bus
89    }
90    /// create a [`BusReceiver`] service from the underlying [`Bus`].
91    pub fn create_receiver(&mut self) -> BusReceiver<T> {
92        BusReceiver::new(self.bus.add_rx())
93    }
94    /// create a [`BusTryReceiver`] service from the underlying [`Bus`].
95    pub fn create_try_receiver(&mut self) -> BusTryReceiver<T> {
96        BusTryReceiver::new(self.bus.add_rx())
97    }
98}
99impl<T> MutService for BusTryBroadcaster<T> {
100    type Input = T;
101    type Output = ();
102    type Error = T;
103    fn process(&mut self, input: T) -> Result<Self::Output, Self::Error> {
104        self.bus.try_broadcast(input)
105    }
106}
107impl<T> Retryable<T, T> for BusTryBroadcaster<T> {
108    fn parse_retry(&self, err: T) -> Result<T, RetryError<T>> {
109        Ok(err)
110    }
111}
112
113/// A blocking [`sod::MutService`] that receives from an underlying [`bus::BusReader`]
114pub struct BusReceiver<T> {
115    reader: BusReader<T>,
116}
117impl<T> BusReceiver<T> {
118    pub fn new(reader: BusReader<T>) -> Self {
119        Self { reader }
120    }
121}
122impl<T: Clone + Sync> MutService for BusReceiver<T> {
123    type Input = ();
124    type Output = T;
125    type Error = RecvError;
126    fn process(&mut self, _: ()) -> Result<Self::Output, Self::Error> {
127        self.reader.recv()
128    }
129}
130
131/// A non-blocking [`sod::MutService`] that is [`sod::Retryable`] and receives from an underlying [`bus::BusReader`]
132pub struct BusTryReceiver<T> {
133    reader: BusReader<T>,
134}
135impl<T> BusTryReceiver<T> {
136    pub fn new(reader: BusReader<T>) -> Self {
137        Self { reader }
138    }
139}
140impl<T: Clone + Sync> MutService for BusTryReceiver<T> {
141    type Input = ();
142    type Output = T;
143    type Error = TryRecvError;
144    fn process(&mut self, _: ()) -> Result<Self::Output, Self::Error> {
145        self.reader.try_recv()
146    }
147}
148impl<T: Clone + Sync> Retryable<(), TryRecvError> for BusTryReceiver<T> {
149    fn parse_retry(&self, err: TryRecvError) -> Result<(), RetryError<TryRecvError>> {
150        match err {
151            TryRecvError::Disconnected => Err(RetryError::ServiceError(TryRecvError::Disconnected)),
152            TryRecvError::Empty => Ok(()),
153        }
154    }
155}
156
157#[cfg(test)]
158mod tests {
159    use super::*;
160
161    #[test]
162    fn blocking() {
163        let mut broadcaster = BusBroadcaster::new(Bus::new(1024));
164        let mut reader1 = broadcaster.create_receiver();
165        let mut reader2 = broadcaster.create_receiver();
166        broadcaster.process(1).unwrap();
167        broadcaster.process(2).unwrap();
168        broadcaster.process(3).unwrap();
169
170        assert_eq!(reader1.process(()).unwrap(), 1);
171        assert_eq!(reader1.process(()).unwrap(), 2);
172        assert_eq!(reader1.process(()).unwrap(), 3);
173
174        assert_eq!(reader2.process(()).unwrap(), 1);
175        assert_eq!(reader2.process(()).unwrap(), 2);
176        assert_eq!(reader2.process(()).unwrap(), 3);
177    }
178
179    #[test]
180    fn non_blocking() {
181        let mut broadcaster = BusTryBroadcaster::new(Bus::new(1024));
182        let mut reader1 = broadcaster.create_try_receiver();
183        let mut reader2 = broadcaster.create_try_receiver();
184
185        broadcaster.process(1).unwrap();
186        broadcaster.process(2).unwrap();
187        broadcaster.process(3).unwrap();
188
189        assert_eq!(reader1.process(()).unwrap(), 1);
190        assert_eq!(reader1.process(()).unwrap(), 2);
191        assert_eq!(reader1.process(()).unwrap(), 3);
192
193        assert_eq!(reader2.process(()).unwrap(), 1);
194        assert_eq!(reader2.process(()).unwrap(), 2);
195        assert_eq!(reader2.process(()).unwrap(), 3);
196        assert_eq!(reader2.process(()), Err(TryRecvError::Empty));
197    }
198}