1#![allow(unused_imports, deprecated, missing_debug_implementations)]
4
5use crate::{Evented, Poll, PollOpt, Ready, Registration, SetReadiness, Token};
6use lazycell::{AtomicLazyCell, LazyCell};
7use std::any::Any;
8use std::error;
9use std::sync::atomic::{AtomicUsize, Ordering};
10use std::sync::{mpsc, Arc};
11use std::{fmt, io};
12
13pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
16 let (tx_ctl, rx_ctl) = ctl_pair();
17 let (tx, rx) = mpsc::channel();
18
19 let tx = Sender { tx, ctl: tx_ctl };
20
21 let rx = Receiver { rx, ctl: rx_ctl };
22
23 (tx, rx)
24}
25
26pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
29 let (tx_ctl, rx_ctl) = ctl_pair();
30 let (tx, rx) = mpsc::sync_channel(bound);
31
32 let tx = SyncSender { tx, ctl: tx_ctl };
33
34 let rx = Receiver { rx, ctl: rx_ctl };
35
36 (tx, rx)
37}
38
39fn ctl_pair() -> (SenderCtl, ReceiverCtl) {
40 let inner = Arc::new(Inner {
41 pending: AtomicUsize::new(0),
42 senders: AtomicUsize::new(1),
43 set_readiness: AtomicLazyCell::new(),
44 });
45
46 let tx = SenderCtl {
47 inner: Arc::clone(&inner),
48 };
49
50 let rx = ReceiverCtl {
51 registration: LazyCell::new(),
52 inner,
53 };
54
55 (tx, rx)
56}
57
/// Sender-side control handle: tracks pending messages and live senders
/// through the shared `Inner`.
struct SenderCtl {
    /// State shared with the matching `ReceiverCtl` and every cloned `SenderCtl`.
    inner: Arc<Inner>,
}
62
/// Receiver-side control handle: owns the `Registration` and shares `Inner`
/// with the sender handles.
struct ReceiverCtl {
    /// Filled by `register`; empty until the receiver has been registered.
    registration: LazyCell<Registration>,
    /// State shared with every `SenderCtl` created for this channel.
    inner: Arc<Inner>,
}
68
/// Sending half returned by `channel`.
pub struct Sender<T> {
    /// Underlying standard-library sender that carries the messages.
    tx: mpsc::Sender<T>,
    /// Control handle bumped after every successful send.
    ctl: SenderCtl,
}
74
/// Sending half returned by `sync_channel`.
pub struct SyncSender<T> {
    /// Underlying standard-library bounded sender that carries the messages.
    tx: mpsc::SyncSender<T>,
    /// Control handle bumped after every successful send.
    ctl: SenderCtl,
}
80
/// Receiving half returned by both `channel` and `sync_channel`.
///
/// Implements `Evented` by delegating to its `ReceiverCtl`.
pub struct Receiver<T> {
    /// Underlying standard-library receiver that yields the messages.
    rx: mpsc::Receiver<T>,
    /// Control handle decremented after every successful `try_recv`.
    ctl: ReceiverCtl,
}
86
/// Error returned by `Sender::send` and `SyncSender::send`.
pub enum SendError<T> {
    /// The message was queued, but updating the readiness state failed.
    Io(io::Error),

    /// The underlying `mpsc` send failed; carries the unsent value back.
    Disconnected(T),
}
95
/// Error returned by `SyncSender::try_send`.
pub enum TrySendError<T> {
    /// The message was queued, but updating the readiness state failed.
    Io(io::Error),

    /// The underlying `mpsc` channel reported `Full`; carries the unsent value back.
    Full(T),

