freertos_rust/patterns/
pub_sub.rs

1use crate::base::*;
2use crate::mutex::*;
3use crate::prelude::v1::*;
4use crate::queue::*;
5use crate::units::*;
6
7/// A pub-sub queue. An item sent to the publisher is sent to every subscriber.
8pub struct QueuePublisher<T: Sized + Copy + Send> {
9    inner: Arc<Mutex<PublisherInner<T>>>,
10}
11
12/// A subscribtion to the publisher.
13pub struct QueueSubscriber<T: Sized + Copy + Send> {
14    inner: Arc<SubscriberInner<T>>,
15}
16
17impl<T: Sized + Copy + Send> QueuePublisher<T> {
18    /// Create a new publisher
19    pub fn new() -> Result<QueuePublisher<T>, FreeRtosError> {
20        let inner = PublisherInner {
21            subscribers: Vec::new(),
22            queue_next_id: 1,
23        };
24
25        Ok(QueuePublisher {
26            inner: Arc::new(Mutex::new(inner)?),
27        })
28    }
29
30    /// Send an item to every subscriber. Returns the number of
31    /// subscribers that have received the item.
32    pub fn send<D: DurationTicks>(&self, item: T, max_wait: D) -> usize {
33        let mut sent_to = 0;
34
35        if let Ok(m) = self.inner.lock(max_wait) {
36            for subscriber in &m.subscribers {
37                if let Ok(_) = subscriber.queue.send(item, max_wait) {
38                    sent_to += 1;
39                }
40            }
41        }
42
43        sent_to
44    }
45
46    /// Subscribe to this publisher. Can accept a fixed amount of items.
47    pub fn subscribe<D: DurationTicks>(
48        &self,
49        max_size: usize,
50        create_max_wait: D,
51    ) -> Result<QueueSubscriber<T>, FreeRtosError> {
52        let mut inner = self.inner.lock(create_max_wait)?;
53
54        let queue = Queue::new(max_size)?;
55
56        let id = inner.queue_next_id;
57        inner.queue_next_id += 1;
58
59        let subscriber = SubscriberInner {
60            id: id,
61            queue: queue,
62            publisher: self.inner.clone(),
63        };
64        let subscriber = Arc::new(subscriber);
65
66        inner.subscribers.push(subscriber.clone());
67
68        Ok(QueueSubscriber { inner: subscriber })
69    }
70}
71
72impl<T: Sized + Copy + Send> Clone for QueuePublisher<T> {
73    fn clone(&self) -> Self {
74        QueuePublisher {
75            inner: self.inner.clone(),
76        }
77    }
78}
79
80impl<T: Sized + Copy + Send> Drop for QueueSubscriber<T> {
81    fn drop(&mut self) {
82        if let Ok(mut l) = self.inner.publisher.lock(Duration::infinite()) {
83            l.unsubscribe(&self.inner);
84        }
85    }
86}
87
88impl<T: Sized + Copy + Send> QueueSubscriber<T> {
89    /// Wait for an item to be posted from the publisher.
90    pub fn receive<D: DurationTicks>(&self, max_wait: D) -> Result<T, FreeRtosError> {
91        self.inner.queue.receive(max_wait)
92    }
93}
94
95struct PublisherInner<T: Sized + Copy + Send> {
96    subscribers: Vec<Arc<SubscriberInner<T>>>,
97    queue_next_id: usize,
98}
99
100impl<T: Sized + Copy + Send> PublisherInner<T> {
101    fn unsubscribe(&mut self, subscriber: &SubscriberInner<T>) {
102        self.subscribers.retain(|ref x| x.id != subscriber.id);
103    }
104}
105
106struct SubscriberInner<T: Sized + Copy + Send> {
107    id: usize,
108    queue: Queue<T>,
109    publisher: Arc<Mutex<PublisherInner<T>>>,
110}