use crate::v2::control::{ConfigUpdateRequest, ConfigUpdateResponse, ConfigUpdateType};
use crate::v2::metrics::{HistogramBucket, MetricsReport};
use parking_lot::RwLock;
use std::collections::{BTreeMap, HashMap};
use std::time::{Duration, Instant};
14
/// Stores the most recent metrics reported by agents, one entry per series.
///
/// A series is identified by a `MetricKey` (agent id, metric name and label set). Every map
/// sits behind a `parking_lot` read/write lock, so all methods work through `&self`.
#[derive(Debug)]
pub struct MetricsCollector {
    /// Latest counter value per series.
    counters: RwLock<HashMap<MetricKey, AggregatedCounter>>,
    /// Latest gauge value per series.
    gauges: RwLock<HashMap<MetricKey, AggregatedGauge>>,
    /// Latest histogram state per series.
    histograms: RwLock<HashMap<MetricKey, AggregatedHistogram>>,
    /// Time at which each agent (keyed by agent id) last submitted a report.
    last_report: RwLock<HashMap<String, Instant>>,
    /// Retention, cardinality and labelling settings.
    config: MetricsCollectorConfig,
}
29
/// Tuning knobs for a `MetricsCollector`.
#[derive(Clone, Debug)]
pub struct MetricsCollectorConfig {
    /// Series whose last update is at least this old are removed by
    /// `MetricsCollector::expire_old_metrics`.
    pub max_age: Duration,
    /// Maximum number of distinct series (counters, gauges and histograms combined) the
    /// collector should hold.
    pub max_series: usize,
    /// When `true`, `MetricsCollector::record` adds an `agent_id` label to every series.
    pub include_agent_id_label: bool,
}

impl Default for MetricsCollectorConfig {
    /// Five-minute retention, 10 000 series, and the `agent_id` label enabled.
    fn default() -> Self {
        let five_minutes = Duration::from_secs(300);
        MetricsCollectorConfig {
            include_agent_id_label: true,
            max_series: 10_000,
            max_age: five_minutes,
        }
    }
}
50
/// Identity of one agent-reported series: reporting agent, metric name and label set.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
struct MetricKey {
    /// Id of the agent that reported the series.
    agent_id: String,
    /// Metric name.
    name: String,
    /// Canonical, order-independent encoding of the series' labels (see `labels_to_key`).
    labels_key: String,
}

impl MetricKey {
    /// Builds the key for series `name` with `labels`, reported by `agent_id`.
    fn new(agent_id: &str, name: &str, labels: &HashMap<String, String>) -> Self {
        Self {
            agent_id: agent_id.to_string(),
            name: name.to_string(),
            labels_key: Self::labels_to_key(labels),
        }
    }

    /// Encodes `labels` as a string that does not depend on map iteration order and never
    /// maps two different label sets to the same key.
    ///
    /// Pairs are sorted by label name and rendered with `Debug`, which quotes and escapes every
    /// name and value. Plain `k=v` joining with `,` was ambiguous: `{a: "1,b=2"}` and
    /// `{a: "1", b: "2"}` both became `a=1,b=2`, silently merging two distinct series.
    fn labels_to_key(labels: &HashMap<String, String>) -> String {
        let mut pairs: Vec<(&String, &String)> = labels.iter().collect();
        pairs.sort_unstable();
        format!("{:?}", pairs)
    }
}
79
/// Stored state of one counter series, as last reported by an agent.
#[derive(Debug, Clone)]
struct AggregatedCounter {
    /// Metric name (used as the Prometheus family name on export).
    name: String,
    /// Optional help text exported on the `# HELP` line.
    help: Option<String>,
    /// Full label set of the series, including `agent_id` when that label is configured.
    labels: HashMap<String, String>,
    /// Reported counter value.
    value: u64,
    /// When this series was last written; drives expiry.
    last_updated: Instant,
}

/// Stored state of one gauge series, as last reported by an agent.
#[derive(Debug, Clone)]
struct AggregatedGauge {
    /// Metric name (used as the Prometheus family name on export).
    name: String,
    /// Optional help text exported on the `# HELP` line.
    help: Option<String>,
    /// Full label set of the series, including `agent_id` when that label is configured.
    labels: HashMap<String, String>,
    /// Reported gauge value.
    value: f64,
    /// When this series was last written; drives expiry.
    last_updated: Instant,
}

