use alloc::collections::BTreeMap;
use alloc::string::String;
use crate::Microseconds;
#[derive(Debug, Clone, PartialEq, Default)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "minicbor", derive(minicbor::Encode, minicbor::Decode))]
pub struct ModuleMetrics {
#[cfg_attr(feature = "minicbor", n(0))]
pub reads: BTreeMap<String, ReadMetrics>,
#[cfg_attr(feature = "minicbor", n(1))]
pub writes: BTreeMap<String, WriteMetrics>,
}
impl ModuleMetrics {
pub fn new() -> Self {
Self::default()
}
pub fn builder() -> ModuleMetricsBuilder {
ModuleMetricsBuilder::new()
}
pub fn is_empty(&self) -> bool {
self.reads.is_empty() && self.writes.is_empty()
}
pub fn total_reads(&self) -> u64 {
self.reads.values().map(|r| r.count).sum()
}
pub fn total_writes(&self) -> u64 {
self.writes.values().map(|w| w.count).sum()
}
}
#[derive(Debug, Clone, PartialEq, Default)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "minicbor", derive(minicbor::Encode, minicbor::Decode))]
pub struct ReadMetrics {
#[cfg_attr(feature = "minicbor", n(0))]
pub count: u64,
#[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
#[cfg_attr(feature = "minicbor", n(1))]
pub backlog: Option<u64>,
#[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
#[cfg_attr(feature = "minicbor", n(2))]
pub pending: Option<Microseconds>,
#[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
#[cfg_attr(feature = "minicbor", n(3))]
pub rate: Option<f64>,
}
impl ReadMetrics {
pub fn new(count: u64) -> Self {
Self {
count,
..Default::default()
}
}
pub fn builder() -> ReadMetricsBuilder {
ReadMetricsBuilder::new()
}
pub fn is_healthy(&self, max_backlog: u64, max_pending: Microseconds) -> bool {
let backlog_ok = self.backlog.is_none_or(|b| b <= max_backlog);
let pending_ok = self.pending.is_none_or(|p| p <= max_pending);
backlog_ok && pending_ok
}
}
#[derive(Debug, Clone, PartialEq, Default)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "minicbor", derive(minicbor::Encode, minicbor::Decode))]
pub struct WriteMetrics {
#[cfg_attr(feature = "minicbor", n(0))]
pub count: u64,
#[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
#[cfg_attr(feature = "minicbor", n(1))]
pub pending: Option<Microseconds>,
#[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
#[cfg_attr(feature = "minicbor", n(2))]
pub rate: Option<f64>,
}
impl WriteMetrics {
pub fn new(count: u64) -> Self {
Self {
count,
..Default::default()
}
}
pub fn builder() -> WriteMetricsBuilder {
WriteMetricsBuilder::new()
}
pub fn is_healthy(&self, max_pending: Microseconds) -> bool {
self.pending.is_none_or(|p| p <= max_pending)
}
}
#[derive(Debug, Default)]
pub struct ModuleMetricsBuilder {
reads: BTreeMap<String, ReadMetrics>,
writes: BTreeMap<String, WriteMetrics>,
}
impl ModuleMetricsBuilder {
pub fn new() -> Self {
Self::default()
}
pub fn read<F>(mut self, topic: impl Into<String>, f: F) -> Self
where
F: FnOnce(ReadMetricsBuilder) -> ReadMetricsBuilder,
{
let metrics = f(ReadMetricsBuilder::new()).build();
self.reads.insert(topic.into(), metrics);
self
}
pub fn write<F>(mut self, topic: impl Into<String>, f: F) -> Self
where
F: FnOnce(WriteMetricsBuilder) -> WriteMetricsBuilder,
{
let metrics = f(WriteMetricsBuilder::new()).build();
self.writes.insert(topic.into(), metrics);
self
}
pub fn build(self) -> ModuleMetrics {
ModuleMetrics {
reads: self.reads,
writes: self.writes,
}
}
}
#[derive(Debug, Default)]
pub struct ReadMetricsBuilder {
count: u64,
backlog: Option<u64>,
pending: Option<Microseconds>,
rate: Option<f64>,
}
impl ReadMetricsBuilder {
pub fn new() -> Self {
Self::default()
}
pub fn count(mut self, count: u64) -> Self {
self.count = count;
self
}
pub fn backlog(mut self, backlog: u64) -> Self {
self.backlog = Some(backlog);
self
}
pub fn pending(mut self, duration: impl Into<Microseconds>) -> Self {
self.pending = Some(duration.into());
self
}
pub fn rate(mut self, rate: f64) -> Self {
self.rate = Some(rate);
self
}
pub fn build(self) -> ReadMetrics {
ReadMetrics {
count: self.count,
backlog: self.backlog,
pending: self.pending,
rate: self.rate,
}
}
}
#[derive(Debug, Default)]
pub struct WriteMetricsBuilder {
count: u64,
pending: Option<Microseconds>,
rate: Option<f64>,
}
impl WriteMetricsBuilder {
pub fn new() -> Self {
Self::default()
}
pub fn count(mut self, count: u64) -> Self {
self.count = count;
self
}
pub fn pending(mut self, duration: impl Into<Microseconds>) -> Self {
self.pending = Some(duration.into());
self
}
pub fn rate(mut self, rate: f64) -> Self {
self.rate = Some(rate);
self
}
pub fn build(self) -> WriteMetrics {
WriteMetrics {
count: self.count,
pending: self.pending,
rate: self.rate,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use core::time::Duration;
#[test]
fn test_module_metrics_builder() {
let metrics = ModuleMetrics::builder()
.read("input", |r| r.count(100).backlog(5))
.write("output", |w| w.count(95))
.build();
assert_eq!(metrics.total_reads(), 100);
assert_eq!(metrics.total_writes(), 95);
assert_eq!(metrics.reads.get("input").unwrap().backlog, Some(5));
}
#[test]
fn test_read_metrics_health() {
let healthy = ReadMetrics::new(100);
assert!(healthy.is_healthy(10, Microseconds::from_secs(5)));
let with_backlog = ReadMetrics::builder().count(100).backlog(20).build();
assert!(!with_backlog.is_healthy(10, Microseconds::from_secs(5)));
let with_pending = ReadMetrics::builder()
.count(100)
.pending(Duration::from_secs(10))
.build();
assert!(!with_pending.is_healthy(10, Microseconds::from_secs(5)));
}
}