// heliosdb_proxy/graphql/metrics.rs

1//! GraphQL Metrics
2//!
3//! Metrics collection for GraphQL queries.
4
5use std::collections::HashMap;
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::sync::Mutex;
8use std::time::{Duration, Instant};
9
10use super::{ErrorCode, OperationType};
11
12/// GraphQL metrics collector
13#[derive(Debug)]
14pub struct GraphQLMetrics {
15    /// Query statistics
16    query_stats: QueryStats,
17    /// Operation metrics by name
18    operations: Mutex<HashMap<String, OperationMetrics>>,
19    /// Error counts by code
20    error_counts: Mutex<HashMap<ErrorCode, u64>>,
21    /// Created timestamp
22    created_at: Instant,
23}
24
25impl GraphQLMetrics {
26    /// Create a new metrics collector
27    pub fn new() -> Self {
28        Self {
29            query_stats: QueryStats::new(),
30            operations: Mutex::new(HashMap::new()),
31            error_counts: Mutex::new(HashMap::new()),
32            created_at: Instant::now(),
33        }
34    }
35
36    /// Record a query execution
37    pub fn record_query(&self, duration: Duration, operation_type: OperationType) {
38        self.query_stats.record(duration, operation_type);
39    }
40
41    /// Record a named operation
42    pub fn record_operation(&self, name: &str, duration: Duration, operation_type: OperationType) {
43        let mut operations = self.operations.lock().unwrap();
44        let metrics = operations
45            .entry(name.to_string())
46            .or_insert_with(|| OperationMetrics::new(operation_type));
47        metrics.record(duration);
48    }
49
50    /// Record an error
51    pub fn record_error(&self, error: &super::GraphQLError) {
52        let mut counts = self.error_counts.lock().unwrap();
53        *counts.entry(error.code).or_insert(0) += 1;
54    }
55
56    /// Get query statistics
57    pub fn query_stats(&self) -> &QueryStats {
58        &self.query_stats
59    }
60
61    /// Get operation metrics
62    pub fn operation_metrics(&self, name: &str) -> Option<OperationMetrics> {
63        self.operations.lock().unwrap().get(name).cloned()
64    }
65
66    /// Get all operation metrics
67    pub fn all_operations(&self) -> HashMap<String, OperationMetrics> {
68        self.operations.lock().unwrap().clone()
69    }
70
71    /// Get error counts
72    pub fn error_counts(&self) -> HashMap<ErrorCode, u64> {
73        self.error_counts.lock().unwrap().clone()
74    }
75
76    /// Get uptime
77    pub fn uptime(&self) -> Duration {
78        self.created_at.elapsed()
79    }
80
81    /// Reset all metrics
82    pub fn reset(&self) {
83        self.query_stats.reset();
84        self.operations.lock().unwrap().clear();
85        self.error_counts.lock().unwrap().clear();
86    }
87
88    /// Export metrics in Prometheus format
89    pub fn to_prometheus(&self) -> String {
90        let mut output = String::new();
91
92        // Query counts
93        output.push_str("# HELP helios_graphql_queries_total Total GraphQL queries\n");
94        output.push_str("# TYPE helios_graphql_queries_total counter\n");
95        output.push_str(&format!(
96            "helios_graphql_queries_total{{type=\"query\"}} {}\n",
97            self.query_stats.query_count.load(Ordering::Relaxed)
98        ));
99        output.push_str(&format!(
100            "helios_graphql_queries_total{{type=\"mutation\"}} {}\n",
101            self.query_stats.mutation_count.load(Ordering::Relaxed)
102        ));
103        output.push_str(&format!(
104            "helios_graphql_queries_total{{type=\"subscription\"}} {}\n",
105            self.query_stats.subscription_count.load(Ordering::Relaxed)
106        ));
107
108        // Latency
109        output.push_str("\n# HELP helios_graphql_latency_ms Query latency in milliseconds\n");
110        output.push_str("# TYPE helios_graphql_latency_ms gauge\n");
111        if let Some(avg) = self.query_stats.average_duration() {
112            output.push_str(&format!(
113                "helios_graphql_latency_ms{{quantile=\"avg\"}} {}\n",
114                avg.as_millis()
115            ));
116        }
117        if let Some(min) = self.query_stats.min_duration() {
118            output.push_str(&format!(
119                "helios_graphql_latency_ms{{quantile=\"min\"}} {}\n",
120                min.as_millis()
121            ));
122        }
123        if let Some(max) = self.query_stats.max_duration() {
124            output.push_str(&format!(
125                "helios_graphql_latency_ms{{quantile=\"max\"}} {}\n",
126                max.as_millis()
127            ));
128        }
129
130        // Errors
131        output.push_str("\n# HELP helios_graphql_errors_total Total GraphQL errors\n");
132        output.push_str("# TYPE helios_graphql_errors_total counter\n");
133        for (code, count) in self.error_counts() {
134            output.push_str(&format!(
135                "helios_graphql_errors_total{{code=\"{:?}\"}} {}\n",
136                code, count
137            ));
138        }
139
140        // Operations
141        output.push_str("\n# HELP helios_graphql_operation_calls Operation call counts\n");
142        output.push_str("# TYPE helios_graphql_operation_calls counter\n");
143        for (name, metrics) in self.all_operations() {
144            output.push_str(&format!(
145                "helios_graphql_operation_calls{{name=\"{}\"}} {}\n",
146                name,
147                metrics.call_count.load(Ordering::Relaxed)
148            ));
149        }
150
151        output
152    }
153}
154
155impl Default for GraphQLMetrics {
156    fn default() -> Self {
157        Self::new()
158    }
159}
160
161/// Query statistics
162#[derive(Debug)]
163pub struct QueryStats {
164    /// Total query count
165    pub query_count: AtomicU64,
166    /// Total mutation count
167    pub mutation_count: AtomicU64,
168    /// Total subscription count
169    pub subscription_count: AtomicU64,
170    /// Total duration (in microseconds)
171    total_duration_us: AtomicU64,
172    /// Minimum duration (in microseconds)
173    min_duration_us: AtomicU64,
174    /// Maximum duration (in microseconds)
175    max_duration_us: AtomicU64,
176    /// Latency histogram buckets (in microseconds)
177    latency_buckets: Mutex<LatencyHistogram>,
178}
179
180impl QueryStats {
181    /// Create new query statistics
182    pub fn new() -> Self {
183        Self {
184            query_count: AtomicU64::new(0),
185            mutation_count: AtomicU64::new(0),
186            subscription_count: AtomicU64::new(0),
187            total_duration_us: AtomicU64::new(0),
188            min_duration_us: AtomicU64::new(u64::MAX),
189            max_duration_us: AtomicU64::new(0),
190            latency_buckets: Mutex::new(LatencyHistogram::new()),
191        }
192    }
193
194    /// Record a query execution
195    pub fn record(&self, duration: Duration, operation_type: OperationType) {
196        let duration_us = duration.as_micros() as u64;
197
198        // Update operation count
199        match operation_type {
200            OperationType::Query => {
201                self.query_count.fetch_add(1, Ordering::Relaxed);
202            }
203            OperationType::Mutation => {
204                self.mutation_count.fetch_add(1, Ordering::Relaxed);
205            }
206            OperationType::Subscription => {
207                self.subscription_count.fetch_add(1, Ordering::Relaxed);
208            }
209        }
210
211        // Update duration stats
212        self.total_duration_us
213            .fetch_add(duration_us, Ordering::Relaxed);
214
215        // Update min
216        let mut current_min = self.min_duration_us.load(Ordering::Relaxed);
217        while duration_us < current_min {
218            match self.min_duration_us.compare_exchange_weak(
219                current_min,
220                duration_us,
221                Ordering::Relaxed,
222                Ordering::Relaxed,
223            ) {
224                Ok(_) => break,
225                Err(x) => current_min = x,
226            }
227        }
228
229        // Update max
230        let mut current_max = self.max_duration_us.load(Ordering::Relaxed);
231        while duration_us > current_max {
232            match self.max_duration_us.compare_exchange_weak(
233                current_max,
234                duration_us,
235                Ordering::Relaxed,
236                Ordering::Relaxed,
237            ) {
238                Ok(_) => break,
239                Err(x) => current_max = x,
240            }
241        }
242
243        // Update histogram
244        self.latency_buckets.lock().unwrap().record(duration_us);
245    }
246
247    /// Get total query count
248    pub fn total_count(&self) -> u64 {
249        self.query_count.load(Ordering::Relaxed)
250            + self.mutation_count.load(Ordering::Relaxed)
251            + self.subscription_count.load(Ordering::Relaxed)
252    }
253
254    /// Get average duration
255    pub fn average_duration(&self) -> Option<Duration> {
256        let total = self.total_count();
257        if total == 0 {
258            return None;
259        }
260
261        let total_us = self.total_duration_us.load(Ordering::Relaxed);
262        Some(Duration::from_micros(total_us / total))
263    }
264
265    /// Get minimum duration
266    pub fn min_duration(&self) -> Option<Duration> {
267        let min = self.min_duration_us.load(Ordering::Relaxed);
268        if min == u64::MAX {
269            None
270        } else {
271            Some(Duration::from_micros(min))
272        }
273    }
274
275    /// Get maximum duration
276    pub fn max_duration(&self) -> Option<Duration> {
277        let max = self.max_duration_us.load(Ordering::Relaxed);
278        if max == 0 {
279            None
280        } else {
281            Some(Duration::from_micros(max))
282        }
283    }
284
285    /// Get percentile duration
286    pub fn percentile(&self, p: f64) -> Option<Duration> {
287        self.latency_buckets.lock().unwrap().percentile(p)
288    }
289
290    /// Reset statistics
291    pub fn reset(&self) {
292        self.query_count.store(0, Ordering::Relaxed);
293        self.mutation_count.store(0, Ordering::Relaxed);
294        self.subscription_count.store(0, Ordering::Relaxed);
295        self.total_duration_us.store(0, Ordering::Relaxed);
296        self.min_duration_us.store(u64::MAX, Ordering::Relaxed);
297        self.max_duration_us.store(0, Ordering::Relaxed);
298        self.latency_buckets.lock().unwrap().reset();
299    }
300}
301
302impl Default for QueryStats {
303    fn default() -> Self {
304        Self::new()
305    }
306}
307
308/// Operation-specific metrics
309#[derive(Debug)]
310pub struct OperationMetrics {
311    /// Operation type
312    pub operation_type: OperationType,
313    /// Call count
314    pub call_count: AtomicU64,
315    /// Total duration (microseconds)
316    pub total_duration_us: AtomicU64,
317    /// Error count
318    pub error_count: AtomicU64,
319}
320
321impl Clone for OperationMetrics {
322    fn clone(&self) -> Self {
323        Self {
324            operation_type: self.operation_type,
325            call_count: AtomicU64::new(self.call_count.load(Ordering::Relaxed)),
326            total_duration_us: AtomicU64::new(self.total_duration_us.load(Ordering::Relaxed)),
327            error_count: AtomicU64::new(self.error_count.load(Ordering::Relaxed)),
328        }
329    }
330}
331
332impl OperationMetrics {
333    /// Create new operation metrics
334    pub fn new(operation_type: OperationType) -> Self {
335        Self {
336            operation_type,
337            call_count: AtomicU64::new(0),
338            total_duration_us: AtomicU64::new(0),
339            error_count: AtomicU64::new(0),
340        }
341    }
342
343    /// Record an execution
344    pub fn record(&self, duration: Duration) {
345        self.call_count.fetch_add(1, Ordering::Relaxed);
346        self.total_duration_us
347            .fetch_add(duration.as_micros() as u64, Ordering::Relaxed);
348    }
349
350    /// Record an error
351    pub fn record_error(&self) {
352        self.error_count.fetch_add(1, Ordering::Relaxed);
353    }
354
355    /// Get average duration
356    pub fn average_duration(&self) -> Option<Duration> {
357        let count = self.call_count.load(Ordering::Relaxed);
358        if count == 0 {
359            return None;
360        }
361
362        let total_us = self.total_duration_us.load(Ordering::Relaxed);
363        Some(Duration::from_micros(total_us / count))
364    }
365
366    /// Get error rate
367    pub fn error_rate(&self) -> f64 {
368        let count = self.call_count.load(Ordering::Relaxed);
369        if count == 0 {
370            return 0.0;
371        }
372
373        self.error_count.load(Ordering::Relaxed) as f64 / count as f64
374    }
375}
376
377// Implement Clone for AtomicU64 by loading values
378impl Clone for QueryStats {
379    fn clone(&self) -> Self {
380        Self {
381            query_count: AtomicU64::new(self.query_count.load(Ordering::Relaxed)),
382            mutation_count: AtomicU64::new(self.mutation_count.load(Ordering::Relaxed)),
383            subscription_count: AtomicU64::new(self.subscription_count.load(Ordering::Relaxed)),
384            total_duration_us: AtomicU64::new(self.total_duration_us.load(Ordering::Relaxed)),
385            min_duration_us: AtomicU64::new(self.min_duration_us.load(Ordering::Relaxed)),
386            max_duration_us: AtomicU64::new(self.max_duration_us.load(Ordering::Relaxed)),
387            latency_buckets: Mutex::new(self.latency_buckets.lock().unwrap().clone()),
388        }
389    }
390}
391
/// Latency histogram with a bounded sample for percentile estimation.
///
/// Every recorded value increments exactly one fixed bucket. In addition, up
/// to `max_values` values are retained as a uniform random sample (reservoir
/// sampling), from which [`LatencyHistogram::percentile`] is computed.
#[derive(Debug, Clone)]
struct LatencyHistogram {
    /// Ascending, inclusive bucket upper bounds, in microseconds.
    boundaries: Vec<u64>,
    /// Per-bucket counts; the extra final entry counts values above the last boundary.
    counts: Vec<u64>,
    /// Retained sample of recorded values, in microseconds.
    values: Vec<u64>,
    /// Maximum number of values kept in `values`.
    max_values: usize,
}

