use core::fmt::{self, Display};
use core::future::Future;
use core::num::NonZeroU8;
use core::ops::{Deref, DerefMut};
use core::pin::pin;
use domain::base::name::ToLabelIter;
use embassy_futures::select::{select, select3, select4, Either};
use embassy_time::{Duration, Timer};
use rand_core::RngCore;
use crate::crypto::Crypto;
use crate::dm::clusters::basic_info::BasicInfoConfig;
use crate::dm::NodeId;
use crate::error::{Error, ErrorCode};
use crate::fabric::{MAX_FABRICS, MAX_GROUPS_PER_FABRIC};
use crate::fmt::Bytes;
use crate::sc::case::CaseInitiator;
use crate::sc::pase::PaseInitiator;
use crate::sc::{
sc_write, OpCode, SCStatusCodes, SessionParameters, StatusReport, PROTO_ID_SECURE_CHANNEL,
};
use crate::tlv::TLVElement;
use crate::transport::network::mdns::{
commissionable_instance_id, score_ip_address, BrowseExclude, CommissionableFilter,
MdnsBrowseState, MdnsRemoteService, MdnsResolveState, ResolvedNode,
};
use crate::transport::network::{MatterRemoteService, NetworkMulticast};
use crate::utils::init::{init, Init};
use crate::utils::ipv6::compute_group_multicast_addr;
use crate::utils::select::Coalesce;
use crate::utils::storage::Vec;
use crate::utils::storage::{pooled::Buffers, ParseBuf, WriteBuf};
use crate::utils::sync::{IfMutex, IfMutexGuard, Notification, Signal};
use crate::{Matter, MATTER_PORT};
use exchange::{Exchange, ExchangeId, ExchangeState, MessageMeta, ResponderState, Role};
use network::{Address, IpAddr, Ipv6Addr, NetworkReceive, NetworkSend, SocketAddr, SocketAddrV6};
use packet::PacketHdr;
use proto_hdr::ProtoHdr;
use session::{Session, Sessions};
mod dedup;
pub mod exchange;
pub mod mrp;
pub mod network;
pub mod packet;
pub mod plain_hdr;
pub mod proto_hdr;
pub mod session;
pub const MATTER_SOCKET_BIND_ADDR: SocketAddr =
SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, MATTER_PORT, 0, 0));
const MAX_GROUP_ADDRS: usize = MAX_FABRICS * MAX_GROUPS_PER_FABRIC;
const ACCEPT_TIMEOUT_MS: u64 = 1000;
#[cfg(feature = "large-buffers")]
pub(crate) const MAX_RX_BUF_SIZE: usize = network::MAX_RX_LARGE_PACKET_SIZE;
#[cfg(feature = "large-buffers")]
pub(crate) const MAX_TX_BUF_SIZE: usize = network::MAX_TX_LARGE_PACKET_SIZE;
#[cfg(not(feature = "large-buffers"))]
pub(crate) const MAX_RX_BUF_SIZE: usize = network::MAX_RX_PACKET_SIZE;
#[cfg(not(feature = "large-buffers"))]
pub(crate) const MAX_TX_BUF_SIZE: usize = network::MAX_TX_PACKET_SIZE;
pub const MAX_RX_PAYLOAD_SIZE: usize =
MAX_RX_BUF_SIZE - PacketHdr::HDR_RESERVE - PacketHdr::TAIL_RESERVE;
pub const MAX_TX_PAYLOAD_SIZE: usize =
MAX_TX_BUF_SIZE - PacketHdr::HDR_RESERVE - PacketHdr::TAIL_RESERVE;
pub struct Transport {
rx: IfMutex<Packet<MAX_RX_BUF_SIZE>>,
tx: IfMutex<Packet<MAX_TX_BUF_SIZE>>,
group_addrs: IfMutex<Vec<Ipv6Addr, MAX_GROUP_ADDRS>>,
exchange_dropped: Notification,
mdns_changed: Notification,
mdns_resolve: Signal<MdnsResolveState>,
mdns_browse: Signal<MdnsBrowseState>,
session_removed: Notification,
groups_modified: Notification,
device_sai: Option<u32>,
device_sii: Option<u32>,
}
impl Transport {
#[inline(always)]
pub(crate) const fn new(dev_det: &BasicInfoConfig<'_>) -> Self {
Self {
rx: IfMutex::new(Packet::new()),
tx: IfMutex::new(Packet::new()),
group_addrs: IfMutex::new(Vec::new()),
exchange_dropped: Notification::new(),
mdns_changed: Notification::new(),
mdns_resolve: Signal::new(MdnsResolveState::Idle),
mdns_browse: Signal::new(MdnsBrowseState::Idle),
session_removed: Notification::new(),
groups_modified: Notification::new(),
device_sai: dev_det.sai,
device_sii: dev_det.sii,
}
}
pub(crate) fn init<'m>(dev_det: &'m BasicInfoConfig<'m>) -> impl Init<Self> + 'm {
init!(Self {
rx <- IfMutex::init(Packet::init()),
tx <- IfMutex::init(Packet::init()),
group_addrs <- IfMutex::init(Vec::new()),
exchange_dropped: Notification::new(),
mdns_changed: Notification::new(),
mdns_resolve: Signal::new(MdnsResolveState::Idle),
mdns_browse: Signal::new(MdnsBrowseState::Idle),
session_removed: Notification::new(),
groups_modified: Notification::new(),
device_sai: dev_det.sai,
device_sii: dev_det.sii,
})
}
pub fn reset(&self) -> Result<(), Error> {
self.rx
.try_lock()
.map_err(|_| ErrorCode::InvalidState)?
.buf
.clear();
self.tx
.try_lock()
.map_err(|_| ErrorCode::InvalidState)?
.buf
.clear();
Ok(())
}
pub fn rx_buffer(&self) -> PacketBufferExternalAccess<'_, MAX_RX_BUF_SIZE> {
PacketBufferExternalAccess(&self.rx)
}
pub fn tx_buffer(&self) -> PacketBufferExternalAccess<'_, MAX_TX_BUF_SIZE> {
PacketBufferExternalAccess(&self.tx)
}
pub(crate) fn notify_mdns_changed(&self) {
self.mdns_changed.notify();
}
pub fn wait_mdns(&self) -> impl Future<Output = ()> + '_ {
self.mdns_changed.wait()
}
pub(crate) fn notify_groups_changed(&self) {
self.groups_modified.notify();
}
fn wait_groups_changed(&self) -> impl Future<Output = ()> + '_ {
self.groups_modified.wait()
}
pub(crate) fn notify_session_removed(&self) {
self.session_removed.notify();
}
pub(crate) fn wait_session_removed(&self) -> impl Future<Output = ()> + '_ {
self.session_removed.wait()
}
async fn resolve(
&self,
service: MatterRemoteService,
timeout_ms: u32,
) -> Result<ResolvedNode, Error> {
self.mdns_resolve
.wait(|state| {
if matches!(state, MdnsResolveState::Idle) {
*state = MdnsResolveState::Requested {
service: service.clone(),
};
Some(())
} else {
None
}
})
.await;
let mut guard = MdnsResolveGuard {
signal: &self.mdns_resolve,
armed: true,
};
let mut wait = pin!(self.mdns_resolve.wait(|state| match state {
MdnsResolveState::Resolved {
ip,
port,
scope_id,
sii,
sai,
sat,
} => {
let node = ResolvedNode {
addr: Self::scoped_socket_addr(*ip, *port, *scope_id),
sii: *sii,
sai: *sai,
sat: *sat,
};
*state = MdnsResolveState::Idle;
Some(node)
}
_ => None,
}));
let mut timer = pin!(Timer::after(Duration::from_millis(timeout_ms as u64)));
match select(&mut wait, &mut timer).await {
Either::First(node) => {
guard.armed = false;
Ok(node)
}
Either::Second(_) => Err(ErrorCode::NotFound.into()),
}
}
pub async fn wait_mdns_resolve_request(&self) -> MatterRemoteService {
self.mdns_resolve
.wait(|state| match state {
MdnsResolveState::Requested { service } => {
let service = service.clone();
*state = MdnsResolveState::InFlight {
service: service.clone(),
};
Some(service)
}
_ => None,
})
.await
}
#[allow(dead_code)]
pub fn mdns_resolve_in_flight(&self) -> bool {
self.mdns_resolve
.modify(|state| (false, matches!(state, MdnsResolveState::InFlight { .. })))
}
pub fn try_deposit_mdns_resolve<'a, I, A, T>(&self, answer: &MdnsRemoteService<I, A, T>)
where
I: ToLabelIter,
A: Iterator<Item = IpAddr> + Clone,
T: Iterator<Item = (&'a str, &'a str)> + Clone,
{
let Some(ip) = answer.addrs.clone().max_by_key(score_ip_address) else {
return;
};
let Some(port) = answer.port else {
return;
};
let scope_id = answer.scope_id;
let (sii, sai, sat) = answer.session_params();
self.mdns_resolve.modify(|state| match state {
MdnsResolveState::InFlight { service }
if service.matches_instance(&answer.instance_name) =>
{
*state = MdnsResolveState::Resolved {
ip,
port,
scope_id,
sii,
sai,
sat,
};
(true, ())
}
MdnsResolveState::Resolved {
ip: cur_ip,
sii: cur_sii,
sai: cur_sai,
sat: cur_sat,
..
} if score_ip_address(&ip) > score_ip_address(cur_ip) => {
*state = MdnsResolveState::Resolved {
ip,
port,
scope_id,
sii: sii.or(*cur_sii),
sai: sai.or(*cur_sai),
sat: sat.or(*cur_sat),
};
(true, ())
}
_ => (false, ()),
});
}
pub async fn browse_commissionable(
&self,
filter: &CommissionableFilter,
exclude: &[u64],
timeout_ms: u32,
) -> Result<(Address, u64), Error> {
let mut exclude_vec = BrowseExclude::new();
exclude_vec
.extend_from_slice(exclude)
.map_err(|_| ErrorCode::ResourceExhausted)?;
self.mdns_browse
.wait(|state| {
if matches!(state, MdnsBrowseState::Idle) {
*state = MdnsBrowseState::Requested {
filter: filter.clone(),
exclude: exclude_vec.clone(),
};
Some(())
} else {
None
}
})
.await;
let mut guard = MdnsBrowseGuard {
signal: &self.mdns_browse,
armed: true,
};
let mut wait = pin!(self.mdns_browse.wait(|state| match state {
MdnsBrowseState::Found {
ip,
port,
scope_id,
id,
} => {
let found = (
Address::Udp(Self::scoped_socket_addr(*ip, *port, *scope_id)),
*id,
);
*state = MdnsBrowseState::Idle;
Some(found)
}
_ => None,
}));
let mut timer = pin!(Timer::after(Duration::from_millis(timeout_ms as u64)));
match select(&mut wait, &mut timer).await {
Either::First(found) => {
guard.armed = false;
Ok(found)
}
Either::Second(_) => Err(ErrorCode::NotFound.into()),
}
}
pub async fn wait_mdns_browse_request(&self) -> CommissionableFilter {
self.mdns_browse
.wait(|state| match state {
MdnsBrowseState::Requested { filter, exclude } => {
let filter = filter.clone();
*state = MdnsBrowseState::InFlight {
filter: filter.clone(),
exclude: core::mem::take(exclude),
};
Some(filter)
}
_ => None,
})
.await
}
#[allow(dead_code)]
pub fn mdns_browse_in_flight(&self) -> bool {
self.mdns_browse
.modify(|state| (false, matches!(state, MdnsBrowseState::InFlight { .. })))
}
pub fn try_deposit_mdns_browse<'a, I, A, T>(&self, answer: &MdnsRemoteService<I, A, T>)
where
I: ToLabelIter,
A: Iterator<Item = IpAddr> + Clone,
T: Iterator<Item = (&'a str, &'a str)> + Clone,
{
let Some(id) = commissionable_instance_id(&answer.instance_name) else {
return;
};
let Some(ip) = answer.addrs.clone().max_by_key(score_ip_address) else {
return;
};
let Some(port) = answer.port else {
return;
};
let scope_id = answer.scope_id;
self.mdns_browse.modify(|state| match state {
MdnsBrowseState::InFlight { filter, exclude }
if !exclude.contains(&id) && filter.matches(answer) =>
{
*state = MdnsBrowseState::Found {
ip,
port,
scope_id,
id,
};
(true, ())
}
MdnsBrowseState::Found {
ip: cur_ip,
id: cur_id,
..
} if *cur_id == id && score_ip_address(&ip) > score_ip_address(cur_ip) => {
*state = MdnsBrowseState::Found {
ip,
port,
scope_id,
id,
};
(true, ())
}
_ => (false, ()),
});
}
pub(crate) async fn accept_if<'a, F>(
&self,
matter: &'a Matter<'a>,
mut f: F,
) -> Result<Exchange<'a>, Error>
where
F: FnMut(&Session, &ExchangeState, &Packet<MAX_RX_BUF_SIZE>) -> bool,
{
let exchange = self
.rx
.with(|packet| {
matter.with_state(|state| {
let session = state
.sessions
.get_for_rx(&packet.peer, &packet.header.plain)?;
let exch_index = session.get_exch_for_rx(&packet.header.proto)?;
let matches = {
let exch = unwrap!(session.exchanges[exch_index].as_ref());
matches!(exch.role, Role::Responder(ResponderState::AcceptPending))
&& f(session, exch, packet)
};
if !matches {
return None;
}
let exch = unwrap!(session.exchanges[exch_index].as_mut());
exch.role = Role::Responder(ResponderState::Owned);
let id = ExchangeId::new(session.id, exch_index);
debug!("Exchange {}: Accepted", id.display(session));
let exchange = Exchange::new(id, matter);
Some(exchange)
})
})
.await;
Ok(exchange)
}
const RESOLVE_TIMEOUT_MS: u32 = 5_000;
pub(crate) async fn initiate<'a, C: Crypto>(
&self,
matter: &'a Matter<'a>,
crypto: C,
fabric_idx: NonZeroU8,
peer_node_id: NodeId,
) -> Result<Exchange<'a>, Error> {
let existing = matter.with_state(|state| {
Ok::<_, Error>(
state
.sessions
.get_for_node(fabric_idx, peer_node_id)
.map(|s| s.id),
)
})?;
if let Some(session_id) = existing {
return self.initiate_for_session(matter, session_id);
}
let compressed_fabric_id = matter.with_state(|state| {
Ok::<_, Error>(state.fabrics.fabric(fabric_idx)?.compressed_fabric_id())
})?;
let service = MatterRemoteService::Operational {
compressed_fabric_id,
node_id: peer_node_id,
};
let resolved = self.resolve(service, Self::RESOLVE_TIMEOUT_MS).await?;
{
let mut exchange = self
.initiate_plaintext(matter, &crypto, Address::Udp(resolved.addr))
.await?;
CaseInitiator::initiate(&mut exchange, &crypto, fabric_idx, peer_node_id).await?;
}
let params = SessionParameters {
sii: resolved.sii,
sai: resolved.sai,
sat: resolved.sat,
..Default::default()
};
let session_id = matter.with_state(|state| {
let session = state
.sessions
.get_for_node(fabric_idx, peer_node_id)
.ok_or(ErrorCode::NoSession)?;
session.set_peer_session_params(¶ms);
Ok::<_, Error>(session.id)
})?;
self.initiate_for_session(matter, session_id)
}
pub(crate) async fn initiate_pase<'a, C: Crypto>(
&self,
matter: &'a Matter<'a>,
crypto: C,
peer_addr: Address,
passcode: u32,
) -> Result<Exchange<'a>, Error> {
let existing = matter.with_state(|state| {
Ok::<_, Error>(state.sessions.get_pase_for_addr(&peer_addr).map(|s| s.id))
})?;
if let Some(session_id) = existing {
return self.initiate_for_session(matter, session_id);
}
{
let mut handshake = self.initiate_plaintext(matter, &crypto, peer_addr).await?;
PaseInitiator::initiate(&mut handshake, &crypto, passcode).await?;
}
let session_id = matter.with_state(|state| {
state
.sessions
.get_pase_for_addr(&peer_addr)
.map(|s| s.id)
.ok_or_else(|| Error::from(ErrorCode::NoSession))
})?;
self.initiate_for_session(matter, session_id)
}
pub(crate) fn initiate_for_session<'a>(
&self,
matter: &'a Matter<'a>,
session_id: u32,
) -> Result<Exchange<'a>, Error> {
matter.with_state(|state| {
state
.sessions
.get(session_id)
.filter(|sess| !sess.is_expired())
.ok_or(ErrorCode::NoSession)?;
let exch_id = state.sessions.get_next_exch_id();
let session = unwrap!(state.sessions.get(session_id));
let exch_index = session
.add_exch(exch_id, Role::Initiator(Default::default()))
.ok_or(ErrorCode::NoSpaceExchanges)?;
let id = ExchangeId::new(session.id, exch_index);
debug!("Exchange {}: Initiated", id.display(session));
Ok(Exchange::new(id, matter))
})
}
async fn initiate_plaintext<'a, C: Crypto>(
&self,
matter: &'a Matter<'a>,
crypto: C,
peer_addr: Address,
) -> Result<Exchange<'a>, Error> {
match self.try_initiate_plaintext(matter, &crypto, peer_addr) {
Ok(exchange) => Ok(exchange),
Err(e) if e.code() == ErrorCode::NoSpaceSessions => {
matter
.transport_runner(&crypto)
.evict_some_session()
.await?;
self.try_initiate_plaintext(matter, &crypto, peer_addr)
}
Err(e) => Err(e),
}
}
fn try_initiate_plaintext<'a, C: Crypto>(
&self,
matter: &'a Matter<'a>,
crypto: C,
peer_addr: Address,
) -> Result<Exchange<'a>, Error> {
let session_id = self.create_plaintext_session(matter, crypto, peer_addr)?;
self.initiate_for_session(matter, session_id)
}
fn create_plaintext_session<C: Crypto>(
&self,
matter: &Matter<'_>,
crypto: C,
peer_addr: Address,
) -> Result<u32, Error> {
matter.with_state(|state| {
let mut rand = crypto.rand()?;
let session =
state
.sessions
.add(rand.next_u32(), false, peer_addr, None, matter.dev_det())?;
const MAX_OPERATIONAL_NODE_ID: u64 = 0xFFFF_FFEF_FFFF_FFFF;
let mut ephemeral_id = rand.next_u64();
while ephemeral_id == 0 || ephemeral_id > MAX_OPERATIONAL_NODE_ID {
ephemeral_id = rand.next_u64();
}
session.set_local_nodeid(ephemeral_id);
let session_id = session.id;
debug!(
"Unsecured session {} created for peer {}",
session_id, peer_addr
);
Ok(session_id)
})
}
pub(crate) async fn get_if_rx<F>(&self, f: F) -> PacketAccess<'_, MAX_RX_BUF_SIZE>
where
F: Fn(&Packet<MAX_RX_BUF_SIZE>) -> bool,
{
Self::get_if(&self.rx, f).await
}
pub(crate) async fn get_if_tx<F>(&self, f: F) -> PacketAccess<'_, MAX_TX_BUF_SIZE>
where
F: Fn(&Packet<MAX_TX_BUF_SIZE>) -> bool,
{
Self::get_if(&self.tx, f).await
}
async fn get_if<'b, F, const N: usize>(
packet_mutex: &'b IfMutex<Packet<N>>,
f: F,
) -> PacketAccess<'b, N>
where
F: Fn(&Packet<N>) -> bool,
{
PacketAccess(packet_mutex.lock_if(f).await, false)
}
fn scoped_socket_addr(ip: IpAddr, port: u16, scope_id: u32) -> SocketAddr {
match ip {
IpAddr::V6(v6) if v6.is_unicast_link_local() => {
SocketAddr::V6(SocketAddrV6::new(v6, port, 0, scope_id))
}
_ => SocketAddr::new(ip, port),
}
}
}
struct MdnsResolveGuard<'a> {
signal: &'a Signal<MdnsResolveState>,
armed: bool,
}
impl Drop for MdnsResolveGuard<'_> {
fn drop(&mut self) {
if self.armed {
self.signal.modify(|state| {
if matches!(state, MdnsResolveState::Idle) {
(false, ())
} else {
*state = MdnsResolveState::Idle;
(true, ())
}
});
}
}
}
struct MdnsBrowseGuard<'a> {
signal: &'a Signal<MdnsBrowseState>,
armed: bool,
}
impl Drop for MdnsBrowseGuard<'_> {
fn drop(&mut self) {
if self.armed {
self.signal.modify(|state| {
if matches!(state, MdnsBrowseState::Idle) {
(false, ())
} else {
*state = MdnsBrowseState::Idle;
(true, ())
}
});
}
}
}
pub struct TransportRunner<'a, C> {
matter: &'a Matter<'a>,
crypto: C,
}
impl<'a, C: Crypto> TransportRunner<'a, C> {
pub const fn new(matter: &'a Matter<'a>, crypto: C) -> Self {
Self { matter, crypto }
}
pub async fn run<S, R, M>(&mut self, send: S, recv: R, multicast: M) -> Result<(), Error>
where
S: NetworkSend,
R: NetworkReceive,
M: NetworkMulticast,
{
info!("Running Matter transport");
debug!("APP STATUS: Starting event loop");
let mut joined = self.transport().group_addrs.lock().await;
let send = IfMutex::new(send);
let mut rx = pin!(self.process_rx(recv, &send));
let mut tx = pin!(self.process_tx(&send));
let mut orphaned = pin!(self.process_orphaned());
let mut groups = pin!(self.process_groups(multicast, &mut joined));
select4(&mut rx, &mut tx, &mut orphaned, &mut groups)
.coalesce()
.await
}
async fn process_groups<M>(
&self,
mut multicast: M,
joined: &mut Vec<Ipv6Addr, MAX_GROUP_ADDRS>,
) -> Result<(), Error>
where
M: NetworkMulticast,
{
joined.clear();
loop {
let addr_op = self.matter.with_state(|state| {
let group_addrs = || {
state.fabrics.iter().flat_map(|fabric| {
fabric.groups().iter().map(|group| {
compute_group_multicast_addr(fabric.fabric_id(), group.group_id)
})
})
};
if let Some(new_addr) = group_addrs().find(|addr| !joined.contains(addr)) {
Some((new_addr, true))
} else {
joined
.iter()
.find(|addr| !group_addrs().any(|a| a == **addr))
.map(|&removed_addr| (removed_addr, false))
}
});
match addr_op {
Some((new_addr, true)) => {
match multicast.join(new_addr.into()).await {
Ok(_) => {
debug!("Joined multicast group: {}", new_addr);
unwrap!(joined.push(new_addr));
}
Err(e) => error!(
"Joining multicast group {} failed with error: {}",
new_addr, e
),
}
}
Some((removed_addr, false)) => match multicast.leave(removed_addr.into()).await {
Ok(_) => {
debug!("Left multicast group: {}", removed_addr);
let index = joined
.iter()
.position(|&addr| addr == removed_addr)
.unwrap();
joined.swap_remove(index);
}
Err(e) => error!(
"Leaving multicast group {} failed with error: {}",
removed_addr, e
),
},
None => {
self.transport().wait_groups_changed().await;
}
}
}
}
async fn process_tx<S>(&self, send: &IfMutex<S>) -> Result<(), Error>
where
S: NetworkSend,
{
loop {
trace!("Waiting for outgoing packet");
let mut tx = self
.matter
.transport
.get_if_tx(|packet| !packet.buf.is_empty())
.await;
tx.clear_on_drop(true);
if let TxPayloadState::NotEncoded { session_id } = tx.tx_info.payload_state {
let encoded = self.matter.with_state(|state| {
if let Some(session) = state.sessions.get_for_tx(session_id) {
self.encode_packet(&mut tx, Some(session))?;
Ok::<_, Error>(true)
} else {
error!(
"TX packet has session ID {}, but no such session exists, dropping",
session_id
);
Ok(false)
}
})?;
if !encoded {
continue;
}
}
Self::netw_send(send, tx.peer, &tx.buf[tx.payload_start..], false).await?;
}
}
async fn process_rx<R, S>(&self, mut recv: R, send: &IfMutex<S>) -> Result<(), Error>
where
R: NetworkReceive,
S: NetworkSend,
{
loop {
trace!("Waiting for incoming packet");
recv.wait_available().await?;
let mut rx = self
.matter
.transport
.get_if_rx(|packet| packet.buf.is_empty())
.await;
rx.clear_on_drop(true);
unwrap!(rx.buf.resize_default(MAX_RX_BUF_SIZE));
let (len, peer) = Self::netw_recv(&mut recv, &mut rx.buf).await?;
rx.peer = peer;
rx.buf.truncate(len);
rx.payload_start = 0;
match self.handle_rx_packet(&mut rx, send).await {
Ok(true) => {
rx.clear_on_drop(false);
}
Ok(false) => {
}
Err(e) => {
error!("UNEXPECTED RX ERROR: {:?}", e);
}
}
}
}
async fn process_orphaned(&self) -> Result<(), Error> {
let mut rx_accept_timeout = pin!(self.process_accept_timeout_rx());
let mut rx_orphaned = pin!(self.process_orphaned_rx());
let mut exch_dropped = pin!(self.process_dropped_exchanges());
select3(&mut rx_accept_timeout, &mut rx_orphaned, &mut exch_dropped)
.coalesce()
.await
}
async fn process_accept_timeout_rx(&self) -> Result<(), Error> {
loop {
trace!("Waiting for accept timeout");
let mut accept_timeout = pin!(self
.matter
.transport
.rx
.with(|packet| { self.handle_accept_timeout_rx_packet(packet).then_some(()) }));
let mut timer = pin!(Timer::after(embassy_time::Duration::from_millis(50)));
select(&mut accept_timeout, &mut timer).await;
}
}
async fn process_orphaned_rx(&self) -> Result<(), Error> {
loop {
trace!("Waiting for orphaned RX packets");
self.transport()
.rx
.with(|packet| self.handle_orphaned_rx_packet(packet).then_some(()))
.await;
}
}
async fn process_dropped_exchanges(&self) -> Result<(), Error> {
loop {
trace!("Waiting for dropped exchanges");
let mut tx = self
.matter
.transport
.get_if_tx(|packet| packet.buf.is_empty())
.await;
tx.clear_on_drop(true);
let wait = match self.handle_dropped_exchange(&mut tx) {
Ok(wait) => {
tx.clear_on_drop(false);
wait
}
Err(e) => {
error!("UNEXPECTED RX ERROR: {:?}", e);
false
}
};
drop(tx);
if wait {
let mut timeout = pin!(Timer::after(embassy_time::Duration::from_millis(100)));
let mut wait = pin!(self.transport().exchange_dropped.wait());
select(&mut timeout, &mut wait).await;
}
}
}
async fn handle_rx_packet<const N: usize, S>(
&self,
packet: &mut Packet<N>,
send: &IfMutex<S>,
) -> Result<bool, Error>
where
S: NetworkSend,
{
let result = self.decode_packet(packet);
match result {
Err(e) if matches!(e.code(), ErrorCode::Duplicate) => {
if packet.header.plain.is_group_session() {
debug!(
"\n>>RCV {}\n => Duplicate group message, discarding",
packet
);
} else if !packet.peer.is_reliable()
&& !MessageMeta::from(&packet.header.proto).is_standalone_ack()
{
debug!("\n>>RCV {}\n => Duplicate, sending ACK", packet);
self.matter.with_state(|state| {
let session = unwrap!(state
.sessions
.get_for_rx(&packet.peer, &packet.header.plain));
let ack = packet.header.plain.ctr;
packet.header.proto.toggle_initiator();
packet.header.proto.set_ack(Some(ack));
self.write_packet(packet, Some(session), None, true, |_| {
Ok(Some(OpCode::MRPStandAloneAck.into()))
})
})?;
Self::netw_send(send, packet.peer, &packet.buf[packet.payload_start..], true)
.await?;
} else {
debug!("\n>>RCV {}\n => Duplicate, discarding", packet);
}
}
Err(e) if matches!(e.code(), ErrorCode::NoSpaceSessions) => {
if !packet.header.plain.is_encrypted()
&& MessageMeta::from(&packet.header.proto).is_new_session()
{
warn!(
"\n>>RCV {}\n => No space for a new unencrypted session, sending Busy",
packet
);
let ack = packet.header.plain.ctr;
packet.header.proto.toggle_initiator();
packet.header.proto.set_ack(Some(ack));
self.write_packet(packet, None, None, true, |wb| {
sc_write(wb, SCStatusCodes::Busy, &[0xF4, 0x01])
})?;
Self::netw_send(send, packet.peer, &packet.buf[packet.payload_start..], true)
.await?;
if self.write_evict_some_session_packet(packet, true)? {
Self::netw_send(
send,
packet.peer,
&packet.buf[packet.payload_start..],
true,
)
.await?;
}
} else {
error!(
"\n>>RCV {}\n => No space for a new encrypted session, dropping",
packet
);
}
}
Err(e) if matches!(e.code(), ErrorCode::NoSpaceExchanges) => {
error!(
"\n>>RCV {}\n => No space for a new exchange, closing session",
packet
);
self.matter.with_state(|state| {
let session_id = unwrap!(state
.sessions
.get_for_rx(&packet.peer, &packet.header.plain))
.id;
packet.header.proto.exch_id = state.sessions.get_next_exch_id();
packet.header.proto.set_initiator();
let mut session = unwrap!(state.sessions.remove(session_id));
self.transport().notify_session_removed();
self.write_packet(packet, Some(&mut session), None, true, |wb| {
sc_write(wb, SCStatusCodes::CloseSession, &[])
})
})?;
Self::netw_send(send, packet.peer, &packet.buf[packet.payload_start..], true)
.await?;
}
Err(e) if matches!(e.code(), ErrorCode::NoExchange) => {
warn!(
"\n>>RCV {}\n => No valid exchange found, dropping",
packet
);
}
Err(e) if matches!(e.code(), ErrorCode::NoSession) => {
warn!(
"\n>>RCV {}\n => No valid session found, replying with SessionNotFound",
packet
);
packet.header.plain.sess_id = 0;
packet.header.plain.set_src_nodeid(Some(0));
packet.header.proto.unset_reliable();
packet.header.proto.set_ack(None);
self.write_packet(packet, None, None, true, |wb| {
sc_write(wb, SCStatusCodes::SessionNotFound, &[])
})?;
Self::netw_send(send, packet.peer, &packet.buf[packet.payload_start..], true)
.await?;
}
Err(e) => {
error!("\n>>RCV {}\n => Error ({:?}), dropping", packet, e);
}
Ok(new_exchange) => {
let meta = MessageMeta::from(&packet.header.proto);
if meta.is_standalone_ack() {
debug!("\n>>RCV {}\n => Standalone Ack, dropping", packet);
} else if meta.is_sc_status()
&& matches!(
Self::is_close_session(&mut packet.buf[packet.payload_start..]),
Ok(true)
)
{
warn!(
"\n>>RCV {}\n => Close session received, removing this session",
packet
);
self.matter.with_state(|state| {
if let Some(session_id) = state
.sessions
.get_for_rx(&packet.peer, &packet.header.plain)
.map(|sess| sess.id)
{
state.sessions.remove(session_id);
self.transport().notify_session_removed();
}
});
} else {
debug!(
"\n>>RCV {}\n => Processing{}",
packet,
if new_exchange { " (new exchange)" } else { "" }
);
#[cfg(feature = "debug-tlv-payload")]
debug!(
"{}",
Packet::<0>::display_payload(
&packet.header.proto,
&packet.buf[core::cmp::min(packet.payload_start, packet.buf.len())..]
)
);
#[cfg(not(feature = "debug-tlv-payload"))]
trace!(
"{}",
Packet::<0>::display_payload(
&packet.header.proto,
&packet.buf[core::cmp::min(packet.payload_start, packet.buf.len())..]
)
);
return Ok(true);
}
}
}
Ok(false)
}
fn handle_accept_timeout_rx_packet<const N: usize>(&self, packet: &mut Packet<N>) -> bool {
if packet.buf.is_empty() {
return false;
}
self.matter.with_state(|state| {
let Some(session) = state
.sessions
.get_for_rx(&packet.peer, &packet.header.plain)
else {
return false;
};
let Some(exch_index) = session.get_exch_for_rx(&packet.header.proto) else {
return false;
};
let exchange = unwrap!(session.exchanges[exch_index].as_mut());
if !matches!(
exchange.role,
Role::Responder(ResponderState::AcceptPending)
) || !exchange.mrp.has_rx_timed_out(ACCEPT_TIMEOUT_MS)
{
return false;
}
warn!(
"\n>>RCV {}\n => Accept timeout, marking exchange as dropped",
packet
);
exchange.role = Role::Responder(ResponderState::Dropped);
packet.buf.clear();
self.transport().exchange_dropped.notify();
true
})
}
fn handle_orphaned_rx_packet<const N: usize>(&self, packet: &mut Packet<N>) -> bool {
if packet.buf.is_empty() {
return false;
}
self.matter.with_state(|state| {
let Some(session) = state
.sessions
.get_for_rx(&packet.peer, &packet.header.plain)
else {
warn!("\n>>RCV {}\n => No session, dropping", packet);
packet.buf.clear();
return true;
};
let Some(exch_index) = session.get_exch_for_rx(&packet.header.proto) else {
warn!("\n>>RCV {}\n => No exchange, dropping", packet);
packet.buf.clear();
return true;
};
let exchange = unwrap!(session.exchanges[exch_index].as_mut());
if exchange.role.is_dropped_state() {
warn!(
"\n>>RCV {}\n => Owned by orphaned dropped {}, dropping packet",
packet,
ExchangeId::new(session.id, exch_index)
);
packet.buf.clear();
return true;
}
false
})
}
fn handle_dropped_exchange<const N: usize>(
&self,
packet: &mut Packet<N>,
) -> Result<bool, Error> {
self.matter.with_state(|state| {
let exch = state
.sessions
.get_exch(|_, exch| exch.role.is_dropped_state() && exch.mrp.is_retrans_pending())
.map(|(sess, exch_index)| (sess.id, exch_index, true))
.or_else(|| {
state
.sessions
.get_exch(|_, exch| {
exch.role.is_dropped_state() && !exch.mrp.is_retrans_pending()
})
.map(|(sess, exch_index)| (sess.id, exch_index, false))
});
let Some((session_id, exch_index, close_session)) = exch else {
return Ok(exch.is_none());
};
let exchange_id = ExchangeId::new(session_id, exch_index);
if close_session {
error!(
"Dropped exchange {}: Closing session because the exchange cannot be closed cleanly",
exchange_id.display(unwrap!(state.sessions.get(session_id))) );
self.write_evict_session_packet(packet, &mut state.sessions, session_id, false)?;
} else {
let session = unwrap!(state.sessions.get(session_id));
let exchange = unwrap!(session.exchanges[exch_index].as_mut());
if exchange.mrp.is_ack_pending() {
self.write_packet(
packet,
Some(session),
Some(exch_index),
false,
|_| Ok(Some(OpCode::MRPStandAloneAck.into())),
)?;
}
warn!("Dropped exchange {}: Closed", exchange_id.display(session));
session.exchanges[exch_index] = None;
}
Ok(exch.is_none())
})
}
pub(crate) async fn evict_some_session(&self) -> Result<(), Error> {
let mut tx = self
.matter
.transport
.get_if_tx(|packet| packet.buf.is_empty())
.await;
tx.clear_on_drop(true);
let evicted = self.write_evict_some_session_packet(&mut tx, true)?;
if evicted {
tx.clear_on_drop(false);
Ok(())
} else {
Err(ErrorCode::NoSpaceSessions.into())
}
}
fn decode_packet<const N: usize>(&self, packet: &mut Packet<N>) -> Result<bool, Error> {
self.matter.with_state(|state| {
packet.header.reset();
let mut pb = ParseBuf::new(&mut packet.buf[packet.payload_start..]);
packet.header.plain.decode(&mut pb)?;
let set_payload = |packet: &mut Packet<N>, (start, end)| {
packet.payload_start = start;
packet.buf.truncate(end);
};
if let Some(session) = state
.sessions
.get_for_rx(&packet.peer, &packet.header.plain)
{
let payload_range =
session.decode_remaining(&self.crypto, &mut packet.header, pb)?;
set_payload(packet, payload_range);
return session.post_recv(&packet.header);
}
if !packet.header.plain.is_encrypted() {
packet
.header
.decode_remaining(&self.crypto, None, 0, &mut pb)?;
packet.header.proto.adjust_reliability(true, &packet.peer);
let payload_range = pb.slice_range();
set_payload(packet, payload_range);
if MessageMeta::from(&packet.header.proto).is_new_session() {
let mut rand = self.crypto.rand()?;
let session = state.sessions.add(
rand.next_u32(),
false,
packet.peer,
packet.header.plain.get_src_nodeid(),
self.matter.dev_det(),
)?;
return session.post_recv(&packet.header);
}
} else if packet.header.plain.is_group_session() {
let (session, payload_range) = state.sessions.get_or_create_for_group_rx(
&self.crypto,
&state.fabrics,
packet,
self.matter.dev_det(),
)?;
set_payload(packet, payload_range);
return session.post_recv(&packet.header);
} else {
set_payload(packet, (0, 0));
}
Err(ErrorCode::NoSession.into())
})
}
fn encode_packet<const N: usize>(
&self,
packet: &mut Packet<N>,
session: Option<&mut Session>,
) -> Result<(), Error> {
assert!(matches!(
packet.tx_info.payload_state,
TxPayloadState::NotEncoded { .. }
));
let payload_end = packet.buf.len();
debug!(
"\n<<SND {}\n => {}",
Packet::<0>::display(&packet.peer, &packet.header),
if packet.tx_info.retransmission {
"Re-sending"
} else {
"Sending"
}
);
#[cfg(feature = "debug-tlv-payload")]
debug!(
"{}",
Packet::<0>::display_payload(
&packet.header.proto,
&packet.buf[packet.payload_start..payload_end]
)
);
#[cfg(not(feature = "debug-tlv-payload"))]
trace!(
"{}",
Packet::<0>::display_payload(
&packet.header.proto,
&packet.buf[packet.payload_start..payload_end]
)
);
unwrap!(packet.buf.resize_default(N));
let mut wb = WriteBuf::new_with(&mut packet.buf, packet.payload_start, payload_end);
if let Some(session) = session {
session.encode(&self.crypto, &packet.header, &mut wb)?;
} else {
packet.header.encode(&self.crypto, None, 0, &mut wb)?;
}
let encoded_payload_start = wb.get_start();
let encoded_payload_end = wb.get_tail();
packet.payload_start = encoded_payload_start;
packet.tx_info.payload_state = TxPayloadState::Encoded;
packet.buf.truncate(encoded_payload_end);
Ok(())
}
fn write_packet<const N: usize, F>(
&self,
packet: &mut Packet<N>,
mut session: Option<&mut Session>,
exchange_index: Option<usize>,
encode: bool,
payload_writer: F,
) -> Result<(), Error>
where
F: FnOnce(&mut WriteBuf) -> Result<Option<MessageMeta>, Error>,
{
unwrap!(packet.buf.resize_default(N));
let mut wb = WriteBuf::new_with(
&mut packet.buf,
PacketHdr::HDR_RESERVE,
PacketHdr::HDR_RESERVE,
);
let Some(meta) = payload_writer(&mut wb)? else {
packet.buf.clear();
return Ok(());
};
let (start, end) = (wb.get_start(), wb.get_tail());
packet.payload_start = start;
packet.buf.truncate(end);
meta.set_into(&mut packet.header.proto);
if let Some(session) = &mut session {
packet.header.plain = Default::default();
let (peer, retransmission) = session.pre_send(
exchange_index,
&mut packet.header,
self.transport().device_sai,
self.transport().device_sii,
)?;
packet.peer = peer;
packet.tx_info.retransmission = retransmission;
packet.tx_info.payload_state = TxPayloadState::NotEncoded {
session_id: session.id,
};
} else {
if packet.header.plain.is_encrypted()
|| packet.header.plain.get_src_nodeid().is_none()
|| packet.header.proto.is_reliable()
{
Err(ErrorCode::NoSession)?;
}
let src_nodeid = packet.header.plain.get_src_nodeid();
packet.header.plain = Default::default();
packet.header.plain.sess_id = 0;
packet.header.plain.ctr = 1;
packet.header.plain.set_src_nodeid(None);
packet.header.plain.set_dst_unicast_nodeid(src_nodeid);
packet.header.proto.unset_initiator();
packet.header.proto.adjust_reliability(false, &packet.peer);
packet.tx_info.retransmission = false;
packet.tx_info.payload_state = TxPayloadState::NotEncoded { session_id: 0 };
}
if encode {
self.encode_packet(packet, session)?;
}
Ok(())
}
fn write_evict_some_session_packet<const N: usize>(
&self,
packet: &mut Packet<N>,
encode: bool,
) -> Result<bool, Error> {
self.matter.with_state(|state| {
let id = state
.sessions
.get_session_for_eviction()
.map(|sess| sess.id);
if let Some(id) = id {
self.write_evict_session_packet(packet, &mut state.sessions, id, encode)?;
Ok(true)
} else {
error!("All sessions have active exchanges, cannot evict any session");
Ok(false)
}
})
}
fn write_evict_session_packet<const N: usize>(
&self,
packet: &mut Packet<N>,
sessions: &mut Sessions,
id: u32,
encode: bool,
) -> Result<(), Error> {
packet.header.proto.exch_id = sessions.get_next_exch_id();
packet.header.proto.set_initiator();
let mut session = unwrap!(sessions.remove(id));
self.transport().notify_session_removed();
debug!(
"Evicting session {} [SID:{:x},RSID:{:x}]",
session.id,
session.get_local_sess_id(),
session.get_peer_sess_id()
);
self.write_packet(packet, Some(&mut session), None, encode, |wb| {
sc_write(wb, SCStatusCodes::CloseSession, &[])
})?;
Ok(())
}
fn is_close_session(payload: &mut [u8]) -> Result<bool, Error> {
let mut pb = ParseBuf::new(payload);
let report = StatusReport::read(&mut pb)?;
let close_session = report.proto_id == PROTO_ID_SECURE_CHANNEL as u32
&& report.proto_code == SCStatusCodes::CloseSession as u16;
Ok(close_session)
}
async fn netw_recv<R>(mut recv: R, buf: &mut [u8]) -> Result<(usize, Address), Error>
where
R: NetworkReceive,
{
match recv.recv_from(buf).await {
Ok((len, addr)) => {
trace!("\n>>RCV {} {}B:\n {}", addr, len, Bytes(&buf[..len]));
Ok((len, addr))
}
Err(e) => {
error!("FAILED network recv: {:?}", e);
Err(e)
}
}
}
async fn netw_send<S>(
send: &IfMutex<S>,
peer: Address,
data: &[u8],
system: bool,
) -> Result<(), Error>
where
S: NetworkSend,
{
match send.lock().await.send_to(data, peer).await {
Ok(_) => {
trace!(
"\n<<SND {} {}B{}: {}",
peer,
data.len(),
if system { " (system)" } else { "" },
Bytes(data)
);
Ok(())
}
Err(e) => {
error!(
"\n<<SND {} {}B{} !FAILED!: {:?}",
peer,
data.len(),
if system { " (system)" } else { "" },
e
);
Ok(())
}
}
}
#[inline(always)]
const fn transport(&self) -> &Transport {
self.matter.transport()
}
}
#[derive(Copy, Clone, Default, PartialEq, Eq, Debug, Hash)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub(crate) enum TxPayloadState {
#[default]
Encoded,
NotEncoded {
session_id: u32,
},
}
#[derive(Copy, Clone, PartialEq, Eq, Debug, Hash)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub(crate) struct TxInfo {
pub(crate) retransmission: bool,
pub(crate) payload_state: TxPayloadState,
}
impl TxInfo {
pub const fn new() -> Self {
Self {
retransmission: false,
payload_state: TxPayloadState::Encoded,
}
}
}
impl Default for TxInfo {
fn default() -> Self {
Self::new()
}
}
pub(crate) struct Packet<const N: usize> {
pub(crate) peer: Address,
pub(crate) header: PacketHdr,
pub(crate) buf: PacketBuffer<N>,
pub(crate) payload_start: usize,
pub(crate) tx_info: TxInfo,
}
impl<const N: usize> Packet<N> {
#[inline(always)]
pub(crate) const fn new() -> Self {
Self {
peer: Address::new(),
header: PacketHdr::new(),
buf: PacketBuffer::new(),
payload_start: 0,
tx_info: TxInfo::new(),
}
}
pub(crate) fn init() -> impl Init<Self> {
init!(Self {
peer: Address::new(),
header: PacketHdr::new(),
buf <- PacketBuffer::init(),
payload_start: 0,
tx_info: TxInfo::new(),
})
}
#[cfg(feature = "defmt")]
pub fn display<'a>(
peer: &'a Address,
header: &'a PacketHdr,
) -> impl Display + defmt::Format + 'a {
PacketInfo(peer, header)
}
#[cfg(not(feature = "defmt"))]
pub fn display<'a>(peer: &'a Address, header: &'a PacketHdr) -> impl Display + 'a {
PacketInfo(peer, header)
}
#[cfg(feature = "defmt")]
pub fn display_payload<'a>(
proto: &'a ProtoHdr,
buf: &'a [u8],
) -> impl Display + defmt::Format + 'a {
DetailedPacketInfo(proto, buf)
}
#[cfg(not(feature = "defmt"))]
pub fn display_payload<'a>(proto: &'a ProtoHdr, buf: &'a [u8]) -> impl Display + 'a {
DetailedPacketInfo(proto, buf)
}
fn fmt(f: &mut fmt::Formatter<'_>, peer: &Address, header: &PacketHdr) -> fmt::Result {
write!(f, "{peer} {header}")?;
if header.proto.is_decoded() {
let meta = MessageMeta::from(&header.proto);
write!(f, "\n {meta}")?;
}
Ok(())
}
#[cfg(feature = "defmt")]
fn format(f: defmt::Formatter<'_>, peer: &Address, header: &PacketHdr) {
defmt::write!(f, "{} {}", peer, header);
if header.proto.is_decoded() {
let meta = MessageMeta::from(&header.proto);
defmt::write!(f, "\n {}", meta);
}
}
fn fmt_payload(f: &mut fmt::Formatter<'_>, proto: &ProtoHdr, buf: &[u8]) -> fmt::Result {
let meta = MessageMeta::from(proto);
write!(f, "{meta}")?;
if meta.is_tlv() {
write!(
f,
"; TLV:\n----------------\n{}\n----------------\n",
TLVElement::new(buf)
)?;
} else {
write!(
f,
"; Payload:\n----------------\n{:02x?}\n----------------\n",
buf
)?;
}
Ok(())
}
#[cfg(feature = "defmt")]
fn format_payload(f: defmt::Formatter<'_>, proto: &ProtoHdr, buf: &[u8]) {
let meta = MessageMeta::from(proto);
defmt::write!(f, "{}", meta);
if meta.is_tlv() {
defmt::write!(
f,
"; TLV:\n----------------\n{}\n----------------\n",
TLVElement::new(buf)
);
} else {
defmt::write!(
f,
"; Payload:\n----------------\n{}\n----------------\n",
crate::fmt::Bytes(buf)
);
}
}
}
impl<const N: usize> Display for Packet<N> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
Self::fmt(f, &self.peer, &self.header)
}
}
#[cfg(feature = "defmt")]
impl<const N: usize> defmt::Format for Packet<N> {
fn format(&self, f: defmt::Formatter<'_>) {
Self::format(f, &self.peer, &self.header)
}
}
struct PacketInfo<'a>(&'a Address, &'a PacketHdr);
impl Display for PacketInfo<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
Packet::<0>::fmt(f, self.0, self.1)
}
}
#[cfg(feature = "defmt")]
impl defmt::Format for PacketInfo<'_> {
fn format(&self, f: defmt::Formatter<'_>) {
Packet::<0>::format(f, self.0, self.1)
}
}
struct DetailedPacketInfo<'a>(&'a ProtoHdr, &'a [u8]);
impl Display for DetailedPacketInfo<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
Packet::<0>::fmt_payload(f, self.0, self.1)
}
}
#[cfg(feature = "defmt")]
impl defmt::Format for DetailedPacketInfo<'_> {
fn format(&self, f: defmt::Formatter<'_>) {
Packet::<0>::format_payload(f, self.0, self.1)
}
}
pub(crate) struct PacketBuffer<const N: usize> {
buffer: crate::utils::storage::Vec<u8, N>,
}
impl<const N: usize> PacketBuffer<N> {
pub const fn new() -> Self {
Self {
buffer: crate::utils::storage::Vec::new(),
}
}
pub fn init() -> impl Init<Self> {
init!(Self {
buffer <- crate::utils::storage::Vec::init(),
})
}
pub fn buf_mut(&mut self) -> &mut crate::utils::storage::Vec<u8, N> {
&mut self.buffer
}
pub fn buf_ref(&self) -> &crate::utils::storage::Vec<u8, N> {
&self.buffer
}
}
impl<const N: usize> Deref for PacketBuffer<N> {
type Target = crate::utils::storage::Vec<u8, N>;
fn deref(&self) -> &Self::Target {
self.buf_ref()
}
}
impl<const N: usize> DerefMut for PacketBuffer<N> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.buf_mut()
}
}
pub(crate) struct PacketAccess<'a, const N: usize>(IfMutexGuard<'a, Packet<N>>, bool);
impl<const N: usize> PacketAccess<'_, N> {
pub fn clear_on_drop(&mut self, clear: bool) {
self.1 = clear;
}
}
impl<const N: usize> Deref for PacketAccess<'_, N> {
type Target = Packet<N>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl<const N: usize> DerefMut for PacketAccess<'_, N> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl<const N: usize> Drop for PacketAccess<'_, N> {
fn drop(&mut self) {
if self.1 {
self.buf.clear();
}
}
}
impl<const N: usize> Display for PacketAccess<'_, N> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.fmt(f)
}
}
pub struct PacketBufferExternalAccess<'a, const N: usize>(pub(crate) &'a IfMutex<Packet<N>>);
impl<const N: usize> Buffers<[u8]> for PacketBufferExternalAccess<'_, N> {
type Buffer<'b>
= ExternalPacketBuffer<'b, N>
where
Self: 'b;
async fn get(&self) -> Option<ExternalPacketBuffer<'_, N>> {
let mut packet = self.0.lock_if(|packet| packet.buf.is_empty()).await;
unwrap!(packet.buf.resize_default(N));
Some(ExternalPacketBuffer(packet))
}
fn get_immediate(&self) -> Option<Self::Buffer<'_>> {
self.0
.try_lock_if(|packet| packet.buf.is_empty())
.ok()
.map(|mut packet| {
unwrap!(packet.buf.resize_default(N));
ExternalPacketBuffer(packet)
})
}
}
pub struct ExternalPacketBuffer<'a, const N: usize>(IfMutexGuard<'a, Packet<N>>);
impl<const N: usize> Deref for ExternalPacketBuffer<'_, N> {
type Target = [u8];
fn deref(&self) -> &Self::Target {
&self.0.buf
}
}
impl<const N: usize> DerefMut for ExternalPacketBuffer<'_, N> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0.buf
}
}
impl<const N: usize> Drop for ExternalPacketBuffer<'_, N> {
fn drop(&mut self) {
self.0.buf.clear();
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::crypto::test_only_crypto;
use crate::dm::devices::test::{TEST_DEV_ATT, TEST_DEV_COMM, TEST_DEV_DET};
fn test_matter() -> Matter<'static> {
Matter::new(&TEST_DEV_DET, TEST_DEV_COMM, &TEST_DEV_ATT, 0)
}
#[test]
fn test_create_unsecured_session_creates_plaintext_session() {
let matter = test_matter();
let crypto = test_only_crypto();
let peer = Address::new();
let session_id = matter
.transport
.create_plaintext_session(&matter, &crypto, peer)
.unwrap();
matter.with_state(|state| {
let session = state.sessions.get(session_id).unwrap();
assert_eq!(session.id, session_id);
assert!(!session.is_encrypted());
assert_eq!(session.get_peer_node_id(), None);
assert_eq!(*session.get_session_mode(), session::SessionMode::PlainText);
});
}
#[test]
fn test_initiate_unsecured_now_creates_initiator_exchange() {
let matter = test_matter();
let crypto = test_only_crypto();
let peer = Address::new();
let exchange = matter
.transport
.try_initiate_plaintext(&matter, &crypto, peer)
.unwrap();
exchange
.with_state(|state| {
let sess = exchange.id().session(&mut state.sessions);
let exch = exchange.id().exch(sess);
assert!(matches!(exch.role, Role::Initiator(_)));
assert_eq!(sess.id, exchange.id().session_id());
Ok(())
})
.unwrap();
}
}
#[cfg(test)]
mod resolve_tests {
use core::net::{IpAddr, Ipv4Addr, SocketAddr};
use futures_lite::future::{block_on, zip};
use crate::error::ErrorCode;
use crate::test::test_matter;
use crate::transport::network::mdns::{DottedName, MdnsRemoteService};
use crate::transport::network::MatterRemoteService;
fn op_service() -> MatterRemoteService {
MatterRemoteService::Operational {
compressed_fabric_id: 0x1122,
node_id: 0x3344,
}
}
#[test]
fn resolve_rendezvous_delivers_answer() {
let matter = test_matter();
let service = op_service();
let resolved = block_on(async {
let resolver = matter.transport().resolve(service.clone(), 5_000);
let responder = async {
let picked = matter.transport().wait_mdns_resolve_request().await;
assert_eq!(picked, service);
let mut name = heapless::String::<128>::new();
service.instance_name(&mut name);
let answer = MdnsRemoteService {
instance_name: DottedName(name.as_str()),
port: Some(1234),
addrs: [IpAddr::V4(Ipv4Addr::new(10, 0, 0, 5))].into_iter(),
txt: [("SII", "300"), ("SAI", "4000"), ("SAT", "5000")].into_iter(),
scope_id: 0,
};
matter.transport().try_deposit_mdns_resolve(&answer);
};
let (node, ()) = zip(resolver, responder).await;
node
})
.unwrap();
assert_eq!(
resolved.addr,
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 5)), 1234)
);
assert_eq!(resolved.sii, Some(300));
assert_eq!(resolved.sai, Some(4000));
assert_eq!(resolved.sat, Some(5000));
}
#[test]
fn resolve_times_out_and_releases_slot() {
let matter = test_matter();
let err = block_on(matter.transport().resolve(op_service(), 50)).unwrap_err();
assert!(matches!(err.code(), ErrorCode::NotFound));
let err = block_on(matter.transport().resolve(op_service(), 50)).unwrap_err();
assert!(matches!(err.code(), ErrorCode::NotFound));
}
#[test]
fn deposit_without_request_is_noop() {
let matter = test_matter();
let mut name = heapless::String::<128>::new();
op_service().instance_name(&mut name);
let answer = MdnsRemoteService {
instance_name: DottedName(name.as_str()),
port: Some(1234),
addrs: [IpAddr::V4(Ipv4Addr::new(10, 0, 0, 9))].into_iter(),
txt: core::iter::empty::<(&str, &str)>(),
scope_id: 0,
};
matter.transport().try_deposit_mdns_resolve(&answer);
let err = block_on(matter.transport().resolve(op_service(), 50)).unwrap_err();
assert!(matches!(err.code(), ErrorCode::NotFound));
}
}
#[cfg(test)]
mod browse_tests {
use core::net::{IpAddr, Ipv4Addr, SocketAddr};
use futures_lite::future::{block_on, zip};
use crate::error::ErrorCode;
use crate::test::test_matter;
use crate::transport::network::mdns::{CommissionableFilter, DottedName, MdnsRemoteService};
use crate::transport::network::Address;
#[test]
fn browse_rendezvous_delivers_first_match() {
let matter = test_matter();
let filter = CommissionableFilter {
discriminator: Some(0xA5A),
vendor_id: Some(0xFFF1),
..Default::default()
};
let found = block_on(async {
let browser = matter
.transport()
.browse_commissionable(&filter, &[], 5_000);
let responder = async {
let picked = matter.transport().wait_mdns_browse_request().await;
assert_eq!(picked, filter);
let other = MdnsRemoteService {
instance_name: DottedName("0000000000000001._matterc._udp.local"),
port: Some(5540),
addrs: [IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1))].into_iter(),
txt: [("D", "2650"), ("VP", "9999+1"), ("CM", "1")].into_iter(),
scope_id: 0,
};
matter.transport().try_deposit_mdns_browse(&other);
let answer = MdnsRemoteService {
instance_name: DottedName("00000000ABCD1234._matterc._udp.local"),
port: Some(5541),
addrs: [IpAddr::V4(Ipv4Addr::new(10, 0, 0, 7))].into_iter(),
txt: [("D", "2650"), ("VP", "65521+42"), ("CM", "1")].into_iter(),
scope_id: 0,
};
matter.transport().try_deposit_mdns_browse(&answer);
};
let (found, ()) = zip(browser, responder).await;
found
})
.unwrap();
assert_eq!(
found,
(
Address::Udp(SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(10, 0, 0, 7)),
5541
)),
0x00000000ABCD1234,
)
);
}
#[test]
fn browse_times_out_and_releases_slot() {
let matter = test_matter();
let filter = CommissionableFilter {
short_discriminator: Some(0xA),
..Default::default()
};
let err = block_on(matter.transport().browse_commissionable(&filter, &[], 50)).unwrap_err();
assert!(matches!(err.code(), ErrorCode::NotFound));
let err = block_on(matter.transport().browse_commissionable(&filter, &[], 50)).unwrap_err();
assert!(matches!(err.code(), ErrorCode::NotFound));
}
#[test]
fn browse_exclude_steps_to_next_match() {
let matter = test_matter();
let filter = CommissionableFilter {
short_discriminator: Some(0xA),
..Default::default()
};
let found = block_on(async {
let browser = matter
.transport()
.browse_commissionable(&filter, &[0x1111], 5_000);
let responder = async {
matter.transport().wait_mdns_browse_request().await;
let node_a = MdnsRemoteService {
instance_name: DottedName("0000000000001111._matterc._udp.local"),
port: Some(5540),
addrs: [IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1))].into_iter(),
txt: [("D", "2578"), ("CM", "1")].into_iter(), scope_id: 0,
};
matter.transport().try_deposit_mdns_browse(&node_a);
let node_b = MdnsRemoteService {
instance_name: DottedName("0000000000002222._matterc._udp.local"),
port: Some(5541),
addrs: [IpAddr::V4(Ipv4Addr::new(10, 0, 0, 2))].into_iter(),
txt: [("D", "2815"), ("CM", "1")].into_iter(), scope_id: 0,
};
matter.transport().try_deposit_mdns_browse(&node_b);
};
let (found, ()) = zip(browser, responder).await;
found
})
.unwrap();
assert_eq!(
found,
(
Address::Udp(SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(10, 0, 0, 2)),
5541
)),
0x2222,
)
);
}
}