1use std::collections::HashMap;
46
47use parking_lot::Mutex;
48
49use crate::cluster::peer::PeerState;
50use crate::msg::ConsistencyLevel;
51
52#[derive(Debug, Default)]
56pub struct FailureMetrics {
57 inner: Mutex<FailureInner>,
58}
59
60#[derive(Debug, Default)]
61struct FailureInner {
62 no_targets: HashMap<NoTargetsKey, u64>,
63 peer_send_full: HashMap<PeerKey, u64>,
64 peer_send_closed: HashMap<PeerKey, u64>,
65 backend_send_full: u64,
66 backend_send_closed: u64,
67 response_timeout: HashMap<ConsistencyLevel, u64>,
68 peer_state_transitions: HashMap<TransitionKey, u64>,
69 peer_state_current: HashMap<u32, PeerStateRecord>,
70 peer_phi: HashMap<u32, PhiRecord>,
71}
72
73#[derive(Debug, Eq, PartialEq, Hash, Clone)]
74struct NoTargetsKey {
75 dc: String,
76 rack: String,
77 consistency: ConsistencyLevel,
78}
79
/// Label set identifying one per-peer send counter series.
#[derive(Clone, Hash, PartialEq, Eq, Debug)]
struct PeerKey {
    /// Index of the peer the send was addressed to.
    peer_idx: u32,
    /// Datacenter label of that peer.
    peer_dc: String,
}
85
86#[derive(Debug, Eq, PartialEq, Hash, Clone)]
87struct TransitionKey {
88 peer_idx: u32,
89 from: PeerState,
90 to: PeerState,
91}
92
93#[derive(Debug, Clone)]
94struct PeerStateRecord {
95 dc: String,
96 rack: String,
97 state: PeerState,
98}
99
/// Latest phi observation for a peer together with its location labels.
#[derive(Clone, Debug)]
struct PhiRecord {
    /// Datacenter label captured with the most recent observation.
    dc: String,
    /// Rack label captured with the most recent observation.
    rack: String,
    /// Phi multiplied by 1000 and rounded to an integer (fixed-point
    /// thousandths), as produced by `phi_to_milli`.
    phi_milli: i64,
}
108
109impl FailureMetrics {
110 #[must_use]
121 pub fn new() -> Self {
122 Self::default()
123 }
124
125 pub fn record_no_targets(&self, dc: &str, rack: &str, consistency: ConsistencyLevel) {
139 let key = NoTargetsKey {
140 dc: dc.to_owned(),
141 rack: rack.to_owned(),
142 consistency,
143 };
144 let mut inner = self.inner.lock();
145 *inner.no_targets.entry(key).or_insert(0) += 1;
146 }
147
148 pub fn record_peer_send_full(&self, peer_idx: u32, peer_dc: &str) {
150 let key = PeerKey {
151 peer_idx,
152 peer_dc: peer_dc.to_owned(),
153 };
154 let mut inner = self.inner.lock();
155 *inner.peer_send_full.entry(key).or_insert(0) += 1;
156 }
157
158 pub fn record_peer_send_closed(&self, peer_idx: u32, peer_dc: &str) {
160 let key = PeerKey {
161 peer_idx,
162 peer_dc: peer_dc.to_owned(),
163 };
164 let mut inner = self.inner.lock();
165 *inner.peer_send_closed.entry(key).or_insert(0) += 1;
166 }
167
168 pub fn record_backend_send_full(&self) {
170 self.inner.lock().backend_send_full += 1;
171 }
172
173 pub fn record_backend_send_closed(&self) {
176 self.inner.lock().backend_send_closed += 1;
177 }
178
179 pub fn record_response_timeout(&self, consistency: ConsistencyLevel) {
184 let mut inner = self.inner.lock();
185 *inner.response_timeout.entry(consistency).or_insert(0) += 1;
186 }
187
188 pub fn record_peer_state_transition(
192 &self,
193 peer_idx: u32,
194 dc: &str,
195 rack: &str,
196 from: PeerState,
197 to: PeerState,
198 ) {
199 let key = TransitionKey { peer_idx, from, to };
200 let mut inner = self.inner.lock();
201 *inner.peer_state_transitions.entry(key).or_insert(0) += 1;
202 inner.peer_state_current.insert(
203 peer_idx,
204 PeerStateRecord {
205 dc: dc.to_owned(),
206 rack: rack.to_owned(),
207 state: to,
208 },
209 );
210 }
211
212 pub fn observe_peer_state(&self, peer_idx: u32, dc: &str, rack: &str, state: PeerState) {
217 let mut inner = self.inner.lock();
218 inner.peer_state_current.insert(
219 peer_idx,
220 PeerStateRecord {
221 dc: dc.to_owned(),
222 rack: rack.to_owned(),
223 state,
224 },
225 );
226 }
227
228 pub fn observe_phi(&self, peer_idx: u32, dc: &str, rack: &str, phi: f64) {
233 let phi_milli = phi_to_milli(phi);
234 let mut inner = self.inner.lock();
235 inner.peer_phi.insert(
236 peer_idx,
237 PhiRecord {
238 dc: dc.to_owned(),
239 rack: rack.to_owned(),
240 phi_milli,
241 },
242 );
243 }
244
245 #[must_use]
251 pub fn snapshot(&self) -> FailureSnapshot {
252 let inner = self.inner.lock();
253 let mut no_targets: Vec<NoTargetsEntry> = inner
254 .no_targets
255 .iter()
256 .map(|(k, v)| NoTargetsEntry {
257 dc: k.dc.clone(),
258 rack: k.rack.clone(),
259 consistency: k.consistency,
260 count: *v,
261 })
262 .collect();
263 no_targets.sort_by(|a, b| {
264 a.dc.cmp(&b.dc)
265 .then(a.rack.cmp(&b.rack))
266 .then((a.consistency as u8).cmp(&(b.consistency as u8)))
267 });
268 let peer_send_full = collect_peer_entries(&inner.peer_send_full);
269 let peer_send_closed = collect_peer_entries(&inner.peer_send_closed);
270 let mut response_timeout: Vec<TimeoutEntry> = inner
271 .response_timeout
272 .iter()
273 .map(|(c, v)| TimeoutEntry {
274 consistency: *c,
275 count: *v,
276 })
277 .collect();
278 response_timeout.sort_by_key(|e| e.consistency as u8);
279 let mut peer_state_transitions: Vec<TransitionEntry> = inner
280 .peer_state_transitions
281 .iter()
282 .map(|(k, v)| TransitionEntry {
283 peer_idx: k.peer_idx,
284 from: k.from,
285 to: k.to,
286 count: *v,
287 })
288 .collect();
289 peer_state_transitions.sort_by(|a, b| {
290 a.peer_idx
291 .cmp(&b.peer_idx)
292 .then((a.from as u8).cmp(&(b.from as u8)))
293 .then((a.to as u8).cmp(&(b.to as u8)))
294 });
295 let mut peer_state_current: Vec<PeerStateEntry> = inner
296 .peer_state_current
297 .iter()
298 .map(|(idx, rec)| PeerStateEntry {
299 peer_idx: *idx,
300 dc: rec.dc.clone(),
301 rack: rec.rack.clone(),
302 state: rec.state,
303 })
304 .collect();
305 peer_state_current.sort_by_key(|e| e.peer_idx);
306 let mut peer_phi: Vec<PhiEntry> = inner
307 .peer_phi
308 .iter()
309 .map(|(idx, rec)| PhiEntry {
310 peer_idx: *idx,
311 dc: rec.dc.clone(),
312 rack: rec.rack.clone(),
313 phi: milli_to_phi(rec.phi_milli),
314 })
315 .collect();
316 peer_phi.sort_by_key(|e| e.peer_idx);
317 FailureSnapshot {
318 no_targets,
319 peer_send_full,
320 peer_send_closed,
321 backend_send_full: inner.backend_send_full,
322 backend_send_closed: inner.backend_send_closed,
323 response_timeout,
324 peer_state_transitions,
325 peer_state_current,
326 peer_phi,
327 }
328 }
329}
330
331fn phi_to_milli(phi: f64) -> i64 {
336 let saturating = i64::MAX / 1000;
337 if phi.is_nan() {
338 return saturating;
339 }
340 if !phi.is_finite() {
341 if phi > 0.0 {
345 return saturating;
346 }
347 return 0;
348 }
349 if phi <= 0.0 {
350 return 0;
351 }
352 let scaled = (phi * 1000.0).round();
353 f64_to_i64_clamped(scaled).min(saturating)
354}
355
356fn milli_to_phi(milli: i64) -> f64 {
359 i64_to_f64(milli) / 1000.0
360}
361
/// Converts an `i64` to the nearest `f64` without an `as` cast.
///
/// The magnitude is split into two 32-bit halves. Each half converts exactly
/// via `f64::from(u32)`, and `upper * 2^32` is exact, so the final addition
/// is the only rounding step. The result is therefore the correctly rounded
/// value. The sign is reapplied last, which handles `i64::MIN` via
/// `unsigned_abs`.
fn i64_to_f64(value: i64) -> f64 {
    const TWO_POW_32: f64 = 4_294_967_296.0;

    let magnitude = value.unsigned_abs();
    // Both halves always fit in 32 bits; the fallbacks are unreachable.
    let upper = u32::try_from(magnitude >> 32).unwrap_or(u32::MAX);
    let lower = u32::try_from(magnitude & u64::from(u32::MAX)).unwrap_or(u32::MAX);
    let unsigned = f64::from(upper) * TWO_POW_32 + f64::from(lower);

    if value.is_negative() {
        -unsigned
    } else {
        unsigned
    }
}
376
/// Truncates a float toward zero into an `i64` by decoding its IEEE-754 bits.
///
/// - NaN, infinities, zero and negative inputs yield `0`.
/// - Values below `1.0`, including subnormals, also yield `0`.
/// - Values of `2^63` or more saturate to `i64::MAX`.
fn f64_to_i64_clamped(x: f64) -> i64 {
    const MANTISSA_BITS: u32 = 52;
    const EXP_BIAS: u32 = 1023;
    const HIDDEN_BIT: u64 = 1 << MANTISSA_BITS;

    let usable = x.is_finite() && x > 0.0;
    if !usable {
        return 0;
    }

    let raw = x.to_bits();
    let biased = u32::try_from((raw >> MANTISSA_BITS) & 0x7FF).unwrap_or(0);
    // A biased exponent below the bias means |x| < 1.0, which truncates to 0.
    let exponent = match biased.checked_sub(EXP_BIAS) {
        Some(e) => e,
        None => return 0,
    };
    if exponent >= 63 {
        return i64::MAX;
    }

    // Restore the implicit leading 1 of a normal number.
    let significand = HIDDEN_BIT | (raw & (HIDDEN_BIT - 1));
    let magnitude = match exponent.checked_sub(MANTISSA_BITS) {
        Some(left) => significand.checked_shl(left).unwrap_or(u64::MAX),
        None => significand >> (MANTISSA_BITS - exponent),
    };
    i64::try_from(magnitude).unwrap_or(i64::MAX)
}
402
403fn collect_peer_entries(map: &HashMap<PeerKey, u64>) -> Vec<PeerEntry> {
404 let mut out: Vec<PeerEntry> = map
405 .iter()
406 .map(|(k, v)| PeerEntry {
407 peer_idx: k.peer_idx,
408 peer_dc: k.peer_dc.clone(),
409 count: *v,
410 })
411 .collect();
412 out.sort_by(|a, b| a.peer_idx.cmp(&b.peer_idx).then(a.peer_dc.cmp(&b.peer_dc)));
413 out
414}
415
416#[derive(Clone, Debug, Default, Eq, PartialEq)]
418pub struct NoTargetsEntry {
419 pub dc: String,
421 pub rack: String,
423 pub consistency: ConsistencyLevel,
426 pub count: u64,
428}
429
/// One per-peer send counter series as exported in a [`FailureSnapshot`].
#[derive(PartialEq, Eq, Default, Debug, Clone)]
pub struct PeerEntry {
    /// Index of the peer.
    pub peer_idx: u32,
    /// Datacenter label of the peer.
    pub peer_dc: String,
    /// Number of recorded events for this `(peer_idx, peer_dc)` pair.
    pub count: u64,
}
440
441#[derive(Clone, Debug, Default, Eq, PartialEq)]
443pub struct TimeoutEntry {
444 pub consistency: ConsistencyLevel,
446 pub count: u64,
448}
449
450#[derive(Clone, Debug, Eq, PartialEq)]
452pub struct TransitionEntry {
453 pub peer_idx: u32,
455 pub from: PeerState,
457 pub to: PeerState,
459 pub count: u64,
461}
462
463#[derive(Clone, Debug, Eq, PartialEq)]
465pub struct PeerStateEntry {
466 pub peer_idx: u32,
468 pub dc: String,
470 pub rack: String,
472 pub state: PeerState,
474}
475
/// Latest phi gauge for one peer as exported in a [`FailureSnapshot`].
///
/// Not `Eq`/`PartialEq` because it carries an `f64`.
#[derive(Debug, Clone)]
pub struct PhiEntry {
    /// Index of the peer.
    pub peer_idx: u32,
    /// Datacenter label from the most recent observation.
    pub dc: String,
    /// Rack label from the most recent observation.
    pub rack: String,
    /// Phi value, quantized to thousandths when it was stored.
    pub phi: f64,
}
488
489#[derive(Clone, Debug, Default)]
491pub struct FailureSnapshot {
492 pub no_targets: Vec<NoTargetsEntry>,
494 pub peer_send_full: Vec<PeerEntry>,
496 pub peer_send_closed: Vec<PeerEntry>,
498 pub backend_send_full: u64,
500 pub backend_send_closed: u64,
502 pub response_timeout: Vec<TimeoutEntry>,
504 pub peer_state_transitions: Vec<TransitionEntry>,
506 pub peer_state_current: Vec<PeerStateEntry>,
508 pub peer_phi: Vec<PhiEntry>,
510}
511
512impl FailureSnapshot {
513 #[must_use]
518 pub fn is_empty(&self) -> bool {
519 self.no_targets.is_empty()
520 && self.peer_send_full.is_empty()
521 && self.peer_send_closed.is_empty()
522 && self.backend_send_full == 0
523 && self.backend_send_closed == 0
524 && self.response_timeout.is_empty()
525 && self.peer_state_transitions.is_empty()
526 && self.peer_state_current.is_empty()
527 && self.peer_phi.is_empty()
528 }
529}
530
#[cfg(test)]
mod tests {
    //! Unit tests for `FailureMetrics`, its snapshot ordering, and the phi
    //! fixed-point conversion helpers.