impl LatencyHistogram {
    /// Creates an empty histogram with fixed buckets from 100 µs to 5 s.
    fn new() -> Self {
        // Boundaries: 100us, 500us, 1ms, 5ms, 10ms, 50ms, 100ms, 500ms, 1s, 5s
        let boundaries = vec![
            100, 500, 1_000, 5_000, 10_000, 50_000, 100_000, 500_000, 1_000_000, 5_000_000,
        ];
        let counts = vec![0u64; boundaries.len() + 1];

        Self {
            boundaries,
            counts,
            values: Vec::new(),
            max_values: 10_000,
        }
    }

    /// Number of values recorded since creation or the last [`reset`](Self::reset).
    fn recorded(&self) -> u64 {
        self.counts.iter().sum()
    }

    /// Records one value, in microseconds.
    fn record(&mut self, value_us: u64) {
        // First bucket whose inclusive bound is >= the value. Values above
        // every boundary land in the trailing overflow bucket.
        let bucket = self.boundaries.partition_point(|&bound| bound < value_us);
        self.counts[bucket] += 1;

        if self.values.len() < self.max_values {
            self.values.push(value_us);
            return;
        }

        // Reservoir sampling (Algorithm R): the n-th value overall replaces a
        // random slot with probability max_values / n. The draw must come
        // from the total number of values seen, not from the reservoir size.
        // Otherwise almost every new value is kept and the sample is biased
        // toward recent values.
        let seen = usize::try_from(self.recorded()).unwrap_or(usize::MAX);
        if let Some(slot) = self.values.get_mut(rand_index(seen)) {
            *slot = value_us;
        }
    }

