1use crate::types::{CustomEncoding, CustomPacketData, PacketMetadata};
50use serde::{Deserialize, Serialize};
51use serde_json::Value as JsonValue;
52use std::sync::atomic::{AtomicU64, Ordering};
53use std::time::{Instant, SystemTime, UNIX_EPOCH};
54use tokio::sync::mpsc;
55use ts_rs::TS;
56
57pub const TELEMETRY_TYPE_ID: &str = "core::telemetry/event@1";
59
60#[derive(Debug, Clone)]
71pub struct TelemetryEvent {
72 pub session_id: Option<String>,
74 pub node_id: String,
76 pub packet: CustomPacketData,
78}
79
80impl TelemetryEvent {
81 pub fn new(
86 session_id: Option<String>,
87 node_id: String,
88 event_data: JsonValue,
89 timestamp_us: u64,
90 ) -> Self {
91 Self {
92 session_id,
93 node_id,
94 packet: CustomPacketData {
95 type_id: TELEMETRY_TYPE_ID.to_string(),
96 encoding: CustomEncoding::Json,
97 data: event_data,
98 metadata: Some(PacketMetadata {
99 timestamp_us: Some(timestamp_us),
100 duration_us: None,
101 sequence: None,
102 }),
103 },
104 }
105 }
106
107 pub fn event_type(&self) -> Option<&str> {
109 self.packet.data.get("event_type").and_then(|v| v.as_str())
110 }
111
112 pub fn correlation_id(&self) -> Option<&str> {
114 self.packet.data.get("correlation_id").and_then(|v| v.as_str())
115 }
116
117 pub fn turn_id(&self) -> Option<&str> {
119 self.packet.data.get("turn_id").and_then(|v| v.as_str())
120 }
121
122 pub fn timestamp_us(&self) -> Option<u64> {
124 self.packet.metadata.as_ref().and_then(|m| m.timestamp_us)
125 }
126}
127
128#[derive(Debug, Clone, Serialize, Deserialize, TS)]
133#[ts(export)]
134pub struct TelemetryConfig {
135 pub enabled: bool,
137 pub buffer_size: usize,
139 pub redact_text: bool,
141 pub max_text_chars: usize,
143}
144
145impl Default for TelemetryConfig {
146 fn default() -> Self {
147 Self { enabled: true, buffer_size: 100, redact_text: false, max_text_chars: 100 }
148 }
149}
150
151pub struct TelemetryEmitter {
161 node_id: String,
162 session_id: Option<String>,
163 tx: Option<mpsc::Sender<TelemetryEvent>>,
164 dropped_full: AtomicU64,
166 dropped_rate_limit: AtomicU64,
168 last_health_emit: Instant,
170 rate_limit_state: std::sync::Mutex<RateLimitState>,
172}
173
/// Fixed-window rate-limiter bookkeeping, tracked separately per event type.
struct RateLimitState {
    /// Per event type: (start of the current window, events admitted in that window).
    per_type: std::collections::HashMap<String, (Instant, u32)>,
    /// Length of each rate-limit window.
    window: std::time::Duration,
    /// Events of one type admitted per window before further ones are rejected.
    max_per_window: u32,
}

