1use alloc::collections::BTreeMap;
4use alloc::string::String;
5
6use crate::Microseconds;
7
8#[derive(Debug, Clone, PartialEq, Default)]
13#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
14#[cfg_attr(feature = "minicbor", derive(minicbor::Encode, minicbor::Decode))]
15pub struct ModuleMetrics {
16 #[cfg_attr(feature = "minicbor", n(0))]
18 pub reads: BTreeMap<String, ReadMetrics>,
19
20 #[cfg_attr(feature = "minicbor", n(1))]
22 pub writes: BTreeMap<String, WriteMetrics>,
23}
24
25impl ModuleMetrics {
26 pub fn new() -> Self {
28 Self::default()
29 }
30
31 pub fn builder() -> ModuleMetricsBuilder {
33 ModuleMetricsBuilder::new()
34 }
35
36 pub fn is_empty(&self) -> bool {
38 self.reads.is_empty() && self.writes.is_empty()
39 }
40
41 pub fn total_reads(&self) -> u64 {
43 self.reads.values().map(|r| r.count).sum()
44 }
45
46 pub fn total_writes(&self) -> u64 {
48 self.writes.values().map(|w| w.count).sum()
49 }
50}
51
52#[derive(Debug, Clone, PartialEq, Default)]
54#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
55#[cfg_attr(feature = "minicbor", derive(minicbor::Encode, minicbor::Decode))]
56pub struct ReadMetrics {
57 #[cfg_attr(feature = "minicbor", n(0))]
59 pub count: u64,
60
61 #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
63 #[cfg_attr(feature = "minicbor", n(1))]
64 pub backlog: Option<u64>,
65
66 #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
71 #[cfg_attr(feature = "minicbor", n(2))]
72 pub pending: Option<Microseconds>,
73
74 #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
76 #[cfg_attr(feature = "minicbor", n(3))]
77 pub rate: Option<f64>,
78}
79
80impl ReadMetrics {
81 pub fn new(count: u64) -> Self {
83 Self {
84 count,
85 ..Default::default()
86 }
87 }
88
89 pub fn builder() -> ReadMetricsBuilder {
91 ReadMetricsBuilder::new()
92 }
93
94 pub fn is_healthy(&self, max_backlog: u64, max_pending: Microseconds) -> bool {
98 let backlog_ok = self.backlog.is_none_or(|b| b <= max_backlog);
99 let pending_ok = self.pending.is_none_or(|p| p <= max_pending);
100 backlog_ok && pending_ok
101 }
102}
103
104#[derive(Debug, Clone, PartialEq, Default)]
106#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
107#[cfg_attr(feature = "minicbor", derive(minicbor::Encode, minicbor::Decode))]
108pub struct WriteMetrics {
109 #[cfg_attr(feature = "minicbor", n(0))]
111 pub count: u64,
112
113 #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
118 #[cfg_attr(feature = "minicbor", n(1))]
119 pub pending: Option<Microseconds>,
120
121 #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
123 #[cfg_attr(feature = "minicbor", n(2))]
124 pub rate: Option<f64>,
125}
126
127impl WriteMetrics {
128 pub fn new(count: u64) -> Self {
130 Self {
131 count,
132 ..Default::default()
133 }
134 }
135
136 pub fn builder() -> WriteMetricsBuilder {
138 WriteMetricsBuilder::new()
139 }
140
141 pub fn is_healthy(&self, max_pending: Microseconds) -> bool {
145 self.pending.is_none_or(|p| p <= max_pending)
146 }
147}
148
149#[derive(Debug, Default)]
155pub struct ModuleMetricsBuilder {
156 reads: BTreeMap<String, ReadMetrics>,
157 writes: BTreeMap<String, WriteMetrics>,
158}
159
160impl ModuleMetricsBuilder {
161 pub fn new() -> Self {
163 Self::default()
164 }
165
166 pub fn read<F>(mut self, topic: impl Into<String>, f: F) -> Self
168 where
169 F: FnOnce(ReadMetricsBuilder) -> ReadMetricsBuilder,
170 {
171 let metrics = f(ReadMetricsBuilder::new()).build();
172 self.reads.insert(topic.into(), metrics);
173 self
174 }
175
176 pub fn write<F>(mut self, topic: impl Into<String>, f: F) -> Self
178 where
179 F: FnOnce(WriteMetricsBuilder) -> WriteMetricsBuilder,
180 {
181 let metrics = f(WriteMetricsBuilder::new()).build();
182 self.writes.insert(topic.into(), metrics);
183 self
184 }
185
186 pub fn build(self) -> ModuleMetrics {
188 ModuleMetrics {
189 reads: self.reads,
190 writes: self.writes,
191 }
192 }
193}
194
195#[derive(Debug, Default)]
197pub struct ReadMetricsBuilder {
198 count: u64,
199 backlog: Option<u64>,
200 pending: Option<Microseconds>,
201 rate: Option<f64>,
202}
203
204impl ReadMetricsBuilder {
205 pub fn new() -> Self {
207 Self::default()
208 }
209
210 pub fn count(mut self, count: u64) -> Self {
212 self.count = count;
213 self
214 }
215
216 pub fn backlog(mut self, backlog: u64) -> Self {
218 self.backlog = Some(backlog);
219 self
220 }
221
222 pub fn pending(mut self, duration: impl Into<Microseconds>) -> Self {
224 self.pending = Some(duration.into());
225 self
226 }
227
228 pub fn rate(mut self, rate: f64) -> Self {
230 self.rate = Some(rate);
231 self
232 }
233
234 pub fn build(self) -> ReadMetrics {
236 ReadMetrics {
237 count: self.count,
238 backlog: self.backlog,
239 pending: self.pending,
240 rate: self.rate,
241 }
242 }
243}
244
245#[derive(Debug, Default)]
247pub struct WriteMetricsBuilder {
248 count: u64,
249 pending: Option<Microseconds>,
250 rate: Option<f64>,
251}
252
253impl WriteMetricsBuilder {
254 pub fn new() -> Self {
256 Self::default()
257 }
258
259 pub fn count(mut self, count: u64) -> Self {
261 self.count = count;
262 self
263 }
264
265 pub fn pending(mut self, duration: impl Into<Microseconds>) -> Self {
267 self.pending = Some(duration.into());
268 self
269 }
270
271 pub fn rate(mut self, rate: f64) -> Self {
273 self.rate = Some(rate);
274 self
275 }
276
277 pub fn build(self) -> WriteMetrics {
279 WriteMetrics {
280 count: self.count,
281 pending: self.pending,
282 rate: self.rate,
283 }
284 }
285}
286
#[cfg(test)]
mod tests {
    use super::*;
    use core::time::Duration;

    /// One read topic and one write topic: totals and the stored backlog
    /// must reflect what was passed to the builder.
    #[test]
    fn test_module_metrics_builder() {
        let built = ModuleMetrics::builder()
            .read("input", |reader| reader.count(100).backlog(5))
            .write("output", |writer| writer.count(95))
            .build();

        assert_eq!(built.total_reads(), 100);
        assert_eq!(built.total_writes(), 95);

        let input = built.reads.get("input").unwrap();
        assert_eq!(input.backlog, Some(5));
    }

    /// Count-only metrics are healthy; exceeding either the backlog limit or
    /// the pending limit makes them unhealthy.
    #[test]
    fn test_read_metrics_health() {
        let max_backlog = 10;
        let max_pending = Microseconds::from_secs(5);

        // No optional values set: nothing can exceed a limit.
        let count_only = ReadMetrics::new(100);
        assert!(count_only.is_healthy(max_backlog, max_pending));

        // Backlog of 20 is above the limit of 10.
        let over_backlog = ReadMetrics::builder().count(100).backlog(20).build();
        assert!(!over_backlog.is_healthy(max_backlog, max_pending));

        // Pending time of 10 s is above the limit of 5 s.
        let over_pending = ReadMetrics::builder()
            .count(100)
            .pending(Duration::from_secs(10))
            .build();
        assert!(!over_pending.is_healthy(max_backlog, max_pending));
    }
}