Skip to main content

grapsus_agent_protocol/v2/
observability.rs

1//! Observability infrastructure for Protocol v2.
2//!
3//! This module provides:
4//! - Metrics collection from agents
5//! - Metrics aggregation across agents
6//! - Prometheus-compatible export
7//! - Config update handling
8
9use crate::v2::control::{ConfigUpdateRequest, ConfigUpdateResponse, ConfigUpdateType};
10use crate::v2::metrics::{HistogramBucket, MetricsReport};
11use parking_lot::RwLock;
12use std::collections::HashMap;
13use std::time::{Duration, Instant};
14
15/// Metrics collector that receives and aggregates metrics from agents.
16#[derive(Debug)]
17pub struct MetricsCollector {
18    /// Aggregated counters by (agent_id, metric_name, labels_key)
19    counters: RwLock<HashMap<MetricKey, AggregatedCounter>>,
20    /// Aggregated gauges by (agent_id, metric_name, labels_key)
21    gauges: RwLock<HashMap<MetricKey, AggregatedGauge>>,
22    /// Aggregated histograms by (agent_id, metric_name, labels_key)
23    histograms: RwLock<HashMap<MetricKey, AggregatedHistogram>>,
24    /// Last report time per agent
25    last_report: RwLock<HashMap<String, Instant>>,
26    /// Configuration
27    config: MetricsCollectorConfig,
28}
29
/// Tunables for the metrics collector.
#[derive(Debug, Clone)]
pub struct MetricsCollectorConfig {
    /// How long a series may go without an update before it counts as expired.
    pub max_age: Duration,
    /// Upper bound on the number of distinct metric series.
    pub max_series: usize,
    /// When `true`, an `agent_id` label is attached to recorded series.
    pub include_agent_id_label: bool,
}

impl Default for MetricsCollectorConfig {
    /// Defaults: five-minute expiry, 10 000 series, `agent_id` label enabled.
    fn default() -> Self {
        const FIVE_MINUTES: Duration = Duration::from_secs(5 * 60);

        Self {
            include_agent_id_label: true,
            max_series: 10_000,
            max_age: FIVE_MINUTES,
        }
    }
}
50
/// Identity of one metric series: the reporting agent, the metric name and a
/// canonical, order-independent rendering of the label set.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
struct MetricKey {
    agent_id: String,
    name: String,
    labels_key: String,
}

impl MetricKey {
    /// Build the key for `name` as reported by `agent_id` with `labels`.
    fn new(agent_id: &str, name: &str, labels: &HashMap<String, String>) -> Self {
        Self {
            agent_id: agent_id.to_string(),
            name: name.to_string(),
            labels_key: Self::labels_to_key(labels),
        }
    }