/// Stored state of one histogram series, as last reported by an agent.
#[derive(Debug, Clone)]
struct AggregatedHistogram {
    /// Metric name; exported as `<name>_bucket`, `<name>_sum` and `<name>_count`.
    name: String,
    /// Optional help text exported on the `# HELP` line.
    help: Option<String>,
    /// Full label set of the series, including `agent_id` when that label is configured.
    labels: HashMap<String, String>,
    /// Sum of all observed values, as reported.
    sum: f64,
    /// Number of observations, as reported.
    count: u64,
    /// Buckets exactly as reported by the agent; each is exported with an `le` label.
    buckets: Vec<HistogramBucket>,
    /// When this series was last written; drives expiry.
    last_updated: Instant,
}
111
112impl MetricsCollector {
113 pub fn new() -> Self {
115 Self::with_config(MetricsCollectorConfig::default())
116 }
117
118 pub fn with_config(config: MetricsCollectorConfig) -> Self {
120 Self {
121 counters: RwLock::new(HashMap::new()),
122 gauges: RwLock::new(HashMap::new()),
123 histograms: RwLock::new(HashMap::new()),
124 last_report: RwLock::new(HashMap::new()),
125 config,
126 }
127 }
128
129 pub fn record(&self, report: &MetricsReport) {
131 let now = Instant::now();
132
133 self.last_report
135 .write()
136 .insert(report.agent_id.clone(), now);
137
138 for counter in &report.counters {
140 let mut labels = counter.labels.clone();
141 if self.config.include_agent_id_label {
142 labels.insert("agent_id".to_string(), report.agent_id.clone());
143 }
144
145 let key = MetricKey::new(&report.agent_id, &counter.name, &labels);
146
147 let mut counters = self.counters.write();
148 counters.insert(
149 key,
150 AggregatedCounter {
151 name: counter.name.clone(),
152 help: counter.help.clone(),
153 labels,
154 value: counter.value,
155 last_updated: now,
156 },
157 );
158 }
159
160 for gauge in &report.gauges {
162 let mut labels = gauge.labels.clone();
163 if self.config.include_agent_id_label {
164 labels.insert("agent_id".to_string(), report.agent_id.clone());
165 }
166
167 let key = MetricKey::new(&report.agent_id, &gauge.name, &labels);
168
169 let mut gauges = self.gauges.write();
170 gauges.insert(
171 key,
172 AggregatedGauge {
173 name: gauge.name.clone(),
174 help: gauge.help.clone(),
175 labels,
176 value: gauge.value,
177 last_updated: now,
178 },
179 );
180 }
181
182 for histogram in &report.histograms {
184 let mut labels = histogram.labels.clone();
185 if self.config.include_agent_id_label {
186 labels.insert("agent_id".to_string(), report.agent_id.clone());
187 }
188
189 let key = MetricKey::new(&report.agent_id, &histogram.name, &labels);
190
191 let mut histograms = self.histograms.write();
192 histograms.insert(
193 key,
194 AggregatedHistogram {
195 name: histogram.name.clone(),
196 help: histogram.help.clone(),
197 labels,
198 sum: histogram.sum,
199 count: histogram.count,
200 buckets: histogram.buckets.clone(),
201 last_updated: now,
202 },
203 );
204 }
205 }
206
207 pub fn expire_old_metrics(&self) {
209 let now = Instant::now();
210 let max_age = self.config.max_age;
211
212 self.counters
213 .write()
214 .retain(|_, v| now.duration_since(v.last_updated) < max_age);
215 self.gauges
216 .write()
217 .retain(|_, v| now.duration_since(v.last_updated) < max_age);
218 self.histograms
219 .write()
220 .retain(|_, v| now.duration_since(v.last_updated) < max_age);
221 }
222
223 pub fn series_count(&self) -> usize {
225 self.counters.read().len() + self.gauges.read().len() + self.histograms.read().len()
226 }
227
228 pub fn active_agents(&self) -> Vec<String> {
230 self.last_report.read().keys().cloned().collect()
231 }
232
233 pub fn export_prometheus(&self) -> String {
235 let mut output = String::new();
236
237 let counters = self.counters.read();
239 let mut counter_names: Vec<_> = counters.values().map(|c| &c.name).collect();
240 counter_names.sort();
241 counter_names.dedup();
242
243 for name in counter_names {
244 let metrics: Vec<_> = counters.values().filter(|c| &c.name == name).collect();
245 if let Some(first) = metrics.first() {
246 if let Some(help) = &first.help {
247 output.push_str(&format!("# HELP {} {}\n", name, help));
248 }
249 output.push_str(&format!("# TYPE {} counter\n", name));
250 }
251 for metric in metrics {
252 output.push_str(&format_metric_line(name, &metric.labels, metric.value as f64));
253 }
254 }
255
256 let gauges = self.gauges.read();
258 let mut gauge_names: Vec<_> = gauges.values().map(|g| &g.name).collect();
259 gauge_names.sort();
260 gauge_names.dedup();
261
262 for name in gauge_names {
263 let metrics: Vec<_> = gauges.values().filter(|g| &g.name == name).collect();
264 if let Some(first) = metrics.first() {
265 if let Some(help) = &first.help {
266 output.push_str(&format!("# HELP {} {}\n", name, help));
267 }
268 output.push_str(&format!("# TYPE {} gauge\n", name));
269 }
270 for metric in metrics {
271 output.push_str(&format_metric_line(name, &metric.labels, metric.value));
272 }
273 }
274
275 let histograms = self.histograms.read();
277 let mut histogram_names: Vec<_> = histograms.values().map(|h| &h.name).collect();
278 histogram_names.sort();
279 histogram_names.dedup();
280
281 for name in histogram_names {
282 let metrics: Vec<_> = histograms.values().filter(|h| &h.name == name).collect();
283 if let Some(first) = metrics.first() {
284 if let Some(help) = &first.help {
285 output.push_str(&format!("# HELP {} {}\n", name, help));
286 }
287 output.push_str(&format!("# TYPE {} histogram\n", name));
288 }
289 for metric in metrics {
290 for bucket in &metric.buckets {
292 let mut labels = metric.labels.clone();
293 labels.insert(
294 "le".to_string(),
295 if bucket.le.is_infinite() {
296 "+Inf".to_string()
297 } else {
298 bucket.le.to_string()
299 },
300 );
301 output.push_str(&format_metric_line(
302 &format!("{}_bucket", name),
303 &labels,
304 bucket.count as f64,
305 ));
306 }
307 output.push_str(&format_metric_line(
309 &format!("{}_sum", name),
310 &metric.labels,
311 metric.sum,
312 ));
313 output.push_str(&format_metric_line(
314 &format!("{}_count", name),
315 &metric.labels,
316 metric.count as f64,
317 ));
318 }
319 }
320
321 output
322 }
323
324 pub fn snapshot(&self) -> MetricsSnapshot {
326 MetricsSnapshot {
327 counters: self.counters.read().values().cloned().collect(),
328 gauges: self.gauges.read().values().cloned().collect(),
329 histograms: self.histograms.read().values().cloned().collect(),
330 timestamp: Instant::now(),
331 }
332 }
333}
334
335impl Default for MetricsCollector {
336 fn default() -> Self {
337 Self::new()
338 }
339}
340
341#[derive(Debug)]
343pub struct MetricsSnapshot {
344 counters: Vec<AggregatedCounter>,
345 gauges: Vec<AggregatedGauge>,
346 histograms: Vec<AggregatedHistogram>,
347 timestamp: Instant,
348}
349
350impl MetricsSnapshot {
351 pub fn counter_count(&self) -> usize {
353 self.counters.len()
354 }
355
356 pub fn gauge_count(&self) -> usize {
358 self.gauges.len()
359 }
360
361 pub fn histogram_count(&self) -> usize {
363 self.histograms.len()
364 }
365}
366
/// Combines the proxy's own metrics with agent-reported metrics in a single Prometheus export.
///
/// Proxy metrics live in string-keyed maps (key built by `metric_key` from name and labels).
/// Agent metrics are delegated to an embedded `MetricsCollector`.
#[derive(Debug)]
pub struct UnifiedMetricsAggregator {
    /// Proxy counters keyed by name + labels.
    proxy_counters: RwLock<HashMap<String, ProxyCounter>>,
    /// Proxy gauges keyed by name + labels.
    proxy_gauges: RwLock<HashMap<String, ProxyGauge>>,
    /// Proxy histograms keyed by name + labels.
    proxy_histograms: RwLock<HashMap<String, ProxyHistogram>>,
    /// Agent metrics; their export is appended after the proxy metrics.
    agent_collector: MetricsCollector,
    /// Value of the `service` label on the exported `sentinel_info` metric.
    service_name: String,
    /// Value of the `instance` label on the exported `sentinel_info` metric.
    instance_id: String,
}

