ralph_workflow/json_parser/health/
metrics.rs1use crate::logger::Colors;
6
7fn classify_streaming_pattern(
8 sizes_vec: &[usize],
9 total_deltas: usize,
10 avg_delta_size: usize,
11) -> StreamingPattern {
12 if total_deltas < 2 {
13 return StreamingPattern::Normal;
14 }
15 let mean_u32 = u32::try_from(avg_delta_size).unwrap_or(u32::MAX);
16 let mean = f64::from(mean_u32);
17 if mean < 0.001 {
18 return StreamingPattern::Empty;
19 }
20 classify_by_coefficient_of_variation(sizes_vec, total_deltas, avg_delta_size, mean)
21}
22
/// Computes the coefficient of variation (standard deviation divided by
/// `mean`) of `sizes_vec`.
///
/// The variance is the integer (floored) mean of the squared deviations from
/// `avg_delta_size`, taken over `total_deltas`. Each square and the running
/// sum saturate at `usize::MAX` rather than overflowing.
///
/// Returns `0.0` when `total_deltas` is zero. `mean` is used as the divisor
/// without any check, so a zero `mean` produces a non-finite result.
fn compute_cv(sizes_vec: &[usize], total_deltas: usize, avg_delta_size: usize, mean: f64) -> f64 {
    if total_deltas == 0 {
        // Nothing to measure; also avoids an integer division by zero below.
        return 0.0;
    }

    let squared_diff_sum = sizes_vec.iter().fold(0_usize, |acc, &size| {
        let diff = size.abs_diff(avg_delta_size);
        acc.saturating_add(diff.saturating_mul(diff))
    });

    // The variance is a squared quantity, so capping it at `u32::MAX` would cap
    // the standard deviation near 65 536. There is no lossless `From<u64>` for
    // `f64`, so the value is rebuilt from its two 32-bit halves instead.
    let variance = u64::try_from(squared_diff_sum / total_deltas).unwrap_or(u64::MAX);
    let high = u32::try_from(variance >> 32).unwrap_or(u32::MAX);
    let low = u32::try_from(variance & u64::from(u32::MAX)).unwrap_or(u32::MAX);
    let variance_f64 = f64::from(high) * 4_294_967_296.0 + f64::from(low);

    variance_f64.sqrt() / mean
}
35
36fn cv_to_pattern(cv: f64) -> StreamingPattern {
37 if cv < 0.3 {
38 StreamingPattern::Smooth
39 } else if cv < 1.0 {
40 StreamingPattern::Normal
41 } else {
42 StreamingPattern::Bursty
43 }
44}
45
46fn classify_by_coefficient_of_variation(
47 sizes_vec: &[usize],
48 total_deltas: usize,
49 avg_delta_size: usize,
50 mean: f64,
51) -> StreamingPattern {
52 cv_to_pattern(compute_cv(sizes_vec, total_deltas, avg_delta_size, mean))
53}
54
55fn format_streaming_base(m: &StreamingQualityMetrics, colors: Colors) -> String {
56 let pattern_str = match m.pattern {
57 StreamingPattern::Empty => "empty",
58 StreamingPattern::Smooth => "smooth",
59 StreamingPattern::Normal => "normal",
60 StreamingPattern::Bursty => "bursty",
61 };
62 format!(
63 "{}[Streaming]{} {} deltas, avg {} bytes (min {}, max {}), pattern: {}",
64 colors.dim(),
65 colors.reset(),
66 m.total_deltas,
67 m.avg_delta_size,
68 m.min_delta_size,
69 m.max_delta_size,
70 pattern_str
71 )
72}
73
/// Appends `"{color}{label}: {count}{reset}"` to `parts`, unless `count` is
/// zero, in which case `parts` is left untouched.
fn push_if_nonzero(parts: &mut Vec<String>, count: usize, label: &str, color: &str, reset: &str) {
    if count == 0 {
        return;
    }
    let entry = format!("{}{}: {}{}", color, label, count, reset);
    parts.push(entry);
}
79
80fn collect_streaming_extras(m: &StreamingQualityMetrics, colors: Colors) -> Vec<String> {
81 let mut parts = Vec::new();
82 push_if_nonzero(
83 &mut parts,
84 m.snapshot_repairs_count,
85 "snapshot repairs",
86 colors.yellow(),
87 colors.reset(),
88 );
89 push_if_nonzero(
90 &mut parts,
91 m.large_delta_count,
92 "large deltas",
93 colors.yellow(),
94 colors.reset(),
95 );
96 push_if_nonzero(
97 &mut parts,
98 m.protocol_violations,
99 "protocol violations",
100 colors.red(),
101 colors.reset(),
102 );
103 if let Some(queue_str) = format_queue_metrics(m, colors) {
104 parts.push(queue_str);
105 }
106 parts
107}
108
109fn collect_queue_parts(m: &StreamingQualityMetrics, colors: Colors) -> Vec<String> {
110 let mut queue_parts = Vec::new();
111 if m.queue_depth > 0 {
112 queue_parts.push(format!("depth: {}", m.queue_depth));
113 }
114 push_if_nonzero(
115 &mut queue_parts,
116 m.queue_dropped_events,
117 "dropped",
118 colors.yellow(),
119 colors.reset(),
120 );
121 push_if_nonzero(
122 &mut queue_parts,
123 m.queue_backpressure_count,
124 "backpressure",
125 colors.yellow(),
126 colors.reset(),
127 );
128 queue_parts
129}
130
131fn format_queue_metrics(m: &StreamingQualityMetrics, colors: Colors) -> Option<String> {
132 if m.queue_depth == 0 && m.queue_dropped_events == 0 && m.queue_backpressure_count == 0 {
133 return None;
134 }
135 let queue_parts = collect_queue_parts(m, colors);
136 if queue_parts.is_empty() {
137 None
138 } else {
139 Some(format!("queue: {}", queue_parts.join(", ")))
140 }
141}
142
/// Summary statistics describing a stream of deltas.
///
/// The size statistics and `pattern` are filled in by `from_sizes`; the
/// remaining counters start at zero there. `format` renders the whole
/// structure as a single summary line.
#[derive(Debug, Clone, Default)]
pub struct StreamingQualityMetrics {
    /// Number of deltas the statistics were computed from.
    pub total_deltas: usize,
    /// Average delta size (integer division of the total by `total_deltas`);
    /// reported in bytes by `format`.
    pub avg_delta_size: usize,
    /// Smallest delta size seen.
    pub min_delta_size: usize,
    /// Largest delta size seen.
    pub max_delta_size: usize,
    /// Classification of how evenly sized the deltas were.
    pub pattern: StreamingPattern,
    /// Counter shown as "snapshot repairs" in the summary when nonzero.
    pub snapshot_repairs_count: usize,
    /// Counter shown as "large deltas" in the summary when nonzero.
    pub large_delta_count: usize,
    /// Counter shown as "protocol violations" (in red) in the summary when
    /// nonzero.
    pub protocol_violations: usize,
    /// Queue depth, shown as "depth" in the queue part of the summary when
    /// nonzero.
    pub queue_depth: usize,
    /// Counter shown as "dropped" in the queue part of the summary when
    /// nonzero.
    pub queue_dropped_events: usize,
    /// Counter shown as "backpressure" in the queue part of the summary when
    /// nonzero.
    pub queue_backpressure_count: usize,
}
179
/// Classification of a delta stream by how much the delta sizes vary.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum StreamingPattern {
    /// The default value; also assigned when the average delta size is zero.
    #[default]
    Empty,
    /// Coefficient of variation below 0.3.
    Smooth,
    /// Coefficient of variation of at least 0.3 but below 1.0; also assigned
    /// when there are fewer than two deltas.
    Normal,
    /// Coefficient of variation of 1.0 or more.
    Bursty,
}
193
194impl StreamingQualityMetrics {
195 pub fn from_sizes<I: Iterator<Item = usize>>(sizes: I) -> Self {
200 let sizes_vec: Vec<_> = sizes.collect();
201 if sizes_vec.is_empty() {
202 return Self::default();
203 }
204 let total_deltas = sizes_vec.len();
205 let min_delta_size = sizes_vec.iter().copied().min().unwrap_or(0);
206 let max_delta_size = sizes_vec.iter().copied().max().unwrap_or(0);
207 let sum: usize = sizes_vec.iter().sum();
208 let avg_delta_size = sum / total_deltas;
209 let pattern = classify_streaming_pattern(&sizes_vec, total_deltas, avg_delta_size);
210 Self {
211 total_deltas,
212 avg_delta_size,
213 min_delta_size,
214 max_delta_size,
215 pattern,
216 snapshot_repairs_count: 0,
217 large_delta_count: 0,
218 protocol_violations: 0,
219 queue_depth: 0,
220 queue_dropped_events: 0,
221 queue_backpressure_count: 0,
222 }
223 }
224
225 #[must_use]
227 pub fn format(&self, colors: Colors) -> String {
228 if self.total_deltas == 0 {
229 return format!(
230 "{}[Streaming]{} No deltas recorded",
231 colors.dim(),
232 colors.reset()
233 );
234 }
235 let base = format_streaming_base(self, colors);
236 let extras = collect_streaming_extras(self, colors);
237 if extras.is_empty() {
238 base
239 } else {
240 format!("{base}, {}", extras.join(", "))
241 }
242 }
243}