sigq/
pull.rs

1use std::{
2  future::Future,
3  mem::ManuallyDrop,
4  num::NonZeroUsize,
5  ops::{Deref, DerefMut},
6  pin::Pin,
7  sync::atomic::Ordering,
8  sync::Arc,
9  task::{Context, Poll}
10};
11
/// The receiving end-point of a queue.
///
/// Holds a reference-counted handle to the state shared with the other
/// end-points of the same queue.
#[repr(transparent)]
pub struct Puller<I>(pub(crate) Arc<super::Shared<I>>);
15
16use super::StaleErr;
17
/// What a [`MustHandle`] does with its wrapped element when the wrapper is
/// dropped.
#[derive(Default)]
enum DropAction {
  /// Put the element back at the front of the queue.  This is the default.
  #[default]
  ReturnToQueue,
  /// Drop the element in place.
  Drop,
  /// Leave the element alone; it has already been moved out of the wrapper.
  Nothing
}
25
/// Wrapper around elements that must be handled by the application.
///
/// Unless the wrapper is consumed through [`MustHandle::handled()`] or
/// [`MustHandle::into_inner()`], dropping it pushes the element back onto the
/// front of the queue it was taken from.
pub struct MustHandle<T> {
  /// Shared queue state; needed to return the element when dropped.
  sh: Arc<super::Shared<T>>,
  /// The wrapped element.  It is `ManuallyDrop` because the `Drop`
  /// implementation decides what happens to it.
  inner: ManuallyDrop<T>,
  /// Action to apply to `inner` when the wrapper is dropped.
  drop_action: DropAction
}
32
33impl<T> MustHandle<T> {
34  fn new(sh: Arc<super::Shared<T>>, inner: T) -> Self {
35    Self {
36      sh,
37      inner: ManuallyDrop::new(inner),
38      drop_action: DropAction::default()
39    }
40  }
41
42  /// Mark the inner object has handled and then drop it.
43  pub fn handled(mut self) {
44    self.drop_action = DropAction::Drop;
45  }
46
47  /// Remove the inner object from the `MustHandle` and return it.
48  pub fn into_inner(mut self) -> T {
49    self.drop_action = DropAction::Nothing;
50    unsafe { ManuallyDrop::take(&mut self.inner) }
51  }
52}
53
54impl<T> Deref for MustHandle<T> {
55  type Target = T;
56
57  fn deref(&self) -> &T {
58    &self.inner
59  }
60}
61
62impl<T> DerefMut for MustHandle<T> {
63  fn deref_mut(&mut self) -> &mut T {
64    &mut self.inner
65  }
66}
67
68impl<T> Drop for MustHandle<T> {
69  fn drop(&mut self) {
70    match self.drop_action {
71      DropAction::ReturnToQueue => {
72        let t = unsafe { ManuallyDrop::take(&mut self.inner) };
73        let mut inner = self.sh.inner.lock();
74        inner.q.push_front(t);
75      }
76      DropAction::Drop => unsafe { ManuallyDrop::drop(&mut self.inner) },
77      DropAction::Nothing => {}
78    }
79  }
80}
81
82
83impl<I> Puller<I> {
84  /// Pull the oldest node off the queue and return it.
85  ///
86  /// If no nodes are available on the queue, then block and wait for one to
87  /// become available.
88  ///
89  /// # Errors
90  /// `StaleErr` means there are no more items in queue and there are no more
91  /// [`Pusher`](super::Pusher) objects associated with this `Puller`.
92  #[cfg_attr(feature = "inline-more", inline)]
93  pub fn pop(&self) -> Result<I, StaleErr> {
94    let mut inner = self.0.inner.lock();
95    loop {
96      if inner.q.is_empty() && inner.npushers == 0 {
97        break Err(StaleErr);
98      }
99      match inner.q.pop_front() {
100        Some(node) => {
101          break Ok(node);
102        }
103        None => {
104          self.0.signal.wait(&mut inner);
105        }
106      }
107    }
108  }
109
110  /// Take an element off the queue that must be handled by the application, or
111  /// it will be returned to the queue.
112  ///
113  /// # Errors
114  /// `StaleErr` means there are no more items in queue and there are no more
115  /// [`Pusher`](super::Pusher) objects associated with this `Puller`.
116  pub fn pop_managed(&self) -> Result<MustHandle<I>, StaleErr> {
117    let n = self.pop()?;
118    Ok(MustHandle::new(Arc::clone(&self.0), n))
119  }
120
121  /// Pull the oldest node off the queue and return it.
122  ///
123  /// If a node is available on the queue then take it off and return it.
124  ///
125  /// If no nodes are available and there's at least one associated `Pusher`
126  /// exists then return `Ok(None)`.
127  ///
128  /// # Errors
129  /// `StaleErr` is returned if no nodes are available and there are no more
130  /// [`Pusher`](super::Pusher) objects associated with this `Puller`.
131  #[cfg_attr(feature = "inline-more", inline)]
132  #[allow(clippy::option_if_let_else)]
133  pub fn try_pop(&self) -> Result<Option<I>, StaleErr> {
134    let mut inner = self.0.inner.lock();
135    if let Some(n) = inner.q.pop_front() {
136      Ok(Some(n))
137    } else if inner.npushers == 0 {
138      Err(StaleErr)
139    } else {
140      Ok(None)
141    }
142  }
143
144  /// Take an element off the queue that must be handled by the application, or
145  /// it will be returned to the queue.
146  ///
147  /// If a node is available on the queue then take it off and return it.
148  ///
149  /// If no nodes are available and there's at least one associated `Pusher`
150  /// exists then return `Ok(None)`.
151  ///
152  /// # Errors
153  /// `StaleErr` is returned if no nodes are available and there are no more
154  /// [`Pusher`](super::Pusher) objects associated with this `Puller`.
155  pub fn try_pop_managed(&self) -> Result<Option<MustHandle<I>>, StaleErr> {
156    Ok(
157      self
158        .try_pop()?
159        .map(|n| MustHandle::new(Arc::clone(&self.0), n))
160    )
161  }
162
163  /// This method serves the same purpose as the [`pop()`](#method.pop) method,
164  /// but rather than block it returns a `Future` to be used to wait for a node
165  /// to arrive in an `async` context.
166  ///
167  /// ```
168  /// async fn test() {
169  ///   let (tx, rx) = sigq::new();
170  ///   tx.push("hello");
171  ///   assert_eq!(rx.was_empty(), false);
172  ///   let node = rx.apop().await.unwrap();
173  ///   assert_eq!(node, "hello");
174  ///   assert_eq!(rx.was_empty(), true);
175  /// }
176  /// ```
177  #[cfg_attr(feature = "inline-more", inline)]
178  #[must_use]
179  pub fn apop(&self) -> PopFuture<I> {
180    PopFuture {
181      ctx: Arc::clone(&self.0),
182      id: None
183    }
184  }
185
186  /// This method serves the same purpose as the [`pop()`](#method.pop) method,
187  /// but rather than block it returns a `Future` to be used to wait for a node
188  /// to arrive in an `async` context.
189  #[cfg_attr(feature = "inline-more", inline)]
190  #[must_use]
191  pub fn apop_managed(&self) -> PopManagedFuture<I> {
192    PopManagedFuture {
193      ctx: Arc::clone(&self.0),
194      id: None
195    }
196  }
197
198  /// Returns a boolean indicating whether the queue was empty or not.
199  ///
200  /// This function is not particularly useful.  If you don't understand why,
201  /// then please don't use it.
202  #[cfg_attr(feature = "inline-more", inline)]
203  #[must_use]
204  pub fn was_empty(&self) -> bool {
205    let inner = self.0.inner.lock();
206    inner.q.is_empty()
207  }
208}
209
210impl<I> Drop for Puller<I> {
211  /// Drop a `Puller` instance.
212  ///
213  /// If this is the last `Puller` end-point of a sigq instance, then the inner
214  /// queue will be cleared (i.e. all its elements will be immediately
215  /// dropped).
216  fn drop(&mut self) {
217    let mut inner = self.0.inner.lock();
218    inner.npullers -= 1;
219
220    // If this is the last puller then remove all thr nodes.
221    // The nodes may contain some kind of context that must be notified that
222    // the node will never reach its intended destination.
223    if inner.npullers == 0 {
224      inner.q.clear();
225    }
226  }
227}
228
229
/// Future returned by [`Puller::apop()`]; resolves to the next node on the
/// queue or to `StaleErr`.
#[doc(hidden)]
pub struct PopFuture<I> {
  /// Shared queue state.
  ctx: Arc<super::Shared<I>>,
  /// Key under which this future's waker was registered by its most recent
  /// pending poll, if any.  Used to remove that waker when the future is
  /// dropped.
  id: Option<NonZeroUsize>
}
235
236impl<I: 'static + Send> Future for PopFuture<I> {
237  type Output = Result<I, StaleErr>;
238  fn poll(
239    mut self: Pin<&mut Self>,
240    ctx: &mut Context<'_>
241  ) -> Poll<Self::Output> {
242    let mut inner = self.ctx.inner.lock();
243    match inner.q.pop_front() {
244      Some(node) => Poll::Ready(Ok(node)),
245      None => {
246        if inner.q.is_empty() && inner.npushers == 0 {
247          // No more nodes and no more pushers, so return None
248          Poll::Ready(Err(StaleErr))
249        } else {
250          // Generate a unique identifier for this waker
251          let id = loop {
252            let id = self.ctx.idgen.fetch_add(1, Ordering::SeqCst);
253            // Make sure it is non-zero and unique
254            if id == 0 || inner.wakers.contains_key(&id) {
255              continue;
256            }
257            break id;
258          };
259          inner.wakers.insert(id, ctx.waker().clone());
260          drop(inner);
261          self.id = Some(unsafe { NonZeroUsize::new_unchecked(id) });
262          Poll::Pending
263        }
264      }
265    }
266  }
267}
268
269impl<I> Drop for PopFuture<I> {
270  fn drop(&mut self) {
271    if let Some(id) = self.id {
272      let mut inner = self.ctx.inner.lock();
273      // Remove this future's waker
274      let _ = inner.wakers.swap_remove(&id.get());
275    }
276  }
277}
278
279
/// Future returned by [`Puller::apop_managed()`]; resolves to the next node
/// on the queue wrapped in a [`MustHandle`], or to `StaleErr`.
#[doc(hidden)]
pub struct PopManagedFuture<I> {
  /// Shared queue state.
  ctx: Arc<super::Shared<I>>,
  /// Key under which this future's waker was registered by its most recent
  /// pending poll, if any.  Used to remove that waker when the future is
  /// dropped.
  id: Option<NonZeroUsize>
}
285
286impl<I: 'static + Send> Future for PopManagedFuture<I> {
287  type Output = Result<MustHandle<I>, StaleErr>;
288  fn poll(
289    mut self: Pin<&mut Self>,
290    ctx: &mut Context<'_>
291  ) -> Poll<Self::Output> {
292    let mut inner = self.ctx.inner.lock();
293    match inner.q.pop_front() {
294      Some(node) => {
295        Poll::Ready(Ok(MustHandle::new(Arc::clone(&self.ctx), node)))
296      }
297      None => {
298        if inner.q.is_empty() && inner.npushers == 0 {
299          // No more nodes and no more pushers, so return None
300          Poll::Ready(Err(StaleErr))
301        } else {
302          // Generate a unique identifier for this waker
303          let id = loop {
304            let id = self.ctx.idgen.fetch_add(1, Ordering::SeqCst);
305            // Make sure it is non-zero and unique
306            if id == 0 || inner.wakers.contains_key(&id) {
307              continue;
308            }
309            break id;
310          };
311          inner.wakers.insert(id, ctx.waker().clone());
312          drop(inner);
313          self.id = Some(unsafe { NonZeroUsize::new_unchecked(id) });
314          Poll::Pending
315        }
316      }
317    }
318  }
319}
320
321impl<I> Drop for PopManagedFuture<I> {
322  fn drop(&mut self) {
323    if let Some(id) = self.id {
324      let mut inner = self.ctx.inner.lock();
325      // Remove this future's waker
326      let _ = inner.wakers.swap_remove(&id.get());
327    }
328  }
329}
330
331// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :