Skip to main content

heliosdb_proxy/plugins/
metrics.rs

1//! Plugin Metrics
2//!
3//! Metrics collection and reporting for the plugin system.
4
5use std::collections::HashMap;
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::time::{Duration, Instant};
8
9use dashmap::DashMap;
10
11use super::HookType;
12
13/// Plugin metrics collector
14pub struct PluginMetrics {
15    /// Per-plugin statistics. DashMap so concurrent hook calls for
16    /// different plugins land on different shards instead of serializing
17    /// on one global write lock.
18    plugin_stats: DashMap<String, PluginStatsInner>,
19
20    /// Global counters
21    global: GlobalMetrics,
22
23    /// Hook latency histograms (sharded per hook type).
24    hook_latencies: DashMap<HookType, LatencyHistogram>,
25
26    /// Creation time
27    created_at: Instant,
28}
29
30impl PluginMetrics {
31    /// Create a new metrics collector
32    pub fn new() -> Self {
33        Self {
34            plugin_stats: DashMap::new(),
35            global: GlobalMetrics::new(),
36            hook_latencies: DashMap::new(),
37            created_at: Instant::now(),
38        }
39    }
40
41    /// Record a hook call
42    pub fn record_hook_call(
43        &self,
44        plugin_name: &str,
45        hook: HookType,
46        latency: Duration,
47        success: bool,
48    ) {
49        // Update global counters
50        self.global.total_calls.fetch_add(1, Ordering::Relaxed);
51        if !success {
52            self.global.total_errors.fetch_add(1, Ordering::Relaxed);
53        }
54
55        // Update plugin-specific stats
56        {
57            let mut entry = self
58                .plugin_stats
59                .entry(plugin_name.to_string())
60                .or_insert_with(PluginStatsInner::new);
61
62            entry.total_calls += 1;
63            if success {
64                entry.successful_calls += 1;
65            } else {
66                entry.failed_calls += 1;
67            }
68            entry.total_latency += latency;
69
70            if latency > entry.max_latency {
71                entry.max_latency = latency;
72            }
73            if entry.min_latency == Duration::ZERO || latency < entry.min_latency {
74                entry.min_latency = latency;
75            }
76
77            // Update per-hook stats
78            let hook_entry = entry
79                .hook_stats
80                .entry(hook)
81                .or_insert_with(HookStatsInner::new);
82            hook_entry.calls += 1;
83            hook_entry.latency += latency;
84            if !success {
85                hook_entry.errors += 1;
86            }
87        }
88
89        // Update hook latency histogram
90        {
91            let mut histogram = self
92                .hook_latencies
93                .entry(hook)
94                .or_insert_with(LatencyHistogram::new);
95            histogram.record(latency);
96        }
97    }
98
99    /// Record a plugin load
100    pub fn record_plugin_load(&self, plugin_name: &str) {
101        self.global.plugins_loaded.fetch_add(1, Ordering::Relaxed);
102
103        let mut entry = self
104            .plugin_stats
105            .entry(plugin_name.to_string())
106            .or_insert_with(PluginStatsInner::new);
107        entry.loaded_at = Some(Instant::now());
108    }
109
110    /// Record a plugin unload
111    pub fn record_plugin_unload(&self, plugin_name: &str) {
112        self.global.plugins_unloaded.fetch_add(1, Ordering::Relaxed);
113
114        if let Some(mut entry) = self.plugin_stats.get_mut(plugin_name) {
115            entry.unloaded_at = Some(Instant::now());
116        }
117    }
118
119    /// Record a plugin error
120    pub fn record_plugin_error(&self, plugin_name: &str, _error: &str) {
121        self.global.total_errors.fetch_add(1, Ordering::Relaxed);
122
123        let mut entry = self
124            .plugin_stats
125            .entry(plugin_name.to_string())
126            .or_insert_with(PluginStatsInner::new);
127        entry.error_count += 1;
128    }
129
130    /// Get plugin statistics
131    pub fn get_plugin_stats(&self, plugin_name: &str) -> PluginStats {
132        self.plugin_stats
133            .get(plugin_name)
134            .map(|s| s.to_public())
135            .unwrap_or_default()
136    }
137
138    /// Get all plugin statistics
139    pub fn get_all_stats(&self) -> HashMap<String, PluginStats> {
140        self.plugin_stats
141            .iter()
142            .map(|e| (e.key().clone(), e.value().to_public()))
143            .collect()
144    }
145
146    /// Get total calls
147    pub fn total_calls(&self) -> u64 {
148        self.global.total_calls.load(Ordering::Relaxed)
149    }
150
151    /// Get total errors
152    pub fn total_errors(&self) -> u64 {
153        self.global.total_errors.load(Ordering::Relaxed)
154    }
155
156    /// Get average latency across all plugins
157    pub fn avg_latency(&self) -> Duration {
158        let mut total_latency = Duration::ZERO;
159        let mut total_calls = 0u64;
160
161        for s in self.plugin_stats.iter() {
162            total_latency += s.total_latency;
163            total_calls += s.total_calls;
164        }
165
166        if total_calls == 0 {
167            Duration::ZERO
168        } else {
169            total_latency / total_calls as u32
170        }
171    }
172
173    /// Get hook latency
174    pub fn get_hook_latency(&self, hook: HookType) -> HookLatency {
175        self.hook_latencies
176            .get(&hook)
177            .map(|h| h.to_latency())
178            .unwrap_or_default()
179    }
180
181    /// Get uptime
182    pub fn uptime(&self) -> Duration {
183        self.created_at.elapsed()
184    }
185
186    /// Reset all metrics
187    pub fn reset(&self) {
188        self.global.total_calls.store(0, Ordering::Relaxed);
189        self.global.total_errors.store(0, Ordering::Relaxed);
190        self.plugin_stats.clear();
191        self.hook_latencies.clear();
192    }
193}
194
195impl Default for PluginMetrics {
196    fn default() -> Self {
197        Self::new()
198    }
199}
200
/// Process-wide counters, kept as atomics so they can be bumped via `&self`.
struct GlobalMetrics {
    /// Hook calls recorded
    total_calls: AtomicU64,
    /// Errors recorded (failed hook calls and explicit plugin errors)
    total_errors: AtomicU64,
    /// Plugin load events recorded
    plugins_loaded: AtomicU64,
    /// Plugin unload events recorded
    plugins_unloaded: AtomicU64,
}

