Skip to main content

ralph_workflow/json_parser/health/
metrics.rs

1// Streaming quality metrics.
2//
3// Contains StreamingQualityMetrics and StreamingPattern.
4
5use crate::logger::Colors;
6
7fn classify_streaming_pattern(
8    sizes_vec: &[usize],
9    total_deltas: usize,
10    avg_delta_size: usize,
11) -> StreamingPattern {
12    if total_deltas < 2 {
13        return StreamingPattern::Normal;
14    }
15    let mean_u32 = u32::try_from(avg_delta_size).unwrap_or(u32::MAX);
16    let mean = f64::from(mean_u32);
17    if mean < 0.001 {
18        return StreamingPattern::Empty;
19    }
20    classify_by_coefficient_of_variation(sizes_vec, total_deltas, avg_delta_size, mean)
21}
22
/// Coefficient of variation (population std-dev divided by `mean`) of the
/// delta sizes.
///
/// The squared deviations are accumulated and averaged in `f64`. The
/// previous integer arithmetic had two defects: `variance_sum / total_deltas`
/// truncated fractional variances to zero (so slightly-varying streams were
/// misclassified as perfectly smooth), and `u32::try_from(variance)` capped
/// large variances at `u32::MAX`. As a bonus, `total_deltas == 0` now yields
/// NaN instead of panicking on integer division by zero (callers guard
/// against this case anyway).
#[allow(clippy::cast_precision_loss)] // delta sizes are far below 2^52, so f64 is exact here
fn compute_cv(sizes_vec: &[usize], total_deltas: usize, avg_delta_size: usize, mean: f64) -> f64 {
    // Population variance around the integer mean used by the other metrics.
    let variance: f64 = sizes_vec
        .iter()
        .map(|&size| {
            let diff = size.abs_diff(avg_delta_size) as f64;
            diff * diff
        })
        .sum::<f64>()
        / total_deltas as f64;
    variance.sqrt() / mean
}
35
36fn cv_to_pattern(cv: f64) -> StreamingPattern {
37    if cv < 0.3 {
38        StreamingPattern::Smooth
39    } else if cv < 1.0 {
40        StreamingPattern::Normal
41    } else {
42        StreamingPattern::Bursty
43    }
44}
45
46fn classify_by_coefficient_of_variation(
47    sizes_vec: &[usize],
48    total_deltas: usize,
49    avg_delta_size: usize,
50    mean: f64,
51) -> StreamingPattern {
52    cv_to_pattern(compute_cv(sizes_vec, total_deltas, avg_delta_size, mean))
53}
54
55fn format_streaming_base(m: &StreamingQualityMetrics, colors: Colors) -> String {
56    let pattern_str = match m.pattern {
57        StreamingPattern::Empty => "empty",
58        StreamingPattern::Smooth => "smooth",
59        StreamingPattern::Normal => "normal",
60        StreamingPattern::Bursty => "bursty",
61    };
62    format!(
63        "{}[Streaming]{} {} deltas, avg {} bytes (min {}, max {}), pattern: {}",
64        colors.dim(),
65        colors.reset(),
66        m.total_deltas,
67        m.avg_delta_size,
68        m.min_delta_size,
69        m.max_delta_size,
70        pattern_str
71    )
72}
73
/// Append a colorized `label: count` entry, but only for nonzero counts.
///
/// Zero-valued counters are noise and are omitted from the summary line.
fn push_if_nonzero(parts: &mut Vec<String>, count: usize, label: &str, color: &str, reset: &str) {
    if count == 0 {
        return;
    }
    parts.push(format!("{color}{label}: {count}{reset}"));
}
79
80fn collect_streaming_extras(m: &StreamingQualityMetrics, colors: Colors) -> Vec<String> {
81    let mut parts = Vec::new();
82    push_if_nonzero(
83        &mut parts,
84        m.snapshot_repairs_count,
85        "snapshot repairs",
86        colors.yellow(),
87        colors.reset(),
88    );
89    push_if_nonzero(
90        &mut parts,
91        m.large_delta_count,
92        "large deltas",
93        colors.yellow(),
94        colors.reset(),
95    );
96    push_if_nonzero(
97        &mut parts,
98        m.protocol_violations,
99        "protocol violations",
100        colors.red(),
101        colors.reset(),
102    );
103    if let Some(queue_str) = format_queue_metrics(m, colors) {
104        parts.push(queue_str);
105    }
106    parts
107}
108
109fn collect_queue_parts(m: &StreamingQualityMetrics, colors: Colors) -> Vec<String> {
110    let mut queue_parts = Vec::new();
111    if m.queue_depth > 0 {
112        queue_parts.push(format!("depth: {}", m.queue_depth));
113    }
114    push_if_nonzero(
115        &mut queue_parts,
116        m.queue_dropped_events,
117        "dropped",
118        colors.yellow(),
119        colors.reset(),
120    );
121    push_if_nonzero(
122        &mut queue_parts,
123        m.queue_backpressure_count,
124        "backpressure",
125        colors.yellow(),
126        colors.reset(),
127    );
128    queue_parts
129}
130
131fn format_queue_metrics(m: &StreamingQualityMetrics, colors: Colors) -> Option<String> {
132    if m.queue_depth == 0 && m.queue_dropped_events == 0 && m.queue_backpressure_count == 0 {
133        return None;
134    }
135    let queue_parts = collect_queue_parts(m, colors);
136    if queue_parts.is_empty() {
137        None
138    } else {
139        Some(format!("queue: {}", queue_parts.join(", ")))
140    }
141}
142
/// Streaming quality metrics for analyzing streaming behavior.
///
/// These metrics help diagnose issues with streaming performance and
/// inform future improvements to the streaming infrastructure.
///
/// Constructed via [`StreamingQualityMetrics::from_sizes`], which fills in
/// the size statistics and pattern; the repair/violation/queue counters
/// start at zero and are presumably incremented by the owning parser —
/// confirm against callers.
///
/// # Metrics Tracked
///
/// - **Delta sizes**: Average, min, max sizes to understand streaming granularity
/// - **Total deltas**: Count of deltas processed
/// - **Streaming pattern**: Classification based on size variance
/// - **Queue metrics**: Event queue depth, dropped events, and backpressure (when using bounded queue)
#[derive(Debug, Clone, Default)]
pub struct StreamingQualityMetrics {
    /// Total number of deltas processed
    pub total_deltas: usize,
    /// Average delta size in bytes (integer mean; fractional part truncated)
    pub avg_delta_size: usize,
    /// Minimum delta size in bytes
    pub min_delta_size: usize,
    /// Maximum delta size in bytes
    pub max_delta_size: usize,
    /// Classification of streaming pattern
    pub pattern: StreamingPattern,
    /// Number of times auto-repair was triggered for snapshot-as-delta bugs
    pub snapshot_repairs_count: usize,
    /// Number of deltas that exceeded the size threshold (indicating potential snapshots)
    pub large_delta_count: usize,
    /// Number of protocol violations detected (e.g., `MessageStart` during streaming)
    pub protocol_violations: usize,
    /// Queue depth (number of events in queue) - 0 if queue not in use
    pub queue_depth: usize,
    /// Number of events dropped due to queue overflow - 0 if queue not in use
    pub queue_dropped_events: usize,
    /// Number of times backpressure was triggered (send blocked on full queue) - 0 if queue not in use
    pub queue_backpressure_count: usize,
}
179
/// Classification of streaming patterns based on delta size variance.
///
/// Assigned from the coefficient of variation (CV = std-dev / mean) of the
/// delta sizes: CV < 0.3 maps to `Smooth`, CV < 1.0 to `Normal`, and
/// anything larger to `Bursty`. Fewer than two deltas is classified as
/// `Normal`; a zero integer mean yields `Empty`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum StreamingPattern {
    /// No deltas to classify (also used when every recorded delta is empty)
    #[default]
    Empty,
    /// Uniform delta sizes (low variance) - smooth streaming
    Smooth,
    /// Mixed delta sizes (medium variance) - normal streaming
    Normal,
    /// Highly variable delta sizes (high variance) - bursty/chunked streaming
    Bursty,
}
193
194impl StreamingQualityMetrics {
195    /// Create metrics from a collection of delta sizes.
196    ///
197    /// # Arguments
198    /// * `sizes` - Iterator of delta sizes in bytes
199    pub fn from_sizes<I: Iterator<Item = usize>>(sizes: I) -> Self {
200        let sizes_vec: Vec<_> = sizes.collect();
201        if sizes_vec.is_empty() {
202            return Self::default();
203        }
204        let total_deltas = sizes_vec.len();
205        let min_delta_size = sizes_vec.iter().copied().min().unwrap_or(0);
206        let max_delta_size = sizes_vec.iter().copied().max().unwrap_or(0);
207        let sum: usize = sizes_vec.iter().sum();
208        let avg_delta_size = sum / total_deltas;
209        let pattern = classify_streaming_pattern(&sizes_vec, total_deltas, avg_delta_size);
210        Self {
211            total_deltas,
212            avg_delta_size,
213            min_delta_size,
214            max_delta_size,
215            pattern,
216            snapshot_repairs_count: 0,
217            large_delta_count: 0,
218            protocol_violations: 0,
219            queue_depth: 0,
220            queue_dropped_events: 0,
221            queue_backpressure_count: 0,
222        }
223    }
224
225    /// Format metrics for display.
226    #[must_use]
227    pub fn format(&self, colors: Colors) -> String {
228        if self.total_deltas == 0 {
229            return format!(
230                "{}[Streaming]{} No deltas recorded",
231                colors.dim(),
232                colors.reset()
233            );
234        }
235        let base = format_streaming_base(self, colors);
236        let extras = collect_streaming_extras(self, colors);
237        if extras.is_empty() {
238            base
239        } else {
240            format!("{base}, {}", extras.join(", "))
241        }
242    }
243}