Skip to main content

fin_stream/health/
mod.rs

1//! Feed health monitoring — staleness detection and status tracking.
2//!
3//! ## Responsibility
4//! Track the timestamp of the most recent tick per feed. Emit StaleFeed
5//! errors when a feed has not produced data within the configured threshold.
6//!
7//! ## Guarantees
8//! - Thread-safe: HealthMonitor uses DashMap for concurrent updates
9//! - Non-panicking: all operations return Result or Option
10
11use crate::error::StreamError;
12use dashmap::DashMap;
13use std::sync::Arc;
14
15/// Health status of a single feed.
16#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
17pub enum HealthStatus {
18    /// Feed is active and within staleness threshold.
19    Healthy,
20    /// Feed has not produced data within the staleness threshold.
21    Stale,
22    /// Feed is newly registered and no data has been received yet.
23    Unknown,
24}
25
26/// Per-feed health state.
27#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
28pub struct FeedHealth {
29    /// Identifier of this feed (e.g. `"BTC-USD"`).
30    pub feed_id: String,
31    /// Current health status of this feed.
32    pub status: HealthStatus,
33    /// Timestamp (ms since Unix epoch) of the most recent tick, if any.
34    pub last_tick_ms: Option<u64>,
35    /// Staleness threshold in milliseconds for this feed.
36    pub stale_threshold_ms: u64,
37    /// Total number of heartbeats received since registration.
38    pub tick_count: u64,
39    /// Number of consecutive stale checks. Resets to 0 on heartbeat.
40    pub consecutive_stale: u32,
41}
42
43impl FeedHealth {
44    /// Elapsed ms since last tick.
45    pub fn elapsed_ms(&self, now_ms: u64) -> Option<u64> {
46        self.last_tick_ms.map(|t| now_ms.saturating_sub(t))
47    }
48
49    /// Returns `true` if this feed's status is [`HealthStatus::Healthy`].
50    pub fn is_healthy(&self) -> bool {
51        self.status == HealthStatus::Healthy
52    }
53
54    /// Returns `true` if this feed's status is [`HealthStatus::Stale`].
55    pub fn is_stale(&self) -> bool {
56        self.status == HealthStatus::Stale
57    }
58}
59
60/// Central health monitor for all active feeds.
61pub struct HealthMonitor {
62    feeds: Arc<DashMap<String, FeedHealth>>,
63    default_stale_threshold_ms: u64,
64    /// Number of consecutive stale checks before the circuit opens. 0 = disabled.
65    circuit_breaker_threshold: u32,
66}
67
68impl HealthMonitor {
69    /// Create a monitor with a default staleness threshold (milliseconds).
70    ///
71    /// Individual feeds can override this with a custom threshold via
72    /// [`HealthMonitor::register`].
73    pub fn new(default_stale_threshold_ms: u64) -> Self {
74        Self {
75            feeds: Arc::new(DashMap::new()),
76            default_stale_threshold_ms,
77            circuit_breaker_threshold: 3,
78        }
79    }
80
81    /// Configure how many consecutive stale checks open the circuit (default: 3).
82    /// Set to 0 to disable circuit-breaking.
83    pub fn with_circuit_breaker_threshold(mut self, threshold: u32) -> Self {
84        self.circuit_breaker_threshold = threshold;
85        self
86    }
87
88    /// Returns true if the feed's circuit is open (too many consecutive stale checks).
89    /// An open circuit means callers should stop routing work to this feed.
90    pub fn is_circuit_open(&self, feed_id: &str) -> bool {
91        if self.circuit_breaker_threshold == 0 {
92            return false;
93        }
94        self.feeds
95            .get(feed_id)
96            .map_or(false, |e| e.consecutive_stale >= self.circuit_breaker_threshold)
97    }
98
99    /// Register multiple feeds in one call.
100    ///
101    /// Each feed in `ids` is registered with the same optional staleness
102    /// threshold. Equivalent to calling [`register`](Self::register) for
103    /// each ID individually.
104    pub fn register_many(&self, ids: &[&str], stale_threshold_ms: Option<u64>) {
105        for id in ids {
106            self.register(*id, stale_threshold_ms);
107        }
108    }
109
110    /// Register a feed with optional custom staleness threshold.
111    pub fn register(&self, feed_id: impl Into<String>, stale_threshold_ms: Option<u64>) {
112        let id = feed_id.into();
113        let threshold = stale_threshold_ms.unwrap_or(self.default_stale_threshold_ms);
114        self.feeds.insert(
115            id.clone(),
116            FeedHealth {
117                feed_id: id,
118                status: HealthStatus::Unknown,
119                last_tick_ms: None,
120                stale_threshold_ms: threshold,
121                tick_count: 0,
122                consecutive_stale: 0,
123            },
124        );
125    }
126
127    /// Remove a previously registered feed from the monitor.
128    ///
129    /// Returns the last known [`FeedHealth`] for the feed, or `None` if it
130    /// was not registered.
131    pub fn deregister(&self, feed_id: &str) -> Option<FeedHealth> {
132        self.feeds.remove(feed_id).map(|(_, v)| v)
133    }
134
135    /// Record a tick heartbeat for a feed.
136    ///
137    /// # Errors
138    ///
139    /// Returns [`StreamError::UnknownFeed`] if `feed_id` has not been
140    /// registered via [`register`](Self::register).
141    pub fn heartbeat(&self, feed_id: &str, ts_ms: u64) -> Result<(), StreamError> {
142        let mut entry = self
143            .feeds
144            .get_mut(feed_id)
145            .ok_or_else(|| StreamError::UnknownFeed {
146                feed_id: feed_id.to_string(),
147            })?;
148        entry.last_tick_ms = Some(ts_ms);
149        entry.tick_count += 1;
150        entry.status = HealthStatus::Healthy;
151        entry.consecutive_stale = 0;
152        Ok(())
153    }
154
155    /// Check all feeds for staleness at the given timestamp.
156    ///
157    /// Returns a list of `(feed_id, error)` pairs for stale feeds so callers
158    /// can route errors to the appropriate handler without re-parsing the feed
159    /// identifier out of the error message.
160    pub fn check_all(&self, now_ms: u64) -> Vec<(String, StreamError)> {
161        let mut errors = Vec::new();
162        for mut entry in self.feeds.iter_mut() {
163            let elapsed = entry.elapsed_ms(now_ms);
164            if let Some(elapsed) = elapsed {
165                if elapsed > entry.stale_threshold_ms {
166                    entry.status = HealthStatus::Stale;
167                    entry.consecutive_stale += 1;
168                    let feed_id = entry.feed_id.clone();
169                    errors.push((
170                        feed_id.clone(),
171                        StreamError::StaleFeed {
172                            feed_id,
173                            elapsed_ms: elapsed,
174                            threshold_ms: entry.stale_threshold_ms,
175                        },
176                    ));
177                }
178            }
179        }
180        errors
181    }
182
183    /// Get health state for a specific feed.
184    pub fn get(&self, feed_id: &str) -> Option<FeedHealth> {
185        self.feeds.get(feed_id).map(|e| e.value().clone())
186    }
187
188    /// All registered feeds.
189    pub fn all_feeds(&self) -> Vec<FeedHealth> {
190        self.feeds.iter().map(|e| e.value().clone()).collect()
191    }
192
193    /// Total number of registered feeds.
194    pub fn feed_count(&self) -> usize {
195        self.feeds.len()
196    }
197
198    /// Check staleness for a single feed at `now_ms`.
199    ///
200    /// Returns `Some(StreamError::StaleFeed { .. })` if the feed is stale,
201    /// `None` if it is healthy or has not yet received any ticks, or
202    /// `Err(StreamError::UnknownFeed)` if `feed_id` is not registered.
203    pub fn check_one(
204        &self,
205        feed_id: &str,
206        now_ms: u64,
207    ) -> Result<Option<StreamError>, StreamError> {
208        let mut entry = self
209            .feeds
210            .get_mut(feed_id)
211            .ok_or_else(|| StreamError::UnknownFeed {
212                feed_id: feed_id.to_string(),
213            })?;
214        let elapsed = match entry.last_tick_ms {
215            Some(t) => now_ms.saturating_sub(t),
216            None => return Ok(None),
217        };
218        if elapsed > entry.stale_threshold_ms {
219            entry.status = HealthStatus::Stale;
220            entry.consecutive_stale += 1;
221            Ok(Some(StreamError::StaleFeed {
222                feed_id: entry.feed_id.clone(),
223                elapsed_ms: elapsed,
224                threshold_ms: entry.stale_threshold_ms,
225            }))
226        } else {
227            Ok(None)
228        }
229    }
230
231    /// Reset a feed's stale counter and status without deregistering it.
232    ///
233    /// Clears `consecutive_stale`, `tick_count`, `last_tick_ms`, and sets
234    /// the status back to `Unknown`. Useful when a feed reconnects and should
235    /// be treated as fresh.
236    ///
237    /// # Errors
238    ///
239    /// Returns [`StreamError::UnknownFeed`] if `feed_id` is not registered.
240    pub fn reset_feed(&self, feed_id: &str) -> Result<(), StreamError> {
241        let mut entry = self
242            .feeds
243            .get_mut(feed_id)
244            .ok_or_else(|| StreamError::UnknownFeed {
245                feed_id: feed_id.to_string(),
246            })?;
247        entry.status = HealthStatus::Unknown;
248        entry.last_tick_ms = None;
249        entry.tick_count = 0;
250        entry.consecutive_stale = 0;
251        Ok(())
252    }
253
254    /// Sorted list of all registered feed identifiers.
255    pub fn feed_ids(&self) -> Vec<String> {
256        let mut ids: Vec<String> = self.feeds.iter().map(|e| e.feed_id.clone()).collect();
257        ids.sort();
258        ids
259    }
260
261    /// Number of feeds currently in the [`HealthStatus::Healthy`] state.
262    pub fn healthy_count(&self) -> usize {
263        self.feeds_by_status(HealthStatus::Healthy).len()
264    }
265
266    /// Fraction of registered feeds that are currently stale, in `[0.0, 1.0]`.
267    ///
268    /// Returns `0.0` when no feeds are registered.
269    pub fn stale_ratio(&self) -> f64 {
270        let total = self.feed_count();
271        if total == 0 {
272            return 0.0;
273        }
274        self.stale_count() as f64 / total as f64
275    }
276
277    /// Returns `true` if at least one registered feed is in [`HealthStatus::Stale`] state.
278    pub fn is_any_stale(&self) -> bool {
279        self.stale_count() > 0
280    }
281
282    /// Number of feeds currently in the [`HealthStatus::Stale`] state.
283    pub fn stale_count(&self) -> usize {
284        self.feeds_by_status(HealthStatus::Stale).len()
285    }
286
287    /// Clone of all feeds currently in the [`HealthStatus::Stale`] state.
288    ///
289    /// Useful for bulk alerting: iterate the returned vec to log or notify on
290    /// every stale feed in one call rather than checking each feed individually.
291    pub fn stale_feeds(&self) -> Vec<FeedHealth> {
292        self.feeds_by_status(HealthStatus::Stale)
293    }
294
295    /// The oldest `last_tick_ms` across all registered feeds, or `None` if no
296    /// feed has received any tick yet.
297    pub fn oldest_tick_ms(&self) -> Option<u64> {
298        self.feeds
299            .iter()
300            .filter_map(|e| e.last_tick_ms)
301            .min()
302    }
303
304    /// The most recent `last_tick_ms` across all registered feeds, or `None`
305    /// if no feed has received any tick yet.
306    pub fn newest_tick_ms(&self) -> Option<u64> {
307        self.feeds
308            .iter()
309            .filter_map(|e| e.last_tick_ms)
310            .max()
311    }
312
313    /// Sum of tick counts across all registered feeds.
314    ///
315    /// Useful for throughput monitoring: the total number of heartbeats seen
316    /// since the monitor was created (counts are not reset by
317    /// [`reset_all`](Self::reset_all)).
318    pub fn total_tick_count(&self) -> u64 {
319        self.feeds.iter().map(|e| e.tick_count).sum()
320    }
321
322    /// Feed skew: `newest_tick_ms - oldest_tick_ms` across all registered feeds.
323    ///
324    /// Returns `None` if fewer than two feeds have received ticks. A large lag
325    /// indicates that some feeds are receiving data faster than others — useful
326    /// for detecting slow feeds or feed-specific latency spikes.
327    pub fn lag_ms(&self) -> Option<u64> {
328        let newest = self.newest_tick_ms()?;
329        let oldest = self.oldest_tick_ms()?;
330        Some(newest.saturating_sub(oldest))
331    }
332
333    /// The feed that has gone the longest without receiving a tick.
334    ///
335    /// Among all registered feeds, returns the one with the oldest (smallest)
336    /// `last_tick_ms`. Feeds that have never received a tick (`last_tick_ms ==
337    /// `None`) are considered more stale than any feed that has. Returns `None`
338    /// if no feeds are registered.
339    pub fn most_stale_feed(&self) -> Option<FeedHealth> {
340        self.feeds.iter().fold(None, |acc: Option<FeedHealth>, entry| {
341            let feed = entry.clone();
342            match acc {
343                None => Some(feed),
344                Some(current) => {
345                    let more_stale = match (feed.last_tick_ms, current.last_tick_ms) {
346                        (None, _) => true,
347                        (Some(_), None) => false,
348                        (Some(a), Some(b)) => a < b,
349                    };
350                    if more_stale { Some(feed) } else { Some(current) }
351                }
352            }
353        })
354    }
355
356    /// Fraction of known (non-unknown) feeds that are stale.
357    ///
358    /// Excludes feeds in `Unknown` state from the denominator.
359    /// Returns `0.0` when no non-unknown feeds are registered.
360    pub fn stale_ratio_excluding_unknown(&self) -> f64 {
361        let known: Vec<_> = self.feeds.iter()
362            .filter(|e| e.status != HealthStatus::Unknown)
363            .collect();
364        if known.is_empty() { return 0.0; }
365        let stale = known.iter().filter(|e| e.status == HealthStatus::Stale).count();
366        stale as f64 / known.len() as f64
367    }
368
369    /// Feed identifiers that are currently in [`HealthStatus::Healthy`] state.
370    ///
371    /// Returns a sorted list of IDs. Complement of
372    /// [`unhealthy_feeds`](Self::unhealthy_feeds).
373    pub fn healthy_feeds(&self) -> Vec<String> {
374        let mut ids: Vec<String> = self
375            .feeds
376            .iter()
377            .filter(|e| e.status == HealthStatus::Healthy)
378            .map(|e| e.feed_id.clone())
379            .collect();
380        ids.sort();
381        ids
382    }
383
384    /// Feed identifiers whose status is not [`HealthStatus::Healthy`].
385    ///
386    /// Returns a sorted list of IDs that are `Stale` or `Unknown`. Complement
387    /// to [`healthy_count`](Self::healthy_count); avoids caller iteration when
388    /// only the unhealthy feed names are needed.
389    pub fn unhealthy_feeds(&self) -> Vec<String> {
390        let mut ids: Vec<String> = self
391            .feeds
392            .iter()
393            .filter(|e| e.status != HealthStatus::Healthy)
394            .map(|e| e.feed_id.clone())
395            .collect();
396        ids.sort();
397        ids
398    }
399
400    /// Reset all registered feeds to `Unknown` status, clearing last-tick timestamps.
401    ///
402    /// Useful at session boundaries (e.g. daily market open) to start fresh staleness
403    /// tracking without re-registering feeds. Tick counts are preserved.
404    pub fn reset_all(&self) {
405        for mut entry in self.feeds.iter_mut() {
406            entry.status = HealthStatus::Unknown;
407            entry.last_tick_ms = None;
408            entry.consecutive_stale = 0;
409        }
410    }
411
412    /// Returns `true` if every registered feed is in [`HealthStatus::Healthy`] state.
413    ///
414    /// Vacuously `true` when no feeds are registered. Use [`feed_count`](Self::feed_count)
415    /// to distinguish "all healthy" from "no feeds registered".
416    pub fn all_healthy(&self) -> bool {
417        self.feeds.iter().all(|e| e.status == HealthStatus::Healthy)
418    }
419
420    /// Fraction of registered feeds currently in [`HealthStatus::Healthy`] state.
421    ///
422    /// Returns `0.0` if no feeds are registered.
423    pub fn ratio_healthy(&self) -> f64 {
424        let total = self.feed_count();
425        if total == 0 {
426            return 0.0;
427        }
428        self.healthy_count() as f64 / total as f64
429    }
430
431    /// ID of the feed whose `last_tick_ms` is the most recent (largest value).
432    ///
433    /// Returns `None` if no feed has ever received a tick.
434    pub fn last_updated_feed_id(&self) -> Option<String> {
435        self.feeds
436            .iter()
437            .filter_map(|e| e.last_tick_ms.map(|t| (t, e.feed_id.clone())))
438            .max_by_key(|(t, _)| *t)
439            .map(|(_, id)| id)
440    }
441
442    /// IDs of all feeds currently in [`HealthStatus::Unknown`] state.
443    pub fn unknown_feed_ids(&self) -> Vec<String> {
444        self.feeds
445            .iter()
446            .filter(|e| e.status == HealthStatus::Unknown)
447            .map(|e| e.feed_id.clone())
448            .collect()
449    }
450
451    /// Count of feeds in each health state: `(healthy, stale, unknown)`.
452    ///
453    /// Equivalent to calling [`healthy_count`](Self::healthy_count),
454    /// [`stale_count`](Self::stale_count), and [`unknown_count`](Self::unknown_count)
455    /// in one pass.
456    pub fn status_summary(&self) -> (usize, usize, usize) {
457        let (mut healthy, mut stale, mut unknown) = (0, 0, 0);
458        for e in self.feeds.iter() {
459            match e.status {
460                HealthStatus::Healthy => healthy += 1,
461                HealthStatus::Stale => stale += 1,
462                HealthStatus::Unknown => unknown += 1,
463            }
464        }
465        (healthy, stale, unknown)
466    }
467
468    /// Feed identifiers whose status is exactly [`HealthStatus::Stale`].
469    ///
470    /// Unlike [`unhealthy_feeds`](Self::unhealthy_feeds), feeds with
471    /// [`HealthStatus::Unknown`] are excluded. Returns a sorted list.
472    pub fn stale_feed_ids(&self) -> Vec<String> {
473        let mut ids: Vec<String> = self
474            .feeds
475            .iter()
476            .filter(|e| e.status == HealthStatus::Stale)
477            .map(|e| e.feed_id.clone())
478            .collect();
479        ids.sort();
480        ids
481    }
482
483    /// Count of feeds currently in `Stale` status.
484    ///
485    /// Alias for [`stale_count`](Self::stale_count).
486    #[deprecated(since = "2.2.0", note = "Use `stale_count` instead")]
487    pub fn total_stale_count(&self) -> usize {
488        self.stale_count()
489    }
490
491    /// IDs of all feeds that are Stale or Unknown — feeds that require attention.
492    ///
493    /// Alias for [`unhealthy_feeds`](Self::unhealthy_feeds).
494    #[deprecated(since = "2.2.0", note = "Use `unhealthy_feeds` instead")]
495    pub fn feeds_needing_check(&self) -> Vec<String> {
496        self.unhealthy_feeds()
497    }
498
499    /// Average age in milliseconds across all feeds that have received at least one tick.
500    ///
501    /// Returns `None` if no feed has ever received a tick.
502    pub fn avg_feed_age_ms(&self, now_ms: u64) -> Option<f64> {
503        let ages: Vec<u64> = self
504            .feeds
505            .iter()
506            .filter_map(|e| e.last_tick_ms)
507            .map(|t| now_ms.saturating_sub(t))
508            .collect();
509        if ages.is_empty() {
510            return None;
511        }
512        Some(ages.iter().sum::<u64>() as f64 / ages.len() as f64)
513    }
514
515    /// Number of feeds whose status is [`HealthStatus::Unknown`].
516    ///
517    /// Feeds start in `Unknown` state before the first heartbeat arrives.
518    /// A non-zero count indicates feeds that have never been heard from.
519    pub fn unknown_count(&self) -> usize {
520        self.feeds_by_status(HealthStatus::Unknown).len()
521    }
522
523    /// Average tick count per registered feed.
524    ///
525    /// Returns `0.0` if no feeds are registered.
526    pub fn avg_tick_count(&self) -> f64 {
527        let count = self.feed_count();
528        if count == 0 {
529            return 0.0;
530        }
531        self.total_tick_count() as f64 / count as f64
532    }
533
534    /// Maximum `consecutive_stale` count across all registered feeds.
535    ///
536    /// Returns `0` if no feeds are registered or none have been stale yet.
537    pub fn max_consecutive_stale(&self) -> u32 {
538        self.feeds
539            .iter()
540            .map(|e| e.consecutive_stale)
541            .max()
542            .unwrap_or(0)
543    }
544
545    /// Number of feeds currently in the [`HealthStatus::Unknown`] state.
546    ///
547    /// Alias for [`unknown_count`](Self::unknown_count).
548    #[deprecated(since = "2.2.0", note = "Use `unknown_count` instead")]
549    pub fn unknown_feed_count(&self) -> usize {
550        self.unknown_count()
551    }
552
553    /// All feeds whose current status exactly matches `status`.
554    ///
555    /// Returns a `Vec` of cloned [`FeedHealth`] entries. Order is unspecified.
556    /// Use [`stale_feeds`](Self::stale_feeds) as a shorthand for the common
557    /// `Stale` case.
558    pub fn feeds_by_status(&self, status: HealthStatus) -> Vec<FeedHealth> {
559        self.feeds
560            .iter()
561            .filter(|e| e.value().status == status)
562            .map(|e| e.value().clone())
563            .collect()
564    }
565
566    /// The stale feed with the oldest `last_tick_ms`, or `None` if no feeds are stale.
567    ///
568    /// "Oldest" means the feed that has gone the longest without a heartbeat
569    /// — i.e., the one with the smallest `last_tick_ms`. Feeds with
570    /// `last_tick_ms == None` are placed last (they have never ticked).
571    pub fn oldest_stale_feed(&self) -> Option<FeedHealth> {
572        self.feeds
573            .iter()
574            .filter(|e| e.value().status == HealthStatus::Stale)
575            .min_by_key(|e| e.value().last_tick_ms.unwrap_or(u64::MAX))
576            .map(|e| e.value().clone())
577    }
578
579    /// Fraction of registered feeds that are currently `Healthy`: `healthy / total`.
580    ///
581    /// Alias for [`ratio_healthy`](Self::ratio_healthy).
582    #[deprecated(since = "2.2.0", note = "Use `ratio_healthy` instead")]
583    pub fn healthy_ratio(&self) -> f64 {
584        self.ratio_healthy()
585    }
586
587    /// The feed with the highest lifetime tick count, or `None` if no feeds
588    /// are registered.
589    ///
590    /// "Most reliable" is defined as the feed that has processed the greatest
591    /// number of heartbeats since registration or last reset.
592    pub fn most_reliable_feed(&self) -> Option<FeedHealth> {
593        self.feeds.iter()
594            .max_by_key(|e| e.value().tick_count)
595            .map(|e| e.value().clone())
596    }
597
598    /// All feeds that have never received a heartbeat (`last_tick_ms` is
599    /// `None`).
600    ///
601    /// Useful for detecting feeds that were registered but have not yet
602    /// started streaming data.
603    pub fn feeds_never_seen(&self) -> Vec<FeedHealth> {
604        self.feeds.iter()
605            .filter(|e| e.value().last_tick_ms.is_none())
606            .map(|e| e.value().clone())
607            .collect()
608    }
609
610    /// Returns `true` if at least one registered feed currently has
611    /// [`HealthStatus::Stale`] status.
612    ///
613    /// Alias for [`is_any_stale`](Self::is_any_stale).
614    #[deprecated(since = "2.2.0", note = "Use `is_any_stale` instead")]
615    pub fn is_any_feed_stale(&self) -> bool {
616        self.is_any_stale()
617    }
618
619    /// Returns `true` if every registered feed has received at least one
620    /// heartbeat (`last_tick_ms` is `Some`).
621    ///
622    /// Returns `true` vacuously when no feeds are registered.
623    pub fn all_feeds_seen(&self) -> bool {
624        self.feeds.iter().all(|e| e.last_tick_ms.is_some())
625    }
626
627    /// Returns the `tick_count` for `feed_id`, or `None` if the feed is not
628    /// registered.
629    pub fn tick_count_for(&self, feed_id: &str) -> Option<u64> {
630        self.feeds.iter()
631            .find(|e| e.feed_id == feed_id)
632            .map(|e| e.tick_count)
633    }
634
635    /// Average tick count across all registered feeds.
636    ///
637    /// Alias for [`avg_tick_count`](Self::avg_tick_count).
638    #[deprecated(since = "2.2.0", note = "Use `avg_tick_count` instead")]
639    pub fn average_tick_count(&self) -> f64 {
640        self.avg_tick_count()
641    }
642
643    /// Count of feeds whose `tick_count` exceeds `threshold`.
644    pub fn feeds_above_tick_count(&self, threshold: u64) -> usize {
645        self.feeds.iter().filter(|e| e.tick_count > threshold).count()
646    }
647
648    /// Age in milliseconds of the feed with the oldest `last_tick_ms`
649    /// (the most stale one) relative to `now_ms`.
650    ///
651    /// Returns `None` if no feed has ever received a tick.
652    pub fn oldest_feed_age_ms(&self, now_ms: u64) -> Option<u64> {
653        self.feeds
654            .iter()
655            .filter_map(|e| e.last_tick_ms)
656            .map(|t| now_ms.saturating_sub(t))
657            .max()
658    }
659
660    /// Returns `true` if at least one feed currently has
661    /// [`HealthStatus::Unknown`] status.
662    pub fn has_any_unknown(&self) -> bool {
663        self.feeds.iter().any(|e| e.status == HealthStatus::Unknown)
664    }
665
666    /// Returns `true` if at least one feed is unhealthy but not all feeds
667    /// are unhealthy.
668    ///
669    /// A fully-healthy or fully-down monitor both return `false`.
670    /// Returns `false` when no feeds are registered.
671    pub fn is_degraded(&self) -> bool {
672        let total = self.feed_count();
673        if total == 0 {
674            return false;
675        }
676        let healthy = self.healthy_count();
677        healthy > 0 && healthy < total
678    }
679
680    /// Number of feeds that are not in the [`HealthStatus::Healthy`] state.
681    pub fn unhealthy_count(&self) -> usize {
682        self.feed_count().saturating_sub(self.healthy_count())
683    }
684
685    /// Returns `true` if a feed with the given ID is registered.
686    pub fn feed_exists(&self, feed_id: &str) -> bool {
687        self.feeds.iter().any(|e| e.feed_id == feed_id)
688    }
689
690    /// Returns `true` if any registered feed has [`HealthStatus::Unknown`] status.
691    ///
692    /// Alias for [`has_any_unknown`](Self::has_any_unknown).
693    #[deprecated(since = "2.2.0", note = "Use `has_any_unknown` instead")]
694    pub fn any_unknown(&self) -> bool {
695        self.has_any_unknown()
696    }
697
698    /// Count of feeds in [`HealthStatus::Stale`] state (degraded but not unknown).
699    ///
700    /// Alias for [`stale_count`](Self::stale_count).
701    #[deprecated(since = "2.2.0", note = "Use `stale_count` instead")]
702    pub fn degraded_count(&self) -> usize {
703        self.stale_count()
704    }
705
706    /// Milliseconds since the last heartbeat for `feed_id` at `now_ms`.
707    ///
708    /// Returns `None` if the feed is not registered or has never received a tick.
709    pub fn time_since_last_heartbeat(&self, feed_id: &str, now_ms: u64) -> Option<u64> {
710        self.feeds
711            .iter()
712            .find(|e| e.feed_id == feed_id)?
713            .last_tick_ms
714            .map(|t| now_ms.saturating_sub(t))
715    }
716
717    /// IDs of all feeds currently in [`HealthStatus::Healthy`] state.
718    pub fn healthy_feed_ids(&self) -> Vec<String> {
719        self.feeds
720            .iter()
721            .filter(|e| e.status == HealthStatus::Healthy)
722            .map(|e| e.feed_id.clone())
723            .collect()
724    }
725
726    /// Register multiple feeds, each with its own staleness threshold.
727    ///
728    /// `feeds` is a slice of `(feed_id, threshold_ms)` pairs. Useful when
729    /// different feeds have different latency requirements.
730    pub fn register_batch(&self, feeds: &[(&str, u64)]) {
731        for (id, threshold) in feeds {
732            self.register(*id, Some(*threshold));
733        }
734    }
735
736    /// Age in milliseconds of the most recently-ticked healthy feed at `now_ms`.
737    ///
738    /// Returns `None` if no healthy feeds have received a tick.
739    pub fn min_healthy_age_ms(&self, now_ms: u64) -> Option<u64> {
740        self.feeds
741            .iter()
742            .filter(|e| e.status == HealthStatus::Healthy)
743            .filter_map(|e| e.last_tick_ms)
744            .map(|t| now_ms.saturating_sub(t))
745            .min()
746    }
747
748}
749
750#[cfg(test)]
751mod tests {
752    use super::*;
753
754    fn monitor() -> HealthMonitor {
755        HealthMonitor::new(5_000)
756    }
757
758    #[test]
759    fn test_register_creates_unknown_feed() {
760        let m = monitor();
761        m.register("BTC-USD", None);
762        let h = m.get("BTC-USD").unwrap();
763        assert_eq!(h.status, HealthStatus::Unknown);
764        assert!(h.last_tick_ms.is_none());
765    }
766
767    #[test]
768    fn test_heartbeat_marks_feed_healthy() {
769        let m = monitor();
770        m.register("BTC-USD", None);
771        m.heartbeat("BTC-USD", 1_000_000).unwrap();
772        let h = m.get("BTC-USD").unwrap();
773        assert_eq!(h.status, HealthStatus::Healthy);
774        assert_eq!(h.last_tick_ms, Some(1_000_000));
775    }
776
777    #[test]
778    fn test_heartbeat_increments_tick_count() {
779        let m = monitor();
780        m.register("BTC-USD", None);
781        m.heartbeat("BTC-USD", 1000).unwrap();
782        m.heartbeat("BTC-USD", 2000).unwrap();
783        m.heartbeat("BTC-USD", 3000).unwrap();
784        assert_eq!(m.get("BTC-USD").unwrap().tick_count, 3);
785    }
786
787    #[test]
788    fn test_heartbeat_unknown_feed_returns_unknown_feed_error() {
789        let m = monitor();
790        let result = m.heartbeat("ghost", 1000);
791        assert!(matches!(result, Err(StreamError::UnknownFeed { .. })));
792    }
793
794    #[test]
795    fn test_check_all_healthy_feed_no_errors() {
796        let m = monitor();
797        m.register("BTC-USD", None);
798        m.heartbeat("BTC-USD", 1_000_000).unwrap();
799        let errors = m.check_all(1_003_000); // 3s elapsed, threshold 5s
800        assert!(errors.is_empty());
801    }
802
803    #[test]
804    fn test_check_all_stale_feed_returns_error() {
805        let m = monitor();
806        m.register("BTC-USD", None);
807        m.heartbeat("BTC-USD", 1_000_000).unwrap();
808        let errors = m.check_all(1_010_000); // 10s elapsed, threshold 5s
809        assert_eq!(errors.len(), 1);
810        assert_eq!(errors[0].0, "BTC-USD");
811        assert!(matches!(&errors[0].1, StreamError::StaleFeed { feed_id, .. } if feed_id == "BTC-USD"));
812    }
813
814    #[test]
815    fn test_check_all_marks_stale_in_state() {
816        let m = monitor();
817        m.register("BTC-USD", None);
818        m.heartbeat("BTC-USD", 1_000_000).unwrap();
819        m.check_all(1_010_000);
820        assert_eq!(m.get("BTC-USD").unwrap().status, HealthStatus::Stale);
821    }
822
823    #[test]
824    fn test_check_all_unknown_feed_not_counted_as_stale() {
825        let m = monitor();
826        m.register("BTC-USD", None);
827        // No heartbeat yet
828        let errors = m.check_all(9_999_999);
829        assert!(errors.is_empty());
830    }
831
832    #[test]
833    fn test_custom_threshold_per_feed() {
834        let m = monitor();
835        m.register("BTC-USD", Some(1_000)); // 1 second threshold
836        m.heartbeat("BTC-USD", 1_000_000).unwrap();
837        let errors = m.check_all(1_002_000); // 2s elapsed, threshold 1s
838        assert!(!errors.is_empty());
839    }
840
841    #[test]
842    fn test_feed_count() {
843        let m = monitor();
844        m.register("BTC-USD", None);
845        m.register("ETH-USD", None);
846        assert_eq!(m.feed_count(), 2);
847    }
848
849    #[test]
850    fn test_healthy_count_and_stale_count() {
851        let m = monitor();
852        m.register("BTC-USD", None);
853        m.register("ETH-USD", None);
854        m.heartbeat("BTC-USD", 1_000_000).unwrap();
855        m.heartbeat("ETH-USD", 1_000_000).unwrap();
856        m.check_all(1_010_000); // stales both
857        assert_eq!(m.stale_count(), 2);
858        assert_eq!(m.healthy_count(), 0);
859    }
860
861    #[test]
862    fn test_feed_health_elapsed_ms() {
863        let h = FeedHealth {
864            feed_id: "BTC-USD".into(),
865            status: HealthStatus::Healthy,
866            last_tick_ms: Some(1_000_000),
867            stale_threshold_ms: 5_000,
868            tick_count: 1,
869            consecutive_stale: 0,
870        };
871        assert_eq!(h.elapsed_ms(1_003_000), Some(3_000));
872    }
873
874    #[test]
875    fn test_feed_health_elapsed_ms_none_when_no_last_tick() {
876        let h = FeedHealth {
877            feed_id: "X".into(),
878            status: HealthStatus::Unknown,
879            last_tick_ms: None,
880            stale_threshold_ms: 5_000,
881            tick_count: 0,
882            consecutive_stale: 0,
883        };
884        assert!(h.elapsed_ms(9_999_999).is_none());
885    }
886
887    #[test]
888    fn test_all_feeds_returns_all() {
889        let m = monitor();
890        m.register("A", None);
891        m.register("B", None);
892        let feeds = m.all_feeds();
893        assert_eq!(feeds.len(), 2);
894    }
895
896    #[test]
897    fn test_circuit_not_open_before_threshold() {
898        let m = monitor(); // threshold = 3
899        m.register("BTC-USD", None);
900        m.heartbeat("BTC-USD", 1_000_000).unwrap();
901        m.check_all(1_010_000); // 1 stale
902        m.check_all(1_020_000); // 2 stale
903        assert!(!m.is_circuit_open("BTC-USD"));
904    }
905
906    #[test]
907    fn test_circuit_opens_at_threshold() {
908        let m = monitor(); // threshold = 3
909        m.register("BTC-USD", None);
910        m.heartbeat("BTC-USD", 1_000_000).unwrap();
911        m.check_all(1_010_000); // 1
912        m.check_all(1_020_000); // 2
913        m.check_all(1_030_000); // 3 — circuit opens
914        assert!(m.is_circuit_open("BTC-USD"));
915    }
916
917    #[test]
918    fn test_circuit_resets_on_heartbeat() {
919        let m = monitor();
920        m.register("BTC-USD", None);
921        m.heartbeat("BTC-USD", 1_000_000).unwrap();
922        m.check_all(1_010_000);
923        m.check_all(1_020_000);
924        m.check_all(1_030_000);
925        assert!(m.is_circuit_open("BTC-USD"));
926        m.heartbeat("BTC-USD", 1_040_000).unwrap();
927        assert!(!m.is_circuit_open("BTC-USD"));
928    }
929
930    #[test]
931    fn test_circuit_disabled_when_threshold_zero() {
932        let m = HealthMonitor::new(5_000).with_circuit_breaker_threshold(0);
933        m.register("BTC-USD", None);
934        m.heartbeat("BTC-USD", 1_000_000).unwrap();
935        for i in 0..10 {
936            m.check_all(1_010_000 + i * 10_000);
937        }
938        assert!(!m.is_circuit_open("BTC-USD"));
939    }
940
941    #[test]
942    fn test_circuit_open_returns_false_for_unknown_feed() {
943        let m = monitor();
944        assert!(!m.is_circuit_open("ghost"));
945    }
946
947    #[test]
948    fn test_deregister_removes_feed() {
949        let m = monitor();
950        m.register("BTC-USD", None);
951        m.heartbeat("BTC-USD", 1_000_000).unwrap();
952        let removed = m.deregister("BTC-USD");
953        assert!(removed.is_some());
954        assert_eq!(removed.unwrap().feed_id, "BTC-USD");
955        assert!(m.get("BTC-USD").is_none());
956        assert_eq!(m.feed_count(), 0);
957    }
958
959    #[test]
960    fn test_deregister_unknown_feed_returns_none() {
961        let m = monitor();
962        assert!(m.deregister("ghost").is_none());
963    }
964
965    #[test]
966    fn test_feed_ids_returns_sorted_ids() {
967        let m = monitor();
968        m.register("ETH-USD", None);
969        m.register("BTC-USD", None);
970        m.register("SOL-USD", None);
971        let ids = m.feed_ids();
972        assert_eq!(ids, vec!["BTC-USD", "ETH-USD", "SOL-USD"]);
973    }
974
975    #[test]
976    fn test_feed_ids_empty_when_no_feeds() {
977        let m = monitor();
978        assert!(m.feed_ids().is_empty());
979    }
980
981    #[test]
982    fn test_feed_ids_updates_after_deregister() {
983        let m = monitor();
984        m.register("BTC-USD", None);
985        m.register("ETH-USD", None);
986        m.deregister("BTC-USD");
987        assert_eq!(m.feed_ids(), vec!["ETH-USD"]);
988    }
989
990    #[test]
991    fn test_reset_feed_clears_state() {
992        let m = monitor();
993        m.register("BTC-USD", None);
994        m.heartbeat("BTC-USD", 1_000_000).unwrap();
995        m.check_all(1_010_000); // make it stale
996        assert_eq!(m.get("BTC-USD").unwrap().status, HealthStatus::Stale);
997        m.reset_feed("BTC-USD").unwrap();
998        let h = m.get("BTC-USD").unwrap();
999        assert_eq!(h.status, HealthStatus::Unknown);
1000        assert_eq!(h.tick_count, 0);
1001        assert_eq!(h.consecutive_stale, 0);
1002        assert!(h.last_tick_ms.is_none());
1003    }
1004
1005    #[test]
1006    fn test_reset_feed_unknown_returns_error() {
1007        let m = monitor();
1008        assert!(matches!(
1009            m.reset_feed("ghost"),
1010            Err(StreamError::UnknownFeed { .. })
1011        ));
1012    }
1013
1014    #[test]
1015    fn test_check_one_healthy_feed_returns_none() {
1016        let m = monitor();
1017        m.register("BTC-USD", None);
1018        m.heartbeat("BTC-USD", 1_000_000).unwrap();
1019        assert!(m.check_one("BTC-USD", 1_003_000).unwrap().is_none());
1020    }
1021
1022    #[test]
1023    fn test_check_one_stale_feed_returns_some_error() {
1024        let m = monitor();
1025        m.register("BTC-USD", None);
1026        m.heartbeat("BTC-USD", 1_000_000).unwrap();
1027        let result = m.check_one("BTC-USD", 1_010_000).unwrap();
1028        assert!(matches!(result, Some(StreamError::StaleFeed { .. })));
1029    }
1030
1031    #[test]
1032    fn test_check_one_unknown_feed_returns_err() {
1033        let m = monitor();
1034        assert!(matches!(
1035            m.check_one("ghost", 0),
1036            Err(StreamError::UnknownFeed { .. })
1037        ));
1038    }
1039
1040    #[test]
1041    fn test_heartbeat_after_deregister_returns_unknown_feed_error() {
1042        let m = monitor();
1043        m.register("BTC-USD", None);
1044        m.deregister("BTC-USD");
1045        let result = m.heartbeat("BTC-USD", 1_000_000);
1046        assert!(matches!(result, Err(StreamError::UnknownFeed { .. })));
1047    }
1048
1049    #[test]
1050    fn test_unhealthy_feeds_returns_non_healthy_sorted() {
1051        let m = HealthMonitor::new(5_000);
1052        m.register("BTC-USD", None);
1053        m.register("ETH-USD", None);
1054        m.register("SOL-USD", None);
1055        // Make BTC healthy, leave ETH and SOL as Unknown
1056        m.heartbeat("BTC-USD", 1_000_000).unwrap();
1057        let unhealthy = m.unhealthy_feeds();
1058        // ETH and SOL are Unknown (not Healthy)
1059        assert!(unhealthy.contains(&"ETH-USD".to_string()));
1060        assert!(unhealthy.contains(&"SOL-USD".to_string()));
1061        assert!(!unhealthy.contains(&"BTC-USD".to_string()));
1062        // Verify sorted
1063        assert_eq!(unhealthy[0], "ETH-USD");
1064        assert_eq!(unhealthy[1], "SOL-USD");
1065    }
1066
1067    #[test]
1068    fn test_oldest_tick_ms_returns_minimum() {
1069        let m = HealthMonitor::new(5_000);
1070        m.register("A", None);
1071        m.register("B", None);
1072        m.heartbeat("A", 1_000_000).unwrap();
1073        m.heartbeat("B", 2_000_000).unwrap();
1074        assert_eq!(m.oldest_tick_ms(), Some(1_000_000));
1075    }
1076
1077    #[test]
1078    fn test_newest_tick_ms_returns_maximum() {
1079        let m = HealthMonitor::new(5_000);
1080        m.register("A", None);
1081        m.register("B", None);
1082        m.heartbeat("A", 1_000_000).unwrap();
1083        m.heartbeat("B", 2_000_000).unwrap();
1084        assert_eq!(m.newest_tick_ms(), Some(2_000_000));
1085    }
1086
1087    #[test]
1088    fn test_oldest_newest_tick_ms_none_when_no_ticks() {
1089        let m = HealthMonitor::new(5_000);
1090        m.register("A", None);
1091        assert!(m.oldest_tick_ms().is_none());
1092        assert!(m.newest_tick_ms().is_none());
1093    }
1094
1095    #[test]
1096    fn test_unhealthy_feeds_empty_when_all_healthy() {
1097        let m = HealthMonitor::new(5_000);
1098        m.register("BTC-USD", None);
1099        m.heartbeat("BTC-USD", 1_000_000).unwrap();
1100        assert!(m.unhealthy_feeds().is_empty());
1101    }
1102
1103    #[test]
1104    fn test_unhealthy_feeds_includes_stale_feeds() {
1105        let m = HealthMonitor::new(1_000); // 1s threshold
1106        m.register("BTC-USD", None);
1107        m.heartbeat("BTC-USD", 1_000_000).unwrap();
1108        m.check_all(1_002_000); // 2s elapsed → stale
1109        let unhealthy = m.unhealthy_feeds();
1110        assert!(unhealthy.contains(&"BTC-USD".to_string()));
1111    }
1112
1113    // ── all_healthy ───────────────────────────────────────────────────────────
1114
1115    #[test]
1116    fn test_all_healthy_vacuously_true_no_feeds() {
1117        let m = HealthMonitor::new(5_000);
1118        assert!(m.all_healthy());
1119    }
1120
1121    #[test]
1122    fn test_all_healthy_false_when_unknown() {
1123        let m = HealthMonitor::new(5_000);
1124        m.register("BTC-USD", None);
1125        // No heartbeat → Unknown → not Healthy
1126        assert!(!m.all_healthy());
1127    }
1128
1129    #[test]
1130    fn test_all_healthy_true_after_heartbeats() {
1131        let m = HealthMonitor::new(5_000);
1132        m.register("BTC-USD", None);
1133        m.register("ETH-USD", None);
1134        m.heartbeat("BTC-USD", 1_000_000).unwrap();
1135        m.heartbeat("ETH-USD", 1_000_000).unwrap();
1136        assert!(m.all_healthy());
1137    }
1138
1139    #[test]
1140    fn test_all_healthy_false_when_one_stale() {
1141        let m = HealthMonitor::new(1_000);
1142        m.register("BTC-USD", None);
1143        m.register("ETH-USD", None);
1144        m.heartbeat("BTC-USD", 1_000_000).unwrap();
1145        m.heartbeat("ETH-USD", 1_000_000).unwrap();
1146        m.check_all(1_002_000); // 2s elapsed → stale
1147        assert!(!m.all_healthy());
1148    }
1149
1150    // ── stale_feed_ids ────────────────────────────────────────────────────────
1151
1152    #[test]
1153    fn test_stale_feed_ids_empty_when_none_stale() {
1154        let m = HealthMonitor::new(5_000);
1155        m.register("BTC-USD", None);
1156        m.heartbeat("BTC-USD", 1_000_000).unwrap();
1157        assert!(m.stale_feed_ids().is_empty());
1158    }
1159
1160    #[test]
1161    fn test_stale_feed_ids_excludes_unknown() {
1162        let m = HealthMonitor::new(5_000);
1163        m.register("BTC-USD", None); // Unknown — never received heartbeat
1164        assert!(m.stale_feed_ids().is_empty()); // Unknown is not Stale
1165    }
1166
1167    #[test]
1168    fn test_stale_feed_ids_returns_only_stale() {
1169        let m = HealthMonitor::new(1_000);
1170        m.register("BTC-USD", None);
1171        m.register("ETH-USD", Some(10_000)); // 10s threshold
1172        m.heartbeat("BTC-USD", 1_000_000).unwrap();
1173        m.heartbeat("ETH-USD", 1_000_000).unwrap();
1174        m.check_all(1_002_000); // BTC stale (2s > 1s), ETH healthy (2s < 10s)
1175        let stale = m.stale_feed_ids();
1176        assert_eq!(stale, vec!["BTC-USD".to_string()]);
1177    }
1178
1179    // ── HealthMonitor::lag_ms ─────────────────────────────────────────────────
1180
1181    #[test]
1182    fn test_lag_ms_none_when_no_ticks() {
1183        let m = HealthMonitor::new(5_000);
1184        m.register("A", None);
1185        m.register("B", None);
1186        assert!(m.lag_ms().is_none());
1187    }
1188
1189    #[test]
1190    fn test_lag_ms_zero_when_one_feed_or_equal_timestamps() {
1191        let m = HealthMonitor::new(5_000);
1192        m.register("A", None);
1193        m.register("B", None);
1194        m.heartbeat("A", 1_000_000).unwrap();
1195        m.heartbeat("B", 1_000_000).unwrap();
1196        assert_eq!(m.lag_ms(), Some(0));
1197    }
1198
1199    #[test]
1200    fn test_lag_ms_returns_spread() {
1201        let m = HealthMonitor::new(5_000);
1202        m.register("fast", None);
1203        m.register("slow", None);
1204        m.heartbeat("fast", 2_000_000).unwrap();
1205        m.heartbeat("slow", 1_000_000).unwrap();
1206        assert_eq!(m.lag_ms(), Some(1_000_000));
1207    }
1208
1209    // ── HealthMonitor::total_tick_count ───────────────────────────────────────
1210
1211    #[test]
1212    fn test_total_tick_count_zero_initially() {
1213        let m = HealthMonitor::new(5_000);
1214        m.register("A", None);
1215        m.register("B", None);
1216        assert_eq!(m.total_tick_count(), 0);
1217    }
1218
1219    #[test]
1220    fn test_total_tick_count_sums_across_feeds() {
1221        let m = HealthMonitor::new(5_000);
1222        m.register("A", None);
1223        m.register("B", None);
1224        m.heartbeat("A", 1_000_000).unwrap();
1225        m.heartbeat("A", 1_001_000).unwrap();
1226        m.heartbeat("B", 1_000_000).unwrap();
1227        assert_eq!(m.total_tick_count(), 3);
1228    }
1229
1230    // ── HealthMonitor::healthy_feeds ──────────────────────────────────────────
1231
1232    #[test]
1233    fn test_healthy_feeds_empty_initially() {
1234        let m = HealthMonitor::new(5_000);
1235        m.register("A", None); // Unknown, not Healthy
1236        assert!(m.healthy_feeds().is_empty());
1237    }
1238
1239    #[test]
1240    fn test_healthy_feeds_after_heartbeat() {
1241        let m = HealthMonitor::new(5_000);
1242        m.register("A", None);
1243        m.register("B", None);
1244        m.heartbeat("A", 1_000_000).unwrap();
1245        m.check_all(1_001_000); // A healthy (1ms < 5s), B unknown
1246        let healthy = m.healthy_feeds();
1247        assert_eq!(healthy, vec!["A".to_string()]);
1248    }
1249
1250    #[test]
1251    fn test_healthy_feeds_sorted() {
1252        let m = HealthMonitor::new(5_000);
1253        m.register("C", None);
1254        m.register("A", None);
1255        m.register("B", None);
1256        m.heartbeat("C", 1_000_000).unwrap();
1257        m.heartbeat("A", 1_000_000).unwrap();
1258        m.heartbeat("B", 1_000_000).unwrap();
1259        m.check_all(1_001_000);
1260        let healthy = m.healthy_feeds();
1261        assert_eq!(healthy, vec!["A".to_string(), "B".to_string(), "C".to_string()]);
1262    }
1263
1264    // ── HealthMonitor::unknown_count / FeedHealth::is_healthy ─────────────────
1265
1266    #[test]
1267    fn test_unknown_count_all_new_feeds() {
1268        let m = monitor();
1269        m.register("A", None);
1270        m.register("B", None);
1271        assert_eq!(m.unknown_count(), 2);
1272    }
1273
1274    #[test]
1275    fn test_unknown_count_decreases_after_heartbeat() {
1276        let m = monitor();
1277        m.register("A", None);
1278        m.register("B", None);
1279        m.heartbeat("A", 1_000).unwrap();
1280        assert_eq!(m.unknown_count(), 1); // B is still Unknown
1281    }
1282
1283    #[test]
1284    fn test_unknown_count_zero_when_all_healthy() {
1285        let m = monitor();
1286        m.register("A", None);
1287        m.heartbeat("A", 1_000).unwrap();
1288        assert_eq!(m.unknown_count(), 0);
1289    }
1290
1291    #[test]
1292    fn test_feed_health_is_healthy_true_after_heartbeat() {
1293        let m = monitor();
1294        m.register("X", None);
1295        m.heartbeat("X", 1_000).unwrap();
1296        let fh = m.get("X").unwrap();
1297        assert!(fh.is_healthy());
1298    }
1299
1300    #[test]
1301    fn test_feed_health_is_healthy_false_when_unknown() {
1302        let m = monitor();
1303        m.register("X", None);
1304        let fh = m.get("X").unwrap();
1305        assert!(!fh.is_healthy());
1306    }
1307
1308    // ── HealthMonitor::most_stale_feed ────────────────────────────────────────
1309
1310    #[test]
1311    fn test_most_stale_feed_none_when_no_feeds() {
1312        let m = HealthMonitor::new(5_000);
1313        assert!(m.most_stale_feed().is_none());
1314    }
1315
1316    #[test]
1317    fn test_most_stale_feed_returns_unticked_feed_first() {
1318        let m = HealthMonitor::new(5_000);
1319        m.register("A", None);
1320        m.register("B", None);
1321        m.heartbeat("A", 1_000_000).unwrap();
1322        // B never received a tick → most stale
1323        let stale = m.most_stale_feed().unwrap();
1324        assert_eq!(stale.feed_id, "B");
1325    }
1326
1327    #[test]
1328    fn test_most_stale_feed_returns_oldest_last_tick() {
1329        let m = HealthMonitor::new(5_000);
1330        m.register("A", None);
1331        m.register("B", None);
1332        m.heartbeat("A", 1_000).unwrap();
1333        m.heartbeat("B", 5_000).unwrap();
1334        // A has older last_tick_ms → most stale
1335        let stale = m.most_stale_feed().unwrap();
1336        assert_eq!(stale.feed_id, "A");
1337    }
1338
1339    #[test]
1340    fn test_stale_feeds_empty_when_all_healthy() {
1341        let m = monitor();
1342        m.register("A", None);
1343        m.heartbeat("A", 1_000).unwrap();
1344        assert!(m.stale_feeds().is_empty());
1345    }
1346
1347    #[test]
1348    fn test_stale_feeds_returns_all_stale() {
1349        // stale_timeout = 5_000 ms; heartbeat A at 1_000, then check at 10_000
1350        let m = monitor();
1351        m.register("A", None);
1352        m.register("B", None);
1353        m.heartbeat("A", 1_000).unwrap();
1354        m.heartbeat("B", 9_500).unwrap();
1355        m.check_all(10_000);
1356        let stale = m.stale_feeds();
1357        assert_eq!(stale.len(), 1);
1358        assert_eq!(stale[0].feed_id, "A");
1359    }
1360
1361    // ── HealthMonitor::register_many ──────────────────────────────────────────
1362
1363    #[test]
1364    fn test_register_many_creates_all_feeds() {
1365        let m = monitor();
1366        m.register_many(&["BTC-USD", "ETH-USD", "SOL-USD"], None);
1367        assert_eq!(m.feed_count(), 3);
1368    }
1369
1370    #[test]
1371    fn test_register_many_custom_threshold_applies() {
1372        let m = monitor();
1373        m.register_many(&["A", "B"], Some(1_000));
1374        m.heartbeat("A", 1_000_000).unwrap();
1375        let errors = m.check_all(1_002_000); // 2s > 1s → A stale
1376        assert!(!errors.is_empty());
1377    }
1378
1379    #[test]
1380    fn test_register_many_empty_slice_is_noop() {
1381        let m = monitor();
1382        m.register_many(&[], None);
1383        assert_eq!(m.feed_count(), 0);
1384    }
1385
1386    // ── FeedHealth::is_stale ──────────────────────────────────────────────────
1387
1388    #[test]
1389    fn test_feed_health_is_stale_true_when_stale() {
1390        let m = monitor();
1391        m.register("BTC-USD", None);
1392        m.heartbeat("BTC-USD", 1_000_000).unwrap();
1393        m.check_all(1_010_000); // 10s > 5s → stale
1394        assert!(m.get("BTC-USD").unwrap().is_stale());
1395    }
1396
1397    #[test]
1398    fn test_feed_health_is_stale_false_when_healthy() {
1399        let m = monitor();
1400        m.register("BTC-USD", None);
1401        m.heartbeat("BTC-USD", 1_000_000).unwrap();
1402        assert!(!m.get("BTC-USD").unwrap().is_stale());
1403    }
1404
1405    // ── HealthMonitor::avg_tick_count ─────────────────────────────────────────
1406
1407    #[test]
1408    fn test_avg_tick_count_zero_with_no_feeds() {
1409        let m = HealthMonitor::new(5_000);
1410        assert!((m.avg_tick_count() - 0.0).abs() < 1e-9);
1411    }
1412
1413    #[test]
1414    fn test_avg_tick_count_with_equal_ticks() {
1415        let m = HealthMonitor::new(5_000);
1416        m.register("A", None);
1417        m.register("B", None);
1418        m.heartbeat("A", 1_000).unwrap();
1419        m.heartbeat("B", 1_000).unwrap();
1420        assert!((m.avg_tick_count() - 1.0).abs() < 1e-9);
1421    }
1422
1423    #[test]
1424    fn test_avg_tick_count_with_different_ticks() {
1425        let m = HealthMonitor::new(5_000);
1426        m.register("A", None);
1427        m.register("B", None);
1428        m.heartbeat("A", 1_000).unwrap();
1429        m.heartbeat("A", 2_000).unwrap();
1430        m.heartbeat("A", 3_000).unwrap();
1431        m.heartbeat("B", 1_000).unwrap();
1432        // A=3, B=1 → avg=2.0
1433        assert!((m.avg_tick_count() - 2.0).abs() < 1e-9);
1434    }
1435
1436    // ── HealthMonitor::max_consecutive_stale ──────────────────────────────────
1437
1438    #[test]
1439    fn test_max_consecutive_stale_zero_with_no_feeds() {
1440        let m = HealthMonitor::new(5_000);
1441        assert_eq!(m.max_consecutive_stale(), 0);
1442    }
1443
1444    #[test]
1445    fn test_max_consecutive_stale_picks_highest() {
1446        let m = HealthMonitor::new(1_000);
1447        m.register("A", None);
1448        m.register("B", None);
1449        // Give each feed a heartbeat, then let them go stale
1450        m.heartbeat("A", 1_000).unwrap();
1451        m.heartbeat("B", 1_000).unwrap();
1452        // 3s later → both exceed 1s threshold
1453        m.check_all(4_000);
1454        m.check_all(5_000);
1455        // Both feeds are stale; consecutive_stale should be at least 2
1456        assert!(m.max_consecutive_stale() >= 2);
1457    }
1458
1459    #[test]
1460    fn test_max_consecutive_stale_zero_after_heartbeat() {
1461        let m = HealthMonitor::new(1_000);
1462        m.register("A", None);
1463        m.check_all(10_000);
1464        m.heartbeat("A", 10_001).unwrap();
1465        assert_eq!(m.max_consecutive_stale(), 0);
1466    }
1467
1468    #[test]
1469    fn test_status_summary_all_unknown() {
1470        let m = monitor();
1471        m.register("A", None);
1472        m.register("B", None);
1473        let (healthy, stale, unknown) = m.status_summary();
1474        assert_eq!(healthy, 0);
1475        assert_eq!(stale, 0);
1476        assert_eq!(unknown, 2);
1477    }
1478
1479    #[test]
1480    fn test_status_summary_mixed() {
1481        let m = monitor();
1482        m.register("A", None);
1483        m.register("B", None);
1484        m.register("C", None);
1485        m.heartbeat("A", 1_000).unwrap();
1486        m.heartbeat("B", 5_000).unwrap();
1487        // A is stale (tick at 1_000, check at 10_000 > 5_000 threshold), B is healthy, C unknown
1488        m.check_all(10_000);
1489        let (healthy, stale, unknown) = m.status_summary();
1490        assert_eq!(stale, 1);
1491        assert_eq!(unknown, 1);
1492        assert_eq!(healthy, 1);
1493    }
1494
1495    // --- feeds_by_status ---
1496
1497    #[test]
1498    fn test_feeds_by_status_returns_only_matching_status() {
1499        let mut m = monitor();
1500        m.register("A", None);
1501        m.register("B", None);
1502        // A: old heartbeat → stale; B: recent heartbeat → healthy
1503        m.heartbeat("A", 1_000).unwrap();
1504        m.heartbeat("B", 9_000).unwrap();
1505        m.check_all(10_000); // A elapsed=9000>5000 → Stale; B elapsed=1000<5000 → Healthy
1506        let stale = m.feeds_by_status(HealthStatus::Stale);
1507        assert_eq!(stale.len(), 1);
1508        assert_eq!(stale[0].feed_id, "A");
1509    }
1510
1511    #[test]
1512    fn test_feeds_by_status_empty_when_none_match() {
1513        let mut m = monitor();
1514        m.register("A", None);
1515        m.heartbeat("A", 1_000).unwrap();
1516        m.check_all(2_000);
1517        // A is healthy; no unknown feeds remain
1518        let unknown = m.feeds_by_status(HealthStatus::Unknown);
1519        assert!(unknown.is_empty());
1520    }
1521
1522    #[test]
1523    fn test_feeds_by_status_all_start_as_unknown() {
1524        let mut m = monitor();
1525        m.register("A", None);
1526        m.register("B", None);
1527        let unknown = m.feeds_by_status(HealthStatus::Unknown);
1528        assert_eq!(unknown.len(), 2);
1529    }
1530
1531    // ── HealthMonitor::unknown_feed_count ─────────────────────────────────────
1532
1533    #[test]
1534    fn test_unknown_feed_count_all_unknown_at_start() {
1535        let m = HealthMonitor::new(5_000);
1536        m.register("A", None);
1537        m.register("B", None);
1538        assert_eq!(m.unknown_feed_count(), 2);
1539    }
1540
1541    #[test]
1542    fn test_unknown_feed_count_decreases_after_heartbeat() {
1543        let m = HealthMonitor::new(5_000);
1544        m.register("A", None);
1545        m.register("B", None);
1546        m.heartbeat("A", 1_000).unwrap();
1547        assert_eq!(m.unknown_feed_count(), 1);
1548    }
1549
1550    #[test]
1551    fn test_unknown_feed_count_zero_with_no_feeds() {
1552        let m = HealthMonitor::new(5_000);
1553        assert_eq!(m.unknown_feed_count(), 0);
1554    }
1555
1556    // ── HealthMonitor::all_healthy ────────────────────────────────────────────
1557
1558    #[test]
1559    fn test_all_healthy_vacuously_true_with_no_feeds() {
1560        // all_healthy is vacuously true when no feeds are registered
1561        let m = HealthMonitor::new(5_000);
1562        assert!(m.all_healthy());
1563    }
1564
1565    #[test]
1566    fn test_all_healthy_true_when_all_feeds_healthy() {
1567        let m = HealthMonitor::new(5_000);
1568        m.register("A", None);
1569        m.register("B", None);
1570        m.heartbeat("A", 1_000).unwrap();
1571        m.heartbeat("B", 1_000).unwrap();
1572        assert!(m.all_healthy());
1573    }
1574
1575    #[test]
1576    fn test_all_healthy_false_when_one_feed_unknown() {
1577        let m = HealthMonitor::new(5_000);
1578        m.register("A", None);
1579        m.register("B", None);
1580        m.heartbeat("A", 1_000).unwrap();
1581        // B still unknown
1582        assert!(!m.all_healthy());
1583    }
1584
1585    // --- oldest_stale_feed / healthy_ratio ---
1586
1587    #[test]
1588    fn test_oldest_stale_feed_returns_feed_with_smallest_last_tick() {
1589        let mut m = monitor();
1590        m.register("A", None);
1591        m.register("B", None);
1592        m.heartbeat("A", 1_000).unwrap(); // older tick
1593        m.heartbeat("B", 3_000).unwrap(); // newer tick
1594        m.check_all(10_000); // both stale (>5s threshold)
1595        let oldest = m.oldest_stale_feed().unwrap();
1596        assert_eq!(oldest.feed_id, "A");
1597    }
1598
1599    #[test]
1600    fn test_oldest_stale_feed_none_when_no_stale_feeds() {
1601        let mut m = monitor();
1602        m.register("A", None);
1603        m.heartbeat("A", 9_000).unwrap();
1604        m.check_all(10_000); // 1s elapsed < 5s threshold → healthy
1605        assert!(m.oldest_stale_feed().is_none());
1606    }
1607
1608    #[test]
1609    fn test_healthy_ratio_zero_when_no_feeds() {
1610        let m = monitor();
1611        assert_eq!(m.healthy_ratio(), 0.0);
1612    }
1613
1614    #[test]
1615    fn test_healthy_ratio_one_when_all_healthy() {
1616        let mut m = monitor();
1617        m.register("A", None);
1618        m.register("B", None);
1619        m.heartbeat("A", 9_000).unwrap();
1620        m.heartbeat("B", 9_500).unwrap();
1621        m.check_all(10_000); // both healthy
1622        assert!((m.healthy_ratio() - 1.0).abs() < 1e-10);
1623    }
1624
1625    #[test]
1626    fn test_healthy_ratio_half_when_one_of_two_healthy() {
1627        let mut m = monitor();
1628        m.register("A", None);
1629        m.register("B", None);
1630        m.heartbeat("A", 1_000).unwrap(); // stale
1631        m.heartbeat("B", 9_500).unwrap(); // healthy
1632        m.check_all(10_000);
1633        assert!((m.healthy_ratio() - 0.5).abs() < 1e-10);
1634    }
1635
1636    // --- most_reliable_feed / feeds_never_seen ---
1637
1638    #[test]
1639    fn test_most_reliable_feed_returns_highest_tick_count() {
1640        let mut m = monitor();
1641        m.register("A", None);
1642        m.register("B", None);
1643        m.heartbeat("A", 1_000).unwrap();
1644        m.heartbeat("B", 1_100).unwrap();
1645        m.heartbeat("B", 1_200).unwrap();
1646        // B has 2 ticks, A has 1
1647        let best = m.most_reliable_feed().unwrap();
1648        assert_eq!(best.feed_id, "B");
1649    }
1650
1651    #[test]
1652    fn test_most_reliable_feed_none_when_no_feeds() {
1653        let m = monitor();
1654        assert!(m.most_reliable_feed().is_none());
1655    }
1656
1657    #[test]
1658    fn test_feeds_never_seen_returns_feeds_with_no_heartbeat() {
1659        let mut m = monitor();
1660        m.register("A", None);
1661        m.register("B", None);
1662        m.heartbeat("A", 1_000).unwrap();
1663        // B has no heartbeat
1664        let never_seen = m.feeds_never_seen();
1665        assert_eq!(never_seen.len(), 1);
1666        assert_eq!(never_seen[0].feed_id, "B");
1667    }
1668
1669    #[test]
1670    fn test_feeds_never_seen_empty_when_all_have_heartbeat() {
1671        let mut m = monitor();
1672        m.register("A", None);
1673        m.heartbeat("A", 1_000).unwrap();
1674        assert!(m.feeds_never_seen().is_empty());
1675    }
1676
1677    // --- is_any_feed_stale / all_feeds_seen ---
1678
1679    #[test]
1680    fn test_is_any_feed_stale_true_when_stale_feed_exists() {
1681        let mut m = monitor();
1682        m.register("A", None);
1683        m.heartbeat("A", 1_000).unwrap();
1684        m.check_all(10_000); // elapsed > threshold → stale
1685        assert!(m.is_any_feed_stale());
1686    }
1687
1688    #[test]
1689    fn test_is_any_feed_stale_false_when_all_healthy() {
1690        let mut m = monitor();
1691        m.register("A", None);
1692        m.heartbeat("A", 9_500).unwrap();
1693        m.check_all(10_000); // 500ms elapsed → healthy
1694        assert!(!m.is_any_feed_stale());
1695    }
1696
1697    #[test]
1698    fn test_is_any_feed_stale_false_when_no_feeds() {
1699        let m = monitor();
1700        assert!(!m.is_any_feed_stale());
1701    }
1702
1703    #[test]
1704    fn test_all_feeds_seen_true_when_all_have_heartbeat() {
1705        let mut m = monitor();
1706        m.register("A", None);
1707        m.register("B", None);
1708        m.heartbeat("A", 1_000).unwrap();
1709        m.heartbeat("B", 2_000).unwrap();
1710        assert!(m.all_feeds_seen());
1711    }
1712
1713    #[test]
1714    fn test_all_feeds_seen_false_when_one_never_seen() {
1715        let mut m = monitor();
1716        m.register("A", None);
1717        m.register("B", None);
1718        m.heartbeat("A", 1_000).unwrap();
1719        assert!(!m.all_feeds_seen());
1720    }
1721
1722    #[test]
1723    fn test_all_feeds_seen_true_vacuously_when_no_feeds() {
1724        let m = monitor();
1725        assert!(m.all_feeds_seen());
1726    }
1727
1728    // --- tick_count_for / average_tick_count ---
1729
1730    #[test]
1731    fn test_tick_count_for_returns_correct_count() {
1732        let mut m = monitor();
1733        m.register("A", None);
1734        m.heartbeat("A", 1_000).unwrap();
1735        m.heartbeat("A", 2_000).unwrap();
1736        assert_eq!(m.tick_count_for("A"), Some(2));
1737    }
1738
1739    #[test]
1740    fn test_tick_count_for_none_when_not_registered() {
1741        let m = monitor();
1742        assert!(m.tick_count_for("nonexistent").is_none());
1743    }
1744
1745    #[test]
1746    fn test_tick_count_for_zero_when_no_heartbeats() {
1747        let mut m = monitor();
1748        m.register("A", None);
1749        assert_eq!(m.tick_count_for("A"), Some(0));
1750    }
1751
1752    #[test]
1753    fn test_average_tick_count_zero_when_no_feeds() {
1754        let m = monitor();
1755        assert_eq!(m.average_tick_count(), 0.0);
1756    }
1757
1758    #[test]
1759    fn test_average_tick_count_correct_value() {
1760        let mut m = monitor();
1761        m.register("A", None);
1762        m.register("B", None);
1763        m.heartbeat("A", 1_000).unwrap();
1764        m.heartbeat("A", 2_000).unwrap(); // A: 2 ticks
1765        m.heartbeat("B", 1_000).unwrap(); // B: 1 tick
1766        // avg = (2 + 1) / 2 = 1.5
1767        assert!((m.average_tick_count() - 1.5).abs() < 1e-10);
1768    }
1769
1770    // --- HealthMonitor::feeds_above_tick_count ---
1771    #[test]
1772    fn test_feeds_above_tick_count_correct() {
1773        let mut m = monitor();
1774        m.register("A", None);
1775        m.register("B", None);
1776        m.register("C", None);
1777        m.heartbeat("A", 1_000).unwrap();
1778        m.heartbeat("A", 2_000).unwrap();
1779        m.heartbeat("A", 3_000).unwrap(); // A: 3 ticks
1780        m.heartbeat("B", 1_000).unwrap(); // B: 1 tick
1781        // threshold=1: A(3) and B(1) → only A > 1
1782        // Wait: "above" means > threshold
1783        assert_eq!(m.feeds_above_tick_count(1), 1);
1784        assert_eq!(m.feeds_above_tick_count(0), 2);
1785        assert_eq!(m.feeds_above_tick_count(5), 0);
1786    }
1787
1788    #[test]
1789    fn test_feeds_above_tick_count_zero_when_no_feeds() {
1790        let m = HealthMonitor::new(5_000);
1791        assert_eq!(m.feeds_above_tick_count(0), 0);
1792    }
1793
1794    // --- HealthMonitor::oldest_feed_age_ms ---
1795    #[test]
1796    fn test_oldest_feed_age_ms_returns_max_age() {
1797        let mut m = monitor();
1798        m.register("A", None);
1799        m.register("B", None);
1800        m.heartbeat("A", 5_000).unwrap();
1801        m.heartbeat("B", 8_000).unwrap();
1802        // A is older (ts=5000), B more recent (ts=8000)
1803        // At now=10_000: A age=5000, B age=2000 → oldest = 5000
1804        assert_eq!(m.oldest_feed_age_ms(10_000), Some(5_000));
1805    }
1806
1807    #[test]
1808    fn test_oldest_feed_age_ms_none_when_no_ticks() {
1809        let mut m = monitor();
1810        m.register("A", None);
1811        assert!(m.oldest_feed_age_ms(10_000).is_none());
1812    }
1813
1814    // --- HealthMonitor::total_stale_count ---
1815    #[test]
1816    fn test_total_stale_count_zero_when_all_healthy() {
1817        let mut m = HealthMonitor::new(5_000);
1818        m.register("A", None);
1819        m.heartbeat("A", 9_500).unwrap();
1820        let _ = m.check_all(10_000);
1821        assert_eq!(m.total_stale_count(), 0);
1822    }
1823
1824    #[test]
1825    fn test_total_stale_count_correct_when_stale() {
1826        let mut m = HealthMonitor::new(5_000);
1827        m.register("A", None);
1828        m.register("B", None);
1829        m.heartbeat("A", 1_000).unwrap();
1830        m.heartbeat("B", 1_000).unwrap();
1831        // at t=10000, both stale (9s elapsed > 5s threshold)
1832        let _ = m.check_all(10_000);
1833        assert_eq!(m.total_stale_count(), 2);
1834    }
1835
1836    // --- HealthMonitor::avg_feed_age_ms ---
1837    #[test]
1838    fn test_avg_feed_age_ms_none_when_no_ticks() {
1839        let mut m = monitor();
1840        m.register("A", None);
1841        assert!(m.avg_feed_age_ms(10_000).is_none());
1842    }
1843
1844    #[test]
1845    fn test_avg_feed_age_ms_correct_average() {
1846        let mut m = monitor();
1847        m.register("A", None);
1848        m.register("B", None);
1849        m.heartbeat("A", 5_000).unwrap(); // age at t=10000: 5000
1850        m.heartbeat("B", 8_000).unwrap(); // age at t=10000: 2000
1851        // avg = (5000 + 2000) / 2 = 3500
1852        let avg = m.avg_feed_age_ms(10_000).unwrap();
1853        assert!((avg - 3500.0).abs() < 1e-10, "got {avg}");
1854    }
1855
1856    // ── HealthMonitor::stale_ratio ────────────────────────────────────────────
1857
1858    #[test]
1859    fn test_stale_ratio_zero_with_no_feeds() {
1860        let m = HealthMonitor::new(5_000);
1861        assert_eq!(m.stale_ratio(), 0.0);
1862    }
1863
1864    #[test]
1865    fn test_stale_ratio_one_when_all_stale() {
1866        let m = HealthMonitor::new(1_000);
1867        m.register("A", None);
1868        m.register("B", None);
1869        m.heartbeat("A", 1_000).unwrap();
1870        m.heartbeat("B", 1_000).unwrap();
1871        m.check_all(5_000);
1872        assert!((m.stale_ratio() - 1.0).abs() < 1e-10);
1873    }
1874
1875    #[test]
1876    fn test_stale_ratio_half_when_one_of_two_stale() {
1877        let m = HealthMonitor::new(1_000);
1878        m.register("A", None);
1879        m.register("B", None);
1880        m.heartbeat("A", 1_000).unwrap();
1881        m.heartbeat("B", 9_500).unwrap();
1882        m.check_all(10_000);
1883        assert!((m.stale_ratio() - 0.5).abs() < 1e-10);
1884    }
1885
1886    // ── HealthMonitor::has_any_unknown ────────────────────────────────────────
1887
1888    #[test]
1889    fn test_has_any_unknown_true_for_fresh_feed() {
1890        let m = HealthMonitor::new(5_000);
1891        m.register("feed", None);
1892        // Newly registered feed starts as Unknown
1893        assert!(m.has_any_unknown());
1894    }
1895
1896    #[test]
1897    fn test_has_any_unknown_false_after_heartbeat() {
1898        let m = HealthMonitor::new(5_000);
1899        m.register("feed", None);
1900        m.heartbeat("feed", 1_000).unwrap();
1901        assert!(!m.has_any_unknown());
1902    }
1903
1904    #[test]
1905    fn test_has_any_unknown_false_with_no_feeds() {
1906        let m = HealthMonitor::new(5_000);
1907        assert!(!m.has_any_unknown());
1908    }
1909
1910    // ── HealthMonitor::is_degraded ────────────────────────────────────────────
1911
1912    #[test]
1913    fn test_is_degraded_true_when_some_unhealthy() {
1914        let m = HealthMonitor::new(5_000);
1915        m.register("A", None);
1916        m.register("B", None);
1917        m.heartbeat("A", 9_500).unwrap(); // recent → healthy
1918        m.heartbeat("B", 1_000).unwrap(); // old → stale
1919        m.check_all(10_000);
1920        // A: healthy, B: stale → is_degraded
1921        assert!(m.is_degraded());
1922    }
1923
1924    #[test]
1925    fn test_is_degraded_false_when_all_healthy() {
1926        let m = HealthMonitor::new(5_000);
1927        m.register("A", None);
1928        m.heartbeat("A", 1_000).unwrap();
1929        assert!(!m.is_degraded());
1930    }
1931
1932    #[test]
1933    fn test_is_degraded_false_with_no_feeds() {
1934        let m = HealthMonitor::new(5_000);
1935        assert!(!m.is_degraded());
1936    }
1937
1938    // ── HealthMonitor::unhealthy_count / feed_exists ────────────────────────
1939
1940    #[test]
1941    fn test_unhealthy_count_all_unknown() {
1942        let m = HealthMonitor::new(5_000);
1943        m.register("A", None);
1944        m.register("B", None);
1945        // Both unknown → unhealthy_count = 2
1946        assert_eq!(m.unhealthy_count(), 2);
1947    }
1948
1949    #[test]
1950    fn test_unhealthy_count_one_healthy() {
1951        let m = HealthMonitor::new(5_000);
1952        m.register("A", None);
1953        m.register("B", None);
1954        m.heartbeat("A", 1_000).unwrap();
1955        // A is healthy (just received heartbeat), B still unknown
1956        assert_eq!(m.unhealthy_count(), 1);
1957    }
1958
1959    #[test]
1960    fn test_unhealthy_count_zero_when_empty() {
1961        let m = HealthMonitor::new(5_000);
1962        assert_eq!(m.unhealthy_count(), 0);
1963    }
1964
1965    #[test]
1966    fn test_feed_exists_true_after_register() {
1967        let m = HealthMonitor::new(5_000);
1968        m.register("BTC-USD", None);
1969        assert!(m.feed_exists("BTC-USD"));
1970    }
1971
1972    #[test]
1973    fn test_feed_exists_false_for_unknown_feed() {
1974        let m = HealthMonitor::new(5_000);
1975        assert!(!m.feed_exists("ETH-USD"));
1976    }
1977
1978    // ── HealthMonitor::most_stale_feed ─────────────────────────────────────
1979
1980    #[test]
1981    fn test_most_stale_feed_returns_oldest_feed() {
1982        let m = HealthMonitor::new(5_000);
1983        m.register("A", None);
1984        m.register("B", None);
1985        m.heartbeat("A", 1_000).unwrap(); // older tick
1986        m.heartbeat("B", 9_000).unwrap(); // newer tick
1987        // "A" has the oldest tick → it is the most stale
1988        let stale = m.most_stale_feed().unwrap();
1989        assert_eq!(stale.feed_id, "A");
1990    }
1991
1992    #[test]
1993    fn test_most_stale_feed_some_with_single_feed() {
1994        let m = HealthMonitor::new(5_000);
1995        m.register("A", None);
1996        m.heartbeat("A", 1_000).unwrap();
1997        assert!(m.most_stale_feed().is_some());
1998    }
1999
2000    #[test]
2001    fn test_most_stale_feed_none_when_empty() {
2002        let m = HealthMonitor::new(5_000);
2003        assert!(m.most_stale_feed().is_none());
2004    }
2005
2006    // ── HealthMonitor::stale_ratio_excluding_unknown ────────────────────────
2007
2008    #[test]
2009    fn test_stale_ratio_excl_unknown_zero_when_empty() {
2010        let m = HealthMonitor::new(5_000);
2011        assert_eq!(m.stale_ratio_excluding_unknown(), 0.0);
2012    }
2013
2014    #[test]
2015    fn test_stale_ratio_excl_unknown_zero_when_all_unknown() {
2016        let m = HealthMonitor::new(5_000);
2017        m.register("A", None);
2018        m.register("B", None);
2019        // Both unknown → excluded → ratio = 0.0
2020        assert_eq!(m.stale_ratio_excluding_unknown(), 0.0);
2021    }
2022
2023    #[test]
2024    fn test_stale_ratio_excl_unknown_half_when_one_stale() {
2025        let m = HealthMonitor::new(5_000);
2026        m.register("A", None);
2027        m.register("B", None);
2028        m.heartbeat("A", 9_500).unwrap(); // recent → healthy
2029        m.heartbeat("B", 1_000).unwrap(); // old → stale
2030        m.check_all(10_000);
2031        // A=Healthy, B=Stale → 1 stale of 2 known = 0.5
2032        assert!((m.stale_ratio_excluding_unknown() - 0.5).abs() < 1e-10);
2033    }
2034
2035    // ── HealthMonitor::any_unknown ────────────────────────────────────────────
2036
2037    #[test]
2038    fn test_any_unknown_false_when_empty() {
2039        let m = HealthMonitor::new(5_000);
2040        assert!(!m.any_unknown());
2041    }
2042
2043    #[test]
2044    fn test_any_unknown_true_when_feed_registered_but_no_heartbeat() {
2045        let m = HealthMonitor::new(5_000);
2046        m.register("BTC-USD", None);
2047        assert!(m.any_unknown());
2048    }
2049
2050    #[test]
2051    fn test_any_unknown_false_when_all_have_heartbeats() {
2052        let m = HealthMonitor::new(5_000);
2053        m.register("BTC-USD", None);
2054        m.heartbeat("BTC-USD", 1_000).unwrap();
2055        assert!(!m.any_unknown());
2056    }
2057
2058    // ── HealthMonitor::degraded_count ─────────────────────────────────────────
2059
2060    #[test]
2061    fn test_degraded_count_zero_when_all_healthy() {
2062        let m = HealthMonitor::new(5_000);
2063        m.register("A", None);
2064        m.heartbeat("A", 9_500).unwrap();
2065        m.check_all(10_000);
2066        assert_eq!(m.degraded_count(), 0);
2067    }
2068
2069    #[test]
2070    fn test_degraded_count_one_when_one_stale() {
2071        let m = HealthMonitor::new(5_000);
2072        m.register("A", None);
2073        m.register("B", None);
2074        m.heartbeat("A", 9_500).unwrap(); // healthy
2075        m.heartbeat("B", 1_000).unwrap(); // stale
2076        m.check_all(10_000);
2077        assert_eq!(m.degraded_count(), 1);
2078    }
2079
2080    #[test]
2081    fn test_degraded_count_zero_when_empty() {
2082        let m = HealthMonitor::new(5_000);
2083        assert_eq!(m.degraded_count(), 0);
2084    }
2085
2086    // ── HealthMonitor::min_healthy_age_ms ────────────────────────────────────
2087
2088    #[test]
2089    fn test_min_healthy_age_ms_none_when_no_healthy_feeds() {
2090        let m = HealthMonitor::new(5_000);
2091        m.register("A", None);
2092        // No heartbeat → Unknown, not Healthy
2093        assert!(m.min_healthy_age_ms(10_000).is_none());
2094    }
2095
2096    #[test]
2097    fn test_min_healthy_age_ms_returns_most_recent() {
2098        let m = HealthMonitor::new(5_000);
2099        m.register("A", None);
2100        m.register("B", None);
2101        m.heartbeat("A", 8_000).unwrap(); // 2s ago at now=10000
2102        m.heartbeat("B", 9_000).unwrap(); // 1s ago at now=10000
2103        m.check_all(10_000);
2104        // B is more recent → min age = 1000ms
2105        assert_eq!(m.min_healthy_age_ms(10_000), Some(1_000));
2106    }
2107
2108    // ── HealthMonitor::healthy_feed_ids ───────────────────────────────────────
2109
2110    #[test]
2111    fn test_healthy_feed_ids_empty_when_no_feeds() {
2112        let m = HealthMonitor::new(5_000);
2113        assert!(m.healthy_feed_ids().is_empty());
2114    }
2115
2116    #[test]
2117    fn test_healthy_feed_ids_returns_healthy_only() {
2118        let m = HealthMonitor::new(5_000);
2119        m.register("A", None);
2120        m.register("B", None);
2121        m.heartbeat("A", 9_500).unwrap(); // healthy
2122        // B has no heartbeat → unknown
2123        m.check_all(10_000);
2124        let ids = m.healthy_feed_ids();
2125        assert_eq!(ids, vec!["A".to_string()]);
2126    }
2127
2128    // ── HealthMonitor::time_since_last_heartbeat ──────────────────────────────
2129
2130    #[test]
2131    fn test_time_since_last_heartbeat_correct() {
2132        let m = HealthMonitor::new(5_000);
2133        m.register("BTC", None);
2134        m.heartbeat("BTC", 9_000).unwrap();
2135        assert_eq!(m.time_since_last_heartbeat("BTC", 10_000), Some(1_000));
2136    }
2137
2138    #[test]
2139    fn test_time_since_last_heartbeat_none_when_no_tick() {
2140        let m = HealthMonitor::new(5_000);
2141        m.register("ETH", None);
2142        assert!(m.time_since_last_heartbeat("ETH", 10_000).is_none());
2143    }
2144
2145    #[test]
2146    fn test_time_since_last_heartbeat_none_when_unknown_feed() {
2147        let m = HealthMonitor::new(5_000);
2148        assert!(m.time_since_last_heartbeat("MISSING", 10_000).is_none());
2149    }
2150
2151    // ── register_batch ────────────────────────────────────────────────────────
2152
2153    #[test]
2154    fn test_register_batch_registers_all_feeds() {
2155        let m = HealthMonitor::new(5_000);
2156        m.register_batch(&[("BTC", 1_000), ("ETH", 2_000), ("SOL", 3_000)]);
2157        assert!(m.feed_exists("BTC"));
2158        assert!(m.feed_exists("ETH"));
2159        assert!(m.feed_exists("SOL"));
2160    }
2161
2162    #[test]
2163    fn test_register_batch_uses_custom_thresholds() {
2164        let m = HealthMonitor::new(10_000);
2165        m.register_batch(&[("BTC", 500)]);
2166        // Heartbeat at t=0, check at t=600 → should be stale (threshold=500)
2167        m.heartbeat("BTC", 0).unwrap();
2168        m.check_all(600);
2169        assert_eq!(m.stale_count(), 1);
2170    }
2171
2172    #[test]
2173    fn test_register_batch_empty_slice_is_noop() {
2174        let m = HealthMonitor::new(5_000);
2175        m.register_batch(&[]);
2176        assert_eq!(m.feed_count(), 0);
2177    }
2178
2179    // ── unknown_feed_ids ──────────────────────────────────────────────────────
2180
2181    #[test]
2182    fn test_unknown_feed_ids_all_new_feeds_are_unknown() {
2183        let m = HealthMonitor::new(5_000);
2184        m.register("BTC", None);
2185        m.register("ETH", None);
2186        let ids = m.unknown_feed_ids();
2187        assert!(ids.contains(&"BTC".to_string()));
2188        assert!(ids.contains(&"ETH".to_string()));
2189    }
2190
2191    #[test]
2192    fn test_unknown_feed_ids_empty_after_heartbeat_and_check() {
2193        let m = HealthMonitor::new(5_000);
2194        m.register("BTC", None);
2195        m.heartbeat("BTC", 0).unwrap();
2196        m.check_all(100);
2197        assert!(m.unknown_feed_ids().is_empty());
2198    }
2199
2200    #[test]
2201    fn test_unknown_feed_ids_empty_when_no_feeds() {
2202        let m = HealthMonitor::new(5_000);
2203        assert!(m.unknown_feed_ids().is_empty());
2204    }
2205
2206    // ── feeds_needing_check ───────────────────────────────────────────────────
2207
2208    #[test]
2209    fn test_feeds_needing_check_returns_stale_and_unknown() {
2210        let m = HealthMonitor::new(5_000);
2211        m.register("BTC", None);  // Unknown
2212        m.register("ETH", None);
2213        m.heartbeat("ETH", 0).unwrap();
2214        m.check_all(100); // ETH → Healthy
2215        let needing = m.feeds_needing_check();
2216        assert!(needing.contains(&"BTC".to_string()));
2217        assert!(!needing.contains(&"ETH".to_string()));
2218    }
2219
2220    #[test]
2221    fn test_feeds_needing_check_empty_when_all_healthy() {
2222        let m = HealthMonitor::new(5_000);
2223        m.register("BTC", None);
2224        m.heartbeat("BTC", 0).unwrap();
2225        m.check_all(100);
2226        assert!(m.feeds_needing_check().is_empty());
2227    }
2228
2229    #[test]
2230    fn test_feeds_needing_check_sorted() {
2231        let m = HealthMonitor::new(5_000);
2232        m.register("ZZZ", None);
2233        m.register("AAA", None);
2234        let needing = m.feeds_needing_check();
2235        assert_eq!(needing, vec!["AAA".to_string(), "ZZZ".to_string()]);
2236    }
2237
2238    // ── ratio_healthy ─────────────────────────────────────────────────────────
2239
2240    #[test]
2241    fn test_ratio_healthy_zero_when_no_feeds() {
2242        let m = HealthMonitor::new(5_000);
2243        assert_eq!(m.ratio_healthy(), 0.0);
2244    }
2245
2246    #[test]
2247    fn test_ratio_healthy_zero_when_all_unknown() {
2248        let m = HealthMonitor::new(5_000);
2249        m.register("BTC", None);
2250        m.register("ETH", None);
2251        assert_eq!(m.ratio_healthy(), 0.0);
2252    }
2253
2254    #[test]
2255    fn test_ratio_healthy_one_when_all_healthy() {
2256        let m = HealthMonitor::new(5_000);
2257        m.register("BTC", None);
2258        m.heartbeat("BTC", 0).unwrap();
2259        m.check_all(100);
2260        assert_eq!(m.ratio_healthy(), 1.0);
2261    }
2262
2263    #[test]
2264    fn test_ratio_healthy_half_when_one_of_two() {
2265        let m = HealthMonitor::new(5_000);
2266        m.register("BTC", None);
2267        m.register("ETH", None);
2268        m.heartbeat("BTC", 0).unwrap();
2269        m.check_all(100);
2270        let ratio = m.ratio_healthy();
2271        assert!((ratio - 0.5).abs() < 1e-10);
2272    }
2273
2274    // ── total_tick_count ──────────────────────────────────────────────────────
2275
2276    #[test]
2277    fn test_total_tick_count_zero_when_no_feeds() {
2278        let m = HealthMonitor::new(5_000);
2279        assert_eq!(m.total_tick_count(), 0);
2280    }
2281
2282    #[test]
2283    fn test_total_tick_count_sums_all_feeds() {
2284        let m = HealthMonitor::new(5_000);
2285        m.register("BTC", None);
2286        m.register("ETH", None);
2287        m.heartbeat("BTC", 0).unwrap();
2288        m.heartbeat("BTC", 1).unwrap();
2289        m.heartbeat("ETH", 0).unwrap();
2290        assert_eq!(m.total_tick_count(), 3);
2291    }
2292
2293    // ── last_updated_feed_id ──────────────────────────────────────────────────
2294
2295    #[test]
2296    fn test_last_updated_feed_id_none_when_no_ticks() {
2297        let m = HealthMonitor::new(5_000);
2298        m.register("BTC", None);
2299        assert!(m.last_updated_feed_id().is_none());
2300    }
2301
2302    #[test]
2303    fn test_last_updated_feed_id_returns_most_recent() {
2304        let m = HealthMonitor::new(5_000);
2305        m.register("BTC", None);
2306        m.register("ETH", None);
2307        m.heartbeat("BTC", 100).unwrap();
2308        m.heartbeat("ETH", 200).unwrap(); // ETH more recent
2309        assert_eq!(m.last_updated_feed_id(), Some("ETH".to_string()));
2310    }
2311
2312    // ── is_any_stale ──────────────────────────────────────────────────────────
2313
2314    #[test]
2315    fn test_is_any_stale_false_when_no_feeds() {
2316        let m = HealthMonitor::new(5_000);
2317        assert!(!m.is_any_stale());
2318    }
2319
2320    #[test]
2321    fn test_is_any_stale_false_when_all_healthy() {
2322        let m = HealthMonitor::new(5_000);
2323        m.register("BTC", None);
2324        m.heartbeat("BTC", 0).unwrap();
2325        m.check_all(100);
2326        assert!(!m.is_any_stale());
2327    }
2328
2329    #[test]
2330    fn test_is_any_stale_true_when_stale_feed() {
2331        let m = HealthMonitor::new(5_000);
2332        m.register("BTC", None);
2333        m.heartbeat("BTC", 0).unwrap();
2334        m.check_all(10_000); // 10s elapsed, threshold=5s → stale
2335        assert!(m.is_any_stale());
2336    }
2337}