1use crate::logger::Colors;
44use std::cell::Cell;
45
46#[derive(Debug, Clone, Default)]
58pub struct StreamingQualityMetrics {
59 pub total_deltas: usize,
61 pub avg_delta_size: usize,
63 pub min_delta_size: usize,
65 pub max_delta_size: usize,
67 pub pattern: StreamingPattern,
69 pub snapshot_repairs_count: usize,
71 pub large_delta_count: usize,
73 pub protocol_violations: usize,
75 pub queue_depth: usize,
77 pub queue_dropped_events: usize,
79 pub queue_backpressure_count: usize,
81}
82
83#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
85pub enum StreamingPattern {
86 #[default]
88 Empty,
89 Smooth,
91 Normal,
93 Bursty,
95}
96
97impl StreamingQualityMetrics {
98 pub fn from_sizes<I: Iterator<Item = usize>>(sizes: I) -> Self {
103 let sizes_vec: Vec<_> = sizes.collect();
104
105 if sizes_vec.is_empty() {
106 return Self::default();
107 }
108
109 let total_deltas = sizes_vec.len();
110 let min_delta_size = sizes_vec.iter().copied().min().unwrap_or(0);
111 let max_delta_size = sizes_vec.iter().copied().max().unwrap_or(0);
112 let sum: usize = sizes_vec.iter().sum();
113 let avg_delta_size = sum / total_deltas;
114
115 let pattern = if total_deltas < 2 {
118 StreamingPattern::Normal
119 } else {
120 let mean_u32 = u32::try_from(avg_delta_size).unwrap_or(u32::MAX);
122 let mean = f64::from(mean_u32);
123 if mean < 0.001 {
124 StreamingPattern::Empty
125 } else {
126 let variance_sum: usize = sizes_vec
128 .iter()
129 .map(|&size| {
130 let diff = size.abs_diff(avg_delta_size);
131 diff.saturating_mul(diff)
132 })
133 .sum();
134 let variance = variance_sum / total_deltas;
135 let variance_u32 = u32::try_from(variance).unwrap_or(u32::MAX);
137 let std_dev = f64::from(variance_u32).sqrt();
138 let cv = std_dev / mean;
139
140 if cv < 0.3 {
142 StreamingPattern::Smooth
143 } else if cv < 1.0 {
144 StreamingPattern::Normal
145 } else {
146 StreamingPattern::Bursty
147 }
148 }
149 };
150
151 Self {
152 total_deltas,
153 avg_delta_size,
154 min_delta_size,
155 max_delta_size,
156 pattern,
157 snapshot_repairs_count: 0,
158 large_delta_count: 0,
159 protocol_violations: 0,
160 queue_depth: 0,
161 queue_dropped_events: 0,
162 queue_backpressure_count: 0,
163 }
164 }
165
166 pub fn format(&self, colors: Colors) -> String {
168 if self.total_deltas == 0 {
169 return format!(
170 "{}[Streaming]{} No deltas recorded",
171 colors.dim(),
172 colors.reset()
173 );
174 }
175
176 let pattern_str = match self.pattern {
177 StreamingPattern::Empty => "empty",
178 StreamingPattern::Smooth => "smooth",
179 StreamingPattern::Normal => "normal",
180 StreamingPattern::Bursty => "bursty",
181 };
182
183 let mut parts = vec![format!(
184 "{}[Streaming]{} {} deltas, avg {} bytes (min {}, max {}), pattern: {}",
185 colors.dim(),
186 colors.reset(),
187 self.total_deltas,
188 self.avg_delta_size,
189 self.min_delta_size,
190 self.max_delta_size,
191 pattern_str
192 )];
193
194 if self.snapshot_repairs_count > 0 {
195 parts.push(format!(
196 "{}snapshot repairs: {}{}",
197 colors.yellow(),
198 self.snapshot_repairs_count,
199 colors.reset()
200 ));
201 }
202
203 if self.large_delta_count > 0 {
204 parts.push(format!(
205 "{}large deltas: {}{}",
206 colors.yellow(),
207 self.large_delta_count,
208 colors.reset()
209 ));
210 }
211
212 if self.protocol_violations > 0 {
213 parts.push(format!(
214 "{}protocol violations: {}{}",
215 colors.red(),
216 self.protocol_violations,
217 colors.reset()
218 ));
219 }
220
221 if self.queue_depth > 0
223 || self.queue_dropped_events > 0
224 || self.queue_backpressure_count > 0
225 {
226 let mut queue_parts = Vec::new();
227 if self.queue_depth > 0 {
228 queue_parts.push(format!("depth: {}", self.queue_depth));
229 }
230 if self.queue_dropped_events > 0 {
231 queue_parts.push(format!(
232 "{}dropped: {}{}",
233 colors.yellow(),
234 self.queue_dropped_events,
235 colors.reset()
236 ));
237 }
238 if self.queue_backpressure_count > 0 {
239 queue_parts.push(format!(
240 "{}backpressure: {}{}",
241 colors.yellow(),
242 self.queue_backpressure_count,
243 colors.reset()
244 ));
245 }
246 if !queue_parts.is_empty() {
247 parts.push(format!("queue: {}", queue_parts.join(", ")));
248 }
249 }
250
251 parts.join(", ")
252 }
253}
254
255#[derive(Debug, Default, Clone, Copy)]
257pub struct ParserHealth {
258 pub total_events: u64,
260 pub parsed_events: u64,
262 pub partial_events: u64,
264 pub ignored_events: u64,
266 pub control_events: u64,
268 pub unknown_events: u64,
270 pub parse_errors: u64,
272}
273
274impl ParserHealth {
275 pub fn new() -> Self {
277 Self::default()
278 }
279
280 pub const fn record_parsed(&mut self) {
282 self.total_events += 1;
283 self.parsed_events += 1;
284 }
285
286 pub const fn record_ignored(&mut self) {
288 self.total_events += 1;
289 self.ignored_events += 1;
290 }
291
292 pub const fn record_unknown_event(&mut self) {
298 self.total_events += 1;
299 self.unknown_events += 1;
300 self.ignored_events += 1;
301 }
302
303 pub const fn record_parse_error(&mut self) {
305 self.total_events += 1;
306 self.parse_errors += 1;
307 self.ignored_events += 1;
308 }
309
310 pub const fn record_control_event(&mut self) {
316 self.total_events += 1;
317 self.control_events += 1;
318 }
319
320 pub const fn record_partial_event(&mut self) {
326 self.total_events += 1;
327 self.partial_events += 1;
328 }
329
330 pub fn parse_error_percentage(&self) -> f64 {
334 if self.total_events == 0 {
335 return 0.0;
336 }
337 let percent_hundredths = self
340 .parse_errors
341 .saturating_mul(10000)
342 .checked_div(self.total_events)
343 .unwrap_or(0);
344 let scaled: u32 = u32::try_from(percent_hundredths)
347 .unwrap_or(u32::MAX)
348 .min(10000);
349 f64::from(scaled) / 100.0
350 }
351
352 pub fn parse_error_percentage_int(&self) -> u32 {
356 if self.total_events == 0 {
357 return 0;
358 }
359 self.parse_errors
361 .saturating_mul(100)
362 .checked_div(self.total_events)
363 .and_then(|v| u32::try_from(v).ok())
364 .unwrap_or(0)
365 .min(100)
366 }
367
368 pub fn is_concerning(&self) -> bool {
374 self.total_events > 10 && self.parse_error_percentage() > 50.0
375 }
376
377 pub fn warning(&self, parser_name: &str, colors: Colors) -> Option<String> {
379 if !self.is_concerning() {
380 return None;
381 }
382
383 let msg = if self.unknown_events > 0 || self.control_events > 0 || self.partial_events > 0 {
384 format!(
385 "{}[Parser Health Warning]{} {} parser has {} parse errors ({}% of {} events). \
386 Also encountered {} unknown event types (valid JSON but unhandled), \
387 {} control events (state management), \
388 and {} partial events (streaming deltas). \
389 This may indicate a parser mismatch. Consider using a different json_parser in your agent config.",
390 colors.yellow(),
391 colors.reset(),
392 parser_name,
393 self.parse_errors,
394 self.parse_error_percentage_int(),
395 self.total_events,
396 self.unknown_events,
397 self.control_events,
398 self.partial_events
399 )
400 } else {
401 format!(
402 "{}[Parser Health Warning]{} {} parser has {} parse errors ({}% of {} events). \
403 This may indicate malformed JSON output. Consider using a different json_parser in your agent config.",
404 colors.yellow(),
405 colors.reset(),
406 parser_name,
407 self.parse_errors,
408 self.parse_error_percentage_int(),
409 self.total_events
410 )
411 };
412
413 Some(msg)
414 }
415}
416
417pub struct HealthMonitor {
422 health: Cell<ParserHealth>,
423 parser_name: &'static str,
424 threshold_warned: Cell<bool>,
425}
426
427impl HealthMonitor {
428 pub fn new(parser_name: &'static str) -> Self {
430 Self {
431 health: Cell::new(ParserHealth::new()),
432 parser_name,
433 threshold_warned: Cell::new(false),
434 }
435 }
436
437 pub fn record_parsed(&self) {
439 let mut h = self.health.get();
440 h.record_parsed();
441 self.health.set(h);
442 }
443
444 pub fn record_ignored(&self) {
446 let mut h = self.health.get();
447 h.record_ignored();
448 self.health.set(h);
449 }
450
451 pub fn record_unknown_event(&self) {
453 let mut h = self.health.get();
454 h.record_unknown_event();
455 self.health.set(h);
456 }
457
458 pub fn record_parse_error(&self) {
460 let mut h = self.health.get();
461 h.record_parse_error();
462 self.health.set(h);
463 }
464
465 pub fn record_control_event(&self) {
467 let mut h = self.health.get();
468 h.record_control_event();
469 self.health.set(h);
470 }
471
472 pub fn record_partial_event(&self) {
478 let mut h = self.health.get();
479 h.record_partial_event();
480 self.health.set(h);
481 }
482
483 pub fn check_and_warn(&self, colors: Colors) -> Option<String> {
485 if self.threshold_warned.get() {
486 return None;
487 }
488
489 let health = self.health.get();
490 let warning = health.warning(self.parser_name, colors);
491 if warning.is_some() {
492 self.threshold_warned.set(true);
493 }
494 warning
495 }
496}
497
#[cfg(test)]
mod tests {
    use super::*;

