use embassy_time::Instant;
use crate::dm::clusters::basic_info::BasicInfoConfig;
use crate::error::{Error, ErrorCode};
use super::{plain_hdr::PlainHdr, proto_hdr::ProtoHdr};
/// Fallback base retransmission interval in milliseconds, used whenever no positive
/// interval is supplied (see `RetransEntry::new` and `default_peer_mrp_params`).
const MRP_BASE_RETRY_INTERVAL_MS: u32 = 300;

/// Upper bound for `RetransEntry::counter`; once it is reached, `RetransEntry::pre_send`
/// fails with `ErrorCode::TxTimeout`.
const MRP_MAX_TRANSMISSIONS: u16 = 10;

/// `RetransEntry::delay_ms_counter` applies one backoff multiplication for every unit
/// by which the transmission counter exceeds this value.
const MRP_BACKOFF_THRESHOLD: u16 = 1;

/// Backoff growth factor as a `(numerator, denominator)` ratio, i.e. 16/10.
const MRP_BACKOFF_BASE: (u64, u64) = (16, 10);

/// Largest jitter fraction added on top of the delay, as a `(numerator, denominator)`
/// ratio, i.e. 25/100.
const MRP_BACKOFF_JITTER: (u64, u64) = (25, 100);

/// Margin applied to the base interval before any backoff, as a
/// `(numerator, denominator)` ratio, i.e. 11/10.
const MRP_BACKOFF_MARGIN: (u64, u64) = (11, 10);

/// Jitter random value that `RetransEntry::max_delay_ms` uses to obtain the largest jitter.
const MRP_JITTER_RAND_MAX: u8 = u8::MAX;

/// Fallback returned by `default_peer_mrp_params` when `sii` is unset or zero (milliseconds).
const MRP_DEFAULT_IDLE_INTERVAL_MS: u32 = 5_000;

/// Value `default_peer_mrp_params` always returns as its third tuple element (milliseconds).
const MRP_DEFAULT_ACTIVE_THRESHOLD_MS: u16 = 4_000;
/// Derives the MRP parameters to assume for a peer from the basic-info configuration.
///
/// Returns a 3-tuple:
/// 1. `dev_det.sai` when it is set and non-zero, otherwise `MRP_BASE_RETRY_INTERVAL_MS`;
/// 2. `dev_det.sii` when it is set and non-zero, otherwise `MRP_DEFAULT_IDLE_INTERVAL_MS`;
/// 3. always `MRP_DEFAULT_ACTIVE_THRESHOLD_MS`.
pub fn default_peer_mrp_params(dev_det: &BasicInfoConfig<'_>) -> (u32, u32, u16) {
    // A configured value of zero is treated the same as "not configured".
    let sai = match dev_det.sai {
        Some(value) if value > 0 => value,
        _ => MRP_BASE_RETRY_INTERVAL_MS,
    };
    let sii = match dev_det.sii {
        Some(value) if value > 0 => value,
        _ => MRP_DEFAULT_IDLE_INTERVAL_MS,
    };

    (sai, sii, MRP_DEFAULT_ACTIVE_THRESHOLD_MS)
}
/// Bookkeeping for one outgoing message that is being tracked for retransmission.
#[derive(Debug)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub struct RetransEntry {
/// Base delay in milliseconds from which `delay_ms_counter` derives the actual delay.
base_delay_interval_ms: u32,
/// Message counter this entry was created for; `pre_send` panics if called with another one.
msg_ctr: u32,
/// Incremented by every successful `pre_send`; never exceeds `MRP_MAX_TRANSMISSIONS`.
counter: u16,
}
impl RetransEntry {
    /// Creates an entry for the message with counter `msg_ctr`.
    ///
    /// `base_delay_interval_ms` is used as the base delay when it is `Some` and non-zero;
    /// otherwise `MRP_BASE_RETRY_INTERVAL_MS` is used. The transmission counter starts at 0.
    pub fn new(base_delay_interval_ms: Option<u32>, msg_ctr: u32) -> Self {
        let base_delay_interval_ms = match base_delay_interval_ms {
            Some(interval) if interval > 0 => interval,
            _ => MRP_BASE_RETRY_INTERVAL_MS,
        };

        Self {
            base_delay_interval_ms,
            msg_ctr,
            counter: 0,
        }
    }

    /// Returns the message counter this entry tracks.
    pub fn get_msg_ctr(&self) -> u32 {
        self.msg_ctr
    }

    /// Returns the delay in milliseconds for the current transmission counter, with the
    /// jitter scaled by `jitter_rand` (0 = no jitter, 255 = full jitter).
    pub fn delay_ms(&self, jitter_rand: u8) -> u64 {
        self.delay_ms_counter(self.counter, jitter_rand)
    }

