1use std::{
2 collections::VecDeque,
3 future::Future,
4 io,
5 mem::{self, MaybeUninit},
6 os::fd::RawFd,
7 sync::atomic::{AtomicU32, AtomicUsize, Ordering},
8 task::Waker,
9};
10
11#[cfg(not(all(feature = "monoio", feature = "tpc")))]
12use parking_lot::Mutex;
13#[cfg(not(all(feature = "monoio", feature = "tpc")))]
14use std::sync::Arc;
15
16#[cfg(all(feature = "monoio", feature = "tpc"))]
17use std::{cell::UnsafeCell, rc::Rc};
18
19#[cfg(feature = "monoio")]
20use local_sync::oneshot::{channel, Receiver, Sender};
21
22#[cfg(all(feature = "tokio", not(feature = "monoio")))]
23use tokio::sync::oneshot::{channel, Receiver, Sender};
24
25#[cfg(feature = "monoio")]
26use monoio::{select, spawn};
27#[cfg(all(feature = "tokio", not(feature = "monoio")))]
28use tokio::{select, spawn};
29
30use crate::{
31 eventfd::{new_pair, Awaiter, Notifier},
32 util::yield_now,
33};
34
35pub struct Guard {
36 _rx: Receiver<()>,
37}
38
/// Consumer side of a [`Queue`].
///
/// Bundles the queue with the notifier that `pop` signals when the stuck
/// flag is set.
pub struct ReadQueue<T> {
    /// Ring buffer this reader pops from.
    queue: Queue<T>,
    /// Built from the queue's `unstuck_fd`; notified by `pop` when the stuck flag is set.
    unstuck_notifier: Notifier,
    /// Runtime used to spawn the handler; with `None` the free `spawn` function is used.
    #[cfg(all(feature = "tokio", not(feature = "monoio")))]
    tokio_handle: Option<tokio::runtime::Handle>,
}
45
46impl<T> ReadQueue<T> {
47 #[inline]
48 pub fn meta(&self) -> QueueMeta {
49 self.queue.meta()
50 }
51
52 pub fn pop(&mut self) -> Option<T> {
53 let maybe_item = self.queue.pop();
54 if self.queue.stuck() {
55 self.queue.mark_unstuck();
56 self.unstuck_notifier.notify().ok();
57 }
58 maybe_item
59 }
60
61 #[cfg(feature = "monoio")]
62 pub fn run_handler(self, handler: impl FnMut(T) + 'static) -> Result<Guard, io::Error>
63 where
64 T: 'static,
65 {
66 let working_awaiter = unsafe { Awaiter::from_raw_fd(self.queue.working_fd)? };
67 let (tx, rx) = channel();
68 spawn(self.working_handler(working_awaiter, handler, tx));
69 Ok(Guard { _rx: rx })
70 }
71
72 #[cfg(all(feature = "tokio", not(feature = "monoio")))]
73 pub fn run_handler(self, handler: impl FnMut(T) + Send + 'static) -> Result<Guard, io::Error>
74 where
75 T: Send + 'static,
76 {
77 let working_awaiter = unsafe { Awaiter::from_raw_fd(self.queue.working_fd)? };
78 let (tx, rx) = channel();
79 if let Some(tokio_handle) = self.tokio_handle.clone() {
80 tokio_handle.spawn(self.working_handler(working_awaiter, handler, tx));
81 } else {
82 spawn(self.working_handler(working_awaiter, handler, tx));
83 }
84 Ok(Guard { _rx: rx })
85 }
86
    /// Body of the task spawned by `run_handler`.
    ///
    /// Repeatedly drains the queue into `handler`, yields a few times in case
    /// more items arrive, then clears the working flag and waits on
    /// `working_awaiter`. Returns once `tx.closed()` resolves.
    #[cfg(feature = "monoio")]
    async fn working_handler(
        mut self,
        mut working_awaiter: Awaiter,
        mut handler: impl FnMut(T),
        mut tx: Sender<()>,
    ) {
        // Number of cooperative yields attempted before going to sleep.
        const YIELD_CNT: u8 = 3;
        let mut exit = std::pin::pin!(tx.closed());
        self.queue.mark_working();

        'p: loop {
            // Hand over everything that is currently available.
            while let Some(item) = self.pop() {
                handler(item);
            }

            // Give the producer a few chances to add more before sleeping.
            for _ in 0..YIELD_CNT {
                yield_now().await;
                if !self.queue.is_empty() {
                    continue 'p;
                }
            }

            // `mark_unworking` clears the flag and re-checks emptiness; `false`
            // means items showed up and the flag has been set again.
            if !self.queue.mark_unworking() {
                continue;
            }

            select! {
                _ = working_awaiter.wait() => (),
                _ = &mut exit => {
                    return;
                }
            }
            self.queue.mark_working();
        }
    }
123
    /// Body of the task spawned by `run_handler`.
    ///
    /// Repeatedly drains the queue into `handler`, yields a few times in case
    /// more items arrive, then clears the working flag and waits on
    /// `working_awaiter`. Returns once `tx.closed()` resolves.
    #[cfg(all(feature = "tokio", not(feature = "monoio")))]
    async fn working_handler(
        mut self,
        mut working_awaiter: Awaiter,
        mut handler: impl FnMut(T) + Send,
        mut tx: Sender<()>,
    ) where
        T: Send,
    {
        // Number of cooperative yields attempted before going to sleep.
        const YIELD_CNT: u8 = 3;
        let mut exit = std::pin::pin!(tx.closed());
        self.queue.mark_working();

        'p: loop {
            // Hand over everything that is currently available.
            while let Some(item) = self.pop() {
                handler(item);
            }

            // Give the producer a few chances to add more before sleeping.
            for _ in 0..YIELD_CNT {
                yield_now().await;
                if !self.queue.is_empty() {
                    continue 'p;
                }
            }

            // `mark_unworking` clears the flag and re-checks emptiness; `false`
            // means items showed up and the flag has been set again.
            if !self.queue.mark_unworking() {
                continue;
            }

            select! {
                _ = working_awaiter.wait() => (),
                _ = &mut exit => {
                    return;
                }
            }
            self.queue.mark_working();
        }
    }
162}
163
/// Producer side of a [`Queue`]; cloning yields another handle to the same state.
pub struct WriteQueue<T> {
    /// Shared state: behind a `parking_lot::Mutex` by default, or an
    /// `UnsafeCell` when both the `monoio` and `tpc` features are enabled.
    #[cfg(not(all(feature = "monoio", feature = "tpc")))]
    inner: Arc<Mutex<WriteQueueInner<T>>>,
    #[cfg(all(feature = "monoio", feature = "tpc"))]
    inner: Rc<UnsafeCell<WriteQueueInner<T>>>,
    /// Notifier built from the queue's `working_fd`; signalled after the
    /// working flag is switched on.
    #[cfg(not(all(feature = "monoio", feature = "tpc")))]
    working_notifier: Arc<Notifier>,
    #[cfg(all(feature = "monoio", feature = "tpc"))]
    working_notifier: Rc<Notifier>,
}
174
175impl<T> Clone for WriteQueue<T> {
176 fn clone(&self) -> Self {
177 Self {
178 inner: self.inner.clone(),
179 working_notifier: self.working_notifier.clone(),
180 }
181 }
182}
183
184impl<T> WriteQueue<T> {
185 pub fn push(&self, item: T) -> bool {
188 #[cfg(not(all(feature = "monoio", feature = "tpc")))]
189 let mut inner = self.inner.lock();
190 #[cfg(all(feature = "monoio", feature = "tpc"))]
191 let inner = unsafe { &mut *self.inner.get() };
192 let item = match inner.queue.push(item) {
193 Ok(_) => {
194 if !inner.queue.working() {
195 inner.queue.mark_working();
196 #[cfg(not(all(feature = "monoio", feature = "tpc")))]
197 drop(inner);
198 let _ = self.working_notifier.notify();
199 }
200 return true;
201 }
202 Err(item) => item,
203 };
204
205 inner.queue.mark_stuck();
207 let pending = PendingTask {
208 data: Some(item),
209 waiter: None,
210 };
211 inner.pending_tasks.push_back(pending);
212 false
213 }
214
215 pub fn push_without_notify(&self, item: T) -> bool {
218 #[cfg(not(all(feature = "monoio", feature = "tpc")))]
219 let mut inner = self.inner.lock();
220 #[cfg(all(feature = "monoio", feature = "tpc"))]
221 let inner = unsafe { &mut *self.inner.get() };
222 let item = match inner.queue.push(item) {
223 Ok(_) => return true,
224 Err(item) => item,
225 };
226
227 inner.queue.mark_stuck();
229 let pending = PendingTask {
230 data: Some(item),
231 waiter: None,
232 };
233 inner.pending_tasks.push_back(pending);
234 false
235 }
236
237 #[inline]
238 pub fn is_empty(&self) -> bool {
239 #[cfg(not(all(feature = "monoio", feature = "tpc")))]
240 let inner = self.inner.lock();
241 #[cfg(all(feature = "monoio", feature = "tpc"))]
242 let inner = unsafe { &*self.inner.get() };
243 inner.queue.is_empty()
244 }
245
246 pub fn notify_manually(&self) -> bool {
249 #[cfg(not(all(feature = "monoio", feature = "tpc")))]
250 let inner = self.inner.lock();
251 #[cfg(all(feature = "monoio", feature = "tpc"))]
252 let inner = unsafe { &mut *self.inner.get() };
253
254 if inner.queue.working() {
255 return false;
256 }
257
258 inner.queue.mark_working();
259 #[cfg(not(all(feature = "monoio", feature = "tpc")))]
260 drop(inner);
261 let _ = self.working_notifier.notify();
262 true
263 }
264
265 pub fn push_with_awaiter(&self, item: T) -> PushResult {
266 #[cfg(not(all(feature = "monoio", feature = "tpc")))]
267 let mut inner = self.inner.lock();
268 #[cfg(all(feature = "monoio", feature = "tpc"))]
269 let inner = unsafe { &mut *self.inner.get() };
270
271 let item = match inner.queue.push(item) {
272 Ok(_) => {
273 if !inner.queue.working() {
274 inner.queue.mark_working();
275 #[cfg(not(all(feature = "monoio", feature = "tpc")))]
276 drop(inner);
277 let _ = self.working_notifier.notify();
278 }
279 return PushResult::Ok;
280 }
281 Err(item) => item,
282 };
283
284 inner.queue.mark_stuck();
286 #[cfg(not(all(feature = "monoio", feature = "tpc")))]
287 let waker_slot = Arc::new(Mutex::new(WakerSlot::None));
288 #[cfg(all(feature = "monoio", feature = "tpc"))]
289 let waker_slot = Rc::new(UnsafeCell::new(WakerSlot::None));
290 let pending = PendingTask {
291 data: Some(item),
292 waiter: Some(waker_slot.clone()),
293 };
294
295 inner.pending_tasks.push_back(pending);
296 PushResult::Pending(PushJoinHandle { waker_slot })
297 }
298
    /// Background loop that moves parked items into the ring buffer.
    ///
    /// Each round pushes pending tasks in FIFO order until one is rejected,
    /// wakes the waiter of every task that went through, makes sure the reader
    /// is marked working, and then waits on `unstuck_awaiter`. Returns once
    /// `tx.closed()` resolves.
    ///
    /// NOTE(review): `self` holds a clone of `inner`, and `inner` owns the
    /// `_guard` receiver paired with `tx`. It looks like `tx.closed()` can
    /// therefore not resolve while this task is alive — confirm the intended
    /// shutdown path.
    async fn unstuck_handler(self, mut unstuck_awaiter: Awaiter, mut tx: Sender<()>) {
        let mut exit = std::pin::pin!(tx.closed());
        loop {
            // Scope the lock so it is released before awaiting below.
            {
                #[cfg(not(all(feature = "monoio", feature = "tpc")))]
                let mut inner = self.inner.lock();
                #[cfg(all(feature = "monoio", feature = "tpc"))]
                let inner = unsafe { &mut *self.inner.get() };

                while let Some(mut pending_task) = inner.pending_tasks.pop_front() {
                    // `data` is always `Some` for a task sitting in the deque.
                    let data = pending_task.data.take().unwrap();
                    match inner.queue.push(data) {
                        Ok(_) => {
                            if let Some(waiter) = pending_task.waiter {
                                #[cfg(not(all(feature = "monoio", feature = "tpc")))]
                                waiter.lock().wake();
                                #[cfg(all(feature = "monoio", feature = "tpc"))]
                                unsafe {
                                    (*waiter.get()).wake()
                                };
                            }
                        }
                        Err(data) => {
                            // Still full: put the task back at the front to keep order.
                            pending_task.data = Some(data);
                            inner.pending_tasks.push_front(pending_task);
                            break;
                        }
                    }
                }
                if !inner.queue.working() {
                    inner.queue.mark_working();
                    let _ = self.working_notifier.notify();
                }
                if !inner.pending_tasks.is_empty() {
                    // Re-arm the stuck flag, then re-check for free space so a
                    // pop that happened before the flag was set is not missed.
                    inner.queue.mark_stuck();
                    if !inner.queue.is_full() {
                        continue;
                    }
                }
            }

            select! {
                _ = unstuck_awaiter.wait() => (),
                _ = &mut exit => {
                    return;
                }
            }
        }
    }
348}
349
/// State shared by every [`WriteQueue`] handle.
pub struct WriteQueueInner<T> {
    /// Ring buffer the writer pushes into.
    queue: Queue<T>,
    /// Items that did not fit into the ring buffer, oldest first.
    pending_tasks: VecDeque<PendingTask<T>>,
    /// Receiver paired with the sender handed to the unstuck handler task.
    _guard: Receiver<()>,
}
355
356impl<T> WriteQueue<T> {
357 #[inline]
358 pub fn meta(&self) -> QueueMeta {
359 #[cfg(not(all(feature = "monoio", feature = "tpc")))]
360 {
361 self.inner.lock().queue.meta()
362 }
363 #[cfg(all(feature = "monoio", feature = "tpc"))]
364 {
365 unsafe { (*self.inner.get()).queue.meta() }
366 }
367 }
368}
369
/// An item waiting for free space in the ring buffer.
struct PendingTask<T> {
    /// The parked item; taken out while a push is attempted and restored on failure.
    data: Option<T>,
    /// Waker slot shared with a `PushJoinHandle`; `None` when nobody awaits the push.
    #[cfg(not(all(feature = "monoio", feature = "tpc")))]
    waiter: Option<Arc<Mutex<WakerSlot>>>,
    #[cfg(all(feature = "monoio", feature = "tpc"))]
    waiter: Option<Rc<UnsafeCell<WakerSlot>>>,
}
378
/// Completion state shared between a parked push and the future awaiting it.
enum WakerSlot {
    /// No waker registered yet and the push has not completed.
    None,
    /// A waker is registered and will be woken on completion.
    Some(Waker),
    /// The push has completed.
    Finished,
}

