streamkit_core/
telemetry.rs

1// SPDX-FileCopyrightText: © 2025 StreamKit Contributors
2//
3// SPDX-License-Identifier: MPL-2.0
4
5//! Telemetry event emission and tracking.
6//!
7//! This module provides types and utilities for emitting structured telemetry events
8//! from nodes. Telemetry events are used for observability, debugging, and UI timelines.
9//!
10//! ## Design Principles
11//!
12//! - **Best-effort delivery**: Telemetry never blocks audio processing
13//! - **Wire-compatible**: Events wrap `CustomPacketData` for future "telemetry as track" support
14//! - **Rate-limited**: Emitters automatically throttle high-frequency events
15//! - **Drop accounting**: Track dropped events for health monitoring
16//!
17//! ## Event Type Convention
18//!
19//! All telemetry uses a single envelope type_id with `event_type` in the payload:
20//!
21//! ```text
22//! type_id: core::telemetry/event@1
23//! data: { event_type: "vad.start" | "stt.result" | "llm.response", ... }
24//! ```
25//!
26//! ## Usage Example
27//!
28//! ```ignore
29//! use streamkit_core::telemetry::TelemetryEmitter;
30//!
31//! // In node initialization
32//! let telemetry = TelemetryEmitter::new(
33//!     "my_node".to_string(),
34//!     context.session_id.clone(),
35//!     context.telemetry_tx.clone(),
36//! );
37//!
38//! // Emit a simple event
39//! telemetry.emit("processing.start", serde_json::json!({ "input_size": 1024 }));
40//!
41//! // Emit a correlated event (for grouping in UI)
42//! telemetry.emit_with_correlation(
43//!     "llm.response",
44//!     "turn-abc123",
45//!     serde_json::json!({ "latency_ms": 842, "output_chars": 456 })
46//! );
47//! ```
48
49use crate::types::{CustomEncoding, CustomPacketData, PacketMetadata};
50use serde::{Deserialize, Serialize};
51use serde_json::Value as JsonValue;
52use std::sync::atomic::{AtomicU64, Ordering};
53use std::time::{Instant, SystemTime, UNIX_EPOCH};
54use tokio::sync::mpsc;
55use ts_rs::TS;
56
/// The standard `type_id` for all telemetry events.
///
/// [`TelemetryEvent::new`] stamps this value on the wrapped `CustomPacketData`. The
/// concrete kind of event is not encoded here; it travels as the `event_type` field
/// inside the packet payload.
pub const TELEMETRY_TYPE_ID: &str = "core::telemetry/event@1";
59
/// A telemetry event emitted by a node.
///
/// This wraps `CustomPacketData` to maintain wire-compatibility with the packet system,
/// enabling future "telemetry as track" patterns where events flow through the graph.
///
/// ## Field Locations
///
/// - `session_id`, `node_id`: Envelope fields (not duplicated in packet data)
/// - `event_type`, `correlation_id`, `turn_id`: Inside `packet.data`
/// - `timestamp_us`: Inside `packet.metadata`
///
/// The payload-resident fields are optional from this type's point of view: the accessor
/// methods return `None` when a field is missing or is not a JSON string.
#[derive(Debug, Clone)]
pub struct TelemetryEvent {
    /// Session this event belongs to (for future shared bus / cross-session sinks)
    pub session_id: Option<String>,
    /// The node that emitted this event (canonical source, not in packet data)
    pub node_id: String,
    /// The telemetry payload wrapped as a Custom packet for wire compatibility
    pub packet: CustomPacketData,
}
79
80impl TelemetryEvent {
81    /// Create a new telemetry event.
82    ///
83    /// The `event_data` should contain at minimum `event_type`. Additional fields
84    /// like `correlation_id`, `turn_id`, and event-specific data can be included.
85    pub fn new(
86        session_id: Option<String>,
87        node_id: String,
88        event_data: JsonValue,
89        timestamp_us: u64,
90    ) -> Self {
91        Self {
92            session_id,
93            node_id,
94            packet: CustomPacketData {
95                type_id: TELEMETRY_TYPE_ID.to_string(),
96                encoding: CustomEncoding::Json,
97                data: event_data,
98                metadata: Some(PacketMetadata {
99                    timestamp_us: Some(timestamp_us),
100                    duration_us: None,
101                    sequence: None,
102                }),
103            },
104        }
105    }
106
107    /// Extract the event_type from the packet data.
108    pub fn event_type(&self) -> Option<&str> {
109        self.packet.data.get("event_type").and_then(|v| v.as_str())
110    }
111
112    /// Extract the correlation_id from the packet data.
113    pub fn correlation_id(&self) -> Option<&str> {
114        self.packet.data.get("correlation_id").and_then(|v| v.as_str())
115    }
116
117    /// Extract the turn_id from the packet data.
118    pub fn turn_id(&self) -> Option<&str> {
119        self.packet.data.get("turn_id").and_then(|v| v.as_str())
120    }
121
122    /// Get the timestamp in microseconds.
123    pub fn timestamp_us(&self) -> Option<u64> {
124        self.packet.metadata.as_ref().and_then(|m| m.timestamp_us)
125    }
126}
127
/// Configuration for telemetry behavior.
///
/// These settings control whether telemetry is enabled, how many events are buffered,
/// and how text content is redacted or truncated.
/// Server-side redaction is applied before forwarding to WebSocket clients.
#[derive(Debug, Clone, Serialize, Deserialize, TS)]
#[ts(export)]
pub struct TelemetryConfig {
    /// Whether telemetry is enabled for this session (default: `true`)
    pub enabled: bool,
    /// Maximum number of events to buffer for backlog replay (default: 100)
    pub buffer_size: usize,
    /// Whether to redact text content before forwarding to clients (default: `false`)
    pub redact_text: bool,
    /// Maximum characters to include in text fields; longer text is truncated (default: 100)
    pub max_text_chars: usize,
}
144
145impl Default for TelemetryConfig {
146    fn default() -> Self {
147        Self { enabled: true, buffer_size: 100, redact_text: false, max_text_chars: 100 }
148    }
149}
150
/// Helper for emitting telemetry events from nodes.
///
/// Provides best-effort, non-blocking emission with automatic rate limiting
/// and drop accounting. Uses `try_send()` to never block audio processing.
///
/// ## Rate Limiting
///
/// Events are counted per `event_type` in a fixed time window (100 events per second by
/// default); events beyond the limit are dropped. Use `set_rate_limit()` to change the limit.
///
/// ## Drop Accounting
///
/// Dropped events are tracked and can be reported via health telemetry.
/// Call `maybe_emit_health()` periodically to report dropped event counts, or read them
/// directly with `dropped_counts()`.
pub struct TelemetryEmitter {
    /// Identifier of the emitting node, copied into every event envelope
    node_id: String,
    /// Session identifier copied into every event envelope, if any
    session_id: Option<String>,
    /// Destination channel; `None` disables emission entirely
    tx: Option<mpsc::Sender<TelemetryEvent>>,
    /// Events dropped because channel was full
    dropped_full: AtomicU64,
    /// Events dropped due to rate limiting
    dropped_rate_limit: AtomicU64,
    /// Timestamp maintained by `maybe_emit_health()`
    last_health_emit: Instant,
    /// Rate limiter state: the window settings plus a per-`event_type` map of
    /// (window start, events counted in that window)
    rate_limit_state: std::sync::Mutex<RateLimitState>,
}
173
/// Bookkeeping for the emitter's fixed-window rate limiter.
struct RateLimitState {
    /// Per-event-type window slot: event_type -> (window_start, events_counted_in_window)
    per_type: std::collections::HashMap<String, (Instant, u32)>,
    /// Length of one rate-limiting window
    window: std::time::Duration,
    /// Number of events admitted per window for each event_type
    max_per_window: u32,
}

