use core::fmt::Debug;
use embassy_time::Instant;
use crate::acl::Accessor;
use crate::dm::{ClusterId, EndptId, EventId, Node};
use crate::error::{Error, ErrorCode};
use crate::im::{
EventData, EventDataTag, EventDataTimestamp, EventFilter, EventNumber, EventPath,
EventPriority, EventResp, EventRespTag,
};
use crate::persist::{KvBlobStore, KvBlobStoreAccess, Persist, EVENT_EPOCH_KEY};
use crate::tlv::{
FromTLV, TLVArray, TLVBuilderParent, TLVElement, TLVSequence, TLVSequenceIter, TLVTag,
TLVWrite, TagType, ToTLV,
};
use crate::utils::cell::RefCell;
use crate::utils::init::{init, Init};
use crate::utils::storage::WriteBuf;
use crate::utils::sync::blocking::Mutex;
/// Default value of the `N` parameter of [`Events`]: the size, in bytes, of each
/// per-priority event buffer.
pub const DEFAULT_MAX_EVENTS_BUF_SIZE: usize = 256;
/// Buffer size of zero. With `N == 0` every byte written for an event is accepted
/// and silently discarded, so nothing is ever retained.
pub const NO_EVENTS_BUF_SIZE: usize = 0;
/// An [`Events`] store with zero-sized buffers, i.e. one that retains no events.
pub type NoEvents = Events<NO_EVENTS_BUF_SIZE>;
/// Distance between two persisted values of the event counter. A value is handed
/// to the persister only when the counter is 1 or a multiple of this constant, and
/// the value handed over is the upper end of the range about to be used.
const EVENT_NUMBER_EPOCH_SIZE: EventNumber = 10000;
/// Store of emitted events, kept in three fixed-size byte buffers (one per
/// priority level) of `N` bytes each.
///
/// All state sits behind a blocking `Mutex`, so events can be pushed and read
/// through a shared reference.
pub struct Events<const N: usize = DEFAULT_MAX_EVENTS_BUF_SIZE> {
/// Buffers and event counter; accessed inside `Mutex::lock` or through `&mut self`.
inner: Mutex<RefCell<EventsInner<N>>>,
}
impl<const N: usize> Events<N> {
    /// Creates an empty event store whose event counter starts at 1.
    #[inline(always)]
    pub const fn new() -> Self {
        Self {
            inner: Mutex::new(RefCell::new(EventsInner::new())),
        }
    }

    /// Returns an in-place initializer that builds the same empty store as
    /// [`Events::new`].
    pub fn init() -> impl Init<Self> {
        init!(Self {
            inner <- Mutex::init(RefCell::init(EventsInner::init())),
        })
    }

    /// Empties all buffers and restarts the in-memory event counter at 1.
    ///
    /// Nothing is read from or written to persistent storage.
    pub fn reset(&mut self) {
        self.inner.get_mut().borrow_mut().reset();
    }

    /// Gives direct mutable access to the inner state, bypassing the lock
    /// (possible because `&mut self` proves exclusive access).
    pub(crate) fn inner_mut(&mut self) -> &mut EventsInner<N> {
        self.inner.get_mut().get_mut()
    }