    /// Returns the `p`-th percentile (0–100) of the retained sample, or
    /// `None` if nothing has been recorded.
    ///
    /// Uses the index `floor(p / 100 * (len - 1))` into the sorted sample.
    /// Out-of-range `p` is clamped: values above 100 yield the maximum, and
    /// negative or NaN values yield the minimum.
    fn percentile(&self, p: f64) -> Option<Duration> {
        let last = self.values.len().checked_sub(1)?;
        // A float-to-int `as` cast saturates (NaN -> 0); `min` caps p > 100.
        let idx = (((p / 100.0) * last as f64) as usize).min(last);

        // Partial selection is enough to find the idx-th smallest value.
        let mut samples = self.values.clone();
        let (_, &mut value, _) = samples.select_nth_unstable(idx);
        Some(Duration::from_micros(value))
    }

    /// Clears every bucket count and the retained sample.
    fn reset(&mut self) {
        self.counts.iter_mut().for_each(|count| *count = 0);
        self.values.clear();
    }
}

/// Returns a pseudo-random index in `0..max`, used for reservoir sampling.
///
/// Not cryptographically secure. Each call hashes a process-wide call counter
/// and the current time with a freshly constructed `RandomState`, which is
/// initialized with random keys. Consecutive calls therefore differ even when
/// the system clock is coarser than the call rate.
///
/// # Panics
///
/// Panics if `max` is zero.
fn rand_index(max: usize) -> usize {
    use std::collections::hash_map::RandomState;
    use std::hash::{BuildHasher, Hasher};
    use std::time::SystemTime;

    static CALLS: AtomicU64 = AtomicU64::new(0);

    assert!(max > 0, "rand_index requires a non-empty range");

    let mut hasher = RandomState::new().build_hasher();
    hasher.write_u64(CALLS.fetch_add(1, Ordering::Relaxed));
    // A clock set before the Unix epoch only weakens the mixing; don't panic.
    let nanos = SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .map(|elapsed| elapsed.as_nanos())
        .unwrap_or(0);
    hasher.write_u128(nanos);

    (hasher.finish() % max as u64) as usize
}
475
476/// Metrics reporter trait
477pub trait MetricsReporter: Send + Sync {
478    /// Report metrics
479    fn report(&self, metrics: &GraphQLMetrics);
480}
481
482/// Console metrics reporter
483pub struct ConsoleReporter;
484
485impl MetricsReporter for ConsoleReporter {
486    fn report(&self, metrics: &GraphQLMetrics) {
487        let stats = metrics.query_stats();
488
489        println!("=== GraphQL Metrics ===");
490        println!("Queries: {}", stats.query_count.load(Ordering::Relaxed));
491        println!(
492            "Mutations: {}",
493            stats.mutation_count.load(Ordering::Relaxed)
494        );
495        println!(
496            "Subscriptions: {}",
497            stats.subscription_count.load(Ordering::Relaxed)
498        );
499
500        if let Some(avg) = stats.average_duration() {
501            println!("Avg latency: {:?}", avg);
502        }
503        if let Some(min) = stats.min_duration() {
504            println!("Min latency: {:?}", min);
505        }
506        if let Some(max) = stats.max_duration() {
507            println!("Max latency: {:?}", max);
508        }
509
510        println!("Errors: {:?}", metrics.error_counts());
511    }
512}
513
#[cfg(test)]
mod tests {
    use super::super::GraphQLError;
    use super::*;

