use core::fmt::Display;
use core::future::Future;
use core::pin::pin;
use embassy_futures::select::{select, select_slice};
use crate::crypto::Crypto;
use crate::dm::clusters::net_comm;
use crate::dm::networks::wireless::NoopWirelessNetCtl;
use crate::dm::DataModel;
use crate::error::Error;
use crate::im::busy::BusyInteractionModel;
use crate::im::events::DEFAULT_MAX_EVENTS_BUF_SIZE;
use crate::im::subscriptions::DEFAULT_MAX_SUBSCRIPTIONS;
use crate::im::{IMBuffer, InteractionModel, PROTO_ID_INTERACTION_MODEL};
use crate::persist::KvBlobStoreAccess;
use crate::sc::busy::BusySecureChannel;
use crate::sc::SecureChannel;
use crate::transport::exchange::Exchange;
use crate::utils::select::Coalesce;
use crate::utils::storage::pooled::Buffers;
use crate::Matter;
/// Value, in milliseconds, that `DefaultResponder::new` passes as `respond_after_ms`
/// when it builds its busy responder.
const RESPOND_BUSY_MS: u32 = 500;
/// A handler that processes a single `Exchange`.
///
/// Handlers are invoked through a shared reference, so one handler instance can be
/// used for many exchanges.
pub trait ExchangeHandler {
/// Handles `exchange`, taking ownership of it.
///
/// Returns `Ok(())` when the handler is done with the exchange, or an `Error`
/// if handling failed.
async fn handle(&self, exchange: Exchange<'_>) -> Result<(), Error>;
}
/// Lets a shared reference to a handler be used wherever an `ExchangeHandler` is expected
/// by forwarding every call to the referenced handler.
impl<T> ExchangeHandler for &T
where
    T: ExchangeHandler,
{
    /// Delegates to `T::handle` on the referenced handler.
    fn handle(&self, exchange: Exchange<'_>) -> impl Future<Output = Result<(), Error>> {
        // `self` is `&&T` here; naming `T`'s impl explicitly makes it obvious that the
        // call goes to the inner handler and not back into this forwarding impl.
        <T as ExchangeHandler>::handle(*self, exchange)
    }
}
/// A link in a chain of exchange handlers, keyed by protocol ID.
///
/// Each link stores the protocol ID it is registered for, the handler for that protocol
/// and the remainder of the chain.
pub struct ChainedExchangeHandler<H, T> {
    /// Protocol ID that `handler` is registered for.
    pub handler_proto: u16,
    /// Handler registered for `handler_proto`.
    pub handler: H,
    /// The rest of the chain.
    pub next: T,
}

impl<H, T> ChainedExchangeHandler<H, T> {
    /// Creates a link that associates `handler` with `handler_proto` and has `next` as
    /// the rest of the chain.
    pub const fn new(handler_proto: u16, handler: H, next: T) -> Self {
        Self {
            handler_proto,
            handler,
            next,
        }
    }

