motorcortex_rust/blocking/
subscription.rs1use std::pin::Pin;
11use std::sync::Arc;
12
13use futures::{Stream, StreamExt};
14use tokio::runtime::Runtime;
15
16use crate::TimeSpec;
17use crate::core::{self, StreamResult};
18use crate::error::Result;
19use crate::parameter_value::{GetParameterTuple, GetParameterValue};
20
21pub struct Subscription {
22 inner: core::Subscription,
23 rt: Arc<Runtime>,
24}
25
26impl Subscription {
27 pub(crate) fn new(inner: core::Subscription, rt: Arc<Runtime>) -> Self {
28 Self { inner, rt }
29 }
30
31 pub(crate) fn into_inner(self) -> core::Subscription {
32 self.inner
33 }
34
35 pub fn id(&self) -> u32 {
36 self.inner.id()
37 }
38
39 pub fn name(&self) -> &str {
40 self.inner.name()
41 }
42
43 pub fn read<V>(&self) -> Option<(TimeSpec, V)>
44 where
45 V: GetParameterTuple,
46 {
47 self.inner.read()
48 }
49
50 pub fn read_all<V>(&self) -> Option<(TimeSpec, Vec<V>)>
51 where
52 V: GetParameterValue + Default,
53 {
54 self.inner.read_all()
55 }
56
57 pub fn notify<F>(&self, cb: F)
58 where
59 F: Fn(&core::Subscription) + Send + Sync + 'static,
60 {
61 self.inner.notify(cb);
62 }
63
64 pub fn latest<V>(&self) -> Result<(TimeSpec, V)>
66 where
67 V: GetParameterTuple,
68 {
69 self.rt.block_on(self.inner.latest::<V>())
70 }
71
72 pub fn iter<V>(&self, capacity: usize) -> StreamIter<V>
77 where
78 V: GetParameterTuple + Send + 'static,
79 {
80 StreamIter {
81 rt: Arc::clone(&self.rt),
82 stream: Box::pin(self.inner.stream::<V>(capacity)),
83 }
84 }
85}
86
87impl Clone for Subscription {
88 fn clone(&self) -> Self {
89 Self {
90 inner: self.inner.clone(),
91 rt: Arc::clone(&self.rt),
92 }
93 }
94}
95
96pub struct StreamIter<V>
98where
99 V: GetParameterTuple + Send + 'static,
100{
101 rt: Arc<Runtime>,
102 stream: Pin<Box<dyn Stream<Item = StreamResult<V>> + Send>>,
103}
104
105impl<V> Iterator for StreamIter<V>
106where
107 V: GetParameterTuple + Send + 'static,
108{
109 type Item = StreamResult<V>;
110
111 fn next(&mut self) -> Option<Self::Item> {
112 self.rt.block_on(self.stream.next())
113 }
114}