    /// Runs `action` the given number of times.
    fn repeat(times: usize, mut action: impl FnMut()) {
        for _ in 0..times {
            action();
        }
    }

    /// Colour configuration with colours disabled.
    fn plain() -> Colors {
        Colors { enabled: false }
    }

    /// Metrics built from the three sizes 10, 20 and 15.
    fn sample_metrics() -> StreamingQualityMetrics {
        StreamingQualityMetrics::from_sizes([10, 20, 15].into_iter())
    }

    /// True when `actual` is within `f64::EPSILON` of `expected`.
    fn approx(actual: f64, expected: f64) -> bool {
        (actual - expected).abs() < f64::EPSILON
    }

    #[test]
    fn test_parser_health_new() {
        let health = ParserHealth::new();
        assert_eq!(health.total_events, 0);
        assert_eq!(health.parsed_events, 0);
        assert_eq!(health.ignored_events, 0);
    }

    #[test]
    fn test_parser_health_record_parsed() {
        let mut health = ParserHealth::new();
        health.record_parsed();
        assert_eq!(health.total_events, 1);
        assert_eq!(health.parsed_events, 1);
        assert_eq!(health.ignored_events, 0);
    }

    #[test]
    fn test_parser_health_record_ignored() {
        let mut health = ParserHealth::new();
        health.record_ignored();
        assert_eq!(health.total_events, 1);
        assert_eq!(health.parsed_events, 0);
        assert_eq!(health.ignored_events, 1);
    }