    /// Render `labels` as a canonical string: pairs sorted by label name and
    /// joined with `,`.
    ///
    /// Names and values are written in their quoted/escaped `Debug` form so
    /// that separators appearing inside a value cannot be mistaken for
    /// structure. With a plain `k=v` rendering, `{a: "1,b=2"}` and
    /// `{a: "1", b: "2"}` would produce the same key and overwrite each other.
    fn labels_to_key(labels: &HashMap<String, String>) -> String {
        let mut pairs: Vec<_> = labels.iter().collect();
        // Label names are unique map keys, so an unstable sort is sufficient.
        pairs.sort_unstable_by_key(|(k, _)| *k);
        pairs
            .into_iter()
            .map(|(k, v)| format!("{:?}={:?}", k, v))
            .collect::<Vec<_>>()
            .join(",")
    }
}
79
/// Stored state of one counter series.
#[derive(Clone, Debug)]
struct AggregatedCounter {
    /// Metric name.
    name: String,
    /// Optional help text for the metric.
    help: Option<String>,
    /// Labels attached to this series.
    labels: HashMap<String, String>,
    /// Stored counter value.
    value: u64,
    /// When this series was last written.
    last_updated: Instant,
}
89
/// Stored state of one gauge series.
#[derive(Clone, Debug)]
struct AggregatedGauge {
    /// Metric name.
    name: String,
    /// Optional help text for the metric.
    help: Option<String>,
    /// Labels attached to this series.
    labels: HashMap<String, String>,
    /// Stored gauge value.
    value: f64,
    /// When this series was last written.
    last_updated: Instant,
}
99
100/// Aggregated histogram.
101#[derive(Debug, Clone)]
102struct AggregatedHistogram {
103    name: String,
104    help: Option<String>,
105    labels: HashMap<String, String>,
106    sum: f64,
107    count: u64,
108    buckets: Vec<HistogramBucket>,
109    last_updated: Instant,
110}
111
112impl MetricsCollector {
113    /// Create a new metrics collector with default configuration.
114    pub fn new() -> Self {
115        Self::with_config(MetricsCollectorConfig::default())
116    }
117
118    /// Create a new metrics collector with custom configuration.
119    pub fn with_config(config: MetricsCollectorConfig) -> Self {
120        Self {
121            counters: RwLock::new(HashMap::new()),
122            gauges: RwLock::new(HashMap::new()),
123            histograms: RwLock::new(HashMap::new()),
124            last_report: RwLock::new(HashMap::new()),
125            config,
126        }
127    }
128
129    /// Record a metrics report from an agent.
130    pub fn record(&self, report: &MetricsReport) {
131        let now = Instant::now();
132
133        // Update last report time
134        self.last_report
135            .write()
136            .insert(report.agent_id.clone(), now);
137
138        // Process counters
139        for counter in &report.counters {
140            let mut labels = counter.labels.clone();
141            if self.config.include_agent_id_label {
142                labels.insert("agent_id".to_string(), report.agent_id.clone());
143            }
144
145            let key = MetricKey::new(&report.agent_id, &counter.name, &labels);
146
147            let mut counters = self.counters.write();
148            counters.insert(
149                key,
150                AggregatedCounter {
151                    name: counter.name.clone(),
152                    help: counter.help.clone(),
153                    labels,
154                    value: counter.value,
155                    last_updated: now,
156                },
157            );
158        }
159
160        // Process gauges
161        for gauge in &report.gauges {
162            let mut labels = gauge.labels.clone();
163            if self.config.include_agent_id_label {
164                labels.insert("agent_id".to_string(), report.agent_id.clone());
165            }
166
167            let key = MetricKey::new(&report.agent_id, &gauge.name, &labels);
168
169            let mut gauges = self.gauges.write();
170            gauges.insert(
171                key,
172                AggregatedGauge {
173                    name: gauge.name.clone(),
174                    help: gauge.help.clone(),
175                    labels,
176                    value: gauge.value,
177                    last_updated: now,
178                },
179            );
180        }
181
182        // Process histograms
183        for histogram in &report.histograms {
184            let mut labels = histogram.labels.clone();
185            if self.config.include_agent_id_label {
186                labels.insert("agent_id".to_string(), report.agent_id.clone());
187            }
188
189            let key = MetricKey::new(&report.agent_id, &histogram.name, &labels);
190
191            let mut histograms = self.histograms.write();
192            histograms.insert(
193                key,
194                AggregatedHistogram {
195                    name: histogram.name.clone(),
196                    help: histogram.help.clone(),
197                    labels,
198                    sum: histogram.sum,
199                    count: histogram.count,
200                    buckets: histogram.buckets.clone(),
201                    last_updated: now,
202                },
203            );
204        }
205    }
206
207    /// Remove expired metrics.
208    pub fn expire_old_metrics(&self) {
209        let now = Instant::now();
210        let max_age = self.config.max_age;
211
212        self.counters
213            .write()
214            .retain(|_, v| now.duration_since(v.last_updated) < max_age);
215        self.gauges
216            .write()
217            .retain(|_, v| now.duration_since(v.last_updated) < max_age);
218        self.histograms
219            .write()
220            .retain(|_, v| now.duration_since(v.last_updated) < max_age);
221    }
222
223    /// Get the number of active metric series.
224    pub fn series_count(&self) -> usize {
225        self.counters.read().len() + self.gauges.read().len() + self.histograms.read().len()
226    }
227
228    /// Get active agent IDs.
229    pub fn active_agents(&self) -> Vec<String> {
230        self.last_report.read().keys().cloned().collect()
231    }
232
233    /// Export metrics in Prometheus text format.
234    pub fn export_prometheus(&self) -> String {
235        let mut output = String::new();
236
237        // Export counters
238        let counters = self.counters.read();
239        let mut counter_names: Vec<_> = counters.values().map(|c| &c.name).collect();
240        counter_names.sort();
241        counter_names.dedup();
242
243        for name in counter_names {
244            let metrics: Vec<_> = counters.values().filter(|c| &c.name == name).collect();
245            if let Some(first) = metrics.first() {
246                if let Some(help) = &first.help {
247                    output.push_str(&format!("# HELP {} {}\n", name, help));
248                }
249                output.push_str(&format!("# TYPE {} counter\n", name));
250            }
251            for metric in metrics {
252                output.push_str(&format_metric_line(
253                    name,
254                    &metric.labels,
255                    metric.value as f64,
256                ));
257            }
258        }
259
260        // Export gauges
261        let gauges = self.gauges.read();
262        let mut gauge_names: Vec<_> = gauges.values().map(|g| &g.name).collect();
263        gauge_names.sort();
264        gauge_names.dedup();
265
266        for name in gauge_names {
267            let metrics: Vec<_> = gauges.values().filter(|g| &g.name == name).collect();
268            if let Some(first) = metrics.first() {
269                if let Some(help) = &first.help {
270                    output.push_str(&format!("# HELP {} {}\n", name, help));
271                }
272                output.push_str(&format!("# TYPE {} gauge\n", name));
273            }
274            for metric in metrics {
275                output.push_str(&format_metric_line(name, &metric.labels, metric.value));
276            }
277        }
278
279        // Export histograms
280        let histograms = self.histograms.read();
281        let mut histogram_names: Vec<_> = histograms.values().map(|h| &h.name).collect();
282        histogram_names.sort();
283        histogram_names.dedup();
284
285        for name in histogram_names {
286            let metrics: Vec<_> = histograms.values().filter(|h| &h.name == name).collect();
287            if let Some(first) = metrics.first() {
288                if let Some(help) = &first.help {
289                    output.push_str(&format!("# HELP {} {}\n", name, help));
290                }
291                output.push_str(&format!("# TYPE {} histogram\n", name));
292            }
293            for metric in metrics {
294                // Buckets
295                for bucket in &metric.buckets {
296                    let mut labels = metric.labels.clone();
297                    labels.insert(
298                        "le".to_string(),
299                        if bucket.le.is_infinite() {
300                            "+Inf".to_string()
301                        } else {
302                            bucket.le.to_string()
303                        },
304                    );
305                    output.push_str(&format_metric_line(
306                        &format!("{}_bucket", name),
307                        &labels,
308                        bucket.count as f64,
309                    ));
310                }
311                // Sum and count
312                output.push_str(&format_metric_line(
313                    &format!("{}_sum", name),
314                    &metric.labels,
315                    metric.sum,
316                ));
317                output.push_str(&format_metric_line(
318                    &format!("{}_count", name),
319                    &metric.labels,
320                    metric.count as f64,
321                ));
322            }
323        }
324
325        output
326    }
327
328    /// Get a snapshot of all metrics.
329    pub fn snapshot(&self) -> MetricsSnapshot {
330        MetricsSnapshot {
331            counters: self.counters.read().values().cloned().collect(),
332            gauges: self.gauges.read().values().cloned().collect(),
333            histograms: self.histograms.read().values().cloned().collect(),
334            timestamp: Instant::now(),
335        }
336    }
337}
338
339impl Default for MetricsCollector {
340    fn default() -> Self {
341        Self::new()
342    }
343}
344
345/// Snapshot of all metrics at a point in time.
346#[derive(Debug)]
347pub struct MetricsSnapshot {
348    counters: Vec<AggregatedCounter>,
349    gauges: Vec<AggregatedGauge>,
350    histograms: Vec<AggregatedHistogram>,
351    timestamp: Instant,
352}
353
354impl MetricsSnapshot {
355    /// Get counter values.
356    pub fn counter_count(&self) -> usize {
357        self.counters.len()
358    }
359
360    /// Get gauge values.
361    pub fn gauge_count(&self) -> usize {
362        self.gauges.len()
363    }
364
365    /// Get histogram count.
366    pub fn histogram_count(&self) -> usize {
367        self.histograms.len()
368    }
369}
370
371/// Unified metrics aggregator that combines metrics from multiple sources.
372///
373/// This aggregator collects metrics from:
374/// - The proxy itself (request counts, latencies, etc.)
375/// - Connected agents (via MetricsCollector)
376/// - System metrics (optional)
377#[derive(Debug)]
378pub struct UnifiedMetricsAggregator {
379    /// Proxy-level counters
380    proxy_counters: RwLock<HashMap<String, ProxyCounter>>,
381    /// Proxy-level gauges
382    proxy_gauges: RwLock<HashMap<String, ProxyGauge>>,
383    /// Proxy-level histograms
384    proxy_histograms: RwLock<HashMap<String, ProxyHistogram>>,
385    /// Agent metrics collector
386    agent_collector: MetricsCollector,
387    /// Service name for labeling
388    service_name: String,
389    /// Instance identifier
390    instance_id: String,
391}
392
/// One proxy counter series.
#[derive(Clone, Debug)]
struct ProxyCounter {
    /// Metric name.
    name: String,
    /// Help text emitted on the `# HELP` line.
    help: String,
    /// Labels attached to this series.
    labels: HashMap<String, String>,
    /// Accumulated counter value.
    value: u64,
}
401
/// One proxy gauge series.
#[derive(Clone, Debug)]
struct ProxyGauge {
    /// Metric name.
    name: String,
    /// Help text emitted on the `# HELP` line.
    help: String,
    /// Labels attached to this series.
    labels: HashMap<String, String>,
    /// Most recently set value.
    value: f64,
}
410
/// One proxy histogram series.
#[derive(Clone, Debug)]
struct ProxyHistogram {
    /// Metric name.
    name: String,
    /// Help text emitted on the `# HELP` line.
    help: String,
    /// Labels attached to this series (without the per-bucket `le` label).
    labels: HashMap<String, String>,
    /// Sum of all observed values.
    sum: f64,
    /// Number of observations.
    count: u64,
    /// `(upper bound, count)` pairs, one per bucket.
    buckets: Vec<(f64, u64)>,
}
421
422impl UnifiedMetricsAggregator {
423    /// Create a new unified metrics aggregator.
424    pub fn new(service_name: impl Into<String>, instance_id: impl Into<String>) -> Self {
425        Self {
426            proxy_counters: RwLock::new(HashMap::new()),
427            proxy_gauges: RwLock::new(HashMap::new()),
428            proxy_histograms: RwLock::new(HashMap::new()),
429            agent_collector: MetricsCollector::new(),
430            service_name: service_name.into(),
431            instance_id: instance_id.into(),
432        }
433    }
434
435    /// Create with custom agent collector config.
436    pub fn with_agent_config(
437        service_name: impl Into<String>,
438        instance_id: impl Into<String>,
439        agent_config: MetricsCollectorConfig,
440    ) -> Self {
441        Self {
442            proxy_counters: RwLock::new(HashMap::new()),
443            proxy_gauges: RwLock::new(HashMap::new()),
444            proxy_histograms: RwLock::new(HashMap::new()),
445            agent_collector: MetricsCollector::with_config(agent_config),
446            service_name: service_name.into(),
447            instance_id: instance_id.into(),
448        }
449    }
450
451    /// Get the agent metrics collector.
452    pub fn agent_collector(&self) -> &MetricsCollector {
453        &self.agent_collector
454    }
455
456    /// Increment a proxy counter.
457    pub fn increment_counter(
458        &self,
459        name: &str,
460        help: &str,
461        labels: HashMap<String, String>,
462        delta: u64,
463    ) {
464        let key = Self::metric_key(name, &labels);
465        let mut counters = self.proxy_counters.write();
466
467        if let Some(counter) = counters.get_mut(&key) {
468            counter.value += delta;
469        } else {
470            counters.insert(
471                key,
472                ProxyCounter {
473                    name: name.to_string(),
474                    help: help.to_string(),
475                    labels,
476                    value: delta,
477                },
478            );
479        }
480    }
481
482    /// Set a proxy gauge.
483    pub fn set_gauge(&self, name: &str, help: &str, labels: HashMap<String, String>, value: f64) {
484        let key = Self::metric_key(name, &labels);
485        self.proxy_gauges.write().insert(
486            key,
487            ProxyGauge {
488                name: name.to_string(),
489                help: help.to_string(),
490                labels,
491                value,
492            },
493        );
494    }
495
496    /// Record a histogram observation.
497    pub fn observe_histogram(
498        &self,
499        name: &str,
500        help: &str,
501        labels: HashMap<String, String>,
502        bucket_boundaries: &[f64],
503        value: f64,
504    ) {
505        let key = Self::metric_key(name, &labels);
506        let mut histograms = self.proxy_histograms.write();
507
508        if let Some(histogram) = histograms.get_mut(&key) {
509            histogram.sum += value;
510            histogram.count += 1;
511            // Update bucket counts
512            for (boundary, count) in histogram.buckets.iter_mut() {
513                if value <= *boundary {
514                    *count += 1;
515                }
516            }
517        } else {
518            // Initialize buckets
519            let mut buckets: Vec<(f64, u64)> = bucket_boundaries
520                .iter()
521                .map(|&b| (b, if value <= b { 1 } else { 0 }))
522                .collect();
523            buckets.push((f64::INFINITY, 1)); // +Inf bucket always includes all observations
524
525            histograms.insert(
526                key,
527                ProxyHistogram {
528                    name: name.to_string(),
529                    help: help.to_string(),
530                    labels,
531                    sum: value,
532                    count: 1,
533                    buckets,
534                },
535            );
536        }
537    }
538
539    /// Record agent metrics.
540    pub fn record_agent_metrics(&self, report: &MetricsReport) {
541        self.agent_collector.record(report);
542    }
543
544    /// Export all metrics in Prometheus text format.
545    pub fn export_prometheus(&self) -> String {
546        let mut output = String::new();
547
548        // Add service info metric
549        output.push_str(
550            "# HELP grapsus_info Grapsus proxy information\n# TYPE grapsus_info gauge\n",
551        );
552        output.push_str(&format!(
553            "grapsus_info{{service=\"{}\",instance=\"{}\"}} 1\n",
554            escape_label_value(&self.service_name),
555            escape_label_value(&self.instance_id)
556        ));
557
558        // Export proxy counters
559        let counters = self.proxy_counters.read();
560        let mut counter_names: Vec<_> = counters.values().map(|c| &c.name).collect();
561        counter_names.sort();
562        counter_names.dedup();
563
564        for name in counter_names {
565            let metrics: Vec<_> = counters.values().filter(|c| &c.name == name).collect();
566            if let Some(first) = metrics.first() {
567                output.push_str(&format!("# HELP {} {}\n", name, first.help));
568                output.push_str(&format!("# TYPE {} counter\n", name));
569            }
570            for metric in metrics {
571                output.push_str(&format_metric_line(
572                    name,
573                    &metric.labels,
574                    metric.value as f64,
575                ));
576            }
577        }
578
579        // Export proxy gauges
580        let gauges = self.proxy_gauges.read();
581        let mut gauge_names: Vec<_> = gauges.values().map(|g| &g.name).collect();
582        gauge_names.sort();
583        gauge_names.dedup();
584
585        for name in gauge_names {
586            let metrics: Vec<_> = gauges.values().filter(|g| &g.name == name).collect();
587            if let Some(first) = metrics.first() {
588                output.push_str(&format!("# HELP {} {}\n", name, first.help));
589                output.push_str(&format!("# TYPE {} gauge\n", name));
590            }
591            for metric in metrics {
592                output.push_str(&format_metric_line(name, &metric.labels, metric.value));
593            }
594        }
595
596        // Export proxy histograms
597        let histograms = self.proxy_histograms.read();
598        let mut histogram_names: Vec<_> = histograms.values().map(|h| &h.name).collect();
599        histogram_names.sort();
600        histogram_names.dedup();
601
602        for name in histogram_names {
603            let metrics: Vec<_> = histograms.values().filter(|h| &h.name == name).collect();
604            if let Some(first) = metrics.first() {
605                output.push_str(&format!("# HELP {} {}\n", name, first.help));
606                output.push_str(&format!("# TYPE {} histogram\n", name));
607            }
608            for metric in metrics {
609                // Buckets
610                for (le, count) in &metric.buckets {
611                    let mut labels = metric.labels.clone();
612                    labels.insert(
613                        "le".to_string(),
614                        if le.is_infinite() {
615                            "+Inf".to_string()
616                        } else {
617                            le.to_string()
618                        },
619                    );
620                    output.push_str(&format_metric_line(
621                        &format!("{}_bucket", name),
622                        &labels,
623                        *count as f64,
624                    ));
625                }
626                // Sum and count
627                output.push_str(&format_metric_line(
628                    &format!("{}_sum", name),
629                    &metric.labels,
630                    metric.sum,
631                ));
632                output.push_str(&format_metric_line(
633                    &format!("{}_count", name),
634                    &metric.labels,
635                    metric.count as f64,
636                ));
637            }
638        }
639
640        // Export agent metrics
641        output.push_str("\n# Agent metrics\n");
642        output.push_str(&self.agent_collector.export_prometheus());
643
644        output
645    }
646
647    /// Get total metric series count.
648    pub fn series_count(&self) -> usize {
649        self.proxy_counters.read().len()
650            + self.proxy_gauges.read().len()
651            + self.proxy_histograms.read().len()
652            + self.agent_collector.series_count()
653    }
654
655    fn metric_key(name: &str, labels: &HashMap<String, String>) -> String {
656        let mut pairs: Vec<_> = labels.iter().collect();
657        pairs.sort_by_key(|(k, _)| *k);
658        let labels_str = pairs
659            .into_iter()
660            .map(|(k, v)| format!("{}={}", k, v))
661            .collect::<Vec<_>>()
662            .join(",");
663        format!("{}|{}", name, labels_str)
664    }
665}
666
667impl Default for UnifiedMetricsAggregator {
668    fn default() -> Self {
669        Self::new("grapsus", "default")
670    }
671}
672
673/// Format a single metric line in Prometheus format.
674fn format_metric_line(name: &str, labels: &HashMap<String, String>, value: f64) -> String {
675    if labels.is_empty() {
676        format!("{} {}\n", name, format_value(value))
677    } else {
678        let mut pairs: Vec<_> = labels.iter().collect();
679        pairs.sort_by_key(|(k, _)| *k);
680        let labels_str = pairs
681            .into_iter()
682            .map(|(k, v)| format!("{}=\"{}\"", k, escape_label_value(v)))
683            .collect::<Vec<_>>()
684            .join(",");
685        format!("{}{{{}}} {}\n", name, labels_str, format_value(value))
686    }
687}
688
/// Format a value for Prometheus output.
///
/// - infinities are written as `+Inf` / `-Inf`, NaN as `NaN`;
/// - integral values that fit in an `i64` are written without a fractional
///   part (so `-0.0` is written as `0`);
/// - everything else uses the default `f64` formatting.
///
/// Integral values at or beyond ±2^63 are NOT routed through `i64`: the cast
/// would saturate and silently emit `i64::MAX`/`i64::MIN` instead of the real
/// value (e.g. for a `u64` counter above `i64::MAX`).
fn format_value(v: f64) -> String {
    // 2^63: the first positive value that no longer fits in an i64.
    const I64_LIMIT: f64 = 9_223_372_036_854_775_808.0;

    if v.is_nan() {
        "NaN".to_string()
    } else if v.is_infinite() {
        if v.is_sign_positive() {
            "+Inf".to_string()
        } else {
            "-Inf".to_string()
        }
    } else if v.fract() == 0.0 && v >= -I64_LIMIT && v < I64_LIMIT {
        (v as i64).to_string()
    } else {
        v.to_string()
    }
}
705
/// Escape a label value for the Prometheus text format: `\` becomes `\\`,
/// `"` becomes `\"`, and a line feed becomes `\n`.
fn escape_label_value(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());
    for ch in s.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' => escaped.push_str("\\n"),
            other => escaped.push(other),
        }
    }
    escaped
}
712
713/// Handler for config updates from agents.
714pub struct ConfigUpdateHandler {
715    /// Pending updates awaiting acknowledgment
716    pending: RwLock<HashMap<String, PendingUpdate>>,
717    /// Callback for rule updates
718    #[allow(clippy::type_complexity)]
719    on_rule_update: Option<
720        Box<dyn Fn(&str, &[crate::v2::control::RuleDefinition], &[String]) -> bool + Send + Sync>,
721    >,
722    /// Callback for list updates
723    #[allow(clippy::type_complexity)]
724    on_list_update: Option<Box<dyn Fn(&str, &[String], &[String]) -> bool + Send + Sync>>,
725}
726
727struct PendingUpdate {
728    request: ConfigUpdateRequest,
729    received_at: Instant,
730}
731
732impl ConfigUpdateHandler {
733    /// Create a new config update handler.
734    pub fn new() -> Self {
735        Self {
736            pending: RwLock::new(HashMap::new()),
737            on_rule_update: None,
738            on_list_update: None,
739        }
740    }
741
742    /// Set the rule update callback.
743    pub fn on_rule_update<F>(mut self, f: F) -> Self
744    where
745        F: Fn(&str, &[crate::v2::control::RuleDefinition], &[String]) -> bool
746            + Send
747            + Sync
748            + 'static,
749    {
750        self.on_rule_update = Some(Box::new(f));
751        self
752    }
753
754    /// Set the list update callback.
755    pub fn on_list_update<F>(mut self, f: F) -> Self
756    where
757        F: Fn(&str, &[String], &[String]) -> bool + Send + Sync + 'static,
758    {
759        self.on_list_update = Some(Box::new(f));
760        self
761    }
762
763    /// Handle a config update request.
764    pub fn handle(&self, request: ConfigUpdateRequest) -> ConfigUpdateResponse {
765        let request_id = request.request_id.clone();
766
767        match &request.update_type {
768            ConfigUpdateType::RequestReload => {
769                // Store pending and return success - actual reload happens asynchronously
770                self.pending.write().insert(
771                    request_id.clone(),
772                    PendingUpdate {
773                        request,
774                        received_at: Instant::now(),
775                    },
776                );
777                ConfigUpdateResponse::success(request_id)
778            }
779            ConfigUpdateType::RuleUpdate {
780                rule_set,
781                rules,
782                remove_rules,
783            } => {
784                if let Some(ref callback) = self.on_rule_update {
785                    if callback(rule_set, rules, remove_rules) {
786                        ConfigUpdateResponse::success(request_id)
787                    } else {
788                        ConfigUpdateResponse::failure(request_id, "Rule update rejected")
789                    }
790                } else {
791                    ConfigUpdateResponse::failure(request_id, "Rule updates not supported")
792                }
793            }
794            ConfigUpdateType::ListUpdate {
795                list_id,
796                add,
797                remove,
798            } => {
799                if let Some(ref callback) = self.on_list_update {
800                    if callback(list_id, add, remove) {
801                        ConfigUpdateResponse::success(request_id)
802                    } else {
803                        ConfigUpdateResponse::failure(request_id, "List update rejected")
804                    }
805                } else {
806                    ConfigUpdateResponse::failure(request_id, "List updates not supported")
807                }
808            }
809            ConfigUpdateType::RestartRequired {
810                reason,
811                grace_period_ms,
812            } => {
813                // Log and acknowledge - actual restart is handled by orchestrator
814                tracing::warn!(
815                    reason = reason,
816                    grace_period_ms = grace_period_ms,
817                    "Agent requested restart"
818                );
819                ConfigUpdateResponse::success(request_id)
820            }
821            ConfigUpdateType::ConfigError { error, field } => {
822                tracing::error!(
823                    error = error,
824                    field = ?field,
825                    "Agent reported configuration error"
826                );
827                ConfigUpdateResponse::success(request_id)
828            }
829        }
830    }
831
832    /// Get pending update count.
833    pub fn pending_count(&self) -> usize {
834        self.pending.read().len()
835    }
836
837    /// Clear old pending updates.
838    pub fn clear_old_pending(&self, max_age: Duration) {
839        let now = Instant::now();
840        self.pending
841            .write()
842            .retain(|_, v| now.duration_since(v.received_at) < max_age);
843    }
844}
845
846impl Default for ConfigUpdateHandler {
847    fn default() -> Self {
848        Self::new()
849    }
850}
851
852// Debug implementation for ConfigUpdateHandler
853impl std::fmt::Debug for ConfigUpdateHandler {
854    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
855        f.debug_struct("ConfigUpdateHandler")
856            .field("pending_count", &self.pending.read().len())
857            .field("has_rule_callback", &self.on_rule_update.is_some())
858            .field("has_list_callback", &self.on_list_update.is_some())
859            .finish()
860    }
861}
862
863/// Configuration pusher for distributing config updates to agents.
864///
865/// This allows the proxy to push configuration changes (rules, lists, etc.)
866/// to connected agents and track acknowledgments.
867#[derive(Debug)]
868pub struct ConfigPusher {
869    /// Connected agents indexed by agent_id
870    agents: RwLock<HashMap<String, AgentConnection>>,
871    /// Pending pushes awaiting acknowledgment
872    pending_pushes: RwLock<HashMap<String, PendingPush>>,
873    /// Configuration
874    config: ConfigPusherConfig,
875    /// Sequence counter for push IDs
876    sequence: std::sync::atomic::AtomicU64,
877}
878
/// Tunables for the config pusher.
#[derive(Debug, Clone)]
pub struct ConfigPusherConfig {
    /// How long to wait for an acknowledgment.
    pub ack_timeout: Duration,
    /// Upper bound on retry attempts.
    pub max_retries: usize,
    /// Delay between retries.
    pub retry_interval: Duration,
    /// Upper bound on pending pushes per agent.
    pub max_pending_per_agent: usize,
}

