shelly-liveview 0.6.0

Core runtime primitives for Shelly LiveView.
Documentation
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value};
use std::sync::{Arc, Mutex};

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TelemetryEventKind {
    Connect,
    Disconnect,
    Mount,
    HandleEvent,
    Patch,
    Diff,
    RenderCadence,
    StreamInsert,
    StreamDelete,
    PubSubFanout,
    UploadLifecycle,
    SecurityAudit,
    JobOutcome,
    MigrationOutcome,
    Error,
}

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct TelemetryEvent {
    pub kind: TelemetryEventKind,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub trace_id: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub span_id: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub parent_span_id: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub correlation_id: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub request_id: Option<String>,
    pub session_id: Option<String>,
    pub route_path: Option<String>,
    pub event_name: Option<String>,
    pub ok: bool,
    pub latency_ms: Option<u64>,
    pub bytes: Option<usize>,
    pub count: Option<usize>,
    #[serde(default)]
    pub attributes: Map<String, Value>,
}

impl TelemetryEvent {
    pub fn new(kind: TelemetryEventKind) -> Self {
        Self {
            kind,
            trace_id: None,
            span_id: None,
            parent_span_id: None,
            correlation_id: None,
            request_id: None,
            session_id: None,
            route_path: None,
            event_name: None,
            ok: true,
            latency_ms: None,
            bytes: None,
            count: None,
            attributes: Map::new(),
        }
    }

    pub fn with_session(mut self, session_id: impl Into<String>) -> Self {
        self.session_id = Some(session_id.into());
        self
    }

    pub fn with_trace_id(mut self, trace_id: impl Into<String>) -> Self {
        self.trace_id = Some(trace_id.into());
        self
    }

    pub fn with_span_id(mut self, span_id: impl Into<String>) -> Self {
        self.span_id = Some(span_id.into());
        self
    }

    pub fn with_parent_span_id(mut self, parent_span_id: impl Into<String>) -> Self {
        self.parent_span_id = Some(parent_span_id.into());
        self
    }

    pub fn with_correlation_id(mut self, correlation_id: impl Into<String>) -> Self {
        self.correlation_id = Some(correlation_id.into());
        self
    }

    pub fn with_request_id(mut self, request_id: impl Into<String>) -> Self {
        self.request_id = Some(request_id.into());
        self
    }

    pub fn with_route(mut self, route_path: impl Into<String>) -> Self {
        self.route_path = Some(route_path.into());
        self
    }

    pub fn with_event_name(mut self, event_name: impl Into<String>) -> Self {
        self.event_name = Some(event_name.into());
        self
    }

    pub fn with_latency_ms(mut self, latency_ms: u64) -> Self {
        self.latency_ms = Some(latency_ms);
        self
    }

    pub fn with_bytes(mut self, bytes: usize) -> Self {
        self.bytes = Some(bytes);
        self
    }

    pub fn with_count(mut self, count: usize) -> Self {
        self.count = Some(count);
        self
    }

    pub fn with_ok(mut self, ok: bool) -> Self {
        self.ok = ok;
        self
    }

    pub fn with_attribute(mut self, key: impl Into<String>, value: Value) -> Self {
        self.attributes.insert(key.into(), value);
        self
    }
}

pub trait TelemetrySink: Send + Sync {
    fn emit(&self, event: TelemetryEvent) -> Result<(), String>;
}

#[derive(Debug, Default)]
pub struct NoopTelemetrySink;

impl TelemetrySink for NoopTelemetrySink {
    fn emit(&self, _event: TelemetryEvent) -> Result<(), String> {
        Ok(())
    }
}

#[derive(Debug, Default, Clone)]
pub struct MemoryTelemetrySink {
    events: Arc<Mutex<Vec<TelemetryEvent>>>,
}

impl MemoryTelemetrySink {
    pub fn new() -> Self {
        Self::default()
    }

    pub fn events(&self) -> Vec<TelemetryEvent> {
        self.events.lock().expect("telemetry events mutex").clone()
    }
}

impl TelemetrySink for MemoryTelemetrySink {
    fn emit(&self, event: TelemetryEvent) -> Result<(), String> {
        self.events
            .lock()
            .map_err(|_| "telemetry events mutex poisoned".to_string())?
            .push(event);
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::{
        MemoryTelemetrySink, NoopTelemetrySink, TelemetryEvent, TelemetryEventKind, TelemetrySink,
    };
    use serde_json::json;

    #[test]
    fn telemetry_event_builders_populate_all_optional_fields() {
        let event = TelemetryEvent::new(TelemetryEventKind::HandleEvent)
            .with_session("session-1")
            .with_trace_id("trace-1")
            .with_span_id("span-1")
            .with_parent_span_id("parent-1")
            .with_correlation_id("corr-1")
            .with_request_id("req-1")
            .with_route("/dashboard")
            .with_event_name("refresh")
            .with_latency_ms(42)
            .with_bytes(512)
            .with_count(3)
            .with_ok(false)
            .with_attribute("tenant", json!("acme"));

        assert_eq!(event.session_id.as_deref(), Some("session-1"));
        assert_eq!(event.trace_id.as_deref(), Some("trace-1"));
        assert_eq!(event.span_id.as_deref(), Some("span-1"));
        assert_eq!(event.parent_span_id.as_deref(), Some("parent-1"));
        assert_eq!(event.correlation_id.as_deref(), Some("corr-1"));
        assert_eq!(event.request_id.as_deref(), Some("req-1"));
        assert_eq!(event.route_path.as_deref(), Some("/dashboard"));
        assert_eq!(event.event_name.as_deref(), Some("refresh"));
        assert_eq!(event.latency_ms, Some(42));
        assert_eq!(event.bytes, Some(512));
        assert_eq!(event.count, Some(3));
        assert!(!event.ok);
        assert_eq!(event.attributes.get("tenant"), Some(&json!("acme")));
    }

    #[test]
    fn memory_sink_records_events_and_reports_poisoned_mutex() {
        let sink = MemoryTelemetrySink::new();
        sink.emit(TelemetryEvent::new(TelemetryEventKind::Connect))
            .expect("first emit should succeed");
        assert_eq!(sink.events().len(), 1);

        let poisoned = sink.clone();
        let _ = std::panic::catch_unwind(move || {
            let _guard = poisoned.events.lock().expect("lock sink");
            panic!("poison sink");
        });

        let err = sink
            .emit(TelemetryEvent::new(TelemetryEventKind::Disconnect))
            .expect_err("poisoned sink should fail");
        assert!(err.contains("poisoned"));
    }

    #[test]
    fn noop_sink_accepts_events() {
        let sink = NoopTelemetrySink;
        sink.emit(TelemetryEvent::new(TelemetryEventKind::Patch))
            .expect("noop sink should always succeed");
    }
}