    use super::*;

    #[test]
    fn no_targets_increments_per_label_set() {
        let m = FailureMetrics::new();
        m.record_no_targets("dc1", "rA", ConsistencyLevel::DcQuorum);
        m.record_no_targets("dc1", "rA", ConsistencyLevel::DcQuorum);
        m.record_no_targets("dc2", "rA", ConsistencyLevel::DcQuorum);
        let s = m.snapshot();
        assert_eq!(s.no_targets.len(), 2);
        let dc1 = s.no_targets.iter().find(|e| e.dc == "dc1").unwrap();
        let dc2 = s.no_targets.iter().find(|e| e.dc == "dc2").unwrap();
        assert_eq!(dc1.count, 2);
        assert_eq!(dc2.count, 1);
    }

    #[test]
    fn peer_send_full_and_closed_are_distinct_buckets() {
        let m = FailureMetrics::new();
        m.record_peer_send_full(7, "dc2");
        m.record_peer_send_closed(7, "dc2");
        m.record_peer_send_closed(7, "dc2");
        let s = m.snapshot();
        assert_eq!(s.peer_send_full.len(), 1);
        assert_eq!(s.peer_send_full[0].count, 1);
        assert_eq!(s.peer_send_closed.len(), 1);
        assert_eq!(s.peer_send_closed[0].count, 2);
    }

