async_ach_spsc/
heapless.rs1use ach_spsc as ach;
2use async_ach_notify::Notify;
3use futures_util::StreamExt;
4
5pub struct Spsc<T, const N: usize> {
6 buf: ach::Spsc<T, N>,
7 consumer: Notify<1>,
8 producer: Notify<1>,
9}
10impl<T, const N: usize> Spsc<T, N> {
11 pub const fn new() -> Self {
12 Self {
13 buf: ach::Spsc::new(),
14 consumer: Notify::new(),
15 producer: Notify::new(),
16 }
17 }
18}
19impl<T: Unpin, const N: usize> Spsc<T, N> {
20 pub fn take_sender(&self) -> Option<Sender<T, N>> {
21 let sender = self.buf.take_sender()?;
22 Some(Sender {
23 parent: self,
24 sender,
25 })
26 }
27 pub fn take_recver(&self) -> Option<Receiver<T, N>> {
28 let recver = self.buf.take_recver()?;
29 Some(Receiver {
30 parent: self,
31 recver,
32 })
33 }
34}
35
36pub struct Sender<'a, T: Unpin, const N: usize> {
37 parent: &'a Spsc<T, N>,
38 sender: ach::Sender<'a, T, N>,
39}
40impl<'a, T: Unpin, const N: usize> Sender<'a, T, N> {
41 pub fn try_send(&mut self, val: T) -> Result<(), T> {
42 self.sender.try_send(val).map(|_| {
43 self.parent.producer.notify_one();
44 })
45 }
46 pub async fn send<'b>(&'b mut self, mut val: T) {
47 let mut wait_c = self.parent.consumer.listen();
48 loop {
49 if let Err(v) = self.try_send(val) {
50 val = v;
51 wait_c.next().await;
52 } else {
53 break;
54 }
55 }
56 }
57}
58
59pub struct Receiver<'a, T, const N: usize> {
60 parent: &'a Spsc<T, N>,
61 recver: ach::Receiver<'a, T, N>,
62}
63impl<'a, T: Unpin, const N: usize> Receiver<'a, T, N> {
64 pub fn try_recv(&mut self) -> Option<T> {
65 self.recver.try_recv().map(|v| {
66 self.parent.consumer.notify_one();
67 v
68 })
69 }
70 pub async fn recv<'b>(&'b mut self) -> T {
71 let mut wait_p = self.parent.producer.listen();
72 loop {
73 if let Some(v) = self.try_recv() {
74 break v;
75 } else {
76 wait_p.next().await;
77 }
78 }
79 }
80}