use core::fmt::{self, Display};
use core::num::NonZeroU8;
use core::pin::pin;
use either::Either as EitherIo;
use embassy_futures::select::{select, select3, Either, Either3};
use embassy_time::{Duration, Instant, Timer};
use crate::acl::Accessor;
use crate::bdx::{self, PROTO_ID_BDX};
use crate::crypto::Crypto;
use crate::dm::NodeId;
use crate::error::{Error, ErrorCode};
use crate::im::{self, PROTO_ID_INTERACTION_MODEL};
use crate::sc::{self, PROTO_ID_SECURE_CHANNEL};
use crate::transport::session::Sessions;
use crate::transport::TxPayloadState;
use crate::utils::storage::pooled::{PooledBuffers, DEFAULT_BUFFER_POOL_SIZE};
use crate::utils::storage::WriteBuf;
use crate::{Matter, MatterState};
use super::mrp::{ReliableMessage, RetransEntry};
use super::network;
use super::packet::PacketHdr;
use super::plain_hdr::PlainHdr;
use super::proto_hdr::ProtoHdr;
use super::session::{Session, SessionMode};
use super::{PacketAccess, MAX_RX_BUF_SIZE, MAX_TX_BUF_SIZE};
/// Maximum size of an exchange RX buffer.
///
/// With the `large-buffers` feature enabled this is the large RX packet size.
#[cfg(feature = "large-buffers")]
pub const MAX_EXCHANGE_RX_BUF_SIZE: usize = network::MAX_RX_LARGE_PACKET_SIZE;
/// Maximum size of an exchange RX buffer.
///
/// Without the `large-buffers` feature this is the regular RX packet size.
#[cfg(not(feature = "large-buffers"))]
pub const MAX_EXCHANGE_RX_BUF_SIZE: usize = network::MAX_RX_PACKET_SIZE;
/// Maximum size of an exchange TX payload: the large TX packet size minus the
/// space reserved by `PacketHdr` for the header and for the tail.
#[cfg(feature = "large-buffers")]
pub const MAX_EXCHANGE_TX_BUF_SIZE: usize =
network::MAX_TX_LARGE_PACKET_SIZE - PacketHdr::HDR_RESERVE - PacketHdr::TAIL_RESERVE;
/// Maximum size of an exchange TX payload: the regular TX packet size minus the
/// space reserved by `PacketHdr` for the header and for the tail.
#[cfg(not(feature = "large-buffers"))]
pub const MAX_EXCHANGE_TX_BUF_SIZE: usize =
network::MAX_TX_PACKET_SIZE - PacketHdr::HDR_RESERVE - PacketHdr::TAIL_RESERVE;
/// A byte vector parameterized by `MAX_EXCHANGE_RX_BUF_SIZE`
/// (presumably its fixed capacity - confirm against `utils::storage::Vec`).
pub type Buffer = crate::utils::storage::Vec<u8, MAX_EXCHANGE_RX_BUF_SIZE>;
/// A pool of `N` [`Buffer`]s; `N` defaults to `DEFAULT_BUFFER_POOL_SIZE`.
pub type MatterBuffers<const N: usize = DEFAULT_BUFFER_POOL_SIZE> = PooledBuffers<Buffer, N>;
/// Identifier of an exchange.
///
/// Packs two values into a single `u32`:
/// - the session ID in the low 28 bits;
/// - the index of the exchange inside the session's `exchanges` in the top 4 bits.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub struct ExchangeId(u32);
impl ExchangeId {
/// Create a new exchange ID out of a session ID and an exchange index.
///
/// # Panics
///
/// Panics if `session_id` does not fit in 28 bits or if `exchange_index` is 16 or larger.
pub(crate) fn new(session_id: u32, exchange_index: usize) -> Self {
if session_id > 0x0fff_ffff {
panic!("Session ID out of range");
}
if exchange_index >= 16 {
panic!("Exchange index out of range");
}
Self(((exchange_index as u32) << 28) | session_id)
}
/// Return the session ID part (low 28 bits) of this exchange ID.
pub(crate) fn session_id(&self) -> u32 {
self.0 & 0x0fff_ffff
}
/// Return the exchange index part (top 4 bits) of this exchange ID.
pub(crate) fn exchange_index(&self) -> usize {
(self.0 >> 28) as _
}
/// Look up the session of this exchange in `sessions`.
///
/// The lookup result is passed through `unwrap!`, i.e. the session is expected to exist.
pub(crate) fn session<'a>(&self, sessions: &'a mut Sessions) -> &'a mut Session {
unwrap!(sessions.get(self.session_id()))
}
/// Look up the state of this exchange in `session`.
///
/// The slot is passed through `unwrap!`, i.e. the exchange is expected to be present.
pub(crate) fn exch<'a>(&self, session: &'a mut Session) -> &'a mut ExchangeState {
unwrap!(session.exchanges[self.exchange_index()].as_mut())
}
/// Return a helper which formats this ID together with details taken from `session`.
pub(crate) fn display<'a>(&'a self, session: &'a Session) -> ExchangeIdDisplay<'a> {
ExchangeIdDisplay { id: self, session }
}
/// Wait for the next RX message destined for this exchange.
///
/// Fails with `InvalidState` if the exchange still has a retransmission pending,
/// with `NoSession` if the session is no longer there and with `RxTimeout` if
/// nothing arrives within the timeout computed below.
async fn recv<'a>(&self, matter: &'a Matter<'a>) -> Result<RxMessage<'a>, Error> {
self.check_no_pending_retrans(matter)?;
loop {
// The predicate accepts non-empty packets which match both the session and the exchange
let mut recv = pin!(matter.transport().get_if_rx(|packet| {
if packet.buf.is_empty() {
false
} else {
let for_us = self.with_state(matter, |state| {
let sess = self.session(&mut state.sessions);
if sess.is_for_rx(&packet.peer, &packet.header.plain) {
let exch = self.exch(sess);
return Ok(exch.is_for_rx(&packet.header.proto));
}
Ok(false)
});
// `with_state` fails only when the session is gone; accept the packet then,
// so that the session check after the wait can report the error
for_us.unwrap_or(true)
}
}));
let mut session_removed = pin!(matter.transport().wait_session_removed());
// Timeout is 1.5x the `max_delay_ms()` of a retransmission entry built from the device SAI
let mut timeout = pin!(Timer::after(Duration::from_millis(
RetransEntry::new(matter.dev_det().sai, 0).max_delay_ms() * 3 / 2
)));
match select3(&mut recv, &mut session_removed, &mut timeout).await {
Either3::First(mut packet) => {
packet.clear_on_drop(true);
// Also errors out with `NoSession` if the session disappeared in the meantime
self.check_no_pending_retrans(matter)?;
break Ok(RxMessage(packet));
}
Either3::Second(_) => {
// Some session was removed: bail out if it was ours, otherwise wait again
self.with_state(matter, |_| Ok(()))?;
continue;
}
Either3::Third(_) => {
Err(ErrorCode::RxTimeout)?;
}
};
}
}
/// Acquire the TX packet and prepare it for sending a message on this exchange.
///
/// Fails with `NoSession` if the session does not exist before or after the wait.
async fn init_send<'a>(&self, matter: &'a Matter<'a>) -> Result<TxMessage<'a>, Error> {
self.with_state(matter, |_| Ok(()))?;
// Wait until the TX buffer is empty, or until our session is gone
let mut packet = matter
.transport
.get_if_tx(|packet| {
packet.buf.is_empty() || self.with_state(matter, |_| Ok(())).is_err()
})
.await;
unwrap!(packet.buf.resize_default(MAX_TX_BUF_SIZE));
// Until `TxMessage::complete` is called, dropping the packet should discard its content
packet.clear_on_drop(true);
let tx = TxMessage {
exchange_id: *self,
matter,
packet,
};
// Re-check the session, as it might have been removed while we were waiting
self.with_state(matter, |_| Ok(()))?;
Ok(tx)
}
/// Wait for the outcome of the last sent message.
///
/// Returns `TxOutcome::Done` if the exchange has no retransmission entry
/// (either initially, or after the wait) and `TxOutcome::Retransmit` otherwise.
async fn wait_tx<'a>(&self, matter: &'a Matter<'a>) -> Result<TxOutcome, Error> {
if let Some(delay) = self.retrans_delay_ms(matter)? {
let expired = unwrap!(Instant::now().checked_add(Duration::from_millis(delay)));
loop {
let mut notification = pin!(self.internal_wait_ack(matter));
let mut session_removed = pin!(matter.transport().wait_session_removed());
let mut timer = pin!(Timer::at(expired));
// Keep looping only when the wake-up was caused by a session removal
if !matches!(
select3(&mut notification, &mut session_removed, &mut timer).await,
Either3::Second(_)
) {
break;
}
// A session was removed: fail if it was ours
self.with_state(matter, |_| Ok(()))?;
}
if self.retrans_delay_ms(matter)?.is_some() {
Ok(TxOutcome::Retransmit)
} else {
Ok(TxOutcome::Done)
}
} else {
Ok(TxOutcome::Done)
}
}
/// Build an `Accessor` for the session of this exchange.
fn accessor<'a>(&self, matter: &'a Matter<'a>) -> Result<Accessor<'a>, Error> {
self.with_state(matter, |state| {
let sess = self.session(&mut state.sessions);
Ok(Accessor::for_session(sess, matter))
})
}
/// Same as `with_state_ex`, with the error type fixed to `Error`.
fn with_state<'a, F, T>(&self, matter: &'a Matter<'a>, f: F) -> Result<T, Error>
where
F: FnOnce(&mut MatterState) -> Result<T, Error>,
{
self.with_state_ex(matter, f)
}
/// Run `f` on the Matter state, but only if the session of this exchange still exists.
///
/// If the session does not exist, `f` is not called, a warning is logged and
/// a `NoSession` error (converted to `E`) is returned.
fn with_state_ex<'a, F, T, E>(&self, matter: &'a Matter<'a>, f: F) -> Result<T, E>
where
F: FnOnce(&mut MatterState) -> Result<T, E>,
E: From<Error>,
{
matter.with_state(|state| {
if state.sessions.get(self.session_id()).is_some() {
f(state)
} else {
warn!("Exchange {}: No session", self);
Err(Error::from(ErrorCode::NoSession).into())
}
})
}
/// Wait until the exchange has no retransmission entry anymore, or its session is gone.
///
/// The RX packet access obtained from `get_if_rx` is discarded; only the wake-up matters.
async fn internal_wait_ack<'a>(&self, matter: &'a Matter<'a>) -> Result<(), Error> {
matter
.transport
.get_if_rx(|_| {
self.retrans_delay_ms(matter)
.map(|retrans| retrans.is_none())
.unwrap_or(true)
})
.await;
self.with_state(matter, |_| Ok(()))
}
/// Return the retransmission delay of the exchange in milliseconds,
/// or `None` if the exchange has no retransmission entry.
fn retrans_delay_ms<'a>(&self, matter: &'a Matter<'a>) -> Result<Option<u64>, Error> {
self.with_state(matter, |state| {
let sess = self.session(&mut state.sessions);
let exch = self.exch(sess);
// NOTE(review): the jitter input is the fixed value 100 rather than a random byte
let mut jitter_rand = [0; 1];
jitter_rand[0] = 100;
Ok(exch.retrans_delay_ms(jitter_rand[0]))
})
}
/// Fail with `InvalidState` if the MRP state of the exchange reports a pending retransmission.
fn check_no_pending_retrans<'a>(&self, matter: &'a Matter<'a>) -> Result<(), Error> {
self.with_state(matter, |state| {
let sess = self.session(&mut state.sessions);
let exch = self.exch(sess);
if exch.mrp.is_retrans_pending() {
error!("Exchange {}: Retransmission pending", self.display(sess));
Err(ErrorCode::InvalidState)?;
}
Ok(())
})
}
/// Return `true` if the exchange has a retransmission entry.
fn pending_retrans<'a>(&self, matter: &'a Matter<'a>) -> Result<bool, Error> {
Ok(self.retrans_delay_ms(matter)?.is_some())
}
/// Return `true` if the MRP state of the exchange reports a pending acknowledgement.
fn pending_ack<'a>(&self, matter: &'a Matter<'a>) -> Result<bool, Error> {
self.with_state(matter, |state| {
let sess = self.session(&mut state.sessions);
let exch = self.exch(sess);
Ok(exch.mrp.is_ack_pending())
})
}
}
/// Renders the ID as `<session id>::<exchange index>`.
impl Display for ExchangeId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let session_id = self.session_id();
        let exchange_index = self.exchange_index();

        write!(f, "{}::{}", session_id, exchange_index)
    }
}
/// `defmt` counterpart of the `Display` implementation: `<session id>::<exchange index>`.
#[cfg(feature = "defmt")]
impl defmt::Format for ExchangeId {
    fn format(&self, f: defmt::Formatter<'_>) {
        let session_id = self.session_id();
        let exchange_index = self.exchange_index();

        defmt::write!(f, "{}::{}", session_id, exchange_index)
    }
}
/// Formatting helper which renders an [`ExchangeId`] together with
/// the session IDs and the exchange ID found in the session.
pub struct ExchangeIdDisplay<'a> {
    /// The exchange ID to render
    id: &'a ExchangeId,
    /// The session the exchange index is resolved against
    session: &'a Session,
}

