buswatch_types/
metrics.rs

1//! Core metric types for message bus observability.
2
3use alloc::collections::BTreeMap;
4use alloc::string::String;
5
6use crate::Microseconds;
7
8/// Metrics for a single module/consumer/producer in the message bus.
9///
10/// A module is any component that reads from or writes to topics.
11/// This could be a microservice, a worker process, or any logical unit.
12#[derive(Debug, Clone, PartialEq, Default)]
13#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
14#[cfg_attr(feature = "minicbor", derive(minicbor::Encode, minicbor::Decode))]
15pub struct ModuleMetrics {
16    /// Metrics for topics this module reads from (subscriptions).
17    #[cfg_attr(feature = "minicbor", n(0))]
18    pub reads: BTreeMap<String, ReadMetrics>,
19
20    /// Metrics for topics this module writes to (publications).
21    #[cfg_attr(feature = "minicbor", n(1))]
22    pub writes: BTreeMap<String, WriteMetrics>,
23}
24
25impl ModuleMetrics {
26    /// Create empty module metrics.
27    pub fn new() -> Self {
28        Self::default()
29    }
30
31    /// Create a builder for module metrics.
32    pub fn builder() -> ModuleMetricsBuilder {
33        ModuleMetricsBuilder::new()
34    }
35
36    /// Check if the module has any activity.
37    pub fn is_empty(&self) -> bool {
38        self.reads.is_empty() && self.writes.is_empty()
39    }
40
41    /// Total messages read across all topics.
42    pub fn total_reads(&self) -> u64 {
43        self.reads.values().map(|r| r.count).sum()
44    }
45
46    /// Total messages written across all topics.
47    pub fn total_writes(&self) -> u64 {
48        self.writes.values().map(|w| w.count).sum()
49    }
50}
51
52/// Metrics for reading from a topic (subscription/consumer).
53#[derive(Debug, Clone, PartialEq, Default)]
54#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
55#[cfg_attr(feature = "minicbor", derive(minicbor::Encode, minicbor::Decode))]
56pub struct ReadMetrics {
57    /// Number of messages successfully read.
58    #[cfg_attr(feature = "minicbor", n(0))]
59    pub count: u64,
60
61    /// Number of messages waiting to be read (backlog/lag).
62    #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
63    #[cfg_attr(feature = "minicbor", n(1))]
64    pub backlog: Option<u64>,
65
66    /// How long the consumer has been waiting for a message.
67    ///
68    /// If set, indicates the consumer is blocked waiting for messages.
69    /// This helps identify slow producers or idle consumers.
70    #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
71    #[cfg_attr(feature = "minicbor", n(2))]
72    pub pending: Option<Microseconds>,
73
74    /// Messages read per second (computed over a window).
75    #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
76    #[cfg_attr(feature = "minicbor", n(3))]
77    pub rate: Option<f64>,
78}
79
80impl ReadMetrics {
81    /// Create new read metrics with a count.
82    pub fn new(count: u64) -> Self {
83        Self {
84            count,
85            ..Default::default()
86        }
87    }
88
89    /// Create a builder for read metrics.
90    pub fn builder() -> ReadMetricsBuilder {
91        ReadMetricsBuilder::new()
92    }
93
94    /// Check if this read stream appears healthy.
95    ///
96    /// Returns false if there's a significant backlog or long pending time.
97    pub fn is_healthy(&self, max_backlog: u64, max_pending: Microseconds) -> bool {
98        let backlog_ok = self.backlog.is_none_or(|b| b <= max_backlog);
99        let pending_ok = self.pending.is_none_or(|p| p <= max_pending);
100        backlog_ok && pending_ok
101    }
102}
103
104/// Metrics for writing to a topic (publication/producer).
105#[derive(Debug, Clone, PartialEq, Default)]
106#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
107#[cfg_attr(feature = "minicbor", derive(minicbor::Encode, minicbor::Decode))]
108pub struct WriteMetrics {
109    /// Number of messages successfully written.
110    #[cfg_attr(feature = "minicbor", n(0))]
111    pub count: u64,
112
113    /// How long the producer has been waiting to write.
114    ///
115    /// If set, indicates backpressure - the topic or downstream consumers
116    /// are not keeping up.
117    #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
118    #[cfg_attr(feature = "minicbor", n(1))]
119    pub pending: Option<Microseconds>,
120
121    /// Messages written per second (computed over a window).
122    #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
123    #[cfg_attr(feature = "minicbor", n(2))]
124    pub rate: Option<f64>,
125}
126
127impl WriteMetrics {
128    /// Create new write metrics with a count.
129    pub fn new(count: u64) -> Self {
130        Self {
131            count,
132            ..Default::default()
133        }
134    }
135
136    /// Create a builder for write metrics.
137    pub fn builder() -> WriteMetricsBuilder {
138        WriteMetricsBuilder::new()
139    }
140
141    /// Check if this write stream appears healthy.
142    ///
143    /// Returns false if there's a long pending time (backpressure).
144    pub fn is_healthy(&self, max_pending: Microseconds) -> bool {
145        self.pending.is_none_or(|p| p <= max_pending)
146    }
147}
148
149// ============================================================================
150// Builders
151// ============================================================================
152
153/// Builder for `ModuleMetrics`.
154#[derive(Debug, Default)]
155pub struct ModuleMetricsBuilder {
156    reads: BTreeMap<String, ReadMetrics>,
157    writes: BTreeMap<String, WriteMetrics>,
158}
159
160impl ModuleMetricsBuilder {
161    /// Create a new builder.
162    pub fn new() -> Self {
163        Self::default()
164    }
165
166    /// Add read metrics for a topic.
167    pub fn read<F>(mut self, topic: impl Into<String>, f: F) -> Self
168    where
169        F: FnOnce(ReadMetricsBuilder) -> ReadMetricsBuilder,
170    {
171        let metrics = f(ReadMetricsBuilder::new()).build();
172        self.reads.insert(topic.into(), metrics);
173        self
174    }
175
176    /// Add write metrics for a topic.
177    pub fn write<F>(mut self, topic: impl Into<String>, f: F) -> Self
178    where
179        F: FnOnce(WriteMetricsBuilder) -> WriteMetricsBuilder,
180    {
181        let metrics = f(WriteMetricsBuilder::new()).build();
182        self.writes.insert(topic.into(), metrics);
183        self
184    }
185
186    /// Build the module metrics.
187    pub fn build(self) -> ModuleMetrics {
188        ModuleMetrics {
189            reads: self.reads,
190            writes: self.writes,
191        }
192    }
193}
194
195/// Builder for `ReadMetrics`.
196#[derive(Debug, Default)]
197pub struct ReadMetricsBuilder {
198    count: u64,
199    backlog: Option<u64>,
200    pending: Option<Microseconds>,
201    rate: Option<f64>,
202}
203
204impl ReadMetricsBuilder {
205    /// Create a new builder.
206    pub fn new() -> Self {
207        Self::default()
208    }
209
210    /// Set the message count.
211    pub fn count(mut self, count: u64) -> Self {
212        self.count = count;
213        self
214    }
215
216    /// Set the backlog (unread messages).
217    pub fn backlog(mut self, backlog: u64) -> Self {
218        self.backlog = Some(backlog);
219        self
220    }
221
222    /// Set the pending duration.
223    pub fn pending(mut self, duration: impl Into<Microseconds>) -> Self {
224        self.pending = Some(duration.into());
225        self
226    }
227
228    /// Set the rate (messages per second).
229    pub fn rate(mut self, rate: f64) -> Self {
230        self.rate = Some(rate);
231        self
232    }
233
234    /// Build the read metrics.
235    pub fn build(self) -> ReadMetrics {
236        ReadMetrics {
237            count: self.count,
238            backlog: self.backlog,
239            pending: self.pending,
240            rate: self.rate,
241        }
242    }
243}
244
245/// Builder for `WriteMetrics`.
246#[derive(Debug, Default)]
247pub struct WriteMetricsBuilder {
248    count: u64,
249    pending: Option<Microseconds>,
250    rate: Option<f64>,
251}
252
253impl WriteMetricsBuilder {
254    /// Create a new builder.
255    pub fn new() -> Self {
256        Self::default()
257    }
258
259    /// Set the message count.
260    pub fn count(mut self, count: u64) -> Self {
261        self.count = count;
262        self
263    }
264
265    /// Set the pending duration.
266    pub fn pending(mut self, duration: impl Into<Microseconds>) -> Self {
267        self.pending = Some(duration.into());
268        self
269    }
270
271    /// Set the rate (messages per second).
272    pub fn rate(mut self, rate: f64) -> Self {
273        self.rate = Some(rate);
274        self
275    }
276
277    /// Build the write metrics.
278    pub fn build(self) -> WriteMetrics {
279        WriteMetrics {
280            count: self.count,
281            pending: self.pending,
282            rate: self.rate,
283        }
284    }
285}
286
#[cfg(test)]
mod tests {
    use super::*;
    use core::time::Duration;