/// A proxy-side counter series.
#[derive(Debug, Clone)]
struct ProxyCounter {
    /// Metric family name.
    name: String,
    /// Help text captured when the series was first created.
    help: String,
    /// Series labels.
    labels: HashMap<String, String>,
    /// Accumulated value (sum of all increments).
    value: u64,
}

/// A proxy-side gauge series.
#[derive(Debug, Clone)]
struct ProxyGauge {
    /// Metric family name.
    name: String,
    /// Help text from the most recent `set_gauge` call.
    help: String,
    /// Series labels.
    labels: HashMap<String, String>,
    /// Most recently set value.
    value: f64,
}

/// A proxy-side histogram series.
#[derive(Debug, Clone)]
struct ProxyHistogram {
    /// Metric family name.
    name: String,
    /// Help text captured when the series was first created.
    help: String,
    /// Series labels.
    labels: HashMap<String, String>,
    /// Sum of all observed values.
    sum: f64,
    /// Number of observations.
    count: u64,
    /// `(upper bound, cumulative count)` pairs; the last entry is the `+Inf` bucket.
    buckets: Vec<(f64, u64)>,
}
417
418impl UnifiedMetricsAggregator {
419 pub fn new(service_name: impl Into<String>, instance_id: impl Into<String>) -> Self {
421 Self {
422 proxy_counters: RwLock::new(HashMap::new()),
423 proxy_gauges: RwLock::new(HashMap::new()),
424 proxy_histograms: RwLock::new(HashMap::new()),
425 agent_collector: MetricsCollector::new(),
426 service_name: service_name.into(),
427 instance_id: instance_id.into(),
428 }
429 }
430
431 pub fn with_agent_config(
433 service_name: impl Into<String>,
434 instance_id: impl Into<String>,
435 agent_config: MetricsCollectorConfig,
436 ) -> Self {
437 Self {
438 proxy_counters: RwLock::new(HashMap::new()),
439 proxy_gauges: RwLock::new(HashMap::new()),
440 proxy_histograms: RwLock::new(HashMap::new()),
441 agent_collector: MetricsCollector::with_config(agent_config),
442 service_name: service_name.into(),
443 instance_id: instance_id.into(),
444 }
445 }
446
447 pub fn agent_collector(&self) -> &MetricsCollector {
449 &self.agent_collector
450 }
451
452 pub fn increment_counter(&self, name: &str, help: &str, labels: HashMap<String, String>, delta: u64) {
454 let key = Self::metric_key(name, &labels);
455 let mut counters = self.proxy_counters.write();
456
457 if let Some(counter) = counters.get_mut(&key) {
458 counter.value += delta;
459 } else {
460 counters.insert(
461 key,
462 ProxyCounter {
463 name: name.to_string(),
464 help: help.to_string(),
465 labels,
466 value: delta,
467 },
468 );
469 }
470 }
471
472 pub fn set_gauge(&self, name: &str, help: &str, labels: HashMap<String, String>, value: f64) {
474 let key = Self::metric_key(name, &labels);
475 self.proxy_gauges.write().insert(
476 key,
477 ProxyGauge {
478 name: name.to_string(),
479 help: help.to_string(),
480 labels,
481 value,
482 },
483 );
484 }
485
486 pub fn observe_histogram(
488 &self,
489 name: &str,
490 help: &str,
491 labels: HashMap<String, String>,
492 bucket_boundaries: &[f64],
493 value: f64,
494 ) {
495 let key = Self::metric_key(name, &labels);
496 let mut histograms = self.proxy_histograms.write();
497
498 if let Some(histogram) = histograms.get_mut(&key) {
499 histogram.sum += value;
500 histogram.count += 1;
501 for (boundary, count) in histogram.buckets.iter_mut() {
503 if value <= *boundary {
504 *count += 1;
505 }
506 }
507 } else {
508 let mut buckets: Vec<(f64, u64)> = bucket_boundaries
510 .iter()
511 .map(|&b| (b, if value <= b { 1 } else { 0 }))
512 .collect();
513 buckets.push((f64::INFINITY, 1)); histograms.insert(
516 key,
517 ProxyHistogram {
518 name: name.to_string(),
519 help: help.to_string(),
520 labels,
521 sum: value,
522 count: 1,
523 buckets,
524 },
525 );
526 }
527 }
528
529 pub fn record_agent_metrics(&self, report: &MetricsReport) {
531 self.agent_collector.record(report);
532 }
533
534 pub fn export_prometheus(&self) -> String {
536 let mut output = String::new();
537
538 output.push_str(&format!(
540 "# HELP sentinel_info Sentinel proxy information\n# TYPE sentinel_info gauge\n"
541 ));
542 output.push_str(&format!(
543 "sentinel_info{{service=\"{}\",instance=\"{}\"}} 1\n",
544 escape_label_value(&self.service_name),
545 escape_label_value(&self.instance_id)
546 ));
547
548 let counters = self.proxy_counters.read();
550 let mut counter_names: Vec<_> = counters.values().map(|c| &c.name).collect();
551 counter_names.sort();
552 counter_names.dedup();
553
554 for name in counter_names {
555 let metrics: Vec<_> = counters.values().filter(|c| &c.name == name).collect();
556 if let Some(first) = metrics.first() {
557 output.push_str(&format!("# HELP {} {}\n", name, first.help));
558 output.push_str(&format!("# TYPE {} counter\n", name));
559 }
560 for metric in metrics {
561 output.push_str(&format_metric_line(name, &metric.labels, metric.value as f64));
562 }
563 }
564
565 let gauges = self.proxy_gauges.read();
567 let mut gauge_names: Vec<_> = gauges.values().map(|g| &g.name).collect();
568 gauge_names.sort();
569 gauge_names.dedup();
570
571 for name in gauge_names {
572 let metrics: Vec<_> = gauges.values().filter(|g| &g.name == name).collect();
573 if let Some(first) = metrics.first() {
574 output.push_str(&format!("# HELP {} {}\n", name, first.help));
575 output.push_str(&format!("# TYPE {} gauge\n", name));
576 }
577 for metric in metrics {
578 output.push_str(&format_metric_line(name, &metric.labels, metric.value));
579 }
580 }
581
582 let histograms = self.proxy_histograms.read();
584 let mut histogram_names: Vec<_> = histograms.values().map(|h| &h.name).collect();
585 histogram_names.sort();
586 histogram_names.dedup();
587
588 for name in histogram_names {
589 let metrics: Vec<_> = histograms.values().filter(|h| &h.name == name).collect();
590 if let Some(first) = metrics.first() {
591 output.push_str(&format!("# HELP {} {}\n", name, first.help));
592 output.push_str(&format!("# TYPE {} histogram\n", name));
593 }
594 for metric in metrics {
595 for (le, count) in &metric.buckets {
597 let mut labels = metric.labels.clone();
598 labels.insert(
599 "le".to_string(),
600 if le.is_infinite() {
601 "+Inf".to_string()
602 } else {
603 le.to_string()
604 },
605 );
606 output.push_str(&format_metric_line(
607 &format!("{}_bucket", name),
608 &labels,
609 *count as f64,
610 ));
611 }
612 output.push_str(&format_metric_line(
614 &format!("{}_sum", name),
615 &metric.labels,
616 metric.sum,
617 ));
618 output.push_str(&format_metric_line(
619 &format!("{}_count", name),
620 &metric.labels,
621 metric.count as f64,
622 ));
623 }
624 }
625
626 output.push_str("\n# Agent metrics\n");
628 output.push_str(&self.agent_collector.export_prometheus());
629
630 output
631 }
632
633 pub fn series_count(&self) -> usize {
635 self.proxy_counters.read().len()
636 + self.proxy_gauges.read().len()
637 + self.proxy_histograms.read().len()
638 + self.agent_collector.series_count()
639 }
640
641 fn metric_key(name: &str, labels: &HashMap<String, String>) -> String {
642 let mut pairs: Vec<_> = labels.iter().collect();
643 pairs.sort_by_key(|(k, _)| *k);
644 let labels_str = pairs
645 .into_iter()
646 .map(|(k, v)| format!("{}={}", k, v))
647 .collect::<Vec<_>>()
648 .join(",");
649 format!("{}|{}", name, labels_str)
650 }
651}
652
653impl Default for UnifiedMetricsAggregator {
654 fn default() -> Self {
655 Self::new("sentinel", "default")
656 }
657}
658
659fn format_metric_line(name: &str, labels: &HashMap<String, String>, value: f64) -> String {
661 if labels.is_empty() {
662 format!("{} {}\n", name, format_value(value))
663 } else {
664 let mut pairs: Vec<_> = labels.iter().collect();
665 pairs.sort_by_key(|(k, _)| *k);
666 let labels_str = pairs
667 .into_iter()
668 .map(|(k, v)| format!("{}=\"{}\"", k, escape_label_value(v)))
669 .collect::<Vec<_>>()
670 .join(",");
671 format!("{}{{{}}} {}\n", name, labels_str, format_value(value))
672 }
673}
674
/// Renders a sample value in Prometheus text format.
///
/// The rules are:
/// - Infinities become `+Inf`/`-Inf`, and NaN becomes `NaN`.
/// - Whole numbers with magnitude below 2^63 are printed as integers (so `-0.0` prints `0`).
/// - Every other finite value uses `f64`'s `Display`, which round-trips exactly and never uses
///   exponent notation.
///
/// The range check matters: `as i64` saturates, so a value like `1e20` used to be printed as
/// `9223372036854775807`.
fn format_value(v: f64) -> String {
    // 2^63: the smallest magnitude that does not fit in an i64.
    const I64_LIMIT: f64 = 9_223_372_036_854_775_808.0;

    if v.is_nan() {
        "NaN".to_string()
    } else if v.is_infinite() {
        let rendered = if v.is_sign_positive() { "+Inf" } else { "-Inf" };
        rendered.to_string()
    } else if v.fract() == 0.0 && v.abs() < I64_LIMIT {
        (v as i64).to_string()
    } else {
        v.to_string()
    }
}
691
/// Escapes a label value for the Prometheus text format.
///
/// Backslash becomes `\\`, double quote becomes `\"`, and line feed becomes `\n`. All other
/// characters pass through unchanged.
fn escape_label_value(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());
    for ch in s.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' => escaped.push_str("\\n"),
            other => escaped.push(other),
        }
    }
    escaped
}
698
/// Handles configuration-update requests sent by agents.
///
/// Rule and list updates are delegated to optional callbacks. Reload requests are stored as
/// pending until `clear_old_pending` removes them by age.
pub struct ConfigUpdateHandler {
    /// Stored `RequestReload` requests, keyed by request id.
    pending: RwLock<HashMap<String, PendingUpdate>>,
    /// Callback for `RuleUpdate`: `(rule_set, rules, remove_rules)`; returns `true` to accept.
    on_rule_update: Option<Box<dyn Fn(&str, &[crate::v2::control::RuleDefinition], &[String]) -> bool + Send + Sync>>,
    /// Callback for `ListUpdate`: `(list_id, add, remove)`; returns `true` to accept.
    on_list_update: Option<Box<dyn Fn(&str, &[String], &[String]) -> bool + Send + Sync>>,
}