impl Display for ExchangeIdDisplay<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match &self.session.exchanges[self.id.exchange_index()] {
            // The exchange slot is populated: print the full details
            Some(state) => write!(
                f,
                "{} [SID:{:x},RSID:{:x},EID:{:x}]",
                self.id,
                self.session.get_local_sess_id(),
                self.session.get_peer_sess_id(),
                state.exch_id
            ),
            // The exchange slot is empty: only the raw ID is known
            None => write!(f, "{}???", self.id),
        }
    }
}
/// `defmt` counterpart of the `Display` implementation of `ExchangeIdDisplay`.
#[cfg(feature = "defmt")]
impl defmt::Format for ExchangeIdDisplay<'_> {
    fn format(&self, f: defmt::Formatter<'_>) {
        match &self.session.exchanges[self.id.exchange_index()] {
            // The exchange slot is populated: print the full details
            Some(state) => defmt::write!(
                f,
                "{} [SID:{:x},RSID:{:x},EID:{:x}]",
                self.id,
                self.session.get_local_sess_id(),
                self.session.get_peer_sess_id(),
                state.exch_id
            ),
            // The exchange slot is empty: only the raw ID is known
            None => defmt::write!(f, "{}???", self.id),
        }
    }
}
/// State of an exchange in which the local node has the initiator role.
///
/// NOTE(review): `Owned`/`Dropped` presumably track whether the user-facing
/// `Exchange` handle is still alive - confirm against the transport code.
#[derive(Debug, PartialEq, Eq, Copy, Clone, Default)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub(crate) enum InitiatorState {
    /// The default state
    #[default]
    Owned,
    /// The state set by `Role::set_dropped_state`
    Dropped,
}

