use core::num::NonZeroU8;
use embassy_time::Instant;
use crate::fabric::MAX_FABRICS;
use crate::im::{AttrId, ClusterId, EndptId, EventId, EventNumber, IMBuffer, NodeId};
use crate::utils::cell::RefCell;
use crate::utils::init::{init, Init};
use crate::utils::storage::pooled::Buffers;
use crate::utils::storage::Vec;
use crate::utils::sync::blocking::Mutex;
use crate::utils::sync::{DynBase, Notification};
/// Default capacity of the subscription tables: three subscriptions per
/// supported fabric.
pub const DEFAULT_MAX_SUBSCRIPTIONS: usize = MAX_FABRICS * 3;
/// Maximum number of entries `ChangedAttrs` keeps before it starts coarsening
/// entries into wildcards to make room.
pub const MAX_CHANGED_ATTRS: usize = 16;
/// Lock-protected table of up to `N` buffers taken from the buffer pool `B`.
///
/// `Subscriptions` keeps this table index-aligned with its own subscription
/// table: the buffer at index `i` belongs to the subscription at index `i`.
pub struct SubscriptionsBuffers<'a, B, const N: usize = DEFAULT_MAX_SUBSCRIPTIONS>
where
B: Buffers<IMBuffer> + 'a,
{
// The buffers themselves; only reachable through `with`.
buffers: Mutex<RefCell<SubscriptionsBuffersInner<'a, B, N>>>,
}
impl<'a, B, const N: usize> SubscriptionsBuffers<'a, B, N>
where
    B: Buffers<IMBuffer> + 'a,
{
    /// Create an empty buffer table.
    pub const fn new() -> Self {
        let buffers = Mutex::new(RefCell::new(Vec::new()));

        Self { buffers }
    }

    /// Return an in-place initializer for an empty buffer table.
    pub fn init() -> impl Init<Self> {
        init!(Self {
            buffers <- Mutex::init(RefCell::init(Vec::init())),
        })
    }

    /// Run `f` with mutable access to the buffer table while the lock is held,
    /// returning whatever `f` returns.
    fn with<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&mut SubscriptionsBuffersInner<'a, B, N>) -> R,
    {
        self.buffers.lock(|cell| {
            let mut table = cell.borrow_mut();

            f(&mut table)
        })
    }
}
impl<'a, B, const N: usize> Default for SubscriptionsBuffers<'a, B, N>
where
B: Buffers<IMBuffer> + 'a,
{
fn default() -> Self {
Self::new()
}
}
/// The unlocked contents of a `SubscriptionsBuffers`: a bounded vector of at
/// most `N` buffers handed out by the pool `B`.
type SubscriptionsBuffersInner<'a, B, const N: usize> =
Vec<<B as Buffers<IMBuffer>>::Buffer<'a>, N>;
/// Registry of up to `N` subscriptions plus the table of recently changed
/// attributes they report on.
pub struct Subscriptions<const N: usize = DEFAULT_MAX_SUBSCRIPTIONS> {
// All mutable bookkeeping, accessed only under the lock.
state: Mutex<RefCell<SubscriptionsInner<N>>>,
// Signalled whenever a change is recorded, an event is emitted or a
// subscription is removed.
pub(crate) notification: Notification,
}
impl<const N: usize> Subscriptions<N> {
/// Create an empty registry with no subscriptions and no recorded changes.
#[inline(always)]
pub const fn new() -> Self {
    let state = Mutex::new(RefCell::new(SubscriptionsInner::new()));
    let notification = Notification::new();

    Self {
        state,
        notification,
    }
}
/// Return an in-place initializer producing the same empty registry as
/// `new`, built field by field through the `init!` machinery.
pub fn init() -> impl Init<Self> {
init!(Self {
state <- Mutex::init(RefCell::init(SubscriptionsInner::init())),
notification: Notification::new(),
})
}
/// Record that one concrete attribute changed, then signal `notification`.
pub(crate) fn notify_attr_changed(
    &self,
    endpoint_id: EndptId,
    cluster_id: ClusterId,
    attr_id: AttrId,
) {
    self.state.lock(|cell| {
        let mut inner = cell.borrow_mut();

        // The assigned change ID is not needed by callers of this method.
        inner.changed_attrs.record(endpoint_id, cluster_id, attr_id);
    });

    self.notification.notify();
}
/// Record that every attribute of one cluster on one endpoint changed
/// (an attribute-wildcard entry), then signal `notification`.
pub(crate) fn notify_cluster_changed(&self, endpoint_id: EndptId, cluster_id: ClusterId) {
    let endpoint = Some(endpoint_id);
    let cluster = Some(cluster_id);

    self.state.lock(|cell| {
        let mut inner = cell.borrow_mut();

        inner.changed_attrs.record_wildcard(endpoint, cluster);
    });

    self.notification.notify();
}
/// Record that everything on one endpoint changed (cluster and attribute
/// wildcards), then signal `notification`.
pub(crate) fn notify_endpoint_changed(&self, endpoint_id: EndptId) {
    let endpoint = Some(endpoint_id);

    self.state.lock(|cell| {
        let mut inner = cell.borrow_mut();

        inner.changed_attrs.record_wildcard(endpoint, None);
    });

    self.notification.notify();
}
/// Record a fully wildcarded change (any endpoint, cluster and attribute),
/// then signal `notification`.
pub(crate) fn notify_all_changed(&self) {
    self.state.lock(|cell| {
        let mut inner = cell.borrow_mut();

        inner.changed_attrs.record_wildcard(None, None);
    });

    self.notification.notify();
}
/// Signal `notification` because an event was emitted.
///
/// The endpoint, cluster and event IDs are currently unused: nothing is
/// recorded here, the method only signals.
pub fn notify_event_emitted(
&self,
_endpoint_id: EndptId,
_cluster_id: ClusterId,
_event_id: EventId,
) {
self.notification.notify();
}
/// Drop every registered subscription and mark the in-flight one (if any)
/// as cancelled.
///
/// NOTE(review): only the subscription table is cleared here; this method has
/// no access to the paired `SubscriptionsBuffers`, whose entries are indexed
/// in lock-step with the subscriptions elsewhere in this file. Confirm that
/// callers clear or drop the buffer table as well.
pub(crate) fn clear(&self) {
    self.state.lock(|cell| {
        let mut inner = cell.borrow_mut();

        inner.clear();
    });
}
/// Register a new subscription and return the context used to send its
/// first report.
///
/// Returns `None` when all `N` slots are in use. The new subscription is not
/// stored in the table yet: it travels inside the returned `ReportContext`
/// together with `buffer`, and is stored when that context is dropped with
/// its keep flag set.
///
/// The context snapshots the attribute-change watermark taken under the
/// lock, `event_numbers_watermark`, and `now`; these become the
/// subscription's "seen" markers when the context is dropped.
#[allow(clippy::too_many_arguments)]
pub(crate) fn add<'a, 's, B>(
    &'s self,
    now: Instant,
    fabric_idx: NonZeroU8,
    peer_node_id: u64,
    session_id: u32,
    min_int_secs: u16,
    max_int_secs: u16,
    event_numbers_watermark: EventNumber,
    buffer: B::Buffer<'a>,
    buffers: &'s SubscriptionsBuffers<'a, B, N>,
) -> Option<ReportContext<'a, 's, B, N>>
where
    B: Buffers<IMBuffer> + 'a,
{
    let registered = self.with(buffers, |state, bufs| {
        state
            .add::<B>(
                fabric_idx,
                peer_node_id,
                session_id,
                min_int_secs,
                max_int_secs,
                buffer,
                bufs,
            )
            .map(|(sub, buf)| (sub, buf, state.changed_attrs.watermark()))
    });

    let (sub, buf, attr_change_ids_watermark) = registered?;

    Some(ReportContext {
        subscriptions: self,
        subscriptions_buffers: buffers,
        subscription: Some(sub),
        subscription_buffer: Some(buf),
        next_max_seen_attr_change_id: attr_change_ids_watermark,
        next_max_seen_event_number: event_numbers_watermark,
        next_reported_at: now,
        keep: false,
    })
}
/// Remove every subscription for which `f` returns a reason.
///
/// Stored subscriptions are removed immediately together with their
/// buffers. A subscription that is currently being reported cannot be
/// removed on the spot; it is marked as cancelled instead (unless it already
/// is) and is discarded when its report completes.
///
/// Returns `true` if at least one subscription was removed or marked, in
/// which case `notification` is signalled as well.
pub(crate) fn remove<B, F>(&self, buffers: &SubscriptionsBuffers<'_, B, N>, mut f: F) -> bool
where
    B: Buffers<IMBuffer>,
    F: FnMut(&Subscription) -> Option<&'static str>,
{
    let any_removed = self.with(buffers, |state, bufs| {
        let mut any_removed = false;

        // Rescan from the start after every removal: `swap_remove` moves the
        // last element into the freed slot, so indices do not stay stable.
        loop {
            let hit = state
                .subscriptions
                .iter()
                .enumerate()
                .find_map(|(pos, candidate)| {
                    let reason = f(candidate)?;
                    Some((pos, candidate.ids().clone(), reason))
                });

            match hit {
                Some((pos, ids, reason)) => {
                    // Keep both tables index-aligned.
                    state.subscriptions.swap_remove(pos);
                    bufs.swap_remove(pos);
                    state.subscriptions_count -= 1;

                    info!("Removed subscription {:?}, reason: {}", ids, reason);
                    any_removed = true;
                }
                None => break,
            }
        }

        if state.reporting_cancelled.is_none() {
            let in_flight_reason = state.reporting.as_ref().and_then(|in_flight| {
                let reason = f(in_flight)?;

                info!(
                    "Marked in-flight subscription {:?} for removal, reason: {}",
                    in_flight.ids(),
                    reason
                );

                Some(reason)
            });

            if let Some(reason) = in_flight_reason {
                state.reporting_cancelled = Some(reason);
                any_removed = true;
            }
        }

        any_removed
    });

    if any_removed {
        self.notification.notify();
    }

    any_removed
}
/// Take the first subscription that is reportable at `now` out of the table
/// and return the context used to send its report.
///
/// Returns `None` when no subscription is reportable. The subscription and
/// its buffer are handed back to the table when the context is dropped.
pub(crate) fn report<'a, 's, B>(
    &'s self,
    now: Instant,
    event_numbers_watermark: EventNumber,
    buffers: &'s SubscriptionsBuffers<'a, B, N>,
) -> Option<ReportContext<'a, 's, B, N>>
where
    B: Buffers<IMBuffer> + 'a,
{
    let picked = self.with(buffers, |state, bufs| {
        state
            .report::<B>(now, event_numbers_watermark, bufs)
            .map(|(sub, buf)| {
                let watermark = state.changed_attrs.watermark();

                debug!("About to report on subscription {:?}, details: max_seen_attr_change_id: {}, max_seen_event_number: {}, attr_change_ids_watermark: {}, event_numbers_watermark: {}", sub.ids(), sub.max_seen_attr_change_id, sub.max_seen_event_number, watermark, event_numbers_watermark);

                (sub, buf, watermark)
            })
    });

    let (sub, buf, attr_change_ids_watermark) = picked?;

    Some(ReportContext {
        subscriptions: self,
        subscriptions_buffers: buffers,
        subscription: Some(sub),
        subscription_buffer: Some(buf),
        next_max_seen_attr_change_id: attr_change_ids_watermark,
        next_max_seen_event_number: event_numbers_watermark,
        next_reported_at: now,
        keep: false,
    })
}
/// Earliest instant at which any stored subscription needs a report, or
/// `Instant::MAX` when there are no stored subscriptions.
pub(crate) fn next_report_at<'a, B>(
    &self,
    event_numbers_watermark: EventNumber,
    buffers: &SubscriptionsBuffers<'a, B, N>,
) -> Instant
where
    B: Buffers<IMBuffer> + 'a,
{
    self.with(buffers, |inner, bufs| {
        let earliest = inner.next_report_at::<B>(event_numbers_watermark, bufs);

        earliest
    })
}
/// Discard recorded attribute changes that every stored subscription has
/// already been credited with.
pub(crate) fn purge_reported_changes(&self) {
    self.state.lock(|cell| {
        let mut inner = cell.borrow_mut();

        inner.purge_reported_changes()
    })
}
fn report_complete<'a, B>(&self, report: &mut ReportContext<'a, '_, B, N>)
where
B: Buffers<IMBuffer> + 'a,
{
let mut sub = unwrap!(report.subscription.take());
let buf = unwrap!(report.subscription_buffer.take());
sub.max_seen_attr_change_id = report.next_max_seen_attr_change_id;
sub.max_seen_event_number = report.next_max_seen_event_number;
sub.reported_at = report.next_reported_at;
let keep = report.keep;
self.with(report.subscriptions_buffers, |state, buffers| {
state.report_complete::<B>(sub, buf, buffers, keep)
})
}
/// Run `f` with mutable access to both the subscription state and the
/// buffer table.
///
/// The state lock is taken first and the buffers lock second, nested inside
/// it.
fn with<'a, B, F, R>(&self, buffers: &SubscriptionsBuffers<'a, B, N>, f: F) -> R
where
    B: Buffers<IMBuffer> + 'a,
    F: FnOnce(&mut SubscriptionsInner<N>, &mut SubscriptionsBuffersInner<'a, B, N>) -> R,
{
    self.state.lock(|cell| {
        let mut inner = cell.borrow_mut();

        buffers.with(|table| f(&mut inner, table))
    })
}
}
impl<const N: usize> Default for Subscriptions<N> {
fn default() -> Self {
Self::new()
}
}
// Opt `Subscriptions` into the `DynBase` trait; no trait items are overridden.
impl<const N: usize> DynBase for Subscriptions<N> {}
/// The lock-protected bookkeeping behind `Subscriptions`.
struct SubscriptionsInner<const N: usize> {
// ID handed to the next subscription created by `add`.
next_subscription_id: u32,
// Number of occupied slots. This also counts subscriptions that are
// currently held by a `ReportContext` rather than stored in `subscriptions`.
subscriptions_count: usize,
// Stored subscriptions; index-aligned with the external buffer table.
subscriptions: Vec<Subscription, N>,
// Recently changed attributes, tagged with increasing change IDs.
changed_attrs: ChangedAttrs,
// Copy of the subscription taken out by `report`, until `report_complete`.
reporting: Option<Subscription>,
// Reason recorded when the in-flight subscription is removed or cleared
// while its report is still in progress.
reporting_cancelled: Option<&'static str>,
}
impl<const N: usize> SubscriptionsInner<N> {
/// Create empty state: no subscriptions, no recorded changes, nothing in
/// flight, and subscription IDs starting at 1.
#[inline(always)]
const fn new() -> Self {
    Self {
        subscriptions: Vec::new(),
        subscriptions_count: 0,
        next_subscription_id: 1,
        changed_attrs: ChangedAttrs::new(),
        reporting_cancelled: None,
        reporting: None,
    }
}
/// Return an in-place initializer producing the same empty state as `new`.
fn init() -> impl Init<Self> {
init!(Self {
next_subscription_id: 1,
subscriptions_count: 0,
subscriptions <- Vec::init(),
changed_attrs <- ChangedAttrs::init(),
reporting: None,
reporting_cancelled: None,
})
}
/// Drop all stored subscriptions.
///
/// A subscription that is currently being reported is marked as cancelled
/// and keeps occupying one slot until its report completes.
fn clear(&mut self) {
    self.subscriptions.clear();

    if self.reporting.is_some() {
        // The in-flight subscription still holds its slot.
        self.reporting_cancelled = Some("subscriptions cleared");
        self.subscriptions_count = 1;
    } else {
        self.subscriptions_count = 0;
    }
}
/// Reserve a slot and build a new `Subscription`.
///
/// Returns `None` when all `N` slots are occupied. On success the slot count
/// is incremented and the new subscription is returned together with
/// `buffer`; neither is stored in the tables here (`_buffers` is unused).
///
/// The new subscription starts with `reported_at == Instant::MAX` (never
/// reported), the current attribute-change watermark, and event number 0.
#[allow(clippy::too_many_arguments)]
fn add<'a, B>(
    &mut self,
    fab_idx: NonZeroU8,
    peer_node_id: u64,
    session_id: u32,
    min_int_secs: u16,
    max_int_secs: u16,
    buffer: B::Buffer<'a>,
    _buffers: &mut SubscriptionsBuffersInner<'a, B, N>,
) -> Option<(Subscription, B::Buffer<'a>)>
where
    B: Buffers<IMBuffer> + 'a,
{
    if self.subscriptions_count >= N {
        return None;
    }

    self.subscriptions_count += 1;

    let id = self.next_subscription_id;
    // Wrap around instead of overflowing (which would panic in debug builds),
    // skipping 0 so IDs stay in the range they started in. This mirrors how
    // `ChangedAttrs` advances its change-ID counter.
    self.next_subscription_id = id.wrapping_add(1).max(1);

    let max_seen_attr_change_id = self.changed_attrs.watermark();

    let subscription = Subscription {
        ids: SubscriptionIds {
            id,
            fab_idx,
            peer_node_id,
        },
        session_id,
        min_int_secs,
        max_int_secs,
        reported_at: Instant::MAX,
        max_seen_attr_change_id,
        max_seen_event_number: 0,
    };

    info!("Added subscription {:?}", subscription.ids());

    Some((subscription, buffer))
}
/// Take the first reportable subscription and its buffer out of the tables.
///
/// A copy of the subscription is remembered in `reporting` so it can still
/// be cancelled while its report is in progress. Returns `None` when nothing
/// is reportable at `now`.
///
/// Debug builds assert that no other report is in flight.
fn report<'a, B>(
    &mut self,
    now: Instant,
    event_numbers_watermark: EventNumber,
    buffers: &mut SubscriptionsBuffersInner<'a, B, N>,
) -> Option<(Subscription, B::Buffer<'a>)>
where
    B: Buffers<IMBuffer> + 'a,
{
    debug_assert!(self.reporting.is_none());
    debug_assert!(self.reporting_cancelled.is_none());

    let index = self.find_reportable::<B>(now, event_numbers_watermark, buffers)?;

    // Remove from both tables at the same index to keep them aligned.
    let sub = self.subscriptions.swap_remove(index);
    let buf = buffers.swap_remove(index);

    debug!("About to report on subscription {:?}", sub.ids());

    self.reporting = Some(sub.clone());

    Some((sub, buf))
}
/// Finish a report (or the priming of a freshly added subscription).
///
/// * If `sub` is the in-flight subscription and it was cancelled meanwhile,
///   it is discarded and its slot released.
/// * Otherwise, if `keep` is set, `sub` and `buffer` are stored at the end of
///   their (index-aligned) tables.
/// * Otherwise `sub` is discarded and its slot released.
///
/// Panics if `keep` is set and either table is already full.
fn report_complete<'a, B>(
    &mut self,
    sub: Subscription,
    buffer: B::Buffer<'a>,
    buffers: &mut SubscriptionsBuffersInner<'a, B, N>,
    keep: bool,
) where
    B: Buffers<IMBuffer> + 'a,
{
    // Contexts created by `add` never register themselves in `reporting`.
    // Only touch the in-flight state when it actually describes `sub`, so
    // that a priming context finishing while a report is in progress neither
    // steals that report's cancellation nor un-tracks it.
    let was_in_flight = matches!(
        &self.reporting,
        Some(in_flight) if in_flight.ids.id == sub.ids.id
    );

    let cancelled = if was_in_flight {
        self.reporting = None;
        self.reporting_cancelled.take()
    } else {
        None
    };

    if let Some(reason) = cancelled {
        info!(
            "In-flight subscription {:?} cancelled during reporting: {}",
            sub.ids(),
            reason
        );
        self.subscriptions_count -= 1;
    } else if keep {
        debug!("Subscription {:?} kept after reporting; max-attr-change-id: {}, max-seen-event-number: {}", sub.ids(), sub.max_seen_attr_change_id, sub.max_seen_event_number);
        unwrap!(self.subscriptions.push(sub));
        unwrap!(buffers.push(buffer).map_err(|_| ()));
    } else {
        warn!("Subscription {:?} removed during reporting", sub.ids());
        self.subscriptions_count -= 1;
    }
}
/// Index of the first stored subscription that is reportable at `now`, if
/// any. The buffer at the same index is passed along as the subscription's
/// request data.
fn find_reportable<'a, B>(
    &self,
    now: Instant,
    event_numbers_watermark: EventNumber,
    buffers: &SubscriptionsBuffersInner<'a, B, N>,
) -> Option<usize>
where
    B: Buffers<IMBuffer> + 'a,
{
    let changes = &self.changed_attrs;

    self.subscriptions
        .iter()
        .enumerate()
        .position(|(index, sub)| {
            sub.is_reportable(now, &buffers[index], changes, event_numbers_watermark)
        })
}
/// Discard recorded attribute changes that no subscription still needs.
///
/// Changes are purged up to the smallest "max seen change ID" across all
/// stored subscriptions and the in-flight one (whose stored copy still
/// carries the ID from before its current report). Including the in-flight
/// subscription ensures that purging during a report cannot drop changes it
/// has not been credited with yet. With no subscriptions at all, the whole
/// table is cleared.
fn purge_reported_changes(&mut self) {
    let oldest_needed = self
        .subscriptions
        .iter()
        .chain(self.reporting.iter())
        .map(|sub| sub.max_seen_attr_change_id)
        .min();

    match oldest_needed {
        Some(threshold) => self.changed_attrs.purge_up_to(threshold),
        None => self.changed_attrs.clear(),
    }
}
/// Earliest instant at which any stored subscription needs a report, or
/// `Instant::MAX` when no subscription is stored.
fn next_report_at<'a, B>(
    &self,
    event_numbers_watermark: EventNumber,
    buffers: &SubscriptionsBuffersInner<'a, B, N>,
) -> Instant
where
    B: Buffers<IMBuffer> + 'a,
{
    let mut earliest = Instant::MAX;

    for (index, sub) in self.subscriptions.iter().enumerate() {
        let at = sub.next_report_at(
            &buffers[index],
            &self.changed_attrs,
            event_numbers_watermark,
        );

        if at < earliest {
            earliest = at;
        }
    }

    earliest
}
}
/// The identifiers of a subscription.
#[derive(Clone, Debug)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub struct SubscriptionIds {
/// ID assigned to the subscription when it was added.
pub id: u32,
/// Index of the fabric the subscription was created on.
pub fab_idx: NonZeroU8,
/// Node ID of the subscribing peer.
pub peer_node_id: NodeId,
}
/// Bookkeeping for a single subscription.
#[derive(Clone, Debug)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub struct Subscription {
// Identifiers of the subscription.
ids: SubscriptionIds,
// Session the subscription was established on.
session_id: u32,
// Minimum number of seconds between two reports.
min_int_secs: u16,
// Maximum reporting interval in seconds (used for expiry and "due" checks).
max_int_secs: u16,
// Time of the last report; `Instant::MAX` means "never reported yet".
reported_at: Instant,
// Attribute change ID up to which changes have already been reported.
max_seen_attr_change_id: u64,
// Event number up to which events have already been reported.
max_seen_event_number: u64,
}
impl Subscription {
    /// The identifiers of this subscription.
    pub const fn ids(&self) -> &SubscriptionIds {
        &self.ids
    }