    #[test]
    fn backend_counters_track_independently() {
        let m = FailureMetrics::new();
        m.record_backend_send_full();
        m.record_backend_send_closed();
        m.record_backend_send_closed();
        let s = m.snapshot();
        assert_eq!(s.backend_send_full, 1);
        assert_eq!(s.backend_send_closed, 2);
    }

    #[test]
    fn response_timeout_rolls_up_by_consistency() {
        let m = FailureMetrics::new();
        m.record_response_timeout(ConsistencyLevel::DcOne);
        m.record_response_timeout(ConsistencyLevel::DcQuorum);
        m.record_response_timeout(ConsistencyLevel::DcQuorum);
        let s = m.snapshot();
        assert_eq!(s.response_timeout.len(), 2);
        let q = s
            .response_timeout
            .iter()
            .find(|e| e.consistency == ConsistencyLevel::DcQuorum)
            .unwrap();
        assert_eq!(q.count, 2);
    }

    #[test]
    fn peer_state_transition_records_count_and_current() {
        let m = FailureMetrics::new();
        m.record_peer_state_transition(3, "dc1", "rA", PeerState::Normal, PeerState::Down);
        m.record_peer_state_transition(3, "dc1", "rA", PeerState::Down, PeerState::Normal);
        m.record_peer_state_transition(3, "dc1", "rA", PeerState::Normal, PeerState::Down);
        let s = m.snapshot();
        let to_down = s
            .peer_state_transitions
            .iter()
            .find(|t| t.from == PeerState::Normal && t.to == PeerState::Down)
            .unwrap();
        assert_eq!(to_down.count, 2);
        assert_eq!(s.peer_state_current.len(), 1);
        assert_eq!(s.peer_state_current[0].state, PeerState::Down);
    }

