batch_queue/receiver.rs

use super::*;
use futures::Future;
use std::{
    pin::Pin,
    task::{Context, Poll},
};

/// Error type returned from [`try_recv`](struct.Receiver.html#method.try_recv)
/// and [`try_recv_batch`](struct.Receiver.html#method.try_recv_batch)
#[derive(Debug, Clone, PartialEq)]
pub enum TryRecvError {
    /// No batch is currently available, but the sender is still alive.
    Empty,
    /// No batch will ever become available because the sender has been dropped.
    Closed,
}

/// The receiving end of this batching queue
///
/// Since this is a single-consumer queue, this handle cannot be cloned or shared.
/// Dropping this handle will eventually lead to the sender signaling that this
/// queue has been closed. Items that were in flight will be dropped.
pub struct Receiver<T, const N: usize> {
    inner: Option<Arc<Inner<T, N>>>,
}

impl<T, const N: usize> Drop for Receiver<T, N> {
    fn drop(&mut self) {
        let inner = self.inner.take().unwrap();
        inner.reader.waker.take();
        let waker = inner.writer.waker.take();
        // other side will notice by checking Arc ref count
        drop(inner);
        if let Some(waker) = waker {
            // important: wake AFTER dropping the ref count
            waker.wake();
        }
    }
}

unsafe impl<T, const N: usize> Send for Receiver<T, N> {}

impl<T, const N: usize> Receiver<T, N> {
    pub(crate) fn new(inner: Arc<Inner<T, N>>) -> Self {
        Self { inner: Some(inner) }
    }

    fn inner(&self) -> &Inner<T, N> {
        self.inner.as_ref().unwrap()
    }

    fn strong_count(&self) -> usize {
        Arc::strong_count(self.inner.as_ref().unwrap())
    }

    /// Check if a batch is currently available and fill it into a fresh Vec
    ///
    /// If no batch is available it returns `TryRecvError::Empty`. If no batch will
    /// ever become available because the sender has been dropped it returns
    /// `TryRecvError::Closed`.
    ///
    /// If the next thing you’ll do is to iterate over the vector, prefer
    /// [`try_recv`](#method.try_recv) instead to save one allocation.
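    ///
    /// A minimal sketch of a non-blocking drain loop; the `receiver` binding and the
    /// `process` helper are assumed here and not part of this module:
    ///
    /// ```ignore
    /// loop {
    ///     match receiver.try_recv_batch() {
    ///         Ok(batch) => process(batch),
    ///         Err(TryRecvError::Empty) => break,   // nothing ready right now
    ///         Err(TryRecvError::Closed) => return, // sender is gone, no more batches
    ///     }
    /// }
    /// ```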
    pub fn try_recv_batch(&mut self) -> Result<Vec<T>, TryRecvError> {
        match self.inner().do_recv() {
            Some(read_pos) => {
                let mut v = Vec::new();
                v.extend(BucketIter::new(self.inner(), read_pos));
                Ok(v)
            }
            None => {
                if self.strong_count() == 1 {
                    Err(TryRecvError::Closed)
                } else {
                    Err(TryRecvError::Empty)
                }
            }
        }
    }

    /// Check if a batch is currently available and return an iterator of its items
    ///
    /// If no batch is available it returns `TryRecvError::Empty`. If no batch will
    /// ever become available because the sender has been dropped it returns
    /// `TryRecvError::Closed`.
    ///
    /// See [`recv`](#method.recv) for more information on the returned iterator.
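    ///
    /// A minimal sketch that consumes one batch in place without allocating a `Vec`;
    /// the `receiver` binding and the `handle` helper are assumed here:
    ///
    /// ```ignore
    /// match receiver.try_recv() {
    ///     Ok(iter) => {
    ///         for item in iter {
    ///             handle(item);
    ///         }
    ///     }
    ///     Err(TryRecvError::Empty) => { /* nothing ready right now */ }
    ///     Err(TryRecvError::Closed) => { /* sender is gone, no more batches */ }
    /// }
    /// ```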
    pub fn try_recv(&mut self) -> Result<BucketIter<'_, T, N>, TryRecvError> {
        match self.inner().do_recv() {
            Some(read_pos) => Ok(BucketIter::new(self.inner(), read_pos)),
            None => {
                if self.strong_count() == 1 {
                    Err(TryRecvError::Closed)
                } else {
                    Err(TryRecvError::Empty)
                }
            }
        }
    }

    /// A Future that will wait for the next batch and return an iterator of its items
    ///
    /// The iterator should be consumed quickly since it borrows the queue bucket that
    /// holds the items, meaning that the queue space is not handed back to the sender
    /// until the iterator is dropped.
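    ///
    /// A minimal sketch of an async consumer loop; the `receiver` binding, the async
    /// context, and the `handle` helper are assumed here. The iterator is dropped at
    /// the end of each loop iteration, handing the bucket back to the sender:
    ///
    /// ```ignore
    /// while let Ok(iter) = receiver.recv().await {
    ///     for item in iter {
    ///         handle(item);
    ///     }
    /// }
    /// // recv() returned Err(Closed): the sender was dropped, no more batches will arrive.
    /// ```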
    pub fn recv(&mut self) -> ReceiveFuture<'_, T, N> {
        ReceiveFuture {
            inner: self.inner.as_ref().unwrap(),
        }
    }

    /// Wait for the next batch and fill it into a fresh Vec
    ///
    /// If the next thing you’ll do is to iterate over the vector, prefer
    /// [`recv`](#method.recv) instead to save one allocation.
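    ///
    /// A minimal sketch; the `receiver` binding and the async context are assumed here:
    ///
    /// ```ignore
    /// while let Ok(batch) = receiver.recv_batch().await {
    ///     println!("received a batch of {} items", batch.len());
    /// }
    /// ```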
    pub async fn recv_batch(&mut self) -> Result<Vec<T>, Closed> {
        Ok(self.recv().await?.collect())
    }
}

/// The Future returned from [`recv`](struct.Receiver.html#method.recv)
///
/// It will resolve once a batch becomes available or the queue is closed (by dropping the sender).
pub struct ReceiveFuture<'a, T, const N: usize> {
    inner: &'a Arc<Inner<T, N>>,
}

unsafe impl<'a, T, const N: usize> Send for ReceiveFuture<'a, T, N> {}

impl<'a, T, const N: usize> Future for ReceiveFuture<'a, T, N> {
    type Output = Result<BucketIter<'a, T, N>, Closed>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match self.inner.do_recv() {
            Some(v) => Poll::Ready(Ok(BucketIter::new(self.inner, v))),
            None => {
                // register the waker before re-checking, so a batch published between
                // the first check and the registration cannot be missed
                self.inner.reader.waker.register(cx.waker());
                if Arc::strong_count(self.inner) == 1 {
                    Poll::Ready(Err(Closed))
                } else {
                    match self.inner.do_recv() {
                        Some(v) => {
                            // no wakeup needed anymore
                            self.inner.reader.waker.take();
                            Poll::Ready(Ok(BucketIter::new(self.inner, v)))
                        }
                        None => Poll::Pending,
                    }
                }
            }
        }
    }
}