postcard_rpc/host_client/serial.rs
1use std::{collections::VecDeque, future::Future};
2
3use cobs::encode_vec;
4use postcard_schema::Schema;
5use serde::de::DeserializeOwned;
6use tokio::io::{AsyncReadExt, AsyncWriteExt, ReadHalf, WriteHalf};
7use tokio_serial::{SerialPortBuilderExt, SerialStream};
8
9use crate::{
10 accumulator::raw::{CobsAccumulator, FeedResult},
11 header::VarSeqKind,
12 host_client::{HostClient, WireRx, WireSpawn, WireTx},
13};
14
15/// # Serial Constructor Methods
16///
17/// These methods are used to create a new [HostClient] instance for use with tokio serial and cobs encoding.
18///
19/// **Requires feature**: `cobs-serial`
20impl<WireErr> HostClient<WireErr>
21where
22 WireErr: DeserializeOwned + Schema,
23{
24 /// Create a new [HostClient]
25 ///
26 /// `serial_path` is the path to the serial port used. `err_uri_path` is
27 /// the path associated with the `WireErr` message type.
28 ///
29 /// This constructor is available when the `cobs-serial` feature is enabled.
30 ///
31 /// ## Example
32 ///
33 /// ```rust,no_run
34 /// use postcard_rpc::host_client::HostClient;
35 /// use postcard_rpc::header::VarSeqKind;
36 /// use serde::{Serialize, Deserialize};
37 /// use postcard_schema::Schema;
38 ///
39 /// /// A "wire error" type your server can use to respond to any
40 /// /// kind of request, for example if deserializing a request fails
41 /// #[derive(Debug, PartialEq, Schema, Serialize, Deserialize)]
42 /// pub enum Error {
43 /// SomethingBad
44 /// }
45 ///
46 /// let client = HostClient::<Error>::new_serial_cobs(
47 /// // the serial port path
48 /// "/dev/ttyACM0",
49 /// // the URI/path for `Error` messages
50 /// "error",
51 /// // Outgoing queue depth in messages
52 /// 8,
53 /// // Baud rate of serial (does not generally matter for
54 /// // USB UART/CDC-ACM serial connections)
55 /// 115_200,
56 /// // Use one-byte sequence numbers
57 /// VarSeqKind::Seq1,
58 /// );
59 /// ```
60 pub fn try_new_serial_cobs(
61 serial_path: &str,
62 err_uri_path: &str,
63 outgoing_depth: usize,
64 baud: u32,
65 seq_no_kind: VarSeqKind,
66 ) -> Result<Self, String> {
67 let port = tokio_serial::new(serial_path, baud)
68 .open_native_async()
69 .map_err(|e| format!("Open Error: {e:?}"))?;
70
71 let (rx, tx) = tokio::io::split(port);
72
73 Ok(HostClient::new_with_wire(
74 SerialWireTx { tx },
75 SerialWireRx {
76 rx,
77 buf: Box::new([0u8; 1024]),
78 acc: Box::new(CobsAccumulator::new()),
79 pending: VecDeque::new(),
80 },
81 SerialSpawn,
82 seq_no_kind,
83 err_uri_path,
84 outgoing_depth,
85 ))
86 }
87
88 /// Create a new [HostClient]
89 ///
90 /// Panics if we couldn't open the serial port.
91 ///
92 /// See [`HostClient::try_new_serial_cobs`] for more details
93 pub fn new_serial_cobs(
94 serial_path: &str,
95 err_uri_path: &str,
96 outgoing_depth: usize,
97 baud: u32,
98 seq_no_kind: VarSeqKind,
99 ) -> Self {
100 Self::try_new_serial_cobs(serial_path, err_uri_path, outgoing_depth, baud, seq_no_kind)
101 .unwrap()
102 }
103}
104
105//////////////////////////////////////////////////////////////////////////////
106// Wire Interface Implementation
107//////////////////////////////////////////////////////////////////////////////
108
/// Tokio Serial Wire Interface Implementor
///
/// Zero-sized spawner handed to the [HostClient]; its `WireSpawn` implementation
/// uses Tokio for spawning tasks.
struct SerialSpawn;
113
114impl WireSpawn for SerialSpawn {
115 fn spawn(&mut self, fut: impl Future<Output = ()> + Send + 'static) {
116 // Explicitly drop the joinhandle as it impls Future and this makes
117 // clippy mad if you just let it drop implicitly
118 core::mem::drop(tokio::task::spawn(fut));
119 }
120}
121
/// Tokio Serial Wire Transmit Interface Implementor
///
/// Owns the write half of the serial stream; outgoing messages are cobs-encoded
/// and written to it.
struct SerialWireTx {
    /// Write half of the split serial stream
    tx: WriteHalf<SerialStream>,
}
127
/// Errors that can occur while sending a message over the serial port
#[derive(thiserror::Error, Debug)]
enum SerialWireTxError {
    /// The write to the serial port failed with an I/O error
    #[error("Transfer Error on Send")]
    Transfer(#[from] std::io::Error),
}
133
134impl WireTx for SerialWireTx {
135 type Error = SerialWireTxError;
136
137 #[inline]
138 fn send(&mut self, data: Vec<u8>) -> impl Future<Output = Result<(), Self::Error>> + Send {
139 self.send_inner(data)
140 }
141}
142
143impl SerialWireTx {
144 async fn send_inner(&mut self, data: Vec<u8>) -> Result<(), SerialWireTxError> {
145 // Turn the serialized message into a COBS encoded message
146 //
147 // TODO: this is a little wasteful, data is already a vec,
148 // then we encode that to a second cobs-encoded vec. Oh well.
149 let mut msg = encode_vec(&data);
150 msg.push(0);
151
152 // And send it!
153 self.tx.write_all(&msg).await?;
154 Ok(())
155 }
156}
157
/// Tokio Serial Wire Receive Interface Implementor
///
/// Owns the read half of the serial stream together with the state needed to
/// turn the incoming byte stream back into individual messages.
struct SerialWireRx {
    /// Read half of the split serial stream
    rx: ReadHalf<SerialStream>,
    /// Scratch buffer that each read from the port lands in
    buf: Box<[u8; 1024]>,
    /// COBS accumulator the received bytes are fed into
    acc: Box<CobsAccumulator<1024>>,
    /// Decoded messages that have not been handed to the caller yet, oldest first
    pending: VecDeque<Vec<u8>>,
}
165
/// Errors that can occur while receiving a message from the serial port
#[derive(thiserror::Error, Debug)]
enum SerialWireRxError {
    /// The read from the serial port failed with an I/O error
    #[error("Transfer Error on Recv")]
    Transfer(#[from] std::io::Error),
}
171
172impl WireRx for SerialWireRx {
173 type Error = SerialWireRxError;
174
175 #[inline]
176 fn receive(&mut self) -> impl Future<Output = Result<Vec<u8>, Self::Error>> + Send {
177 self.recv_inner()
178 }
179}
180
181impl SerialWireRx {
182 async fn recv_inner(&mut self) -> Result<Vec<u8>, SerialWireRxError> {
183 // Receive until we've gotten AT LEAST one message, though we will continue
184 // consuming and buffering any read (partial) messages, to ensure they are not lost.
185 loop {
186 // Do we have any messages already prepared?
187 if let Some(p) = self.pending.pop_front() {
188 return Ok(p);
189 }
190
191 // Nothing in the pending queue, do a read to see if we can pull more
192 // data from the serial port
193 let used = self.rx.read(self.buf.as_mut_slice()).await?;
194
195 let mut window = &self.buf[..used];
196
197 // This buffering loop is necessary as a single `read()` might include
198 // more than one message
199 'cobs: while !window.is_empty() {
200 window = match self.acc.feed(window) {
201 // Consumed the whole USB frame
202 FeedResult::Consumed => break 'cobs,
203 // Ignore line errors
204 FeedResult::OverFull(new_wind) => {
205 tracing::warn!("Overflowed COBS accumulator");
206 new_wind
207 }
208 FeedResult::DeserError(new_wind) => {
209 tracing::warn!("COBS formatting error");
210 new_wind
211 }
212 // We got a message! Attempt to dispatch it
213 FeedResult::Success { data, remaining } => {
214 self.pending.push_back(data.to_vec());
215 remaining
216 }
217 };
218 }
219 }
220 }
221}