Skip to main content

ferro_rs/telemetry/
request_telemetry.rs

//! Per-key in-process ring buffer for sampled time-series telemetry.
//!
//! Storage is process-global and bounded at 128 samples per `(key, scope)`
//! pair: the oldest samples are dropped on overflow, and nothing survives a
//! process restart.
5
6use dashmap::DashMap;
7use serde::{Deserialize, Serialize};
8use std::collections::VecDeque;
9use std::sync::OnceLock;
10use std::time::SystemTime;
11
12/// One sample recorded against a `(key, scope)` bucket.
13///
14/// `value` is a `serde_json::Value` so heterogeneous payloads from different
15/// writers can share a snapshot reader.
16#[derive(Debug, Clone, Serialize, Deserialize)]
17pub struct Sample {
18    /// Wall-clock timestamp the sample was recorded at.
19    pub recorded_at: SystemTime,
20    /// Caller-chosen payload.
21    pub value: serde_json::Value,
22}
23
24impl Sample {
25    /// Record `value` at `SystemTime::now()`.
26    pub fn now(value: serde_json::Value) -> Self {
27        Self {
28            recorded_at: SystemTime::now(),
29            value,
30        }
31    }
32
33    /// Record `value` at a caller-provided timestamp (for backfilled batches).
34    pub fn at(when: SystemTime, value: serde_json::Value) -> Self {
35        Self {
36            recorded_at: when,
37            value,
38        }
39    }
40}
41
/// Operator-facing handle for the process-global telemetry store.
///
/// This is a field-less namespace: everything it offers is an associated
/// function. Writers never touch it directly; they go through
/// `crate::http::Request::telemetry_record` or
/// `crate::http::Request::telemetry_record_scoped`.
pub struct RequestTelemetry;
48
49/// `(key, scope)` bucket identifier. `scope` is caller-defined; convention is
50/// `"tenant:42"`, `"route:/api/products"`, etc.
51type BucketKey = (String, Option<String>);
52
53/// Per-bucket sample storage. `VecDeque` is used for O(1) push-back / pop-front
54/// ring-buffer semantics.
55type TelemetryStore = DashMap<BucketKey, VecDeque<Sample>>;
56
57static TELEMETRY_STORE: OnceLock<TelemetryStore> = OnceLock::new();
58
59fn telemetry_store() -> &'static TelemetryStore {
60    TELEMETRY_STORE.get_or_init(DashMap::new)
61}
62
/// Per-bucket ring-buffer capacity: the maximum number of samples retained
/// for any single `(key, scope)` pair. Kept a power of two, which is checked
/// at compile time right after this definition.
///
/// Exposed publicly so operator dashboards rendering "showing N of CAPACITY
/// samples" can refer to the const symbolically instead of hardcoding `128`.
pub const RING_BUFFER_CAPACITY: usize = 128;