/// A stored reload request and the time it was received.
struct PendingUpdate {
    /// The original request.
    request: ConfigUpdateRequest,
    /// When `handle` stored it; used by `clear_old_pending`.
    received_at: Instant,
}
713
714impl ConfigUpdateHandler {
715 pub fn new() -> Self {
717 Self {
718 pending: RwLock::new(HashMap::new()),
719 on_rule_update: None,
720 on_list_update: None,
721 }
722 }
723
724 pub fn on_rule_update<F>(mut self, f: F) -> Self
726 where
727 F: Fn(&str, &[crate::v2::control::RuleDefinition], &[String]) -> bool + Send + Sync + 'static,
728 {
729 self.on_rule_update = Some(Box::new(f));
730 self
731 }
732
733 pub fn on_list_update<F>(mut self, f: F) -> Self
735 where
736 F: Fn(&str, &[String], &[String]) -> bool + Send + Sync + 'static,
737 {
738 self.on_list_update = Some(Box::new(f));
739 self
740 }
741
742 pub fn handle(&self, request: ConfigUpdateRequest) -> ConfigUpdateResponse {
744 let request_id = request.request_id.clone();
745
746 match &request.update_type {
747 ConfigUpdateType::RequestReload => {
748 self.pending.write().insert(
750 request_id.clone(),
751 PendingUpdate {
752 request,
753 received_at: Instant::now(),
754 },
755 );
756 ConfigUpdateResponse::success(request_id)
757 }
758 ConfigUpdateType::RuleUpdate { rule_set, rules, remove_rules } => {
759 if let Some(ref callback) = self.on_rule_update {
760 if callback(rule_set, rules, remove_rules) {
761 ConfigUpdateResponse::success(request_id)
762 } else {
763 ConfigUpdateResponse::failure(request_id, "Rule update rejected")
764 }
765 } else {
766 ConfigUpdateResponse::failure(request_id, "Rule updates not supported")
767 }
768 }
769 ConfigUpdateType::ListUpdate { list_id, add, remove } => {
770 if let Some(ref callback) = self.on_list_update {
771 if callback(list_id, add, remove) {
772 ConfigUpdateResponse::success(request_id)
773 } else {
774 ConfigUpdateResponse::failure(request_id, "List update rejected")
775 }
776 } else {
777 ConfigUpdateResponse::failure(request_id, "List updates not supported")
778 }
779 }
780 ConfigUpdateType::RestartRequired { reason, grace_period_ms } => {
781 tracing::warn!(
783 reason = reason,
784 grace_period_ms = grace_period_ms,
785 "Agent requested restart"
786 );
787 ConfigUpdateResponse::success(request_id)
788 }
789 ConfigUpdateType::ConfigError { error, field } => {
790 tracing::error!(
791 error = error,
792 field = ?field,
793 "Agent reported configuration error"
794 );
795 ConfigUpdateResponse::success(request_id)
796 }
797 }
798 }
799
800 pub fn pending_count(&self) -> usize {
802 self.pending.read().len()
803 }
804
805 pub fn clear_old_pending(&self, max_age: Duration) {
807 let now = Instant::now();
808 self.pending
809 .write()
810 .retain(|_, v| now.duration_since(v.received_at) < max_age);
811 }
812}
813
814impl Default for ConfigUpdateHandler {
815 fn default() -> Self {
816 Self::new()
817 }
818}
819
820impl std::fmt::Debug for ConfigUpdateHandler {
822 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
823 f.debug_struct("ConfigUpdateHandler")
824 .field("pending_count", &self.pending.read().len())
825 .field("has_rule_callback", &self.on_rule_update.is_some())
826 .field("has_list_callback", &self.on_list_update.is_some())
827 .finish()
828 }
829}
830
/// Tracks agents that can receive pushed configuration updates, and the state of each push.
///
/// NOTE(review): no I/O is visible here. `push_to_agent` and `get_retryable` only create and
/// return requests, so delivery presumably happens in the caller. Confirm.
#[derive(Debug)]
pub struct ConfigPusher {
    /// Registered agents keyed by agent id.
    agents: RwLock<HashMap<String, AgentConnection>>,
    /// Pushes keyed by push id, kept until `expire_old` cleans them up.
    pending_pushes: RwLock<HashMap<String, PendingPush>>,
    /// Timeouts and retry limits.
    config: ConfigPusherConfig,
    /// Source of the numeric suffix in `push-<n>` ids; starts at 1.
    sequence: std::sync::atomic::AtomicU64,
}
846
/// Timeouts and retry limits for a `ConfigPusher`.
#[derive(Clone, Debug)]
pub struct ConfigPusherConfig {
    /// A sent push is expired once this much time has passed since it was created.
    pub ack_timeout: Duration,
    /// Upper bound on total send attempts per push, including the initial send
    /// (a new push starts with one attempt).
    pub max_retries: usize,
    /// Minimum time between consecutive attempts of the same push.
    pub retry_interval: Duration,
    /// Intended limit on in-flight pushes per agent.
    pub max_pending_per_agent: usize,
}