impl Default for ConfigPusherConfig {
    /// Defaults: 10 s acknowledgment timeout, 3 retries spaced 2 s apart, and
    /// at most 100 pending pushes per agent.
    fn default() -> Self {
        let ack_timeout = Duration::from_secs(10);
        let retry_interval = Duration::from_secs(2);

        Self {
            max_pending_per_agent: 100,
            max_retries: 3,
            retry_interval,
            ack_timeout,
        }
    }
}
902
/// Bookkeeping record for one connected agent.
#[derive(Clone, Debug)]
pub struct AgentConnection {
    /// Unique identifier of the agent.
    pub agent_id: String,
    /// Name of the agent.
    pub name: String,
    /// When the agent connected.
    pub connected_at: Instant,
    /// When the last message from the agent was seen.
    pub last_seen: Instant,
    /// Count of config pushes that succeeded.
    pub successful_pushes: u64,
    /// Count of config pushes that failed.
    pub failed_pushes: u64,
    /// Whether this agent supports config push.
    pub supports_push: bool,
}
921
922/// A pending configuration push.
923#[derive(Debug)]
924struct PendingPush {
925    push_id: String,
926    agent_id: String,
927    update: ConfigUpdateRequest,
928    created_at: Instant,
929    last_attempt: Instant,
930    attempts: usize,
931    status: PushStatus,
932}
933
/// Lifecycle state of a configuration push.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum PushStatus {
    /// Not yet sent; counted as in flight.
    Pending,
    /// Sent and awaiting acknowledgment; counted as in flight.
    Sent,
    /// The agent accepted the update.
    Acknowledged,
    /// The agent rejected the update; `reason` carries the reported error.
    Failed { reason: String },
    /// No acknowledgment arrived before the push was given up on.
    Expired,
}
943
944/// Result of a push operation.
945#[derive(Debug)]
946pub struct PushResult {
947    pub push_id: String,
948    pub agent_id: String,
949    pub status: PushStatus,
950    pub attempts: usize,
951}
952
953impl ConfigPusher {
954    /// Create a new config pusher with default configuration.
955    pub fn new() -> Self {
956        Self::with_config(ConfigPusherConfig::default())
957    }
958
959    /// Create a new config pusher with custom configuration.
960    pub fn with_config(config: ConfigPusherConfig) -> Self {
961        Self {
962            agents: RwLock::new(HashMap::new()),
963            pending_pushes: RwLock::new(HashMap::new()),
964            config,
965            sequence: std::sync::atomic::AtomicU64::new(1),
966        }
967    }
968
969    /// Register a connected agent.
970    pub fn register_agent(
971        &self,
972        agent_id: impl Into<String>,
973        name: impl Into<String>,
974        supports_push: bool,
975    ) {
976        let agent_id = agent_id.into();
977        let now = Instant::now();
978        self.agents.write().insert(
979            agent_id.clone(),
980            AgentConnection {
981                agent_id,
982                name: name.into(),
983                connected_at: now,
984                last_seen: now,
985                successful_pushes: 0,
986                failed_pushes: 0,
987                supports_push,
988            },
989        );
990    }
991
992    /// Unregister a disconnected agent.
993    pub fn unregister_agent(&self, agent_id: &str) {
994        self.agents.write().remove(agent_id);
995        // Remove any pending pushes for this agent
996        self.pending_pushes
997            .write()
998            .retain(|_, p| p.agent_id != agent_id);
999    }
1000
1001    /// Update agent's last seen time.
1002    pub fn touch_agent(&self, agent_id: &str) {
1003        if let Some(agent) = self.agents.write().get_mut(agent_id) {
1004            agent.last_seen = Instant::now();
1005        }
1006    }
1007
1008    /// Get all connected agents.
1009    pub fn connected_agents(&self) -> Vec<AgentConnection> {
1010        self.agents.read().values().cloned().collect()
1011    }
1012
1013    /// Get agents that support config push.
1014    pub fn pushable_agents(&self) -> Vec<AgentConnection> {
1015        self.agents
1016            .read()
1017            .values()
1018            .filter(|a| a.supports_push)
1019            .cloned()
1020            .collect()
1021    }
1022
1023    /// Push a configuration update to a specific agent.
1024    pub fn push_to_agent(&self, agent_id: &str, update_type: ConfigUpdateType) -> Option<String> {
1025        let agents = self.agents.read();
1026        let agent = agents.get(agent_id)?;
1027
1028        if !agent.supports_push {
1029            return None;
1030        }
1031
1032        let push_id = self.next_push_id();
1033        let now = Instant::now();
1034
1035        let update = ConfigUpdateRequest {
1036            update_type,
1037            request_id: push_id.clone(),
1038            timestamp_ms: now_ms(),
1039        };
1040
1041        self.pending_pushes.write().insert(
1042            push_id.clone(),
1043            PendingPush {
1044                push_id: push_id.clone(),
1045                agent_id: agent_id.to_string(),
1046                update,
1047                created_at: now,
1048                last_attempt: now,
1049                attempts: 1,
1050                status: PushStatus::Sent,
1051            },
1052        );
1053
1054        Some(push_id)
1055    }
1056
1057    /// Push a configuration update to all pushable agents.
1058    pub fn push_to_all(&self, update_type: ConfigUpdateType) -> Vec<String> {
1059        let pushable = self.pushable_agents();
1060        let mut push_ids = Vec::with_capacity(pushable.len());
1061
1062        for agent in pushable {
1063            if let Some(push_id) = self.push_to_agent(&agent.agent_id, update_type.clone()) {
1064                push_ids.push(push_id);
1065            }
1066        }
1067
1068        push_ids
1069    }
1070
1071    /// Acknowledge a config push.
1072    pub fn acknowledge(&self, push_id: &str, accepted: bool, error: Option<String>) {
1073        let mut pending = self.pending_pushes.write();
1074        if let Some(push) = pending.get_mut(push_id) {
1075            if accepted {
1076                push.status = PushStatus::Acknowledged;
1077                // Update agent stats
1078                if let Some(agent) = self.agents.write().get_mut(&push.agent_id) {
1079                    agent.successful_pushes += 1;
1080                }
1081            } else {
1082                push.status = PushStatus::Failed {
1083                    reason: error.unwrap_or_else(|| "Unknown error".to_string()),
1084                };
1085                // Update agent stats
1086                if let Some(agent) = self.agents.write().get_mut(&push.agent_id) {
1087                    agent.failed_pushes += 1;
1088                }
1089            }
1090        }
1091    }
1092
1093    /// Get pushes that need to be retried.
1094    pub fn get_retryable(&self) -> Vec<(String, ConfigUpdateRequest)> {
1095        let now = Instant::now();
1096        let mut retryable = Vec::new();
1097        let mut pending = self.pending_pushes.write();
1098
1099        for push in pending.values_mut() {
1100            if push.status == PushStatus::Sent
1101                && now.duration_since(push.last_attempt) >= self.config.retry_interval
1102                && push.attempts < self.config.max_retries
1103            {
1104                push.attempts += 1;
1105                push.last_attempt = now;
1106                retryable.push((push.agent_id.clone(), push.update.clone()));
1107            }
1108        }
1109
1110        retryable
1111    }
1112
1113    /// Expire old pending pushes.
1114    pub fn expire_old(&self) {
1115        let now = Instant::now();
1116        let mut pending = self.pending_pushes.write();
1117
1118        for push in pending.values_mut() {
1119            if push.status == PushStatus::Sent
1120                && (now.duration_since(push.created_at) >= self.config.ack_timeout
1121                    || push.attempts >= self.config.max_retries)
1122            {
1123                push.status = PushStatus::Expired;
1124                // Update agent stats
1125                if let Some(agent) = self.agents.write().get_mut(&push.agent_id) {
1126                    agent.failed_pushes += 1;
1127                }
1128            }
1129        }
1130
1131        // Remove completed or expired pushes older than 1 minute
1132        let cleanup_age = Duration::from_secs(60);
1133        pending.retain(|_, p| {
1134            now.duration_since(p.created_at) < cleanup_age
1135                || matches!(p.status, PushStatus::Pending | PushStatus::Sent)
1136        });
1137    }
1138
1139    /// Get push results.
1140    pub fn get_results(&self) -> Vec<PushResult> {
1141        self.pending_pushes
1142            .read()
1143            .values()
1144            .map(|p| PushResult {
1145                push_id: p.push_id.clone(),
1146                agent_id: p.agent_id.clone(),
1147                status: p.status.clone(),
1148                attempts: p.attempts,
1149            })
1150            .collect()
1151    }
1152
1153    /// Get pending push count.
1154    pub fn pending_count(&self) -> usize {
1155        self.pending_pushes
1156            .read()
1157            .values()
1158            .filter(|p| matches!(p.status, PushStatus::Pending | PushStatus::Sent))
1159            .count()
1160    }
1161
1162    fn next_push_id(&self) -> String {
1163        let seq = self
1164            .sequence
1165            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1166        format!("push-{}", seq)
1167    }
1168}
1169
1170impl Default for ConfigPusher {
1171    fn default() -> Self {
1172        Self::new()
1173    }
1174}
1175
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns 0 if the system clock reports a time before the epoch.
fn now_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
1182
1183#[cfg(test)]
1184mod tests {
1185    use super::*;
1186    use crate::v2::metrics::{standard, CounterMetric, GaugeMetric, HistogramMetric};
1187
1188    #[test]
1189    fn test_metrics_collector_basic() {
1190        let collector = MetricsCollector::new();
1191
1192        let mut report = MetricsReport::new("test-agent", 10_000);
1193        report
1194            .counters
1195            .push(CounterMetric::new(standard::REQUESTS_TOTAL, 100));
1196        report
1197            .gauges
1198            .push(GaugeMetric::new(standard::IN_FLIGHT_REQUESTS, 5.0));
1199
1200        collector.record(&report);
1201
1202        assert_eq!(collector.series_count(), 2);
1203        assert_eq!(collector.active_agents(), vec!["test-agent"]);
1204    }
1205
1206    #[test]
1207    fn test_metrics_collector_with_labels() {
1208        let collector = MetricsCollector::new();
1209
1210        let mut report = MetricsReport::new("agent-1", 10_000);
1211        let mut counter = CounterMetric::new(standard::REQUESTS_TOTAL, 50);
1212        counter
1213            .labels
1214            .insert("route".to_string(), "/api".to_string());
1215        report.counters.push(counter);
1216
1217        collector.record(&report);
1218
1219        let prometheus = collector.export_prometheus();
1220        assert!(prometheus.contains("agent_requests_total"));
1221        assert!(prometheus.contains("route=\"/api\""));
1222        assert!(prometheus.contains("agent_id=\"agent-1\""));
1223    }
1224
1225    #[test]
1226    fn test_prometheus_export() {
1227        let collector = MetricsCollector::new();
1228
1229        let mut report = MetricsReport::new("test", 10_000);
1230        let mut counter = CounterMetric::new("http_requests_total", 123);
1231        counter.help = Some("Total HTTP requests".to_string());
1232        report.counters.push(counter);
1233
1234        collector.record(&report);
1235
1236        let output = collector.export_prometheus();
1237        assert!(output.contains("# HELP http_requests_total Total HTTP requests"));
1238        assert!(output.contains("# TYPE http_requests_total counter"));
1239        assert!(output.contains("123"));
1240    }
1241
1242    #[test]
1243    fn test_histogram_export() {
1244        // Use config without agent_id label for simpler assertions
1245        let config = MetricsCollectorConfig {
1246            include_agent_id_label: false,
1247            ..MetricsCollectorConfig::default()
1248        };
1249        let collector = MetricsCollector::with_config(config);
1250
1251        let mut report = MetricsReport::new("test", 10_000);
1252        report.histograms.push(HistogramMetric {
1253            name: "request_duration_seconds".to_string(),
1254            help: Some("Request duration".to_string()),
1255            labels: HashMap::new(),
1256            sum: 10.5,
1257            count: 100,
1258            buckets: vec![
1259                HistogramBucket { le: 0.1, count: 50 },
1260                HistogramBucket { le: 0.5, count: 80 },
1261                HistogramBucket { le: 1.0, count: 95 },
1262                HistogramBucket::infinity(),
1263            ],
1264        });
1265
1266        collector.record(&report);
1267
1268        let output = collector.export_prometheus();
1269        assert!(output.contains("request_duration_seconds_bucket"));
1270        assert!(output.contains("le=\"0.1\""));
1271        assert!(output.contains("le=\"+Inf\""));
1272        assert!(output.contains("request_duration_seconds_sum 10.5"));
1273        assert!(output.contains("request_duration_seconds_count 100"));
1274    }
1275
1276    #[test]
1277    fn test_config_update_handler() {
1278        let handler = ConfigUpdateHandler::new();
1279
1280        let request = ConfigUpdateRequest {
1281            update_type: ConfigUpdateType::RequestReload,
1282            request_id: "req-1".to_string(),
1283            timestamp_ms: 0,
1284        };
1285
1286        let response = handler.handle(request);
1287        assert!(response.accepted);
1288        assert_eq!(handler.pending_count(), 1);
1289    }
1290
1291    #[test]
1292    fn test_escape_label_value() {
1293        assert_eq!(escape_label_value("simple"), "simple");
1294        assert_eq!(escape_label_value("with\"quotes"), "with\\\"quotes");
1295        assert_eq!(escape_label_value("with\\backslash"), "with\\\\backslash");
1296        assert_eq!(escape_label_value("with\nnewline"), "with\\nnewline");
1297    }
1298
1299    #[test]
1300    fn test_config_pusher_basic() {
1301        let pusher = ConfigPusher::new();
1302
1303        // Register an agent
1304        pusher.register_agent("agent-1", "Test Agent", true);
1305
1306        let agents = pusher.connected_agents();
1307        assert_eq!(agents.len(), 1);
1308        assert_eq!(agents[0].agent_id, "agent-1");
1309        assert!(agents[0].supports_push);
1310    }
1311
1312    #[test]
1313    fn test_config_pusher_push_to_agent() {
1314        let pusher = ConfigPusher::new();
1315        pusher.register_agent("agent-1", "Test Agent", true);
1316
1317        let update_type = ConfigUpdateType::RuleUpdate {
1318            rule_set: "default".to_string(),
1319            rules: vec![],
1320            remove_rules: vec![],
1321        };
1322
1323        let push_id = pusher.push_to_agent("agent-1", update_type);
1324        assert!(push_id.is_some());
1325
1326        let push_id = push_id.unwrap();
1327        assert!(push_id.starts_with("push-"));
1328        assert_eq!(pusher.pending_count(), 1);
1329    }
1330
1331    #[test]
1332    fn test_config_pusher_acknowledge() {
1333        let pusher = ConfigPusher::new();
1334        pusher.register_agent("agent-1", "Test Agent", true);
1335
1336        let push_id = pusher
1337            .push_to_agent("agent-1", ConfigUpdateType::RequestReload)
1338            .unwrap();
1339
1340        // Acknowledge success
1341        pusher.acknowledge(&push_id, true, None);
1342
1343        let results = pusher.get_results();
1344        assert_eq!(results.len(), 1);
1345        assert_eq!(results[0].status, PushStatus::Acknowledged);
1346
1347        // Check agent stats
1348        let agents = pusher.connected_agents();
1349        assert_eq!(agents[0].successful_pushes, 1);
1350    }
1351
1352    #[test]
1353    fn test_config_pusher_push_to_non_pushable() {
1354        let pusher = ConfigPusher::new();
1355        pusher.register_agent("agent-1", "Test Agent", false);
1356
1357        let push_id = pusher.push_to_agent("agent-1", ConfigUpdateType::RequestReload);
1358        assert!(push_id.is_none());
1359    }
1360
1361    #[test]
1362    fn test_config_pusher_push_to_all() {
1363        let pusher = ConfigPusher::new();
1364        pusher.register_agent("agent-1", "Agent 1", true);
1365        pusher.register_agent("agent-2", "Agent 2", true);
1366        pusher.register_agent("agent-3", "Agent 3", false); // Not pushable
1367
1368        let push_ids = pusher.push_to_all(ConfigUpdateType::RequestReload);
1369        assert_eq!(push_ids.len(), 2);
1370        assert_eq!(pusher.pending_count(), 2);
1371    }
1372
1373    #[test]
1374    fn test_config_pusher_unregister() {
1375        let pusher = ConfigPusher::new();
1376        pusher.register_agent("agent-1", "Test Agent", true);
1377
1378        let _push_id = pusher.push_to_agent("agent-1", ConfigUpdateType::RequestReload);
1379        assert_eq!(pusher.pending_count(), 1);
1380
1381        pusher.unregister_agent("agent-1");
1382
1383        assert_eq!(pusher.connected_agents().len(), 0);
1384        assert_eq!(pusher.pending_count(), 0); // Pending pushes should be removed
1385    }
1386
1387    #[test]
1388    fn test_metrics_snapshot() {
1389        let collector = MetricsCollector::new();
1390
1391        let mut report = MetricsReport::new("test", 10_000);
1392        report
1393            .counters
1394            .push(CounterMetric::new("requests_total", 100));
1395        report.gauges.push(GaugeMetric::new("connections", 5.0));
1396
1397        collector.record(&report);
1398
1399        let snapshot = collector.snapshot();
1400        assert_eq!(snapshot.counter_count(), 1);
1401        assert_eq!(snapshot.gauge_count(), 1);
1402    }
1403
1404    #[test]
1405    fn test_unified_aggregator_basic() {
1406        let aggregator = UnifiedMetricsAggregator::new("test-service", "instance-1");
1407
1408        // Add proxy counter
1409        aggregator.increment_counter(
1410            "http_requests_total",
1411            "Total HTTP requests",
1412            HashMap::new(),
1413            100,
1414        );
1415
1416        // Add proxy gauge
1417        aggregator.set_gauge(
1418            "active_connections",
1419            "Active connections",
1420            HashMap::new(),
1421            42.0,
1422        );
1423
1424        assert_eq!(aggregator.series_count(), 2);
1425    }
1426
1427    #[test]
1428    fn test_unified_aggregator_counter_increment() {
1429        let aggregator = UnifiedMetricsAggregator::new("test", "1");
1430
1431        aggregator.increment_counter("requests", "Total requests", HashMap::new(), 10);
1432        aggregator.increment_counter("requests", "Total requests", HashMap::new(), 5);
1433
1434        let output = aggregator.export_prometheus();
1435        assert!(output.contains("requests 15"));
1436    }
1437
1438    #[test]
1439    fn test_unified_aggregator_labeled_metrics() {
1440        let aggregator = UnifiedMetricsAggregator::new("test", "1");
1441
1442        let mut labels = HashMap::new();
1443        labels.insert("method".to_string(), "GET".to_string());
1444        aggregator.increment_counter("requests", "Total requests", labels.clone(), 100);
1445
1446        let mut labels2 = HashMap::new();
1447        labels2.insert("method".to_string(), "POST".to_string());
1448        aggregator.increment_counter("requests", "Total requests", labels2, 50);
1449
1450        let output = aggregator.export_prometheus();
1451        assert!(output.contains("method=\"GET\""));
1452        assert!(output.contains("method=\"POST\""));
1453    }
1454
1455    #[test]
1456    fn test_unified_aggregator_histogram() {
1457        let aggregator = UnifiedMetricsAggregator::new("test", "1");
1458        let buckets = vec![
1459            0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
1460        ];
1461
1462        // Record some observations
1463        aggregator.observe_histogram(
1464            "request_duration",
1465            "Request duration",
1466            HashMap::new(),
1467            &buckets,
1468            0.05,
1469        );
1470        aggregator.observe_histogram(
1471            "request_duration",
1472            "Request duration",
1473            HashMap::new(),
1474            &buckets,
1475            0.2,
1476        );
1477        aggregator.observe_histogram(
1478            "request_duration",
1479            "Request duration",
1480            HashMap::new(),
1481            &buckets,
1482            1.5,
1483        );
1484
1485        let output = aggregator.export_prometheus();
1486        assert!(output.contains("request_duration_bucket"));
1487        assert!(output.contains("request_duration_sum"));
1488        assert!(output.contains("request_duration_count 3"));
1489    }
1490
1491    #[test]
1492    fn test_unified_aggregator_with_agent_metrics() {
1493        let aggregator = UnifiedMetricsAggregator::new("test", "1");
1494
1495        // Add proxy metric
1496        aggregator.increment_counter("proxy_requests", "Proxy requests", HashMap::new(), 1000);
1497
1498        // Add agent metrics
1499        let mut report = MetricsReport::new("waf-agent", 5_000);
1500        report.counters.push(CounterMetric::new("waf_blocked", 50));
1501        aggregator.record_agent_metrics(&report);
1502
1503        let output = aggregator.export_prometheus();
1504        assert!(output.contains("proxy_requests 1000"));
1505        assert!(output.contains("waf_blocked"));
1506        assert!(output.contains("Agent metrics"));
1507    }
1508
1509    #[test]
1510    fn test_unified_aggregator_service_info() {
1511        let aggregator = UnifiedMetricsAggregator::new("my-service", "node-42");
1512
1513        let output = aggregator.export_prometheus();
1514        assert!(output.contains("grapsus_info"));
1515        assert!(output.contains("service=\"my-service\""));
1516        assert!(output.contains("instance=\"node-42\""));
1517    }
1518}