impl Default for RateLimitState {
    /// No tracked event types yet, with a limit of 100 events per second per event_type.
    fn default() -> Self {
        let window = std::time::Duration::from_secs(1);
        let max_per_window = 100;

        Self { per_type: std::collections::HashMap::new(), window, max_per_window }
    }
}
193
194impl TelemetryEmitter {
195    /// Health emission interval (5 seconds)
196    const HEALTH_INTERVAL: std::time::Duration = std::time::Duration::from_secs(5);
197
198    /// Create a new telemetry emitter for a node.
199    pub fn new(
200        node_id: String,
201        session_id: Option<String>,
202        tx: Option<mpsc::Sender<TelemetryEvent>>,
203    ) -> Self {
204        Self {
205            node_id,
206            session_id,
207            tx,
208            dropped_full: AtomicU64::new(0),
209            dropped_rate_limit: AtomicU64::new(0),
210            last_health_emit: Instant::now(),
211            rate_limit_state: std::sync::Mutex::new(RateLimitState::default()),
212        }
213    }
214
215    /// Get current timestamp in microseconds since UNIX epoch.
216    #[allow(clippy::cast_possible_truncation)] // u64 microseconds covers ~500,000 years
217    fn now_us() -> u64 {
218        SystemTime::now().duration_since(UNIX_EPOCH).map(|d| d.as_micros() as u64).unwrap_or(0)
219    }
220
221    /// Check if an event should be rate-limited.
222    #[allow(clippy::expect_used)] // Mutex poisoning indicates a serious bug, panic is appropriate
223    fn should_rate_limit(&self, event_type: &str) -> bool {
224        let mut state = self.rate_limit_state.lock().expect("rate limit mutex poisoned");
225        let now = Instant::now();
226        let window = state.window;
227        let max_per_window = state.max_per_window;
228
229        let entry = state.per_type.entry(event_type.to_string()).or_insert((now, 0));
230
231        // Reset window if expired
232        if now.duration_since(entry.0) >= window {
233            entry.0 = now;
234            entry.1 = 1;
235            drop(state);
236            return false;
237        }
238
239        // Check if we're over the limit
240        if entry.1 >= max_per_window {
241            drop(state);
242            return true;
243        }
244
245        entry.1 += 1;
246        drop(state);
247        false
248    }
249
250    /// Best-effort emit a telemetry event. Never blocks.
251    ///
252    /// Returns `true` if the event was sent (or queued), `false` if dropped.
253    pub fn emit(&self, event_type: &str, data: JsonValue) -> bool {
254        self.emit_internal(event_type, None, None, data)
255    }
256
257    /// Emit an event with a correlation ID for grouping related events.
258    pub fn emit_with_correlation(
259        &self,
260        event_type: &str,
261        correlation_id: &str,
262        data: JsonValue,
263    ) -> bool {
264        self.emit_internal(event_type, Some(correlation_id), None, data)
265    }
266
267    /// Emit an event with a turn ID for voice agent conversation grouping.
268    pub fn emit_with_turn(&self, event_type: &str, turn_id: &str, data: JsonValue) -> bool {
269        self.emit_internal(event_type, None, Some(turn_id), data)
270    }
271
272    /// Emit an event with both correlation and turn IDs.
273    pub fn emit_correlated(
274        &self,
275        event_type: &str,
276        correlation_id: &str,
277        turn_id: &str,
278        data: JsonValue,
279    ) -> bool {
280        self.emit_internal(event_type, Some(correlation_id), Some(turn_id), data)
281    }
282
283    /// Internal emit implementation.
284    fn emit_internal(
285        &self,
286        event_type: &str,
287        correlation_id: Option<&str>,
288        turn_id: Option<&str>,
289        mut data: JsonValue,
290    ) -> bool {
291        let Some(ref tx) = self.tx else {
292            return false;
293        };
294
295        // Rate limiting check
296        if self.should_rate_limit(event_type) {
297            self.dropped_rate_limit.fetch_add(1, Ordering::Relaxed);
298            return false;
299        }
300
301        // Ensure data is an object and add standard fields
302        if let Some(obj) = data.as_object_mut() {
303            obj.insert("event_type".to_string(), JsonValue::String(event_type.to_string()));
304            if let Some(cid) = correlation_id {
305                obj.insert("correlation_id".to_string(), JsonValue::String(cid.to_string()));
306            }
307            if let Some(tid) = turn_id {
308                obj.insert("turn_id".to_string(), JsonValue::String(tid.to_string()));
309            }
310        } else {
311            // Wrap non-object data
312            data = serde_json::json!({
313                "event_type": event_type,
314                "correlation_id": correlation_id,
315                "turn_id": turn_id,
316                "value": data,
317            });
318        }
319
320        let event = TelemetryEvent::new(
321            self.session_id.clone(),
322            self.node_id.clone(),
323            data,
324            Self::now_us(),
325        );
326
327        // Best-effort send - never block
328        match tx.try_send(event) {
329            Ok(()) => true,
330            Err(mpsc::error::TrySendError::Full(_)) => {
331                self.dropped_full.fetch_add(1, Ordering::Relaxed);
332                false
333            },
334            Err(mpsc::error::TrySendError::Closed(_)) => false,
335        }
336    }
337
338    /// Get the current dropped event counts.
339    pub fn dropped_counts(&self) -> (u64, u64) {
340        (self.dropped_full.load(Ordering::Relaxed), self.dropped_rate_limit.load(Ordering::Relaxed))
341    }
342
343    /// Emit a health event if the interval has passed or if there are dropped events.
344    ///
345    /// Returns `true` if a health event was emitted.
346    pub fn maybe_emit_health(&mut self) -> bool {
347        let (dropped_full, dropped_rate_limit) = self.dropped_counts();
348        let has_drops = dropped_full > 0 || dropped_rate_limit > 0;
349        let interval_passed = self.last_health_emit.elapsed() >= Self::HEALTH_INTERVAL;
350
351        if !has_drops && !interval_passed {
352            return false;
353        }
354
355        if has_drops || interval_passed {
356            self.last_health_emit = Instant::now();
357
358            // Only emit if there's something to report
359            if has_drops {
360                let emitted = self.emit(
361                    "telemetry.health",
362                    serde_json::json!({
363                        "dropped_due_to_full": dropped_full,
364                        "dropped_due_to_rate_limit": dropped_rate_limit,
365                    }),
366                );
367
368                // Reset counters after emission
369                if emitted {
370                    self.dropped_full.store(0, Ordering::Relaxed);
371                    self.dropped_rate_limit.store(0, Ordering::Relaxed);
372                }
373
374                return emitted;
375            }
376        }
377
378        false
379    }
380
381    /// Configure rate limiting for this emitter.
382    ///
383    /// # Panics
384    ///
385    /// Panics if the internal rate limit mutex is poisoned (indicates a prior panic).
386    #[allow(clippy::expect_used)] // Mutex poisoning indicates a serious bug, panic is appropriate
387    pub fn set_rate_limit(&self, max_per_second: u32) {
388        let mut state = self.rate_limit_state.lock().expect("rate limit mutex poisoned");
389        state.max_per_window = max_per_second;
390        state.window = std::time::Duration::from_secs(1);
391    }
392}
393
394/// Helper functions for emitting telemetry events directly from a sender.
395/// These are lower-level functions for cases where you don't want to use `TelemetryEmitter`.
396pub mod telemetry_helpers {
397    use super::TelemetryEvent;
398    use serde_json::Value as JsonValue;
399    use std::time::{SystemTime, UNIX_EPOCH};
400    use tokio::sync::mpsc;
401
402    /// Emit a simple telemetry event.
403    #[allow(clippy::cast_possible_truncation)] // u64 microseconds covers ~500,000 years
404    pub fn emit(
405        tx: &mpsc::Sender<TelemetryEvent>,
406        session_id: Option<String>,
407        node_id: &str,
408        event_type: &str,
409        data: &JsonValue,
410    ) -> bool {
411        let timestamp_us =
412            SystemTime::now().duration_since(UNIX_EPOCH).map(|d| d.as_micros() as u64).unwrap_or(0);
413
414        let event_data = data.as_object().map_or_else(
415            || {
416                serde_json::json!({
417                    "event_type": event_type,
418                    "value": data,
419                })
420            },
421            |obj| {
422                let mut obj = obj.clone();
423                obj.insert("event_type".to_string(), JsonValue::String(event_type.to_string()));
424                JsonValue::Object(obj)
425            },
426        );
427
428        let event = TelemetryEvent::new(session_id, node_id.to_string(), event_data, timestamp_us);
429
430        tx.try_send(event).is_ok()
431    }
432}
433
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::unreadable_literal)]
mod tests {
    use super::*;

