1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
use futures::task::AtomicWaker;
use std::{fmt::Debug, future::Future, sync::Arc, task::Poll};

use crate::queues::mpsc::{DequeueError, EnqueueError};

use super::{queue, Receiver, Sender};

/// This is the asynchronous version of the [`Jiffy-Receiver`](Receiver).
///
/// It wraps the plain [`Receiver`] and pairs it with an [`AtomicWaker`] that
/// is shared with the matching [`AsyncSender`], so that a pending
/// [`dequeue`](AsyncReceiver::dequeue) can be woken up once a new item has
/// been enqueued.
pub struct AsyncReceiver<T> {
    /// The waker shared with the [`AsyncSender`]; the receiving task registers
    /// itself here to be informed about newly enqueued items
    waker: Arc<AtomicWaker>,
    /// The actual underlying queue from which the items are dequeued
    queue: Receiver<T>,
}

/// This is the asynchronous version of the [`Jiffy-Sender`](Sender).
///
/// It wraps the plain [`Sender`] and pairs it with an [`AtomicWaker`] that is
/// shared with the matching [`AsyncReceiver`].
pub struct AsyncSender<T> {
    /// The shared waker used to wake up the receiver if it is still waiting
    /// for a new item to be enqueued
    waker: Arc<AtomicWaker>,
    /// The actual underlying queue into which the items are enqueued
    queue: Sender<T>,
}

impl<T> AsyncReceiver<T> {
    /// Checks if the current Queue has been closed by the Producer
    ///
    /// # Note
    /// This does not mean, that there are no more Elements in the Queue
    /// currently.
    /// It only indicates that there will be no new Elements inserted into the
    /// Queue, but there might still be a any number of Elements currently
    /// still left in the Queue.
    pub fn is_closed(&self) -> bool {
        self.queue.is_closed()
    }

    /// This attempts to dequeue the first Element in the Queue.
    ///
    /// This is the same as [`try_dequeue`](Receiver::try_dequeue) on the
    /// normal Jiffy-Queue
    pub fn try_dequeue(&mut self) -> Result<T, DequeueError> {
        // Simply attempt to dequeue the first Item
        self.queue.try_dequeue()
    }

    /// This is the asynchronous version of the blocking
    /// [`dequeue`](Receiver::dequeue) operation on the normal Jiffy-Queue
    ///
    /// # Behaviour
    /// The Future returned, will either resolve once an item is ready to be
    /// dequeued (`Ok`) or the Queue encountered a "fatal" error, like when the other
    /// side closes the Queue (`Err`)
    ///
    /// # Example
    /// ```
    /// # use nolock::queues::mpsc::jiffy;
    ///
    /// async fn demo() {
    ///   let (mut rx, tx) = jiffy::async_queue::<usize>();
    ///
    ///   tx.enqueue(13).unwrap();
    ///
    ///   assert_eq!(Ok(13), rx.dequeue().await);
    /// }
    ///
    /// # fn main() {
    /// #   let rt = tokio::runtime::Builder::new_current_thread().build().unwrap();
    /// #
    /// #   rt.block_on(demo());
    /// # }
    /// ```
    pub fn dequeue<'queue>(&'queue mut self) -> DequeueFuture<'queue, T> {
        // Return the right DequeueFuture
        DequeueFuture {
            waker: &self.waker,
            queue: &mut self.queue,
        }
    }
}

impl<T> Debug for AsyncReceiver<T> {
    /// Writes a fixed placeholder, as the content of the queue is not
    /// inspected for the debug output
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("Async-Receiver ()")
    }
}

/// This is the Future returned by the [`Dequeue`](AsyncReceiver::<T>::dequeue)
/// operation on the [`AsyncReceiver`]
///
/// # Behaviour
/// The Future only resolves when it managed to successfully dequeue an Element
/// from the Queue, which will then return `Ok(data)`, or if it encountered a
/// "fatal" [`DequeueError`], like when the Queue has been closed, which will
/// then resolve to `Err(DequeueError)`.
///
/// The Future mutably borrows the underlying [`Receiver`] for the lifetime
/// `'queue`.
pub struct DequeueFuture<'queue, T> {
    /// This is the Waker on which we will be notified in case the Sender
    /// enqueues a new Item in the Queue
    waker: &'queue AtomicWaker,
    /// The actual underlying Queue from which we will dequeue the Item
    queue: &'queue mut Receiver<T>,
}

impl<'queue, T> Future for DequeueFuture<'queue, T> {
    type Output = Result<T, DequeueError>;

