1use super::{
2 channel,
3 err::{ReceiverError, SenderError},
4};
5use crossbeam_channel::{Receiver, Sender, TryRecvError};
6
7pub struct CBInputChannel<T> {
8 rx: Receiver<T>,
9}
10impl<T> channel::Receiver<T> for CBInputChannel<T>
11where
12 T: Send,
13{
14 fn receive(&self) -> Result<Option<T>, ReceiverError> {
15 let err = self.rx.try_recv();
16 match err {
17 Ok(msg) => Ok(Some(msg)),
18 Err(e) => match e {
19 TryRecvError::Empty => Ok(None),
20 TryRecvError::Disconnected => Err(ReceiverError),
21 },
22 }
23 }
24
25 fn is_empty(&self) -> bool {
26 self.rx.is_empty()
27 }
28}
29
30pub struct CBBlockingInputChannel<T> {
31 rx: Receiver<T>,
32}
33impl<T> channel::Receiver<T> for CBBlockingInputChannel<T>
34where
35 T: Send,
36{
37 fn receive(&self) -> Result<Option<T>, ReceiverError> {
38 let err = self.rx.recv();
39 match err {
40 Ok(msg) => Ok(Some(msg)),
41 Err(_e) => Err(ReceiverError),
42 }
43 }
44
45 fn is_empty(&self) -> bool {
46 self.rx.is_empty()
47 }
48}
49
50pub struct CBOutputChannel<T> {
51 tx: Sender<T>,
52}
53
54impl<T> channel::Sender<T> for CBOutputChannel<T>
55where
56 T: Send,
57{
58 fn send(&self, msg: T) -> Result<(), SenderError> {
59 let err = self.tx.send(msg);
60 match err {
61 Ok(()) => Ok(()),
62 Err(_e) => Err(SenderError),
63 }
64 }
65}
66impl<T> Clone for CBOutputChannel<T> {
67 fn clone(&self) -> Self {
68 Self {
69 tx: self.tx.clone(),
70 }
71 }
72}
73
74pub struct Channel;
77
78impl Channel {
79 pub fn channel<T>(
81 blocking: bool,
82 ) -> (
83 Box<dyn channel::Receiver<T> + Sync + Send>,
84 Box<dyn channel::Sender<T> + Sync + Send>,
85 )
86 where
87 T: Send + 'static,
88 {
89 let (tx, rx) = crossbeam_channel::unbounded();
90 if blocking {
91 (
92 Box::new(CBBlockingInputChannel { rx }),
93 Box::new(CBOutputChannel { tx }),
94 )
95 } else {
96 (
97 Box::new(CBInputChannel { rx }),
98 Box::new(CBOutputChannel { tx }),
99 )
100 }
101 }
102}