motorcortex_rust/client/
subscription.rs

1use crate::msg::{DataType, GroupStatusMsg};
2use crate::parameter_value::GetParameterTuple;
3use chrono::{DateTime, Local, Utc};
4use std::sync::{Arc, RwLock, RwLockReadGuard};
5
6#[derive(Debug, Clone, Copy)]
7pub struct TimeSpec {
8    pub sec: i64,  // Seconds
9    pub nsec: i64, // Nanoseconds
10}
11
12impl TimeSpec {
13    pub fn from_buffer(buffer: &[u8]) -> Option<Self> {
14        // Ensure the buffer has enough size for the TimeSpec struct
15        if buffer.len() < std::mem::size_of::<TimeSpec>() {
16            return None;
17        }
18
19        // Safely cast the buffer into a TimeSpec struct
20        let ts = unsafe { std::ptr::read(buffer.as_ptr() as *const TimeSpec) };
21
22        Some(ts)
23    }
24    pub fn to_date_time(&self) -> DateTime<Local> {
25        // Create a NaiveDateTime from seconds and nanoseconds since the UNIX epoch
26        self.to_utc_date_time().with_timezone(&Local)
27    }
28    pub fn to_utc_date_time(&self) -> DateTime<Utc> {
29        // Create a NaiveDateTime from seconds and nanoseconds since the UNIX epoch
30        DateTime::from_timestamp(self.sec, self.nsec as u32).expect("Invalid TimeSpec")
31    }
32}
33
34pub struct Subscription {
35    data_types: Vec<u32>,
36    description: GroupStatusMsg,
37    buffer: Vec<u8>,
38    callback: Arc<RwLock<Option<Box<dyn Fn(&Subscription) + Send + Sync + 'static>>>>,
39}
40
41impl Subscription {
42    pub fn new(group_msg: GroupStatusMsg) -> Self {
43        Self {
44            data_types: group_msg
45                .params
46                .iter()
47                .map(|param| DataType::try_from(param.info.data_type as i32).unwrap() as u32)
48                .collect(),
49            description: group_msg,
50            buffer: Vec::new(),
51            callback: Arc::new(RwLock::new(None)),
52        }
53    }
54
55    pub fn id(&self) -> u32 {
56        self.description.id
57    }
58    pub fn update(&mut self, buffer: Vec<u8>) {
59        self.buffer = buffer;
60        if let Some(cb) = &*self.callback.read().unwrap() {
61            cb(self); // Call the callback
62        }
63    }
64
65    pub fn read<V>(&self) -> Option<(TimeSpec, V)>
66    where
67        V: GetParameterTuple,
68    {
69        if !self.buffer.is_empty() {
70            const HEADER_LEN: usize = 4usize;
71            let protocol_version = self.buffer[3];
72            if protocol_version == 0 {
73                return self.read_protocol0(&self.buffer[HEADER_LEN..]);
74            }
75            if protocol_version == 1 {
76                return self.read_protocol1(&self.buffer[HEADER_LEN..]);
77            }
78        }
79        None
80    }
81
82    pub fn notify<F>(&mut self, cb: F)
83    where
84        F: Fn(&Subscription) + Send + 'static + Sync,
85    {
86        let mut callback_lock = self.callback.write().unwrap();
87        *callback_lock = Some(Box::new(cb));
88    }
89
90    fn read_protocol0<V>(&self, buffer: &[u8]) -> Option<(TimeSpec, V)>
91    where
92        V: GetParameterTuple,
93    {
94        None
95    }
96
97    fn combined_iterator<'a>(
98        &'a self,
99        body: &'a [u8],
100    ) -> impl Iterator<Item = (&'a u32, &'a [u8])> + 'a {
101        self.description
102            .params
103            .iter()
104            .zip(self.data_types.iter())
105            .scan(0, move |cursor, (param, data_type)| {
106                let size = param.size as usize;
107                let slice = &body[*cursor..*cursor + size];
108                *cursor += size; // Advance the cursor
109                Some((data_type, slice)) // Yield the pair (data_type, slice)
110            })
111    }
112
113    fn read_protocol1<V>(&self, buffer: &[u8]) -> Option<(TimeSpec, V)>
114    where
115        V: GetParameterTuple,
116    {
117        let ts = TimeSpec::from_buffer(buffer).unwrap();
118        const TS_SIZE: usize = size_of::<TimeSpec>();
119        let body = &buffer[TS_SIZE..];
120
121        Some((ts, V::get_parameters(self.combined_iterator(body)).unwrap()))
122    }
123}
124
125pub struct ReadOnlySubscription {
126    inner: Arc<RwLock<Subscription>>, // Internal shared ownership
127}
128
129impl ReadOnlySubscription {
130    /// Creates a new readonly wrapper around the `Arc<RwLock<Subscription>>`.
131    pub fn new(subscription: Arc<RwLock<Subscription>>) -> Self {
132        Self {
133            inner: subscription,
134        }
135    }
136
137    /// Provides thread-safe, readonly access to the subscription.
138    pub fn get(&self) -> RwLockReadGuard<Subscription> {
139        self.inner.read().unwrap()
140    }
141
142    /// Optionally, expose specific readonly fields for convenience
143    pub fn read<V>(&self) -> Option<V>
144    where
145        V: GetParameterTuple,
146    {
147        self.inner.read().unwrap().read().map(|(_, v)| v)
148    }
149
150    pub fn notify<F>(&self, cb: F)
151    where
152        F: Fn(&Subscription) + Send + 'static + Sync,
153    {
154        self.inner.write().unwrap().notify(cb);
155    }
156
157    pub fn read_with_timestamp<V>(&self) -> Option<(TimeSpec, V)>
158    where
159        V: GetParameterTuple,
160    {
161        self.inner.read().unwrap().read()
162    }
163}