impl GlobalMetrics {
    /// Build a counter set with every counter at zero.
    fn new() -> Self {
        let zero = || AtomicU64::new(0);
        Self {
            total_calls: zero(),
            total_errors: zero(),
            plugins_loaded: zero(),
            plugins_unloaded: zero(),
        }
    }
}
219
220/// Internal plugin statistics
221struct PluginStatsInner {
222    total_calls: u64,
223    successful_calls: u64,
224    failed_calls: u64,
225    error_count: u64,
226    total_latency: Duration,
227    min_latency: Duration,
228    max_latency: Duration,
229    hook_stats: HashMap<HookType, HookStatsInner>,
230    loaded_at: Option<Instant>,
231    unloaded_at: Option<Instant>,
232}
233
234impl PluginStatsInner {
235    fn new() -> Self {
236        Self {
237            total_calls: 0,
238            successful_calls: 0,
239            failed_calls: 0,
240            error_count: 0,
241            total_latency: Duration::ZERO,
242            min_latency: Duration::ZERO,
243            max_latency: Duration::ZERO,
244            hook_stats: HashMap::new(),
245            loaded_at: None,
246            unloaded_at: None,
247        }
248    }
249
250    fn to_public(&self) -> PluginStats {
251        PluginStats {
252            total_calls: self.total_calls,
253            successful_calls: self.successful_calls,
254            failed_calls: self.failed_calls,
255            error_count: self.error_count,
256            avg_latency: if self.total_calls > 0 {
257                self.total_latency / self.total_calls as u32
258            } else {
259                Duration::ZERO
260            },
261            min_latency: self.min_latency,
262            max_latency: self.max_latency,
263            uptime: self.loaded_at.map(|t| t.elapsed()),
264        }
265    }
266}
267
/// Internal counters for one hook type of one plugin.
struct HookStatsInner {
    /// Calls recorded for this hook
    calls: u64,
    /// Failed calls recorded for this hook
    errors: u64,
    /// Accumulated latency of all recorded calls
    latency: Duration,
}

