1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
use std::{fmt::Debug, future::Future, sync::Arc, task::Poll};

use futures::task::AtomicWaker;

use crate::queues::spsc::{DequeueError, EnqueueError};

use super::{queue, UnboundedReceiver, UnboundedSender};

/// This is the async Variant of the [`UnboundedSender`].
///
/// Created using [`async_queue`]
pub struct AsyncUnboundedSender<T> {
    rx_waker: Arc<AtomicWaker>,
    queue: UnboundedSender<T>,
}

/// This is the async Variant of the [`UnboundedReceiver`].
///
/// Created using [`async_queue`]
pub struct AsyncUnboundedReceiver<T> {
    rx_waker: Arc<AtomicWaker>,
    queue: UnboundedReceiver<T>,
}

/// The Future returned by the
/// [`dequeue`](AsyncUnboundedReceiver::dequeue)-Operation
///
/// # Behaviour
/// This Future only resolves when it either successfully dequeued an Element
/// and then returns it as `Ok(Element)` or when the Queue has been closed by
/// the Producer and the Queue is currently empty, therefore no more Elements
/// will be added to the Queue meaning that we would wait forever, instead it
/// returns `Err(DequeueError)`
pub struct DequeueFuture<'queue, T> {
    rx_waker: &'queue AtomicWaker,
    queue: &'queue mut UnboundedReceiver<T>,
}

impl<T> AsyncUnboundedSender<T> {
    /// Checks if the Queue has been closed by the Consumer
    pub fn is_closed(&self) -> bool {
        self.queue.is_closed()
    }

    /// Enqueues the given Data on the Queue
    pub fn enqueue(&mut self, data: T) -> Result<(), (T, EnqueueError)> {
        self.queue.enqueue(data)?;
        self.rx_waker.wake();
        Ok(())
    }
}

impl<T> Debug for AsyncUnboundedSender<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "Async-Unbounded-Sender ()")
    }
}

impl<T> AsyncUnboundedReceiver<T> {
    /// Checks if the Queue has been closed by the Producer
    pub fn is_closed(&self) -> bool {
        self.queue.is_closed()
    }

    /// Dequeues the next Item from the Queue
    pub fn dequeue<'queue>(&'queue mut self) -> DequeueFuture<'queue, T> {
        DequeueFuture {
            rx_waker: &self.rx_waker,
            queue: &mut self.queue,
        }
    }

    /// This attempts to dequeue the next Item from the Queue.
    ///
    /// This behaves just like the normal
    /// [`try_dequeue`](UnboundedReceiver::try_dequeue)-Operation
    pub fn try_dequeue(&mut self) -> Result<T, DequeueError> {
        self.queue.try_dequeue()
    }
}

impl<T> Debug for AsyncUnboundedReceiver<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "Async-Unbounded-Receiver ()")
    }
}

impl<'queue, T> Future for DequeueFuture<'queue, T> {
    type Output = Result<T, DequeueError>;

    fn poll(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Self::Output> {
        match self.queue.try_dequeue() {
            Ok(d) => Poll::Ready(Ok(d)),
            Err(e) => match e {
                DequeueError::WouldBlock => {
                    self.rx_waker.register(cx.waker());
                    Poll::Pending
                }
                DequeueError::Closed => Poll::Ready(Err(DequeueError::Closed)),
            },
        }
    }
}

impl<'queue, T> Debug for DequeueFuture<'queue, T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "Dequeue-Future ()")
    }
}

/// Creates a new async SPSC-Queue and returns its respective
/// ([`AsyncUnboundedReceiver`], [`AsyncUnboundedSender`])
pub fn async_queue<T>() -> (AsyncUnboundedReceiver<T>, AsyncUnboundedSender<T>) {
    let (u_rx, u_tx) = queue();

    let rx_waker = Arc::new(AtomicWaker::new());

    (
        AsyncUnboundedReceiver {
            rx_waker: rx_waker.clone(),
            queue: u_rx,
        },
        AsyncUnboundedSender {
            rx_waker,
            queue: u_tx,
        },
    )
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn enqueue_dequeue() {
        let (mut rx, mut tx) = async_queue();

        tx.enqueue(13).unwrap();
        assert_eq!(Ok(13), rx.dequeue().await);
    }
}