    /// The underlying `mpsc` channel reported `Disconnected`; carries the unsent
    /// value back.
    Disconnected(T),
}
107
/// State shared between the sender-side and receiver-side control handles.
struct Inner {
    /// Counter incremented by `SenderCtl::inc` and decremented by `ReceiverCtl::dec`.
    pending: AtomicUsize,
    /// Number of live `SenderCtl` handles: starts at 1, +1 on clone, -1 on drop.
    senders: AtomicUsize,
    /// Readiness handle installed by `ReceiverCtl::register`; empty before that.
    set_readiness: AtomicLazyCell<SetReadiness>,
}
116
117impl<T> Sender<T> {
118 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
120 self.tx.send(t).map_err(SendError::from).and_then(|_| {
121 self.ctl.inc()?;
122 Ok(())
123 })
124 }
125}
126
127impl<T> Clone for Sender<T> {
128 fn clone(&self) -> Sender<T> {
129 Sender {
130 tx: self.tx.clone(),
131 ctl: self.ctl.clone(),
132 }
133 }
134}
135
136impl<T> SyncSender<T> {
137 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
142 self.tx.send(t).map_err(From::from).and_then(|_| {
143 self.ctl.inc()?;
144 Ok(())
145 })
146 }
147
148 pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
153 self.tx.try_send(t).map_err(From::from).and_then(|_| {
154 self.ctl.inc()?;
155 Ok(())
156 })
157 }
158}
159
160impl<T> Clone for SyncSender<T> {
161 fn clone(&self) -> SyncSender<T> {
162 SyncSender {
163 tx: self.tx.clone(),
164 ctl: self.ctl.clone(),
165 }
166 }
167}
168
169impl<T> Receiver<T> {
170 pub fn try_recv(&self) -> Result<T, mpsc::TryRecvError> {
172 self.rx.try_recv().map(|res| {
173 let _ = self.ctl.dec();
174 res
175 })
176 }
177}
178
179impl<T> Evented for Receiver<T> {
180 fn register(
181 &self,
182 poll: &Poll,
183 token: Token,
184 interest: Ready,
185 opts: PollOpt,
186 ) -> io::Result<()> {
187 self.ctl.register(poll, token, interest, opts)
188 }
189
190 fn reregister(
191 &self,
192 poll: &Poll,
193 token: Token,
194 interest: Ready,
195 opts: PollOpt,
196 ) -> io::Result<()> {
197 self.ctl.reregister(poll, token, interest, opts)
198 }
199
200 fn deregister(&self, poll: &Poll) -> io::Result<()> {
201 self.ctl.deregister(poll)
202 }
203}
204
205impl SenderCtl {
212 fn inc(&self) -> io::Result<()> {
214 let cnt = self.inner.pending.fetch_add(1, Ordering::Acquire);
215
216 if 0 == cnt {
217 if let Some(set_readiness) = self.inner.set_readiness.borrow() {
219 set_readiness.set_readiness(Ready::readable())?;
220 }
221 }
222
223 Ok(())
224 }
225}
226
227impl Clone for SenderCtl {
228 fn clone(&self) -> SenderCtl {
229 self.inner.senders.fetch_add(1, Ordering::Relaxed);
230 SenderCtl {
231 inner: Arc::clone(&self.inner),
232 }
233 }
234}
235
236impl Drop for SenderCtl {
237 fn drop(&mut self) {
238 if self.inner.senders.fetch_sub(1, Ordering::Release) == 1 {
239 let _ = self.inc();
240 }
241 }
242}
243
impl ReceiverCtl {
    /// Decrements the pending counter and keeps the readiness state in step.
    ///
    /// # Errors
    ///
    /// Propagates the error from `SetReadiness::set_readiness`. Note that a
    /// failure while clearing readiness returns before the counter is
    /// decremented.
    fn dec(&self) -> io::Result<()> {
        // Snapshot of the counter taken before readiness is touched.
        let first = self.inner.pending.load(Ordering::Acquire);

        if first == 1 {
            // The snapshot says this is the last pending item: clear readiness
            // before decrementing.
            if let Some(set_readiness) = self.inner.set_readiness.borrow() {
                set_readiness.set_readiness(Ready::empty())?;
            }
        }

        // `fetch_sub` returns the value held just before the subtraction.
        let second = self.inner.pending.fetch_sub(1, Ordering::AcqRel);

        if first == 1 && second > 1 {
            // The counter grew between the snapshot and the decrement, so items
            // are still pending: restore the readiness that was cleared above.
            if let Some(set_readiness) = self.inner.set_readiness.borrow() {
                set_readiness.set_readiness(Ready::readable())?;
            }
        }

        Ok(())
    }
}
269
270impl Evented for ReceiverCtl {
271 fn register(
272 &self,
273 poll: &Poll,
274 token: Token,
275 interest: Ready,
276 opts: PollOpt,
277 ) -> io::Result<()> {
278 if self.registration.borrow().is_some() {
279 return Err(io::Error::new(
280 io::ErrorKind::Other,
281 "receiver already registered",
282 ));
283 }
284
285 let (registration, set_readiness) = Registration::new2();
286 poll.register(®istration, token, interest, opts)?;
287
288 if self.inner.pending.load(Ordering::Relaxed) > 0 {
289 let _ = set_readiness.set_readiness(Ready::readable());
291 }
292
293 self.registration
294 .fill(registration)
295 .expect("unexpected state encountered");
296 self.inner
297 .set_readiness
298 .fill(set_readiness)
299 .expect("unexpected state encountered");
300
301 Ok(())
302 }
303
304 fn reregister(
305 &self,
306 poll: &Poll,
307 token: Token,
308 interest: Ready,
309 opts: PollOpt,
310 ) -> io::Result<()> {
311 match self.registration.borrow() {
312 Some(registration) => poll.reregister(registration, token, interest, opts),
313 None => Err(io::Error::new(
314 io::ErrorKind::Other,
315 "receiver not registered",
316 )),
317 }
318 }
319
320 fn deregister(&self, poll: &Poll) -> io::Result<()> {
321 match self.registration.borrow() {
322 Some(registration) => poll.deregister(registration),
323 None => Err(io::Error::new(
324 io::ErrorKind::Other,
325 "receiver not registered",
326 )),
327 }
328 }
329}
330
331impl<T> From<mpsc::SendError<T>> for SendError<T> {
338 fn from(src: mpsc::SendError<T>) -> SendError<T> {
339 SendError::Disconnected(src.0)
340 }
341}
342
343impl<T> From<io::Error> for SendError<T> {
344 fn from(src: io::Error) -> SendError<T> {
345 SendError::Io(src)
346 }
347}
348
349impl<T> From<mpsc::TrySendError<T>> for TrySendError<T> {
350 fn from(src: mpsc::TrySendError<T>) -> TrySendError<T> {
351 match src {
352 mpsc::TrySendError::Full(v) => TrySendError::Full(v),
353 mpsc::TrySendError::Disconnected(v) => TrySendError::Disconnected(v),
354 }
355 }
356}
357
358impl<T> From<mpsc::SendError<T>> for TrySendError<T> {
359 fn from(src: mpsc::SendError<T>) -> TrySendError<T> {
360 TrySendError::Disconnected(src.0)
361 }
362}
363
364impl<T> From<io::Error> for TrySendError<T> {
365 fn from(src: io::Error) -> TrySendError<T> {
366 TrySendError::Io(src)
367 }
368}
369
370impl<T: Any> error::Error for SendError<T> {}
377
378impl<T: Any> error::Error for TrySendError<T> {}
379
380impl<T> fmt::Debug for SendError<T> {
381 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
382 format_send_error(self, f)
383 }
384}
385
386impl<T> fmt::Display for SendError<T> {
387 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
388 format_send_error(self, f)
389 }
390}
391
392impl<T> fmt::Debug for TrySendError<T> {
393 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
394 format_try_send_error(self, f)
395 }
396}
397
398impl<T> fmt::Display for TrySendError<T> {
399 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
400 format_try_send_error(self, f)
401 }
402}
403
404#[inline]
405fn format_send_error<T>(e: &SendError<T>, f: &mut fmt::Formatter) -> fmt::Result {
406 match *e {
407 SendError::Io(ref io_err) => write!(f, "{}", io_err),
408 SendError::Disconnected(..) => write!(f, "Disconnected"),
409 }
410}
411
412#[inline]
413fn format_try_send_error<T>(e: &TrySendError<T>, f: &mut fmt::Formatter) -> fmt::Result {
414 match *e {
415 TrySendError::Io(ref io_err) => write!(f, "{}", io_err),
416 TrySendError::Full(..) => write!(f, "Full"),
417 TrySendError::Disconnected(..) => write!(f, "Disconnected"),
418 }
419}