ppl/mpsc/
channel_cb.rs

1use super::{
2    channel,
3    err::{ReceiverError, SenderError},
4};
5use crossbeam_channel::{Receiver, Sender, TryRecvError};
6
7pub struct CBInputChannel<T> {
8    rx: Receiver<T>,
9}
10impl<T> channel::Receiver<T> for CBInputChannel<T>
11where
12    T: Send,
13{
14    fn receive(&self) -> Result<Option<T>, ReceiverError> {
15        let err = self.rx.try_recv();
16        match err {
17            Ok(msg) => Ok(Some(msg)),
18            Err(e) => match e {
19                TryRecvError::Empty => Ok(None),
20                TryRecvError::Disconnected => Err(ReceiverError),
21            },
22        }
23    }
24
25    fn is_empty(&self) -> bool {
26        self.rx.is_empty()
27    }
28}
29
30pub struct CBBlockingInputChannel<T> {
31    rx: Receiver<T>,
32}
33impl<T> channel::Receiver<T> for CBBlockingInputChannel<T>
34where
35    T: Send,
36{
37    fn receive(&self) -> Result<Option<T>, ReceiverError> {
38        let err = self.rx.recv();
39        match err {
40            Ok(msg) => Ok(Some(msg)),
41            Err(_e) => Err(ReceiverError),
42        }
43    }
44
45    fn is_empty(&self) -> bool {
46        self.rx.is_empty()
47    }
48}
49
50pub struct CBOutputChannel<T> {
51    tx: Sender<T>,
52}
53
54impl<T> channel::Sender<T> for CBOutputChannel<T>
55where
56    T: Send,
57{
58    fn send(&self, msg: T) -> Result<(), SenderError> {
59        let err = self.tx.send(msg);
60        match err {
61            Ok(()) => Ok(()),
62            Err(_e) => Err(SenderError),
63        }
64    }
65}
66impl<T> Clone for CBOutputChannel<T> {
67    fn clone(&self) -> Self {
68        Self {
69            tx: self.tx.clone(),
70        }
71    }
72}
73
74/// Channel is a factory for creating channels.
75/// It is a wrapper around crossbeam_channel.
76pub struct Channel;
77
78impl Channel {
79    /// Create a new channel using crossbeam_channel.
80    pub fn channel<T>(
81        blocking: bool,
82    ) -> (
83        Box<dyn channel::Receiver<T> + Sync + Send>,
84        Box<dyn channel::Sender<T> + Sync + Send>,
85    )
86    where
87        T: Send + 'static,
88    {
89        let (tx, rx) = crossbeam_channel::unbounded();
90        if blocking {
91            (
92                Box::new(CBBlockingInputChannel { rx }),
93                Box::new(CBOutputChannel { tx }),
94            )
95        } else {
96            (
97                Box::new(CBInputChannel { rx }),
98                Box::new(CBOutputChannel { tx }),
99            )
100        }
101    }
102}