1use ach_array::Array;
2pub use ach_array::Ref;
3use ach_ring::Ring;
4use util::Error;
5
6pub struct Subscriber<'a, T, const N: usize> {
7 ch: Ref<'a, Ring<T, N>>,
8}
9impl<'a, T, const N: usize> Subscriber<'a, T, N> {
10 pub fn try_recv(&self) -> Result<T, Error<()>> {
14 self.ch.pop()
15 }
16}
17impl<'a, T, const N: usize> Drop for Subscriber<'a, T, N> {
18 fn drop(&mut self) {
19 self.ch.remove();
20 }
21}
22
23pub struct Publisher<T, const NT: usize, const NS: usize> {
24 subscribers: Array<Ring<T, NT>, NS>,
25 strict: bool,
26}
27impl<T, const NT: usize, const NS: usize> Publisher<T, NT, NS> {
28 pub const fn new(strict: bool) -> Publisher<T, NT, NS> {
30 Self {
31 subscribers: Array::new(),
32 strict,
33 }
34 }
35 pub fn subscribe(&self) -> Option<Subscriber<T, NT>> {
36 let subscriber = Ring::new();
37 if let Ok(i) = self.subscribers.push(subscriber) {
38 let sub = self.subscribers[i].get().unwrap();
39 Some(Subscriber { ch: sub })
40 } else {
41 None
42 }
43 }
44}
45impl<T: Clone, const NT: usize, const NS: usize> Publisher<T, NT, NS> {
46 pub fn send(&self, val: T) -> usize {
50 let mut success: usize = 0;
51 let mut send = None;
52 for sub in self.subscribers.iter(self.strict) {
53 let value = if let Some(v) = send.take() {
54 v
55 } else {
56 val.clone()
57 };
58 if let Err(v) = sub.push(value) {
59 send = Some(v.input);
60 } else {
61 success += 1
62 }
63 }
64 success
65 }
66}