impl WakerSlot {
    /// Marks the slot as finished and wakes the registered waker, if any.
    fn wake(&mut self) {
        let previous = mem::replace(self, Self::Finished);
        match previous {
            Self::Some(waker) => waker.wake(),
            Self::None | Self::Finished => {}
        }
    }

    /// Registers `w` as the waker to notify on completion.
    ///
    /// Returns `true` when the slot is already finished (nothing is stored in
    /// that case) and `false` after storing or refreshing the waker.
    fn set_waker(&mut self, w: &Waker) -> bool {
        match self {
            Self::Finished => true,
            Self::Some(current) => {
                current.clone_from(w);
                false
            }
            Self::None => {
                *self = Self::Some(w.clone());
                false
            }
        }
    }
}
401
/// Fixed-capacity ring buffer addressed through raw pointers.
///
/// The buffer, the two indices and the two flags live behind pointers so that
/// a second `Queue` created with `new_from_meta` can refer to the same memory.
pub struct Queue<T> {
    /// Start of the slot array.
    buffer_ptr: *mut MaybeUninit<T>,
    /// Number of slots in the array.
    buffer_len: usize,

    /// Count of items popped so far; the slot index is this value modulo `buffer_len`.
    head_ptr: *mut AtomicUsize,
    /// Count of items pushed so far; the slot index is this value modulo `buffer_len`.
    tail_ptr: *mut AtomicUsize,
    /// Flag holding 1 while the reader is marked as working, 0 otherwise.
    working_ptr: *mut AtomicU32,
    /// Flag holding 1 while the writer is marked as stuck, 0 otherwise.
    stuck_ptr: *mut AtomicU32,