    /// Puts a new link in front of this chain.
    ///
    /// The returned chain has `handler` / `handler_proto` as its first link and `self`
    /// as its `next`.
    pub const fn chain<H2>(
        self,
        handler_proto: u16,
        handler: H2,
    ) -> ChainedExchangeHandler<H2, Self> {
        ChainedExchangeHandler {
            handler_proto,
            handler,
            next: self,
        }
    }
}
/// Dispatches an exchange to this link's handler or to the rest of the chain,
/// depending on the protocol ID of the exchange's received message.
impl<H, T> ExchangeHandler for ChainedExchangeHandler<H, T>
where
    H: ExchangeHandler,
    T: ExchangeHandler,
{
    /// Routes `exchange` by protocol ID.
    ///
    /// Errors from `recv_fetch` / `rx` are propagated before any handler is invoked;
    /// otherwise the result of the selected handler is returned.
    async fn handle(&self, mut exchange: Exchange<'_>) -> Result<(), Error> {
        // The received message must be fetched before its metadata can be inspected.
        exchange.recv_fetch().await?;

        let incoming_proto = exchange.rx()?.meta().proto_id;

        // Not ours: let the remainder of the chain deal with it.
        if incoming_proto != self.handler_proto {
            return self.next.handle(exchange).await;
        }

        self.handler.handle(exchange).await
    }
}
/// An exchange handler that does nothing with the exchanges it is given.
pub struct EmptyExchangeHandler;
impl ExchangeHandler for EmptyExchangeHandler {
/// Ignores the exchange and reports success.
async fn handle(&self, _exchange: Exchange<'_>) -> Result<(), Error> {
Ok(())
}
}
/// Accepts incoming exchanges and passes each of them to an exchange handler.
pub struct Responder<'a, T> {
// Prefix used in every log message emitted by this responder.
name: &'a str,
// Handler invoked for each accepted exchange.
handler: T,
// Passed to `Exchange::accept_after` when waiting for an exchange.
matter: &'a Matter<'a>,
// Passed to `Exchange::accept_after`; when greater than zero, the "Starting" and
// "Completed" messages are logged at warn level instead of debug level.
respond_after_ms: u32,
}
impl<'a, T> Responder<'a, T>
where
    T: ExchangeHandler,
{
    /// Creates a responder.
    ///
    /// - `name`: prefix used in the log messages of this responder
    /// - `handler`: handler invoked for every accepted exchange
    /// - `matter`: passed to `Exchange::accept_after` when waiting for exchanges
    /// - `respond_after_ms`: passed to `Exchange::accept_after`; a non-zero value also
    ///   raises the level of the start/completion log messages to warn
    #[inline(always)]
    pub const fn new(
        name: &'a str,
        handler: T,
        matter: &'a Matter<'a>,
        respond_after_ms: u32,
    ) -> Self {
        Self {
            name,
            handler,
            matter,
            respond_after_ms,
        }
    }

    /// Returns the name given to this responder at construction.
    pub const fn name(&self) -> &str {
        self.name
    }

    /// Returns the exchange handler wrapped by this responder.
    pub fn handler(&self) -> &T {
        &self.handler
    }

    /// Runs `N` instances of the [`Self::handle`] loop concurrently.
    ///
    /// The futures are stored in a fixed-capacity vector and polled together with
    /// `select_slice`; the output of the first future to complete is returned.
    /// Note that `handle` itself loops without ever returning.
    pub async fn run<const N: usize>(&self) -> Result<(), Error> {
        info!("{}: Creating {} handlers", self.name, N);

        let mut pool = heapless::Vec::<_, N>::new();

        debug!(
            "{}: Handlers size: {}B",
            self.name,
            core::mem::size_of_val(&pool)
        );

        for slot in 0..N {
            // The vector has capacity `N` and exactly `N` items are pushed.
            unwrap!(pool.push(self.handle(slot)).map_err(|_| ()));
        }

        let pinned = pin!(pool);

        // SAFETY: the slice borrows the vector's elements in place. The vector is owned
        // by the `pin!` slot above and is neither moved nor resized while the pinned
        // slice is in use, so the futures stay at a stable address.
        let futures = unsafe { pinned.map_unchecked_mut(|vec| vec.as_mut_slice()) };

        let (outcome, _index) = select_slice(futures).await;

        outcome
    }

    /// Responds to exchanges one after another, forever.
    ///
    /// `handler_id` is only used to tag log messages.
    #[inline(always)]
    pub async fn handle(&self, handler_id: impl Display) -> Result<(), Error> {
        loop {
            // A failed exchange has already been logged by `respond_once`;
            // drop the result and go on with the next exchange.
            let _ = self.respond_once(&handler_id).await;
        }
    }

    /// Accepts a single exchange and runs the wrapped handler on it.
    ///
    /// Logs when handling starts and how it ended, then returns the handler's result.
    /// An error from `Exchange::accept_after` is returned without any logging.
    #[inline(always)]
    pub async fn respond_once(&self, handler_id: impl Display) -> Result<(), Error> {
        let exchange = Exchange::accept_after(self.matter, self.respond_after_ms).await?;

        // Captured up-front because the exchange is moved into the handler below.
        let exchange_id = exchange.id();
        let verbose = self.log_warn();

        if verbose {
            warn!(
                "{}: Handler {} / exchange {}: Starting",
                self.name,
                display2format!(&handler_id),
                exchange_id
            );
        } else {
            debug!(
                "{}: Handler {} / exchange {}: Starting",
                self.name,
                display2format!(&handler_id),
                exchange_id
            );
        }

        let result = self.handler.handle(exchange).await;

        match &result {
            Err(err) => {
                error!(
                    "{}: Handler {} / exchange {}: Abandoned because of error {:?}",
                    self.name,
                    display2format!(&handler_id),
                    exchange_id,
                    err
                );
            }
            Ok(_) if verbose => {
                warn!(
                    "{}: Handler {} / exchange {}: Completed",
                    self.name,
                    display2format!(&handler_id),
                    exchange_id
                );
            }
            Ok(_) => {
                debug!(
                    "{}: Handler {} / exchange {}: Completed",
                    self.name,
                    display2format!(&handler_id),
                    exchange_id
                );
            }
        }

        result
    }

    /// Whether start/completion messages are logged at warn level (`true`) rather than
    /// debug level (`false`). This is the case when `respond_after_ms` is non-zero.
    fn log_warn(&self) -> bool {
        self.respond_after_ms > 0
    }
}
/// The handler chain used by `Responder::new_default`: a `ChainedExchangeHandler` whose
/// first handler is a borrowed `InteractionModel` and whose `next` is a `SecureChannel`.
pub type DefaultExchangeHandler<
'd,
'a,
C,
B,
T,
K,
N,
NC = NoopWirelessNetCtl,
const NS: usize = DEFAULT_MAX_SUBSCRIPTIONS,
const NE: usize = DEFAULT_MAX_EVENTS_BUF_SIZE,
> = ChainedExchangeHandler<
&'d InteractionModel<'a, C, B, T, K, N, NC, NS, NE>,
SecureChannel<'d, &'d C>,
>;
impl<'d, 'a, C, B, T, K, N, NC, const NS: usize, const NE: usize>
    Responder<'a, DefaultExchangeHandler<'d, 'a, C, B, T, K, N, NC, NS, NE>>
