#![allow(async_fn_in_trait)]
#[doc(hidden)]
pub mod dispatch_macro;
pub mod impls;
use core::{fmt::Arguments, ops::DerefMut};
use postcard_schema::Schema;
use serde::Serialize;
use crate::{
header::{VarHeader, VarKey, VarKeyKind, VarSeq},
DeviceMap, Key, TopicDirection,
};
/// Sending half of a wire, as driven by [`Sender`].
///
/// Every method is `async` and reports failure through [`WireTx::Error`].
pub trait WireTx {
/// Error produced by the send methods. It must be classifiable as a
/// [`WireTxErrorKind`] through [`AsWireTxErrorKind`].
type Error: AsWireTxErrorKind;
/// Sends the serializable value `msg` together with the header `hdr`.
///
/// `T` may be unsized (`?Sized`).
async fn send<T: Serialize + ?Sized>(&self, hdr: VarHeader, msg: &T)
-> Result<(), Self::Error>;
/// Sends the bytes in `buf`.
///
/// NOTE(review): presumably `buf` must already hold a complete, encoded
/// message (header included) -- confirm against the implementations.
async fn send_raw(&self, buf: &[u8]) -> Result<(), Self::Error>;
/// Sends the string `s` as a log message; `kkind` is the key kind chosen by
/// the caller ([`Sender::log_str`] passes its configured kind).
async fn send_log_str(&self, kkind: VarKeyKind, s: &str) -> Result<(), Self::Error>;
/// Sends the format arguments `a` as a log message; `kkind` is the key kind
/// chosen by the caller ([`Sender::log_fmt`] passes its configured kind).
async fn send_log_fmt<'a>(
&self,
kkind: VarKeyKind,
a: Arguments<'a>,
) -> Result<(), Self::Error>;
}
/// Coarse classification of a transmit-side ([`WireTx`]) error.
///
/// [`Server::run`] inspects this kind to decide whether a failed send ends the
/// server loop. The enum is `#[non_exhaustive]`, so code outside this crate
/// must include a wildcard arm when matching on it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum WireTxErrorKind {
    /// The connection has been closed; [`Server::run`] treats this as fatal.
    ConnectionClosed,
    /// Any other error; [`Server::run`] ignores it and keeps serving.
    Other,
    /// The send timed out; [`Server::run`] treats this as fatal.
    Timeout,
}
/// Maps a transmit error onto the coarse [`WireTxErrorKind`] classification.
///
/// Required of every [`WireTx::Error`].
pub trait AsWireTxErrorKind {
/// Returns the kind this error corresponds to.
fn as_kind(&self) -> WireTxErrorKind;
}
impl AsWireTxErrorKind for WireTxErrorKind {
    /// A kind already is its own classification, so this copies `self` out.
    #[inline]
    fn as_kind(&self) -> Self {
        *self
    }
}
/// Receiving half of a wire, as polled by [`Server::run`].
pub trait WireRx {
/// Error produced by [`WireRx::receive`]. It must be classifiable as a
/// [`WireRxErrorKind`] through [`AsWireRxErrorKind`].
type Error: AsWireRxErrorKind;
/// Receives into `buf` and returns a mutable slice that shares `buf`'s
/// lifetime.
///
/// [`Server::run`] parses a `VarHeader` off the front of the returned slice
/// and treats the remainder as the message body.
/// NOTE(review): presumably the returned slice is the portion of `buf` that
/// holds exactly one received message -- confirm against the implementations.
async fn receive<'a>(&mut self, buf: &'a mut [u8]) -> Result<&'a mut [u8], Self::Error>;
}
/// Coarse classification of a receive-side ([`WireRx`]) error.
///
/// [`Server::run`] inspects this kind to decide whether a failed receive ends
/// the server loop. The enum is `#[non_exhaustive]`, so code outside this crate
/// must include a wildcard arm when matching on it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum WireRxErrorKind {
    /// The connection has been closed; [`Server::run`] treats this as fatal.
    ConnectionClosed,
    /// A message was too large to receive; [`Server::run`] skips it and keeps
    /// receiving.
    ReceivedMessageTooLarge,
    /// Any other error; [`Server::run`] skips it and keeps receiving.
    Other,
}
/// Maps a receive error onto the coarse [`WireRxErrorKind`] classification.
///
/// Required of every [`WireRx::Error`].
pub trait AsWireRxErrorKind {
/// Returns the kind this error corresponds to.
fn as_kind(&self) -> WireRxErrorKind;
}
impl AsWireRxErrorKind for WireRxErrorKind {
    /// A kind already is its own classification, so this copies `self` out.
    #[inline]
    fn as_kind(&self) -> Self {
        *self
    }
}
/// A cloneable handle that exposes some associated information.
///
/// NOTE(review): nothing in this file uses this trait; presumably it is
/// implemented per executor and consumed by spawn-style handlers defined
/// elsewhere -- confirm.
pub trait WireSpawn: Clone {
/// Error type associated with this spawner.
/// NOTE(review): presumably what a failed spawn reports -- confirm.
type Error;
/// Type of the information handed out by [`WireSpawn::info`].
type Info;
/// Borrows the spawner's associated information.
fn info(&self) -> &Self::Info;
}
/// Handle for sending replies, topic messages and log output over a
/// [`WireTx`].
///
/// Cloneable whenever `Tx` is.
#[derive(Clone)]
pub struct Sender<Tx: WireTx> {
// The underlying transmitter that every send is forwarded to.
tx: Tx,
// Key kind that outgoing keys are shrunk to, and that is passed along with
// log messages.
kkind: VarKeyKind,
}
impl<Tx: WireTx> Sender<Tx> {
    /// Creates a sender that transmits through `tx` and shrinks every outgoing
    /// key to `kkind`.
    pub fn new(tx: Tx, kkind: VarKeyKind) -> Self {
        Self { tx, kkind }
    }