    /// The session this subscription was established on.
    pub const fn session_id(&self) -> u32 {
        self.session_id
    }

    /// `true` once the maximum interval has fully elapsed since the last
    /// report. If the expiry instant is not representable (e.g. the
    /// subscription was never reported), it is treated as not expired.
    pub fn is_expired(&self, now: Instant) -> bool {
        let lifetime = embassy_time::Duration::from_secs(self.max_int_secs as _);

        match self.reported_at.checked_add(lifetime) {
            Some(expiry) => expiry <= now,
            None => false,
        }
    }

    /// `true` when a report may be sent now (minimum interval elapsed) and
    /// there is a reason to send one: the periodic report is due, attributes
    /// changed, or new events were emitted.
    fn is_reportable(
        &self,
        now: Instant,
        rx: &[u8],
        changed_attrs: &ChangedAttrs,
        event_numbers_watermark: EventNumber,
    ) -> bool {
        self.is_report_allowed(now)
            && (self.is_report_due(now)
                || self.is_affected_by_attr_changes(rx, changed_attrs)
                || self.is_affected_by_new_events(rx, event_numbers_watermark))
    }

    /// Earliest instant at which a report is allowed: last report plus the
    /// minimum interval, or `Instant::MIN` if that is not representable.
    fn report_allowed_at(&self) -> Instant {
        let min_interval = embassy_time::Duration::from_secs(self.min_int_secs as _);

        match self.reported_at.checked_add(min_interval) {
            Some(at) => at,
            None => Instant::MIN,
        }
    }

