ach_pubsub/
heapless.rs

1use ach_array::Array;
2pub use ach_array::Ref;
3use ach_ring::Ring;
4use util::Error;
5
6pub struct Subscriber<'a, T, const N: usize> {
7    ch: Ref<'a, Ring<T, N>>,
8}
9impl<'a, T, const N: usize> Subscriber<'a, T, N> {
10    /// Removes the first element and returns it.
11    ///
12    /// Returns Err if the Ring is empty.
13    pub fn try_recv(&self) -> Result<T, Error<()>> {
14        self.ch.pop()
15    }
16}
17impl<'a, T, const N: usize> Drop for Subscriber<'a, T, N> {
18    fn drop(&mut self) {
19        self.ch.remove();
20    }
21}
22
23pub struct Publisher<T, const NT: usize, const NS: usize> {
24    subscribers: Array<Ring<T, NT>, NS>,
25    strict: bool,
26}
27impl<T, const NT: usize, const NS: usize> Publisher<T, NT, NS> {
28    /// It will wait all subscriber ready when `send`, if strict is `true`.
29    pub const fn new(strict: bool) -> Publisher<T, NT, NS> {
30        Self {
31            subscribers: Array::new(),
32            strict,
33        }
34    }
35    pub fn subscribe(&self) -> Option<Subscriber<T, NT>> {
36        let subscriber = Ring::new();
37        if let Ok(i) = self.subscribers.push(subscriber) {
38            let sub = self.subscribers[i].get().unwrap();
39            Some(Subscriber { ch: sub })
40        } else {
41            None
42        }
43    }
44}
45impl<T: Clone, const NT: usize, const NS: usize> Publisher<T, NT, NS> {
46    /// return success times
47    ///
48    /// Notice: `Spin` if strict
49    pub fn send(&self, val: T) -> usize {
50        let mut success: usize = 0;
51        let mut send = None;
52        for sub in self.subscribers.iter(self.strict) {
53            let value = if let Some(v) = send.take() {
54                v
55            } else {
56                val.clone()
57            };
58            if let Err(v) = sub.push(value) {
59                send = Some(v.input);
60            } else {
61                success += 1
62            }
63        }
64        success
65    }
66}