Skip to main content

heliosdb_proxy/plugins/
metrics.rs

1//! Plugin Metrics
2//!
3//! Metrics collection and reporting for the plugin system.
4
5use std::collections::HashMap;
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::time::{Duration, Instant};
8
9use dashmap::DashMap;
10
11use super::HookType;
12
13/// Plugin metrics collector
14pub struct PluginMetrics {
15    /// Per-plugin statistics. DashMap so concurrent hook calls for
16    /// different plugins land on different shards instead of serializing
17    /// on one global write lock.
18    plugin_stats: DashMap<String, PluginStatsInner>,
19
20    /// Global counters
21    global: GlobalMetrics,
22
23    /// Hook latency histograms (sharded per hook type).
24    hook_latencies: DashMap<HookType, LatencyHistogram>,
25
26    /// Creation time
27    created_at: Instant,
28}
29
30impl PluginMetrics {
31    /// Create a new metrics collector
32    pub fn new() -> Self {
33        Self {
34            plugin_stats: DashMap::new(),
35            global: GlobalMetrics::new(),
36            hook_latencies: DashMap::new(),
37            created_at: Instant::now(),
38        }
39    }
40
41    /// Record a hook call
42    pub fn record_hook_call(
43        &self,
44        plugin_name: &str,
45        hook: HookType,
46        latency: Duration,
47        success: bool,
48    ) {
49        // Update global counters
50        self.global.total_calls.fetch_add(1, Ordering::Relaxed);
51        if !success {
52            self.global.total_errors.fetch_add(1, Ordering::Relaxed);
53        }
54
55        // Update plugin-specific stats
56        {
57            let mut entry = self
58                .plugin_stats
59                .entry(plugin_name.to_string())
60                .or_insert_with(PluginStatsInner::new);
61
62            entry.total_calls += 1;
63            if success {
64                entry.successful_calls += 1;
65            } else {
66                entry.failed_calls += 1;
67            }
68            entry.total_latency += latency;
69
70            if latency > entry.max_latency {
71                entry.max_latency = latency;
72            }
73            if entry.min_latency == Duration::ZERO || latency < entry.min_latency {
74                entry.min_latency = latency;
75            }
76
77            // Update per-hook stats
78            let hook_entry = entry
79                .hook_stats
80                .entry(hook)
81                .or_insert_with(HookStatsInner::new);
82            hook_entry.calls += 1;
83            hook_entry.latency += latency;
84            if !success {
85                hook_entry.errors += 1;
86            }
87        }
88
89        // Update hook latency histogram
90        {
91            let mut histogram = self
92                .hook_latencies
93                .entry(hook)
94                .or_insert_with(LatencyHistogram::new);
95            histogram.record(latency);
96        }
97    }
98
99    /// Record a plugin load
100    pub fn record_plugin_load(&self, plugin_name: &str) {
101        self.global.plugins_loaded.fetch_add(1, Ordering::Relaxed);
102
103        let mut entry = self
104            .plugin_stats
105            .entry(plugin_name.to_string())
106            .or_insert_with(PluginStatsInner::new);
107        entry.loaded_at = Some(Instant::now());
108    }
109
110    /// Record a plugin unload
111    pub fn record_plugin_unload(&self, plugin_name: &str) {
112        self.global.plugins_unloaded.fetch_add(1, Ordering::Relaxed);
113
114        if let Some(mut entry) = self.plugin_stats.get_mut(plugin_name) {
115            entry.unloaded_at = Some(Instant::now());
116        }
117    }
118
119    /// Record a plugin error
120    pub fn record_plugin_error(&self, plugin_name: &str, _error: &str) {
121        self.global.total_errors.fetch_add(1, Ordering::Relaxed);
122
123        let mut entry = self
124            .plugin_stats
125            .entry(plugin_name.to_string())
126            .or_insert_with(PluginStatsInner::new);
127        entry.error_count += 1;
128    }
129
130    /// Get plugin statistics
131    pub fn get_plugin_stats(&self, plugin_name: &str) -> PluginStats {
132        self.plugin_stats
133            .get(plugin_name)
134            .map(|s| s.to_public())
135            .unwrap_or_default()
136    }
137
138    /// Get all plugin statistics
139    pub fn get_all_stats(&self) -> HashMap<String, PluginStats> {
140        self.plugin_stats
141            .iter()
142            .map(|e| (e.key().clone(), e.value().to_public()))
143            .collect()
144    }
145
146    /// Get total calls
147    pub fn total_calls(&self) -> u64 {
148        self.global.total_calls.load(Ordering::Relaxed)
149    }
150
151    /// Get total errors
152    pub fn total_errors(&self) -> u64 {
153        self.global.total_errors.load(Ordering::Relaxed)
154    }
155
156    /// Get average latency across all plugins
157    pub fn avg_latency(&self) -> Duration {
158        let mut total_latency = Duration::ZERO;
159        let mut total_calls = 0u64;
160
161        for s in self.plugin_stats.iter() {
162            total_latency += s.total_latency;
163            total_calls += s.total_calls;
164        }
165
166        if total_calls == 0 {
167            Duration::ZERO
168        } else {
169            total_latency / total_calls as u32
170        }
171    }
172
173    /// Get hook latency
174    pub fn get_hook_latency(&self, hook: HookType) -> HookLatency {
175        self.hook_latencies
176            .get(&hook)
177            .map(|h| h.to_latency())
178            .unwrap_or_default()
179    }
180
181    /// Get uptime
182    pub fn uptime(&self) -> Duration {
183        self.created_at.elapsed()
184    }
185
186    /// Reset all metrics
187    pub fn reset(&self) {
188        self.global.total_calls.store(0, Ordering::Relaxed);
189        self.global.total_errors.store(0, Ordering::Relaxed);
190        self.plugin_stats.clear();
191        self.hook_latencies.clear();
192    }
193}
194
195impl Default for PluginMetrics {
196    fn default() -> Self {
197        Self::new()
198    }
199}
200
/// Process-wide counters, updated with relaxed atomic operations.
struct GlobalMetrics {
    /// Hook calls recorded.
    total_calls: AtomicU64,
    /// Failed hook calls plus explicitly reported plugin errors.
    total_errors: AtomicU64,
    /// Plugin load events recorded.
    plugins_loaded: AtomicU64,
    /// Plugin unload events recorded.
    plugins_unloaded: AtomicU64,
}