where
    B: Buffers<IMBuffer>,
{
    /// Creates a responder named `"Responder"` on top of `data_model`.
    ///
    /// Exchanges whose protocol ID is `PROTO_ID_INTERACTION_MODEL` go to `data_model`;
    /// all others go to a `SecureChannel` built from `data_model`. The responder uses
    /// `data_model.matter()` and a `respond_after_ms` of `0`.
    #[inline(always)]
    pub const fn new_default(
        data_model: &'d InteractionModel<'a, C, B, T, K, N, NC, NS, NE>,
    ) -> Self
    where
        C: Crypto,
        T: DataModel,
        K: KvBlobStoreAccess,
        N: net_comm::Networks,
    {
        // Fallback for everything that is not an Interaction Model exchange.
        let secure_channel = SecureChannel::new(data_model.crypto(), data_model);

        let chain =
            ChainedExchangeHandler::new(PROTO_ID_INTERACTION_MODEL, data_model, secure_channel);

        Self::new("Responder", chain, data_model.matter(), 0)
    }
}
/// The handler chain used by `Responder::new_busy`: a `ChainedExchangeHandler` whose
/// first handler is a `BusyInteractionModel` and whose `next` is a `BusySecureChannel`.
pub type BusyExchangeHandler = ChainedExchangeHandler<BusyInteractionModel, BusySecureChannel>;
impl<'a> Responder<'a, BusyExchangeHandler> {
    /// Creates a responder named `"Busy Responder"`.
    ///
    /// Exchanges whose protocol ID is `PROTO_ID_INTERACTION_MODEL` go to a
    /// `BusyInteractionModel`; all others go to a `BusySecureChannel`.
    /// `matter` and `respond_after_ms` are forwarded to `Responder::new` unchanged.
    #[inline(always)]
    pub const fn new_busy(matter: &'a Matter<'a>, respond_after_ms: u32) -> Self {
        let busy_chain = ChainedExchangeHandler::new(
            PROTO_ID_INTERACTION_MODEL,
            BusyInteractionModel::new(),
            BusySecureChannel::new(),
        );

        Self::new("Busy Responder", busy_chain, matter, respond_after_ms)
    }
}
/// Bundles a default responder with a busy responder so that both can be run together.
pub struct DefaultResponder<
'd,
'a,
C,
B,
T,
K,
N,
NC = NoopWirelessNetCtl,
const NS: usize = DEFAULT_MAX_SUBSCRIPTIONS,
const NE: usize = DEFAULT_MAX_EVENTS_BUF_SIZE,
> where
B: Buffers<IMBuffer>,
{
// Responder built with `Responder::new_default`.
responder: Responder<'a, DefaultExchangeHandler<'d, 'a, C, B, T, K, N, NC, NS, NE>>,
// Responder built with `Responder::new_busy`.
busy_responder: Responder<'a, BusyExchangeHandler>,
}
impl<'d, 'a, C, B, T, K, N, NC, const NS: usize, const NE: usize>
    DefaultResponder<'d, 'a, C, B, T, K, N, NC, NS, NE>
where
    C: Crypto,
    B: Buffers<IMBuffer>,
    T: DataModel,
    K: KvBlobStoreAccess,
    N: net_comm::Networks,
{
    /// Creates the pair of responders for `data_model`.
    ///
    /// The main responder comes from `Responder::new_default`; the busy responder comes
    /// from `Responder::new_busy` with `data_model.matter()` and `RESPOND_BUSY_MS`.
    #[inline(always)]
    pub const fn new(data_model: &'d InteractionModel<'a, C, B, T, K, N, NC, NS, NE>) -> Self {
        let responder = Responder::new_default(data_model);
        let busy_responder = Responder::new_busy(data_model.matter(), RESPOND_BUSY_MS);

        Self {
            responder,
            busy_responder,
        }
    }

    /// Runs both responders concurrently: `A` handlers for the main responder and `O`
    /// handlers for the busy responder.
    ///
    /// Completes with the result of whichever responder finishes first.
    pub async fn run<const A: usize, const O: usize>(&self) -> Result<(), Error> {
        let mut main_pool = pin!(self.responder.run::<A>());
        let mut busy_pool = pin!(self.busy_responder.run::<O>());

        select(&mut main_pool, &mut busy_pool).coalesce().await
    }

    /// Returns the main responder.
    #[allow(clippy::type_complexity)]
    pub const fn responder(
        &self,
    ) -> &Responder<
        'a,
        ChainedExchangeHandler<
            &'d InteractionModel<'a, C, B, T, K, N, NC, NS, NE>,
            SecureChannel<'d, &'d C>,
        >,
    > {
        &self.responder
    }

    /// Returns the busy responder.
    pub const fn busy_responder(
        &self,
    ) -> &Responder<'a, ChainedExchangeHandler<BusyInteractionModel, BusySecureChannel>> {
        &self.busy_responder
    }
}