motorcortex_rust/client/
subscription.rs

1use crate::msg::{DataType, GroupStatusMsg};
2use crate::parameter_value::GetParameterTuple;
3use chrono::{DateTime, Local, Utc};
4use std::sync::{Arc, RwLock, RwLockReadGuard};
5
6#[derive(Debug, Clone, Copy)]
7pub struct TimeSpec {
8    pub sec: i64,  // Seconds
9    pub nsec: i64, // Nanoseconds
10}
11
12impl TimeSpec {
13    pub fn from_buffer(buffer: &[u8]) -> Option<Self> {
14        // Ensure the buffer has enough size for the TimeSpec struct
15        if buffer.len() < std::mem::size_of::<TimeSpec>() {
16            return None;
17        }
18
19        // Safely cast the buffer into a TimeSpec struct
20        let ts = unsafe { std::ptr::read(buffer.as_ptr() as *const TimeSpec) };
21
22        Some(ts)
23    }
24    pub fn to_date_time(&self) -> DateTime<Local> {
25        // Create a NaiveDateTime from seconds and nanoseconds since the UNIX epoch
26        self.to_utc_date_time().with_timezone(&Local)
27    }
28    pub fn to_utc_date_time(&self) -> DateTime<Utc> {
29        // Create a NaiveDateTime from seconds and nanoseconds since the UNIX epoch
30        DateTime::from_timestamp(self.sec, self.nsec as u32).expect("Invalid TimeSpec")
31    }
32}
33
34pub struct Subscription {
35    data_types: Vec<u32>,
36    description: GroupStatusMsg,
37    buffer: Vec<u8>,
38    callback: Arc<RwLock<Option<Box<dyn Fn(&Subscription) + Send + Sync + 'static>>>>,
39}
40
41impl Subscription {
42    pub fn new(group_msg: GroupStatusMsg) -> Self {
43        Self {
44            data_types: group_msg
45                .params
46                .iter()
47                .map(|param| DataType::try_from(param.info.data_type as i32).unwrap() as u32)
48                .collect(),
49            description: group_msg,
50            buffer: Vec::new(),
51            callback: Arc::new(RwLock::new(None)),
52        }
53    }
54
55    pub fn id(&self) -> u32 {
56        self.description.id
57    }
58    pub fn update(&mut self, buffer: Vec<u8>) {
59        self.buffer = buffer;
60        if let Some(cb) = &*self.callback.read().unwrap() {
61            cb(self); // Call the callback
62        }
63    }
64
65    pub fn read<V>(&self) -> Option<(TimeSpec, V)>
66    where
67        V: GetParameterTuple,
68    {
69        if !self.buffer.is_empty() {
70            const HEADER_LEN: usize = 4usize;
71            let protocol_version = self.buffer[3];
72            if protocol_version == 0 {
73                return self.read_protocol0(&self.buffer[HEADER_LEN..]);
74            }
75            if protocol_version == 1 {
76                return self.read_protocol1(&self.buffer[HEADER_LEN..]);
77            }
78        }
79        None
80    }
81
82    pub fn notify<F>(&mut self, cb: F)
83    where
84        F: Fn(&Subscription) + Send + 'static + Sync,
85    {
86        let mut callback_lock = self.callback.write().unwrap();
87        *callback_lock = Some(Box::new(cb));
88    }
89
90    pub fn name(&self) -> String {
91        self.description.alias.to_string()
92    }
93
94    fn read_protocol0<V>(&self, _buffer: &[u8]) -> Option<(TimeSpec, V)>
95    where
96        V: GetParameterTuple,
97    {
98        None
99    }
100
101    fn combined_iterator<'a>(
102        &'a self,
103        body: &'a [u8],
104    ) -> impl Iterator<Item = (&'a u32, &'a [u8])> + 'a {
105        self.description
106            .params
107            .iter()
108            .zip(self.data_types.iter())
109            .scan(0, move |cursor, (param, data_type)| {
110                let size = param.size as usize;
111                let slice = &body[*cursor..*cursor + size];
112                *cursor += size; // Advance the cursor
113                Some((data_type, slice)) // Yield the pair (data_type, slice)
114            })
115    }
116
117    fn read_protocol1<V>(&self, buffer: &[u8]) -> Option<(TimeSpec, V)>
118    where
119        V: GetParameterTuple,
120    {
121        let ts = TimeSpec::from_buffer(buffer).unwrap();
122        const TS_SIZE: usize = size_of::<TimeSpec>();
123        let body = &buffer[TS_SIZE..];
124
125        Some((ts, V::get_parameters(self.combined_iterator(body)).unwrap()))
126    }
127}
128
129pub struct ReadOnlySubscription {
130    inner: Arc<RwLock<Subscription>>, // Internal shared ownership
131}
132
133impl ReadOnlySubscription {
134    /// Creates a new readonly wrapper around the `Arc<RwLock<Subscription>>`.
135    pub fn new(subscription: Arc<RwLock<Subscription>>) -> Self {
136        Self {
137            inner: subscription,
138        }
139    }
140
141    /// Provides thread-safe, readonly access to the subscription.
142    pub fn get(&self) -> RwLockReadGuard<'_, Subscription> {
143        self.inner.read().unwrap()
144    }
145
146    /// Optionally, expose specific readonly fields for convenience
147    pub fn read<V>(&self) -> Option<V>
148    where
149        V: GetParameterTuple,
150    {
151        self.inner.read().unwrap().read().map(|(_, v)| v)
152    }
153
154    pub fn notify<F>(&self, cb: F)
155    where
156        F: Fn(&Subscription) + Send + 'static + Sync,
157    {
158        self.inner.write().unwrap().notify(cb);
159    }
160
161    pub fn read_with_timestamp<V>(&self) -> Option<(TimeSpec, V)>
162    where
163        V: GetParameterTuple,
164    {
165        self.inner.read().unwrap().read()
166    }
167
168    pub fn name(&self) -> String {
169        self.inner.read().unwrap().name()
170    }
171
172    pub fn id(&self) -> u32 {
173        self.inner.read().unwrap().id()
174    }
175}