nodedb_cluster/wire_version/metrics.rs
1// SPDX-License-Identifier: BUSL-1.1
2
//! Metrics for unknown or mismatched wire versions observed on inbound frames.
//!
//! # Wiring
//!
//! This module defines [`WireVersionMetrics`], a registry of atomic counters
//! keyed by `(peer_node_id, message_type, version)`. An instance should be
//! stored in the cluster subsystem's shared state (e.g. alongside
//! `LoopMetricsRegistry`) and incremented whenever `decode_versioned` returns
//! `WireVersionError::UnsupportedVersion`.
//!
//! PHASE2-WIRE: Expose `WireVersionMetrics` from the cluster subsystem and
//! register its counters with the HTTP metrics endpoint (same pattern as
//! `LoopMetrics`). Until then, the counters accumulate in memory and can only
//! be read programmatically (e.g. by tests) via `snapshot()` and `total()`.
16
17use std::sync::Arc;
18use std::sync::atomic::{AtomicU64, Ordering};
19
/// Counter bucket for one distinct `(peer_node_id, message_type, version)` key.
///
/// The identifying fields are plain values; only `count` is atomic, so a
/// shared bucket can be bumped through a `&WireVersionCounter`.
#[derive(Debug)]
pub struct WireVersionCounter {
    /// ID of the peer node whose frame carried the unsupported version.
    pub peer_node_id: u64,
    /// Static message-type tag such as `"MetadataEntry"`, `"RaftRpc"` or
    /// `"SyncFrame"`. A `&'static str` keeps recording allocation-free.
    pub message_type: &'static str,
    /// Wire version number that was rejected.
    pub version: u16,
    /// Running total of frames observed for this key.
    pub count: AtomicU64,
}
34
35impl WireVersionCounter {
36 fn new(peer_node_id: u64, message_type: &'static str, version: u16) -> Self {
37 Self {
38 peer_node_id,
39 message_type,
40 version,
41 count: AtomicU64::new(0),
42 }
43 }
44}
45
46/// Registry of unknown wire-version counters, keyed by
47/// `(peer_node_id, message_type, version)`.
48///
49/// Designed for low-frequency events (version mismatches are rare after a
50/// successful negotiation handshake). Uses a `parking_lot::RwLock`-free
51/// append-only `Vec` behind an `Arc` for simplicity; contention is negligible.
52#[derive(Debug, Default, Clone)]
53pub struct WireVersionMetrics {
54 inner: Arc<std::sync::Mutex<Vec<Arc<WireVersionCounter>>>>,
55}
56
57impl WireVersionMetrics {
58 /// Increment the counter for `(peer_node_id, message_type, version)`,
59 /// creating a new bucket if this is the first observation.
60 pub fn increment_unknown_version(
61 &self,
62 peer_node_id: u64,
63 message_type: &'static str,
64 version: u16,
65 ) {
66 let mut guard = self.inner.lock().unwrap_or_else(|p| p.into_inner());
67
68 // Linear scan — buckets accumulate slowly (one per distinct
69 // unsupported version per peer type), so O(n) is acceptable.
70 for bucket in guard.iter() {
71 if bucket.peer_node_id == peer_node_id
72 && bucket.message_type == message_type
73 && bucket.version == version
74 {
75 bucket.count.fetch_add(1, Ordering::Relaxed);
76 return;
77 }
78 }
79
80 // First observation: create bucket and immediately record.
81 let bucket = Arc::new(WireVersionCounter::new(peer_node_id, message_type, version));
82 bucket.count.fetch_add(1, Ordering::Relaxed);
83 guard.push(bucket);
84 }
85
86 /// Return a snapshot of all counter values, sorted by
87 /// `(peer_node_id, message_type, version)`.
88 pub fn snapshot(&self) -> Vec<(u64, &'static str, u16, u64)> {
89 let guard = self.inner.lock().unwrap_or_else(|p| p.into_inner());
90 let mut out: Vec<(u64, &'static str, u16, u64)> = guard
91 .iter()
92 .map(|b| {
93 (
94 b.peer_node_id,
95 b.message_type,
96 b.version,
97 b.count.load(Ordering::Relaxed),
98 )
99 })
100 .collect();
101 out.sort_by_key(|(peer, msg, ver, _)| (*peer, *msg, *ver));
102 out
103 }
104
105 /// Total number of unknown-version frames seen across all buckets.
106 pub fn total(&self) -> u64 {
107 let guard = self.inner.lock().unwrap_or_else(|p| p.into_inner());
108 guard.iter().map(|b| b.count.load(Ordering::Relaxed)).sum()
109 }
110}
111
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn counter_accumulates() {
        let m = WireVersionMetrics::default();
        m.increment_unknown_version(1, "MetadataEntry", 9999);
        m.increment_unknown_version(1, "MetadataEntry", 9999);
        m.increment_unknown_version(2, "RaftRpc", 9999);
        assert_eq!(m.total(), 3);
        let snap = m.snapshot();
        assert_eq!(snap.len(), 2);
        assert_eq!(snap[0], (1, "MetadataEntry", 9999, 2));
        assert_eq!(snap[1], (2, "RaftRpc", 9999, 1));
    }

    #[test]
    fn empty_registry_reports_nothing() {
        let m = WireVersionMetrics::default();
        assert_eq!(m.total(), 0);
        assert!(m.snapshot().is_empty());
    }

    /// Keys are inserted out of order so the sort in `snapshot()` is
    /// actually exercised (the test above inserts in already-sorted order).
    #[test]
    fn snapshot_is_sorted_regardless_of_insertion_order() {
        let m = WireVersionMetrics::default();
        m.increment_unknown_version(3, "SyncFrame", 7);
        m.increment_unknown_version(1, "RaftRpc", 9);
        m.increment_unknown_version(1, "MetadataEntry", 9);
        m.increment_unknown_version(1, "MetadataEntry", 2);
        assert_eq!(
            m.snapshot(),
            vec![
                (1, "MetadataEntry", 2, 1),
                (1, "MetadataEntry", 9, 1),
                (1, "RaftRpc", 9, 1),
                (3, "SyncFrame", 7, 1),
            ]
        );
        assert_eq!(m.total(), 4);
    }

    /// Clones are handles to the same underlying counters.
    #[test]
    fn clones_share_counters() {
        let m = WireVersionMetrics::default();
        let handle = m.clone();
        handle.increment_unknown_version(5, "RaftRpc", 42);
        m.increment_unknown_version(5, "RaftRpc", 42);
        assert_eq!(m.total(), 2);
        assert_eq!(handle.snapshot(), vec![(5, "RaftRpc", 42, 2)]);
    }
}
128}