    #[test]
    fn test_parser_health_is_concerning() {
        // Three events is below the minimum event count.
        let mut health = ParserHealth::new();
        repeat(3, || health.record_ignored());
        assert!(!health.is_concerning());

        // Unknown events are not parse errors, so the error rate stays at 0%.
        repeat(20, || health.record_unknown_event());
        assert!(!health.is_concerning());

        // 15 parse errors out of 25 events is 60%.
        let mut health2 = ParserHealth::new();
        repeat(10, || health2.record_parsed());
        repeat(15, || health2.record_parse_error());
        assert!(health2.is_concerning());

        // 2 parse errors out of 27 events is well under 50%.
        let mut health3 = ParserHealth::new();
        repeat(15, || health3.record_parsed());
        repeat(10, || health3.record_unknown_event());
        repeat(2, || health3.record_parse_error());
        assert!(!health3.is_concerning());
    }

    #[test]
    fn test_parser_health_unknown_events() {
        let mut health = ParserHealth::new();
        assert_eq!(health.unknown_events, 0);

        health.record_unknown_event();
        health.record_unknown_event();
        assert_eq!(health.unknown_events, 2);
        // Unknown events count as ignored but never as parse errors.
        assert_eq!(health.ignored_events, 2);
        assert_eq!(health.parse_errors, 0);
        assert!(!health.is_concerning());
    }

