use core::{fmt::Arguments, marker::PhantomData, ops::DerefMut};
use crate::{
header::{VarHeader, VarKey, VarKeyKind, VarSeq},
server::{WireRx, WireRxErrorKind, WireTx, WireTxErrorKind},
standard_icd::LoggingTopic,
Topic,
};
use cobs::decode;
use embassy_sync_0_7::{blocking_mutex::raw::RawMutex, mutex::Mutex};
use embedded_io_async_0_6::{Read, Write};
use postcard::{
ser_flavors::{Flavor, Slice},
Serializer,
};
use serde::Serialize;
use static_cell::{ConstStaticCell, StaticCell};
/// Names this transport's concrete wire types under fixed aliases.
///
/// NOTE(review): presumably these aliases are what the crate's dispatch
/// machinery expects each transport module to provide — confirm against the
/// macro/consumer that imports `dispatch_impl`.
pub mod dispatch_impl {
    /// Re-export of the shared embassy spawn helper under the name `spawn_fn`.
    pub use crate::server::impls::embassy_shared::embassy_spawn as spawn_fn;

    /// Receive half of this transport, generic over the reader `D`.
    pub type WireRxImpl<D> = super::EioWireRx<D>;

    /// Transmit half of this transport, generic over the mutex kind `M` and
    /// the writer `D`.
    pub type WireTxImpl<M, D> = super::EioWireTx<M, D>;

    /// Spawner type shared with the other embassy-based transports.
    pub type WireSpawnImpl = crate::server::impls::embassy_shared::EmbassyWireSpawn;

