mem_ring/
queue.rs

1use std::{
2    collections::VecDeque,
3    future::Future,
4    io,
5    mem::{self, MaybeUninit},
6    os::fd::RawFd,
7    sync::atomic::{AtomicU32, AtomicUsize, Ordering},
8    task::Waker,
9};
10
11#[cfg(not(all(feature = "monoio", feature = "tpc")))]
12use parking_lot::Mutex;
13#[cfg(not(all(feature = "monoio", feature = "tpc")))]
14use std::sync::Arc;
15
16#[cfg(all(feature = "monoio", feature = "tpc"))]
17use std::{cell::UnsafeCell, rc::Rc};
18
19#[cfg(feature = "monoio")]
20use local_sync::oneshot::{channel, Receiver, Sender};
21
22#[cfg(all(feature = "tokio", not(feature = "monoio")))]
23use tokio::sync::oneshot::{channel, Receiver, Sender};
24
25#[cfg(feature = "monoio")]
26use monoio::{select, spawn};
27#[cfg(all(feature = "tokio", not(feature = "monoio")))]
28use tokio::{select, spawn};
29
30use crate::{
31    eventfd::{new_pair, Awaiter, Notifier},
32    util::yield_now,
33};
34
35pub struct Guard {
36    _rx: Receiver<()>,
37}
38
39pub struct ReadQueue<T> {
40    queue: Queue<T>,
41    unstuck_notifier: Notifier,
42    #[cfg(all(feature = "tokio", not(feature = "monoio")))]
43    tokio_handle: Option<tokio::runtime::Handle>,
44}
45
46impl<T> ReadQueue<T> {
47    #[inline]
48    pub fn meta(&self) -> QueueMeta {
49        self.queue.meta()
50    }
51
52    pub fn pop(&mut self) -> Option<T> {
53        let maybe_item = self.queue.pop();
54        if self.queue.stuck() {
55            self.queue.mark_unstuck();
56            self.unstuck_notifier.notify().ok();
57        }
58        maybe_item
59    }
60
61    #[cfg(feature = "monoio")]
62    pub fn run_handler(self, handler: impl FnMut(T) + 'static) -> Result<Guard, io::Error>
63    where
64        T: 'static,
65    {
66        let working_awaiter = unsafe { Awaiter::from_raw_fd(self.queue.working_fd)? };
67        let (tx, rx) = channel();
68        spawn(self.working_handler(working_awaiter, handler, tx));
69        Ok(Guard { _rx: rx })
70    }
71
72    #[cfg(all(feature = "tokio", not(feature = "monoio")))]
73    pub fn run_handler(self, handler: impl FnMut(T) + Send + 'static) -> Result<Guard, io::Error>
74    where
75        T: Send + 'static,
76    {
77        let working_awaiter = unsafe { Awaiter::from_raw_fd(self.queue.working_fd)? };
78        let (tx, rx) = channel();
79        if let Some(tokio_handle) = self.tokio_handle.clone() {
80            tokio_handle.spawn(self.working_handler(working_awaiter, handler, tx));
81        } else {
82            spawn(self.working_handler(working_awaiter, handler, tx));
83        }
84        Ok(Guard { _rx: rx })
85    }
86
87    #[cfg(feature = "monoio")]
88    async fn working_handler(
89        mut self,
90        mut working_awaiter: Awaiter,
91        mut handler: impl FnMut(T),
92        mut tx: Sender<()>,
93    ) {
94        const YIELD_CNT: u8 = 3;
95        let mut exit = std::pin::pin!(tx.closed());
96        self.queue.mark_working();
97
98        'p: loop {
99            while let Some(item) = self.pop() {
100                handler(item);
101            }
102
103            for _ in 0..YIELD_CNT {
104                yield_now().await;
105                if !self.queue.is_empty() {
106                    continue 'p;
107                }
108            }
109
110            if !self.queue.mark_unworking() {
111                continue;
112            }
113
114            select! {
115                _ = working_awaiter.wait() => (),
116                _ = &mut exit => {
117                    return;
118                }
119            }
120            self.queue.mark_working();
121        }
122    }
123
124    #[cfg(all(feature = "tokio", not(feature = "monoio")))]
125    async fn working_handler(
126        mut self,
127        mut working_awaiter: Awaiter,
128        mut handler: impl FnMut(T) + Send,
129        mut tx: Sender<()>,
130    ) where
131        T: Send,
132    {
133        const YIELD_CNT: u8 = 3;
134        let mut exit = std::pin::pin!(tx.closed());
135        self.queue.mark_working();
136
137        'p: loop {
138            while let Some(item) = self.pop() {
139                handler(item);
140            }
141
142            for _ in 0..YIELD_CNT {
143                yield_now().await;
144                if !self.queue.is_empty() {
145                    continue 'p;
146                }
147            }
148
149            if !self.queue.mark_unworking() {
150                continue;
151            }
152
153            select! {
154                _ = working_awaiter.wait() => (),
155                _ = &mut exit => {
156                    return;
157                }
158            }
159            self.queue.mark_working();
160        }
161    }
162}
163
164pub struct WriteQueue<T> {
165    #[cfg(not(all(feature = "monoio", feature = "tpc")))]
166    inner: Arc<Mutex<WriteQueueInner<T>>>,
167    #[cfg(all(feature = "monoio", feature = "tpc"))]
168    inner: Rc<UnsafeCell<WriteQueueInner<T>>>,
169    #[cfg(not(all(feature = "monoio", feature = "tpc")))]
170    working_notifier: Arc<Notifier>,
171    #[cfg(all(feature = "monoio", feature = "tpc"))]
172    working_notifier: Rc<Notifier>,
173}
174
175impl<T> Clone for WriteQueue<T> {
176    fn clone(&self) -> Self {
177        Self {
178            inner: self.inner.clone(),
179            working_notifier: self.working_notifier.clone(),
180        }
181    }
182}
183
184impl<T> WriteQueue<T> {
185    // Return if the item is put into queue or pending tasks.
186    // Note if the task is put into pending tasks, it will be sent to queue when the queue is not full.
187    pub fn push(&self, item: T) -> bool {
188        #[cfg(not(all(feature = "monoio", feature = "tpc")))]
189        let mut inner = self.inner.lock();
190        #[cfg(all(feature = "monoio", feature = "tpc"))]
191        let inner = unsafe { &mut *self.inner.get() };
192        let item = match inner.queue.push(item) {
193            Ok(_) => {
194                if !inner.queue.working() {
195                    inner.queue.mark_working();
196                    #[cfg(not(all(feature = "monoio", feature = "tpc")))]
197                    drop(inner);
198                    let _ = self.working_notifier.notify();
199                }
200                return true;
201            }
202            Err(item) => item,
203        };
204
205        // The queue is full now
206        inner.queue.mark_stuck();
207        let pending = PendingTask {
208            data: Some(item),
209            waiter: None,
210        };
211        inner.pending_tasks.push_back(pending);
212        false
213    }
214
215    // Return if the item is put into queue or pending tasks.
216    // Note if the task is put into pending tasks, it will be sent to queue when the queue is not full.
217    pub fn push_without_notify(&self, item: T) -> bool {
218        #[cfg(not(all(feature = "monoio", feature = "tpc")))]
219        let mut inner = self.inner.lock();
220        #[cfg(all(feature = "monoio", feature = "tpc"))]
221        let inner = unsafe { &mut *self.inner.get() };
222        let item = match inner.queue.push(item) {
223            Ok(_) => return true,
224            Err(item) => item,
225        };
226
227        // The queue is full now
228        inner.queue.mark_stuck();
229        let pending = PendingTask {
230            data: Some(item),
231            waiter: None,
232        };
233        inner.pending_tasks.push_back(pending);
234        false
235    }
236
237    #[inline]
238    pub fn is_empty(&self) -> bool {
239        #[cfg(not(all(feature = "monoio", feature = "tpc")))]
240        let inner = self.inner.lock();
241        #[cfg(all(feature = "monoio", feature = "tpc"))]
242        let inner = unsafe { &*self.inner.get() };
243        inner.queue.is_empty()
244    }
245
246    // If peer is not working, notify it and mark it working.
247    // Return notified
248    pub fn notify_manually(&self) -> bool {
249        #[cfg(not(all(feature = "monoio", feature = "tpc")))]
250        let inner = self.inner.lock();
251        #[cfg(all(feature = "monoio", feature = "tpc"))]
252        let inner = unsafe { &mut *self.inner.get() };
253
254        if inner.queue.working() {
255            return false;
256        }
257
258        inner.queue.mark_working();
259        #[cfg(not(all(feature = "monoio", feature = "tpc")))]
260        drop(inner);
261        let _ = self.working_notifier.notify();
262        true
263    }
264
265    pub fn push_with_awaiter(&self, item: T) -> PushResult {
266        #[cfg(not(all(feature = "monoio", feature = "tpc")))]
267        let mut inner = self.inner.lock();
268        #[cfg(all(feature = "monoio", feature = "tpc"))]
269        let inner = unsafe { &mut *self.inner.get() };
270
271        let item = match inner.queue.push(item) {
272            Ok(_) => {
273                if !inner.queue.working() {
274                    inner.queue.mark_working();
275                    #[cfg(not(all(feature = "monoio", feature = "tpc")))]
276                    drop(inner);
277                    let _ = self.working_notifier.notify();
278                }
279                return PushResult::Ok;
280            }
281            Err(item) => item,
282        };
283
284        // The queue is full now
285        inner.queue.mark_stuck();
286        #[cfg(not(all(feature = "monoio", feature = "tpc")))]
287        let waker_slot = Arc::new(Mutex::new(WakerSlot::None));
288        #[cfg(all(feature = "monoio", feature = "tpc"))]
289        let waker_slot = Rc::new(UnsafeCell::new(WakerSlot::None));
290        let pending = PendingTask {
291            data: Some(item),
292            waiter: Some(waker_slot.clone()),
293        };
294
295        inner.pending_tasks.push_back(pending);
296        PushResult::Pending(PushJoinHandle { waker_slot })
297    }
298
299    async fn unstuck_handler(self, mut unstuck_awaiter: Awaiter, mut tx: Sender<()>) {
300        let mut exit = std::pin::pin!(tx.closed());
301        loop {
302            {
303                #[cfg(not(all(feature = "monoio", feature = "tpc")))]
304                let mut inner = self.inner.lock();
305                #[cfg(all(feature = "monoio", feature = "tpc"))]
306                let inner = unsafe { &mut *self.inner.get() };
307
308                while let Some(mut pending_task) = inner.pending_tasks.pop_front() {
309                    let data = pending_task.data.take().unwrap();
310                    match inner.queue.push(data) {
311                        Ok(_) => {
312                            if let Some(waiter) = pending_task.waiter {
313                                #[cfg(not(all(feature = "monoio", feature = "tpc")))]
314                                waiter.lock().wake();
315                                #[cfg(all(feature = "monoio", feature = "tpc"))]
316                                unsafe {
317                                    (*waiter.get()).wake()
318                                };
319                            }
320                        }
321                        Err(data) => {
322                            pending_task.data = Some(data);
323                            inner.pending_tasks.push_front(pending_task);
324                            break;
325                        }
326                    }
327                }
328                if !inner.queue.working() {
329                    inner.queue.mark_working();
330                    let _ = self.working_notifier.notify();
331                }
332                if !inner.pending_tasks.is_empty() {
333                    inner.queue.mark_stuck();
334                    if !inner.queue.is_full() {
335                        continue;
336                    }
337                }
338            }
339
340            select! {
341                _ = unstuck_awaiter.wait() => (),
342                _ = &mut exit => {
343                    return;
344                }
345            }
346        }
347    }
348}
349
350pub struct WriteQueueInner<T> {
351    queue: Queue<T>,
352    pending_tasks: VecDeque<PendingTask<T>>,
353    _guard: Receiver<()>,
354}
355
356impl<T> WriteQueue<T> {
357    #[inline]
358    pub fn meta(&self) -> QueueMeta {
359        #[cfg(not(all(feature = "monoio", feature = "tpc")))]
360        {
361            self.inner.lock().queue.meta()
362        }
363        #[cfg(all(feature = "monoio", feature = "tpc"))]
364        {
365            unsafe { (*self.inner.get()).queue.meta() }
366        }
367    }
368}
369
370struct PendingTask<T> {
371    // always Some, Option is for taking temporary
372    data: Option<T>,
373    #[cfg(not(all(feature = "monoio", feature = "tpc")))]
374    waiter: Option<Arc<Mutex<WakerSlot>>>,
375    #[cfg(all(feature = "monoio", feature = "tpc"))]
376    waiter: Option<Rc<UnsafeCell<WakerSlot>>>,
377}
378
379enum WakerSlot {
380    None,
381    Some(Waker),
382    Finished,
383}
384
385impl WakerSlot {
386    fn wake(&mut self) {
387        if let WakerSlot::Some(w) = mem::replace(self, Self::Finished) {
388            w.wake();
389        }
390    }
391
392    fn set_waker(&mut self, w: &Waker) -> bool {
393        match self {
394            WakerSlot::None => *self = WakerSlot::Some(w.to_owned()),
395            WakerSlot::Some(old_waker) => old_waker.clone_from(w),
396            WakerSlot::Finished => return true,
397        }
398        false
399    }
400}
401
402pub struct Queue<T> {
403    buffer_ptr: *mut MaybeUninit<T>,
404    buffer_len: usize,
405
406    head_ptr: *mut AtomicUsize,
407    tail_ptr: *mut AtomicUsize,
408    working_ptr: *mut AtomicU32,
409    stuck_ptr: *mut AtomicU32,
410
411    working_fd: RawFd,
412    unstuck_fd: RawFd,
413
414    do_drop: bool,
415}
416
417unsafe impl<T: Send> Send for Queue<T> {}
418
419#[repr(C)]
420#[derive(Debug, Clone, Copy)]
421pub struct QueueMeta {
422    pub buffer_ptr: usize,
423    pub buffer_len: usize,
424    pub head_ptr: usize,
425    pub tail_ptr: usize,
426    pub working_ptr: usize,
427    pub stuck_ptr: usize,
428    pub working_fd: RawFd,
429    pub unstuck_fd: RawFd,
430}
431
432unsafe impl<T: Sync> Sync for Queue<T> {}
433
434impl<T> Queue<T> {
435    pub fn new(size: usize) -> Result<(Self, QueueMeta), io::Error> {
436        let buffer = unsafe {
437            let mut v = Vec::<MaybeUninit<T>>::with_capacity(size);
438            v.set_len(size);
439            v.into_boxed_slice()
440        };
441        let buffer_slice = Box::leak(buffer);
442
443        let head_ptr = Box::leak(Box::new(AtomicUsize::new(0)));
444        let tail_ptr = Box::leak(Box::new(AtomicUsize::new(0)));
445        let working_ptr = Box::leak(Box::new(AtomicU32::new(0)));
446        let stuck_ptr = Box::leak(Box::new(AtomicU32::new(0)));
447
448        let (working_fd, working_fd_peer) = new_pair()?;
449        let (unstuck_fd, unstuck_fd_peer) = new_pair()?;
450
451        let queue = Self {
452            buffer_ptr: buffer_slice.as_mut_ptr(),
453            buffer_len: size,
454            head_ptr,
455            tail_ptr,
456            working_ptr,
457            stuck_ptr,
458            working_fd,
459            unstuck_fd,
460            do_drop: true,
461        };
462        let meta = QueueMeta {
463            buffer_ptr: queue.buffer_ptr as _,
464            buffer_len: queue.buffer_len,
465            head_ptr: queue.head_ptr as _,
466            tail_ptr: queue.tail_ptr as _,
467            working_ptr: queue.working_ptr as _,
468            stuck_ptr: queue.stuck_ptr as _,
469            working_fd: working_fd_peer,
470            unstuck_fd: unstuck_fd_peer,
471        };
472
473        Ok((queue, meta))
474    }
475
476    /// # Safety
477    /// Must make sure the meta is valid until the Queue is dropped
478    pub unsafe fn new_from_meta(meta: &QueueMeta) -> Result<Self, io::Error> {
479        let buffer_slice =
480            std::slice::from_raw_parts_mut(meta.buffer_ptr as *mut MaybeUninit<T>, meta.buffer_len);
481        let size = buffer_slice.len();
482        let head_ptr = meta.head_ptr as *mut AtomicUsize;
483        let tail_ptr = meta.tail_ptr as *mut AtomicUsize;
484        let working_ptr = meta.working_ptr as *mut AtomicU32;
485        let stuck_ptr = meta.stuck_ptr as *mut AtomicU32;
486        let working_fd = meta.working_fd;
487        let unstuck_fd = meta.unstuck_fd;
488        Ok(Self {
489            buffer_ptr: buffer_slice.as_mut_ptr(),
490            buffer_len: size,
491            head_ptr,
492            tail_ptr,
493            working_ptr,
494            stuck_ptr,
495            working_fd,
496            unstuck_fd,
497            do_drop: false,
498        })
499    }
500
501    #[inline]
502    pub fn is_memory_owner(&self) -> bool {
503        self.do_drop
504    }
505
506    #[inline]
507    pub fn meta(&self) -> QueueMeta {
508        QueueMeta {
509            buffer_ptr: self.buffer_ptr as _,
510            buffer_len: self.buffer_len,
511            head_ptr: self.head_ptr as _,
512            tail_ptr: self.tail_ptr as _,
513            working_ptr: self.working_ptr as _,
514            stuck_ptr: self.stuck_ptr as _,
515            working_fd: self.working_fd,
516            unstuck_fd: self.unstuck_fd,
517        }
518    }
519
520    pub fn read(self) -> ReadQueue<T> {
521        let unstuck_notifier = unsafe { Notifier::from_raw_fd(self.unstuck_fd) };
522        ReadQueue {
523            queue: self,
524            unstuck_notifier,
525            #[cfg(all(feature = "tokio", not(feature = "monoio")))]
526            tokio_handle: None,
527        }
528    }
529
530    #[cfg(all(feature = "tokio", not(feature = "monoio")))]
531    pub fn read_with_tokio_handle(self, tokio_handle: tokio::runtime::Handle) -> ReadQueue<T> {
532        let unstuck_notifier = unsafe { Notifier::from_raw_fd(self.unstuck_fd) };
533        ReadQueue {
534            queue: self,
535            unstuck_notifier,
536            tokio_handle: Some(tokio_handle),
537        }
538    }
539
540    #[cfg(feature = "monoio")]
541    pub fn write(self) -> Result<WriteQueue<T>, io::Error>
542    where
543        T: 'static,
544    {
545        let working_notifier = unsafe { Notifier::from_raw_fd(self.working_fd) };
546        let unstuck_awaiter = unsafe { Awaiter::from_raw_fd(self.unstuck_fd) }?;
547
548        let (tx, rx) = channel();
549        let wq = WriteQueue {
550            #[cfg(feature = "tpc")]
551            inner: Rc::new(UnsafeCell::new(WriteQueueInner {
552                queue: self,
553                pending_tasks: VecDeque::new(),
554                _guard: rx,
555            })),
556            #[cfg(not(feature = "tpc"))]
557            inner: Arc::new(Mutex::new(WriteQueueInner {
558                queue: self,
559                pending_tasks: VecDeque::new(),
560                _guard: rx,
561            })),
562            #[cfg(feature = "tpc")]
563            working_notifier: Rc::new(working_notifier),
564            #[cfg(not(feature = "tpc"))]
565            working_notifier: Arc::new(working_notifier),
566        };
567
568        spawn(wq.clone().unstuck_handler(unstuck_awaiter, tx));
569
570        Ok(wq)
571    }
572
573    #[cfg(all(feature = "tokio", not(feature = "monoio")))]
574    pub fn write(self) -> Result<WriteQueue<T>, io::Error>
575    where
576        T: Send + 'static,
577    {
578        let working_notifier = unsafe { Notifier::from_raw_fd(self.working_fd) };
579        let unstuck_awaiter = unsafe { Awaiter::from_raw_fd(self.unstuck_fd) }?;
580
581        let (tx, rx) = channel();
582        let wq = WriteQueue {
583            inner: Arc::new(Mutex::new(WriteQueueInner {
584                queue: self,
585                pending_tasks: VecDeque::new(),
586                _guard: rx,
587            })),
588            working_notifier: Arc::new(working_notifier),
589        };
590
591        spawn(wq.clone().unstuck_handler(unstuck_awaiter, tx));
592
593        Ok(wq)
594    }
595
596    #[cfg(all(feature = "tokio", not(feature = "monoio")))]
597    pub fn write_with_tokio_handle(
598        self,
599        tokio_handle: &tokio::runtime::Handle,
600    ) -> Result<WriteQueue<T>, io::Error>
601    where
602        T: Send + 'static,
603    {
604        let working_notifier = unsafe { Notifier::from_raw_fd(self.working_fd) };
605        let unstuck_awaiter = unsafe { Awaiter::from_raw_fd(self.unstuck_fd) }?;
606
607        let (tx, rx) = channel();
608        let wq = WriteQueue {
609            inner: Arc::new(Mutex::new(WriteQueueInner {
610                queue: self,
611                pending_tasks: VecDeque::new(),
612                _guard: rx,
613            })),
614            working_notifier: Arc::new(working_notifier),
615        };
616
617        tokio_handle.spawn(wq.clone().unstuck_handler(unstuck_awaiter, tx));
618
619        Ok(wq)
620    }
621}
622
623impl<T> Drop for Queue<T> {
624    fn drop(&mut self) {
625        if self.do_drop {
626            unsafe {
627                let slice = std::slice::from_raw_parts_mut(self.buffer_ptr, self.buffer_len);
628                let _ = Box::from_raw(slice as *mut [MaybeUninit<T>]);
629                let _ = Box::from_raw(self.head_ptr);
630                let _ = Box::from_raw(self.tail_ptr);
631                let _ = Box::from_raw(self.working_ptr);
632                let _ = Box::from_raw(self.stuck_ptr);
633                let _ = Notifier::from_raw_fd(self.unstuck_fd);
634                let _ = Notifier::from_raw_fd(self.working_fd);
635            }
636        }
637    }
638}
639
640pub enum PushResult {
641    Ok,
642    Pending(PushJoinHandle),
643}
644
645pub struct PushJoinHandle {
646    #[cfg(all(feature = "monoio", feature = "tpc"))]
647    waker_slot: Rc<UnsafeCell<WakerSlot>>,
648
649    #[cfg(not(all(feature = "monoio", feature = "tpc")))]
650    waker_slot: Arc<Mutex<WakerSlot>>,
651}
652
653impl Future for PushJoinHandle {
654    type Output = ();
655
656    fn poll(
657        self: std::pin::Pin<&mut Self>,
658        cx: &mut std::task::Context<'_>,
659    ) -> std::task::Poll<Self::Output> {
660        #[cfg(all(feature = "monoio", feature = "tpc"))]
661        let slot = unsafe { &mut *self.waker_slot.get() };
662        #[cfg(not(all(feature = "monoio", feature = "tpc")))]
663        let mut slot = self.waker_slot.lock();
664        if slot.set_waker(cx.waker()) {
665            return std::task::Poll::Ready(());
666        }
667        std::task::Poll::Pending
668    }
669}
670
671impl<T> Queue<T> {
672    pub fn len(&self) -> usize {
673        let shead = unsafe { &*self.head_ptr };
674        let stail = unsafe { &*self.tail_ptr };
675        stail.load(Ordering::Acquire) - shead.load(Ordering::Acquire)
676    }
677
678    pub fn is_empty(&self) -> bool {
679        let shead = unsafe { &*self.head_ptr };
680        let stail = unsafe { &*self.tail_ptr };
681        stail.load(Ordering::Acquire) == shead.load(Ordering::Acquire)
682    }
683
684    pub fn is_full(&self) -> bool {
685        let shead = unsafe { &*self.head_ptr };
686        let stail = unsafe { &*self.tail_ptr };
687        stail.load(Ordering::Acquire) - shead.load(Ordering::Acquire) == self.buffer_len
688    }
689
690    fn push(&mut self, item: T) -> Result<(), T> {
691        let shead = unsafe { &*self.head_ptr };
692        let stail = unsafe { &*self.tail_ptr };
693
694        let tail = stail.load(Ordering::Relaxed);
695        if tail - shead.load(Ordering::Acquire) == self.buffer_len {
696            return Err(item);
697        }
698
699        unsafe {
700            (*self.buffer_ptr.add(tail % self.buffer_len)).write(item);
701        }
702        stail.store(tail + 1, Ordering::Release);
703        Ok(())
704    }
705
706    fn pop(&mut self) -> Option<T> {
707        let shead = unsafe { &*self.head_ptr };
708        let stail = unsafe { &*self.tail_ptr };
709
710        let head = shead.load(Ordering::Relaxed);
711        if head == stail.load(Ordering::Acquire) {
712            return None;
713        }
714
715        let item = unsafe { (*self.buffer_ptr.add(head % self.buffer_len)).assume_init_read() };
716        shead.store(head + 1, Ordering::Release);
717        Some(item)
718    }
719
720    #[inline]
721    fn mark_unworking(&self) -> bool {
722        unsafe { &*self.working_ptr }.store(0, Ordering::Release);
723        if self.is_empty() {
724            return true;
725        }
726        self.mark_working();
727        false
728    }
729
730    #[inline]
731    fn mark_working(&self) {
732        unsafe { &*self.working_ptr }.store(1, Ordering::Release);
733    }
734
735    #[inline]
736    fn working(&self) -> bool {
737        unsafe { &*self.working_ptr }.load(Ordering::Acquire) == 1
738    }
739
740    #[inline]
741    fn mark_unstuck(&self) {
742        unsafe { &*self.stuck_ptr }.store(0, Ordering::Release);
743    }
744
745    #[inline]
746    fn mark_stuck(&self) {
747        unsafe { &*self.stuck_ptr }.store(1, Ordering::Release);
748    }
749
750    #[inline]
751    fn stuck(&self) -> bool {
752        unsafe { &*self.stuck_ptr }.load(Ordering::Acquire) == 1
753    }
754}
755
756#[cfg(test)]
757mod tests {
758    use std::time::Duration;
759
760    use super::*;
761
762    #[cfg(feature = "monoio")]
763    use monoio::time::sleep;
764    #[cfg(all(feature = "tokio", not(feature = "monoio")))]
765    use tokio::time::sleep;
766
767    macro_rules! test {
768        ($($i: item)*) => {$(
769            #[cfg(feature = "monoio")]
770            #[monoio::test(timer_enabled = true)]
771            $i
772
773            #[cfg(all(feature = "tokio", not(feature = "monoio")))]
774            #[tokio::test]
775            $i
776        )*};
777    }
778
779    test! {
780        async fn demo_wake() {
781            let (mut tx, mut rx) = channel::<()>();
782
783            let (q_read, meta) = Queue::<u8>::new(1024).unwrap();
784            let q_write = unsafe { Queue::<u8>::new_from_meta(&meta) }.unwrap();
785            let q_read = q_read.read();
786            let q_write = q_write.write().unwrap();
787
788            let _guard = q_read
789                .run_handler(move |item| {
790                    if item == 2 {
791                        rx.close();
792                    }
793                })
794                .unwrap();
795
796            q_write.push(1);
797            sleep(Duration::from_secs(1)).await;
798            q_write.push(2);
799            tx.closed().await;
800        }
801
802        async fn demo_stuck() {
803            let (mut tx, mut rx) = channel::<()>();
804
805            let (q_read, meta) = Queue::<u8>::new(1).unwrap();
806            let q_write = unsafe { Queue::<u8>::new_from_meta(&meta) }.unwrap();
807            let q_read = q_read.read();
808            let q_write = q_write.write().unwrap();
809
810            let _guard = q_read
811                .run_handler(move |item| {
812                    if item == 4 {
813                        rx.close();
814                    }
815                })
816                .unwrap();
817            println!("pushed {}", q_write.push(1));
818            println!("pushed {}", q_write.push(2));
819            println!("pushed {}", q_write.push(3));
820            println!("pushed {}", q_write.push(4));
821            sleep(Duration::from_secs(1)).await;
822
823            tx.closed().await;
824        }
825    }
826}