use futures::task::AtomicWaker;
use std::{fmt::Debug, future::Future, sync::Arc, task::Poll};

use crate::queues::mpsc::{DequeueError, EnqueueError};

use super::{queue, Receiver, Sender};

/// This is the asynchronous Version of the [`Jiffy-Receiver`](Receiver)
pub struct AsyncReceiver<T> {
    /// The shared Waker to inform this receiver of any newly enqueued items
    waker: Arc<AtomicWaker>,
    /// The actual underlying Queue
    queue: Receiver<T>,
}

/// This is the asynchronous Version of the [`Jiffy-Sender`](Sender)
pub struct AsyncSender<T> {
    /// The shared Waker to wake up the Receiver if it is still waiting for
    /// a new Item to be enqueued
    waker: Arc<AtomicWaker>,
    /// The actual underlying Queue
    queue: Sender<T>,
}

impl<T> AsyncReceiver<T> {
    /// Checks if the current Queue has been closed by the Producer
    ///
    /// # Note
    /// This does not mean that the Queue is currently empty.
    /// It only indicates that no new Elements will be inserted into the
    /// Queue, but any number of Elements might still be left in the Queue.
    pub fn is_closed(&self) -> bool {
        self.queue.is_closed()
    }

    /// This attempts to dequeue the first Element in the Queue.
    ///
    /// This is the same as [`try_dequeue`](Receiver::try_dequeue) on the
    /// normal Jiffy-Queue
    pub fn try_dequeue(&mut self) -> Result<T, DequeueError> {
        // Simply attempt to dequeue the first Item
        self.queue.try_dequeue()
    }

    /// This is the asynchronous version of the blocking
    /// [`dequeue`](Receiver::dequeue) operation on the normal Jiffy-Queue
    ///
    /// # Behaviour
    /// The returned Future resolves either once an item is ready to be
    /// dequeued (`Ok`) or once the Queue encounters a "fatal" error, like
    /// when the other side closes the Queue (`Err`)
    ///
    /// # Example
    /// ```
    /// # use nolock::queues::mpsc::jiffy;
    ///
    /// async fn demo() {
    ///     let (mut rx, tx) = jiffy::async_queue::<usize>();
    ///
    ///     tx.enqueue(13).unwrap();
    ///
    ///     assert_eq!(Ok(13), rx.dequeue().await);
    /// }
    ///
    /// # fn main() {
    /// #     let rt = tokio::runtime::Builder::new_current_thread().build().unwrap();
    /// #
    /// #     rt.block_on(demo());
    /// # }
    /// ```
    pub fn dequeue<'queue>(&'queue mut self) -> DequeueFuture<'queue, T> {
        // Return the right DequeueFuture
        DequeueFuture {
            waker: &self.waker,
            queue: &mut self.queue,
        }
    }
}

impl<T> Debug for AsyncReceiver<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "Async-Receiver ()")
    }
}

/// This is the Future returned by the [`Dequeue`](AsyncReceiver::<T>::dequeue)
/// operation on the [`AsyncReceiver`]
///
/// # Behaviour
/// The Future resolves to `Ok(data)` once it has successfully dequeued an
/// Element from the Queue, or to `Err(DequeueError)` if it encountered a
/// "fatal" [`DequeueError`], like when the Queue has been closed.
pub struct DequeueFuture<'queue, T> {
    /// This is the Waker on which we will be notified when the Sender
    /// enqueues a new Item in the Queue
    waker: &'queue AtomicWaker,
    /// The actual underlying Queue from which we will dequeue the Item
    queue: &'queue mut Receiver<T>,
}

impl<'queue, T> Future for DequeueFuture<'queue, T> {
    type Output = Result<T, DequeueError>;

    fn poll(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Self::Output> {
        // Attempt to Dequeue an Item
        match self.queue.try_dequeue() {
            // If it worked, simply return Ready with the Data as the Result
            Ok(d) => Poll::Ready(Ok(d)),
            // A closed Queue is a "fatal" error, so resolve with it
            Err(DequeueError::Closed) => Poll::Ready(Err(DequeueError::Closed)),
            // If the Queue is empty, register the Waker and check again
            Err(DequeueError::WouldBlock) => {
                // Update the shared Waker with the right Waker for the current
                // Task
                self.waker.register(cx.waker());

                // Check the Queue again after registering: the Sender may have
                // enqueued an Item and called `wake` between the first attempt
                // and the registration, in which case that wake-up would
                // otherwise be lost
                match self.queue.try_dequeue() {
                    Ok(d) => Poll::Ready(Ok(d)),
                    // Indicate that we are still waiting for data
                    Err(DequeueError::WouldBlock) => Poll::Pending,
                    Err(DequeueError::Closed) => Poll::Ready(Err(DequeueError::Closed)),
                }
            }
        }
    }
}

impl<'queue, T> Debug for DequeueFuture<'queue, T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "Async-Dequeue-Operation ()")
    }
}

impl<T> AsyncSender<T> {
    /// Checks if the Queue has been closed by the Consumer
    pub fn is_closed(&self) -> bool {
        self.queue.is_closed()
    }

    /// Enqueues the given Data
    ///
    /// # Example:
    /// ```
    /// # use nolock::queues::mpsc::jiffy;
    ///
    /// async fn demo() {
    ///     let (mut rx, tx) = jiffy::async_queue::<usize>();
    ///
    ///     // Enqueue the given Data
    ///     assert_eq!(Ok(()), tx.enqueue(13));
    ///
    ///     # assert_eq!(Ok(13), rx.dequeue().await);
    /// }
    ///
    /// # fn main() {
    /// #     let rt = tokio::runtime::Builder::new_current_thread().build().unwrap();
    /// #
    /// #     rt.block_on(demo());
    /// # }
    /// ```
    pub fn enqueue(&self, data: T) -> Result<(), (T, EnqueueError)> {
        // Enqueue the Data on the underlying Queue itself
        self.queue.enqueue(data)?;

        // Notify the Receiver about new Data
        self.waker.wake();

        Ok(())
    }
}

impl<T> Debug for AsyncSender<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "Async-Sender ()")
    }
}

/// Creates an async Jiffy-Queue Pair of ([`AsyncReceiver`], [`AsyncSender`])
pub fn async_queue<T>() -> (AsyncReceiver<T>, AsyncSender<T>) {
    let (u_rx, u_tx) = queue();
    let waker = Arc::new(AtomicWaker::new());

    (
        AsyncReceiver {
            waker: waker.clone(),
            queue: u_rx,
        },
        AsyncSender { waker, queue: u_tx },
    )
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn enqueue_dequeue() {
        let (mut rx, tx) = async_queue();

        tx.enqueue(13).unwrap();

        assert_eq!(Ok(13), rx.dequeue().await);
    }
}