use bus::{Bus, BusReader};
use sod::{MutService, RetryError, Retryable};
use std::{
convert::Infallible,
sync::mpsc::{RecvError, TryRecvError},
};
pub struct BusBroadcaster<T> {
bus: Bus<T>,
}
impl<T> BusBroadcaster<T> {
pub fn new(bus: Bus<T>) -> Self {
Self { bus }
}
pub fn with_len(len: usize) -> Self {
Self { bus: Bus::new(len) }
}
pub fn bus<'a>(&'a mut self) -> &'a mut Bus<T> {
&mut self.bus
}
pub fn create_receiver(&mut self) -> BusReceiver<T> {
BusReceiver::new(self.bus.add_rx())
}
pub fn create_try_receiver(&mut self) -> BusTryReceiver<T> {
BusTryReceiver::new(self.bus.add_rx())
}
}
impl<T> MutService for BusBroadcaster<T> {
type Input = T;
type Output = ();
type Error = Infallible;
fn process(&mut self, input: T) -> Result<Self::Output, Self::Error> {
Ok(self.bus.broadcast(input))
}
}
pub struct BusTryBroadcaster<T> {
bus: Bus<T>,
}
impl<T> BusTryBroadcaster<T> {
pub fn new(bus: Bus<T>) -> Self {
Self { bus }
}
pub fn with_len(len: usize) -> Self {
Self { bus: Bus::new(len) }
}
pub fn bus<'a>(&'a mut self) -> &'a mut Bus<T> {
&mut self.bus
}
pub fn create_receiver(&mut self) -> BusReceiver<T> {
BusReceiver::new(self.bus.add_rx())
}
pub fn create_try_receiver(&mut self) -> BusTryReceiver<T> {
BusTryReceiver::new(self.bus.add_rx())
}
}
impl<T> MutService for BusTryBroadcaster<T> {
type Input = T;
type Output = ();
type Error = T;
fn process(&mut self, input: T) -> Result<Self::Output, Self::Error> {
self.bus.try_broadcast(input)
}
}
impl<T> Retryable<T, T> for BusTryBroadcaster<T> {
fn parse_retry(&self, err: T) -> Result<T, RetryError<T>> {
Ok(err)
}
}
pub struct BusReceiver<T> {
reader: BusReader<T>,
}
impl<T> BusReceiver<T> {
pub fn new(reader: BusReader<T>) -> Self {
Self { reader }
}
}
impl<T: Clone + Sync> MutService for BusReceiver<T> {
type Input = ();
type Output = T;
type Error = RecvError;
fn process(&mut self, _: ()) -> Result<Self::Output, Self::Error> {
self.reader.recv()
}
}
pub struct BusTryReceiver<T> {
reader: BusReader<T>,
}
impl<T> BusTryReceiver<T> {
pub fn new(reader: BusReader<T>) -> Self {
Self { reader }
}
}
impl<T: Clone + Sync> MutService for BusTryReceiver<T> {
type Input = ();
type Output = T;
type Error = TryRecvError;
fn process(&mut self, _: ()) -> Result<Self::Output, Self::Error> {
self.reader.try_recv()
}
}
impl<T: Clone + Sync> Retryable<(), TryRecvError> for BusTryReceiver<T> {
fn parse_retry(&self, err: TryRecvError) -> Result<(), RetryError<TryRecvError>> {
match err {
TryRecvError::Disconnected => Err(RetryError::ServiceError(TryRecvError::Disconnected)),
TryRecvError::Empty => Ok(()),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn blocking() {
let mut broadcaster = BusBroadcaster::new(Bus::new(1024));
let mut reader1 = broadcaster.create_receiver();
let mut reader2 = broadcaster.create_receiver();
broadcaster.process(1).unwrap();
broadcaster.process(2).unwrap();
broadcaster.process(3).unwrap();
assert_eq!(reader1.process(()).unwrap(), 1);
assert_eq!(reader1.process(()).unwrap(), 2);
assert_eq!(reader1.process(()).unwrap(), 3);
assert_eq!(reader2.process(()).unwrap(), 1);
assert_eq!(reader2.process(()).unwrap(), 2);
assert_eq!(reader2.process(()).unwrap(), 3);
}
#[test]
fn non_blocking() {
let mut broadcaster = BusTryBroadcaster::new(Bus::new(1024));
let mut reader1 = broadcaster.create_try_receiver();
let mut reader2 = broadcaster.create_try_receiver();
broadcaster.process(1).unwrap();
broadcaster.process(2).unwrap();
broadcaster.process(3).unwrap();
assert_eq!(reader1.process(()).unwrap(), 1);
assert_eq!(reader1.process(()).unwrap(), 2);
assert_eq!(reader1.process(()).unwrap(), 3);
assert_eq!(reader2.process(()).unwrap(), 1);
assert_eq!(reader2.process(()).unwrap(), 2);
assert_eq!(reader2.process(()).unwrap(), 3);
assert_eq!(reader2.process(()), Err(TryRecvError::Empty));
}
}