    #[test]
    fn test_health_monitor() {
        let monitor = HealthMonitor::new("claude");

        monitor.record_parsed();
        monitor.record_parsed();
        monitor.record_ignored();

        let colors = plain();
        assert!(monitor.check_and_warn(colors).is_none());

        // A monitor with no events has nothing to warn about either.
        let fresh_monitor = HealthMonitor::new("claude");
        assert!(fresh_monitor.check_and_warn(colors).is_none());
    }

    #[test]
    fn test_health_monitor_warns_once() {
        let monitor = HealthMonitor::new("test");
        let colors = plain();

        repeat(15, || monitor.record_parse_error());

        let warning1 = monitor.check_and_warn(colors);
        assert!(warning1.is_some());

        // The second check is suppressed because a warning was already issued.
        let warning2 = monitor.check_and_warn(colors);
        assert!(warning2.is_none());
    }

    #[test]
    fn test_health_monitor_many_unknown_no_warning() {
        let monitor = HealthMonitor::new("test");
        let colors = plain();

        repeat(2049, || monitor.record_unknown_event());
        repeat(53, || monitor.record_parsed());

        // No parse errors at all, however many unknown events there are.
        let warning = monitor.check_and_warn(colors);
        assert!(warning.is_none());
    }

    #[test]
    fn test_health_monitor_mixed_unknown_and_parse_errors() {
        let monitor = HealthMonitor::new("test");
        let colors = plain();

        repeat(100, || monitor.record_unknown_event());
        repeat(20, || monitor.record_parse_error());
        repeat(20, || monitor.record_parsed());

        // 20 of 140 events are parse errors.
        let warning = monitor.check_and_warn(colors);
        assert!(warning.is_none());

        repeat(30, || monitor.record_parse_error());

        // 50 of 170 events are parse errors.
        let warning = monitor.check_and_warn(colors);
        assert!(warning.is_none());

        repeat(60, || monitor.record_parse_error());

        // 110 of 230 events are parse errors: still just under 50%.
        let warning = monitor.check_and_warn(colors);
        assert!(warning.is_none());

        repeat(30, || monitor.record_parse_error());

        // 140 of 260 events are parse errors: now above 50%.
        let warning = monitor.check_and_warn(colors);
        assert!(warning.is_some());
    }