    /// Runs `f` with an iterator over every buffered event and returns what `f`
    /// returns. The lock is held for the whole duration of `f`.
    pub(crate) fn fetch<F, R>(&self, f: F) -> R
    where
        F: FnOnce(EventsIter<'_, N>) -> R,
    {
        self.inner.lock(|state| {
            let state = state.borrow();
            f(state.iter())
        })
    }

    /// Returns the number most recently handed out, i.e. the next event number
    /// minus one (wrapping). This is `0` while the counter still sits at 1.
    pub(crate) fn watermark(&self) -> EventNumber {
        self.inner
            .lock(|state| state.borrow().next_event_number.wrapping_sub(1))
    }

    /// Records a new event and returns the event number assigned to it.
    ///
    /// The event is stamped with the current `Instant` in milliseconds. `f` is
    /// called once, after the event preamble has been written, to write the event
    /// payload through the supplied [`EventTLVWrite`].
    ///
    /// # Errors
    ///
    /// Returns any error raised while queueing the counter for persistence, while
    /// writing the event (for example `ErrorCode::ResourceExhausted` when the
    /// event does not fit in `N` bytes), by `f` itself, or by `persist.run()`.
    ///
    /// When writing the event fails, the event number is handed back so that the
    /// next push reuses it. Besides avoiding gaps, this matters on epoch
    /// boundaries: the early return skips `persist.run()`, and without the
    /// rollback the counter would have moved past the boundary so the epoch would
    /// not be queued again.
    /// NOTE(review): this assumes `Persist::store_tlv` only takes effect once
    /// `run()` is called — confirm against the `Persist` implementation.
    pub fn push<S, F>(
        &self,
        endpoint_id: EndptId,
        cluster_id: ClusterId,
        event_id: EventId,
        priority: EventPriority,
        kv: S,
        f: F,
    ) -> Result<EventNumber, Error>
    where
        S: KvBlobStoreAccess,
        F: FnOnce(EventTLVWrite<'_>) -> Result<(), Error>,
    {
        let mut persist = Persist::new(kv);

        let event_number = self.inner.lock(|state| {
            let mut state = state.borrow_mut();

            let event_number = state.next_event_number(&mut persist)?;
            let timestamp = EventDataTimestamp::SystemTimestamp(Instant::now().as_millis());

            let pushed = state.push(
                endpoint_id,
                cluster_id,
                event_id,
                event_number,
                priority,
                timestamp,
                f,
            );

            if let Err(err) = pushed {
                // Nothing was stored under this number: restore the counter to
                // the value it had before this attempt.
                state.next_event_number = event_number;
                return Err(err);
            }

            Ok::<_, Error>(event_number)
        })?;

        persist.run()?;

        Ok(event_number)
    }
}
impl<const N: usize> Default for Events<N> {
fn default() -> Self {
Self::new()
}
}
/// The lock-protected state of [`Events`]: three byte buffers holding serialized
/// events, plus the counter that numbers them.
#[derive(Debug)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub(crate) struct EventsInner<const N: usize> {
/// Buffer every new event is written into first (see `EventWriter::OPER_BUF`).
buf_debug: EventsBuf<N>,
/// Receives events evicted from `buf_debug` whose priority qualifies for promotion.
buf_info: EventsBuf<N>,
/// Receives events evicted from `buf_info` whose priority qualifies for promotion.
buf_critical: EventsBuf<N>,
/// Number that will be assigned to the next pushed event; starts at 1.
next_event_number: EventNumber,
}
impl<const N: usize> EventsInner<N> {
    /// Builds an empty state: no buffered events, counter at 1.
    const fn new() -> Self {
        Self {
            next_event_number: 1,
            buf_critical: EventsBuf::new(),
            buf_info: EventsBuf::new(),
            buf_debug: EventsBuf::new(),
        }
    }

    /// In-place initializer producing the same state as [`EventsInner::new`].
    fn init() -> impl Init<Self> {
        init!(Self {
            buf_debug <- EventsBuf::init(),
            buf_info <- EventsBuf::init(),
            buf_critical <- EventsBuf::init(),
            next_event_number: 1,
        })
    }

    /// Empties all three buffers and puts the counter back to 1.
    fn reset(&mut self) {
        self.next_event_number = 1;

        for buf in [
            &mut self.buf_debug,
            &mut self.buf_info,
            &mut self.buf_critical,
        ] {
            buf.reset();
        }
    }

    /// Resets the in-memory state and removes the persisted counter stored under
    /// `EVENT_EPOCH_KEY`. `buf` is passed through to the store.
    pub(crate) fn reset_persist(
        &mut self,
        kv: &mut dyn KvBlobStore,
        buf: &mut [u8],
    ) -> Result<(), Error> {
        self.reset();

        kv.remove(EVENT_EPOCH_KEY, buf)?;
        info!("Removed events counter from storage");

        Ok(())
    }

    /// Resets the in-memory state and then, if a counter is stored under
    /// `EVENT_EPOCH_KEY`, resumes numbering from that value.
    pub(crate) fn load_persist(
        &mut self,
        kv: &mut dyn KvBlobStore,
        buf: &mut [u8],
    ) -> Result<(), Error> {
        self.reset();

        // Nothing stored: keep the freshly reset counter.
        let Some(stored) = kv.load(EVENT_EPOCH_KEY, buf)? else {
            return Ok(());
        };

        self.load(stored)?;
        info!("Loaded events counter from storage");

        Ok(())
    }

    /// Decodes `data` as a TLV-encoded `u64` and makes it the next event number.
    fn load(&mut self, data: &[u8]) -> Result<(), Error> {
        let counter = TLVElement::new(data).u64()?;
        self.next_event_number = counter;

        Ok(())
    }