/// State of an exchange in which the local node has the responder role.
///
/// NOTE(review): `AcceptPending` presumably means "not yet accepted by user code" - confirm.
#[derive(Debug, PartialEq, Eq, Copy, Clone, Default)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub(crate) enum ResponderState {
    /// The default state
    #[default]
    AcceptPending,
    Owned,
    /// The state set by `Role::set_dropped_state`
    Dropped,
}

/// The role of the local node in an exchange, along with the role-specific state.
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub(crate) enum Role {
    Initiator(InitiatorState),
    Responder(ResponderState),
}

impl Role {
    /// Return `true` if the role-specific state is the `Dropped` one.
    pub fn is_dropped_state(&self) -> bool {
        matches!(
            self,
            Self::Initiator(InitiatorState::Dropped) | Self::Responder(ResponderState::Dropped)
        )
    }

    /// Switch the role-specific state to `Dropped`, keeping the role itself.
    pub fn set_dropped_state(&mut self) {
        *self = match *self {
            Self::Initiator(_) => Self::Initiator(InitiatorState::Dropped),
            Self::Responder(_) => Self::Responder(ResponderState::Dropped),
        };
    }
}
/// The state kept for a single exchange.
#[derive(Debug)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub(crate) struct ExchangeState {
    /// The exchange ID, as compared against / written into the protocol header
    pub(crate) exch_id: u16,
    /// The role of the local node in the exchange
    pub(crate) role: Role,
    /// The reliable messaging state of the exchange
    pub(crate) mrp: ReliableMessage,
}

impl ExchangeState {
    /// Return `true` if a received message with protocol header `rx_proto` belongs to this exchange.
    ///
    /// The exchange IDs must be equal, and the message must come from the initiator
    /// exactly when the local node is the responder.
    pub fn is_for_rx(&self, rx_proto: &ProtoHdr) -> bool {
        if self.exch_id != rx_proto.exch_id {
            return false;
        }

        let local_is_responder = matches!(self.role, Role::Responder(_));

        rx_proto.is_initiator() == local_is_responder
    }

    /// Update the reliable messaging state with the headers of a received message.
    pub fn post_recv(&mut self, rx_plain: &PlainHdr, rx_proto: &ProtoHdr) -> Result<(), Error> {
        self.mrp.post_recv(rx_plain, rx_proto)?;

        Ok(())
    }

    /// Prepare the protocol header of an outgoing message.
    ///
    /// Sets or clears the initiator flag as per the local role, stamps the exchange ID
    /// and then hands over to the reliable messaging state.
    pub fn pre_send(
        &mut self,
        tx_plain: &PlainHdr,
        tx_proto: &mut ProtoHdr,
        session_active_interval_ms: Option<u32>,
        session_idle_interval_ms: Option<u32>,
    ) -> Result<(), Error> {
        match self.role {
            Role::Initiator(_) => {
                tx_proto.set_initiator();
            }
            Role::Responder(_) => {
                tx_proto.unset_initiator();
            }
        }

        tx_proto.exch_id = self.exch_id;

        self.mrp.pre_send(
            tx_plain,
            tx_proto,
            session_active_interval_ms,
            session_idle_interval_ms,
        )
    }

    /// Return the delay of the retransmission entry in milliseconds,
    /// or `None` if there is no retransmission entry.
    pub fn retrans_delay_ms(&mut self, jitter_rand: u8) -> Option<u64> {
        let entry = self.mrp.retrans.as_ref()?;

        Some(entry.delay_ms(jitter_rand))
    }
}
/// Meta-data of a message: protocol ID, protocol opcode and the reliability flag.
#[derive(Debug, Eq, PartialEq, Copy, Clone)]
pub struct MessageMeta {
    /// The ID of the protocol the message belongs to
    pub proto_id: u16,
    /// The protocol-specific opcode of the message
    pub proto_opcode: u8,
    /// Whether the message is flagged as reliable
    pub reliable: bool,
}

