sentinel_agent_protocol/v2/
observability.rs

//! Observability infrastructure for Protocol v2.
//!
//! This module provides:
//! - Metrics collection from agents
//! - Metrics aggregation across agents, plus unified proxy + agent metrics
//! - Prometheus-compatible export
//! - Config update handling (agent → proxy) and config push (proxy → agent)
8
use crate::v2::control::{ConfigUpdateRequest, ConfigUpdateResponse, ConfigUpdateType};
use crate::v2::metrics::{HistogramBucket, MetricsReport};
use parking_lot::RwLock;
use std::collections::{BTreeMap, HashMap};
use std::time::{Duration, Instant};
14
/// Metrics collector that receives and aggregates metrics from agents.
///
/// Counters, gauges and histograms live in separate maps, each behind its own
/// `RwLock`, so every method (including the mutating ones) takes `&self`.
/// All three maps are keyed by [`MetricKey`] (agent id, metric name, label set).
#[derive(Debug)]
pub struct MetricsCollector {
    /// Aggregated counters by (agent_id, metric_name, labels_key)
    counters: RwLock<HashMap<MetricKey, AggregatedCounter>>,
    /// Aggregated gauges by (agent_id, metric_name, labels_key)
    gauges: RwLock<HashMap<MetricKey, AggregatedGauge>>,
    /// Aggregated histograms by (agent_id, metric_name, labels_key)
    histograms: RwLock<HashMap<MetricKey, AggregatedHistogram>>,
    /// Instant of the most recent report received from each agent, keyed by agent id
    last_report: RwLock<HashMap<String, Instant>>,
    /// Collector configuration (expiry age, series limit, agent_id labelling)
    config: MetricsCollectorConfig,
}
29
/// Configuration for the metrics collector.
///
/// See the `Default` impl for the default values.
#[derive(Debug, Clone)]
pub struct MetricsCollectorConfig {
    /// Maximum age of a series, measured from its last update, before
    /// `MetricsCollector::expire_old_metrics` drops it
    pub max_age: Duration,
    /// Maximum number of unique metric series the collector should hold
    pub max_series: usize,
    /// When true, every recorded series gets an `agent_id` label set to the
    /// reporting agent's id (replacing any agent-supplied label of that name)
    pub include_agent_id_label: bool,
}
40
41impl Default for MetricsCollectorConfig {
42    fn default() -> Self {
43        Self {
44            max_age: Duration::from_secs(300), // 5 minutes
45            max_series: 10_000,
46            include_agent_id_label: true,
47        }
48    }
49}
50
/// Key identifying a unique metric series.
///
/// Two samples belong to the same series when they come from the same agent,
/// carry the same metric name and have the same label set (insertion order of
/// the labels does not matter).
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
struct MetricKey {
    /// Id of the reporting agent.
    agent_id: String,
    /// Metric name as reported.
    name: String,
    /// Canonical, collision-free encoding of the label set
    /// (see [`MetricKey::labels_to_key`]).
    labels_key: String,
}

impl MetricKey {
    /// Build the series key for metric `name` with `labels`, reported by `agent_id`.
    fn new(agent_id: &str, name: &str, labels: &HashMap<String, String>) -> Self {
        Self {
            agent_id: agent_id.to_string(),
            name: name.to_string(),
            labels_key: Self::labels_to_key(labels),
        }
    }

    /// Encode a label set as a canonical string.
    ///
    /// Pairs are sorted by label name, rendered as `name=value` and joined by
    /// `,`. Backslashes, `,` and `=` inside names and values are escaped with
    /// a leading backslash, so distinct label sets can never encode to the
    /// same key. Without escaping, `{a: "b,c=d"}` and `{a: "b", c: "d"}` would
    /// both become `a=b,c=d` and two different series would be merged.
    fn labels_to_key(labels: &HashMap<String, String>) -> String {
        let mut pairs: Vec<_> = labels.iter().collect();
        // Label names are unique map keys, so an unstable sort is deterministic.
        pairs.sort_unstable_by_key(|(k, _)| *k);
        pairs
            .into_iter()
            .map(|(k, v)| format!("{}={}", Self::escape_key_part(k), Self::escape_key_part(v)))
            .collect::<Vec<_>>()
            .join(",")
    }