    /// Buffer type aliased for the receive side: a `'static` mutable byte slice.
    pub type WireRxBuf = &'static mut [u8];
}
pub use super::embassy_shared::embassy_spawn;
pub use super::embassy_shared::EmbassyWireSpawn as EioWireSpawn;
/// Statically allocatable backing storage for one RX/TX wire pair.
///
/// Holds an `RXB`-byte receive buffer, a `TXB`-byte transmit buffer and the
/// cell that will contain the shared, mutex-protected transmitter state.
/// `Rx` is only recorded at the type level; no reader is stored here.
pub struct WireStorage<Rx, Tx, M, const RXB: usize, const TXB: usize>
where
    Rx: Read,
    Tx: Write,
    M: RawMutex + 'static,
{
    /// The receive and transmit buffers, handed out together exactly once.
    bufs: ConstStaticCell<([u8; RXB], [u8; TXB])>,
    /// Slot for the mutex-wrapped transmitter state; initialized at most once.
    tx: StaticCell<Mutex<M, EioWireTxInner<Tx>>>,
    /// Ties the storage to the reader type without owning a reader.
    _rx: PhantomData<Rx>,
}
/// Transmit handle for the wire.
///
/// It only holds a `'static` shared reference to the mutex-protected
/// transmitter state, so every copy of the handle refers to the same writer
/// and the same transmit buffer.
pub struct EioWireTx<R: RawMutex + 'static, Tx: Write + 'static> {
    /// Shared transmitter state; locked for the duration of each send.
    t: &'static Mutex<R, EioWireTxInner<Tx>>,
}
/// Receive half of the wire: a reader plus an accumulation buffer in which
/// zero-terminated frames are collected before being decoded.
pub struct EioWireRx<R>
where
    R: Read,
{
    /// Accumulation buffer for bytes read from `rx`.
    remain: &'static mut [u8],
    /// Number of bytes at the front of `remain` that currently hold
    /// received-but-not-yet-consumed data.
    offset: usize,
    /// The underlying byte source.
    rx: R,
}
/// Transmitter state that lives behind the mutex shared by all
/// [`EioWireTx`] handles.
struct EioWireTxInner<Tx>
where
    Tx: Write,
{
    /// The underlying byte sink.
    t: Tx,
    /// Scratch buffer in which each outgoing frame is encoded before writing.
    tx_buf: &'static mut [u8],
    /// Sequence counter stamped into log frames; wraps on overflow.
    log_seq: u16,
}
impl<Rx, Tx, M, const RXB: usize, const TXB: usize> WireStorage<Rx, Tx, M, RXB, TXB>
where
    Rx: Read,
    Tx: Write,
    M: RawMutex + 'static,
{
    /// Creates empty storage with zeroed buffers and an uninitialized
    /// transmitter slot. Usable in `const`/`static` context.
    pub const fn new() -> Self {
        Self {
            bufs: ConstStaticCell::new(([0u8; RXB], [0u8; TXB])),
            tx: StaticCell::new(),
            _rx: PhantomData,
        }
    }

    /// Claims the buffers and builds the receive/transmit pair around the
    /// given reader `r` and writer `t`.
    ///
    /// Returns `None` if the buffers were already taken or the transmitter
    /// slot was already initialized, i.e. on any call after the first
    /// successful one. In that case `r` and `t` are dropped.
    pub fn init(&'static self, r: Rx, t: Tx) -> Option<(EioWireRx<Rx>, EioWireTx<M, Tx>)> {
        let (rx_storage, tx_storage) = self.bufs.try_take()?;

        // The transmitter state is shared between handle copies, so it is
        // parked in the static cell behind a mutex.
        let inner = EioWireTxInner {
            t,
            tx_buf: tx_storage,
            log_seq: 0,
        };
        let shared = self.tx.try_init(Mutex::new(inner))?;

        Some((
            EioWireRx {
                remain: rx_storage,
                offset: 0,
                rx: r,
            },
            EioWireTx { t: shared },
        ))
    }
}
impl<Rx, Tx, M, const RXB: usize, const TXB: usize> Default for WireStorage<Rx, Tx, M, RXB, TXB>
where
    Rx: Read,
    Tx: Write,
    M: RawMutex + 'static,
{
    /// Same as [`WireStorage::new`]: zeroed buffers, nothing initialized yet.
    fn default() -> Self {
        Self::new()
    }
}
impl<R: RawMutex + 'static, Tx: Write + 'static> Clone for EioWireTx<R, Tx> {
    /// Copies the handle; the clone points at the same shared transmitter
    /// state as the original.
    fn clone(&self) -> Self {
        let shared = self.t;
        Self { t: shared }
    }
}
impl<R, Tx> WireTx for EioWireTx<R, Tx>
where
R: RawMutex + 'static,
Tx: Write + 'static,
{
type Error = WireTxErrorKind;
async fn send<T: Serialize + ?Sized>(
&self,
hdr: VarHeader,
msg: &T,
) -> Result<(), Self::Error> {
let mut guard = self.t.lock().await;
let EioWireTxInner { t, tx_buf, .. } = guard.deref_mut();
let mut flavor = flava_flav(tx_buf)?;
header_to_flavor(&hdr, &mut flavor)?;
let used = body_to_flavor(msg, flavor)?;
t.write_all(used)
.await
.map_err(|_| WireTxErrorKind::ConnectionClosed)?;
Ok(())
}
async fn send_raw(&self, buf: &[u8]) -> Result<(), Self::Error> {
let mut guard = self.t.lock().await;
let EioWireTxInner { t, tx_buf, .. } = guard.deref_mut();
let mut flavor = flava_flav(tx_buf)?;
flavor.try_extend(buf).map_err(|_| WireTxErrorKind::Other)?;
let used = flavor.finalize().map_err(|_| WireTxErrorKind::Other)?;
t.write_all(used)
.await
.map_err(|_| WireTxErrorKind::ConnectionClosed)?;
Ok(())
}
async fn send_log_str(&self, kkind: VarKeyKind, s: &str) -> Result<(), Self::Error> {
let mut guard = self.t.lock().await;
let EioWireTxInner { t, tx_buf, log_seq } = guard.deref_mut();
let mut flavor = flava_flav(tx_buf)?;
let key = match kkind {
VarKeyKind::Key1 => VarKey::Key1(LoggingTopic::TOPIC_KEY1),
VarKeyKind::Key2 => VarKey::Key2(LoggingTopic::TOPIC_KEY2),
VarKeyKind::Key4 => VarKey::Key4(LoggingTopic::TOPIC_KEY4),
VarKeyKind::Key8 => VarKey::Key8(LoggingTopic::TOPIC_KEY),
};
let ctr = *log_seq;
*log_seq = log_seq.wrapping_add(1);
let wh = VarHeader {
key,
seq_no: VarSeq::Seq2(ctr),
};
header_to_flavor(&wh, &mut flavor)?;
let used = body_to_flavor(s, flavor)?;
t.write_all(used)
.await
.map_err(|_| WireTxErrorKind::ConnectionClosed)?;
Ok(())
}
async fn send_log_fmt<'a>(
&self,
_kkind: VarKeyKind,
_a: Arguments<'a>,
) -> Result<(), Self::Error> {
todo!()
}
}
impl<R> EioWireRx<R>
where
    R: Read,
{
    /// Builds a receiver that reads from `rx` and accumulates incoming bytes
    /// in `buffer`. The receiver starts with no buffered data.
    pub fn new(rx: R, buffer: &'static mut [u8]) -> Self {
        Self {
            rx,
            remain: buffer,
            offset: 0,
        }
    }
}
impl<R: Read> WireRx for EioWireRx<R> {
    type Error = WireRxErrorKind;