impl Default for RateLimitState {
    /// One-second windows admitting up to 100 events per event type.
    fn default() -> Self {
        let window = std::time::Duration::from_millis(1_000);
        Self {
            max_per_window: 100,
            window,
            per_type: std::collections::HashMap::default(),
        }
    }
}
193
194impl TelemetryEmitter {
195 const HEALTH_INTERVAL: std::time::Duration = std::time::Duration::from_secs(5);
197
198 pub fn new(
200 node_id: String,
201 session_id: Option<String>,
202 tx: Option<mpsc::Sender<TelemetryEvent>>,
203 ) -> Self {
204 Self {
205 node_id,
206 session_id,
207 tx,
208 dropped_full: AtomicU64::new(0),
209 dropped_rate_limit: AtomicU64::new(0),
210 last_health_emit: Instant::now(),
211 rate_limit_state: std::sync::Mutex::new(RateLimitState::default()),
212 }
213 }
214
215 #[allow(clippy::cast_possible_truncation)] fn now_us() -> u64 {
218 SystemTime::now().duration_since(UNIX_EPOCH).map(|d| d.as_micros() as u64).unwrap_or(0)
219 }
220
221 #[allow(clippy::expect_used)] fn should_rate_limit(&self, event_type: &str) -> bool {
224 let mut state = self.rate_limit_state.lock().expect("rate limit mutex poisoned");
225 let now = Instant::now();
226 let window = state.window;
227 let max_per_window = state.max_per_window;
228
229 let entry = state.per_type.entry(event_type.to_string()).or_insert((now, 0));
230
231 if now.duration_since(entry.0) >= window {
233 entry.0 = now;
234 entry.1 = 1;
235 drop(state);
236 return false;
237 }
238
239 if entry.1 >= max_per_window {
241 drop(state);
242 return true;
243 }
244
245 entry.1 += 1;
246 drop(state);
247 false
248 }
249
250 pub fn emit(&self, event_type: &str, data: JsonValue) -> bool {
254 self.emit_internal(event_type, None, None, data)
255 }
256
257 pub fn emit_with_correlation(
259 &self,
260 event_type: &str,
261 correlation_id: &str,
262 data: JsonValue,
263 ) -> bool {
264 self.emit_internal(event_type, Some(correlation_id), None, data)
265 }
266
267 pub fn emit_with_turn(&self, event_type: &str, turn_id: &str, data: JsonValue) -> bool {
269 self.emit_internal(event_type, None, Some(turn_id), data)
270 }
271
272 pub fn emit_correlated(
274 &self,
275 event_type: &str,
276 correlation_id: &str,
277 turn_id: &str,
278 data: JsonValue,
279 ) -> bool {
280 self.emit_internal(event_type, Some(correlation_id), Some(turn_id), data)
281 }
282
283 fn emit_internal(
285 &self,
286 event_type: &str,
287 correlation_id: Option<&str>,
288 turn_id: Option<&str>,
289 mut data: JsonValue,
290 ) -> bool {
291 let Some(ref tx) = self.tx else {
292 return false;
293 };
294
295 if self.should_rate_limit(event_type) {
297 self.dropped_rate_limit.fetch_add(1, Ordering::Relaxed);
298 return false;
299 }
300
301 if let Some(obj) = data.as_object_mut() {
303 obj.insert("event_type".to_string(), JsonValue::String(event_type.to_string()));
304 if let Some(cid) = correlation_id {
305 obj.insert("correlation_id".to_string(), JsonValue::String(cid.to_string()));
306 }
307 if let Some(tid) = turn_id {
308 obj.insert("turn_id".to_string(), JsonValue::String(tid.to_string()));
309 }
310 } else {
311 data = serde_json::json!({
313 "event_type": event_type,
314 "correlation_id": correlation_id,
315 "turn_id": turn_id,
316 "value": data,
317 });
318 }
319
320 let event = TelemetryEvent::new(
321 self.session_id.clone(),
322 self.node_id.clone(),
323 data,
324 Self::now_us(),
325 );
326
327 match tx.try_send(event) {
329 Ok(()) => true,
330 Err(mpsc::error::TrySendError::Full(_)) => {
331 self.dropped_full.fetch_add(1, Ordering::Relaxed);
332 false
333 },
334 Err(mpsc::error::TrySendError::Closed(_)) => false,
335 }
336 }
337
338 pub fn dropped_counts(&self) -> (u64, u64) {
340 (self.dropped_full.load(Ordering::Relaxed), self.dropped_rate_limit.load(Ordering::Relaxed))
341 }
342
343 pub fn maybe_emit_health(&mut self) -> bool {
347 let (dropped_full, dropped_rate_limit) = self.dropped_counts();
348 let has_drops = dropped_full > 0 || dropped_rate_limit > 0;
349 let interval_passed = self.last_health_emit.elapsed() >= Self::HEALTH_INTERVAL;
350
351 if !has_drops && !interval_passed {
352 return false;
353 }
354
355 if has_drops || interval_passed {
356 self.last_health_emit = Instant::now();
357
358 if has_drops {
360 let emitted = self.emit(
361 "telemetry.health",
362 serde_json::json!({
363 "dropped_due_to_full": dropped_full,
364 "dropped_due_to_rate_limit": dropped_rate_limit,
365 }),
366 );
367
368 if emitted {
370 self.dropped_full.store(0, Ordering::Relaxed);
371 self.dropped_rate_limit.store(0, Ordering::Relaxed);
372 }
373
374 return emitted;
375 }
376 }
377
378 false
379 }
380
381 #[allow(clippy::expect_used)] pub fn set_rate_limit(&self, max_per_second: u32) {
388 let mut state = self.rate_limit_state.lock().expect("rate limit mutex poisoned");
389 state.max_per_window = max_per_second;
390 state.window = std::time::Duration::from_secs(1);
391 }
392}
393
394pub mod telemetry_helpers {
397 use super::TelemetryEvent;
398 use serde_json::Value as JsonValue;
399 use std::time::{SystemTime, UNIX_EPOCH};
400 use tokio::sync::mpsc;
401
402 #[allow(clippy::cast_possible_truncation)] pub fn emit(
405 tx: &mpsc::Sender<TelemetryEvent>,
406 session_id: Option<String>,
407 node_id: &str,
408 event_type: &str,
409 data: &JsonValue,
410 ) -> bool {
411 let timestamp_us =
412 SystemTime::now().duration_since(UNIX_EPOCH).map(|d| d.as_micros() as u64).unwrap_or(0);
413
414 let event_data = data.as_object().map_or_else(
415 || {
416 serde_json::json!({
417 "event_type": event_type,
418 "value": data,
419 })
420 },
421 |obj| {
422 let mut obj = obj.clone();
423 obj.insert("event_type".to_string(), JsonValue::String(event_type.to_string()));
424 JsonValue::Object(obj)
425 },
426 );
427
428 let event = TelemetryEvent::new(session_id, node_id.to_string(), event_data, timestamp_us);
429
430 tx.try_send(event).is_ok()
431 }
432}
433
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::unreadable_literal)]
mod tests {
    use super::*;

