mio_misc/
channel.rs

//! Thread-safe communication channels whose senders also trigger a notification on `Poll`
2use crate::queue::{NotificationError, Notifier};
3use crate::NotificationId;
4use crossbeam::channel as beamchannel;
5use std::error;
6use std::sync::{mpsc, Arc};
7use std::{fmt, io};
8
9/// Creates a new asynchronous/unbounded channel, where the `Sender::send` function, in addition to sending a message,
10/// triggers a notification on `Poll`
11pub fn channel<T>(
12    notifier: Arc<dyn Notifier>,
13    id: NotificationId,
14) -> (Sender<T>, mpsc::Receiver<T>) {
15    let (tx, rx) = mpsc::channel();
16    let tx = Sender { notifier, tx, id };
17    (tx, rx)
18}
19
20/// Creates a new synchronous channel, where the `SyncSender::send` function, in addition to sending a message,
21/// triggers a notification on `Poll`
22pub fn sync_channel<T>(
23    notifier: Arc<dyn Notifier>,
24    id: NotificationId,
25    bound_size: usize,
26) -> (SyncSender<T>, mpsc::Receiver<T>) {
27    let (tx, rx) = mpsc::sync_channel(bound_size);
28    let tx = SyncSender { notifier, tx, id };
29    (tx, rx)
30}
31
32/// Creates a new asynchronous/unbounded crossbeam channel, where the `Sender::send` function, in addition to sending a message,
33/// triggers a notification on `Poll`
34pub fn crossbeam_channel_unbounded<T>(
35    notifier: Arc<dyn Notifier>,
36    id: NotificationId,
37) -> (CrossbeamSender<T>, beamchannel::Receiver<T>) {
38    let (tx, rx) = beamchannel::unbounded();
39    let tx = CrossbeamSender { notifier, tx, id };
40    (tx, rx)
41}
42
43/// Creates a new synchronous/bounded crossbeam channel, where the `Sender::send` function, in addition to sending a message,
44/// triggers a notification on `Poll`
45pub fn crossbeam_channel_bounded<T>(
46    notifier: Arc<dyn Notifier>,
47    id: NotificationId,
48    size: usize,
49) -> (CrossbeamSender<T>, beamchannel::Receiver<T>) {
50    let (tx, rx) = beamchannel::bounded(size);
51    let tx = CrossbeamSender { notifier, tx, id };
52    (tx, rx)
53}
54
55/// The sending half of a channel.
56pub struct Sender<T> {
57    tx: mpsc::Sender<T>,
58    notifier: Arc<dyn Notifier>,
59    id: NotificationId,
60}
61
62impl<T> Sender<T> {
63    /// Attempts to send a value on this channel, returning it back if it could not be sent.
64    pub fn send(&self, t: T) -> Result<(), SendError<T>> {
65        self.tx.send(t).map_err(SendError::from)?;
66        self.notifier.notify(self.id).map_err(SendError::from)
67    }
68}
69
70/// The sending half of a channel crossbeam channel
71pub struct CrossbeamSender<T> {
72    tx: beamchannel::Sender<T>,
73    notifier: Arc<dyn Notifier>,
74    id: NotificationId,
75}
76
77impl<T> CrossbeamSender<T> {
78    /// Attempts to send a value on this channel, returning it back if it could not be sent.
79    /// For bounded channels, it will block.
80    pub fn send(&self, t: T) -> Result<(), SendError<T>> {
81        self.tx.send(t).map_err(SendError::from)?;
82        self.notifier.notify(self.id).map_err(SendError::from)
83    }
84
85    /// Attempts to send a value on this channel without blocking.
86    ///
87    /// This method differs from `send` by returning immediately if the channel's
88    /// buffer is full or no receiver is waiting to acquire some data.
89    pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
90        self.tx
91            .try_send(t)
92            .map_err(From::from)
93            .and_then(|_| self.notifier.notify(self.id).map_err(From::from))
94    }
95}
96
97/// The sending half of a synchronous channel.
98pub struct SyncSender<T> {
99    tx: mpsc::SyncSender<T>,
100    notifier: Arc<dyn Notifier>,
101    id: NotificationId,
102}
103
104impl<T> SyncSender<T> {
105    /// Sends a value on this synchronous channel.
106    ///
107    /// This function will *block* until space in the internal buffer becomes
108    /// available or a receiver is available to hand off the message to.
109    pub fn send(&self, t: T) -> Result<(), SendError<T>> {
110        self.tx
111            .send(t)
112            .map_err(From::from)
113            .and_then(|_| self.notifier.notify(self.id).map_err(From::from))
114    }
115
116    /// Attempts to send a value on this channel without blocking.
117    ///
118    /// This method differs from `send` by returning immediately if the channel's
119    /// buffer is full or no receiver is waiting to acquire some data.
120    pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
121        self.tx
122            .try_send(t)
123            .map_err(From::from)
124            .and_then(|_| self.notifier.notify(self.id).map_err(From::from))
125    }
126}
127
/// An error returned from the blocking `send` methods of this module
/// (`Sender::send`, `CrossbeamSender::send` and `SyncSender::send`).
pub enum SendError<T> {
    /// An IO error.
    Io(io::Error),

    /// The receiving half of the channel has disconnected.
    /// Carries the value that could not be sent.
    Disconnected(T),