    /// Returns the delay in milliseconds computed for a counter of `MRP_MAX_TRANSMISSIONS`
    /// and a jitter input of `MRP_JITTER_RAND_MAX`.
    pub fn max_delay_ms(&self) -> u64 {
        self.delay_ms_counter(MRP_MAX_TRANSMISSIONS, MRP_JITTER_RAND_MAX)
    }

    /// Computes the delay in milliseconds for an arbitrary transmission `counter`.
    ///
    /// The base interval is first scaled by `MRP_BACKOFF_MARGIN`, then multiplied by
    /// `MRP_BACKOFF_BASE` once for every unit `counter` exceeds `MRP_BACKOFF_THRESHOLD`
    /// (integer division truncates at every step), and finally increased by a jitter of
    /// `jitter_rand / 255` times the `MRP_BACKOFF_JITTER` fraction of that delay.
    pub fn delay_ms_counter(&self, counter: u16, jitter_rand: u8) -> u64 {
        let (margin_num, margin_den) = MRP_BACKOFF_MARGIN;
        let (base_num, base_den) = MRP_BACKOFF_BASE;
        let (jitter_num, jitter_den) = MRP_BACKOFF_JITTER;

        let with_margin = u64::from(self.base_delay_interval_ms) * margin_num / margin_den;

        // Zero steps when the counter has not passed the threshold.
        let backoff_steps = counter.saturating_sub(MRP_BACKOFF_THRESHOLD);
        let backed_off =
            (0..backoff_steps).fold(with_margin, |delay, _| delay * base_num / base_den);

        let jitter = (backed_off * u64::from(jitter_rand) * jitter_num) / (255 * jitter_den);

        backed_off + jitter
    }

    /// Registers one more transmission of the tracked message.
    ///
    /// # Errors
    ///
    /// Returns `ErrorCode::TxTimeout` once the counter has reached `MRP_MAX_TRANSMISSIONS`;
    /// the counter is left unchanged in that case.
    ///
    /// # Panics
    ///
    /// Panics if `ctr` differs from the message counter this entry was created for.
    pub fn pre_send(&mut self, ctr: u32) -> Result<(), Error> {
        if self.msg_ctr != ctr {
            panic!("Previous retrans entry for this exchange already exists");
        }

        if self.counter >= MRP_MAX_TRANSMISSIONS {
            return Err(ErrorCode::TxTimeout.into());
        }

        self.counter += 1;
        Ok(())
    }
}
/// Record of a received message counter for which an acknowledgement is owed.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub struct AckEntry {
/// Counter of the received message to acknowledge.
pub(crate) msg_ctr: u32,
/// Set to `true` by `ReliableMessage::pre_send` once the counter has been placed into an
/// outgoing protocol header.
pub(crate) acknowledged: bool,
}
impl AckEntry {
    /// Creates a not-yet-acknowledged entry for the message counter `msg_ctr`.
    ///
    /// The visible implementation always returns `Ok`.
    pub fn new(msg_ctr: u32) -> Result<Self, Error> {
        let entry = Self {
            msg_ctr,
            acknowledged: false,
        };

        Ok(entry)
    }

    /// Returns the message counter that is to be acknowledged.
    pub fn get_msg_ctr(&self) -> u32 {
        let Self { msg_ctr, .. } = self;
        *msg_ctr
    }
}
/// Reliability state: the outgoing message awaiting an acknowledgement, the incoming
/// message we still have to acknowledge, and the time of the last reception.
#[derive(Default, Debug)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub struct ReliableMessage {
/// Set by `pre_send` when a reliable message is sent; cleared by `post_recv` when a
/// matching acknowledgement arrives, or by `pre_send` when retransmissions are exhausted.
pub(crate) retrans: Option<RetransEntry>,
/// Created by `post_recv` for every received reliable message; `pre_send` copies its
/// counter into the outgoing header and marks it as acknowledged.
pub(crate) ack: Option<AckEntry>,
/// Timestamp taken by `post_recv`; reset to `None` by `pre_send`.
pub(crate) received_at: Option<Instant>,
}
impl ReliableMessage {
    /// Creates an empty state: nothing to retransmit, nothing to acknowledge and no
    /// receive timestamp.
    pub fn new() -> Self {
        Default::default()
    }

    /// Returns `true` while a sent reliable message is still tracked for retransmission.
    pub fn is_retrans_pending(&self) -> bool {
        self.retrans.is_some()
    }