    #[test]
    fn test_parser_health_parse_error_percentage() {
        let mut health = ParserHealth::new();
        assert!(approx(health.parse_error_percentage(), 0.0));

        // 5 of 5 events are parse errors.
        repeat(5, || health.record_parse_error());
        assert!(approx(health.parse_error_percentage(), 100.0));

        // 5 of 10 events are parse errors.
        let mut health2 = ParserHealth::new();
        repeat(5, || health2.record_parse_error());
        repeat(5, || health2.record_parsed());
        assert!(approx(health2.parse_error_percentage(), 50.0));

        // 5 of 20 events are parse errors; unknown events count towards the total.
        let mut health3 = ParserHealth::new();
        repeat(5, || health3.record_parse_error());
        repeat(10, || health3.record_unknown_event());
        repeat(5, || health3.record_parsed());
        assert!(approx(health3.parse_error_percentage(), 25.0));
    }

    #[test]
    fn test_parser_health_control_events() {
        let mut health = ParserHealth::new();
        assert_eq!(health.control_events, 0);

        health.record_control_event();
        health.record_control_event();
        health.record_control_event();
        assert_eq!(health.control_events, 3);
        assert_eq!(health.total_events, 3);
        // Control events are neither ignored nor unknown.
        assert_eq!(health.ignored_events, 0);
        assert_eq!(health.unknown_events, 0);

        assert!(!health.is_concerning());
    }

    #[test]
    fn test_parser_health_control_events_with_other_types() {
        let mut health = ParserHealth::new();

        repeat(100, || health.record_control_event());
        repeat(50, || health.record_parsed());
        repeat(30, || health.record_unknown_event());

        assert_eq!(health.total_events, 180);
        assert_eq!(health.control_events, 100);
        assert_eq!(health.parsed_events, 50);
        assert_eq!(health.unknown_events, 30);
        // Only the unknown events were counted as ignored.
        assert_eq!(health.ignored_events, 30);
        assert!(!health.is_concerning());
        assert!(approx(health.parse_error_percentage(), 0.0));
    }

    #[test]
    fn test_health_monitor_control_events() {
        let monitor = HealthMonitor::new("test");
        let colors = plain();

        repeat(2000, || monitor.record_control_event());
        repeat(50, || monitor.record_parsed());

        // Control events alone never trigger a warning.
        let warning = monitor.check_and_warn(colors);
        assert!(warning.is_none());
    }

    #[test]
    fn test_health_monitor_warning_includes_control_events() {
        let monitor = HealthMonitor::new("test");
        let colors = plain();

        repeat(15, || monitor.record_parse_error());
        repeat(10, || monitor.record_control_event());

        let warning = monitor.check_and_warn(colors);
        assert!(warning.is_some());

        let warning_text = warning.unwrap();
        assert!(warning_text.contains("10 control events"));
    }

    #[test]
    fn test_parser_health_partial_events() {
        let mut health = ParserHealth::new();
        assert_eq!(health.partial_events, 0);

        health.record_partial_event();
        health.record_partial_event();
        health.record_partial_event();
        assert_eq!(health.partial_events, 3);
        assert_eq!(health.total_events, 3);
        // Partial events are neither ignored nor unknown.
        assert_eq!(health.ignored_events, 0);
        assert_eq!(health.unknown_events, 0);

        assert!(!health.is_concerning());
    }

    #[test]
    fn test_parser_health_partial_events_with_other_types() {
        let mut health = ParserHealth::new();

        repeat(100, || health.record_partial_event());
        repeat(50, || health.record_control_event());
        repeat(30, || health.record_parsed());
        repeat(20, || health.record_unknown_event());

        assert_eq!(health.total_events, 200);
        assert_eq!(health.partial_events, 100);
        assert_eq!(health.control_events, 50);
        assert_eq!(health.parsed_events, 30);
        assert_eq!(health.unknown_events, 20);
        // Only the unknown events were counted as ignored.
        assert_eq!(health.ignored_events, 20);
        assert!(!health.is_concerning());
        assert!(approx(health.parse_error_percentage(), 0.0));
    }

