Skip to main content

nodedb_cluster/wire_version/
metrics.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! Metrics for unknown or mismatched wire versions observed on inbound frames.
4//!
5//! # Wiring
6//!
7//! This module defines a struct holding per-key atomic counters. An instance
8//! should be stored in the cluster subsystem's shared state (e.g. alongside
9//! `LoopMetricsRegistry`) and incremented whenever `decode_versioned` returns
10//! `WireVersionError::UnsupportedVersion`.
11//!
12//! PHASE2-WIRE: Expose `WireVersionMetrics` from the cluster subsystem and
13//! register its counters with the HTTP metrics endpoint (same pattern as
14//! `LoopMetrics`). Until then, the counters accumulate in memory and can be
15//! read programmatically by tests.
16
17use std::sync::Arc;
18use std::sync::atomic::{AtomicU64, Ordering};
19
/// One counter bucket, identified by the triple
/// `(peer_node_id, message_type, version)`.
#[derive(Debug)]
pub struct WireVersionCounter {
    /// ID of the peer node whose frame carried the unsupported version.
    pub peer_node_id: u64,
    /// Message type tag in human-readable form (for example "MetadataEntry",
    /// "RaftRpc", or "SyncFrame"). Kept as a `&'static str` so the hot path
    /// does not have to allocate for it.
    pub message_type: &'static str,
    /// Version number that was observed and is not supported.
    pub version: u16,
    /// Running total of frames observed for this key.
    pub count: AtomicU64,
}

impl WireVersionCounter {
    /// Builds a bucket for the given key with its count starting at zero.
    fn new(peer_node_id: u64, message_type: &'static str, version: u16) -> Self {
        let count = AtomicU64::new(0);
        WireVersionCounter {
            peer_node_id,
            message_type,
            version,
            count,
        }
    }
}
45
46/// Registry of unknown wire-version counters, keyed by
47/// `(peer_node_id, message_type, version)`.
48///
49/// Designed for low-frequency events (version mismatches are rare after a
50/// successful negotiation handshake). Uses a `parking_lot::RwLock`-free
51/// append-only `Vec` behind an `Arc` for simplicity; contention is negligible.
52#[derive(Debug, Default, Clone)]
53pub struct WireVersionMetrics {
54    inner: Arc<std::sync::Mutex<Vec<Arc<WireVersionCounter>>>>,
55}
56
57impl WireVersionMetrics {
58    /// Increment the counter for `(peer_node_id, message_type, version)`,
59    /// creating a new bucket if this is the first observation.
60    pub fn increment_unknown_version(
61        &self,
62        peer_node_id: u64,
63        message_type: &'static str,
64        version: u16,
65    ) {
66        let mut guard = self.inner.lock().unwrap_or_else(|p| p.into_inner());
67
68        // Linear scan — buckets accumulate slowly (one per distinct
69        // unsupported version per peer type), so O(n) is acceptable.
70        for bucket in guard.iter() {
71            if bucket.peer_node_id == peer_node_id
72                && bucket.message_type == message_type
73                && bucket.version == version
74            {
75                bucket.count.fetch_add(1, Ordering::Relaxed);
76                return;
77            }
78        }
79
80        // First observation: create bucket and immediately record.
81        let bucket = Arc::new(WireVersionCounter::new(peer_node_id, message_type, version));
82        bucket.count.fetch_add(1, Ordering::Relaxed);
83        guard.push(bucket);
84    }
85
86    /// Return a snapshot of all counter values, sorted by
87    /// `(peer_node_id, message_type, version)`.
88    pub fn snapshot(&self) -> Vec<(u64, &'static str, u16, u64)> {
89        let guard = self.inner.lock().unwrap_or_else(|p| p.into_inner());
90        let mut out: Vec<(u64, &'static str, u16, u64)> = guard
91            .iter()
92            .map(|b| {
93                (
94                    b.peer_node_id,
95                    b.message_type,
96                    b.version,
97                    b.count.load(Ordering::Relaxed),
98                )
99            })
100            .collect();
101        out.sort_by_key(|(peer, msg, ver, _)| (*peer, *msg, *ver));
102        out
103    }
104
105    /// Total number of unknown-version frames seen across all buckets.
106    pub fn total(&self) -> u64 {
107        let guard = self.inner.lock().unwrap_or_else(|p| p.into_inner());
108        guard.iter().map(|b| b.count.load(Ordering::Relaxed)).sum()
109    }
110}
111
#[cfg(test)]
mod tests {
    use super::*;

    /// Two observations of the same key land in one bucket, a different key
    /// gets its own bucket, and the snapshot comes back ordered by key.
    #[test]
    fn counter_accumulates() {
        let metrics = WireVersionMetrics::default();
        let observations = [(1, "MetadataEntry"), (1, "MetadataEntry"), (2, "RaftRpc")];
        for &(peer, tag) in observations.iter() {
            metrics.increment_unknown_version(peer, tag, 9999);
        }

        assert_eq!(metrics.total(), 3);

        let expected: Vec<(u64, &'static str, u16, u64)> =
            vec![(1, "MetadataEntry", 9999, 2), (2, "RaftRpc", 9999, 1)];
        assert_eq!(metrics.snapshot(), expected);
    }
}