1use serde::{Deserialize, Serialize};
2use serde_json::{Map, Value};
3use std::sync::{Arc, Mutex};
4
5#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
6#[serde(rename_all = "snake_case")]
7pub enum TelemetryEventKind {
8 Connect,
9 Disconnect,
10 Mount,
11 HandleEvent,
12 Patch,
13 Diff,
14 RenderCadence,
15 StreamInsert,
16 StreamDelete,
17 PubSubFanout,
18 UploadLifecycle,
19 SecurityAudit,
20 JobOutcome,
21 MigrationOutcome,
22 Error,
23}
24
25#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
26pub struct TelemetryEvent {
27 pub kind: TelemetryEventKind,
28 #[serde(default, skip_serializing_if = "Option::is_none")]
29 pub trace_id: Option<String>,
30 #[serde(default, skip_serializing_if = "Option::is_none")]
31 pub span_id: Option<String>,
32 #[serde(default, skip_serializing_if = "Option::is_none")]
33 pub parent_span_id: Option<String>,
34 #[serde(default, skip_serializing_if = "Option::is_none")]
35 pub correlation_id: Option<String>,
36 #[serde(default, skip_serializing_if = "Option::is_none")]
37 pub request_id: Option<String>,
38 pub session_id: Option<String>,
39 pub route_path: Option<String>,
40 pub event_name: Option<String>,
41 pub ok: bool,
42 pub latency_ms: Option<u64>,
43 pub bytes: Option<usize>,
44 pub count: Option<usize>,
45 #[serde(default)]
46 pub attributes: Map<String, Value>,
47}
48
49impl TelemetryEvent {
50 pub fn new(kind: TelemetryEventKind) -> Self {
51 Self {
52 kind,
53 trace_id: None,
54 span_id: None,
55 parent_span_id: None,
56 correlation_id: None,
57 request_id: None,
58 session_id: None,
59 route_path: None,
60 event_name: None,
61 ok: true,
62 latency_ms: None,
63 bytes: None,
64 count: None,
65 attributes: Map::new(),
66 }
67 }
68
69 pub fn with_session(mut self, session_id: impl Into<String>) -> Self {
70 self.session_id = Some(session_id.into());
71 self
72 }
73
74 pub fn with_trace_id(mut self, trace_id: impl Into<String>) -> Self {
75 self.trace_id = Some(trace_id.into());
76 self
77 }
78
79 pub fn with_span_id(mut self, span_id: impl Into<String>) -> Self {
80 self.span_id = Some(span_id.into());
81 self
82 }
83
84 pub fn with_parent_span_id(mut self, parent_span_id: impl Into<String>) -> Self {
85 self.parent_span_id = Some(parent_span_id.into());
86 self
87 }
88
89 pub fn with_correlation_id(mut self, correlation_id: impl Into<String>) -> Self {
90 self.correlation_id = Some(correlation_id.into());
91 self
92 }
93
94 pub fn with_request_id(mut self, request_id: impl Into<String>) -> Self {
95 self.request_id = Some(request_id.into());
96 self
97 }
98
99 pub fn with_route(mut self, route_path: impl Into<String>) -> Self {
100 self.route_path = Some(route_path.into());
101 self
102 }
103
104 pub fn with_event_name(mut self, event_name: impl Into<String>) -> Self {
105 self.event_name = Some(event_name.into());
106 self
107 }
108
109 pub fn with_latency_ms(mut self, latency_ms: u64) -> Self {
110 self.latency_ms = Some(latency_ms);
111 self
112 }
113
114 pub fn with_bytes(mut self, bytes: usize) -> Self {
115 self.bytes = Some(bytes);
116 self
117 }
118
119 pub fn with_count(mut self, count: usize) -> Self {
120 self.count = Some(count);
121 self
122 }
123
124 pub fn with_ok(mut self, ok: bool) -> Self {
125 self.ok = ok;
126 self
127 }
128
129 pub fn with_attribute(mut self, key: impl Into<String>, value: Value) -> Self {
130 self.attributes.insert(key.into(), value);
131 self
132 }
133}
134
135pub trait TelemetrySink: Send + Sync {
136 fn emit(&self, event: TelemetryEvent) -> Result<(), String>;
137}
138
/// Stateless unit type used as a do-nothing telemetry sink.
#[derive(Default, Debug)]
pub struct NoopTelemetrySink;
141
142impl TelemetrySink for NoopTelemetrySink {
143 fn emit(&self, _event: TelemetryEvent) -> Result<(), String> {
144 Ok(())
145 }
146}
147
148#[derive(Debug, Default, Clone)]
149pub struct MemoryTelemetrySink {
150 events: Arc<Mutex<Vec<TelemetryEvent>>>,
151}
152
153impl MemoryTelemetrySink {
154 pub fn new() -> Self {
155 Self::default()
156 }
157
158 pub fn events(&self) -> Vec<TelemetryEvent> {
159 self.events.lock().expect("telemetry events mutex").clone()
160 }
161}
162
163impl TelemetrySink for MemoryTelemetrySink {
164 fn emit(&self, event: TelemetryEvent) -> Result<(), String> {
165 self.events
166 .lock()
167 .map_err(|_| "telemetry events mutex poisoned".to_string())?
168 .push(event);
169 Ok(())
170 }
171}
172
#[cfg(test)]
mod tests {
    use super::{
        MemoryTelemetrySink, NoopTelemetrySink, TelemetryEvent, TelemetryEventKind, TelemetrySink,
    };
    use serde_json::json;

