Skip to main content

dynomite/stats/
failure.rs

1//! Failure-cause counters for the dispatch and gossip planes.
2//!
3//! The pre-existing pool / server metrics maintained by
4//! [`crate::stats::Stats`] track aggregate request volume and a
5//! single coarse `errors` bucket. When operators need to tell
6//! "the cluster lost quorum because a peer was Down" apart from
7//! "the cluster lost quorum because the per-peer outbound
8//! channel was full", that single bucket is not enough.
9//!
10//! [`FailureMetrics`] supplements the existing counters with
11//! label-rich counters and gauges:
12//!
13//! * Per-cause dispatch error counters (no-targets, channel
14//!   full, channel closed, response timeout) keyed by the
15//!   labels operators want to slice by.
16//! * Per-peer state transition counts, for charting how often
17//!   gossip flips a peer between [`crate::cluster::peer::PeerState`]
18//!   variants.
19//! * Per-peer current-state and phi-score gauges, so a
20//!   dashboard can show the live failure-detector view.
21//!
//! All counters initialise to zero. The accumulators share a
//! single `parking_lot::Mutex` over the inner state; every
//! observation is one or two hashmap updates (or a plain integer
//! increment), so the lock is held only briefly per call. It is
//! not a fixed-cost critical section, though: hashing the
//! `String`-keyed labels and occasional table growth both happen
//! while the lock is held. The dispatch hot path is a direct
//! method invocation on an `Arc<FailureMetrics>`; when the
//! handle is `None` the dispatcher's call site short-circuits
//! to a single null-pointer test (see
//! [`crate::cluster::dispatch::ClusterDispatcher::with_failure_metrics`]).
31//!
32//! # Examples
33//!
34//! ```
35//! use dynomite::msg::ConsistencyLevel;
36//! use dynomite::stats::FailureMetrics;
37//!
38//! let m = FailureMetrics::new();
39//! m.record_no_targets("dc1", "rA", ConsistencyLevel::DcQuorum);
40//! let snap = m.snapshot();
41//! assert_eq!(snap.no_targets.len(), 1);
42//! assert_eq!(snap.no_targets[0].count, 1);
43//! ```
44
45use std::collections::HashMap;
46
47use parking_lot::Mutex;
48
49use crate::cluster::peer::PeerState;
50use crate::msg::ConsistencyLevel;
51
52/// Live, mutable accumulator for failure-cause counters.
53///
54/// Cheap to clone via `Arc`; every method takes `&self`.
55#[derive(Debug, Default)]
56pub struct FailureMetrics {
57    inner: Mutex<FailureInner>,
58}
59
60#[derive(Debug, Default)]
61struct FailureInner {
62    no_targets: HashMap<NoTargetsKey, u64>,
63    peer_send_full: HashMap<PeerKey, u64>,
64    peer_send_closed: HashMap<PeerKey, u64>,
65    backend_send_full: u64,
66    backend_send_closed: u64,
67    response_timeout: HashMap<ConsistencyLevel, u64>,
68    peer_state_transitions: HashMap<TransitionKey, u64>,
69    peer_state_current: HashMap<u32, PeerStateRecord>,
70    peer_phi: HashMap<u32, PhiRecord>,
71}
72
73#[derive(Debug, Eq, PartialEq, Hash, Clone)]
74struct NoTargetsKey {
75    dc: String,
76    rack: String,
77    consistency: ConsistencyLevel,
78}
79
/// Label set for the per-peer dispatch error counters: one
/// counter per (peer index, peer datacenter) pair.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct PeerKey {
    /// Index of the target peer.
    peer_idx: u32,
    /// Datacenter of the target peer.
    peer_dc: String,
}
85
86#[derive(Debug, Eq, PartialEq, Hash, Clone)]
87struct TransitionKey {
88    peer_idx: u32,
89    from: PeerState,
90    to: PeerState,
91}
92
93#[derive(Debug, Clone)]
94struct PeerStateRecord {
95    dc: String,
96    rack: String,
97    state: PeerState,
98}
99
/// Value held in the `gossip_phi_score` gauge map.
#[derive(Clone, Debug)]
struct PhiRecord {
    /// Datacenter of the peer.
    dc: String,
    /// Rack of the peer.
    rack: String,
    /// Phi scaled by 1000 and rounded, kept as an integer so the
    /// gauge survives the `i64` round-trip in the Prometheus
    /// encoder.
    phi_milli: i64,
}
108
109impl FailureMetrics {
110    /// Construct a fresh accumulator with all counters and
111    /// gauges zeroed.
112    ///
113    /// # Examples
114    ///
115    /// ```
116    /// use dynomite::stats::FailureMetrics;
117    /// let m = FailureMetrics::new();
118    /// assert_eq!(m.snapshot().backend_send_full, 0);
119    /// ```
120    #[must_use]
121    pub fn new() -> Self {
122        Self::default()
123    }
124
125    /// Increment the `dispatch_no_targets_total` counter for the
126    /// given local-DC labels.
127    ///
128    /// # Examples
129    ///
130    /// ```
131    /// use dynomite::msg::ConsistencyLevel;
132    /// use dynomite::stats::FailureMetrics;
133    /// let m = FailureMetrics::new();
134    /// m.record_no_targets("dc1", "rA", ConsistencyLevel::DcOne);
135    /// m.record_no_targets("dc1", "rA", ConsistencyLevel::DcOne);
136    /// assert_eq!(m.snapshot().no_targets[0].count, 2);
137    /// ```
138    pub fn record_no_targets(&self, dc: &str, rack: &str, consistency: ConsistencyLevel) {
139        let key = NoTargetsKey {
140            dc: dc.to_owned(),
141            rack: rack.to_owned(),
142            consistency,
143        };
144        let mut inner = self.inner.lock();
145        *inner.no_targets.entry(key).or_insert(0) += 1;
146    }
147
148    /// Increment the `dispatch_peer_send_full_total` counter.
149    pub fn record_peer_send_full(&self, peer_idx: u32, peer_dc: &str) {
150        let key = PeerKey {
151            peer_idx,
152            peer_dc: peer_dc.to_owned(),
153        };
154        let mut inner = self.inner.lock();
155        *inner.peer_send_full.entry(key).or_insert(0) += 1;
156    }
157
158    /// Increment the `dispatch_peer_send_closed_total` counter.
159    pub fn record_peer_send_closed(&self, peer_idx: u32, peer_dc: &str) {
160        let key = PeerKey {
161            peer_idx,
162            peer_dc: peer_dc.to_owned(),
163        };
164        let mut inner = self.inner.lock();
165        *inner.peer_send_closed.entry(key).or_insert(0) += 1;
166    }
167
168    /// Increment the `dispatch_backend_send_full_total` counter.
169    pub fn record_backend_send_full(&self) {
170        self.inner.lock().backend_send_full += 1;
171    }
172
173    /// Increment the `dispatch_backend_send_closed_total`
174    /// counter.
175    pub fn record_backend_send_closed(&self) {
176        self.inner.lock().backend_send_closed += 1;
177    }
178
179    /// Increment the `dispatch_response_timeout_total` counter.
180    /// Used by the response coalescer when every per-target
181    /// sender drops without producing a reply (i.e. the request
182    /// timed out at the dispatch layer).
183    pub fn record_response_timeout(&self, consistency: ConsistencyLevel) {
184        let mut inner = self.inner.lock();
185        *inner.response_timeout.entry(consistency).or_insert(0) += 1;
186    }
187
188    /// Record a peer-state transition. Increments
189    /// `peer_state_transitions_total` by one and updates the
190    /// `peer_state_current` gauge to the new state.
191    pub fn record_peer_state_transition(
192        &self,
193        peer_idx: u32,
194        dc: &str,
195        rack: &str,
196        from: PeerState,
197        to: PeerState,
198    ) {
199        let key = TransitionKey { peer_idx, from, to };
200        let mut inner = self.inner.lock();
201        *inner.peer_state_transitions.entry(key).or_insert(0) += 1;
202        inner.peer_state_current.insert(
203            peer_idx,
204            PeerStateRecord {
205                dc: dc.to_owned(),
206                rack: rack.to_owned(),
207                state: to,
208            },
209        );
210    }
211
212    /// Update the `peer_state_current` gauge without recording
213    /// a transition. Useful for the initial publish at startup
214    /// and when an evaluate tick observes a peer whose state
215    /// has not changed.
216    pub fn observe_peer_state(&self, peer_idx: u32, dc: &str, rack: &str, state: PeerState) {
217        let mut inner = self.inner.lock();
218        inner.peer_state_current.insert(
219            peer_idx,
220            PeerStateRecord {
221                dc: dc.to_owned(),
222                rack: rack.to_owned(),
223                state,
224            },
225        );
226    }
227
228    /// Update the `gossip_phi_score` gauge for a peer. The phi
229    /// value is rounded to thousandths and stored as an `i64`
230    /// (millis); the snapshot exposes a floating-point view
231    /// rebuilt from that integer.
232    pub fn observe_phi(&self, peer_idx: u32, dc: &str, rack: &str, phi: f64) {
233        let phi_milli = phi_to_milli(phi);
234        let mut inner = self.inner.lock();
235        inner.peer_phi.insert(
236            peer_idx,
237            PhiRecord {
238                dc: dc.to_owned(),
239                rack: rack.to_owned(),
240                phi_milli,
241            },
242        );
243    }
244
245    /// Build an immutable snapshot of every counter and gauge.
246    ///
247    /// The returned `FailureSnapshot` is `Clone` and `Send`, so
248    /// the stats aggregator can stash it in the snapshot cell
249    /// without holding the underlying lock.
250    #[must_use]
251    pub fn snapshot(&self) -> FailureSnapshot {
252        let inner = self.inner.lock();
253        let mut no_targets: Vec<NoTargetsEntry> = inner
254            .no_targets
255            .iter()
256            .map(|(k, v)| NoTargetsEntry {
257                dc: k.dc.clone(),
258                rack: k.rack.clone(),
259                consistency: k.consistency,
260                count: *v,
261            })
262            .collect();
263        no_targets.sort_by(|a, b| {
264            a.dc.cmp(&b.dc)
265                .then(a.rack.cmp(&b.rack))
266                .then((a.consistency as u8).cmp(&(b.consistency as u8)))
267        });
268        let peer_send_full = collect_peer_entries(&inner.peer_send_full);
269        let peer_send_closed = collect_peer_entries(&inner.peer_send_closed);
270        let mut response_timeout: Vec<TimeoutEntry> = inner
271            .response_timeout
272            .iter()
273            .map(|(c, v)| TimeoutEntry {
274                consistency: *c,
275                count: *v,
276            })
277            .collect();
278        response_timeout.sort_by_key(|e| e.consistency as u8);
279        let mut peer_state_transitions: Vec<TransitionEntry> = inner
280            .peer_state_transitions
281            .iter()
282            .map(|(k, v)| TransitionEntry {
283                peer_idx: k.peer_idx,
284                from: k.from,
285                to: k.to,
286                count: *v,
287            })
288            .collect();
289        peer_state_transitions.sort_by(|a, b| {
290            a.peer_idx
291                .cmp(&b.peer_idx)
292                .then((a.from as u8).cmp(&(b.from as u8)))
293                .then((a.to as u8).cmp(&(b.to as u8)))
294        });
295        let mut peer_state_current: Vec<PeerStateEntry> = inner
296            .peer_state_current
297            .iter()
298            .map(|(idx, rec)| PeerStateEntry {
299                peer_idx: *idx,
300                dc: rec.dc.clone(),
301                rack: rec.rack.clone(),
302                state: rec.state,
303            })
304            .collect();
305        peer_state_current.sort_by_key(|e| e.peer_idx);
306        let mut peer_phi: Vec<PhiEntry> = inner
307            .peer_phi
308            .iter()
309            .map(|(idx, rec)| PhiEntry {
310                peer_idx: *idx,
311                dc: rec.dc.clone(),
312                rack: rec.rack.clone(),
313                phi: milli_to_phi(rec.phi_milli),
314            })
315            .collect();
316        peer_phi.sort_by_key(|e| e.peer_idx);
317        FailureSnapshot {
318            no_targets,
319            peer_send_full,
320            peer_send_closed,
321            backend_send_full: inner.backend_send_full,
322            backend_send_closed: inner.backend_send_closed,
323            response_timeout,
324            peer_state_transitions,
325            peer_state_current,
326            peer_phi,
327        }
328    }
329}
330
331/// Convert a phi value to a thousandths-rounded `i64`. Floats
332/// outside `[0, i64::MAX/1000]`, NaN, and infinity all clamp to
333/// the saturating ceiling. The function is implemented without
334/// `as`-casts so the pedantic precision-loss lint stays clean.
335fn phi_to_milli(phi: f64) -> i64 {
336    let saturating = i64::MAX / 1000;
337    if phi.is_nan() {
338        return saturating;
339    }
340    if !phi.is_finite() {
341        // Both +inf and -inf are unexpected; treat positive
342        // infinity as a saturating high and negative infinity
343        // as zero (phi cannot be negative in practice).
344        if phi > 0.0 {
345            return saturating;
346        }
347        return 0;
348    }
349    if phi <= 0.0 {
350        return 0;
351    }
352    let scaled = (phi * 1000.0).round();
353    f64_to_i64_clamped(scaled).min(saturating)
354}
355
356/// Render the stored thousandths-precision integer back as a
357/// floating-point phi value.
358fn milli_to_phi(milli: i64) -> f64 {
359    i64_to_f64(milli) / 1000.0
360}
361
/// Convert an `i64` to the nearest `f64` without an `as`-cast.
///
/// The magnitude is split into 32-bit halves. `hi * 2^32` is
/// exact, and the single addition rounds once (to nearest,
/// ties to even). The result is therefore exact for
/// `|value| <= 2^53` and correctly rounded beyond that. This is
/// the same value `value as f64` produces.
///
/// The gauge's saturating ceiling (`i64::MAX / 1000`) lies above
/// 2^53, so that one value is rounded rather than reproduced
/// exactly.
fn i64_to_f64(value: i64) -> f64 {
    let neg = value < 0;
    // `unsigned_abs` handles `i64::MIN` without overflow.
    let abs = value.unsigned_abs();
    // Both halves hold at most 32 significant bits, so these
    // conversions cannot fail; the fallbacks are unreachable.
    let hi = u32::try_from(abs >> 32).unwrap_or(u32::MAX);
    let lo = u32::try_from(abs & 0xFFFF_FFFF).unwrap_or(u32::MAX);
    let f = f64::from(hi) * 4_294_967_296.0_f64 + f64::from(lo);
    if neg {
        -f
    } else {
        f
    }
}
376
/// Convert an `f64` to `i64` without a raw `as` cast, truncating
/// toward zero and clamping the result to `[0, i64::MAX]`.
///
/// * NaN, zero, and every negative value (including `-inf`)
///   map to `0`.
/// * Values `>= 2^63` (including `+inf`) saturate to `i64::MAX`.
/// * Any other value has its fractional part discarded.
///
/// For non-negative inputs the result equals the saturating
/// `x as i64`.
fn f64_to_i64_clamped(x: f64) -> i64 {
    // Previously `+inf` fell into this early return and produced 0,
    // even though finite values >= 2^63 saturate to `i64::MAX`.
    // Only NaN and non-positive inputs return 0 now; `+inf` falls
    // through to the saturation branch below.
    if x.is_nan() || x <= 0.0 {
        return 0;
    }
    let bits = x.to_bits();
    // 11-bit biased exponent and the 52 explicit mantissa bits.
    let exp_field = u32::try_from((bits >> 52) & 0x7FF).unwrap_or(0);
    let mant = bits & ((1u64 << 52) - 1);
    if exp_field < 1023 {
        // 0 < x < 1 (including subnormals) truncates to zero.
        return 0;
    }
    let unbiased = exp_field - 1023;
    if unbiased >= 63 {
        // x >= 2^63, or +inf (exponent field all ones).
        return i64::MAX;
    }
    // Restore the implicit leading 1 bit.
    let m = (1u64 << 52) | mant;
    let value = if unbiased >= 52 {
        // The shift is at most 10 here (unbiased <= 62), so the
        // result stays below 2^63 and no bits are lost.
        m << (unbiased - 52)
    } else {
        // Dropping the low bits truncates the fraction.
        m >> (52 - unbiased)
    };
    i64::try_from(value).unwrap_or(i64::MAX)
}
402
403fn collect_peer_entries(map: &HashMap<PeerKey, u64>) -> Vec<PeerEntry> {
404    let mut out: Vec<PeerEntry> = map
405        .iter()
406        .map(|(k, v)| PeerEntry {
407            peer_idx: k.peer_idx,
408            peer_dc: k.peer_dc.clone(),
409            count: *v,
410        })
411        .collect();
412    out.sort_by(|a, b| a.peer_idx.cmp(&b.peer_idx).then(a.peer_dc.cmp(&b.peer_dc)));
413    out
414}
415
416/// A single labeled `dispatch_no_targets_total` row.
417#[derive(Clone, Debug, Default, Eq, PartialEq)]
418pub struct NoTargetsEntry {
419    /// Local datacenter the request originated from.
420    pub dc: String,
421    /// Local rack.
422    pub rack: String,
423    /// Effective consistency level of the request that produced
424    /// the `NoTargets` plan.
425    pub consistency: ConsistencyLevel,
426    /// Cumulative occurrences.
427    pub count: u64,
428}
429
/// One labelled row of a per-peer dispatch error counter
/// (`dispatch_peer_send_full_total` or
/// `dispatch_peer_send_closed_total`).
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct PeerEntry {
    /// Position of the target peer in the pool's peer array.
    pub peer_idx: u32,
    /// Datacenter the target peer belongs to.
    pub peer_dc: String,
    /// Number of times this label set has been recorded.
    pub count: u64,
}
440
441/// A single labeled `dispatch_response_timeout_total` row.
442#[derive(Clone, Debug, Default, Eq, PartialEq)]
443pub struct TimeoutEntry {
444    /// Consistency level of the request that timed out.
445    pub consistency: ConsistencyLevel,
446    /// Cumulative occurrences.
447    pub count: u64,
448}
449
450/// A single labeled `peer_state_transitions_total` row.
451#[derive(Clone, Debug, Eq, PartialEq)]
452pub struct TransitionEntry {
453    /// Peer that transitioned.
454    pub peer_idx: u32,
455    /// State the peer was in before the transition.
456    pub from: PeerState,
457    /// State the peer is in after the transition.
458    pub to: PeerState,
459    /// Cumulative occurrences.
460    pub count: u64,
461}
462
463/// A single labeled `peer_state_current` gauge row.
464#[derive(Clone, Debug, Eq, PartialEq)]
465pub struct PeerStateEntry {
466    /// Peer index.
467    pub peer_idx: u32,
468    /// Datacenter of the peer.
469    pub dc: String,
470    /// Rack of the peer.
471    pub rack: String,
472    /// Current state.
473    pub state: PeerState,
474}
475
/// One labelled row of the `gossip_phi_score` gauge.
///
/// Derives `PartialEq` (but not `Eq`, because `phi` is an `f64`)
/// so rows can be compared directly, like the other snapshot row
/// types.
#[derive(Clone, Debug, PartialEq)]
pub struct PhiEntry {
    /// Peer index.
    pub peer_idx: u32,
    /// Datacenter of the peer.
    pub dc: String,
    /// Rack of the peer.
    pub rack: String,
    /// Current phi value as observed at the last evaluate tick.
    pub phi: f64,
}
488
489/// Immutable snapshot of every failure-cause metric.
490#[derive(Clone, Debug, Default)]
491pub struct FailureSnapshot {
492    /// `dispatch_no_targets_total` rows.
493    pub no_targets: Vec<NoTargetsEntry>,
494    /// `dispatch_peer_send_full_total` rows.
495    pub peer_send_full: Vec<PeerEntry>,
496    /// `dispatch_peer_send_closed_total` rows.
497    pub peer_send_closed: Vec<PeerEntry>,
498    /// `dispatch_backend_send_full_total` value.
499    pub backend_send_full: u64,
500    /// `dispatch_backend_send_closed_total` value.
501    pub backend_send_closed: u64,
502    /// `dispatch_response_timeout_total` rows.
503    pub response_timeout: Vec<TimeoutEntry>,
504    /// `peer_state_transitions_total` rows.
505    pub peer_state_transitions: Vec<TransitionEntry>,
506    /// `peer_state_current` gauge rows.
507    pub peer_state_current: Vec<PeerStateEntry>,
508    /// `gossip_phi_score` gauge rows.
509    pub peer_phi: Vec<PhiEntry>,
510}
511
512impl FailureSnapshot {
513    /// True when every counter and gauge is empty. Used by the
514    /// stats subsystem to skip rendering the failure block when
515    /// the operator has not wired the metrics in (and so every
516    /// observation has been a no-op).
517    #[must_use]
518    pub fn is_empty(&self) -> bool {
519        self.no_targets.is_empty()
520            && self.peer_send_full.is_empty()
521            && self.peer_send_closed.is_empty()
522            && self.backend_send_full == 0
523            && self.backend_send_closed == 0
524            && self.response_timeout.is_empty()
525            && self.peer_state_transitions.is_empty()
526            && self.peer_state_current.is_empty()
527            && self.peer_phi.is_empty()
528    }
529}
530
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn no_targets_increments_per_label_set() {
        let m = FailureMetrics::new();
        m.record_no_targets("dc1", "rA", ConsistencyLevel::DcQuorum);
        m.record_no_targets("dc1", "rA", ConsistencyLevel::DcQuorum);
        m.record_no_targets("dc2", "rA", ConsistencyLevel::DcQuorum);
        let s = m.snapshot();
        assert_eq!(s.no_targets.len(), 2);
        let dc1 = s.no_targets.iter().find(|e| e.dc == "dc1").unwrap();
        let dc2 = s.no_targets.iter().find(|e| e.dc == "dc2").unwrap();
        assert_eq!(dc1.count, 2);
        assert_eq!(dc2.count, 1);
    }

    #[test]
    fn peer_send_full_and_closed_are_distinct_buckets() {
        let m = FailureMetrics::new();
        m.record_peer_send_full(7, "dc2");
        m.record_peer_send_closed(7, "dc2");
        m.record_peer_send_closed(7, "dc2");
        let s = m.snapshot();
        assert_eq!(s.peer_send_full.len(), 1);
        assert_eq!(s.peer_send_full[0].count, 1);
        assert_eq!(s.peer_send_closed.len(), 1);
        assert_eq!(s.peer_send_closed[0].count, 2);
    }

    #[test]
    fn backend_counters_track_independently() {
        let m = FailureMetrics::new();
        m.record_backend_send_full();
        m.record_backend_send_closed();
        m.record_backend_send_closed();
        let s = m.snapshot();
        assert_eq!(s.backend_send_full, 1);
        assert_eq!(s.backend_send_closed, 2);
    }

    #[test]
    fn response_timeout_rolls_up_by_consistency() {
        let m = FailureMetrics::new();
        m.record_response_timeout(ConsistencyLevel::DcOne);
        m.record_response_timeout(ConsistencyLevel::DcQuorum);
        m.record_response_timeout(ConsistencyLevel::DcQuorum);
        let s = m.snapshot();
        assert_eq!(s.response_timeout.len(), 2);
        let q = s
            .response_timeout
            .iter()
            .find(|e| e.consistency == ConsistencyLevel::DcQuorum)
            .unwrap();
        assert_eq!(q.count, 2);
    }

    #[test]
    fn peer_state_transition_records_count_and_current() {
        let m = FailureMetrics::new();
        m.record_peer_state_transition(3, "dc1", "rA", PeerState::Normal, PeerState::Down);
        m.record_peer_state_transition(3, "dc1", "rA", PeerState::Down, PeerState::Normal);
        m.record_peer_state_transition(3, "dc1", "rA", PeerState::Normal, PeerState::Down);
        let s = m.snapshot();
        let to_down = s
            .peer_state_transitions
            .iter()
            .find(|t| t.from == PeerState::Normal && t.to == PeerState::Down)
            .unwrap();
        assert_eq!(to_down.count, 2);
        assert_eq!(s.peer_state_current.len(), 1);
        assert_eq!(s.peer_state_current[0].state, PeerState::Down);
    }

    #[test]
    fn observe_phi_rounds_to_thousandths() {
        let m = FailureMetrics::new();
        m.observe_phi(1, "dc1", "rA", 1.234_567);
        let s = m.snapshot();
        assert_eq!(s.peer_phi.len(), 1);
        // 1.234_567 rounds to 1.235 at three decimal places.
        let diff = (s.peer_phi[0].phi - 1.235).abs();
        assert!(diff < 1e-9, "phi={}", s.peer_phi[0].phi);
    }

    #[test]
    fn phi_to_milli_clamps_out_of_range_values() {
        let ceiling = i64::MAX / 1000;
        // NaN and +inf saturate at the ceiling.
        assert_eq!(phi_to_milli(f64::NAN), ceiling);
        assert_eq!(phi_to_milli(f64::INFINITY), ceiling);
        // Zero, negatives and -inf collapse to zero.
        assert_eq!(phi_to_milli(f64::NEG_INFINITY), 0);
        assert_eq!(phi_to_milli(-3.0), 0);
        assert_eq!(phi_to_milli(0.0), 0);
        // Ordinary values scale and round.
        assert_eq!(phi_to_milli(2.5), 2_500);
        assert_eq!(phi_to_milli(1.234_567), 1_235);
        // Finite values whose milli form exceeds the ceiling saturate.
        assert_eq!(phi_to_milli(1.0e13), ceiling);
        assert_eq!(phi_to_milli(1.0e300), ceiling);
    }

    #[test]
    fn milli_round_trip_is_exact_for_small_values() {
        for milli in [0_i64, 1, 999, 1_000, 123_456] {
            assert_eq!(phi_to_milli(milli_to_phi(milli)), milli);
        }
    }

    #[test]
    fn snapshot_rows_are_sorted_by_labels() {
        let m = FailureMetrics::new();
        m.record_no_targets("dc2", "rA", ConsistencyLevel::DcOne);
        m.record_no_targets("dc1", "rB", ConsistencyLevel::DcOne);
        m.record_no_targets("dc1", "rA", ConsistencyLevel::DcOne);
        m.record_peer_send_full(9, "dc1");
        m.record_peer_send_full(2, "dc1");
        m.observe_phi(5, "dc1", "rA", 0.5);
        m.observe_phi(1, "dc1", "rA", 0.5);
        m.observe_peer_state(4, "dc1", "rA", PeerState::Normal);
        m.observe_peer_state(0, "dc1", "rA", PeerState::Down);
        let s = m.snapshot();

        let labels: Vec<(&str, &str)> = s
            .no_targets
            .iter()
            .map(|e| (e.dc.as_str(), e.rack.as_str()))
            .collect();
        assert_eq!(labels, [("dc1", "rA"), ("dc1", "rB"), ("dc2", "rA")]);

        let full: Vec<u32> = s.peer_send_full.iter().map(|e| e.peer_idx).collect();
        assert_eq!(full, [2, 9]);
        let phi: Vec<u32> = s.peer_phi.iter().map(|e| e.peer_idx).collect();
        assert_eq!(phi, [1, 5]);
        let states: Vec<u32> = s.peer_state_current.iter().map(|e| e.peer_idx).collect();
        assert_eq!(states, [0, 4]);
    }

    #[test]
    fn snapshot_empty_predicate_is_correct() {
        let m = FailureMetrics::new();
        assert!(m.snapshot().is_empty());
        m.record_backend_send_full();
        assert!(!m.snapshot().is_empty());
    }
}