    /// Polls the underlying queue for the next item.
    ///
    /// # Returns
    /// * `Poll::Ready(Ok(item))` if an item could be dequeued
    /// * `Poll::Ready(Err(DequeueError::Closed))` if the queue reported that
    ///   it has been closed
    /// * `Poll::Pending` if the queue is currently empty; the waker of the
    ///   current task has then been registered on the shared waker
    ///
    /// # Lost wakeups
    /// The queue is checked a second time *after* the waker has been
    /// registered. Without that second check, an item that is enqueued (and
    /// whose `wake` call happens) between the failed dequeue attempt and the
    /// registration would notify nobody, and the task could stay pending
    /// although the queue is not empty.
    fn poll(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Self::Output> {
        // Fast path: if the queue can already give a definitive answer there
        // is no need to touch the shared waker at all
        match self.queue.try_dequeue() {
            Ok(d) => return Poll::Ready(Ok(d)),
            Err(DequeueError::Closed) => return Poll::Ready(Err(DequeueError::Closed)),
            Err(DequeueError::WouldBlock) => {}
        }

        // Update the shared waker with the waker of the current task, so that
        // every enqueue from now on wakes this task up
        self.waker.register(cx.waker());

        // Check the queue again, because an item might have been enqueued
        // after the first attempt but before the registration completed; that
        // wakeup would otherwise be lost
        match self.queue.try_dequeue() {
            Ok(d) => Poll::Ready(Ok(d)),
            // Still nothing to dequeue; the registered waker informs us
            // about the next enqueued item
            Err(DequeueError::WouldBlock) => Poll::Pending,
            Err(DequeueError::Closed) => Poll::Ready(Err(DequeueError::Closed)),
        }
    }
}

impl<T> Debug for DequeueFuture<'_, T> {
    /// Writes a fixed placeholder, as neither the waker nor the queue are
    /// inspected for the debug output
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("Async-Dequeue-Operation ()")
    }
}

impl<T> AsyncSender<T> {
    /// Reports whether the consumer side has closed the queue.
    pub fn is_closed(&self) -> bool {
        let inner = &self.queue;
        inner.is_closed()
    }

    /// Puts the given data into the queue and wakes up the receiver.
    ///
    /// If the underlying queue rejects the data, its error is handed back
    /// unchanged and the receiver is not woken up.
    ///
    /// # Example:
    /// ```
    /// # use nolock::queues::mpsc::jiffy;
    ///
    /// async fn demo() {
    ///   let (mut rx, tx) = jiffy::async_queue::<usize>();
    ///
    ///   // Enqueue the given Data
    ///   assert_eq!(Ok(()), tx.enqueue(13));
    ///
    ///   # assert_eq!(Ok(13), rx.dequeue().await);
    /// }
    ///
    /// # fn main() {
    /// #   let rt = tokio::runtime::Builder::new_current_thread().build().unwrap();
    /// #
    /// #   rt.block_on(demo());
    /// # }
    /// ```
    pub fn enqueue(&self, data: T) -> Result<(), (T, EnqueueError)> {
        match self.queue.enqueue(data) {
            Ok(()) => {
                // The item is in the queue now, so tell the receiver about it
                self.waker.wake();
                Ok(())
            }
            // Nothing was enqueued, so there is nothing to wake up for
            Err(rejected) => Err(rejected),
        }
    }
}

impl<T> Debug for AsyncSender<T> {
    /// Writes a fixed placeholder, as the content of the queue is not
    /// inspected for the debug output
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("Async-Sender ()")
    }
}

/// Creates an async Jiffy-Queue pair of ([`AsyncReceiver`], [`AsyncSender`])
///
/// Both halves wrap the respective half of a normal Jiffy-Queue and hold a
/// handle to the same waker, which the sender uses to notify the receiver
/// about newly enqueued items.
pub fn async_queue<T>() -> (AsyncReceiver<T>, AsyncSender<T>) {
    let (inner_rx, inner_tx) = queue();
    let shared_waker = Arc::new(AtomicWaker::new());

    let receiver = AsyncReceiver {
        waker: Arc::clone(&shared_waker),
        queue: inner_rx,
    };
    let sender = AsyncSender {
        waker: shared_waker,
        queue: inner_tx,
    };

    (receiver, sender)
}

#[cfg(test)]
mod tests {
    use super::*;

    /// An item that was enqueued before the receiver starts waiting is
    /// handed out by the dequeue future
    #[tokio::test]
    async fn enqueue_dequeue() {
        let (mut receiver, sender) = async_queue();

        sender.enqueue(13).unwrap();

        let received = receiver.dequeue().await;
        assert_eq!(Ok(13), received);
    }
}