Skip to main content

heliosdb_proxy/graphql/
metrics.rs

1//! GraphQL Metrics
2//!
3//! Metrics collection for GraphQL queries.
4
5use std::collections::HashMap;
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::sync::Mutex;
8use std::time::{Duration, Instant};
9
10use super::{OperationType, ErrorCode};
11
12/// GraphQL metrics collector
13#[derive(Debug)]
14pub struct GraphQLMetrics {
15    /// Query statistics
16    query_stats: QueryStats,
17    /// Operation metrics by name
18    operations: Mutex<HashMap<String, OperationMetrics>>,
19    /// Error counts by code
20    error_counts: Mutex<HashMap<ErrorCode, u64>>,
21    /// Created timestamp
22    created_at: Instant,
23}
24
25impl GraphQLMetrics {
26    /// Create a new metrics collector
27    pub fn new() -> Self {
28        Self {
29            query_stats: QueryStats::new(),
30            operations: Mutex::new(HashMap::new()),
31            error_counts: Mutex::new(HashMap::new()),
32            created_at: Instant::now(),
33        }
34    }
35
36    /// Record a query execution
37    pub fn record_query(&self, duration: Duration, operation_type: OperationType) {
38        self.query_stats.record(duration, operation_type);
39    }
40
41    /// Record a named operation
42    pub fn record_operation(&self, name: &str, duration: Duration, operation_type: OperationType) {
43        let mut operations = self.operations.lock().unwrap();
44        let metrics = operations
45            .entry(name.to_string())
46            .or_insert_with(|| OperationMetrics::new(operation_type));
47        metrics.record(duration);
48    }
49
50    /// Record an error
51    pub fn record_error(&self, error: &super::GraphQLError) {
52        let mut counts = self.error_counts.lock().unwrap();
53        *counts.entry(error.code).or_insert(0) += 1;
54    }
55
56    /// Get query statistics
57    pub fn query_stats(&self) -> &QueryStats {
58        &self.query_stats
59    }
60
61    /// Get operation metrics
62    pub fn operation_metrics(&self, name: &str) -> Option<OperationMetrics> {
63        self.operations.lock().unwrap().get(name).cloned()
64    }
65
66    /// Get all operation metrics
67    pub fn all_operations(&self) -> HashMap<String, OperationMetrics> {
68        self.operations.lock().unwrap().clone()
69    }
70
71    /// Get error counts
72    pub fn error_counts(&self) -> HashMap<ErrorCode, u64> {
73        self.error_counts.lock().unwrap().clone()
74    }
75
76    /// Get uptime
77    pub fn uptime(&self) -> Duration {
78        self.created_at.elapsed()
79    }
80
81    /// Reset all metrics
82    pub fn reset(&self) {
83        self.query_stats.reset();
84        self.operations.lock().unwrap().clear();
85        self.error_counts.lock().unwrap().clear();
86    }
87
88    /// Export metrics in Prometheus format
89    pub fn to_prometheus(&self) -> String {
90        let mut output = String::new();
91
92        // Query counts
93        output.push_str("# HELP helios_graphql_queries_total Total GraphQL queries\n");
94        output.push_str("# TYPE helios_graphql_queries_total counter\n");
95        output.push_str(&format!(
96            "helios_graphql_queries_total{{type=\"query\"}} {}\n",
97            self.query_stats.query_count.load(Ordering::Relaxed)
98        ));
99        output.push_str(&format!(
100            "helios_graphql_queries_total{{type=\"mutation\"}} {}\n",
101            self.query_stats.mutation_count.load(Ordering::Relaxed)
102        ));
103        output.push_str(&format!(
104            "helios_graphql_queries_total{{type=\"subscription\"}} {}\n",
105            self.query_stats.subscription_count.load(Ordering::Relaxed)
106        ));
107
108        // Latency
109        output.push_str("\n# HELP helios_graphql_latency_ms Query latency in milliseconds\n");
110        output.push_str("# TYPE helios_graphql_latency_ms gauge\n");
111        if let Some(avg) = self.query_stats.average_duration() {
112            output.push_str(&format!(
113                "helios_graphql_latency_ms{{quantile=\"avg\"}} {}\n",
114                avg.as_millis()
115            ));
116        }
117        if let Some(min) = self.query_stats.min_duration() {
118            output.push_str(&format!(
119                "helios_graphql_latency_ms{{quantile=\"min\"}} {}\n",
120                min.as_millis()
121            ));
122        }
123        if let Some(max) = self.query_stats.max_duration() {
124            output.push_str(&format!(
125                "helios_graphql_latency_ms{{quantile=\"max\"}} {}\n",
126                max.as_millis()
127            ));
128        }
129
130        // Errors
131        output.push_str("\n# HELP helios_graphql_errors_total Total GraphQL errors\n");
132        output.push_str("# TYPE helios_graphql_errors_total counter\n");
133        for (code, count) in self.error_counts() {
134            output.push_str(&format!(
135                "helios_graphql_errors_total{{code=\"{:?}\"}} {}\n",
136                code, count
137            ));
138        }
139
140        // Operations
141        output.push_str("\n# HELP helios_graphql_operation_calls Operation call counts\n");
142        output.push_str("# TYPE helios_graphql_operation_calls counter\n");
143        for (name, metrics) in self.all_operations() {
144            output.push_str(&format!(
145                "helios_graphql_operation_calls{{name=\"{}\"}} {}\n",
146                name,
147                metrics.call_count.load(Ordering::Relaxed)
148            ));
149        }
150
151        output
152    }
153}
154
155impl Default for GraphQLMetrics {
156    fn default() -> Self {
157        Self::new()
158    }
159}
160
161/// Query statistics
162#[derive(Debug)]
163pub struct QueryStats {
164    /// Total query count
165    pub query_count: AtomicU64,
166    /// Total mutation count
167    pub mutation_count: AtomicU64,
168    /// Total subscription count
169    pub subscription_count: AtomicU64,
170    /// Total duration (in microseconds)
171    total_duration_us: AtomicU64,
172    /// Minimum duration (in microseconds)
173    min_duration_us: AtomicU64,
174    /// Maximum duration (in microseconds)
175    max_duration_us: AtomicU64,
176    /// Latency histogram buckets (in microseconds)
177    latency_buckets: Mutex<LatencyHistogram>,
178}
179
180impl QueryStats {
181    /// Create new query statistics
182    pub fn new() -> Self {
183        Self {
184            query_count: AtomicU64::new(0),
185            mutation_count: AtomicU64::new(0),
186            subscription_count: AtomicU64::new(0),
187            total_duration_us: AtomicU64::new(0),
188            min_duration_us: AtomicU64::new(u64::MAX),
189            max_duration_us: AtomicU64::new(0),
190            latency_buckets: Mutex::new(LatencyHistogram::new()),
191        }
192    }
193
194    /// Record a query execution
195    pub fn record(&self, duration: Duration, operation_type: OperationType) {
196        let duration_us = duration.as_micros() as u64;
197
198        // Update operation count
199        match operation_type {
200            OperationType::Query => {
201                self.query_count.fetch_add(1, Ordering::Relaxed);
202            }
203            OperationType::Mutation => {
204                self.mutation_count.fetch_add(1, Ordering::Relaxed);
205            }
206            OperationType::Subscription => {
207                self.subscription_count.fetch_add(1, Ordering::Relaxed);
208            }
209        }
210
211        // Update duration stats
212        self.total_duration_us.fetch_add(duration_us, Ordering::Relaxed);
213
214        // Update min
215        let mut current_min = self.min_duration_us.load(Ordering::Relaxed);
216        while duration_us < current_min {
217            match self.min_duration_us.compare_exchange_weak(
218                current_min,
219                duration_us,
220                Ordering::Relaxed,
221                Ordering::Relaxed,
222            ) {
223                Ok(_) => break,
224                Err(x) => current_min = x,
225            }
226        }
227
228        // Update max
229        let mut current_max = self.max_duration_us.load(Ordering::Relaxed);
230        while duration_us > current_max {
231            match self.max_duration_us.compare_exchange_weak(
232                current_max,
233                duration_us,
234                Ordering::Relaxed,
235                Ordering::Relaxed,
236            ) {
237                Ok(_) => break,
238                Err(x) => current_max = x,
239            }
240        }
241
242        // Update histogram
243        self.latency_buckets.lock().unwrap().record(duration_us);
244    }
245
246    /// Get total query count
247    pub fn total_count(&self) -> u64 {
248        self.query_count.load(Ordering::Relaxed)
249            + self.mutation_count.load(Ordering::Relaxed)
250            + self.subscription_count.load(Ordering::Relaxed)
251    }
252
253    /// Get average duration
254    pub fn average_duration(&self) -> Option<Duration> {
255        let total = self.total_count();
256        if total == 0 {
257            return None;
258        }
259
260        let total_us = self.total_duration_us.load(Ordering::Relaxed);
261        Some(Duration::from_micros(total_us / total))
262    }
263
264    /// Get minimum duration
265    pub fn min_duration(&self) -> Option<Duration> {
266        let min = self.min_duration_us.load(Ordering::Relaxed);
267        if min == u64::MAX {
268            None
269        } else {
270            Some(Duration::from_micros(min))
271        }
272    }
273
274    /// Get maximum duration
275    pub fn max_duration(&self) -> Option<Duration> {
276        let max = self.max_duration_us.load(Ordering::Relaxed);
277        if max == 0 {
278            None
279        } else {
280            Some(Duration::from_micros(max))
281        }
282    }
283
284    /// Get percentile duration
285    pub fn percentile(&self, p: f64) -> Option<Duration> {
286        self.latency_buckets.lock().unwrap().percentile(p)
287    }
288
289    /// Reset statistics
290    pub fn reset(&self) {
291        self.query_count.store(0, Ordering::Relaxed);
292        self.mutation_count.store(0, Ordering::Relaxed);
293        self.subscription_count.store(0, Ordering::Relaxed);
294        self.total_duration_us.store(0, Ordering::Relaxed);
295        self.min_duration_us.store(u64::MAX, Ordering::Relaxed);
296        self.max_duration_us.store(0, Ordering::Relaxed);
297        self.latency_buckets.lock().unwrap().reset();
298    }
299}
300
301impl Default for QueryStats {
302    fn default() -> Self {
303        Self::new()
304    }
305}
306
307/// Operation-specific metrics
308#[derive(Debug)]
309pub struct OperationMetrics {
310    /// Operation type
311    pub operation_type: OperationType,
312    /// Call count
313    pub call_count: AtomicU64,
314    /// Total duration (microseconds)
315    pub total_duration_us: AtomicU64,
316    /// Error count
317    pub error_count: AtomicU64,
318}
319
320impl Clone for OperationMetrics {
321    fn clone(&self) -> Self {
322        Self {
323            operation_type: self.operation_type,
324            call_count: AtomicU64::new(self.call_count.load(Ordering::Relaxed)),
325            total_duration_us: AtomicU64::new(self.total_duration_us.load(Ordering::Relaxed)),
326            error_count: AtomicU64::new(self.error_count.load(Ordering::Relaxed)),
327        }
328    }
329}
330
331impl OperationMetrics {
332    /// Create new operation metrics
333    pub fn new(operation_type: OperationType) -> Self {
334        Self {
335            operation_type,
336            call_count: AtomicU64::new(0),
337            total_duration_us: AtomicU64::new(0),
338            error_count: AtomicU64::new(0),
339        }
340    }
341
342    /// Record an execution
343    pub fn record(&self, duration: Duration) {
344        self.call_count.fetch_add(1, Ordering::Relaxed);
345        self.total_duration_us.fetch_add(duration.as_micros() as u64, Ordering::Relaxed);
346    }
347
348    /// Record an error
349    pub fn record_error(&self) {
350        self.error_count.fetch_add(1, Ordering::Relaxed);
351    }
352
353    /// Get average duration
354    pub fn average_duration(&self) -> Option<Duration> {
355        let count = self.call_count.load(Ordering::Relaxed);
356        if count == 0 {
357            return None;
358        }
359
360        let total_us = self.total_duration_us.load(Ordering::Relaxed);
361        Some(Duration::from_micros(total_us / count))
362    }
363
364    /// Get error rate
365    pub fn error_rate(&self) -> f64 {
366        let count = self.call_count.load(Ordering::Relaxed);
367        if count == 0 {
368            return 0.0;
369        }
370
371        self.error_count.load(Ordering::Relaxed) as f64 / count as f64
372    }
373}
374
375// Implement Clone for AtomicU64 by loading values
376impl Clone for QueryStats {
377    fn clone(&self) -> Self {
378        Self {
379            query_count: AtomicU64::new(self.query_count.load(Ordering::Relaxed)),
380            mutation_count: AtomicU64::new(self.mutation_count.load(Ordering::Relaxed)),
381            subscription_count: AtomicU64::new(self.subscription_count.load(Ordering::Relaxed)),
382            total_duration_us: AtomicU64::new(self.total_duration_us.load(Ordering::Relaxed)),
383            min_duration_us: AtomicU64::new(self.min_duration_us.load(Ordering::Relaxed)),
384            max_duration_us: AtomicU64::new(self.max_duration_us.load(Ordering::Relaxed)),
385            latency_buckets: Mutex::new(self.latency_buckets.lock().unwrap().clone()),
386        }
387    }
388}
389
/// Latency histogram used for percentile estimation.
///
/// Tracks fixed latency buckets plus a bounded sample of raw values. Once the
/// sample is full, new values are admitted by reservoir sampling (Algorithm R),
/// so the sample stays a uniform draw over every value recorded since creation
/// or the last [`reset`](Self::reset).
#[derive(Debug, Clone)]
struct LatencyHistogram {
    /// Inclusive upper bucket boundaries, ascending (in microseconds)
    boundaries: Vec<u64>,
    /// Bucket counts; the extra last slot counts values above every boundary
    counts: Vec<u64>,
    /// Sampled values for percentile calculation (at most `max_values`)
    values: Vec<u64>,
    /// Maximum number of sampled values to keep
    max_values: usize,
    /// Number of values recorded since creation/reset (reservoir stream length)
    seen: usize,
}

