Skip to main content

motorcortex_rust/blocking/
subscription.rs

//! Blocking wrapper around [`crate::core::Subscription`].
//!
//! `read` / `read_all` / `notify` / `id` / `name` are forwarded
//! directly — they're sync on the async type too, so no runtime is
//! involved. `latest` uses the shared `Runtime` handed in by
//! [`super::Subscribe`] to block on the async future. The async
//! `stream()` is exposed through [`Subscription::iter`] as a blocking
//! [`Iterator`] — each `.next()` call blocks on the next broadcast
//! frame via the same runtime.
9
10use std::pin::Pin;
11use std::sync::Arc;
12
13use futures::{Stream, StreamExt};
14use tokio::runtime::Runtime;
15
16use crate::TimeSpec;
17use crate::core::{self, StreamResult};
18use crate::error::Result;
19use crate::parameter_value::{GetParameterTuple, GetParameterValue};
20
21pub struct Subscription {
22    inner: core::Subscription,
23    rt: Arc<Runtime>,
24}
25
26impl Subscription {
27    pub(crate) fn new(inner: core::Subscription, rt: Arc<Runtime>) -> Self {
28        Self { inner, rt }
29    }
30
31    pub(crate) fn into_inner(self) -> core::Subscription {
32        self.inner
33    }
34
35    pub fn id(&self) -> u32 {
36        self.inner.id()
37    }
38
39    pub fn name(&self) -> &str {
40        self.inner.name()
41    }
42
43    pub fn read<V>(&self) -> Option<(TimeSpec, V)>
44    where
45        V: GetParameterTuple,
46    {
47        self.inner.read()
48    }
49
50    pub fn read_all<V>(&self) -> Option<(TimeSpec, Vec<V>)>
51    where
52        V: GetParameterValue + Default,
53    {
54        self.inner.read_all()
55    }
56
57    pub fn notify<F>(&self, cb: F)
58    where
59        F: Fn(&core::Subscription) + Send + Sync + 'static,
60    {
61        self.inner.notify(cb);
62    }
63
64    /// Blocking analogue of [`core::Subscription::latest`].
65    pub fn latest<V>(&self) -> Result<(TimeSpec, V)>
66    where
67        V: GetParameterTuple,
68    {
69        self.rt.block_on(self.inner.latest::<V>())
70    }
71
72    /// Blocking iterator over the broadcast ring. Each `next()`
73    /// call blocks on the next item via the shared runtime. The
74    /// iterator ends (returns `None`) when the broadcast channel
75    /// closes — i.e. when every `Subscription` clone is dropped.
76    pub fn iter<V>(&self, capacity: usize) -> StreamIter<V>
77    where
78        V: GetParameterTuple + Send + 'static,
79    {
80        StreamIter {
81            rt: Arc::clone(&self.rt),
82            stream: Box::pin(self.inner.stream::<V>(capacity)),
83        }
84    }
85}
86
87impl Clone for Subscription {
88    fn clone(&self) -> Self {
89        Self {
90            inner: self.inner.clone(),
91            rt: Arc::clone(&self.rt),
92        }
93    }
94}
95
96/// Blocking iterator returned by [`Subscription::iter`].
97pub struct StreamIter<V>
98where
99    V: GetParameterTuple + Send + 'static,
100{
101    rt: Arc<Runtime>,
102    stream: Pin<Box<dyn Stream<Item = StreamResult<V>> + Send>>,
103}
104
105impl<V> Iterator for StreamIter<V>
106where
107    V: GetParameterTuple + Send + 'static,
108{
109    type Item = StreamResult<V>;
110
111    fn next(&mut self) -> Option<Self::Item> {
112        self.rt.block_on(self.stream.next())
113    }
114}