async_slot/sync.rs
1//! A thread-safe variant of an unbounded channel that only
2//! stores last value sent
3
4use std::mem;
5use std::sync::{Arc, Weak, Mutex};
6
7use futures::task::{self, Task};
8use futures::{Sink, Stream, AsyncSink, Async, Poll, StartSend};
9
10use SendError;
11
12/// Slot is very similar to unbounded channel but only stores last value sent
13///
14/// I.e. if you want to send some value between from producer to a consumer
15/// and if consumer is slow it should skip old values, the slot is
16/// a structure for the task.
17
18/// The transmission end of a channel which is used to send values
19///
20/// If the receiver is not fast enough only the last value is preserved and
21/// other ones are discarded.
22#[derive(Debug)]
23pub struct Sender<T> {
24 inner: Weak<Mutex<Inner<T>>>,
25}
26
27/// The receiving end of a channel which preserves only the last value
28#[derive(Debug)]
29pub struct Receiver<T> {
30 inner: Arc<Mutex<Inner<T>>>,
31}
32
33#[derive(Debug)]
34struct Inner<T> {
35 value: Option<T>,
36 read_task: Option<Task>,
37 cancel_task: Option<Task>,
38}
39
40trait AssertKindsSender: Send + Sync {}
41impl AssertKindsSender for Sender<u32> {}
42
43trait AssertKindsReceiver: Send + Sync {}
44impl AssertKindsReceiver for Receiver<u32> {}
45
46impl<T> Sender<T> {
47 /// Sets the new new value of the stream and notifies the consumer if any
48 ///
49 /// This function will store the `value` provided as the current value for
50 /// this channel, replacing any previous value that may have been there. If
51 /// the receiver may still be able to receive this message, then `Ok` is
52 /// returned with the previous value that was in this channel.
53 ///
54 /// If `Ok(Some)` is returned then this value overwrote a previous value,
55 /// and the value was never received by the receiver. If `Ok(None)` is
56 /// returned, then no previous value was found and the `value` is queued up
57 /// to be received by the receiver.
58 ///
59 /// # Errors
60 ///
61 /// This function will return an `Err` if the receiver has gone away and
62 /// it's impossible to send this value to the receiver. The error returned
63 /// retains ownership of the `value` provided and can be extracted, if
64 /// necessary.
65 pub fn swap(&self, value: T) -> Result<Option<T>, SendError<T>> {
66 let result;
67 // Do this step first so that the lock is dropped when
68 // `unpark` is called
69 let task = {
70 if let Some(ref lock) = self.inner.upgrade() {
71 let mut inner = lock.lock().unwrap();
72 result = inner.value.take();
73 inner.value = Some(value);
74 inner.read_task.take()
75 } else {
76 return Err(SendError(value));
77 }
78 };
79 if let Some(task) = task {
80 task.notify();
81 }
82 return Ok(result);
83 }
84 /// Polls this `Sender` half to detect whether the `Receiver` this has
85 /// paired with has gone away.
86 ///
87 /// This function can be used to learn about when the `Receiver` (consumer)
88 /// half has gone away and nothing will be able to receive a message sent
89 /// from `send` (or `swap`).
90 ///
91 /// If `Ready` is returned then it means that the `Receiver` has disappeared
92 /// and the result this `Sender` would otherwise produce should no longer
93 /// be produced.
94 ///
95 /// If `NotReady` is returned then the `Receiver` is still alive and may be
96 /// able to receive a message if sent. The current task, however, is
97 /// scheduled to receive a notification if the corresponding `Receiver` goes
98 /// away.
99 ///
100 /// # Panics
101 ///
102 /// Like `Future::poll`, this function will panic if it's not called from
103 /// within the context of a task. In other words, this should only ever be
104 /// called from inside another future.
105 ///
106 /// If you're calling this function from a context that does not have a
107 /// task, then you can use the `is_canceled` API instead.
108 pub fn poll_cancel(&mut self) -> Poll<(), ()> {
109 if let Some(ref lock) = self.inner.upgrade() {
110 let mut inner = lock.lock().unwrap();
111 inner.cancel_task = Some(task::current());
112 Ok(Async::NotReady)
113 } else {
114 Ok(Async::Ready(()))
115 }
116 }
117
118 /// Tests to see whether this `Sender`'s corresponding `Receiver`
119 /// has gone away.
120 ///
121 /// This function can be used to learn about when the `Receiver` (consumer)
122 /// half has gone away and nothing will be able to receive a message sent
123 /// from `send`.
124 ///
125 /// Note that this function is intended to *not* be used in the context of a
126 /// future. If you're implementing a future you probably want to call the
127 /// `poll_cancel` function which will block the current task if the
128 /// cancellation hasn't happened yet. This can be useful when working on a
129 /// non-futures related thread, though, which would otherwise panic if
130 /// `poll_cancel` were called.
131 pub fn is_canceled(&self) -> bool {
132 self.inner.upgrade().is_none()
133 }
134}
135
136impl<T> Sink for Sender<T> {
137 type SinkItem = T;
138 type SinkError = SendError<T>;
139 fn start_send(&mut self, item: T) -> StartSend<T, SendError<T>> {
140 self.swap(item)?;
141 Ok(AsyncSink::Ready)
142 }
143 fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
144 Ok(Async::Ready(()))
145 }
146 fn close(&mut self) -> Poll<(), Self::SinkError> {
147 // Do this step first so that the lock is dropped *and*
148 // weakref is dropped when `unpark` is called
149 let task = {
150 let weak = mem::replace(&mut self.inner, Weak::new());
151 if let Some(ref lock) = weak.upgrade() {
152 drop(weak);
153 let mut inner = lock.lock().unwrap();
154 inner.read_task.take()
155 } else {
156 None
157 }
158 };
159 // notify on any drop of a sender, so eventually receiver wakes up
160 // when there are no senders and closes the stream
161 if let Some(task) = task {
162 task.notify();
163 }
164 Ok(Async::Ready(()))
165 }
166}
167
168impl<T> Drop for Sender<T> {
169 fn drop(&mut self) {
170 self.close().ok();
171 }
172}
173
174impl<T> Stream for Receiver<T> {
175 type Item = T;
176 type Error = (); // actually void
177 fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
178 let result = {
179 let mut inner = self.inner.lock().unwrap();
180 if inner.value.is_none() {
181 if Arc::weak_count(&self.inner) == 0 {
182 // no senders, terminate the stream
183 return Ok(Async::Ready(None));
184 } else {
185 inner.read_task = Some(task::current());
186 }
187 }
188 inner.value.take()
189 };
190 match result {
191 Some(value) => Ok(Async::Ready(Some(value))),
192 None => Ok(Async::NotReady),
193 }
194 }
195}
196
197impl<T> SendError<T> {
198 /// Returns the message that was attempted to be sent but failed.
199 pub fn into_inner(self) -> T {
200 self.0
201 }
202}
203
204/// Creates an in-memory Stream which only preserves last value
205///
206/// This method is somewhat similar to `channel(1)` but instead of preserving
207/// first value sent (and erroring on sender side) it replaces value if
208/// consumer is not fast enough and preserves last values sent on any
209/// poll of a stream.
210///
211/// # Example
212///
213/// ```
214/// extern crate futures;
215/// extern crate async_slot;
216///
217/// use futures::prelude::*;
218/// use futures::stream::iter_ok;
219///
220/// # fn main() {
221/// let (tx, rx) = async_slot::channel::<i32>();
222///
223/// tx.send_all(iter_ok(vec![1, 2, 3])).wait();
224///
225/// let received = rx.collect().wait().unwrap();
226/// assert_eq!(received, vec![3]);
227/// # }
228/// ```
229pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
230 let inner = Arc::new(Mutex::new(Inner {
231 value: None,
232 read_task: None,
233 cancel_task: None,
234 }));
235 return (Sender { inner: Arc::downgrade(&inner) },
236 Receiver { inner: inner });
237}