use core::num::NonZeroU8;
use core::pin::pin;
use embassy_futures::select::{select3, select4};
use embassy_time::{Instant, Timer};
use crate::acl::Accessor;
use crate::crypto::Crypto;
use crate::dm::clusters::net_comm::{
NetCtl, NetworkType, Networks, NetworksAccess, SharedNetworks,
};
use crate::dm::clusters::wifi_diag::WirelessDiag;
use crate::dm::networks::eth::EthNetwork;
use crate::dm::networks::wireless::{NoopWirelessNetCtl, WirelessMgr, MAX_CREDS_SIZE};
use crate::dm::networks::NetChangeNotif;
use crate::dm::{
AsyncHandler, AttrChangeNotifier, AttrDetails, Attribute, DataModel, EventEmitter,
HandlerContext, MatchContextInstance, Metadata,
};
use crate::error::{Error, ErrorCode};
use crate::im::events::{EventReader, EventTLVWrite, Events, DEFAULT_MAX_EVENTS_BUF_SIZE};
use crate::im::invoker::HandlerInvoker;
use crate::im::subscriptions::{
ReportContext, Subscriptions, SubscriptionsBuffers, DEFAULT_MAX_SUBSCRIPTIONS,
};
use crate::persist::{KvBlobStoreAccess, NETWORKS_KEY};
use crate::respond::ExchangeHandler;
use crate::tlv::{get_root_node_struct, FromTLV, Nullable, TLVElement, TLVTag, TLVWrite, ToTLV};
use crate::transport::exchange::{Exchange, ExchangeId, MAX_EXCHANGE_TX_BUF_SIZE};
use crate::utils::init::{init, Init};
use crate::utils::select::Coalesce;
use crate::utils::storage::pooled::Buffers;
use crate::utils::storage::WriteBuf;
use crate::Matter;
pub use encoding::*;
pub use expand::{expand_invoke, expand_read, expand_write};
pub mod busy;
pub mod client;
pub mod encoding;
pub mod events;
pub mod expand;
pub mod invoker;
pub mod subscriptions;
pub struct InteractionModelState<
N,
const NS: usize = DEFAULT_MAX_SUBSCRIPTIONS,
const NE: usize = DEFAULT_MAX_EVENTS_BUF_SIZE,
> {
subscriptions: Subscriptions<NS>,
events: Events<NE>,
networks: SharedNetworks<N>,
}
impl<N, const NS: usize, const NE: usize> InteractionModelState<N, NS, NE> {
pub const fn new(networks: N) -> Self {
Self {
subscriptions: Subscriptions::new(),
events: Events::new(),
networks: SharedNetworks::new(networks),
}
}
pub fn init(networks: impl Init<N>) -> impl Init<Self> {
init!(Self {
subscriptions <- Subscriptions::init(),
events <- Events::init(),
networks <- SharedNetworks::init(networks),
})
}
pub async fn reset_persist<K>(&mut self, kv: K) -> Result<(), Error>
where
K: KvBlobStoreAccess,
N: Networks,
{
let Self {
events, networks, ..
} = self;
let events = events.inner_mut();
let networks = networks.get_mut().get_mut();
kv.access(|store, buf| {
events.reset_persist(&mut *store, buf)?;
networks.reset()?;
store.remove(NETWORKS_KEY, buf)?;
Ok(())
})
}
pub async fn load_persist<K>(&mut self, kv: K) -> Result<(), Error>
where
K: KvBlobStoreAccess,
N: Networks,
{
let Self {
events, networks, ..
} = self;
let events = events.inner_mut();
let networks = networks.get_mut().get_mut();
kv.access(|store, buf| {
events.load_persist(&mut *store, buf)?;
networks.reset()?;
if let Some(data) = store.load(NETWORKS_KEY, buf)? {
networks.load(data)?;
}
Ok(())
})
}
pub const fn subscriptions(&self) -> &Subscriptions<NS> {
&self.subscriptions
}
pub const fn events(&self) -> &Events<NE> {
&self.events
}
pub const fn networks(&self) -> &SharedNetworks<N> {
&self.networks
}
}
pub struct InteractionModel<
'a,
C,
B,
T,
K,
N,
NC = NoopWirelessNetCtl,
const NS: usize = DEFAULT_MAX_SUBSCRIPTIONS,
const NE: usize = DEFAULT_MAX_EVENTS_BUF_SIZE,
> where
B: Buffers<IMBuffer>,
{
matter: &'a Matter<'a>,
crypto: C,
buffers: &'a B,
kv: K,
net_ctl: NC,
subscriptions_buffers: SubscriptionsBuffers<'a, B, NS>,
state: &'a InteractionModelState<N, NS, NE>,
handler: T,
}
pub type EthInteractionModelState<
const NS: usize = DEFAULT_MAX_SUBSCRIPTIONS,
const NE: usize = DEFAULT_MAX_EVENTS_BUF_SIZE,
> = InteractionModelState<EthNetwork<'static>, NS, NE>;
pub type EthInteractionModel<
'a,
C,
B,
T,
K,
NC = NoopWirelessNetCtl,
const NS: usize = DEFAULT_MAX_SUBSCRIPTIONS,
const NE: usize = DEFAULT_MAX_EVENTS_BUF_SIZE,
> = InteractionModel<'a, C, B, T, K, EthNetwork<'static>, NC, NS, NE>;
pub type WirelessInteractionModelState<
N,
const NS: usize = DEFAULT_MAX_SUBSCRIPTIONS,
const NE: usize = DEFAULT_MAX_EVENTS_BUF_SIZE,
> = InteractionModelState<N, NS, NE>;
pub type WirelessInteractionModel<
'a,
C,
B,
T,
K,
N,
NC,
const NS: usize = DEFAULT_MAX_SUBSCRIPTIONS,
const NE: usize = DEFAULT_MAX_EVENTS_BUF_SIZE,
> = InteractionModel<'a, C, B, T, K, N, NC, NS, NE>;
impl<'a, C, B, T, K, N, const NS: usize, const NE: usize>
InteractionModel<'a, C, B, T, K, N, NoopWirelessNetCtl, NS, NE>
where
C: Crypto,
B: Buffers<IMBuffer>,
T: DataModel,
K: KvBlobStoreAccess,
N: Networks,
{
#[inline(always)]
pub fn new(
matter: &'a Matter<'a>,
crypto: C,
buffers: &'a B,
handler: T,
kv: K,
state: &'a InteractionModelState<N, NS, NE>,
) -> Self {
Self::new_with_net_ctl(
matter,
crypto,
buffers,
handler,
kv,
NoopWirelessNetCtl::new(NetworkType::Ethernet),
state,
)
}
}
impl<'a, C, B, T, K, N, NC, const NS: usize, const NE: usize>
InteractionModel<'a, C, B, T, K, N, NC, NS, NE>
where
C: Crypto,
B: Buffers<IMBuffer>,
T: DataModel,
K: KvBlobStoreAccess,
N: Networks,
{
#[inline(always)]
pub fn new_with_net_ctl(
matter: &'a Matter<'a>,
crypto: C,
buffers: &'a B,
handler: T,
kv: K,
net_ctl: NC,
state: &'a InteractionModelState<N, NS, NE>,
) -> Self {
state.subscriptions.clear();
Self {
matter,
crypto,
buffers,
kv,
net_ctl,
subscriptions_buffers: SubscriptionsBuffers::new(),
state,
handler,
}
}
pub const fn matter(&self) -> &'a Matter<'a> {
self.matter
}
pub const fn crypto(&self) -> &C {
&self.crypto
}
pub fn open_basic_comm_window(&self, timeout_secs: u16) -> Result<(), Error> {
self.matter
.open_basic_comm_window(timeout_secs, &self.crypto, self)
}
pub fn close_comm_window(&self) -> Result<bool, Error> {
self.matter.close_comm_window(self)
}
pub fn bump_configuration_version(&self) -> Result<u32, Error> {
self.matter.bump_configuration_version(&self.kv, self)
}
pub async fn run(&self) -> Result<(), Error>
where
NC: NetCtl + WirelessDiag + NetChangeNotif,
{
let mut timeouts = pin!(self.run_timeout_checks());
let mut handler = pin!(self.handler.run(self));
let mut subs = pin!(self.process_subscriptions(self.matter));
let mut net = pin!(self.run_net_mgr());
select4(&mut timeouts, &mut handler, &mut subs, &mut net)
.coalesce()
.await
}
async fn run_net_mgr(&self) -> Result<(), Error>
where
NC: NetCtl + WirelessDiag + NetChangeNotif,
{
if self.net_ctl.net_type() == NetworkType::Ethernet {
return core::future::pending().await;
}
let mut buf = [0u8; MAX_CREDS_SIZE];
let mut mgr = WirelessMgr::new(self.state.networks(), &self.net_ctl, &mut buf);
mgr.run().await
}
pub async fn connect_once(&self, network_id: &[u8]) -> Result<(), Error>
where
NC: NetCtl + WirelessDiag + NetChangeNotif,
{
let mut buf = [0u8; MAX_CREDS_SIZE];
let mut mgr = WirelessMgr::new(self.state.networks(), &self.net_ctl, &mut buf);
mgr.connect_once(network_id).await
}
async fn run_timeout_checks(&self) -> Result<(), Error> {
const CHECK_INTERVAL_SECS: u64 = 1;
loop {
Timer::after_secs(CHECK_INTERVAL_SECS).await;
self.check_timeouts(None)?;
}
}
fn check_timeouts(&self, exch_id: Option<ExchangeId>) -> Result<(), Error> {
let mut notify_mdns = || self.matter.transport().notify_mdns_changed();
let mut notify_change =
|endpt_id, clust_id| self.notify_cluster_changed(endpt_id, clust_id);
self.matter.with_state(|state| {
let expire_sess_id = exch_id.and_then(|exch_id| {
state
.sessions
.get(exch_id.session_id())
.map(|sess| sess.id())
});
state.failsafe.check_failsafe_timeout(
&mut state.fabrics,
&mut state.sessions,
&self.state.networks,
&self.kv,
expire_sess_id,
&mut notify_mdns,
&mut notify_change,
)?;
state
.pase
.check_comm_window_timeout(&mut notify_mdns, &mut notify_change)?;
Ok(())
})
}
pub async fn handle(&self, exchange: &mut Exchange<'_>) -> Result<(), Error> {
let fetch_meta = |exchange: &mut Exchange| {
let meta = exchange.rx()?.meta();
if meta.proto_id != PROTO_ID_INTERACTION_MODEL {
Err(ErrorCode::InvalidProto)?;
}
Result::<_, Error>::Ok(meta)
};
if exchange.rx().is_err() {
exchange.recv_fetch().await?;
}
let is_groupcast = exchange.is_groupcast()?;
let mut meta = fetch_meta(exchange)?;
let timeout_instant = if !is_groupcast && meta.opcode::<OpCode>()? == OpCode::TimedRequest {
let timeout = self.timed(exchange).await?;
exchange.recv_fetch().await?;
meta = fetch_meta(exchange)?;
Some(timeout)
} else {
None
};
self.check_timeouts(Some(exchange.id()))?;
match meta.opcode::<OpCode>()? {
OpCode::ReadRequest if is_groupcast => {
error!("Received a groupcast message for opcode: ReadRequest")
}
OpCode::ReadRequest if !is_groupcast => self.read(exchange).await?,
OpCode::WriteRequest => self.write(exchange, timeout_instant, is_groupcast).await?,
OpCode::InvokeRequest => self.invoke(exchange, timeout_instant, is_groupcast).await?,
OpCode::SubscribeRequest if is_groupcast => {
error!("Received a groupcast message for opcode: SubscribeRequest")
}
OpCode::SubscribeRequest if !is_groupcast => self.subscribe(exchange).await?,
OpCode::TimedRequest if !is_groupcast => {
Self::send_status(exchange, IMStatusCode::InvalidAction).await?
}
_ if is_groupcast => {
}
opcode => {
error!("Invalid opcode: {:?}", opcode);
Err(ErrorCode::InvalidOpcode)?
}
}
if !is_groupcast {
exchange.acknowledge().await?;
}
Ok(())
}
async fn read(&self, exchange: &mut Exchange<'_>) -> Result<(), Error> {
let Some((mut tx, rx)) = self.buffers(exchange).await? else {
return Ok(());
};
let read_req = ReadReq::new(TLVElement::new(&rx));
debug!("IM: Read request: {:?}", read_req);
if let Err(err) = Self::validate_read(&read_req) {
error!("Invalid read request: {:?}", err);
return Self::send_status(exchange, err.code().into()).await;
}
let req = ReportDataReq::Read(&read_req);
let mut wb = WriteBuf::new(&mut tx);
let fabric_filtered = req.fabric_filtered().unwrap_or(true);
let mut resp = ReportDataResponder::new(
&req,
None,
HandlerInvoker::new(exchange, self),
EventReader::new(0, u64::MAX, fabric_filtered),
&self.state.events,
);
resp.respond(&mut wb, true, true, &self.handler, |_, _, _| true)
.await?;
Ok(())
}
fn validate_read(req: &ReadReq<'_>) -> Result<(), Error> {
if let Some(attr_requests) = req.attr_requests()? {
for attr_req in attr_requests {
Self::validate_attr_wildcard_path(&attr_req?)?;
}
}
Ok(())
}
fn validate_attr_wildcard_path(path: &AttrPath) -> Result<(), Error> {
if path.cluster.is_none() {
if let Some(attr_id) = path.attr {
if !Attribute::is_system_attr(attr_id) {
return Err(ErrorCode::InvalidAction.into());
}
}
}
Ok(())
}
async fn write(
&self,
exchange: &mut Exchange<'_>,
timeout_instant: Option<Instant>,
is_groupcast: bool,
) -> Result<(), Error> {
while exchange.rx().is_ok() {
let Some((mut tx, rx)) = self.buffers(exchange).await? else {
break;
};
let req = WriteReq::new(TLVElement::new(&rx));
debug!("IM: Write request: {:?}", req);
let timed = req.timed_request()?;
if self.timed_out(exchange, timeout_instant, timed).await? {
break;
}
let mut wb = WriteBuf::new(&mut tx);
let mut resp = WriteResponder::new(&req, HandlerInvoker::new(exchange, self));
resp.respond(&mut wb, &self.handler, is_groupcast).await?;
if req.more_chunks()? {
exchange.recv_fetch().await?;
}
}
Ok(())
}
async fn invoke(
&self,
exchange: &mut Exchange<'_>,
timeout_instant: Option<Instant>,
is_groupcast: bool,
) -> Result<(), Error> {
let Some((mut tx, rx)) = self.buffers(exchange).await? else {
return Ok(());
};
let req = InvReq::new(TLVElement::new(&rx));
debug!("IM: Invoke request: {:?}", req);
let timed = req.timed_request()?;
if self.timed_out(exchange, timeout_instant, timed).await? {
return Ok(());
}
let max_paths = exchange.matter().dev_det().max_paths_per_invoke as usize;
if let Some(reqs) = req.inv_requests()? {
let mut count = 0;
for r in &reqs {
let _ = r?;
count += 1;
}
if count > max_paths {
return Self::send_status(exchange, IMStatusCode::InvalidAction).await;
}
if count > 1 {
for (i, req_i) in reqs.iter().enumerate() {
let req_i = req_i?;
if req_i.command_ref.is_none() {
return Self::send_status(exchange, IMStatusCode::InvalidAction).await;
}
for req_j in reqs.iter().skip(i + 1) {
let req_j = req_j?;
if req_i.path == req_j.path || req_i.command_ref == req_j.command_ref {
return Self::send_status(exchange, IMStatusCode::InvalidAction).await;
}
}
}
}
}
let mut wb = WriteBuf::new(&mut tx);
let mut resp = InvokeResponder::new(&req, HandlerInvoker::new(exchange, self));
resp.respond(&mut wb, &self.handler, is_groupcast).await
}
async fn subscribe(&self, exchange: &mut Exchange<'_>) -> Result<(), Error> {
let Some((mut tx, rx)) = self.buffers(exchange).await? else {
return Ok(());
};
let req = SubscribeReq::new(TLVElement::new(&rx));
debug!("IM: Subscribe request: {:?}", req);
let accessor = exchange.accessor()?;
if let Err(err) = self.validate_subscribe(&req, &accessor) {
error!("Invalid subscribe request: {:?}", err);
return Self::send_status(exchange, err.code().into()).await;
}
let (fab_idx, peer_node_id) = exchange.with_state(|state| {
let sess = exchange.id().session(&mut state.sessions);
let fab_idx = NonZeroU8::new(sess.get_local_fabric_idx()).ok_or(ErrorCode::Invalid)?;
let peer_node_id = sess.get_peer_node_id().ok_or(ErrorCode::Invalid)?;
Ok((fab_idx, peer_node_id))
})?;
if !req.keep_subs()? {
self.state
.subscriptions
.remove(&self.subscriptions_buffers, |sub| {
(sub.ids().fab_idx == fab_idx && sub.ids().peer_node_id == peer_node_id)
.then_some("new subscription request")
});
}
let max_int_secs = core::cmp::max(req.max_int_ceil()?, 40); let min_int_secs = req.min_int_floor()?;
let now = Instant::now();
let Some(mut rctx) = self.state.subscriptions.add(
now,
fab_idx,
peer_node_id,
exchange.id().session_id(),
min_int_secs,
max_int_secs,
self.state.events.watermark(),
rx,
&self.subscriptions_buffers,
) else {
return Self::send_status(exchange, IMStatusCode::ResourceExhausted).await;
};
let primed = self.report_data(&mut rctx, &mut tx, exchange, true).await?;
if primed {
exchange
.send_with(|_, wb| {
SubscribeResp::write(wb, rctx.subscription().ids().id, max_int_secs)?;
Ok(Some(OpCode::SubscribeResponse.into()))
})
.await?;
rctx.set_keep();
info!("Subscription {:?} primed", rctx.subscription().ids());
drop(rctx);
self.state.subscriptions.notification.notify();
}
Ok(())
}
fn validate_subscribe(
&self,
req: &SubscribeReq<'_>,
accessor: &Accessor<'_>,
) -> Result<(), Error> {
self.handler.access(|node| {
let mut has_attrs = false;
let mut has_events = false;
if let Some(attr_requests) = req.attr_requests()? {
has_attrs = true;
for attr_req in attr_requests {
let path = attr_req?;
if path.is_wildcard() {
Self::validate_attr_wildcard_path(&path)?;
if !node.has_accessible_attr(&path, accessor) {
return Err(ErrorCode::InvalidAction.into());
}
} else {
node.validate_attr_path(&path, false, false, accessor)
.map_err(|_| ErrorCode::InvalidAction)?;
}
}
}
if let Some(event_reqs) = req.event_requests()? {
has_events = true;
for event_req in event_reqs {
let path = event_req?;
if !path.is_wildcard() {
node.validate_event_path(&path, accessor)
.map_err(|_| ErrorCode::InvalidAction)?;
}
}
}
if !has_attrs && !has_events {
return Err(ErrorCode::InvalidAction.into());
}
Ok(())
})
}
async fn process_subscriptions(&self, matter: &Matter<'_>) -> Result<(), Error> {
loop {
let mut notification = pin!(self.state.subscriptions.notification.wait());
let mut session_removed = pin!(matter.transport().wait_session_removed());
let deadline = self
.state
.subscriptions
.next_report_at(self.state.events.watermark(), &self.subscriptions_buffers);
let mut timeout = pin!(Timer::at(deadline));
select3(&mut notification, &mut timeout, &mut session_removed).await;
let now = Instant::now();
loop {
let removed_any =
self.state
.subscriptions
.remove(&self.subscriptions_buffers, |sub| {
if sub.is_expired(now) {
return Some("expired");
}
matter.with_state(|state| {
if state.fabrics.get(sub.ids().fab_idx).is_none() {
return Some("fabric removed");
}
if state.sessions.get(sub.session_id()).is_none() {
return Some("session removed");
}
None
})
});
if !removed_any {
break;
}
}
let event_numbers_watermark = self.state.events.watermark();
loop {
let Some(mut rctx) = self.state.subscriptions.report(
now,
event_numbers_watermark,
&self.subscriptions_buffers,
) else {
break;
};
let result = self.process_subscription(matter, &mut rctx).await;
match result {
Ok(true) => rctx.set_keep(),
Ok(false) => (),
Err(e) => error!(
"Error processing subscription {:?}: {:?}",
rctx.subscription().ids(),
e
),
}
}
self.state.subscriptions.purge_reported_changes();
}
}
#[allow(clippy::too_many_arguments)]
async fn process_subscription(
&self,
matter: &Matter<'_>,
rctx: &mut ReportContext<'_, '_, B, NS>,
) -> Result<bool, Error> {
let mut exchange =
Exchange::initiate_for_session(matter, rctx.subscription().session_id())?;
if let Some(mut tx) = self.buffers.get().await {
unwrap!(tx.resize_default(MAX_EXCHANGE_TX_BUF_SIZE));
let primed = self
.report_data(rctx, &mut tx, &mut exchange, false)
.await?;
exchange.acknowledge().await?;
Ok(primed)
} else {
error!(
"No TX buffer available for processing subscription {:?}",
rctx.subscription().ids(),
);
Ok(false)
}
}
async fn timed(&self, exchange: &mut Exchange<'_>) -> Result<Instant, Error> {
let req = TimedReq::from_tlv(&get_root_node_struct(exchange.rx()?.payload())?)?;
debug!("IM: Timed request: {:?}", req);
let timeout_instant = req.timeout_instant();
Self::send_status(exchange, IMStatusCode::Success).await?;
Ok(timeout_instant)
}
async fn timed_out(
&self,
exchange: &mut Exchange<'_>,
timeout_instant: Option<Instant>,
timed_req: bool,
) -> Result<bool, Error> {
let status = {
if timed_req != timeout_instant.is_some() {
Some(IMStatusCode::TimedRequestMisMatch)
} else if timeout_instant
.map(|timeout_instant| Instant::now() > timeout_instant)
.unwrap_or(false)
{
Some(IMStatusCode::Timeout)
} else {
None
}
};
if let Some(status) = status {
Self::send_status(exchange, status).await?;
Ok(true)
} else {
Ok(false)
}
}
#[allow(clippy::too_many_arguments)]
async fn report_data(
&self,
rctx: &mut ReportContext<'_, '_, B, NS>,
tx: &mut [u8],
exchange: &mut Exchange<'_>,
with_dataver: bool,
) -> Result<bool, Error>
where
T: DataModel,
{
let mut wb = WriteBuf::new(tx);
let sub_req = SubscribeReq::new(TLVElement::new(rctx.rx()));
let req = if with_dataver {
ReportDataReq::Subscribe(&sub_req)
} else {
ReportDataReq::SubscribeReport(&sub_req)
};
let fabric_filtered = req.fabric_filtered().unwrap_or(true);
let mut resp = ReportDataResponder::new(
&req,
Some(rctx.subscription().ids().id),
HandlerInvoker::new(exchange, self),
EventReader::new(
rctx.max_seen_event_number(),
rctx.next_max_seen_event_number(),
fabric_filtered,
),
&self.state.events,
);
let sub_valid = resp
.respond(
&mut wb,
false,
rctx.should_send_if_empty(),
&self.handler,
|e, c, a| rctx.should_report_attr(e, c, a),
)
.await?;
if !sub_valid {
warn!(
"Subscription {:?} removed during reporting",
rctx.subscription().ids()
);
}
Ok(sub_valid)
}
async fn buffers(
&self,
exchange: &mut Exchange<'_>,
) -> Result<Option<(B::Buffer<'a>, B::Buffer<'a>)>, Error> {
if let Some(tx) = self.tx_buffer(exchange).await? {
if let Some(rx) = self.rx_buffer(exchange).await? {
return Ok(Some((tx, rx)));
}
}
Ok(None)
}
async fn rx_buffer(&self, exchange: &mut Exchange<'_>) -> Result<Option<B::Buffer<'a>>, Error> {
if let Some(mut buffer) = self.buffer(exchange).await? {
let rx = exchange.rx()?;
buffer.clear();
unwrap!(buffer.extend_from_slice(rx.payload()));
exchange.rx_done()?;
Ok(Some(buffer))
} else {
Ok(None)
}
}
async fn tx_buffer(&self, exchange: &mut Exchange<'_>) -> Result<Option<B::Buffer<'a>>, Error> {
if let Some(mut buffer) = self.buffer(exchange).await? {
unwrap!(buffer.resize_default(MAX_EXCHANGE_TX_BUF_SIZE));
Ok(Some(buffer))
} else {
Ok(None)
}
}
async fn buffer(&self, exchange: &mut Exchange<'_>) -> Result<Option<B::Buffer<'a>>, Error> {
if let Some(buffer) = self.buffers.get().await {
Ok(Some(buffer))
} else {
Self::send_status(exchange, IMStatusCode::Busy).await?;
Ok(None)
}
}
async fn send_status(exchange: &mut Exchange<'_>, status: IMStatusCode) -> Result<(), Error> {
exchange
.send_with(|_, wb| {
StatusResp::write(wb, status)?;
Ok(Some(OpCode::StatusResponse.into()))
})
.await
}
}
impl<C, B, T, K, N, NC, const NS: usize, const NE: usize> ExchangeHandler
for InteractionModel<'_, C, B, T, K, N, NC, NS, NE>
where
C: Crypto,
B: Buffers<IMBuffer>,
T: DataModel,
K: KvBlobStoreAccess,
N: Networks,
{
async fn handle(&self, mut exchange: Exchange<'_>) -> Result<(), Error> {
InteractionModel::handle(self, &mut exchange).await
}
}
impl<C, B, T, K, N, NC, const NS: usize, const NE: usize> HandlerContext
for InteractionModel<'_, C, B, T, K, N, NC, NS, NE>
where
C: Crypto,
B: Buffers<IMBuffer>,
T: DataModel,
K: KvBlobStoreAccess,
N: Networks,
{
fn matter(&self) -> &Matter<'_> {
self.matter
}
fn crypto(&self) -> impl Crypto + '_ {
&self.crypto
}
fn kv(&self) -> impl KvBlobStoreAccess + '_ {
&self.kv
}
fn networks(&self) -> impl NetworksAccess + '_ {
&self.state.networks
}
fn metadata(&self) -> impl Metadata + '_ {
&self.handler
}
fn handler(&self) -> impl AsyncHandler + '_ {
&self.handler
}
fn buffers(&self) -> impl Buffers<IMBuffer> + '_ {
self.buffers
}
}
impl<C, B, T, K, N, NC, const NS: usize, const NE: usize> AttrChangeNotifier
for InteractionModel<'_, C, B, T, K, N, NC, NS, NE>
where
C: Crypto,
B: Buffers<IMBuffer>,
T: DataModel,
K: KvBlobStoreAccess,
N: Networks,
{
fn notify_attr_changed(&self, endpoint_id: EndptId, cluster_id: ClusterId, attr_id: AttrId) {
self.handler.bump_dataver(MatchContextInstance::new(
Some(endpoint_id),
Some(cluster_id),
));
self.state
.subscriptions
.notify_attr_changed(endpoint_id, cluster_id, attr_id);
}
fn notify_cluster_changed(&self, endpoint_id: EndptId, cluster_id: ClusterId) {
self.handler.bump_dataver(MatchContextInstance::new(
Some(endpoint_id),
Some(cluster_id),
));
self.state
.subscriptions
.notify_cluster_changed(endpoint_id, cluster_id);
}
fn notify_endpoint_changed(&self, endpoint_id: EndptId) {
self.handler
.bump_dataver(MatchContextInstance::new(Some(endpoint_id), None));
self.state
.subscriptions
.notify_endpoint_changed(endpoint_id)
}
fn notify_all_changed(&self) {
self.handler
.bump_dataver(MatchContextInstance::new(None, None));
self.state.subscriptions.notify_all_changed()
}
}
impl<C, B, T, K, N, NC, const NS: usize, const NE: usize> EventEmitter
for InteractionModel<'_, C, B, T, K, N, NC, NS, NE>
where
C: Crypto,
B: Buffers<IMBuffer>,
T: DataModel,
K: KvBlobStoreAccess,
N: Networks,
{
fn emit_event<F>(
&self,
endpoint_id: EndptId,
cluster_id: ClusterId,
event_id: EventId,
priority: EventPriority,
f: F,
) -> Result<u64, Error>
where
F: FnOnce(EventTLVWrite<'_>) -> Result<(), Error>,
{
let event_number =
self.state
.events
.push(endpoint_id, cluster_id, event_id, priority, &self.kv, f)?;
self.state
.subscriptions
.notify_event_emitted(endpoint_id, cluster_id, event_id);
Ok(event_number)
}
}
pub enum RespondOutcome {
Accepted,
Rejected,
Empty,
}
struct ReportDataResponder<'a, 'b, 'c, const NE: usize, C> {
req: &'a ReportDataReq<'a>,
subscription_id: Option<u32>,
invoker: HandlerInvoker<'b, 'c, C>,
event_reader: EventReader,
events: &'a Events<NE>,
}
impl<'a, 'b, 'c, const NE: usize, C> ReportDataResponder<'a, 'b, 'c, NE, C>
where
C: HandlerContext,
{
const LONG_READS_TLV_RESERVE_SIZE: usize = 24;
const fn new(
req: &'a ReportDataReq<'a>,
subscription_id: Option<u32>,
invoker: HandlerInvoker<'b, 'c, C>,
event_reader: EventReader,
events: &'a Events<NE>,
) -> Self {
Self {
req,
subscription_id,
invoker,
event_reader,
events,
}
}
async fn respond<M, F>(
&mut self,
wb: &mut WriteBuf<'_>,
suppress_last_resp: bool,
send_if_empty: bool,
metadata: M,
mut filter: F,
) -> Result<bool, Error>
where
M: Metadata,
F: FnMut(EndptId, ClusterId, u32) -> bool,
{
let mut empty = true;
self.start_reply(wb)?;
if !self
.report_attributes(wb, &mut empty, &metadata, &mut filter)
.await?
{
return Ok(false);
}
if !self.report_events(wb, &mut empty, &metadata).await? {
return Ok(false);
}
if send_if_empty || !empty {
self.send(ReportDataChunkState::Done, suppress_last_resp, wb)
.await
} else {
debug!("No data to report, skipping sending ReportData response");
Ok(true)
}
}
async fn report_attributes<M, F>(
&mut self,
wb: &mut WriteBuf<'_>,
empty: &mut bool,
metadata: M,
mut filter: F,
) -> Result<bool, Error>
where
M: Metadata,
F: FnMut(EndptId, ClusterId, u32) -> bool,
{
let accessor = self.invoker.exchange().accessor()?;
if self.req.attr_requests()?.is_some() {
wb.start_array(&TLVTag::Context(ReportDataRespTag::AttributeReports as u8))?;
for item in expand_read(&metadata, self.req, &accessor, &mut filter)? {
let item = item?;
*empty = false;
loop {
let result = self.invoker.process_read(&item, &mut *wb).await;
match result {
Ok(()) => break,
Err(err) if err.code() == ErrorCode::NoSpace => {
let array_attr = item.as_ref().ok().filter(|attr| {
attr.list_index.is_none()
&& attr.array
});
if let Some(array_attr) = array_attr {
if self.send_array_items(array_attr, wb).await? {
break;
} else {
return Ok(false);
}
} else {
debug!("<<< No TX space, chunking >>>");
if !self
.send(ReportDataChunkState::ChunkingAttributes, false, wb)
.await?
{
return Ok(false);
}
}
}
Err(err) => Err(err)?,
}
}
}
wb.end_container()?;
}
Ok(true)
}
async fn report_events<M>(
&mut self,
wb: &mut WriteBuf<'_>,
empty: &mut bool,
metadata: M,
) -> Result<bool, Error>
where
M: Metadata,
{
let accessor = self.invoker.exchange().accessor()?;
if let Some(event_reqs) = self.req.event_requests()? {
wb.start_array(&TLVTag::Context(ReportDataRespTag::EventReports as _))?;
for event_req in event_reqs.iter() {
let path = event_req?;
if !path.is_wildcard() {
if let Err(status) =
metadata.access(|node| node.validate_event_path(&path, &accessor))
{
if matches!(status, IMStatusCode::UnsupportedEvent) {
continue;
}
*empty = false;
let resp = EventResp::Status(EventStatus::new(path, status, None));
let mut result = resp.to_tlv(&TLVTag::Anonymous, &mut *wb);
if let Err(e) = &result {
if e.code() == ErrorCode::NoSpace {
debug!("<<< No TX space, chunking >>>");
if !self
.send(ReportDataChunkState::ChunkingEvents, false, &mut *wb)
.await?
{
return Ok(false);
}
result = resp.to_tlv(&TLVTag::Anonymous, &mut *wb);
}
}
result?;
}
}
}
let event_filters = self.req.event_filters()?;
loop {
let finished = self.events.fetch(|events| {
metadata.access(|node| {
for event in events {
let result = self.event_reader.process_read(
event,
&event_reqs,
&event_filters,
node,
&accessor,
&mut *wb,
);
if let Err(e) = &result {
if e.code() == ErrorCode::NoSpace {
return Ok::<_, Error>(false);
}
}
if result? {
*empty = false;
}
}
Ok(true)
})
})?;
if finished {
break;
}
debug!("<<< No TX space, chunking >>>");
if !self
.send(ReportDataChunkState::ChunkingEvents, false, wb)
.await?
{
return Ok(false);
}
}
wb.end_container()?;
}
Ok(true)
}
async fn send_array_items(
&mut self,
attr: &AttrDetails,
wb: &mut WriteBuf<'_>,
) -> Result<bool, Error> {
let mut attr = attr.clone();
let mut list_index = None;
attr.list_chunked = true;
attr.list_index = Some(Nullable::new(list_index));
loop {
let pos = wb.get_tail();
let result = self.invoker.read(&attr, &mut *wb).await;
if result.is_err() {
wb.rewind_to(pos);
}
match result {
Ok(()) => {
let new_list_index = if let Some(list_index) = list_index {
list_index + 1
} else {
0
};
list_index = Some(new_list_index);
attr.list_index = Some(Nullable::some(new_list_index));
}
Err(err) if err.code() == ErrorCode::NoSpace => {
debug!("<<< No TX space, chunking >>>");
if !self
.send(ReportDataChunkState::ChunkingAttributes, false, wb)
.await?
{
return Ok(false);
}
}
Err(err) if err.code() == ErrorCode::ConstraintError => break, Err(err) => Err(err)?,
}
}
Ok(true)
}
async fn send(
&mut self,
state: ReportDataChunkState,
suppress_last_resp: bool,
wb: &mut WriteBuf<'_>,
) -> Result<bool, Error> {
self.end_reply(state, suppress_last_resp, wb)?;
self.invoker
.exchange()
.send(OpCode::ReportData, wb.as_slice())
.await?;
let cont = match state {
ReportDataChunkState::ChunkingAttributes => {
let cont = self.recv_status_success().await?;
self.start_reply(wb)?;
wb.start_array(&TLVTag::Context(ReportDataRespTag::AttributeReports as u8))?;
cont
}
ReportDataChunkState::ChunkingEvents => {
let cont = self.recv_status_success().await?;
self.start_reply(wb)?;
wb.start_array(&TLVTag::Context(ReportDataRespTag::EventReports as u8))?;
cont
}
ReportDataChunkState::Done => {
if !suppress_last_resp {
self.recv_status_success().await?
} else {
false
}
}
};
Ok(cont)
}
async fn recv_status_success(&mut self) -> Result<bool, Error> {
let rx = self.invoker.exchange().recv().await?;
let opcode = rx.meta().proto_opcode;
if opcode != OpCode::StatusResponse as u8 {
warn!(
"Got opcode {:02x}, while expecting status code {:02x}",
opcode,
OpCode::StatusResponse as u8
);
return Err(ErrorCode::Invalid.into());
}
let resp = StatusResp::from_tlv(&get_root_node_struct(rx.payload())?)?;
if resp.status == IMStatusCode::Success {
Ok(true)
} else {
warn!(
"Got status response {:?}, aborting interaction",
resp.status
);
drop(rx);
self.invoker.exchange().acknowledge().await?;
Ok(false)
}
}
fn start_reply(&self, wb: &mut WriteBuf<'_>) -> Result<(), Error> {
wb.reset();
wb.shrink(Self::LONG_READS_TLV_RESERVE_SIZE)?;
wb.start_struct(&TLVTag::Anonymous)?;
if let Some(subscription_id) = self.subscription_id {
assert!(matches!(
self.req,
ReportDataReq::Subscribe(_) | ReportDataReq::SubscribeReport(_)
));
wb.u32(
&TLVTag::Context(ReportDataRespTag::SubscriptionId as u8),
subscription_id,
)?;
} else {
assert!(matches!(self.req, ReportDataReq::Read(_)));
}
Ok(())
}
fn end_reply(
&self,
state: ReportDataChunkState,
suppress_resp: bool,
wb: &mut WriteBuf<'_>,
) -> Result<(), Error> {
wb.expand(Self::LONG_READS_TLV_RESERVE_SIZE)?;
match state {
ReportDataChunkState::ChunkingAttributes | ReportDataChunkState::ChunkingEvents => {
wb.end_container()?;
wb.bool(
&TLVTag::Context(ReportDataRespTag::MoreChunkedMsgs as u8),
true,
)?;
}
ReportDataChunkState::Done => {
if suppress_resp {
wb.bool(
&TLVTag::Context(ReportDataRespTag::SupressResponse as u8),
true,
)?;
}
}
};
wb.u8(
&TLVTag::Context(crate::im::encoding::IM_REVISION_TAG),
IM_REVISION,
)?;
wb.end_container()?;
Ok(())
}
}
#[derive(Clone, Copy)]
enum ReportDataChunkState {
ChunkingAttributes,
ChunkingEvents,
Done,
}
struct WriteResponder<'a, 'b, 'c, C> {
req: &'a WriteReq<'a>,
invoker: HandlerInvoker<'b, 'c, C>,
}
impl<'a, 'b, 'c, C> WriteResponder<'a, 'b, 'c, C>
where
C: HandlerContext,
{
const fn new(req: &'a WriteReq<'a>, invoker: HandlerInvoker<'b, 'c, C>) -> Self {
Self { req, invoker }
}
async fn respond<M>(
&mut self,
wb: &mut WriteBuf<'_>,
metadata: M,
suppress_resp: bool,
) -> Result<(), Error>
where
M: Metadata,
{
let accessor = self.invoker.exchange().accessor()?;
wb.reset();
wb.start_struct(&TLVTag::Anonymous)?;
wb.start_array(&TLVTag::Context(WriteRespTag::WriteResponses as u8))?;
for item in expand_write(metadata, self.req, &accessor)? {
self.invoker.process_write(&item?, &mut *wb).await?;
}
if suppress_resp {
return Ok(());
}
wb.end_container()?;
wb.u8(
&TLVTag::Context(crate::im::encoding::IM_REVISION_TAG),
IM_REVISION,
)?;
wb.end_container()?;
self.invoker
.exchange()
.send(OpCode::WriteResponse, wb.as_slice())
.await
}
}
struct InvokeResponder<'a, 'b, 'c, C> {
req: &'a InvReq<'a>,
invoker: HandlerInvoker<'b, 'c, C>,
}
impl<'a, 'b, 'c, C> InvokeResponder<'a, 'b, 'c, C>
where
C: HandlerContext,
{
const fn new(req: &'a InvReq<'a>, invoker: HandlerInvoker<'b, 'c, C>) -> Self {
Self { req, invoker }
}
async fn respond<M>(
&mut self,
wb: &mut WriteBuf<'_>,
metadata: M,
suppress_resp: bool,
) -> Result<(), Error>
where
M: Metadata,
{
wb.reset();
wb.start_struct(&TLVTag::Anonymous)?;
wb.bool(
&TLVTag::Context(InvRespTag::SupressResponse as u8),
suppress_resp,
)?;
let has_requests = self.req.inv_requests()?.is_some();
if has_requests {
wb.start_array(&TLVTag::Context(InvRespTag::InvokeResponses as u8))?;
}
let accessor = self.invoker.exchange().accessor()?;
for item in expand_invoke(metadata, self.req, &accessor)? {
self.invoker.process_invoke(&item?, &mut *wb).await?;
}
if suppress_resp {
return Ok(());
}
if has_requests {
wb.end_container()?;
}
wb.u8(
&TLVTag::Context(crate::im::encoding::IM_REVISION_TAG),
IM_REVISION,
)?;
wb.end_container()?;
self.invoker
.exchange()
.send(OpCode::InvokeResponse, wb.as_slice())
.await?;
Ok(())
}
}