    #[test]
    fn test_health_monitor_partial_events() {
        let monitor = HealthMonitor::new("test");
        let colors = plain();

        repeat(2049, || monitor.record_partial_event());
        repeat(53, || monitor.record_parsed());

        // Partial events alone never trigger a warning.
        let warning = monitor.check_and_warn(colors);
        assert!(warning.is_none());
    }

    #[test]
    fn test_health_monitor_warning_includes_partial_events() {
        let monitor = HealthMonitor::new("test");
        let colors = plain();

        repeat(15, || monitor.record_parse_error());
        repeat(10, || monitor.record_partial_event());
        repeat(2, || monitor.record_control_event());

        let warning = monitor.check_and_warn(colors);
        assert!(warning.is_some());

        let warning_text = warning.unwrap();
        assert!(warning_text.contains("2 control events"));
        assert!(warning_text.contains("10 partial events"));
    }

    #[test]
    fn test_streaming_quality_metrics_empty() {
        let metrics = StreamingQualityMetrics::from_sizes(std::iter::empty());
        assert_eq!(metrics.total_deltas, 0);
        assert_eq!(metrics.avg_delta_size, 0);
        assert_eq!(metrics.min_delta_size, 0);
        assert_eq!(metrics.max_delta_size, 0);
        assert_eq!(metrics.pattern, StreamingPattern::Empty);
    }

    #[test]
    fn test_streaming_quality_metrics_single_delta() {
        let metrics = StreamingQualityMetrics::from_sizes([42].into_iter());
        assert_eq!(metrics.total_deltas, 1);
        assert_eq!(metrics.avg_delta_size, 42);
        assert_eq!(metrics.min_delta_size, 42);
        assert_eq!(metrics.max_delta_size, 42);
        // A single delta is classified as Normal.
        assert_eq!(metrics.pattern, StreamingPattern::Normal);
    }

    #[test]
    fn test_streaming_quality_metrics_uniform_sizes() {
        // Identical sizes have zero variance.
        let metrics = StreamingQualityMetrics::from_sizes([10; 5].into_iter());
        assert_eq!(metrics.total_deltas, 5);
        assert_eq!(metrics.avg_delta_size, 10);
        assert_eq!(metrics.min_delta_size, 10);
        assert_eq!(metrics.max_delta_size, 10);
        assert_eq!(metrics.pattern, StreamingPattern::Smooth);
    }

    #[test]
    fn test_streaming_quality_metrics_varied_sizes() {
        // Sizes clustered tightly around 10 still count as smooth.
        let metrics = StreamingQualityMetrics::from_sizes([8, 10, 12, 9, 11].into_iter());
        assert_eq!(metrics.total_deltas, 5);
        assert_eq!(metrics.avg_delta_size, 10);
        assert_eq!(metrics.min_delta_size, 8);
        assert_eq!(metrics.max_delta_size, 12);
        assert_eq!(metrics.pattern, StreamingPattern::Smooth);
    }

    #[test]
    fn test_streaming_quality_metrics_bursty() {
        // Tiny deltas mixed with very large ones.
        let metrics = StreamingQualityMetrics::from_sizes([1, 100, 2, 200, 5].into_iter());
        assert_eq!(metrics.total_deltas, 5);
        assert_eq!(metrics.min_delta_size, 1);
        assert_eq!(metrics.max_delta_size, 200);
        assert_eq!(metrics.pattern, StreamingPattern::Bursty);
    }

    #[test]
    fn test_streaming_quality_metrics_format() {
        let formatted = sample_metrics().format(plain());

        assert!(formatted.contains("3 deltas"));
        assert!(formatted.contains("avg 15 bytes"));
        assert!(formatted.contains("min 10"));
        assert!(formatted.contains("max 20"));
    }

