1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
//! A thread-safe variant of an unbounded channel that only
//! stores last value sent

use std::mem;
use std::sync::{Arc, Weak, Mutex};

use futures::task::{self, Task};
use futures::{Sink, Stream, AsyncSink, Async, Poll, StartSend};

use SendError;

/// Slot is very similar to unbounded channel but only stores last value sent
///
/// I.e. if you want to send some value between from producer to a consumer
/// and if consumer is slow it should skip old values, the slot is
/// a structure for the task.

/// The transmission end of a channel which is used to send values
///
/// If the receiver is not fast enough only the last value is preserved and
/// other ones are discarded.
#[derive(Debug)]
pub struct Sender<T> {
    inner: Weak<Mutex<Inner<T>>>,
}

/// The receiving end of a channel which preserves only the last value
#[derive(Debug)]
pub struct Receiver<T> {
    inner: Arc<Mutex<Inner<T>>>,
}

#[derive(Debug)]
struct Inner<T> {
    value: Option<T>,
    read_task: Option<Task>,
    cancel_task: Option<Task>,
}

trait AssertKindsSender: Send + Sync {}
impl AssertKindsSender for Sender<u32> {}

trait AssertKindsReceiver: Send + Sync {}
impl AssertKindsReceiver for Receiver<u32> {}

impl<T> Sender<T> {
    /// Sets the new new value of the stream and notifies the consumer if any
    ///
    /// This function will store the `value` provided as the current value for
    /// this channel, replacing any previous value that may have been there. If
    /// the receiver may still be able to receive this message, then `Ok` is
    /// returned with the previous value that was in this channel.
    ///
    /// If `Ok(Some)` is returned then this value overwrote a previous value,
    /// and the value was never received by the receiver. If `Ok(None)` is
    /// returned, then no previous value was found and the `value` is queued up
    /// to be received by the receiver.
    ///
    /// # Errors
    ///
    /// This function will return an `Err` if the receiver has gone away and
    /// it's impossible to send this value to the receiver. The error returned
    /// retains ownership of the `value` provided and can be extracted, if
    /// necessary.
    pub fn swap(&self, value: T) -> Result<Option<T>, SendError<T>> {
        let result;
        // Do this step first so that the lock is dropped when
        // `unpark` is called
        let task = {
            if let Some(ref lock) = self.inner.upgrade() {
                let mut inner = lock.lock().unwrap();
                result = inner.value.take();
                inner.value = Some(value);
                inner.read_task.take()
            } else {
                return Err(SendError(value));
            }
        };
        if let Some(task) = task {
            task.notify();
        }
        return Ok(result);
    }
    /// Polls this `Sender` half to detect whether the `Receiver` this has
    /// paired with has gone away.
    ///
    /// This function can be used to learn about when the `Receiver` (consumer)
    /// half has gone away and nothing will be able to receive a message sent
    /// from `send` (or `swap`).
    ///
    /// If `Ready` is returned then it means that the `Receiver` has disappeared
    /// and the result this `Sender` would otherwise produce should no longer
    /// be produced.
    ///
    /// If `NotReady` is returned then the `Receiver` is still alive and may be
    /// able to receive a message if sent. The current task, however, is
    /// scheduled to receive a notification if the corresponding `Receiver` goes
    /// away.
    ///
    /// # Panics
    ///
    /// Like `Future::poll`, this function will panic if it's not called from
    /// within the context of a task. In other words, this should only ever be
    /// called from inside another future.
    ///
    /// If you're calling this function from a context that does not have a
    /// task, then you can use the `is_canceled` API instead.
    pub fn poll_cancel(&mut self) -> Poll<(), ()> {
        if let Some(ref lock) = self.inner.upgrade() {
            let mut inner = lock.lock().unwrap();
            inner.cancel_task = Some(task::current());
            Ok(Async::NotReady)
        } else {
            Ok(Async::Ready(()))
        }
    }

    /// Tests to see whether this `Sender`'s corresponding `Receiver`
    /// has gone away.
    ///
    /// This function can be used to learn about when the `Receiver` (consumer)
    /// half has gone away and nothing will be able to receive a message sent
    /// from `send`.
    ///
    /// Note that this function is intended to *not* be used in the context of a
    /// future. If you're implementing a future you probably want to call the
    /// `poll_cancel` function which will block the current task if the
    /// cancellation hasn't happened yet. This can be useful when working on a
    /// non-futures related thread, though, which would otherwise panic if
    /// `poll_cancel` were called.
    pub fn is_canceled(&self) -> bool {
        self.inner.upgrade().is_none()
    }
}

impl<T> Sink for Sender<T> {
    type SinkItem = T;
    type SinkError = SendError<T>;
    fn start_send(&mut self, item: T) -> StartSend<T, SendError<T>> {
        self.swap(item)?;
        Ok(AsyncSink::Ready)
    }
    fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
        Ok(Async::Ready(()))
    }
    fn close(&mut self) -> Poll<(), Self::SinkError> {
        // Do this step first so that the lock is dropped *and*
        // weakref is dropped when `unpark` is called
        let task = {
            let weak = mem::replace(&mut self.inner, Weak::new());
            if let Some(ref lock) = weak.upgrade() {
                drop(weak);
                let mut inner = lock.lock().unwrap();
                inner.read_task.take()
            } else {
                None
            }
        };
        // notify on any drop of a sender, so eventually receiver wakes up
        // when there are no senders and closes the stream
        if let Some(task) = task {
            task.notify();
        }
        Ok(Async::Ready(()))
    }
}

impl<T> Drop for Sender<T> {
    fn drop(&mut self) {
        self.close().ok();
    }
}

impl<T> Stream for Receiver<T> {
    type Item = T;
    type Error = ();  // actually void
    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        let result = {
            let mut inner = self.inner.lock().unwrap();
            if inner.value.is_none() {
                if Arc::weak_count(&self.inner) == 0 {
                    // no senders, terminate the stream
                    return Ok(Async::Ready(None));
                } else {
                    inner.read_task = Some(task::current());
                }
            }
            inner.value.take()
        };
        match result {
            Some(value) => Ok(Async::Ready(Some(value))),
            None => Ok(Async::NotReady),
        }
    }
}

impl<T> SendError<T> {
    /// Returns the message that was attempted to be sent but failed.
    pub fn into_inner(self) -> T {
        self.0
    }
}

/// Creates an in-memory Stream which only preserves last value
///
/// This method is somewhat similar to `channel(1)` but instead of preserving
/// first value sent (and erroring on sender side) it replaces value if
/// consumer is not fast enough and preserves last values sent on any
/// poll of a stream.
///
/// # Example
///
/// ```
/// extern crate futures;
/// extern crate async_slot;
///
/// use futures::prelude::*;
/// use futures::stream::iter_ok;
///
/// # fn main() {
/// let (tx, rx) = async_slot::channel::<i32>();
///
/// tx.send_all(iter_ok(vec![1, 2, 3])).wait();
///
/// let received = rx.collect().wait().unwrap();
/// assert_eq!(received, vec![3]);
/// # }
/// ```
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
    let inner = Arc::new(Mutex::new(Inner {
        value: None,
        read_task: None,
        cancel_task: None,
    }));
    return (Sender { inner: Arc::downgrade(&inner) },
            Receiver { inner: inner });
}