    /// Serializes one event into the operational buffer.
    ///
    /// The preamble (path, number, priority, timestamp) is written first, then `f`
    /// writes the payload, then the event container is closed. If any of these
    /// steps fails, the bytes written for this event are rewound and the error is
    /// returned; events evicted while making room are not restored.
    #[allow(clippy::too_many_arguments)]
    fn push<F>(
        &mut self,
        endpoint_id: EndptId,
        cluster_id: ClusterId,
        event_id: EventId,
        event_number: EventNumber,
        priority: EventPriority,
        timestamp: EventDataTimestamp,
        f: F,
    ) -> Result<(), Error>
    where
        F: FnOnce(EventTLVWrite<'_>) -> Result<(), Error>,
    {
        let mut writer = EventWriter::new(self);
        let start = writer.get_tail();

        // Only the preamble of this value is serialized; the empty `data` is a
        // placeholder, the real payload comes from `f`.
        let preamble = EventData {
            path: EventPath {
                endpoint: Some(endpoint_id),
                cluster: Some(cluster_id),
                event: Some(event_id),
                ..Default::default()
            },
            event_number,
            priority,
            timestamp,
            data: TLVElement::new(&[]),
        };

        let outcome = (|| {
            preamble.write_preamble(&EVENT_TAG, writer.tw())?;
            f(writer.tw())?;
            writer.tw().end_container()
        })();

        if outcome.is_err() {
            writer.rewind_to(start);
        }

        outcome
    }

    /// Hands out the current event number and advances the counter.
    ///
    /// When the number handed out is 1 or a multiple of `EVENT_NUMBER_EPOCH_SIZE`,
    /// the end of the range that starts here is passed to `persist` under
    /// `EVENT_EPOCH_KEY`. The counter wraps around to 1, never to 0.
    fn next_event_number<S>(&mut self, persist: &mut Persist<S>) -> Result<EventNumber, Error>
    where
        S: KvBlobStoreAccess,
    {
        let current = self.next_event_number;

        let epoch_to_store = if current == 1 {
            Some(EVENT_NUMBER_EPOCH_SIZE)
        } else if current.is_multiple_of(EVENT_NUMBER_EPOCH_SIZE) {
            Some(current.wrapping_add(EVENT_NUMBER_EPOCH_SIZE).max(1))
        } else {
            None
        };

        if let Some(epoch) = epoch_to_store {
            persist.store_tlv(EVENT_EPOCH_KEY, epoch)?;
        }

        self.next_event_number = current.wrapping_add(1).max(1);

        Ok(current)
    }

    /// Returns an iterator over all buffered events, starting with the critical
    /// buffer.
    fn iter(&self) -> EventsIter<'_, N> {
        let first = EventPriority::Critical;

        EventsIter {
            events: self,
            buf_ref: first,
            buf_iter: self.buf(first).iter(),
        }
    }

    /// Returns the buffer that belongs to `priority`.
    fn buf(&self, priority: EventPriority) -> &EventsBuf<N> {
        match priority {
            EventPriority::Critical => &self.buf_critical,
            EventPriority::Info => &self.buf_info,
            EventPriority::Debug => &self.buf_debug,
        }
    }

    /// Mutable counterpart of [`EventsInner::buf`].
    fn buf_mut(&mut self, priority: EventPriority) -> &mut EventsBuf<N> {
        match priority {
            EventPriority::Critical => &mut self.buf_critical,
            EventPriority::Info => &mut self.buf_info,
            EventPriority::Debug => &mut self.buf_debug,
        }
    }

    /// Borrows the buffer of `priority` together with the next buffer up
    /// (debug -> info -> critical), the latter mutably. The critical buffer has no
    /// successor, so `None` is returned for it.
    fn buf_and_next_mut(
        &mut self,
        priority: EventPriority,
    ) -> (&EventsBuf<N>, Option<&mut EventsBuf<N>>) {
        match priority {
            EventPriority::Critical => (&self.buf_critical, None),
            EventPriority::Info => (&self.buf_info, Some(&mut self.buf_critical)),
            EventPriority::Debug => (&self.buf_debug, Some(&mut self.buf_info)),
        }
    }
}
/// Iterator over all buffered events, yielded as parsed [`EventData`].
///
/// It starts in the critical buffer and, each time a buffer is exhausted, moves
/// to the buffer named by `EventPriority::prev` (presumably the next lower
/// priority) until that returns `None`.
pub struct EventsIter<'a, const N: usize> {
/// The state whose buffers are being walked.
events: &'a EventsInner<N>,
/// Priority level of the buffer `buf_iter` currently reads from.
buf_ref: EventPriority,
/// TLV iterator over the current buffer.
buf_iter: TLVSequenceIter<'a>,
}
impl<'a, const N: usize> Iterator for EventsIter<'a, N> {
    type Item = EventData<'a>;