    #[test]
    fn test_telemetry_event_creation() {
        let event = TelemetryEvent::new(
            Some("session-123".to_string()),
            "test-node".to_string(),
            serde_json::json!({
                "event_type": "test.event",
                "correlation_id": "corr-456",
            }),
            1234567890,
        );

        assert_eq!(event.session_id, Some("session-123".to_string()));
        assert_eq!(event.node_id, "test-node");
        assert_eq!(event.event_type(), Some("test.event"));
        assert_eq!(event.correlation_id(), Some("corr-456"));
        assert_eq!(event.timestamp_us(), Some(1234567890));
        assert_eq!(event.packet.type_id, TELEMETRY_TYPE_ID);
    }

    #[test]
    fn test_telemetry_config_default() {
        let config = TelemetryConfig::default();
        assert!(config.enabled);
        assert_eq!(config.buffer_size, 100);
        assert!(!config.redact_text);
        assert_eq!(config.max_text_chars, 100);
    }

    #[tokio::test]
    async fn test_emitter_basic() {
        let (tx, mut rx) = mpsc::channel(10);
        let emitter =
            TelemetryEmitter::new("node-1".to_string(), Some("session-1".to_string()), Some(tx));

        assert!(emitter.emit("test.event", serde_json::json!({ "key": "value" })));

        let event = rx.recv().await.unwrap();
        assert_eq!(event.node_id, "node-1");
        assert_eq!(event.session_id, Some("session-1".to_string()));
        assert_eq!(event.event_type(), Some("test.event"));
        assert_eq!(event.packet.data.get("key").and_then(|v| v.as_str()), Some("value"));
    }

