Skip to main content

ralph_workflow/json_parser/
health.rs

1//! Parser health monitoring and graceful degradation.
2//!
3//! This module provides utilities for monitoring parser health,
4//! tracking parsed vs ignored events, and providing warnings when
5//! parsers are not working correctly with specific agents.
6//!
7//! # Event Classification
8//!
9//! Events are classified into the following categories:
10//!
11//! - **Parsed events**: Successfully processed and displayed, including:
12//!   - Complete content events
13//!   - Successfully handled event types
14//!
15//! - **Partial events**: Streaming delta events (text deltas, thinking deltas,
16//!   tool input deltas) that are displayed incrementally. These are NOT errors
17//!   and are tracked separately to show real-time streaming activity without
18//!   inflating "ignored" percentages.
19//!
20//! - **Control events**: State management events that don't produce user-facing
21//!   output. These are NOT errors and are tracked separately to avoid inflating
22//!   "ignored" percentages. Examples: `MessageStart`, `ContentBlockStart`, `Ping`,
23//!   `TurnStarted`, `StepStarted`.
24//!
25//! - **Unknown events**: Valid JSON that the parser deserializes successfully
26//!   but doesn't have specific handling for. These are NOT considered errors
27//!   and won't trigger health warnings. They represent future/new event types.
28//!
29//! - **Parse errors**: Malformed JSON that cannot be deserialized. These DO
30//!   trigger health warnings when they exceed 50% of events.
31//!
32//! - **Ignored events**: General category for events not displayed (includes
33//!   both unknown events and parse errors)
34//!
35//! # Streaming Quality Metrics
36//!
37//! The [`StreamingQualityMetrics`] struct provides insights into streaming behavior:
38//!
39//! - **Delta sizes**: Average, min, max delta sizes to understand streaming granularity
40//! - **Total deltas**: Count of deltas per content block
//! - **Streaming pattern**: Classification as smooth, normal, or bursty based on size variance
42
43use crate::logger::Colors;
44use std::cell::Cell;
45
46/// Streaming quality metrics for analyzing streaming behavior.
47///
48/// These metrics help diagnose issues with streaming performance and
49/// inform future improvements to the streaming infrastructure.
50///
51/// # Metrics Tracked
52///
53/// - **Delta sizes**: Average, min, max sizes to understand streaming granularity
54/// - **Total deltas**: Count of deltas processed
55/// - **Streaming pattern**: Classification based on size variance
56/// - **Queue metrics**: Event queue depth, dropped events, and backpressure (when using bounded queue)
57#[derive(Debug, Clone, Default)]
58pub struct StreamingQualityMetrics {
59    /// Total number of deltas processed
60    pub total_deltas: usize,
61    /// Average delta size in bytes
62    pub avg_delta_size: usize,
63    /// Minimum delta size in bytes
64    pub min_delta_size: usize,
65    /// Maximum delta size in bytes
66    pub max_delta_size: usize,
67    /// Classification of streaming pattern
68    pub pattern: StreamingPattern,
69    /// Number of times auto-repair was triggered for snapshot-as-delta bugs
70    pub snapshot_repairs_count: usize,
71    /// Number of deltas that exceeded the size threshold (indicating potential snapshots)
72    pub large_delta_count: usize,
73    /// Number of protocol violations detected (e.g., `MessageStart` during streaming)
74    pub protocol_violations: usize,
75    /// Queue depth (number of events in queue) - 0 if queue not in use
76    pub queue_depth: usize,
77    /// Number of events dropped due to queue overflow - 0 if queue not in use
78    pub queue_dropped_events: usize,
79    /// Number of times backpressure was triggered (send blocked on full queue) - 0 if queue not in use
80    pub queue_backpressure_count: usize,
81}
82
83/// Classification of streaming patterns based on delta size variance.
84#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
85pub enum StreamingPattern {
86    /// No deltas to classify
87    #[default]
88    Empty,
89    /// Uniform delta sizes (low variance) - smooth streaming
90    Smooth,
91    /// Mixed delta sizes (medium variance) - normal streaming
92    Normal,
93    /// Highly variable delta sizes (high variance) - bursty/chunked streaming
94    Bursty,
95}
96
97impl StreamingQualityMetrics {
98    /// Create metrics from a collection of delta sizes.
99    ///
100    /// # Arguments
101    /// * `sizes` - Iterator of delta sizes in bytes
102    pub fn from_sizes<I: Iterator<Item = usize>>(sizes: I) -> Self {
103        let sizes_vec: Vec<_> = sizes.collect();
104
105        if sizes_vec.is_empty() {
106            return Self::default();
107        }
108
109        let total_deltas = sizes_vec.len();
110        let min_delta_size = sizes_vec.iter().copied().min().unwrap_or(0);
111        let max_delta_size = sizes_vec.iter().copied().max().unwrap_or(0);
112        let sum: usize = sizes_vec.iter().sum();
113        let avg_delta_size = sum / total_deltas;
114
115        // Calculate variance to determine pattern
116        // Use coefficient of variation: std_dev / mean
117        let pattern = if total_deltas < 2 {
118            StreamingPattern::Normal
119        } else {
120            // Convert to u32 for safe f64 conversion (delta sizes are typically small)
121            let mean_u32 = u32::try_from(avg_delta_size).unwrap_or(u32::MAX);
122            let mean = f64::from(mean_u32);
123            if mean < 0.001 {
124                StreamingPattern::Empty
125            } else {
126                // Calculate variance using integer-safe arithmetic
127                let variance_sum: usize = sizes_vec
128                    .iter()
129                    .map(|&size| {
130                        let diff = size.abs_diff(avg_delta_size);
131                        diff.saturating_mul(diff)
132                    })
133                    .sum();
134                let variance = variance_sum / total_deltas;
135                // Convert to u32 for safe f64 conversion
136                let variance_u32 = u32::try_from(variance).unwrap_or(u32::MAX);
137                let std_dev = f64::from(variance_u32).sqrt();
138                let cv = std_dev / mean;
139
140                // Thresholds based on coefficient of variation
141                if cv < 0.3 {
142                    StreamingPattern::Smooth
143                } else if cv < 1.0 {
144                    StreamingPattern::Normal
145                } else {
146                    StreamingPattern::Bursty
147                }
148            }
149        };
150
151        Self {
152            total_deltas,
153            avg_delta_size,
154            min_delta_size,
155            max_delta_size,
156            pattern,
157            snapshot_repairs_count: 0,
158            large_delta_count: 0,
159            protocol_violations: 0,
160            queue_depth: 0,
161            queue_dropped_events: 0,
162            queue_backpressure_count: 0,
163        }
164    }
165
166    /// Format metrics for display.
167    pub fn format(&self, colors: Colors) -> String {
168        if self.total_deltas == 0 {
169            return format!(
170                "{}[Streaming]{} No deltas recorded",
171                colors.dim(),
172                colors.reset()
173            );
174        }
175
176        let pattern_str = match self.pattern {
177            StreamingPattern::Empty => "empty",
178            StreamingPattern::Smooth => "smooth",
179            StreamingPattern::Normal => "normal",
180            StreamingPattern::Bursty => "bursty",
181        };
182
183        let mut parts = vec![format!(
184            "{}[Streaming]{} {} deltas, avg {} bytes (min {}, max {}), pattern: {}",
185            colors.dim(),
186            colors.reset(),
187            self.total_deltas,
188            self.avg_delta_size,
189            self.min_delta_size,
190            self.max_delta_size,
191            pattern_str
192        )];
193
194        if self.snapshot_repairs_count > 0 {
195            parts.push(format!(
196                "{}snapshot repairs: {}{}",
197                colors.yellow(),
198                self.snapshot_repairs_count,
199                colors.reset()
200            ));
201        }
202
203        if self.large_delta_count > 0 {
204            parts.push(format!(
205                "{}large deltas: {}{}",
206                colors.yellow(),
207                self.large_delta_count,
208                colors.reset()
209            ));
210        }
211
212        if self.protocol_violations > 0 {
213            parts.push(format!(
214                "{}protocol violations: {}{}",
215                colors.red(),
216                self.protocol_violations,
217                colors.reset()
218            ));
219        }
220
221        // Queue metrics (only show if queue is in use)
222        if self.queue_depth > 0
223            || self.queue_dropped_events > 0
224            || self.queue_backpressure_count > 0
225        {
226            let mut queue_parts = Vec::new();
227            if self.queue_depth > 0 {
228                queue_parts.push(format!("depth: {}", self.queue_depth));
229            }
230            if self.queue_dropped_events > 0 {
231                queue_parts.push(format!(
232                    "{}dropped: {}{}",
233                    colors.yellow(),
234                    self.queue_dropped_events,
235                    colors.reset()
236                ));
237            }
238            if self.queue_backpressure_count > 0 {
239                queue_parts.push(format!(
240                    "{}backpressure: {}{}",
241                    colors.yellow(),
242                    self.queue_backpressure_count,
243                    colors.reset()
244                ));
245            }
246            if !queue_parts.is_empty() {
247                parts.push(format!("queue: {}", queue_parts.join(", ")));
248            }
249        }
250
251        parts.join(", ")
252    }
253}
254
255/// Parser health statistics
256#[derive(Debug, Default, Clone, Copy)]
257pub struct ParserHealth {
258    /// Total number of events processed
259    pub total_events: u64,
260    /// Number of events successfully parsed and displayed
261    pub parsed_events: u64,
262    /// Number of partial/delta events (streaming content displayed incrementally)
263    pub partial_events: u64,
264    /// Number of events ignored (malformed JSON, unknown events, etc.)
265    pub ignored_events: u64,
266    /// Number of control events (state management, no user output)
267    pub control_events: u64,
268    /// Number of unknown event types (valid JSON but unhandled)
269    pub unknown_events: u64,
270    /// Number of JSON parse errors (malformed JSON)
271    pub parse_errors: u64,
272}
273
274impl ParserHealth {
275    /// Create a new health tracker
276    pub fn new() -> Self {
277        Self::default()
278    }
279
280    /// Record a parsed event
281    pub const fn record_parsed(&mut self) {
282        self.total_events += 1;
283        self.parsed_events += 1;
284    }
285
286    /// Record an ignored event
287    pub const fn record_ignored(&mut self) {
288        self.total_events += 1;
289        self.ignored_events += 1;
290    }
291
292    /// Record an unknown event type (valid JSON but unhandled)
293    ///
294    /// Unknown events are valid JSON that the parser deserialized successfully
295    /// but doesn't have specific handling for. These should not trigger health
296    /// warnings as they represent future/new event types, not parser errors.
297    pub const fn record_unknown_event(&mut self) {
298        self.total_events += 1;
299        self.unknown_events += 1;
300        self.ignored_events += 1;
301    }
302
303    /// Record a parse error (malformed JSON)
304    pub const fn record_parse_error(&mut self) {
305        self.total_events += 1;
306        self.parse_errors += 1;
307        self.ignored_events += 1;
308    }
309
310    /// Record a control event (state management with no user-facing output)
311    ///
312    /// Control events are valid JSON that represent state transitions
313    /// rather than user-facing content. They should not be counted as
314    /// "ignored" for health monitoring purposes.
315    pub const fn record_control_event(&mut self) {
316        self.total_events += 1;
317        self.control_events += 1;
318    }
319
320    /// Record a partial/delta event (streaming content displayed incrementally)
321    ///
322    /// Partial events represent streaming content that is shown to the user
323    /// in real-time as deltas. These are NOT errors and should not trigger
324    /// health warnings. They are tracked separately to show streaming activity.
325    pub const fn record_partial_event(&mut self) {
326        self.total_events += 1;
327        self.partial_events += 1;
328    }
329
330    /// Get the percentage of parse errors (excluding unknown events)
331    ///
332    /// Returns percentage using integer-safe arithmetic to avoid precision loss warnings.
333    pub fn parse_error_percentage(&self) -> f64 {
334        if self.total_events == 0 {
335            return 0.0;
336        }
337        // Use integer arithmetic: (errors * 10000) / total, then divide by 100.0
338        // This gives two decimal places of precision without casting u64 to f64
339        let percent_hundredths = self
340            .parse_errors
341            .saturating_mul(10000)
342            .checked_div(self.total_events)
343            .unwrap_or(0);
344        // Convert to f64 only after scaling down to a reasonable range
345        // percent_hundredths is at most 10000 (100% * 100), which fits precisely in f64
346        let scaled: u32 = u32::try_from(percent_hundredths)
347            .unwrap_or(u32::MAX)
348            .min(10000);
349        f64::from(scaled) / 100.0
350    }
351
352    /// Get the percentage of parse errors as a rounded integer.
353    ///
354    /// This is for display purposes where a whole number is sufficient.
355    pub fn parse_error_percentage_int(&self) -> u32 {
356        if self.total_events == 0 {
357            return 0;
358        }
359        // (errors * 100) / total gives us the integer percentage
360        self.parse_errors
361            .saturating_mul(100)
362            .checked_div(self.total_events)
363            .and_then(|v| u32::try_from(v).ok())
364            .unwrap_or(0)
365            .min(100)
366    }
367
368    /// Check if the parser health is concerning
369    ///
370    /// Only returns true if there are actual parse errors (malformed JSON),
371    /// not just unknown event types. Unknown events are valid JSON that we
372    /// don't have specific handling for, which is not a health concern.
373    pub fn is_concerning(&self) -> bool {
374        self.total_events > 10 && self.parse_error_percentage() > 50.0
375    }
376
377    /// Get a warning message if health is concerning
378    pub fn warning(&self, parser_name: &str, colors: Colors) -> Option<String> {
379        if !self.is_concerning() {
380            return None;
381        }
382
383        let msg = if self.unknown_events > 0 || self.control_events > 0 || self.partial_events > 0 {
384            format!(
385                "{}[Parser Health Warning]{} {} parser has {} parse errors ({}% of {} events). \
386                 Also encountered {} unknown event types (valid JSON but unhandled), \
387                 {} control events (state management), \
388                 and {} partial events (streaming deltas). \
389                 This may indicate a parser mismatch. Consider using a different json_parser in your agent config.",
390                colors.yellow(),
391                colors.reset(),
392                parser_name,
393                self.parse_errors,
394                self.parse_error_percentage_int(),
395                self.total_events,
396                self.unknown_events,
397                self.control_events,
398                self.partial_events
399            )
400        } else {
401            format!(
402                "{}[Parser Health Warning]{} {} parser has {} parse errors ({}% of {} events). \
403                 This may indicate malformed JSON output. Consider using a different json_parser in your agent config.",
404                colors.yellow(),
405                colors.reset(),
406                parser_name,
407                self.parse_errors,
408                self.parse_error_percentage_int(),
409                self.total_events
410            )
411        };
412
413        Some(msg)
414    }
415}
416
417/// A wrapper that monitors parser health and provides graceful degradation
418///
419/// This wraps any parser function to track how many events are being ignored
420/// and emit warnings when the parser seems to be misconfigured.
421pub struct HealthMonitor {
422    health: Cell<ParserHealth>,
423    parser_name: &'static str,
424    threshold_warned: Cell<bool>,
425}
426
427impl HealthMonitor {
428    /// Create a new health monitor for a parser
429    pub fn new(parser_name: &'static str) -> Self {
430        Self {
431            health: Cell::new(ParserHealth::new()),
432            parser_name,
433            threshold_warned: Cell::new(false),
434        }
435    }
436
437    /// Record that an event was parsed successfully
438    pub fn record_parsed(&self) {
439        let mut h = self.health.get();
440        h.record_parsed();
441        self.health.set(h);
442    }
443
444    /// Record that an event was ignored
445    pub fn record_ignored(&self) {
446        let mut h = self.health.get();
447        h.record_ignored();
448        self.health.set(h);
449    }
450
451    /// Record an unknown event type (valid JSON but unhandled)
452    pub fn record_unknown_event(&self) {
453        let mut h = self.health.get();
454        h.record_unknown_event();
455        self.health.set(h);
456    }
457
458    /// Record a parse error (malformed JSON)
459    pub fn record_parse_error(&self) {
460        let mut h = self.health.get();
461        h.record_parse_error();
462        self.health.set(h);
463    }
464
465    /// Record a control event (state management with no user-facing output)
466    pub fn record_control_event(&self) {
467        let mut h = self.health.get();
468        h.record_control_event();
469        self.health.set(h);
470    }
471
472    /// Record a partial/delta event (streaming content displayed incrementally)
473    ///
474    /// Partial events represent streaming content that is shown to the user
475    /// in real-time as deltas. These are NOT errors and should not trigger
476    /// health warnings.
477    pub fn record_partial_event(&self) {
478        let mut h = self.health.get();
479        h.record_partial_event();
480        self.health.set(h);
481    }
482
483    /// Check if we should warn about parser health (only warn once)
484    pub fn check_and_warn(&self, colors: Colors) -> Option<String> {
485        if self.threshold_warned.get() {
486            return None;
487        }
488
489        let health = self.health.get();
490        let warning = health.warning(self.parser_name, colors);
491        if warning.is_some() {
492            self.threshold_warned.set(true);
493        }
494        warning
495    }
496}
497
498#[cfg(test)]
499mod tests {
500    use super::*;
501
502    #[test]
503    fn test_parser_health_new() {
504        let health = ParserHealth::new();
505        assert_eq!(health.total_events, 0);
506        assert_eq!(health.parsed_events, 0);
507        assert_eq!(health.ignored_events, 0);
508    }
509
510    #[test]
511    fn test_parser_health_record_parsed() {
512        let mut health = ParserHealth::new();
513        health.record_parsed();
514        assert_eq!(health.total_events, 1);
515        assert_eq!(health.parsed_events, 1);
516        assert_eq!(health.ignored_events, 0);
517    }
518
519    #[test]
520    fn test_parser_health_record_ignored() {
521        let mut health = ParserHealth::new();
522        health.record_ignored();
523        assert_eq!(health.total_events, 1);
524        assert_eq!(health.parsed_events, 0);
525        assert_eq!(health.ignored_events, 1);
526    }
527
528    #[test]
529    fn test_parser_health_is_concerning() {
530        let mut health = ParserHealth::new();
531        // Not concerning with few events
532        for _ in 0..3 {
533            health.record_ignored();
534        }
535        assert!(!health.is_concerning());
536
537        // Unknown events should NOT trigger concerning state (they're valid JSON)
538        for _ in 0..20 {
539            health.record_unknown_event();
540        }
541        assert!(!health.is_concerning()); // Even with many unknown events, not concerning
542
543        // Only parse errors trigger concerning state
544        let mut health2 = ParserHealth::new();
545        for _ in 0..10 {
546            health2.record_parsed();
547        }
548        for _ in 0..15 {
549            health2.record_parse_error();
550        }
551        assert!(health2.is_concerning()); // 25 total, 60% parse errors
552
553        // Not concerning when most are parsed or unknown (but few parse errors)
554        let mut health3 = ParserHealth::new();
555        for _ in 0..15 {
556            health3.record_parsed();
557        }
558        for _ in 0..10 {
559            health3.record_unknown_event();
560        }
561        for _ in 0..2 {
562            health3.record_parse_error();
563        }
564        assert!(!health3.is_concerning()); // 27 total, only 7% parse errors
565    }
566
567    #[test]
568    fn test_parser_health_unknown_events() {
569        let mut health = ParserHealth::new();
570        assert_eq!(health.unknown_events, 0);
571
572        health.record_unknown_event();
573        health.record_unknown_event();
574        assert_eq!(health.unknown_events, 2);
575        assert_eq!(health.ignored_events, 2); // unknown counts as ignored
576        assert_eq!(health.parse_errors, 0); // but not as parse error
577
578        // Unknown events don't make it concerning
579        assert!(!health.is_concerning());
580    }
581
582    #[test]
583    fn test_health_monitor() {
584        let monitor = HealthMonitor::new("claude");
585
586        monitor.record_parsed();
587        monitor.record_parsed();
588        monitor.record_ignored();
589
590        let colors = Colors { enabled: false };
591        // Behavioral test: monitor should not warn for healthy parsing
592        assert!(monitor.check_and_warn(colors).is_none());
593
594        // Behavioral test: creating a new monitor gives fresh state (instead of reset)
595        let fresh_monitor = HealthMonitor::new("claude");
596        // Fresh monitor should not have warned yet
597        assert!(fresh_monitor.check_and_warn(colors).is_none());
598    }
599
600    #[test]
601    fn test_health_monitor_warns_once() {
602        let monitor = HealthMonitor::new("test");
603        let colors = Colors { enabled: false };
604
605        // Add enough parse errors to trigger warning (unknown events shouldn't trigger)
606        for _ in 0..15 {
607            monitor.record_parse_error();
608        }
609
610        let warning1 = monitor.check_and_warn(colors);
611        assert!(warning1.is_some());
612
613        let warning2 = monitor.check_and_warn(colors);
614        assert!(warning2.is_none()); // Already warned
615    }
616
617    #[test]
618    fn test_health_monitor_many_unknown_no_warning() {
619        let monitor = HealthMonitor::new("test");
620        let colors = Colors { enabled: false };
621
622        // Add many unknown events (simulating 97.5% unknown like the bug report)
623        for _ in 0..2049 {
624            monitor.record_unknown_event();
625        }
626        for _ in 0..53 {
627            monitor.record_parsed();
628        }
629
630        let warning = monitor.check_and_warn(colors);
631        assert!(warning.is_none()); // Should NOT warn even with 97.5% unknown events
632    }
633
634    #[test]
635    fn test_health_monitor_mixed_unknown_and_parse_errors() {
636        let monitor = HealthMonitor::new("test");
637        let colors = Colors { enabled: false };
638
639        // Mix of unknown and parse errors - only parse errors count for warning
640        for _ in 0..100 {
641            monitor.record_unknown_event();
642        }
643        for _ in 0..20 {
644            monitor.record_parse_error();
645        }
646        for _ in 0..20 {
647            monitor.record_parsed();
648        }
649
650        // 140 total events, 20 parse errors = ~14% (not concerning)
651        let warning = monitor.check_and_warn(colors);
652        assert!(warning.is_none());
653
654        // Add more parse errors to trigger warning
655        for _ in 0..30 {
656            monitor.record_parse_error();
657        }
658
659        // 170 total events, 50 parse errors = ~29% (still not concerning)
660        let warning = monitor.check_and_warn(colors);
661        assert!(warning.is_none());
662
663        // Add even more parse errors
664        for _ in 0..60 {
665            monitor.record_parse_error();
666        }
667
668        // 230 total events, 110 parse errors = ~48% (close to threshold)
669        let warning = monitor.check_and_warn(colors);
670        assert!(warning.is_none());
671
672        // Push it over 50%
673        for _ in 0..30 {
674            monitor.record_parse_error();
675        }
676
677        // 260 total events, 140 parse errors = ~54% (concerning!)
678        let warning = monitor.check_and_warn(colors);
679        assert!(warning.is_some());
680    }
681
682    #[test]
683    fn test_parser_health_parse_error_percentage() {
684        let mut health = ParserHealth::new();
685        assert!((health.parse_error_percentage() - 0.0).abs() < f64::EPSILON);
686
687        // Parse errors only
688        for _ in 0..5 {
689            health.record_parse_error();
690        }
691        assert!((health.parse_error_percentage() - 100.0).abs() < f64::EPSILON);
692
693        // Add parsed events
694        let mut health2 = ParserHealth::new();
695        for _ in 0..5 {
696            health2.record_parse_error();
697        }
698        for _ in 0..5 {
699            health2.record_parsed();
700        }
701        assert!((health2.parse_error_percentage() - 50.0).abs() < f64::EPSILON);
702
703        // Unknown events don't affect parse error percentage
704        let mut health3 = ParserHealth::new();
705        for _ in 0..5 {
706            health3.record_parse_error();
707        }
708        for _ in 0..10 {
709            health3.record_unknown_event();
710        }
711        for _ in 0..5 {
712            health3.record_parsed();
713        }
714        // 20 total, 5 parse errors = 25%
715        assert!((health3.parse_error_percentage() - 25.0).abs() < f64::EPSILON);
716    }
717
718    #[test]
719    fn test_parser_health_control_events() {
720        let mut health = ParserHealth::new();
721        assert_eq!(health.control_events, 0);
722
723        health.record_control_event();
724        health.record_control_event();
725        health.record_control_event();
726        assert_eq!(health.control_events, 3);
727        assert_eq!(health.total_events, 3);
728        // Control events do NOT count as ignored
729        assert_eq!(health.ignored_events, 0);
730        assert_eq!(health.unknown_events, 0);
731
732        // Control events don't make it concerning
733        assert!(!health.is_concerning());
734    }
735
736    #[test]
737    fn test_parser_health_control_events_with_other_types() {
738        let mut health = ParserHealth::new();
739
740        // Mix of control, parsed, and unknown events
741        for _ in 0..100 {
742            health.record_control_event();
743        }
744        for _ in 0..50 {
745            health.record_parsed();
746        }
747        for _ in 0..30 {
748            health.record_unknown_event();
749        }
750
751        // 180 total events
752        assert_eq!(health.total_events, 180);
753        assert_eq!(health.control_events, 100);
754        assert_eq!(health.parsed_events, 50);
755        assert_eq!(health.unknown_events, 30);
756        assert_eq!(health.ignored_events, 30); // only unknown counts as ignored
757
758        // Not concerning - no parse errors
759        assert!(!health.is_concerning());
760        assert!((health.parse_error_percentage() - 0.0).abs() < f64::EPSILON);
761    }
762
763    #[test]
764    fn test_health_monitor_control_events() {
765        let monitor = HealthMonitor::new("test");
766        let colors = Colors { enabled: false };
767
768        // Add many control events (like MessageStart, ContentBlockStart, etc.)
769        for _ in 0..2000 {
770            monitor.record_control_event();
771        }
772        // Add some parsed events
773        for _ in 0..50 {
774            monitor.record_parsed();
775        }
776
777        // Behavioral test: control events don't trigger warnings
778        // The monitor has many control events but few parsed events
779        let warning = monitor.check_and_warn(colors);
780        // Should NOT warn even with many "non-displayed" events
781        // because they're control events, not ignored/parse errors
782        assert!(warning.is_none());
783    }
784
785    #[test]
786    fn test_health_monitor_warning_includes_control_events() {
787        let monitor = HealthMonitor::new("test");
788        let colors = Colors { enabled: false };
789
790        // Add parse errors to trigger warning
791        for _ in 0..15 {
792            monitor.record_parse_error();
793        }
794        // Add some control events
795        for _ in 0..10 {
796            monitor.record_control_event();
797        }
798
799        let warning = monitor.check_and_warn(colors);
800        assert!(warning.is_some());
801
802        let warning_text = warning.unwrap();
803        // Warning should mention control events
804        assert!(warning_text.contains("10 control events"));
805    }
806
807    #[test]
808    fn test_parser_health_partial_events() {
809        let mut health = ParserHealth::new();
810        assert_eq!(health.partial_events, 0);
811
812        health.record_partial_event();
813        health.record_partial_event();
814        health.record_partial_event();
815        assert_eq!(health.partial_events, 3);
816        assert_eq!(health.total_events, 3);
817        // Partial events do NOT count as ignored
818        assert_eq!(health.ignored_events, 0);
819        assert_eq!(health.unknown_events, 0);
820
821        // Partial events don't make it concerning
822        assert!(!health.is_concerning());
823    }
824
825    #[test]
826    fn test_parser_health_partial_events_with_other_types() {
827        let mut health = ParserHealth::new();
828
829        // Mix of partial, control, parsed, and unknown events
830        for _ in 0..100 {
831            health.record_partial_event();
832        }
833        for _ in 0..50 {
834            health.record_control_event();
835        }
836        for _ in 0..30 {
837            health.record_parsed();
838        }
839        for _ in 0..20 {
840            health.record_unknown_event();
841        }
842
843        // 200 total events
844        assert_eq!(health.total_events, 200);
845        assert_eq!(health.partial_events, 100);
846        assert_eq!(health.control_events, 50);
847        assert_eq!(health.parsed_events, 30);
848        assert_eq!(health.unknown_events, 20);
849        assert_eq!(health.ignored_events, 20); // only unknown counts as ignored
850
851        // Not concerning - no parse errors
852        assert!(!health.is_concerning());
853        assert!((health.parse_error_percentage() - 0.0).abs() < f64::EPSILON);
854    }
855
856    #[test]
857    fn test_health_monitor_partial_events() {
858        let monitor = HealthMonitor::new("test");
859        let colors = Colors { enabled: false };
860
861        // Add many partial events (simulating streaming deltas)
862        for _ in 0..2049 {
863            monitor.record_partial_event();
864        }
865        // Add some parsed events
866        for _ in 0..53 {
867            monitor.record_parsed();
868        }
869
870        // Behavioral test: partial events don't trigger warnings
871        // The monitor has many partial events but few parsed events
872        let warning = monitor.check_and_warn(colors);
873        // Should NOT warn even with many "partial" events
874        // because partial events are valid streaming content, not errors
875        assert!(warning.is_none());
876    }
877
878    #[test]
879    fn test_health_monitor_warning_includes_partial_events() {
880        let monitor = HealthMonitor::new("test");
881        let colors = Colors { enabled: false };
882
883        // Add parse errors to trigger warning (need >50% of total)
884        for _ in 0..15 {
885            monitor.record_parse_error();
886        }
887        // Add some partial events (these don't count toward parse error %)
888        for _ in 0..10 {
889            monitor.record_partial_event();
890        }
891        // Add some control events (these also don't count toward parse error %)
892        for _ in 0..2 {
893            monitor.record_control_event();
894        }
895
896        let warning = monitor.check_and_warn(colors);
897        assert!(warning.is_some());
898
899        let warning_text = warning.unwrap();
900        // Warning should mention both control and partial events
901        assert!(warning_text.contains("2 control events"));
902        assert!(warning_text.contains("10 partial events"));
903    }
904
905    // Tests for StreamingQualityMetrics
906
907    #[test]
908    fn test_streaming_quality_metrics_empty() {
909        let metrics = StreamingQualityMetrics::from_sizes(std::iter::empty());
910        assert_eq!(metrics.total_deltas, 0);
911        assert_eq!(metrics.avg_delta_size, 0);
912        assert_eq!(metrics.min_delta_size, 0);
913        assert_eq!(metrics.max_delta_size, 0);
914        assert_eq!(metrics.pattern, StreamingPattern::Empty);
915    }
916
917    #[test]
918    fn test_streaming_quality_metrics_single_delta() {
919        let metrics = StreamingQualityMetrics::from_sizes([42].into_iter());
920        assert_eq!(metrics.total_deltas, 1);
921        assert_eq!(metrics.avg_delta_size, 42);
922        assert_eq!(metrics.min_delta_size, 42);
923        assert_eq!(metrics.max_delta_size, 42);
924        // Single delta defaults to Normal pattern
925        assert_eq!(metrics.pattern, StreamingPattern::Normal);
926    }
927
928    #[test]
929    fn test_streaming_quality_metrics_uniform_sizes() {
930        // All deltas same size - should be Smooth pattern
931        let sizes = vec![10, 10, 10, 10, 10];
932        let metrics = StreamingQualityMetrics::from_sizes(sizes.into_iter());
933        assert_eq!(metrics.total_deltas, 5);
934        assert_eq!(metrics.avg_delta_size, 10);
935        assert_eq!(metrics.min_delta_size, 10);
936        assert_eq!(metrics.max_delta_size, 10);
937        assert_eq!(metrics.pattern, StreamingPattern::Smooth);
938    }
939
940    #[test]
941    fn test_streaming_quality_metrics_varied_sizes() {
942        // Moderately varied sizes - should be Normal pattern
943        let sizes = vec![8, 10, 12, 9, 11];
944        let metrics = StreamingQualityMetrics::from_sizes(sizes.into_iter());
945        assert_eq!(metrics.total_deltas, 5);
946        assert_eq!(metrics.avg_delta_size, 10);
947        assert_eq!(metrics.min_delta_size, 8);
948        assert_eq!(metrics.max_delta_size, 12);
949        // Low variance, should be Smooth
950        assert_eq!(metrics.pattern, StreamingPattern::Smooth);
951    }
952
953    #[test]
954    fn test_streaming_quality_metrics_bursty() {
955        // Highly varied sizes - should be Bursty pattern
956        let sizes = vec![1, 100, 2, 200, 5];
957        let metrics = StreamingQualityMetrics::from_sizes(sizes.into_iter());
958        assert_eq!(metrics.total_deltas, 5);
959        assert_eq!(metrics.min_delta_size, 1);
960        assert_eq!(metrics.max_delta_size, 200);
961        assert_eq!(metrics.pattern, StreamingPattern::Bursty);
962    }
963
964    #[test]
965    fn test_streaming_quality_metrics_format() {
966        let sizes = vec![10, 20, 15];
967        let metrics = StreamingQualityMetrics::from_sizes(sizes.into_iter());
968        let colors = Colors { enabled: false };
969        let formatted = metrics.format(colors);
970
971        assert!(formatted.contains("3 deltas"));
972        assert!(formatted.contains("avg 15 bytes"));
973        assert!(formatted.contains("min 10"));
974        assert!(formatted.contains("max 20"));
975    }
976
977    #[test]
978    fn test_streaming_quality_metrics_format_empty() {
979        let metrics = StreamingQualityMetrics::from_sizes(std::iter::empty());
980        let colors = Colors { enabled: false };
981        let formatted = metrics.format(colors);
982
983        assert!(formatted.contains("No deltas recorded"));
984    }
985
986    #[test]
987    fn test_streaming_quality_metrics_format_with_snapshot_repairs() {
988        let mut metrics = StreamingQualityMetrics::from_sizes([10, 20, 15].into_iter());
989        metrics.snapshot_repairs_count = 2;
990        let colors = Colors { enabled: false };
991        let formatted = metrics.format(colors);
992
993        assert!(formatted.contains("3 deltas"));
994        assert!(formatted.contains("snapshot repairs: 2"));
995    }
996
997    #[test]
998    fn test_streaming_quality_metrics_format_with_large_deltas() {
999        let mut metrics = StreamingQualityMetrics::from_sizes([10, 20, 15].into_iter());
1000        metrics.large_delta_count = 5;
1001        let colors = Colors { enabled: false };
1002        let formatted = metrics.format(colors);
1003
1004        assert!(formatted.contains("3 deltas"));
1005        assert!(formatted.contains("large deltas: 5"));
1006    }
1007
1008    #[test]
1009    fn test_streaming_quality_metrics_format_with_protocol_violations() {
1010        let mut metrics = StreamingQualityMetrics::from_sizes([10, 20, 15].into_iter());
1011        metrics.protocol_violations = 1;
1012        let colors = Colors { enabled: false };
1013        let formatted = metrics.format(colors);
1014
1015        assert!(formatted.contains("3 deltas"));
1016        assert!(formatted.contains("protocol violations: 1"));
1017    }
1018
1019    #[test]
1020    fn test_streaming_quality_metrics_format_with_all_metrics() {
1021        let mut metrics = StreamingQualityMetrics::from_sizes([10, 20, 15].into_iter());
1022        metrics.snapshot_repairs_count = 2;
1023        metrics.large_delta_count = 5;
1024        metrics.protocol_violations = 1;
1025        let colors = Colors { enabled: false };
1026        let formatted = metrics.format(colors);
1027
1028        assert!(formatted.contains("3 deltas"));
1029        assert!(formatted.contains("snapshot repairs: 2"));
1030        assert!(formatted.contains("large deltas: 5"));
1031        assert!(formatted.contains("protocol violations: 1"));
1032    }
1033
1034    #[test]
1035    fn test_streaming_quality_metrics_new_fields_zero_by_default() {
1036        let metrics = StreamingQualityMetrics::from_sizes([10, 20, 15].into_iter());
1037
1038        assert_eq!(metrics.snapshot_repairs_count, 0);
1039        assert_eq!(metrics.large_delta_count, 0);
1040        assert_eq!(metrics.protocol_violations, 0);
1041        assert_eq!(metrics.queue_depth, 0);
1042        assert_eq!(metrics.queue_dropped_events, 0);
1043        assert_eq!(metrics.queue_backpressure_count, 0);
1044    }
1045
1046    #[test]
1047    fn test_streaming_quality_metrics_queue_metrics() {
1048        let mut metrics = StreamingQualityMetrics::from_sizes([10, 20, 15].into_iter());
1049
1050        // Set queue metrics
1051        metrics.queue_depth = 5;
1052        metrics.queue_dropped_events = 2;
1053        metrics.queue_backpressure_count = 10;
1054
1055        assert_eq!(metrics.queue_depth, 5);
1056        assert_eq!(metrics.queue_dropped_events, 2);
1057        assert_eq!(metrics.queue_backpressure_count, 10);
1058    }
1059
1060    #[test]
1061    fn test_streaming_quality_metrics_format_with_queue_metrics() {
1062        let mut metrics = StreamingQualityMetrics::from_sizes([10, 20, 15].into_iter());
1063        metrics.queue_depth = 5;
1064        metrics.queue_dropped_events = 2;
1065        metrics.queue_backpressure_count = 10;
1066        let colors = Colors { enabled: false };
1067        let formatted = metrics.format(colors);
1068
1069        assert!(formatted.contains("queue:"));
1070        assert!(formatted.contains("depth: 5"));
1071        assert!(formatted.contains("dropped: 2"));
1072        assert!(formatted.contains("backpressure: 10"));
1073    }
1074
1075    #[test]
1076    fn test_streaming_quality_metrics_format_queue_depth_only() {
1077        let mut metrics = StreamingQualityMetrics::from_sizes([10, 20, 15].into_iter());
1078        metrics.queue_depth = 3;
1079        let colors = Colors { enabled: false };
1080        let formatted = metrics.format(colors);
1081
1082        assert!(formatted.contains("queue: depth: 3"));
1083    }
1084
1085    #[test]
1086    fn test_streaming_quality_metrics_format_no_queue_metrics() {
1087        let metrics = StreamingQualityMetrics::from_sizes([10, 20, 15].into_iter());
1088        let colors = Colors { enabled: false };
1089        let formatted = metrics.format(colors);
1090
1091        // Should not mention queue when all queue metrics are zero
1092        assert!(!formatted.contains("queue:"));
1093    }
1094}