    /// Yields the next buffered event, switching to the following buffer whenever
    /// the current one runs out. Returns `None` once the last buffer is exhausted.
    ///
    /// Panics if the buffer content is not a well-formed event TLV.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            let Some(res) = self.buf_iter.next() else {
                // Current buffer is drained: continue with the previous priority
                // level, or stop when there is none left.
                let next_buf_ref = self.buf_ref.prev()?;
                self.buf_iter = self.events.buf(next_buf_ref).iter();
                self.buf_ref = next_buf_ref;
                continue;
            };

            let event = unwrap!(
                res,
                "Should not have iter errors as we only put well-formed TLVs in the buffer"
            );
            let event = unwrap!(
                EventData::from_tlv(&event),
                "Should not have parsing errors as we only put well-formed TLVs in the buffer"
            );

            return Some(event);
        }
    }
}
/// Byte-wise writer that appends one event to the operational buffer of an
/// [`EventsInner`], evicting older events when that buffer is full.
struct EventWriter<'a, const N: usize> {
/// State being written into.
events: &'a mut EventsInner<N>,
/// Number of bytes this writer has appended so far (net of rewinds).
bytes_written: usize,
}
impl<'a, const N: usize> EventWriter<'a, N> {
/// The buffer all new bytes are written to, whatever the event's own priority.
const OPER_BUF: EventPriority = EventPriority::Debug;
/// Creates a writer that has not written any bytes yet.
#[inline(always)]
const fn new(events: &'a mut EventsInner<N>) -> Self {
Self {
events,
bytes_written: 0,
}
}
/// Wraps this writer in the type-erased TLV writer handed to serialization code.
#[inline(always)]
fn tw(&mut self) -> EventTLVWrite<'_> {
EventTLVWrite(self)
}
/// Appends one byte to the operational buffer.
///
/// Returns `ErrorCode::ResourceExhausted` once this writer has already produced
/// `N` bytes. With `N == 0` the byte is dropped and `Ok(())` is returned.
fn write(&mut self, byte: u8) -> Result<(), Error> {
// Zero-sized buffers: accept and discard everything.
if N == 0 {
return Ok(());
}
// The event being written already fills a whole buffer; it can never fit.
if self.bytes_written == N {
return Err(Error::new(ErrorCode::ResourceExhausted));
}
// Buffer full: drop (or promote) the oldest event until the byte fits.
// Since fewer than `N` bytes belong to the event in progress, a full buffer
// still starts with at least one complete older event.
while self.events.buf_mut(Self::OPER_BUF).append(byte).is_err() {
self.evict(Self::OPER_BUF);
}
self.bytes_written += 1;
Ok(())
}
/// Discards the bytes written after position `bytes_written`.
///
/// Panics if `bytes_written` is larger than the number of bytes written so far.
fn rewind_to(&mut self, bytes_written: usize) {
assert!(self.bytes_written >= bytes_written);
self.events
.buf_mut(Self::OPER_BUF)
.rewind_by(self.bytes_written - bytes_written);
self.bytes_written = bytes_written;
}
/// Removes the oldest event from the buffer `buf_ref`.
///
/// If a further buffer exists and the event's stored priority is at least that
/// buffer's level, the event is copied there first; otherwise it is lost.
fn evict(&mut self, buf_ref: EventPriority) {
let event_len = self.events.buf(buf_ref).first_event_len();
if let Some(next_buf_ref) = buf_ref.next() {
let event_prio = self.events.buf(buf_ref).first_event_prio();
// Compare the numeric level of the target buffer with the event's priority.
if next_buf_ref as u8 <= event_prio {
self.promote(buf_ref, next_buf_ref, event_len);
}
}
// Must come after the promotion, which still reads the event from this buffer.
self.events.buf_mut(buf_ref).evict_first_event();
}
/// Copies the first `event_len` bytes of `src_buf` to the end of `dst_buf`,
/// evicting events from `dst_buf` (which may promote them further) until there
/// is enough room. The source buffer is left unchanged.
fn promote(&mut self, src_buf: EventPriority, dst_buf: EventPriority, event_len: usize) {
while self.events.buf(dst_buf).capacity() < event_len {
self.evict(dst_buf);
}
// `dst` here is the successor of `src_buf`, which `evict` passes as `dst_buf`.
let (src, dst) = self.events.buf_and_next_mut(src_buf);
let dst = unwrap!(
dst,
"Dst buffer should always exist as this is checked in evict()"
);
unwrap!(
dst.append_slice(src.slice(event_len)),
"Should not overflow as eviction should have cleared space"
);
}
}
/// Object-safe view of [`EventWriter`] that hides its `N` const parameter, so
/// [`EventTLVWrite`] does not have to be generic over the buffer size.
trait DynEventWriter {
/// Appends one byte to the event being written.
fn write(&mut self, byte: u8) -> Result<(), Error>;
/// Returns the current write position (bytes written so far).
fn get_tail(&self) -> usize;
/// Discards everything written after position `pos`.
fn rewind_to(&mut self, pos: usize);
}
/// Forwards the trait methods to the inherent methods of [`EventWriter`].
impl<'a, const N: usize> DynEventWriter for EventWriter<'a, N> {
fn write(&mut self, byte: u8) -> Result<(), Error> {
// The explicit type path names the inherent `write`, not this trait method.
EventWriter::write(self, byte)
}
fn get_tail(&self) -> usize {
// The write position is simply the number of bytes written so far.
self.bytes_written
}
fn rewind_to(&mut self, pos: usize) {
// As in `write`, the explicit type path names the inherent method.
EventWriter::rewind_to(self, pos)
}
}
/// TLV writer passed to the payload callback of `push`; everything written
/// through it goes to the event currently being recorded.
pub struct EventTLVWrite<'a>(&'a mut dyn DynEventWriter);
impl core::fmt::Debug for EventTLVWrite<'_> {
    /// Prints only the type name; the wrapped writer is not shown.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        f.write_str("EventTLVWrite")
    }
}
/// `defmt` counterpart of the `Debug` impl: logs only the type name.
#[cfg(feature = "defmt")]
impl defmt::Format for EventTLVWrite<'_> {
fn format(&self, fmt: defmt::Formatter) {
defmt::write!(fmt, "EventTLVWrite")
}
}
/// Lets TLV serialization code write into an event; every call is forwarded to
/// the wrapped [`DynEventWriter`].
impl TLVWrite for EventTLVWrite<'_> {
/// Positions are byte counts since the start of the event being written.
type Position = usize;
fn write(&mut self, byte: u8) -> Result<(), Error> {
self.0.write(byte)
}
fn get_tail(&self) -> Self::Position {
self.0.get_tail()
}
fn rewind_to(&mut self, pos: Self::Position) {
self.0.rewind_to(pos)
}
}
/// Allows an [`EventTLVWrite`] to serve as the parent of a TLV builder; it is
/// its own writer.
impl TLVBuilderParent for EventTLVWrite<'_> {
type Write = Self;
fn writer(&mut self) -> &mut Self::Write {
self
}
}
/// Context tag under which each event's preamble is written into the buffers.
const EVENT_TAG: TLVTag = TLVTag::Context(EventRespTag::Data as _);
/// Context tag of the payload field inside an event (`EventDataTag::Data`).
/// NOTE(review): presumably the tag a `push` payload callback should use, as the
/// tests in this file do — confirm against external callers.
pub const EVENT_DATA_TAG: TLVTag = TLVTag::Context(EventDataTag::Data as _);
/// Fixed-size byte buffer holding serialized events back to back, oldest first.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
struct EventsBuf<const N: usize> {
/// Backing storage; only `data[..head]` holds meaningful bytes.
data: [u8; N],
/// Number of bytes in use, i.e. the index the next byte is written at.
head: usize,
}
impl<const N: usize> EventsBuf<N> {
    /// Creates an empty, zero-filled buffer.
    const fn new() -> Self {
        Self {
            head: 0,
            data: [0; N],
        }
    }