    /// Returns `true` if an acknowledgement entry exists that has not yet been attached to
    /// an outgoing message.
    pub fn is_ack_pending(&self) -> bool {
        matches!(&self.ack, Some(ack) if !ack.acknowledged)
    }

    /// Returns `true` if a receive timestamp is recorded and at least `timeout_ms`
    /// milliseconds have passed since then. Returns `false` when no timestamp is recorded.
    pub fn has_rx_timed_out(&self, timeout_ms: u64) -> bool {
        self.received_at
            .map(|received_at| {
                let deadline =
                    received_at.saturating_add(embassy_time::Duration::from_millis(timeout_ms));
                Instant::now() >= deadline
            })
            .unwrap_or(false)
    }

    /// Updates the reliability state for a message that is about to be transmitted.
    ///
    /// * If an acknowledgement entry exists, its counter is written into `tx_proto` and the
    ///   entry is marked as acknowledged.
    /// * If `tx_proto` is reliable, the message is either registered for retransmission
    ///   (first transmission, using `session_active_interval_ms` as the base delay) or its
    ///   retransmission counter is advanced.
    /// * The receive timestamp is reset.
    ///
    /// `_session_idle_interval_ms` is currently unused.
    ///
    /// # Errors
    ///
    /// Returns the error reported by `RetransEntry::pre_send` (`ErrorCode::TxTimeout`) when
    /// the message has already been transmitted the maximum number of times. The
    /// retransmission and acknowledgement entries are dropped in that case and the caller
    /// must not transmit the message again.
    ///
    /// # Panics
    ///
    /// Panics (inside `RetransEntry::pre_send`) if a retransmission entry exists for a
    /// message counter other than `tx_plain.ctr`.
    pub fn pre_send(
        &mut self,
        tx_plain: &PlainHdr,
        tx_proto: &mut ProtoHdr,
        session_active_interval_ms: Option<u32>,
        _session_idle_interval_ms: Option<u32>,
    ) -> Result<(), Error> {
        // Piggy-back any owed acknowledgement on the outgoing header.
        if let Some(ack) = &mut self.ack {
            tx_proto.set_ack(Some(ack.get_msg_ctr()));
            ack.acknowledged = true;
        }

        if tx_proto.is_reliable() {
            if let Some(retrans) = &mut self.retrans {
                if let Err(err) = retrans.pre_send(tx_plain.ctr) {
                    error!(
                        "Packet {}{}: Too many retransmissions. Giving up",
                        tx_plain, tx_proto
                    );
                    self.retrans = None;
                    self.ack = None;
                    // Same state reset as on the success path below.
                    self.received_at = None;
                    // Report the give-up to the caller. Returning `Ok` here would let the
                    // message go out once more and, with `retrans` cleared, the next call
                    // would start a fresh retransmission cycle, so the limit would never
                    // actually be enforced.
                    return Err(err);
                }
            } else {
                self.retrans = Some(RetransEntry::new(session_active_interval_ms, tx_plain.ctr));
            }
        }

        self.received_at = None;
        Ok(())
    }

    /// Updates the reliability state for a message that has just been received.
    ///
    /// * If the message carries an acknowledgement and a retransmission entry exists, the
    ///   acknowledged counter must match the entry; on match both the retransmission entry
    ///   and the acknowledgement entry are cleared.
    /// * If the message is reliable, a new acknowledgement entry for `rx_plain.ctr` is
    ///   stored. An already existing entry is logged as an error and then replaced.
    /// * The receive timestamp is set to the current time.
    ///
    /// # Errors
    ///
    /// Returns `ErrorCode::Duplicate` if the acknowledged counter does not match the
    /// pending retransmission entry; the state is left untouched in that case. Also
    /// propagates any error from `AckEntry::new`.
    pub fn post_recv(&mut self, rx_plain: &PlainHdr, rx_proto: &ProtoHdr) -> Result<(), Error> {
        if let Some(ack_msg_ctr) = rx_proto.get_ack() {
            if let Some(entry) = &self.retrans {
                if entry.get_msg_ctr() != ack_msg_ctr {
                    warn!("Mismatch in retrans-table's msg counter and received msg counter: received {:x}, expected {:x}.", ack_msg_ctr, entry.msg_ctr);
                    Err(ErrorCode::Duplicate)?;
                }
                self.retrans = None;
                self.ack = None;
            }
        }

        if rx_proto.is_reliable() {
            if let Some(ack) = &self.ack {
                error!(
                    "Previous ACK entry {:x} for this exchange already exists",
                    ack.get_msg_ctr()
                );
            }
            self.ack = Some(AckEntry::new(rx_plain.ctr)?);
        }

        self.received_at = Some(Instant::now());
        Ok(())
    }
}