batch_queue/receiver.rs
1use super::*;
2use futures::Future;
3use std::{
4 pin::Pin,
5 task::{Context, Poll},
6};
7
/// Error type returned from [`try_recv`](struct.Receiver.html#method.try_recv)
/// and [`try_recv_batch`](struct.Receiver.html#method.try_recv_batch)
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TryRecvError {
    /// No batch is available right now, but the sender still exists so one may
    /// become available later.
    Empty,
    /// No batch is available and none will ever become available because the
    /// sender has been dropped.
    Closed,
}

impl std::fmt::Display for TryRecvError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(match self {
            TryRecvError::Empty => "batch queue is empty",
            TryRecvError::Closed => "batch queue is closed",
        })
    }
}

// Lets callers use `?` with `Box<dyn Error>` and similar error-handling helpers.
impl std::error::Error for TryRecvError {}
14
15/// The receiving end of this batching queue
16///
17/// Since this is a single-consume queue, this handle cannot be cloned or shared.
18/// Dropping this handle will eventually lead to the sender signaling that this
19/// queue has been closed. Items that were in flight will be dropped.
20pub struct Receiver<T, const N: usize> {
21 inner: Option<Arc<Inner<T, N>>>,
22}
23
24impl<T, const N: usize> Drop for Receiver<T, N> {
25 fn drop(&mut self) {
26 let inner = self.inner.take().unwrap();
27 inner.reader.waker.take();
28 let waker = inner.writer.waker.take();
29 // other side will notice by checking Arc ref count
30 drop(inner);
31 if let Some(waker) = waker {
32 // important: wake AFTER dropping the ref count
33 waker.wake();
34 }
35 }
36}
37
38unsafe impl<T, const N: usize> Send for Receiver<T, N> {}
39
40impl<T, const N: usize> Receiver<T, N> {
41 pub(crate) fn new(inner: Arc<Inner<T, N>>) -> Self {
42 Self { inner: Some(inner) }
43 }
44
45 fn inner(&self) -> &Inner<T, N> {
46 self.inner.as_ref().unwrap()
47 }
48
49 fn strong_count(&self) -> usize {
50 Arc::strong_count(self.inner.as_ref().unwrap())
51 }
52
53 /// Check if a batch is currently available and fill them into a fresh Vec
54 ///
55 /// If no batch is available it returns `TryRecvError::Empty`. If no batch will
56 /// ever become available because the sender has been dropped it returns
57 /// `TryRecvError::Closed`.
58 ///
59 /// If the next thing you’ll do is to iterate over the vector, prefer
60 /// [`try_recv`](#method.try_recv) instead to save one allocation.
61 pub fn try_recv_batch(&mut self) -> Result<Vec<T>, TryRecvError> {
62 match self.inner().do_recv() {
63 Some(read_pos) => {
64 let mut v = Vec::new();
65 v.extend(BucketIter::new(self.inner(), read_pos));
66 Ok(v)
67 }
68 None => {
69 if self.strong_count() == 1 {
70 Err(TryRecvError::Closed)
71 } else {
72 Err(TryRecvError::Empty)
73 }
74 }
75 }
76 }
77
78 /// Check if a batch is currently available and return an iterator of its items
79 ///
80 /// If no batch is available it returns `TryRecvError::Empty`. If no batch will
81 /// ever become available because the sender has been dropped it returns
82 /// `TryRecvError::Closed`.
83 ///
84 /// See [`recv`](#method.recv) for more information on the returned iterator.
85 pub fn try_recv(&mut self) -> Result<BucketIter<'_, T, N>, TryRecvError> {
86 match self.inner().do_recv() {
87 Some(read_pos) => Ok(BucketIter::new(self.inner(), read_pos)),
88 None => {
89 if self.strong_count() == 1 {
90 Err(TryRecvError::Closed)
91 } else {
92 Err(TryRecvError::Empty)
93 }
94 }
95 }
96 }
97
98 /// A Future that will wait for the next batch and return an iterator of its items
99 ///
100 /// The iterator should be consumed quickly since it borrows the queue bucket that
101 /// holds the items, meaning that the queue space is not handed back to the sender
102 /// until the iterator is dropped.
103 pub fn recv(&mut self) -> ReceiveFuture<'_, T, N> {
104 ReceiveFuture {
105 inner: self.inner.as_ref().unwrap(),
106 }
107 }
108
109 /// Wait for the next batch and fill it into a fresh Vec
110 ///
111 /// If the next thing you’ll do is to iterate over the vector, prefer
112 /// [`recv`](#method.recv) instead to save one allocation.
113 pub async fn recv_batch(&mut self) -> Result<Vec<T>, Closed> {
114 Ok(self.recv().await?.collect())
115 }
116}
117
118/// The Future returned from [`recv`](struct.Receiver.html#method.recv)
119///
120/// It will resolve once a batch becomes available or the queue is closed (by dropping the sender).
121pub struct ReceiveFuture<'a, T, const N: usize> {
122 inner: &'a Arc<Inner<T, N>>,
123}
124
125unsafe impl<'a, T, const N: usize> Send for ReceiveFuture<'a, T, N> {}
126
127impl<'a, T, const N: usize> Future for ReceiveFuture<'a, T, N> {
128 type Output = Result<BucketIter<'a, T, N>, Closed>;
129
130 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
131 match self.inner.do_recv() {
132 Some(v) => Poll::Ready(Ok(BucketIter::new(self.inner, v))),
133 None => {
134 self.inner.reader.waker.register(cx.waker());
135 if Arc::strong_count(self.inner) == 1 {
136 Poll::Ready(Err(Closed))
137 } else {
138 match self.inner.do_recv() {
139 Some(v) => {
140 // no wakeup needed anymore
141 self.inner.reader.waker.take();
142 Poll::Ready(Ok(BucketIter::new(self.inner, v)))
143 }
144 None => Poll::Pending,
145 }
146 }
147 }
148 }
149 }
150}