    /// In-place initializer producing the same buffer as [`EventsBuf::new`].
    fn init() -> impl Init<Self> {
        init!(Self {
            data <- crate::utils::init::zeroed(),
            head: 0,
        })
    }

    /// Marks the buffer as empty. The bytes themselves are left in place.
    fn reset(&mut self) {
        self.head = 0;
    }

    /// Drops the last `bytes_written` bytes. Panics if fewer bytes are in use.
    fn rewind_by(&mut self, bytes_written: usize) {
        assert!(self.head >= bytes_written);

        self.head -= bytes_written;
    }

    /// Returns the first `len` bytes in use. Panics if fewer bytes are in use.
    fn slice(&self, len: usize) -> &[u8] {
        assert!(self.head >= len);

        let (prefix, _) = self.data.split_at(len);
        prefix
    }

    /// Appends a single byte, or fails with `OverflowError` when the buffer is full.
    fn append(&mut self, byte: u8) -> Result<(), OverflowError> {
        if self.capacity() > 0 {
            self.data[self.head] = byte;
            self.head += 1;

            Ok(())
        } else {
            Err(OverflowError)
        }
    }

    /// Appends all of `data`, or fails with `OverflowError` (writing nothing) when
    /// it does not fit into the free space.
    fn append_slice(&mut self, data: &[u8]) -> Result<(), OverflowError> {
        let len = data.len();
        if len > self.capacity() {
            return Err(OverflowError);
        }

        let start = self.head;
        let end = start + len;
        self.data[start..end].copy_from_slice(data);
        self.head = end;

        Ok(())
    }

