use std::{collections::VecDeque, future::Future};
use cobs::encode_vec;
use postcard_schema::Schema;
use serde::de::DeserializeOwned;
use tokio::io::{AsyncReadExt, AsyncWriteExt, ReadHalf, WriteHalf};
use tokio_serial::{SerialPortBuilderExt, SerialStream};
use crate::{
accumulator::raw::{CobsAccumulator, FeedResult},
header::VarSeqKind,
host_client::{HostClient, WireRx, WireSpawn, WireTx},
};
impl<WireErr> HostClient<WireErr>
where
WireErr: DeserializeOwned + Schema,
{
pub fn try_new_serial_cobs(
serial_path: &str,
err_uri_path: &str,
outgoing_depth: usize,
baud: u32,
seq_no_kind: VarSeqKind,
) -> Result<Self, String> {
let port = tokio_serial::new(serial_path, baud)
.open_native_async()
.map_err(|e| format!("Open Error: {e:?}"))?;
let (rx, tx) = tokio::io::split(port);
Ok(HostClient::new_with_wire(
SerialWireTx { tx },
SerialWireRx {
rx,
buf: Box::new([0u8; 1024]),
acc: Box::new(CobsAccumulator::new()),
pending: VecDeque::new(),
},
SerialSpawn,
seq_no_kind,
err_uri_path,
outgoing_depth,
))
}
pub fn new_serial_cobs(
serial_path: &str,
err_uri_path: &str,
outgoing_depth: usize,
baud: u32,
seq_no_kind: VarSeqKind,
) -> Self {
Self::try_new_serial_cobs(serial_path, err_uri_path, outgoing_depth, baud, seq_no_kind)
.unwrap()
}
}
struct SerialSpawn;
impl WireSpawn for SerialSpawn {
fn spawn(&mut self, fut: impl Future<Output = ()> + Send + 'static) {
core::mem::drop(tokio::task::spawn(fut));
}
}
struct SerialWireTx {
tx: WriteHalf<SerialStream>,
}
#[derive(thiserror::Error, Debug)]
enum SerialWireTxError {
#[error("Transfer Error on Send")]
Transfer(#[from] std::io::Error),
}
impl WireTx for SerialWireTx {
type Error = SerialWireTxError;
#[inline]
fn send(&mut self, data: Vec<u8>) -> impl Future<Output = Result<(), Self::Error>> + Send {
self.send_inner(data)
}
}
impl SerialWireTx {
async fn send_inner(&mut self, data: Vec<u8>) -> Result<(), SerialWireTxError> {
let mut msg = encode_vec(&data);
msg.push(0);
self.tx.write_all(&msg).await?;
Ok(())
}
}
struct SerialWireRx {
rx: ReadHalf<SerialStream>,
buf: Box<[u8; 1024]>,
acc: Box<CobsAccumulator<1024>>,
pending: VecDeque<Vec<u8>>,
}
#[derive(thiserror::Error, Debug)]
enum SerialWireRxError {
#[error("Transfer Error on Recv")]
Transfer(#[from] std::io::Error),
}
impl WireRx for SerialWireRx {
type Error = SerialWireRxError;
#[inline]
fn receive(&mut self) -> impl Future<Output = Result<Vec<u8>, Self::Error>> + Send {
self.recv_inner()
}
}
impl SerialWireRx {
async fn recv_inner(&mut self) -> Result<Vec<u8>, SerialWireRxError> {
loop {
if let Some(p) = self.pending.pop_front() {
return Ok(p);
}
let used = self.rx.read(self.buf.as_mut_slice()).await?;
let mut window = &self.buf[..used];
'cobs: while !window.is_empty() {
window = match self.acc.feed(window) {
FeedResult::Consumed => break 'cobs,
FeedResult::OverFull(new_wind) => {
tracing::warn!("Overflowed COBS accumulator");
new_wind
}
FeedResult::DeserError(new_wind) => {
tracing::warn!("COBS formatting error");
new_wind
}
FeedResult::Success { data, remaining } => {
if data.len() >= 9 {
self.pending.push_back(data.to_vec());
} else {
tracing::warn!("Ignoring too-short message: {} bytes", data.len());
}
remaining
}
};
}
}
}
}