Skip to main content

ralph_workflow/json_parser/health/
metrics.rs

1// Streaming quality metrics.
2//
3// Contains StreamingQualityMetrics and StreamingPattern.
4
5/// Streaming quality metrics for analyzing streaming behavior.
6///
7/// These metrics help diagnose issues with streaming performance and
8/// inform future improvements to the streaming infrastructure.
9///
10/// # Metrics Tracked
11///
12/// - **Delta sizes**: Average, min, max sizes to understand streaming granularity
13/// - **Total deltas**: Count of deltas processed
14/// - **Streaming pattern**: Classification based on size variance
15/// - **Queue metrics**: Event queue depth, dropped events, and backpressure (when using bounded queue)
16#[derive(Debug, Clone, Default)]
17pub struct StreamingQualityMetrics {
18    /// Total number of deltas processed
19    pub total_deltas: usize,
20    /// Average delta size in bytes
21    pub avg_delta_size: usize,
22    /// Minimum delta size in bytes
23    pub min_delta_size: usize,
24    /// Maximum delta size in bytes
25    pub max_delta_size: usize,
26    /// Classification of streaming pattern
27    pub pattern: StreamingPattern,
28    /// Number of times auto-repair was triggered for snapshot-as-delta bugs
29    pub snapshot_repairs_count: usize,
30    /// Number of deltas that exceeded the size threshold (indicating potential snapshots)
31    pub large_delta_count: usize,
32    /// Number of protocol violations detected (e.g., `MessageStart` during streaming)
33    pub protocol_violations: usize,
34    /// Queue depth (number of events in queue) - 0 if queue not in use
35    pub queue_depth: usize,
36    /// Number of events dropped due to queue overflow - 0 if queue not in use
37    pub queue_dropped_events: usize,
38    /// Number of times backpressure was triggered (send blocked on full queue) - 0 if queue not in use
39    pub queue_backpressure_count: usize,
40}
41
/// Coarse description of how uniform the delta sizes of a stream were.
///
/// The classification is based on how much delta sizes vary relative to their
/// mean (the coefficient of variation).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum StreamingPattern {
    /// Nothing to classify: no deltas, or deltas too small to measure.
    /// This is the default.
    #[default]
    Empty,
    /// Delta sizes were nearly uniform (low variance): smooth streaming.
    Smooth,
    /// Delta sizes varied moderately: ordinary streaming.
    Normal,
    /// Delta sizes varied strongly (high variance): bursty or chunked streaming.
    Bursty,
}
55
56impl StreamingQualityMetrics {
57    /// Create metrics from a collection of delta sizes.
58    ///
59    /// # Arguments
60    /// * `sizes` - Iterator of delta sizes in bytes
61    pub fn from_sizes<I: Iterator<Item = usize>>(sizes: I) -> Self {
62        let sizes_vec: Vec<_> = sizes.collect();
63
64        if sizes_vec.is_empty() {
65            return Self::default();
66        }
67
68        let total_deltas = sizes_vec.len();
69        let min_delta_size = sizes_vec.iter().copied().min().unwrap_or(0);
70        let max_delta_size = sizes_vec.iter().copied().max().unwrap_or(0);
71        let sum: usize = sizes_vec.iter().sum();
72        let avg_delta_size = sum / total_deltas;
73
74        // Calculate variance to determine pattern
75        // Use coefficient of variation: std_dev / mean
76        let pattern = if total_deltas < 2 {
77            StreamingPattern::Normal
78        } else {
79            // Convert to u32 for safe f64 conversion (delta sizes are typically small)
80            let mean_u32 = u32::try_from(avg_delta_size).unwrap_or(u32::MAX);
81            let mean = f64::from(mean_u32);
82            if mean < 0.001 {
83                StreamingPattern::Empty
84            } else {
85                // Calculate variance using integer-safe arithmetic
86                let variance_sum: usize = sizes_vec
87                    .iter()
88                    .map(|&size| {
89                        let diff = size.abs_diff(avg_delta_size);
90                        diff.saturating_mul(diff)
91                    })
92                    .sum();
93                let variance = variance_sum / total_deltas;
94                // Convert to u32 for safe f64 conversion
95                let variance_u32 = u32::try_from(variance).unwrap_or(u32::MAX);
96                let std_dev = f64::from(variance_u32).sqrt();
97                let cv = std_dev / mean;
98
99                // Thresholds based on coefficient of variation
100                if cv < 0.3 {
101                    StreamingPattern::Smooth
102                } else if cv < 1.0 {
103                    StreamingPattern::Normal
104                } else {
105                    StreamingPattern::Bursty
106                }
107            }
108        };
109
110        Self {
111            total_deltas,
112            avg_delta_size,
113            min_delta_size,
114            max_delta_size,
115            pattern,
116            snapshot_repairs_count: 0,
117            large_delta_count: 0,
118            protocol_violations: 0,
119            queue_depth: 0,
120            queue_dropped_events: 0,
121            queue_backpressure_count: 0,
122        }
123    }
124
125    /// Format metrics for display.
126    pub fn format(&self, colors: Colors) -> String {
127        if self.total_deltas == 0 {
128            return format!(
129                "{}[Streaming]{} No deltas recorded",
130                colors.dim(),
131                colors.reset()
132            );
133        }
134
135        let pattern_str = match self.pattern {
136            StreamingPattern::Empty => "empty",
137            StreamingPattern::Smooth => "smooth",
138            StreamingPattern::Normal => "normal",
139            StreamingPattern::Bursty => "bursty",
140        };
141
142        let mut parts = vec![format!(
143            "{}[Streaming]{} {} deltas, avg {} bytes (min {}, max {}), pattern: {}",
144            colors.dim(),
145            colors.reset(),
146            self.total_deltas,
147            self.avg_delta_size,
148            self.min_delta_size,
149            self.max_delta_size,
150            pattern_str
151        )];
152
153        if self.snapshot_repairs_count > 0 {
154            parts.push(format!(
155                "{}snapshot repairs: {}{}",
156                colors.yellow(),
157                self.snapshot_repairs_count,
158                colors.reset()
159            ));
160        }
161
162        if self.large_delta_count > 0 {
163            parts.push(format!(
164                "{}large deltas: {}{}",
165                colors.yellow(),
166                self.large_delta_count,
167                colors.reset()
168            ));
169        }
170
171        if self.protocol_violations > 0 {
172            parts.push(format!(
173                "{}protocol violations: {}{}",
174                colors.red(),
175                self.protocol_violations,
176                colors.reset()
177            ));
178        }
179
180        // Queue metrics (only show if queue is in use)
181        if self.queue_depth > 0
182            || self.queue_dropped_events > 0
183            || self.queue_backpressure_count > 0
184        {
185            let mut queue_parts = Vec::new();
186            if self.queue_depth > 0 {
187                queue_parts.push(format!("depth: {}", self.queue_depth));
188            }
189            if self.queue_dropped_events > 0 {
190                queue_parts.push(format!(
191                    "{}dropped: {}{}",
192                    colors.yellow(),
193                    self.queue_dropped_events,
194                    colors.reset()
195                ));
196            }
197            if self.queue_backpressure_count > 0 {
198                queue_parts.push(format!(
199                    "{}backpressure: {}{}",
200                    colors.yellow(),
201                    self.queue_backpressure_count,
202                    colors.reset()
203                ));
204            }
205            if !queue_parts.is_empty() {
206                parts.push(format!("queue: {}", queue_parts.join(", ")));
207            }
208        }
209
210        parts.join(", ")
211    }
212}