1use alloc::vec::Vec;
2use core::convert::TryFrom;
3use core::marker::PhantomData;
4use core::mem;
5use fallible_collections::TryHashMap;
6
7use canadensis_core::crc::CrcTracker;
8use canadensis_core::subscription::SubscriptionManager;
9use canadensis_core::time::{Clock, MicrosecondDuration32, Microseconds32};
10use canadensis_core::transfer::{Header, Transfer};
11use canadensis_core::transport::Receiver;
12use canadensis_core::{nb, OutOfMemoryError, ServiceId, ServiceSubscribeError, SubjectId};
13use canadensis_header::Header as SerialHeader;
14
15use crate::cobs::Unescaper;
16use crate::driver::ReceiveDriver;
17use crate::header_collector::HeaderCollector;
18use crate::{Error, SerialNodeId, SerialTransferId, SerialTransport};
19
20pub struct SerialReceiver<C, D, S> {
24 state: State,
25 node_id: Option<SerialNodeId>,
26 subscriptions: S,
27 _driver: PhantomData<D>,
28 _clock: PhantomData<C>,
29}
30
31impl<C, D, S> SerialReceiver<C, D, S>
32where
33 C: Clock,
34 D: ReceiveDriver,
35 S: SubscriptionManager<Subscription> + Default,
36{
37 pub fn new(node_id: SerialNodeId) -> Self {
38 SerialReceiver {
39 state: State::Idle,
40 node_id: Some(node_id),
41 subscriptions: S::default(),
42 _driver: PhantomData,
43 _clock: PhantomData,
44 }
45 }
46 pub fn new_anonymous() -> Self {
47 SerialReceiver {
48 state: State::Idle,
49 node_id: None,
50 subscriptions: S::default(),
51 _driver: PhantomData,
52 _clock: PhantomData,
53 }
54 }
55
56 fn clean_expired_sessions(&mut self, now: Microseconds32) {
57 self.subscriptions
58 .for_each_message_subscription_mut(|sub| sub.clean_expired_sessions(now));
59 self.subscriptions
60 .for_each_request_subscription_mut(|sub| sub.clean_expired_sessions(now));
61 self.subscriptions
62 .for_each_response_subscription_mut(|sub| sub.clean_expired_sessions(now));
63 }
64
65 fn handle_byte(
66 &mut self,
67 byte: u8,
68 now: Microseconds32,
69 ) -> Result<Option<Transfer<Vec<u8>, SerialTransport>>, Error<D::Error>> {
70 let state = mem::replace(&mut self.state, State::Idle);
71 self.state = match state {
72 State::Idle => {
73 if byte == 0 {
74 State::BetweenTransfers
75 } else {
76 State::Idle
77 }
78 }
79 State::BetweenTransfers => {
80 if byte != 0 {
81 l0g::debug!("Starting frame");
83 let mut unescaper = Unescaper::new();
84 match unescaper.accept(byte) {
85 Ok(Some(byte)) => {
86 let mut header = HeaderCollector::new();
88 header.push(byte);
89 State::Header { unescaper, header }
90 }
91 Ok(None) => State::Header {
92 unescaper,
93 header: HeaderCollector::new(),
94 },
95 Err(_) => unreachable!("Unescaper returned an error for a non-zero input"),
96 }
97 } else {
98 State::BetweenTransfers
100 }
101 }
102 State::Header {
103 mut unescaper,
104 mut header,
105 } => {
106 match unescaper.accept(byte) {
107 Ok(Some(byte)) => {
108 header.push(byte);
109
110 if header.is_done() {
111 let header = header.as_header();
113 match SerialHeader::try_from(header) {
114 Ok(header) => {
115 let header = header.as_core_header(now);
116 if let Some(subscription) = self.is_interested(&header) {
117 let mut payload = Vec::new();
119 match payload
120 .try_reserve_exact(subscription.payload_size_max)
121 {
122 Ok(()) => State::Payload {
123 unescaper,
124 header,
125 crc: CrcTracker::new(),
126 payload,
127 },
128 Err(_) => {
129 self.state = State::Idle;
131 return Err(Error::Memory(OutOfMemoryError));
132 }
133 }
134 } else {
135 l0g::debug!("Got header, but not subscribed");
137 State::Idle
138 }
139 }
140 #[allow(unused_variables)]
141 Err(e) => {
142 l0g::debug!("Header format or CRC invalid: {:?}", e);
144 State::Idle
145 }
146 }
147 } else {
148 State::Header { unescaper, header }
150 }
151 }
152 Ok(None) => {
153 State::Header { unescaper, header }
155 }
156 Err(_) => {
157 l0g::warn!("Unexpected zero byte in Header state");
158 State::Idle
159 }
160 }
161 }
162 State::Payload {
163 mut unescaper,
164 header,
165 mut crc,
166 mut payload,
167 } => {
168 match unescaper.accept(byte) {
169 Ok(Some(byte)) => {
170 if let Some(byte_before_crc) = crc.digest(byte) {
171 if payload.len() < payload.capacity() {
172 payload.push(byte_before_crc);
173 }
174 }
175 State::Payload {
176 unescaper,
177 header,
178 crc,
179 payload,
180 }
181 }
182 Ok(None) => {
183 State::Payload {
185 unescaper,
186 header,
187 crc,
188 payload,
189 }
190 }
191 Err(_) => {
192 l0g::debug!("Got a zero (end delimiter)");
193 self.state = State::BetweenTransfers;
194 return Ok(self.complete_transfer(header, payload, crc));
196 }
197 }
198 }
199 };
200 Ok(None)
201 }
202}
203
204impl<C, D, S> Receiver<C> for SerialReceiver<C, D, S>
205where
206 C: Clock,
207 D: ReceiveDriver,
208 S: SubscriptionManager<Subscription> + Default,
209{
210 type Transport = SerialTransport;
211 type Driver = D;
212 type Error = Error<D::Error>;
213
214 fn receive(
215 &mut self,
216 clock: &mut C,
217 driver: &mut D,
218 ) -> Result<Option<Transfer<Vec<u8>, Self::Transport>>, Self::Error> {
219 self.clean_expired_sessions(clock.now());
220 loop {
221 match driver.receive_byte() {
222 Ok(byte) => match self.handle_byte(byte, clock.now()) {
223 Ok(Some(transfer)) => break Ok(Some(transfer)),
224 Ok(None) => { }
225 Err(e) => break Err(e),
226 },
227 Err(nb::Error::WouldBlock) => break Ok(None),
228 Err(nb::Error::Other(e)) => break Err(Error::Driver(e)),
229 }
230 }
231 }
232
233 fn subscribe_message(
234 &mut self,
235 subject: SubjectId,
236 payload_size_max: usize,
237 timeout: MicrosecondDuration32,
238 _driver: &mut D,
239 ) -> Result<(), Self::Error> {
240 self.subscriptions
241 .subscribe_message(subject, Subscription::new(payload_size_max, timeout))
242 .map_err(Error::Memory)
243 }
244
245 fn unsubscribe_message(&mut self, subject: SubjectId, _driver: &mut D) {
246 self.subscriptions.unsubscribe_message(subject);
247 }
248
249 fn subscribe_request(
250 &mut self,
251 service: ServiceId,
252 payload_size_max: usize,
253 timeout: MicrosecondDuration32,
254 _driver: &mut D,
255 ) -> Result<(), ServiceSubscribeError<Self::Error>> {
256 if self.node_id.is_some() {
257 self.subscriptions
258 .subscribe_request(service, Subscription::new(payload_size_max, timeout))
259 .map_err(|oom| ServiceSubscribeError::Transport(Error::Memory(oom)))
260 } else {
261 Err(ServiceSubscribeError::Anonymous)
262 }
263 }
264
265 fn unsubscribe_request(&mut self, service: ServiceId, _driver: &mut D) {
266 self.subscriptions.unsubscribe_request(service);
267 }
268
269 fn subscribe_response(
270 &mut self,
271 service: ServiceId,
272 payload_size_max: usize,
273 timeout: MicrosecondDuration32,
274 _driver: &mut D,
275 ) -> Result<(), ServiceSubscribeError<Self::Error>> {
276 if self.node_id.is_some() {
277 self.subscriptions
278 .subscribe_response(service, Subscription::new(payload_size_max, timeout))
279 .map_err(|oom| ServiceSubscribeError::Transport(Error::Memory(oom)))
280 } else {
281 Err(ServiceSubscribeError::Anonymous)
282 }
283 }
284
285 fn unsubscribe_response(&mut self, service: ServiceId, _driver: &mut D) {
286 self.subscriptions.unsubscribe_response(service);
287 }
288
289 fn set_id(&mut self, id: Option<SerialNodeId>) {
290 self.node_id = id;
291 }
292
293 fn subscribers(&self) -> impl Iterator<Item = SubjectId> {
294 self.subscriptions.subscribers()
295 }
296
297 fn servers(&self) -> impl Iterator<Item = ServiceId> {
298 self.subscriptions.servers()
299 }
300}
301
302impl<C, D, S> SerialReceiver<C, D, S>
303where
304 C: Clock,
305 S: SubscriptionManager<Subscription>,
306{
307 fn find_subscription_mut(
310 &mut self,
311 header: &Header<SerialTransport>,
312 ) -> Option<&mut Subscription> {
313 match header {
314 Header::Message(header) => self
315 .subscriptions
316 .find_message_subscription_mut(header.subject),
317 Header::Request(header) => {
318 if self.node_id == Some(header.destination) {
319 self.subscriptions
320 .find_request_subscription_mut(header.service)
321 } else {
322 None
323 }
324 }
325 Header::Response(header) => {
326 if self.node_id == Some(header.destination) {
327 self.subscriptions
328 .find_response_subscription_mut(header.service)
329 } else {
330 None
331 }
332 }
333 }
334 }
335
336 fn is_interested(&self, header: &Header<SerialTransport>) -> Option<&Subscription> {
340 self.subscriptions
341 .find_subscription(header)
342 .and_then(|subscription| {
343 match header.source() {
344 Some(source) => {
345 match subscription.sessions.get(source) {
346 Some(session) => {
347 if session.last_transfer_id < *header.transfer_id() {
348 Some(subscription)
349 } else {
350 None
352 }
353 }
354 None => {
355 Some(subscription)
357 }
358 }
359 }
360 None => {
361 Some(subscription)
363 }
364 }
365 })
366 }
367
368 fn complete_transfer(
369 &mut self,
370 header: Header<SerialTransport>,
371 payload: Vec<u8>,
372 crc: CrcTracker,
373 ) -> Option<Transfer<Vec<u8>, SerialTransport>> {
374 if !crc.correct() {
375 l0g::debug!("Dropping transfer due to incorrect transfer CRC");
376 return None;
377 }
378 if let Some(subscription) = self.find_subscription_mut(&header) {
380 if let Some(source_node) = header.source() {
381 let _ = subscription.sessions.insert(
384 *source_node,
385 Session {
386 expiration_time: header.timestamp() + subscription.timeout,
387 last_transfer_id: *header.transfer_id(),
388 },
389 );
390 }
391 Some(Transfer {
392 header,
393 loopback: false,
394 payload,
395 })
396 } else {
397 l0g::debug!("No matching subscription for header");
399 None
400 }
401 }
402}
403
404pub struct Subscription {
405 payload_size_max: usize,
407 timeout: MicrosecondDuration32,
409 sessions: TryHashMap<SerialNodeId, Session>,
413}
414
415impl Subscription {
416 fn new(payload_size_max: usize, timeout: MicrosecondDuration32) -> Self {
417 Subscription {
418 payload_size_max,
419 timeout,
420 sessions: Default::default(),
421 }
422 }
423
424 fn clean_expired_sessions(&mut self, now: Microseconds32) {
426 loop {
427 let mut id_to_remove: Option<SerialNodeId> = None;
428 for (id, session) in self.sessions.iter() {
429 if session.expiration_time < now {
430 id_to_remove = Some(*id);
431 }
432 }
433 match id_to_remove {
434 Some(id) => {
435 self.sessions.remove(&id);
436 }
437 None => break,
438 }
439 }
440 }
441}
442
443struct Session {
444 expiration_time: Microseconds32,
445 last_transfer_id: SerialTransferId,
446}
447
448enum State {
450 Idle,
452 BetweenTransfers,
454 Header {
458 unescaper: Unescaper,
459 header: HeaderCollector,
460 },
461 Payload {
465 unescaper: Unescaper,
466 header: Header<SerialTransport>,
467 crc: CrcTracker,
471 payload: Vec<u8>,
472 },
473}