impl Default for ConfigPusherConfig {
    /// 10 s ack timeout, 3 attempts, 2 s between attempts, 100 in-flight pushes per agent.
    fn default() -> Self {
        let ack_timeout = Duration::from_secs(10);
        let retry_interval = Duration::from_secs(2);
        ConfigPusherConfig {
            ack_timeout,
            max_retries: 3,
            retry_interval,
            max_pending_per_agent: 100,
        }
    }
}
870
/// Registration record and push statistics for one agent.
#[derive(Debug, Clone)]
pub struct AgentConnection {
    /// Agent id (also the key in the pusher's agent map).
    pub agent_id: String,
    /// Display name given at registration.
    pub name: String,
    /// When the agent was (last) registered.
    pub connected_at: Instant,
    /// Set at registration and refreshed by `ConfigPusher::touch_agent`.
    pub last_seen: Instant,
    /// Pushes the agent acknowledged as accepted.
    pub successful_pushes: u64,
    /// Pushes the agent rejected, plus pushes that expired.
    pub failed_pushes: u64,
    /// Whether pushes may be sent to this agent.
    pub supports_push: bool,
}

/// Internal bookkeeping for one push.
#[derive(Debug)]
struct PendingPush {
    /// Push id (`push-<n>`); also the request id of `update`.
    push_id: String,
    /// Target agent.
    agent_id: String,
    /// The request to (re)send.
    update: ConfigUpdateRequest,
    /// Creation time; measured against `ack_timeout` and the cleanup age.
    created_at: Instant,
    /// Time of the most recent send attempt.
    last_attempt: Instant,
    /// Number of send attempts so far (1 at creation).
    attempts: usize,
    /// Current state of the push.
    status: PushStatus,
}

/// Lifecycle state of a push.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PushStatus {
    /// Counted as in flight; not assigned anywhere in the visible code.
    Pending,
    /// Created/sent and awaiting acknowledgement (the initial state of a new push).
    Sent,
    /// The agent accepted the update.
    Acknowledged,
    /// The agent rejected the update, with its reason (or "Unknown error").
    Failed { reason: String },
    /// No acknowledgement arrived in time or the retry budget ran out.
    Expired,
}

