1use crate::prelude::*;
10
11use async_channel::TryRecvError;
12
13pub struct Receiver<T> {
15 rx: async_channel::Receiver<T>,
16}
17
18pub struct Sender<T> {
20 tx: async_channel::Sender<T>,
21}
22
23#[derive(Clone, Copy, Debug, Default, Error)]
25#[error("Channel is closed.")]
26pub struct ClosedError;
27
28#[derive(Clone, Copy)]
30pub struct SendError<M> {
31 pub msg: M,
33 pub reason: SendErrorReason,
35}
36
37#[derive(Clone, Copy, Debug, Eq, Error, PartialEq)]
39pub enum SendErrorReason {
40 #[error("Channel is closed.")]
41 Closed,
42 #[error("Channel is full.")]
43 Full,
44}
45
46pub fn with_capacity<T>(capacity: usize) -> (Sender<T>, Receiver<T>) {
48 let (tx, rx) = async_channel::bounded(capacity);
49
50 (Sender { tx }, Receiver { rx })
51}
52
53pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
55 let (tx, rx) = async_channel::unbounded();
56
57 (Sender { tx }, Receiver { rx })
58}
59
60impl<T> Receiver<T> {
61 pub fn is_closed(&self) -> bool {
65 self.rx.is_closed()
66 }
67
68 pub async fn recv(&self) -> Result<T, ClosedError> {
70 self.rx.recv().await.map_err(|_| ClosedError)
71 }
72
73 pub fn try_recv(&self) -> Result<Option<T>, ClosedError> {
77 match self.rx.try_recv() {
78 Ok(msg) => Ok(Some(msg)),
79 Err(TryRecvError::Empty) => Ok(None),
80 Err(TryRecvError::Closed) => Err(ClosedError),
81 }
82 }
83}
84
85impl<T> Sender<T> {
86 pub fn is_closed(&self) -> bool {
90 self.tx.is_closed()
91 }
92
93 pub async fn send(&self, message: T) -> Result<(), SendError<T>> {
98 self
99 .tx
100 .send(message)
101 .await
102 .map_err(|err| SendError { msg: err.0, reason: SendErrorReason::Closed })
103 }
104
105 pub fn try_send(&self, message: T) -> Result<(), SendError<T>> {
110 self.tx.try_send(message).map_err(|err| match err {
111 async_channel::TrySendError::Full(msg) => SendError { msg, reason: SendErrorReason::Full },
112 async_channel::TrySendError::Closed(msg) => {
113 SendError { msg, reason: SendErrorReason::Closed }
114 }
115 })
116 }
117}
118
119impl<T> Clone for Receiver<T> {
122 fn clone(&self) -> Self {
123 Self { rx: self.rx.clone() }
124 }
125}
126
127impl<T> Clone for Sender<T> {
128 fn clone(&self) -> Self {
129 Self { tx: self.tx.clone() }
130 }
131}
132
133impl<M> std::error::Error for SendError<M> {}
136
137impl<M> Debug for SendError<M> {
138 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
139 Debug::fmt(&self.reason, f)
140 }
141}
142
143impl<M> Display for SendError<M> {
144 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
145 Display::fmt(&self.reason, f)
146 }
147}