async_ach_pubsub/
heapless.rs1use ach_pubsub::heapless as ach;
2use ach_util::Error;
3use async_ach_notify::{Listener, Notify};
4use core::future::Future;
5use core::pin::Pin;
6use core::task::{Context, Poll};
7use futures_util::Stream;
8
9pub struct Subscriber<'a, T, const N: usize, const MC: usize> {
10 parent: &'a Publisher<T, N, MC>,
11 ch: ach::Subscriber<'a, T, N>,
12}
13impl<'a, T, const N: usize, const MC: usize> Subscriber<'a, T, N, MC> {
14 pub fn try_recv(&self) -> Result<T, Error<()>> {
18 self.ch.try_recv()
19 }
20 pub fn recv<'b>(&'b self) -> Recv<'a, 'b, T, N, MC> {
21 Recv {
22 parent: self,
23 wait: self.parent.producer.listen(),
24 }
25 }
26}
27pub struct Recv<'a, 'b, T, const N: usize, const MC: usize> {
28 parent: &'b Subscriber<'a, T, N, MC>,
29 wait: Listener<'b, MC>,
30}
31impl<'a, 'b, T, const N: usize, const MC: usize> Stream for Recv<'a, 'b, T, N, MC> {
32 type Item = T;
33 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
34 if let Ok(data) = self.parent.try_recv() {
35 Poll::Ready(Some(data))
36 } else {
37 let _ = Pin::new(&mut self.wait).poll_next(cx);
38 if let Ok(data) = self.parent.try_recv() {
39 Poll::Ready(Some(data))
40 } else {
41 Poll::Pending
42 }
43 }
44 }
45}
46impl<'a, 'b, T, const N: usize, const MC: usize> Future for Recv<'a, 'b, T, N, MC> {
47 type Output = T;
48 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
49 match self.poll_next(cx) {
50 Poll::Ready(Some(val)) => Poll::Ready(val),
51 Poll::Ready(None) => Poll::Pending,
52 Poll::Pending => Poll::Pending,
53 }
54 }
55}
56
57pub struct Publisher<T, const N: usize, const MC: usize> {
58 ch: ach::Publisher<T, N, MC>,
59 producer: Notify<MC>,
60}
61impl<T, const N: usize, const MC: usize> Publisher<T, N, MC> {
62 pub const fn new(strict: bool) -> Self {
63 Self {
64 ch: ach::Publisher::new(strict),
65 producer: Notify::new(),
66 }
67 }
68 pub fn subscribe(&self) -> Option<Subscriber<T, N, MC>> {
69 if let Some(sub) = self.ch.subscribe() {
70 Some(Subscriber {
71 parent: self,
72 ch: sub,
73 })
74 } else {
75 None
76 }
77 }
78}
79impl<T: Clone, const N: usize, const MC: usize> Publisher<T, N, MC> {
80 pub fn send(&self, val: T) -> usize {
84 let num = self.ch.send(val);
85 if num != 0 {
86 self.producer.notify_waiters();
87 }
88 num
89 }
90}