impl GlobalMetrics {
    /// All counters start at zero.
    fn new() -> Self {
        let zeroed = || AtomicU64::new(0);
        GlobalMetrics {
            total_calls: zeroed(),
            total_errors: zeroed(),
            plugins_loaded: zeroed(),
            plugins_unloaded: zeroed(),
        }
    }
}
219
220/// Internal plugin statistics
221struct PluginStatsInner {
222    total_calls: u64,
223    successful_calls: u64,
224    failed_calls: u64,
225    error_count: u64,
226    total_latency: Duration,
227    min_latency: Duration,
228    max_latency: Duration,
229    hook_stats: HashMap<HookType, HookStatsInner>,
230    loaded_at: Option<Instant>,
231    unloaded_at: Option<Instant>,
232}
233
234impl PluginStatsInner {
235    fn new() -> Self {
236        Self {
237            total_calls: 0,
238            successful_calls: 0,
239            failed_calls: 0,
240            error_count: 0,
241            total_latency: Duration::ZERO,
242            min_latency: Duration::ZERO,
243            max_latency: Duration::ZERO,
244            hook_stats: HashMap::new(),
245            loaded_at: None,
246            unloaded_at: None,
247        }
248    }
249
250    fn to_public(&self) -> PluginStats {
251        PluginStats {
252            total_calls: self.total_calls,
253            successful_calls: self.successful_calls,
254            failed_calls: self.failed_calls,
255            error_count: self.error_count,
256            avg_latency: if self.total_calls > 0 {
257                self.total_latency / self.total_calls as u32
258            } else {
259                Duration::ZERO
260            },
261            min_latency: self.min_latency,
262            max_latency: self.max_latency,
263            uptime: self.loaded_at.map(|t| t.elapsed()),
264        }
265    }
266}
267
/// Counters for one hook type within a single plugin.
struct HookStatsInner {
    /// Invocations of this hook.
    calls: u64,
    /// Invocations that reported failure.
    errors: u64,
    /// Summed latency of all invocations.
    latency: Duration,
}

impl HookStatsInner {
    /// Zeroed counters.
    fn new() -> Self {
        HookStatsInner {
            latency: Duration::default(),
            errors: 0,
            calls: 0,
        }
    }
}
284
/// Point-in-time statistics for a single plugin, as returned to callers.
#[derive(Debug, Clone, Default)]
pub struct PluginStats {
    /// Number of hook calls recorded.
    pub total_calls: u64,

    /// Hook calls that reported success.
    pub successful_calls: u64,

    /// Hook calls that reported failure.
    pub failed_calls: u64,

