mini_rxtx/
lib.rs

1#![cfg_attr(not(feature = "std"), no_std)]
2
3mod decoder;
4
5#[cfg(feature = "std")]
6pub use crate::decoder::StdDecoder;
7pub use crate::decoder::{Decoded, Decoder};
8
9use byteorder::ByteOrder;
10use heapless::spsc::Queue;
11
12#[derive(Debug)]
13#[cfg_attr(feature = "std", derive(thiserror::Error))]
14pub enum Error {
15    #[cfg_attr(feature = "std", error("serialization failed"))]
16    SerializeError(ssmarshal::Error),
17    #[cfg_attr(feature = "std", error("too long"))]
18    TooLong,
19    #[cfg_attr(feature = "std", error("already experienced an error previously"))]
20    PreviousError,
21    #[cfg_attr(feature = "std", error("incomplete"))]
22    Incomplete,
23    #[cfg_attr(feature = "std", error("extra characters found"))]
24    ExtraCharactersFound,
25}
26
#[cfg(feature = "std")]
fn _test_error_is_std() {
    // Never meant to do anything at run time: this function only compiles
    // if `Error` satisfies the `std::error::Error` bound below.
    fn assert_std_error<E: std::error::Error>() {}
    assert_std_error::<Error>();
}
33
34impl From<ssmarshal::Error> for Error {
35    fn from(orig: ssmarshal::Error) -> Error {
36        Error::SerializeError(orig)
37    }
38}
39
40pub struct MiniTxRx<RX, TX, const RX_SIZE: usize, const TX_SIZE: usize> {
41    rx: RX,
42    tx: TX,
43    in_bytes: Queue<u8, RX_SIZE>,
44    tx_queue: Queue<u8, TX_SIZE>,
45    held_byte: Option<u8>,
46}
47
48impl<RX, TX, const RX_SIZE: usize, const TX_SIZE: usize> MiniTxRx<RX, TX, RX_SIZE, TX_SIZE>
49where
50    RX: embedded_hal::serial::Read<u8>,
51    TX: embedded_hal::serial::Write<u8>,
52{
53    #[inline]
54    pub fn new(tx: TX, rx: RX) -> Self {
55        Self {
56            rx,
57            tx,
58            in_bytes: Queue::new(),
59            tx_queue: Queue::new(),
60            held_byte: None,
61        }
62    }
63
64    #[inline]
65    pub fn pump(&mut self) -> Option<u8> {
66        // Called with lock.
67
68        // Pump the output queue
69        self.pump_sender();
70
71        // Pump the input queue
72        self.in_bytes.dequeue()
73    }
74
75    #[inline]
76    pub fn send_msg(&mut self, m: SerializedMsg) -> Result<(), u8> {
77        // Called with lock.
78        let frame = &m.buf[0..m.total_bytes];
79        for byte in frame.iter() {
80            self.tx_queue.enqueue(*byte)?;
81        }
82        Ok(())
83    }
84
85    // inner function called by pump_sender
86    fn send_byte(&mut self, byte: u8) {
87        debug_assert!(self.held_byte.is_none());
88        match self.tx.write(byte) {
89            Ok(()) => {}
90            Err(nb::Error::WouldBlock) => self.held_byte = Some(byte),
91            Err(nb::Error::Other(_e)) => panic!("unreachable"), // not possible according to function definition
92        }
93    }
94
95    fn pump_sender(&mut self) {
96        if let Some(byte) = self.held_byte.take() {
97            self.send_byte(byte)
98        }
99        if self.held_byte.is_none() {
100            match self.tx_queue.dequeue() {
101                Some(byte) => self.send_byte(byte),
102                None => {}
103            }
104        }
105    }
106
107    #[inline]
108    pub fn on_interrupt(&mut self) -> Result<(), RX::Error> {
109        // This is called inside the interrupt handler and should do as little
110        // as possible.
111
112        // We have a new byte
113        match self.rx.read() {
114            Ok(byte) => {
115                #[cfg(feature = "print-defmt")]
116                defmt::trace!("got byte {}", byte);
117                self.in_bytes.enqueue(byte).expect("failed to enqueue byte");
118            }
119            Err(nb::Error::WouldBlock) => {} // do nothing, probably task called because of Txe event
120            Err(nb::Error::Other(e)) => {
121                return Err(e);
122            }
123        }
124        Ok(())
125    }
126
127    pub fn rx(&mut self) -> &mut RX {
128        &mut self.rx
129    }
130
131    pub fn tx(&mut self) -> &mut TX {
132        &mut self.tx
133    }
134}
135
/// A framed message stored in a caller-provided buffer.
pub struct SerializedMsg<'a> {
    /// Backing buffer; the frame occupies its first `total_bytes` bytes.
    buf: &'a [u8],
    /// Length of the frame within `buf`.
    total_bytes: usize,
}