impl MessageMeta {
    /// Create a new meta-data instance from its parts.
    pub const fn new(proto_id: u16, proto_opcode: u8, reliable: bool) -> Self {
        Self {
            proto_id,
            proto_opcode,
            reliable,
        }
    }

    /// Decode the raw opcode as the opcode type `T`.
    ///
    /// # Errors
    ///
    /// Returns `ErrorCode::InvalidOpcode` if the raw opcode is not a valid `T`.
    pub fn opcode<T: num::FromPrimitive>(&self) -> Result<T, Error> {
        // The error is built lazily, so that the success path does not pay for constructing it
        num::FromPrimitive::from_u8(self.proto_opcode)
            .ok_or_else(|| ErrorCode::InvalidOpcode.into())
    }

    /// Check that the opcode of the message, decoded as `T`, equals `opcode`.
    ///
    /// # Errors
    ///
    /// Returns `ErrorCode::InvalidOpcode` if the raw opcode is not a valid `T`,
    /// and `ErrorCode::Invalid` if it is valid but different from `opcode`.
    pub fn check_opcode<T: num::FromPrimitive + PartialEq>(&self, opcode: T) -> Result<(), Error> {
        if self.opcode::<T>()? == opcode {
            Ok(())
        } else {
            Err(ErrorCode::Invalid.into())
        }
    }

    /// Extract the meta-data from a protocol header.
    pub fn from(proto: &ProtoHdr) -> Self {
        Self {
            proto_id: proto.proto_id,
            proto_opcode: proto.proto_opcode,
            reliable: proto.is_reliable(),
        }
    }

    /// Write the meta-data into a protocol header.
    ///
    /// Also resets the vendor of the header to `None`.
    pub fn set_into(&self, proto: &mut ProtoHdr) {
        proto.proto_id = self.proto_id;
        proto.proto_opcode = self.proto_opcode;
        proto.set_vendor(None);

        if self.reliable {
            proto.set_reliable();
        } else {
            proto.unset_reliable();
        }
    }

    /// Return a copy of the meta-data with the reliability flag replaced.
    pub fn reliable(self, reliable: bool) -> Self {
        Self { reliable, ..self }
    }

    /// Return `true` if the opcode is a valid Secure Channel or Interaction Model
    /// opcode which reports itself as TLV via its `is_tlv()` method.
    ///
    /// Unknown protocols and undecodable opcodes yield `false`.
    pub(crate) fn is_tlv(&self) -> bool {
        match self.proto_id {
            PROTO_ID_SECURE_CHANNEL => self
                .opcode::<sc::OpCode>()
                .map(|op| op.is_tlv())
                .unwrap_or(false),
            PROTO_ID_INTERACTION_MODEL => self
                .opcode::<im::OpCode>()
                .map(|op| op.is_tlv())
                .unwrap_or(false),
            _ => false,
        }
    }

    /// Return `true` if this is a Secure Channel MRP standalone acknowledgement.
    pub(crate) fn is_standalone_ack(&self) -> bool {
        self.proto_id == PROTO_ID_SECURE_CHANNEL
            && self.proto_opcode == sc::OpCode::MRPStandAloneAck as u8
    }

    /// Return `true` if this is a Secure Channel status report.
    pub(crate) fn is_sc_status(&self) -> bool {
        self.proto_id == PROTO_ID_SECURE_CHANNEL
            && self.proto_opcode == sc::OpCode::StatusReport as u8
    }

    /// Return `true` if this is a Secure Channel PBKDF parameter request or a CASE Sigma1 message.
    pub(crate) fn is_new_session(&self) -> bool {
        self.proto_id == PROTO_ID_SECURE_CHANNEL
            && (self.proto_opcode == sc::OpCode::PBKDFParamRequest as u8
                || self.proto_opcode == sc::OpCode::CASESigma1 as u8)
    }

    /// Return `true` unless the message is a standalone acknowledgement or a status report.
    pub(crate) fn is_new_exchange(&self) -> bool {
        !self.is_standalone_ack() && !self.is_sc_status()
    }
}
/// Renders the meta-data as `<protocol>::<opcode>`.
///
/// Known protocols are printed by name (`SC`, `IM`, `BDX`), with the opcode printed
/// by name when it decodes and in hex otherwise. Unknown protocols are printed in hex.
impl Display for MessageMeta {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let raw_opcode = self.proto_opcode;

        match self.proto_id {
            PROTO_ID_SECURE_CHANNEL => match self.opcode::<sc::OpCode>() {
                Ok(opcode) => write!(f, "SC::{:?}", opcode),
                Err(_) => write!(f, "SC::{:02x}", raw_opcode),
            },
            PROTO_ID_INTERACTION_MODEL => match self.opcode::<im::OpCode>() {
                Ok(opcode) => write!(f, "IM::{:?}", opcode),
                Err(_) => write!(f, "IM::{:02x}", raw_opcode),
            },
            PROTO_ID_BDX => match self.opcode::<bdx::OpCode>() {
                Ok(opcode) => write!(f, "BDX::{:?}", opcode),
                Err(_) => write!(f, "BDX::{:02x}", raw_opcode),
            },
            other => write!(f, "{:02x}::{:02x}", other, raw_opcode),
        }
    }
}
/// `defmt` counterpart of the `Display` implementation of `MessageMeta`.
#[cfg(feature = "defmt")]
impl defmt::Format for MessageMeta {
    fn format(&self, f: defmt::Formatter<'_>) {
        match self.proto_id {
            PROTO_ID_SECURE_CHANNEL => match self.opcode::<sc::OpCode>() {
                Ok(opcode) => defmt::write!(f, "SC::{:?}", opcode),
                Err(_) => defmt::write!(f, "SC::{:02x}", self.proto_opcode),
            },
            PROTO_ID_INTERACTION_MODEL => match self.opcode::<im::OpCode>() {
                Ok(opcode) => defmt::write!(f, "IM::{:?}", opcode),
                Err(_) => defmt::write!(f, "IM::{:02x}", self.proto_opcode),
            },
            PROTO_ID_BDX => match self.opcode::<bdx::OpCode>() {
                Ok(opcode) => defmt::write!(f, "BDX::{:?}", opcode),
                Err(_) => defmt::write!(f, "BDX::{:02x}", self.proto_opcode),
            },
            _ => defmt::write!(f, "{:02x}::{:02x}", self.proto_id, self.proto_opcode),
        }
    }
}
/// A received message, wrapping the access to the RX packet it lives in.
pub struct RxMessage<'a>(PacketAccess<'a, MAX_RX_BUF_SIZE>);