    /// Builds the header for an outgoing message: `key` shrunk to this sender's
    /// key kind, paired with `seq_no`.
    #[inline]
    fn header_for(&self, key: Key, seq_no: VarSeq) -> VarHeader {
        let mut key = VarKey::Key8(key);
        key.shrink_to(self.kkind);
        VarHeader { key, seq_no }
    }

    /// Sends `resp` as the response of endpoint `E`.
    ///
    /// The header carries `E::RESP_KEY` (shrunk to this sender's key kind) and
    /// the caller-supplied `seq_no`.
    ///
    /// # Errors
    ///
    /// Returns the error reported by [`WireTx::send`].
    #[inline]
    pub async fn reply<E>(&self, seq_no: VarSeq, resp: &E::Response) -> Result<(), Tx::Error>
    where
        E: crate::Endpoint,
        E::Response: Serialize + Schema,
    {
        let wh = self.header_for(E::RESP_KEY, seq_no);
        self.tx.send::<E::Response>(wh, resp).await
    }

    /// Sends `resp` under an explicitly provided `key` rather than one taken
    /// from an endpoint type.
    ///
    /// The key is shrunk to this sender's key kind before sending.
    ///
    /// # Errors
    ///
    /// Returns the error reported by [`WireTx::send`].
    #[inline]
    pub async fn reply_keyed<T>(&self, seq_no: VarSeq, key: Key, resp: &T) -> Result<(), Tx::Error>
    where
        T: ?Sized + Serialize + Schema,
    {
        let wh = self.header_for(key, seq_no);
        self.tx.send::<T>(wh, resp).await
    }

    /// Sends `msg` as a message of topic `T`.
    ///
    /// The header carries `T::TOPIC_KEY` (shrunk to this sender's key kind) and
    /// the caller-supplied `seq_no`.
    ///
    /// # Errors
    ///
    /// Returns the error reported by [`WireTx::send`].
    #[inline]
    pub async fn publish<T>(&self, seq_no: VarSeq, msg: &T::Message) -> Result<(), Tx::Error>
    where
        T: ?Sized + crate::Topic,
        T::Message: Serialize + Schema,
    {
        let wh = self.header_for(T::TOPIC_KEY, seq_no);
        self.tx.send::<T::Message>(wh, msg).await
    }

    /// Forwards the string `msg` to [`WireTx::send_log_str`] using this
    /// sender's key kind.
    #[inline]
    pub async fn log_str(&self, msg: &str) -> Result<(), Tx::Error> {
        self.tx.send_log_str(self.kkind, msg).await
    }

    /// Forwards the format arguments `msg` to [`WireTx::send_log_fmt`] using
    /// this sender's key kind.
    #[inline]
    pub async fn log_fmt(&self, msg: Arguments<'_>) -> Result<(), Tx::Error> {
        self.tx.send_log_fmt(self.kkind, msg).await
    }

    /// Sends `error` under the standard `ERROR_KEY`, tagged with `seq_no`.
    ///
    /// # Errors
    ///
    /// Returns the error reported by [`WireTx::send`].
    pub async fn error(
        &self,
        seq_no: VarSeq,
        error: crate::standard_icd::WireError,
    ) -> Result<(), Tx::Error> {
        self.reply_keyed(seq_no, crate::standard_icd::ERROR_KEY, &error)
            .await
    }

