freertos_rust/patterns/
pub_sub.rs1use crate::base::*;
2use crate::mutex::*;
3use crate::prelude::v1::*;
4use crate::queue::*;
5use crate::units::*;
6
7pub struct QueuePublisher<T: Sized + Copy + Send> {
9 inner: Arc<Mutex<PublisherInner<T>>>,
10}
11
12pub struct QueueSubscriber<T: Sized + Copy + Send> {
14 inner: Arc<SubscriberInner<T>>,
15}
16
17impl<T: Sized + Copy + Send> QueuePublisher<T> {
18 pub fn new() -> Result<QueuePublisher<T>, FreeRtosError> {
20 let inner = PublisherInner {
21 subscribers: Vec::new(),
22 queue_next_id: 1,
23 };
24
25 Ok(QueuePublisher {
26 inner: Arc::new(Mutex::new(inner)?),
27 })
28 }
29
30 pub fn send<D: DurationTicks>(&self, item: T, max_wait: D) -> usize {
33 let mut sent_to = 0;
34
35 if let Ok(m) = self.inner.lock(max_wait) {
36 for subscriber in &m.subscribers {
37 if let Ok(_) = subscriber.queue.send(item, max_wait) {
38 sent_to += 1;
39 }
40 }
41 }
42
43 sent_to
44 }
45
46 pub fn subscribe<D: DurationTicks>(
48 &self,
49 max_size: usize,
50 create_max_wait: D,
51 ) -> Result<QueueSubscriber<T>, FreeRtosError> {
52 let mut inner = self.inner.lock(create_max_wait)?;
53
54 let queue = Queue::new(max_size)?;
55
56 let id = inner.queue_next_id;
57 inner.queue_next_id += 1;
58
59 let subscriber = SubscriberInner {
60 id: id,
61 queue: queue,
62 publisher: self.inner.clone(),
63 };
64 let subscriber = Arc::new(subscriber);
65
66 inner.subscribers.push(subscriber.clone());
67
68 Ok(QueueSubscriber { inner: subscriber })
69 }
70}
71
72impl<T: Sized + Copy + Send> Clone for QueuePublisher<T> {
73 fn clone(&self) -> Self {
74 QueuePublisher {
75 inner: self.inner.clone(),
76 }
77 }
78}
79
80impl<T: Sized + Copy + Send> Drop for QueueSubscriber<T> {
81 fn drop(&mut self) {
82 if let Ok(mut l) = self.inner.publisher.lock(Duration::infinite()) {
83 l.unsubscribe(&self.inner);
84 }
85 }
86}
87
88impl<T: Sized + Copy + Send> QueueSubscriber<T> {
89 pub fn receive<D: DurationTicks>(&self, max_wait: D) -> Result<T, FreeRtosError> {
91 self.inner.queue.receive(max_wait)
92 }
93}
94
95struct PublisherInner<T: Sized + Copy + Send> {
96 subscribers: Vec<Arc<SubscriberInner<T>>>,
97 queue_next_id: usize,
98}
99
100impl<T: Sized + Copy + Send> PublisherInner<T> {
101 fn unsubscribe(&mut self, subscriber: &SubscriberInner<T>) {
102 self.subscribers.retain(|ref x| x.id != subscriber.id);
103 }
104}
105
106struct SubscriberInner<T: Sized + Copy + Send> {
107 id: usize,
108 queue: Queue<T>,
109 publisher: Arc<Mutex<PublisherInner<T>>>,
110}