impl RxMessage<'_> {
    /// Return the meta-data of the message, as taken from its protocol header.
    pub fn meta(&self) -> MessageMeta {
        let proto = &self.0.header.proto;

        MessageMeta::from(proto)
    }

    /// Return the payload of the message: the packet buffer from `payload_start` onwards.
    pub fn payload(&self) -> &[u8] {
        let packet = &self.0;

        &packet.buf[packet.payload_start..]
    }
}
/// A message which is being prepared for sending on an exchange.
///
/// The message is handed over for sending only when `complete` succeeds.
pub struct TxMessage<'a> {
    /// The ID of the exchange the message is sent on
    exchange_id: ExchangeId,
    /// The Matter instance the exchange belongs to
    matter: &'a Matter<'a>,
    /// The access to the TX packet the message is built in
    packet: PacketAccess<'a, MAX_TX_BUF_SIZE>,
}

impl TxMessage<'_> {
    /// Return the area of the TX buffer where the payload should be written.
    ///
    /// This is the buffer without the space reserved for the packet header and for the packet tail.
    pub fn payload(&mut self) -> &mut [u8] {
        &mut self.packet.buf[PacketHdr::HDR_RESERVE..MAX_TX_BUF_SIZE - PacketHdr::TAIL_RESERVE]
    }

    /// Complete the message, so that it can be sent.
    ///
    /// `payload_start` and `payload_end` are offsets into the slice returned by `payload()`
    /// and delimit the bytes which make up the payload of the message.
    ///
    /// # Errors
    ///
    /// - `ErrorCode::Invalid` if `payload_start > payload_end`, or if `payload_end` lies beyond
    ///   the end of the slice returned by `payload()`
    /// - `ErrorCode::NoSession` if the session of the exchange does not exist anymore
    /// - Any error returned by the `pre_send` method of the session
    pub fn complete<M>(
        mut self,
        payload_start: usize,
        payload_end: usize,
        meta: M,
    ) -> Result<(), Error>
    where
        M: Into<MessageMeta>,
    {
        /// The length of the slice returned by `payload()`
        const PAYLOAD_CAPACITY: usize =
            MAX_TX_BUF_SIZE - PacketHdr::HDR_RESERVE - PacketHdr::TAIL_RESERVE;

        // Bounding `payload_end` (rather than just the payload length) guarantees that the payload
        // neither runs into the tail reserve, nor is silently cut short by the `truncate` below
        if payload_start > payload_end || payload_end > PAYLOAD_CAPACITY {
            Err(ErrorCode::Invalid)?;
        }

        let meta: MessageMeta = meta.into();

        self.packet.header.reset();
        meta.set_into(&mut self.packet.header.proto);

        self.matter.with_state(|state| {
            let session = state
                .sessions
                .get(self.exchange_id.session_id())
                .ok_or(ErrorCode::NoSession)?;

            let (peer, retransmission) = session.pre_send(
                Some(self.exchange_id.exchange_index()),
                &mut self.packet.header,
                Some(session.get_peer_active_interval_ms()),
                Some(session.get_peer_idle_interval_ms()),
            )?;

            self.packet.peer = peer;
            // The offsets are relative to the payload area, which starts after the header reserve
            self.packet.payload_start = PacketHdr::HDR_RESERVE + payload_start;
            self.packet
                .buf
                .truncate(PacketHdr::HDR_RESERVE + payload_end);
            self.packet.tx_info.payload_state = TxPayloadState::NotEncoded {
                session_id: session.id,
            };
            self.packet.tx_info.retransmission = retransmission;
            // The packet is now complete, so it should survive the drop of this `TxMessage`
            self.packet.clear_on_drop(false);

            Ok(())
        })
    }
}
/// The outcome of waiting on a sent message.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum TxOutcome {
    /// Nothing more to do for the message
    Done,
    /// The message needs to be sent again
    Retransmit,
}

impl TxOutcome {
    /// Return `true` if the outcome is `Done`.
    pub const fn is_done(&self) -> bool {
        match self {
            Self::Done => true,
            Self::Retransmit => false,
        }
    }
}
/// A single (re)transmission attempt handed out by `Sender::tx`.
pub struct SenderTx<'a, 'b> {
    /// The sender which handed out this attempt
    sender: &'b mut Sender<'a>,
    /// The message to be filled in and completed
    message: TxMessage<'a>,
}

impl SenderTx<'_, '_> {
    /// Return the exchange of the sender together with the payload area of the message.
    pub fn split(&mut self) -> (&Exchange<'_>, &mut [u8]) {
        let exchange = self.sender.exchange;
        let payload = self.message.payload();

        (exchange, payload)
    }

    /// Return the payload area of the message.
    pub fn payload(&mut self) -> &mut [u8] {
        self.message.payload()
    }

    /// Complete the message and record in the sender that the initial transmission took place.
    ///
    /// The sender is left untouched if completing the message fails.
    pub fn complete(
        self,
        payload_start: usize,
        payload_end: usize,
        meta: MessageMeta,
    ) -> Result<(), Error> {
        let Self { sender, message } = self;

        message.complete(payload_start, payload_end, meta)?;
        sender.initial = false;

        Ok(())
    }
}
/// Drives the sending of one message on a borrowed exchange, retransmissions included.
pub struct Sender<'a> {
    /// The exchange the message is sent on
    exchange: &'a Exchange<'a>,
    /// `true` until the first transmission is completed
    initial: bool,
    /// `true` once there is nothing more to send
    complete: bool,
}

impl<'a> Sender<'a> {
    /// Create a sender for `exchange`.
    ///
    /// Fails if the exchange still has a retransmission pending.
    fn new(exchange: &'a Exchange<'a>) -> Result<Self, Error> {
        exchange.id.check_no_pending_retrans(exchange.matter)?;

        Ok(Self {
            exchange,
            initial: true,
            complete: false,
        })
    }

