Skip to main content

canadensis_serial/
rx.rs

1use alloc::vec::Vec;
2use core::convert::TryFrom;
3use core::marker::PhantomData;
4use core::mem;
5use fallible_collections::TryHashMap;
6
7use canadensis_core::crc::CrcTracker;
8use canadensis_core::subscription::SubscriptionManager;
9use canadensis_core::time::{Clock, MicrosecondDuration32, Microseconds32};
10use canadensis_core::transfer::{Header, Transfer};
11use canadensis_core::transport::Receiver;
12use canadensis_core::{nb, OutOfMemoryError, ServiceId, ServiceSubscribeError, SubjectId};
13use canadensis_header::Header as SerialHeader;
14
15use crate::cobs::Unescaper;
16use crate::driver::ReceiveDriver;
17use crate::header_collector::HeaderCollector;
18use crate::{Error, SerialNodeId, SerialTransferId, SerialTransport};
19
20/// A serial transport receiver
21///
22/// This implementation does not support multi-frame transfers or timestamps.
23pub struct SerialReceiver<C, D, S> {
24    state: State,
25    node_id: Option<SerialNodeId>,
26    subscriptions: S,
27    _driver: PhantomData<D>,
28    _clock: PhantomData<C>,
29}
30
31impl<C, D, S> SerialReceiver<C, D, S>
32where
33    C: Clock,
34    D: ReceiveDriver,
35    S: SubscriptionManager<Subscription> + Default,
36{
37    pub fn new(node_id: SerialNodeId) -> Self {
38        SerialReceiver {
39            state: State::Idle,
40            node_id: Some(node_id),
41            subscriptions: S::default(),
42            _driver: PhantomData,
43            _clock: PhantomData,
44        }
45    }
46    pub fn new_anonymous() -> Self {
47        SerialReceiver {
48            state: State::Idle,
49            node_id: None,
50            subscriptions: S::default(),
51            _driver: PhantomData,
52            _clock: PhantomData,
53        }
54    }
55
56    fn clean_expired_sessions(&mut self, now: Microseconds32) {
57        self.subscriptions
58            .for_each_message_subscription_mut(|sub| sub.clean_expired_sessions(now));
59        self.subscriptions
60            .for_each_request_subscription_mut(|sub| sub.clean_expired_sessions(now));
61        self.subscriptions
62            .for_each_response_subscription_mut(|sub| sub.clean_expired_sessions(now));
63    }
64
65    fn handle_byte(
66        &mut self,
67        byte: u8,
68        now: Microseconds32,
69    ) -> Result<Option<Transfer<Vec<u8>, SerialTransport>>, Error<D::Error>> {
70        let state = mem::replace(&mut self.state, State::Idle);
71        self.state = match state {
72            State::Idle => {
73                if byte == 0 {
74                    State::BetweenTransfers
75                } else {
76                    State::Idle
77                }
78            }
79            State::BetweenTransfers => {
80                if byte != 0 {
81                    // Start decoding
82                    l0g::debug!("Starting frame");
83                    let mut unescaper = Unescaper::new();
84                    match unescaper.accept(byte) {
85                        Ok(Some(byte)) => {
86                            // Got the first byte of the header
87                            let mut header = HeaderCollector::new();
88                            header.push(byte);
89                            State::Header { unescaper, header }
90                        }
91                        Ok(None) => State::Header {
92                            unescaper,
93                            header: HeaderCollector::new(),
94                        },
95                        Err(_) => unreachable!("Unescaper returned an error for a non-zero input"),
96                    }
97                } else {
98                    // Got another zero, keep waiting
99                    State::BetweenTransfers
100                }
101            }
102            State::Header {
103                mut unescaper,
104                mut header,
105            } => {
106                match unescaper.accept(byte) {
107                    Ok(Some(byte)) => {
108                        header.push(byte);
109
110                        if header.is_done() {
111                            // Got the complete header
112                            let header = header.as_header();
113                            match SerialHeader::try_from(header) {
114                                Ok(header) => {
115                                    let header = header.as_core_header(now);
116                                    if let Some(subscription) = self.is_interested(&header) {
117                                        // Try to allocate memory for the incoming transfer
118                                        let mut payload = Vec::new();
119                                        match payload
120                                            .try_reserve_exact(subscription.payload_size_max)
121                                        {
122                                            Ok(()) => State::Payload {
123                                                unescaper,
124                                                header,
125                                                crc: CrcTracker::new(),
126                                                payload,
127                                            },
128                                            Err(_) => {
129                                                // Not enough memory to receive this transfer
130                                                self.state = State::Idle;
131                                                return Err(Error::Memory(OutOfMemoryError));
132                                            }
133                                        }
134                                    } else {
135                                        // Not interested in this transfer
136                                        l0g::debug!("Got header, but not subscribed");
137                                        State::Idle
138                                    }
139                                }
140                                #[allow(unused_variables)]
141                                Err(e) => {
142                                    // Invalid header CRC or format
143                                    l0g::debug!("Header format or CRC invalid: {:?}", e);
144                                    State::Idle
145                                }
146                            }
147                        } else {
148                            // Wait for more header bytes
149                            State::Header { unescaper, header }
150                        }
151                    }
152                    Ok(None) => {
153                        // Keep the same state
154                        State::Header { unescaper, header }
155                    }
156                    Err(_) => {
157                        l0g::warn!("Unexpected zero byte in Header state");
158                        State::Idle
159                    }
160                }
161            }
162            State::Payload {
163                mut unescaper,
164                header,
165                mut crc,
166                mut payload,
167            } => {
168                match unescaper.accept(byte) {
169                    Ok(Some(byte)) => {
170                        if let Some(byte_before_crc) = crc.digest(byte) {
171                            if payload.len() < payload.capacity() {
172                                payload.push(byte_before_crc);
173                            }
174                        }
175                        State::Payload {
176                            unescaper,
177                            header,
178                            crc,
179                            payload,
180                        }
181                    }
182                    Ok(None) => {
183                        // Stay in the same state
184                        State::Payload {
185                            unescaper,
186                            header,
187                            crc,
188                            payload,
189                        }
190                    }
191                    Err(_) => {
192                        l0g::debug!("Got a zero (end delimiter)");
193                        self.state = State::BetweenTransfers;
194                        // Check and finish the transfer
195                        return Ok(self.complete_transfer(header, payload, crc));
196                    }
197                }
198            }
199        };
200        Ok(None)
201    }
202}
203
204impl<C, D, S> Receiver<C> for SerialReceiver<C, D, S>
205where
206    C: Clock,
207    D: ReceiveDriver,
208    S: SubscriptionManager<Subscription> + Default,
209{
210    type Transport = SerialTransport;
211    type Driver = D;
212    type Error = Error<D::Error>;
213
214    fn receive(
215        &mut self,
216        clock: &mut C,
217        driver: &mut D,
218    ) -> Result<Option<Transfer<Vec<u8>, Self::Transport>>, Self::Error> {
219        self.clean_expired_sessions(clock.now());
220        loop {
221            match driver.receive_byte() {
222                Ok(byte) => match self.handle_byte(byte, clock.now()) {
223                    Ok(Some(transfer)) => break Ok(Some(transfer)),
224                    Ok(None) => { /* Keep going and try another byte */ }
225                    Err(e) => break Err(e),
226                },
227                Err(nb::Error::WouldBlock) => break Ok(None),
228                Err(nb::Error::Other(e)) => break Err(Error::Driver(e)),
229            }
230        }
231    }
232
233    fn subscribe_message(
234        &mut self,
235        subject: SubjectId,
236        payload_size_max: usize,
237        timeout: MicrosecondDuration32,
238        _driver: &mut D,
239    ) -> Result<(), Self::Error> {
240        self.subscriptions
241            .subscribe_message(subject, Subscription::new(payload_size_max, timeout))
242            .map_err(Error::Memory)
243    }
244
245    fn unsubscribe_message(&mut self, subject: SubjectId, _driver: &mut D) {
246        self.subscriptions.unsubscribe_message(subject);
247    }
248
249    fn subscribe_request(
250        &mut self,
251        service: ServiceId,
252        payload_size_max: usize,
253        timeout: MicrosecondDuration32,
254        _driver: &mut D,
255    ) -> Result<(), ServiceSubscribeError<Self::Error>> {
256        if self.node_id.is_some() {
257            self.subscriptions
258                .subscribe_request(service, Subscription::new(payload_size_max, timeout))
259                .map_err(|oom| ServiceSubscribeError::Transport(Error::Memory(oom)))
260        } else {
261            Err(ServiceSubscribeError::Anonymous)
262        }
263    }
264
265    fn unsubscribe_request(&mut self, service: ServiceId, _driver: &mut D) {
266        self.subscriptions.unsubscribe_request(service);
267    }
268
269    fn subscribe_response(
270        &mut self,
271        service: ServiceId,
272        payload_size_max: usize,
273        timeout: MicrosecondDuration32,
274        _driver: &mut D,
275    ) -> Result<(), ServiceSubscribeError<Self::Error>> {
276        if self.node_id.is_some() {
277            self.subscriptions
278                .subscribe_response(service, Subscription::new(payload_size_max, timeout))
279                .map_err(|oom| ServiceSubscribeError::Transport(Error::Memory(oom)))
280        } else {
281            Err(ServiceSubscribeError::Anonymous)
282        }
283    }
284
285    fn unsubscribe_response(&mut self, service: ServiceId, _driver: &mut D) {
286        self.subscriptions.unsubscribe_response(service);
287    }
288
289    fn set_id(&mut self, id: Option<SerialNodeId>) {
290        self.node_id = id;
291    }
292
293    fn subscribers(&self) -> impl Iterator<Item = SubjectId> {
294        self.subscriptions.subscribers()
295    }
296
297    fn servers(&self) -> impl Iterator<Item = ServiceId> {
298        self.subscriptions.servers()
299    }
300}
301
302impl<C, D, S> SerialReceiver<C, D, S>
303where
304    C: Clock,
305    S: SubscriptionManager<Subscription>,
306{
307    /// Finds and returns a subscription that matches the provided header (and, for service
308    /// transfers, has this node as its destination) if any exists
309    fn find_subscription_mut(
310        &mut self,
311        header: &Header<SerialTransport>,
312    ) -> Option<&mut Subscription> {
313        match header {
314            Header::Message(header) => self
315                .subscriptions
316                .find_message_subscription_mut(header.subject),
317            Header::Request(header) => {
318                if self.node_id == Some(header.destination) {
319                    self.subscriptions
320                        .find_request_subscription_mut(header.service)
321                } else {
322                    None
323                }
324            }
325            Header::Response(header) => {
326                if self.node_id == Some(header.destination) {
327                    self.subscriptions
328                        .find_response_subscription_mut(header.service)
329                } else {
330                    None
331                }
332            }
333        }
334    }
335
336    /// Returns true if this receiver has a matching subscription, its last transfer ID is less
337    /// than the provided header's transfer ID, and (for service transfers) this node is the
338    /// destination
339    fn is_interested(&self, header: &Header<SerialTransport>) -> Option<&Subscription> {
340        self.subscriptions
341            .find_subscription(header)
342            .and_then(|subscription| {
343                match header.source() {
344                    Some(source) => {
345                        match subscription.sessions.get(source) {
346                            Some(session) => {
347                                if session.last_transfer_id < *header.transfer_id() {
348                                    Some(subscription)
349                                } else {
350                                    // Duplicate transfer
351                                    None
352                                }
353                            }
354                            None => {
355                                // No session, accept
356                                Some(subscription)
357                            }
358                        }
359                    }
360                    None => {
361                        // Anonymous transfers can't take advantage of deduplication. Always accept.
362                        Some(subscription)
363                    }
364                }
365            })
366    }
367
368    fn complete_transfer(
369        &mut self,
370        header: Header<SerialTransport>,
371        payload: Vec<u8>,
372        crc: CrcTracker,
373    ) -> Option<Transfer<Vec<u8>, SerialTransport>> {
374        if !crc.correct() {
375            l0g::debug!("Dropping transfer due to incorrect transfer CRC");
376            return None;
377        }
378        // Record that this transfer was received
379        if let Some(subscription) = self.find_subscription_mut(&header) {
380            if let Some(source_node) = header.source() {
381                // This may fail to allocate memory.
382                // TODO: Handle allocation failure
383                let _ = subscription.sessions.insert(
384                    *source_node,
385                    Session {
386                        expiration_time: header.timestamp() + subscription.timeout,
387                        last_transfer_id: *header.transfer_id(),
388                    },
389                );
390            }
391            Some(Transfer {
392                header,
393                loopback: false,
394                payload,
395            })
396        } else {
397            // The subscription was removed while receiving the transfer
398            l0g::debug!("No matching subscription for header");
399            None
400        }
401    }
402}
403
404pub struct Subscription {
405    /// The maximum payload size, in bytes
406    payload_size_max: usize,
407    /// Transfer ID timeout
408    timeout: MicrosecondDuration32,
409    /// A session for each node (and an associated last transfer ID)
410    ///
411    /// This is used to remove duplicates
412    sessions: TryHashMap<SerialNodeId, Session>,
413}
414
415impl Subscription {
416    fn new(payload_size_max: usize, timeout: MicrosecondDuration32) -> Self {
417        Subscription {
418            payload_size_max,
419            timeout,
420            sessions: Default::default(),
421        }
422    }
423
424    /// Removes all sessions that have expired
425    fn clean_expired_sessions(&mut self, now: Microseconds32) {
426        loop {
427            let mut id_to_remove: Option<SerialNodeId> = None;
428            for (id, session) in self.sessions.iter() {
429                if session.expiration_time < now {
430                    id_to_remove = Some(*id);
431                }
432            }
433            match id_to_remove {
434                Some(id) => {
435                    self.sessions.remove(&id);
436                }
437                None => break,
438            }
439        }
440    }
441}
442
443struct Session {
444    expiration_time: Microseconds32,
445    last_transfer_id: SerialTransferId,
446}
447
448/// Receiver states
449enum State {
450    /// Waiting for the first zero byte
451    Idle,
452    /// Got a zero byte, waiting for the first non-zero byte to begin a transfer
453    BetweenTransfers,
454    /// Collecting the header
455    ///
456    /// When the final header byte arrives, it will be inspected
457    Header {
458        unescaper: Unescaper,
459        header: HeaderCollector,
460    },
461    /// Got a header, collecting payload bytes
462    ///
463    /// The capacity of the payload is set to the maximum payload length.
464    Payload {
465        unescaper: Unescaper,
466        header: Header<SerialTransport>,
467        /// CRC of the payload bytes so far (after COBS unescaping, not including the header)
468        ///
469        /// This may cover more bytes than the capacity of `payload`
470        crc: CrcTracker,
471        payload: Vec<u8>,
472    },
473}