    /// Errors reported for the plugin outside of hook-call outcomes.
    pub error_count: u64,

    /// Mean hook-call latency.
    pub avg_latency: Duration,

    /// Fastest hook call observed.
    pub min_latency: Duration,

    /// Slowest hook call observed.
    pub max_latency: Duration,

    /// How long the plugin has been (or was) loaded; `None` if never loaded.
    pub uptime: Option<Duration>,
}

impl PluginStats {
    /// Fraction of hook calls that succeeded, in `0.0..=1.0`.
    ///
    /// A plugin with no recorded calls is reported as fully successful (`1.0`).
    pub fn success_rate(&self) -> f64 {
        match self.total_calls {
            0 => 1.0,
            total => self.successful_calls as f64 / total as f64,
        }
    }
}
323
/// Latency summary for one hook type across all plugins.
#[derive(Debug, Clone, Default)]
pub struct HookLatency {
    /// Number of calls summarized.
    pub count: u64,

    /// Mean latency.
    pub avg: Duration,

    /// Median (50th-percentile) latency.
    pub p50: Duration,

    /// 90th-percentile latency.
    pub p90: Duration,

    /// 99th-percentile latency.
    pub p99: Duration,

    /// Largest latency observed.
    pub max: Duration,
}
345
346/// Latency histogram
347struct LatencyHistogram {
348    /// Recorded latencies (sorted)
349    latencies: Vec<Duration>,
350
351    /// Maximum latency
352    max: Duration,
353
354    /// Sum for average
355    sum: Duration,
356}
357
358impl LatencyHistogram {
359    fn new() -> Self {
360        Self {
361            latencies: Vec::new(),
362            max: Duration::ZERO,
363            sum: Duration::ZERO,
364        }
365    }
366
367    fn record(&mut self, latency: Duration) {
368        self.latencies.push(latency);
369        self.sum += latency;
370        if latency > self.max {
371            self.max = latency;
372        }
373
374        // Keep sorted for percentile calculations
375        // In production, would use a more efficient data structure
376        self.latencies.sort();
377
378        // Limit size to prevent memory growth
379        if self.latencies.len() > 10000 {
380            self.latencies.drain(0..5000);
381        }
382    }
383
384    fn percentile(&self, p: f64) -> Duration {
385        if self.latencies.is_empty() {
386            return Duration::ZERO;
387        }
388        let idx = ((self.latencies.len() as f64) * p / 100.0) as usize;
389        let idx = idx.min(self.latencies.len() - 1);
390        self.latencies[idx]
391    }
392
393    fn to_latency(&self) -> HookLatency {
394        HookLatency {
395            count: self.latencies.len() as u64,
396            avg: if self.latencies.is_empty() {
397                Duration::ZERO
398            } else {
399                self.sum / self.latencies.len() as u32
400            },
401            p50: self.percentile(50.0),
402            p90: self.percentile(90.0),
403            p99: self.percentile(99.0),
404            max: self.max,
405        }
406    }
407}
408
409/// Metrics exporter for Prometheus format
410pub struct MetricsExporter {
411    metrics: std::sync::Arc<PluginMetrics>,
412    prefix: String,
413}
414
415impl MetricsExporter {
416    /// Create a new exporter
417    pub fn new(metrics: std::sync::Arc<PluginMetrics>, prefix: &str) -> Self {
418        Self {
419            metrics,
420            prefix: prefix.to_string(),
421        }
422    }
423
424    /// Export metrics in Prometheus format
425    pub fn export(&self) -> String {
426        let mut output = String::new();
427
428        // Global metrics
429        output.push_str(&format!(
430            "# HELP {}_total_calls Total hook calls\n",
431            self.prefix
432        ));
433        output.push_str(&format!("# TYPE {}_total_calls counter\n", self.prefix));
434        output.push_str(&format!(
435            "{}_total_calls {}\n",
436            self.prefix,
437            self.metrics.total_calls()
438        ));
439
440        output.push_str(&format!(
441            "# HELP {}_total_errors Total errors\n",
442            self.prefix
443        ));
444        output.push_str(&format!("# TYPE {}_total_errors counter\n", self.prefix));
445        output.push_str(&format!(
446            "{}_total_errors {}\n",
447            self.prefix,
448            self.metrics.total_errors()
449        ));
450
451        // Per-plugin metrics
452        let all_stats = self.metrics.get_all_stats();
453        for (name, stats) in all_stats {
454            let name_label = name.replace('-', "_");
455
456            output.push_str(&format!(
457                "{}_plugin_calls{{plugin=\"{}\"}} {}\n",
458                self.prefix, name_label, stats.total_calls
459            ));
460
461            output.push_str(&format!(
462                "{}_plugin_errors{{plugin=\"{}\"}} {}\n",
463                self.prefix, name_label, stats.error_count
464            ));
465
466            output.push_str(&format!(
467                "{}_plugin_latency_avg_us{{plugin=\"{}\"}} {}\n",
468                self.prefix,
469                name_label,
470                stats.avg_latency.as_micros()
471            ));
472        }
473
474        output
475    }
476}
477
#[cfg(test)]
mod tests {
    use super::*;