    /// Return the next transmission attempt, or `None` if the sending is over.
    ///
    /// The first call returns the initial attempt right away. Subsequent calls first wait for
    /// the outcome of the previous attempt and return a new one only if a retransmission is due.
    pub async fn tx(&mut self) -> Result<Option<SenderTx<'a, '_>>, Error> {
        trace!(
            "Sender::tx called, initial={}, complete={}",
            self.initial,
            self.complete
        );

        if self.complete {
            trace!("Sender::tx - already complete, returning None");
            return Ok(None);
        }

        let exchange = self.exchange;
        let (id, matter) = (exchange.id, exchange.matter);

        if !self.initial {
            trace!("Sender::tx - not initial, calling wait_tx");
            let outcome = id.wait_tx(matter).await?;
            trace!("Sender::tx - wait_tx returned {:?}", outcome);

            if outcome.is_done() {
                self.complete = true;
                trace!("Sender::tx - ACK received, returning None");
                return Ok(None);
            }

            trace!("Sender::tx - need to retransmit");
        }

        trace!("Sender::tx - calling init_send");
        let tx = id.init_send(matter).await?;
        trace!("Sender::tx - init_send returned");

        // The retransmission might have become unnecessary while waiting for the TX buffer
        if !self.initial && !id.pending_retrans(matter)? {
            self.complete = true;
            trace!("Sender::tx - no pending retrans, returning None");
            return Ok(None);
        }

        trace!("Sender::tx - returning Some(SenderTx)");
        Ok(Some(SenderTx {
            sender: self,
            message: tx,
        }))
    }
}
/// A single (re)transmission attempt handed out by `OwnedSender::tx`.
///
/// Unlike `SenderTx`, it owns the exchange.
pub struct OwnedSenderTx<'a> {
    /// The exchange the message is sent on
    exchange: Exchange<'a>,
    /// The message to be filled in and completed
    message: TxMessage<'a>,
}

impl<'a> OwnedSenderTx<'a> {
    /// Return the exchange together with the payload area of the message.
    pub fn split(&mut self) -> (&Exchange<'_>, &mut [u8]) {
        let payload = self.message.payload();

        (&self.exchange, payload)
    }

    /// Return the payload area of the message.
    pub fn payload(&mut self) -> &mut [u8] {
        self.message.payload()
    }

    /// Complete the message and return a sender which continues with the retransmissions.
    pub fn complete(
        self,
        payload_start: usize,
        payload_end: usize,
        meta: MessageMeta,
    ) -> Result<OwnedSender<'a>, Error> {
        let Self { exchange, message } = self;

        message.complete(payload_start, payload_end, meta)?;

        Ok(OwnedSender {
            exchange,
            initial: false,
            complete: false,
        })
    }
}
/// Drives the sending of one message on an owned exchange, retransmissions included.
pub struct OwnedSender<'a> {
    /// The exchange the message is sent on
    exchange: Exchange<'a>,
    /// `true` until the first transmission is completed
    initial: bool,
    /// `true` once there is nothing more to send
    complete: bool,
}

impl<'a> OwnedSender<'a> {
    /// Create a sender which takes over `exchange`.
    ///
    /// Fails if the exchange still has a retransmission pending.
    fn new(exchange: Exchange<'a>) -> Result<Self, Error> {
        exchange.id.check_no_pending_retrans(exchange.matter)?;

        Ok(Self {
            exchange,
            initial: true,
            complete: false,
        })
    }