    /// `true` when the minimum interval has elapsed at `now`.
    fn is_report_allowed(&self, now: Instant) -> bool {
        now >= self.report_allowed_at()
    }

    /// Instant at which a periodic report becomes due: last report plus half
    /// of the maximum interval (rounded up), or `Instant::MIN` if that is not
    /// representable.
    fn report_due_at(&self) -> Instant {
        let half_max_interval = embassy_time::Duration::from_secs(
            (self.max_int_secs - self.max_int_secs / 2) as _,
        );

        match self.reported_at.checked_add(half_max_interval) {
            Some(at) => at,
            None => Instant::MIN,
        }
    }

    /// `true` when the periodic report is due at `now`.
    fn is_report_due(&self, now: Instant) -> bool {
        now >= self.report_due_at()
    }

    /// `true` when any change newer than what this subscription has seen is
    /// recorded. The request data in `_rx` is not consulted.
    fn is_affected_by_attr_changes(&self, _rx: &[u8], changes: &ChangedAttrs) -> bool {
        let seen = self.max_seen_attr_change_id;

        changes.any_since(seen)
    }

    /// `true` when the event watermark is ahead of what this subscription
    /// has seen. The request data in `_rx` is not consulted.
    fn is_affected_by_new_events(&self, _rx: &[u8], event_numbers_watermark: EventNumber) -> bool {
        event_numbers_watermark > self.max_seen_event_number
    }

    /// Instant at which this subscription next needs a report: as soon as
    /// allowed when data is pending, otherwise when the periodic report is
    /// due (but never before it is allowed).
    fn next_report_at(
        &self,
        rx: &[u8],
        changed_attrs: &ChangedAttrs,
        event_numbers_watermark: EventNumber,
    ) -> Instant {
        let allowed_at = self.report_allowed_at();

        let has_pending_data = self.is_affected_by_attr_changes(rx, changed_attrs)
            || self.is_affected_by_new_events(rx, event_numbers_watermark);

        if has_pending_data {
            return allowed_at;
        }

        allowed_at.max(self.report_due_at())
    }
}
/// Bounded table of recently changed attributes, each tagged with the ID of
/// the latest change it represents.
pub(crate) struct ChangedAttrs {
// ID that will be assigned to the next recorded change; starts at 1.
next_change_id: u64,
// At most `MAX_CHANGED_ATTRS` concrete or wildcard entries.
entries: Vec<ChangedAttr, MAX_CHANGED_ATTRS>,
}
impl ChangedAttrs {
    /// Create an empty table; the first recorded change gets ID 1.
    #[inline(always)]
    const fn new() -> Self {
        Self {
            entries: Vec::new(),
            next_change_id: 1,
        }
    }

    /// Return an in-place initializer producing the same table as `new`.
    fn init() -> impl Init<Self> {
        init!(Self {
            next_change_id: 1,
            entries <- Vec::init(),
        })
    }

    /// ID of the most recently recorded change (0 if nothing was recorded).
    #[inline]
    fn watermark(&self) -> u64 {
        self.next_change_id.wrapping_sub(1)
    }

    /// Record a change of one concrete attribute and return its change ID.
    fn record(&mut self, endpoint: EndptId, cluster: ClusterId, attr: AttrId) -> u64 {
        let entry = ChangedAttr::concrete(endpoint, cluster, attr, 0);

        self.record_raw(entry)
    }

    /// Record a wildcard change and return its change ID.
    ///
    /// The attribute is always wildcarded; `None` for `endpoint` or `cluster`
    /// wildcards that component too.
    fn record_wildcard(&mut self, endpoint: Option<EndptId>, cluster: Option<ClusterId>) -> u64 {
        let entry = ChangedAttr {
            endpoint: endpoint.unwrap_or(WILDCARD_ENDPOINT),
            cluster: cluster.unwrap_or(WILDCARD_CLUSTER),
            attr: WILDCARD_ATTR,
            change_id: 0,
        };

        self.record_raw(entry)
    }

    /// Assign the next change ID to `new` and merge it into the table.
    ///
    /// * If an existing entry already covers `new`, only that entry's change
    ///   ID is refreshed.
    /// * Otherwise entries covered by `new` are dropped and `new` is
    ///   inserted, coarsening existing entries if the table is full.
    fn record_raw(&mut self, mut new: ChangedAttr) -> u64 {
        let change_id = self.next_change_id;
        // Wrap around on overflow, but never hand out 0.
        self.next_change_id = change_id.wrapping_add(1).max(1);
        new.change_id = change_id;

        if let Some(covering) = self.entries.iter_mut().find(|entry| entry.covers(&new)) {
            covering.change_id = change_id;
            return change_id;
        }

        self.swap_remove_where(|entry| new.covers(entry));

        if let Err(rejected) = self.entries.push(new) {
            self.promote_and_insert(rejected);
        }

        change_id
    }

    /// `true` if an entry newer than `since` matches the given attribute.
    fn contains_since(
        &self,
        endpoint: EndptId,
        cluster: ClusterId,
        attr: AttrId,
        since: u64,
    ) -> bool {
        self.entries
            .iter()
            .filter(|entry| entry.change_id > since)
            .any(|entry| entry.matches(endpoint, cluster, attr))
    }

    /// `true` if any entry is newer than `since`.
    fn any_since(&self, since: u64) -> bool {
        self.entries.iter().any(|entry| since < entry.change_id)
    }

    /// Drop every entry whose change ID is at most `threshold`.
    /// A threshold of 0 is a no-op.
    fn purge_up_to(&mut self, threshold: u64) {
        if threshold == 0 {
            return;
        }

        self.swap_remove_where(|entry| entry.change_id <= threshold);
    }

    /// Drop all entries. The change-ID counter is left untouched.
    fn clear(&mut self) {
        self.entries.clear();
    }

    /// Insert `new` into a full table by coarsening existing entries.
    ///
    /// Each round first retries a cheap insert (refresh a covering entry, or
    /// push if there is room), then collapses the largest group of entries
    /// into a wildcard: per cluster first, per endpoint second. When no
    /// group can be collapsed, the whole table is replaced by a single
    /// global wildcard carrying `new`'s change ID.
    fn promote_and_insert(&mut self, new: ChangedAttr) {
        loop {
            if let Some(covering) = self.entries.iter_mut().find(|entry| entry.covers(&new)) {
                covering.change_id = new.change_id;
                return;
            }

            if self.entries.push(new.clone()).is_ok() {
                return;
            }

            let promoted = self.promote_largest_group(1) || self.promote_largest_group(2);
            if promoted {
                continue;
            }

            self.entries.clear();
            unwrap!(self.entries.push(ChangedAttr {
                endpoint: WILDCARD_ENDPOINT,
                cluster: WILDCARD_CLUSTER,
                attr: WILDCARD_ATTR,
                change_id: new.change_id,
            }));
            return;
        }
    }

    /// Collapse the largest group of at least two entries that share a
    /// coarsened form at `level` into that single wildcard entry.
    ///
    /// The wildcard inherits the highest change ID of the entries it
    /// replaces. Returns `false` when no such group exists. Among groups of
    /// equal size, the one found first wins.
    fn promote_largest_group(&mut self, level: u8) -> bool {
        let mut best: Option<ChangedAttr> = None;
        // Starting at 1 means a "group" of a single entry never qualifies.
        let mut best_count = 1usize;

        for pivot in self.entries.iter() {
            let Some(candidate) = pivot.coarsen(level) else {
                continue;
            };

            let count = self
                .entries
                .iter()
                .filter(|entry| candidate.covers(entry))
                .count();

            if count > best_count {
                best_count = count;
                best = Some(candidate);
            }
        }

        let Some(mut coarsened) = best else {
            return false;
        };

        let mut max_change_id = 0u64;
        self.swap_remove_where(|entry| {
            let covered = coarsened.covers(entry);
            if covered && entry.change_id > max_change_id {
                max_change_id = entry.change_id;
            }
            covered
        });

        coarsened.change_id = max_change_id;
        unwrap!(self.entries.push(coarsened));

        true
    }

