1use std::{
2 future::Future,
3 mem::ManuallyDrop,
4 num::NonZeroUsize,
5 ops::{Deref, DerefMut},
6 pin::Pin,
7 sync::atomic::Ordering,
8 sync::Arc,
9 task::{Context, Poll}
10};
11
12#[repr(transparent)]
14pub struct Puller<I>(pub(crate) Arc<super::Shared<I>>);
15
16use super::StaleErr;
17
/// What a [`MustHandle`] should do with its wrapped item when it is dropped.
#[derive(Default)]
enum DropAction {
  /// Put the item back at the front of the queue.  This is the default, and
  /// is what happens when the wrapper is dropped without being finalized.
  #[default]
  ReturnToQueue,

  /// Drop the wrapped item together with the wrapper.
  Drop,

  /// Leave the wrapped item alone; it has already been moved out of the
  /// wrapper.
  Nothing
}
25
26pub struct MustHandle<T> {
28 sh: Arc<super::Shared<T>>,
29 inner: ManuallyDrop<T>,
30 drop_action: DropAction
31}
32
33impl<T> MustHandle<T> {
34 fn new(sh: Arc<super::Shared<T>>, inner: T) -> Self {
35 Self {
36 sh,
37 inner: ManuallyDrop::new(inner),
38 drop_action: DropAction::default()
39 }
40 }
41
42 pub fn handled(mut self) {
44 self.drop_action = DropAction::Drop;
45 }
46
47 pub fn into_inner(mut self) -> T {
49 self.drop_action = DropAction::Nothing;
50 unsafe { ManuallyDrop::take(&mut self.inner) }
51 }
52}
53
54impl<T> Deref for MustHandle<T> {
55 type Target = T;
56
57 fn deref(&self) -> &T {
58 &self.inner
59 }
60}
61
62impl<T> DerefMut for MustHandle<T> {
63 fn deref_mut(&mut self) -> &mut T {
64 &mut self.inner
65 }
66}
67
68impl<T> Drop for MustHandle<T> {
69 fn drop(&mut self) {
70 match self.drop_action {
71 DropAction::ReturnToQueue => {
72 let t = unsafe { ManuallyDrop::take(&mut self.inner) };
73 let mut inner = self.sh.inner.lock();
74 inner.q.push_front(t);
75 }
76 DropAction::Drop => unsafe { ManuallyDrop::drop(&mut self.inner) },
77 DropAction::Nothing => {}
78 }
79 }
80}
81
82
83impl<I> Puller<I> {
84 #[cfg_attr(feature = "inline-more", inline)]
93 pub fn pop(&self) -> Result<I, StaleErr> {
94 let mut inner = self.0.inner.lock();
95 loop {
96 if inner.q.is_empty() && inner.npushers == 0 {
97 break Err(StaleErr);
98 }
99 match inner.q.pop_front() {
100 Some(node) => {
101 break Ok(node);
102 }
103 None => {
104 self.0.signal.wait(&mut inner);
105 }
106 }
107 }
108 }
109
110 pub fn pop_managed(&self) -> Result<MustHandle<I>, StaleErr> {
117 let n = self.pop()?;
118 Ok(MustHandle::new(Arc::clone(&self.0), n))
119 }
120
121 #[cfg_attr(feature = "inline-more", inline)]
132 #[allow(clippy::option_if_let_else)]
133 pub fn try_pop(&self) -> Result<Option<I>, StaleErr> {
134 let mut inner = self.0.inner.lock();
135 if let Some(n) = inner.q.pop_front() {
136 Ok(Some(n))
137 } else if inner.npushers == 0 {
138 Err(StaleErr)
139 } else {
140 Ok(None)
141 }
142 }
143
144 pub fn try_pop_managed(&self) -> Result<Option<MustHandle<I>>, StaleErr> {
156 Ok(
157 self
158 .try_pop()?
159 .map(|n| MustHandle::new(Arc::clone(&self.0), n))
160 )
161 }
162
163 #[cfg_attr(feature = "inline-more", inline)]
178 #[must_use]
179 pub fn apop(&self) -> PopFuture<I> {
180 PopFuture {
181 ctx: Arc::clone(&self.0),
182 id: None
183 }
184 }
185
186 #[cfg_attr(feature = "inline-more", inline)]
190 #[must_use]
191 pub fn apop_managed(&self) -> PopManagedFuture<I> {
192 PopManagedFuture {
193 ctx: Arc::clone(&self.0),
194 id: None
195 }
196 }
197
198 #[cfg_attr(feature = "inline-more", inline)]
203 #[must_use]
204 pub fn was_empty(&self) -> bool {
205 let inner = self.0.inner.lock();
206 inner.q.is_empty()
207 }
208}
209
210impl<I> Drop for Puller<I> {
211 fn drop(&mut self) {
217 let mut inner = self.0.inner.lock();
218 inner.npullers -= 1;
219
220 if inner.npullers == 0 {
224 inner.q.clear();
225 }
226 }
227}
228
229
230#[doc(hidden)]
231pub struct PopFuture<I> {
232 ctx: Arc<super::Shared<I>>,
233 id: Option<NonZeroUsize>
234}
235
236impl<I: 'static + Send> Future for PopFuture<I> {
237 type Output = Result<I, StaleErr>;
238 fn poll(
239 mut self: Pin<&mut Self>,
240 ctx: &mut Context<'_>
241 ) -> Poll<Self::Output> {
242 let mut inner = self.ctx.inner.lock();
243 match inner.q.pop_front() {
244 Some(node) => Poll::Ready(Ok(node)),
245 None => {
246 if inner.q.is_empty() && inner.npushers == 0 {
247 Poll::Ready(Err(StaleErr))
249 } else {
250 let id = loop {
252 let id = self.ctx.idgen.fetch_add(1, Ordering::SeqCst);
253 if id == 0 || inner.wakers.contains_key(&id) {
255 continue;
256 }
257 break id;
258 };
259 inner.wakers.insert(id, ctx.waker().clone());
260 drop(inner);
261 self.id = Some(unsafe { NonZeroUsize::new_unchecked(id) });
262 Poll::Pending
263 }
264 }
265 }
266 }
267}
268
269impl<I> Drop for PopFuture<I> {
270 fn drop(&mut self) {
271 if let Some(id) = self.id {
272 let mut inner = self.ctx.inner.lock();
273 let _ = inner.wakers.swap_remove(&id.get());
275 }
276 }
277}
278
279
280#[doc(hidden)]
281pub struct PopManagedFuture<I> {
282 ctx: Arc<super::Shared<I>>,
283 id: Option<NonZeroUsize>
284}
285
286impl<I: 'static + Send> Future for PopManagedFuture<I> {
287 type Output = Result<MustHandle<I>, StaleErr>;
288 fn poll(
289 mut self: Pin<&mut Self>,
290 ctx: &mut Context<'_>
291 ) -> Poll<Self::Output> {
292 let mut inner = self.ctx.inner.lock();
293 match inner.q.pop_front() {
294 Some(node) => {
295 Poll::Ready(Ok(MustHandle::new(Arc::clone(&self.ctx), node)))
296 }
297 None => {
298 if inner.q.is_empty() && inner.npushers == 0 {
299 Poll::Ready(Err(StaleErr))
301 } else {
302 let id = loop {
304 let id = self.ctx.idgen.fetch_add(1, Ordering::SeqCst);
305 if id == 0 || inner.wakers.contains_key(&id) {
307 continue;
308 }
309 break id;
310 };
311 inner.wakers.insert(id, ctx.waker().clone());
312 drop(inner);
313 self.id = Some(unsafe { NonZeroUsize::new_unchecked(id) });
314 Poll::Pending
315 }
316 }
317 }
318 }
319}
320
321impl<I> Drop for PopManagedFuture<I> {
322 fn drop(&mut self) {
323 if let Some(id) = self.id {
324 let mut inner = self.ctx.inner.lock();
325 let _ = inner.wakers.swap_remove(&id.get());
327 }
328 }
329}
330
331