motorcortex_rust/client/
subscription.rs1use crate::GetParameterValue;
2use crate::msg::{DataType, GroupStatusMsg};
3use crate::parameter_value::GetParameterTuple;
4use crate::parameter_value::decode_parameter_value;
5use chrono::{DateTime, Local, Utc};
6use std::sync::{Arc, RwLock, RwLockReadGuard};
7
8#[derive(Debug, Clone, Copy)]
9pub struct TimeSpec {
10 pub sec: i64, pub nsec: i64, }
13
14impl TimeSpec {
15 pub fn from_buffer(buffer: &[u8]) -> Option<Self> {
16 if buffer.len() < std::mem::size_of::<TimeSpec>() {
18 return None;
19 }
20
21 let ts = unsafe { std::ptr::read(buffer.as_ptr() as *const TimeSpec) };
23
24 Some(ts)
25 }
26 pub fn to_date_time(&self) -> DateTime<Local> {
27 self.to_utc_date_time().with_timezone(&Local)
29 }
30 pub fn to_utc_date_time(&self) -> DateTime<Utc> {
31 DateTime::from_timestamp(self.sec, self.nsec as u32).expect("Invalid TimeSpec")
33 }
34}
35
36pub struct Subscription {
37 data_types: Vec<u32>,
38 description: GroupStatusMsg,
39 buffer: Vec<u8>,
40 callback: Arc<RwLock<Option<Box<dyn Fn(&Subscription) + Send + Sync + 'static>>>>,
41}
42
43impl Subscription {
44 pub fn new(group_msg: GroupStatusMsg) -> Self {
45 Self {
46 data_types: group_msg
47 .params
48 .iter()
49 .map(|param| DataType::try_from(param.info.data_type as i32).unwrap() as u32)
50 .collect(),
51 description: group_msg,
52 buffer: Vec::new(),
53 callback: Arc::new(RwLock::new(None)),
54 }
55 }
56
57 pub fn id(&self) -> u32 {
58 self.description.id
59 }
60 pub fn update(&mut self, buffer: Vec<u8>) {
61 self.buffer = buffer;
62 if let Some(cb) = &*self.callback.read().unwrap() {
63 cb(self); }
65 }
66
67 pub fn read<V>(&self) -> Option<(TimeSpec, V)>
68 where
69 V: GetParameterTuple,
70 {
71 if !self.buffer.is_empty() {
72 const HEADER_LEN: usize = 4usize;
73 let protocol_version = self.buffer[3];
74 if protocol_version == 0 {
75 return self.read_protocol0(&self.buffer[HEADER_LEN..]);
76 }
77 if protocol_version == 1 {
78 return self.read_protocol1(&self.buffer[HEADER_LEN..]);
79 }
80 }
81 None
82 }
83
84 pub fn read_all<V>(&self) -> Option<(TimeSpec, Vec<V>)>
96 where
97 V: GetParameterValue + Default,
98 {
99 if self.buffer.is_empty() {
100 return None;
101 }
102 const HEADER_LEN: usize = 4usize;
103 let protocol_version = self.buffer[3];
104 if protocol_version != 1 {
105 return None;
106 }
107
108 let buffer = &self.buffer[HEADER_LEN..];
109 let ts = TimeSpec::from_buffer(buffer)?;
110 const TS_SIZE: usize = size_of::<TimeSpec>();
111 let body = &buffer[TS_SIZE..];
112
113 let mut values = Vec::new();
114 let mut cursor = 0usize;
115
116 for (param, &data_type) in self.description.params.iter().zip(self.data_types.iter()) {
117 let size = param.size as usize;
118 let data_size = param.info.data_size as usize;
119 let num_elements = param.info.number_of_elements as usize;
120 let param_bytes = &body[cursor..cursor + size];
121
122 for i in 0..num_elements {
123 let start = i * data_size;
124 let end = start + data_size;
125 let element_bytes = ¶m_bytes[start..end];
126 values.push(decode_parameter_value::<V>(data_type, element_bytes));
127 }
128
129 cursor += size;
130 }
131
132 Some((ts, values))
133 }
134
135 pub fn notify<F>(&mut self, cb: F)
136 where
137 F: Fn(&Subscription) + Send + 'static + Sync,
138 {
139 let mut callback_lock = self.callback.write().unwrap();
140 *callback_lock = Some(Box::new(cb));
141 }
142
143 pub fn name(&self) -> String {
144 self.description.alias.to_string()
145 }
146
147 fn read_protocol0<V>(&self, _buffer: &[u8]) -> Option<(TimeSpec, V)>
148 where
149 V: GetParameterTuple,
150 {
151 None
152 }
153
154 fn combined_iterator<'a>(
155 &'a self,
156 body: &'a [u8],
157 ) -> impl Iterator<Item = (&'a u32, &'a [u8])> + 'a {
158 self.description
159 .params
160 .iter()
161 .zip(self.data_types.iter())
162 .scan(0, move |cursor, (param, data_type)| {
163 let size = param.size as usize;
164 let slice = &body[*cursor..*cursor + size];
165 *cursor += size; Some((data_type, slice)) })
168 }
169
170 fn read_protocol1<V>(&self, buffer: &[u8]) -> Option<(TimeSpec, V)>
171 where
172 V: GetParameterTuple,
173 {
174 let ts = TimeSpec::from_buffer(buffer).unwrap();
175 const TS_SIZE: usize = size_of::<TimeSpec>();
176 let body = &buffer[TS_SIZE..];
177
178 Some((ts, V::get_parameters(self.combined_iterator(body)).unwrap()))
179 }
180}
181
182pub struct ReadOnlySubscription {
183 inner: Arc<RwLock<Subscription>>, }
185
186impl ReadOnlySubscription {
187 pub fn new(subscription: Arc<RwLock<Subscription>>) -> Self {
189 Self {
190 inner: subscription,
191 }
192 }
193
194 pub fn get(&self) -> RwLockReadGuard<'_, Subscription> {
196 self.inner.read().unwrap()
197 }
198
199 pub fn read<V>(&self) -> Option<V>
201 where
202 V: GetParameterTuple,
203 {
204 self.inner.read().unwrap().read().map(|(_, v)| v)
205 }
206
207 pub fn read_all<V>(&self) -> Option<(TimeSpec, Vec<V>)>
210 where
211 V: GetParameterValue + Default,
212 {
213 self.inner.read().unwrap().read_all()
214 }
215
216 pub fn notify<F>(&self, cb: F)
217 where
218 F: Fn(&Subscription) + Send + 'static + Sync,
219 {
220 self.inner.write().unwrap().notify(cb);
221 }
222
223 pub fn read_with_timestamp<V>(&self) -> Option<(TimeSpec, V)>
224 where
225 V: GetParameterTuple,
226 {
227 self.inner.read().unwrap().read()
228 }
229
230 pub fn name(&self) -> String {
231 self.inner.read().unwrap().name()
232 }
233
234 pub fn id(&self) -> u32 {
235 self.inner.read().unwrap().id()
236 }
237}