/// Public view of a push's state, as returned by `ConfigPusher::get_results`.
#[derive(Debug)]
pub struct PushResult {
    /// Push id.
    pub push_id: String,
    /// Target agent.
    pub agent_id: String,
    /// Current status.
    pub status: PushStatus,
    /// Send attempts so far.
    pub attempts: usize,
}
920
921impl ConfigPusher {
922 pub fn new() -> Self {
924 Self::with_config(ConfigPusherConfig::default())
925 }
926
927 pub fn with_config(config: ConfigPusherConfig) -> Self {
929 Self {
930 agents: RwLock::new(HashMap::new()),
931 pending_pushes: RwLock::new(HashMap::new()),
932 config,
933 sequence: std::sync::atomic::AtomicU64::new(1),
934 }
935 }
936
937 pub fn register_agent(&self, agent_id: impl Into<String>, name: impl Into<String>, supports_push: bool) {
939 let agent_id = agent_id.into();
940 let now = Instant::now();
941 self.agents.write().insert(
942 agent_id.clone(),
943 AgentConnection {
944 agent_id,
945 name: name.into(),
946 connected_at: now,
947 last_seen: now,
948 successful_pushes: 0,
949 failed_pushes: 0,
950 supports_push,
951 },
952 );
953 }
954
955 pub fn unregister_agent(&self, agent_id: &str) {
957 self.agents.write().remove(agent_id);
958 self.pending_pushes
960 .write()
961 .retain(|_, p| p.agent_id != agent_id);
962 }
963
964 pub fn touch_agent(&self, agent_id: &str) {
966 if let Some(agent) = self.agents.write().get_mut(agent_id) {
967 agent.last_seen = Instant::now();
968 }
969 }
970
971 pub fn connected_agents(&self) -> Vec<AgentConnection> {
973 self.agents.read().values().cloned().collect()
974 }
975
976 pub fn pushable_agents(&self) -> Vec<AgentConnection> {
978 self.agents
979 .read()
980 .values()
981 .filter(|a| a.supports_push)
982 .cloned()
983 .collect()
984 }
985
986 pub fn push_to_agent(&self, agent_id: &str, update_type: ConfigUpdateType) -> Option<String> {
988 let agents = self.agents.read();
989 let agent = agents.get(agent_id)?;
990
991 if !agent.supports_push {
992 return None;
993 }
994
995 let push_id = self.next_push_id();
996 let now = Instant::now();
997
998 let update = ConfigUpdateRequest {
999 update_type,
1000 request_id: push_id.clone(),
1001 timestamp_ms: now_ms(),
1002 };
1003
1004 self.pending_pushes.write().insert(
1005 push_id.clone(),
1006 PendingPush {
1007 push_id: push_id.clone(),
1008 agent_id: agent_id.to_string(),
1009 update,
1010 created_at: now,
1011 last_attempt: now,
1012 attempts: 1,
1013 status: PushStatus::Sent,
1014 },
1015 );
1016
1017 Some(push_id)
1018 }
1019
1020 pub fn push_to_all(&self, update_type: ConfigUpdateType) -> Vec<String> {
1022 let pushable = self.pushable_agents();
1023 let mut push_ids = Vec::with_capacity(pushable.len());
1024
1025 for agent in pushable {
1026 if let Some(push_id) = self.push_to_agent(&agent.agent_id, update_type.clone()) {
1027 push_ids.push(push_id);
1028 }
1029 }
1030
1031 push_ids
1032 }
1033
1034 pub fn acknowledge(&self, push_id: &str, accepted: bool, error: Option<String>) {
1036 let mut pending = self.pending_pushes.write();
1037 if let Some(push) = pending.get_mut(push_id) {
1038 if accepted {
1039 push.status = PushStatus::Acknowledged;
1040 if let Some(agent) = self.agents.write().get_mut(&push.agent_id) {
1042 agent.successful_pushes += 1;
1043 }
1044 } else {
1045 push.status = PushStatus::Failed {
1046 reason: error.unwrap_or_else(|| "Unknown error".to_string()),
1047 };
1048 if let Some(agent) = self.agents.write().get_mut(&push.agent_id) {
1050 agent.failed_pushes += 1;
1051 }
1052 }
1053 }
1054 }
1055
1056 pub fn get_retryable(&self) -> Vec<(String, ConfigUpdateRequest)> {
1058 let now = Instant::now();
1059 let mut retryable = Vec::new();
1060 let mut pending = self.pending_pushes.write();
1061
1062 for push in pending.values_mut() {
1063 if push.status == PushStatus::Sent
1064 && now.duration_since(push.last_attempt) >= self.config.retry_interval
1065 && push.attempts < self.config.max_retries
1066 {
1067 push.attempts += 1;
1068 push.last_attempt = now;
1069 retryable.push((push.agent_id.clone(), push.update.clone()));
1070 }
1071 }
1072
1073 retryable
1074 }
1075
1076 pub fn expire_old(&self) {
1078 let now = Instant::now();
1079 let mut pending = self.pending_pushes.write();
1080
1081 for push in pending.values_mut() {
1082 if push.status == PushStatus::Sent
1083 && (now.duration_since(push.created_at) >= self.config.ack_timeout
1084 || push.attempts >= self.config.max_retries)
1085 {
1086 push.status = PushStatus::Expired;
1087 if let Some(agent) = self.agents.write().get_mut(&push.agent_id) {
1089 agent.failed_pushes += 1;
1090 }
1091 }
1092 }
1093
1094 let cleanup_age = Duration::from_secs(60);
1096 pending.retain(|_, p| {
1097 now.duration_since(p.created_at) < cleanup_age
1098 || matches!(p.status, PushStatus::Pending | PushStatus::Sent)
1099 });
1100 }
1101
1102 pub fn get_results(&self) -> Vec<PushResult> {
1104 self.pending_pushes
1105 .read()
1106 .values()
1107 .map(|p| PushResult {
1108 push_id: p.push_id.clone(),
1109 agent_id: p.agent_id.clone(),
1110 status: p.status.clone(),
1111 attempts: p.attempts,
1112 })
1113 .collect()
1114 }
1115
1116 pub fn pending_count(&self) -> usize {
1118 self.pending_pushes
1119 .read()
1120 .values()
1121 .filter(|p| matches!(p.status, PushStatus::Pending | PushStatus::Sent))
1122 .count()
1123 }
1124
1125 fn next_push_id(&self) -> String {
1126 let seq = self
1127 .sequence
1128 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1129 format!("push-{}", seq)
1130 }
1131}
1132
1133impl Default for ConfigPusher {
1134 fn default() -> Self {
1135 Self::new()
1136 }
1137}
1138
/// Wall-clock milliseconds since the Unix epoch, or 0 if the system clock is before the epoch.
fn now_ms() -> u64 {
    match std::time::UNIX_EPOCH.elapsed() {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
1145
1146#[cfg(test)]
1147mod tests {
1148 use super::*;
1149 use crate::v2::metrics::{standard, CounterMetric, GaugeMetric, HistogramMetric};
1150
1151 #[test]
1152 fn test_metrics_collector_basic() {
1153 let collector = MetricsCollector::new();
1154
1155 let mut report = MetricsReport::new("test-agent", 10_000);
1156 report.counters.push(CounterMetric::new(standard::REQUESTS_TOTAL, 100));
1157 report.gauges.push(GaugeMetric::new(standard::IN_FLIGHT_REQUESTS, 5.0));
1158
1159 collector.record(&report);
1160
1161 assert_eq!(collector.series_count(), 2);
1162 assert_eq!(collector.active_agents(), vec!["test-agent"]);
1163 }
1164
1165 #[test]
1166 fn test_metrics_collector_with_labels() {
1167 let collector = MetricsCollector::new();
1168
1169 let mut report = MetricsReport::new("agent-1", 10_000);
1170 let mut counter = CounterMetric::new(standard::REQUESTS_TOTAL, 50);
1171 counter.labels.insert("route".to_string(), "/api".to_string());
1172 report.counters.push(counter);
1173
1174 collector.record(&report);
1175
1176 let prometheus = collector.export_prometheus();
1177 assert!(prometheus.contains("agent_requests_total"));
1178 assert!(prometheus.contains("route=\"/api\""));
1179 assert!(prometheus.contains("agent_id=\"agent-1\""));
1180 }
1181
1182 #[test]
1183 fn test_prometheus_export() {
1184 let collector = MetricsCollector::new();
1185
1186 let mut report = MetricsReport::new("test", 10_000);
1187 let mut counter = CounterMetric::new("http_requests_total", 123);
1188 counter.help = Some("Total HTTP requests".to_string());
1189 report.counters.push(counter);
1190
1191 collector.record(&report);
1192
1193 let output = collector.export_prometheus();
1194 assert!(output.contains("# HELP http_requests_total Total HTTP requests"));
1195 assert!(output.contains("# TYPE http_requests_total counter"));
1196 assert!(output.contains("123"));
1197 }
1198
1199 #[test]
1200 fn test_histogram_export() {
1201 let config = MetricsCollectorConfig {
1203 include_agent_id_label: false,
1204 ..MetricsCollectorConfig::default()
1205 };
1206 let collector = MetricsCollector::with_config(config);
1207
1208 let mut report = MetricsReport::new("test", 10_000);
1209 report.histograms.push(HistogramMetric {
1210 name: "request_duration_seconds".to_string(),
1211 help: Some("Request duration".to_string()),
1212 labels: HashMap::new(),
1213 sum: 10.5,
1214 count: 100,
1215 buckets: vec![
1216 HistogramBucket { le: 0.1, count: 50 },
1217 HistogramBucket { le: 0.5, count: 80 },
1218 HistogramBucket { le: 1.0, count: 95 },
1219 HistogramBucket::infinity(),
1220 ],
1221 });
1222
1223 collector.record(&report);
1224
1225 let output = collector.export_prometheus();
1226 assert!(output.contains("request_duration_seconds_bucket"));
1227 assert!(output.contains("le=\"0.1\""));
1228 assert!(output.contains("le=\"+Inf\""));
1229 assert!(output.contains("request_duration_seconds_sum 10.5"));
1230 assert!(output.contains("request_duration_seconds_count 100"));
1231 }
1232
1233 #[test]
1234 fn test_config_update_handler() {
1235 let handler = ConfigUpdateHandler::new();
1236
1237 let request = ConfigUpdateRequest {
1238 update_type: ConfigUpdateType::RequestReload,
1239 request_id: "req-1".to_string(),
1240 timestamp_ms: 0,
1241 };
1242
1243 let response = handler.handle(request);
1244 assert!(response.accepted);
1245 assert_eq!(handler.pending_count(), 1);
1246 }
1247
1248 #[test]
1249 fn test_escape_label_value() {
1250 assert_eq!(escape_label_value("simple"), "simple");
1251 assert_eq!(escape_label_value("with\"quotes"), "with\\\"quotes");
1252 assert_eq!(escape_label_value("with\\backslash"), "with\\\\backslash");
1253 assert_eq!(escape_label_value("with\nnewline"), "with\\nnewline");
1254 }
1255
1256 #[test]
1257 fn test_config_pusher_basic() {
1258 let pusher = ConfigPusher::new();
1259
1260 pusher.register_agent("agent-1", "Test Agent", true);
1262
1263 let agents = pusher.connected_agents();
1264 assert_eq!(agents.len(), 1);
1265 assert_eq!(agents[0].agent_id, "agent-1");
1266 assert!(agents[0].supports_push);
1267 }
1268
1269 #[test]
1270 fn test_config_pusher_push_to_agent() {
1271 let pusher = ConfigPusher::new();
1272 pusher.register_agent("agent-1", "Test Agent", true);
1273
1274 let update_type = ConfigUpdateType::RuleUpdate {
1275 rule_set: "default".to_string(),
1276 rules: vec![],
1277 remove_rules: vec![],
1278 };
1279
1280 let push_id = pusher.push_to_agent("agent-1", update_type);
1281 assert!(push_id.is_some());
1282
1283 let push_id = push_id.unwrap();
1284 assert!(push_id.starts_with("push-"));
1285 assert_eq!(pusher.pending_count(), 1);
1286 }
1287
1288 #[test]
1289 fn test_config_pusher_acknowledge() {
1290 let pusher = ConfigPusher::new();
1291 pusher.register_agent("agent-1", "Test Agent", true);
1292
1293 let push_id = pusher
1294 .push_to_agent("agent-1", ConfigUpdateType::RequestReload)
1295 .unwrap();
1296
1297 pusher.acknowledge(&push_id, true, None);
1299
1300 let results = pusher.get_results();
1301 assert_eq!(results.len(), 1);
1302 assert_eq!(results[0].status, PushStatus::Acknowledged);
1303
1304 let agents = pusher.connected_agents();
1306 assert_eq!(agents[0].successful_pushes, 1);
1307 }
1308
1309 #[test]
1310 fn test_config_pusher_push_to_non_pushable() {
1311 let pusher = ConfigPusher::new();
1312 pusher.register_agent("agent-1", "Test Agent", false);
1313
1314 let push_id = pusher.push_to_agent("agent-1", ConfigUpdateType::RequestReload);
1315 assert!(push_id.is_none());
1316 }
1317
1318 #[test]
1319 fn test_config_pusher_push_to_all() {
1320 let pusher = ConfigPusher::new();
1321 pusher.register_agent("agent-1", "Agent 1", true);
1322 pusher.register_agent("agent-2", "Agent 2", true);
1323 pusher.register_agent("agent-3", "Agent 3", false); let push_ids = pusher.push_to_all(ConfigUpdateType::RequestReload);
1326 assert_eq!(push_ids.len(), 2);
1327 assert_eq!(pusher.pending_count(), 2);
1328 }
1329
1330 #[test]
1331 fn test_config_pusher_unregister() {
1332 let pusher = ConfigPusher::new();
1333 pusher.register_agent("agent-1", "Test Agent", true);
1334
1335 let _push_id = pusher.push_to_agent("agent-1", ConfigUpdateType::RequestReload);
1336 assert_eq!(pusher.pending_count(), 1);
1337
1338 pusher.unregister_agent("agent-1");
1339
1340 assert_eq!(pusher.connected_agents().len(), 0);
1341 assert_eq!(pusher.pending_count(), 0); }
1343
1344 #[test]
1345 fn test_metrics_snapshot() {
1346 let collector = MetricsCollector::new();
1347
1348 let mut report = MetricsReport::new("test", 10_000);
1349 report.counters.push(CounterMetric::new("requests_total", 100));
1350 report.gauges.push(GaugeMetric::new("connections", 5.0));
1351
1352 collector.record(&report);
1353
1354 let snapshot = collector.snapshot();
1355 assert_eq!(snapshot.counter_count(), 1);
1356 assert_eq!(snapshot.gauge_count(), 1);
1357 }
1358
1359 #[test]
1360 fn test_unified_aggregator_basic() {
1361 let aggregator = UnifiedMetricsAggregator::new("test-service", "instance-1");
1362
1363 aggregator.increment_counter(
1365 "http_requests_total",
1366 "Total HTTP requests",
1367 HashMap::new(),
1368 100,
1369 );
1370
1371 aggregator.set_gauge(
1373 "active_connections",
1374 "Active connections",
1375 HashMap::new(),
1376 42.0,
1377 );
1378
1379 assert_eq!(aggregator.series_count(), 2);
1380 }
1381
1382 #[test]
1383 fn test_unified_aggregator_counter_increment() {
1384 let aggregator = UnifiedMetricsAggregator::new("test", "1");
1385
1386 aggregator.increment_counter("requests", "Total requests", HashMap::new(), 10);
1387 aggregator.increment_counter("requests", "Total requests", HashMap::new(), 5);
1388
1389 let output = aggregator.export_prometheus();
1390 assert!(output.contains("requests 15"));
1391 }
1392
1393 #[test]
1394 fn test_unified_aggregator_labeled_metrics() {
1395 let aggregator = UnifiedMetricsAggregator::new("test", "1");
1396
1397 let mut labels = HashMap::new();
1398 labels.insert("method".to_string(), "GET".to_string());
1399 aggregator.increment_counter("requests", "Total requests", labels.clone(), 100);
1400
1401 let mut labels2 = HashMap::new();
1402 labels2.insert("method".to_string(), "POST".to_string());
1403 aggregator.increment_counter("requests", "Total requests", labels2, 50);
1404
1405 let output = aggregator.export_prometheus();
1406 assert!(output.contains("method=\"GET\""));
1407 assert!(output.contains("method=\"POST\""));
1408 }
1409
1410 #[test]
1411 fn test_unified_aggregator_histogram() {
1412 let aggregator = UnifiedMetricsAggregator::new("test", "1");
1413 let buckets = vec![0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0];
1414
1415 aggregator.observe_histogram("request_duration", "Request duration", HashMap::new(), &buckets, 0.05);
1417 aggregator.observe_histogram("request_duration", "Request duration", HashMap::new(), &buckets, 0.2);
1418 aggregator.observe_histogram("request_duration", "Request duration", HashMap::new(), &buckets, 1.5);
1419
1420 let output = aggregator.export_prometheus();
1421 assert!(output.contains("request_duration_bucket"));
1422 assert!(output.contains("request_duration_sum"));
1423 assert!(output.contains("request_duration_count 3"));
1424 }
1425
1426 #[test]
1427 fn test_unified_aggregator_with_agent_metrics() {
1428 let aggregator = UnifiedMetricsAggregator::new("test", "1");
1429
1430 aggregator.increment_counter("proxy_requests", "Proxy requests", HashMap::new(), 1000);
1432
1433 let mut report = MetricsReport::new("waf-agent", 5_000);
1435 report.counters.push(CounterMetric::new("waf_blocked", 50));
1436 aggregator.record_agent_metrics(&report);
1437
1438 let output = aggregator.export_prometheus();
1439 assert!(output.contains("proxy_requests 1000"));
1440 assert!(output.contains("waf_blocked"));
1441 assert!(output.contains("Agent metrics"));
1442 }
1443
1444 #[test]
1445 fn test_unified_aggregator_service_info() {
1446 let aggregator = UnifiedMetricsAggregator::new("my-service", "node-42");
1447
1448 let output = aggregator.export_prometheus();
1449 assert!(output.contains("sentinel_info"));
1450 assert!(output.contains("service=\"my-service\""));
1451 assert!(output.contains("instance=\"node-42\""));
1452 }
1453}