    /// Number of bytes still free.
    fn capacity(&self) -> usize {
        N - self.head
    }

    /// Encoded length of the oldest event. Panics if the buffer is empty or does
    /// not start with a well-formed container.
    fn first_event_len(&self) -> usize {
        assert!(self.head > 0);

        unwrap!(TLVSequence(&self.data[..self.head]).container_len())
    }

    /// Priority stored in the oldest event, read from its `EventDataTag::Priority`
    /// field. Panics if the event cannot be parsed.
    fn first_event_prio(&self) -> u8 {
        unwrap!(unwrap!(
            unwrap!(TLVElement::new(&self.data[..self.head]).structure())
                .find_ctx(EventDataTag::Priority as _)
        )
        .u8())
    }

    /// Removes the oldest event by shifting the remaining bytes to the front.
    fn evict_first_event(&mut self) {
        let evicted = self.first_event_len();
        let used = self.head;

        self.data.copy_within(evicted..used, 0);
        self.head = used - evicted;
    }

    /// Iterates over the TLV elements (events) currently stored.
    fn iter(&self) -> TLVSequenceIter<'_> {
        let used = &self.data[..self.head];

        TLVSequence(used).iter()
    }
}
/// Returned by the `EventsBuf` append operations when the data does not fit into
/// the remaining free space.
#[derive(Debug)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
struct OverflowError;
/// Cursor used to report buffered events to a reader: it selects the events that
/// fall into a window of event numbers and match the request.
pub struct EventReader {
/// Exclusive lower bound of the window; advanced to the number of every event
/// that is processed without error.
max_seen_event_number: u64,
/// Inclusive upper bound of the window.
next_max_seen_event_number: u64,
/// When set, events whose payload carries a fabric index other than the
/// accessor's are skipped.
fabric_filtered: bool,
}
impl EventReader {
    /// Creates a reader for events numbered in
    /// `(max_seen_event_number, next_max_seen_event_number]`.
    pub const fn new(
        max_seen_event_number: u64,
        next_max_seen_event_number: u64,
        fabric_filtered: bool,
    ) -> Self {
        Self {
            max_seen_event_number,
            next_max_seen_event_number,
            fabric_filtered,
        }
    }

    /// Considers one event for inclusion in a response.
    ///
    /// Returns `Ok(true)` when the event was written to `tw`, and `Ok(false)` when
    /// it lies outside the window or does not match the request. On error, `tw`
    /// is rewound to where it was before the call and the window is left as is;
    /// otherwise the lower bound of the window moves up to this event's number.
    pub fn process_read(
        &mut self,
        event: EventData<'_>,
        paths: &TLVArray<'_, EventPath>,
        event_filters: &Option<TLVArray<'_, EventFilter>>,
        node: &Node<'_>,
        accessor: &Accessor<'_>,
        tw: &mut WriteBuf<'_>,
    ) -> Result<bool, Error> {
        let event_number = event.event_number;

        let in_window = event_number > self.max_seen_event_number
            && event_number <= self.next_max_seen_event_number;
        if !in_window {
            return Ok(false);
        }

        let tail = tw.get_tail();

        let result = self.do_process_read(event, paths, event_filters, node, accessor, &mut *tw);
        match &result {
            Ok(_) => self.max_seen_event_number = event_number,
            // Drop whatever was partially written for this event.
            Err(_) => tw.rewind_to(tail),
        }

        result
    }

