channel-async 0.3.0-deprecated

Async/Stream Extensions for crossbeam-channel
use futures::Stream;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use tokio::timer::Delay;

pub struct Receiver<T> {
    inner: crossbeam_channel::Receiver<T>,
    delay: Duration,
    pending: Option<Delay>,
}

impl<T> Receiver<T> {
    pub fn new(r: crossbeam_channel::Receiver<T>, delay: Duration) -> Receiver<T> {
        Receiver {
            inner: r,
            delay: delay,
            pending: None,
        }
    }

    pub fn into_inner(self) -> crossbeam_channel::Receiver<T> {
        self.inner
    }
}

impl<T: Send + 'static> Stream for Receiver<T> {
    type Item = T;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let Self {
            inner,
            delay,
            pending,
        } = unsafe { self.get_unchecked_mut() };
        loop {
            match pending {
                None => match inner.try_recv() {
                    Err(crossbeam_channel::TryRecvError::Disconnected) => return Poll::Ready(None),
                    Err(crossbeam_channel::TryRecvError::Empty) => {
                        *pending = Some(tokio::timer::delay_for(*delay));
                    }
                    Ok(v) => return Poll::Ready(Some(v)),
                },
                Some(pending_value) => {
                    let pin_pending = unsafe { Pin::new_unchecked(pending_value) };
                    futures::ready!(pin_pending.poll(cx));
                    *pending = None;
                }
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    fn assert_send<T: Send>() {}
    fn assert_sync<T: Sync>() {}

    #[test]
    fn assert_contracts() {
        let (_, _r) = crate::unbounded::<i32>(Duration::from_millis(100));

        assert_send::<Receiver<i32>>();
        assert_sync::<Receiver<i32>>();
    }
}