async_ach_notify/
lib.rs

1#![no_std]
2
3use async_ach_waker::pool::{WakerPool, WakerToken};
4use async_ach_waker::WakerEntity;
5use core::future::Future;
6use core::pin::Pin;
7use core::sync::atomic::{AtomicUsize, Ordering::SeqCst};
8use core::task::{Context, Poll};
9use futures_util::Stream;
10
11pub struct Notify<const W: usize> {
12    permit: AtomicUsize,
13    wakers: WakerPool<(), W>,
14}
15impl<const W: usize> Notify<W> {
16    pub const fn new() -> Self {
17        Self {
18            permit: AtomicUsize::new(0),
19            wakers: WakerPool::new(),
20        }
21    }
22    /// Notify a waiter
23    pub fn notify_one(&self) {
24        self.permit.fetch_add(1, SeqCst);
25        self.wakers.wake_one();
26    }
27    pub fn notify_waiters(&self) -> usize {
28        let mut num = 0;
29        loop {
30            self.permit.fetch_add(1, SeqCst);
31            if !self.wakers.wake_one() {
32                self.get_permit();
33                break;
34            } else {
35                num += 1;
36            }
37        }
38        num
39    }
40    /// Had been notified
41    pub fn had_notified(&self) -> bool {
42        self.permit.load(SeqCst) != 0
43    }
44    /// Wait for a notice
45    pub fn listen(&self) -> Listener<'_, W> {
46        Listener {
47            parent: self,
48            token: None,
49        }
50    }
51    fn get_permit(&self) -> bool {
52        self.permit
53            .fetch_update(SeqCst, SeqCst, |x| if x > 0 { Some(x - 1) } else { None })
54            .is_ok()
55    }
56}
57
58pub struct Listener<'a, const W: usize> {
59    parent: &'a Notify<W>,
60    token: Option<WakerToken<'a, (), W>>,
61}
62impl<'a, const W: usize> Listener<'a, W> {
63    pub fn pendable(&mut self) -> bool {
64        if let Some(_) = &self.token {
65            true
66        } else if let Ok(token) = self.parent.wakers.register() {
67            self.token = Some(token);
68            true
69        } else {
70            false
71        }
72    }
73}
74impl<'a, const W: usize> Stream for Listener<'a, W> {
75    type Item = ();
76    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
77        let waker = cx.waker();
78        if self.pendable() {
79            self.token
80                .as_ref()
81                .unwrap()
82                .swap(WakerEntity::new(waker.clone(), ()));
83        } else {
84            waker.wake_by_ref();
85        }
86        if self.parent.get_permit() {
87            Poll::Ready(Some(()))
88        } else {
89            Poll::Pending
90        }
91    }
92}
93impl<'a, const W: usize> Future for Listener<'a, W> {
94    type Output = ();
95    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
96        match self.poll_next(cx) {
97            Poll::Ready(_) => Poll::Ready(()),
98            Poll::Pending => Poll::Pending,
99        }
100    }
101}