impl HookStatsInner {
    /// Start with no calls, no errors and no accumulated latency.
    fn new() -> Self {
        let (calls, errors) = (0, 0);
        Self {
            calls,
            errors,
            latency: Duration::default(),
        }
    }
}
284
/// Public, read-only snapshot of one plugin's statistics.
#[derive(Debug, Clone, Default)]
pub struct PluginStats {
    /// Number of hook calls recorded
    pub total_calls: u64,

    /// Number of hook calls that succeeded
    pub successful_calls: u64,

    /// Number of hook calls that failed
    pub failed_calls: u64,

    /// Number of explicit plugin errors recorded
    pub error_count: u64,

    /// Mean latency per hook call
    pub avg_latency: Duration,

    /// Smallest observed latency
    pub min_latency: Duration,

    /// Largest observed latency
    pub max_latency: Duration,

    /// Plugin uptime; `None` when no load was recorded
    pub uptime: Option<Duration>,
}

impl PluginStats {
    /// Fraction of calls that succeeded, in `0.0..=1.0`.
    ///
    /// A plugin without any recorded call reports `1.0`.
    pub fn success_rate(&self) -> f64 {
        match self.total_calls {
            0 => 1.0,
            total => self.successful_calls as f64 / total as f64,
        }
    }
}
323
/// Hook latency statistics
#[derive(Debug, Clone, Default)]
pub struct HookLatency {
    /// Total calls recorded for the hook
    pub count: u64,

    /// Average latency over all recorded calls
    pub avg: Duration,

    /// P50 latency (computed over the retained recent samples)
    pub p50: Duration,

    /// P90 latency (computed over the retained recent samples)
    pub p90: Duration,

    /// P99 latency (computed over the retained recent samples)
    pub p99: Duration,

    /// Maximum latency over all recorded calls
    pub max: Duration,
}

/// Latency histogram backed by a bounded window of recent samples.
///
/// `count`, `sum` and `max` cover every sample ever recorded, so the average
/// stays consistent when old samples are trimmed. Percentiles are computed
/// from the retained window only.
struct LatencyHistogram {
    /// Retained samples in arrival order (oldest first)
    latencies: Vec<Duration>,

    /// Number of samples ever recorded, including trimmed ones
    count: u64,

    /// Maximum latency ever recorded
    max: Duration,

    /// Sum of every recorded latency (pairs with `count` for the average)
    sum: Duration,
}

impl LatencyHistogram {
    /// Retained-sample limit; exceeding it triggers a trim.
    const MAX_SAMPLES: usize = 10_000;

    /// Number of oldest samples discarded by one trim.
    const TRIM_BATCH: usize = 5_000;

    /// Create an empty histogram.
    fn new() -> Self {
        Self {
            latencies: Vec::new(),
            count: 0,
            max: Duration::ZERO,
            sum: Duration::ZERO,
        }
    }

    /// Record one latency sample.
    fn record(&mut self, latency: Duration) {
        self.count += 1;
        self.sum += latency;
        if latency > self.max {
            self.max = latency;
        }

        // Samples stay in arrival order so that trimming discards the
        // OLDEST ones. Trimming a sorted vector would instead discard the
        // smallest latencies and bias every percentile upward.
        self.latencies.push(latency);
        if self.latencies.len() > Self::MAX_SAMPLES {
            self.latencies.drain(..Self::TRIM_BATCH);
        }
    }