impl LatencyHistogram {
    /// Create an empty histogram.
    fn new() -> Self {
        // Boundaries: 100us, 500us, 1ms, 5ms, 10ms, 50ms, 100ms, 500ms, 1s, 5s
        let boundaries = vec![
            100, 500, 1_000, 5_000, 10_000, 50_000, 100_000, 500_000, 1_000_000, 5_000_000,
        ];
        let counts = vec![0u64; boundaries.len() + 1];

        Self {
            boundaries,
            counts,
            values: Vec::new(),
            max_values: 10000,
            seen: 0,
        }
    }

    /// Record a value (in microseconds).
    fn record(&mut self, value_us: u64) {
        // First boundary >= value; `boundaries.len()` (overflow bucket) if none.
        let bucket = self.boundaries.partition_point(|&b| b < value_us);
        self.counts[bucket] += 1;

        self.seen = self.seen.saturating_add(1);
        if self.values.len() < self.max_values {
            self.values.push(value_us);
        } else {
            // Algorithm R: the n-th value replaces a random slot with
            // probability max_values / n, keeping the sample uniform.
            let slot = rand_index(self.seen);
            if let Some(kept) = self.values.get_mut(slot) {
                *kept = value_us;
            }
        }
    }

    /// Estimate the `p`-th percentile (`p` is clamped to `0.0..=100.0`).
    ///
    /// Returns `None` when no values have been recorded.
    fn percentile(&self, p: f64) -> Option<Duration> {
        if self.values.is_empty() {
            return None;
        }

        let last = self.values.len() - 1;
        // Clamping prevents an out-of-bounds index for p > 100; a NaN `p`
        // casts to 0.
        let rank = (((p.clamp(0.0, 100.0) / 100.0) * last as f64) as usize).min(last);

        // Selection is O(n) versus O(n log n) for a full sort.
        let mut sample = self.values.clone();
        let (_, nth, _) = sample.select_nth_unstable(rank);
        Some(Duration::from_micros(*nth))
    }

