1use mio::{Evented, Ready, Poll, PollOpt, Registration, SetReadiness, Token};
4use lazycell::{LazyCell, AtomicLazyCell};
5use std::any::Any;
6use std::{io, fmt};
7use std::error;
8use std::sync::{mpsc, Arc};
9use std::sync::atomic::{AtomicUsize, Ordering};
10
11pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
14 let (tx_ctl, rx_ctl) = ctl_pair();
15 let (tx, rx) = mpsc::channel();
16
17 let tx = Sender {
18 tx: tx,
19 ctl: tx_ctl,
20 };
21
22 let rx = Receiver {
23 rx: rx,
24 ctl: rx_ctl,
25 };
26
27 (tx, rx)
28}
29
30pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
33 let (tx_ctl, rx_ctl) = ctl_pair();
34 let (tx, rx) = mpsc::sync_channel(bound);
35
36 let tx = SyncSender {
37 tx: tx,
38 ctl: tx_ctl,
39 };
40
41 let rx = Receiver {
42 rx: rx,
43 ctl: rx_ctl,
44 };
45
46 (tx, rx)
47}
48
49pub fn ctl_pair() -> (SenderCtl, ReceiverCtl) {
50 let inner = Arc::new(Inner {
51 pending: AtomicUsize::new(0),
52 senders: AtomicUsize::new(1),
53 set_readiness: AtomicLazyCell::new(),
54 });
55
56 let tx = SenderCtl {
57 inner: inner.clone(),
58 };
59
60 let rx = ReceiverCtl {
61 registration: LazyCell::new(),
62 inner: inner,
63 };
64
65 (tx, rx)
66}
67
68pub struct SenderCtl {
70 inner: Arc<Inner>,
71}
72
73pub struct ReceiverCtl {
75 registration: LazyCell<Registration>,
76 inner: Arc<Inner>,
77}
78
79pub struct Sender<T> {
80 tx: mpsc::Sender<T>,
81 ctl: SenderCtl,
82}
83
84pub struct SyncSender<T> {
85 tx: mpsc::SyncSender<T>,
86 ctl: SenderCtl,
87}
88
89pub struct Receiver<T> {
90 rx: mpsc::Receiver<T>,
91 ctl: ReceiverCtl,
92}
93
94pub enum SendError<T> {
95 Io(io::Error),
96 Disconnected(T),
97}
98
99pub enum TrySendError<T> {
100 Io(io::Error),
101 Full(T),
102 Disconnected(T),
103}
104
105struct Inner {
106 pending: AtomicUsize,
108 senders: AtomicUsize,
110 set_readiness: AtomicLazyCell<SetReadiness>,
112}
113
114impl<T> Sender<T> {
115 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
116 self.tx.send(t)
117 .map_err(SendError::from)
118 .and_then(|_| {
119 try!(self.ctl.inc());
120 Ok(())
121 })
122 }
123}
124
125impl<T> Clone for Sender<T> {
126 fn clone(&self) -> Sender<T> {
127 Sender {
128 tx: self.tx.clone(),
129 ctl: self.ctl.clone(),
130 }
131 }
132}
133
134impl<T> SyncSender<T> {
135 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
136 self.tx.send(t)
137 .map_err(From::from)
138 .and_then(|_| {
139 try!(self.ctl.inc());
140 Ok(())
141 })
142 }
143
144 pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
145 self.tx.try_send(t)
146 .map_err(From::from)
147 .and_then(|_| {
148 try!(self.ctl.inc());
149 Ok(())
150 })
151 }
152}
153
154impl<T> Clone for SyncSender<T> {
155 fn clone(&self) -> SyncSender<T> {
156 SyncSender {
157 tx: self.tx.clone(),
158 ctl: self.ctl.clone(),
159 }
160 }
161}
162
163impl<T> Receiver<T> {
164 pub fn try_recv(&self) -> Result<T, mpsc::TryRecvError> {
165 self.rx.try_recv().and_then(|res| {
166 let _ = self.ctl.dec();
167 Ok(res)
168 })
169 }
170}
171
172impl<T> Evented for Receiver<T> {
173 fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
174 self.ctl.register(poll, token, interest, opts)
175 }
176
177 fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
178 self.ctl.reregister(poll, token, interest, opts)
179 }
180
181 fn deregister(&self, poll: &Poll) -> io::Result<()> {
182 self.ctl.deregister(poll)
183 }
184}
185
186impl SenderCtl {
193 pub fn inc(&self) -> io::Result<()> {
195 let cnt = self.inner.pending.fetch_add(1, Ordering::Acquire);
196
197 if 0 == cnt {
198 if let Some(set_readiness) = self.inner.set_readiness.borrow() {
200 try!(set_readiness.set_readiness(Ready::readable()));
201 }
202 }
203
204 Ok(())
205 }
206}
207
208impl Clone for SenderCtl {
209 fn clone(&self) -> SenderCtl {
210 self.inner.senders.fetch_add(1, Ordering::Relaxed);
211 SenderCtl { inner: self.inner.clone() }
212 }
213}
214
215impl Drop for SenderCtl {
216 fn drop(&mut self) {
217 if self.inner.senders.fetch_sub(1, Ordering::Release) == 1 {
218 let _ = self.inc();
219 }
220 }
221}
222
223impl ReceiverCtl {
224 pub fn dec(&self) -> io::Result<()> {
225 let first = self.inner.pending.load(Ordering::Acquire);
226
227 if first == 1 {
228 if let Some(set_readiness) = self.inner.set_readiness.borrow() {
230 try!(set_readiness.set_readiness(Ready::none()));
231 }
232 }
233
234 let second = self.inner.pending.fetch_sub(1, Ordering::AcqRel);
236
237 if first == 1 && second > 1 {
238 if let Some(set_readiness) = self.inner.set_readiness.borrow() {
241 try!(set_readiness.set_readiness(Ready::readable()));
242 }
243 }
244
245 Ok(())
246 }
247}
248
249impl Evented for ReceiverCtl {
250 fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
251 if self.registration.borrow().is_some() {
252 return Err(io::Error::new(io::ErrorKind::Other, "receiver already registered"));
253 }
254
255 let (registration, set_readiness) = Registration::new(poll, token, interest, opts);
256
257
258 if self.inner.pending.load(Ordering::Relaxed) > 0 {
259 let _ = set_readiness.set_readiness(Ready::readable());
261 }
262
263 self.registration.fill(registration).ok().expect("unexpected state encountered");
264 self.inner.set_readiness.fill(set_readiness).ok().expect("unexpected state encountered");
265
266 Ok(())
267 }
268
269 fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
270 match self.registration.borrow() {
271 Some(registration) => registration.update(poll, token, interest, opts),
272 None => Err(io::Error::new(io::ErrorKind::Other, "receiver not registered")),
273 }
274 }
275
276 fn deregister(&self, poll: &Poll) -> io::Result<()> {
277 match self.registration.borrow() {
278 Some(registration) => registration.deregister(poll),
279 None => Err(io::Error::new(io::ErrorKind::Other, "receiver not registered")),
280 }
281 }
282}
283
284impl<T> From<mpsc::SendError<T>> for SendError<T> {
291 fn from(src: mpsc::SendError<T>) -> SendError<T> {
292 SendError::Disconnected(src.0)
293 }
294}
295
296impl<T> From<io::Error> for SendError<T> {
297 fn from(src: io::Error) -> SendError<T> {
298 SendError::Io(src)
299 }
300}
301
302impl<T> From<mpsc::TrySendError<T>> for TrySendError<T> {
303 fn from(src: mpsc::TrySendError<T>) -> TrySendError<T> {
304 match src {
305 mpsc::TrySendError::Full(v) => TrySendError::Full(v),
306 mpsc::TrySendError::Disconnected(v) => TrySendError::Disconnected(v),
307 }
308 }
309}
310
311impl<T> From<mpsc::SendError<T>> for TrySendError<T> {
312 fn from(src: mpsc::SendError<T>) -> TrySendError<T> {
313 TrySendError::Disconnected(src.0)
314 }
315}
316
317impl<T> From<io::Error> for TrySendError<T> {
318 fn from(src: io::Error) -> TrySendError<T> {
319 TrySendError::Io(src)
320 }
321}
322
323impl<T: Any> error::Error for SendError<T> {
330 fn description(&self) -> &str {
331 match self {
332 &SendError::Io(ref io_err) => io_err.description(),
333 &SendError::Disconnected(..) => "Disconnected",
334 }
335 }
336}
337
338impl<T: Any> error::Error for TrySendError<T> {
339 fn description(&self) -> &str {
340 match self {
341 &TrySendError::Io(ref io_err) => io_err.description(),
342 &TrySendError::Full(..) => "Full",
343 &TrySendError::Disconnected(..) => "Disconnected",
344 }
345 }
346}
347
348impl<T> fmt::Debug for SendError<T> {
349 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
350 format_send_error(self, f)
351 }
352}
353
354impl<T> fmt::Display for SendError<T> {
355 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
356 format_send_error(self, f)
357 }
358}
359
360impl<T> fmt::Debug for TrySendError<T> {
361 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
362 format_try_send_error(self, f)
363 }
364}
365
366impl<T> fmt::Display for TrySendError<T> {
367 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
368 format_try_send_error(self, f)
369 }
370}
371
372#[inline]
373fn format_send_error<T>(e: &SendError<T>, f: &mut fmt::Formatter) -> fmt::Result {
374 match e {
375 &SendError::Io(ref io_err) => write!(f, "{}", io_err),
376 &SendError::Disconnected(..) => write!(f, "Disconnected"),
377 }
378}
379
380#[inline]
381fn format_try_send_error<T>(e: &TrySendError<T>, f: &mut fmt::Formatter) -> fmt::Result {
382 match e {
383 &TrySendError::Io(ref io_err) => write!(f, "{}", io_err),
384 &TrySendError::Full(..) => write!(f, "Full"),
385 &TrySendError::Disconnected(..) => write!(f, "Disconnected"),
386 }
387}