Skip to main content

dynomite/stats/
failure.rs

//! Failure-cause counters for the dispatch and gossip planes.
//!
//! The pre-existing pool / server metrics maintained by
//! [`crate::stats::Stats`] track aggregate request volume and a
//! single coarse `errors` bucket. When operators need to tell
//! "the cluster lost quorum because a peer was Down" apart from
//! "the cluster lost quorum because the per-peer outbound
//! channel was full", that single bucket is not enough.
//!
//! [`FailureMetrics`] supplements the existing counters with
//! label-rich counters and gauges:
//!
//! * Per-cause dispatch error counters (no-targets, channel
//!   full, channel closed, response timeout) keyed by the
//!   labels operators want to slice by.
//! * Per-peer state transition counts, for charting how often
//!   gossip flips a peer between [`crate::cluster::peer::PeerState`]
//!   variants.
//! * Per-peer current-state, phi-score, and phi-threshold
//!   gauges, so a dashboard can show the live failure-detector
//!   view.
//! * A per-state dwell-time histogram, recording how long a
//!   peer stayed in a state before leaving it.
//!
//! All counters initialise to zero. The accumulators take a
//! single `parking_lot::Mutex` over the inner state; every
//! observation is a hash-map update plus a small arithmetic
//! step, so the lock is held only briefly on each call. The
//! dispatch hot path is a direct method invocation on an
//! `Arc<FailureMetrics>`; when the handle is `None` the
//! dispatcher's call site short-circuits to a single
//! null-pointer test (see
//! [`crate::cluster::dispatch::ClusterDispatcher::with_failure_metrics`]).
//!
//! # Examples
//!
//! ```
//! use dynomite::msg::ConsistencyLevel;
//! use dynomite::stats::FailureMetrics;
//!
//! let m = FailureMetrics::new();
//! m.record_no_targets("dc1", "rA", ConsistencyLevel::DcQuorum);
//! let snap = m.snapshot();
//! assert_eq!(snap.no_targets.len(), 1);
//! assert_eq!(snap.no_targets[0].count, 1);
//! ```
44
45use std::collections::HashMap;
46use std::time::Instant;
47
48use parking_lot::Mutex;
49
50use crate::cluster::peer::PeerState;
51use crate::msg::ConsistencyLevel;
52
53/// Live, mutable accumulator for failure-cause counters.
54///
55/// Cheap to clone via `Arc`; every method takes `&self`.
56#[derive(Debug, Default)]
57pub struct FailureMetrics {
58    inner: Mutex<FailureInner>,
59}
60
61#[derive(Debug, Default)]
62struct FailureInner {
63    no_targets: HashMap<NoTargetsKey, u64>,
64    peer_send_full: HashMap<PeerKey, u64>,
65    peer_send_closed: HashMap<PeerKey, u64>,
66    backend_send_full: u64,
67    backend_send_closed: u64,
68    response_timeout: HashMap<ConsistencyLevel, u64>,
69    peer_state_transitions: HashMap<TransitionKey, u64>,
70    peer_state_current: HashMap<u32, PeerStateRecord>,
71    peer_phi: HashMap<u32, PhiRecord>,
72    /// Rolling threshold gauge per peer, populated by
73    /// [`FailureMetrics::observe_threshold`]. Exposed as the
74    /// `gossip_phi_threshold_observed{peer}` Prometheus gauge so
75    /// operators can confirm the configured threshold per peer
76    /// without reading the running config.
77    peer_threshold: HashMap<u32, ThresholdRecord>,
78    /// Per-peer instant of the last state change. Updated on
79    /// every [`FailureMetrics::record_peer_state_transition`]
80    /// call so the next transition can compute the dwell time
81    /// the peer spent in the from-state.
82    peer_last_change: HashMap<u32, Instant>,
83    /// Per-state dwell histogram, keyed by the state being
84    /// exited. The histogram uses the same exponential-style
85    /// bucketing the engine's other observation surfaces use
86    /// ([`DWELL_BUCKETS_SECONDS`]).
87    dwell_per_state: HashMap<PeerState, DwellAccumulator>,
88}
89
90#[derive(Debug, Eq, PartialEq, Hash, Clone)]
91struct NoTargetsKey {
92    dc: String,
93    rack: String,
94    consistency: ConsistencyLevel,
95}
96
/// Label set identifying one per-peer dispatch error series.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct PeerKey {
    /// Index of the target peer.
    peer_idx: u32,
    /// Datacenter label of the target peer.
    peer_dc: String,
}
102
103#[derive(Debug, Eq, PartialEq, Hash, Clone)]
104struct TransitionKey {
105    peer_idx: u32,
106    from: PeerState,
107    to: PeerState,
108}
109
110#[derive(Debug, Clone)]
111struct PeerStateRecord {
112    dc: String,
113    rack: String,
114    state: PeerState,
115}
116
/// Stored value behind the `gossip_phi_score` gauge for one peer.
#[derive(Clone, Debug)]
struct PhiRecord {
    /// Datacenter label supplied with the observation.
    dc: String,
    /// Rack label supplied with the observation.
    rack: String,
    /// Phi scaled by 1000 and rounded, i.e. held at thousandths
    /// precision, so the gauge survives the `i64` round-trip in
    /// the Prometheus encoder.
    phi_milli: i64,
}
125
/// Stored value behind the `gossip_phi_threshold_observed`
/// gauge for one peer.
#[derive(Clone, Debug)]
struct ThresholdRecord {
    /// Datacenter label supplied with the observation.
    dc: String,
    /// Rack label supplied with the observation.
    rack: String,
    /// Threshold scaled by 1000 and rounded, i.e. held at
    /// thousandths precision, so the gauge survives the `i64`
    /// round-trip in the Prometheus encoder.
    threshold_milli: i64,
}
134
/// Upper bounds, in seconds, of the finite buckets of the
/// `peer_state_dwell_seconds` histogram, in ascending order.
///
/// The ranges line up with the operator-facing SLO bands chaos
/// runs care about: sub-second (transient flap), one second to
/// one minute (settling gossip), and minute-to-hour (long
/// outage).
pub const DWELL_BUCKETS_SECONDS: &[f64] = &[
    // Sub-second.
    0.1,
    0.5,
    // One second up to one minute.
    1.0,
    5.0,
    10.0,
    30.0,
    60.0,
    // Minutes up to one hour.
    300.0,
    600.0,
    1_800.0,
    3_600.0,
];
143
/// Histogram state for the dwell times observed in one peer
/// state: cumulative bucket counts, the running sum of the
/// observations in seconds, and how many observations were made.
#[derive(Clone, Debug)]
struct DwellAccumulator {
    /// Cumulative counts: one slot for each upper bound in
    /// [`DWELL_BUCKETS_SECONDS`], followed by one `+Inf` slot
    /// (`DWELL_BUCKETS_SECONDS.len() + 1` entries in total).
    bucket_counts: Vec<u64>,
    /// Sum of every observed dwell, in seconds.
    sum_seconds: f64,
    /// Number of observations folded in so far.
    count: u64,
}
156
157impl DwellAccumulator {
158    fn new() -> Self {
159        Self {
160            bucket_counts: vec![0; DWELL_BUCKETS_SECONDS.len() + 1],
161            sum_seconds: 0.0,
162            count: 0,
163        }
164    }
165
166    fn observe(&mut self, dwell_seconds: f64) {
167        let v = if dwell_seconds.is_nan() || dwell_seconds < 0.0 {
168            0.0
169        } else {
170            dwell_seconds
171        };
172        self.sum_seconds += v;
173        self.count = self.count.saturating_add(1);
174        // Cumulative buckets: increment every slot whose upper
175        // bound is >= v, plus the trailing +Inf slot.
176        let last = self.bucket_counts.len() - 1;
177        for (i, upper) in DWELL_BUCKETS_SECONDS.iter().enumerate() {
178            if v <= *upper {
179                self.bucket_counts[i] = self.bucket_counts[i].saturating_add(1);
180            }
181        }
182        self.bucket_counts[last] = self.bucket_counts[last].saturating_add(1);
183    }
184}
185
186impl FailureMetrics {
187    /// Construct a fresh accumulator with all counters and
188    /// gauges zeroed.
189    ///
190    /// # Examples
191    ///
192    /// ```
193    /// use dynomite::stats::FailureMetrics;
194    /// let m = FailureMetrics::new();
195    /// assert_eq!(m.snapshot().backend_send_full, 0);
196    /// ```
197    #[must_use]
198    pub fn new() -> Self {
199        Self::default()
200    }
201
202    /// Increment the `dispatch_no_targets_total` counter for the
203    /// given local-DC labels.
204    ///
205    /// # Examples
206    ///
207    /// ```
208    /// use dynomite::msg::ConsistencyLevel;
209    /// use dynomite::stats::FailureMetrics;
210    /// let m = FailureMetrics::new();
211    /// m.record_no_targets("dc1", "rA", ConsistencyLevel::DcOne);
212    /// m.record_no_targets("dc1", "rA", ConsistencyLevel::DcOne);
213    /// assert_eq!(m.snapshot().no_targets[0].count, 2);
214    /// ```
215    pub fn record_no_targets(&self, dc: &str, rack: &str, consistency: ConsistencyLevel) {
216        let key = NoTargetsKey {
217            dc: dc.to_owned(),
218            rack: rack.to_owned(),
219            consistency,
220        };
221        let mut inner = self.inner.lock();
222        *inner.no_targets.entry(key).or_insert(0) += 1;
223    }
224
225    /// Increment the `dispatch_peer_send_full_total` counter.
226    pub fn record_peer_send_full(&self, peer_idx: u32, peer_dc: &str) {
227        let key = PeerKey {
228            peer_idx,
229            peer_dc: peer_dc.to_owned(),
230        };
231        let mut inner = self.inner.lock();
232        *inner.peer_send_full.entry(key).or_insert(0) += 1;
233    }
234
235    /// Increment the `dispatch_peer_send_closed_total` counter.
236    pub fn record_peer_send_closed(&self, peer_idx: u32, peer_dc: &str) {
237        let key = PeerKey {
238            peer_idx,
239            peer_dc: peer_dc.to_owned(),
240        };
241        let mut inner = self.inner.lock();
242        *inner.peer_send_closed.entry(key).or_insert(0) += 1;
243    }
244
245    /// Increment the `dispatch_backend_send_full_total` counter.
246    pub fn record_backend_send_full(&self) {
247        self.inner.lock().backend_send_full += 1;
248    }
249
250    /// Increment the `dispatch_backend_send_closed_total`
251    /// counter.
252    pub fn record_backend_send_closed(&self) {
253        self.inner.lock().backend_send_closed += 1;
254    }
255
256    /// Increment the `dispatch_response_timeout_total` counter.
257    /// Used by the response coalescer when every per-target
258    /// sender drops without producing a reply (i.e. the request
259    /// timed out at the dispatch layer).
260    pub fn record_response_timeout(&self, consistency: ConsistencyLevel) {
261        let mut inner = self.inner.lock();
262        *inner.response_timeout.entry(consistency).or_insert(0) += 1;
263    }
264
265    /// Record a peer-state transition. Increments
266    /// `peer_state_transitions_total` by one and updates the
267    /// `peer_state_current` gauge to the new state. Also
268    /// observes the dwell time the peer spent in `from` (when
269    /// a previous transition timestamp is available) into the
270    /// per-state `peer_state_dwell_seconds` histogram.
271    pub fn record_peer_state_transition(
272        &self,
273        peer_idx: u32,
274        dc: &str,
275        rack: &str,
276        from: PeerState,
277        to: PeerState,
278    ) {
279        self.record_peer_state_transition_at(peer_idx, dc, rack, from, to, Instant::now());
280    }
281
282    /// Variant of [`Self::record_peer_state_transition`] that
283    /// takes the wall-clock instant as a parameter. Lets tests
284    /// drive the dwell histogram without sleeping.
285    pub fn record_peer_state_transition_at(
286        &self,
287        peer_idx: u32,
288        dc: &str,
289        rack: &str,
290        from: PeerState,
291        to: PeerState,
292        now: Instant,
293    ) {
294        let key = TransitionKey { peer_idx, from, to };
295        let mut inner = self.inner.lock();
296        *inner.peer_state_transitions.entry(key).or_insert(0) += 1;
297        inner.peer_state_current.insert(
298            peer_idx,
299            PeerStateRecord {
300                dc: dc.to_owned(),
301                rack: rack.to_owned(),
302                state: to,
303            },
304        );
305        if let Some(prev) = inner.peer_last_change.insert(peer_idx, now) {
306            let dwell = now.saturating_duration_since(prev).as_secs_f64();
307            let acc = inner
308                .dwell_per_state
309                .entry(from)
310                .or_insert_with(DwellAccumulator::new);
311            acc.observe(dwell);
312        }
313    }
314
315    /// Update the `peer_state_current` gauge without recording
316    /// a transition. Useful for the initial publish at startup
317    /// and when an evaluate tick observes a peer whose state
318    /// has not changed.
319    pub fn observe_peer_state(&self, peer_idx: u32, dc: &str, rack: &str, state: PeerState) {
320        let mut inner = self.inner.lock();
321        inner.peer_state_current.insert(
322            peer_idx,
323            PeerStateRecord {
324                dc: dc.to_owned(),
325                rack: rack.to_owned(),
326                state,
327            },
328        );
329    }
330
331    /// Update the `gossip_phi_score` gauge for a peer. The phi
332    /// value is rounded to thousandths and stored as an `i64`
333    /// (millis); the snapshot exposes a floating-point view
334    /// rebuilt from that integer.
335    pub fn observe_phi(&self, peer_idx: u32, dc: &str, rack: &str, phi: f64) {
336        let phi_milli = phi_to_milli(phi);
337        let mut inner = self.inner.lock();
338        inner.peer_phi.insert(
339            peer_idx,
340            PhiRecord {
341                dc: dc.to_owned(),
342                rack: rack.to_owned(),
343                phi_milli,
344            },
345        );
346    }
347
348    /// Update the `gossip_phi_threshold_observed` gauge for a
349    /// peer. Mirrors [`Self::observe_phi`] in storage so the
350    /// operator can read the configured threshold next to the
351    /// computed phi without reading the running YAML.
352    pub fn observe_threshold(&self, peer_idx: u32, dc: &str, rack: &str, threshold: f64) {
353        let threshold_milli = phi_to_milli(threshold);
354        let mut inner = self.inner.lock();
355        inner.peer_threshold.insert(
356            peer_idx,
357            ThresholdRecord {
358                dc: dc.to_owned(),
359                rack: rack.to_owned(),
360                threshold_milli,
361            },
362        );
363    }
364
365    /// Build an immutable snapshot of every counter and gauge.
366    ///
367    /// The returned `FailureSnapshot` is `Clone` and `Send`, so
368    /// the stats aggregator can stash it in the snapshot cell
369    /// without holding the underlying lock.
370    #[must_use]
371    pub fn snapshot(&self) -> FailureSnapshot {
372        let inner = self.inner.lock();
373        let mut no_targets: Vec<NoTargetsEntry> = inner
374            .no_targets
375            .iter()
376            .map(|(k, v)| NoTargetsEntry {
377                dc: k.dc.clone(),
378                rack: k.rack.clone(),
379                consistency: k.consistency,
380                count: *v,
381            })
382            .collect();
383        no_targets.sort_by(|a, b| {
384            a.dc.cmp(&b.dc)
385                .then(a.rack.cmp(&b.rack))
386                .then((a.consistency as u8).cmp(&(b.consistency as u8)))
387        });
388        let peer_send_full = collect_peer_entries(&inner.peer_send_full);
389        let peer_send_closed = collect_peer_entries(&inner.peer_send_closed);
390        let mut response_timeout: Vec<TimeoutEntry> = inner
391            .response_timeout
392            .iter()
393            .map(|(c, v)| TimeoutEntry {
394                consistency: *c,
395                count: *v,
396            })
397            .collect();
398        response_timeout.sort_by_key(|e| e.consistency as u8);
399        let mut peer_state_transitions: Vec<TransitionEntry> = inner
400            .peer_state_transitions
401            .iter()
402            .map(|(k, v)| TransitionEntry {
403                peer_idx: k.peer_idx,
404                from: k.from,
405                to: k.to,
406                count: *v,
407            })
408            .collect();
409        peer_state_transitions.sort_by(|a, b| {
410            a.peer_idx
411                .cmp(&b.peer_idx)
412                .then((a.from as u8).cmp(&(b.from as u8)))
413                .then((a.to as u8).cmp(&(b.to as u8)))
414        });
415        let mut peer_state_current: Vec<PeerStateEntry> = inner
416            .peer_state_current
417            .iter()
418            .map(|(idx, rec)| PeerStateEntry {
419                peer_idx: *idx,
420                dc: rec.dc.clone(),
421                rack: rec.rack.clone(),
422                state: rec.state,
423            })
424            .collect();
425        peer_state_current.sort_by_key(|e| e.peer_idx);
426        let mut peer_phi: Vec<PhiEntry> = inner
427            .peer_phi
428            .iter()
429            .map(|(idx, rec)| PhiEntry {
430                peer_idx: *idx,
431                dc: rec.dc.clone(),
432                rack: rec.rack.clone(),
433                phi: milli_to_phi(rec.phi_milli),
434            })
435            .collect();
436        peer_phi.sort_by_key(|e| e.peer_idx);
437        let mut peer_threshold: Vec<ThresholdEntry> = inner
438            .peer_threshold
439            .iter()
440            .map(|(idx, rec)| ThresholdEntry {
441                peer_idx: *idx,
442                dc: rec.dc.clone(),
443                rack: rec.rack.clone(),
444                threshold: milli_to_phi(rec.threshold_milli),
445            })
446            .collect();
447        peer_threshold.sort_by_key(|e| e.peer_idx);
448        let mut peer_state_dwell: Vec<DwellEntry> = inner
449            .dwell_per_state
450            .iter()
451            .map(|(state, acc)| DwellEntry {
452                state: *state,
453                count: acc.count,
454                sum_seconds: acc.sum_seconds,
455                bucket_counts: acc.bucket_counts.clone(),
456            })
457            .collect();
458        peer_state_dwell.sort_by_key(|e| e.state as u8);
459        FailureSnapshot {
460            no_targets,
461            peer_send_full,
462            peer_send_closed,
463            backend_send_full: inner.backend_send_full,
464            backend_send_closed: inner.backend_send_closed,
465            response_timeout,
466            peer_state_transitions,
467            peer_state_current,
468            peer_phi,
469            peer_threshold,
470            peer_state_dwell,
471        }
472    }
473}
474
475/// Convert a phi value to a thousandths-rounded `i64`. Floats
476/// outside `[0, i64::MAX/1000]`, NaN, and infinity all clamp to
477/// the saturating ceiling. The function is implemented without
478/// `as`-casts so the pedantic precision-loss lint stays clean.
479fn phi_to_milli(phi: f64) -> i64 {
480    let saturating = i64::MAX / 1000;
481    if phi.is_nan() {
482        return saturating;
483    }
484    if !phi.is_finite() {
485        // Both +inf and -inf are unexpected; treat positive
486        // infinity as a saturating high and negative infinity
487        // as zero (phi cannot be negative in practice).
488        if phi > 0.0 {
489            return saturating;
490        }
491        return 0;
492    }
493    if phi <= 0.0 {
494        return 0;
495    }
496    let scaled = (phi * 1000.0).round();
497    f64_to_i64_clamped(scaled).min(saturating)
498}
499
500/// Render the stored thousandths-precision integer back as a
501/// floating-point phi value.
502fn milli_to_phi(milli: i64) -> f64 {
503    i64_to_f64(milli) / 1000.0
504}
505
/// Convert an `i64` to `f64` without an `as`-cast.
///
/// The magnitude is split into two 32-bit halves, each of which
/// converts to `f64` exactly, and the halves are recombined.
/// Exact for the small magnitudes held in the gauges.
fn i64_to_f64(value: i64) -> f64 {
    const TWO_POW_32: f64 = 4_294_967_296.0;

    let magnitude = value.unsigned_abs();
    // Both halves fit in 32 bits by construction; the fallbacks
    // exist only to avoid an unwrap.
    let high = u32::try_from(magnitude >> 32).unwrap_or(u32::MAX);
    let low = u32::try_from(magnitude & 0xFFFF_FFFF).unwrap_or(u32::MAX);
    let unsigned = f64::from(high) * TWO_POW_32 + f64::from(low);

    if value < 0 {
        -unsigned
    } else {
        unsigned
    }
}
520
/// Truncate a non-negative finite `f64` to an `i64` without a
/// raw `as` cast, by decoding the IEEE-754 bit pattern.
///
/// Non-finite input, zero, negatives, and magnitudes below one
/// yield `0`; values of `2^63` or more yield `i64::MAX`.
fn f64_to_i64_clamped(x: f64) -> i64 {
    const EXPONENT_BIAS: u32 = 1023;
    const MANTISSA_BITS: u32 = 52;

    if !x.is_finite() || x <= 0.0 {
        return 0;
    }

    let raw = x.to_bits();
    let biased = u32::try_from((raw >> MANTISSA_BITS) & 0x7FF).unwrap_or(0);
    // A biased exponent below the bias means |x| < 1, whose
    // integer part is zero.
    let exponent = match biased.checked_sub(EXPONENT_BIAS) {
        Some(e) => e,
        None => return 0,
    };
    if exponent >= 63 {
        return i64::MAX;
    }

    // Restore the implicit leading one of the significand.
    let fraction = raw & ((1u64 << MANTISSA_BITS) - 1);
    let significand = (1u64 << MANTISSA_BITS) | fraction;
    let magnitude = if exponent < MANTISSA_BITS {
        // Shift the fractional bits away.
        significand >> (MANTISSA_BITS - exponent)
    } else {
        significand
            .checked_shl(exponent - MANTISSA_BITS)
            .unwrap_or(u64::MAX)
    };
    i64::try_from(magnitude).unwrap_or(i64::MAX)
}
546
547fn collect_peer_entries(map: &HashMap<PeerKey, u64>) -> Vec<PeerEntry> {
548    let mut out: Vec<PeerEntry> = map
549        .iter()
550        .map(|(k, v)| PeerEntry {
551            peer_idx: k.peer_idx,
552            peer_dc: k.peer_dc.clone(),
553            count: *v,
554        })
555        .collect();
556    out.sort_by(|a, b| a.peer_idx.cmp(&b.peer_idx).then(a.peer_dc.cmp(&b.peer_dc)));
557    out
558}
559
560/// A single labeled `dispatch_no_targets_total` row.
561#[derive(Clone, Debug, Default, Eq, PartialEq)]
562pub struct NoTargetsEntry {
563    /// Local datacenter the request originated from.
564    pub dc: String,
565    /// Local rack.
566    pub rack: String,
567    /// Effective consistency level of the request that produced
568    /// the `NoTargets` plan.
569    pub consistency: ConsistencyLevel,
570    /// Cumulative occurrences.
571    pub count: u64,
572}
573
/// One labeled row of a per-peer dispatch error counter.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct PeerEntry {
    /// Position of the target peer in the pool's peer array.
    pub peer_idx: u32,
    /// Datacenter the target peer belongs to.
    pub peer_dc: String,
    /// Cumulative number of occurrences.
    pub count: u64,
}
584
585/// A single labeled `dispatch_response_timeout_total` row.
586#[derive(Clone, Debug, Default, Eq, PartialEq)]
587pub struct TimeoutEntry {
588    /// Consistency level of the request that timed out.
589    pub consistency: ConsistencyLevel,
590    /// Cumulative occurrences.
591    pub count: u64,
592}
593
594/// A single labeled `peer_state_transitions_total` row.
595#[derive(Clone, Debug, Eq, PartialEq)]
596pub struct TransitionEntry {
597    /// Peer that transitioned.
598    pub peer_idx: u32,
599    /// State the peer was in before the transition.
600    pub from: PeerState,
601    /// State the peer is in after the transition.
602    pub to: PeerState,
603    /// Cumulative occurrences.
604    pub count: u64,
605}
606
607/// A single labeled `peer_state_current` gauge row.
608#[derive(Clone, Debug, Eq, PartialEq)]
609pub struct PeerStateEntry {
610    /// Peer index.
611    pub peer_idx: u32,
612    /// Datacenter of the peer.
613    pub dc: String,
614    /// Rack of the peer.
615    pub rack: String,
616    /// Current state.
617    pub state: PeerState,
618}
619
/// A single labeled `gossip_phi_score` gauge row.
///
/// Derives `PartialEq` (not `Eq`, because of the `f64` field)
/// so snapshot rows can be compared directly, matching the
/// other row types in this module.
#[derive(Clone, Debug, PartialEq)]
pub struct PhiEntry {
    /// Peer index.
    pub peer_idx: u32,
    /// Datacenter of the peer.
    pub dc: String,
    /// Rack of the peer.
    pub rack: String,
    /// Phi value most recently observed for the peer, at
    /// thousandths precision.
    pub phi: f64,
}
632
/// A single labeled `gossip_phi_threshold_observed` gauge row.
///
/// Derives `PartialEq` (not `Eq`, because of the `f64` field)
/// so snapshot rows can be compared directly, matching the
/// other row types in this module.
#[derive(Clone, Debug, PartialEq)]
pub struct ThresholdEntry {
    /// Peer index.
    pub peer_idx: u32,
    /// Datacenter of the peer.
    pub dc: String,
    /// Rack of the peer.
    pub rack: String,
    /// Threshold the failure detector last evaluated against
    /// this peer (typically the cluster-wide configured
    /// threshold, surfaced per peer so an operator viewing one
    /// peer's panel sees its own threshold).
    pub threshold: f64,
}
648
649/// A single per-state row of the `peer_state_dwell_seconds`
650/// histogram. Holds the cumulative bucket counts (one slot per
651/// boundary in [`DWELL_BUCKETS_SECONDS`] plus a trailing `+Inf`
652/// slot), the running sum of observations in seconds, and the
653/// total observation count.
654#[derive(Clone, Debug)]
655pub struct DwellEntry {
656    /// State the peer was in when the dwell was observed.
657    pub state: PeerState,
658    /// Total number of dwell observations for this state.
659    pub count: u64,
660    /// Sum of all observed dwell durations in seconds.
661    pub sum_seconds: f64,
662    /// Cumulative bucket counts. Length is
663    /// `DWELL_BUCKETS_SECONDS.len() + 1`; the last entry is the
664    /// `+Inf` bucket (always equal to [`Self::count`]).
665    pub bucket_counts: Vec<u64>,
666}
667
668/// Immutable snapshot of every failure-cause metric.
669#[derive(Clone, Debug, Default)]
670pub struct FailureSnapshot {
671    /// `dispatch_no_targets_total` rows.
672    pub no_targets: Vec<NoTargetsEntry>,
673    /// `dispatch_peer_send_full_total` rows.
674    pub peer_send_full: Vec<PeerEntry>,
675    /// `dispatch_peer_send_closed_total` rows.
676    pub peer_send_closed: Vec<PeerEntry>,
677    /// `dispatch_backend_send_full_total` value.
678    pub backend_send_full: u64,
679    /// `dispatch_backend_send_closed_total` value.
680    pub backend_send_closed: u64,
681    /// `dispatch_response_timeout_total` rows.
682    pub response_timeout: Vec<TimeoutEntry>,
683    /// `peer_state_transitions_total` rows.
684    pub peer_state_transitions: Vec<TransitionEntry>,
685    /// `peer_state_current` gauge rows.
686    pub peer_state_current: Vec<PeerStateEntry>,
687    /// `gossip_phi_score` gauge rows.
688    pub peer_phi: Vec<PhiEntry>,
689    /// `gossip_phi_threshold_observed` gauge rows.
690    pub peer_threshold: Vec<ThresholdEntry>,
691    /// `peer_state_dwell_seconds` histogram rows, one per
692    /// observed [`PeerState`].
693    pub peer_state_dwell: Vec<DwellEntry>,
694}
695
696impl FailureSnapshot {
697    /// True when every counter and gauge is empty. Used by the
698    /// stats subsystem to skip rendering the failure block when
699    /// the operator has not wired the metrics in (and so every
700    /// observation has been a no-op).
701    #[must_use]
702    pub fn is_empty(&self) -> bool {
703        self.no_targets.is_empty()
704            && self.peer_send_full.is_empty()
705            && self.peer_send_closed.is_empty()
706            && self.backend_send_full == 0
707            && self.backend_send_closed == 0
708            && self.response_timeout.is_empty()
709            && self.peer_state_transitions.is_empty()
710            && self.peer_state_current.is_empty()
711            && self.peer_phi.is_empty()
712            && self.peer_threshold.is_empty()
713            && self.peer_state_dwell.is_empty()
714    }
715}
716
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn no_targets_increments_per_label_set() {
        let m = FailureMetrics::new();
        m.record_no_targets("dc1", "rA", ConsistencyLevel::DcQuorum);
        m.record_no_targets("dc1", "rA", ConsistencyLevel::DcQuorum);
        m.record_no_targets("dc2", "rA", ConsistencyLevel::DcQuorum);
        let s = m.snapshot();
        assert_eq!(s.no_targets.len(), 2);
        let dc1 = s.no_targets.iter().find(|e| e.dc == "dc1").unwrap();
        let dc2 = s.no_targets.iter().find(|e| e.dc == "dc2").unwrap();
        assert_eq!(dc1.count, 2);
        assert_eq!(dc2.count, 1);
    }

    #[test]
    fn peer_send_full_and_closed_are_distinct_buckets() {
        let m = FailureMetrics::new();
        m.record_peer_send_full(7, "dc2");
        m.record_peer_send_closed(7, "dc2");
        m.record_peer_send_closed(7, "dc2");
        let s = m.snapshot();
        assert_eq!(s.peer_send_full.len(), 1);
        assert_eq!(s.peer_send_full[0].count, 1);
        assert_eq!(s.peer_send_closed.len(), 1);
        assert_eq!(s.peer_send_closed[0].count, 2);
    }

    #[test]
    fn backend_counters_track_independently() {
        let m = FailureMetrics::new();
        m.record_backend_send_full();
        m.record_backend_send_closed();
        m.record_backend_send_closed();
        let s = m.snapshot();
        assert_eq!(s.backend_send_full, 1);
        assert_eq!(s.backend_send_closed, 2);
    }

    #[test]
    fn response_timeout_rolls_up_by_consistency() {
        let m = FailureMetrics::new();
        m.record_response_timeout(ConsistencyLevel::DcOne);
        m.record_response_timeout(ConsistencyLevel::DcQuorum);
        m.record_response_timeout(ConsistencyLevel::DcQuorum);
        let s = m.snapshot();
        assert_eq!(s.response_timeout.len(), 2);
        let q = s
            .response_timeout
            .iter()
            .find(|e| e.consistency == ConsistencyLevel::DcQuorum)
            .unwrap();
        assert_eq!(q.count, 2);
    }

    #[test]
    fn peer_state_transition_records_count_and_current() {
        let m = FailureMetrics::new();
        m.record_peer_state_transition(3, "dc1", "rA", PeerState::Normal, PeerState::Down);
        m.record_peer_state_transition(3, "dc1", "rA", PeerState::Down, PeerState::Normal);
        m.record_peer_state_transition(3, "dc1", "rA", PeerState::Normal, PeerState::Down);
        let s = m.snapshot();
        let to_down = s
            .peer_state_transitions
            .iter()
            .find(|t| t.from == PeerState::Normal && t.to == PeerState::Down)
            .unwrap();
        assert_eq!(to_down.count, 2);
        assert_eq!(s.peer_state_current.len(), 1);
        assert_eq!(s.peer_state_current[0].state, PeerState::Down);
    }

    #[test]
    fn observe_phi_rounds_to_thousandths() {
        let m = FailureMetrics::new();
        m.observe_phi(1, "dc1", "rA", 1.234_567);
        let s = m.snapshot();
        assert_eq!(s.peer_phi.len(), 1);
        // 1.234_567 rounds to 1.235 at three decimal places.
        let diff = (s.peer_phi[0].phi - 1.235).abs();
        assert!(diff < 1e-9, "phi={}", s.peer_phi[0].phi);
    }

    #[test]
    fn snapshot_empty_predicate_is_correct() {
        let m = FailureMetrics::new();
        assert!(m.snapshot().is_empty());
        m.record_backend_send_full();
        assert!(!m.snapshot().is_empty());
    }

    #[test]
    fn observe_threshold_records_value_per_peer() {
        let m = FailureMetrics::new();
        m.observe_threshold(7, "dc1", "rA", 8.0);
        m.observe_threshold(8, "dc2", "rB", 6.5);
        let s = m.snapshot();
        assert_eq!(s.peer_threshold.len(), 2);
        assert!((s.peer_threshold[0].threshold - 8.0).abs() < 1e-9);
        assert_eq!(s.peer_threshold[0].peer_idx, 7);
        assert!((s.peer_threshold[1].threshold - 6.5).abs() < 1e-9);
        assert_eq!(s.peer_threshold[1].peer_idx, 8);
    }

    #[test]
    fn dwell_histogram_records_observation_on_transition() {
        // Drive a Normal->Down->Normal flap with controlled
        // instants so the histogram observes deterministic
        // dwell durations.
        let m = FailureMetrics::new();
        let t0 = Instant::now();
        // First transition: no prior change time, so the
        // accumulator stays empty but the timestamp is set.
        m.record_peer_state_transition_at(
            1,
            "dc1",
            "rA",
            PeerState::Unknown,
            PeerState::Normal,
            t0,
        );
        assert!(m.snapshot().peer_state_dwell.is_empty());
        // Spent 2.5 seconds in Normal -> first counted in the
        // 5-second bucket (above 1.0, at most 5.0).
        m.record_peer_state_transition_at(
            1,
            "dc1",
            "rA",
            PeerState::Normal,
            PeerState::Down,
            t0 + std::time::Duration::from_millis(2_500),
        );
        // Spent 45 seconds in Down -> first counted in the
        // 60-second bucket (above 30.0, at most 60.0).
        m.record_peer_state_transition_at(
            1,
            "dc1",
            "rA",
            PeerState::Down,
            PeerState::Normal,
            t0 + std::time::Duration::from_millis(2_500 + 45_000),
        );
        let s = m.snapshot();
        let normal = s
            .peer_state_dwell
            .iter()
            .find(|e| e.state == PeerState::Normal)
            .expect("Normal dwell entry present");
        assert_eq!(normal.count, 1);
        assert!((normal.sum_seconds - 2.5).abs() < 1e-6);
        // Boundaries: 0.1, 0.5, 1, 5, 10, 30, 60, 300, 600, 1800, 3600, +Inf.
        assert_eq!(
            normal.bucket_counts,
            vec![0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1]
        );
        let down = s
            .peer_state_dwell
            .iter()
            .find(|e| e.state == PeerState::Down)
            .expect("Down dwell entry present");
        assert_eq!(down.count, 1);
        assert!((down.sum_seconds - 45.0).abs() < 1e-6);
        assert_eq!(
            down.bucket_counts,
            vec![0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1]
        );
        // The +Inf bucket equals the count.
        assert_eq!(*normal.bucket_counts.last().unwrap(), normal.count);
        assert_eq!(*down.bucket_counts.last().unwrap(), down.count);
    }

    #[test]
    fn dwell_buckets_are_cumulative() {
        let m = FailureMetrics::new();
        let t0 = Instant::now();
        m.record_peer_state_transition_at(
            5,
            "dc1",
            "rA",
            PeerState::Unknown,
            PeerState::Normal,
            t0,
        );
        // 0.05 second dwell falls in the smallest 0.1 bucket.
        m.record_peer_state_transition_at(
            5,
            "dc1",
            "rA",
            PeerState::Normal,
            PeerState::Down,
            t0 + std::time::Duration::from_millis(50),
        );
        let s = m.snapshot();
        let normal = s
            .peer_state_dwell
            .iter()
            .find(|e| e.state == PeerState::Normal)
            .expect("Normal dwell entry");
        assert_eq!(normal.bucket_counts.len(), DWELL_BUCKETS_SECONDS.len() + 1);
        // Every bucket boundary >= 0.05 should hold count 1.
        for bc in &normal.bucket_counts {
            assert_eq!(*bc, 1, "every cumulative bucket sees the 0.05s observation");
        }
    }
}