1use crate::v2::control::{ConfigUpdateRequest, ConfigUpdateResponse, ConfigUpdateType};
10use crate::v2::metrics::{HistogramBucket, MetricsReport};
11use parking_lot::RwLock;
12use std::collections::HashMap;
13use std::time::{Duration, Instant};
14
/// Aggregates metrics reported by agents and renders them in the Prometheus
/// text exposition format.
///
/// Each series is keyed by agent id, metric name and label set; a newer report
/// for a series replaces the stored value. All maps sit behind
/// `parking_lot::RwLock`s, so every method works through `&self`.
#[derive(Debug)]
pub struct MetricsCollector {
    /// Latest value of each counter series.
    counters: RwLock<HashMap<MetricKey, AggregatedCounter>>,
    /// Latest value of each gauge series.
    gauges: RwLock<HashMap<MetricKey, AggregatedGauge>>,
    /// Latest sum, count and buckets of each histogram series.
    histograms: RwLock<HashMap<MetricKey, AggregatedHistogram>>,
    /// Time of the most recent report received from each agent id.
    last_report: RwLock<HashMap<String, Instant>>,
    /// Expiry, cardinality and labelling settings.
    config: MetricsCollectorConfig,
}
29
/// Tunables for [`MetricsCollector`].
#[derive(Debug, Clone)]
pub struct MetricsCollectorConfig {
    /// Series not updated within this window are removed by
    /// `MetricsCollector::expire_old_metrics`.
    pub max_age: Duration,
    /// Intended upper bound on the number of stored series (a cardinality limit).
    /// NOTE(review): confirm `MetricsCollector::record` actually enforces it.
    pub max_series: usize,
    /// When true, every recorded series gets an `agent_id` label holding the
    /// id of the reporting agent.
    pub include_agent_id_label: bool,
}
40
41impl Default for MetricsCollectorConfig {
42 fn default() -> Self {
43 Self {
44 max_age: Duration::from_secs(300), max_series: 10_000,
46 include_agent_id_label: true,
47 }
48 }
49}
50
/// Identity of one stored series: the reporting agent, the metric name, and a
/// canonical string form of its label set (built by `MetricKey::labels_to_key`).
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
struct MetricKey {
    /// Id of the agent that reported the series.
    agent_id: String,
    /// Metric name as reported.
    name: String,
    /// Sorted, serialized label set.
    labels_key: String,
}
58
59impl MetricKey {
60 fn new(agent_id: &str, name: &str, labels: &HashMap<String, String>) -> Self {
61 let labels_key = Self::labels_to_key(labels);
62 Self {
63 agent_id: agent_id.to_string(),
64 name: name.to_string(),
65 labels_key,
66 }
67 }
68
69 fn labels_to_key(labels: &HashMap<String, String>) -> String {
70 let mut pairs: Vec<_> = labels.iter().collect();
71 pairs.sort_by_key(|(k, _)| *k);
72 pairs
73 .into_iter()
74 .map(|(k, v)| format!("{}={}", k, v))
75 .collect::<Vec<_>>()
76 .join(",")
77 }
78}
79
/// Last reported state of one counter series.
#[derive(Debug, Clone)]
struct AggregatedCounter {
    /// Metric name as reported by the agent.
    name: String,
    /// Optional `# HELP` text.
    help: Option<String>,
    /// Exported labels (may include an injected `agent_id` label).
    labels: HashMap<String, String>,
    /// Value from the most recent report (stored as-is, not accumulated).
    value: u64,
    /// When the series was last written; compared against `max_age` on expiry.
    last_updated: Instant,
}
89
/// Last reported state of one gauge series.
#[derive(Debug, Clone)]
struct AggregatedGauge {
    /// Metric name as reported by the agent.
    name: String,
    /// Optional `# HELP` text.
    help: Option<String>,
    /// Exported labels (may include an injected `agent_id` label).
    labels: HashMap<String, String>,
    /// Value from the most recent report.
    value: f64,
    /// When the series was last written; compared against `max_age` on expiry.
    last_updated: Instant,
}
99
/// Last reported state of one histogram series.
#[derive(Debug, Clone)]
struct AggregatedHistogram {
    /// Metric name as reported; exported with `_bucket`, `_sum` and `_count` suffixes.
    name: String,
    /// Optional `# HELP` text.
    help: Option<String>,
    /// Exported labels (may include an injected `agent_id` label).
    labels: HashMap<String, String>,
    /// Sum of observations, as reported.
    sum: f64,
    /// Number of observations, as reported.
    count: u64,
    /// Buckets exactly as reported by the agent, exported with an `le` label.
    buckets: Vec<HistogramBucket>,
    /// When the series was last written; compared against `max_age` on expiry.
    last_updated: Instant,
}
111
112impl MetricsCollector {
113 pub fn new() -> Self {
115 Self::with_config(MetricsCollectorConfig::default())
116 }
117
118 pub fn with_config(config: MetricsCollectorConfig) -> Self {
120 Self {
121 counters: RwLock::new(HashMap::new()),
122 gauges: RwLock::new(HashMap::new()),
123 histograms: RwLock::new(HashMap::new()),
124 last_report: RwLock::new(HashMap::new()),
125 config,
126 }
127 }
128
129 pub fn record(&self, report: &MetricsReport) {
131 let now = Instant::now();
132
133 self.last_report
135 .write()
136 .insert(report.agent_id.clone(), now);
137
138 for counter in &report.counters {
140 let mut labels = counter.labels.clone();
141 if self.config.include_agent_id_label {
142 labels.insert("agent_id".to_string(), report.agent_id.clone());
143 }
144
145 let key = MetricKey::new(&report.agent_id, &counter.name, &labels);
146
147 let mut counters = self.counters.write();
148 counters.insert(
149 key,
150 AggregatedCounter {
151 name: counter.name.clone(),
152 help: counter.help.clone(),
153 labels,
154 value: counter.value,
155 last_updated: now,
156 },
157 );
158 }
159
160 for gauge in &report.gauges {
162 let mut labels = gauge.labels.clone();
163 if self.config.include_agent_id_label {
164 labels.insert("agent_id".to_string(), report.agent_id.clone());
165 }
166
167 let key = MetricKey::new(&report.agent_id, &gauge.name, &labels);
168
169 let mut gauges = self.gauges.write();
170 gauges.insert(
171 key,
172 AggregatedGauge {
173 name: gauge.name.clone(),
174 help: gauge.help.clone(),
175 labels,
176 value: gauge.value,
177 last_updated: now,
178 },
179 );
180 }
181
182 for histogram in &report.histograms {
184 let mut labels = histogram.labels.clone();
185 if self.config.include_agent_id_label {
186 labels.insert("agent_id".to_string(), report.agent_id.clone());
187 }
188
189 let key = MetricKey::new(&report.agent_id, &histogram.name, &labels);
190
191 let mut histograms = self.histograms.write();
192 histograms.insert(
193 key,
194 AggregatedHistogram {
195 name: histogram.name.clone(),
196 help: histogram.help.clone(),
197 labels,
198 sum: histogram.sum,
199 count: histogram.count,
200 buckets: histogram.buckets.clone(),
201 last_updated: now,
202 },
203 );
204 }
205 }
206
207 pub fn expire_old_metrics(&self) {
209 let now = Instant::now();
210 let max_age = self.config.max_age;
211
212 self.counters
213 .write()
214 .retain(|_, v| now.duration_since(v.last_updated) < max_age);
215 self.gauges
216 .write()
217 .retain(|_, v| now.duration_since(v.last_updated) < max_age);
218 self.histograms
219 .write()
220 .retain(|_, v| now.duration_since(v.last_updated) < max_age);
221 }
222
223 pub fn series_count(&self) -> usize {
225 self.counters.read().len() + self.gauges.read().len() + self.histograms.read().len()
226 }
227
228 pub fn active_agents(&self) -> Vec<String> {
230 self.last_report.read().keys().cloned().collect()
231 }
232
233 pub fn export_prometheus(&self) -> String {
235 let mut output = String::new();
236
237 let counters = self.counters.read();
239 let mut counter_names: Vec<_> = counters.values().map(|c| &c.name).collect();
240 counter_names.sort();
241 counter_names.dedup();
242
243 for name in counter_names {
244 let metrics: Vec<_> = counters.values().filter(|c| &c.name == name).collect();
245 if let Some(first) = metrics.first() {
246 if let Some(help) = &first.help {
247 output.push_str(&format!("# HELP {} {}\n", name, help));
248 }
249 output.push_str(&format!("# TYPE {} counter\n", name));
250 }
251 for metric in metrics {
252 output.push_str(&format_metric_line(
253 name,
254 &metric.labels,
255 metric.value as f64,
256 ));
257 }
258 }
259
260 let gauges = self.gauges.read();
262 let mut gauge_names: Vec<_> = gauges.values().map(|g| &g.name).collect();
263 gauge_names.sort();
264 gauge_names.dedup();
265
266 for name in gauge_names {
267 let metrics: Vec<_> = gauges.values().filter(|g| &g.name == name).collect();
268 if let Some(first) = metrics.first() {
269 if let Some(help) = &first.help {
270 output.push_str(&format!("# HELP {} {}\n", name, help));
271 }
272 output.push_str(&format!("# TYPE {} gauge\n", name));
273 }
274 for metric in metrics {
275 output.push_str(&format_metric_line(name, &metric.labels, metric.value));
276 }
277 }
278
279 let histograms = self.histograms.read();
281 let mut histogram_names: Vec<_> = histograms.values().map(|h| &h.name).collect();
282 histogram_names.sort();
283 histogram_names.dedup();
284
285 for name in histogram_names {
286 let metrics: Vec<_> = histograms.values().filter(|h| &h.name == name).collect();
287 if let Some(first) = metrics.first() {
288 if let Some(help) = &first.help {
289 output.push_str(&format!("# HELP {} {}\n", name, help));
290 }
291 output.push_str(&format!("# TYPE {} histogram\n", name));
292 }
293 for metric in metrics {
294 for bucket in &metric.buckets {
296 let mut labels = metric.labels.clone();
297 labels.insert(
298 "le".to_string(),
299 if bucket.le.is_infinite() {
300 "+Inf".to_string()
301 } else {
302 bucket.le.to_string()
303 },
304 );
305 output.push_str(&format_metric_line(
306 &format!("{}_bucket", name),
307 &labels,
308 bucket.count as f64,
309 ));
310 }
311 output.push_str(&format_metric_line(
313 &format!("{}_sum", name),
314 &metric.labels,
315 metric.sum,
316 ));
317 output.push_str(&format_metric_line(
318 &format!("{}_count", name),
319 &metric.labels,
320 metric.count as f64,
321 ));
322 }
323 }
324
325 output
326 }
327
328 pub fn snapshot(&self) -> MetricsSnapshot {
330 MetricsSnapshot {
331 counters: self.counters.read().values().cloned().collect(),
332 gauges: self.gauges.read().values().cloned().collect(),
333 histograms: self.histograms.read().values().cloned().collect(),
334 timestamp: Instant::now(),
335 }
336 }
337}
338
339impl Default for MetricsCollector {
340 fn default() -> Self {
341 Self::new()
342 }
343}
344
/// Point-in-time copy of a collector's series, produced by
/// `MetricsCollector::snapshot`.
#[derive(Debug)]
pub struct MetricsSnapshot {
    /// Copied counter series.
    counters: Vec<AggregatedCounter>,
    /// Copied gauge series.
    gauges: Vec<AggregatedGauge>,
    /// Copied histogram series.
    histograms: Vec<AggregatedHistogram>,
    /// When the snapshot was taken.
    timestamp: Instant,
}
353
354impl MetricsSnapshot {
355 pub fn counter_count(&self) -> usize {
357 self.counters.len()
358 }
359
360 pub fn gauge_count(&self) -> usize {
362 self.gauges.len()
363 }
364
365 pub fn histogram_count(&self) -> usize {
367 self.histograms.len()
368 }
369}
370
/// Combines the proxy's own metrics with agent-reported metrics into a single
/// Prometheus exposition.
///
/// Proxy series are keyed by a string built from the metric name and its
/// sorted labels; agent series are delegated to an embedded [`MetricsCollector`].
#[derive(Debug)]
pub struct UnifiedMetricsAggregator {
    /// Proxy counters, accumulated by `increment_counter`.
    proxy_counters: RwLock<HashMap<String, ProxyCounter>>,
    /// Proxy gauges, overwritten by `set_gauge`.
    proxy_gauges: RwLock<HashMap<String, ProxyGauge>>,
    /// Proxy histograms, updated by `observe_histogram`.
    proxy_histograms: RwLock<HashMap<String, ProxyHistogram>>,
    /// Collector for metrics reported by agents.
    agent_collector: MetricsCollector,
    /// Emitted as the `service` label of `sentinel_info`.
    service_name: String,
    /// Emitted as the `instance` label of `sentinel_info`.
    instance_id: String,
}
392
/// A counter owned by the proxy itself.
#[derive(Debug, Clone)]
struct ProxyCounter {
    /// Metric name.
    name: String,
    /// `# HELP` text captured when the series was first created.
    help: String,
    /// Exported labels.
    labels: HashMap<String, String>,
    /// Running total of all increments.
    value: u64,
}
401
/// A gauge owned by the proxy itself.
#[derive(Debug, Clone)]
struct ProxyGauge {
    /// Metric name.
    name: String,
    /// `# HELP` text from the most recent `set_gauge` call.
    help: String,
    /// Exported labels.
    labels: HashMap<String, String>,
    /// Last value set.
    value: f64,
}
410
/// A histogram owned by the proxy itself.
#[derive(Debug, Clone)]
struct ProxyHistogram {
    /// Metric name; exported with `_bucket`, `_sum` and `_count` suffixes.
    name: String,
    /// `# HELP` text captured when the series was first created.
    help: String,
    /// Exported labels (an `le` label is added per bucket on export).
    labels: HashMap<String, String>,
    /// Sum of all observed values.
    sum: f64,
    /// Number of observations.
    count: u64,
    /// `(upper bound, cumulative count)` pairs; the last entry is the `+Inf` bucket.
    buckets: Vec<(f64, u64)>,
}
421
422impl UnifiedMetricsAggregator {
423 pub fn new(service_name: impl Into<String>, instance_id: impl Into<String>) -> Self {
425 Self {
426 proxy_counters: RwLock::new(HashMap::new()),
427 proxy_gauges: RwLock::new(HashMap::new()),
428 proxy_histograms: RwLock::new(HashMap::new()),
429 agent_collector: MetricsCollector::new(),
430 service_name: service_name.into(),
431 instance_id: instance_id.into(),
432 }
433 }
434
435 pub fn with_agent_config(
437 service_name: impl Into<String>,
438 instance_id: impl Into<String>,
439 agent_config: MetricsCollectorConfig,
440 ) -> Self {
441 Self {
442 proxy_counters: RwLock::new(HashMap::new()),
443 proxy_gauges: RwLock::new(HashMap::new()),
444 proxy_histograms: RwLock::new(HashMap::new()),
445 agent_collector: MetricsCollector::with_config(agent_config),
446 service_name: service_name.into(),
447 instance_id: instance_id.into(),
448 }
449 }
450
451 pub fn agent_collector(&self) -> &MetricsCollector {
453 &self.agent_collector
454 }
455
456 pub fn increment_counter(
458 &self,
459 name: &str,
460 help: &str,
461 labels: HashMap<String, String>,
462 delta: u64,
463 ) {
464 let key = Self::metric_key(name, &labels);
465 let mut counters = self.proxy_counters.write();
466
467 if let Some(counter) = counters.get_mut(&key) {
468 counter.value += delta;
469 } else {
470 counters.insert(
471 key,
472 ProxyCounter {
473 name: name.to_string(),
474 help: help.to_string(),
475 labels,
476 value: delta,
477 },
478 );
479 }
480 }
481
482 pub fn set_gauge(&self, name: &str, help: &str, labels: HashMap<String, String>, value: f64) {
484 let key = Self::metric_key(name, &labels);
485 self.proxy_gauges.write().insert(
486 key,
487 ProxyGauge {
488 name: name.to_string(),
489 help: help.to_string(),
490 labels,
491 value,
492 },
493 );
494 }
495
496 pub fn observe_histogram(
498 &self,
499 name: &str,
500 help: &str,
501 labels: HashMap<String, String>,
502 bucket_boundaries: &[f64],
503 value: f64,
504 ) {
505 let key = Self::metric_key(name, &labels);
506 let mut histograms = self.proxy_histograms.write();
507
508 if let Some(histogram) = histograms.get_mut(&key) {
509 histogram.sum += value;
510 histogram.count += 1;
511 for (boundary, count) in histogram.buckets.iter_mut() {
513 if value <= *boundary {
514 *count += 1;
515 }
516 }
517 } else {
518 let mut buckets: Vec<(f64, u64)> = bucket_boundaries
520 .iter()
521 .map(|&b| (b, if value <= b { 1 } else { 0 }))
522 .collect();
523 buckets.push((f64::INFINITY, 1)); histograms.insert(
526 key,
527 ProxyHistogram {
528 name: name.to_string(),
529 help: help.to_string(),
530 labels,
531 sum: value,
532 count: 1,
533 buckets,
534 },
535 );
536 }
537 }
538
539 pub fn record_agent_metrics(&self, report: &MetricsReport) {
541 self.agent_collector.record(report);
542 }
543
544 pub fn export_prometheus(&self) -> String {
546 let mut output = String::new();
547
548 output.push_str(
550 "# HELP sentinel_info Sentinel proxy information\n# TYPE sentinel_info gauge\n",
551 );
552 output.push_str(&format!(
553 "sentinel_info{{service=\"{}\",instance=\"{}\"}} 1\n",
554 escape_label_value(&self.service_name),
555 escape_label_value(&self.instance_id)
556 ));
557
558 let counters = self.proxy_counters.read();
560 let mut counter_names: Vec<_> = counters.values().map(|c| &c.name).collect();
561 counter_names.sort();
562 counter_names.dedup();
563
564 for name in counter_names {
565 let metrics: Vec<_> = counters.values().filter(|c| &c.name == name).collect();
566 if let Some(first) = metrics.first() {
567 output.push_str(&format!("# HELP {} {}\n", name, first.help));
568 output.push_str(&format!("# TYPE {} counter\n", name));
569 }
570 for metric in metrics {
571 output.push_str(&format_metric_line(
572 name,
573 &metric.labels,
574 metric.value as f64,
575 ));
576 }
577 }
578
579 let gauges = self.proxy_gauges.read();
581 let mut gauge_names: Vec<_> = gauges.values().map(|g| &g.name).collect();
582 gauge_names.sort();
583 gauge_names.dedup();
584
585 for name in gauge_names {
586 let metrics: Vec<_> = gauges.values().filter(|g| &g.name == name).collect();
587 if let Some(first) = metrics.first() {
588 output.push_str(&format!("# HELP {} {}\n", name, first.help));
589 output.push_str(&format!("# TYPE {} gauge\n", name));
590 }
591 for metric in metrics {
592 output.push_str(&format_metric_line(name, &metric.labels, metric.value));
593 }
594 }
595
596 let histograms = self.proxy_histograms.read();
598 let mut histogram_names: Vec<_> = histograms.values().map(|h| &h.name).collect();
599 histogram_names.sort();
600 histogram_names.dedup();
601
602 for name in histogram_names {
603 let metrics: Vec<_> = histograms.values().filter(|h| &h.name == name).collect();
604 if let Some(first) = metrics.first() {
605 output.push_str(&format!("# HELP {} {}\n", name, first.help));
606 output.push_str(&format!("# TYPE {} histogram\n", name));
607 }
608 for metric in metrics {
609 for (le, count) in &metric.buckets {
611 let mut labels = metric.labels.clone();
612 labels.insert(
613 "le".to_string(),
614 if le.is_infinite() {
615 "+Inf".to_string()
616 } else {
617 le.to_string()
618 },
619 );
620 output.push_str(&format_metric_line(
621 &format!("{}_bucket", name),
622 &labels,
623 *count as f64,
624 ));
625 }
626 output.push_str(&format_metric_line(
628 &format!("{}_sum", name),
629 &metric.labels,
630 metric.sum,
631 ));
632 output.push_str(&format_metric_line(
633 &format!("{}_count", name),
634 &metric.labels,
635 metric.count as f64,
636 ));
637 }
638 }
639
640 output.push_str("\n# Agent metrics\n");
642 output.push_str(&self.agent_collector.export_prometheus());
643
644 output
645 }
646
647 pub fn series_count(&self) -> usize {
649 self.proxy_counters.read().len()
650 + self.proxy_gauges.read().len()
651 + self.proxy_histograms.read().len()
652 + self.agent_collector.series_count()
653 }
654
655 fn metric_key(name: &str, labels: &HashMap<String, String>) -> String {
656 let mut pairs: Vec<_> = labels.iter().collect();
657 pairs.sort_by_key(|(k, _)| *k);
658 let labels_str = pairs
659 .into_iter()
660 .map(|(k, v)| format!("{}={}", k, v))
661 .collect::<Vec<_>>()
662 .join(",");
663 format!("{}|{}", name, labels_str)
664 }
665}
666
667impl Default for UnifiedMetricsAggregator {
668 fn default() -> Self {
669 Self::new("sentinel", "default")
670 }
671}
672
673fn format_metric_line(name: &str, labels: &HashMap<String, String>, value: f64) -> String {
675 if labels.is_empty() {
676 format!("{} {}\n", name, format_value(value))
677 } else {
678 let mut pairs: Vec<_> = labels.iter().collect();
679 pairs.sort_by_key(|(k, _)| *k);
680 let labels_str = pairs
681 .into_iter()
682 .map(|(k, v)| format!("{}=\"{}\"", k, escape_label_value(v)))
683 .collect::<Vec<_>>()
684 .join(",");
685 format!("{}{{{}}} {}\n", name, labels_str, format_value(value))
686 }
687}
688
689fn format_value(v: f64) -> String {
691 if v.is_infinite() {
692 if v.is_sign_positive() {
693 "+Inf".to_string()
694 } else {
695 "-Inf".to_string()
696 }
697 } else if v.is_nan() {
698 "NaN".to_string()
699 } else if v.fract() == 0.0 {
700 format!("{}", v as i64)
701 } else {
702 format!("{}", v)
703 }
704}
705
/// Escapes a label value for the Prometheus text format: backslash, double
/// quote and line feed become `\\`, `\"` and `\n` respectively.
fn escape_label_value(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());
    for ch in s.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' => escaped.push_str("\\n"),
            other => escaped.push(other),
        }
    }
    escaped
}
712
/// Handles `ConfigUpdateRequest`s: rule and list updates are dispatched to
/// optional callbacks, and reload requests are queued in `pending`.
pub struct ConfigUpdateHandler {
    /// Queued `RequestReload` requests keyed by request id.
    pending: RwLock<HashMap<String, PendingUpdate>>,
    /// Rule-update callback, called with the request's rule set, `rules` and
    /// `remove_rules`; returning `true` produces a success response.
    // The boxed `Fn` signature trips `type_complexity`; an alias would only move it.
    #[allow(clippy::type_complexity)]
    on_rule_update: Option<
        Box<dyn Fn(&str, &[crate::v2::control::RuleDefinition], &[String]) -> bool + Send + Sync>,
    >,
    /// List-update callback, called with the request's list id, `add` and
    /// `remove` entries; returning `true` produces a success response.
    // Same reason as above for allowing `type_complexity`.
    #[allow(clippy::type_complexity)]
    on_list_update: Option<Box<dyn Fn(&str, &[String], &[String]) -> bool + Send + Sync>>,
}
726
/// A reload request stored by `ConfigUpdateHandler::handle`.
struct PendingUpdate {
    /// The request as received.
    request: ConfigUpdateRequest,
    /// Arrival time; `clear_old_pending` drops entries older than its `max_age`.
    received_at: Instant,
}
731
732impl ConfigUpdateHandler {
733 pub fn new() -> Self {
735 Self {
736 pending: RwLock::new(HashMap::new()),
737 on_rule_update: None,
738 on_list_update: None,
739 }
740 }
741
742 pub fn on_rule_update<F>(mut self, f: F) -> Self
744 where
745 F: Fn(&str, &[crate::v2::control::RuleDefinition], &[String]) -> bool
746 + Send
747 + Sync
748 + 'static,
749 {
750 self.on_rule_update = Some(Box::new(f));
751 self
752 }
753
754 pub fn on_list_update<F>(mut self, f: F) -> Self
756 where
757 F: Fn(&str, &[String], &[String]) -> bool + Send + Sync + 'static,
758 {
759 self.on_list_update = Some(Box::new(f));
760 self
761 }
762
763 pub fn handle(&self, request: ConfigUpdateRequest) -> ConfigUpdateResponse {
765 let request_id = request.request_id.clone();
766
767 match &request.update_type {
768 ConfigUpdateType::RequestReload => {
769 self.pending.write().insert(
771 request_id.clone(),
772 PendingUpdate {
773 request,
774 received_at: Instant::now(),
775 },
776 );
777 ConfigUpdateResponse::success(request_id)
778 }
779 ConfigUpdateType::RuleUpdate {
780 rule_set,
781 rules,
782 remove_rules,
783 } => {
784 if let Some(ref callback) = self.on_rule_update {
785 if callback(rule_set, rules, remove_rules) {
786 ConfigUpdateResponse::success(request_id)
787 } else {
788 ConfigUpdateResponse::failure(request_id, "Rule update rejected")
789 }
790 } else {
791 ConfigUpdateResponse::failure(request_id, "Rule updates not supported")
792 }
793 }
794 ConfigUpdateType::ListUpdate {
795 list_id,
796 add,
797 remove,
798 } => {
799 if let Some(ref callback) = self.on_list_update {
800 if callback(list_id, add, remove) {
801 ConfigUpdateResponse::success(request_id)
802 } else {
803 ConfigUpdateResponse::failure(request_id, "List update rejected")
804 }
805 } else {
806 ConfigUpdateResponse::failure(request_id, "List updates not supported")
807 }
808 }
809 ConfigUpdateType::RestartRequired {
810 reason,
811 grace_period_ms,
812 } => {
813 tracing::warn!(
815 reason = reason,
816 grace_period_ms = grace_period_ms,
817 "Agent requested restart"
818 );
819 ConfigUpdateResponse::success(request_id)
820 }
821 ConfigUpdateType::ConfigError { error, field } => {
822 tracing::error!(
823 error = error,
824 field = ?field,
825 "Agent reported configuration error"
826 );
827 ConfigUpdateResponse::success(request_id)
828 }
829 }
830 }
831
832 pub fn pending_count(&self) -> usize {
834 self.pending.read().len()
835 }
836
837 pub fn clear_old_pending(&self, max_age: Duration) {
839 let now = Instant::now();
840 self.pending
841 .write()
842 .retain(|_, v| now.duration_since(v.received_at) < max_age);
843 }
844}
845
846impl Default for ConfigUpdateHandler {
847 fn default() -> Self {
848 Self::new()
849 }
850}
851
852impl std::fmt::Debug for ConfigUpdateHandler {
854 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
855 f.debug_struct("ConfigUpdateHandler")
856 .field("pending_count", &self.pending.read().len())
857 .field("has_rule_callback", &self.on_rule_update.is_some())
858 .field("has_list_callback", &self.on_list_update.is_some())
859 .finish()
860 }
861}
862
/// Tracks agents that can receive pushed configuration updates and the
/// delivery state of each push (sent / acknowledged / failed / expired).
///
/// Nothing in this type transmits data. `push_to_agent` and `get_retryable`
/// produce `ConfigUpdateRequest`s for the caller to deliver, and the caller
/// reports outcomes back through `acknowledge`.
#[derive(Debug)]
pub struct ConfigPusher {
    /// Registered agents keyed by agent id.
    agents: RwLock<HashMap<String, AgentConnection>>,
    /// Pushes keyed by push id; finished pushes linger until `expire_old` cleans them.
    pending_pushes: RwLock<HashMap<String, PendingPush>>,
    /// Timeout, retry and limit settings.
    config: ConfigPusherConfig,
    /// Next push sequence number; push ids are `push-N`, starting at 1.
    sequence: std::sync::atomic::AtomicU64,
}
878
/// Tunables for [`ConfigPusher`].
#[derive(Debug, Clone)]
pub struct ConfigPusherConfig {
    /// Age (measured from push creation) after which `expire_old` marks a
    /// still-unacknowledged push as `Expired`.
    pub ack_timeout: Duration,
    /// Maximum number of send attempts per push, counting the initial send;
    /// `get_retryable` stops offering a push once it reaches this number.
    pub max_retries: usize,
    /// Minimum time since the last attempt before `get_retryable` offers a push again.
    pub retry_interval: Duration,
    /// Intended cap on outstanding pushes per agent.
    /// NOTE(review): confirm `ConfigPusher::push_to_agent` enforces it.
    pub max_pending_per_agent: usize,
}
891
892impl Default for ConfigPusherConfig {
893 fn default() -> Self {
894 Self {
895 ack_timeout: Duration::from_secs(10),
896 max_retries: 3,
897 retry_interval: Duration::from_secs(2),
898 max_pending_per_agent: 100,
899 }
900 }
901}
902
/// Registration record and push statistics for one agent.
#[derive(Debug, Clone)]
pub struct AgentConnection {
    /// Unique agent id (also the key in `ConfigPusher::agents`).
    pub agent_id: String,
    /// Display name supplied at registration.
    pub name: String,
    /// When the agent was (last) registered.
    pub connected_at: Instant,
    /// Last time the agent was registered or touched via `touch_agent`.
    pub last_seen: Instant,
    /// Pushes the agent acknowledged as accepted.
    pub successful_pushes: u64,
    /// Pushes the agent rejected or that expired without acknowledgement.
    pub failed_pushes: u64,
    /// Whether pushes may be sent to this agent at all.
    pub supports_push: bool,
}
921
/// Internal delivery record for one push.
#[derive(Debug)]
struct PendingPush {
    /// Push id (`push-N`), also used as the request id of `update`.
    push_id: String,
    /// Target agent.
    agent_id: String,
    /// The request handed to the caller for delivery (and re-delivery on retry).
    update: ConfigUpdateRequest,
    /// When the push was created; drives `ack_timeout` and cleanup.
    created_at: Instant,
    /// When the most recent attempt was handed out; drives `retry_interval`.
    last_attempt: Instant,
    /// Number of attempts handed out so far, including the first.
    attempts: usize,
    /// Current delivery state.
    status: PushStatus,
}
933
/// Delivery state of a push.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PushStatus {
    /// Not yet sent. NOTE(review): no code in this file assigns this variant;
    /// new pushes start as `Sent`.
    Pending,
    /// Handed out for delivery and awaiting acknowledgement; eligible for retry.
    Sent,
    /// The agent accepted the update.
    Acknowledged,
    /// The agent rejected the update; `reason` is its error or "Unknown error".
    Failed { reason: String },
    /// No acknowledgement arrived before the timeout / retry limit.
    Expired,
}
943
/// Public view of one push, returned by `ConfigPusher::get_results`.
#[derive(Debug)]
pub struct PushResult {
    /// Push id (`push-N`).
    pub push_id: String,
    /// Target agent.
    pub agent_id: String,
    /// Delivery state at the time of the call.
    pub status: PushStatus,
    /// Attempts handed out so far, including the first.
    pub attempts: usize,
}
952
953impl ConfigPusher {
954 pub fn new() -> Self {
956 Self::with_config(ConfigPusherConfig::default())
957 }
958
959 pub fn with_config(config: ConfigPusherConfig) -> Self {
961 Self {
962 agents: RwLock::new(HashMap::new()),
963 pending_pushes: RwLock::new(HashMap::new()),
964 config,
965 sequence: std::sync::atomic::AtomicU64::new(1),
966 }
967 }
968
969 pub fn register_agent(
971 &self,
972 agent_id: impl Into<String>,
973 name: impl Into<String>,
974 supports_push: bool,
975 ) {
976 let agent_id = agent_id.into();
977 let now = Instant::now();
978 self.agents.write().insert(
979 agent_id.clone(),
980 AgentConnection {
981 agent_id,
982 name: name.into(),
983 connected_at: now,
984 last_seen: now,
985 successful_pushes: 0,
986 failed_pushes: 0,
987 supports_push,
988 },
989 );
990 }
991
992 pub fn unregister_agent(&self, agent_id: &str) {
994 self.agents.write().remove(agent_id);
995 self.pending_pushes
997 .write()
998 .retain(|_, p| p.agent_id != agent_id);
999 }
1000
1001 pub fn touch_agent(&self, agent_id: &str) {
1003 if let Some(agent) = self.agents.write().get_mut(agent_id) {
1004 agent.last_seen = Instant::now();
1005 }
1006 }
1007
1008 pub fn connected_agents(&self) -> Vec<AgentConnection> {
1010 self.agents.read().values().cloned().collect()
1011 }
1012
1013 pub fn pushable_agents(&self) -> Vec<AgentConnection> {
1015 self.agents
1016 .read()
1017 .values()
1018 .filter(|a| a.supports_push)
1019 .cloned()
1020 .collect()
1021 }
1022
1023 pub fn push_to_agent(&self, agent_id: &str, update_type: ConfigUpdateType) -> Option<String> {
1025 let agents = self.agents.read();
1026 let agent = agents.get(agent_id)?;
1027
1028 if !agent.supports_push {
1029 return None;
1030 }
1031
1032 let push_id = self.next_push_id();
1033 let now = Instant::now();
1034
1035 let update = ConfigUpdateRequest {
1036 update_type,
1037 request_id: push_id.clone(),
1038 timestamp_ms: now_ms(),
1039 };
1040
1041 self.pending_pushes.write().insert(
1042 push_id.clone(),
1043 PendingPush {
1044 push_id: push_id.clone(),
1045 agent_id: agent_id.to_string(),
1046 update,
1047 created_at: now,
1048 last_attempt: now,
1049 attempts: 1,
1050 status: PushStatus::Sent,
1051 },
1052 );
1053
1054 Some(push_id)
1055 }
1056
1057 pub fn push_to_all(&self, update_type: ConfigUpdateType) -> Vec<String> {
1059 let pushable = self.pushable_agents();
1060 let mut push_ids = Vec::with_capacity(pushable.len());
1061
1062 for agent in pushable {
1063 if let Some(push_id) = self.push_to_agent(&agent.agent_id, update_type.clone()) {
1064 push_ids.push(push_id);
1065 }
1066 }
1067
1068 push_ids
1069 }
1070
1071 pub fn acknowledge(&self, push_id: &str, accepted: bool, error: Option<String>) {
1073 let mut pending = self.pending_pushes.write();
1074 if let Some(push) = pending.get_mut(push_id) {
1075 if accepted {
1076 push.status = PushStatus::Acknowledged;
1077 if let Some(agent) = self.agents.write().get_mut(&push.agent_id) {
1079 agent.successful_pushes += 1;
1080 }
1081 } else {
1082 push.status = PushStatus::Failed {
1083 reason: error.unwrap_or_else(|| "Unknown error".to_string()),
1084 };
1085 if let Some(agent) = self.agents.write().get_mut(&push.agent_id) {
1087 agent.failed_pushes += 1;
1088 }
1089 }
1090 }
1091 }
1092
1093 pub fn get_retryable(&self) -> Vec<(String, ConfigUpdateRequest)> {
1095 let now = Instant::now();
1096 let mut retryable = Vec::new();
1097 let mut pending = self.pending_pushes.write();
1098
1099 for push in pending.values_mut() {
1100 if push.status == PushStatus::Sent
1101 && now.duration_since(push.last_attempt) >= self.config.retry_interval
1102 && push.attempts < self.config.max_retries
1103 {
1104 push.attempts += 1;
1105 push.last_attempt = now;
1106 retryable.push((push.agent_id.clone(), push.update.clone()));
1107 }
1108 }
1109
1110 retryable
1111 }
1112
1113 pub fn expire_old(&self) {
1115 let now = Instant::now();
1116 let mut pending = self.pending_pushes.write();
1117
1118 for push in pending.values_mut() {
1119 if push.status == PushStatus::Sent
1120 && (now.duration_since(push.created_at) >= self.config.ack_timeout
1121 || push.attempts >= self.config.max_retries)
1122 {
1123 push.status = PushStatus::Expired;
1124 if let Some(agent) = self.agents.write().get_mut(&push.agent_id) {
1126 agent.failed_pushes += 1;
1127 }
1128 }
1129 }
1130
1131 let cleanup_age = Duration::from_secs(60);
1133 pending.retain(|_, p| {
1134 now.duration_since(p.created_at) < cleanup_age
1135 || matches!(p.status, PushStatus::Pending | PushStatus::Sent)
1136 });
1137 }
1138
1139 pub fn get_results(&self) -> Vec<PushResult> {
1141 self.pending_pushes
1142 .read()
1143 .values()
1144 .map(|p| PushResult {
1145 push_id: p.push_id.clone(),
1146 agent_id: p.agent_id.clone(),
1147 status: p.status.clone(),
1148 attempts: p.attempts,
1149 })
1150 .collect()
1151 }
1152
1153 pub fn pending_count(&self) -> usize {
1155 self.pending_pushes
1156 .read()
1157 .values()
1158 .filter(|p| matches!(p.status, PushStatus::Pending | PushStatus::Sent))
1159 .count()
1160 }
1161
1162 fn next_push_id(&self) -> String {
1163 let seq = self
1164 .sequence
1165 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1166 format!("push-{}", seq)
1167 }
1168}
1169
1170impl Default for ConfigPusher {
1171 fn default() -> Self {
1172 Self::new()
1173 }
1174}
1175
/// Wall-clock milliseconds since the Unix epoch, or 0 if the system clock
/// reads earlier than the epoch.
fn now_ms() -> u64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
1182
1183#[cfg(test)]
1184mod tests {
1185 use super::*;
1186 use crate::v2::metrics::{standard, CounterMetric, GaugeMetric, HistogramMetric};
1187
1188 #[test]
1189 fn test_metrics_collector_basic() {
1190 let collector = MetricsCollector::new();
1191
1192 let mut report = MetricsReport::new("test-agent", 10_000);
1193 report
1194 .counters
1195 .push(CounterMetric::new(standard::REQUESTS_TOTAL, 100));
1196 report
1197 .gauges
1198 .push(GaugeMetric::new(standard::IN_FLIGHT_REQUESTS, 5.0));
1199
1200 collector.record(&report);
1201
1202 assert_eq!(collector.series_count(), 2);
1203 assert_eq!(collector.active_agents(), vec!["test-agent"]);
1204 }
1205
1206 #[test]
1207 fn test_metrics_collector_with_labels() {
1208 let collector = MetricsCollector::new();
1209
1210 let mut report = MetricsReport::new("agent-1", 10_000);
1211 let mut counter = CounterMetric::new(standard::REQUESTS_TOTAL, 50);
1212 counter
1213 .labels
1214 .insert("route".to_string(), "/api".to_string());
1215 report.counters.push(counter);
1216
1217 collector.record(&report);
1218
1219 let prometheus = collector.export_prometheus();
1220 assert!(prometheus.contains("agent_requests_total"));
1221 assert!(prometheus.contains("route=\"/api\""));
1222 assert!(prometheus.contains("agent_id=\"agent-1\""));
1223 }
1224
1225 #[test]
1226 fn test_prometheus_export() {
1227 let collector = MetricsCollector::new();
1228
1229 let mut report = MetricsReport::new("test", 10_000);
1230 let mut counter = CounterMetric::new("http_requests_total", 123);
1231 counter.help = Some("Total HTTP requests".to_string());
1232 report.counters.push(counter);
1233
1234 collector.record(&report);
1235
1236 let output = collector.export_prometheus();
1237 assert!(output.contains("# HELP http_requests_total Total HTTP requests"));
1238 assert!(output.contains("# TYPE http_requests_total counter"));
1239 assert!(output.contains("123"));
1240 }
1241
1242 #[test]
1243 fn test_histogram_export() {
1244 let config = MetricsCollectorConfig {
1246 include_agent_id_label: false,
1247 ..MetricsCollectorConfig::default()
1248 };
1249 let collector = MetricsCollector::with_config(config);
1250
1251 let mut report = MetricsReport::new("test", 10_000);
1252 report.histograms.push(HistogramMetric {
1253 name: "request_duration_seconds".to_string(),
1254 help: Some("Request duration".to_string()),
1255 labels: HashMap::new(),
1256 sum: 10.5,
1257 count: 100,
1258 buckets: vec![
1259 HistogramBucket { le: 0.1, count: 50 },
1260 HistogramBucket { le: 0.5, count: 80 },
1261 HistogramBucket { le: 1.0, count: 95 },
1262 HistogramBucket::infinity(),
1263 ],
1264 });
1265
1266 collector.record(&report);
1267
1268 let output = collector.export_prometheus();
1269 assert!(output.contains("request_duration_seconds_bucket"));
1270 assert!(output.contains("le=\"0.1\""));
1271 assert!(output.contains("le=\"+Inf\""));
1272 assert!(output.contains("request_duration_seconds_sum 10.5"));
1273 assert!(output.contains("request_duration_seconds_count 100"));
1274 }
1275
1276 #[test]
1277 fn test_config_update_handler() {
1278 let handler = ConfigUpdateHandler::new();
1279
1280 let request = ConfigUpdateRequest {
1281 update_type: ConfigUpdateType::RequestReload,
1282 request_id: "req-1".to_string(),
1283 timestamp_ms: 0,
1284 };
1285
1286 let response = handler.handle(request);
1287 assert!(response.accepted);
1288 assert_eq!(handler.pending_count(), 1);
1289 }
1290
1291 #[test]
1292 fn test_escape_label_value() {
1293 assert_eq!(escape_label_value("simple"), "simple");
1294 assert_eq!(escape_label_value("with\"quotes"), "with\\\"quotes");
1295 assert_eq!(escape_label_value("with\\backslash"), "with\\\\backslash");
1296 assert_eq!(escape_label_value("with\nnewline"), "with\\nnewline");
1297 }
1298
1299 #[test]
1300 fn test_config_pusher_basic() {
1301 let pusher = ConfigPusher::new();
1302
1303 pusher.register_agent("agent-1", "Test Agent", true);
1305
1306 let agents = pusher.connected_agents();
1307 assert_eq!(agents.len(), 1);
1308 assert_eq!(agents[0].agent_id, "agent-1");
1309 assert!(agents[0].supports_push);
1310 }
1311
1312 #[test]
1313 fn test_config_pusher_push_to_agent() {
1314 let pusher = ConfigPusher::new();
1315 pusher.register_agent("agent-1", "Test Agent", true);
1316
1317 let update_type = ConfigUpdateType::RuleUpdate {
1318 rule_set: "default".to_string(),
1319 rules: vec![],
1320 remove_rules: vec![],
1321 };
1322
1323 let push_id = pusher.push_to_agent("agent-1", update_type);
1324 assert!(push_id.is_some());
1325
1326 let push_id = push_id.unwrap();
1327 assert!(push_id.starts_with("push-"));
1328 assert_eq!(pusher.pending_count(), 1);
1329 }
1330
#[test]
fn test_config_pusher_acknowledge() {
    let pusher = ConfigPusher::new();
    pusher.register_agent("agent-1", "Test Agent", true);

    let push_id = pusher
        .push_to_agent("agent-1", ConfigUpdateType::RequestReload)
        .expect("push to a push-capable agent should be accepted");

    // Acknowledge the push as successful, with no error detail attached.
    pusher.acknowledge(&push_id, true, None);

    let results = pusher.get_results();
    assert_eq!(results.len(), 1);
    assert_eq!(results[0].status, PushStatus::Acknowledged);

    // The acknowledgement is credited to the agent's success counter.
    // Assert the agent count before indexing, so a missing agent fails with a
    // clear length mismatch instead of an index-out-of-bounds panic.
    let agents = pusher.connected_agents();
    assert_eq!(agents.len(), 1);
    assert_eq!(agents[0].successful_pushes, 1);
}
1351
#[test]
fn test_config_pusher_push_to_non_pushable() {
    // The agent is registered with push support disabled, so the push
    // must be refused (no push id returned).
    let pusher = ConfigPusher::new();
    pusher.register_agent("agent-1", "Test Agent", false);

    let refused = pusher.push_to_agent("agent-1", ConfigUpdateType::RequestReload);
    assert!(refused.is_none());
}
1360
#[test]
fn test_config_pusher_push_to_all() {
    let pusher = ConfigPusher::new();

    // Two push-capable agents and one with push support disabled.
    let roster = [
        ("agent-1", "Agent 1", true),
        ("agent-2", "Agent 2", true),
        ("agent-3", "Agent 3", false),
    ];
    for (id, display_name, can_push) in roster {
        pusher.register_agent(id, display_name, can_push);
    }

    // The broadcast skips agent-3 and queues one push per capable agent.
    let ids = pusher.push_to_all(ConfigUpdateType::RequestReload);
    assert_eq!(ids.len(), 2);
    assert_eq!(pusher.pending_count(), 2);
}
1372
#[test]
fn test_config_pusher_unregister() {
    let pusher = ConfigPusher::new();
    pusher.register_agent("agent-1", "Test Agent", true);

    // Queue a push first, so unregistering has outstanding work to discard.
    // Assert the push was accepted rather than silently dropping the result;
    // otherwise a rejected push would only show up indirectly below.
    let push_id = pusher.push_to_agent("agent-1", ConfigUpdateType::RequestReload);
    assert!(
        push_id.is_some(),
        "push to a push-capable agent should be accepted"
    );
    assert_eq!(pusher.pending_count(), 1);

    pusher.unregister_agent("agent-1");

    // Unregistering removes the agent and clears its pending push.
    assert!(pusher.connected_agents().is_empty());
    assert_eq!(pusher.pending_count(), 0);
}
1386
#[test]
fn test_metrics_snapshot() {
    // A single report containing one counter and one gauge.
    let mut report = MetricsReport::new("test", 10_000);
    report.counters.push(CounterMetric::new("requests_total", 100));
    report.gauges.push(GaugeMetric::new("connections", 5.0));

    let collector = MetricsCollector::new();
    collector.record(&report);

    // The snapshot holds one series of each kind.
    let snap = collector.snapshot();
    assert_eq!(snap.counter_count(), 1);
    assert_eq!(snap.gauge_count(), 1);
}
1403
#[test]
fn test_unified_aggregator_basic() {
    let aggregator = UnifiedMetricsAggregator::new("test-service", "instance-1");

    // One unlabeled counter series...
    aggregator.increment_counter(
        "http_requests_total",
        "Total HTTP requests",
        HashMap::new(),
        100,
    );
    // ...and one unlabeled gauge series.
    aggregator.set_gauge(
        "active_connections",
        "Active connections",
        HashMap::new(),
        42.0,
    );

    let tracked = aggregator.series_count();
    assert_eq!(tracked, 2);
}
1426
#[test]
fn test_unified_aggregator_counter_increment() {
    let aggregator = UnifiedMetricsAggregator::new("test", "1");

    // Increments to the same unlabeled series accumulate: 10 + 5 = 15.
    for delta in [10, 5] {
        aggregator.increment_counter("requests", "Total requests", HashMap::new(), delta);
    }

    let exported = aggregator.export_prometheus();
    assert!(exported.contains("requests 15"));
}
1437
#[test]
fn test_unified_aggregator_labeled_metrics() {
    let aggregator = UnifiedMetricsAggregator::new("test", "1");

    // One metric name with two distinct `method` label values, each recorded
    // as its own series.
    for (method, amount) in [("GET", 100), ("POST", 50)] {
        let mut labels = HashMap::new();
        labels.insert("method".to_string(), method.to_string());
        aggregator.increment_counter("requests", "Total requests", labels, amount);
    }

    let exported = aggregator.export_prometheus();
    assert!(exported.contains("method=\"GET\""));
    assert!(exported.contains("method=\"POST\""));
}
1454
#[test]
fn test_unified_aggregator_histogram() {
    let aggregator = UnifiedMetricsAggregator::new("test", "1");

    // Same upper bounds as the Prometheus client libraries' default buckets.
    let buckets = vec![0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0];

    // Three observations on one unlabeled series, each in a different bucket.
    for sample in [0.05, 0.2, 1.5] {
        aggregator.observe_histogram(
            "request_duration",
            "Request duration",
            HashMap::new(),
            &buckets,
            sample,
        );
    }

    let exported = aggregator.export_prometheus();
    assert!(exported.contains("request_duration_bucket"));
    assert!(exported.contains("request_duration_sum"));
    assert!(exported.contains("request_duration_count 3"));
}
1490
#[test]
fn test_unified_aggregator_with_agent_metrics() {
    let aggregator = UnifiedMetricsAggregator::new("test", "1");

    // A counter recorded directly on the aggregator...
    aggregator.increment_counter("proxy_requests", "Proxy requests", HashMap::new(), 1000);

    // ...plus a counter that arrives through an agent's metrics report.
    let mut waf_report = MetricsReport::new("waf-agent", 5_000);
    waf_report.counters.push(CounterMetric::new("waf_blocked", 50));
    aggregator.record_agent_metrics(&waf_report);

    // Both sources, and the agent-metrics section marker, appear in one export.
    let exported = aggregator.export_prometheus();
    for needle in ["proxy_requests 1000", "waf_blocked", "Agent metrics"] {
        assert!(exported.contains(needle));
    }
}
1508
#[test]
fn test_unified_aggregator_service_info() {
    // With no series recorded, the export still includes an info metric
    // labeled with the service and instance names.
    let aggregator = UnifiedMetricsAggregator::new("my-service", "node-42");
    let exported = aggregator.export_prometheus();

    for needle in ["sentinel_info", "service=\"my-service\"", "instance=\"node-42\""] {
        assert!(exported.contains(needle));
    }
}
1518}