    #[test]
    fn test_telemetry_event_creation() {
        let event = TelemetryEvent::new(
            Some("session-123".to_string()),
            "test-node".to_string(),
            serde_json::json!({
                "event_type": "test.event",
                "correlation_id": "corr-456",
            }),
            1234567890,
        );

        assert_eq!(event.session_id, Some("session-123".to_string()));
        assert_eq!(event.node_id, "test-node");
        assert_eq!(event.event_type(), Some("test.event"));
        assert_eq!(event.correlation_id(), Some("corr-456"));
        assert_eq!(event.timestamp_us(), Some(1234567890));
        assert_eq!(event.packet.type_id, TELEMETRY_TYPE_ID);
    }

    #[test]
    fn test_telemetry_event_optional_fields() {
        let with_turn = TelemetryEvent::new(
            None,
            "test-node".to_string(),
            serde_json::json!({
                "event_type": "vad.start",
                "turn_id": "turn-789",
            }),
            42,
        );

        assert_eq!(with_turn.turn_id(), Some("turn-789"));
        assert_eq!(with_turn.correlation_id(), None);

        // Accessors report missing payload fields as `None` instead of panicking
        let bare = TelemetryEvent::new(None, "test-node".to_string(), serde_json::json!({}), 42);
        assert_eq!(bare.event_type(), None);
        assert_eq!(bare.correlation_id(), None);
        assert_eq!(bare.turn_id(), None);
    }

