async_ach_spsc/
heapless.rs

1use ach_spsc as ach;
2use async_ach_notify::Notify;
3use futures_util::StreamExt;
4
5pub struct Spsc<T, const N: usize> {
6    buf: ach::Spsc<T, N>,
7    consumer: Notify<1>,
8    producer: Notify<1>,
9}
10impl<T, const N: usize> Spsc<T, N> {
11    pub const fn new() -> Self {
12        Self {
13            buf: ach::Spsc::new(),
14            consumer: Notify::new(),
15            producer: Notify::new(),
16        }
17    }
18}
19impl<T: Unpin, const N: usize> Spsc<T, N> {
20    pub fn take_sender(&self) -> Option<Sender<T, N>> {
21        let sender = self.buf.take_sender()?;
22        Some(Sender {
23            parent: self,
24            sender,
25        })
26    }
27    pub fn take_recver(&self) -> Option<Receiver<T, N>> {
28        let recver = self.buf.take_recver()?;
29        Some(Receiver {
30            parent: self,
31            recver,
32        })
33    }
34}
35
36pub struct Sender<'a, T: Unpin, const N: usize> {
37    parent: &'a Spsc<T, N>,
38    sender: ach::Sender<'a, T, N>,
39}
40impl<'a, T: Unpin, const N: usize> Sender<'a, T, N> {
41    pub fn try_send(&mut self, val: T) -> Result<(), T> {
42        self.sender.try_send(val).map(|_| {
43            self.parent.producer.notify_one();
44        })
45    }
46    pub async fn send<'b>(&'b mut self, mut val: T) {
47        let mut wait_c = self.parent.consumer.listen();
48        loop {
49            if let Err(v) = self.try_send(val) {
50                val = v;
51                wait_c.next().await;
52            } else {
53                break;
54            }
55        }
56    }
57}
58
59pub struct Receiver<'a, T, const N: usize> {
60    parent: &'a Spsc<T, N>,
61    recver: ach::Receiver<'a, T, N>,
62}
63impl<'a, T: Unpin, const N: usize> Receiver<'a, T, N> {
64    pub fn try_recv(&mut self) -> Option<T> {
65        self.recver.try_recv().map(|v| {
66            self.parent.consumer.notify_one();
67            v
68        })
69    }
70    pub async fn recv<'b>(&'b mut self) -> T {
71        let mut wait_p = self.parent.producer.listen();
72        loop {
73            if let Some(v) = self.try_recv() {
74                break v;
75            } else {
76                wait_p.next().await;
77            }
78        }
79    }
80}