1use crate::error::StreamError;
12use dashmap::DashMap;
13use std::sync::Arc;
14
/// Health state tracked for a single feed.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub enum HealthStatus {
    /// Set when a heartbeat is recorded for the feed.
    Healthy,
    /// Set by a health check when the time since the last heartbeat exceeds
    /// the feed's stale threshold.
    Stale,
    /// Initial state after registration or reset; no heartbeat recorded yet.
    Unknown,
}
25
/// Snapshot of the health bookkeeping kept for one feed.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct FeedHealth {
    /// Identifier the feed was registered under.
    pub feed_id: String,
    /// Current health state.
    pub status: HealthStatus,
    /// Timestamp (ms) of the most recent heartbeat, or `None` if none was recorded.
    pub last_tick_ms: Option<u64>,
    /// Elapsed time (ms) since the last heartbeat above which a check marks the feed stale.
    pub stale_threshold_ms: u64,
    /// Number of heartbeats recorded.
    pub tick_count: u64,
    /// Number of stale check results since the last heartbeat or reset.
    pub consecutive_stale: u32,
}
42
43impl FeedHealth {
44 pub fn elapsed_ms(&self, now_ms: u64) -> Option<u64> {
46 self.last_tick_ms.map(|t| now_ms.saturating_sub(t))
47 }
48
49 pub fn is_healthy(&self) -> bool {
51 self.status == HealthStatus::Healthy
52 }
53
54 pub fn is_stale(&self) -> bool {
56 self.status == HealthStatus::Stale
57 }
58}
59
/// Tracks per-feed health state in a shared concurrent map.
pub struct HealthMonitor {
    /// Registered feeds keyed by feed id.
    feeds: Arc<DashMap<String, FeedHealth>>,
    /// Stale threshold (ms) used when a feed is registered without its own.
    default_stale_threshold_ms: u64,
    /// Consecutive stale results at which a feed's circuit is reported open;
    /// `0` disables the circuit breaker.
    circuit_breaker_threshold: u32,
}
67
68impl HealthMonitor {
69 pub fn new(default_stale_threshold_ms: u64) -> Self {
74 Self {
75 feeds: Arc::new(DashMap::new()),
76 default_stale_threshold_ms,
77 circuit_breaker_threshold: 3,
78 }
79 }
80
81 pub fn with_circuit_breaker_threshold(mut self, threshold: u32) -> Self {
84 self.circuit_breaker_threshold = threshold;
85 self
86 }
87
88 pub fn is_circuit_open(&self, feed_id: &str) -> bool {
91 if self.circuit_breaker_threshold == 0 {
92 return false;
93 }
94 self.feeds
95 .get(feed_id)
96 .map_or(false, |e| e.consecutive_stale >= self.circuit_breaker_threshold)
97 }
98
99 pub fn register_many(&self, ids: &[&str], stale_threshold_ms: Option<u64>) {
105 for id in ids {
106 self.register(*id, stale_threshold_ms);
107 }
108 }
109
110 pub fn register(&self, feed_id: impl Into<String>, stale_threshold_ms: Option<u64>) {
112 let id = feed_id.into();
113 let threshold = stale_threshold_ms.unwrap_or(self.default_stale_threshold_ms);
114 self.feeds.insert(
115 id.clone(),
116 FeedHealth {
117 feed_id: id,
118 status: HealthStatus::Unknown,
119 last_tick_ms: None,
120 stale_threshold_ms: threshold,
121 tick_count: 0,
122 consecutive_stale: 0,
123 },
124 );
125 }
126
127 pub fn deregister(&self, feed_id: &str) -> Option<FeedHealth> {
132 self.feeds.remove(feed_id).map(|(_, v)| v)
133 }
134
135 pub fn heartbeat(&self, feed_id: &str, ts_ms: u64) -> Result<(), StreamError> {
142 let mut entry = self
143 .feeds
144 .get_mut(feed_id)
145 .ok_or_else(|| StreamError::UnknownFeed {
146 feed_id: feed_id.to_string(),
147 })?;
148 entry.last_tick_ms = Some(ts_ms);
149 entry.tick_count += 1;
150 entry.status = HealthStatus::Healthy;
151 entry.consecutive_stale = 0;
152 Ok(())
153 }
154
155 pub fn check_all(&self, now_ms: u64) -> Vec<(String, StreamError)> {
161 let mut errors = Vec::new();
162 for mut entry in self.feeds.iter_mut() {
163 let elapsed = entry.elapsed_ms(now_ms);
164 if let Some(elapsed) = elapsed {
165 if elapsed > entry.stale_threshold_ms {
166 entry.status = HealthStatus::Stale;
167 entry.consecutive_stale += 1;
168 let feed_id = entry.feed_id.clone();
169 errors.push((
170 feed_id.clone(),
171 StreamError::StaleFeed {
172 feed_id,
173 elapsed_ms: elapsed,
174 threshold_ms: entry.stale_threshold_ms,
175 },
176 ));
177 }
178 }
179 }
180 errors
181 }
182
183 pub fn get(&self, feed_id: &str) -> Option<FeedHealth> {
185 self.feeds.get(feed_id).map(|e| e.value().clone())
186 }
187
188 pub fn all_feeds(&self) -> Vec<FeedHealth> {
190 self.feeds.iter().map(|e| e.value().clone()).collect()
191 }
192
193 pub fn feed_count(&self) -> usize {
195 self.feeds.len()
196 }
197
198 pub fn check_one(
204 &self,
205 feed_id: &str,
206 now_ms: u64,
207 ) -> Result<Option<StreamError>, StreamError> {
208 let mut entry = self
209 .feeds
210 .get_mut(feed_id)
211 .ok_or_else(|| StreamError::UnknownFeed {
212 feed_id: feed_id.to_string(),
213 })?;
214 let elapsed = match entry.last_tick_ms {
215 Some(t) => now_ms.saturating_sub(t),
216 None => return Ok(None),
217 };
218 if elapsed > entry.stale_threshold_ms {
219 entry.status = HealthStatus::Stale;
220 entry.consecutive_stale += 1;
221 Ok(Some(StreamError::StaleFeed {
222 feed_id: entry.feed_id.clone(),
223 elapsed_ms: elapsed,
224 threshold_ms: entry.stale_threshold_ms,
225 }))
226 } else {
227 Ok(None)
228 }
229 }
230
231 pub fn reset_feed(&self, feed_id: &str) -> Result<(), StreamError> {
241 let mut entry = self
242 .feeds
243 .get_mut(feed_id)
244 .ok_or_else(|| StreamError::UnknownFeed {
245 feed_id: feed_id.to_string(),
246 })?;
247 entry.status = HealthStatus::Unknown;
248 entry.last_tick_ms = None;
249 entry.tick_count = 0;
250 entry.consecutive_stale = 0;
251 Ok(())
252 }
253
254 pub fn feed_ids(&self) -> Vec<String> {
256 let mut ids: Vec<String> = self.feeds.iter().map(|e| e.feed_id.clone()).collect();
257 ids.sort();
258 ids
259 }
260
261 pub fn healthy_count(&self) -> usize {
263 self.feeds_by_status(HealthStatus::Healthy).len()
264 }
265
266 pub fn stale_ratio(&self) -> f64 {
270 let total = self.feed_count();
271 if total == 0 {
272 return 0.0;
273 }
274 self.stale_count() as f64 / total as f64
275 }
276
277 pub fn is_any_stale(&self) -> bool {
279 self.stale_count() > 0
280 }
281
282 pub fn stale_count(&self) -> usize {
284 self.feeds_by_status(HealthStatus::Stale).len()
285 }
286
287 pub fn stale_feeds(&self) -> Vec<FeedHealth> {
292 self.feeds_by_status(HealthStatus::Stale)
293 }
294
295 pub fn oldest_tick_ms(&self) -> Option<u64> {
298 self.feeds
299 .iter()
300 .filter_map(|e| e.last_tick_ms)
301 .min()
302 }
303
304 pub fn newest_tick_ms(&self) -> Option<u64> {
307 self.feeds
308 .iter()
309 .filter_map(|e| e.last_tick_ms)
310 .max()
311 }
312
313 pub fn total_tick_count(&self) -> u64 {
319 self.feeds.iter().map(|e| e.tick_count).sum()
320 }
321
322 pub fn lag_ms(&self) -> Option<u64> {
328 let newest = self.newest_tick_ms()?;
329 let oldest = self.oldest_tick_ms()?;
330 Some(newest.saturating_sub(oldest))
331 }
332
333 pub fn most_stale_feed(&self) -> Option<FeedHealth> {
340 self.feeds.iter().fold(None, |acc: Option<FeedHealth>, entry| {
341 let feed = entry.clone();
342 match acc {
343 None => Some(feed),
344 Some(current) => {
345 let more_stale = match (feed.last_tick_ms, current.last_tick_ms) {
346 (None, _) => true,
347 (Some(_), None) => false,
348 (Some(a), Some(b)) => a < b,
349 };
350 if more_stale { Some(feed) } else { Some(current) }
351 }
352 }
353 })
354 }
355
356 pub fn stale_ratio_excluding_unknown(&self) -> f64 {
361 let known: Vec<_> = self.feeds.iter()
362 .filter(|e| e.status != HealthStatus::Unknown)
363 .collect();
364 if known.is_empty() { return 0.0; }
365 let stale = known.iter().filter(|e| e.status == HealthStatus::Stale).count();
366 stale as f64 / known.len() as f64
367 }
368
369 pub fn healthy_feeds(&self) -> Vec<String> {
374 let mut ids: Vec<String> = self
375 .feeds
376 .iter()
377 .filter(|e| e.status == HealthStatus::Healthy)
378 .map(|e| e.feed_id.clone())
379 .collect();
380 ids.sort();
381 ids
382 }
383
384 pub fn unhealthy_feeds(&self) -> Vec<String> {
390 let mut ids: Vec<String> = self
391 .feeds
392 .iter()
393 .filter(|e| e.status != HealthStatus::Healthy)
394 .map(|e| e.feed_id.clone())
395 .collect();
396 ids.sort();
397 ids
398 }
399
400 pub fn reset_all(&self) {
405 for mut entry in self.feeds.iter_mut() {
406 entry.status = HealthStatus::Unknown;
407 entry.last_tick_ms = None;
408 entry.consecutive_stale = 0;
409 }
410 }
411
412 pub fn all_healthy(&self) -> bool {
417 self.feeds.iter().all(|e| e.status == HealthStatus::Healthy)
418 }
419
420 pub fn ratio_healthy(&self) -> f64 {
424 let total = self.feed_count();
425 if total == 0 {
426 return 0.0;
427 }
428 self.healthy_count() as f64 / total as f64
429 }
430
431 pub fn last_updated_feed_id(&self) -> Option<String> {
435 self.feeds
436 .iter()
437 .filter_map(|e| e.last_tick_ms.map(|t| (t, e.feed_id.clone())))
438 .max_by_key(|(t, _)| *t)
439 .map(|(_, id)| id)
440 }
441
442 pub fn unknown_feed_ids(&self) -> Vec<String> {
444 self.feeds
445 .iter()
446 .filter(|e| e.status == HealthStatus::Unknown)
447 .map(|e| e.feed_id.clone())
448 .collect()
449 }
450
451 pub fn status_summary(&self) -> (usize, usize, usize) {
457 let (mut healthy, mut stale, mut unknown) = (0, 0, 0);
458 for e in self.feeds.iter() {
459 match e.status {
460 HealthStatus::Healthy => healthy += 1,
461 HealthStatus::Stale => stale += 1,
462 HealthStatus::Unknown => unknown += 1,
463 }
464 }
465 (healthy, stale, unknown)
466 }
467
468 pub fn stale_feed_ids(&self) -> Vec<String> {
473 let mut ids: Vec<String> = self
474 .feeds
475 .iter()
476 .filter(|e| e.status == HealthStatus::Stale)
477 .map(|e| e.feed_id.clone())
478 .collect();
479 ids.sort();
480 ids
481 }
482
483 #[deprecated(since = "2.2.0", note = "Use `stale_count` instead")]
487 pub fn total_stale_count(&self) -> usize {
488 self.stale_count()
489 }
490
491 #[deprecated(since = "2.2.0", note = "Use `unhealthy_feeds` instead")]
495 pub fn feeds_needing_check(&self) -> Vec<String> {
496 self.unhealthy_feeds()
497 }
498
499 pub fn avg_feed_age_ms(&self, now_ms: u64) -> Option<f64> {
503 let ages: Vec<u64> = self
504 .feeds
505 .iter()
506 .filter_map(|e| e.last_tick_ms)
507 .map(|t| now_ms.saturating_sub(t))
508 .collect();
509 if ages.is_empty() {
510 return None;
511 }
512 Some(ages.iter().sum::<u64>() as f64 / ages.len() as f64)
513 }
514
515 pub fn unknown_count(&self) -> usize {
520 self.feeds_by_status(HealthStatus::Unknown).len()
521 }
522
523 pub fn avg_tick_count(&self) -> f64 {
527 let count = self.feed_count();
528 if count == 0 {
529 return 0.0;
530 }
531 self.total_tick_count() as f64 / count as f64
532 }
533
534 pub fn max_consecutive_stale(&self) -> u32 {
538 self.feeds
539 .iter()
540 .map(|e| e.consecutive_stale)
541 .max()
542 .unwrap_or(0)
543 }
544
545 #[deprecated(since = "2.2.0", note = "Use `unknown_count` instead")]
549 pub fn unknown_feed_count(&self) -> usize {
550 self.unknown_count()
551 }
552
553 pub fn feeds_by_status(&self, status: HealthStatus) -> Vec<FeedHealth> {
559 self.feeds
560 .iter()
561 .filter(|e| e.value().status == status)
562 .map(|e| e.value().clone())
563 .collect()
564 }
565
566 pub fn oldest_stale_feed(&self) -> Option<FeedHealth> {
572 self.feeds
573 .iter()
574 .filter(|e| e.value().status == HealthStatus::Stale)
575 .min_by_key(|e| e.value().last_tick_ms.unwrap_or(u64::MAX))
576 .map(|e| e.value().clone())
577 }
578
579 #[deprecated(since = "2.2.0", note = "Use `ratio_healthy` instead")]
583 pub fn healthy_ratio(&self) -> f64 {
584 self.ratio_healthy()
585 }
586
587 pub fn most_reliable_feed(&self) -> Option<FeedHealth> {
593 self.feeds.iter()
594 .max_by_key(|e| e.value().tick_count)
595 .map(|e| e.value().clone())
596 }
597
598 pub fn feeds_never_seen(&self) -> Vec<FeedHealth> {
604 self.feeds.iter()
605 .filter(|e| e.value().last_tick_ms.is_none())
606 .map(|e| e.value().clone())
607 .collect()
608 }
609
610 #[deprecated(since = "2.2.0", note = "Use `is_any_stale` instead")]
615 pub fn is_any_feed_stale(&self) -> bool {
616 self.is_any_stale()
617 }
618
619 pub fn all_feeds_seen(&self) -> bool {
624 self.feeds.iter().all(|e| e.last_tick_ms.is_some())
625 }
626
627 pub fn tick_count_for(&self, feed_id: &str) -> Option<u64> {
630 self.feeds.iter()
631 .find(|e| e.feed_id == feed_id)
632 .map(|e| e.tick_count)
633 }
634
635 #[deprecated(since = "2.2.0", note = "Use `avg_tick_count` instead")]
639 pub fn average_tick_count(&self) -> f64 {
640 self.avg_tick_count()
641 }
642
643 pub fn feeds_above_tick_count(&self, threshold: u64) -> usize {
645 self.feeds.iter().filter(|e| e.tick_count > threshold).count()
646 }
647
648 pub fn oldest_feed_age_ms(&self, now_ms: u64) -> Option<u64> {
653 self.feeds
654 .iter()
655 .filter_map(|e| e.last_tick_ms)
656 .map(|t| now_ms.saturating_sub(t))
657 .max()
658 }
659
660 pub fn has_any_unknown(&self) -> bool {
663 self.feeds.iter().any(|e| e.status == HealthStatus::Unknown)
664 }
665
666 pub fn is_degraded(&self) -> bool {
672 let total = self.feed_count();
673 if total == 0 {
674 return false;
675 }
676 let healthy = self.healthy_count();
677 healthy > 0 && healthy < total
678 }
679
680 pub fn unhealthy_count(&self) -> usize {
682 self.feed_count().saturating_sub(self.healthy_count())
683 }
684
685 pub fn feed_exists(&self, feed_id: &str) -> bool {
687 self.feeds.iter().any(|e| e.feed_id == feed_id)
688 }
689
690 #[deprecated(since = "2.2.0", note = "Use `has_any_unknown` instead")]
694 pub fn any_unknown(&self) -> bool {
695 self.has_any_unknown()
696 }
697
698 #[deprecated(since = "2.2.0", note = "Use `stale_count` instead")]
702 pub fn degraded_count(&self) -> usize {
703 self.stale_count()
704 }
705
706 pub fn time_since_last_heartbeat(&self, feed_id: &str, now_ms: u64) -> Option<u64> {
710 self.feeds
711 .iter()
712 .find(|e| e.feed_id == feed_id)?
713 .last_tick_ms
714 .map(|t| now_ms.saturating_sub(t))
715 }
716
717 pub fn healthy_feed_ids(&self) -> Vec<String> {
719 self.feeds
720 .iter()
721 .filter(|e| e.status == HealthStatus::Healthy)
722 .map(|e| e.feed_id.clone())
723 .collect()
724 }
725
726 pub fn register_batch(&self, feeds: &[(&str, u64)]) {
731 for (id, threshold) in feeds {
732 self.register(*id, Some(*threshold));
733 }
734 }
735
736 pub fn min_healthy_age_ms(&self, now_ms: u64) -> Option<u64> {
740 self.feeds
741 .iter()
742 .filter(|e| e.status == HealthStatus::Healthy)
743 .filter_map(|e| e.last_tick_ms)
744 .map(|t| now_ms.saturating_sub(t))
745 .min()
746 }
747
748}
749
750#[cfg(test)]
751mod tests {
752 use super::*;
753
754 fn monitor() -> HealthMonitor {
755 HealthMonitor::new(5_000)
756 }
757
758 #[test]
759 fn test_register_creates_unknown_feed() {
760 let m = monitor();
761 m.register("BTC-USD", None);
762 let h = m.get("BTC-USD").unwrap();
763 assert_eq!(h.status, HealthStatus::Unknown);
764 assert!(h.last_tick_ms.is_none());
765 }
766
767 #[test]
768 fn test_heartbeat_marks_feed_healthy() {
769 let m = monitor();
770 m.register("BTC-USD", None);
771 m.heartbeat("BTC-USD", 1_000_000).unwrap();
772 let h = m.get("BTC-USD").unwrap();
773 assert_eq!(h.status, HealthStatus::Healthy);
774 assert_eq!(h.last_tick_ms, Some(1_000_000));
775 }
776
777 #[test]
778 fn test_heartbeat_increments_tick_count() {
779 let m = monitor();
780 m.register("BTC-USD", None);
781 m.heartbeat("BTC-USD", 1000).unwrap();
782 m.heartbeat("BTC-USD", 2000).unwrap();
783 m.heartbeat("BTC-USD", 3000).unwrap();
784 assert_eq!(m.get("BTC-USD").unwrap().tick_count, 3);
785 }
786
787 #[test]
788 fn test_heartbeat_unknown_feed_returns_unknown_feed_error() {
789 let m = monitor();
790 let result = m.heartbeat("ghost", 1000);
791 assert!(matches!(result, Err(StreamError::UnknownFeed { .. })));
792 }
793
794 #[test]
795 fn test_check_all_healthy_feed_no_errors() {
796 let m = monitor();
797 m.register("BTC-USD", None);
798 m.heartbeat("BTC-USD", 1_000_000).unwrap();
799 let errors = m.check_all(1_003_000); assert!(errors.is_empty());
801 }
802
803 #[test]
804 fn test_check_all_stale_feed_returns_error() {
805 let m = monitor();
806 m.register("BTC-USD", None);
807 m.heartbeat("BTC-USD", 1_000_000).unwrap();
808 let errors = m.check_all(1_010_000); assert_eq!(errors.len(), 1);
810 assert_eq!(errors[0].0, "BTC-USD");
811 assert!(matches!(&errors[0].1, StreamError::StaleFeed { feed_id, .. } if feed_id == "BTC-USD"));
812 }
813
814 #[test]
815 fn test_check_all_marks_stale_in_state() {
816 let m = monitor();
817 m.register("BTC-USD", None);
818 m.heartbeat("BTC-USD", 1_000_000).unwrap();
819 m.check_all(1_010_000);
820 assert_eq!(m.get("BTC-USD").unwrap().status, HealthStatus::Stale);
821 }
822
823 #[test]
824 fn test_check_all_unknown_feed_not_counted_as_stale() {
825 let m = monitor();
826 m.register("BTC-USD", None);
827 let errors = m.check_all(9_999_999);
829 assert!(errors.is_empty());
830 }
831
832 #[test]
833 fn test_custom_threshold_per_feed() {
834 let m = monitor();
835 m.register("BTC-USD", Some(1_000)); m.heartbeat("BTC-USD", 1_000_000).unwrap();
837 let errors = m.check_all(1_002_000); assert!(!errors.is_empty());
839 }
840
841 #[test]
842 fn test_feed_count() {
843 let m = monitor();
844 m.register("BTC-USD", None);
845 m.register("ETH-USD", None);
846 assert_eq!(m.feed_count(), 2);
847 }
848
849 #[test]
850 fn test_healthy_count_and_stale_count() {
851 let m = monitor();
852 m.register("BTC-USD", None);
853 m.register("ETH-USD", None);
854 m.heartbeat("BTC-USD", 1_000_000).unwrap();
855 m.heartbeat("ETH-USD", 1_000_000).unwrap();
856 m.check_all(1_010_000); assert_eq!(m.stale_count(), 2);
858 assert_eq!(m.healthy_count(), 0);
859 }
860
861 #[test]
862 fn test_feed_health_elapsed_ms() {
863 let h = FeedHealth {
864 feed_id: "BTC-USD".into(),
865 status: HealthStatus::Healthy,
866 last_tick_ms: Some(1_000_000),
867 stale_threshold_ms: 5_000,
868 tick_count: 1,
869 consecutive_stale: 0,
870 };
871 assert_eq!(h.elapsed_ms(1_003_000), Some(3_000));
872 }
873
874 #[test]
875 fn test_feed_health_elapsed_ms_none_when_no_last_tick() {
876 let h = FeedHealth {
877 feed_id: "X".into(),
878 status: HealthStatus::Unknown,
879 last_tick_ms: None,
880 stale_threshold_ms: 5_000,
881 tick_count: 0,
882 consecutive_stale: 0,
883 };
884 assert!(h.elapsed_ms(9_999_999).is_none());
885 }
886
887 #[test]
888 fn test_all_feeds_returns_all() {
889 let m = monitor();
890 m.register("A", None);
891 m.register("B", None);
892 let feeds = m.all_feeds();
893 assert_eq!(feeds.len(), 2);
894 }
895
896 #[test]
897 fn test_circuit_not_open_before_threshold() {
898 let m = monitor(); m.register("BTC-USD", None);
900 m.heartbeat("BTC-USD", 1_000_000).unwrap();
901 m.check_all(1_010_000); m.check_all(1_020_000); assert!(!m.is_circuit_open("BTC-USD"));
904 }
905
906 #[test]
907 fn test_circuit_opens_at_threshold() {
908 let m = monitor(); m.register("BTC-USD", None);
910 m.heartbeat("BTC-USD", 1_000_000).unwrap();
911 m.check_all(1_010_000); m.check_all(1_020_000); m.check_all(1_030_000); assert!(m.is_circuit_open("BTC-USD"));
915 }
916
917 #[test]
918 fn test_circuit_resets_on_heartbeat() {
919 let m = monitor();
920 m.register("BTC-USD", None);
921 m.heartbeat("BTC-USD", 1_000_000).unwrap();
922 m.check_all(1_010_000);
923 m.check_all(1_020_000);
924 m.check_all(1_030_000);
925 assert!(m.is_circuit_open("BTC-USD"));
926 m.heartbeat("BTC-USD", 1_040_000).unwrap();
927 assert!(!m.is_circuit_open("BTC-USD"));
928 }
929
930 #[test]
931 fn test_circuit_disabled_when_threshold_zero() {
932 let m = HealthMonitor::new(5_000).with_circuit_breaker_threshold(0);
933 m.register("BTC-USD", None);
934 m.heartbeat("BTC-USD", 1_000_000).unwrap();
935 for i in 0..10 {
936 m.check_all(1_010_000 + i * 10_000);
937 }
938 assert!(!m.is_circuit_open("BTC-USD"));
939 }
940
941 #[test]
942 fn test_circuit_open_returns_false_for_unknown_feed() {
943 let m = monitor();
944 assert!(!m.is_circuit_open("ghost"));
945 }
946
947 #[test]
948 fn test_deregister_removes_feed() {
949 let m = monitor();
950 m.register("BTC-USD", None);
951 m.heartbeat("BTC-USD", 1_000_000).unwrap();
952 let removed = m.deregister("BTC-USD");
953 assert!(removed.is_some());
954 assert_eq!(removed.unwrap().feed_id, "BTC-USD");
955 assert!(m.get("BTC-USD").is_none());
956 assert_eq!(m.feed_count(), 0);
957 }
958
959 #[test]
960 fn test_deregister_unknown_feed_returns_none() {
961 let m = monitor();
962 assert!(m.deregister("ghost").is_none());
963 }
964
965 #[test]
966 fn test_feed_ids_returns_sorted_ids() {
967 let m = monitor();
968 m.register("ETH-USD", None);
969 m.register("BTC-USD", None);
970 m.register("SOL-USD", None);
971 let ids = m.feed_ids();
972 assert_eq!(ids, vec!["BTC-USD", "ETH-USD", "SOL-USD"]);
973 }
974
975 #[test]
976 fn test_feed_ids_empty_when_no_feeds() {
977 let m = monitor();
978 assert!(m.feed_ids().is_empty());
979 }
980
981 #[test]
982 fn test_feed_ids_updates_after_deregister() {
983 let m = monitor();
984 m.register("BTC-USD", None);
985 m.register("ETH-USD", None);
986 m.deregister("BTC-USD");
987 assert_eq!(m.feed_ids(), vec!["ETH-USD"]);
988 }
989
990 #[test]
991 fn test_reset_feed_clears_state() {
992 let m = monitor();
993 m.register("BTC-USD", None);
994 m.heartbeat("BTC-USD", 1_000_000).unwrap();
995 m.check_all(1_010_000); assert_eq!(m.get("BTC-USD").unwrap().status, HealthStatus::Stale);
997 m.reset_feed("BTC-USD").unwrap();
998 let h = m.get("BTC-USD").unwrap();
999 assert_eq!(h.status, HealthStatus::Unknown);
1000 assert_eq!(h.tick_count, 0);
1001 assert_eq!(h.consecutive_stale, 0);
1002 assert!(h.last_tick_ms.is_none());
1003 }
1004
1005 #[test]
1006 fn test_reset_feed_unknown_returns_error() {
1007 let m = monitor();
1008 assert!(matches!(
1009 m.reset_feed("ghost"),
1010 Err(StreamError::UnknownFeed { .. })
1011 ));
1012 }
1013
1014 #[test]
1015 fn test_check_one_healthy_feed_returns_none() {
1016 let m = monitor();
1017 m.register("BTC-USD", None);
1018 m.heartbeat("BTC-USD", 1_000_000).unwrap();
1019 assert!(m.check_one("BTC-USD", 1_003_000).unwrap().is_none());
1020 }
1021
1022 #[test]
1023 fn test_check_one_stale_feed_returns_some_error() {
1024 let m = monitor();
1025 m.register("BTC-USD", None);
1026 m.heartbeat("BTC-USD", 1_000_000).unwrap();
1027 let result = m.check_one("BTC-USD", 1_010_000).unwrap();
1028 assert!(matches!(result, Some(StreamError::StaleFeed { .. })));
1029 }
1030
1031 #[test]
1032 fn test_check_one_unknown_feed_returns_err() {
1033 let m = monitor();
1034 assert!(matches!(
1035 m.check_one("ghost", 0),
1036 Err(StreamError::UnknownFeed { .. })
1037 ));
1038 }
1039
1040 #[test]
1041 fn test_heartbeat_after_deregister_returns_unknown_feed_error() {
1042 let m = monitor();
1043 m.register("BTC-USD", None);
1044 m.deregister("BTC-USD");
1045 let result = m.heartbeat("BTC-USD", 1_000_000);
1046 assert!(matches!(result, Err(StreamError::UnknownFeed { .. })));
1047 }
1048
1049 #[test]
1050 fn test_unhealthy_feeds_returns_non_healthy_sorted() {
1051 let m = HealthMonitor::new(5_000);
1052 m.register("BTC-USD", None);
1053 m.register("ETH-USD", None);
1054 m.register("SOL-USD", None);
1055 m.heartbeat("BTC-USD", 1_000_000).unwrap();
1057 let unhealthy = m.unhealthy_feeds();
1058 assert!(unhealthy.contains(&"ETH-USD".to_string()));
1060 assert!(unhealthy.contains(&"SOL-USD".to_string()));
1061 assert!(!unhealthy.contains(&"BTC-USD".to_string()));
1062 assert_eq!(unhealthy[0], "ETH-USD");
1064 assert_eq!(unhealthy[1], "SOL-USD");
1065 }
1066
1067 #[test]
1068 fn test_oldest_tick_ms_returns_minimum() {
1069 let m = HealthMonitor::new(5_000);
1070 m.register("A", None);
1071 m.register("B", None);
1072 m.heartbeat("A", 1_000_000).unwrap();
1073 m.heartbeat("B", 2_000_000).unwrap();
1074 assert_eq!(m.oldest_tick_ms(), Some(1_000_000));
1075 }
1076
1077 #[test]
1078 fn test_newest_tick_ms_returns_maximum() {
1079 let m = HealthMonitor::new(5_000);
1080 m.register("A", None);
1081 m.register("B", None);
1082 m.heartbeat("A", 1_000_000).unwrap();
1083 m.heartbeat("B", 2_000_000).unwrap();
1084 assert_eq!(m.newest_tick_ms(), Some(2_000_000));
1085 }
1086
1087 #[test]
1088 fn test_oldest_newest_tick_ms_none_when_no_ticks() {
1089 let m = HealthMonitor::new(5_000);
1090 m.register("A", None);
1091 assert!(m.oldest_tick_ms().is_none());
1092 assert!(m.newest_tick_ms().is_none());
1093 }
1094
1095 #[test]
1096 fn test_unhealthy_feeds_empty_when_all_healthy() {
1097 let m = HealthMonitor::new(5_000);
1098 m.register("BTC-USD", None);
1099 m.heartbeat("BTC-USD", 1_000_000).unwrap();
1100 assert!(m.unhealthy_feeds().is_empty());
1101 }
1102
1103 #[test]
1104 fn test_unhealthy_feeds_includes_stale_feeds() {
1105 let m = HealthMonitor::new(1_000); m.register("BTC-USD", None);
1107 m.heartbeat("BTC-USD", 1_000_000).unwrap();
1108 m.check_all(1_002_000); let unhealthy = m.unhealthy_feeds();
1110 assert!(unhealthy.contains(&"BTC-USD".to_string()));
1111 }
1112
1113 #[test]
1116 fn test_all_healthy_vacuously_true_no_feeds() {
1117 let m = HealthMonitor::new(5_000);
1118 assert!(m.all_healthy());
1119 }
1120
1121 #[test]
1122 fn test_all_healthy_false_when_unknown() {
1123 let m = HealthMonitor::new(5_000);
1124 m.register("BTC-USD", None);
1125 assert!(!m.all_healthy());
1127 }
1128
1129 #[test]
1130 fn test_all_healthy_true_after_heartbeats() {
1131 let m = HealthMonitor::new(5_000);
1132 m.register("BTC-USD", None);
1133 m.register("ETH-USD", None);
1134 m.heartbeat("BTC-USD", 1_000_000).unwrap();
1135 m.heartbeat("ETH-USD", 1_000_000).unwrap();
1136 assert!(m.all_healthy());
1137 }
1138
1139 #[test]
1140 fn test_all_healthy_false_when_one_stale() {
1141 let m = HealthMonitor::new(1_000);
1142 m.register("BTC-USD", None);
1143 m.register("ETH-USD", None);
1144 m.heartbeat("BTC-USD", 1_000_000).unwrap();
1145 m.heartbeat("ETH-USD", 1_000_000).unwrap();
1146 m.check_all(1_002_000); assert!(!m.all_healthy());
1148 }
1149
1150 #[test]
1153 fn test_stale_feed_ids_empty_when_none_stale() {
1154 let m = HealthMonitor::new(5_000);
1155 m.register("BTC-USD", None);
1156 m.heartbeat("BTC-USD", 1_000_000).unwrap();
1157 assert!(m.stale_feed_ids().is_empty());
1158 }
1159
1160 #[test]
1161 fn test_stale_feed_ids_excludes_unknown() {
1162 let m = HealthMonitor::new(5_000);
1163 m.register("BTC-USD", None); assert!(m.stale_feed_ids().is_empty()); }
1166
1167 #[test]
1168 fn test_stale_feed_ids_returns_only_stale() {
1169 let m = HealthMonitor::new(1_000);
1170 m.register("BTC-USD", None);
1171 m.register("ETH-USD", Some(10_000)); m.heartbeat("BTC-USD", 1_000_000).unwrap();
1173 m.heartbeat("ETH-USD", 1_000_000).unwrap();
1174 m.check_all(1_002_000); let stale = m.stale_feed_ids();
1176 assert_eq!(stale, vec!["BTC-USD".to_string()]);
1177 }
1178
1179 #[test]
1182 fn test_lag_ms_none_when_no_ticks() {
1183 let m = HealthMonitor::new(5_000);
1184 m.register("A", None);
1185 m.register("B", None);
1186 assert!(m.lag_ms().is_none());
1187 }
1188
1189 #[test]
1190 fn test_lag_ms_zero_when_one_feed_or_equal_timestamps() {
1191 let m = HealthMonitor::new(5_000);
1192 m.register("A", None);
1193 m.register("B", None);
1194 m.heartbeat("A", 1_000_000).unwrap();
1195 m.heartbeat("B", 1_000_000).unwrap();
1196 assert_eq!(m.lag_ms(), Some(0));
1197 }
1198
1199 #[test]
1200 fn test_lag_ms_returns_spread() {
1201 let m = HealthMonitor::new(5_000);
1202 m.register("fast", None);
1203 m.register("slow", None);
1204 m.heartbeat("fast", 2_000_000).unwrap();
1205 m.heartbeat("slow", 1_000_000).unwrap();
1206 assert_eq!(m.lag_ms(), Some(1_000_000));
1207 }
1208
1209 #[test]
1212 fn test_total_tick_count_zero_initially() {
1213 let m = HealthMonitor::new(5_000);
1214 m.register("A", None);
1215 m.register("B", None);
1216 assert_eq!(m.total_tick_count(), 0);
1217 }
1218
1219 #[test]
1220 fn test_total_tick_count_sums_across_feeds() {
1221 let m = HealthMonitor::new(5_000);
1222 m.register("A", None);
1223 m.register("B", None);
1224 m.heartbeat("A", 1_000_000).unwrap();
1225 m.heartbeat("A", 1_001_000).unwrap();
1226 m.heartbeat("B", 1_000_000).unwrap();
1227 assert_eq!(m.total_tick_count(), 3);
1228 }
1229
1230 #[test]
1233 fn test_healthy_feeds_empty_initially() {
1234 let m = HealthMonitor::new(5_000);
1235 m.register("A", None); assert!(m.healthy_feeds().is_empty());
1237 }
1238
1239 #[test]
1240 fn test_healthy_feeds_after_heartbeat() {
1241 let m = HealthMonitor::new(5_000);
1242 m.register("A", None);
1243 m.register("B", None);
1244 m.heartbeat("A", 1_000_000).unwrap();
1245 m.check_all(1_001_000); let healthy = m.healthy_feeds();
1247 assert_eq!(healthy, vec!["A".to_string()]);
1248 }
1249
1250 #[test]
1251 fn test_healthy_feeds_sorted() {
1252 let m = HealthMonitor::new(5_000);
1253 m.register("C", None);
1254 m.register("A", None);
1255 m.register("B", None);
1256 m.heartbeat("C", 1_000_000).unwrap();
1257 m.heartbeat("A", 1_000_000).unwrap();
1258 m.heartbeat("B", 1_000_000).unwrap();
1259 m.check_all(1_001_000);
1260 let healthy = m.healthy_feeds();
1261 assert_eq!(healthy, vec!["A".to_string(), "B".to_string(), "C".to_string()]);
1262 }
1263
1264 #[test]
1267 fn test_unknown_count_all_new_feeds() {
1268 let m = monitor();
1269 m.register("A", None);
1270 m.register("B", None);
1271 assert_eq!(m.unknown_count(), 2);
1272 }
1273
1274 #[test]
1275 fn test_unknown_count_decreases_after_heartbeat() {
1276 let m = monitor();
1277 m.register("A", None);
1278 m.register("B", None);
1279 m.heartbeat("A", 1_000).unwrap();
1280 assert_eq!(m.unknown_count(), 1); }
1282
1283 #[test]
1284 fn test_unknown_count_zero_when_all_healthy() {
1285 let m = monitor();
1286 m.register("A", None);
1287 m.heartbeat("A", 1_000).unwrap();
1288 assert_eq!(m.unknown_count(), 0);
1289 }
1290
1291 #[test]
1292 fn test_feed_health_is_healthy_true_after_heartbeat() {
1293 let m = monitor();
1294 m.register("X", None);
1295 m.heartbeat("X", 1_000).unwrap();
1296 let fh = m.get("X").unwrap();
1297 assert!(fh.is_healthy());
1298 }
1299
1300 #[test]
1301 fn test_feed_health_is_healthy_false_when_unknown() {
1302 let m = monitor();
1303 m.register("X", None);
1304 let fh = m.get("X").unwrap();
1305 assert!(!fh.is_healthy());
1306 }
1307
1308 #[test]
1311 fn test_most_stale_feed_none_when_no_feeds() {
1312 let m = HealthMonitor::new(5_000);
1313 assert!(m.most_stale_feed().is_none());
1314 }
1315
1316 #[test]
1317 fn test_most_stale_feed_returns_unticked_feed_first() {
1318 let m = HealthMonitor::new(5_000);
1319 m.register("A", None);
1320 m.register("B", None);
1321 m.heartbeat("A", 1_000_000).unwrap();
1322 let stale = m.most_stale_feed().unwrap();
1324 assert_eq!(stale.feed_id, "B");
1325 }
1326
1327 #[test]
1328 fn test_most_stale_feed_returns_oldest_last_tick() {
1329 let m = HealthMonitor::new(5_000);
1330 m.register("A", None);
1331 m.register("B", None);
1332 m.heartbeat("A", 1_000).unwrap();
1333 m.heartbeat("B", 5_000).unwrap();
1334 let stale = m.most_stale_feed().unwrap();
1336 assert_eq!(stale.feed_id, "A");
1337 }
1338
1339 #[test]
1340 fn test_stale_feeds_empty_when_all_healthy() {
1341 let m = monitor();
1342 m.register("A", None);
1343 m.heartbeat("A", 1_000).unwrap();
1344 assert!(m.stale_feeds().is_empty());
1345 }
1346
1347 #[test]
1348 fn test_stale_feeds_returns_all_stale() {
1349 let m = monitor();
1351 m.register("A", None);
1352 m.register("B", None);
1353 m.heartbeat("A", 1_000).unwrap();
1354 m.heartbeat("B", 9_500).unwrap();
1355 m.check_all(10_000);
1356 let stale = m.stale_feeds();
1357 assert_eq!(stale.len(), 1);
1358 assert_eq!(stale[0].feed_id, "A");
1359 }
1360
1361 #[test]
1364 fn test_register_many_creates_all_feeds() {
1365 let m = monitor();
1366 m.register_many(&["BTC-USD", "ETH-USD", "SOL-USD"], None);
1367 assert_eq!(m.feed_count(), 3);
1368 }
1369
1370 #[test]
1371 fn test_register_many_custom_threshold_applies() {
1372 let m = monitor();
1373 m.register_many(&["A", "B"], Some(1_000));
1374 m.heartbeat("A", 1_000_000).unwrap();
1375 let errors = m.check_all(1_002_000); assert!(!errors.is_empty());
1377 }
1378
1379 #[test]
1380 fn test_register_many_empty_slice_is_noop() {
1381 let m = monitor();
1382 m.register_many(&[], None);
1383 assert_eq!(m.feed_count(), 0);
1384 }
1385
1386 #[test]
1389 fn test_feed_health_is_stale_true_when_stale() {
1390 let m = monitor();
1391 m.register("BTC-USD", None);
1392 m.heartbeat("BTC-USD", 1_000_000).unwrap();
1393 m.check_all(1_010_000); assert!(m.get("BTC-USD").unwrap().is_stale());
1395 }
1396
1397 #[test]
1398 fn test_feed_health_is_stale_false_when_healthy() {
1399 let m = monitor();
1400 m.register("BTC-USD", None);
1401 m.heartbeat("BTC-USD", 1_000_000).unwrap();
1402 assert!(!m.get("BTC-USD").unwrap().is_stale());
1403 }
1404
1405 #[test]
1408 fn test_avg_tick_count_zero_with_no_feeds() {
1409 let m = HealthMonitor::new(5_000);
1410 assert!((m.avg_tick_count() - 0.0).abs() < 1e-9);
1411 }
1412
1413 #[test]
1414 fn test_avg_tick_count_with_equal_ticks() {
1415 let m = HealthMonitor::new(5_000);
1416 m.register("A", None);
1417 m.register("B", None);
1418 m.heartbeat("A", 1_000).unwrap();
1419 m.heartbeat("B", 1_000).unwrap();
1420 assert!((m.avg_tick_count() - 1.0).abs() < 1e-9);
1421 }
1422
1423 #[test]
1424 fn test_avg_tick_count_with_different_ticks() {
1425 let m = HealthMonitor::new(5_000);
1426 m.register("A", None);
1427 m.register("B", None);
1428 m.heartbeat("A", 1_000).unwrap();
1429 m.heartbeat("A", 2_000).unwrap();
1430 m.heartbeat("A", 3_000).unwrap();
1431 m.heartbeat("B", 1_000).unwrap();
1432 assert!((m.avg_tick_count() - 2.0).abs() < 1e-9);
1434 }
1435
1436 #[test]
1439 fn test_max_consecutive_stale_zero_with_no_feeds() {
1440 let m = HealthMonitor::new(5_000);
1441 assert_eq!(m.max_consecutive_stale(), 0);
1442 }
1443
1444 #[test]
1445 fn test_max_consecutive_stale_picks_highest() {
1446 let m = HealthMonitor::new(1_000);
1447 m.register("A", None);
1448 m.register("B", None);
1449 m.heartbeat("A", 1_000).unwrap();
1451 m.heartbeat("B", 1_000).unwrap();
1452 m.check_all(4_000);
1454 m.check_all(5_000);
1455 assert!(m.max_consecutive_stale() >= 2);
1457 }
1458
1459 #[test]
1460 fn test_max_consecutive_stale_zero_after_heartbeat() {
1461 let m = HealthMonitor::new(1_000);
1462 m.register("A", None);
1463 m.check_all(10_000);
1464 m.heartbeat("A", 10_001).unwrap();
1465 assert_eq!(m.max_consecutive_stale(), 0);
1466 }
1467
1468 #[test]
1469 fn test_status_summary_all_unknown() {
1470 let m = monitor();
1471 m.register("A", None);
1472 m.register("B", None);
1473 let (healthy, stale, unknown) = m.status_summary();
1474 assert_eq!(healthy, 0);
1475 assert_eq!(stale, 0);
1476 assert_eq!(unknown, 2);
1477 }
1478
1479 #[test]
1480 fn test_status_summary_mixed() {
1481 let m = monitor();
1482 m.register("A", None);
1483 m.register("B", None);
1484 m.register("C", None);
1485 m.heartbeat("A", 1_000).unwrap();
1486 m.heartbeat("B", 5_000).unwrap();
1487 m.check_all(10_000);
1489 let (healthy, stale, unknown) = m.status_summary();
1490 assert_eq!(stale, 1);
1491 assert_eq!(unknown, 1);
1492 assert_eq!(healthy, 1);
1493 }
1494
1495 #[test]
1498 fn test_feeds_by_status_returns_only_matching_status() {
1499 let mut m = monitor();
1500 m.register("A", None);
1501 m.register("B", None);
1502 m.heartbeat("A", 1_000).unwrap();
1504 m.heartbeat("B", 9_000).unwrap();
1505 m.check_all(10_000); let stale = m.feeds_by_status(HealthStatus::Stale);
1507 assert_eq!(stale.len(), 1);
1508 assert_eq!(stale[0].feed_id, "A");
1509 }
1510
1511 #[test]
1512 fn test_feeds_by_status_empty_when_none_match() {
1513 let mut m = monitor();
1514 m.register("A", None);
1515 m.heartbeat("A", 1_000).unwrap();
1516 m.check_all(2_000);
1517 let unknown = m.feeds_by_status(HealthStatus::Unknown);
1519 assert!(unknown.is_empty());
1520 }
1521
1522 #[test]
1523 fn test_feeds_by_status_all_start_as_unknown() {
1524 let mut m = monitor();
1525 m.register("A", None);
1526 m.register("B", None);
1527 let unknown = m.feeds_by_status(HealthStatus::Unknown);
1528 assert_eq!(unknown.len(), 2);
1529 }
1530
1531 #[test]
1534 fn test_unknown_feed_count_all_unknown_at_start() {
1535 let m = HealthMonitor::new(5_000);
1536 m.register("A", None);
1537 m.register("B", None);
1538 assert_eq!(m.unknown_feed_count(), 2);
1539 }
1540
1541 #[test]
1542 fn test_unknown_feed_count_decreases_after_heartbeat() {
1543 let m = HealthMonitor::new(5_000);
1544 m.register("A", None);
1545 m.register("B", None);
1546 m.heartbeat("A", 1_000).unwrap();
1547 assert_eq!(m.unknown_feed_count(), 1);
1548 }
1549
1550 #[test]
1551 fn test_unknown_feed_count_zero_with_no_feeds() {
1552 let m = HealthMonitor::new(5_000);
1553 assert_eq!(m.unknown_feed_count(), 0);
1554 }
1555
1556 #[test]
1559 fn test_all_healthy_vacuously_true_with_no_feeds() {
1560 let m = HealthMonitor::new(5_000);
1562 assert!(m.all_healthy());
1563 }
1564
1565 #[test]
1566 fn test_all_healthy_true_when_all_feeds_healthy() {
1567 let m = HealthMonitor::new(5_000);
1568 m.register("A", None);
1569 m.register("B", None);
1570 m.heartbeat("A", 1_000).unwrap();
1571 m.heartbeat("B", 1_000).unwrap();
1572 assert!(m.all_healthy());
1573 }
1574
1575 #[test]
1576 fn test_all_healthy_false_when_one_feed_unknown() {
1577 let m = HealthMonitor::new(5_000);
1578 m.register("A", None);
1579 m.register("B", None);
1580 m.heartbeat("A", 1_000).unwrap();
1581 assert!(!m.all_healthy());
1583 }
1584
1585 #[test]
1588 fn test_oldest_stale_feed_returns_feed_with_smallest_last_tick() {
1589 let mut m = monitor();
1590 m.register("A", None);
1591 m.register("B", None);
1592 m.heartbeat("A", 1_000).unwrap(); m.heartbeat("B", 3_000).unwrap(); m.check_all(10_000); let oldest = m.oldest_stale_feed().unwrap();
1596 assert_eq!(oldest.feed_id, "A");
1597 }
1598
1599 #[test]
1600 fn test_oldest_stale_feed_none_when_no_stale_feeds() {
1601 let mut m = monitor();
1602 m.register("A", None);
1603 m.heartbeat("A", 9_000).unwrap();
1604 m.check_all(10_000); assert!(m.oldest_stale_feed().is_none());
1606 }
1607
1608 #[test]
1609 fn test_healthy_ratio_zero_when_no_feeds() {
1610 let m = monitor();
1611 assert_eq!(m.healthy_ratio(), 0.0);
1612 }
1613
1614 #[test]
1615 fn test_healthy_ratio_one_when_all_healthy() {
1616 let mut m = monitor();
1617 m.register("A", None);
1618 m.register("B", None);
1619 m.heartbeat("A", 9_000).unwrap();
1620 m.heartbeat("B", 9_500).unwrap();
1621 m.check_all(10_000); assert!((m.healthy_ratio() - 1.0).abs() < 1e-10);
1623 }
1624
1625 #[test]
1626 fn test_healthy_ratio_half_when_one_of_two_healthy() {
1627 let mut m = monitor();
1628 m.register("A", None);
1629 m.register("B", None);
1630 m.heartbeat("A", 1_000).unwrap(); m.heartbeat("B", 9_500).unwrap(); m.check_all(10_000);
1633 assert!((m.healthy_ratio() - 0.5).abs() < 1e-10);
1634 }
1635
1636 #[test]
1639 fn test_most_reliable_feed_returns_highest_tick_count() {
1640 let mut m = monitor();
1641 m.register("A", None);
1642 m.register("B", None);
1643 m.heartbeat("A", 1_000).unwrap();
1644 m.heartbeat("B", 1_100).unwrap();
1645 m.heartbeat("B", 1_200).unwrap();
1646 let best = m.most_reliable_feed().unwrap();
1648 assert_eq!(best.feed_id, "B");
1649 }
1650
1651 #[test]
1652 fn test_most_reliable_feed_none_when_no_feeds() {
1653 let m = monitor();
1654 assert!(m.most_reliable_feed().is_none());
1655 }
1656
1657 #[test]
1658 fn test_feeds_never_seen_returns_feeds_with_no_heartbeat() {
1659 let mut m = monitor();
1660 m.register("A", None);
1661 m.register("B", None);
1662 m.heartbeat("A", 1_000).unwrap();
1663 let never_seen = m.feeds_never_seen();
1665 assert_eq!(never_seen.len(), 1);
1666 assert_eq!(never_seen[0].feed_id, "B");
1667 }
1668
1669 #[test]
1670 fn test_feeds_never_seen_empty_when_all_have_heartbeat() {
1671 let mut m = monitor();
1672 m.register("A", None);
1673 m.heartbeat("A", 1_000).unwrap();
1674 assert!(m.feeds_never_seen().is_empty());
1675 }
1676
1677 #[test]
1680 fn test_is_any_feed_stale_true_when_stale_feed_exists() {
1681 let mut m = monitor();
1682 m.register("A", None);
1683 m.heartbeat("A", 1_000).unwrap();
1684 m.check_all(10_000); assert!(m.is_any_feed_stale());
1686 }
1687
1688 #[test]
1689 fn test_is_any_feed_stale_false_when_all_healthy() {
1690 let mut m = monitor();
1691 m.register("A", None);
1692 m.heartbeat("A", 9_500).unwrap();
1693 m.check_all(10_000); assert!(!m.is_any_feed_stale());
1695 }
1696
1697 #[test]
1698 fn test_is_any_feed_stale_false_when_no_feeds() {
1699 let m = monitor();
1700 assert!(!m.is_any_feed_stale());
1701 }
1702
1703 #[test]
1704 fn test_all_feeds_seen_true_when_all_have_heartbeat() {
1705 let mut m = monitor();
1706 m.register("A", None);
1707 m.register("B", None);
1708 m.heartbeat("A", 1_000).unwrap();
1709 m.heartbeat("B", 2_000).unwrap();
1710 assert!(m.all_feeds_seen());
1711 }
1712
1713 #[test]
1714 fn test_all_feeds_seen_false_when_one_never_seen() {
1715 let mut m = monitor();
1716 m.register("A", None);
1717 m.register("B", None);
1718 m.heartbeat("A", 1_000).unwrap();
1719 assert!(!m.all_feeds_seen());
1720 }
1721
1722 #[test]
1723 fn test_all_feeds_seen_true_vacuously_when_no_feeds() {
1724 let m = monitor();
1725 assert!(m.all_feeds_seen());
1726 }
1727
1728 #[test]
1731 fn test_tick_count_for_returns_correct_count() {
1732 let mut m = monitor();
1733 m.register("A", None);
1734 m.heartbeat("A", 1_000).unwrap();
1735 m.heartbeat("A", 2_000).unwrap();
1736 assert_eq!(m.tick_count_for("A"), Some(2));
1737 }
1738
1739 #[test]
1740 fn test_tick_count_for_none_when_not_registered() {
1741 let m = monitor();
1742 assert!(m.tick_count_for("nonexistent").is_none());
1743 }
1744
1745 #[test]
1746 fn test_tick_count_for_zero_when_no_heartbeats() {
1747 let mut m = monitor();
1748 m.register("A", None);
1749 assert_eq!(m.tick_count_for("A"), Some(0));
1750 }
1751
1752 #[test]
1753 fn test_average_tick_count_zero_when_no_feeds() {
1754 let m = monitor();
1755 assert_eq!(m.average_tick_count(), 0.0);
1756 }
1757
1758 #[test]
1759 fn test_average_tick_count_correct_value() {
1760 let mut m = monitor();
1761 m.register("A", None);
1762 m.register("B", None);
1763 m.heartbeat("A", 1_000).unwrap();
1764 m.heartbeat("A", 2_000).unwrap(); m.heartbeat("B", 1_000).unwrap(); assert!((m.average_tick_count() - 1.5).abs() < 1e-10);
1768 }
1769
1770 #[test]
1772 fn test_feeds_above_tick_count_correct() {
1773 let mut m = monitor();
1774 m.register("A", None);
1775 m.register("B", None);
1776 m.register("C", None);
1777 m.heartbeat("A", 1_000).unwrap();
1778 m.heartbeat("A", 2_000).unwrap();
1779 m.heartbeat("A", 3_000).unwrap(); m.heartbeat("B", 1_000).unwrap(); assert_eq!(m.feeds_above_tick_count(1), 1);
1784 assert_eq!(m.feeds_above_tick_count(0), 2);
1785 assert_eq!(m.feeds_above_tick_count(5), 0);
1786 }
1787
1788 #[test]
1789 fn test_feeds_above_tick_count_zero_when_no_feeds() {
1790 let m = HealthMonitor::new(5_000);
1791 assert_eq!(m.feeds_above_tick_count(0), 0);
1792 }
1793
1794 #[test]
1796 fn test_oldest_feed_age_ms_returns_max_age() {
1797 let mut m = monitor();
1798 m.register("A", None);
1799 m.register("B", None);
1800 m.heartbeat("A", 5_000).unwrap();
1801 m.heartbeat("B", 8_000).unwrap();
1802 assert_eq!(m.oldest_feed_age_ms(10_000), Some(5_000));
1805 }
1806
1807 #[test]
1808 fn test_oldest_feed_age_ms_none_when_no_ticks() {
1809 let mut m = monitor();
1810 m.register("A", None);
1811 assert!(m.oldest_feed_age_ms(10_000).is_none());
1812 }
1813
1814 #[test]
1816 fn test_total_stale_count_zero_when_all_healthy() {
1817 let mut m = HealthMonitor::new(5_000);
1818 m.register("A", None);
1819 m.heartbeat("A", 9_500).unwrap();
1820 let _ = m.check_all(10_000);
1821 assert_eq!(m.total_stale_count(), 0);
1822 }
1823
1824 #[test]
1825 fn test_total_stale_count_correct_when_stale() {
1826 let mut m = HealthMonitor::new(5_000);
1827 m.register("A", None);
1828 m.register("B", None);
1829 m.heartbeat("A", 1_000).unwrap();
1830 m.heartbeat("B", 1_000).unwrap();
1831 let _ = m.check_all(10_000);
1833 assert_eq!(m.total_stale_count(), 2);
1834 }
1835
1836 #[test]
1838 fn test_avg_feed_age_ms_none_when_no_ticks() {
1839 let mut m = monitor();
1840 m.register("A", None);
1841 assert!(m.avg_feed_age_ms(10_000).is_none());
1842 }
1843
1844 #[test]
1845 fn test_avg_feed_age_ms_correct_average() {
1846 let mut m = monitor();
1847 m.register("A", None);
1848 m.register("B", None);
1849 m.heartbeat("A", 5_000).unwrap(); m.heartbeat("B", 8_000).unwrap(); let avg = m.avg_feed_age_ms(10_000).unwrap();
1853 assert!((avg - 3500.0).abs() < 1e-10, "got {avg}");
1854 }
1855
1856 #[test]
1859 fn test_stale_ratio_zero_with_no_feeds() {
1860 let m = HealthMonitor::new(5_000);
1861 assert_eq!(m.stale_ratio(), 0.0);
1862 }
1863
1864 #[test]
1865 fn test_stale_ratio_one_when_all_stale() {
1866 let m = HealthMonitor::new(1_000);
1867 m.register("A", None);
1868 m.register("B", None);
1869 m.heartbeat("A", 1_000).unwrap();
1870 m.heartbeat("B", 1_000).unwrap();
1871 m.check_all(5_000);
1872 assert!((m.stale_ratio() - 1.0).abs() < 1e-10);
1873 }
1874
1875 #[test]
1876 fn test_stale_ratio_half_when_one_of_two_stale() {
1877 let m = HealthMonitor::new(1_000);
1878 m.register("A", None);
1879 m.register("B", None);
1880 m.heartbeat("A", 1_000).unwrap();
1881 m.heartbeat("B", 9_500).unwrap();
1882 m.check_all(10_000);
1883 assert!((m.stale_ratio() - 0.5).abs() < 1e-10);
1884 }
1885
1886 #[test]
1889 fn test_has_any_unknown_true_for_fresh_feed() {
1890 let m = HealthMonitor::new(5_000);
1891 m.register("feed", None);
1892 assert!(m.has_any_unknown());
1894 }
1895
1896 #[test]
1897 fn test_has_any_unknown_false_after_heartbeat() {
1898 let m = HealthMonitor::new(5_000);
1899 m.register("feed", None);
1900 m.heartbeat("feed", 1_000).unwrap();
1901 assert!(!m.has_any_unknown());
1902 }
1903
1904 #[test]
1905 fn test_has_any_unknown_false_with_no_feeds() {
1906 let m = HealthMonitor::new(5_000);
1907 assert!(!m.has_any_unknown());
1908 }
1909
1910 #[test]
1913 fn test_is_degraded_true_when_some_unhealthy() {
1914 let m = HealthMonitor::new(5_000);
1915 m.register("A", None);
1916 m.register("B", None);
1917 m.heartbeat("A", 9_500).unwrap(); m.heartbeat("B", 1_000).unwrap(); m.check_all(10_000);
1920 assert!(m.is_degraded());
1922 }
1923
1924 #[test]
1925 fn test_is_degraded_false_when_all_healthy() {
1926 let m = HealthMonitor::new(5_000);
1927 m.register("A", None);
1928 m.heartbeat("A", 1_000).unwrap();
1929 assert!(!m.is_degraded());
1930 }
1931
1932 #[test]
1933 fn test_is_degraded_false_with_no_feeds() {
1934 let m = HealthMonitor::new(5_000);
1935 assert!(!m.is_degraded());
1936 }
1937
1938 #[test]
1941 fn test_unhealthy_count_all_unknown() {
1942 let m = HealthMonitor::new(5_000);
1943 m.register("A", None);
1944 m.register("B", None);
1945 assert_eq!(m.unhealthy_count(), 2);
1947 }
1948
1949 #[test]
1950 fn test_unhealthy_count_one_healthy() {
1951 let m = HealthMonitor::new(5_000);
1952 m.register("A", None);
1953 m.register("B", None);
1954 m.heartbeat("A", 1_000).unwrap();
1955 assert_eq!(m.unhealthy_count(), 1);
1957 }
1958
1959 #[test]
1960 fn test_unhealthy_count_zero_when_empty() {
1961 let m = HealthMonitor::new(5_000);
1962 assert_eq!(m.unhealthy_count(), 0);
1963 }
1964
1965 #[test]
1966 fn test_feed_exists_true_after_register() {
1967 let m = HealthMonitor::new(5_000);
1968 m.register("BTC-USD", None);
1969 assert!(m.feed_exists("BTC-USD"));
1970 }
1971
1972 #[test]
1973 fn test_feed_exists_false_for_unknown_feed() {
1974 let m = HealthMonitor::new(5_000);
1975 assert!(!m.feed_exists("ETH-USD"));
1976 }
1977
1978 #[test]
1981 fn test_most_stale_feed_returns_oldest_feed() {
1982 let m = HealthMonitor::new(5_000);
1983 m.register("A", None);
1984 m.register("B", None);
1985 m.heartbeat("A", 1_000).unwrap(); m.heartbeat("B", 9_000).unwrap(); let stale = m.most_stale_feed().unwrap();
1989 assert_eq!(stale.feed_id, "A");
1990 }
1991
1992 #[test]
1993 fn test_most_stale_feed_some_with_single_feed() {
1994 let m = HealthMonitor::new(5_000);
1995 m.register("A", None);
1996 m.heartbeat("A", 1_000).unwrap();
1997 assert!(m.most_stale_feed().is_some());
1998 }
1999
2000 #[test]
2001 fn test_most_stale_feed_none_when_empty() {
2002 let m = HealthMonitor::new(5_000);
2003 assert!(m.most_stale_feed().is_none());
2004 }
2005
2006 #[test]
2009 fn test_stale_ratio_excl_unknown_zero_when_empty() {
2010 let m = HealthMonitor::new(5_000);
2011 assert_eq!(m.stale_ratio_excluding_unknown(), 0.0);
2012 }
2013
2014 #[test]
2015 fn test_stale_ratio_excl_unknown_zero_when_all_unknown() {
2016 let m = HealthMonitor::new(5_000);
2017 m.register("A", None);
2018 m.register("B", None);
2019 assert_eq!(m.stale_ratio_excluding_unknown(), 0.0);
2021 }
2022
2023 #[test]
2024 fn test_stale_ratio_excl_unknown_half_when_one_stale() {
2025 let m = HealthMonitor::new(5_000);
2026 m.register("A", None);
2027 m.register("B", None);
2028 m.heartbeat("A", 9_500).unwrap(); m.heartbeat("B", 1_000).unwrap(); m.check_all(10_000);
2031 assert!((m.stale_ratio_excluding_unknown() - 0.5).abs() < 1e-10);
2033 }
2034
2035 #[test]
2038 fn test_any_unknown_false_when_empty() {
2039 let m = HealthMonitor::new(5_000);
2040 assert!(!m.any_unknown());
2041 }
2042
2043 #[test]
2044 fn test_any_unknown_true_when_feed_registered_but_no_heartbeat() {
2045 let m = HealthMonitor::new(5_000);
2046 m.register("BTC-USD", None);
2047 assert!(m.any_unknown());
2048 }
2049
2050 #[test]
2051 fn test_any_unknown_false_when_all_have_heartbeats() {
2052 let m = HealthMonitor::new(5_000);
2053 m.register("BTC-USD", None);
2054 m.heartbeat("BTC-USD", 1_000).unwrap();
2055 assert!(!m.any_unknown());
2056 }
2057
2058 #[test]
2061 fn test_degraded_count_zero_when_all_healthy() {
2062 let m = HealthMonitor::new(5_000);
2063 m.register("A", None);
2064 m.heartbeat("A", 9_500).unwrap();
2065 m.check_all(10_000);
2066 assert_eq!(m.degraded_count(), 0);
2067 }
2068
2069 #[test]
2070 fn test_degraded_count_one_when_one_stale() {
2071 let m = HealthMonitor::new(5_000);
2072 m.register("A", None);
2073 m.register("B", None);
2074 m.heartbeat("A", 9_500).unwrap(); m.heartbeat("B", 1_000).unwrap(); m.check_all(10_000);
2077 assert_eq!(m.degraded_count(), 1);
2078 }
2079
2080 #[test]
2081 fn test_degraded_count_zero_when_empty() {
2082 let m = HealthMonitor::new(5_000);
2083 assert_eq!(m.degraded_count(), 0);
2084 }
2085
2086 #[test]
2089 fn test_min_healthy_age_ms_none_when_no_healthy_feeds() {
2090 let m = HealthMonitor::new(5_000);
2091 m.register("A", None);
2092 assert!(m.min_healthy_age_ms(10_000).is_none());
2094 }
2095
2096 #[test]
2097 fn test_min_healthy_age_ms_returns_most_recent() {
2098 let m = HealthMonitor::new(5_000);
2099 m.register("A", None);
2100 m.register("B", None);
2101 m.heartbeat("A", 8_000).unwrap(); m.heartbeat("B", 9_000).unwrap(); m.check_all(10_000);
2104 assert_eq!(m.min_healthy_age_ms(10_000), Some(1_000));
2106 }
2107
2108 #[test]
2111 fn test_healthy_feed_ids_empty_when_no_feeds() {
2112 let m = HealthMonitor::new(5_000);
2113 assert!(m.healthy_feed_ids().is_empty());
2114 }
2115
2116 #[test]
2117 fn test_healthy_feed_ids_returns_healthy_only() {
2118 let m = HealthMonitor::new(5_000);
2119 m.register("A", None);
2120 m.register("B", None);
2121 m.heartbeat("A", 9_500).unwrap(); m.check_all(10_000);
2124 let ids = m.healthy_feed_ids();
2125 assert_eq!(ids, vec!["A".to_string()]);
2126 }
2127
2128 #[test]
2131 fn test_time_since_last_heartbeat_correct() {
2132 let m = HealthMonitor::new(5_000);
2133 m.register("BTC", None);
2134 m.heartbeat("BTC", 9_000).unwrap();
2135 assert_eq!(m.time_since_last_heartbeat("BTC", 10_000), Some(1_000));
2136 }
2137
2138 #[test]
2139 fn test_time_since_last_heartbeat_none_when_no_tick() {
2140 let m = HealthMonitor::new(5_000);
2141 m.register("ETH", None);
2142 assert!(m.time_since_last_heartbeat("ETH", 10_000).is_none());
2143 }
2144
2145 #[test]
2146 fn test_time_since_last_heartbeat_none_when_unknown_feed() {
2147 let m = HealthMonitor::new(5_000);
2148 assert!(m.time_since_last_heartbeat("MISSING", 10_000).is_none());
2149 }
2150
2151 #[test]
2154 fn test_register_batch_registers_all_feeds() {
2155 let m = HealthMonitor::new(5_000);
2156 m.register_batch(&[("BTC", 1_000), ("ETH", 2_000), ("SOL", 3_000)]);
2157 assert!(m.feed_exists("BTC"));
2158 assert!(m.feed_exists("ETH"));
2159 assert!(m.feed_exists("SOL"));
2160 }
2161
2162 #[test]
2163 fn test_register_batch_uses_custom_thresholds() {
2164 let m = HealthMonitor::new(10_000);
2165 m.register_batch(&[("BTC", 500)]);
2166 m.heartbeat("BTC", 0).unwrap();
2168 m.check_all(600);
2169 assert_eq!(m.stale_count(), 1);
2170 }
2171
2172 #[test]
2173 fn test_register_batch_empty_slice_is_noop() {
2174 let m = HealthMonitor::new(5_000);
2175 m.register_batch(&[]);
2176 assert_eq!(m.feed_count(), 0);
2177 }
2178
2179 #[test]
2182 fn test_unknown_feed_ids_all_new_feeds_are_unknown() {
2183 let m = HealthMonitor::new(5_000);
2184 m.register("BTC", None);
2185 m.register("ETH", None);
2186 let ids = m.unknown_feed_ids();
2187 assert!(ids.contains(&"BTC".to_string()));
2188 assert!(ids.contains(&"ETH".to_string()));
2189 }
2190
2191 #[test]
2192 fn test_unknown_feed_ids_empty_after_heartbeat_and_check() {
2193 let m = HealthMonitor::new(5_000);
2194 m.register("BTC", None);
2195 m.heartbeat("BTC", 0).unwrap();
2196 m.check_all(100);
2197 assert!(m.unknown_feed_ids().is_empty());
2198 }
2199
2200 #[test]
2201 fn test_unknown_feed_ids_empty_when_no_feeds() {
2202 let m = HealthMonitor::new(5_000);
2203 assert!(m.unknown_feed_ids().is_empty());
2204 }
2205
2206 #[test]
2209 fn test_feeds_needing_check_returns_stale_and_unknown() {
2210 let m = HealthMonitor::new(5_000);
2211 m.register("BTC", None); m.register("ETH", None);
2213 m.heartbeat("ETH", 0).unwrap();
2214 m.check_all(100); let needing = m.feeds_needing_check();
2216 assert!(needing.contains(&"BTC".to_string()));
2217 assert!(!needing.contains(&"ETH".to_string()));
2218 }
2219
2220 #[test]
2221 fn test_feeds_needing_check_empty_when_all_healthy() {
2222 let m = HealthMonitor::new(5_000);
2223 m.register("BTC", None);
2224 m.heartbeat("BTC", 0).unwrap();
2225 m.check_all(100);
2226 assert!(m.feeds_needing_check().is_empty());
2227 }
2228
2229 #[test]
2230 fn test_feeds_needing_check_sorted() {
2231 let m = HealthMonitor::new(5_000);
2232 m.register("ZZZ", None);
2233 m.register("AAA", None);
2234 let needing = m.feeds_needing_check();
2235 assert_eq!(needing, vec!["AAA".to_string(), "ZZZ".to_string()]);
2236 }
2237
2238 #[test]
2241 fn test_ratio_healthy_zero_when_no_feeds() {
2242 let m = HealthMonitor::new(5_000);
2243 assert_eq!(m.ratio_healthy(), 0.0);
2244 }
2245
2246 #[test]
2247 fn test_ratio_healthy_zero_when_all_unknown() {
2248 let m = HealthMonitor::new(5_000);
2249 m.register("BTC", None);
2250 m.register("ETH", None);
2251 assert_eq!(m.ratio_healthy(), 0.0);
2252 }
2253
2254 #[test]
2255 fn test_ratio_healthy_one_when_all_healthy() {
2256 let m = HealthMonitor::new(5_000);
2257 m.register("BTC", None);
2258 m.heartbeat("BTC", 0).unwrap();
2259 m.check_all(100);
2260 assert_eq!(m.ratio_healthy(), 1.0);
2261 }
2262
2263 #[test]
2264 fn test_ratio_healthy_half_when_one_of_two() {
2265 let m = HealthMonitor::new(5_000);
2266 m.register("BTC", None);
2267 m.register("ETH", None);
2268 m.heartbeat("BTC", 0).unwrap();
2269 m.check_all(100);
2270 let ratio = m.ratio_healthy();
2271 assert!((ratio - 0.5).abs() < 1e-10);
2272 }
2273
2274 #[test]
2277 fn test_total_tick_count_zero_when_no_feeds() {
2278 let m = HealthMonitor::new(5_000);
2279 assert_eq!(m.total_tick_count(), 0);
2280 }
2281
2282 #[test]
2283 fn test_total_tick_count_sums_all_feeds() {
2284 let m = HealthMonitor::new(5_000);
2285 m.register("BTC", None);
2286 m.register("ETH", None);
2287 m.heartbeat("BTC", 0).unwrap();
2288 m.heartbeat("BTC", 1).unwrap();
2289 m.heartbeat("ETH", 0).unwrap();
2290 assert_eq!(m.total_tick_count(), 3);
2291 }
2292
2293 #[test]
2296 fn test_last_updated_feed_id_none_when_no_ticks() {
2297 let m = HealthMonitor::new(5_000);
2298 m.register("BTC", None);
2299 assert!(m.last_updated_feed_id().is_none());
2300 }
2301
2302 #[test]
2303 fn test_last_updated_feed_id_returns_most_recent() {
2304 let m = HealthMonitor::new(5_000);
2305 m.register("BTC", None);
2306 m.register("ETH", None);
2307 m.heartbeat("BTC", 100).unwrap();
2308 m.heartbeat("ETH", 200).unwrap(); assert_eq!(m.last_updated_feed_id(), Some("ETH".to_string()));
2310 }
2311
2312 #[test]
2315 fn test_is_any_stale_false_when_no_feeds() {
2316 let m = HealthMonitor::new(5_000);
2317 assert!(!m.is_any_stale());
2318 }
2319
2320 #[test]
2321 fn test_is_any_stale_false_when_all_healthy() {
2322 let m = HealthMonitor::new(5_000);
2323 m.register("BTC", None);
2324 m.heartbeat("BTC", 0).unwrap();
2325 m.check_all(100);
2326 assert!(!m.is_any_stale());
2327 }
2328
2329 #[test]
2330 fn test_is_any_stale_true_when_stale_feed() {
2331 let m = HealthMonitor::new(5_000);
2332 m.register("BTC", None);
2333 m.heartbeat("BTC", 0).unwrap();
2334 m.check_all(10_000); assert!(m.is_any_stale());
2336 }
2337}