// tokio_sync/mpsc/bounded.rs
use super::chan;

use futures::{Poll, Sink, StartSend, Stream};

use std::fmt;
6
7pub struct Sender<T> {
11    chan: chan::Tx<T, Semaphore>,
12}
13
14impl<T> Clone for Sender<T> {
15    fn clone(&self) -> Self {
16        Sender {
17            chan: self.chan.clone(),
18        }
19    }
20}
21
22impl<T> fmt::Debug for Sender<T> {
23    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
24        fmt.debug_struct("Sender")
25            .field("chan", &self.chan)
26            .finish()
27    }
28}
29
30pub struct Receiver<T> {
34    chan: chan::Rx<T, Semaphore>,
36}
37
38impl<T> fmt::Debug for Receiver<T> {
39    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
40        fmt.debug_struct("Receiver")
41            .field("chan", &self.chan)
42            .finish()
43    }
44}
45
/// Error returned by `Sender::poll_ready` and by the `Sink` implementation
/// for `Sender`.
///
/// The single private unit field means the type carries no data and cannot
/// be constructed outside this module.
#[derive(Debug)]
pub struct SendError(());
49
50#[derive(Debug)]
52pub struct TrySendError<T> {
53    kind: ErrorKind,
54    value: T,
55}
56
/// Reason a `try_send` call failed; stored inside `TrySendError`.
#[derive(Debug)]
enum ErrorKind {
    /// The channel is closed (described as "channel closed").
    Closed,
    /// The channel currently has no free capacity
    /// (described as "no available capacity").
    NoCapacity,
}
62
/// Error type of the `Stream` implementation for `Receiver`.
///
/// The single private unit field means the type carries no data and cannot
/// be constructed outside this module.
#[derive(Debug)]
pub struct RecvError(());
66
67pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
116    assert!(buffer > 0, "mpsc bounded channel requires buffer > 0");
117    let semaphore = (::semaphore::Semaphore::new(buffer), buffer);
118    let (tx, rx) = chan::channel(semaphore);
119
120    let tx = Sender::new(tx);
121    let rx = Receiver::new(rx);
122
123    (tx, rx)
124}
125
126type Semaphore = (::semaphore::Semaphore, usize);
129
130impl<T> Receiver<T> {
131    pub(crate) fn new(chan: chan::Rx<T, Semaphore>) -> Receiver<T> {
132        Receiver { chan }
133    }
134
135    pub fn close(&mut self) {
140        self.chan.close();
141    }
142}
143
144impl<T> Stream for Receiver<T> {
145    type Item = T;
146    type Error = RecvError;
147
148    fn poll(&mut self) -> Poll<Option<T>, Self::Error> {
149        self.chan.recv().map_err(|_| RecvError(()))
150    }
151}
152
153impl<T> Sender<T> {
154    pub(crate) fn new(chan: chan::Tx<T, Semaphore>) -> Sender<T> {
155        Sender { chan }
156    }
157
158    pub fn poll_ready(&mut self) -> Poll<(), SendError> {
179        self.chan.poll_ready().map_err(|_| SendError(()))
180    }
181
182    pub fn try_send(&mut self, message: T) -> Result<(), TrySendError<T>> {
185        self.chan.try_send(message)?;
186        Ok(())
187    }
188}
189
190impl<T> Sink for Sender<T> {
191    type SinkItem = T;
192    type SinkError = SendError;
193
194    fn start_send(&mut self, msg: T) -> StartSend<T, Self::SinkError> {
195        use futures::Async::*;
196        use futures::AsyncSink;
197
198        match self.poll_ready()? {
199            Ready(_) => {
200                self.try_send(msg).map_err(|_| SendError(()))?;
201                Ok(AsyncSink::Ready)
202            }
203            NotReady => Ok(AsyncSink::NotReady(msg)),
204        }
205    }
206
207    fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
208        use futures::Async::Ready;
209        Ok(Ready(()))
210    }
211
212    fn close(&mut self) -> Poll<(), Self::SinkError> {
213        use futures::Async::Ready;
214        Ok(Ready(()))
215    }
216}
217
218impl fmt::Display for SendError {
221    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
222        use std::error::Error;
223        write!(fmt, "{}", self.description())
224    }
225}
226
227impl ::std::error::Error for SendError {
228    fn description(&self) -> &str {
229        "channel closed"
230    }
231}
232
233impl<T> TrySendError<T> {
236    pub fn into_inner(self) -> T {
238        self.value
239    }
240
241    pub fn is_closed(&self) -> bool {
243        if let ErrorKind::Closed = self.kind {
244            true
245        } else {
246            false
247        }
248    }
249
250    pub fn is_full(&self) -> bool {
252        if let ErrorKind::NoCapacity = self.kind {
253            true
254        } else {
255            false
256        }
257    }
258}
259
260impl<T: fmt::Debug> fmt::Display for TrySendError<T> {
261    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
262        use std::error::Error;
263        write!(fmt, "{}", self.description())
264    }
265}
266
267impl<T: fmt::Debug> ::std::error::Error for TrySendError<T> {
268    fn description(&self) -> &str {
269        match self.kind {
270            ErrorKind::Closed => "channel closed",
271            ErrorKind::NoCapacity => "no available capacity",
272        }
273    }
274}
275
276impl<T> From<(T, chan::TrySendError)> for TrySendError<T> {
277    fn from((value, err): (T, chan::TrySendError)) -> TrySendError<T> {
278        TrySendError {
279            value,
280            kind: match err {
281                chan::TrySendError::Closed => ErrorKind::Closed,
282                chan::TrySendError::NoPermits => ErrorKind::NoCapacity,
283            },
284        }
285    }
286}
287
288impl fmt::Display for RecvError {
291    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
292        use std::error::Error;
293        write!(fmt, "{}", self.description())
294    }
295}
296
297impl ::std::error::Error for RecvError {
298    fn description(&self) -> &str {
299        "channel closed"
300    }
301}