    /// Descriptor used for the "working" notification.
    working_fd: RawFd,
    /// Descriptor used for the "unstuck" notification.
    unstuck_fd: RawFd,

    /// Whether dropping this value frees the shared memory and wraps the
    /// descriptors for release; `true` for `new`, `false` for `new_from_meta`.
    do_drop: bool,
}
416
// NOTE(review): the raw-pointer fields make `Queue` `!Send` by default; this
// impl asserts that moving it across threads is sound whenever `T: Send`. The
// justification is not visible in this file — confirm.
unsafe impl<T: Send> Send for Queue<T> {}
418
/// Plain-data description of a [`Queue`]: its addresses as integers plus two
/// file descriptors.
///
/// `Queue::new` fills the descriptor fields with the peer ends of the pairs it
/// creates, whereas `Queue::meta` reports the queue's own descriptors.
#[repr(C)]
#[derive(Debug, Clone, Copy)]
pub struct QueueMeta {
    /// Address of the slot array.
    pub buffer_ptr: usize,
    /// Number of slots in the array.
    pub buffer_len: usize,
    /// Address of the head counter.
    pub head_ptr: usize,
    /// Address of the tail counter.
    pub tail_ptr: usize,
    /// Address of the working flag.
    pub working_ptr: usize,
    /// Address of the stuck flag.
    pub stuck_ptr: usize,
    /// Descriptor for the "working" notification.
    pub working_fd: RawFd,
    /// Descriptor for the "unstuck" notification.
    pub unstuck_fd: RawFd,
}
431
// NOTE(review): asserts that `&Queue<T>` may be shared across threads whenever
// `T: Sync`. The `&self` methods only touch the atomics, but the justification
// is not spelled out in this file — confirm.
unsafe impl<T: Sync> Sync for Queue<T> {}
433
434impl<T> Queue<T> {
435 pub fn new(size: usize) -> Result<(Self, QueueMeta), io::Error> {
436 let buffer = unsafe {
437 let mut v = Vec::<MaybeUninit<T>>::with_capacity(size);
438 v.set_len(size);
439 v.into_boxed_slice()
440 };
441 let buffer_slice = Box::leak(buffer);
442
443 let head_ptr = Box::leak(Box::new(AtomicUsize::new(0)));
444 let tail_ptr = Box::leak(Box::new(AtomicUsize::new(0)));
445 let working_ptr = Box::leak(Box::new(AtomicU32::new(0)));
446 let stuck_ptr = Box::leak(Box::new(AtomicU32::new(0)));
447
448 let (working_fd, working_fd_peer) = new_pair()?;
449 let (unstuck_fd, unstuck_fd_peer) = new_pair()?;
450
451 let queue = Self {
452 buffer_ptr: buffer_slice.as_mut_ptr(),
453 buffer_len: size,
454 head_ptr,
455 tail_ptr,
456 working_ptr,
457 stuck_ptr,
458 working_fd,
459 unstuck_fd,
460 do_drop: true,
461 };
462 let meta = QueueMeta {
463 buffer_ptr: queue.buffer_ptr as _,
464 buffer_len: queue.buffer_len,
465 head_ptr: queue.head_ptr as _,
466 tail_ptr: queue.tail_ptr as _,
467 working_ptr: queue.working_ptr as _,
468 stuck_ptr: queue.stuck_ptr as _,
469 working_fd: working_fd_peer,
470 unstuck_fd: unstuck_fd_peer,
471 };
472
473 Ok((queue, meta))
474 }
475
476 pub unsafe fn new_from_meta(meta: &QueueMeta) -> Result<Self, io::Error> {
479 let buffer_slice =
480 std::slice::from_raw_parts_mut(meta.buffer_ptr as *mut MaybeUninit<T>, meta.buffer_len);
481 let size = buffer_slice.len();
482 let head_ptr = meta.head_ptr as *mut AtomicUsize;
483 let tail_ptr = meta.tail_ptr as *mut AtomicUsize;
484 let working_ptr = meta.working_ptr as *mut AtomicU32;
485 let stuck_ptr = meta.stuck_ptr as *mut AtomicU32;
486 let working_fd = meta.working_fd;
487 let unstuck_fd = meta.unstuck_fd;
488 Ok(Self {
489 buffer_ptr: buffer_slice.as_mut_ptr(),
490 buffer_len: size,
491 head_ptr,
492 tail_ptr,
493 working_ptr,
494 stuck_ptr,
495 working_fd,
496 unstuck_fd,
497 do_drop: false,
498 })
499 }
500
    /// Returns `true` when dropping this queue frees the shared memory
    /// (queues from `new`), `false` for queues built by `new_from_meta`.
    #[inline]
    pub fn is_memory_owner(&self) -> bool {
        self.do_drop
    }
505
506 #[inline]
507 pub fn meta(&self) -> QueueMeta {
508 QueueMeta {
509 buffer_ptr: self.buffer_ptr as _,
510 buffer_len: self.buffer_len,
511 head_ptr: self.head_ptr as _,
512 tail_ptr: self.tail_ptr as _,
513 working_ptr: self.working_ptr as _,
514 stuck_ptr: self.stuck_ptr as _,
515 working_fd: self.working_fd,
516 unstuck_fd: self.unstuck_fd,
517 }
518 }
519
520 pub fn read(self) -> ReadQueue<T> {
521 let unstuck_notifier = unsafe { Notifier::from_raw_fd(self.unstuck_fd) };
522 ReadQueue {
523 queue: self,
524 unstuck_notifier,
525 #[cfg(all(feature = "tokio", not(feature = "monoio")))]
526 tokio_handle: None,
527 }
528 }
529
530 #[cfg(all(feature = "tokio", not(feature = "monoio")))]
531 pub fn read_with_tokio_handle(self, tokio_handle: tokio::runtime::Handle) -> ReadQueue<T> {
532 let unstuck_notifier = unsafe { Notifier::from_raw_fd(self.unstuck_fd) };
533 ReadQueue {
534 queue: self,
535 unstuck_notifier,
536 tokio_handle: Some(tokio_handle),
537 }
538 }
539
540 #[cfg(feature = "monoio")]
541 pub fn write(self) -> Result<WriteQueue<T>, io::Error>
542 where
543 T: 'static,
544 {
545 let working_notifier = unsafe { Notifier::from_raw_fd(self.working_fd) };
546 let unstuck_awaiter = unsafe { Awaiter::from_raw_fd(self.unstuck_fd) }?;
547
548 let (tx, rx) = channel();
549 let wq = WriteQueue {
550 #[cfg(feature = "tpc")]
551 inner: Rc::new(UnsafeCell::new(WriteQueueInner {
552 queue: self,
553 pending_tasks: VecDeque::new(),
554 _guard: rx,
555 })),
556 #[cfg(not(feature = "tpc"))]
557 inner: Arc::new(Mutex::new(WriteQueueInner {
558 queue: self,
559 pending_tasks: VecDeque::new(),
560 _guard: rx,
561 })),
562 #[cfg(feature = "tpc")]
563 working_notifier: Rc::new(working_notifier),
564 #[cfg(not(feature = "tpc"))]
565 working_notifier: Arc::new(working_notifier),
566 };
567
568 spawn(wq.clone().unstuck_handler(unstuck_awaiter, tx));
569
570 Ok(wq)
571 }
572
573 #[cfg(all(feature = "tokio", not(feature = "monoio")))]
574 pub fn write(self) -> Result<WriteQueue<T>, io::Error>
575 where
576 T: Send + 'static,
577 {
578 let working_notifier = unsafe { Notifier::from_raw_fd(self.working_fd) };
579 let unstuck_awaiter = unsafe { Awaiter::from_raw_fd(self.unstuck_fd) }?;
580
581 let (tx, rx) = channel();
582 let wq = WriteQueue {
583 inner: Arc::new(Mutex::new(WriteQueueInner {
584 queue: self,
585 pending_tasks: VecDeque::new(),
586 _guard: rx,
587 })),
588 working_notifier: Arc::new(working_notifier),
589 };
590
591 spawn(wq.clone().unstuck_handler(unstuck_awaiter, tx));
592
593 Ok(wq)
594 }
595
596 #[cfg(all(feature = "tokio", not(feature = "monoio")))]
597 pub fn write_with_tokio_handle(
598 self,
599 tokio_handle: &tokio::runtime::Handle,
600 ) -> Result<WriteQueue<T>, io::Error>
601 where
602 T: Send + 'static,
603 {
604 let working_notifier = unsafe { Notifier::from_raw_fd(self.working_fd) };
605 let unstuck_awaiter = unsafe { Awaiter::from_raw_fd(self.unstuck_fd) }?;
606
607 let (tx, rx) = channel();
608 let wq = WriteQueue {
609 inner: Arc::new(Mutex::new(WriteQueueInner {
610 queue: self,
611 pending_tasks: VecDeque::new(),
612 _guard: rx,
613 })),
614 working_notifier: Arc::new(working_notifier),
615 };
616
617 tokio_handle.spawn(wq.clone().unstuck_handler(unstuck_awaiter, tx));
618
619 Ok(wq)
620 }
621}
622
623impl<T> Drop for Queue<T> {
624 fn drop(&mut self) {
625 if self.do_drop {
626 unsafe {
627 let slice = std::slice::from_raw_parts_mut(self.buffer_ptr, self.buffer_len);
628 let _ = Box::from_raw(slice as *mut [MaybeUninit<T>]);
629 let _ = Box::from_raw(self.head_ptr);
630 let _ = Box::from_raw(self.tail_ptr);
631 let _ = Box::from_raw(self.working_ptr);
632 let _ = Box::from_raw(self.stuck_ptr);
633 let _ = Notifier::from_raw_fd(self.unstuck_fd);
634 let _ = Notifier::from_raw_fd(self.working_fd);
635 }
636 }
637 }
638}
639
/// Outcome of `WriteQueue::push_with_awaiter`.
pub enum PushResult {
    /// The item was written into the ring buffer.
    Ok,
    /// The buffer was full; the handle completes once the item has been moved into it.
    Pending(PushJoinHandle),
}
644
/// Future that completes once a parked item has been pushed into the ring buffer.
pub struct PushJoinHandle {
    /// Slot shared with the matching `PendingTask`.
    #[cfg(all(feature = "monoio", feature = "tpc"))]
    waker_slot: Rc<UnsafeCell<WakerSlot>>,

