Skip to main content

motorcortex_rust/client/
subscription.rs

1use crate::GetParameterValue;
2use crate::msg::{DataType, GroupStatusMsg};
3use crate::parameter_value::GetParameterTuple;
4use crate::parameter_value::decode_parameter_value;
5use chrono::{DateTime, Local, Utc};
6use std::sync::{Arc, RwLock, RwLockReadGuard};
7
8#[derive(Debug, Clone, Copy)]
9pub struct TimeSpec {
10    pub sec: i64,  // Seconds
11    pub nsec: i64, // Nanoseconds
12}
13
14impl TimeSpec {
15    pub fn from_buffer(buffer: &[u8]) -> Option<Self> {
16        // Ensure the buffer has enough size for the TimeSpec struct
17        if buffer.len() < std::mem::size_of::<TimeSpec>() {
18            return None;
19        }
20
21        // Safely cast the buffer into a TimeSpec struct
22        let ts = unsafe { std::ptr::read(buffer.as_ptr() as *const TimeSpec) };
23
24        Some(ts)
25    }
26    pub fn to_date_time(&self) -> DateTime<Local> {
27        // Create a NaiveDateTime from seconds and nanoseconds since the UNIX epoch
28        self.to_utc_date_time().with_timezone(&Local)
29    }
30    pub fn to_utc_date_time(&self) -> DateTime<Utc> {
31        // Create a NaiveDateTime from seconds and nanoseconds since the UNIX epoch
32        DateTime::from_timestamp(self.sec, self.nsec as u32).expect("Invalid TimeSpec")
33    }
34}
35
36pub struct Subscription {
37    data_types: Vec<u32>,
38    description: GroupStatusMsg,
39    buffer: Vec<u8>,
40    callback: Arc<RwLock<Option<Box<dyn Fn(&Subscription) + Send + Sync + 'static>>>>,
41}
42
43impl Subscription {
44    pub fn new(group_msg: GroupStatusMsg) -> Self {
45        Self {
46            data_types: group_msg
47                .params
48                .iter()
49                .map(|param| DataType::try_from(param.info.data_type as i32).unwrap() as u32)
50                .collect(),
51            description: group_msg,
52            buffer: Vec::new(),
53            callback: Arc::new(RwLock::new(None)),
54        }
55    }
56
57    pub fn id(&self) -> u32 {
58        self.description.id
59    }
60    pub fn update(&mut self, buffer: Vec<u8>) {
61        self.buffer = buffer;
62        if let Some(cb) = &*self.callback.read().unwrap() {
63            cb(self); // Call the callback
64        }
65    }
66
67    pub fn read<V>(&self) -> Option<(TimeSpec, V)>
68    where
69        V: GetParameterTuple,
70    {
71        if !self.buffer.is_empty() {
72            const HEADER_LEN: usize = 4usize;
73            let protocol_version = self.buffer[3];
74            if protocol_version == 0 {
75                return self.read_protocol0(&self.buffer[HEADER_LEN..]);
76            }
77            if protocol_version == 1 {
78                return self.read_protocol1(&self.buffer[HEADER_LEN..]);
79            }
80        }
81        None
82    }
83
84    /// Reads all subscribed parameters and returns their values as a flat `Vec<V>`.
85    ///
86    /// Each parameter's elements are decoded individually using `decode_parameter_value`,
87    /// regardless of the number of parameters or their array sizes.
88    /// The caller chooses the output type:
89    ///
90    /// ```ignore
91    /// sub.read_all::<f64>()     // all values as f64
92    /// sub.read_all::<String>()  // all values as their string representation
93    /// sub.read_all::<i64>()     // all values as i64
94    /// ```
95    pub fn read_all<V>(&self) -> Option<(TimeSpec, Vec<V>)>
96    where
97        V: GetParameterValue + Default,
98    {
99        if self.buffer.is_empty() {
100            return None;
101        }
102        const HEADER_LEN: usize = 4usize;
103        let protocol_version = self.buffer[3];
104        if protocol_version != 1 {
105            return None;
106        }
107
108        let buffer = &self.buffer[HEADER_LEN..];
109        let ts = TimeSpec::from_buffer(buffer)?;
110        const TS_SIZE: usize = size_of::<TimeSpec>();
111        let body = &buffer[TS_SIZE..];
112
113        let mut values = Vec::new();
114        let mut cursor = 0usize;
115
116        for (param, &data_type) in self.description.params.iter().zip(self.data_types.iter()) {
117            let size = param.size as usize;
118            let data_size = param.info.data_size as usize;
119            let num_elements = param.info.number_of_elements as usize;
120            let param_bytes = &body[cursor..cursor + size];
121
122            for i in 0..num_elements {
123                let start = i * data_size;
124                let end = start + data_size;
125                let element_bytes = &param_bytes[start..end];
126                values.push(decode_parameter_value::<V>(data_type, element_bytes));
127            }
128
129            cursor += size;
130        }
131
132        Some((ts, values))
133    }
134
135    pub fn notify<F>(&mut self, cb: F)
136    where
137        F: Fn(&Subscription) + Send + 'static + Sync,
138    {
139        let mut callback_lock = self.callback.write().unwrap();
140        *callback_lock = Some(Box::new(cb));
141    }
142
143    pub fn name(&self) -> String {
144        self.description.alias.to_string()
145    }
146
147    fn read_protocol0<V>(&self, _buffer: &[u8]) -> Option<(TimeSpec, V)>
148    where
149        V: GetParameterTuple,
150    {
151        None
152    }
153
154    fn combined_iterator<'a>(
155        &'a self,
156        body: &'a [u8],
157    ) -> impl Iterator<Item = (&'a u32, &'a [u8])> + 'a {
158        self.description
159            .params
160            .iter()
161            .zip(self.data_types.iter())
162            .scan(0, move |cursor, (param, data_type)| {
163                let size = param.size as usize;
164                let slice = &body[*cursor..*cursor + size];
165                *cursor += size; // Advance the cursor
166                Some((data_type, slice)) // Yield the pair (data_type, slice)
167            })
168    }
169
170    fn read_protocol1<V>(&self, buffer: &[u8]) -> Option<(TimeSpec, V)>
171    where
172        V: GetParameterTuple,
173    {
174        let ts = TimeSpec::from_buffer(buffer).unwrap();
175        const TS_SIZE: usize = size_of::<TimeSpec>();
176        let body = &buffer[TS_SIZE..];
177
178        Some((ts, V::get_parameters(self.combined_iterator(body)).unwrap()))
179    }
180}
181
182pub struct ReadOnlySubscription {
183    inner: Arc<RwLock<Subscription>>, // Internal shared ownership
184}
185
186impl ReadOnlySubscription {
187    /// Creates a new readonly wrapper around the `Arc<RwLock<Subscription>>`.
188    pub fn new(subscription: Arc<RwLock<Subscription>>) -> Self {
189        Self {
190            inner: subscription,
191        }
192    }
193
194    /// Provides thread-safe, readonly access to the subscription.
195    pub fn get(&self) -> RwLockReadGuard<'_, Subscription> {
196        self.inner.read().unwrap()
197    }
198
199    /// Optionally, expose specific readonly fields for convenience
200    pub fn read<V>(&self) -> Option<V>
201    where
202        V: GetParameterTuple,
203    {
204        self.inner.read().unwrap().read().map(|(_, v)| v)
205    }
206
207    /// Reads all subscribed parameters as a flat `Vec<V>`.
208    /// See [`Subscription::read_all`] for details.
209    pub fn read_all<V>(&self) -> Option<(TimeSpec, Vec<V>)>
210    where
211        V: GetParameterValue + Default,
212    {
213        self.inner.read().unwrap().read_all()
214    }
215
216    pub fn notify<F>(&self, cb: F)
217    where
218        F: Fn(&Subscription) + Send + 'static + Sync,
219    {
220        self.inner.write().unwrap().notify(cb);
221    }
222
223    pub fn read_with_timestamp<V>(&self) -> Option<(TimeSpec, V)>
224    where
225        V: GetParameterTuple,
226    {
227        self.inner.read().unwrap().read()
228    }
229
230    pub fn name(&self) -> String {
231        self.inner.read().unwrap().name()
232    }
233
234    pub fn id(&self) -> u32 {
235        self.inner.read().unwrap().id()
236    }
237}