use serde::{Deserialize, Serialize};
use serde_json::{Map, Value};
use std::sync::{Arc, Mutex};
/// Category of a [`TelemetryEvent`].
///
/// Variant names are serialized in `snake_case` (for example `HandleEvent` becomes
/// `"handle_event"`), as configured by the `rename_all` attribute below.
///
/// NOTE(review): this file does not show where each variant is emitted; the meaning of
/// each variant is only implied by its name and should be confirmed at the call sites.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TelemetryEventKind {
Connect,
Disconnect,
Mount,
HandleEvent,
Patch,
Diff,
StreamInsert,
StreamDelete,
PubSubFanout,
UploadLifecycle,
SecurityAudit,
JobOutcome,
MigrationOutcome,
Error,
}
/// A single telemetry record: a `kind`, an `ok` flag, optional identifiers and
/// measurements, and a free-form attribute map.
///
/// Serialization notes, taken from the serde attributes on each field:
/// - `trace_id`, `span_id`, `parent_span_id`, `correlation_id` and `request_id` are
///   left out of the output when they are `None`.
/// - The other `Option` fields have no `skip_serializing_if`, so they are still
///   serialized when `None` (as `null` in JSON).
/// - `attributes` falls back to an empty map when it is absent from the input.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct TelemetryEvent {
/// What kind of event this record describes.
pub kind: TelemetryEventKind,
/// Optional trace identifier; omitted from serialized output when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub trace_id: Option<String>,
/// Optional span identifier; omitted from serialized output when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub span_id: Option<String>,
/// Optional parent span identifier; omitted from serialized output when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub parent_span_id: Option<String>,
/// Optional correlation identifier; omitted from serialized output when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub correlation_id: Option<String>,
/// Optional request identifier; omitted from serialized output when `None`.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub request_id: Option<String>,
/// Optional session identifier.
pub session_id: Option<String>,
/// Optional route path associated with the event.
pub route_path: Option<String>,
/// Optional event name.
pub event_name: Option<String>,
/// Success flag; `TelemetryEvent::new` initializes it to `true`.
pub ok: bool,
/// Optional latency measurement (presumably milliseconds, per the field name).
pub latency_ms: Option<u64>,
/// Optional byte count.
pub bytes: Option<usize>,
/// Optional item count.
pub count: Option<usize>,
/// Arbitrary extra key/value data; defaults to an empty map when deserializing.
#[serde(default)]
pub attributes: Map<String, Value>,
}
impl TelemetryEvent {
    /// Builds an event of the given `kind`.
    ///
    /// The new event has `ok` set to `true`, every optional field set to `None`, and an
    /// empty attribute map. Use the `with_*` methods to fill in the rest.
    pub fn new(kind: TelemetryEventKind) -> Self {
        let attributes = Map::new();
        Self {
            kind,
            ok: true,
            attributes,
            trace_id: None,
            span_id: None,
            parent_span_id: None,
            correlation_id: None,
            request_id: None,
            session_id: None,
            route_path: None,
            event_name: None,
            latency_ms: None,
            bytes: None,
            count: None,
        }
    }

    /// Returns the event with `session_id` set to the given value.
    pub fn with_session(self, session_id: impl Into<String>) -> Self {
        Self {
            session_id: Some(session_id.into()),
            ..self
        }
    }

    /// Returns the event with `trace_id` set to the given value.
    pub fn with_trace_id(self, trace_id: impl Into<String>) -> Self {
        Self {
            trace_id: Some(trace_id.into()),
            ..self
        }
    }

    /// Returns the event with `span_id` set to the given value.
    pub fn with_span_id(self, span_id: impl Into<String>) -> Self {
        Self {
            span_id: Some(span_id.into()),
            ..self
        }
    }

    /// Returns the event with `parent_span_id` set to the given value.
    pub fn with_parent_span_id(self, parent_span_id: impl Into<String>) -> Self {
        Self {
            parent_span_id: Some(parent_span_id.into()),
            ..self
        }
    }

    /// Returns the event with `correlation_id` set to the given value.
    pub fn with_correlation_id(self, correlation_id: impl Into<String>) -> Self {
        Self {
            correlation_id: Some(correlation_id.into()),
            ..self
        }
    }

