Skip to main content

shelly/
telemetry.rs

1use serde::{Deserialize, Serialize};
2use serde_json::{Map, Value};
3use std::sync::{Arc, Mutex};
4
5#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
6#[serde(rename_all = "snake_case")]
7pub enum TelemetryEventKind {
8    Connect,
9    Disconnect,
10    Mount,
11    HandleEvent,
12    Patch,
13    Diff,
14    RenderCadence,
15    StreamInsert,
16    StreamDelete,
17    PubSubFanout,
18    UploadLifecycle,
19    SecurityAudit,
20    JobOutcome,
21    MigrationOutcome,
22    Error,
23}
24
25#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
26pub struct TelemetryEvent {
27    pub kind: TelemetryEventKind,
28    #[serde(default, skip_serializing_if = "Option::is_none")]
29    pub trace_id: Option<String>,
30    #[serde(default, skip_serializing_if = "Option::is_none")]
31    pub span_id: Option<String>,
32    #[serde(default, skip_serializing_if = "Option::is_none")]
33    pub parent_span_id: Option<String>,
34    #[serde(default, skip_serializing_if = "Option::is_none")]
35    pub correlation_id: Option<String>,
36    #[serde(default, skip_serializing_if = "Option::is_none")]
37    pub request_id: Option<String>,
38    pub session_id: Option<String>,
39    pub route_path: Option<String>,
40    pub event_name: Option<String>,
41    pub ok: bool,
42    pub latency_ms: Option<u64>,
43    pub bytes: Option<usize>,
44    pub count: Option<usize>,
45    #[serde(default)]
46    pub attributes: Map<String, Value>,
47}
48
49impl TelemetryEvent {
50    pub fn new(kind: TelemetryEventKind) -> Self {
51        Self {
52            kind,
53            trace_id: None,
54            span_id: None,
55            parent_span_id: None,
56            correlation_id: None,
57            request_id: None,
58            session_id: None,
59            route_path: None,
60            event_name: None,
61            ok: true,
62            latency_ms: None,
63            bytes: None,
64            count: None,
65            attributes: Map::new(),
66        }
67    }
68
69    pub fn with_session(mut self, session_id: impl Into<String>) -> Self {
70        self.session_id = Some(session_id.into());
71        self
72    }
73
74    pub fn with_trace_id(mut self, trace_id: impl Into<String>) -> Self {
75        self.trace_id = Some(trace_id.into());
76        self
77    }
78
79    pub fn with_span_id(mut self, span_id: impl Into<String>) -> Self {
80        self.span_id = Some(span_id.into());
81        self
82    }
83
84    pub fn with_parent_span_id(mut self, parent_span_id: impl Into<String>) -> Self {
85        self.parent_span_id = Some(parent_span_id.into());
86        self
87    }
88
89    pub fn with_correlation_id(mut self, correlation_id: impl Into<String>) -> Self {
90        self.correlation_id = Some(correlation_id.into());
91        self
92    }
93
94    pub fn with_request_id(mut self, request_id: impl Into<String>) -> Self {
95        self.request_id = Some(request_id.into());
96        self
97    }
98
99    pub fn with_route(mut self, route_path: impl Into<String>) -> Self {
100        self.route_path = Some(route_path.into());
101        self
102    }
103
104    pub fn with_event_name(mut self, event_name: impl Into<String>) -> Self {
105        self.event_name = Some(event_name.into());
106        self
107    }
108
109    pub fn with_latency_ms(mut self, latency_ms: u64) -> Self {
110        self.latency_ms = Some(latency_ms);
111        self
112    }
113
114    pub fn with_bytes(mut self, bytes: usize) -> Self {
115        self.bytes = Some(bytes);
116        self
117    }
118
119    pub fn with_count(mut self, count: usize) -> Self {
120        self.count = Some(count);
121        self
122    }
123
124    pub fn with_ok(mut self, ok: bool) -> Self {
125        self.ok = ok;
126        self
127    }
128
129    pub fn with_attribute(mut self, key: impl Into<String>, value: Value) -> Self {
130        self.attributes.insert(key.into(), value);
131        self
132    }
133}
134
135pub trait TelemetrySink: Send + Sync {
136    fn emit(&self, event: TelemetryEvent) -> Result<(), String>;
137}
138
139#[derive(Debug, Default)]
140pub struct NoopTelemetrySink;
141
142impl TelemetrySink for NoopTelemetrySink {
143    fn emit(&self, _event: TelemetryEvent) -> Result<(), String> {
144        Ok(())
145    }
146}
147
148#[derive(Debug, Default, Clone)]
149pub struct MemoryTelemetrySink {
150    events: Arc<Mutex<Vec<TelemetryEvent>>>,
151}
152
153impl MemoryTelemetrySink {
154    pub fn new() -> Self {
155        Self::default()
156    }
157
158    pub fn events(&self) -> Vec<TelemetryEvent> {
159        self.events.lock().expect("telemetry events mutex").clone()
160    }
161}
162
163impl TelemetrySink for MemoryTelemetrySink {
164    fn emit(&self, event: TelemetryEvent) -> Result<(), String> {
165        self.events
166            .lock()
167            .map_err(|_| "telemetry events mutex poisoned".to_string())?
168            .push(event);
169        Ok(())
170    }
171}
172
#[cfg(test)]
mod tests {
    use super::{
        MemoryTelemetrySink, NoopTelemetrySink, TelemetryEvent, TelemetryEventKind, TelemetrySink,
    };
    use serde_json::json;