    /// Remove every entry for which `doomed` returns `true`, using
    /// `swap_remove` (so the order of the remaining entries may change).
    fn swap_remove_where(&mut self, mut doomed: impl FnMut(&ChangedAttr) -> bool) {
        let mut index = 0;

        while index < self.entries.len() {
            if doomed(&self.entries[index]) {
                // The last entry was moved into `index`; examine it next.
                self.entries.swap_remove(index);
            } else {
                index += 1;
            }
        }
    }
}
// Sentinel values standing for "any endpoint / cluster / attribute" inside a
// `ChangedAttr`: the maximum value of the respective ID type.
const WILDCARD_ENDPOINT: EndptId = EndptId::MAX;
const WILDCARD_CLUSTER: ClusterId = ClusterId::MAX;
const WILDCARD_ATTR: AttrId = AttrId::MAX;
/// One entry of the changed-attributes table: a concrete attribute path or a
/// wildcard pattern, plus the ID of the latest change it stands for.
#[derive(Clone, Debug)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
struct ChangedAttr {
// Endpoint ID, or `WILDCARD_ENDPOINT`.
endpoint: EndptId,
// Cluster ID, or `WILDCARD_CLUSTER`.
cluster: ClusterId,
// Attribute ID, or `WILDCARD_ATTR`.
attr: AttrId,
// ID of the most recent change represented by this entry.
change_id: u64,
}
impl ChangedAttr {
    /// Build an entry for one concrete attribute path.
    const fn concrete(endpoint: EndptId, cluster: ClusterId, attr: AttrId, change_id: u64) -> Self {
        Self {
            change_id,
            endpoint,
            cluster,
            attr,
        }
    }

    /// `true` if the endpoint component is the wildcard sentinel.
    #[inline]
    const fn is_endpoint_wildcard(&self) -> bool {
        self.endpoint == WILDCARD_ENDPOINT
    }

    /// `true` if the cluster component is the wildcard sentinel.
    #[inline]
    const fn is_cluster_wildcard(&self) -> bool {
        self.cluster == WILDCARD_CLUSTER
    }

    /// `true` if the attribute component is the wildcard sentinel.
    #[inline]
    const fn is_attr_wildcard(&self) -> bool {
        self.attr == WILDCARD_ATTR
    }

    /// `true` if this entry applies to the given concrete path: every
    /// component is either a wildcard or equal to the requested value.
    fn matches(&self, endpoint: EndptId, cluster: ClusterId, attr: AttrId) -> bool {
        let endpoint_ok = self.is_endpoint_wildcard() || self.endpoint == endpoint;
        let cluster_ok = self.is_cluster_wildcard() || self.cluster == cluster;
        let attr_ok = self.is_attr_wildcard() || self.attr == attr;

        endpoint_ok && cluster_ok && attr_ok
    }

    /// `true` if everything matched by `other` is also matched by `self`.
    ///
    /// Per component: a wildcard covers anything; a concrete value covers
    /// only the same concrete value (never a wildcard).
    fn covers(&self, other: &ChangedAttr) -> bool {
        let endpoint_ok = self.is_endpoint_wildcard()
            || (!other.is_endpoint_wildcard() && self.endpoint == other.endpoint);
        let cluster_ok = self.is_cluster_wildcard()
            || (!other.is_cluster_wildcard() && self.cluster == other.cluster);
        let attr_ok =
            self.is_attr_wildcard() || (!other.is_attr_wildcard() && self.attr == other.attr);

        endpoint_ok && cluster_ok && attr_ok
    }