    /// Every `with_*` builder must store exactly the value it was given.
    #[test]
    fn telemetry_event_builders_populate_all_optional_fields() {
        let built = TelemetryEvent::new(TelemetryEventKind::HandleEvent)
            .with_session("session-1")
            .with_trace_id("trace-1")
            .with_span_id("span-1")
            .with_parent_span_id("parent-1")
            .with_correlation_id("corr-1")
            .with_request_id("req-1")
            .with_route("/dashboard")
            .with_event_name("refresh")
            .with_latency_ms(42)
            .with_bytes(512)
            .with_count(3)
            .with_ok(false)
            .with_attribute("tenant", json!("acme"));

        // Identifier fields.
        assert_eq!(built.session_id.as_deref(), Some("session-1"));
        assert_eq!(built.trace_id.as_deref(), Some("trace-1"));
        assert_eq!(built.span_id.as_deref(), Some("span-1"));
        assert_eq!(built.parent_span_id.as_deref(), Some("parent-1"));
        assert_eq!(built.correlation_id.as_deref(), Some("corr-1"));
        assert_eq!(built.request_id.as_deref(), Some("req-1"));

        // Descriptive fields.
        assert_eq!(built.route_path.as_deref(), Some("/dashboard"));
        assert_eq!(built.event_name.as_deref(), Some("refresh"));

        // Measurements and status.
        assert_eq!(built.latency_ms, Some(42));
        assert_eq!(built.bytes, Some(512));
        assert_eq!(built.count, Some(3));
        assert!(!built.ok);

        // Free-form attributes.
        let tenant = built.attributes.get("tenant");
        assert_eq!(tenant, Some(&json!("acme")));
    }

    /// The memory sink stores events, and turns a poisoned mutex into an
    /// `Err` from `emit`.
    #[test]
    fn memory_sink_records_events_and_reports_poisoned_mutex() {
        let sink = MemoryTelemetrySink::new();

        let first = sink.emit(TelemetryEvent::new(TelemetryEventKind::Connect));
        first.expect("first emit should succeed");
        let recorded = sink.events();
        assert_eq!(recorded.len(), 1);

        // Poison the shared mutex by panicking while a clone holds the lock.
        let poisoned = sink.clone();
        let outcome = std::panic::catch_unwind(move || {
            let _guard = poisoned.events.lock().expect("lock sink");
            panic!("poison sink");
        });
        drop(outcome);

        let second = sink.emit(TelemetryEvent::new(TelemetryEventKind::Disconnect));
        let err = second.expect_err("poisoned sink should fail");
        assert!(err.contains("poisoned"));
    }

    /// The no-op sink reports success for any event.
    #[test]
    fn noop_sink_accepts_events() {
        let sink = NoopTelemetrySink;
        let outcome = sink.emit(TelemetryEvent::new(TelemetryEventKind::Patch));
        outcome.expect("noop sink should always succeed");
    }
}