    #[test]
    fn test_telemetry_config_default() {
        let config = TelemetryConfig::default();
        assert!(config.enabled);
        assert_eq!(config.buffer_size, 100);
        assert!(!config.redact_text);
        assert_eq!(config.max_text_chars, 100);
    }

    #[tokio::test]
    async fn test_emitter_basic() {
        let (tx, mut rx) = mpsc::channel(10);
        let emitter =
            TelemetryEmitter::new("node-1".to_string(), Some("session-1".to_string()), Some(tx));

        assert!(emitter.emit("test.event", serde_json::json!({ "key": "value" })));

        let event = rx.recv().await.unwrap();
        assert_eq!(event.node_id, "node-1");
        assert_eq!(event.session_id, Some("session-1".to_string()));
        assert_eq!(event.event_type(), Some("test.event"));
        assert_eq!(event.packet.data.get("key").and_then(|v| v.as_str()), Some("value"));
    }

    #[tokio::test]
    async fn test_emitter_correlated_fields() {
        let (tx, mut rx) = mpsc::channel(10);
        let emitter = TelemetryEmitter::new("node-1".to_string(), None, Some(tx));

        assert!(emitter.emit_correlated(
            "llm.response",
            "corr-1",
            "turn-1",
            serde_json::json!({ "latency_ms": 5 })
        ));

        let event = rx.recv().await.unwrap();
        assert_eq!(event.event_type(), Some("llm.response"));
        assert_eq!(event.correlation_id(), Some("corr-1"));
        assert_eq!(event.turn_id(), Some("turn-1"));
        assert_eq!(event.packet.data.get("latency_ms").and_then(|v| v.as_u64()), Some(5));
    }