    /// Return the `p`-th percentile (0-100) of an ascending-sorted slice,
    /// or `Duration::ZERO` when the slice is empty.
    fn percentile(sorted: &[Duration], p: f64) -> Duration {
        if sorted.is_empty() {
            return Duration::ZERO;
        }
        let idx = ((sorted.len() as f64) * p / 100.0) as usize;
        sorted[idx.min(sorted.len() - 1)]
    }

    /// Build the public latency summary.
    fn to_latency(&self) -> HookLatency {
        // Sort a copy once per read instead of re-sorting on every record.
        let mut sorted = self.latencies.clone();
        sorted.sort_unstable();

        let avg = if self.count == 0 {
            Duration::ZERO
        } else {
            // u128 nanosecond division: no truncating cast of the count.
            let avg_nanos = self.sum.as_nanos() / u128::from(self.count);
            // The mean never exceeds the summed total, so the casts are lossless.
            Duration::new(
                (avg_nanos / 1_000_000_000) as u64,
                (avg_nanos % 1_000_000_000) as u32,
            )
        };

        HookLatency {
            count: self.count,
            avg,
            p50: Self::percentile(&sorted, 50.0),
            p90: Self::percentile(&sorted, 90.0),
            p99: Self::percentile(&sorted, 99.0),
            max: self.max,
        }
    }
}
408
409/// Metrics exporter for Prometheus format
410pub struct MetricsExporter {
411    metrics: std::sync::Arc<PluginMetrics>,
412    prefix: String,
413}
414
415impl MetricsExporter {
416    /// Create a new exporter
417    pub fn new(metrics: std::sync::Arc<PluginMetrics>, prefix: &str) -> Self {
418        Self {
419            metrics,
420            prefix: prefix.to_string(),
421        }
422    }
423
424    /// Export metrics in Prometheus format
425    pub fn export(&self) -> String {
426        let mut output = String::new();
427
428        // Global metrics
429        output.push_str(&format!(
430            "# HELP {}_total_calls Total hook calls\n",
431            self.prefix
432        ));
433        output.push_str(&format!(
434            "# TYPE {}_total_calls counter\n",
435            self.prefix
436        ));
437        output.push_str(&format!(
438            "{}_total_calls {}\n",
439            self.prefix,
440            self.metrics.total_calls()
441        ));
442
443        output.push_str(&format!(
444            "# HELP {}_total_errors Total errors\n",
445            self.prefix
446        ));
447        output.push_str(&format!(
448            "# TYPE {}_total_errors counter\n",
449            self.prefix
450        ));
451        output.push_str(&format!(
452            "{}_total_errors {}\n",
453            self.prefix,
454            self.metrics.total_errors()
455        ));
456
457        // Per-plugin metrics
458        let all_stats = self.metrics.get_all_stats();
459        for (name, stats) in all_stats {
460            let name_label = name.replace('-', "_");
461
462            output.push_str(&format!(
463                "{}_plugin_calls{{plugin=\"{}\"}} {}\n",
464                self.prefix, name_label, stats.total_calls
465            ));
466
467            output.push_str(&format!(
468                "{}_plugin_errors{{plugin=\"{}\"}} {}\n",
469                self.prefix, name_label, stats.error_count
470            ));
471
472            output.push_str(&format!(
473                "{}_plugin_latency_avg_us{{plugin=\"{}\"}} {}\n",
474                self.prefix,
475                name_label,
476                stats.avg_latency.as_micros()
477            ));
478        }
479
480        output
481    }
482}
483
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh collector reports no calls and no errors.
    #[test]
    fn test_plugin_metrics_new() {
        let collector = PluginMetrics::new();
        assert_eq!(collector.total_calls(), 0);
        assert_eq!(collector.total_errors(), 0);
    }

    /// A successful call shows up globally and in the plugin's stats.
    #[test]
    fn test_record_hook_call() {
        let collector = PluginMetrics::new();
        let took = Duration::from_micros(50);

        collector.record_hook_call("test-plugin", HookType::PreQuery, took, true);

        assert_eq!(collector.total_calls(), 1);
        assert_eq!(collector.total_errors(), 0);

        let snapshot = collector.get_plugin_stats("test-plugin");
        assert_eq!(snapshot.total_calls, 1);
        assert_eq!(snapshot.successful_calls, 1);
    }

    /// A failed call counts as an error globally and as a failed call
    /// for the plugin.
    #[test]
    fn test_record_hook_call_error() {
        let collector = PluginMetrics::new();
        let took = Duration::from_micros(50);

        collector.record_hook_call("test-plugin", HookType::PreQuery, took, false);

        assert_eq!(collector.total_calls(), 1);
        assert_eq!(collector.total_errors(), 1);

        let snapshot = collector.get_plugin_stats("test-plugin");
        assert_eq!(snapshot.failed_calls, 1);
    }

    /// 90 successes out of 100 calls give a 0.9 success rate.
    #[test]
    fn test_plugin_stats_success_rate() {
        let snapshot = PluginStats {
            total_calls: 100,
            successful_calls: 90,
            failed_calls: 10,
            ..Default::default()
        };

        let deviation = (snapshot.success_rate() - 0.9).abs();
        assert!(deviation < 0.001);
    }

    /// Default stats have no calls and a perfect success rate.
    #[test]
    fn test_plugin_stats_default() {
        let snapshot = PluginStats::default();
        assert_eq!(snapshot.total_calls, 0);
        assert_eq!(snapshot.success_rate(), 1.0);
    }

    /// Percentiles over 1..=100 microseconds land at or above their rank.
    #[test]
    fn test_latency_histogram() {
        let mut histogram = LatencyHistogram::new();
        (1..=100).for_each(|micros| histogram.record(Duration::from_micros(micros)));

        let summary = histogram.to_latency();
        assert_eq!(summary.count, 100);
        assert!(summary.p50 >= Duration::from_micros(50));
        assert!(summary.p99 >= Duration::from_micros(99));
    }

    /// Hook latency is aggregated per hook type.
    #[test]
    fn test_get_hook_latency() {
        let collector = PluginMetrics::new();

        for step in 1..=10 {
            let took = Duration::from_micros(step * 10);
            collector.record_hook_call("test", HookType::PreQuery, took, true);
        }

        let summary = collector.get_hook_latency(HookType::PreQuery);
        assert_eq!(summary.count, 10);
        assert!(summary.avg > Duration::ZERO);
    }

    /// The global average is total latency divided by total calls.
    #[test]
    fn test_avg_latency() {
        let collector = PluginMetrics::new();

        for micros in [100, 200] {
            collector.record_hook_call(
                "p1",
                HookType::PreQuery,
                Duration::from_micros(micros),
                true,
            );
        }

        assert_eq!(collector.avg_latency(), Duration::from_micros(150));
    }

    /// Reset brings the global call counter back to zero.
    #[test]
    fn test_reset() {
        let collector = PluginMetrics::new();

        collector.record_hook_call("test", HookType::PreQuery, Duration::from_micros(50), true);
        assert_eq!(collector.total_calls(), 1);

        collector.reset();
        assert_eq!(collector.total_calls(), 0);
    }

    /// The exporter emits both global and per-plugin metric names.
    #[test]
    fn test_metrics_exporter() {
        let collector = std::sync::Arc::new(PluginMetrics::new());
        collector.record_hook_call("test", HookType::PreQuery, Duration::from_micros(50), true);

        let rendered = MetricsExporter::new(collector, "helios_plugin").export();

        assert!(rendered.contains("helios_plugin_total_calls"));
        assert!(rendered.contains("helios_plugin_plugin_calls"));
    }

    /// Loading a plugin gives it an uptime; unloading must not panic.
    #[test]
    fn test_record_plugin_load_unload() {
        let collector = PluginMetrics::new();

        collector.record_plugin_load("test-plugin");

        let snapshot = collector.get_plugin_stats("test-plugin");
        assert!(snapshot.uptime.is_some());

        collector.record_plugin_unload("test-plugin");
    }
}