freertos_rust/patterns/
pub_sub.rs

1use crate::prelude::v1::*;
2use crate::base::*;
3use crate::mutex::*;
4use crate::queue::*;
5use crate::units::*;
6
7
8/// A pub-sub queue. An item sent to the publisher is sent to every subscriber.
9pub struct QueuePublisher<T: Sized + Copy> {
10    inner: Arc<Mutex<PublisherInner<T>>>,
11}
12
13/// A subscribtion to the publisher.
14pub struct QueueSubscriber<T: Sized + Copy> {
15    inner: Arc<SubscriberInner<T>>,
16}
17
18impl<T: Sized + Copy> QueuePublisher<T> {
19    /// Create a new publisher
20    pub fn new() -> Result<QueuePublisher<T>, FreeRtosError> {
21        let inner = PublisherInner {
22            subscribers: Vec::new(),
23            queue_next_id: 1,
24        };
25
26        Ok(QueuePublisher { inner: Arc::new(Mutex::new(inner)?) })
27    }
28
29    /// Send an item to every subscriber. Returns the number of
30    /// subscribers that have received the item.
31    pub fn send<D: DurationTicks>(&self, item: T, max_wait: D) -> usize {
32        let mut sent_to = 0;
33
34        if let Ok(m) = self.inner.lock(max_wait) {
35            for subscriber in &m.subscribers {
36                if let Ok(_) = subscriber.queue.send(item, max_wait) {
37                    sent_to += 1;
38                }
39            }
40        }
41
42        sent_to
43    }
44
45    /// Subscribe to this publisher. Can accept a fixed amount of items.
46    pub fn subscribe<D: DurationTicks>(&self,
47                     max_size: usize,
48                     create_max_wait: D)
49                     -> Result<QueueSubscriber<T>, FreeRtosError> {
50        let mut inner = self.inner.lock(create_max_wait)?;
51
52        let queue = Queue::new(max_size)?;
53
54        let id = inner.queue_next_id;
55        inner.queue_next_id += 1;
56
57        let subscriber = SubscriberInner {
58            id: id,
59            queue: queue,
60            publisher: self.inner.clone(),
61        };
62        let subscriber = Arc::new(subscriber);
63
64        inner.subscribers.push(subscriber.clone());
65
66        Ok(QueueSubscriber { inner: subscriber })
67    }
68}
69
70impl<T: Sized + Copy> Clone for QueuePublisher<T> {
71    fn clone(&self) -> Self {
72        QueuePublisher { inner: self.inner.clone() }
73    }
74}
75
76impl<T: Sized + Copy> Drop for QueueSubscriber<T> {
77    fn drop(&mut self) {
78        if let Ok(mut l) = self.inner.publisher.lock(Duration::infinite()) {
79            l.unsubscribe(&self.inner);
80        }
81    }
82}
83
84
85impl<T: Sized + Copy> QueueSubscriber<T> {
86    /// Wait for an item to be posted from the publisher.
87    pub fn receive<D: DurationTicks>(&self, max_wait: D) -> Result<T, FreeRtosError> {
88        self.inner.queue.receive(max_wait)
89    }
90}
91
92struct PublisherInner<T: Sized + Copy> {
93    subscribers: Vec<Arc<SubscriberInner<T>>>,
94    queue_next_id: usize,
95}
96
97impl<T: Sized + Copy> PublisherInner<T> {
98    fn unsubscribe(&mut self, subscriber: &SubscriberInner<T>) {
99        self.subscribers.retain(|ref x| x.id != subscriber.id);
100    }
101}
102
103struct SubscriberInner<T: Sized + Copy> {
104    id: usize,
105    queue: Queue<T>,
106    publisher: Arc<Mutex<PublisherInner<T>>>,
107}