rart_rs/futures/
trigger.rs1use heapless::Deque;
2use core::task::Waker;
3use core::future::Future;
4use core::pin::Pin;
5use core::task::{Context, Poll};
6use core::sync::atomic::{AtomicUsize, Ordering};
7use crate::common::arc::Arc;
8use crate::common::ArcMutex;
9use crate::common::blocking_mutex::BlockingMutex;
10use crate::common::result::RARTError;
11use crate::Expect;
12
13pub struct Trigger<const TN: usize> {
14 wait_wakers: ArcMutex<Deque<Waker, TN>>,
15 is_triggered: AtomicUsize,
16}
17
18struct Waiter<const TN: usize> {
19 trigger: &'static Trigger<TN>,
20}
21
22impl<const TN: usize> Trigger<TN> {
23 pub fn new() -> Self {
24 Self {
25 wait_wakers: Arc::new(BlockingMutex::new(Deque::new())),
26 is_triggered: AtomicUsize::new(0),
27 }
28 }
29
30 pub fn trigger(&'static self) -> Result<(), RARTError> {
31 let mut wait_wakers = self.wait_wakers.lock()?;
32
33 if self.is_triggered.compare_exchange(0, wait_wakers.len(),
34 Ordering::AcqRel,
35 Ordering::Relaxed).is_ok() {
36 for waker in wait_wakers.iter() {
37 waker.wake_by_ref();
38 }
39 wait_wakers.clear();
40 }
41
42 Ok(())
43 }
44
45 pub async fn wait(&'static self) {
46 Waiter { trigger: &self }.await
47 }
48}
49
50impl<const TN: usize> Future for Waiter<TN> {
51 type Output = ();
52
53 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
54 if self.trigger.is_triggered.load(Ordering::Acquire) == 0 {
55 let mut wait_wakers = self.trigger.wait_wakers.lock().rart_expect("Cannot lock wait_wakers at trigger poll");
56 wait_wakers.push_back(cx.waker().clone()).rart_expect("Cannot push back the wait waker");
57 Poll::Pending
58 } else {
59 self.trigger.is_triggered.fetch_sub(1, Ordering::AcqRel);
60 Poll::Ready(())
61 }
62 }
63}