    /// Widen this entry into a wildcard pattern with change ID 0.
    ///
    /// * Level 1 keeps endpoint and cluster and wildcards the attribute;
    ///   `None` if endpoint or cluster is already a wildcard.
    /// * Level 2 keeps only the endpoint; `None` if the endpoint is already a
    ///   wildcard.
    ///
    /// Any other level is a programming error and panics.
    fn coarsen(&self, level: u8) -> Option<Self> {
        let cluster = match level {
            1 if self.is_endpoint_wildcard() || self.is_cluster_wildcard() => return None,
            1 => self.cluster,
            2 if self.is_endpoint_wildcard() => return None,
            2 => WILDCARD_CLUSTER,
            _ => unreachable!(),
        };

        Some(Self {
            endpoint: self.endpoint,
            cluster,
            attr: WILDCARD_ATTR,
            change_id: 0,
        })
    }
}
/// A subscription (and its buffer) checked out for sending one report.
///
/// Created by `Subscriptions::add` and `Subscriptions::report`. Dropping the
/// context hands the subscription back to the registry, which stores it only
/// if `set_keep` was called.
pub struct ReportContext<'a, 's, B, const N: usize>
where
B: Buffers<IMBuffer> + 'a,
{
// The registry the subscription came from.
subscriptions: &'s Subscriptions<N>,
// The buffer table paired with the registry.
subscriptions_buffers: &'s SubscriptionsBuffers<'a, B, N>,
// The checked-out subscription; `None` only once the report was completed.
subscription: Option<Subscription>,
// The checked-out buffer; `None` only once the report was completed.
subscription_buffer: Option<B::Buffer<'a>>,
// Attribute-change watermark to store in the subscription on completion.
next_max_seen_attr_change_id: u64,
// Event-number watermark to store in the subscription on completion.
next_max_seen_event_number: EventNumber,
// Report time to store in the subscription on completion.
next_reported_at: Instant,
// Whether the subscription should be stored again on completion.
keep: bool,
}
impl<'a, 's, B, const N: usize> ReportContext<'a, 's, B, N>
where
    B: Buffers<IMBuffer> + 'a,
{
    /// The subscription being reported on.
    pub fn subscription(&self) -> &Subscription {
        let sub = self.subscription.as_ref();

        unwrap!(sub)
    }

    /// The contents of the buffer paired with the subscription.
    pub fn rx(&self) -> &[u8] {
        let buf = unwrap!(self.subscription_buffer.as_ref());

        buf.as_ref()
    }

    /// `true` when a report should be sent even if it carries no data,
    /// i.e. when the periodic report is due at the time of this report.
    pub fn should_send_if_empty(&self) -> bool {
        self.subscription().is_report_due(self.next_reported_at)
    }

    /// `true` when the given attribute should be included in this report.
    ///
    /// A subscription that was never reported before gets every attribute;
    /// otherwise only attributes with a recorded change newer than what the
    /// subscription has already seen.
    pub fn should_report_attr(
        &self,
        endpoint_id: EndptId,
        cluster_id: ClusterId,
        attr_id: AttrId,
    ) -> bool {
        let sub = self.subscription();
        let never_reported = sub.reported_at == Instant::MAX;

        never_reported
            || self.subscriptions.state.lock(|cell| {
                let inner = cell.borrow();

                inner.changed_attrs.contains_since(
                    endpoint_id,
                    cluster_id,
                    attr_id,
                    sub.max_seen_attr_change_id,
                )
            })
    }

    /// Event number up to which the subscription has already been reported.
    pub fn max_seen_event_number(&self) -> EventNumber {
        self.subscription().max_seen_event_number
    }

    /// Event number the subscription will be marked as having seen once
    /// this report completes.
    pub fn next_max_seen_event_number(&self) -> EventNumber {
        self.next_max_seen_event_number
    }

    /// Ask for the subscription to be stored again when this context is
    /// dropped. Without this call the subscription is discarded.
    pub fn set_keep(&mut self) {
        self.keep = true;
    }
}
impl<'a, 's, B, const N: usize> Drop for ReportContext<'a, 's, B, N>
where
    B: Buffers<IMBuffer> + 'a,
{
    /// Complete the report: hand the subscription and its buffer back to the
    /// registry, which keeps or discards them.
    fn drop(&mut self) {
        let registry = self.subscriptions;

        registry.report_complete(self);
    }
}
#[cfg(test)]
mod tests {
use crate::utils::storage::pooled::PooledBuffers;
use super::*;
use embassy_time::Duration;
// Buffer pool with `N` IM buffers used by the subscription tests.
type TestPool<const N: usize> = PooledBuffers<IMBuffer, N>;
// A fresh table has watermark 0 and reports no changes at all.
#[test]
fn changed_attrs_starts_empty() {
let attrs = ChangedAttrs::new();
assert_eq!(attrs.watermark(), 0);
assert!(!attrs.any_since(0));
assert!(!attrs.contains_since(1, 2, 3, 0));
}
// Change IDs start at 1 and increase by one per record; the watermark is the
// last ID handed out.
#[test]
fn changed_attrs_record_assigns_monotonic_ids() {
let mut attrs = ChangedAttrs::new();
let id1 = attrs.record(1, 2, 3);
let id2 = attrs.record(1, 2, 4);
let id3 = attrs.record(2, 2, 3);
assert_eq!(id1, 1);
assert_eq!(id2, 2);
assert_eq!(id3, 3);
assert_eq!(attrs.watermark(), 3);
}
// `since` is exclusive: only entries with a strictly greater change ID count.
#[test]
fn changed_attrs_contains_since_and_any_since() {
let mut attrs = ChangedAttrs::new();
// (1, 2, 3) gets ID 1 and (1, 2, 4) gets ID 2.
attrs.record(1, 2, 3);
attrs.record(1, 2, 4);
assert!(attrs.any_since(0));
assert!(attrs.any_since(1));
assert!(!attrs.any_since(2));
assert!(attrs.contains_since(1, 2, 3, 0));
assert!(attrs.contains_since(1, 2, 4, 1));
assert!(!attrs.contains_since(1, 2, 3, 2));
assert!(!attrs.contains_since(9, 9, 9, 0));
}
// Recording an attribute that is already in the table does not add an entry;
// it only bumps the existing entry's change ID.
#[test]
fn changed_attrs_duplicate_refreshes_change_id() {
let mut attrs = ChangedAttrs::new();
attrs.record(1, 2, 3);
attrs.record(1, 2, 4);
let id3 = attrs.record(1, 2, 3);
assert_eq!(id3, 3);
assert_eq!(attrs.entries.len(), 2);
assert!(attrs.contains_since(1, 2, 3, 2));
assert!(attrs.contains_since(1, 2, 3, 0));
}
// A cluster-level wildcard matches every attribute of that cluster on that
// endpoint, and nothing else.
#[test]
fn changed_attrs_record_wildcard_cluster_covers_every_attr() {
let mut attrs = ChangedAttrs::new();
let id = attrs.record_wildcard(Some(7), Some(42));
assert!(attrs.contains_since(7, 42, 0, 0));
assert!(attrs.contains_since(7, 42, 1, 0));
assert!(attrs.contains_since(7, 42, u32::MAX, 0));
assert!(!attrs.contains_since(7, 99, 0, 0));
assert!(!attrs.contains_since(8, 42, 0, 0));
assert_eq!(id, attrs.watermark());
}
// An endpoint-level wildcard matches every cluster and attribute of that
// endpoint, but no other endpoint.
#[test]
fn changed_attrs_record_wildcard_endpoint_covers_every_cluster() {
let mut attrs = ChangedAttrs::new();
attrs.record_wildcard(Some(5), None);
assert!(attrs.contains_since(5, 1, 1, 0));
assert!(attrs.contains_since(5, 1000, 1000, 0));
assert!(!attrs.contains_since(6, 1, 1, 0));
}
// Recording a wildcard removes the concrete entries it covers and leaves
// unrelated entries in place.
#[test]
fn changed_attrs_record_wildcard_absorbs_existing_concrete_entries() {
let mut attrs = ChangedAttrs::new();
attrs.record(1, 2, 10);
attrs.record(1, 2, 11);
attrs.record(1, 2, 12);
attrs.record(1, 3, 20);
assert_eq!(attrs.entries.len(), 4);
// Replaces the three (1, 2, x) entries; (1, 3, 20) stays.
attrs.record_wildcard(Some(1), Some(2));
assert_eq!(attrs.entries.len(), 2);
assert!(attrs
.entries
.iter()
.any(|e| e.endpoint == 1 && e.cluster == 2 && e.is_attr_wildcard()));
assert!(attrs.contains_since(1, 3, 20, 0));
}
// A narrower wildcard that is already covered by a wider one adds no entry
// but still consumes a change ID.
#[test]
fn changed_attrs_record_wildcard_is_refreshed_when_already_covered() {
let mut attrs = ChangedAttrs::new();
attrs.record_wildcard(Some(1), None);
let before_len = attrs.entries.len();
let id = attrs.record_wildcard(Some(1), Some(2));
assert_eq!(attrs.entries.len(), before_len);
assert_eq!(attrs.watermark(), id);
}
// `purge_up_to` removes entries with change ID <= threshold; threshold 0 is
// a no-op.
#[test]
fn changed_attrs_purge_up_to_removes_old_entries() {
let mut attrs = ChangedAttrs::new();
// The three records get change IDs 1, 2 and 3 respectively.
attrs.record(1, 2, 3); attrs.record(1, 2, 4); attrs.record(2, 2, 3);
attrs.purge_up_to(2);
assert!(!attrs.contains_since(1, 2, 3, 0));
assert!(!attrs.contains_since(1, 2, 4, 0));
assert!(attrs.contains_since(2, 2, 3, 0));
attrs.purge_up_to(0);
assert!(attrs.contains_since(2, 2, 3, 0));
}
// `clear` drops the entries but does not reset the change-ID counter.
#[test]
fn changed_attrs_clear_empties_table_but_keeps_watermark() {
let mut attrs = ChangedAttrs::new();
attrs.record(1, 2, 3);
attrs.record(1, 2, 4);
let wm_before = attrs.watermark();
attrs.clear();
assert!(!attrs.any_since(0));
assert_eq!(attrs.watermark(), wm_before);
let id = attrs.record(5, 5, 5);
assert_eq!(id, wm_before + 1);
}
// Overflowing a table filled with attributes of one cluster collapses them
// into a cluster wildcard without losing any recorded change.
#[test]
fn changed_attrs_promotion_on_overflow_same_cluster() {
let mut attrs = ChangedAttrs::new();
for attr in 0..MAX_CHANGED_ATTRS as u32 {
attrs.record(1, 2, attr);
}
assert_eq!(attrs.entries.len(), MAX_CHANGED_ATTRS);
// One more record than the table can hold.
let overflow_id = attrs.record(1, 2, 9999);
assert_eq!(overflow_id as usize, MAX_CHANGED_ATTRS + 1);
assert!(attrs.entries.len() <= MAX_CHANGED_ATTRS);
for attr in 0..MAX_CHANGED_ATTRS as u32 {
assert!(
attrs.contains_since(1, 2, attr, 0),
"attr {} lost after promotion",
attr
);
}
assert!(attrs.contains_since(1, 2, 9999, 0));
// The wildcard must carry the overflow record's (newest) change ID.
assert!(attrs.contains_since(1, 2, 9999, MAX_CHANGED_ATTRS as u64));
}
// With entries that share nothing, overflow still keeps every recorded
// change visible and the table within capacity.
#[test]
fn changed_attrs_promotion_to_global_wildcard() {
let mut attrs = ChangedAttrs::new();
for i in 0..(MAX_CHANGED_ATTRS as u16 + 5) {
attrs.record(i, i as u32, i as u32);
}
assert!(attrs.entries.len() <= MAX_CHANGED_ATTRS);
for i in 0..(MAX_CHANGED_ATTRS as u16 + 5) {
assert!(attrs.contains_since(i, i as u32, i as u32, 0));
}
assert!(attrs.any_since(0));
}
// On overflow only the largest same-cluster group is collapsed; singleton
// entries of other clusters stay concrete.
#[test]
fn promotion_prefers_largest_level_1_group() {
let mut attrs = ChangedAttrs::new();
// Eleven entries in (1, 1, *) in total (ten here, one more below)...
for attr in 0..10u32 {
attrs.record(1, 1, attr);
}
// ...and five singletons (1, c, 0) for c in 2..=6.
for cluster in 2..=6u32 {
attrs.record(1, cluster, 0);
}
attrs.record(1, 1, 100);
assert_eq!(attrs.entries.len(), MAX_CHANGED_ATTRS);
// This record overflows the table and triggers the promotion.
attrs.record(2, 2, 2);
assert!(attrs.entries.len() <= MAX_CHANGED_ATTRS);
let wild_11 = attrs
.entries
.iter()
.filter(|e| e.endpoint == 1 && e.cluster == 1 && e.is_attr_wildcard())
.count();
assert_eq!(wild_11, 1);
let concrete_11 = attrs
.entries
.iter()
.filter(|e| e.endpoint == 1 && e.cluster == 1 && !e.is_attr_wildcard())
.count();
assert_eq!(concrete_11, 0);
for cluster in 2..=6u32 {
let n = attrs
.entries
.iter()
.filter(|e| e.endpoint == 1 && e.cluster == cluster && e.attr == 0)
.count();
assert_eq!(n, 1, "singleton (1, {}, 0) should remain concrete", cluster);
}
assert!(attrs
.entries
.iter()
.any(|e| e.endpoint == 2 && e.cluster == 2 && e.attr == 2));
for attr in 0..10u32 {
assert!(attrs.contains_since(1, 1, attr, 0));
}
for cluster in 2..=6u32 {
assert!(attrs.contains_since(1, cluster, 0, 0));
}
assert!(attrs.contains_since(1, 1, 100, 0));
assert!(attrs.contains_since(2, 2, 2, 0));
}
// With two equally large groups, a single overflow collapses exactly one of
// them and leaves the other fully concrete.
#[test]
fn promotion_is_minimal_only_one_group_collapsed_per_overflow() {
let mut attrs = ChangedAttrs::new();
// Group A: eight attributes of (1, 1).
for attr in 0..8u32 {
attrs.record(1, 1, attr);
}
// Group B: eight attributes of (2, 2).
for attr in 0..8u32 {
attrs.record(2, 2, attr);
}
assert_eq!(attrs.entries.len(), MAX_CHANGED_ATTRS);
attrs.record(9, 9, 9);
let a_wild = attrs
.entries
.iter()
.any(|e| e.endpoint == 1 && e.cluster == 1 && e.is_attr_wildcard());
let b_wild = attrs
.entries
.iter()
.any(|e| e.endpoint == 2 && e.cluster == 2 && e.is_attr_wildcard());
assert!(
a_wild ^ b_wild,
"expected exactly one of the groups to be collapsed (A: {}, B: {})",
a_wild,
b_wild
);
let a_concrete = attrs
.entries
.iter()
.filter(|e| e.endpoint == 1 && e.cluster == 1 && !e.is_attr_wildcard())
.count();
let b_concrete = attrs
.entries
.iter()
.filter(|e| e.endpoint == 2 && e.cluster == 2 && !e.is_attr_wildcard())
.count();
assert!(
(a_wild && a_concrete == 0 && b_concrete == 8)
|| (b_wild && b_concrete == 0 && a_concrete == 8)
);
}
// When no two entries share a cluster, overflow collapses one endpoint into
// an endpoint wildcard instead of going straight to the global wildcard.
#[test]
fn promotion_falls_back_to_level_2_when_no_level_1_group() {
let mut attrs = ChangedAttrs::new();
// Every (endpoint, cluster) pair occurs once, so no level-1 group exists.
for cluster in 0..8u32 {
attrs.record(1, cluster, 0);
}
for cluster in 0..8u32 {
attrs.record(2, cluster, 0);
}
assert_eq!(attrs.entries.len(), MAX_CHANGED_ATTRS);
attrs.record(3, 9, 9);
assert!(attrs.entries.len() <= MAX_CHANGED_ATTRS);
let lvl1_wild = attrs
.entries
.iter()
.filter(|e| {
!e.is_endpoint_wildcard() && !e.is_cluster_wildcard() && e.is_attr_wildcard()
})
.count();
assert_eq!(lvl1_wild, 0);
let ep1_wild = attrs
.entries
.iter()
.any(|e| e.endpoint == 1 && e.is_cluster_wildcard() && e.is_attr_wildcard());
let ep2_wild = attrs
.entries
.iter()
.any(|e| e.endpoint == 2 && e.is_cluster_wildcard() && e.is_attr_wildcard());
assert!(ep1_wild ^ ep2_wild);
assert!(!attrs
.entries
.iter()
.any(|e| e.is_endpoint_wildcard() && e.is_cluster_wildcard() && e.is_attr_wildcard()));
for cluster in 0..8u32 {
assert!(attrs.contains_since(1, cluster, 0, 0));
assert!(attrs.contains_since(2, cluster, 0, 0));
}
assert!(attrs.contains_since(3, 9, 9, 0));
}
// When entries share neither cluster nor endpoint, overflow replaces the
// whole table with a single global wildcard.
#[test]
fn promotion_falls_back_to_global_only_when_no_lower_group() {
let mut attrs = ChangedAttrs::new();
for i in 0..MAX_CHANGED_ATTRS as u16 {
attrs.record(i, i as u32, i as u32);
}
assert_eq!(attrs.entries.len(), MAX_CHANGED_ATTRS);
attrs.record(100, 200, 300);
assert_eq!(attrs.entries.len(), 1);
let only = &attrs.entries[0];
assert!(
only.is_endpoint_wildcard() && only.is_cluster_wildcard() && only.is_attr_wildcard()
);
for i in 0..MAX_CHANGED_ATTRS as u16 {
assert!(attrs.contains_since(i, i as u32, i as u32, 0));
}
assert!(attrs.contains_since(100, 200, 300, 0));
}
// The wildcard produced by a promotion carries the highest change ID of the
// entries it replaced, not the ID of the record that caused the overflow.
#[test]
fn promotion_preserves_max_change_id_in_coarsened_entry() {
let mut attrs = ChangedAttrs::new();
for attr in 0..MAX_CHANGED_ATTRS as u32 {
attrs.record(1, 1, attr);
}
let max_before = attrs.watermark();
// Unrelated record that overflows the table.
attrs.record(2, 2, 2);
let wild = attrs
.entries
.iter()
.find(|e| e.endpoint == 1 && e.cluster == 1 && e.is_attr_wildcard())
.expect("(1, 1, *) wildcard was produced");
assert_eq!(wild.change_id, max_before);
assert!(attrs.contains_since(1, 1, 0, max_before - 1));
assert!(!attrs.contains_since(1, 1, 0, max_before));
}
// Once a cluster wildcard exists, further records in that cluster only
// refresh its change ID.
#[test]
fn promotion_with_existing_wildcard_refreshes_instead_of_promoting_again() {
let mut attrs = ChangedAttrs::new();
for attr in 0..MAX_CHANGED_ATTRS as u32 {
attrs.record(1, 1, attr);
}
// Overflow: leaves (1, 1, *) and (2, 2, 2).
attrs.record(2, 2, 2);
assert_eq!(attrs.entries.len(), 2);
let wm_after_promo = attrs.watermark();
let new_id = attrs.record(1, 1, 42);
assert_eq!(attrs.entries.len(), 2);
assert_eq!(new_id, wm_after_promo + 1);
let wild = attrs
.entries
.iter()
.find(|e| e.endpoint == 1 && e.cluster == 1 && e.is_attr_wildcard())
.unwrap();
assert_eq!(wild.change_id, new_id);
}
// Under a long stream of distinct records the table never exceeds its
// capacity and the record just made is always still visible.
#[test]
fn promotion_capacity_invariant_under_sustained_churn() {
let mut attrs = ChangedAttrs::new();
for i in 0..1000u32 {
let endpoint = (i % 7) as u16;
let cluster = i % 13;
let attr = i;
attrs.record(endpoint, cluster, attr);
assert!(
attrs.entries.len() <= MAX_CHANGED_ATTRS,
"capacity exceeded at i={}",
i
);
assert!(
attrs.contains_since(endpoint, cluster, attr, 0),
"just-recorded triple lost at i={}",
i
);
}
}
// Repeated records into an already collapsed cluster never grow the table.
#[test]
fn promotion_iterated_into_same_existing_wildcard() {
let mut attrs = ChangedAttrs::new();
for attr in 0..MAX_CHANGED_ATTRS as u32 {
attrs.record(1, 1, attr);
}
// The overflow leaves exactly two entries: (1, 1, *) and (2, 2, 2).
attrs.record(2, 2, 2); assert_eq!(attrs.entries.len(), 2);
for attr in 100..200u32 {
attrs.record(1, 1, attr);
assert_eq!(attrs.entries.len(), 2);
}
}
// A small level-1 group is collapsed first; continued pressure afterwards
// must keep the table within capacity without losing any recorded change.
#[test]
fn promotion_escalates_when_level_1_group_still_insufficient() {
let mut attrs = ChangedAttrs::new();
// The only group with more than one member: two attributes of (1, 1).
attrs.record(1, 1, 0);
attrs.record(1, 1, 1);
// Fill the rest of the table with entries that share nothing.
for i in 0..(MAX_CHANGED_ATTRS as u16 - 2) {
attrs.record(10 + i, 100 + i as u32, i as u32);
}
assert_eq!(attrs.entries.len(), MAX_CHANGED_ATTRS);
attrs.record(50, 50, 50);
assert!(attrs.entries.len() <= MAX_CHANGED_ATTRS);
assert!(attrs
.entries
.iter()
.any(|e| e.endpoint == 1 && e.cluster == 1 && e.is_attr_wildcard()));
for i in 0..200u32 {
let endpoint = 200 + (i % 5) as u16;
let cluster = 300 + (i % 3);
let attr = i;
attrs.record(endpoint, cluster, attr);
assert!(attrs.entries.len() <= MAX_CHANGED_ATTRS);
assert!(attrs.contains_since(endpoint, cluster, attr, 0));
}
assert!(attrs.contains_since(1, 1, 0, 0));
assert!(attrs.contains_since(1, 1, 1, 0));
assert!(attrs.contains_since(50, 50, 50, 0));
}
// `covers` is directional (wider covers narrower, never the reverse) and
// `matches` treats wildcard components as "anything".
#[test]
fn changed_attr_covers_wildcards() {
let concrete = ChangedAttr::concrete(1, 2, 3, 1);
let any_attr = ChangedAttr {
endpoint: 1,
cluster: 2,
attr: WILDCARD_ATTR,
change_id: 1,
};
let any_cluster = ChangedAttr {
endpoint: 1,
cluster: WILDCARD_CLUSTER,
attr: WILDCARD_ATTR,
change_id: 1,
};
let global = ChangedAttr {
endpoint: WILDCARD_ENDPOINT,
cluster: WILDCARD_CLUSTER,
attr: WILDCARD_ATTR,
change_id: 1,
};
assert!(any_attr.covers(&concrete));
assert!(any_cluster.covers(&concrete));
assert!(global.covers(&concrete));
assert!(!concrete.covers(&any_attr));
assert!(!concrete.covers(&global));
assert!(concrete.matches(1, 2, 3));
assert!(!concrete.matches(1, 2, 4));
assert!(any_attr.matches(1, 2, 99));
assert!(!any_attr.matches(1, 9, 99));
assert!(global.matches(99, 99, 99));
}
// Pins the in-memory size of a table entry so accidental growth is noticed.
#[test]
fn changed_attr_size_is_compact() {
assert_eq!(core::mem::size_of::<ChangedAttr>(), 24);
}
/// Builds a fabric index for use in tests.
///
/// # Panics
///
/// Panics if `i` is zero, because the index is a `NonZeroU8`. The panic
/// message names the violated invariant so a failing test is easy to diagnose.
fn fab(i: u8) -> NonZeroU8 {
    NonZeroU8::new(i).expect("test fabric index must be non-zero")
}
/// With capacity for two subscriptions, two consecutive `add` calls must hand
/// out ids 1 and 2, and a third `add` issued while both returned contexts are
/// still alive must yield `None`.
#[test]
fn add_returns_monotonic_ids_and_rejects_when_full() {
    let table: Subscriptions<2> = Subscriptions::new();
    let buf_pool = TestPool::<3>::new();
    let table_bufs: SubscriptionsBuffers<TestPool<3>, 2> = SubscriptionsBuffers::new();
    let t0 = Instant::now();

    let first = table
        .add(
            t0,
            fab(1),
            10,
            100,
            1,
            60,
            0,
            buf_pool.get_immediate().unwrap(),
            &table_bufs,
        )
        .unwrap();
    let second = table
        .add(
            t0,
            fab(1),
            10,
            100,
            1,
            60,
            0,
            buf_pool.get_immediate().unwrap(),
            &table_bufs,
        )
        .unwrap();

    // Ids are handed out in increasing order starting at 1.
    assert_eq!(first.subscription().ids().id, 1);
    assert_eq!(second.subscription().ids().id, 2);

    // Both slots are taken, so a third subscription must be refused.
    assert!(table
        .add(
            t0,
            fab(1),
            10,
            100,
            1,
            60,
            0,
            buf_pool.get_immediate().unwrap(),
            &table_bufs
        )
        .is_none());
}
/// A change notified before the subscription exists is followed by a priming
/// context that wants to send even if empty and reports every attribute asked
/// about. After the subscription is kept, only the change notified afterwards
/// (`1/2/4`) is reported by the next context, not the earlier one (`1/2/3`).
#[test]
fn begin_report_snapshots_watermark_and_pending() {
    let table: Subscriptions<2> = Subscriptions::new();
    let buf_pool = TestPool::<3>::new();
    let table_bufs: SubscriptionsBuffers<TestPool<3>, 2> = SubscriptionsBuffers::new();
    let t0 = Instant::now();

    // This change is recorded before any subscription is added.
    table.notify_attr_changed(1, 2, 3);

    let mut priming = table
        .add(
            t0,
            fab(1),
            10,
            100,
            1,
            60,
            0,
            buf_pool.get_immediate().unwrap(),
            &table_bufs,
        )
        .unwrap();
    assert!(priming.should_send_if_empty());
    assert!(priming.should_report_attr(1, 2, 3));
    // The priming context is expected to report arbitrary attributes as well.
    assert!(priming.should_report_attr(42, 55555, 1234556677));
    priming.set_keep();
    drop(priming);

    table.notify_attr_changed(1, 2, 4);

    let t_later = t0 + Duration::from_secs(2);
    let follow_up = table.report(t_later, 0, &table_bufs).unwrap();
    assert!(!follow_up.should_send_if_empty());
    assert!(!follow_up.should_report_attr(1, 2, 3));
    assert!(follow_up.should_report_attr(1, 2, 4));
}
/// Test helper: adds a subscription for `peer_node_id` on fabric `fab_idx` with
/// the given `min_int` / `max_int`, taking a buffer from `pool`, and returns the
/// resulting report context.
///
/// The two remaining positional arguments of `Subscriptions::add` are passed
/// as `0`.
///
/// # Panics
///
/// Panics if `fab_idx` is zero, if `pool` has no free buffer, or if `add`
/// returns `None`.
// The helper mirrors the long positional argument list of `Subscriptions::add`.
#[allow(clippy::too_many_arguments)]
fn add_sub<'a, 's, const N: usize, const B: usize>(
    subs: &'s Subscriptions<N>,
    subs_bufs: &'s SubscriptionsBuffers<'a, TestPool<B>, N>,
    pool: &'a TestPool<B>,
    now: Instant,
    fab_idx: u8,
    peer_node_id: u64,
    min_int: u16,
    max_int: u16,
) -> ReportContext<'a, 's, TestPool<B>, N>
where
    'a: 's,
{
    // Evaluated in the same order as the original inline arguments.
    let fabric = fab(fab_idx);
    let buffer = pool.get_immediate().unwrap();

    let added = subs.add(
        now,
        fabric,
        peer_node_id,
        0,
        min_int,
        max_int,
        0,
        buffer,
        subs_bufs,
    );
    added.unwrap()
}
/// The context returned when a subscription is added must want to send even
/// if empty and start with a max-seen event number of 0. Once it is kept, a
/// `report` at the same instant with event number 0 yields nothing.
#[test]
fn priming_report_context_is_report_due_and_keeps_sub() {
    let table: Subscriptions<1> = Subscriptions::new();
    let buf_pool = TestPool::<2>::new();
    let table_bufs: SubscriptionsBuffers<TestPool<2>, 1> = SubscriptionsBuffers::new();
    let t0 = Instant::now();

    let mut priming = add_sub(&table, &table_bufs, &buf_pool, t0, 1, 10, 1, 60);
    assert!(priming.should_send_if_empty());
    assert_eq!(priming.max_seen_event_number(), 0);
    priming.set_keep();
    drop(priming);

    assert!(table.report(t0, 0, &table_bufs).is_none());
}
/// Dropping a freshly added context without calling `set_keep` must release
/// the only slot: a second `add` then succeeds and receives id 2.
#[test]
fn report_without_keep_frees_the_slot() {
    let table: Subscriptions<1> = Subscriptions::new();
    let buf_pool = TestPool::<2>::new();
    let table_bufs: SubscriptionsBuffers<TestPool<2>, 1> = SubscriptionsBuffers::new();
    let t0 = Instant::now();

    {
        // Never marked as kept; dropped at the end of this scope.
        let _unkept = add_sub(&table, &table_bufs, &buf_pool, t0, 1, 10, 1, 60);
    }

    let mut replacement = add_sub(&table, &table_bufs, &buf_pool, t0, 1, 11, 1, 60);
    assert_eq!(replacement.subscription().ids().id, 2);
    replacement.set_keep();
}
/// After a kept subscription (min interval 1 s) sees a change to `1/2/3`:
/// - no report is produced at the original instant;
/// - two seconds later a report is produced which covers `1/2/3` but not `9/9/9`;
/// - once that report is kept, asking again at the same instant yields nothing.
#[test]
fn report_with_keep_advances_reported_at_and_watermark() {
    let table: Subscriptions<1> = Subscriptions::new();
    let buf_pool = TestPool::<2>::new();
    let table_bufs: SubscriptionsBuffers<TestPool<2>, 1> = SubscriptionsBuffers::new();
    let t0 = Instant::now();

    let mut priming = add_sub(&table, &table_bufs, &buf_pool, t0, 1, 10, 1, 60);
    priming.set_keep();
    drop(priming);

    table.notify_attr_changed(1, 2, 3);
    assert!(table.report(t0, 0, &table_bufs).is_none());

    let t_later = t0 + Duration::from_secs(2);
    let mut due = table.report(t_later, 0, &table_bufs).unwrap();
    assert!(due.should_report_attr(1, 2, 3));
    assert!(!due.should_report_attr(9, 9, 9));
    due.set_keep();
    drop(due);

    assert!(table.report(t_later, 0, &table_bufs).is_none());
}
/// With no attribute changes, a report becomes due only when the event number
/// passed to `report` is higher than the one last kept. Each produced context
/// exposes the previous number via `max_seen_event_number` and the new one via
/// `next_max_seen_event_number`.
#[test]
fn report_triggered_by_new_events() {
    let table: Subscriptions<1> = Subscriptions::new();
    let buf_pool = TestPool::<2>::new();
    let table_bufs: SubscriptionsBuffers<TestPool<2>, 1> = SubscriptionsBuffers::new();
    let t0 = Instant::now();

    let mut priming = add_sub(&table, &table_bufs, &buf_pool, t0, 1, 10, 1, 60);
    priming.set_keep();
    drop(priming);

    // Event number 0: nothing to report, neither immediately nor two seconds on.
    assert!(table.report(t0, 0, &table_bufs).is_none());
    let t_later = t0 + Duration::from_secs(2);
    assert!(table.report(t_later, 0, &table_bufs).is_none());

    // Event number 5 triggers a report.
    let mut first_due = table.report(t_later, 5, &table_bufs).unwrap();
    assert_eq!(first_due.max_seen_event_number(), 0);
    assert_eq!(first_due.next_max_seen_event_number(), 5);
    first_due.set_keep();
    drop(first_due);

    // The same event number again does not.
    assert!(table.report(t_later, 5, &table_bufs).is_none());

    // A higher event number, another two seconds on, triggers the next report.
    let t_even_later = t_later + Duration::from_secs(2);
    let mut second_due = table.report(t_even_later, 6, &table_bufs).unwrap();
    assert_eq!(second_due.max_seen_event_number(), 5);
    assert_eq!(second_due.next_max_seen_event_number(), 6);
    second_due.set_keep();
}
/// For a kept subscription with a max interval of 20 s and nothing pending,
/// no report is due after 5 s, but after 11 s a report is produced that wants
/// to be sent even if empty.
#[test]
fn report_triggered_by_liveness_deadline() {
    let table: Subscriptions<1> = Subscriptions::new();
    let buf_pool = TestPool::<2>::new();
    let table_bufs: SubscriptionsBuffers<TestPool<2>, 1> = SubscriptionsBuffers::new();
    let t0 = Instant::now();

    let mut priming = add_sub(&table, &table_bufs, &buf_pool, t0, 1, 10, 1, 20);
    priming.set_keep();
    drop(priming);

    let too_early = t0 + Duration::from_secs(5);
    assert!(table.report(too_early, 0, &table_bufs).is_none());

    let past_deadline = t0 + Duration::from_secs(11);
    let mut keep_alive = table.report(past_deadline, 0, &table_bufs).unwrap();
    assert!(keep_alive.should_send_if_empty());
    keep_alive.set_keep();
}
/// With no subscriptions at all, `next_report_at` must return `Instant::MAX`.
#[test]
fn next_report_at_max_when_empty() {
    let table: Subscriptions<1> = Subscriptions::new();
    let table_bufs: SubscriptionsBuffers<TestPool<2>, 1> = SubscriptionsBuffers::new();

    let deadline = table.next_report_at(0, &table_bufs);
    assert_eq!(deadline, Instant::MAX);
}
/// For a single kept subscription with min interval 1 s and max interval 60 s
/// and nothing pending, the next report is expected 30 s after creation.
#[test]
fn next_report_at_liveness_when_idle() {
    let table: Subscriptions<1> = Subscriptions::new();
    let buf_pool = TestPool::<2>::new();
    let table_bufs: SubscriptionsBuffers<TestPool<2>, 1> = SubscriptionsBuffers::new();
    let t0 = Instant::now();

    let mut priming = add_sub(&table, &table_bufs, &buf_pool, t0, 1, 10, 1, 60);
    priming.set_keep();
    drop(priming);

    let expected = t0 + Duration::from_secs(30);
    assert_eq!(table.next_report_at(0, &table_bufs), expected);
}
/// With a min interval of 5 s and a pending attribute change, no report is
/// produced at the creation instant and the next report is expected exactly
/// 5 s after creation.
#[test]
fn next_report_at_quiet_period_for_pending_attr_change() {
    let table: Subscriptions<1> = Subscriptions::new();
    let buf_pool = TestPool::<2>::new();
    let table_bufs: SubscriptionsBuffers<TestPool<2>, 1> = SubscriptionsBuffers::new();
    let t0 = Instant::now();

    let mut priming = add_sub(&table, &table_bufs, &buf_pool, t0, 1, 10, 5, 60);
    priming.set_keep();
    drop(priming);

    table.notify_attr_changed(1, 2, 3);
    assert!(table.report(t0, 0, &table_bufs).is_none());

    let expected = t0 + Duration::from_secs(5);
    assert_eq!(table.next_report_at(0, &table_bufs), expected);
}
/// With a min interval of 5 s and no attribute change, passing event number 7
/// to `next_report_at` must yield a deadline 5 s after creation.
#[test]
fn next_report_at_quiet_period_for_pending_event() {
    let table: Subscriptions<1> = Subscriptions::new();
    let buf_pool = TestPool::<2>::new();
    let table_bufs: SubscriptionsBuffers<TestPool<2>, 1> = SubscriptionsBuffers::new();
    let t0 = Instant::now();

    let mut priming = add_sub(&table, &table_bufs, &buf_pool, t0, 1, 10, 5, 60);
    priming.set_keep();
    drop(priming);

    let expected = t0 + Duration::from_secs(5);
    assert_eq!(table.next_report_at(7, &table_bufs), expected);
}
/// With min interval 25 s and max interval 40 s, the idle deadline is expected
/// at 25 s: `next_report_at` returns creation + 25 s, `report` yields nothing
/// at 24 s and yields a context at 25 s.
#[test]
fn next_report_at_clamps_liveness_to_min_interval() {
    let table: Subscriptions<1> = Subscriptions::new();
    let buf_pool = TestPool::<2>::new();
    let table_bufs: SubscriptionsBuffers<TestPool<2>, 1> = SubscriptionsBuffers::new();
    let t0 = Instant::now();

    let mut priming = add_sub(&table, &table_bufs, &buf_pool, t0, 1, 10, 25, 40);
    priming.set_keep();
    drop(priming);

    let just_before = t0 + Duration::from_secs(24);
    let at_deadline = t0 + Duration::from_secs(25);

    assert_eq!(table.next_report_at(0, &table_bufs), at_deadline);
    assert!(table.report(just_before, 0, &table_bufs).is_none());
    assert!(table.report(at_deadline, 0, &table_bufs).is_some());
}
/// With two kept subscriptions whose max intervals are 60 s and 40 s,
/// `next_report_at` is expected to return creation + 20 s.
#[test]
fn next_report_at_returns_earliest_across_subs() {
    let table: Subscriptions<2> = Subscriptions::new();
    let buf_pool = TestPool::<3>::new();
    let table_bufs: SubscriptionsBuffers<TestPool<3>, 2> = SubscriptionsBuffers::new();
    let t0 = Instant::now();

    // Max interval 60 s, fabric 1.
    let mut slow = add_sub(&table, &table_bufs, &buf_pool, t0, 1, 10, 1, 60);
    slow.set_keep();
    drop(slow);

    // Max interval 40 s, fabric 2.
    let mut fast = add_sub(&table, &table_bufs, &buf_pool, t0, 2, 11, 1, 40);
    fast.set_keep();
    drop(fast);

    let expected = t0 + Duration::from_secs(20);
    assert_eq!(table.next_report_at(0, &table_bufs), expected);
}
/// Checks that a waiter on `subs.notification` completes once `notify()` is
/// called after a second subscription is added, and that `next_report_at`
/// then reflects the shorter deadline of the new subscription
/// (30 s before, 5 s after).
#[test]
fn subscription_added_notification_wakes_reporter_to_recompute_deadline() {
use core::pin::pin;
use embassy_futures::select::{select, Either};
let subs: Subscriptions<2> = Subscriptions::new();
let pool = TestPool::<3>::new();
let subs_bufs: SubscriptionsBuffers<TestPool<3>, 2> = SubscriptionsBuffers::new();
let now = Instant::now();
// First subscription: min interval 1 s, max interval 60 s.
{
let mut rctx = add_sub(&subs, &subs_bufs, &pool, now, 1, 10, 1, 60);
rctx.set_keep();
}
assert_eq!(
subs.next_report_at(0, &subs_bufs),
now + Duration::from_secs(30)
);
// The wait future is created BEFORE the second subscription is added and
// before `notify()` is called below.
let waiter = pin!(subs.notification.wait());
// Second subscription: max interval 10 s.
{
let mut rctx = add_sub(&subs, &subs_bufs, &pool, now, 1, 11, 1, 10);
rctx.set_keep();
}
// The test triggers the notification explicitly here.
subs.notification.notify();
// Race the waiter against an immediately-ready future so that `block_on`
// always terminates; `Either::First` means the waiter itself completed.
// NOTE(review): this presumably relies on `select` polling `waiter` first — confirm.
let notified = embassy_futures::block_on(async {
match select(waiter, pin!(core::future::ready(()))).await {
Either::First(_) => true,
Either::Second(_) => false,
}
});
assert!(notified);
// With the 10 s subscription present the earliest deadline is expected at 5 s.
assert_eq!(
subs.next_report_at(0, &subs_bufs),
now + Duration::from_secs(5)
);
}
/// For a kept subscription with a max interval of 5 s, `is_expired` must be
/// false 2 s after creation (so `remove` removes nothing) and true 10 s after
/// creation (so `remove` reports a removal).
#[test]
fn is_expired_uses_max_int() {
    let table: Subscriptions<1> = Subscriptions::new();
    let buf_pool = TestPool::<2>::new();
    let table_bufs: SubscriptionsBuffers<TestPool<2>, 1> = SubscriptionsBuffers::new();
    let t0 = Instant::now();

    let mut priming = add_sub(&table, &table_bufs, &buf_pool, t0, 1, 10, 1, 5);
    priming.set_keep();
    drop(priming);

    let within_max_int = t0 + Duration::from_secs(2);
    let removed_early = table.remove(&table_bufs, |sub| {
        sub.is_expired(within_max_int).then_some("expired")
    });
    assert!(!removed_early);

    let beyond_max_int = t0 + Duration::from_secs(10);
    let removed_late = table.remove(&table_bufs, |sub| {
        sub.is_expired(beyond_max_int).then_some("expired")
    });
    assert!(removed_late);
}
/// Fills a three-slot table (two subscriptions on fabric 1, one on fabric 2),
/// removes everything on fabric 1 through the predicate, and checks that:
/// - the predicate saw exactly peers 100 and 101 on fabric 1;
/// - a second identical `remove` removes nothing;
/// - exactly two slots were freed (two adds succeed, a third is refused).
#[test]
fn remove_invokes_predicate_and_frees_slots() {
    let table: Subscriptions<3> = Subscriptions::new();
    let buf_pool = TestPool::<4>::new();
    let table_bufs: SubscriptionsBuffers<TestPool<4>, 3> = SubscriptionsBuffers::new();
    let t0 = Instant::now();

    // (fabric index, peer node id) for the three initial subscriptions.
    for (fabric, peer) in [(1_u8, 100_u64), (1, 101), (2, 102)] {
        let mut kept = add_sub(&table, &table_bufs, &buf_pool, t0, fabric, peer, 1, 60);
        kept.set_keep();
    }

    // The table is full now.
    assert!(table
        .add(
            t0,
            fab(1),
            200,
            0,
            1,
            60,
            0,
            buf_pool.get_immediate().unwrap(),
            &table_bufs
        )
        .is_none());

    let mut fabric_1_peers: std::vec::Vec<u64> = std::vec::Vec::new();
    let removed_any = table.remove(&table_bufs, |sub| {
        let on_fabric_1 = sub.ids().fab_idx == fab(1);
        if on_fabric_1 {
            fabric_1_peers.push(sub.ids().peer_node_id);
        }
        on_fabric_1.then_some("fabric 1 removed")
    });
    assert!(removed_any);
    fabric_1_peers.sort();
    assert_eq!(fabric_1_peers, std::vec![100_u64, 101]);

    // Nothing is left on fabric 1, so a repeat removes nothing.
    assert!(!table.remove(&table_bufs, |sub| (sub.ids().fab_idx == fab(1))
        .then_some("fabric 1 removed")));

    // Two slots are free again ...
    {
        let mut refill_a = add_sub(&table, &table_bufs, &buf_pool, t0, 3, 300, 1, 60);
        refill_a.set_keep();
        let mut refill_b = add_sub(&table, &table_bufs, &buf_pool, t0, 3, 301, 1, 60);
        refill_b.set_keep();
    }

    // ... but not three.
    assert!(table
        .add(
            t0,
            fab(3),
            302,
            0,
            1,
            60,
            0,
            buf_pool.get_immediate().unwrap(),
            &table_bufs
        )
        .is_none());
}
/// `remove` on a table without subscriptions must return `false`, even when
/// the predicate would match anything it is given.
#[test]
fn remove_on_empty_table_returns_false() {
    let table: Subscriptions<2> = Subscriptions::new();
    let table_bufs: SubscriptionsBuffers<TestPool<2>, 2> = SubscriptionsBuffers::new();

    let removed_any = table.remove(&table_bufs, |_| Some("never called on empty"));
    assert!(!removed_any);
}
/// Calls `remove` for a subscription while its report context is still alive.
/// The predicate must be offered that subscription (peer 100), `remove` must
/// return `true`, and after the context is dropped, even with `set_keep`
/// called, the table must be completely empty and accept a new subscription.
#[test]
fn remove_cancels_in_flight_subscription() {
    let table: Subscriptions<2> = Subscriptions::new();
    let buf_pool = TestPool::<3>::new();
    let table_bufs: SubscriptionsBuffers<TestPool<3>, 2> = SubscriptionsBuffers::new();
    let t0 = Instant::now();

    {
        let mut priming = add_sub(&table, &table_bufs, &buf_pool, t0, 1, 100, 1, 60);
        priming.set_keep();
    }
    assert_eq!(table.state.lock(|cell| cell.borrow().subscriptions_count), 1);

    table.notify_attr_changed(1, 2, 3);
    let t_later = t0 + Duration::from_secs(2);

    {
        // The report context stays alive while `remove` runs.
        let mut in_flight = table.report(t_later, 0, &table_bufs).unwrap();
        let mut offered_peers: std::vec::Vec<u64> = std::vec::Vec::new();
        let removed_any = table.remove(&table_bufs, |sub| {
            offered_peers.push(sub.ids().peer_node_id);
            (sub.ids().peer_node_id == 100).then_some("test-cancel")
        });
        assert!(removed_any);
        assert!(offered_peers.contains(&100));
        // Keeping must not bring the removed subscription back.
        in_flight.set_keep();
    }

    table.state.lock(|cell| {
        let inner = cell.borrow();
        assert_eq!(inner.subscriptions_count, 0);
        assert!(inner.subscriptions.is_empty());
        assert!(inner.reporting.is_none());
        assert!(inner.reporting_cancelled.is_none());
    });

    let mut fresh = add_sub(&table, &table_bufs, &buf_pool, t0, 1, 101, 1, 60);
    fresh.set_keep();
}
/// Calls `remove` with a predicate that matches no subscription while a
/// report context is alive. `remove` must return `false`, and after the
/// context is kept and dropped the subscription must still be in the table
/// with no reporting state left over.
#[test]
fn remove_not_matching_in_flight_leaves_it_intact() {
    let table: Subscriptions<2> = Subscriptions::new();
    let buf_pool = TestPool::<3>::new();
    let table_bufs: SubscriptionsBuffers<TestPool<3>, 2> = SubscriptionsBuffers::new();
    let t0 = Instant::now();

    {
        let mut priming = add_sub(&table, &table_bufs, &buf_pool, t0, 1, 100, 1, 60);
        priming.set_keep();
    }

    table.notify_attr_changed(1, 2, 3);
    let t_later = t0 + Duration::from_secs(2);

    {
        // The report context stays alive while `remove` runs.
        let mut in_flight = table.report(t_later, 0, &table_bufs).unwrap();
        let removed_any = table.remove(&table_bufs, |sub| {
            (sub.ids().peer_node_id == 999).then_some("no-match")
        });
        assert!(!removed_any);
        in_flight.set_keep();
    }

    table.state.lock(|cell| {
        let inner = cell.borrow();
        assert_eq!(inner.subscriptions_count, 1);
        assert_eq!(inner.subscriptions.len(), 1);
        assert!(inner.reporting.is_none());
        assert!(inner.reporting_cancelled.is_none());
    });
}
/// Two kept subscriptions both report changes `1/2/3` and `1/2/4`. After
/// `purge_reported_changes`, no report is due at the same instant, and a later
/// change `5/6/7` produces a report that covers `5/6/7` but not `1/2/3`.
#[test]
fn purge_reported_changes_keeps_entries_until_all_subs_catch_up() {
    let table: Subscriptions<2> = Subscriptions::new();
    let buf_pool = TestPool::<3>::new();
    let table_bufs: SubscriptionsBuffers<TestPool<3>, 2> = SubscriptionsBuffers::new();
    let t0 = Instant::now();

    {
        let mut sub_a = add_sub(&table, &table_bufs, &buf_pool, t0, 1, 100, 1, 60);
        sub_a.set_keep();
        let mut sub_b = add_sub(&table, &table_bufs, &buf_pool, t0, 1, 101, 1, 60);
        sub_b.set_keep();
    }

    table.notify_attr_changed(1, 2, 3);
    table.notify_attr_changed(1, 2, 4);

    // Two reports are expected at this instant, each covering both changes.
    let t_later = t0 + Duration::from_secs(2);
    for _round in 0..2 {
        let mut due = table.report(t_later, 0, &table_bufs).unwrap();
        assert!(due.should_report_attr(1, 2, 3));
        assert!(due.should_report_attr(1, 2, 4));
        due.set_keep();
    }

    table.purge_reported_changes();
    assert!(table.report(t_later, 0, &table_bufs).is_none());

    table.notify_attr_changed(5, 6, 7);
    let t_even_later = t_later + Duration::from_secs(2);
    {
        let mut due = table.report(t_even_later, 0, &table_bufs).unwrap();
        assert!(due.should_report_attr(5, 6, 7));
        // The change reported earlier must no longer be flagged.
        assert!(!due.should_report_attr(1, 2, 3));
        due.set_keep();
    }
}
/// `next_max_seen_event_number` must equal the event number passed to the
/// `report` call that produced the context, while `max_seen_event_number`
/// must equal the number from the previously kept report (0 for the priming
/// context). Repeating `report` with an unchanged number yields nothing.
#[test]
fn next_max_seen_event_number_captured_at_report_time() {
    let table: Subscriptions<1> = Subscriptions::new();
    let buf_pool = TestPool::<2>::new();
    let table_bufs: SubscriptionsBuffers<TestPool<2>, 1> = SubscriptionsBuffers::new();
    let t0 = Instant::now();

    let mut priming = add_sub(&table, &table_bufs, &buf_pool, t0, 1, 10, 1, 60);
    assert_eq!(priming.max_seen_event_number(), 0);
    assert_eq!(priming.next_max_seen_event_number(), 0);
    priming.set_keep();
    drop(priming);

    let t_later = t0 + Duration::from_secs(2);
    let mut first_due = table.report(t_later, 7, &table_bufs).unwrap();
    assert_eq!(first_due.max_seen_event_number(), 0);
    assert_eq!(first_due.next_max_seen_event_number(), 7);
    first_due.set_keep();
    drop(first_due);

    assert!(table.report(t_later, 7, &table_bufs).is_none());

    let t_even_later = t_later + Duration::from_secs(2);
    let mut second_due = table.report(t_even_later, 42, &table_bufs).unwrap();
    assert_eq!(second_due.max_seen_event_number(), 7);
    assert_eq!(second_due.next_max_seen_event_number(), 42);
    second_due.set_keep();
}
}