Skip to main content

retty_io/
channel.rs

1//! Thread safe communication channel implementing `Evented`
2
3#![allow(unused_imports, deprecated, missing_debug_implementations)]
4
5use crate::{Evented, Poll, PollOpt, Ready, Registration, SetReadiness, Token};
6use lazycell::{AtomicLazyCell, LazyCell};
7use std::any::Any;
8use std::error;
9use std::sync::atomic::{AtomicUsize, Ordering};
10use std::sync::{mpsc, Arc};
11use std::{fmt, io};
12
13/// Creates a new asynchronous channel, where the `Receiver` can be registered
14/// with `Poll`.
15pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
16    let (tx_ctl, rx_ctl) = ctl_pair();
17    let (tx, rx) = mpsc::channel();
18
19    let tx = Sender { tx, ctl: tx_ctl };
20
21    let rx = Receiver { rx, ctl: rx_ctl };
22
23    (tx, rx)
24}
25
26/// Creates a new synchronous, bounded channel where the `Receiver` can be
27/// registered with `Poll`.
28pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
29    let (tx_ctl, rx_ctl) = ctl_pair();
30    let (tx, rx) = mpsc::sync_channel(bound);
31
32    let tx = SyncSender { tx, ctl: tx_ctl };
33
34    let rx = Receiver { rx, ctl: rx_ctl };
35
36    (tx, rx)
37}
38
39fn ctl_pair() -> (SenderCtl, ReceiverCtl) {
40    let inner = Arc::new(Inner {
41        pending: AtomicUsize::new(0),
42        senders: AtomicUsize::new(1),
43        set_readiness: AtomicLazyCell::new(),
44    });
45
46    let tx = SenderCtl {
47        inner: Arc::clone(&inner),
48    };
49
50    let rx = ReceiverCtl {
51        registration: LazyCell::new(),
52        inner,
53    };
54
55    (tx, rx)
56}
57
58/// Tracks messages sent on a channel in order to update readiness.
59struct SenderCtl {
60    inner: Arc<Inner>,
61}
62
63/// Tracks messages received on a channel in order to track readiness.
64struct ReceiverCtl {
65    registration: LazyCell<Registration>,
66    inner: Arc<Inner>,
67}
68
69/// The sending half of a channel.
70pub struct Sender<T> {
71    tx: mpsc::Sender<T>,
72    ctl: SenderCtl,
73}
74
75/// The sending half of a synchronous channel.
76pub struct SyncSender<T> {
77    tx: mpsc::SyncSender<T>,
78    ctl: SenderCtl,
79}
80
81/// The receiving half of a channel.
82pub struct Receiver<T> {
83    rx: mpsc::Receiver<T>,
84    ctl: ReceiverCtl,
85}
86
87/// An error returned from the `Sender::send` or `SyncSender::send` function.
88pub enum SendError<T> {
89    /// An IO error.
90    Io(io::Error),
91
92    /// The receiving half of the channel has disconnected.
93    Disconnected(T),
94}
95
96/// An error returned from the `SyncSender::try_send` function.
97pub enum TrySendError<T> {
98    /// An IO error.
99    Io(io::Error),
100
101    /// Data could not be sent because it would require the callee to block.
102    Full(T),
103
104    /// The receiving half of the channel has disconnected.
105    Disconnected(T),
106}
107
108struct Inner {
109    // The number of outstanding messages for the receiver to read
110    pending: AtomicUsize,
111    // The number of sender handles
112    senders: AtomicUsize,
113    // The set readiness handle
114    set_readiness: AtomicLazyCell<SetReadiness>,
115}
116
117impl<T> Sender<T> {
118    /// Attempts to send a value on this channel, returning it back if it could not be sent.
119    pub fn send(&self, t: T) -> Result<(), SendError<T>> {
120        self.tx.send(t).map_err(SendError::from).and_then(|_| {
121            self.ctl.inc()?;
122            Ok(())
123        })
124    }
125}
126
127impl<T> Clone for Sender<T> {
128    fn clone(&self) -> Sender<T> {
129        Sender {
130            tx: self.tx.clone(),
131            ctl: self.ctl.clone(),
132        }
133    }
134}
135
136impl<T> SyncSender<T> {
137    /// Sends a value on this synchronous channel.
138    ///
139    /// This function will *block* until space in the internal buffer becomes
140    /// available or a receiver is available to hand off the message to.
141    pub fn send(&self, t: T) -> Result<(), SendError<T>> {
142        self.tx.send(t).map_err(From::from).and_then(|_| {
143            self.ctl.inc()?;
144            Ok(())
145        })
146    }
147
148    /// Attempts to send a value on this channel without blocking.
149    ///
150    /// This method differs from `send` by returning immediately if the channel's
151    /// buffer is full or no receiver is waiting to acquire some data.
152    pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
153        self.tx.try_send(t).map_err(From::from).and_then(|_| {
154            self.ctl.inc()?;
155            Ok(())
156        })
157    }
158}
159
160impl<T> Clone for SyncSender<T> {
161    fn clone(&self) -> SyncSender<T> {
162        SyncSender {
163            tx: self.tx.clone(),
164            ctl: self.ctl.clone(),
165        }
166    }
167}
168
169impl<T> Receiver<T> {
170    /// Attempts to return a pending value on this receiver without blocking.
171    pub fn try_recv(&self) -> Result<T, mpsc::TryRecvError> {
172        self.rx.try_recv().map(|res| {
173            let _ = self.ctl.dec();
174            res
175        })
176    }
177}
178
179impl<T> Evented for Receiver<T> {
180    fn register(
181        &self,
182        poll: &Poll,
183        token: Token,
184        interest: Ready,
185        opts: PollOpt,
186    ) -> io::Result<()> {
187        self.ctl.register(poll, token, interest, opts)
188    }
189
190    fn reregister(
191        &self,
192        poll: &Poll,
193        token: Token,
194        interest: Ready,
195        opts: PollOpt,
196    ) -> io::Result<()> {
197        self.ctl.reregister(poll, token, interest, opts)
198    }
199
200    fn deregister(&self, poll: &Poll) -> io::Result<()> {
201        self.ctl.deregister(poll)
202    }
203}
204
205/*
206 *
207 * ===== SenderCtl / ReceiverCtl =====
208 *
209 */
210
211impl SenderCtl {
212    /// Call to track that a message has been sent
213    fn inc(&self) -> io::Result<()> {
214        let cnt = self.inner.pending.fetch_add(1, Ordering::Acquire);
215
216        if 0 == cnt {
217            // Toggle readiness to readable
218            if let Some(set_readiness) = self.inner.set_readiness.borrow() {
219                set_readiness.set_readiness(Ready::readable())?;
220            }
221        }
222
223        Ok(())
224    }
225}
226
227impl Clone for SenderCtl {
228    fn clone(&self) -> SenderCtl {
229        self.inner.senders.fetch_add(1, Ordering::Relaxed);
230        SenderCtl {
231            inner: Arc::clone(&self.inner),
232        }
233    }
234}
235
236impl Drop for SenderCtl {
237    fn drop(&mut self) {
238        if self.inner.senders.fetch_sub(1, Ordering::Release) == 1 {
239            let _ = self.inc();
240        }
241    }
242}
243
244impl ReceiverCtl {
245    fn dec(&self) -> io::Result<()> {
246        let first = self.inner.pending.load(Ordering::Acquire);
247
248        if first == 1 {
249            // Unset readiness
250            if let Some(set_readiness) = self.inner.set_readiness.borrow() {
251                set_readiness.set_readiness(Ready::empty())?;
252            }
253        }
254
255        // Decrement
256        let second = self.inner.pending.fetch_sub(1, Ordering::AcqRel);
257
258        if first == 1 && second > 1 {
259            // There are still pending messages. Since readiness was
260            // previously unset, it must be reset here
261            if let Some(set_readiness) = self.inner.set_readiness.borrow() {
262                set_readiness.set_readiness(Ready::readable())?;
263            }
264        }
265
266        Ok(())
267    }
268}
269
270impl Evented for ReceiverCtl {
271    fn register(
272        &self,
273        poll: &Poll,
274        token: Token,
275        interest: Ready,
276        opts: PollOpt,
277    ) -> io::Result<()> {
278        if self.registration.borrow().is_some() {
279            return Err(io::Error::new(
280                io::ErrorKind::Other,
281                "receiver already registered",
282            ));
283        }
284
285        let (registration, set_readiness) = Registration::new2();
286        poll.register(&registration, token, interest, opts)?;
287
288        if self.inner.pending.load(Ordering::Relaxed) > 0 {
289            // TODO: Don't drop readiness
290            let _ = set_readiness.set_readiness(Ready::readable());
291        }
292
293        self.registration
294            .fill(registration)
295            .expect("unexpected state encountered");
296        self.inner
297            .set_readiness
298            .fill(set_readiness)
299            .expect("unexpected state encountered");
300
301        Ok(())
302    }
303
304    fn reregister(
305        &self,
306        poll: &Poll,
307        token: Token,
308        interest: Ready,
309        opts: PollOpt,
310    ) -> io::Result<()> {
311        match self.registration.borrow() {
312            Some(registration) => poll.reregister(registration, token, interest, opts),
313            None => Err(io::Error::new(
314                io::ErrorKind::Other,
315                "receiver not registered",
316            )),
317        }
318    }
319
320    fn deregister(&self, poll: &Poll) -> io::Result<()> {
321        match self.registration.borrow() {
322            Some(registration) => poll.deregister(registration),
323            None => Err(io::Error::new(
324                io::ErrorKind::Other,
325                "receiver not registered",
326            )),
327        }
328    }
329}
330
331/*
332 *
333 * ===== Error conversions =====
334 *
335 */
336
337impl<T> From<mpsc::SendError<T>> for SendError<T> {
338    fn from(src: mpsc::SendError<T>) -> SendError<T> {
339        SendError::Disconnected(src.0)
340    }
341}
342
343impl<T> From<io::Error> for SendError<T> {
344    fn from(src: io::Error) -> SendError<T> {
345        SendError::Io(src)
346    }
347}
348
349impl<T> From<mpsc::TrySendError<T>> for TrySendError<T> {
350    fn from(src: mpsc::TrySendError<T>) -> TrySendError<T> {
351        match src {
352            mpsc::TrySendError::Full(v) => TrySendError::Full(v),
353            mpsc::TrySendError::Disconnected(v) => TrySendError::Disconnected(v),
354        }
355    }
356}
357
358impl<T> From<mpsc::SendError<T>> for TrySendError<T> {
359    fn from(src: mpsc::SendError<T>) -> TrySendError<T> {
360        TrySendError::Disconnected(src.0)
361    }
362}
363
364impl<T> From<io::Error> for TrySendError<T> {
365    fn from(src: io::Error) -> TrySendError<T> {
366        TrySendError::Io(src)
367    }
368}
369
370/*
371 *
372 * ===== Implement Error, Debug and Display for Errors =====
373 *
374 */
375
376impl<T: Any> error::Error for SendError<T> {}
377
378impl<T: Any> error::Error for TrySendError<T> {}
379
380impl<T> fmt::Debug for SendError<T> {
381    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
382        format_send_error(self, f)
383    }
384}
385
386impl<T> fmt::Display for SendError<T> {
387    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
388        format_send_error(self, f)
389    }
390}
391
392impl<T> fmt::Debug for TrySendError<T> {
393    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
394        format_try_send_error(self, f)
395    }
396}
397
398impl<T> fmt::Display for TrySendError<T> {
399    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
400        format_try_send_error(self, f)
401    }
402}
403
404#[inline]
405fn format_send_error<T>(e: &SendError<T>, f: &mut fmt::Formatter) -> fmt::Result {
406    match *e {
407        SendError::Io(ref io_err) => write!(f, "{}", io_err),
408        SendError::Disconnected(..) => write!(f, "Disconnected"),
409    }
410}
411
412#[inline]
413fn format_try_send_error<T>(e: &TrySendError<T>, f: &mut fmt::Formatter) -> fmt::Result {
414    match *e {
415        TrySendError::Io(ref io_err) => write!(f, "{}", io_err),
416        TrySendError::Full(..) => write!(f, "Full"),
417        TrySendError::Disconnected(..) => write!(f, "Disconnected"),
418    }
419}