    /// Backslash-escape the characters that act as separators in
    /// [`MetricKey::labels_to_key`] (plus the escape character itself).
    fn escape_key_part(s: &str) -> String {
        let mut out = String::with_capacity(s.len());
        for ch in s.chars() {
            if matches!(ch, '\\' | ',' | '=') {
                out.push('\\');
            }
            out.push(ch);
        }
        out
    }
}
79
/// Aggregated counter value.
///
/// Stores the latest value an agent reported for one counter series.
#[derive(Debug, Clone)]
struct AggregatedCounter {
    /// Metric name (exported as the Prometheus family name)
    name: String,
    /// Optional HELP text supplied by the agent
    help: Option<String>,
    /// Series labels, including `agent_id` when that option is enabled
    labels: HashMap<String, String>,
    /// Most recently reported value (overwritten on each report, not summed)
    value: u64,
    /// When this series was last reported; used for expiry
    last_updated: Instant,
}
89
/// Aggregated gauge value.
///
/// Stores the latest value an agent reported for one gauge series.
#[derive(Debug, Clone)]
struct AggregatedGauge {
    /// Metric name (exported as the Prometheus family name)
    name: String,
    /// Optional HELP text supplied by the agent
    help: Option<String>,
    /// Series labels, including `agent_id` when that option is enabled
    labels: HashMap<String, String>,
    /// Most recently reported value
    value: f64,
    /// When this series was last reported; used for expiry
    last_updated: Instant,
}
99
/// Aggregated histogram.
///
/// Stores the latest histogram state an agent reported for one series. The
/// sum, count and buckets are copied from the report as-is.
#[derive(Debug, Clone)]
struct AggregatedHistogram {
    /// Metric name (exported as the Prometheus family name)
    name: String,
    /// Optional HELP text supplied by the agent
    help: Option<String>,
    /// Series labels, including `agent_id` when that option is enabled
    labels: HashMap<String, String>,
    /// Reported sum of observations (exported as `<name>_sum`)
    sum: f64,
    /// Reported observation count (exported as `<name>_count`)
    count: u64,
    /// Reported buckets (exported as `<name>_bucket` with an `le` label)
    buckets: Vec<HistogramBucket>,
    /// When this series was last reported; used for expiry
    last_updated: Instant,
}
111
112impl MetricsCollector {
113    /// Create a new metrics collector with default configuration.
114    pub fn new() -> Self {
115        Self::with_config(MetricsCollectorConfig::default())
116    }
117
118    /// Create a new metrics collector with custom configuration.
119    pub fn with_config(config: MetricsCollectorConfig) -> Self {
120        Self {
121            counters: RwLock::new(HashMap::new()),
122            gauges: RwLock::new(HashMap::new()),
123            histograms: RwLock::new(HashMap::new()),
124            last_report: RwLock::new(HashMap::new()),
125            config,
126        }
127    }
128
129    /// Record a metrics report from an agent.
130    pub fn record(&self, report: &MetricsReport) {
131        let now = Instant::now();
132
133        // Update last report time
134        self.last_report
135            .write()
136            .insert(report.agent_id.clone(), now);
137
138        // Process counters
139        for counter in &report.counters {
140            let mut labels = counter.labels.clone();
141            if self.config.include_agent_id_label {
142                labels.insert("agent_id".to_string(), report.agent_id.clone());
143            }
144
145            let key = MetricKey::new(&report.agent_id, &counter.name, &labels);
146
147            let mut counters = self.counters.write();
148            counters.insert(
149                key,
150                AggregatedCounter {
151                    name: counter.name.clone(),
152                    help: counter.help.clone(),
153                    labels,
154                    value: counter.value,
155                    last_updated: now,
156                },
157            );
158        }
159
160        // Process gauges
161        for gauge in &report.gauges {
162            let mut labels = gauge.labels.clone();
163            if self.config.include_agent_id_label {
164                labels.insert("agent_id".to_string(), report.agent_id.clone());
165            }
166
167            let key = MetricKey::new(&report.agent_id, &gauge.name, &labels);
168
169            let mut gauges = self.gauges.write();
170            gauges.insert(
171                key,
172                AggregatedGauge {
173                    name: gauge.name.clone(),
174                    help: gauge.help.clone(),
175                    labels,
176                    value: gauge.value,
177                    last_updated: now,
178                },
179            );
180        }
181
182        // Process histograms
183        for histogram in &report.histograms {
184            let mut labels = histogram.labels.clone();
185            if self.config.include_agent_id_label {
186                labels.insert("agent_id".to_string(), report.agent_id.clone());
187            }
188
189            let key = MetricKey::new(&report.agent_id, &histogram.name, &labels);
190
191            let mut histograms = self.histograms.write();
192            histograms.insert(
193                key,
194                AggregatedHistogram {
195                    name: histogram.name.clone(),
196                    help: histogram.help.clone(),
197                    labels,
198                    sum: histogram.sum,
199                    count: histogram.count,
200                    buckets: histogram.buckets.clone(),
201                    last_updated: now,
202                },
203            );
204        }
205    }
206
207    /// Remove expired metrics.
208    pub fn expire_old_metrics(&self) {
209        let now = Instant::now();
210        let max_age = self.config.max_age;
211
212        self.counters
213            .write()
214            .retain(|_, v| now.duration_since(v.last_updated) < max_age);
215        self.gauges
216            .write()
217            .retain(|_, v| now.duration_since(v.last_updated) < max_age);
218        self.histograms
219            .write()
220            .retain(|_, v| now.duration_since(v.last_updated) < max_age);
221    }
222
223    /// Get the number of active metric series.
224    pub fn series_count(&self) -> usize {
225        self.counters.read().len() + self.gauges.read().len() + self.histograms.read().len()
226    }
227
228    /// Get active agent IDs.
229    pub fn active_agents(&self) -> Vec<String> {
230        self.last_report.read().keys().cloned().collect()
231    }
232
233    /// Export metrics in Prometheus text format.
234    pub fn export_prometheus(&self) -> String {
235        let mut output = String::new();
236
237        // Export counters
238        let counters = self.counters.read();
239        let mut counter_names: Vec<_> = counters.values().map(|c| &c.name).collect();
240        counter_names.sort();
241        counter_names.dedup();
242
243        for name in counter_names {
244            let metrics: Vec<_> = counters.values().filter(|c| &c.name == name).collect();
245            if let Some(first) = metrics.first() {
246                if let Some(help) = &first.help {
247                    output.push_str(&format!("# HELP {} {}\n", name, help));
248                }
249                output.push_str(&format!("# TYPE {} counter\n", name));
250            }
251            for metric in metrics {
252                output.push_str(&format_metric_line(name, &metric.labels, metric.value as f64));
253            }
254        }
255
256        // Export gauges
257        let gauges = self.gauges.read();
258        let mut gauge_names: Vec<_> = gauges.values().map(|g| &g.name).collect();
259        gauge_names.sort();
260        gauge_names.dedup();
261
262        for name in gauge_names {
263            let metrics: Vec<_> = gauges.values().filter(|g| &g.name == name).collect();
264            if let Some(first) = metrics.first() {
265                if let Some(help) = &first.help {
266                    output.push_str(&format!("# HELP {} {}\n", name, help));
267                }
268                output.push_str(&format!("# TYPE {} gauge\n", name));
269            }
270            for metric in metrics {
271                output.push_str(&format_metric_line(name, &metric.labels, metric.value));
272            }
273        }
274
275        // Export histograms
276        let histograms = self.histograms.read();
277        let mut histogram_names: Vec<_> = histograms.values().map(|h| &h.name).collect();
278        histogram_names.sort();
279        histogram_names.dedup();
280
281        for name in histogram_names {
282            let metrics: Vec<_> = histograms.values().filter(|h| &h.name == name).collect();
283            if let Some(first) = metrics.first() {
284                if let Some(help) = &first.help {
285                    output.push_str(&format!("# HELP {} {}\n", name, help));
286                }
287                output.push_str(&format!("# TYPE {} histogram\n", name));
288            }
289            for metric in metrics {
290                // Buckets
291                for bucket in &metric.buckets {
292                    let mut labels = metric.labels.clone();
293                    labels.insert(
294                        "le".to_string(),
295                        if bucket.le.is_infinite() {
296                            "+Inf".to_string()
297                        } else {
298                            bucket.le.to_string()
299                        },
300                    );
301                    output.push_str(&format_metric_line(
302                        &format!("{}_bucket", name),
303                        &labels,
304                        bucket.count as f64,
305                    ));
306                }
307                // Sum and count
308                output.push_str(&format_metric_line(
309                    &format!("{}_sum", name),
310                    &metric.labels,
311                    metric.sum,
312                ));
313                output.push_str(&format_metric_line(
314                    &format!("{}_count", name),
315                    &metric.labels,
316                    metric.count as f64,
317                ));
318            }
319        }
320
321        output
322    }
323
324    /// Get a snapshot of all metrics.
325    pub fn snapshot(&self) -> MetricsSnapshot {
326        MetricsSnapshot {
327            counters: self.counters.read().values().cloned().collect(),
328            gauges: self.gauges.read().values().cloned().collect(),
329            histograms: self.histograms.read().values().cloned().collect(),
330            timestamp: Instant::now(),
331        }
332    }
333}
334
335impl Default for MetricsCollector {
336    fn default() -> Self {
337        Self::new()
338    }
339}
340
/// Snapshot of all metrics at a point in time.
///
/// Holds cloned copies of every counter, gauge and histogram series, as
/// produced by `MetricsCollector::snapshot`, plus the instant the copy was made.
#[derive(Debug)]
pub struct MetricsSnapshot {
    /// Cloned counter series
    counters: Vec<AggregatedCounter>,
    /// Cloned gauge series
    gauges: Vec<AggregatedGauge>,
    /// Cloned histogram series
    histograms: Vec<AggregatedHistogram>,
    /// When the snapshot was taken
    timestamp: Instant,
}
349
350impl MetricsSnapshot {
351    /// Get counter values.
352    pub fn counter_count(&self) -> usize {
353        self.counters.len()
354    }
355
356    /// Get gauge values.
357    pub fn gauge_count(&self) -> usize {
358        self.gauges.len()
359    }
360
361    /// Get histogram count.
362    pub fn histogram_count(&self) -> usize {
363        self.histograms.len()
364    }
365}
366
/// Unified metrics aggregator that combines metrics from multiple sources.
///
/// This aggregator collects metrics from:
/// - The proxy itself (request counts, latencies, etc.)
/// - Connected agents (via MetricsCollector)
/// - System metrics (optional)
///
/// NOTE(review): no system-metrics source is visible in this section.
/// Confirm whether it is implemented elsewhere.
#[derive(Debug)]
pub struct UnifiedMetricsAggregator {
    /// Proxy-level counters, keyed by a canonical (name, labels) string
    proxy_counters: RwLock<HashMap<String, ProxyCounter>>,
    /// Proxy-level gauges, keyed by a canonical (name, labels) string
    proxy_gauges: RwLock<HashMap<String, ProxyGauge>>,
    /// Proxy-level histograms, keyed by a canonical (name, labels) string
    proxy_histograms: RwLock<HashMap<String, ProxyHistogram>>,
    /// Agent metrics collector
    agent_collector: MetricsCollector,
    /// Service name, exported as the `service` label of `sentinel_info`
    service_name: String,
    /// Instance identifier, exported as the `instance` label of `sentinel_info`
    instance_id: String,
}
388
/// Proxy-level counter metric.
///
/// Accumulated in-process by `UnifiedMetricsAggregator::increment_counter`.
#[derive(Debug, Clone)]
struct ProxyCounter {
    /// Metric family name
    name: String,
    /// HELP text recorded when the series was first created
    help: String,
    /// Series labels
    labels: HashMap<String, String>,
    /// Running total of all increments
    value: u64,
}
397
/// Proxy-level gauge metric.
///
/// Overwritten wholesale by `UnifiedMetricsAggregator::set_gauge`.
#[derive(Debug, Clone)]
struct ProxyGauge {
    /// Metric family name
    name: String,
    /// HELP text from the most recent `set_gauge` call
    help: String,
    /// Series labels
    labels: HashMap<String, String>,
    /// Most recently set value
    value: f64,
}
406
/// Proxy-level histogram metric.
///
/// Accumulated in-process by `UnifiedMetricsAggregator::observe_histogram`.
#[derive(Debug, Clone)]
struct ProxyHistogram {
    /// Metric family name
    name: String,
    /// HELP text recorded when the series was first created
    help: String,
    /// Series labels
    labels: HashMap<String, String>,
    /// Sum of all observed values
    sum: f64,
    /// Number of observations
    count: u64,
    /// `(upper_bound, cumulative_count)` pairs; the last entry is the +Inf bucket
    buckets: Vec<(f64, u64)>,
}
417
418impl UnifiedMetricsAggregator {
419    /// Create a new unified metrics aggregator.
420    pub fn new(service_name: impl Into<String>, instance_id: impl Into<String>) -> Self {
421        Self {
422            proxy_counters: RwLock::new(HashMap::new()),
423            proxy_gauges: RwLock::new(HashMap::new()),
424            proxy_histograms: RwLock::new(HashMap::new()),
425            agent_collector: MetricsCollector::new(),
426            service_name: service_name.into(),
427            instance_id: instance_id.into(),
428        }
429    }
430
431    /// Create with custom agent collector config.
432    pub fn with_agent_config(
433        service_name: impl Into<String>,
434        instance_id: impl Into<String>,
435        agent_config: MetricsCollectorConfig,
436    ) -> Self {
437        Self {
438            proxy_counters: RwLock::new(HashMap::new()),
439            proxy_gauges: RwLock::new(HashMap::new()),
440            proxy_histograms: RwLock::new(HashMap::new()),
441            agent_collector: MetricsCollector::with_config(agent_config),
442            service_name: service_name.into(),
443            instance_id: instance_id.into(),
444        }
445    }
446
447    /// Get the agent metrics collector.
448    pub fn agent_collector(&self) -> &MetricsCollector {
449        &self.agent_collector
450    }
451
452    /// Increment a proxy counter.
453    pub fn increment_counter(&self, name: &str, help: &str, labels: HashMap<String, String>, delta: u64) {
454        let key = Self::metric_key(name, &labels);
455        let mut counters = self.proxy_counters.write();
456
457        if let Some(counter) = counters.get_mut(&key) {
458            counter.value += delta;
459        } else {
460            counters.insert(
461                key,
462                ProxyCounter {
463                    name: name.to_string(),
464                    help: help.to_string(),
465                    labels,
466                    value: delta,
467                },
468            );
469        }
470    }
471
472    /// Set a proxy gauge.
473    pub fn set_gauge(&self, name: &str, help: &str, labels: HashMap<String, String>, value: f64) {
474        let key = Self::metric_key(name, &labels);
475        self.proxy_gauges.write().insert(
476            key,
477            ProxyGauge {
478                name: name.to_string(),
479                help: help.to_string(),
480                labels,
481                value,
482            },
483        );
484    }
485
486    /// Record a histogram observation.
487    pub fn observe_histogram(
488        &self,
489        name: &str,
490        help: &str,
491        labels: HashMap<String, String>,
492        bucket_boundaries: &[f64],
493        value: f64,
494    ) {
495        let key = Self::metric_key(name, &labels);
496        let mut histograms = self.proxy_histograms.write();
497
498        if let Some(histogram) = histograms.get_mut(&key) {
499            histogram.sum += value;
500            histogram.count += 1;
501            // Update bucket counts
502            for (boundary, count) in histogram.buckets.iter_mut() {
503                if value <= *boundary {
504                    *count += 1;
505                }
506            }
507        } else {
508            // Initialize buckets
509            let mut buckets: Vec<(f64, u64)> = bucket_boundaries
510                .iter()
511                .map(|&b| (b, if value <= b { 1 } else { 0 }))
512                .collect();
513            buckets.push((f64::INFINITY, 1)); // +Inf bucket always includes all observations
514
515            histograms.insert(
516                key,
517                ProxyHistogram {
518                    name: name.to_string(),
519                    help: help.to_string(),
520                    labels,
521                    sum: value,
522                    count: 1,
523                    buckets,
524                },
525            );
526        }
527    }
528
529    /// Record agent metrics.
530    pub fn record_agent_metrics(&self, report: &MetricsReport) {
531        self.agent_collector.record(report);
532    }
533
534    /// Export all metrics in Prometheus text format.
535    pub fn export_prometheus(&self) -> String {
536        let mut output = String::new();
537
538        // Add service info metric
539        output.push_str(&format!(
540            "# HELP sentinel_info Sentinel proxy information\n# TYPE sentinel_info gauge\n"
541        ));
542        output.push_str(&format!(
543            "sentinel_info{{service=\"{}\",instance=\"{}\"}} 1\n",
544            escape_label_value(&self.service_name),
545            escape_label_value(&self.instance_id)
546        ));
547
548        // Export proxy counters
549        let counters = self.proxy_counters.read();
550        let mut counter_names: Vec<_> = counters.values().map(|c| &c.name).collect();
551        counter_names.sort();
552        counter_names.dedup();
553
554        for name in counter_names {
555            let metrics: Vec<_> = counters.values().filter(|c| &c.name == name).collect();
556            if let Some(first) = metrics.first() {
557                output.push_str(&format!("# HELP {} {}\n", name, first.help));
558                output.push_str(&format!("# TYPE {} counter\n", name));
559            }
560            for metric in metrics {
561                output.push_str(&format_metric_line(name, &metric.labels, metric.value as f64));
562            }
563        }
564
565        // Export proxy gauges
566        let gauges = self.proxy_gauges.read();
567        let mut gauge_names: Vec<_> = gauges.values().map(|g| &g.name).collect();
568        gauge_names.sort();
569        gauge_names.dedup();
570
571        for name in gauge_names {
572            let metrics: Vec<_> = gauges.values().filter(|g| &g.name == name).collect();
573            if let Some(first) = metrics.first() {
574                output.push_str(&format!("# HELP {} {}\n", name, first.help));
575                output.push_str(&format!("# TYPE {} gauge\n", name));
576            }
577            for metric in metrics {
578                output.push_str(&format_metric_line(name, &metric.labels, metric.value));
579            }
580        }
581
582        // Export proxy histograms
583        let histograms = self.proxy_histograms.read();
584        let mut histogram_names: Vec<_> = histograms.values().map(|h| &h.name).collect();
585        histogram_names.sort();
586        histogram_names.dedup();
587
588        for name in histogram_names {
589            let metrics: Vec<_> = histograms.values().filter(|h| &h.name == name).collect();
590            if let Some(first) = metrics.first() {
591                output.push_str(&format!("# HELP {} {}\n", name, first.help));
592                output.push_str(&format!("# TYPE {} histogram\n", name));
593            }
594            for metric in metrics {
595                // Buckets
596                for (le, count) in &metric.buckets {
597                    let mut labels = metric.labels.clone();
598                    labels.insert(
599                        "le".to_string(),
600                        if le.is_infinite() {
601                            "+Inf".to_string()
602                        } else {
603                            le.to_string()
604                        },
605                    );
606                    output.push_str(&format_metric_line(
607                        &format!("{}_bucket", name),
608                        &labels,
609                        *count as f64,
610                    ));
611                }
612                // Sum and count
613                output.push_str(&format_metric_line(
614                    &format!("{}_sum", name),
615                    &metric.labels,
616                    metric.sum,
617                ));
618                output.push_str(&format_metric_line(
619                    &format!("{}_count", name),
620                    &metric.labels,
621                    metric.count as f64,
622                ));
623            }
624        }
625
626        // Export agent metrics
627        output.push_str("\n# Agent metrics\n");
628        output.push_str(&self.agent_collector.export_prometheus());
629
630        output
631    }
632
633    /// Get total metric series count.
634    pub fn series_count(&self) -> usize {
635        self.proxy_counters.read().len()
636            + self.proxy_gauges.read().len()
637            + self.proxy_histograms.read().len()
638            + self.agent_collector.series_count()
639    }
640
641    fn metric_key(name: &str, labels: &HashMap<String, String>) -> String {
642        let mut pairs: Vec<_> = labels.iter().collect();
643        pairs.sort_by_key(|(k, _)| *k);
644        let labels_str = pairs
645            .into_iter()
646            .map(|(k, v)| format!("{}={}", k, v))
647            .collect::<Vec<_>>()
648            .join(",");
649        format!("{}|{}", name, labels_str)
650    }
651}
652
653impl Default for UnifiedMetricsAggregator {
654    fn default() -> Self {
655        Self::new("sentinel", "default")
656    }
657}
658
659/// Format a single metric line in Prometheus format.
660fn format_metric_line(name: &str, labels: &HashMap<String, String>, value: f64) -> String {
661    if labels.is_empty() {
662        format!("{} {}\n", name, format_value(value))
663    } else {
664        let mut pairs: Vec<_> = labels.iter().collect();
665        pairs.sort_by_key(|(k, _)| *k);
666        let labels_str = pairs
667            .into_iter()
668            .map(|(k, v)| format!("{}=\"{}\"", k, escape_label_value(v)))
669            .collect::<Vec<_>>()
670            .join(",");
671        format!("{}{{{}}} {}\n", name, labels_str, format_value(value))
672    }
673}
674
/// Format a value for Prometheus output.
///
/// Infinities are rendered as `+Inf` / `-Inf`, NaN as `NaN`, and both zeros
/// as `0`. Every other value uses `f64`'s `Display`. `Display` never uses
/// exponent notation and prints integral values without a trailing `.0`.
///
/// The previous `v as i64` shortcut saturated for integral values outside the
/// `i64` range. A counter or gauge of `1e19` was printed as
/// `9223372036854775807`.
fn format_value(v: f64) -> String {
    if v.is_nan() {
        "NaN".to_string()
    } else if v.is_infinite() {
        if v.is_sign_positive() {
            "+Inf".to_string()
        } else {
            "-Inf".to_string()
        }
    } else if v == 0.0 {
        // Also catches -0.0, which `Display` would render as "-0".
        "0".to_string()
    } else {
        v.to_string()
    }
}
691
/// Escape a label value for the Prometheus text format.
///
/// Backslash, double quote and line feed become `\\`, `\"` and `\n`.
/// Every other character is copied unchanged, in a single pass.
fn escape_label_value(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());
    for ch in s.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' => escaped.push_str("\\n"),
            other => escaped.push(other),
        }
    }
    escaped
}
698
/// Handler for config updates from agents.
///
/// `RuleUpdate` and `ListUpdate` requests are forwarded to optional callbacks
/// installed with the builder methods. `RequestReload` requests are stored
/// in `pending` until `clear_old_pending` ages them out.
pub struct ConfigUpdateHandler {
    /// Reload requests stored by `handle`, keyed by request id
    pending: RwLock<HashMap<String, PendingUpdate>>,
    /// Callback for rule updates: `(rule_set, rules, remove_rules) -> accepted`
    on_rule_update: Option<Box<dyn Fn(&str, &[crate::v2::control::RuleDefinition], &[String]) -> bool + Send + Sync>>,
    /// Callback for list updates: `(list_id, add, remove) -> accepted`
    on_list_update: Option<Box<dyn Fn(&str, &[String], &[String]) -> bool + Send + Sync>>,
}
708
/// A reload request held by `ConfigUpdateHandler`, together with its arrival time.
struct PendingUpdate {
    // The request exactly as received by `handle`.
    request: ConfigUpdateRequest,
    // When `handle` stored the request; compared against `max_age` in `clear_old_pending`.
    received_at: Instant,
}
713
714impl ConfigUpdateHandler {
715    /// Create a new config update handler.
716    pub fn new() -> Self {
717        Self {
718            pending: RwLock::new(HashMap::new()),
719            on_rule_update: None,
720            on_list_update: None,
721        }
722    }
723
724    /// Set the rule update callback.
725    pub fn on_rule_update<F>(mut self, f: F) -> Self
726    where
727        F: Fn(&str, &[crate::v2::control::RuleDefinition], &[String]) -> bool + Send + Sync + 'static,
728    {
729        self.on_rule_update = Some(Box::new(f));
730        self
731    }
732
733    /// Set the list update callback.
734    pub fn on_list_update<F>(mut self, f: F) -> Self
735    where
736        F: Fn(&str, &[String], &[String]) -> bool + Send + Sync + 'static,
737    {
738        self.on_list_update = Some(Box::new(f));
739        self
740    }
741
742    /// Handle a config update request.
743    pub fn handle(&self, request: ConfigUpdateRequest) -> ConfigUpdateResponse {
744        let request_id = request.request_id.clone();
745
746        match &request.update_type {
747            ConfigUpdateType::RequestReload => {
748                // Store pending and return success - actual reload happens asynchronously
749                self.pending.write().insert(
750                    request_id.clone(),
751                    PendingUpdate {
752                        request,
753                        received_at: Instant::now(),
754                    },
755                );
756                ConfigUpdateResponse::success(request_id)
757            }
758            ConfigUpdateType::RuleUpdate { rule_set, rules, remove_rules } => {
759                if let Some(ref callback) = self.on_rule_update {
760                    if callback(rule_set, rules, remove_rules) {
761                        ConfigUpdateResponse::success(request_id)
762                    } else {
763                        ConfigUpdateResponse::failure(request_id, "Rule update rejected")
764                    }
765                } else {
766                    ConfigUpdateResponse::failure(request_id, "Rule updates not supported")
767                }
768            }
769            ConfigUpdateType::ListUpdate { list_id, add, remove } => {
770                if let Some(ref callback) = self.on_list_update {
771                    if callback(list_id, add, remove) {
772                        ConfigUpdateResponse::success(request_id)
773                    } else {
774                        ConfigUpdateResponse::failure(request_id, "List update rejected")
775                    }
776                } else {
777                    ConfigUpdateResponse::failure(request_id, "List updates not supported")
778                }
779            }
780            ConfigUpdateType::RestartRequired { reason, grace_period_ms } => {
781                // Log and acknowledge - actual restart is handled by orchestrator
782                tracing::warn!(
783                    reason = reason,
784                    grace_period_ms = grace_period_ms,
785                    "Agent requested restart"
786                );
787                ConfigUpdateResponse::success(request_id)
788            }
789            ConfigUpdateType::ConfigError { error, field } => {
790                tracing::error!(
791                    error = error,
792                    field = ?field,
793                    "Agent reported configuration error"
794                );
795                ConfigUpdateResponse::success(request_id)
796            }
797        }
798    }
799
800    /// Get pending update count.
801    pub fn pending_count(&self) -> usize {
802        self.pending.read().len()
803    }
804
805    /// Clear old pending updates.
806    pub fn clear_old_pending(&self, max_age: Duration) {
807        let now = Instant::now();
808        self.pending
809            .write()
810            .retain(|_, v| now.duration_since(v.received_at) < max_age);
811    }
812}
813
814impl Default for ConfigUpdateHandler {
815    fn default() -> Self {
816        Self::new()
817    }
818}
819
820// Debug implementation for ConfigUpdateHandler
821impl std::fmt::Debug for ConfigUpdateHandler {
822    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
823        f.debug_struct("ConfigUpdateHandler")
824            .field("pending_count", &self.pending.read().len())
825            .field("has_rule_callback", &self.on_rule_update.is_some())
826            .field("has_list_callback", &self.on_list_update.is_some())
827            .finish()
828    }
829}
830
/// Configuration pusher for distributing config updates to agents.
///
/// This allows the proxy to push configuration changes (rules, lists, etc.)
/// to connected agents and track acknowledgments.
#[derive(Debug)]
pub struct ConfigPusher {
    /// Connected agents indexed by agent_id
    agents: RwLock<HashMap<String, AgentConnection>>,
    /// Pending pushes awaiting acknowledgment, keyed by push id (presumably; confirm)
    pending_pushes: RwLock<HashMap<String, PendingPush>>,
    /// Configuration
    config: ConfigPusherConfig,
    /// Sequence counter for push IDs; starts at 1 (see `with_config`)
    sequence: std::sync::atomic::AtomicU64,
}
846
/// Configuration for the config pusher.
///
/// See the `Default` impl for the default values. NOTE(review): the code that
/// reads these fields is not visible in this section.
#[derive(Debug, Clone)]
pub struct ConfigPusherConfig {
    /// Maximum time to wait for acknowledgment
    pub ack_timeout: Duration,
    /// Maximum number of retry attempts
    pub max_retries: usize,
    /// Time between retries
    pub retry_interval: Duration,
    /// Maximum pending pushes per agent
    pub max_pending_per_agent: usize,
}
859
860impl Default for ConfigPusherConfig {
861    fn default() -> Self {
862        Self {
863            ack_timeout: Duration::from_secs(10),
864            max_retries: 3,
865            retry_interval: Duration::from_secs(2),
866            max_pending_per_agent: 100,
867        }
868    }
869}
870
/// Information about a connected agent.
///
/// Created by `ConfigPusher::register_agent` with both timestamps set to the
/// registration instant and both push counters at zero.
#[derive(Debug, Clone)]
pub struct AgentConnection {
    /// Agent identifier
    pub agent_id: String,
    /// Agent name
    pub name: String,
    /// Connection time
    pub connected_at: Instant,
    /// Last message time
    pub last_seen: Instant,
    /// Number of successful config pushes
    pub successful_pushes: u64,
    /// Number of failed config pushes
    pub failed_pushes: u64,
    /// Whether the agent supports config push
    pub supports_push: bool,
}
889
/// A pending configuration push.
///
/// `ConfigPusher::unregister_agent` removes all entries whose `agent_id`
/// matches the disconnected agent.
#[derive(Debug)]
struct PendingPush {
    // Identifier of this push (presumably derived from `ConfigPusher::sequence`; confirm).
    push_id: String,
    // Target agent.
    agent_id: String,
    // The config update being delivered.
    update: ConfigUpdateRequest,
    // When the push was created.
    created_at: Instant,
    // When delivery was last attempted.
    last_attempt: Instant,
    // Number of delivery attempts so far.
    attempts: usize,
    // Current lifecycle state.
    status: PushStatus,
}
901
/// Status of a config push.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PushStatus {
    /// Not yet sent.
    /// NOTE(review): `ConfigPusher` records new pushes directly as `Sent`, so this
    /// variant does not appear to be produced there.
    Pending,
    /// Recorded as sent and awaiting acknowledgment; eligible for retry and expiry.
    Sent,
    /// The agent accepted the update.
    Acknowledged,
    /// The agent rejected the update; `reason` is the agent-supplied error,
    /// or `"Unknown error"` when none was given.
    Failed { reason: String },
    /// No acknowledgment arrived in time (ack timeout reached or attempts exhausted).
    Expired,
}
911
/// Result of a push operation, as reported by `ConfigPusher::get_results`.
#[derive(Debug)]
pub struct PushResult {
    /// Identifier of the push (`push-<n>`)
    pub push_id: String,
    /// Agent the push was addressed to
    pub agent_id: String,
    /// Current status of the push
    pub status: PushStatus,
    /// Number of sends so far, including the initial one
    pub attempts: usize,
}
920
921impl ConfigPusher {
922    /// Create a new config pusher with default configuration.
923    pub fn new() -> Self {
924        Self::with_config(ConfigPusherConfig::default())
925    }
926
927    /// Create a new config pusher with custom configuration.
928    pub fn with_config(config: ConfigPusherConfig) -> Self {
929        Self {
930            agents: RwLock::new(HashMap::new()),
931            pending_pushes: RwLock::new(HashMap::new()),
932            config,
933            sequence: std::sync::atomic::AtomicU64::new(1),
934        }
935    }
936
937    /// Register a connected agent.
938    pub fn register_agent(&self, agent_id: impl Into<String>, name: impl Into<String>, supports_push: bool) {
939        let agent_id = agent_id.into();
940        let now = Instant::now();
941        self.agents.write().insert(
942            agent_id.clone(),
943            AgentConnection {
944                agent_id,
945                name: name.into(),
946                connected_at: now,
947                last_seen: now,
948                successful_pushes: 0,
949                failed_pushes: 0,
950                supports_push,
951            },
952        );
953    }
954
955    /// Unregister a disconnected agent.
956    pub fn unregister_agent(&self, agent_id: &str) {
957        self.agents.write().remove(agent_id);
958        // Remove any pending pushes for this agent
959        self.pending_pushes
960            .write()
961            .retain(|_, p| p.agent_id != agent_id);
962    }
963
964    /// Update agent's last seen time.
965    pub fn touch_agent(&self, agent_id: &str) {
966        if let Some(agent) = self.agents.write().get_mut(agent_id) {
967            agent.last_seen = Instant::now();
968        }
969    }
970
971    /// Get all connected agents.
972    pub fn connected_agents(&self) -> Vec<AgentConnection> {
973        self.agents.read().values().cloned().collect()
974    }
975
976    /// Get agents that support config push.
977    pub fn pushable_agents(&self) -> Vec<AgentConnection> {
978        self.agents
979            .read()
980            .values()
981            .filter(|a| a.supports_push)
982            .cloned()
983            .collect()
984    }
985
986    /// Push a configuration update to a specific agent.
987    pub fn push_to_agent(&self, agent_id: &str, update_type: ConfigUpdateType) -> Option<String> {
988        let agents = self.agents.read();
989        let agent = agents.get(agent_id)?;
990
991        if !agent.supports_push {
992            return None;
993        }
994
995        let push_id = self.next_push_id();
996        let now = Instant::now();
997
998        let update = ConfigUpdateRequest {
999            update_type,
1000            request_id: push_id.clone(),
1001            timestamp_ms: now_ms(),
1002        };
1003
1004        self.pending_pushes.write().insert(
1005            push_id.clone(),
1006            PendingPush {
1007                push_id: push_id.clone(),
1008                agent_id: agent_id.to_string(),
1009                update,
1010                created_at: now,
1011                last_attempt: now,
1012                attempts: 1,
1013                status: PushStatus::Sent,
1014            },
1015        );
1016
1017        Some(push_id)
1018    }
1019
1020    /// Push a configuration update to all pushable agents.
1021    pub fn push_to_all(&self, update_type: ConfigUpdateType) -> Vec<String> {
1022        let pushable = self.pushable_agents();
1023        let mut push_ids = Vec::with_capacity(pushable.len());
1024
1025        for agent in pushable {
1026            if let Some(push_id) = self.push_to_agent(&agent.agent_id, update_type.clone()) {
1027                push_ids.push(push_id);
1028            }
1029        }
1030
1031        push_ids
1032    }
1033
1034    /// Acknowledge a config push.
1035    pub fn acknowledge(&self, push_id: &str, accepted: bool, error: Option<String>) {
1036        let mut pending = self.pending_pushes.write();
1037        if let Some(push) = pending.get_mut(push_id) {
1038            if accepted {
1039                push.status = PushStatus::Acknowledged;
1040                // Update agent stats
1041                if let Some(agent) = self.agents.write().get_mut(&push.agent_id) {
1042                    agent.successful_pushes += 1;
1043                }
1044            } else {
1045                push.status = PushStatus::Failed {
1046                    reason: error.unwrap_or_else(|| "Unknown error".to_string()),
1047                };
1048                // Update agent stats
1049                if let Some(agent) = self.agents.write().get_mut(&push.agent_id) {
1050                    agent.failed_pushes += 1;
1051                }
1052            }
1053        }
1054    }
1055
1056    /// Get pushes that need to be retried.
1057    pub fn get_retryable(&self) -> Vec<(String, ConfigUpdateRequest)> {
1058        let now = Instant::now();
1059        let mut retryable = Vec::new();
1060        let mut pending = self.pending_pushes.write();
1061
1062        for push in pending.values_mut() {
1063            if push.status == PushStatus::Sent
1064                && now.duration_since(push.last_attempt) >= self.config.retry_interval
1065                && push.attempts < self.config.max_retries
1066            {
1067                push.attempts += 1;
1068                push.last_attempt = now;
1069                retryable.push((push.agent_id.clone(), push.update.clone()));
1070            }
1071        }
1072
1073        retryable
1074    }
1075
1076    /// Expire old pending pushes.
1077    pub fn expire_old(&self) {
1078        let now = Instant::now();
1079        let mut pending = self.pending_pushes.write();
1080
1081        for push in pending.values_mut() {
1082            if push.status == PushStatus::Sent
1083                && (now.duration_since(push.created_at) >= self.config.ack_timeout
1084                    || push.attempts >= self.config.max_retries)
1085            {
1086                push.status = PushStatus::Expired;
1087                // Update agent stats
1088                if let Some(agent) = self.agents.write().get_mut(&push.agent_id) {
1089                    agent.failed_pushes += 1;
1090                }
1091            }
1092        }
1093
1094        // Remove completed or expired pushes older than 1 minute
1095        let cleanup_age = Duration::from_secs(60);
1096        pending.retain(|_, p| {
1097            now.duration_since(p.created_at) < cleanup_age
1098                || matches!(p.status, PushStatus::Pending | PushStatus::Sent)
1099        });
1100    }
1101
1102    /// Get push results.
1103    pub fn get_results(&self) -> Vec<PushResult> {
1104        self.pending_pushes
1105            .read()
1106            .values()
1107            .map(|p| PushResult {
1108                push_id: p.push_id.clone(),
1109                agent_id: p.agent_id.clone(),
1110                status: p.status.clone(),
1111                attempts: p.attempts,
1112            })
1113            .collect()
1114    }
1115
1116    /// Get pending push count.
1117    pub fn pending_count(&self) -> usize {
1118        self.pending_pushes
1119            .read()
1120            .values()
1121            .filter(|p| matches!(p.status, PushStatus::Pending | PushStatus::Sent))
1122            .count()
1123    }
1124
1125    fn next_push_id(&self) -> String {
1126        let seq = self
1127            .sequence
1128            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1129        format!("push-{}", seq)
1130    }
1131}
1132
1133impl Default for ConfigPusher {
1134    fn default() -> Self {
1135        Self::new()
1136    }
1137}
1138
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns `0` if the system clock reports a time before the epoch.
fn now_ms() -> u64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        // u128 -> u64 narrowing; it could only truncate hundreds of millions of years out.
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
1145
// Unit tests for the metrics collector, Prometheus export, config update
// handling, the config pusher, and the unified metrics aggregator.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::v2::metrics::{standard, CounterMetric, GaugeMetric, HistogramMetric};

    // ---- MetricsCollector ----

    // One counter plus one gauge from a single agent should yield two series
    // and one active agent.
    #[test]
    fn test_metrics_collector_basic() {
        let collector = MetricsCollector::new();

        let mut report = MetricsReport::new("test-agent", 10_000);
        report.counters.push(CounterMetric::new(standard::REQUESTS_TOTAL, 100));
        report.gauges.push(GaugeMetric::new(standard::IN_FLIGHT_REQUESTS, 5.0));

        collector.record(&report);

        assert_eq!(collector.series_count(), 2);
        assert_eq!(collector.active_agents(), vec!["test-agent"]);
    }

    // Metric labels and (with the default config) the agent_id label should
    // both appear in the Prometheus output.
    #[test]
    fn test_metrics_collector_with_labels() {
        let collector = MetricsCollector::new();

        let mut report = MetricsReport::new("agent-1", 10_000);
        let mut counter = CounterMetric::new(standard::REQUESTS_TOTAL, 50);
        counter.labels.insert("route".to_string(), "/api".to_string());
        report.counters.push(counter);

        collector.record(&report);

        let prometheus = collector.export_prometheus();
        assert!(prometheus.contains("agent_requests_total"));
        assert!(prometheus.contains("route=\"/api\""));
        assert!(prometheus.contains("agent_id=\"agent-1\""));
    }

    // HELP and TYPE lines are emitted for a counter that carries help text.
    #[test]
    fn test_prometheus_export() {
        let collector = MetricsCollector::new();

        let mut report = MetricsReport::new("test", 10_000);
        let mut counter = CounterMetric::new("http_requests_total", 123);
        counter.help = Some("Total HTTP requests".to_string());
        report.counters.push(counter);

        collector.record(&report);

        let output = collector.export_prometheus();
        assert!(output.contains("# HELP http_requests_total Total HTTP requests"));
        assert!(output.contains("# TYPE http_requests_total counter"));
        assert!(output.contains("123"));
    }

    // Histograms export _bucket (including +Inf), _sum and _count lines.
    #[test]
    fn test_histogram_export() {
        // Use config without agent_id label for simpler assertions
        let config = MetricsCollectorConfig {
            include_agent_id_label: false,
            ..MetricsCollectorConfig::default()
        };
        let collector = MetricsCollector::with_config(config);

        let mut report = MetricsReport::new("test", 10_000);
        report.histograms.push(HistogramMetric {
            name: "request_duration_seconds".to_string(),
            help: Some("Request duration".to_string()),
            labels: HashMap::new(),
            sum: 10.5,
            count: 100,
            buckets: vec![
                HistogramBucket { le: 0.1, count: 50 },
                HistogramBucket { le: 0.5, count: 80 },
                HistogramBucket { le: 1.0, count: 95 },
                HistogramBucket::infinity(),
            ],
        });

        collector.record(&report);

        let output = collector.export_prometheus();
        assert!(output.contains("request_duration_seconds_bucket"));
        assert!(output.contains("le=\"0.1\""));
        assert!(output.contains("le=\"+Inf\""));
        assert!(output.contains("request_duration_seconds_sum 10.5"));
        assert!(output.contains("request_duration_seconds_count 100"));
    }

    // ---- ConfigUpdateHandler ----

    // A reload request is accepted and left pending in the handler.
    #[test]
    fn test_config_update_handler() {
        let handler = ConfigUpdateHandler::new();

        let request = ConfigUpdateRequest {
            update_type: ConfigUpdateType::RequestReload,
            request_id: "req-1".to_string(),
            timestamp_ms: 0,
        };

        let response = handler.handle(request);
        assert!(response.accepted);
        assert_eq!(handler.pending_count(), 1);
    }

    // Label values escape double quotes, backslashes and newlines.
    #[test]
    fn test_escape_label_value() {
        assert_eq!(escape_label_value("simple"), "simple");
        assert_eq!(escape_label_value("with\"quotes"), "with\\\"quotes");
        assert_eq!(escape_label_value("with\\backslash"), "with\\\\backslash");
        assert_eq!(escape_label_value("with\nnewline"), "with\\nnewline");
    }

    // ---- ConfigPusher ----

    #[test]
    fn test_config_pusher_basic() {
        let pusher = ConfigPusher::new();

        // Register an agent
        pusher.register_agent("agent-1", "Test Agent", true);

        let agents = pusher.connected_agents();
        assert_eq!(agents.len(), 1);
        assert_eq!(agents[0].agent_id, "agent-1");
        assert!(agents[0].supports_push);
    }

    // A push to a pushable agent yields a `push-` prefixed ID and one pending push.
    #[test]
    fn test_config_pusher_push_to_agent() {
        let pusher = ConfigPusher::new();
        pusher.register_agent("agent-1", "Test Agent", true);

        let update_type = ConfigUpdateType::RuleUpdate {
            rule_set: "default".to_string(),
            rules: vec![],
            remove_rules: vec![],
        };

        let push_id = pusher.push_to_agent("agent-1", update_type);
        assert!(push_id.is_some());

        let push_id = push_id.unwrap();
        assert!(push_id.starts_with("push-"));
        assert_eq!(pusher.pending_count(), 1);
    }

    // An accepted acknowledgment marks the push Acknowledged and bumps the
    // agent's success counter.
    #[test]
    fn test_config_pusher_acknowledge() {
        let pusher = ConfigPusher::new();
        pusher.register_agent("agent-1", "Test Agent", true);

        let push_id = pusher
            .push_to_agent("agent-1", ConfigUpdateType::RequestReload)
            .unwrap();

        // Acknowledge success
        pusher.acknowledge(&push_id, true, None);

        let results = pusher.get_results();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].status, PushStatus::Acknowledged);

        // Check agent stats
        let agents = pusher.connected_agents();
        assert_eq!(agents[0].successful_pushes, 1);
    }

    // Agents registered without push support never get a push ID.
    #[test]
    fn test_config_pusher_push_to_non_pushable() {
        let pusher = ConfigPusher::new();
        pusher.register_agent("agent-1", "Test Agent", false);

        let push_id = pusher.push_to_agent("agent-1", ConfigUpdateType::RequestReload);
        assert!(push_id.is_none());
    }

    #[test]
    fn test_config_pusher_push_to_all() {
        let pusher = ConfigPusher::new();
        pusher.register_agent("agent-1", "Agent 1", true);
        pusher.register_agent("agent-2", "Agent 2", true);
        pusher.register_agent("agent-3", "Agent 3", false); // Not pushable

        let push_ids = pusher.push_to_all(ConfigUpdateType::RequestReload);
        assert_eq!(push_ids.len(), 2);
        assert_eq!(pusher.pending_count(), 2);
    }

    #[test]
    fn test_config_pusher_unregister() {
        let pusher = ConfigPusher::new();
        pusher.register_agent("agent-1", "Test Agent", true);

        let _push_id = pusher.push_to_agent("agent-1", ConfigUpdateType::RequestReload);
        assert_eq!(pusher.pending_count(), 1);

        pusher.unregister_agent("agent-1");

        assert_eq!(pusher.connected_agents().len(), 0);
        assert_eq!(pusher.pending_count(), 0); // Pending pushes should be removed
    }

    // ---- Snapshots ----

    #[test]
    fn test_metrics_snapshot() {
        let collector = MetricsCollector::new();

        let mut report = MetricsReport::new("test", 10_000);
        report.counters.push(CounterMetric::new("requests_total", 100));
        report.gauges.push(GaugeMetric::new("connections", 5.0));

        collector.record(&report);

        let snapshot = collector.snapshot();
        assert_eq!(snapshot.counter_count(), 1);
        assert_eq!(snapshot.gauge_count(), 1);
    }

    // ---- UnifiedMetricsAggregator ----

    #[test]
    fn test_unified_aggregator_basic() {
        let aggregator = UnifiedMetricsAggregator::new("test-service", "instance-1");

        // Add proxy counter
        aggregator.increment_counter(
            "http_requests_total",
            "Total HTTP requests",
            HashMap::new(),
            100,
        );

        // Add proxy gauge
        aggregator.set_gauge(
            "active_connections",
            "Active connections",
            HashMap::new(),
            42.0,
        );

        assert_eq!(aggregator.series_count(), 2);
    }

    // Repeated increments of the same series accumulate (10 + 5 = 15).
    #[test]
    fn test_unified_aggregator_counter_increment() {
        let aggregator = UnifiedMetricsAggregator::new("test", "1");

        aggregator.increment_counter("requests", "Total requests", HashMap::new(), 10);
        aggregator.increment_counter("requests", "Total requests", HashMap::new(), 5);

        let output = aggregator.export_prometheus();
        assert!(output.contains("requests 15"));
    }

    // Different label sets of the same metric are exported as separate series.
    #[test]
    fn test_unified_aggregator_labeled_metrics() {
        let aggregator = UnifiedMetricsAggregator::new("test", "1");

        let mut labels = HashMap::new();
        labels.insert("method".to_string(), "GET".to_string());
        aggregator.increment_counter("requests", "Total requests", labels.clone(), 100);

        let mut labels2 = HashMap::new();
        labels2.insert("method".to_string(), "POST".to_string());
        aggregator.increment_counter("requests", "Total requests", labels2, 50);

        let output = aggregator.export_prometheus();
        assert!(output.contains("method=\"GET\""));
        assert!(output.contains("method=\"POST\""));
    }

    // Three observations should produce bucket, sum and `_count 3` lines.
    #[test]
    fn test_unified_aggregator_histogram() {
        let aggregator = UnifiedMetricsAggregator::new("test", "1");
        let buckets = vec![0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0];

        // Record some observations
        aggregator.observe_histogram("request_duration", "Request duration", HashMap::new(), &buckets, 0.05);
        aggregator.observe_histogram("request_duration", "Request duration", HashMap::new(), &buckets, 0.2);
        aggregator.observe_histogram("request_duration", "Request duration", HashMap::new(), &buckets, 1.5);

        let output = aggregator.export_prometheus();
        assert!(output.contains("request_duration_bucket"));
        assert!(output.contains("request_duration_sum"));
        assert!(output.contains("request_duration_count 3"));
    }

    // Proxy metrics and recorded agent metrics are exported together.
    #[test]
    fn test_unified_aggregator_with_agent_metrics() {
        let aggregator = UnifiedMetricsAggregator::new("test", "1");

        // Add proxy metric
        aggregator.increment_counter("proxy_requests", "Proxy requests", HashMap::new(), 1000);

        // Add agent metrics
        let mut report = MetricsReport::new("waf-agent", 5_000);
        report.counters.push(CounterMetric::new("waf_blocked", 50));
        aggregator.record_agent_metrics(&report);

        let output = aggregator.export_prometheus();
        assert!(output.contains("proxy_requests 1000"));
        assert!(output.contains("waf_blocked"));
        assert!(output.contains("Agent metrics"));
    }

    // Even with no recorded metrics, the export includes a service info series.
    #[test]
    fn test_unified_aggregator_service_info() {
        let aggregator = UnifiedMetricsAggregator::new("my-service", "node-42");

        let output = aggregator.export_prometheus();
        assert!(output.contains("sentinel_info"));
        assert!(output.contains("service=\"my-service\""));
        assert!(output.contains("instance=\"node-42\""));
    }
}