unros_core/pubsub/
subs.rs

1//! Subscriptions are produced by Subscribers with the intent of being consumed by Publishers.
2//! However, Subscriptions can be manipulated in the same way as iterators to change its generic
3//! type, allowing Subscriptions and Publishers with different generic types to connect.
4
5use std::{
6    marker::PhantomData,
7    ops::{Deref, DerefMut},
8    sync::{atomic::Ordering, Weak},
9};
10
11use log::warn;
12
13use super::SubscriberInner;
14
15/// A token produced only by Publishers, ensuring that only Unros
16/// can call specific methods.
17pub struct PublisherToken<'a>(pub(super) PhantomData<&'a ()>);
18
19/// A trait for all Subscriptions, similar to the `Iterator` trait.
20pub trait Subscription {
21    type Item;
22
23    /// Places a value into this Subscription.
24    /// 
25    /// Returns `true` iff the value could be consumed. If `false` is returned,
26    /// this `Subscription` should be dropped by the caller.
27    fn push(&mut self, value: Self::Item, token: PublisherToken) -> bool;
28
29    /// Changes the generic type of this `Subscription` using the given `map` function.
30    fn map<F, O>(self, map: F) -> Map<Self, F, O>
31    where
32        Self: Sized,
33        F: FnMut(O) -> Self::Item,
34    {
35        Map {
36            inner: self,
37            map,
38            _phantom: PhantomData,
39        }
40    }
41
42    /// Changes the generic type of this `Subscription` using the given `filter_map` function.
43    ///
44    /// If the function returns `None`, the value will not be published.
45    fn filter_map<F, O>(self, map: F) -> FilterMap<Self, F, O>
46    where
47        Self: Sized,
48        F: FnMut(O) -> Self::Item,
49    {
50        FilterMap {
51            inner: self,
52            map,
53            _phantom: PhantomData,
54        }
55    }
56
57    /// Convenience method to box this subscription.
58    fn boxed(self) -> BoxedSubscription<Self::Item>
59    where
60        Self: Sized + Send + 'static,
61    {
62        Box::new(self)
63    }
64
65    // fn zip<V: 'static>(mut self, mut other: DirectSubscription<V>) -> DirectSubscription<(T, V)> where Self: Sized {
66    // self.pub_count.append(&mut other.pub_count);
67    // DirectSubscription {
68    //     queue: Box::new(move |(left, right)| {
69    //         let left_result = self.queue.push(left);
70    //         let right_result = other.queue.push(right);
71    //         match left_result {
72    //             EnqueueResult::Ok => right_result,
73    //             EnqueueResult::Full => {
74    //                 if right_result == EnqueueResult::Closed {
75    //                     EnqueueResult::Closed
76    //                 } else {
77    //                     EnqueueResult::Full
78    //                 }
79    //             }
80    //             EnqueueResult::Closed => EnqueueResult::Closed,
81    //         }
82    //     }),
83    //     notify: self.notify,
84    //     lag: 0,
85    //     name: None,
86    //     pub_count: self.pub_count,
87    // }
88    // }
89
90    /// Provides a name to this subscription, which enables lag logging.
91    ///
92    /// If the `Publisher` that accepts this `Subscription` cannot push
93    /// new messages into this `Subscription` without deleting old message,
94    /// we say that the `Subscription` is lagging. Catching lagging is important
95    /// as it indicates data loss and a lack of processing speed. With a name,
96    /// these lags will be logged as warnings in the standard log file (`.log`).
97    #[must_use]
98    fn set_name(mut self, name: impl Into<String>) -> Self
99    where
100        Self: Sized,
101    {
102        self.set_name_mut(name.into().into_boxed_str());
103        self
104    }
105
106    /// Analagous to `set_name`, except that mutation is done through a mutable reference.
107    fn set_name_mut(&mut self, name: Box<str>);
108
109    /// Increments the publisher count of the `Subscriber`, which is important for it to know
110    /// when no more publishers are connected to it.
111    fn increment_publishers(&self, token: PublisherToken);
112
113    /// Decrements the publisher count of the `Subscriber`, which is important for it to know
114    /// when no more publishers are connected to it.
115    fn decrement_publishers(&self, token: PublisherToken);
116}
117
118/// An object that must be passed to a `Publisher`, enabling the `Subscriber`
119/// that created the subscription to receive messages from that `Publisher`.
120///
121/// If dropped, no change will occur to the `Subscriber` and no resources will be leaked.
122pub struct DirectSubscription<T> {
123    pub(super) sub: Weak<SubscriberInner<T>>,
124    pub(super) name: Option<Box<str>>,
125    pub(super) lag: usize,
126}
127
128impl<T> Clone for DirectSubscription<T> {
129    fn clone(&self) -> Self {
130        Self {
131            sub: self.sub.clone(),
132            name: self.name.clone(),
133            lag: self.lag.clone(),
134        }
135    }
136}
137
138impl<T> Subscription for DirectSubscription<T> {
139    type Item = T;
140
141    fn push(&mut self, value: Self::Item, _token: PublisherToken) -> bool {
142        if let Some(sub) = self.sub.upgrade() {
143            if sub.queue.force_push(value).is_some() {
144                self.lag += 1;
145                if let Some(name) = &self.name {
146                    warn!(target: "publishers", "{name} lagging by {} messages", self.lag);
147                }
148            } else {
149                self.lag = 0;
150                sub.notify.notify_one();
151            }
152            true
153        } else {
154            false
155        }
156    }
157
158    fn set_name_mut(&mut self, name: Box<str>)
159    where
160        Self: Sized,
161    {
162        self.name = Some(name);
163    }
164
165    fn increment_publishers(&self, _token: PublisherToken) {
166        if let Some(sub) = self.sub.upgrade() {
167            sub.pub_count.fetch_add(1, Ordering::AcqRel);
168        }
169    }
170
171    fn decrement_publishers(&self, _token: PublisherToken) {
172        if let Some(sub) = self.sub.upgrade() {
173            sub.pub_count.fetch_sub(1, Ordering::AcqRel);
174            sub.notify.notify_one();
175        }
176    }
177}
178
179pub struct Map<I, F, O> {
180    inner: I,
181    map: F,
182    _phantom: PhantomData<O>,
183}
184
185impl<O, I, F> Subscription for Map<I, F, O>
186where
187    I: Subscription,
188    F: FnMut(O) -> I::Item,
189{
190    type Item = O;
191
192    fn push(&mut self, value: Self::Item, token: PublisherToken) -> bool {
193        self.inner.push((self.map)(value), token)
194    }
195
196    fn set_name_mut(&mut self, name: Box<str>) {
197        self.inner.set_name_mut(name);
198    }
199
200    fn increment_publishers(&self, token: PublisherToken) {
201        self.inner.increment_publishers(token);
202    }
203
204    fn decrement_publishers(&self, token: PublisherToken) {
205        self.inner.decrement_publishers(token);
206    }
207}
208
209impl<I: Clone, F: Clone, O> Clone for Map<I, F, O> {
210    fn clone(&self) -> Self {
211        Self {
212            inner: self.inner.clone(),
213            map: self.map.clone(),
214            _phantom: PhantomData,
215        }
216    }
217}
218
219pub struct FilterMap<I, F, O> {
220    inner: I,
221    map: F,
222    _phantom: PhantomData<O>,
223}
224
225impl<O, I, F> Subscription for FilterMap<I, F, O>
226where
227    I: Subscription,
228    F: FnMut(O) -> Option<I::Item>,
229{
230    type Item = O;
231
232    fn push(&mut self, value: Self::Item, token: PublisherToken) -> bool {
233        if let Some(value) = (self.map)(value) {
234            self.inner.push(value, token)
235        } else {
236            true
237        }
238    }
239
240    fn set_name_mut(&mut self, name: Box<str>) {
241        self.inner.set_name_mut(name);
242    }
243
244    fn increment_publishers(&self, token: PublisherToken) {
245        self.inner.increment_publishers(token);
246    }
247
248    fn decrement_publishers(&self, token: PublisherToken) {
249        self.inner.decrement_publishers(token);
250    }
251}
252
253impl<I: Clone, F: Clone, O> Clone for FilterMap<I, F, O> {
254    fn clone(&self) -> Self {
255        Self {
256            inner: self.inner.clone(),
257            map: self.map.clone(),
258            _phantom: PhantomData,
259        }
260    }
261}
262
263pub type BoxedSubscription<T> = Box<dyn Subscription<Item = T> + Send>;
264
265impl<T> Subscription for BoxedSubscription<T> {
266    type Item = T;
267
268    fn push(&mut self, value: Self::Item, token: PublisherToken) -> bool {
269        self.deref_mut().push(value, token)
270    }
271
272    fn set_name_mut(&mut self, name: Box<str>) {
273        self.deref_mut().set_name_mut(name);
274    }
275
276    fn increment_publishers(&self, token: PublisherToken) {
277        self.deref().increment_publishers(token);
278    }
279
280    fn decrement_publishers(&self, token: PublisherToken) {
281        self.deref().decrement_publishers(token);
282    }
283}