    /// Underlying notification queue is full.
    /// The `send` methods notify only after the channel accepted the value, so the value is
    /// not handed back in this variant.
    NotificationQueueFull,
}
139
/// An error returned from the non-blocking `try_send` methods of this module
/// (`SyncSender::try_send` and `CrossbeamSender::try_send`).
pub enum TrySendError<T> {
    /// An IO error.
    Io(io::Error),

    /// Data could not be sent over the channel because it would require the caller to block.
    /// Carries the value that could not be sent.
    Full(T),

    /// The receiving half of the channel has disconnected.
    /// Carries the value that could not be sent.
    Disconnected(T),

    /// Underlying notification queue is full.
    /// The `try_send` methods notify only after the channel accepted the value, so the value
    /// is not handed back in this variant.
    NotificationQueueFull,
}
154
155impl<T> Clone for Sender<T> {
156    fn clone(&self) -> Sender<T> {
157        Sender {
158            tx: self.tx.clone(),
159            notifier: Arc::clone(&self.notifier),
160            id: self.id,
161        }
162    }
163}
164
165impl<T> Clone for SyncSender<T> {
166    fn clone(&self) -> SyncSender<T> {
167        SyncSender {
168            tx: self.tx.clone(),
169            notifier: Arc::clone(&self.notifier),
170            id: self.id,
171        }
172    }
173}
174
175/*
176 *
177 * ===== Implement Error conversions =====
178 *
179 */
180
181impl<T> From<mpsc::SendError<T>> for SendError<T> {
182    fn from(src: mpsc::SendError<T>) -> Self {
183        SendError::Disconnected(src.0)
184    }
185}
186
187impl<T> From<io::Error> for SendError<T> {
188    fn from(src: io::Error) -> Self {
189        SendError::Io(src)
190    }
191}
192
193impl<T> From<beamchannel::SendError<T>> for SendError<T> {
194    fn from(src: beamchannel::SendError<T>) -> Self {
195        SendError::Disconnected(src.0)
196    }
197}
198
199impl<T> From<NotificationError<NotificationId>> for SendError<T> {
200    fn from(_: NotificationError<NotificationId>) -> Self {
201        SendError::NotificationQueueFull
202    }
203}
204
205impl<T> From<mpsc::TrySendError<T>> for TrySendError<T> {
206    fn from(src: mpsc::TrySendError<T>) -> Self {
207        match src {
208            mpsc::TrySendError::Full(v) => TrySendError::Full(v),
209            mpsc::TrySendError::Disconnected(v) => TrySendError::Disconnected(v),
210        }
211    }
212}
213
214impl<T> From<NotificationError<NotificationId>> for TrySendError<T> {
215    fn from(_: NotificationError<NotificationId>) -> Self {
216        TrySendError::NotificationQueueFull
217    }
218}
219
220impl<T> From<beamchannel::TrySendError<T>> for TrySendError<T> {
221    fn from(src: beamchannel::TrySendError<T>) -> Self {
222        match src {
223            beamchannel::TrySendError::Full(v) => TrySendError::Full(v),
224            beamchannel::TrySendError::Disconnected(v) => TrySendError::Disconnected(v),
225        }
226    }
227}
228
229impl<T> From<mpsc::SendError<T>> for TrySendError<T> {
230    fn from(src: mpsc::SendError<T>) -> Self {
231        TrySendError::Disconnected(src.0)
232    }
233}
234
235impl<T> From<io::Error> for TrySendError<T> {
236    fn from(src: io::Error) -> Self {
237        TrySendError::Io(src)
238    }
239}
240
241/*
242 *
243 * ===== Implement Error, Debug, and Display for Errors =====
244 *
245 */
246
// No methods are overridden, so the trait defaults apply; in particular `source()` yields
// `None`, also for the `Io` variant.
impl<T> error::Error for SendError<T> {}
248
// No methods are overridden, so the trait defaults apply; in particular `source()` yields
// `None`, also for the `Io` variant.
impl<T> error::Error for TrySendError<T> {}
250
251impl<T> fmt::Debug for SendError<T> {
252    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
253        match self {
254            SendError::Io(io_err) => write!(f, "{:?}", io_err),
255            SendError::Disconnected(_) => write!(f, "Disconnected(..)"),
256            SendError::NotificationQueueFull => write!(f, "NotificationQueueFull"),
257        }
258    }
259}
260
261impl<T> fmt::Display for SendError<T> {
262    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
263        match self {
264            SendError::Io(io_err) => write!(f, "{}", io_err),
265            SendError::Disconnected(_) => write!(f, "sending on a closed channel"),
266            SendError::NotificationQueueFull => write!(f, "sending on a full notification queue"),
267        }
268    }
269}
270
271impl<T> fmt::Debug for TrySendError<T> {
272    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
273        match self {
274            TrySendError::Io(io_err) => write!(f, "{:?}", io_err),
275            TrySendError::Full(..) => write!(f, "Full(..)"),
276            TrySendError::Disconnected(..) => write!(f, "Disconnected(..)"),
277            TrySendError::NotificationQueueFull => write!(f, "NotificationQueueFull"),
278        }
279    }
280}
281
282impl<T> fmt::Display for TrySendError<T> {
283    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
284        match self {
285            TrySendError::Io(io_err) => write!(f, "{}", io_err),
286            TrySendError::Full(..) => write!(f, "sending on a full channel"),
287            TrySendError::Disconnected(..) => write!(f, "sending on a closed channel"),
288            TrySendError::NotificationQueueFull => {
289                write!(f, "sending on a full notification queue")
290            }
291        }
292    }
293}