    #[cfg(not(all(feature = "monoio", feature = "tpc")))]
    waker_slot: Arc<Mutex<WakerSlot>>,
}
652
653impl Future for PushJoinHandle {
654 type Output = ();
655
656 fn poll(
657 self: std::pin::Pin<&mut Self>,
658 cx: &mut std::task::Context<'_>,
659 ) -> std::task::Poll<Self::Output> {
660 #[cfg(all(feature = "monoio", feature = "tpc"))]
661 let slot = unsafe { &mut *self.waker_slot.get() };
662 #[cfg(not(all(feature = "monoio", feature = "tpc")))]
663 let mut slot = self.waker_slot.lock();
664 if slot.set_waker(cx.waker()) {
665 return std::task::Poll::Ready(());
666 }
667 std::task::Poll::Pending
668 }
669}
670
671impl<T> Queue<T> {
672 pub fn len(&self) -> usize {
673 let shead = unsafe { &*self.head_ptr };
674 let stail = unsafe { &*self.tail_ptr };
675 stail.load(Ordering::Acquire) - shead.load(Ordering::Acquire)
676 }
677
678 pub fn is_empty(&self) -> bool {
679 let shead = unsafe { &*self.head_ptr };
680 let stail = unsafe { &*self.tail_ptr };
681 stail.load(Ordering::Acquire) == shead.load(Ordering::Acquire)
682 }
683
684 pub fn is_full(&self) -> bool {
685 let shead = unsafe { &*self.head_ptr };
686 let stail = unsafe { &*self.tail_ptr };
687 stail.load(Ordering::Acquire) - shead.load(Ordering::Acquire) == self.buffer_len
688 }
689
690 fn push(&mut self, item: T) -> Result<(), T> {
691 let shead = unsafe { &*self.head_ptr };
692 let stail = unsafe { &*self.tail_ptr };
693
694 let tail = stail.load(Ordering::Relaxed);
695 if tail - shead.load(Ordering::Acquire) == self.buffer_len {
696 return Err(item);
697 }
698
699 unsafe {
700 (*self.buffer_ptr.add(tail % self.buffer_len)).write(item);
701 }
702 stail.store(tail + 1, Ordering::Release);
703 Ok(())
704 }
705
706 fn pop(&mut self) -> Option<T> {
707 let shead = unsafe { &*self.head_ptr };
708 let stail = unsafe { &*self.tail_ptr };
709
710 let head = shead.load(Ordering::Relaxed);
711 if head == stail.load(Ordering::Acquire) {
712 return None;
713 }
714
715 let item = unsafe { (*self.buffer_ptr.add(head % self.buffer_len)).assume_init_read() };
716 shead.store(head + 1, Ordering::Release);
717 Some(item)
718 }
719
720 #[inline]
721 fn mark_unworking(&self) -> bool {
722 unsafe { &*self.working_ptr }.store(0, Ordering::Release);
723 if self.is_empty() {
724 return true;
725 }
726 self.mark_working();
727 false
728 }
729
730 #[inline]
731 fn mark_working(&self) {
732 unsafe { &*self.working_ptr }.store(1, Ordering::Release);
733 }
734
735 #[inline]
736 fn working(&self) -> bool {
737 unsafe { &*self.working_ptr }.load(Ordering::Acquire) == 1
738 }
739
740 #[inline]
741 fn mark_unstuck(&self) {
742 unsafe { &*self.stuck_ptr }.store(0, Ordering::Release);
743 }
744
745 #[inline]
746 fn mark_stuck(&self) {
747 unsafe { &*self.stuck_ptr }.store(1, Ordering::Release);
748 }
749
750 #[inline]
751 fn stuck(&self) -> bool {
752 unsafe { &*self.stuck_ptr }.load(Ordering::Acquire) == 1
753 }
754}
755
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;

    #[cfg(feature = "monoio")]
    use monoio::time::sleep;
    #[cfg(all(feature = "tokio", not(feature = "monoio")))]
    use tokio::time::sleep;

    // Emits each async test once per runtime, guarded by the matching
    // feature combination and that runtime's test attribute.
    macro_rules! test {
        ($($i: item)*) => {$(
            #[cfg(feature = "monoio")]
            #[monoio::test(timer_enabled = true)]
            $i

            #[cfg(all(feature = "tokio", not(feature = "monoio")))]
            #[tokio::test]
            $i
        )*};
    }

    test! {
        // Pushes an item, waits a second, then pushes another; the test ends
        // once the handler has seen item 2 and closed the channel.
        async fn demo_wake() {
            let (mut tx, mut rx) = channel::<()>();

            let (q_read, meta) = Queue::<u8>::new(1024).unwrap();
            let q_write = unsafe { Queue::<u8>::new_from_meta(&meta) }.unwrap();
            let q_read = q_read.read();
            let q_write = q_write.write().unwrap();

            let _guard = q_read
                .run_handler(move |item| {
                    if item == 2 {
                        rx.close();
                    }
                })
                .unwrap();

            q_write.push(1);
            sleep(Duration::from_secs(1)).await;
            q_write.push(2);
            tx.closed().await;
        }

        // Uses a capacity of one so that later pushes go through the pending
        // list; the test ends once the handler has seen item 4.
        async fn demo_stuck() {
            let (mut tx, mut rx) = channel::<()>();

            let (q_read, meta) = Queue::<u8>::new(1).unwrap();
            let q_write = unsafe { Queue::<u8>::new_from_meta(&meta) }.unwrap();
            let q_read = q_read.read();
            let q_write = q_write.write().unwrap();

            let _guard = q_read
                .run_handler(move |item| {
                    if item == 4 {
                        rx.close();
                    }
                })
                .unwrap();
            println!("pushed {}", q_write.push(1));
            println!("pushed {}", q_write.push(2));
            println!("pushed {}", q_write.push(3));
            println!("pushed {}", q_write.push(4));
            sleep(Duration::from_secs(1)).await;

            tx.closed().await;
        }
    }
}