    /// Return either the next transmission attempt (left), or - if the sending is over -
    /// the exchange itself (right).
    ///
    /// The first call returns the initial attempt right away. Subsequent calls first wait for
    /// the outcome of the previous attempt and return a new one only if a retransmission is due.
    pub async fn tx(mut self) -> Result<EitherIo<OwnedSenderTx<'a>, Exchange<'a>>, Error> {
        trace!(
            "OwnedSender::tx called, initial={}, complete={}",
            self.initial,
            self.complete
        );

        if self.complete {
            trace!("OwnedSender::tx - already complete, returning exchange");
            return Ok(EitherIo::Right(self.exchange));
        }

        let id = self.exchange.id;
        let matter = self.exchange.matter;

        if !self.initial {
            trace!("OwnedSender::tx - not initial, calling wait_tx");
            let outcome = id.wait_tx(matter).await?;
            trace!("OwnedSender::tx - wait_tx returned {:?}", outcome);

            if outcome.is_done() {
                self.complete = true;
                trace!("OwnedSender::tx - ACK received, returning exchange");
                return Ok(EitherIo::Right(self.exchange));
            }

            trace!("OwnedSender::tx - need to retransmit");
        }

        trace!("OwnedSender::tx - calling init_send");
        let tx = id.init_send(matter).await?;
        trace!("OwnedSender::tx - init_send returned");

        // The retransmission might have become unnecessary while waiting for the TX buffer
        if !self.initial && !id.pending_retrans(matter)? {
            trace!("OwnedSender::tx - no pending retrans, returning exchange");
            return Ok(EitherIo::Right(self.exchange));
        }

        trace!("OwnedSender::tx - returning Left(OwnedSenderTx)");
        Ok(EitherIo::Left(OwnedSenderTx {
            exchange: self.exchange,
            message: tx,
        }))
    }
}
/// A handle to an exchange: the user-facing API for receiving and sending messages on it.
///
/// When the handle is dropped, the exchange is removed from its session (see the `Drop` impl).
pub struct Exchange<'a> {
// The ID of the exchange
id: ExchangeId,
// The Matter instance the exchange belongs to
matter: &'a Matter<'a>,
// The message which was received but not yet consumed or discarded, if any
rx: Option<RxMessage<'a>>,
}
impl<'a> Exchange<'a> {
/// Create a handle for the exchange with the given ID, with no received message.
pub(crate) const fn new(id: ExchangeId, matter: &'a Matter<'a>) -> Self {
Self {
id,
matter,
rx: None,
}
}
/// Return the ID of the exchange.
pub fn id(&self) -> ExchangeId {
self.id
}
/// Return the Matter instance the exchange belongs to.
pub fn matter(&self) -> &'a Matter<'a> {
self.matter
}
/// Initiate a new exchange with the node `peer_node_id` on the fabric `fabric_idx`.
///
/// Delegates to the `initiate` method of the transport.
#[inline(always)]
pub async fn initiate<C: Crypto>(
matter: &'a Matter<'a>,
crypto: C,
fabric_idx: NonZeroU8,
peer_node_id: NodeId,
) -> Result<Self, Error> {
matter
.transport
.initiate(matter, crypto, fabric_idx, peer_node_id)
.await
}
/// Initiate a new exchange with the peer at `peer_addr`, using the given passcode.
///
/// Delegates to the `initiate_pase` method of the transport.
#[inline(always)]
pub async fn initiate_pase<C: Crypto>(
matter: &'a Matter<'a>,
crypto: C,
peer_addr: network::Address,
passcode: u32,
) -> Result<Self, Error> {
matter
.transport
.initiate_pase(matter, crypto, peer_addr, passcode)
.await
}
/// Initiate a new exchange on the already existing session with ID `session_id`.
///
/// Delegates to the `initiate_for_session` method of the transport.
#[inline(always)]
pub fn initiate_for_session(matter: &'a Matter<'a>, session_id: u32) -> Result<Self, Error> {
matter.transport().initiate_for_session(matter, session_id)
}
/// Initiate a new unsecured exchange with the peer at `peer_addr`.
///
/// Delegates to the `initiate_plaintext` method of the transport.
#[inline(always)]
pub async fn initiate_unsecured<C: Crypto>(
matter: &'a Matter<'a>,
crypto: C,
peer_addr: network::Address,
) -> Result<Self, Error> {
matter
.transport
.initiate_plaintext(matter, crypto, peer_addr)
.await
}
/// Accept a new exchange. Equivalent to `accept_after` with a timeout of 0.
#[inline(always)]
pub async fn accept(matter: &'a Matter<'a>) -> Result<Self, Error> {
Self::accept_after(matter, 0).await
}
/// Accept a new exchange.
///
/// With `received_timeout_ms == 0`, any exchange offered by the transport is accepted.
/// Otherwise, only an exchange whose MRP state reports `has_rx_timed_out(received_timeout_ms)`
/// is accepted.
pub async fn accept_after(
matter: &'a Matter<'a>,
received_timeout_ms: u32,
) -> Result<Self, Error> {
if received_timeout_ms > 0 {
loop {
let mut accept = pin!(matter.transport().accept_if(matter, |_, exch, _| {
exch.mrp.has_rx_timed_out(received_timeout_ms as _)
}));
let mut timer = pin!(Timer::after(embassy_time::Duration::from_millis(
received_timeout_ms as u64
)));
// When the timer fires first, the accept future is created anew on the next iteration
// (presumably so that the time-dependent predicate gets re-evaluated - confirm)
if let Either::First(exchange) = select(&mut accept, &mut timer).await {
break exchange;
}
}
} else {
matter.transport().accept_if(matter, |_, _, _| true).await
}
}
/// Receive a message and return it by value.
///
/// If a message was already fetched with `recv_fetch`, that message is returned.
/// The exchange holds no received message after this call.
#[inline(always)]
pub async fn recv(&mut self) -> Result<RxMessage<'_>, Error> {
self.recv_fetch().await?;
self.rx.take().ok_or(ErrorCode::InvalidState.into())
}
/// Receive a message, copy its payload into `wb` (which is reset first)
/// and return the meta-data of the message.
#[inline(always)]
pub async fn recv_into(&mut self, wb: &mut WriteBuf<'_>) -> Result<MessageMeta, Error> {
let rx = self.recv().await?;
wb.reset();
wb.append(rx.payload())?;
Ok(rx.meta())
}
/// Receive a message - unless one is already held by the exchange - and return a reference to it.
///
/// The message stays in the exchange until it is taken or discarded.
#[inline(always)]
pub async fn recv_fetch(&mut self) -> Result<&RxMessage<'a>, Error> {
if self.rx.is_none() {
let rx = self.id.recv(self.matter).await?;
self.rx = Some(rx);
}
self.rx()
}
/// Return a reference to the received message held by the exchange.
///
/// Fails with `InvalidState` if the exchange holds no received message.
#[inline(always)]
pub fn rx(&self) -> Result<&RxMessage<'a>, Error> {
self.rx.as_ref().ok_or(ErrorCode::InvalidState.into())
}
/// Discard the received message held by the exchange, if any. Always succeeds.
#[inline(always)]
pub fn rx_done(&mut self) -> Result<(), Error> {
self.rx = None;
Ok(())
}
/// Discard the received message (if any) and start preparing a message for sending.
#[inline(always)]
pub async fn init_send(&mut self) -> Result<TxMessage<'_>, Error> {
self.rx = None;
self.id.init_send(self.matter).await
}
/// Discard the received message (if any) and wait for the outcome of the last sent message.
#[inline(always)]
pub async fn wait_tx(&mut self) -> Result<TxOutcome, Error> {
self.rx = None;
self.id.wait_tx(self.matter).await
}
/// Return `true` if the exchange has a retransmission entry.
pub fn pending_retrans(&self) -> Result<bool, Error> {
self.id.pending_retrans(self.matter)
}
/// Return `true` if the MRP state of the exchange reports a pending acknowledgement.
pub fn pending_ack(&self) -> Result<bool, Error> {
self.id.pending_ack(self.matter)
}
/// Send a standalone acknowledgement, if an acknowledgement is pending.
#[inline(always)]
pub async fn acknowledge(&mut self) -> Result<(), Error> {
if self.pending_ack()? {
let tx = self.id.init_send(self.matter).await?;
// Check again, as the state might have changed while waiting for the TX buffer;
// if no acknowledgement is pending anymore, `tx` is dropped without being completed
if self.pending_ack()? {
tx.complete::<MessageMeta>(0, 0, sc::OpCode::MRPStandAloneAck.into())?;
}
}
Ok(())
}
/// Discard the received message (if any) and create a sender borrowing this exchange.
///
/// Fails if the exchange still has a retransmission pending.
pub fn sender(&mut self) -> Result<Sender<'_>, Error> {
self.rx = None;
Sender::new(self)
}
/// Discard the received message (if any) and convert the exchange into an owned sender.
///
/// Fails if the exchange still has a retransmission pending.
pub fn into_sender(mut self) -> Result<OwnedSender<'a>, Error> {
self.rx = None;
OwnedSender::new(self)
}
/// Send a message whose payload is produced by the closure `f`.
///
/// `f` is called for the initial transmission and once more for every retransmission,
/// each time with an empty `WriteBuf` over the payload area of the TX buffer.
/// If `f` returns `Ok(None)`, the sending stops without completing that attempt.
pub async fn send_with<F>(&mut self, mut f: F) -> Result<(), Error>
where
F: FnMut(&Exchange, &mut WriteBuf) -> Result<Option<MessageMeta>, Error>,
{
let mut sender = self.sender()?;
while let Some(mut tx) = sender.tx().await? {
let (exchange, payload) = tx.split();
let mut wb = WriteBuf::new(payload);
if let Some(meta) = f(exchange, &mut wb)? {
let payload_start = wb.get_start();
let payload_end = wb.get_tail();
tx.complete(payload_start, payload_end, meta)?;
} else {
break;
}
}
Ok(())
}
/// Send a message with the given meta-data and payload.
pub async fn send<M>(&mut self, meta: M, payload: &[u8]) -> Result<(), Error>
where
M: Into<MessageMeta>,
{
let meta = meta.into();
self.send_with(|_, wb| {
wb.append(payload)?;
Ok(Some(meta))
})
.await
}
/// Build an `Accessor` for the session of this exchange.
pub(crate) fn accessor(&self) -> Result<Accessor<'a>, Error> {
self.id.accessor(self.matter)
}
/// Return `true` if the session of the exchange is in the `SessionMode::Group` mode.
pub fn is_groupcast(&self) -> Result<bool, Error> {
self.with_state(|state| {
Ok(matches!(
self.id().session(&mut state.sessions).get_session_mode(),
SessionMode::Group { .. }
))
})
}
/// Run `f` on the Matter state, provided that the session of the exchange still exists.
///
/// Fails with `NoSession` otherwise.
pub(crate) fn with_state<F, T>(&self, f: F) -> Result<T, Error>
where
F: FnOnce(&mut MatterState) -> Result<T, Error>,
{
self.id.with_state(self.matter, f)
}
/// Same as `with_state`, but with a custom error type `E` convertible from `Error`.
pub(crate) fn with_state_ex<F, T, E>(&self, f: F) -> Result<T, E>
where
F: FnOnce(&mut MatterState) -> Result<T, E>,
E: From<Error>,
{
self.id.with_state_ex(self.matter, f)
}
}
/// Removes the exchange from its session when the handle goes away.
///
/// If the removal closes the exchange and the session is a group session with no exchanges left,
/// the session is removed as well. If the exchange could not be closed right away (or its session
/// is gone already), the `exchange_dropped` notification of the transport is raised instead.
impl Drop for Exchange<'_> {
    fn drop(&mut self) {
        let id = self.id;

        let outcome = self.with_state(|state| {
            let sess = id.session(&mut state.sessions);

            if !sess.remove_exch(id.exchange_index()) {
                return Ok(false);
            }

            let empty_group_session = matches!(sess.get_session_mode(), SessionMode::Group { .. })
                && sess.exchanges.iter().all(Option::is_none);

            if empty_group_session {
                state.sessions.remove(id.session_id());
                self.matter.transport().notify_session_removed();
            }

            Ok(true)
        });

        if !outcome.unwrap_or(false) {
            self.matter.transport().exchange_dropped.notify();
        }
    }
}
/// Renders the exchange as its ID.
impl Display for Exchange<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_fmt(format_args!("{}", self.id))
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::crypto::test_only_crypto;
    use crate::dm::devices::test::{TEST_DEV_ATT, TEST_DEV_COMM, TEST_DEV_DET};
    use crate::error::ErrorCode;
    use crate::transport::session::SessionMode;
    use crate::Matter;
    use futures_lite::future::block_on;

    /// Create a Matter instance out of the test device data.
    fn test_matter() -> Matter<'static> {
        Matter::new(&TEST_DEV_DET, TEST_DEV_COMM, &TEST_DEV_ATT, 0)
    }

    /// Keep adding sessions (with the given `reserved` flag) until adding fails.
    fn fill_sessions(matter: &Matter<'_>, reserved: bool) {
        let dev_det = matter.dev_det();

        matter.with_state(|state| {
            while state
                .sessions
                .add(0, reserved, network::Address::new(), None, dev_det)
                .is_ok()
            {}
        });
    }

    /// Assert that `exchange` is an initiator exchange on an unencrypted, plain-text session.
    fn assert_plaintext_initiator(exchange: &Exchange<'_>) {
        let id = exchange.id();

        exchange
            .with_state(|state| {
                let sess = id.session(&mut state.sessions);
                let exch = id.exch(sess);

                assert!(matches!(exch.role, Role::Initiator(_)));
                assert_eq!(sess.id, id.session_id());
                assert!(!sess.is_encrypted());
                assert_eq!(*sess.get_session_mode(), SessionMode::PlainText);

                Ok(())
            })
            .unwrap();
    }

    #[test]
    fn test_initiate_unsecured_creates_initiator_exchange() {
        let matter = test_matter();
        let crypto = test_only_crypto();
        let peer = network::Address::new();

        let exchange = block_on(Exchange::initiate_unsecured(&matter, &crypto, peer)).unwrap();

        assert_plaintext_initiator(&exchange);
    }

    #[test]
    fn test_initiate_unsecured_retries_after_eviction() {
        let matter = test_matter();
        let crypto = test_only_crypto();
        let peer = network::Address::new();

        // All session slots are taken, but by sessions which are not reserved
        fill_sessions(&matter, false);

        let exchange = block_on(Exchange::initiate_unsecured(&matter, &crypto, peer)).unwrap();

        assert_plaintext_initiator(&exchange);
    }

    #[test]
    fn test_initiate_unsecured_fails_when_no_session_can_be_evicted() {
        let matter = test_matter();
        let crypto = test_only_crypto();
        let peer = network::Address::new();

        // All session slots are taken by reserved sessions
        fill_sessions(&matter, true);

        let result = block_on(Exchange::initiate_unsecured(&matter, &crypto, peer));

        let Err(err) = result else {
            panic!("expected NoSpaceSessions error");
        };

        assert!(matches!(err.code(), ErrorCode::NoSpaceSessions));
    }
}