    /// Receives the next zero-terminated COBS frame and decodes it into
    /// `buf`, returning the decoded prefix of `buf`.
    ///
    /// Bytes that arrive after a frame's terminator in the same read are
    /// kept at the front of the internal buffer and are consumed first on
    /// the next call.
    ///
    /// # Errors
    ///
    /// * [`WireRxErrorKind::ReceivedMessageTooLarge`] if the internal buffer
    ///   fills up without seeing a frame terminator, or if the decoded frame
    ///   does not fit in `buf`.
    /// * [`WireRxErrorKind::ConnectionClosed`] if the reader reports
    ///   end-of-stream (a zero-length read).
    /// * [`WireRxErrorKind::Other`] if the reader fails or the frame is not
    ///   valid COBS.
    ///
    /// On the buffer-full, end-of-stream and read-failure errors any
    /// partially accumulated data is discarded.
    async fn receive<'a>(&mut self, buf: &'a mut [u8]) -> Result<&'a mut [u8], Self::Error> {
        // Defensive: never start with a buffer that claims to be full.
        if self.offset >= self.remain.len() {
            self.offset = 0;
        }

        // A previous read may have left a complete frame behind; serve that
        // before touching the reader again.
        if self.offset != 0 {
            let pending = &mut self.remain[..self.offset];
            if let Some(pos) = pending.iter().position(|b| *b == 0) {
                let frame_len = pos + 1;
                let leftover = pending.len() - frame_len;
                let res = decode_frame(&pending[..frame_len], buf);
                // Consume the frame even when decoding failed, so a bad
                // frame cannot wedge the receiver.
                copy_backwards(pending, frame_len);
                self.offset = leftover;
                return res;
            }
        }

        let Self { remain, offset, rx } = self;
        loop {
            if *offset >= remain.len() {
                *offset = 0;
                return Err(WireRxErrorKind::ReceivedMessageTooLarge);
            }

            let (old, new) = remain.split_at_mut(*offset);
            let got = match rx.read(new).await {
                Ok(0) => {
                    *offset = 0;
                    return Err(WireRxErrorKind::ConnectionClosed);
                }
                Ok(n) => n,
                Err(_) => {
                    *offset = 0;
                    return Err(WireRxErrorKind::Other);
                }
            };

            // Only the freshly read bytes need scanning: everything before
            // `offset` was already checked for a terminator.
            let Some(pos) = new[..got].iter().position(|b| *b == 0) else {
                *offset += got;
                continue;
            };

            let filled = old.len() + got;
            let frame_end = old.len() + pos + 1;
            let res = decode_frame(&remain[..frame_end], buf);
            // Keep whatever followed the terminator for the next call.
            copy_backwards(&mut remain[..filled], frame_end);
            *offset = filled - frame_end;
            return res;
        }
    }
}

