1use std::collections::HashMap;
46use std::time::Instant;
47
48use parking_lot::Mutex;
49
50use crate::cluster::peer::PeerState;
51use crate::msg::ConsistencyLevel;
52
/// Registry of failure-path counters and peer-health gauges.
///
/// All state lives in a single `FailureInner` behind a `parking_lot::Mutex`.
/// Every `record_*` / `observe_*` method takes `&self` and locks that mutex.
/// `snapshot` copies the current contents out into a sorted `FailureSnapshot`.
#[derive(Debug, Default)]
pub struct FailureMetrics {
    // All mutable metric state; only accessed while holding this lock.
    inner: Mutex<FailureInner>,
}
60
/// Mutable state guarded by `FailureMetrics::inner`.
#[derive(Debug, Default)]
struct FailureInner {
    /// Number of `record_no_targets` calls per (dc, rack, consistency).
    no_targets: HashMap<NoTargetsKey, u64>,
    /// Number of `record_peer_send_full` calls per (peer index, peer dc).
    peer_send_full: HashMap<PeerKey, u64>,
    /// Number of `record_peer_send_closed` calls per (peer index, peer dc).
    peer_send_closed: HashMap<PeerKey, u64>,
    /// Number of `record_backend_send_full` calls.
    backend_send_full: u64,
    /// Number of `record_backend_send_closed` calls.
    backend_send_closed: u64,
    /// Number of `record_response_timeout` calls per consistency level.
    response_timeout: HashMap<ConsistencyLevel, u64>,
    /// Number of recorded transitions per (peer index, from-state, to-state).
    peer_state_transitions: HashMap<TransitionKey, u64>,
    /// Latest state per peer index. Written both by recorded transitions
    /// (their `to` state) and by `observe_peer_state`.
    peer_state_current: HashMap<u32, PeerStateRecord>,
    /// Last observed phi per peer index, stored in thousandths.
    peer_phi: HashMap<u32, PhiRecord>,
    /// Last observed threshold per peer index, stored in thousandths.
    peer_threshold: HashMap<u32, ThresholdRecord>,
    /// Instant of the most recent recorded transition per peer index. Only
    /// `record_peer_state_transition_at` updates it (not `observe_peer_state`).
    /// It is used to measure how long the peer stayed in a state.
    peer_last_change: HashMap<u32, Instant>,
    /// Dwell-time histograms keyed by the state that was *left*.
    dwell_per_state: HashMap<PeerState, DwellAccumulator>,
}
89
/// Label set for the "no targets" counter.
#[derive(Debug, Eq, PartialEq, Hash, Clone)]
struct NoTargetsKey {
    dc: String,
    rack: String,
    consistency: ConsistencyLevel,
}
96
/// Label set for the per-peer send counters (`peer_send_full` / `peer_send_closed`).
#[derive(Debug, Eq, PartialEq, Hash, Clone)]
struct PeerKey {
    peer_idx: u32,
    peer_dc: String,
}
102
/// Label set for the state-transition counter: one counter per
/// (peer, from-state, to-state) triple. dc/rack are not part of the key.
#[derive(Debug, Eq, PartialEq, Hash, Clone)]
struct TransitionKey {
    peer_idx: u32,
    from: PeerState,
    to: PeerState,
}
109
/// Latest known state of one peer, together with the dc/rack labels that were
/// supplied with that update.
#[derive(Debug, Clone)]
struct PeerStateRecord {
    dc: String,
    rack: String,
    state: PeerState,
}
116
/// Last phi value observed for one peer.
#[derive(Debug, Clone)]
struct PhiRecord {
    dc: String,
    rack: String,
    // Phi converted to thousandths by `phi_to_milli` (rounded and clamped).
    phi_milli: i64,
}
125
/// Last threshold value observed for one peer.
#[derive(Debug, Clone)]
struct ThresholdRecord {
    dc: String,
    rack: String,
    // Threshold converted to thousandths by `phi_to_milli` (rounded and clamped).
    threshold_milli: i64,
}
134
/// Inclusive upper bounds, in seconds, of the dwell-time histogram buckets.
///
/// Each bucket counts observations `<=` its bound, so the counts are
/// cumulative. One extra trailing `+Inf` bucket is always appended, so every
/// `DwellEntry::bucket_counts` has `DWELL_BUCKETS_SECONDS.len() + 1` slots.
pub const DWELL_BUCKETS_SECONDS: &[f64] = &[
    0.1, 0.5, 1.0, 5.0, 10.0, 30.0, 60.0, 300.0, 600.0, 1_800.0, 3_600.0,
];
143
/// Running cumulative histogram of dwell durations for one peer state.
#[derive(Debug, Clone)]
struct DwellAccumulator {
    // Slot `i` counts observations `<= DWELL_BUCKETS_SECONDS[i]`. The final
    // slot (`+Inf`) counts every observation.
    bucket_counts: Vec<u64>,
    // Sum of all recorded (sanitised) observations, in seconds.
    sum_seconds: f64,
    // Number of recorded observations.
    count: u64,
}
156
157impl DwellAccumulator {
158 fn new() -> Self {
159 Self {
160 bucket_counts: vec![0; DWELL_BUCKETS_SECONDS.len() + 1],
161 sum_seconds: 0.0,
162 count: 0,
163 }
164 }
165
166 fn observe(&mut self, dwell_seconds: f64) {
167 let v = if dwell_seconds.is_nan() || dwell_seconds < 0.0 {
168 0.0
169 } else {
170 dwell_seconds
171 };
172 self.sum_seconds += v;
173 self.count = self.count.saturating_add(1);
174 let last = self.bucket_counts.len() - 1;
177 for (i, upper) in DWELL_BUCKETS_SECONDS.iter().enumerate() {
178 if v <= *upper {
179 self.bucket_counts[i] = self.bucket_counts[i].saturating_add(1);
180 }
181 }
182 self.bucket_counts[last] = self.bucket_counts[last].saturating_add(1);
183 }
184}
185
186impl FailureMetrics {
187 #[must_use]
198 pub fn new() -> Self {
199 Self::default()
200 }
201
202 pub fn record_no_targets(&self, dc: &str, rack: &str, consistency: ConsistencyLevel) {
216 let key = NoTargetsKey {
217 dc: dc.to_owned(),
218 rack: rack.to_owned(),
219 consistency,
220 };
221 let mut inner = self.inner.lock();
222 *inner.no_targets.entry(key).or_insert(0) += 1;
223 }
224
225 pub fn record_peer_send_full(&self, peer_idx: u32, peer_dc: &str) {
227 let key = PeerKey {
228 peer_idx,
229 peer_dc: peer_dc.to_owned(),
230 };
231 let mut inner = self.inner.lock();
232 *inner.peer_send_full.entry(key).or_insert(0) += 1;
233 }
234
235 pub fn record_peer_send_closed(&self, peer_idx: u32, peer_dc: &str) {
237 let key = PeerKey {
238 peer_idx,
239 peer_dc: peer_dc.to_owned(),
240 };
241 let mut inner = self.inner.lock();
242 *inner.peer_send_closed.entry(key).or_insert(0) += 1;
243 }
244
245 pub fn record_backend_send_full(&self) {
247 self.inner.lock().backend_send_full += 1;
248 }
249
250 pub fn record_backend_send_closed(&self) {
253 self.inner.lock().backend_send_closed += 1;
254 }
255
256 pub fn record_response_timeout(&self, consistency: ConsistencyLevel) {
261 let mut inner = self.inner.lock();
262 *inner.response_timeout.entry(consistency).or_insert(0) += 1;
263 }
264
265 pub fn record_peer_state_transition(
272 &self,
273 peer_idx: u32,
274 dc: &str,
275 rack: &str,
276 from: PeerState,
277 to: PeerState,
278 ) {
279 self.record_peer_state_transition_at(peer_idx, dc, rack, from, to, Instant::now());
280 }
281
282 pub fn record_peer_state_transition_at(
286 &self,
287 peer_idx: u32,
288 dc: &str,
289 rack: &str,
290 from: PeerState,
291 to: PeerState,
292 now: Instant,
293 ) {
294 let key = TransitionKey { peer_idx, from, to };
295 let mut inner = self.inner.lock();
296 *inner.peer_state_transitions.entry(key).or_insert(0) += 1;
297 inner.peer_state_current.insert(
298 peer_idx,
299 PeerStateRecord {
300 dc: dc.to_owned(),
301 rack: rack.to_owned(),
302 state: to,
303 },
304 );
305 if let Some(prev) = inner.peer_last_change.insert(peer_idx, now) {
306 let dwell = now.saturating_duration_since(prev).as_secs_f64();
307 let acc = inner
308 .dwell_per_state
309 .entry(from)
310 .or_insert_with(DwellAccumulator::new);
311 acc.observe(dwell);
312 }
313 }
314
315 pub fn observe_peer_state(&self, peer_idx: u32, dc: &str, rack: &str, state: PeerState) {
320 let mut inner = self.inner.lock();
321 inner.peer_state_current.insert(
322 peer_idx,
323 PeerStateRecord {
324 dc: dc.to_owned(),
325 rack: rack.to_owned(),
326 state,
327 },
328 );
329 }
330
331 pub fn observe_phi(&self, peer_idx: u32, dc: &str, rack: &str, phi: f64) {
336 let phi_milli = phi_to_milli(phi);
337 let mut inner = self.inner.lock();
338 inner.peer_phi.insert(
339 peer_idx,
340 PhiRecord {
341 dc: dc.to_owned(),
342 rack: rack.to_owned(),
343 phi_milli,
344 },
345 );
346 }
347
348 pub fn observe_threshold(&self, peer_idx: u32, dc: &str, rack: &str, threshold: f64) {
353 let threshold_milli = phi_to_milli(threshold);
354 let mut inner = self.inner.lock();
355 inner.peer_threshold.insert(
356 peer_idx,
357 ThresholdRecord {
358 dc: dc.to_owned(),
359 rack: rack.to_owned(),
360 threshold_milli,
361 },
362 );
363 }
364
365 #[must_use]
371 pub fn snapshot(&self) -> FailureSnapshot {
372 let inner = self.inner.lock();
373 let mut no_targets: Vec<NoTargetsEntry> = inner
374 .no_targets
375 .iter()
376 .map(|(k, v)| NoTargetsEntry {
377 dc: k.dc.clone(),
378 rack: k.rack.clone(),
379 consistency: k.consistency,
380 count: *v,
381 })
382 .collect();
383 no_targets.sort_by(|a, b| {
384 a.dc.cmp(&b.dc)
385 .then(a.rack.cmp(&b.rack))
386 .then((a.consistency as u8).cmp(&(b.consistency as u8)))
387 });
388 let peer_send_full = collect_peer_entries(&inner.peer_send_full);
389 let peer_send_closed = collect_peer_entries(&inner.peer_send_closed);
390 let mut response_timeout: Vec<TimeoutEntry> = inner
391 .response_timeout
392 .iter()
393 .map(|(c, v)| TimeoutEntry {
394 consistency: *c,
395 count: *v,
396 })
397 .collect();
398 response_timeout.sort_by_key(|e| e.consistency as u8);
399 let mut peer_state_transitions: Vec<TransitionEntry> = inner
400 .peer_state_transitions
401 .iter()
402 .map(|(k, v)| TransitionEntry {
403 peer_idx: k.peer_idx,
404 from: k.from,
405 to: k.to,
406 count: *v,
407 })
408 .collect();
409 peer_state_transitions.sort_by(|a, b| {
410 a.peer_idx
411 .cmp(&b.peer_idx)
412 .then((a.from as u8).cmp(&(b.from as u8)))
413 .then((a.to as u8).cmp(&(b.to as u8)))
414 });
415 let mut peer_state_current: Vec<PeerStateEntry> = inner
416 .peer_state_current
417 .iter()
418 .map(|(idx, rec)| PeerStateEntry {
419 peer_idx: *idx,
420 dc: rec.dc.clone(),
421 rack: rec.rack.clone(),
422 state: rec.state,
423 })
424 .collect();
425 peer_state_current.sort_by_key(|e| e.peer_idx);
426 let mut peer_phi: Vec<PhiEntry> = inner
427 .peer_phi
428 .iter()
429 .map(|(idx, rec)| PhiEntry {
430 peer_idx: *idx,
431 dc: rec.dc.clone(),
432 rack: rec.rack.clone(),
433 phi: milli_to_phi(rec.phi_milli),
434 })
435 .collect();
436 peer_phi.sort_by_key(|e| e.peer_idx);
437 let mut peer_threshold: Vec<ThresholdEntry> = inner
438 .peer_threshold
439 .iter()
440 .map(|(idx, rec)| ThresholdEntry {
441 peer_idx: *idx,
442 dc: rec.dc.clone(),
443 rack: rec.rack.clone(),
444 threshold: milli_to_phi(rec.threshold_milli),
445 })
446 .collect();
447 peer_threshold.sort_by_key(|e| e.peer_idx);
448 let mut peer_state_dwell: Vec<DwellEntry> = inner
449 .dwell_per_state
450 .iter()
451 .map(|(state, acc)| DwellEntry {
452 state: *state,
453 count: acc.count,
454 sum_seconds: acc.sum_seconds,
455 bucket_counts: acc.bucket_counts.clone(),
456 })
457 .collect();
458 peer_state_dwell.sort_by_key(|e| e.state as u8);
459 FailureSnapshot {
460 no_targets,
461 peer_send_full,
462 peer_send_closed,
463 backend_send_full: inner.backend_send_full,
464 backend_send_closed: inner.backend_send_closed,
465 response_timeout,
466 peer_state_transitions,
467 peer_state_current,
468 peer_phi,
469 peer_threshold,
470 peer_state_dwell,
471 }
472 }
473}
474
475fn phi_to_milli(phi: f64) -> i64 {
480 let saturating = i64::MAX / 1000;
481 if phi.is_nan() {
482 return saturating;
483 }
484 if !phi.is_finite() {
485 if phi > 0.0 {
489 return saturating;
490 }
491 return 0;
492 }
493 if phi <= 0.0 {
494 return 0;
495 }
496 let scaled = (phi * 1000.0).round();
497 f64_to_i64_clamped(scaled).min(saturating)
498}
499
500fn milli_to_phi(milli: i64) -> f64 {
503 i64_to_f64(milli) / 1000.0
504}
505
/// Converts an `i64` to the nearest `f64` without a numeric `as` cast.
///
/// The magnitude is split into two 32-bit halves. Each half converts to `f64`
/// exactly via `From<u32>`, and `high * 2^32` is also exact, so the only
/// rounding happens in the final addition. The sign is re-applied afterwards.
/// `unsigned_abs` keeps `i64::MIN` from overflowing.
fn i64_to_f64(value: i64) -> f64 {
    const TWO_POW_32: f64 = 4_294_967_296.0;
    let magnitude = value.unsigned_abs();
    let high = u32::try_from(magnitude >> 32).unwrap_or(u32::MAX);
    let low = u32::try_from(magnitude & 0xFFFF_FFFF).unwrap_or(u32::MAX);
    let unsigned = f64::from(high) * TWO_POW_32 + f64::from(low);
    if value.is_negative() {
        -unsigned
    } else {
        unsigned
    }
}
520
/// Converts `x` to `i64`, truncating toward zero and clamping into `0..=i64::MAX`.
///
/// * NaN, zero, negative values (including `-inf`) and values below `1.0`
///   yield `0`.
/// * Values `>= 2^63`, including `+inf`, saturate to `i64::MAX`.
/// * Anything else is truncated toward zero.
///
/// The result equals `(x as i64).max(0)`. The IEEE-754 bit pattern is decoded
/// by hand instead of using a numeric `as` cast.
fn f64_to_i64_clamped(x: f64) -> i64 {
    if x.is_nan() || x <= 0.0 {
        return 0;
    }
    // `+inf` must saturate like every other over-range value. It used to be
    // lumped together with NaN and return 0, so the mapping was not monotonic.
    if x.is_infinite() {
        return i64::MAX;
    }
    let bits = x.to_bits();
    // Biased 11-bit exponent and the 52 explicitly stored mantissa bits.
    let exp_field = u32::try_from((bits >> 52) & 0x7FF).unwrap_or(0);
    let mant = bits & ((1u64 << 52) - 1);
    // Biased exponent below 1023 means x < 1 (this also covers subnormals).
    if exp_field < 1023 {
        return 0;
    }
    let unbiased = exp_field - 1023;
    // x >= 2^63 does not fit in i64.
    if unbiased >= 63 {
        return i64::MAX;
    }
    // Restore the implicit leading 1: x == m * 2^(unbiased - 52).
    let m = (1u64 << 52) | mant;
    let value = if unbiased >= 52 {
        // A 53-bit significand shifted left by at most 10 stays below 2^63.
        m.checked_shl(unbiased - 52).unwrap_or(u64::MAX)
    } else {
        // Right shift drops the fractional bits (truncation toward zero).
        m >> (52 - unbiased)
    };
    i64::try_from(value).unwrap_or(i64::MAX)
}
546
547fn collect_peer_entries(map: &HashMap<PeerKey, u64>) -> Vec<PeerEntry> {
548 let mut out: Vec<PeerEntry> = map
549 .iter()
550 .map(|(k, v)| PeerEntry {
551 peer_idx: k.peer_idx,
552 peer_dc: k.peer_dc.clone(),
553 count: *v,
554 })
555 .collect();
556 out.sort_by(|a, b| a.peer_idx.cmp(&b.peer_idx).then(a.peer_dc.cmp(&b.peer_dc)));
557 out
558}
559
/// Number of `record_no_targets` calls for one (dc, rack, consistency) label set.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct NoTargetsEntry {
    /// Data center label passed to `record_no_targets`.
    pub dc: String,
    /// Rack label passed to `record_no_targets`.
    pub rack: String,
    /// Consistency level passed to `record_no_targets`.
    pub consistency: ConsistencyLevel,
    /// Number of times this exact label set was recorded.
    pub count: u64,
}
573
/// Per-peer counter row, used for both the send-full and send-closed counters.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct PeerEntry {
    /// Index of the peer the counter belongs to.
    pub peer_idx: u32,
    /// Data center label recorded with the peer.
    pub peer_dc: String,
    /// Number of recorded events for this (peer, dc) pair.
    pub count: u64,
}
584
/// Number of `record_response_timeout` calls for one consistency level.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct TimeoutEntry {
    /// Consistency level the timeouts were recorded under.
    pub consistency: ConsistencyLevel,
    /// Number of recorded timeouts.
    pub count: u64,
}
593
/// Number of recorded `from -> to` state transitions for one peer.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct TransitionEntry {
    /// Index of the peer that transitioned.
    pub peer_idx: u32,
    /// State the peer left.
    pub from: PeerState,
    /// State the peer entered.
    pub to: PeerState,
    /// Number of times this exact transition was recorded.
    pub count: u64,
}
606
/// Latest state reported for one peer. It comes from either the most recent
/// recorded transition (its `to` state) or `observe_peer_state`, whichever
/// happened last.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct PeerStateEntry {
    /// Index of the peer.
    pub peer_idx: u32,
    /// Data center label supplied with the latest update.
    pub dc: String,
    /// Rack label supplied with the latest update.
    pub rack: String,
    /// The peer's current state.
    pub state: PeerState,
}
619
/// Most recent phi value observed for one peer.
///
/// Internally the value is stored as an integer count of thousandths, so it
/// is rounded to the nearest 0.001 and subject to that conversion's clamping.
///
/// Derives `PartialEq` (not `Eq`, because of the `f64` field) and `Default`,
/// like the other snapshot entry types, so whole entries can be compared with
/// `assert_eq!`.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct PhiEntry {
    /// Index of the peer the value belongs to.
    pub peer_idx: u32,
    /// Data center label supplied with the observation.
    pub dc: String,
    /// Rack label supplied with the observation.
    pub rack: String,
    /// Phi, rounded to thousandths.
    pub phi: f64,
}
632
/// Most recent phi threshold observed for one peer.
///
/// Internally the value is stored as an integer count of thousandths, so it
/// is rounded to the nearest 0.001 and subject to that conversion's clamping.
///
/// Derives `PartialEq` (not `Eq`, because of the `f64` field) and `Default`,
/// like the other snapshot entry types, so whole entries can be compared with
/// `assert_eq!`.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct ThresholdEntry {
    /// Index of the peer the value belongs to.
    pub peer_idx: u32,
    /// Data center label supplied with the observation.
    pub dc: String,
    /// Rack label supplied with the observation.
    pub rack: String,
    /// Threshold, rounded to thousandths.
    pub threshold: f64,
}
648
649#[derive(Clone, Debug)]
655pub struct DwellEntry {
656 pub state: PeerState,
658 pub count: u64,
660 pub sum_seconds: f64,
662 pub bucket_counts: Vec<u64>,
666}
667
/// Point-in-time copy of everything held by `FailureMetrics`.
///
/// Produced by `FailureMetrics::snapshot`. Every vector is sorted by its
/// labels, so the ordering is deterministic.
#[derive(Clone, Debug, Default)]
pub struct FailureSnapshot {
    /// Sorted by dc, then rack, then the consistency level's discriminant.
    pub no_targets: Vec<NoTargetsEntry>,
    /// Sorted by peer index, then peer dc.
    pub peer_send_full: Vec<PeerEntry>,
    /// Sorted by peer index, then peer dc.
    pub peer_send_closed: Vec<PeerEntry>,
    /// Total backend send-full events.
    pub backend_send_full: u64,
    /// Total backend send-closed events.
    pub backend_send_closed: u64,
    /// Sorted by the consistency level's discriminant.
    pub response_timeout: Vec<TimeoutEntry>,
    /// Sorted by peer index, then `from` discriminant, then `to` discriminant.
    pub peer_state_transitions: Vec<TransitionEntry>,
    /// Sorted by peer index.
    pub peer_state_current: Vec<PeerStateEntry>,
    /// Sorted by peer index.
    pub peer_phi: Vec<PhiEntry>,
    /// Sorted by peer index.
    pub peer_threshold: Vec<ThresholdEntry>,
    /// Sorted by the state's discriminant.
    pub peer_state_dwell: Vec<DwellEntry>,
}
695
696impl FailureSnapshot {
697 #[must_use]
702 pub fn is_empty(&self) -> bool {
703 self.no_targets.is_empty()
704 && self.peer_send_full.is_empty()
705 && self.peer_send_closed.is_empty()
706 && self.backend_send_full == 0
707 && self.backend_send_closed == 0
708 && self.response_timeout.is_empty()
709 && self.peer_state_transitions.is_empty()
710 && self.peer_state_current.is_empty()
711 && self.peer_phi.is_empty()
712 && self.peer_threshold.is_empty()
713 && self.peer_state_dwell.is_empty()
714 }
715}
716
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;

    /// Finds the dwell histogram recorded for `state`, panicking with `what` when absent.
    fn dwell_for<'a>(snap: &'a FailureSnapshot, state: PeerState, what: &str) -> &'a DwellEntry {
        snap.peer_state_dwell
            .iter()
            .find(|entry| entry.state == state)
            .expect(what)
    }

    #[test]
    fn no_targets_increments_per_label_set() {
        let metrics = FailureMetrics::new();
        for &dc in &["dc1", "dc1", "dc2"] {
            metrics.record_no_targets(dc, "rA", ConsistencyLevel::DcQuorum);
        }
        let snap = metrics.snapshot();
        assert_eq!(snap.no_targets.len(), 2);
        let count_for = |dc: &str| {
            snap.no_targets
                .iter()
                .find(|entry| entry.dc == dc)
                .unwrap()
                .count
        };
        assert_eq!(count_for("dc1"), 2);
        assert_eq!(count_for("dc2"), 1);
    }

    #[test]
    fn peer_send_full_and_closed_are_distinct_buckets() {
        let metrics = FailureMetrics::new();
        metrics.record_peer_send_full(7, "dc2");
        for _ in 0..2 {
            metrics.record_peer_send_closed(7, "dc2");
        }
        let snap = metrics.snapshot();
        assert_eq!(snap.peer_send_full.len(), 1);
        assert_eq!(snap.peer_send_full[0].count, 1);
        assert_eq!(snap.peer_send_closed.len(), 1);
        assert_eq!(snap.peer_send_closed[0].count, 2);
    }

    #[test]
    fn backend_counters_track_independently() {
        let metrics = FailureMetrics::new();
        metrics.record_backend_send_full();
        for _ in 0..2 {
            metrics.record_backend_send_closed();
        }
        let snap = metrics.snapshot();
        assert_eq!(snap.backend_send_full, 1);
        assert_eq!(snap.backend_send_closed, 2);
    }

    #[test]
    fn response_timeout_rolls_up_by_consistency() {
        let metrics = FailureMetrics::new();
        let levels = [
            ConsistencyLevel::DcOne,
            ConsistencyLevel::DcQuorum,
            ConsistencyLevel::DcQuorum,
        ];
        for &level in &levels {
            metrics.record_response_timeout(level);
        }
        let snap = metrics.snapshot();
        assert_eq!(snap.response_timeout.len(), 2);
        let quorum = snap
            .response_timeout
            .iter()
            .find(|entry| entry.consistency == ConsistencyLevel::DcQuorum)
            .unwrap();
        assert_eq!(quorum.count, 2);
    }

    #[test]
    fn peer_state_transition_records_count_and_current() {
        let metrics = FailureMetrics::new();
        let hops = [
            (PeerState::Normal, PeerState::Down),
            (PeerState::Down, PeerState::Normal),
            (PeerState::Normal, PeerState::Down),
        ];
        for &(from, to) in &hops {
            metrics.record_peer_state_transition(3, "dc1", "rA", from, to);
        }
        let snap = metrics.snapshot();
        let normal_to_down = snap
            .peer_state_transitions
            .iter()
            .find(|entry| entry.from == PeerState::Normal && entry.to == PeerState::Down)
            .unwrap();
        assert_eq!(normal_to_down.count, 2);
        assert_eq!(snap.peer_state_current.len(), 1);
        assert_eq!(snap.peer_state_current[0].state, PeerState::Down);
    }

    #[test]
    fn observe_phi_rounds_to_thousandths() {
        let metrics = FailureMetrics::new();
        metrics.observe_phi(1, "dc1", "rA", 1.234_567);
        let snap = metrics.snapshot();
        assert_eq!(snap.peer_phi.len(), 1);
        let phi = snap.peer_phi[0].phi;
        // 1.234567 is stored as 1235 thousandths.
        assert!((phi - 1.235).abs() < 1e-9, "phi={}", phi);
    }

    #[test]
    fn snapshot_empty_predicate_is_correct() {
        let metrics = FailureMetrics::new();
        assert!(metrics.snapshot().is_empty());
        metrics.record_backend_send_full();
        assert!(!metrics.snapshot().is_empty());
    }

    #[test]
    fn observe_threshold_records_value_per_peer() {
        let metrics = FailureMetrics::new();
        metrics.observe_threshold(7, "dc1", "rA", 8.0);
        metrics.observe_threshold(8, "dc2", "rB", 6.5);
        let snap = metrics.snapshot();
        assert_eq!(snap.peer_threshold.len(), 2);
        let (first, second) = (&snap.peer_threshold[0], &snap.peer_threshold[1]);
        assert!((first.threshold - 8.0).abs() < 1e-9);
        assert_eq!(first.peer_idx, 7);
        assert!((second.threshold - 6.5).abs() < 1e-9);
    }

    #[test]
    fn dwell_histogram_records_observation_on_transition() {
        let metrics = FailureMetrics::new();
        let t0 = Instant::now();
        // The first transition only seeds the peer's timestamp; no dwell yet.
        metrics.record_peer_state_transition_at(
            1,
            "dc1",
            "rA",
            PeerState::Unknown,
            PeerState::Normal,
            t0,
        );
        // 2.5 s later the peer leaves Normal ...
        metrics.record_peer_state_transition_at(
            1,
            "dc1",
            "rA",
            PeerState::Normal,
            PeerState::Down,
            t0 + Duration::from_millis(2_500),
        );
        // ... and 45 s after that it leaves Down.
        metrics.record_peer_state_transition_at(
            1,
            "dc1",
            "rA",
            PeerState::Down,
            PeerState::Normal,
            t0 + Duration::from_millis(2_500 + 45_000),
        );
        let snap = metrics.snapshot();
        let normal = dwell_for(&snap, PeerState::Normal, "Normal dwell entry present");
        assert_eq!(normal.count, 1);
        assert!((normal.sum_seconds - 2.5).abs() < 1e-6);
        let down = dwell_for(&snap, PeerState::Down, "Down dwell entry present");
        assert_eq!(down.count, 1);
        assert!((down.sum_seconds - 45.0).abs() < 1e-6);
        // The trailing +Inf bucket counts every observation.
        assert_eq!(*normal.bucket_counts.last().unwrap(), normal.count);
        assert_eq!(*down.bucket_counts.last().unwrap(), down.count);
    }

    #[test]
    fn dwell_buckets_are_cumulative() {
        let metrics = FailureMetrics::new();
        let t0 = Instant::now();
        metrics.record_peer_state_transition_at(
            5,
            "dc1",
            "rA",
            PeerState::Unknown,
            PeerState::Normal,
            t0,
        );
        metrics.record_peer_state_transition_at(
            5,
            "dc1",
            "rA",
            PeerState::Normal,
            PeerState::Down,
            t0 + Duration::from_millis(50),
        );
        let snap = metrics.snapshot();
        let normal = dwell_for(&snap, PeerState::Normal, "Normal dwell entry");
        // 0.05 s is below the smallest bound, so every cumulative bucket counts it.
        for &bucket in &normal.bucket_counts {
            assert_eq!(bucket, 1, "every cumulative bucket sees the 0.05s observation");
        }
    }
}