1use crate::queue::{NotificationError, Notifier};
3use crate::NotificationId;
4use crossbeam::channel as beamchannel;
5use std::error;
6use std::sync::{mpsc, Arc};
7use std::{fmt, io};
8
9pub fn channel<T>(
12 notifier: Arc<dyn Notifier>,
13 id: NotificationId,
14) -> (Sender<T>, mpsc::Receiver<T>) {
15 let (tx, rx) = mpsc::channel();
16 let tx = Sender { notifier, tx, id };
17 (tx, rx)
18}
19
20pub fn sync_channel<T>(
23 notifier: Arc<dyn Notifier>,
24 id: NotificationId,
25 bound_size: usize,
26) -> (SyncSender<T>, mpsc::Receiver<T>) {
27 let (tx, rx) = mpsc::sync_channel(bound_size);
28 let tx = SyncSender { notifier, tx, id };
29 (tx, rx)
30}
31
32pub fn crossbeam_channel_unbounded<T>(
35 notifier: Arc<dyn Notifier>,
36 id: NotificationId,
37) -> (CrossbeamSender<T>, beamchannel::Receiver<T>) {
38 let (tx, rx) = beamchannel::unbounded();
39 let tx = CrossbeamSender { notifier, tx, id };
40 (tx, rx)
41}
42
43pub fn crossbeam_channel_bounded<T>(
46 notifier: Arc<dyn Notifier>,
47 id: NotificationId,
48 size: usize,
49) -> (CrossbeamSender<T>, beamchannel::Receiver<T>) {
50 let (tx, rx) = beamchannel::bounded(size);
51 let tx = CrossbeamSender { notifier, tx, id };
52 (tx, rx)
53}
54
55pub struct Sender<T> {
57 tx: mpsc::Sender<T>,
58 notifier: Arc<dyn Notifier>,
59 id: NotificationId,
60}
61
62impl<T> Sender<T> {
63 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
65 self.tx.send(t).map_err(SendError::from)?;
66 self.notifier.notify(self.id).map_err(SendError::from)
67 }
68}
69
70pub struct CrossbeamSender<T> {
72 tx: beamchannel::Sender<T>,
73 notifier: Arc<dyn Notifier>,
74 id: NotificationId,
75}
76
77impl<T> CrossbeamSender<T> {
78 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
81 self.tx.send(t).map_err(SendError::from)?;
82 self.notifier.notify(self.id).map_err(SendError::from)
83 }
84
85 pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
90 self.tx
91 .try_send(t)
92 .map_err(From::from)
93 .and_then(|_| self.notifier.notify(self.id).map_err(From::from))
94 }
95}
96
97pub struct SyncSender<T> {
99 tx: mpsc::SyncSender<T>,
100 notifier: Arc<dyn Notifier>,
101 id: NotificationId,
102}
103
104impl<T> SyncSender<T> {
105 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
110 self.tx
111 .send(t)
112 .map_err(From::from)
113 .and_then(|_| self.notifier.notify(self.id).map_err(From::from))
114 }
115
116 pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
121 self.tx
122 .try_send(t)
123 .map_err(From::from)
124 .and_then(|_| self.notifier.notify(self.id).map_err(From::from))
125 }
126}
127
128pub enum SendError<T> {
130 Io(io::Error),
132
133 Disconnected(T),
135
136 NotificationQueueFull,
138}
139
140pub enum TrySendError<T> {
142 Io(io::Error),
144
145 Full(T),
147
148 Disconnected(T),
150
151 NotificationQueueFull,
153}
154
155impl<T> Clone for Sender<T> {
156 fn clone(&self) -> Sender<T> {
157 Sender {
158 tx: self.tx.clone(),
159 notifier: Arc::clone(&self.notifier),
160 id: self.id,
161 }
162 }
163}
164
165impl<T> Clone for SyncSender<T> {
166 fn clone(&self) -> SyncSender<T> {
167 SyncSender {
168 tx: self.tx.clone(),
169 notifier: Arc::clone(&self.notifier),
170 id: self.id,
171 }
172 }
173}
174
175impl<T> From<mpsc::SendError<T>> for SendError<T> {
182 fn from(src: mpsc::SendError<T>) -> Self {
183 SendError::Disconnected(src.0)
184 }
185}
186
187impl<T> From<io::Error> for SendError<T> {
188 fn from(src: io::Error) -> Self {
189 SendError::Io(src)
190 }
191}
192
193impl<T> From<beamchannel::SendError<T>> for SendError<T> {
194 fn from(src: beamchannel::SendError<T>) -> Self {
195 SendError::Disconnected(src.0)
196 }
197}
198
199impl<T> From<NotificationError<NotificationId>> for SendError<T> {
200 fn from(_: NotificationError<NotificationId>) -> Self {
201 SendError::NotificationQueueFull
202 }
203}
204
205impl<T> From<mpsc::TrySendError<T>> for TrySendError<T> {
206 fn from(src: mpsc::TrySendError<T>) -> Self {
207 match src {
208 mpsc::TrySendError::Full(v) => TrySendError::Full(v),
209 mpsc::TrySendError::Disconnected(v) => TrySendError::Disconnected(v),
210 }
211 }
212}
213
214impl<T> From<NotificationError<NotificationId>> for TrySendError<T> {
215 fn from(_: NotificationError<NotificationId>) -> Self {
216 TrySendError::NotificationQueueFull
217 }
218}
219
220impl<T> From<beamchannel::TrySendError<T>> for TrySendError<T> {
221 fn from(src: beamchannel::TrySendError<T>) -> Self {
222 match src {
223 beamchannel::TrySendError::Full(v) => TrySendError::Full(v),
224 beamchannel::TrySendError::Disconnected(v) => TrySendError::Disconnected(v),
225 }
226 }
227}
228
229impl<T> From<mpsc::SendError<T>> for TrySendError<T> {
230 fn from(src: mpsc::SendError<T>) -> Self {
231 TrySendError::Disconnected(src.0)
232 }
233}
234
235impl<T> From<io::Error> for TrySendError<T> {
236 fn from(src: io::Error) -> Self {
237 TrySendError::Io(src)
238 }
239}
240
241impl<T> error::Error for SendError<T> {}
248
249impl<T> error::Error for TrySendError<T> {}
250
251impl<T> fmt::Debug for SendError<T> {
252 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
253 match self {
254 SendError::Io(io_err) => write!(f, "{:?}", io_err),
255 SendError::Disconnected(_) => write!(f, "Disconnected(..)"),
256 SendError::NotificationQueueFull => write!(f, "NotificationQueueFull"),
257 }
258 }
259}
260
261impl<T> fmt::Display for SendError<T> {
262 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
263 match self {
264 SendError::Io(io_err) => write!(f, "{}", io_err),
265 SendError::Disconnected(_) => write!(f, "sending on a closed channel"),
266 SendError::NotificationQueueFull => write!(f, "sending on a full notification queue"),
267 }
268 }
269}
270
271impl<T> fmt::Debug for TrySendError<T> {
272 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
273 match self {
274 TrySendError::Io(io_err) => write!(f, "{:?}", io_err),
275 TrySendError::Full(..) => write!(f, "Full(..)"),
276 TrySendError::Disconnected(..) => write!(f, "Disconnected(..)"),
277 TrySendError::NotificationQueueFull => write!(f, "NotificationQueueFull"),
278 }
279 }
280}
281
282impl<T> fmt::Display for TrySendError<T> {
283 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
284 match self {
285 TrySendError::Io(io_err) => write!(f, "{}", io_err),
286 TrySendError::Full(..) => write!(f, "sending on a full channel"),
287 TrySendError::Disconnected(..) => write!(f, "sending on a closed channel"),
288 TrySendError::NotificationQueueFull => {
289 write!(f, "sending on a full notification queue")
290 }
291 }
292 }
293}