    /// Applies the fabric, path, filter and access checks in that order and, if
    /// all pass, serializes the event into `tw` as an anonymous `EventResp::Data`.
    fn do_process_read(
        &mut self,
        event: EventData<'_>,
        paths: &TLVArray<'_, EventPath>,
        event_filters: &Option<TLVArray<'_, EventFilter>>,
        node: &Node<'_>,
        accessor: &Accessor<'_>,
        tw: &mut WriteBuf<'_>,
    ) -> Result<bool, Error> {
        if self.fabric_filtered && !Self::matches_fabric(&event, accessor) {
            return Ok(false);
        }

        let selected = Self::matches_paths(&event, paths, node, accessor)?
            && Self::matches_filters(&event, event_filters)?
            && Self::matches_access(&event, node, accessor)?;
        if !selected {
            return Ok(false);
        }

        EventResp::Data(event).to_tlv(&TagType::Anonymous, &mut *tw)?;

        Ok(true)
    }

    /// Returns `false` only when the event payload contains a readable fabric
    /// index that differs from the accessor's. Payloads that are not structures,
    /// or that carry no usable fabric index, always match.
    fn matches_fabric(event: &EventData<'_>, accessor: &Accessor<'_>) -> bool {
        let payload = match event.data.structure() {
            Ok(payload) => payload,
            Err(_) => return true,
        };

        let elem = match payload.find_ctx(crate::im::encoding::FABRIC_INDEX_TAG) {
            Ok(elem) => elem,
            Err(_) => return true,
        };

        elem.non_empty()
            .and_then(|e| e.u8().ok())
            .is_none_or(|fab_idx| fab_idx == accessor.fab_idx)
    }

    /// Returns `Ok(true)` as soon as one of the requested `paths` matches the
    /// event. A path that fails to decode aborts the search with its error.
    fn matches_paths(
        event: &EventData<'_>,
        paths: &TLVArray<'_, EventPath>,
        node: &Node<'_>,
        accessor: &Accessor<'_>,
    ) -> Result<bool, Error> {
        for path in paths {
            if Self::matches_path(event, path?, node, accessor) {
                return Ok(true);
            }
        }

        Ok(false)
    }

    /// Checks one requested path against the event's own path.
    ///
    /// A requested path that the node rejects never matches. Otherwise each of
    /// node, endpoint, cluster and event must agree; a component that is absent
    /// on either side is treated as a wildcard.
    fn matches_path(
        event: &EventData<'_>,
        path: EventPath,
        node: &Node<'_>,
        accessor: &Accessor<'_>,
    ) -> bool {
        /// True unless both sides are present and differ.
        fn agrees<T: PartialEq>(actual: Option<T>, expected: Option<T>) -> bool {
            match (actual, expected) {
                (Some(actual), Some(expected)) => expected == actual,
                _ => true,
            }
        }

        if node.validate_event_path(&path, accessor).is_err() {
            return false;
        }

        let epath = &event.path;

        agrees(epath.node, path.node)
            && agrees(epath.endpoint, path.endpoint)
            && agrees(epath.cluster, path.cluster)
            && agrees(epath.event, path.event)
    }

    /// Returns `Ok(false)` when any filter specifies an `event_min` above the
    /// event's number; with no filters at all every event passes.
    fn matches_filters(
        event: &EventData<'_>,
        event_filters: &Option<TLVArray<'_, EventFilter>>,
    ) -> Result<bool, Error> {
        let Some(filters) = event_filters else {
            return Ok(true);
        };

        for filter in filters {
            let below_min = filter?
                .event_min
                .is_some_and(|event_min| event.event_number < event_min);
            if below_min {
                return Ok(false);
            }
        }

        Ok(true)
    }

    /// Returns whether the node accepts the event's own path for this accessor.
    fn matches_access(
        event: &EventData<'_>,
        node: &Node<'_>,
        accessor: &Accessor<'_>,
    ) -> Result<bool, Error> {
        let allowed = node.validate_event_path(&event.path, accessor).is_ok();

        Ok(allowed)
    }
}
#[cfg(test)]
mod tests {
    use crate::tlv::ToTLV;

    use super::*;

