ralph_workflow/json_parser/health/
metrics.rs1#[derive(Debug, Clone, Default)]
17pub struct StreamingQualityMetrics {
18 pub total_deltas: usize,
20 pub avg_delta_size: usize,
22 pub min_delta_size: usize,
24 pub max_delta_size: usize,
26 pub pattern: StreamingPattern,
28 pub snapshot_repairs_count: usize,
30 pub large_delta_count: usize,
32 pub protocol_violations: usize,
34 pub queue_depth: usize,
36 pub queue_dropped_events: usize,
38 pub queue_backpressure_count: usize,
40}
41
42#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
44pub enum StreamingPattern {
45 #[default]
47 Empty,
48 Smooth,
50 Normal,
52 Bursty,
54}
55
56impl StreamingQualityMetrics {
57 pub fn from_sizes<I: Iterator<Item = usize>>(sizes: I) -> Self {
62 let sizes_vec: Vec<_> = sizes.collect();
63
64 if sizes_vec.is_empty() {
65 return Self::default();
66 }
67
68 let total_deltas = sizes_vec.len();
69 let min_delta_size = sizes_vec.iter().copied().min().unwrap_or(0);
70 let max_delta_size = sizes_vec.iter().copied().max().unwrap_or(0);
71 let sum: usize = sizes_vec.iter().sum();
72 let avg_delta_size = sum / total_deltas;
73
74 let pattern = if total_deltas < 2 {
77 StreamingPattern::Normal
78 } else {
79 let mean_u32 = u32::try_from(avg_delta_size).unwrap_or(u32::MAX);
81 let mean = f64::from(mean_u32);
82 if mean < 0.001 {
83 StreamingPattern::Empty
84 } else {
85 let variance_sum: usize = sizes_vec
87 .iter()
88 .map(|&size| {
89 let diff = size.abs_diff(avg_delta_size);
90 diff.saturating_mul(diff)
91 })
92 .sum();
93 let variance = variance_sum / total_deltas;
94 let variance_u32 = u32::try_from(variance).unwrap_or(u32::MAX);
96 let std_dev = f64::from(variance_u32).sqrt();
97 let cv = std_dev / mean;
98
99 if cv < 0.3 {
101 StreamingPattern::Smooth
102 } else if cv < 1.0 {
103 StreamingPattern::Normal
104 } else {
105 StreamingPattern::Bursty
106 }
107 }
108 };
109
110 Self {
111 total_deltas,
112 avg_delta_size,
113 min_delta_size,
114 max_delta_size,
115 pattern,
116 snapshot_repairs_count: 0,
117 large_delta_count: 0,
118 protocol_violations: 0,
119 queue_depth: 0,
120 queue_dropped_events: 0,
121 queue_backpressure_count: 0,
122 }
123 }
124
125 #[must_use]
127 pub fn format(&self, colors: Colors) -> String {
128 if self.total_deltas == 0 {
129 return format!(
130 "{}[Streaming]{} No deltas recorded",
131 colors.dim(),
132 colors.reset()
133 );
134 }
135
136 let pattern_str = match self.pattern {
137 StreamingPattern::Empty => "empty",
138 StreamingPattern::Smooth => "smooth",
139 StreamingPattern::Normal => "normal",
140 StreamingPattern::Bursty => "bursty",
141 };
142
143 let mut parts = vec![format!(
144 "{}[Streaming]{} {} deltas, avg {} bytes (min {}, max {}), pattern: {}",
145 colors.dim(),
146 colors.reset(),
147 self.total_deltas,
148 self.avg_delta_size,
149 self.min_delta_size,
150 self.max_delta_size,
151 pattern_str
152 )];
153
154 if self.snapshot_repairs_count > 0 {
155 parts.push(format!(
156 "{}snapshot repairs: {}{}",
157 colors.yellow(),
158 self.snapshot_repairs_count,
159 colors.reset()
160 ));
161 }
162
163 if self.large_delta_count > 0 {
164 parts.push(format!(
165 "{}large deltas: {}{}",
166 colors.yellow(),
167 self.large_delta_count,
168 colors.reset()
169 ));
170 }
171
172 if self.protocol_violations > 0 {
173 parts.push(format!(
174 "{}protocol violations: {}{}",
175 colors.red(),
176 self.protocol_violations,
177 colors.reset()
178 ));
179 }
180
181 if self.queue_depth > 0
183 || self.queue_dropped_events > 0
184 || self.queue_backpressure_count > 0
185 {
186 let mut queue_parts = Vec::new();
187 if self.queue_depth > 0 {
188 queue_parts.push(format!("depth: {}", self.queue_depth));
189 }
190 if self.queue_dropped_events > 0 {
191 queue_parts.push(format!(
192 "{}dropped: {}{}",
193 colors.yellow(),
194 self.queue_dropped_events,
195 colors.reset()
196 ));
197 }
198 if self.queue_backpressure_count > 0 {
199 queue_parts.push(format!(
200 "{}backpressure: {}{}",
201 colors.yellow(),
202 self.queue_backpressure_count,
203 colors.reset()
204 ));
205 }
206 if !queue_parts.is_empty() {
207 parts.push(format!("queue: {}", queue_parts.join(", ")));
208 }
209 }
210
211 parts.join(", ")
212 }
213}