mio_more/
channel.rs

1//! Thread safe communication channel implementing `Evented`
2
3use mio::{Evented, Ready, Poll, PollOpt, Registration, SetReadiness, Token};
4use lazycell::{LazyCell, AtomicLazyCell};
5use std::any::Any;
6use std::{io, fmt};
7use std::error;
8use std::sync::{mpsc, Arc};
9use std::sync::atomic::{AtomicUsize, Ordering};
10
11/// Creates a new asynchronous channel, where the `Receiver` can be registered
12/// with `Poll`.
13pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
14    let (tx_ctl, rx_ctl) = ctl_pair();
15    let (tx, rx) = mpsc::channel();
16
17    let tx = Sender {
18        tx: tx,
19        ctl: tx_ctl,
20    };
21
22    let rx = Receiver {
23        rx: rx,
24        ctl: rx_ctl,
25    };
26
27    (tx, rx)
28}
29
30/// Creates a new synchronous, bounded channel where the `Receiver` can be
31/// registered with `Poll`.
32pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
33    let (tx_ctl, rx_ctl) = ctl_pair();
34    let (tx, rx) = mpsc::sync_channel(bound);
35
36    let tx = SyncSender {
37        tx: tx,
38        ctl: tx_ctl,
39    };
40
41    let rx = Receiver {
42        rx: rx,
43        ctl: rx_ctl,
44    };
45
46    (tx, rx)
47}
48
49pub fn ctl_pair() -> (SenderCtl, ReceiverCtl) {
50    let inner = Arc::new(Inner {
51        pending: AtomicUsize::new(0),
52        senders: AtomicUsize::new(1),
53        set_readiness: AtomicLazyCell::new(),
54    });
55
56    let tx = SenderCtl {
57        inner: inner.clone(),
58    };
59
60    let rx = ReceiverCtl {
61        registration: LazyCell::new(),
62        inner: inner,
63    };
64
65    (tx, rx)
66}
67
68/// Tracks messages sent on a channel in order to update readiness.
69pub struct SenderCtl {
70    inner: Arc<Inner>,
71}
72
73/// Tracks messages received on a channel in order to track readiness.
74pub struct ReceiverCtl {
75    registration: LazyCell<Registration>,
76    inner: Arc<Inner>,
77}
78
79pub struct Sender<T> {
80    tx: mpsc::Sender<T>,
81    ctl: SenderCtl,
82}
83
84pub struct SyncSender<T> {
85    tx: mpsc::SyncSender<T>,
86    ctl: SenderCtl,
87}
88
89pub struct Receiver<T> {
90    rx: mpsc::Receiver<T>,
91    ctl: ReceiverCtl,
92}
93
94pub enum SendError<T> {
95    Io(io::Error),
96    Disconnected(T),
97}
98
99pub enum TrySendError<T> {
100    Io(io::Error),
101    Full(T),
102    Disconnected(T),
103}
104
105struct Inner {
106    // The number of outstanding messages for the receiver to read
107    pending: AtomicUsize,
108    // The number of sender handles
109    senders: AtomicUsize,
110    // The set readiness handle
111    set_readiness: AtomicLazyCell<SetReadiness>,
112}
113
114impl<T> Sender<T> {
115    pub fn send(&self, t: T) -> Result<(), SendError<T>> {
116        self.tx.send(t)
117            .map_err(SendError::from)
118            .and_then(|_| {
119                try!(self.ctl.inc());
120                Ok(())
121            })
122    }
123}
124
125impl<T> Clone for Sender<T> {
126    fn clone(&self) -> Sender<T> {
127        Sender {
128            tx: self.tx.clone(),
129            ctl: self.ctl.clone(),
130        }
131    }
132}
133
134impl<T> SyncSender<T> {
135    pub fn send(&self, t: T) -> Result<(), SendError<T>> {
136        self.tx.send(t)
137            .map_err(From::from)
138            .and_then(|_| {
139                try!(self.ctl.inc());
140                Ok(())
141            })
142    }
143
144    pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
145        self.tx.try_send(t)
146            .map_err(From::from)
147            .and_then(|_| {
148                try!(self.ctl.inc());
149                Ok(())
150            })
151    }
152}
153
154impl<T> Clone for SyncSender<T> {
155    fn clone(&self) -> SyncSender<T> {
156        SyncSender {
157            tx: self.tx.clone(),
158            ctl: self.ctl.clone(),
159        }
160    }
161}
162
163impl<T> Receiver<T> {
164    pub fn try_recv(&self) -> Result<T, mpsc::TryRecvError> {
165        self.rx.try_recv().and_then(|res| {
166            let _ = self.ctl.dec();
167            Ok(res)
168        })
169    }
170}
171
172impl<T> Evented for Receiver<T> {
173    fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
174        self.ctl.register(poll, token, interest, opts)
175    }
176
177    fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
178        self.ctl.reregister(poll, token, interest, opts)
179    }
180
181    fn deregister(&self, poll: &Poll) -> io::Result<()> {
182        self.ctl.deregister(poll)
183    }
184}
185
186/*
187 *
188 * ===== SenderCtl / ReceiverCtl =====
189 *
190 */
191
192impl SenderCtl {
193    /// Call to track that a message has been sent
194    pub fn inc(&self) -> io::Result<()> {
195        let cnt = self.inner.pending.fetch_add(1, Ordering::Acquire);
196
197        if 0 == cnt {
198            // Toggle readiness to readable
199            if let Some(set_readiness) = self.inner.set_readiness.borrow() {
200                try!(set_readiness.set_readiness(Ready::readable()));
201            }
202        }
203
204        Ok(())
205    }
206}
207
208impl Clone for SenderCtl {
209    fn clone(&self) -> SenderCtl {
210        self.inner.senders.fetch_add(1, Ordering::Relaxed);
211        SenderCtl { inner: self.inner.clone() }
212    }
213}
214
215impl Drop for SenderCtl {
216    fn drop(&mut self) {
217        if self.inner.senders.fetch_sub(1, Ordering::Release) == 1 {
218            let _ = self.inc();
219        }
220    }
221}
222
223impl ReceiverCtl {
224    pub fn dec(&self) -> io::Result<()> {
225        let first = self.inner.pending.load(Ordering::Acquire);
226
227        if first == 1 {
228            // Unset readiness
229            if let Some(set_readiness) = self.inner.set_readiness.borrow() {
230                try!(set_readiness.set_readiness(Ready::none()));
231            }
232        }
233
234        // Decrement
235        let second = self.inner.pending.fetch_sub(1, Ordering::AcqRel);
236
237        if first == 1 && second > 1 {
238            // There are still pending messages. Since readiness was
239            // previously unset, it must be reset here
240            if let Some(set_readiness) = self.inner.set_readiness.borrow() {
241                try!(set_readiness.set_readiness(Ready::readable()));
242            }
243        }
244
245        Ok(())
246    }
247}
248
249impl Evented for ReceiverCtl {
250    fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
251        if self.registration.borrow().is_some() {
252            return Err(io::Error::new(io::ErrorKind::Other, "receiver already registered"));
253        }
254
255        let (registration, set_readiness) = Registration::new(poll, token, interest, opts);
256
257
258        if self.inner.pending.load(Ordering::Relaxed) > 0 {
259            // TODO: Don't drop readiness
260            let _ = set_readiness.set_readiness(Ready::readable());
261        }
262
263        self.registration.fill(registration).ok().expect("unexpected state encountered");
264        self.inner.set_readiness.fill(set_readiness).ok().expect("unexpected state encountered");
265
266        Ok(())
267    }
268
269    fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
270        match self.registration.borrow() {
271            Some(registration) => registration.update(poll, token, interest, opts),
272            None => Err(io::Error::new(io::ErrorKind::Other, "receiver not registered")),
273        }
274    }
275
276    fn deregister(&self, poll: &Poll) -> io::Result<()> {
277        match self.registration.borrow() {
278            Some(registration) => registration.deregister(poll),
279            None => Err(io::Error::new(io::ErrorKind::Other, "receiver not registered")),
280        }
281    }
282}
283
284/*
285 *
286 * ===== Error conversions =====
287 *
288 */
289
290impl<T> From<mpsc::SendError<T>> for SendError<T> {
291    fn from(src: mpsc::SendError<T>) -> SendError<T> {
292        SendError::Disconnected(src.0)
293    }
294}
295
296impl<T> From<io::Error> for SendError<T> {
297    fn from(src: io::Error) -> SendError<T> {
298        SendError::Io(src)
299    }
300}
301
302impl<T> From<mpsc::TrySendError<T>> for TrySendError<T> {
303    fn from(src: mpsc::TrySendError<T>) -> TrySendError<T> {
304        match src {
305            mpsc::TrySendError::Full(v) => TrySendError::Full(v),
306            mpsc::TrySendError::Disconnected(v) => TrySendError::Disconnected(v),
307        }
308    }
309}
310
311impl<T> From<mpsc::SendError<T>> for TrySendError<T> {
312    fn from(src: mpsc::SendError<T>) -> TrySendError<T> {
313        TrySendError::Disconnected(src.0)
314    }
315}
316
317impl<T> From<io::Error> for TrySendError<T> {
318    fn from(src: io::Error) -> TrySendError<T> {
319        TrySendError::Io(src)
320    }
321}
322
323/*
324 *
325 * ===== Implement Error, Debug and Display for Errors =====
326 *
327 */
328
329impl<T: Any> error::Error for SendError<T> {
330    fn description(&self) -> &str {
331        match self {
332            &SendError::Io(ref io_err) => io_err.description(),
333            &SendError::Disconnected(..) => "Disconnected",
334        }
335    }
336}
337
338impl<T: Any> error::Error for TrySendError<T> {
339    fn description(&self) -> &str {
340        match self {
341            &TrySendError::Io(ref io_err) => io_err.description(),
342            &TrySendError::Full(..) => "Full",
343            &TrySendError::Disconnected(..) => "Disconnected",
344        }
345    }
346}
347
348impl<T> fmt::Debug for SendError<T> {
349    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
350        format_send_error(self, f)
351    }
352}
353
354impl<T> fmt::Display for SendError<T> {
355    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
356        format_send_error(self, f)
357    }
358}
359
360impl<T> fmt::Debug for TrySendError<T> {
361    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
362        format_try_send_error(self, f)
363    }
364}
365
366impl<T> fmt::Display for TrySendError<T> {
367    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
368        format_try_send_error(self, f)
369    }
370}
371
372#[inline]
373fn format_send_error<T>(e: &SendError<T>, f: &mut fmt::Formatter) -> fmt::Result {
374    match e {
375        &SendError::Io(ref io_err) => write!(f, "{}", io_err),
376        &SendError::Disconnected(..) => write!(f, "Disconnected"),
377    }
378}
379
380#[inline]
381fn format_try_send_error<T>(e: &TrySendError<T>, f: &mut fmt::Formatter) -> fmt::Result {
382    match e {
383        &TrySendError::Io(ref io_err) => write!(f, "{}", io_err),
384        &TrySendError::Full(..) => write!(f, "Full"),
385        &TrySendError::Disconnected(..) => write!(f, "Disconnected"),
386    }
387}