async_slot/
sync.rs

//! A thread-safe variant of an unbounded channel that only
//! stores the last value sent
3
4use std::mem;
5use std::sync::{Arc, Weak, Mutex};
6
7use futures::task::{self, Task};
8use futures::{Sink, Stream, AsyncSink, Async, Poll, StartSend};
9
10use SendError;
11
12/// Slot is very similar to unbounded channel but only stores last value sent
13///
/// I.e. if you want to send values from a producer to a consumer,
/// and a slow consumer should skip old values, the slot is
/// the right structure for the task.
17
18/// The transmission end of a channel which is used to send values
19///
20/// If the receiver is not fast enough only the last value is preserved and
21/// other ones are discarded.
22#[derive(Debug)]
23pub struct Sender<T> {
24    inner: Weak<Mutex<Inner<T>>>,
25}
26
27/// The receiving end of a channel which preserves only the last value
28#[derive(Debug)]
29pub struct Receiver<T> {
30    inner: Arc<Mutex<Inner<T>>>,
31}
32
33#[derive(Debug)]
34struct Inner<T> {
35    value: Option<T>,
36    read_task: Option<Task>,
37    cancel_task: Option<Task>,
38}
39
40trait AssertKindsSender: Send + Sync {}
41impl AssertKindsSender for Sender<u32> {}
42
43trait AssertKindsReceiver: Send + Sync {}
44impl AssertKindsReceiver for Receiver<u32> {}
45
46impl<T> Sender<T> {
47    /// Sets the new new value of the stream and notifies the consumer if any
48    ///
49    /// This function will store the `value` provided as the current value for
50    /// this channel, replacing any previous value that may have been there. If
51    /// the receiver may still be able to receive this message, then `Ok` is
52    /// returned with the previous value that was in this channel.
53    ///
54    /// If `Ok(Some)` is returned then this value overwrote a previous value,
55    /// and the value was never received by the receiver. If `Ok(None)` is
56    /// returned, then no previous value was found and the `value` is queued up
57    /// to be received by the receiver.
58    ///
59    /// # Errors
60    ///
61    /// This function will return an `Err` if the receiver has gone away and
62    /// it's impossible to send this value to the receiver. The error returned
63    /// retains ownership of the `value` provided and can be extracted, if
64    /// necessary.
65    pub fn swap(&self, value: T) -> Result<Option<T>, SendError<T>> {
66        let result;
67        // Do this step first so that the lock is dropped when
68        // `unpark` is called
69        let task = {
70            if let Some(ref lock) = self.inner.upgrade() {
71                let mut inner = lock.lock().unwrap();
72                result = inner.value.take();
73                inner.value = Some(value);
74                inner.read_task.take()
75            } else {
76                return Err(SendError(value));
77            }
78        };
79        if let Some(task) = task {
80            task.notify();
81        }
82        return Ok(result);
83    }
84    /// Polls this `Sender` half to detect whether the `Receiver` this has
85    /// paired with has gone away.
86    ///
87    /// This function can be used to learn about when the `Receiver` (consumer)
88    /// half has gone away and nothing will be able to receive a message sent
89    /// from `send` (or `swap`).
90    ///
91    /// If `Ready` is returned then it means that the `Receiver` has disappeared
92    /// and the result this `Sender` would otherwise produce should no longer
93    /// be produced.
94    ///
95    /// If `NotReady` is returned then the `Receiver` is still alive and may be
96    /// able to receive a message if sent. The current task, however, is
97    /// scheduled to receive a notification if the corresponding `Receiver` goes
98    /// away.
99    ///
100    /// # Panics
101    ///
102    /// Like `Future::poll`, this function will panic if it's not called from
103    /// within the context of a task. In other words, this should only ever be
104    /// called from inside another future.
105    ///
106    /// If you're calling this function from a context that does not have a
107    /// task, then you can use the `is_canceled` API instead.
108    pub fn poll_cancel(&mut self) -> Poll<(), ()> {
109        if let Some(ref lock) = self.inner.upgrade() {
110            let mut inner = lock.lock().unwrap();
111            inner.cancel_task = Some(task::current());
112            Ok(Async::NotReady)
113        } else {
114            Ok(Async::Ready(()))
115        }
116    }
117
118    /// Tests to see whether this `Sender`'s corresponding `Receiver`
119    /// has gone away.
120    ///
121    /// This function can be used to learn about when the `Receiver` (consumer)
122    /// half has gone away and nothing will be able to receive a message sent
123    /// from `send`.
124    ///
125    /// Note that this function is intended to *not* be used in the context of a
126    /// future. If you're implementing a future you probably want to call the
127    /// `poll_cancel` function which will block the current task if the
128    /// cancellation hasn't happened yet. This can be useful when working on a
129    /// non-futures related thread, though, which would otherwise panic if
130    /// `poll_cancel` were called.
131    pub fn is_canceled(&self) -> bool {
132        self.inner.upgrade().is_none()
133    }
134}
135
136impl<T> Sink for Sender<T> {
137    type SinkItem = T;
138    type SinkError = SendError<T>;
139    fn start_send(&mut self, item: T) -> StartSend<T, SendError<T>> {
140        self.swap(item)?;
141        Ok(AsyncSink::Ready)
142    }
143    fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
144        Ok(Async::Ready(()))
145    }
146    fn close(&mut self) -> Poll<(), Self::SinkError> {
147        // Do this step first so that the lock is dropped *and*
148        // weakref is dropped when `unpark` is called
149        let task = {
150            let weak = mem::replace(&mut self.inner, Weak::new());
151            if let Some(ref lock) = weak.upgrade() {
152                drop(weak);
153                let mut inner = lock.lock().unwrap();
154                inner.read_task.take()
155            } else {
156                None
157            }
158        };
159        // notify on any drop of a sender, so eventually receiver wakes up
160        // when there are no senders and closes the stream
161        if let Some(task) = task {
162            task.notify();
163        }
164        Ok(Async::Ready(()))
165    }
166}
167
168impl<T> Drop for Sender<T> {
169    fn drop(&mut self) {
170        self.close().ok();
171    }
172}
173
174impl<T> Stream for Receiver<T> {
175    type Item = T;
176    type Error = ();  // actually void
177    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
178        let result = {
179            let mut inner = self.inner.lock().unwrap();
180            if inner.value.is_none() {
181                if Arc::weak_count(&self.inner) == 0 {
182                    // no senders, terminate the stream
183                    return Ok(Async::Ready(None));
184                } else {
185                    inner.read_task = Some(task::current());
186                }
187            }
188            inner.value.take()
189        };
190        match result {
191            Some(value) => Ok(Async::Ready(Some(value))),
192            None => Ok(Async::NotReady),
193        }
194    }
195}
196
197impl<T> SendError<T> {
198    /// Returns the message that was attempted to be sent but failed.
199    pub fn into_inner(self) -> T {
200        self.0
201    }
202}
203
204/// Creates an in-memory Stream which only preserves last value
205///
206/// This method is somewhat similar to `channel(1)` but instead of preserving
207/// first value sent (and erroring on sender side) it replaces value if
208/// consumer is not fast enough and preserves last values sent on any
209/// poll of a stream.
210///
211/// # Example
212///
213/// ```
214/// extern crate futures;
215/// extern crate async_slot;
216///
217/// use futures::prelude::*;
218/// use futures::stream::iter_ok;
219///
220/// # fn main() {
221/// let (tx, rx) = async_slot::channel::<i32>();
222///
223/// tx.send_all(iter_ok(vec![1, 2, 3])).wait();
224///
225/// let received = rx.collect().wait().unwrap();
226/// assert_eq!(received, vec![3]);
227/// # }
228/// ```
229pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
230    let inner = Arc::new(Mutex::new(Inner {
231        value: None,
232        read_task: None,
233        cancel_task: None,
234    }));
235    return (Sender { inner: Arc::downgrade(&inner) },
236            Receiver { inner: inner });
237}