use serde::{Deserialize, Serialize};
use serde_json::{Map, Value};
use std::sync::{Arc, Mutex};
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TelemetryEventKind {
Connect,
Disconnect,
Mount,
HandleEvent,
Patch,
Diff,
RenderCadence,
StreamInsert,
StreamDelete,
PubSubFanout,
UploadLifecycle,
SecurityAudit,
JobOutcome,
MigrationOutcome,
Error,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct TelemetryEvent {
pub kind: TelemetryEventKind,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub trace_id: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub span_id: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub parent_span_id: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub correlation_id: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub request_id: Option<String>,
pub session_id: Option<String>,
pub route_path: Option<String>,
pub event_name: Option<String>,
pub ok: bool,
pub latency_ms: Option<u64>,
pub bytes: Option<usize>,
pub count: Option<usize>,
#[serde(default)]
pub attributes: Map<String, Value>,
}
impl TelemetryEvent {
pub fn new(kind: TelemetryEventKind) -> Self {
Self {
kind,
trace_id: None,
span_id: None,
parent_span_id: None,
correlation_id: None,
request_id: None,
session_id: None,
route_path: None,
event_name: None,
ok: true,
latency_ms: None,
bytes: None,
count: None,
attributes: Map::new(),
}
}
pub fn with_session(mut self, session_id: impl Into<String>) -> Self {
self.session_id = Some(session_id.into());
self
}
pub fn with_trace_id(mut self, trace_id: impl Into<String>) -> Self {
self.trace_id = Some(trace_id.into());
self
}
pub fn with_span_id(mut self, span_id: impl Into<String>) -> Self {
self.span_id = Some(span_id.into());
self
}
pub fn with_parent_span_id(mut self, parent_span_id: impl Into<String>) -> Self {
self.parent_span_id = Some(parent_span_id.into());
self
}
pub fn with_correlation_id(mut self, correlation_id: impl Into<String>) -> Self {
self.correlation_id = Some(correlation_id.into());
self
}
pub fn with_request_id(mut self, request_id: impl Into<String>) -> Self {
self.request_id = Some(request_id.into());
self
}
pub fn with_route(mut self, route_path: impl Into<String>) -> Self {
self.route_path = Some(route_path.into());
self
}
pub fn with_event_name(mut self, event_name: impl Into<String>) -> Self {
self.event_name = Some(event_name.into());
self
}
pub fn with_latency_ms(mut self, latency_ms: u64) -> Self {
self.latency_ms = Some(latency_ms);
self
}
pub fn with_bytes(mut self, bytes: usize) -> Self {
self.bytes = Some(bytes);
self
}
pub fn with_count(mut self, count: usize) -> Self {
self.count = Some(count);
self
}
pub fn with_ok(mut self, ok: bool) -> Self {
self.ok = ok;
self
}
pub fn with_attribute(mut self, key: impl Into<String>, value: Value) -> Self {
self.attributes.insert(key.into(), value);
self
}
}
pub trait TelemetrySink: Send + Sync {
fn emit(&self, event: TelemetryEvent) -> Result<(), String>;
}
#[derive(Debug, Default)]
pub struct NoopTelemetrySink;
impl TelemetrySink for NoopTelemetrySink {
fn emit(&self, _event: TelemetryEvent) -> Result<(), String> {
Ok(())
}
}
#[derive(Debug, Default, Clone)]
pub struct MemoryTelemetrySink {
events: Arc<Mutex<Vec<TelemetryEvent>>>,
}
impl MemoryTelemetrySink {
pub fn new() -> Self {
Self::default()
}
pub fn events(&self) -> Vec<TelemetryEvent> {
self.events.lock().expect("telemetry events mutex").clone()
}
}
impl TelemetrySink for MemoryTelemetrySink {
fn emit(&self, event: TelemetryEvent) -> Result<(), String> {
self.events
.lock()
.map_err(|_| "telemetry events mutex poisoned".to_string())?
.push(event);
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::{
MemoryTelemetrySink, NoopTelemetrySink, TelemetryEvent, TelemetryEventKind, TelemetrySink,
};
use serde_json::json;
#[test]
fn telemetry_event_builders_populate_all_optional_fields() {
let event = TelemetryEvent::new(TelemetryEventKind::HandleEvent)
.with_session("session-1")
.with_trace_id("trace-1")
.with_span_id("span-1")
.with_parent_span_id("parent-1")
.with_correlation_id("corr-1")
.with_request_id("req-1")
.with_route("/dashboard")
.with_event_name("refresh")
.with_latency_ms(42)
.with_bytes(512)
.with_count(3)
.with_ok(false)
.with_attribute("tenant", json!("acme"));
assert_eq!(event.session_id.as_deref(), Some("session-1"));
assert_eq!(event.trace_id.as_deref(), Some("trace-1"));
assert_eq!(event.span_id.as_deref(), Some("span-1"));
assert_eq!(event.parent_span_id.as_deref(), Some("parent-1"));
assert_eq!(event.correlation_id.as_deref(), Some("corr-1"));
assert_eq!(event.request_id.as_deref(), Some("req-1"));
assert_eq!(event.route_path.as_deref(), Some("/dashboard"));
assert_eq!(event.event_name.as_deref(), Some("refresh"));
assert_eq!(event.latency_ms, Some(42));
assert_eq!(event.bytes, Some(512));
assert_eq!(event.count, Some(3));
assert!(!event.ok);
assert_eq!(event.attributes.get("tenant"), Some(&json!("acme")));
}
#[test]
fn memory_sink_records_events_and_reports_poisoned_mutex() {
let sink = MemoryTelemetrySink::new();
sink.emit(TelemetryEvent::new(TelemetryEventKind::Connect))
.expect("first emit should succeed");
assert_eq!(sink.events().len(), 1);
let poisoned = sink.clone();
let _ = std::panic::catch_unwind(move || {
let _guard = poisoned.events.lock().expect("lock sink");
panic!("poison sink");
});
let err = sink
.emit(TelemetryEvent::new(TelemetryEventKind::Disconnect))
.expect_err("poisoned sink should fail");
assert!(err.contains("poisoned"));
}
#[test]
fn noop_sink_accepts_events() {
let sink = NoopTelemetrySink;
sink.emit(TelemetryEvent::new(TelemetryEventKind::Patch))
.expect("noop sink should always succeed");
}
}