    /// Reads an atomic counter for use in assertions.
    fn load(counter: &AtomicU64) -> u64 {
        counter.load(Ordering::Relaxed)
    }

    #[test]
    fn test_query_stats_recording() {
        let stats = QueryStats::new();
        let samples = [
            (10, OperationType::Query),
            (20, OperationType::Query),
            (5, OperationType::Mutation),
        ];
        for (millis, kind) in samples {
            stats.record(Duration::from_millis(millis), kind);
        }

        assert_eq!(load(&stats.query_count), 2);
        assert_eq!(load(&stats.mutation_count), 1);
        assert_eq!(stats.total_count(), 3);
    }

    #[test]
    fn test_query_stats_duration() {
        let stats = QueryStats::new();
        for millis in [10, 20, 30] {
            stats.record(Duration::from_millis(millis), OperationType::Query);
        }

        let ms = Duration::from_millis;
        assert_eq!(stats.min_duration(), Some(ms(10)));
        assert_eq!(stats.max_duration(), Some(ms(30)));
        assert_eq!(stats.average_duration(), Some(ms(20)));
    }

    #[test]
    fn test_graphql_metrics() {
        let metrics = GraphQLMetrics::new();
        let elapsed = Duration::from_millis(10);

        metrics.record_query(elapsed, OperationType::Query);
        metrics.record_operation("GetUsers", elapsed, OperationType::Query);

        assert_eq!(metrics.query_stats().total_count(), 1);
        assert!(metrics.operation_metrics("GetUsers").is_some());
    }