// Enforce the documented "power of two" choice at compile time. This also
// rules out a zero capacity, since zero is not a power of two.
const _: () = assert!(RING_BUFFER_CAPACITY.is_power_of_two());
69
70/// Push `sample` into the `(key, scope)` bucket. Drops the oldest if the bucket
71/// exceeds [`RING_BUFFER_CAPACITY`].
72///
73/// Called from `crate::http::Request::telemetry_record` and
74/// `crate::http::Request::telemetry_record_scoped`.
75pub(crate) fn record(key: &str, scope: Option<&str>, sample: Sample) {
76    let map_key = (key.to_string(), scope.map(|s| s.to_string()));
77    let mut entry = telemetry_store()
78        .entry(map_key)
79        .or_insert_with(|| VecDeque::with_capacity(RING_BUFFER_CAPACITY));
80    entry.push_back(sample);
81    while entry.len() > RING_BUFFER_CAPACITY {
82        entry.pop_front();
83    }
84}
85
86impl RequestTelemetry {
87    /// Clone all samples recorded under `(key, scope)` into a new `Vec` (FIFO order).
88    pub fn snapshot(key: &str, scope: Option<&str>) -> Vec<Sample> {
89        let scope_owned = scope.map(|s| s.to_string());
90        telemetry_store()
91            .get(&(key.to_string(), scope_owned))
92            .map(|entry| entry.value().iter().cloned().collect())
93            .unwrap_or_default()
94    }
95
96    /// List every `(key, scope)` pair that has at least one recorded sample.
97    pub fn keys() -> Vec<(String, Option<String>)> {
98        telemetry_store().iter().map(|e| e.key().clone()).collect()
99    }
100
101    /// Drop every sample in every bucket. Intended for operator-driven resets.
102    pub fn clear() {
103        if let Some(r) = TELEMETRY_STORE.get() {
104            r.clear();
105        }
106    }
107
108    /// Test-isolation helper. Same effect as [`RequestTelemetry::clear`] but
109    /// scoped to `#[cfg(test)]` builds so production code cannot reach for it.
110    #[cfg(test)]
111    pub(crate) fn reset() {
112        Self::clear();
113    }
114}
115
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use serial_test::serial;
    use std::thread;

    #[test]
    #[serial]
    fn sample_constructors() {
        let before = SystemTime::now();
        let s = Sample::now(json!({"x": 1}));
        let after = SystemTime::now();
        assert!(s.recorded_at >= before && s.recorded_at <= after);
        assert_eq!(s.value, json!({"x": 1}));

        let epoch = SystemTime::UNIX_EPOCH;
        let s2 = Sample::at(epoch, json!({"x": 2}));
        assert_eq!(s2.recorded_at, epoch);
        assert_eq!(s2.value, json!({"x": 2}));
    }

    #[test]
    #[serial]
    fn record_and_snapshot_round_trip() {
        RequestTelemetry::reset();
        record("k1", None, Sample::now(json!({"i": 0})));
        record("k1", None, Sample::now(json!({"i": 1})));
        record("k1", None, Sample::now(json!({"i": 2})));
        let snap = RequestTelemetry::snapshot("k1", None);
        assert_eq!(snap.len(), 3);
        assert_eq!(snap[0].value, json!({"i": 0}));
        assert_eq!(snap[1].value, json!({"i": 1}));
        assert_eq!(snap[2].value, json!({"i": 2}));
    }

    #[test]
    #[serial]
    fn snapshot_empty_when_no_record() {
        RequestTelemetry::reset();
        let snap = RequestTelemetry::snapshot("nonexistent", None);
        assert!(snap.is_empty());
    }

    #[test]
    #[serial]
    fn ring_buffer_caps_at_capacity() {
        RequestTelemetry::reset();
        let overflow = 72usize;
        let total = RING_BUFFER_CAPACITY + overflow;
        for i in 0..total {
            record("cap", None, Sample::now(json!({"i": i})));
        }
        let snap = RequestTelemetry::snapshot("cap", None);
        assert_eq!(snap.len(), RING_BUFFER_CAPACITY);
        // The oldest `overflow` samples were evicted, so the first survivor
        // has `i == overflow` and the last one is the final sample recorded.
        assert_eq!(snap[0].value, json!({"i": overflow}));
        assert_eq!(snap[RING_BUFFER_CAPACITY - 1].value, json!({"i": total - 1}));
    }

    #[test]
    #[serial]
    fn scope_isolation() {
        RequestTelemetry::reset();
        record("s", Some("a"), Sample::now(json!({"side": "a"})));
        record("s", Some("b"), Sample::now(json!({"side": "b"})));
        let snap_a = RequestTelemetry::snapshot("s", Some("a"));
        let snap_b = RequestTelemetry::snapshot("s", Some("b"));
        assert_eq!(snap_a.len(), 1);
        assert_eq!(snap_b.len(), 1);
        assert_eq!(snap_a[0].value, json!({"side": "a"}));
        assert_eq!(snap_b[0].value, json!({"side": "b"}));
    }

    #[test]
    #[serial]
    fn concurrent_record_no_deadlock() {
        const THREADS: usize = 8;
        const PER_THREAD: usize = 50;

        RequestTelemetry::reset();
        let handles: Vec<_> = (0..THREADS)
            .map(|t| {
                thread::spawn(move || {
                    for i in 0..PER_THREAD {
                        record("concurrent", None, Sample::now(json!({"t": t, "i": i})));
                    }
                })
            })
            .collect();
        // Liveness check: if `record` deadlocked, these joins would never
        // return and the test run would hang. libtest has no default timeout;
        // it only warns about long-running tests.
        for h in handles {
            h.join().expect("thread panicked");
        }

        let snap = RequestTelemetry::snapshot("concurrent", None);
        // THREADS * PER_THREAD samples were attempted; only the newest
        // RING_BUFFER_CAPACITY of them are retained.
        assert!(THREADS * PER_THREAD >= RING_BUFFER_CAPACITY);
        assert_eq!(snap.len(), RING_BUFFER_CAPACITY);

        // Each thread records sequentially, so whatever survives from a given
        // thread must still appear in strictly increasing `i` order.
        let mut last_seen: Vec<Option<u64>> = vec![None; THREADS];
        for s in &snap {
            let t = s.value["t"].as_u64().expect("sample missing `t`");
            let i = s.value["i"].as_u64().expect("sample missing `i`");
            let slot = &mut last_seen[usize::try_from(t).expect("thread id fits in usize")];
            if let Some(prev) = *slot {
                assert!(i > prev, "thread {t}: sample {i} appeared after {prev}");
            }
            *slot = Some(i);
        }
    }

    #[test]
    #[serial]
    fn reset_clears_store() {
        RequestTelemetry::reset();
        record("transient", None, Sample::now(json!({"x": 1})));
        assert_eq!(RequestTelemetry::snapshot("transient", None).len(), 1);
        RequestTelemetry::reset();
        assert!(RequestTelemetry::snapshot("transient", None).is_empty());
        assert!(RequestTelemetry::keys().is_empty());
    }

    #[test]
    #[serial]
    fn keys_lists_all_buckets() {
        RequestTelemetry::reset();
        record("a", None, Sample::now(json!({})));
        record("b", Some("x"), Sample::now(json!({})));
        record("c", Some("y"), Sample::now(json!({})));
        // Iteration order is unspecified, so sort before comparing.
        let mut keys = RequestTelemetry::keys();
        keys.sort();
        assert_eq!(
            keys,
            vec![
                ("a".to_string(), None),
                ("b".to_string(), Some("x".to_string())),
                ("c".to_string(), Some("y".to_string())),
            ]
        );
    }
}
233}