    /// Record one `PreQuery` hook call lasting `micros` microseconds.
    fn pre_query(metrics: &PluginMetrics, plugin: &str, micros: u64, success: bool) {
        metrics.record_hook_call(
            plugin,
            HookType::PreQuery,
            Duration::from_micros(micros),
            success,
        );
    }

    #[test]
    fn test_plugin_metrics_new() {
        let fresh = PluginMetrics::new();
        assert_eq!(fresh.total_calls(), 0);
        assert_eq!(fresh.total_errors(), 0);
    }

    #[test]
    fn test_record_hook_call() {
        let metrics = PluginMetrics::new();
        pre_query(&metrics, "test-plugin", 50, true);

        assert_eq!(metrics.total_calls(), 1);
        assert_eq!(metrics.total_errors(), 0);

        let PluginStats {
            total_calls,
            successful_calls,
            ..
        } = metrics.get_plugin_stats("test-plugin");
        assert_eq!(total_calls, 1);
        assert_eq!(successful_calls, 1);
    }

    #[test]
    fn test_record_hook_call_error() {
        let metrics = PluginMetrics::new();
        pre_query(&metrics, "test-plugin", 50, false);

        assert_eq!(metrics.total_calls(), 1);
        assert_eq!(metrics.total_errors(), 1);
        assert_eq!(metrics.get_plugin_stats("test-plugin").failed_calls, 1);
    }

    #[test]
    fn test_plugin_stats_success_rate() {
        let ninety_percent = PluginStats {
            total_calls: 100,
            successful_calls: 90,
            failed_calls: 10,
            ..Default::default()
        };
        assert!((ninety_percent.success_rate() - 0.9).abs() < 0.001);
    }

    #[test]
    fn test_plugin_stats_default() {
        let empty = PluginStats::default();
        assert_eq!(empty.total_calls, 0);
        assert_eq!(empty.success_rate(), 1.0);
    }

    #[test]
    fn test_latency_histogram() {
        let mut histogram = LatencyHistogram::new();
        (1..=100u64)
            .map(Duration::from_micros)
            .for_each(|sample| histogram.record(sample));

        let summary = histogram.to_latency();
        assert_eq!(summary.count, 100);
        assert!(summary.p50 >= Duration::from_micros(50));
        assert!(summary.p99 >= Duration::from_micros(99));
    }

    #[test]
    fn test_get_hook_latency() {
        let metrics = PluginMetrics::new();
        (1..=10u64).for_each(|i| pre_query(&metrics, "test", i * 10, true));

        let latency = metrics.get_hook_latency(HookType::PreQuery);
        assert_eq!(latency.count, 10);
        assert!(latency.avg > Duration::ZERO);
    }

    #[test]
    fn test_avg_latency() {
        let metrics = PluginMetrics::new();
        for micros in [100, 200] {
            pre_query(&metrics, "p1", micros, true);
        }
        assert_eq!(metrics.avg_latency(), Duration::from_micros(150));
    }

    #[test]
    fn test_reset() {
        let metrics = PluginMetrics::new();
        pre_query(&metrics, "test", 50, true);
        assert_eq!(metrics.total_calls(), 1);

        metrics.reset();
        assert_eq!(metrics.total_calls(), 0);
    }

    #[test]
    fn test_metrics_exporter() {
        let metrics = std::sync::Arc::new(PluginMetrics::new());
        pre_query(&metrics, "test", 50, true);

        let output = MetricsExporter::new(metrics, "helios_plugin").export();
        for needle in ["helios_plugin_total_calls", "helios_plugin_plugin_calls"] {
            assert!(output.contains(needle));
        }
    }

    #[test]
    fn test_record_plugin_load_unload() {
        let metrics = PluginMetrics::new();

        metrics.record_plugin_load("test-plugin");
        assert!(metrics.get_plugin_stats("test-plugin").uptime.is_some());

        metrics.record_plugin_unload("test-plugin");
    }
}