    #[test]
    fn test_operation_metrics() {
        let op = OperationMetrics::new(OperationType::Query);

        op.record(Duration::from_millis(10));
        op.record(Duration::from_millis(20));
        op.record_error();

        assert_eq!(load(&op.call_count), 2);
        assert_eq!(load(&op.error_count), 1);
        assert_eq!(op.average_duration(), Some(Duration::from_millis(15)));
        assert_eq!(op.error_rate(), 0.5);
    }

    #[test]
    fn test_error_recording() {
        let metrics = GraphQLMetrics::new();
        let errors = [
            GraphQLError::parse_error("error1"),
            GraphQLError::parse_error("error2"),
            GraphQLError::validation_error("error3"),
        ];
        errors.iter().for_each(|error| metrics.record_error(error));

        let counts = metrics.error_counts();
        assert_eq!(counts.get(&ErrorCode::ParseError), Some(&2));
        assert_eq!(counts.get(&ErrorCode::ValidationError), Some(&1));
    }

    #[test]
    fn test_prometheus_export() {
        let metrics = GraphQLMetrics::new();
        metrics.record_query(Duration::from_millis(10), OperationType::Query);
        metrics.record_query(Duration::from_millis(5), OperationType::Mutation);

        let exported = metrics.to_prometheus();
        for needle in ["helios_graphql_queries_total", "helios_graphql_latency_ms"] {
            assert!(exported.contains(needle));
        }
    }

