motorcortex_rust/client/
subscription.rs1use crate::msg::{DataType, GroupStatusMsg};
2use crate::parameter_value::GetParameterTuple;
3use chrono::{DateTime, Local, Utc};
4use std::sync::{Arc, RwLock, RwLockReadGuard};
5
/// Timestamp split into whole seconds and a nanosecond remainder.
///
/// A protocol-1 subscription payload starts with one of these, and the type's
/// size is used to locate the parameter data that follows it.
///
/// `#[repr(C)]` pins the field order (`sec`, then `nsec`) and the 16-byte
/// size. Rust's default struct representation leaves field order unspecified,
/// which is not acceptable for a type whose layout mirrors bytes on the wire.
#[repr(C)]
#[derive(Debug, Clone, Copy)]
pub struct TimeSpec {
    /// Whole seconds; used as the seconds argument of `DateTime::from_timestamp`.
    pub sec: i64,
    /// Nanosecond part; used as the nanoseconds argument of
    /// `DateTime::from_timestamp`.
    pub nsec: i64,
}
11
12impl TimeSpec {
13 pub fn from_buffer(buffer: &[u8]) -> Option<Self> {
14 if buffer.len() < std::mem::size_of::<TimeSpec>() {
16 return None;
17 }
18
19 let ts = unsafe { std::ptr::read(buffer.as_ptr() as *const TimeSpec) };
21
22 Some(ts)
23 }
24 pub fn to_date_time(&self) -> DateTime<Local> {
25 self.to_utc_date_time().with_timezone(&Local)
27 }
28 pub fn to_utc_date_time(&self) -> DateTime<Utc> {
29 DateTime::from_timestamp(self.sec, self.nsec as u32).expect("Invalid TimeSpec")
31 }
32}
33
34pub struct Subscription {
35 data_types: Vec<u32>,
36 description: GroupStatusMsg,
37 buffer: Vec<u8>,
38 callback: Arc<RwLock<Option<Box<dyn Fn(&Subscription) + Send + Sync + 'static>>>>,
39}
40
41impl Subscription {
42 pub fn new(group_msg: GroupStatusMsg) -> Self {
43 Self {
44 data_types: group_msg
45 .params
46 .iter()
47 .map(|param| DataType::try_from(param.info.data_type as i32).unwrap() as u32)
48 .collect(),
49 description: group_msg,
50 buffer: Vec::new(),
51 callback: Arc::new(RwLock::new(None)),
52 }
53 }
54
55 pub fn id(&self) -> u32 {
56 self.description.id
57 }
58 pub fn update(&mut self, buffer: Vec<u8>) {
59 self.buffer = buffer;
60 if let Some(cb) = &*self.callback.read().unwrap() {
61 cb(self); }
63 }
64
65 pub fn read<V>(&self) -> Option<(TimeSpec, V)>
66 where
67 V: GetParameterTuple,
68 {
69 if !self.buffer.is_empty() {
70 const HEADER_LEN: usize = 4usize;
71 let protocol_version = self.buffer[3];
72 if protocol_version == 0 {
73 return self.read_protocol0(&self.buffer[HEADER_LEN..]);
74 }
75 if protocol_version == 1 {
76 return self.read_protocol1(&self.buffer[HEADER_LEN..]);
77 }
78 }
79 None
80 }
81
82 pub fn notify<F>(&mut self, cb: F)
83 where
84 F: Fn(&Subscription) + Send + 'static + Sync,
85 {
86 let mut callback_lock = self.callback.write().unwrap();
87 *callback_lock = Some(Box::new(cb));
88 }
89
90 fn read_protocol0<V>(&self, buffer: &[u8]) -> Option<(TimeSpec, V)>
91 where
92 V: GetParameterTuple,
93 {
94 None
95 }
96
97 fn combined_iterator<'a>(
98 &'a self,
99 body: &'a [u8],
100 ) -> impl Iterator<Item = (&'a u32, &'a [u8])> + 'a {
101 self.description
102 .params
103 .iter()
104 .zip(self.data_types.iter())
105 .scan(0, move |cursor, (param, data_type)| {
106 let size = param.size as usize;
107 let slice = &body[*cursor..*cursor + size];
108 *cursor += size; Some((data_type, slice)) })
111 }
112
113 fn read_protocol1<V>(&self, buffer: &[u8]) -> Option<(TimeSpec, V)>
114 where
115 V: GetParameterTuple,
116 {
117 let ts = TimeSpec::from_buffer(buffer).unwrap();
118 const TS_SIZE: usize = size_of::<TimeSpec>();
119 let body = &buffer[TS_SIZE..];
120
121 Some((ts, V::get_parameters(self.combined_iterator(body)).unwrap()))
122 }
123}
124
125pub struct ReadOnlySubscription {
126 inner: Arc<RwLock<Subscription>>, }
128
129impl ReadOnlySubscription {
130 pub fn new(subscription: Arc<RwLock<Subscription>>) -> Self {
132 Self {
133 inner: subscription,
134 }
135 }
136
137 pub fn get(&self) -> RwLockReadGuard<Subscription> {
139 self.inner.read().unwrap()
140 }
141
142 pub fn read<V>(&self) -> Option<V>
144 where
145 V: GetParameterTuple,
146 {
147 self.inner.read().unwrap().read().map(|(_, v)| v)
148 }
149
150 pub fn notify<F>(&self, cb: F)
151 where
152 F: Fn(&Subscription) + Send + 'static + Sync,
153 {
154 self.inner.write().unwrap().notify(cb);
155 }
156
157 pub fn read_with_timestamp<V>(&self) -> Option<(TimeSpec, V)>
158 where
159 V: GetParameterTuple,
160 {
161 self.inner.read().unwrap().read()
162 }
163}