postcard_rpc/host_client/
serial.rs

1use std::{collections::VecDeque, future::Future};
2
3use cobs::encode_vec;
4use postcard_schema::Schema;
5use serde::de::DeserializeOwned;
6use tokio::io::{AsyncReadExt, AsyncWriteExt, ReadHalf, WriteHalf};
7use tokio_serial::{SerialPortBuilderExt, SerialStream};
8
9use crate::{
10    accumulator::raw::{CobsAccumulator, FeedResult},
11    header::VarSeqKind,
12    host_client::{HostClient, WireRx, WireSpawn, WireTx},
13};
14
15/// # Serial Constructor Methods
16///
17/// These methods are used to create a new [HostClient] instance for use with tokio serial and cobs encoding.
18///
19/// **Requires feature**: `cobs-serial`
20impl<WireErr> HostClient<WireErr>
21where
22    WireErr: DeserializeOwned + Schema,
23{
24    /// Create a new [HostClient]
25    ///
26    /// `serial_path` is the path to the serial port used. `err_uri_path` is
27    /// the path associated with the `WireErr` message type.
28    ///
29    /// This constructor is available when the `cobs-serial` feature is enabled.
30    ///
31    /// ## Example
32    ///
33    /// ```rust,no_run
34    /// use postcard_rpc::host_client::HostClient;
35    /// use postcard_rpc::header::VarSeqKind;
36    /// use serde::{Serialize, Deserialize};
37    /// use postcard_schema::Schema;
38    ///
39    /// /// A "wire error" type your server can use to respond to any
40    /// /// kind of request, for example if deserializing a request fails
41    /// #[derive(Debug, PartialEq, Schema, Serialize, Deserialize)]
42    /// pub enum Error {
43    ///    SomethingBad
44    /// }
45    ///
46    /// let client = HostClient::<Error>::new_serial_cobs(
47    ///     // the serial port path
48    ///     "/dev/ttyACM0",
49    ///     // the URI/path for `Error` messages
50    ///     "error",
51    ///     // Outgoing queue depth in messages
52    ///     8,
53    ///     // Baud rate of serial (does not generally matter for
54    ///     //  USB UART/CDC-ACM serial connections)
55    ///     115_200,
56    ///     // Use one-byte sequence numbers
57    ///     VarSeqKind::Seq1,
58    /// );
59    /// ```
60    pub fn try_new_serial_cobs(
61        serial_path: &str,
62        err_uri_path: &str,
63        outgoing_depth: usize,
64        baud: u32,
65        seq_no_kind: VarSeqKind,
66    ) -> Result<Self, String> {
67        let port = tokio_serial::new(serial_path, baud)
68            .open_native_async()
69            .map_err(|e| format!("Open Error: {e:?}"))?;
70
71        let (rx, tx) = tokio::io::split(port);
72
73        Ok(HostClient::new_with_wire(
74            SerialWireTx { tx },
75            SerialWireRx {
76                rx,
77                buf: Box::new([0u8; 1024]),
78                acc: Box::new(CobsAccumulator::new()),
79                pending: VecDeque::new(),
80            },
81            SerialSpawn,
82            seq_no_kind,
83            err_uri_path,
84            outgoing_depth,
85        ))
86    }
87
88    /// Create a new [HostClient]
89    ///
90    /// Panics if we couldn't open the serial port.
91    ///
92    /// See [`HostClient::try_new_serial_cobs`] for more details
93    pub fn new_serial_cobs(
94        serial_path: &str,
95        err_uri_path: &str,
96        outgoing_depth: usize,
97        baud: u32,
98        seq_no_kind: VarSeqKind,
99    ) -> Self {
100        Self::try_new_serial_cobs(serial_path, err_uri_path, outgoing_depth, baud, seq_no_kind)
101            .unwrap()
102    }
103}
104
105//////////////////////////////////////////////////////////////////////////////
106// Wire Interface Implementation
107//////////////////////////////////////////////////////////////////////////////
108
/// Tokio Serial Wire Interface Implementor
///
/// Uses Tokio for spawning tasks: every future handed to it is started with
/// `tokio::task::spawn`. The type carries no state of its own.
struct SerialSpawn;
113
114impl WireSpawn for SerialSpawn {
115    fn spawn(&mut self, fut: impl Future<Output = ()> + Send + 'static) {
116        // Explicitly drop the joinhandle as it impls Future and this makes
117        // clippy mad if you just let it drop implicitly
118        core::mem::drop(tokio::task::spawn(fut));
119    }
120}
121
/// Tokio Serial Wire Transmit Interface Implementor
///
/// Owns the write half of the serial port; outgoing messages are COBS
/// encoded and zero-terminated before being written to it.
struct SerialWireTx {
    /// Write half of the serial stream, as produced by `tokio::io::split`
    tx: WriteHalf<SerialStream>,
}
127
/// Errors that can occur while sending a message over the serial port
#[derive(thiserror::Error, Debug)]
enum SerialWireTxError {
    /// The underlying write to the serial port reported an I/O error
    #[error("Transfer Error on Send")]
    Transfer(#[from] std::io::Error),
}
133
134impl WireTx for SerialWireTx {
135    type Error = SerialWireTxError;
136
137    #[inline]
138    fn send(&mut self, data: Vec<u8>) -> impl Future<Output = Result<(), Self::Error>> + Send {
139        self.send_inner(data)
140    }
141}
142
143impl SerialWireTx {
144    async fn send_inner(&mut self, data: Vec<u8>) -> Result<(), SerialWireTxError> {
145        // Turn the serialized message into a COBS encoded message
146        //
147        // TODO: this is a little wasteful, data is already a vec,
148        // then we encode that to a second cobs-encoded vec. Oh well.
149        let mut msg = encode_vec(&data);
150        msg.push(0);
151
152        // And send it!
153        self.tx.write_all(&msg).await?;
154        Ok(())
155    }
156}
157
/// Tokio Serial Wire Receive Interface Implementor
///
/// Reads raw bytes from the serial port, splits them into COBS frames and
/// queues every decoded message until it is handed out.
struct SerialWireRx {
    /// Read half of the serial stream, as produced by `tokio::io::split`
    rx: ReadHalf<SerialStream>,
    /// Scratch buffer that each read from the port is placed into
    buf: Box<[u8; 1024]>,
    /// COBS accumulator the received bytes are fed through
    acc: Box<CobsAccumulator<1024>>,
    /// Decoded messages not yet returned to the caller, oldest first
    pending: VecDeque<Vec<u8>>,
}
165
/// Errors that can occur while receiving a message from the serial port
#[derive(thiserror::Error, Debug)]
enum SerialWireRxError {
    /// An I/O error on the receive path of the serial port
    #[error("Transfer Error on Recv")]
    Transfer(#[from] std::io::Error),
}
171
172impl WireRx for SerialWireRx {
173    type Error = SerialWireRxError;
174
175    #[inline]
176    fn receive(&mut self) -> impl Future<Output = Result<Vec<u8>, Self::Error>> + Send {
177        self.recv_inner()
178    }
179}
180
181impl SerialWireRx {
182    async fn recv_inner(&mut self) -> Result<Vec<u8>, SerialWireRxError> {
183        // Receive until we've gotten AT LEAST one message, though we will continue
184        // consuming and buffering any read (partial) messages, to ensure they are not lost.
185        loop {
186            // Do we have any messages already prepared?
187            if let Some(p) = self.pending.pop_front() {
188                return Ok(p);
189            }
190
191            // Nothing in the pending queue, do a read to see if we can pull more
192            // data from the serial port
193            let used = self.rx.read(self.buf.as_mut_slice()).await?;
194
195            let mut window = &self.buf[..used];
196
197            // This buffering loop is necessary as a single `read()` might include
198            // more than one message
199            'cobs: while !window.is_empty() {
200                window = match self.acc.feed(window) {
201                    // Consumed the whole USB frame
202                    FeedResult::Consumed => break 'cobs,
203                    // Ignore line errors
204                    FeedResult::OverFull(new_wind) => {
205                        tracing::warn!("Overflowed COBS accumulator");
206                        new_wind
207                    }
208                    FeedResult::DeserError(new_wind) => {
209                        tracing::warn!("COBS formatting error");
210                        new_wind
211                    }
212                    // We got a message! Attempt to dispatch it
213                    FeedResult::Success { data, remaining } => {
214                        self.pending.push_back(data.to_vec());
215                        remaining
216                    }
217                };
218            }
219        }
220    }
221}