    #[tokio::test]
    async fn test_emitter_drop_accounting() {
        // Capacity 1 and the receiver is never drained, so the second send hits `Full`.
        let (tx, _rx) = mpsc::channel(1);
        let emitter = TelemetryEmitter::new("node-1".to_string(), None, Some(tx));

        assert!(emitter.emit("event1", serde_json::json!({})));

        assert!(!emitter.emit("event2", serde_json::json!({})));

        let (dropped_full, _) = emitter.dropped_counts();
        assert_eq!(dropped_full, 1);
    }

    #[test]
    fn test_emitter_no_tx() {
        let emitter = TelemetryEmitter::new("node-1".to_string(), None, None);

        assert!(!emitter.emit("test.event", serde_json::json!({})));
        assert_eq!(emitter.dropped_counts(), (0, 0));
    }

    #[tokio::test]
    async fn test_emitter_rate_limit_is_per_event_type() {
        let (tx, _rx) = mpsc::channel(10);
        let emitter = TelemetryEmitter::new("node-1".to_string(), None, Some(tx));
        emitter.set_rate_limit(2);

        assert!(emitter.emit("burst", serde_json::json!({})));
        assert!(emitter.emit("burst", serde_json::json!({})));
        assert!(!emitter.emit("burst", serde_json::json!({})));
        // A different event type has its own budget.
        assert!(emitter.emit("other", serde_json::json!({})));

        assert_eq!(emitter.dropped_counts(), (0, 1));
    }

    #[tokio::test]
    async fn test_health_reports_and_resets_drops() {
        let (tx, mut rx) = mpsc::channel(10);
        let mut emitter = TelemetryEmitter::new("node-1".to_string(), None, Some(tx));

        // Nothing dropped yet: no health event.
        assert!(!emitter.maybe_emit_health());

        emitter.set_rate_limit(1);
        assert!(emitter.emit("noisy", serde_json::json!({})));
        assert!(!emitter.emit("noisy", serde_json::json!({})));

        assert!(emitter.maybe_emit_health());
        assert_eq!(emitter.dropped_counts(), (0, 0));

        let first = rx.recv().await.unwrap();
        assert_eq!(first.event_type(), Some("noisy"));
        let health = rx.recv().await.unwrap();
        assert_eq!(health.event_type(), Some("telemetry.health"));
        assert_eq!(
            health.packet.data.get("dropped_due_to_rate_limit").and_then(serde_json::Value::as_u64),
            Some(1)
        );
        assert_eq!(
            health.packet.data.get("dropped_due_to_full").and_then(serde_json::Value::as_u64),
            Some(0)
        );
    }

    #[tokio::test]
    async fn test_correlated_and_scalar_payloads() {
        let (tx, mut rx) = mpsc::channel(10);
        let emitter = TelemetryEmitter::new("node-1".to_string(), None, Some(tx));

        assert!(emitter.emit_correlated(
            "turn.start",
            "corr-1",
            "turn-1",
            serde_json::json!({ "k": 1 })
        ));
        assert!(emitter.emit("scalar", serde_json::json!(42)));

        let correlated = rx.recv().await.unwrap();
        assert_eq!(correlated.event_type(), Some("turn.start"));
        assert_eq!(correlated.correlation_id(), Some("corr-1"));
        assert_eq!(correlated.turn_id(), Some("turn-1"));

        // Non-object payloads are wrapped under "value".
        let scalar = rx.recv().await.unwrap();
        assert_eq!(scalar.event_type(), Some("scalar"));
        assert_eq!(scalar.packet.data.get("value").and_then(serde_json::Value::as_i64), Some(42));
        assert_eq!(scalar.correlation_id(), None);
        assert_eq!(scalar.turn_id(), None);
    }
}