impl SerializedMsg<'_> {
    /// Returns the bytes of the frame, i.e. the first `total_bytes` bytes
    /// of the backing buffer.
    pub fn framed_slice(&self) -> &[u8] {
        let frame_len = self.total_bytes;
        &self.buf[..frame_len]
    }
}
146
147/// Encode messages into a byte buffer.
148///
149/// This is not part of MiniTxRx itself because we do not want to require
150/// access to resources when encoding bytes.
151#[inline]
152pub fn serialize_msg<'a, T: serde::ser::Serialize>(
153    msg: &T,
154    buf: &'a mut [u8],
155) -> Result<SerializedMsg<'a>, Error> {
156    let n_bytes = ssmarshal::serialize(&mut buf[2..], msg)?;
157    if n_bytes > u16::max_value() as usize {
158        return Err(Error::TooLong);
159    }
160    byteorder::LittleEndian::write_u16(&mut buf[0..2], n_bytes as u16);
161    Ok(SerializedMsg {
162        buf,
163        total_bytes: n_bytes + 2,
164    })
165}
166
/// Encode messages into `Vec<u8>`
///
/// The message is framed into a 1024-byte scratch vector, which is then
/// shortened to the length of the frame.
///
/// This is not part of MiniTxRx itself because we do not want to require
/// access to resources when encoding bytes.
#[cfg(feature = "std")]
pub fn serialize_msg_owned<T: serde::ser::Serialize>(msg: &T) -> Result<Vec<u8>, Error> {
    let mut framed = vec![0u8; 1024];
    let frame_len = {
        let serialized = serialize_msg(msg, framed.as_mut_slice())?;
        serialized.total_bytes
    };
    framed.truncate(frame_len);
    Ok(framed)
}
178
179pub fn deserialize_owned_borrowed<T>(buf: &[u8], decode_buf: &mut [u8]) -> Result<T, Error>
180where
181    for<'de> T: serde::de::Deserialize<'de>,
182{
183    let mut decoder = Decoder::new(decode_buf);
184
185    let mut result: Option<T> = None;
186
187    for char_i in buf {
188        if result.is_some() {
189            // no more characters allowed
190            return Err(Error::ExtraCharactersFound);
191        }
192
193        match decoder.consume(*char_i) {
194            Decoded::Msg(msg) => {
195                result = Some(msg);
196            }
197            Decoded::FrameNotYetComplete => {}
198            Decoded::Error(e) => {
199                return Err(e);
200            }
201        }
202    }
203
204    match result {
205        Some(m) => Ok(m),
206        None => Err(Error::Incomplete),
207    }
208}
209
/// Decodes exactly one message from `buf` using a [`StdDecoder`] created
/// with `StdDecoder::new(1024)`.
///
/// # Errors
///
/// * [`Error::ExtraCharactersFound`] if bytes remain after the message.
/// * [`Error::Incomplete`] if `buf` ends before a message is complete.
/// * Any error reported by the decoder itself.
#[cfg(feature = "std")]
pub fn deserialize_owned<T>(buf: &[u8]) -> Result<T, Error>
where
    for<'de> T: serde::de::Deserialize<'de>,
{
    let mut decoder = StdDecoder::new(1024);
    let mut remaining = buf.iter();

    while let Some(&byte) = remaining.next() {
        match decoder.consume(byte) {
            Decoded::Msg(msg) => {
                // The message is only valid if it used up the entire input.
                return if remaining.as_slice().is_empty() {
                    Ok(msg)
                } else {
                    // no more characters allowed
                    Err(Error::ExtraCharactersFound)
                };
            }
            Decoded::FrameNotYetComplete => {}
            Decoded::Error(e) => return Err(e),
        }
    }

    // Ran out of input without ever completing a frame.
    Err(Error::Incomplete)
}