    /// Builds a queue and pushes `events` into it in order; every push must succeed.
    fn queue_of<const N: usize>(events: &[&TestEvent]) -> EventsInner<N> {
        let mut q: EventsInner<N> = EventsInner::new();
        for event in events {
            event.push_into(&mut q).unwrap();
        }
        q
    }

    /// A lone event stays in the debug buffer, whatever its priority.
    #[test]
    fn one_entry() {
        let crit1 = TestEvent::new(1, EventPriority::Critical);

        let q: EventsInner<32> = queue_of(&[&crit1]);

        assert_eq!(TestEvent::vec_from(&q.buf_debug).unwrap(), &[crit1]);
    }

    /// With room for one event per buffer, older events move up as far as their
    /// priority allows.
    #[test]
    fn critical_is_promoted() {
        let crit1 = TestEvent::new(1, EventPriority::Critical);
        let info2 = TestEvent::new(2, EventPriority::Info);
        let info3 = TestEvent::new(3, EventPriority::Info);

        let q: EventsInner<32> = queue_of(&[&crit1, &info2, &info3]);

        assert_eq!(TestEvent::vec_from(&q.buf_debug).unwrap(), &[info3]);
        assert_eq!(TestEvent::vec_from(&q.buf_info).unwrap(), &[info2]);
        assert_eq!(TestEvent::vec_from(&q.buf_critical).unwrap(), &[crit1]);
    }

    /// A debug event is discarded on eviction instead of being promoted.
    #[test]
    fn debug_is_dropped() {
        let crit1 = TestEvent::new(1, EventPriority::Critical);
        let dbg2 = TestEvent::new(2, EventPriority::Debug);
        let crit3 = TestEvent::new(3, EventPriority::Critical);

        let q: EventsInner<32> = queue_of(&[&crit1, &dbg2, &crit3]);

        assert_eq!(TestEvent::vec_from(&q.buf_debug).unwrap(), &[crit3]);
        assert_eq!(TestEvent::vec_from(&q.buf_info).unwrap(), &[crit1]);
        assert_eq!(TestEvent::vec_from(&q.buf_critical).unwrap(), &[]);
    }

    /// An event that cannot fit into a buffer at all is rejected.
    #[test]
    fn event_larger_than_buffer() {
        let mut q: EventsInner<8> = EventsInner::new();
        let crit1 = TestEvent::new(1, EventPriority::Critical);

        let err = crit1.push_into(&mut q).expect_err("");

        assert_eq!(err.code(), ErrorCode::ResourceExhausted);
    }

    /// Owned, comparable mirror of the fields of a stored event.
    #[derive(PartialEq, Clone, Debug)]
    struct TestEvent {
        endpoint: EndptId,
        cluster: ClusterId,
        event: EventId,
        event_number: EventNumber,
        priority: EventPriority,
        timestamp: EventDataTimestamp,
        data: u64,
    }

    impl TestEvent {
        /// Creates an event with fixed path and payload; only the number, the
        /// priority and the (number-derived) timestamp vary.
        const fn new(event_number: EventNumber, priority: EventPriority) -> Self {
            Self {
                event_number,
                priority,
                timestamp: EventDataTimestamp::EpochTimestamp(10_000 + event_number),
                endpoint: 42,
                cluster: 1,
                event: 0xB33F,
                data: 1337,
            }
        }

        /// Parses every event stored in `buf` back into a `TestEvent`.
        fn vec_from<const N: usize>(
            buf: &EventsBuf<N>,
        ) -> Result<heapless::Vec<TestEvent, N>, Error> {
            let mut parsed = heapless::Vec::new();

            for element in buf.iter() {
                let e = EventData::from_tlv(&element?)?;
                let entry = TestEvent {
                    endpoint: e.path.endpoint.unwrap(),
                    cluster: e.path.cluster.unwrap(),
                    event: e.path.event.unwrap(),
                    event_number: e.event_number,
                    priority: e.priority,
                    timestamp: e.timestamp,
                    data: e.data.u64()?,
                };
                parsed.push(entry).unwrap();
            }

            Ok(parsed)
        }

        /// Pushes this event into `q`, with `data` as the payload.
        fn push_into<const N: usize>(&self, q: &mut EventsInner<N>) -> Result<(), Error> {
            let payload = |tw| self.data.to_tlv(&EVENT_DATA_TAG, tw);

            q.push(
                self.endpoint,
                self.cluster,
                self.event,
                self.event_number,
                self.priority,
                self.timestamp.clone(),
                payload,
            )
        }
    }
}