    /// Clear all counts and samples.
    fn reset(&mut self) {
        self.counts.fill(0);
        self.values.clear();
        self.seen = 0;
    }
}

/// Pseudo-random index in `0..max` for reservoir sampling (0 when `max` is 0).
///
/// Hashes a process-wide call counter with a freshly keyed `RandomState`
/// (SipHash). Successive indices are then independent-looking, without an
/// external RNG crate or reliance on the wall clock.
fn rand_index(max: usize) -> usize {
    use std::collections::hash_map::RandomState;
    use std::hash::{BuildHasher, Hasher};

    static CALLS: AtomicU64 = AtomicU64::new(0);

    if max == 0 {
        return 0;
    }
    let mut hasher = RandomState::new().build_hasher();
    hasher.write_u64(CALLS.fetch_add(1, Ordering::Relaxed));
    (hasher.finish() % max as u64) as usize
}
471
472/// Metrics reporter trait
473pub trait MetricsReporter: Send + Sync {
474    /// Report metrics
475    fn report(&self, metrics: &GraphQLMetrics);
476}
477
478/// Console metrics reporter
479pub struct ConsoleReporter;
480
481impl MetricsReporter for ConsoleReporter {
482    fn report(&self, metrics: &GraphQLMetrics) {
483        let stats = metrics.query_stats();
484
485        println!("=== GraphQL Metrics ===");
486        println!("Queries: {}", stats.query_count.load(Ordering::Relaxed));
487        println!("Mutations: {}", stats.mutation_count.load(Ordering::Relaxed));
488        println!("Subscriptions: {}", stats.subscription_count.load(Ordering::Relaxed));
489
490        if let Some(avg) = stats.average_duration() {
491            println!("Avg latency: {:?}", avg);
492        }
493        if let Some(min) = stats.min_duration() {
494            println!("Min latency: {:?}", min);
495        }
496        if let Some(max) = stats.max_duration() {
497            println!("Max latency: {:?}", max);
498        }
499
500        println!("Errors: {:?}", metrics.error_counts());
501    }
502}
503
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for building a `Duration` from whole milliseconds.
    fn ms(millis: u64) -> Duration {
        Duration::from_millis(millis)
    }

    #[test]
    fn test_query_stats_recording() {
        let stats = QueryStats::new();
        let executions = [
            (ms(10), OperationType::Query),
            (ms(20), OperationType::Query),
            (ms(5), OperationType::Mutation),
        ];
        for (elapsed, kind) in executions {
            stats.record(elapsed, kind);
        }

        let queries = stats.query_count.load(Ordering::Relaxed);
        let mutations = stats.mutation_count.load(Ordering::Relaxed);
        assert_eq!(queries, 2);
        assert_eq!(mutations, 1);
        assert_eq!(stats.total_count(), 3);
    }

    #[test]
    fn test_query_stats_duration() {
        let stats = QueryStats::new();
        for millis in [10, 20, 30] {
            stats.record(ms(millis), OperationType::Query);
        }

        assert_eq!(stats.min_duration(), Some(ms(10)));
        assert_eq!(stats.max_duration(), Some(ms(30)));
        assert_eq!(stats.average_duration(), Some(ms(20)));
    }

    #[test]
    fn test_graphql_metrics() {
        let collector = GraphQLMetrics::new();
        collector.record_query(ms(10), OperationType::Query);
        collector.record_operation("GetUsers", ms(10), OperationType::Query);

        assert_eq!(collector.query_stats().total_count(), 1);
        assert!(collector.operation_metrics("GetUsers").is_some());
    }

    #[test]
    fn test_operation_metrics() {
        let op = OperationMetrics::new(OperationType::Query);
        op.record(ms(10));
        op.record(ms(20));
        op.record_error();

        assert_eq!(op.call_count.load(Ordering::Relaxed), 2);
        assert_eq!(op.error_count.load(Ordering::Relaxed), 1);
        assert_eq!(op.average_duration(), Some(ms(15)));
        assert_eq!(op.error_rate(), 0.5);
    }

    #[test]
    fn test_error_recording() {
        let collector = GraphQLMetrics::new();
        let errors = [
            super::super::GraphQLError::parse_error("error1"),
            super::super::GraphQLError::parse_error("error2"),
            super::super::GraphQLError::validation_error("error3"),
        ];
        for error in &errors {
            collector.record_error(error);
        }

        let by_code = collector.error_counts();
        assert_eq!(by_code.get(&ErrorCode::ParseError), Some(&2));
        assert_eq!(by_code.get(&ErrorCode::ValidationError), Some(&1));
    }

    #[test]
    fn test_prometheus_export() {
        let collector = GraphQLMetrics::new();
        collector.record_query(ms(10), OperationType::Query);
        collector.record_query(ms(5), OperationType::Mutation);

        let exported = collector.to_prometheus();
        assert!(exported.contains("helios_graphql_queries_total"));
        assert!(exported.contains("helios_graphql_latency_ms"));
    }

    #[test]
    fn test_metrics_reset() {
        let collector = GraphQLMetrics::new();
        collector.record_query(ms(10), OperationType::Query);
        collector.record_operation("GetUsers", ms(10), OperationType::Query);
        assert_eq!(collector.query_stats().total_count(), 1);

        collector.reset();

        assert_eq!(collector.query_stats().total_count(), 0);
        assert!(collector.all_operations().is_empty());
    }

    #[test]
    fn test_latency_histogram_percentile() {
        let mut histogram = LatencyHistogram::new();
        // 1ms through 100ms, expressed in microseconds
        (1..=100).for_each(|step| histogram.record(step * 1000));

        let p50 = histogram.percentile(50.0).unwrap().as_millis();
        let p99 = histogram.percentile(99.0).unwrap().as_millis();

        assert!((45..=55).contains(&p50));
        assert!(p99 >= 95);
    }

    #[test]
    fn test_query_stats_empty() {
        let stats = QueryStats::new();

        assert_eq!(stats.total_count(), 0);
        assert!(stats.average_duration().is_none());
        assert!(stats.min_duration().is_none());
        assert!(stats.max_duration().is_none());
    }

    #[test]
    fn test_metrics_uptime() {
        let collector = GraphQLMetrics::new();
        std::thread::sleep(ms(10));
        assert!(collector.uptime() >= ms(10));
    }

    #[test]
    fn test_operation_metrics_error_rate_zero() {
        let untouched = OperationMetrics::new(OperationType::Query);
        assert_eq!(untouched.error_rate(), 0.0);
    }
}
648}