    #[tokio::test]
    async fn test_emitter_wraps_non_object_payload() {
        let (tx, mut rx) = mpsc::channel(10);
        let emitter = TelemetryEmitter::new("node-1".to_string(), None, Some(tx));

        assert!(emitter.emit("metric.sample", serde_json::json!(42)));

        let event = rx.recv().await.unwrap();
        assert!(event.packet.data.is_object());
        assert_eq!(event.event_type(), Some("metric.sample"));
        assert_eq!(event.packet.data.get("value").and_then(|v| v.as_u64()), Some(42));
        assert_eq!(event.correlation_id(), None);
        assert_eq!(event.turn_id(), None);
    }

    #[tokio::test]
    async fn test_emitter_drop_accounting() {
        // Create a channel with capacity 1
        let (tx, _rx) = mpsc::channel(1);
        let emitter = TelemetryEmitter::new("node-1".to_string(), None, Some(tx));

        // First event should succeed
        assert!(emitter.emit("event1", serde_json::json!({})));

        // Second event should be dropped (channel full)
        assert!(!emitter.emit("event2", serde_json::json!({})));

        let (dropped_full, _) = emitter.dropped_counts();
        assert_eq!(dropped_full, 1);
    }

    #[tokio::test]
    async fn test_emitter_rate_limit_accounting() {
        let (tx, _rx) = mpsc::channel(10);
        let emitter = TelemetryEmitter::new("node-1".to_string(), None, Some(tx));
        emitter.set_rate_limit(2);

        // The first two events fit in the window, the third exceeds the limit
        assert!(emitter.emit("burst", serde_json::json!({})));
        assert!(emitter.emit("burst", serde_json::json!({})));
        assert!(!emitter.emit("burst", serde_json::json!({})));

        // Limits are tracked per event type, so another type is unaffected
        assert!(emitter.emit("other", serde_json::json!({})));

        assert_eq!(emitter.dropped_counts(), (0, 1));
    }

    #[test]
    fn test_emitter_no_tx() {
        let emitter = TelemetryEmitter::new("node-1".to_string(), None, None);

        // Should return false but not panic
        assert!(!emitter.emit("test.event", serde_json::json!({})));

        // A disabled emitter does not count anything as dropped
        assert_eq!(emitter.dropped_counts(), (0, 0));
    }

    #[tokio::test]
    async fn test_helpers_emit() {
        let (tx, mut rx) = mpsc::channel(10);

        assert!(telemetry_helpers::emit(
            &tx,
            Some("session-1".to_string()),
            "node-1",
            "helper.event",
            &serde_json::json!({ "k": 1 })
        ));

        let event = rx.recv().await.unwrap();
        assert_eq!(event.node_id, "node-1");
        assert_eq!(event.session_id, Some("session-1".to_string()));
        assert_eq!(event.event_type(), Some("helper.event"));
        assert_eq!(event.packet.data.get("k").and_then(|v| v.as_u64()), Some(1));
    }
}