    #[test]
    fn observe_peer_state_overwrites_without_counting_transition() {
        let m = FailureMetrics::new();
        m.observe_peer_state(4, "dc1", "rA", PeerState::Normal);
        m.observe_peer_state(4, "dc2", "rB", PeerState::Down);
        let s = m.snapshot();
        assert!(s.peer_state_transitions.is_empty());
        assert_eq!(s.peer_state_current.len(), 1);
        assert_eq!(s.peer_state_current[0].state, PeerState::Down);
        assert_eq!(s.peer_state_current[0].dc, "dc2");
        assert_eq!(s.peer_state_current[0].rack, "rB");
    }

    #[test]
    fn observe_phi_rounds_to_thousandths() {
        let m = FailureMetrics::new();
        m.observe_phi(1, "dc1", "rA", 1.234_567);
        let s = m.snapshot();
        assert_eq!(s.peer_phi.len(), 1);
        let diff = (s.peer_phi[0].phi - 1.235).abs();
        assert!(diff < 1e-9, "phi={}", s.peer_phi[0].phi);
    }

    #[test]
    fn phi_to_milli_handles_non_finite_and_non_positive() {
        let ceiling = i64::MAX / 1000;
        assert_eq!(phi_to_milli(f64::NAN), ceiling);
        assert_eq!(phi_to_milli(f64::INFINITY), ceiling);
        assert_eq!(phi_to_milli(1e300), ceiling);
        assert_eq!(phi_to_milli(f64::NEG_INFINITY), 0);
        assert_eq!(phi_to_milli(-2.5), 0);
        assert_eq!(phi_to_milli(0.0), 0);
        assert_eq!(phi_to_milli(0.0004), 0);
        assert_eq!(phi_to_milli(2.0004), 2000);
        assert_eq!(phi_to_milli(2.0006), 2001);
    }

