freertos_rust/patterns/
pub_sub.rs1use crate::prelude::v1::*;
2use crate::base::*;
3use crate::mutex::*;
4use crate::queue::*;
5use crate::units::*;
6
7
8pub struct QueuePublisher<T: Sized + Copy> {
10 inner: Arc<Mutex<PublisherInner<T>>>,
11}
12
13pub struct QueueSubscriber<T: Sized + Copy> {
15 inner: Arc<SubscriberInner<T>>,
16}
17
18impl<T: Sized + Copy> QueuePublisher<T> {
19 pub fn new() -> Result<QueuePublisher<T>, FreeRtosError> {
21 let inner = PublisherInner {
22 subscribers: Vec::new(),
23 queue_next_id: 1,
24 };
25
26 Ok(QueuePublisher { inner: Arc::new(Mutex::new(inner)?) })
27 }
28
29 pub fn send<D: DurationTicks>(&self, item: T, max_wait: D) -> usize {
32 let mut sent_to = 0;
33
34 if let Ok(m) = self.inner.lock(max_wait) {
35 for subscriber in &m.subscribers {
36 if let Ok(_) = subscriber.queue.send(item, max_wait) {
37 sent_to += 1;
38 }
39 }
40 }
41
42 sent_to
43 }
44
45 pub fn subscribe<D: DurationTicks>(&self,
47 max_size: usize,
48 create_max_wait: D)
49 -> Result<QueueSubscriber<T>, FreeRtosError> {
50 let mut inner = self.inner.lock(create_max_wait)?;
51
52 let queue = Queue::new(max_size)?;
53
54 let id = inner.queue_next_id;
55 inner.queue_next_id += 1;
56
57 let subscriber = SubscriberInner {
58 id: id,
59 queue: queue,
60 publisher: self.inner.clone(),
61 };
62 let subscriber = Arc::new(subscriber);
63
64 inner.subscribers.push(subscriber.clone());
65
66 Ok(QueueSubscriber { inner: subscriber })
67 }
68}
69
70impl<T: Sized + Copy> Clone for QueuePublisher<T> {
71 fn clone(&self) -> Self {
72 QueuePublisher { inner: self.inner.clone() }
73 }
74}
75
76impl<T: Sized + Copy> Drop for QueueSubscriber<T> {
77 fn drop(&mut self) {
78 if let Ok(mut l) = self.inner.publisher.lock(Duration::infinite()) {
79 l.unsubscribe(&self.inner);
80 }
81 }
82}
83
84
85impl<T: Sized + Copy> QueueSubscriber<T> {
86 pub fn receive<D: DurationTicks>(&self, max_wait: D) -> Result<T, FreeRtosError> {
88 self.inner.queue.receive(max_wait)
89 }
90}
91
92struct PublisherInner<T: Sized + Copy> {
93 subscribers: Vec<Arc<SubscriberInner<T>>>,
94 queue_next_id: usize,
95}
96
97impl<T: Sized + Copy> PublisherInner<T> {
98 fn unsubscribe(&mut self, subscriber: &SubscriberInner<T>) {
99 self.subscribers.retain(|ref x| x.id != subscriber.id);
100 }
101}
102
103struct SubscriberInner<T: Sized + Copy> {
104 id: usize,
105 queue: Queue<T>,
106 publisher: Arc<Mutex<PublisherInner<T>>>,
107}