// ralph_workflow/json_parser/health/metrics.rs
metrics.rs1#[derive(Debug, Clone, Default)]
17pub struct StreamingQualityMetrics {
18 pub total_deltas: usize,
20 pub avg_delta_size: usize,
22 pub min_delta_size: usize,
24 pub max_delta_size: usize,
26 pub pattern: StreamingPattern,
28 pub snapshot_repairs_count: usize,
30 pub large_delta_count: usize,
32 pub protocol_violations: usize,
34 pub queue_depth: usize,
36 pub queue_dropped_events: usize,
38 pub queue_backpressure_count: usize,
40}
41
/// Classification of how evenly sized the deltas in a stream were.
///
/// `StreamingQualityMetrics::from_sizes` derives this from the coefficient of
/// variation (standard deviation divided by mean) of the delta sizes.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum StreamingPattern {
    /// No deltas, or two or more deltas whose mean size is zero.
    #[default]
    Empty,
    /// Coefficient of variation below 0.3.
    Smooth,
    /// Coefficient of variation from 0.3 up to (but not including) 1.0; also
    /// used when fewer than two deltas make the spread meaningless.
    Normal,
    /// Coefficient of variation of 1.0 or more.
    Bursty,
}
55
56impl StreamingQualityMetrics {
57 pub fn from_sizes<I: Iterator<Item = usize>>(sizes: I) -> Self {
62 let sizes_vec: Vec<_> = sizes.collect();
63
64 if sizes_vec.is_empty() {
65 return Self::default();
66 }
67
68 let total_deltas = sizes_vec.len();
69 let min_delta_size = sizes_vec.iter().copied().min().unwrap_or(0);
70 let max_delta_size = sizes_vec.iter().copied().max().unwrap_or(0);
71 let sum: usize = sizes_vec.iter().sum();
72 let avg_delta_size = sum / total_deltas;
73
74 let pattern = if total_deltas < 2 {
77 StreamingPattern::Normal
78 } else {
79 let mean_u32 = u32::try_from(avg_delta_size).unwrap_or(u32::MAX);
81 let mean = f64::from(mean_u32);
82 if mean < 0.001 {
83 StreamingPattern::Empty
84 } else {
85 let variance_sum: usize = sizes_vec
87 .iter()
88 .map(|&size| {
89 let diff = size.abs_diff(avg_delta_size);
90 diff.saturating_mul(diff)
91 })
92 .sum();
93 let variance = variance_sum / total_deltas;
94 let variance_u32 = u32::try_from(variance).unwrap_or(u32::MAX);
96 let std_dev = f64::from(variance_u32).sqrt();
97 let cv = std_dev / mean;
98
99 if cv < 0.3 {
101 StreamingPattern::Smooth
102 } else if cv < 1.0 {
103 StreamingPattern::Normal
104 } else {
105 StreamingPattern::Bursty
106 }
107 }
108 };
109
110 Self {
111 total_deltas,
112 avg_delta_size,
113 min_delta_size,
114 max_delta_size,
115 pattern,
116 snapshot_repairs_count: 0,
117 large_delta_count: 0,
118 protocol_violations: 0,
119 queue_depth: 0,
120 queue_dropped_events: 0,
121 queue_backpressure_count: 0,
122 }
123 }
124
125 pub fn format(&self, colors: Colors) -> String {
127 if self.total_deltas == 0 {
128 return format!(
129 "{}[Streaming]{} No deltas recorded",
130 colors.dim(),
131 colors.reset()
132 );
133 }
134
135 let pattern_str = match self.pattern {
136 StreamingPattern::Empty => "empty",
137 StreamingPattern::Smooth => "smooth",
138 StreamingPattern::Normal => "normal",
139 StreamingPattern::Bursty => "bursty",
140 };
141
142 let mut parts = vec![format!(
143 "{}[Streaming]{} {} deltas, avg {} bytes (min {}, max {}), pattern: {}",
144 colors.dim(),
145 colors.reset(),
146 self.total_deltas,
147 self.avg_delta_size,
148 self.min_delta_size,
149 self.max_delta_size,
150 pattern_str
151 )];
152
153 if self.snapshot_repairs_count > 0 {
154 parts.push(format!(
155 "{}snapshot repairs: {}{}",
156 colors.yellow(),
157 self.snapshot_repairs_count,
158 colors.reset()
159 ));
160 }
161
162 if self.large_delta_count > 0 {
163 parts.push(format!(
164 "{}large deltas: {}{}",
165 colors.yellow(),
166 self.large_delta_count,
167 colors.reset()
168 ));
169 }
170
171 if self.protocol_violations > 0 {
172 parts.push(format!(
173 "{}protocol violations: {}{}",
174 colors.red(),
175 self.protocol_violations,
176 colors.reset()
177 ));
178 }
179
180 if self.queue_depth > 0
182 || self.queue_dropped_events > 0
183 || self.queue_backpressure_count > 0
184 {
185 let mut queue_parts = Vec::new();
186 if self.queue_depth > 0 {
187 queue_parts.push(format!("depth: {}", self.queue_depth));
188 }
189 if self.queue_dropped_events > 0 {
190 queue_parts.push(format!(
191 "{}dropped: {}{}",
192 colors.yellow(),
193 self.queue_dropped_events,
194 colors.reset()
195 ));
196 }
197 if self.queue_backpressure_count > 0 {
198 queue_parts.push(format!(
199 "{}backpressure: {}{}",
200 colors.yellow(),
201 self.queue_backpressure_count,
202 colors.reset()
203 ));
204 }
205 if !queue_parts.is_empty() {
206 parts.push(format!("queue: {}", queue_parts.join(", ")));
207 }
208 }
209
210 parts.join(", ")
211 }
212}