    /// Chains every builder once and checks that each value landed in its field.
    #[test]
    fn telemetry_event_builders_populate_all_optional_fields() {
        let built = TelemetryEvent::new(TelemetryEventKind::HandleEvent)
            .with_session("session-1")
            .with_trace_id("trace-1")
            .with_span_id("span-1")
            .with_parent_span_id("parent-1")
            .with_correlation_id("corr-1")
            .with_request_id("req-1")
            .with_route("/dashboard")
            .with_event_name("refresh")
            .with_latency_ms(42)
            .with_bytes(512)
            .with_count(3)
            .with_ok(false)
            .with_attribute("tenant", json!("acme"));

        // All string-valued fields share the same shape, so check them as a table.
        let text_fields = [
            (&built.session_id, "session-1"),
            (&built.trace_id, "trace-1"),
            (&built.span_id, "span-1"),
            (&built.parent_span_id, "parent-1"),
            (&built.correlation_id, "corr-1"),
            (&built.request_id, "req-1"),
            (&built.route_path, "/dashboard"),
            (&built.event_name, "refresh"),
        ];
        for (actual, expected) in text_fields {
            assert_eq!(actual.as_deref(), Some(expected));
        }

        assert_eq!(built.latency_ms, Some(42));
        assert_eq!(built.bytes, Some(512));
        assert_eq!(built.count, Some(3));
        assert!(!built.ok);
        assert_eq!(built.attributes.get("tenant"), Some(&json!("acme")));
    }

    /// A healthy sink records events; once its mutex is poisoned, `emit` errors.
    #[test]
    fn memory_sink_records_events_and_reports_poisoned_mutex() {
        let sink = MemoryTelemetrySink::new();
        let first = sink.emit(TelemetryEvent::new(TelemetryEventKind::Connect));
        first.expect("first emit should succeed");
        assert_eq!(sink.events().len(), 1);

        // Panic on a clone while it holds the guard so the shared mutex is poisoned.
        let shared = sink.clone();
        let _ = std::panic::catch_unwind(move || {
            let _guard = shared.events.lock().expect("lock sink");
            panic!("poison sink");
        });

        let message = sink
            .emit(TelemetryEvent::new(TelemetryEventKind::Disconnect))
            .expect_err("poisoned sink should fail");
        assert!(message.contains("poisoned"));
    }

    /// The no-op sink accepts whatever it is given.
    #[test]
    fn noop_sink_accepts_events() {
        let outcome = NoopTelemetrySink.emit(TelemetryEvent::new(TelemetryEventKind::Patch));
        outcome.expect("noop sink should always succeed");
    }
}