async_ach_pubsub/
heapless.rs

1use ach_pubsub::heapless as ach;
2use ach_util::Error;
3use async_ach_notify::{Listener, Notify};
4use core::future::Future;
5use core::pin::Pin;
6use core::task::{Context, Poll};
7use futures_util::Stream;
8
9pub struct Subscriber<'a, T, const N: usize, const MC: usize> {
10    parent: &'a Publisher<T, N, MC>,
11    ch: ach::Subscriber<'a, T, N>,
12}
13impl<'a, T, const N: usize, const MC: usize> Subscriber<'a, T, N, MC> {
14    /// Removes the first element and returns it.
15    ///
16    /// Returns Err if the Ring is empty.
17    pub fn try_recv(&self) -> Result<T, Error<()>> {
18        self.ch.try_recv()
19    }
20    pub fn recv<'b>(&'b self) -> Recv<'a, 'b, T, N, MC> {
21        Recv {
22            parent: self,
23            wait: self.parent.producer.listen(),
24        }
25    }
26}
27pub struct Recv<'a, 'b, T, const N: usize, const MC: usize> {
28    parent: &'b Subscriber<'a, T, N, MC>,
29    wait: Listener<'b, MC>,
30}
31impl<'a, 'b, T, const N: usize, const MC: usize> Stream for Recv<'a, 'b, T, N, MC> {
32    type Item = T;
33    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
34        if let Ok(data) = self.parent.try_recv() {
35            Poll::Ready(Some(data))
36        } else {
37            let _ = Pin::new(&mut self.wait).poll_next(cx);
38            if let Ok(data) = self.parent.try_recv() {
39                Poll::Ready(Some(data))
40            } else {
41                Poll::Pending
42            }
43        }
44    }
45}
46impl<'a, 'b, T, const N: usize, const MC: usize> Future for Recv<'a, 'b, T, N, MC> {
47    type Output = T;
48    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
49        match self.poll_next(cx) {
50            Poll::Ready(Some(val)) => Poll::Ready(val),
51            Poll::Ready(None) => Poll::Pending,
52            Poll::Pending => Poll::Pending,
53        }
54    }
55}
56
57pub struct Publisher<T, const N: usize, const MC: usize> {
58    ch: ach::Publisher<T, N, MC>,
59    producer: Notify<MC>,
60}
61impl<T, const N: usize, const MC: usize> Publisher<T, N, MC> {
62    pub const fn new(strict: bool) -> Self {
63        Self {
64            ch: ach::Publisher::new(strict),
65            producer: Notify::new(),
66        }
67    }
68    pub fn subscribe(&self) -> Option<Subscriber<T, N, MC>> {
69        if let Some(sub) = self.ch.subscribe() {
70            Some(Subscriber {
71                parent: self,
72                ch: sub,
73            })
74        } else {
75            None
76        }
77    }
78}
79impl<T: Clone, const N: usize, const MC: usize> Publisher<T, N, MC> {
80    /// return success times
81    ///
82    /// Notice: `Spin` if strict
83    pub fn send(&self, val: T) -> usize {
84        let num = self.ch.send(val);
85        if num != 0 {
86            self.producer.notify_waiters();
87        }
88        num
89    }
90}