    /// Streams every entry of `device_map` and then sends a summary reply.
    ///
    /// One `GetAllSchemaDataTopic` message is published per entry, in this
    /// order: types, endpoints, outgoing topics (`ToClient`), incoming topics
    /// (`ToServer`). The messages carry a `Seq2` sequence number that starts at
    /// zero and grows by one per message, wrapping around on overflow.
    ///
    /// A failed publish does not abort the stream; it is only counted. Once
    /// every entry has been attempted, a `SchemaTotals` reply is sent on
    /// `GetAllSchemasEndpoint` using the sequence number from `hdr`. It holds
    /// the number of entries in each category and the count of failed
    /// publishes.
    ///
    /// # Errors
    ///
    /// Only a failure to send the final `SchemaTotals` reply is returned.
    pub async fn send_all_schemas(
        &self,
        hdr: &VarHeader,
        device_map: &DeviceMap,
    ) -> Result<(), Tx::Error> {
        #[cfg(feature = "use-std")]
        use crate::standard_icd::OwnedSchemaData as SchemaData;
        #[cfg(not(feature = "use-std"))]
        use crate::standard_icd::SchemaData;
        use crate::standard_icd::{GetAllSchemaDataTopic, GetAllSchemasEndpoint, SchemaTotals};

        let mut msg_ctr = 0;
        let mut err_ctr = 0;

        // Types first.
        for ty in device_map.types {
            let res = self
                .publish::<GetAllSchemaDataTopic>(
                    VarSeq::Seq2(msg_ctr),
                    &SchemaData::Type((*ty).into()),
                )
                .await;
            if res.is_err() {
                err_ctr += 1;
            }
            // Wrap instead of `+= 1` so that an oversized map cannot trigger an
            // overflow panic in overflow-checked builds.
            msg_ctr = msg_ctr.wrapping_add(1);
        }

        // Then endpoints: (path, request key, response key).
        for ep in device_map.endpoints {
            let res = self
                .publish::<GetAllSchemaDataTopic>(
                    VarSeq::Seq2(msg_ctr),
                    &SchemaData::Endpoint {
                        path: ep.0.into(),
                        request_key: ep.1,
                        response_key: ep.2,
                    },
                )
                .await;
            if res.is_err() {
                err_ctr += 1;
            }
            msg_ctr = msg_ctr.wrapping_add(1);
        }

        // Then outgoing topics: (path, key).
        for to in device_map.topics_out {
            let res = self
                .publish::<GetAllSchemaDataTopic>(
                    VarSeq::Seq2(msg_ctr),
                    &SchemaData::Topic {
                        direction: TopicDirection::ToClient,
                        path: to.0.into(),
                        key: to.1,
                    },
                )
                .await;
            if res.is_err() {
                err_ctr += 1;
            }
            msg_ctr = msg_ctr.wrapping_add(1);
        }

        // Finally incoming topics: (path, key).
        for ti in device_map.topics_in {
            let res = self
                .publish::<GetAllSchemaDataTopic>(
                    VarSeq::Seq2(msg_ctr),
                    &SchemaData::Topic {
                        direction: TopicDirection::ToServer,
                        path: ti.0.into(),
                        key: ti.1,
                    },
                )
                .await;
            if res.is_err() {
                err_ctr += 1;
            }
            msg_ctr = msg_ctr.wrapping_add(1);
        }

        // The summary reports how many entries exist per category (not how many
        // were delivered) together with the number of failed publishes.
        self.reply::<GetAllSchemasEndpoint>(
            hdr.seq_no,
            &SchemaTotals {
                types_sent: device_map.types.len() as u32,
                endpoints_sent: device_map.endpoints.len() as u32,
                topics_in_sent: device_map.topics_in.len() as u32,
                topics_out_sent: device_map.topics_out.len() as u32,
                errors: err_ctr,
            },
        )
        .await
    }
}
/// Receive-and-dispatch loop state: a [`Sender`], a receiver, a receive buffer
/// and a dispatcher.
///
/// Built with [`Server::new`] and driven by [`Server::run`].
pub struct Server<Tx, Rx, Buf, D>
where
Tx: WireTx,
Rx: WireRx,
Buf: DerefMut<Target = [u8]>,
D: Dispatch<Tx = Tx>,
{
// Sender handed to the dispatcher for every message; clones are available
// through `Server::sender` when `Tx: Clone`.
tx: Sender<Tx>,
// Source of incoming messages.
rx: Rx,
// Storage passed to `rx.receive` on every loop iteration.
buf: Buf,
// Handler invoked with each successfully parsed header and body.
dis: D,
}
/// The reason [`Server::run`] returned.
pub enum ServerError<Tx, Rx>
where
Tx: WireTx,
Rx: WireRx,
{
/// The dispatcher returned a transmit error whose kind was
/// `ConnectionClosed` or `Timeout`.
TxFatal(Tx::Error),
/// Receiving failed with an error whose kind was `ConnectionClosed`.
RxFatal(Rx::Error),
}
impl<Tx, Rx, Buf, D> Server<Tx, Rx, Buf, D>
where
Tx: WireTx,
Rx: WireRx,
Buf: DerefMut<Target = [u8]>,
D: Dispatch<Tx = Tx>,
{
pub fn new(tx: Tx, rx: Rx, buf: Buf, dis: D, kkind: VarKeyKind) -> Self {
Self {
tx: Sender { tx, kkind },
rx,
buf,
dis,
}
}
pub async fn run(&mut self) -> ServerError<Tx, Rx> {
loop {
let Self {
tx,
rx,
buf,
dis: d,
} = self;
let used = match rx.receive(buf).await {
Ok(u) => u,
Err(e) => {
let kind = e.as_kind();
match kind {
WireRxErrorKind::ConnectionClosed => return ServerError::RxFatal(e),
WireRxErrorKind::ReceivedMessageTooLarge => continue,
WireRxErrorKind::Other => continue,
}
}
};
let Some((hdr, body)) = VarHeader::take_from_slice(used) else {
continue;
};
let fut = d.handle(tx, &hdr, body);
if let Err(e) = fut.await {
let kind = e.as_kind();
match kind {
WireTxErrorKind::ConnectionClosed => return ServerError::TxFatal(e),
WireTxErrorKind::Other => {}
WireTxErrorKind::Timeout => return ServerError::TxFatal(e),
}
}
}
}
}
impl<Tx, Rx, Buf, D> Server<Tx, Rx, Buf, D>
where
Tx: WireTx + Clone,
Rx: WireRx,
Buf: DerefMut<Target = [u8]>,
D: Dispatch<Tx = Tx>,
{
pub fn sender(&self) -> Sender<Tx> {
self.tx.clone()
}
}
/// Handler for received messages, invoked by [`Server::run`].
pub trait Dispatch {
/// The transmitter type that replies are sent through.
type Tx: WireTx;
/// Returns a key kind for this dispatcher.
///
/// NOTE(review): presumably the smallest key size at which all keys known
/// to this dispatcher stay distinct (compare `min_key_needed`) -- confirm.
fn min_key_len(&self) -> VarKeyKind;
/// Handles one received message.
///
/// `hdr` is the header parsed from the front of the message and `body` is
/// what follows it; `tx` is available for sending responses.
///
/// An `Err` is classified by [`Server::run`] through its
/// [`WireTxErrorKind`]: `ConnectionClosed` and `Timeout` end the server
/// loop, `Other` is ignored.
async fn handle(
&mut self,
tx: &Sender<Self::Tx>,
hdr: &VarHeader,
body: &[u8],
) -> Result<(), <Self::Tx as WireTx>::Error>;
}
/// Produces a `'static` context value on demand.
///
/// NOTE(review): nothing in this file uses this trait; presumably the produced
/// value is handed to spawned handlers defined elsewhere -- confirm.
pub trait SpawnContext {
/// The context type produced; it must be `'static`.
type SpawnCtxt: 'static;
/// Creates a context value. Takes `&mut self`, so producing one may mutate
/// the implementor.
fn spawn_ctxt(&mut self) -> Self::SpawnCtxt;
}
// Duplicate check used by `min_key_needed`.
//
// `$lists` names a slice of key slices. For every `$num => $func;` pair, in the
// order written, one block is emitted. The block maps each key through `$func`
// and compares it against every key that comes after it: later in the same
// list, or anywhere in a later list. If no two mapped keys are equal, the block
// `return`s `$num` from the enclosing function. Otherwise execution falls
// through to the block for the next pair.
//
// The expansion lands inside a `const fn`, where `for` loops over iterators are
// not available, hence the index-based `while` loops.
macro_rules! keycheck {
(
$lists:ident;
$($num:literal => $func:ident;)*
) => {
$(
{
let mut i = 0;
// Stays `true` unless a pair of equal mapped keys is found.
let mut good = true;
// For each list...
'dupe: while i < $lists.len() {
let ilist = $lists[i];
let mut j = 0;
// ...and for each key in that list...
while j < ilist.len() {
let jkey = ilist[j];
let akey = $func(jkey);
// ...compare against every later key, starting with the list we are in.
let mut x = i;
while x < $lists.len() {
let xlist = $lists[x];
// In the starting list, resume just past the current key (earlier pairs
// were already compared); in subsequent lists, start from the beginning.
let mut y = if x == i {
j + 1
} else {
0
};
while y < xlist.len() {
let ykey = xlist[y];
let bkey = $func(ykey);
if akey == bkey {
// One collision is enough to rule this key size out; leave all loops.
good = false;
break 'dupe;
}
y += 1;
}
x += 1;
}
j += 1;
}
i += 1;
}
// No collisions at this size: report it from the enclosing function.
if good {
return $num;
}
}
)*
};
}
/// Returns the smallest key size in bytes (1, 2, 4 or 8) at which no two keys
/// across all of `lists` compare equal.
///
/// Sizes are tried in ascending order. For each size every key is converted to
/// its representation at that size and all pairs are compared, both within a
/// list and across lists. The first size without a collision is returned.
///
/// # Panics
///
/// Panics if two keys are equal even at the full 8 bytes.
pub const fn min_key_needed(lists: &[&[Key]]) -> usize {
    /// Full 8-byte key as an integer.
    const fn as_u64(k: Key) -> u64 {
        u64::from_le_bytes(k.0)
    }
    /// 4-byte form of the key as an integer.
    const fn as_u32(k: Key) -> u32 {
        u32::from_le_bytes(crate::Key4::from_key8(k).0)
    }
    /// 2-byte form of the key as an integer.
    const fn as_u16(k: Key) -> u16 {
        u16::from_le_bytes(crate::Key2::from_key8(k).0)
    }
    /// 1-byte form of the key.
    const fn as_u8(k: Key) -> u8 {
        crate::Key1::from_key8(k).0
    }

    // Each line expands to a block that returns its size if that size is
    // collision-free; otherwise control falls through to the next line.
    keycheck! {
        lists;
        1 => as_u8;
        2 => as_u16;
        4 => as_u32;
        8 => as_u64;
    };

    // Reached only when even the full-width keys collide.
    panic!("Collision requiring more than 8 bytes!");
}
#[cfg(test)]
mod test {
    use crate::{server::min_key_needed, Key};

