1#![no_std]
2
3use async_ach_waker::pool::{WakerPool, WakerToken};
4use async_ach_waker::WakerEntity;
5use core::future::Future;
6use core::pin::Pin;
7use core::sync::atomic::{AtomicUsize, Ordering::SeqCst};
8use core::task::{Context, Poll};
9use futures_util::Stream;
10
11pub struct Notify<const W: usize> {
12 permit: AtomicUsize,
13 wakers: WakerPool<(), W>,
14}
15impl<const W: usize> Notify<W> {
16 pub const fn new() -> Self {
17 Self {
18 permit: AtomicUsize::new(0),
19 wakers: WakerPool::new(),
20 }
21 }
22 pub fn notify_one(&self) {
24 self.permit.fetch_add(1, SeqCst);
25 self.wakers.wake_one();
26 }
27 pub fn notify_waiters(&self) -> usize {
28 let mut num = 0;
29 loop {
30 self.permit.fetch_add(1, SeqCst);
31 if !self.wakers.wake_one() {
32 self.get_permit();
33 break;
34 } else {
35 num += 1;
36 }
37 }
38 num
39 }
40 pub fn had_notified(&self) -> bool {
42 self.permit.load(SeqCst) != 0
43 }
44 pub fn listen(&self) -> Listener<'_, W> {
46 Listener {
47 parent: self,
48 token: None,
49 }
50 }
51 fn get_permit(&self) -> bool {
52 self.permit
53 .fetch_update(SeqCst, SeqCst, |x| if x > 0 { Some(x - 1) } else { None })
54 .is_ok()
55 }
56}
57
58pub struct Listener<'a, const W: usize> {
59 parent: &'a Notify<W>,
60 token: Option<WakerToken<'a, (), W>>,
61}
62impl<'a, const W: usize> Listener<'a, W> {
63 pub fn pendable(&mut self) -> bool {
64 if let Some(_) = &self.token {
65 true
66 } else if let Ok(token) = self.parent.wakers.register() {
67 self.token = Some(token);
68 true
69 } else {
70 false
71 }
72 }
73}
74impl<'a, const W: usize> Stream for Listener<'a, W> {
75 type Item = ();
76 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
77 let waker = cx.waker();
78 if self.pendable() {
79 self.token
80 .as_ref()
81 .unwrap()
82 .swap(WakerEntity::new(waker.clone(), ()));
83 } else {
84 waker.wake_by_ref();
85 }
86 if self.parent.get_permit() {
87 Poll::Ready(Some(()))
88 } else {
89 Poll::Pending
90 }
91 }
92}
93impl<'a, const W: usize> Future for Listener<'a, W> {
94 type Output = ();
95 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
96 match self.poll_next(cx) {
97 Poll::Ready(_) => Poll::Ready(()),
98 Poll::Pending => Poll::Pending,
99 }
100 }
101}