    #[test]
    fn milli_round_trip_is_exact_for_typical_values() {
        for &milli in &[0_i64, 1, 999, 1_000, 1_235, 123_456_789] {
            assert_eq!(phi_to_milli(milli_to_phi(milli)), milli, "milli={milli}");
        }
    }

    #[test]
    fn i64_to_f64_matches_exact_bit_patterns() {
        // Compare bit patterns so the test is exact without float `==`.
        assert_eq!(i64_to_f64(0).to_bits(), 0.0_f64.to_bits());
        assert_eq!(i64_to_f64(-1500).to_bits(), (-1500.0_f64).to_bits());
        assert_eq!(
            i64_to_f64(i64::MAX).to_bits(),
            9_223_372_036_854_775_808.0_f64.to_bits()
        );
        assert_eq!(
            i64_to_f64(i64::MIN).to_bits(),
            (-9_223_372_036_854_775_808.0_f64).to_bits()
        );
        // 2^53 + 1 is not representable; ties-to-even rounds down to 2^53.
        assert_eq!(
            i64_to_f64((1_i64 << 53) + 1).to_bits(),
            9_007_199_254_740_992.0_f64.to_bits()
        );
    }

    #[test]
    fn f64_to_i64_clamped_truncates_and_saturates() {
        assert_eq!(f64_to_i64_clamped(f64::NAN), 0);
        assert_eq!(f64_to_i64_clamped(-1.0), 0);
        assert_eq!(f64_to_i64_clamped(0.999), 0);
        assert_eq!(f64_to_i64_clamped(2.9), 2);
        assert_eq!(
            f64_to_i64_clamped(9_223_372_036_854_774_784.0),
            9_223_372_036_854_774_784
        );
        assert_eq!(f64_to_i64_clamped(9_223_372_036_854_775_808.0), i64::MAX);
    }

