Skip to main content

embeddenator_obs/obs/
telemetry.rs

1//! Telemetry Aggregation and Export
2//!
3//! Collects and aggregates observability data for export to monitoring
4//! systems. Provides periodic snapshots and structured export formats.
5//!
6//! # Features
7//!
8//! - Periodic metric snapshots
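//! - JSON export for external systems (full output requires the `telemetry` feature)
//! - Performance counter aggregation
//! - Configurable snapshot interval (`TelemetryConfig::snapshot_interval`)
//! - Configurable sample rate (`TelemetryConfig::sample_rate`); note that neither
//!   setting is applied yet by the collector in this module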
13//!
14//! # Usage
15//!
16//! ```rust,ignore
17//! use embeddenator_obs::telemetry::{Telemetry, TelemetryConfig};
18//!
19//! let config = TelemetryConfig::default();
20//! let mut telemetry = Telemetry::new(config);
21//!
22//! // Record operations
23//! telemetry.record_operation("query", 1250); // microseconds
24//! telemetry.increment_counter("cache_hits");
25//!
26//! // Get snapshot for export
27//! let snapshot = telemetry.snapshot();
28//! println!("{}", snapshot.to_json());
29//! ```
30
31use crate::metrics::MetricsSnapshot;
32use std::collections::HashMap;
33use std::time::{Duration, Instant};
34
/// Settings that control a [`Telemetry`] collector.
///
/// The `Default` impl yields: collection enabled, a sample rate of `1.0`,
/// a 60 second snapshot interval and at most 100 history entries.
#[derive(Clone, Debug)]
pub struct TelemetryConfig {
    /// Master switch. When `false`, the recording methods on [`Telemetry`]
    /// return immediately without storing anything.
    pub enabled: bool,
    /// Sample rate from `0.0` to `1.0`, where `1.0` means 100%.
    /// NOTE(review): `Telemetry` in this module never reads this field.
    pub sample_rate: f64,
    /// Desired spacing between periodic snapshots.
    /// NOTE(review): `Telemetry` in this module never reads this field.
    pub snapshot_interval: Duration,
    /// Maximum number of history entries to retain.
    /// NOTE(review): `Telemetry` in this module never reads this field.
    pub max_history_entries: usize,
}

impl Default for TelemetryConfig {
    fn default() -> Self {
        let one_minute = Duration::from_secs(60);
        TelemetryConfig {
            enabled: true,
            sample_rate: 1.0,
            snapshot_interval: one_minute,
            max_history_entries: 100,
        }
    }
}
58
59/// Main telemetry collector.
60pub struct Telemetry {
61    config: TelemetryConfig,
62    start_time: Instant,
63    operation_timings: HashMap<String, OperationStats>,
64    counters: HashMap<String, u64>,
65    gauges: HashMap<String, f64>,
66    last_snapshot: Instant,
67}
68
69impl Telemetry {
70    /// Create new telemetry collector.
71    pub fn new(config: TelemetryConfig) -> Self {
72        Self {
73            config,
74            start_time: Instant::now(),
75            operation_timings: HashMap::new(),
76            counters: HashMap::new(),
77            gauges: HashMap::new(),
78            last_snapshot: Instant::now(),
79        }
80    }
81
82    /// Create with default configuration.
83    pub fn default_config() -> Self {
84        Self::new(TelemetryConfig::default())
85    }
86
87    /// Record operation timing (microseconds).
88    pub fn record_operation(&mut self, name: &str, duration_us: u64) {
89        if !self.config.enabled {
90            return;
91        }
92
93        let stats = self
94            .operation_timings
95            .entry(name.to_string())
96            .or_insert_with(OperationStats::new);
97
98        stats.record(duration_us);
99    }
100
101    /// Increment a counter.
102    pub fn increment_counter(&mut self, name: &str) {
103        if !self.config.enabled {
104            return;
105        }
106
107        *self.counters.entry(name.to_string()).or_insert(0) += 1;
108    }
109
110    /// Add to a counter.
111    pub fn add_to_counter(&mut self, name: &str, value: u64) {
112        if !self.config.enabled {
113            return;
114        }
115
116        *self.counters.entry(name.to_string()).or_insert(0) += value;
117    }
118
119    /// Set gauge value.
120    pub fn set_gauge(&mut self, name: &str, value: f64) {
121        if !self.config.enabled {
122            return;
123        }
124
125        self.gauges.insert(name.to_string(), value);
126    }
127
128    /// Get current snapshot.
129    pub fn snapshot(&self) -> TelemetrySnapshot {
130        let uptime = self.start_time.elapsed();
131        let since_last = self.last_snapshot.elapsed();
132
133        TelemetrySnapshot {
134            timestamp_secs: uptime.as_secs(),
135            uptime_secs: uptime.as_secs(),
136            since_last_snapshot_secs: since_last.as_secs(),
137            operation_stats: self.operation_timings.clone(),
138            counters: self.counters.clone(),
139            gauges: self.gauges.clone(),
140            metrics: crate::metrics::metrics().snapshot(),
141        }
142    }
143
144    /// Reset all collected data (useful for testing or periodic resets).
145    pub fn reset(&mut self) {
146        self.operation_timings.clear();
147        self.counters.clear();
148        self.gauges.clear();
149        self.last_snapshot = Instant::now();
150    }
151
152    /// Get uptime in seconds.
153    pub fn uptime_secs(&self) -> u64 {
154        self.start_time.elapsed().as_secs()
155    }
156}
157
/// Timing statistics for a single operation type.
///
/// The aggregate fields (`count`, `total_us`, `min_us`, `max_us`, `last_us`,
/// `sum_of_squares`) cover every recorded sample. `histogram` keeps only a
/// bounded window of raw samples, which backs the percentile queries and
/// [`OperationStats::count_below`].
#[derive(Debug, Clone)]
pub struct OperationStats {
    /// Number of samples recorded.
    pub count: u64,
    /// Sum of all recorded durations in microseconds; saturates at `u64::MAX`.
    pub total_us: u64,
    /// Smallest recorded duration; `u64::MAX` until the first sample.
    pub min_us: u64,
    /// Largest recorded duration; `0` until the first sample.
    pub max_us: u64,
    /// Most recently recorded duration.
    pub last_us: u64,
    /// Raw samples in microseconds, stored unsorted.
    ///
    /// Holds at most 10 000 entries. Once full, each new sample overwrites
    /// the oldest retained one, so percentiles reflect recent behaviour.
    pub histogram: Vec<u64>,
    /// Running sum of squared durations (µs²), used for the variance.
    pub sum_of_squares: f64,
}

impl OperationStats {
    /// Capacity of the raw-sample window in `histogram`; bounds memory per operation.
    const MAX_SAMPLES: usize = 10_000;

    fn new() -> Self {
        Self {
            count: 0,
            total_us: 0,
            min_us: u64::MAX,
            max_us: 0,
            last_us: 0,
            histogram: Vec::new(),
            sum_of_squares: 0.0,
        }
    }

    /// Fold one duration (µs) into the statistics.
    fn record(&mut self, duration_us: u64) {
        self.count += 1;
        self.total_us = self.total_us.saturating_add(duration_us);
        self.min_us = self.min_us.min(duration_us);
        self.max_us = self.max_us.max(duration_us);
        self.last_us = duration_us;

        if self.histogram.len() < Self::MAX_SAMPLES {
            self.histogram.push(duration_us);
        } else {
            // Ring-buffer overwrite. Sample number `count` (1-based) goes to slot
            // `(count - 1) % MAX_SAMPLES`, which holds the oldest retained sample.
            // The slot is always < MAX_SAMPLES <= len, so indexing cannot panic.
            let slot = ((self.count - 1) % Self::MAX_SAMPLES as u64) as usize;
            self.histogram[slot] = duration_us;
        }

        let val = duration_us as f64;
        self.sum_of_squares += val * val;
    }

    /// Mean duration in microseconds, or `0.0` when nothing was recorded.
    pub fn avg_us(&self) -> f64 {
        if self.count == 0 {
            0.0
        } else {
            self.total_us as f64 / self.count as f64
        }
    }

    /// Reciprocal of the mean latency, expressed per second.
    ///
    /// This is the rate one caller running the operation back-to-back would
    /// achieve. It is not the observed wall-clock rate. Returns `0.0` when
    /// no time has been recorded.
    pub fn ops_per_sec(&self) -> f64 {
        if self.total_us == 0 {
            0.0
        } else {
            (self.count as f64 * 1_000_000.0) / self.total_us as f64
        }
    }

    /// Population standard deviation (divides by `count`) in microseconds.
    ///
    /// Returns `0.0` for fewer than two samples. It is computed as
    /// `E[x²] − mean²` from the running sum of squares. That form can lose
    /// precision when the mean is large relative to the spread, so a
    /// slightly negative rounding result is clamped to zero.
    pub fn std_dev_us(&self) -> f64 {
        if self.count < 2 {
            return 0.0;
        }

        let mean = self.avg_us();
        let variance = (self.sum_of_squares / self.count as f64) - (mean * mean);
        variance.max(0.0).sqrt()
    }

    /// Percentile `p` (in percent) over the retained sample window.
    ///
    /// Selects the sample at index `round(p / 100 * (n - 1))` of the samples
    /// in ascending order, with no interpolation. `p` below 0 (or NaN)
    /// selects the smallest sample, and `p` above 100 selects the largest.
    /// Returns `0` when nothing has been recorded. The stored samples do not
    /// need to be sorted; a copy is partitioned on each call.
    pub fn percentile(&self, p: f64) -> u64 {
        if self.histogram.is_empty() {
            return 0;
        }

        let mut samples = self.histogram.clone();
        let last = samples.len() - 1;
        // Float→int `as` saturates: negative and NaN become 0; `min` caps the top.
        let idx = (((p / 100.0) * last as f64).round() as usize).min(last);
        // O(n) selection; yields exactly the element a full sort would place at `idx`.
        *samples.select_nth_unstable(idx).1
    }

    /// Median (P50) over the retained sample window.
    pub fn median_us(&self) -> u64 {
        self.percentile(50.0)
    }

    /// 95th percentile over the retained sample window.
    pub fn p95_us(&self) -> u64 {
        self.percentile(95.0)
    }

    /// 99th percentile over the retained sample window.
    pub fn p99_us(&self) -> u64 {
        self.percentile(99.0)
    }

    /// Number of retained samples strictly below `threshold_us`.
    ///
    /// Only the retained window (at most 10 000 recent samples) is counted,
    /// not every recorded sample. Prometheus `le` buckets are inclusive, so
    /// for an inclusive bound `b` use `count_below(b + 1)`.
    pub fn count_below(&self, threshold_us: u64) -> u64 {
        self.histogram.iter().filter(|&&x| x < threshold_us).count() as u64
    }
}
264
265/// Point-in-time telemetry snapshot.
266#[derive(Debug, Clone)]
267pub struct TelemetrySnapshot {
268    pub timestamp_secs: u64,
269    pub uptime_secs: u64,
270    pub since_last_snapshot_secs: u64,
271    pub operation_stats: HashMap<String, OperationStats>,
272    pub counters: HashMap<String, u64>,
273    pub gauges: HashMap<String, f64>,
274    pub metrics: MetricsSnapshot,
275}
276
277impl TelemetrySnapshot {
278    /// Export as JSON string (requires serde feature).
279    #[cfg(feature = "telemetry")]
280    pub fn to_json(&self) -> String {
281        use std::fmt::Write;
282
283        let mut json = String::new();
284        writeln!(json, "{{").unwrap();
285        writeln!(json, r#"  "timestamp_secs": {},"#, self.timestamp_secs).unwrap();
286        writeln!(json, r#"  "uptime_secs": {},"#, self.uptime_secs).unwrap();
287        writeln!(
288            json,
289            r#"  "since_last_snapshot_secs": {},"#,
290            self.since_last_snapshot_secs
291        )
292        .unwrap();
293
294        // Operations
295        writeln!(json, r#"  "operations": {{"#).unwrap();
296        for (i, (name, stats)) in self.operation_stats.iter().enumerate() {
297            let comma = if i < self.operation_stats.len() - 1 {
298                ","
299            } else {
300                ""
301            };
302            writeln!(json, r#"    "{}": {{"#, name).unwrap();
303            writeln!(json, r#"      "count": {},"#, stats.count).unwrap();
304            writeln!(json, r#"      "avg_us": {:.2},"#, stats.avg_us()).unwrap();
305            writeln!(json, r#"      "min_us": {},"#, stats.min_us).unwrap();
306            writeln!(json, r#"      "max_us": {}"#, stats.max_us).unwrap();
307            writeln!(json, r#"    }}{}"#, comma).unwrap();
308        }
309        writeln!(json, r#"  }},"#).unwrap();
310
311        // Counters
312        writeln!(json, r#"  "counters": {{"#).unwrap();
313        for (i, (name, value)) in self.counters.iter().enumerate() {
314            let comma = if i < self.counters.len() - 1 { "," } else { "" };
315            writeln!(json, r#"    "{}": {}{}"#, name, value, comma).unwrap();
316        }
317        writeln!(json, r#"  }},"#).unwrap();
318
319        // Gauges
320        writeln!(json, r#"  "gauges": {{"#).unwrap();
321        for (i, (name, value)) in self.gauges.iter().enumerate() {
322            let comma = if i < self.gauges.len() - 1 { "," } else { "" };
323            writeln!(json, r#"    "{}": {:.4}{}"#, name, value, comma).unwrap();
324        }
325        writeln!(json, r#"  }}"#).unwrap();
326
327        writeln!(json, "}}").unwrap();
328        json
329    }
330
331    #[cfg(not(feature = "telemetry"))]
332    pub fn to_json(&self) -> String {
333        "{{}}".to_string()
334    }
335
336    /// Format as human-readable summary.
337    pub fn summary(&self) -> String {
338        let mut output = String::new();
339        output.push_str(&format!(
340            "=== Telemetry Snapshot (uptime: {}s) ===\n",
341            self.uptime_secs
342        ));
343
344        if !self.operation_stats.is_empty() {
345            output.push_str("\nOperations:\n");
346            for (name, stats) in &self.operation_stats {
347                output.push_str(&format!(
348                    "  {}: count={}, avg={:.2}µs, min={}µs, max={}µs\n",
349                    name,
350                    stats.count,
351                    stats.avg_us(),
352                    stats.min_us,
353                    stats.max_us
354                ));
355            }
356        }
357
358        if !self.counters.is_empty() {
359            output.push_str("\nCounters:\n");
360            for (name, value) in &self.counters {
361                output.push_str(&format!("  {}: {}\n", name, value));
362            }
363        }
364
365        if !self.gauges.is_empty() {
366            output.push_str("\nGauges:\n");
367            for (name, value) in &self.gauges {
368                output.push_str(&format!("  {}: {:.4}\n", name, value));
369            }
370        }
371
372        output
373    }
374}
375
#[cfg(test)]
mod tests {
    use super::*;

    /// Fresh collector with the default (enabled) configuration.
    fn collector() -> Telemetry {
        Telemetry::default_config()
    }

    #[test]
    fn test_telemetry_basic() {
        let mut tel = collector();
        tel.record_operation("query", 1500);
        tel.record_operation("query", 2000);
        tel.increment_counter("cache_hits");
        tel.set_gauge("memory_mb", 256.5);

        let snap = tel.snapshot();
        assert_eq!(snap.counters.get("cache_hits"), Some(&1));
        assert_eq!(snap.gauges.get("memory_mb"), Some(&256.5));

        let query = snap
            .operation_stats
            .get("query")
            .expect("query timings should have been recorded");
        assert_eq!(query.count, 2);
        assert_eq!(query.min_us, 1500);
        assert_eq!(query.max_us, 2000);
    }

    #[test]
    fn test_operation_stats() {
        let mut stats = OperationStats::new();
        for us in [100, 200, 150] {
            stats.record(us);
        }

        assert_eq!(stats.count, 3);
        assert_eq!((stats.min_us, stats.max_us), (100, 200));
        assert_eq!(stats.avg_us(), 150.0);
    }

    #[test]
    fn test_advanced_statistics() {
        let mut stats = OperationStats::new();
        // 100, 150, ..., 500: nine evenly spaced samples with mean 300.
        for us in (100..=500).step_by(50) {
            stats.record(us);
        }

        assert_eq!(stats.count, 9);
        assert_eq!(stats.avg_us(), 300.0);

        let p50 = stats.percentile(50.0);
        assert!((250..=350).contains(&p50), "median should be ~300, got {}", p50);
        assert!(stats.p95_us() >= 400, "P95 should sit near the top");
        assert!(stats.p99_us() >= 450, "P99 should sit at the top");

        // Varied data must give a non-zero but moderate spread.
        let std_dev = stats.std_dev_us();
        assert!(std_dev > 0.0 && std_dev < 200.0, "unexpected std dev {}", std_dev);
    }

    #[test]
    fn test_histogram_buckets() {
        let mut stats = OperationStats::new();
        for us in [50, 150, 250, 750, 1500] {
            stats.record(us);
        }

        // (threshold, number of samples strictly below it)
        let expectations = [(100, 1), (500, 3), (1000, 4), (2000, 5)];
        for (threshold, expected) in expectations {
            assert_eq!(stats.count_below(threshold), expected, "threshold {}", threshold);
        }
    }

    #[test]
    fn test_telemetry_reset() {
        let mut tel = collector();
        tel.increment_counter("test");
        assert_eq!(tel.snapshot().counters.get("test"), Some(&1));

        tel.reset();
        assert!(tel.snapshot().counters.get("test").is_none());
    }

    #[test]
    fn test_snapshot_summary() {
        let mut tel = collector();
        tel.record_operation("test_op", 500);

        let text = tel.snapshot().summary();
        for needle in ["Telemetry Snapshot", "test_op"] {
            assert!(text.contains(needle), "summary is missing {:?}", needle);
        }
    }

    #[test]
    fn test_disabled_telemetry() {
        let mut tel = Telemetry::new(TelemetryConfig {
            enabled: false,
            ..Default::default()
        });
        tel.record_operation("query", 1000);
        tel.increment_counter("test");

        let snap = tel.snapshot();
        assert!(snap.operation_stats.is_empty());
        assert!(snap.counters.is_empty());
    }
}