    /// The module builder records per-topic entries and the totals reflect them.
    #[test]
    fn test_module_metrics_builder() {
        let built = ModuleMetrics::builder()
            .read("input", |reader| reader.count(100).backlog(5))
            .write("output", |writer| writer.count(95))
            .build();

        assert_eq!(built.total_reads(), 100);
        assert_eq!(built.total_writes(), 95);

        let input = built.reads.get("input").unwrap();
        assert_eq!(input.backlog, Some(5));
    }

    /// Read health fails once the backlog or the pending time exceeds its limit.
    #[test]
    fn test_read_metrics_health() {
        // Limits shared by every case: backlog <= 10 and pending <= 5 seconds.
        let max_backlog = 10;
        let max_pending = || Microseconds::from_secs(5);

        // No backlog or pending information at all is treated as healthy.
        let healthy = ReadMetrics::new(100);
        assert!(healthy.is_healthy(max_backlog, max_pending()));

        // A backlog of 20 is over the limit of 10.
        let with_backlog = ReadMetrics::builder().count(100).backlog(20).build();
        assert!(!with_backlog.is_healthy(max_backlog, max_pending()));

        // Waiting 10 seconds is over the limit of 5 seconds.
        let with_pending = ReadMetrics::builder()
            .count(100)
            .pending(Duration::from_secs(10))
            .build();
        assert!(!with_pending.is_healthy(max_backlog, max_pending()));
    }
}