    #[test]
    fn snapshot_orders_entries_deterministically() {
        let m = FailureMetrics::new();
        m.record_peer_send_full(9, "dc1");
        m.record_peer_send_full(2, "dc2");
        m.record_peer_send_full(2, "dc1");
        m.observe_phi(5, "dc1", "rA", 0.5);
        m.observe_phi(1, "dc1", "rA", 0.25);
        m.record_no_targets("dc2", "rA", ConsistencyLevel::DcOne);
        m.record_no_targets("dc1", "rB", ConsistencyLevel::DcOne);
        m.record_no_targets("dc1", "rA", ConsistencyLevel::DcOne);
        let s = m.snapshot();

        let peers: Vec<(u32, &str)> = s
            .peer_send_full
            .iter()
            .map(|e| (e.peer_idx, e.peer_dc.as_str()))
            .collect();
        assert_eq!(peers, vec![(2, "dc1"), (2, "dc2"), (9, "dc1")]);

        let phi_idx: Vec<u32> = s.peer_phi.iter().map(|e| e.peer_idx).collect();
        assert_eq!(phi_idx, vec![1, 5]);

        let labels: Vec<(&str, &str)> = s
            .no_targets
            .iter()
            .map(|e| (e.dc.as_str(), e.rack.as_str()))
            .collect();
        assert_eq!(labels, vec![("dc1", "rA"), ("dc1", "rB"), ("dc2", "rA")]);
    }

    #[test]
    fn snapshot_empty_predicate_is_correct() {
        let m = FailureMetrics::new();
        assert!(m.snapshot().is_empty());
        m.record_backend_send_full();
        assert!(!m.snapshot().is_empty());
    }
}