    #[test]
    fn test_metrics_reset() {
        let metrics = GraphQLMetrics::new();
        let elapsed = Duration::from_millis(10);
        metrics.record_query(elapsed, OperationType::Query);
        metrics.record_operation("GetUsers", elapsed, OperationType::Query);
        assert_eq!(metrics.query_stats().total_count(), 1);

        metrics.reset();

        assert_eq!(metrics.query_stats().total_count(), 0);
        assert!(metrics.all_operations().is_empty());
    }

    #[test]
    fn test_latency_histogram_percentile() {
        let mut histogram = LatencyHistogram::new();
        // 1ms through 100ms, expressed in microseconds.
        (1..=100).for_each(|i| histogram.record(i * 1000));

        let p50 = histogram.percentile(50.0).unwrap().as_millis();
        let p99 = histogram.percentile(99.0).unwrap().as_millis();

        assert!((45..=55).contains(&p50));
        assert!(p99 >= 95);
    }

    #[test]
    fn test_query_stats_empty() {
        let stats = QueryStats::new();

        assert_eq!(stats.total_count(), 0);
        assert_eq!(stats.average_duration(), None);
        assert_eq!(stats.min_duration(), None);
        assert_eq!(stats.max_duration(), None);
    }

    #[test]
    fn test_metrics_uptime() {
        let metrics = GraphQLMetrics::new();
        let pause = Duration::from_millis(10);

        std::thread::sleep(pause);

        assert!(metrics.uptime() >= pause);
    }

    #[test]
    fn test_operation_metrics_error_rate_zero() {
        assert_eq!(OperationMetrics::new(OperationType::Query).error_rate(), 0.0);
    }
}