    /// Builds a fixture [`Key`] from raw bytes; usable in const context.
    const fn key(bytes: [u8; 8]) -> Key {
        // SAFETY: NOTE(review) -- the contract of `Key::from_bytes` is declared
        // outside this file. These keys are arbitrary fixtures that are only
        // ever compared against each other by `min_key_needed`; confirm that
        // this satisfies the function's requirements.
        unsafe { Key::from_bytes(bytes) }
    }

    /// The all-zero key every test compares against.
    const ZERO: Key = key([0x00; 8]);

    /// Two keys for which a single byte is expected to be enough.
    #[test]
    fn min_test_1() {
        const OTHER: Key = key([0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01]);

        // Both keys in one list.
        const ONE_LIST: usize = min_key_needed(&[&[ZERO, OTHER]]);
        assert_eq!(1, ONE_LIST);

        // The same keys spread over two lists.
        const TWO_LISTS: usize = min_key_needed(&[&[ZERO], &[OTHER]]);
        assert_eq!(1, TWO_LISTS);
    }

    /// Two keys expected to need two bytes.
    #[test]
    fn min_test_2() {
        const OTHER: Key = key([0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01]);

        const ONE_LIST: usize = min_key_needed(&[&[ZERO, OTHER]]);
        assert_eq!(2, ONE_LIST);

        const TWO_LISTS: usize = min_key_needed(&[&[ZERO], &[OTHER]]);
        assert_eq!(2, TWO_LISTS);
    }

    /// Two keys expected to need four bytes.
    #[test]
    fn min_test_4() {
        const OTHER: Key = key([0x00, 0x01, 0x00, 0x01, 0x00, 0x01, 0x00, 0x01]);

        const ONE_LIST: usize = min_key_needed(&[&[ZERO, OTHER]]);
        assert_eq!(4, ONE_LIST);

        const TWO_LISTS: usize = min_key_needed(&[&[ZERO], &[OTHER]]);
        assert_eq!(4, TWO_LISTS);
    }

    /// Two keys expected to need the full eight bytes.
    #[test]
    fn min_test_8() {
        const OTHER: Key = key([0x01; 8]);

        const ONE_LIST: usize = min_key_needed(&[&[ZERO, OTHER]]);
        assert_eq!(8, ONE_LIST);

        const TWO_LISTS: usize = min_key_needed(&[&[ZERO], &[OTHER]]);
        assert_eq!(8, TWO_LISTS);
    }
}