Skip to main content

nodedb_bridge/
telemetry.rs

//! Lock-free per-core telemetry buffer.
//!
//! Data Plane cores MUST NOT serve HTTP endpoints or accept Prometheus scrapes.
//! Instead, each core writes metric samples into a lock-free ring buffer.
//! The Control Plane (Tokio) reads these buffers and serves aggregated metrics.
//!
//! This ensures observability never introduces jitter on the Data Plane.
//!
//! ## Design
//!
//! - One `TelemetryRing` per Data Plane core.
//! - Producer: the Data Plane core (single writer, no contention).
//! - Consumer: the Control Plane metrics exporter (periodic drain).
//! - Overflow policy: drop oldest samples (metrics are ephemeral).
15
16use std::sync::atomic::{AtomicU64, Ordering};
17
18/// A single metric sample written by a Data Plane core.
19#[derive(Debug, Clone, Copy)]
20pub struct MetricSample {
21    /// Metric identifier (interned string index, not a heap String).
22    pub metric_id: u32,
23
24    /// Timestamp in nanoseconds since epoch (from `CLOCK_MONOTONIC`).
25    pub timestamp_ns: u64,
26
27    /// The metric value.
28    pub value: MetricValue,
29}
30
/// The kind of a metric observation and its payload.
#[derive(Clone, Copy, Debug)]
pub enum MetricValue {
    /// Amount to add to a monotonic counter.
    Counter(u64),
    /// Instantaneous gauge reading.
    Gauge(f64),
    /// A single observation to be placed into a histogram bucket.
    Histogram(f64),
}
41
42/// Fixed-capacity ring buffer for metric samples from a single Data Plane core.
43///
44/// Single-producer (Data Plane core), single-consumer (Control Plane exporter).
45/// On overflow, the oldest samples are silently dropped — this is acceptable
46/// for metrics because they are ephemeral and the exporter scrapes frequently.
47pub struct TelemetryRing {
48    /// Slot array.
49    slots: Box<[MetricSample]>,
50
51    /// Write cursor (Data Plane only). Monotonically increasing.
52    write_pos: AtomicU64,
53
54    /// Read cursor (Control Plane only). Chases write_pos.
55    read_pos: AtomicU64,
56
57    /// Capacity (power of two).
58    capacity: usize,
59
60    /// Bitmask for fast modulo.
61    mask: usize,
62
63    /// Count of samples dropped due to overflow.
64    dropped: AtomicU64,
65}
66
67// SAFETY: Same SPSC argument as the bridge buffer — single producer, single consumer,
68// disjoint slots. The atomics are the only shared state.
69unsafe impl Send for TelemetryRing {}
70unsafe impl Sync for TelemetryRing {}
71
72impl TelemetryRing {
73    /// Create a new telemetry ring with the given capacity (rounded to power of two).
74    pub fn new(capacity: usize) -> Self {
75        let capacity = capacity.next_power_of_two();
76        let mask = capacity - 1;
77
78        let default_sample = MetricSample {
79            metric_id: 0,
80            timestamp_ns: 0,
81            value: MetricValue::Counter(0),
82        };
83
84        Self {
85            slots: vec![default_sample; capacity].into_boxed_slice(),
86            write_pos: AtomicU64::new(0),
87            read_pos: AtomicU64::new(0),
88            capacity,
89            mask,
90            dropped: AtomicU64::new(0),
91        }
92    }
93
94    /// Record a metric sample (called from Data Plane core).
95    ///
96    /// Lock-free, allocation-free. If the ring is full, the oldest sample
97    /// is overwritten and the drop counter incremented.
98    pub fn record(&mut self, sample: MetricSample) {
99        let pos = self.write_pos.load(Ordering::Relaxed);
100        let read = self.read_pos.load(Ordering::Relaxed);
101
102        // If we've lapped the reader, advance the reader (drop oldest).
103        if pos.wrapping_sub(read) >= self.capacity as u64 {
104            self.read_pos.store(
105                pos.wrapping_sub(self.capacity as u64 - 1),
106                Ordering::Relaxed,
107            );
108            self.dropped.fetch_add(1, Ordering::Relaxed);
109        }
110
111        let idx = (pos as usize) & self.mask;
112        self.slots[idx] = sample;
113        self.write_pos.store(pos.wrapping_add(1), Ordering::Release);
114    }
115
116    /// Drain all available samples into the provided buffer (called from Control Plane).
117    ///
118    /// Returns the number of samples drained.
119    pub fn drain_into(&self, buf: &mut Vec<MetricSample>) -> usize {
120        let write = self.write_pos.load(Ordering::Acquire);
121        let read = self.read_pos.load(Ordering::Relaxed);
122
123        let available = write.wrapping_sub(read) as usize;
124        if available == 0 {
125            return 0;
126        }
127
128        for i in 0..available {
129            let idx = ((read.wrapping_add(i as u64)) as usize) & self.mask;
130            buf.push(self.slots[idx]);
131        }
132
133        self.read_pos.store(write, Ordering::Release);
134        available
135    }
136
137    /// Number of samples dropped due to ring overflow.
138    pub fn dropped_count(&self) -> u64 {
139        self.dropped.load(Ordering::Relaxed)
140    }
141}
142
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a counter sample whose timestamp and counter value both equal `val`.
    fn sample(id: u32, val: u64) -> MetricSample {
        MetricSample {
            metric_id: id,
            timestamp_ns: val,
            value: MetricValue::Counter(val),
        }
    }

    /// Project a drained buffer onto its metric ids, preserving order.
    fn metric_ids(buf: &[MetricSample]) -> Vec<u32> {
        buf.iter().map(|s| s.metric_id).collect()
    }

    #[test]
    fn basic_record_and_drain() {
        let mut ring = TelemetryRing::new(8);

        ring.record(sample(1, 100));
        ring.record(sample(2, 200));
        ring.record(sample(3, 300));

        let mut buf = Vec::new();
        let count = ring.drain_into(&mut buf);
        assert_eq!(count, 3);
        assert_eq!(metric_ids(&buf), vec![1, 2, 3]);
        assert_eq!(buf[1].timestamp_ns, 200);
        assert_eq!(ring.dropped_count(), 0);
    }

    #[test]
    fn overflow_drops_oldest() {
        let mut ring = TelemetryRing::new(4);

        // Write 6 samples into a capacity-4 ring: the two oldest must go.
        for i in 0..6 {
            ring.record(sample(i, u64::from(i)));
        }

        assert_eq!(ring.dropped_count(), 2);

        let mut buf = Vec::new();
        let count = ring.drain_into(&mut buf);

        // Exactly the newest `capacity` samples survive, oldest first.
        assert_eq!(count, 4);
        assert_eq!(metric_ids(&buf), vec![2, 3, 4, 5]);
    }

    #[test]
    fn empty_drain_returns_zero() {
        let ring = TelemetryRing::new(8);
        let mut buf = Vec::new();
        assert_eq!(ring.drain_into(&mut buf), 0);
        assert!(buf.is_empty());
    }

    #[test]
    fn drain_consumes_samples() {
        let mut ring = TelemetryRing::new(8);
        ring.record(sample(1, 10));

        let mut buf = Vec::new();
        assert_eq!(ring.drain_into(&mut buf), 1);

        // A second drain with no new writes yields nothing and leaves `buf` alone.
        assert_eq!(ring.drain_into(&mut buf), 0);
        assert_eq!(metric_ids(&buf), vec![1]);
    }
}
196}