ferro_rs/telemetry/
request_telemetry.rs1use dashmap::DashMap;
7use serde::{Deserialize, Serialize};
8use std::collections::VecDeque;
9use std::sync::OnceLock;
10use std::time::SystemTime;
11
12#[derive(Debug, Clone, Serialize, Deserialize)]
17pub struct Sample {
18 pub recorded_at: SystemTime,
20 pub value: serde_json::Value,
22}
23
24impl Sample {
25 pub fn now(value: serde_json::Value) -> Self {
27 Self {
28 recorded_at: SystemTime::now(),
29 value,
30 }
31 }
32
33 pub fn at(when: SystemTime, value: serde_json::Value) -> Self {
35 Self {
36 recorded_at: when,
37 value,
38 }
39 }
40}
41
/// Field-less handle type; its associated functions read and clear the
/// process-wide telemetry store.
pub struct RequestTelemetry;
48
/// Identifies one bucket of samples: the key plus an optional scope.
type BucketKey = (String, Option<String>);
52
53type TelemetryStore = DashMap<BucketKey, VecDeque<Sample>>;
56
57static TELEMETRY_STORE: OnceLock<TelemetryStore> = OnceLock::new();
58
59fn telemetry_store() -> &'static TelemetryStore {
60 TELEMETRY_STORE.get_or_init(DashMap::new)
61}
62
/// Maximum number of samples retained per bucket; older samples are evicted
/// once a bucket holds this many.
pub const RING_BUFFER_CAPACITY: usize = 128;
69
70pub(crate) fn record(key: &str, scope: Option<&str>, sample: Sample) {
76 let map_key = (key.to_string(), scope.map(|s| s.to_string()));
77 let mut entry = telemetry_store()
78 .entry(map_key)
79 .or_insert_with(|| VecDeque::with_capacity(RING_BUFFER_CAPACITY));
80 entry.push_back(sample);
81 while entry.len() > RING_BUFFER_CAPACITY {
82 entry.pop_front();
83 }
84}
85
86impl RequestTelemetry {
87 pub fn snapshot(key: &str, scope: Option<&str>) -> Vec<Sample> {
89 let scope_owned = scope.map(|s| s.to_string());
90 telemetry_store()
91 .get(&(key.to_string(), scope_owned))
92 .map(|entry| entry.value().iter().cloned().collect())
93 .unwrap_or_default()
94 }
95
96 pub fn keys() -> Vec<(String, Option<String>)> {
98 telemetry_store().iter().map(|e| e.key().clone()).collect()
99 }
100
101 pub fn clear() {
103 if let Some(r) = TELEMETRY_STORE.get() {
104 r.clear();
105 }
106 }
107
108 #[cfg(test)]
111 pub(crate) fn reset() {
112 Self::clear();
113 }
114}
115
/// Unit tests for the telemetry store.
///
/// Every test is `#[serial]` because they all share the one process-wide
/// store; tests that touch the store reset it first.
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use serial_test::serial;
    use std::thread;

    /// `Sample::now` stamps the current time; `Sample::at` keeps the given one.
    #[test]
    #[serial]
    fn sample_constructors() {
        let before = SystemTime::now();
        let s = Sample::now(json!({"x": 1}));
        let after = SystemTime::now();
        assert!(s.recorded_at >= before && s.recorded_at <= after);
        assert_eq!(s.value, json!({"x": 1}));

        let epoch = SystemTime::UNIX_EPOCH;
        let s2 = Sample::at(epoch, json!({"x": 2}));
        assert_eq!(s2.recorded_at, epoch);
        assert_eq!(s2.value, json!({"x": 2}));
    }

    /// Samples come back from `snapshot` in insertion order.
    #[test]
    #[serial]
    fn record_and_snapshot_round_trip() {
        RequestTelemetry::reset();
        for i in 0..3usize {
            record("k1", None, Sample::now(json!({"i": i})));
        }
        let snap = RequestTelemetry::snapshot("k1", None);
        assert_eq!(snap.len(), 3);
        for (i, sample) in snap.iter().enumerate() {
            assert_eq!(sample.value, json!({"i": i}));
        }
    }

    /// An unknown bucket yields an empty snapshot.
    #[test]
    #[serial]
    fn snapshot_empty_when_no_record() {
        RequestTelemetry::reset();
        let snap = RequestTelemetry::snapshot("nonexistent", None);
        assert!(snap.is_empty());
    }

    /// Recording past the capacity keeps only the newest samples.
    #[test]
    #[serial]
    fn ring_buffer_caps_at_128() {
        RequestTelemetry::reset();
        // Number of samples written beyond the capacity; these are exactly
        // the ones expected to be evicted from the front.
        let overflow = 72usize;
        let total = RING_BUFFER_CAPACITY + overflow;
        for i in 0..total {
            record("cap", None, Sample::now(json!({"i": i})));
        }
        let snap = RequestTelemetry::snapshot("cap", None);
        assert_eq!(snap.len(), RING_BUFFER_CAPACITY);
        assert_eq!(snap[0].value, json!({"i": overflow}));
        assert_eq!(snap[RING_BUFFER_CAPACITY - 1].value, json!({"i": total - 1}));
    }

    /// The same key under different scopes maps to independent buckets.
    #[test]
    #[serial]
    fn scope_isolation() {
        RequestTelemetry::reset();
        record("s", Some("a"), Sample::now(json!({"side": "a"})));
        record("s", Some("b"), Sample::now(json!({"side": "b"})));
        let snap_a = RequestTelemetry::snapshot("s", Some("a"));
        let snap_b = RequestTelemetry::snapshot("s", Some("b"));
        assert_eq!(snap_a.len(), 1);
        assert_eq!(snap_b.len(), 1);
        assert_eq!(snap_a[0].value, json!({"side": "a"}));
        assert_eq!(snap_b[0].value, json!({"side": "b"}));
    }

    /// Several threads recording into one bucket all finish, and the bucket
    /// still respects the capacity limit.
    #[test]
    #[serial]
    fn concurrent_record_no_deadlock() {
        const THREADS: u32 = 8;
        const PER_THREAD: usize = 50;

        RequestTelemetry::reset();
        let handles: Vec<_> = (0..THREADS)
            .map(|t| {
                thread::spawn(move || {
                    for i in 0..PER_THREAD {
                        record("concurrent", None, Sample::now(json!({"t": t, "i": i})));
                    }
                })
            })
            .collect();
        for h in handles {
            h.join().expect("thread panicked");
        }
        let snap = RequestTelemetry::snapshot("concurrent", None);
        let written = THREADS as usize * PER_THREAD;
        assert_eq!(snap.len(), written.min(RING_BUFFER_CAPACITY));
    }

    /// `reset` removes previously recorded buckets.
    #[test]
    #[serial]
    fn reset_clears_store() {
        RequestTelemetry::reset();
        record("transient", None, Sample::now(json!({"x": 1})));
        assert_eq!(RequestTelemetry::snapshot("transient", None).len(), 1);
        RequestTelemetry::reset();
        assert!(RequestTelemetry::snapshot("transient", None).is_empty());
    }

    /// `keys` reports exactly the buckets that were recorded into.
    #[test]
    #[serial]
    fn keys_lists_all_buckets() {
        RequestTelemetry::reset();
        record("a", None, Sample::now(json!({})));
        record("b", Some("x"), Sample::now(json!({})));
        record("c", Some("y"), Sample::now(json!({})));
        let mut keys = RequestTelemetry::keys();
        // Map iteration order is not relied upon; sort before comparing.
        keys.sort();
        assert_eq!(
            keys,
            vec![
                ("a".to_string(), None),
                ("b".to_string(), Some("x".to_string())),
                ("c".to_string(), Some("y".to_string())),
            ]
        );
    }
}