    #[test]
    fn test_streaming_quality_metrics_format_empty() {
        let metrics = StreamingQualityMetrics::from_sizes(std::iter::empty());
        let formatted = metrics.format(plain());

        assert!(formatted.contains("No deltas recorded"));
    }

    #[test]
    fn test_streaming_quality_metrics_format_with_snapshot_repairs() {
        let mut metrics = sample_metrics();
        metrics.snapshot_repairs_count = 2;
        let formatted = metrics.format(plain());

        assert!(formatted.contains("3 deltas"));
        assert!(formatted.contains("snapshot repairs: 2"));
    }

    #[test]
    fn test_streaming_quality_metrics_format_with_large_deltas() {
        let mut metrics = sample_metrics();
        metrics.large_delta_count = 5;
        let formatted = metrics.format(plain());

        assert!(formatted.contains("3 deltas"));
        assert!(formatted.contains("large deltas: 5"));
    }

    #[test]
    fn test_streaming_quality_metrics_format_with_protocol_violations() {
        let mut metrics = sample_metrics();
        metrics.protocol_violations = 1;
        let formatted = metrics.format(plain());

        assert!(formatted.contains("3 deltas"));
        assert!(formatted.contains("protocol violations: 1"));
    }

    #[test]
    fn test_streaming_quality_metrics_format_with_all_metrics() {
        let mut metrics = sample_metrics();
        metrics.snapshot_repairs_count = 2;
        metrics.large_delta_count = 5;
        metrics.protocol_violations = 1;
        let formatted = metrics.format(plain());

        assert!(formatted.contains("3 deltas"));
        assert!(formatted.contains("snapshot repairs: 2"));
        assert!(formatted.contains("large deltas: 5"));
        assert!(formatted.contains("protocol violations: 1"));
    }

    #[test]
    fn test_streaming_quality_metrics_new_fields_zero_by_default() {
        let metrics = sample_metrics();

        assert_eq!(metrics.snapshot_repairs_count, 0);
        assert_eq!(metrics.large_delta_count, 0);
        assert_eq!(metrics.protocol_violations, 0);
        assert_eq!(metrics.queue_depth, 0);
        assert_eq!(metrics.queue_dropped_events, 0);
        assert_eq!(metrics.queue_backpressure_count, 0);
    }

    #[test]
    fn test_streaming_quality_metrics_queue_metrics() {
        let mut metrics = sample_metrics();

        metrics.queue_depth = 5;
        metrics.queue_dropped_events = 2;
        metrics.queue_backpressure_count = 10;

        assert_eq!(metrics.queue_depth, 5);
        assert_eq!(metrics.queue_dropped_events, 2);
        assert_eq!(metrics.queue_backpressure_count, 10);
    }

    #[test]
    fn test_streaming_quality_metrics_format_with_queue_metrics() {
        let mut metrics = sample_metrics();
        metrics.queue_depth = 5;
        metrics.queue_dropped_events = 2;
        metrics.queue_backpressure_count = 10;
        let formatted = metrics.format(plain());

        assert!(formatted.contains("queue:"));
        assert!(formatted.contains("depth: 5"));
        assert!(formatted.contains("dropped: 2"));
        assert!(formatted.contains("backpressure: 10"));
    }

    #[test]
    fn test_streaming_quality_metrics_format_queue_depth_only() {
        let mut metrics = sample_metrics();
        metrics.queue_depth = 3;
        let formatted = metrics.format(plain());

        assert!(formatted.contains("queue: depth: 3"));
    }

    #[test]
    fn test_streaming_quality_metrics_format_no_queue_metrics() {
        let formatted = sample_metrics().format(plain());

        // With every queue counter at zero the queue section is omitted.
        assert!(!formatted.contains("queue:"));
    }
}