motorcortex_rust/client/
subscription.rs1use crate::msg::{DataType, GroupStatusMsg};
2use crate::parameter_value::GetParameterTuple;
3use chrono::{DateTime, Local, Utc};
4use std::sync::{Arc, RwLock, RwLockReadGuard};
5
6#[derive(Debug, Clone, Copy)]
7pub struct TimeSpec {
8 pub sec: i64, pub nsec: i64, }
11
12impl TimeSpec {
13 pub fn from_buffer(buffer: &[u8]) -> Option<Self> {
14 if buffer.len() < std::mem::size_of::<TimeSpec>() {
16 return None;
17 }
18
19 let ts = unsafe { std::ptr::read(buffer.as_ptr() as *const TimeSpec) };
21
22 Some(ts)
23 }
24 pub fn to_date_time(&self) -> DateTime<Local> {
25 self.to_utc_date_time().with_timezone(&Local)
27 }
28 pub fn to_utc_date_time(&self) -> DateTime<Utc> {
29 DateTime::from_timestamp(self.sec, self.nsec as u32).expect("Invalid TimeSpec")
31 }
32}
33
34pub struct Subscription {
35 data_types: Vec<u32>,
36 description: GroupStatusMsg,
37 buffer: Vec<u8>,
38 callback: Arc<RwLock<Option<Box<dyn Fn(&Subscription) + Send + Sync + 'static>>>>,
39}
40
41impl Subscription {
42 pub fn new(group_msg: GroupStatusMsg) -> Self {
43 Self {
44 data_types: group_msg
45 .params
46 .iter()
47 .map(|param| DataType::try_from(param.info.data_type as i32).unwrap() as u32)
48 .collect(),
49 description: group_msg,
50 buffer: Vec::new(),
51 callback: Arc::new(RwLock::new(None)),
52 }
53 }
54
55 pub fn id(&self) -> u32 {
56 self.description.id
57 }
58 pub fn update(&mut self, buffer: Vec<u8>) {
59 self.buffer = buffer;
60 if let Some(cb) = &*self.callback.read().unwrap() {
61 cb(self); }
63 }
64
65 pub fn read<V>(&self) -> Option<(TimeSpec, V)>
66 where
67 V: GetParameterTuple,
68 {
69 if !self.buffer.is_empty() {
70 const HEADER_LEN: usize = 4usize;
71 let protocol_version = self.buffer[3];
72 if protocol_version == 0 {
73 return self.read_protocol0(&self.buffer[HEADER_LEN..]);
74 }
75 if protocol_version == 1 {
76 return self.read_protocol1(&self.buffer[HEADER_LEN..]);
77 }
78 }
79 None
80 }
81
82 pub fn notify<F>(&mut self, cb: F)
83 where
84 F: Fn(&Subscription) + Send + 'static + Sync,
85 {
86 let mut callback_lock = self.callback.write().unwrap();
87 *callback_lock = Some(Box::new(cb));
88 }
89
90 pub fn name(&self) -> String {
91 self.description.alias.to_string()
92 }
93
94 fn read_protocol0<V>(&self, _buffer: &[u8]) -> Option<(TimeSpec, V)>
95 where
96 V: GetParameterTuple,
97 {
98 None
99 }
100
101 fn combined_iterator<'a>(
102 &'a self,
103 body: &'a [u8],
104 ) -> impl Iterator<Item = (&'a u32, &'a [u8])> + 'a {
105 self.description
106 .params
107 .iter()
108 .zip(self.data_types.iter())
109 .scan(0, move |cursor, (param, data_type)| {
110 let size = param.size as usize;
111 let slice = &body[*cursor..*cursor + size];
112 *cursor += size; Some((data_type, slice)) })
115 }
116
117 fn read_protocol1<V>(&self, buffer: &[u8]) -> Option<(TimeSpec, V)>
118 where
119 V: GetParameterTuple,
120 {
121 let ts = TimeSpec::from_buffer(buffer).unwrap();
122 const TS_SIZE: usize = size_of::<TimeSpec>();
123 let body = &buffer[TS_SIZE..];
124
125 Some((ts, V::get_parameters(self.combined_iterator(body)).unwrap()))
126 }
127}
128
129pub struct ReadOnlySubscription {
130 inner: Arc<RwLock<Subscription>>, }
132
133impl ReadOnlySubscription {
134 pub fn new(subscription: Arc<RwLock<Subscription>>) -> Self {
136 Self {
137 inner: subscription,
138 }
139 }
140
141 pub fn get(&self) -> RwLockReadGuard<'_, Subscription> {
143 self.inner.read().unwrap()
144 }
145
146 pub fn read<V>(&self) -> Option<V>
148 where
149 V: GetParameterTuple,
150 {
151 self.inner.read().unwrap().read().map(|(_, v)| v)
152 }
153
154 pub fn notify<F>(&self, cb: F)
155 where
156 F: Fn(&Subscription) + Send + 'static + Sync,
157 {
158 self.inner.write().unwrap().notify(cb);
159 }
160
161 pub fn read_with_timestamp<V>(&self) -> Option<(TimeSpec, V)>
162 where
163 V: GetParameterTuple,
164 {
165 self.inner.read().unwrap().read()
166 }
167
168 pub fn name(&self) -> String {
169 self.inner.read().unwrap().name()
170 }
171
172 pub fn id(&self) -> u32 {
173 self.inner.read().unwrap().id()
174 }
175}