/// COBS-decodes one terminated `frame` into `buf` and maps decode failures
/// onto wire errors: a too-small destination is "message too large", any
/// other decode failure is reported as `Other`.
fn decode_frame<'a>(frame: &[u8], buf: &'a mut [u8]) -> Result<&'a mut [u8], WireRxErrorKind> {
    match decode(frame, buf) {
        Ok(rpt) => Ok(&mut buf[..rpt.frame_size()]),
        Err(cobs::DecodeError::TargetBufTooSmall) => {
            Err(WireRxErrorKind::ReceivedMessageTooLarge)
        }
        Err(_) => Err(WireRxErrorKind::Other),
    }
}
/// Moves the bytes `buf[start..]` to the front of `buf`.
///
/// Source and destination may overlap. Bytes past the moved region keep
/// their previous contents. Does nothing when `start` is at or beyond the
/// end of `buf`.
fn copy_backwards(buf: &mut [u8], start: usize) {
    if buf.len() <= start {
        return;
    }
    // `copy_within` has memmove semantics, so the overlapping shift is safe
    // without any raw-pointer code.
    buf.copy_within(start.., 0);
}
/// Starts a COBS encoder that writes into `buf`.
///
/// Fails with [`WireTxErrorKind::Other`] if the encoder cannot be created
/// over `buf`.
fn flava_flav(buf: &'_ mut [u8]) -> Result<Cobs<Slice<'_>>, WireTxErrorKind> {
    match Cobs::try_new(Slice::new(buf)) {
        Ok(encoder) => Ok(encoder),
        Err(_) => Err(WireTxErrorKind::Other),
    }
}
/// Encodes `hdr` and appends the encoded bytes to the COBS encoder `flava`.
///
/// Fails with [`WireTxErrorKind::Other`] if the header cannot be written to
/// the scratch buffer or the encoder runs out of room.
fn header_to_flavor(hdr: &VarHeader, flava: &mut Cobs<Slice<'_>>) -> Result<(), WireTxErrorKind> {
    // NOTE(review): 1 + 4 + 8 is presumably the largest possible encoded
    // header (discriminant + sequence number + key) — confirm against
    // `VarHeader::write_to_slice`.
    let mut scratch = [0u8; 1 + 4 + 8];
    let Some((encoded, _rest)) = hdr.write_to_slice(&mut scratch) else {
        return Err(WireTxErrorKind::Other);
    };
    flava
        .try_extend(encoded)
        .map_err(|_| WireTxErrorKind::Other)
}
/// Serializes `msg` into the COBS encoder `flava`, finishes the frame and
/// returns the encoded bytes.
///
/// Consumes the encoder. Any serialization or finalization failure is
/// reported as [`WireTxErrorKind::Other`].
fn body_to_flavor<'a, T: Serialize + ?Sized>(
    msg: &T,
    flava: Cobs<Slice<'a>>,
) -> Result<&'a [u8], WireTxErrorKind> {
    let mut ser = Serializer { output: flava };
    if msg.serialize(&mut ser).is_err() {
        return Err(WireTxErrorKind::Other);
    }
    match ser.output.finalize() {
        Ok(frame) => Ok(frame),
        Err(_) => Err(WireTxErrorKind::Other),
    }
}
use core::ops::IndexMut;
/// Serialization flavor that COBS-encodes every byte pushed through it
/// before storing it in the wrapped flavor `B`.
///
/// `B` must allow indexed writes because already-written bytes are patched
/// in place as encoding progresses.
struct Cobs<B: Flavor + IndexMut<usize, Output = u8>> {
    /// The wrapped flavor that receives the encoded bytes.
    flav: B,
    /// Running COBS encoder state.
    cobs: cobs::EncoderState,
}
impl<B> Cobs<B>
where
B: Flavor + IndexMut<usize, Output = u8>,
{
fn try_new(mut bee: B) -> postcard::Result<Self> {
bee.try_push(0)
.map_err(|_| postcard::Error::SerializeBufferFull)?;
Ok(Self {
flav: bee,
cobs: cobs::EncoderState::default(),
})
}
}
impl<B: Flavor + IndexMut<usize, Output = u8>> Flavor for Cobs<B> {
    type Output = <B as Flavor>::Output;

    /// Feeds one byte to the COBS encoder and applies whatever the encoder
    /// asks for to the wrapped flavor.
    #[inline(always)]
    fn try_push(&mut self, data: u8) -> postcard::Result<()> {
        match self.cobs.push(data) {
            // Plain byte: append as-is.
            cobs::PushResult::AddSingle(byte) => self.flav.try_push(byte),
            // Patch an earlier byte, then append a new placeholder.
            cobs::PushResult::ModifyFromStartAndSkip((at, code)) => {
                self.flav[at] = code;
                self.flav.try_push(0)
            }
            // Patch an earlier byte, append the data byte, then append a new
            // placeholder.
            cobs::PushResult::ModifyFromStartAndPushAndSkip((at, code, byte)) => {
                self.flav[at] = code;
                self.flav.try_push(byte)?;
                self.flav.try_push(0)
            }
        }
    }

    /// Finishes the frame: applies the encoder's final patch, appends the
    /// zero terminator and finalizes the wrapped flavor.
    fn finalize(self) -> postcard::Result<Self::Output> {
        let Self { mut flav, cobs } = self;
        let (at, code) = cobs.finalize();
        flav[at] = code;
        flav.try_push(0)?;
        flav.finalize()
    }
}