    /// Returns the event with `request_id` set to the given value.
    pub fn with_request_id(self, request_id: impl Into<String>) -> Self {
        Self {
            request_id: Some(request_id.into()),
            ..self
        }
    }

    /// Returns the event with `route_path` set to the given value.
    pub fn with_route(self, route_path: impl Into<String>) -> Self {
        Self {
            route_path: Some(route_path.into()),
            ..self
        }
    }

    /// Returns the event with `event_name` set to the given value.
    pub fn with_event_name(self, event_name: impl Into<String>) -> Self {
        Self {
            event_name: Some(event_name.into()),
            ..self
        }
    }

    /// Returns the event with `latency_ms` set to the given value.
    pub fn with_latency_ms(self, latency_ms: u64) -> Self {
        Self {
            latency_ms: Some(latency_ms),
            ..self
        }
    }

    /// Returns the event with `bytes` set to the given value.
    pub fn with_bytes(self, bytes: usize) -> Self {
        Self {
            bytes: Some(bytes),
            ..self
        }
    }

    /// Returns the event with `count` set to the given value.
    pub fn with_count(self, count: usize) -> Self {
        Self {
            count: Some(count),
            ..self
        }
    }

    /// Returns the event with its `ok` flag replaced by the given value.
    pub fn with_ok(self, ok: bool) -> Self {
        Self { ok, ..self }
    }

    /// Returns the event with `value` stored under `key` in `attributes`.
    ///
    /// A value already stored under the same key is replaced.
    pub fn with_attribute(self, key: impl Into<String>, value: Value) -> Self {
        let mut event = self;
        event.attributes.insert(key.into(), value);
        event
    }
}
/// Destination for [`TelemetryEvent`]s.
///
/// Implementors must be `Send + Sync`.
pub trait TelemetrySink: Send + Sync {
/// Takes ownership of `event` and hands it to the sink.
///
/// Returns `Ok(())` on success, or a `String` describing the failure.
fn emit(&self, event: TelemetryEvent) -> Result<(), String>;
}
/// A [`TelemetrySink`] that discards every event it is given.
#[derive(Debug, Default)]
pub struct NoopTelemetrySink;
impl TelemetrySink for NoopTelemetrySink {
/// Drops `_event` without recording it and always returns `Ok(())`.
fn emit(&self, _event: TelemetryEvent) -> Result<(), String> {
Ok(())
}
}
/// A sink that keeps every emitted [`TelemetryEvent`] in an in-memory list.
///
/// The list lives behind an `Arc<Mutex<..>>`, so the derived `Clone` produces handles
/// that share one underlying list rather than independent copies.
#[derive(Debug, Default, Clone)]
pub struct MemoryTelemetrySink {
// Shared, lock-protected list of recorded events.
events: Arc<Mutex<Vec<TelemetryEvent>>>,
}
impl MemoryTelemetrySink {
    /// Creates a sink with no recorded events.
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns a cloned snapshot of the events recorded so far.
    ///
    /// The snapshot is independent of the sink: events emitted afterwards do not appear
    /// in a previously returned `Vec`.
    ///
    /// This method does not panic when the mutex is poisoned (another thread panicked
    /// while holding the lock). The guard is recovered from the poison error and the
    /// events recorded up to that point are returned.
    pub fn events(&self) -> Vec<TelemetryEvent> {
        self.events
            .lock()
            // Recover the guard instead of panicking: poisoning only signals that some
            // thread panicked while holding the lock, and the list is still readable.
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .clone()
    }
}
impl TelemetrySink for MemoryTelemetrySink {
    /// Appends `event` to the end of the shared in-memory list.
    ///
    /// Returns `Err("telemetry events mutex poisoned")` without storing the event when
    /// the lock protecting the list is poisoned.
    fn emit(&self, event: TelemetryEvent) -> Result<(), String> {
        match self.events.lock() {
            Ok(mut recorded) => {
                recorded.push(event);
                Ok(())
            }
            Err(_) => Err("telemetry events mutex poisoned".to_string()),
        }
    }
}