1use crate::actor::{ActorManager, ActorState};
7use crate::crawler::CrawlerDetector;
8use crate::interrogator::{ProgressionManager, ProgressionStatsSnapshot};
9use crate::shadow::{ShadowMirrorManager, ShadowMirrorStats};
10use crate::tarpit::{TarpitManager, TarpitStats};
11use crate::trends::{Anomaly, TrendsManager};
12use crate::tunnel::TunnelChannel;
13use parking_lot::RwLock;
14use std::collections::{HashMap, VecDeque};
15use std::sync::atomic::{AtomicU64, Ordering};
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18
19use crate::block_log::{BlockEvent, BlockLog};
20use crate::entity::{EntityManager, EntitySnapshot};
21
/// Point-in-time view of every metric the TUI renders.
///
/// Produced by [`TuiDataProvider::get_snapshot`]; `Default` yields an
/// all-zero/empty snapshot.
#[derive(Clone, Default)]
pub struct MetricsSnapshot {
    /// Seconds since the registry was created.
    pub uptime_secs: u64,
    /// Total requests observed.
    pub total_requests: u64,
    /// Total requests blocked.
    pub total_blocked: u64,
    /// Requests currently in flight.
    pub active_requests: u64,
    /// Mean latency over the rolling window, in milliseconds.
    pub avg_latency_ms: f64,
    /// Mean WAF detection time, in microseconds.
    pub avg_waf_detection_us: f64,
    /// Per-second request counts for the rolling window, oldest first.
    pub request_history: Vec<u64>,
    /// Most-matched WAF rules as (rule id, match count), top 10.
    pub top_rules: Vec<(String, u64)>,
    /// Per-backend metrics keyed by backend name.
    pub backend_status: Vec<(String, BackendMetrics)>,
    /// Crawler distribution as (name, count), top 10.
    pub top_crawlers: Vec<(String, u64)>,
    /// Bad-bot distribution as (name, count), top 10.
    pub top_bad_bots: Vec<(String, u64)>,
    /// Highest-risk actors, top 10.
    pub top_risky_actors: Vec<ActorState>,
    /// JA4 fingerprint groups as (fingerprint, members, score) — exact
    /// semantics defined by `ActorManager::get_fingerprint_groups`.
    pub top_ja4_clusters: Vec<(String, Vec<String>, f64)>,
    /// DLP match distribution as (pattern type, count), top 10.
    pub top_dlp_hits: Vec<(String, u64)>,
    /// Tarpit stats; `None` when no tarpit manager is registered.
    pub tarpit_stats: Option<TarpitStats>,
    /// Interrogator progression stats; `None` when not registered.
    pub progression_stats: Option<ProgressionStatsSnapshot>,
    /// Shadow-mirror stats; `None` when not registered.
    pub shadow_stats: Option<ShadowMirrorStats>,
    /// Recent impossible-travel anomalies from the trends manager.
    pub recent_geo_anomalies: Vec<Anomaly>,
    /// Top entities by risk; empty when no entity manager is registered.
    pub top_entities: Vec<EntitySnapshot>,
    /// Most recent block events; empty when no block log is registered.
    pub recent_blocks: Vec<BlockEvent>,
}
46
/// Data source consumed by the TUI rendering layer.
pub trait TuiDataProvider: Send + Sync {
    /// Returns the current metrics snapshot. Implementations may serve a
    /// short-lived cached copy.
    fn get_snapshot(&self) -> MetricsSnapshot;

    /// Resets all resettable metrics back to zero.
    fn reset_all(&self);
}
55
/// Upper bound on distinct keys kept in per-label metric maps, preventing
/// unbounded memory growth from high-cardinality labels.
const MAX_METRICS_MAP_SIZE: usize = 1000;
58
/// Central metrics hub: owns the raw counters/histograms and holds optional
/// handles to subsystem managers that contribute snapshot data.
///
/// Managers are injected after construction via the `set_*` methods, so every
/// manager slot is `Option`; absent managers yield empty/`None` snapshot data.
#[derive(Default)]
pub struct MetricsRegistry {
    /// Actor tracking (risk scores, JA4 clusters), if registered.
    pub(crate) actor_manager: RwLock<Option<Arc<ActorManager>>>,
    /// Crawler / bad-bot classification, if registered.
    pub(crate) crawler_detector: RwLock<Option<Arc<CrawlerDetector>>>,
    /// Tarpit statistics source, if registered.
    pub(crate) tarpit_manager: RwLock<Option<Arc<TarpitManager>>>,
    /// Interrogator progression statistics source, if registered.
    pub(crate) progression_manager: RwLock<Option<Arc<ProgressionManager>>>,
    /// Shadow-mirror statistics source, if registered.
    pub(crate) shadow_mirror_manager: RwLock<Option<Arc<ShadowMirrorManager>>>,
    /// Geo/trend anomaly source, if registered.
    pub(crate) trends_manager: RwLock<Option<Arc<TrendsManager>>>,
    /// Entity risk ranking source, if registered.
    pub(crate) entity_manager: RwLock<Option<Arc<EntityManager>>>,
    /// Recent block-event log, if registered.
    pub(crate) block_log: RwLock<Option<Arc<BlockLog>>>,
    /// Request totals broken down by status class.
    request_counts: RequestCounters,
    /// All-time latency histogram, in microseconds.
    latencies: LatencyHistogram,
    /// Rolling 60-second per-second request/latency window.
    windowed_requests: WindowedCounter,
    /// WAF outcome counters.
    waf_metrics: WafMetrics,
    /// Shadow-mirroring counters.
    shadow_metrics: ShadowMetrics,
    /// API profiling / anomaly / bandwidth counters.
    profiling_metrics: ProfilingMetrics,
    /// DLP scan/match counters.
    dlp_metrics: DlpMetrics,
    /// Signal dispatch outcome counters.
    signal_dispatch_metrics: SignalDispatchMetrics,
    /// Management-tunnel counters.
    tunnel_metrics: TunnelMetrics,
    /// Gauge of requests currently in flight; shared with `ActiveRequestGuard`s.
    active_requests: Arc<AtomicU64>,
    /// Per-backend metrics keyed by backend name.
    backend_metrics: Arc<RwLock<HashMap<String, BackendMetrics>>>,
    /// One-shot status message; consumed (taken) by `get_status_message`.
    pub status_message: Arc<RwLock<Option<String>>>,
    /// Creation time set by `new()`; `None` under plain `Default` —
    /// NOTE(review): confirm how `uptime_secs` treats the `None` case.
    start_time: Option<Instant>,

    /// Snapshot cache: serves repeat reads made within one second.
    last_snapshot: RwLock<Option<(Instant, MetricsSnapshot)>>,
}
108
109impl TuiDataProvider for MetricsRegistry {
110 fn get_snapshot(&self) -> MetricsSnapshot {
111 {
113 let last = self.last_snapshot.read();
114 if let Some((ts, snap)) = &*last {
115 if ts.elapsed() < Duration::from_secs(1) {
116 return snap.clone();
117 }
118 }
119 }
120
121 let snap = MetricsSnapshot {
123 uptime_secs: self.uptime_secs(),
124 total_requests: self.total_requests(),
125 total_blocked: self.total_blocked(),
126 active_requests: self.active_requests(),
127 avg_latency_ms: self.avg_latency_ms(),
128 avg_waf_detection_us: self.avg_waf_detection_us(),
129 request_history: self.request_history(),
130 top_rules: self.top_rules(10),
131 backend_status: self.backend_status(),
132 top_crawlers: self.top_crawlers(10),
133 top_bad_bots: self.top_bad_bots(10),
134 top_risky_actors: self.top_risky_actors(10),
135 top_ja4_clusters: self.top_ja4_clusters(10),
136 top_dlp_hits: self.top_dlp_hits(10),
137 tarpit_stats: self.tarpit_stats(),
138 progression_stats: self.progression_stats(),
139 shadow_stats: self.shadow_stats(),
140 recent_geo_anomalies: self.recent_geo_anomalies(10),
141 top_entities: self
142 .entity_manager
143 .read()
144 .as_ref()
145 .map(|m| m.list_top_risk(10))
146 .unwrap_or_default(),
147 recent_blocks: self
148 .block_log
149 .read()
150 .as_ref()
151 .map(|l| l.recent(10))
152 .unwrap_or_default(),
153 };
154
155 {
157 let mut last = self.last_snapshot.write();
158 *last = Some((Instant::now(), snap.clone()));
159 }
160
161 snap
162 }
163
164 fn reset_all(&self) {
165 self.reset();
166 }
167}
168
169impl std::fmt::Debug for MetricsRegistry {
170 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
171 f.debug_struct("MetricsRegistry")
172 .field("total_requests", &self.total_requests())
173 .field("active_requests", &self.active_requests())
174 .finish()
175 }
176}
177
/// RAII guard that keeps the active-request gauge accurate.
///
/// Created by `MetricsRegistry::begin_request`; the gauge is decremented when
/// the guard drops, so early returns and panics still release the slot.
#[derive(Debug)]
pub struct ActiveRequestGuard {
    /// Shared active-request counter, decremented on drop.
    counter: Arc<AtomicU64>,
}
183
impl Drop for ActiveRequestGuard {
    /// Releases this request's slot in the active-request gauge.
    fn drop(&mut self) {
        self.counter.fetch_sub(1, Ordering::Relaxed);
    }
}
189
/// Counters for data-loss-prevention scanning.
#[derive(Debug, Default)]
pub struct DlpMetrics {
    /// Total scans performed.
    pub scans_total: AtomicU64,
    /// Total pattern matches across all scans.
    pub matches_total: AtomicU64,
    /// Match count per pattern type (new keys capped at `MAX_METRICS_MAP_SIZE`).
    pub matches_by_type: Arc<RwLock<HashMap<String, u64>>>,
    /// Match count per severity label (same key cap).
    pub matches_by_severity: Arc<RwLock<HashMap<String, u64>>>,
    /// Violation events dropped instead of recorded.
    pub violations_dropped: AtomicU64,
    /// Rolling window of the 100 most recent graph-export durations (µs).
    pub graph_export_durations: Arc<RwLock<VecDeque<u64>>>,
}
206
207impl DlpMetrics {
208 pub fn record_scan(&self) {
210 self.scans_total.fetch_add(1, Ordering::Relaxed);
211 }
212
213 pub fn record_match(&self, pattern_type: &str, severity: &str) {
215 self.matches_total.fetch_add(1, Ordering::Relaxed);
216
217 let mut by_type = self.matches_by_type.write();
218 if by_type.contains_key(pattern_type) || by_type.len() < MAX_METRICS_MAP_SIZE {
219 *by_type.entry(pattern_type.to_string()).or_insert(0) += 1;
220 }
221
222 let mut by_severity = self.matches_by_severity.write();
223 if by_severity.contains_key(severity) || by_severity.len() < MAX_METRICS_MAP_SIZE {
224 *by_severity.entry(severity.to_string()).or_insert(0) += 1;
225 }
226 }
227
228 pub fn record_violation_dropped(&self) {
230 self.violations_dropped.fetch_add(1, Ordering::Relaxed);
231 }
232
233 pub fn record_graph_export_duration(&self, duration_us: u64) {
235 let mut durations = self.graph_export_durations.write();
236 if durations.len() >= 100 {
238 durations.pop_front();
239 }
240 durations.push_back(duration_us);
241 }
242}
243
/// Outcome counters and latency histogram for signal dispatch.
#[derive(Debug, Default)]
pub struct SignalDispatchMetrics {
    /// Dispatch attempts.
    pub total: AtomicU64,
    /// Successful dispatches.
    pub success: AtomicU64,
    /// Failed dispatches.
    pub failure: AtomicU64,
    /// Dispatches that timed out.
    pub timeout: AtomicU64,
    /// Latency histogram (µs); only successful dispatches are observed.
    pub latencies: LatencyHistogram,
}
258
259impl SignalDispatchMetrics {
260 pub fn record_attempt(&self) {
262 self.total.fetch_add(1, Ordering::Relaxed);
263 }
264
265 pub fn record_success(&self, latency_us: u64) {
267 self.success.fetch_add(1, Ordering::Relaxed);
268 self.latencies.observe(latency_us);
269 }
270
271 pub fn record_failure(&self) {
273 self.failure.fetch_add(1, Ordering::Relaxed);
274 }
275
276 pub fn record_timeout(&self) {
278 self.timeout.fetch_add(1, Ordering::Relaxed);
279 }
280}
281
/// Number of distinct tunnel channels, derived from `TunnelChannel::ALL`.
const TUNNEL_CHANNEL_COUNT: usize = TunnelChannel::ALL.len();
283
/// Fixed-bucket histogram for values measured in milliseconds.
#[derive(Debug)]
pub struct MsHistogram {
    /// Inclusive upper boundary of each bucket, in ms.
    buckets: Vec<u64>,
    /// Observation count per bucket; the last bucket also absorbs overflow.
    counts: Vec<AtomicU64>,
    /// Running sum of all observed values, in ms.
    sum_ms: AtomicU64,
    /// Total number of observations.
    count: AtomicU64,
}

impl Default for MsHistogram {
    /// Boundaries from 1 ms up to 5 minutes, roughly logarithmic.
    fn default() -> Self {
        let boundaries = vec![
            1, 5, 10, 25, 50, 100, 250, 500, 1000, 2500, 5000, 10000, 30000, 60000, 120000, 300000,
        ];
        Self {
            counts: (0..boundaries.len()).map(|_| AtomicU64::new(0)).collect(),
            buckets: boundaries,
            sum_ms: AtomicU64::new(0),
            count: AtomicU64::new(0),
        }
    }
}

impl MsHistogram {
    /// Records a single value: updates the running sum/count and increments
    /// the first bucket whose boundary is >= `value_ms` (or the last bucket
    /// when the value exceeds every boundary).
    pub fn observe_ms(&self, value_ms: u64) {
        self.sum_ms.fetch_add(value_ms, Ordering::Relaxed);
        self.count.fetch_add(1, Ordering::Relaxed);

        let slot = self
            .buckets
            .iter()
            .position(|&boundary| value_ms <= boundary)
            .unwrap_or_else(|| self.buckets.len().saturating_sub(1));
        if let Some(cell) = self.counts.get(slot) {
            cell.fetch_add(1, Ordering::Relaxed);
        }
    }

    /// Zeroes every bucket and the running totals.
    pub fn reset(&self) {
        self.counts
            .iter()
            .for_each(|cell| cell.store(0, Ordering::Relaxed));
        self.sum_ms.store(0, Ordering::Relaxed);
        self.count.store(0, Ordering::Relaxed);
    }
}
336
/// Counters and histograms for the management-tunnel connection.
#[derive(Debug)]
pub struct TunnelMetrics {
    /// 1 while connected, 0 otherwise.
    connected: AtomicU64,
    /// Outbound tunnel messages.
    messages_sent: AtomicU64,
    /// Inbound tunnel messages.
    messages_received: AtomicU64,
    /// Reconnect attempts made.
    reconnect_attempts: AtomicU64,
    /// Histogram of reconnect backoff delays, in ms.
    reconnect_delay_ms: MsHistogram,
    /// Authentication handshakes that timed out.
    auth_timeouts: AtomicU64,
    /// Heartbeats sent.
    heartbeats_sent: AtomicU64,
    /// Heartbeats that timed out.
    heartbeat_timeouts: AtomicU64,
    /// Per-channel overflow counts, indexed by `tunnel_channel_index`.
    channel_overflows: [AtomicU64; TUNNEL_CHANNEL_COUNT],
    /// Per-channel handler latency histograms, same indexing.
    handler_latency_ms: [MsHistogram; TUNNEL_CHANNEL_COUNT],
}
351
impl Default for TunnelMetrics {
    /// Zeroes every counter and creates empty per-channel histograms.
    fn default() -> Self {
        Self {
            connected: AtomicU64::new(0),
            messages_sent: AtomicU64::new(0),
            messages_received: AtomicU64::new(0),
            reconnect_attempts: AtomicU64::new(0),
            reconnect_delay_ms: MsHistogram::default(),
            auth_timeouts: AtomicU64::new(0),
            heartbeats_sent: AtomicU64::new(0),
            heartbeat_timeouts: AtomicU64::new(0),
            // Arrays of non-Copy elements must be built element-wise.
            channel_overflows: std::array::from_fn(|_| AtomicU64::new(0)),
            handler_latency_ms: std::array::from_fn(|_| MsHistogram::default()),
        }
    }
}
368
369impl TunnelMetrics {
370 pub fn set_connected(&self, connected: bool) {
371 self.connected
372 .store(u64::from(connected), Ordering::Relaxed);
373 }
374
375 pub fn record_message_sent(&self) {
376 self.messages_sent.fetch_add(1, Ordering::Relaxed);
377 }
378
379 pub fn record_message_received(&self) {
380 self.messages_received.fetch_add(1, Ordering::Relaxed);
381 }
382
383 pub fn record_reconnect_attempt(&self, delay_ms: u64) {
384 self.reconnect_attempts.fetch_add(1, Ordering::Relaxed);
385 self.reconnect_delay_ms.observe_ms(delay_ms);
386 }
387
388 pub fn record_auth_timeout(&self) {
389 self.auth_timeouts.fetch_add(1, Ordering::Relaxed);
390 }
391
392 pub fn record_heartbeat_sent(&self) {
393 self.heartbeats_sent.fetch_add(1, Ordering::Relaxed);
394 }
395
396 pub fn record_heartbeat_timeout(&self) {
397 self.heartbeat_timeouts.fetch_add(1, Ordering::Relaxed);
398 }
399
400 pub fn record_channel_overflow(&self, channel: TunnelChannel) {
401 let idx = tunnel_channel_index(channel);
402 self.channel_overflows[idx].fetch_add(1, Ordering::Relaxed);
403 }
404
405 pub fn record_handler_latency_ms(&self, channel: TunnelChannel, latency_ms: u64) {
406 let idx = tunnel_channel_index(channel);
407 self.handler_latency_ms[idx].observe_ms(latency_ms);
408 }
409
410 fn channel_overflow_total(&self, channel: TunnelChannel) -> u64 {
411 let idx = tunnel_channel_index(channel);
412 self.channel_overflows[idx].load(Ordering::Relaxed)
413 }
414
415 fn handler_latency_hist(&self, channel: TunnelChannel) -> &MsHistogram {
416 let idx = tunnel_channel_index(channel);
417 &self.handler_latency_ms[idx]
418 }
419}
420
/// Maps a tunnel channel to its slot in the per-channel metric arrays.
///
/// NOTE(review): this ordering must stay in sync with `TunnelChannel::ALL`,
/// which sizes those arrays via `TUNNEL_CHANNEL_COUNT` — confirm the two
/// agree whenever a channel is added.
fn tunnel_channel_index(channel: TunnelChannel) -> usize {
    match channel {
        TunnelChannel::Shell => 0,
        TunnelChannel::Logs => 1,
        TunnelChannel::Diag => 2,
        TunnelChannel::Control => 3,
        TunnelChannel::Files => 4,
        TunnelChannel::Update => 5,
    }
}
431
/// Per-endpoint hit statistics tracked by `ProfilingMetrics`.
#[derive(Debug, Clone)]
pub struct EndpointStats {
    /// Requests observed for this endpoint.
    pub hit_count: u64,
    /// Unix timestamp (ms) the endpoint was first seen.
    pub first_seen: u64,
    /// Unix timestamp (ms) of the most recent hit.
    pub last_seen: u64,
    /// Distinct HTTP methods observed, in first-seen order.
    pub methods: Vec<String>,
}

impl Default for EndpointStats {
    /// Fresh entry stamped with the current wall-clock time (0 if the clock
    /// reads before the Unix epoch).
    fn default() -> Self {
        let now_ms = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map_or(0, |d| d.as_millis() as u64);
        Self {
            hit_count: 0,
            first_seen: now_ms,
            last_seen: now_ms,
            methods: Vec::new(),
        }
    }
}
459
/// Counters for API profiling, anomaly tracking and bandwidth accounting.
#[derive(Debug, Default)]
pub struct ProfilingMetrics {
    /// Gauge: endpoints currently profiled; kept in sync with the size of
    /// `endpoint_stats` by `record_endpoint`.
    pub profiles_active: AtomicU64,
    /// Gauge: total profiles known.
    pub profiles_total: AtomicU64,
    /// Gauge: total learned schemas.
    pub schemas_total: AtomicU64,
    /// Counter: profile updates applied.
    pub profile_updates_total: AtomicU64,
    /// Violation count per endpoint (new keys capped at `MAX_METRICS_MAP_SIZE`).
    pub schema_violations_total: Arc<RwLock<HashMap<String, u64>>>,
    /// Anomaly count per anomaly type (same key cap).
    pub anomalies_detected: Arc<RwLock<HashMap<String, u64>>>,
    /// Exponential moving average of anomaly scores, stored scaled by 1000.
    pub avg_anomaly_score: AtomicU64,
    /// Incremented once per recorded anomaly (despite the name) — see
    /// `record_anomaly`.
    pub requests_with_anomalies: AtomicU64,
    /// Per-endpoint hit statistics (new keys capped at `MAX_METRICS_MAP_SIZE`).
    pub endpoint_stats: Arc<RwLock<HashMap<String, EndpointStats>>>,
    /// Total request bytes observed.
    pub total_bytes_in: AtomicU64,
    /// Total response bytes observed.
    pub total_bytes_out: AtomicU64,
    /// Largest single request observed, in bytes.
    pub max_request_size: AtomicU64,
    /// Largest single response observed, in bytes.
    pub max_response_size: AtomicU64,
    /// Requests counted toward bandwidth averages.
    pub bandwidth_request_count: AtomicU64,
    /// 60-slot ring buffer of per-minute bandwidth data.
    pub bandwidth_timeline: Arc<RwLock<BandwidthTimeline>>,
}
494
/// One minute of aggregated bandwidth data.
#[derive(Debug, Clone, Default)]
pub struct BandwidthDataPoint {
    /// Unix timestamp (ms) when this slot was (re)initialized; 0 = never used.
    pub timestamp: u64,
    /// Request bytes accumulated during the minute.
    pub bytes_in: u64,
    /// Response bytes accumulated during the minute.
    pub bytes_out: u64,
    /// Requests counted during the minute.
    pub request_count: u64,
}
507
/// Fixed 60-slot ring buffer of per-minute bandwidth data.
#[derive(Debug)]
pub struct BandwidthTimeline {
    /// The ring slots, one per minute.
    pub points: Vec<BandwidthDataPoint>,
    /// Index of the slot belonging to the current minute.
    pub current_index: usize,
    /// Minute number (unix ms / 60_000) the current slot covers; 0 = unset.
    pub last_minute: u64,
}

impl Default for BandwidthTimeline {
    /// Starts with 60 empty slots, pointing at index 0.
    fn default() -> Self {
        Self {
            points: vec![BandwidthDataPoint::default(); 60],
            current_index: 0,
            last_minute: 0,
        }
    }
}
528
impl ProfilingMetrics {
    /// Gauge setter: endpoints currently profiled.
    pub fn set_active_profiles(&self, count: u64) {
        self.profiles_active.store(count, Ordering::Relaxed);
    }

    /// Gauge setter: total profiles known.
    pub fn set_profiles_total(&self, count: u64) {
        self.profiles_total.store(count, Ordering::Relaxed);
    }

    /// Gauge setter: total learned schemas.
    pub fn set_schemas_total(&self, count: u64) {
        self.schemas_total.store(count, Ordering::Relaxed);
    }

    /// Counts one profile update.
    pub fn record_profile_update(&self) {
        self.profile_updates_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Counts a schema violation against `endpoint`. New endpoints are only
    /// admitted while the map holds fewer than `MAX_METRICS_MAP_SIZE` keys.
    pub fn record_schema_violation(&self, endpoint: &str) {
        let mut violations = self.schema_violations_total.write();
        if violations.contains_key(endpoint) || violations.len() < MAX_METRICS_MAP_SIZE {
            *violations.entry(endpoint.to_string()).or_insert(0) += 1;
        }
    }

    /// Snapshot of (endpoint, violation count) pairs, in arbitrary order.
    pub fn get_schema_violations(&self) -> Vec<(String, u64)> {
        let violations = self.schema_violations_total.read();
        violations.iter().map(|(k, v)| (k.clone(), *v)).collect()
    }

    /// Counts an anomaly by type and folds `score` into a running average.
    ///
    /// The average is an exponential moving average (9:1 weight toward the
    /// previous value) stored as an integer scaled by 1000.
    /// NOTE(review): the load/store pair on `avg_anomaly_score` is not an
    /// atomic read-modify-write, so concurrent callers can lose an update —
    /// presumably acceptable for an approximate gauge; confirm.
    pub fn record_anomaly(&self, anomaly_type: &str, score: f64) {
        let mut anomalies = self.anomalies_detected.write();
        if anomalies.contains_key(anomaly_type) || anomalies.len() < MAX_METRICS_MAP_SIZE {
            *anomalies.entry(anomaly_type.to_string()).or_insert(0) += 1;
        }

        self.requests_with_anomalies.fetch_add(1, Ordering::Relaxed);

        // Scale to integer thousandths so the EWMA fits in an AtomicU64.
        let scaled_score = (score * 1000.0) as u64;
        let current = self.avg_anomaly_score.load(Ordering::Relaxed);
        let new = if current == 0 {
            scaled_score
        } else {
            (current * 9 + scaled_score) / 10
        };
        self.avg_anomaly_score.store(new, Ordering::Relaxed);
    }

    /// Upserts hit statistics for `path`: bumps the hit count, refreshes the
    /// last-seen timestamp (unix ms) and tracks each HTTP method once. Also
    /// refreshes the `profiles_active` gauge with the endpoint-map size.
    pub fn record_endpoint(&self, path: &str, method: &str) {
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_millis() as u64)
            .unwrap_or(0);

        let mut stats = self.endpoint_stats.write();

        // Refuse brand-new endpoints once the map is at capacity; existing
        // endpoints keep updating.
        if !stats.contains_key(path) && stats.len() >= MAX_METRICS_MAP_SIZE {
            return;
        }

        let entry = stats
            .entry(path.to_string())
            .or_insert_with(|| EndpointStats {
                hit_count: 0,
                first_seen: now,
                last_seen: now,
                methods: Vec::new(),
            });

        entry.hit_count += 1;
        entry.last_seen = now;

        // Record each distinct method at most once per endpoint.
        let method_str = method.to_string();
        if !entry.methods.contains(&method_str) {
            entry.methods.push(method_str);
        }

        let count = stats.len() as u64;
        self.profiles_active.store(count, Ordering::Relaxed);
    }

    /// Snapshot of per-endpoint statistics, in arbitrary order.
    pub fn get_endpoint_stats(&self) -> Vec<(String, EndpointStats)> {
        let stats = self.endpoint_stats.read();
        stats.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
    }

    /// Accumulates inbound bytes for one request, updates the max-request
    /// high-water mark and folds the bytes into the per-minute timeline.
    pub fn record_request_bytes(&self, bytes: u64) {
        self.total_bytes_in.fetch_add(bytes, Ordering::Relaxed);
        self.bandwidth_request_count.fetch_add(1, Ordering::Relaxed);

        // Lock-free max: retry the CAS until we win or observe a larger value.
        let mut current_max = self.max_request_size.load(Ordering::Relaxed);
        while bytes > current_max {
            match self.max_request_size.compare_exchange_weak(
                current_max,
                bytes,
                Ordering::Relaxed,
                Ordering::Relaxed,
            ) {
                Ok(_) => break,
                Err(x) => current_max = x,
            }
        }

        self.update_timeline(bytes, 0);
    }

    /// Accumulates outbound bytes for one response, updates the max-response
    /// high-water mark and folds the bytes into the per-minute timeline.
    pub fn record_response_bytes(&self, bytes: u64) {
        self.total_bytes_out.fetch_add(bytes, Ordering::Relaxed);

        // Same lock-free max pattern as record_request_bytes.
        let mut current_max = self.max_response_size.load(Ordering::Relaxed);
        while bytes > current_max {
            match self.max_response_size.compare_exchange_weak(
                current_max,
                bytes,
                Ordering::Relaxed,
                Ordering::Relaxed,
            ) {
                Ok(_) => break,
                Err(x) => current_max = x,
            }
        }

        self.update_timeline(0, bytes);
    }

    /// Folds bytes into the ring-buffer slot covering the current minute,
    /// advancing (and clearing) the slot on a minute rollover.
    ///
    /// NOTE(review): only the newly-current slot is cleared on rollover, so if
    /// more than one minute passes without traffic the skipped slots retain
    /// stale data — confirm whether that is acceptable for the TUI view.
    fn update_timeline(&self, bytes_in: u64, bytes_out: u64) {
        let now_ms = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_millis() as u64)
            .unwrap_or(0);
        let current_minute = now_ms / 60_000;

        let mut timeline = self.bandwidth_timeline.write();

        if current_minute != timeline.last_minute {
            // last_minute == 0 means "first ever write": stay on slot 0.
            if timeline.last_minute > 0 {
                timeline.current_index = (timeline.current_index + 1) % 60;
            }
            timeline.last_minute = current_minute;

            let reset_idx = timeline.current_index;
            timeline.points[reset_idx] = BandwidthDataPoint {
                timestamp: now_ms,
                bytes_in: 0,
                bytes_out: 0,
                request_count: 0,
            };
        }

        let idx = timeline.current_index;
        timeline.points[idx].bytes_in += bytes_in;
        timeline.points[idx].bytes_out += bytes_out;
        // Only inbound recordings represent a request; responses pass 0 here.
        if bytes_in > 0 {
            timeline.points[idx].request_count += 1;
        }
    }

    /// Aggregates totals, high-water marks and the per-minute timeline
    /// (walked newest-first; never-used slots skipped) into a `BandwidthStats`.
    pub fn get_bandwidth_stats(&self) -> BandwidthStats {
        let total_bytes_in = self.total_bytes_in.load(Ordering::Relaxed);
        let total_bytes_out = self.total_bytes_out.load(Ordering::Relaxed);
        let request_count = self.bandwidth_request_count.load(Ordering::Relaxed);
        let max_request = self.max_request_size.load(Ordering::Relaxed);
        let max_response = self.max_response_size.load(Ordering::Relaxed);

        let avg_bytes_per_request = if request_count > 0 {
            (total_bytes_in + total_bytes_out) / request_count
        } else {
            0
        };

        let timeline = self.bandwidth_timeline.read();
        let mut timeline_points: Vec<BandwidthDataPoint> = Vec::new();

        // Walk backwards from the current slot; timestamp 0 marks a slot that
        // was never written.
        for i in 0..60 {
            let idx = (timeline.current_index + 60 - i) % 60;
            let point = &timeline.points[idx];
            if point.timestamp > 0 {
                timeline_points.push(point.clone());
            }
        }

        BandwidthStats {
            total_bytes: total_bytes_in + total_bytes_out,
            total_bytes_in,
            total_bytes_out,
            avg_bytes_per_request,
            max_request_size: max_request,
            max_response_size: max_response,
            request_count,
            timeline: timeline_points,
        }
    }

    /// Zeroes every counter, clears all maps and resets the timeline ring.
    pub fn reset(&self) {
        self.profiles_active.store(0, Ordering::Relaxed);
        self.profiles_total.store(0, Ordering::Relaxed);
        self.schemas_total.store(0, Ordering::Relaxed);
        self.profile_updates_total.store(0, Ordering::Relaxed);
        self.avg_anomaly_score.store(0, Ordering::Relaxed);
        self.requests_with_anomalies.store(0, Ordering::Relaxed);
        self.total_bytes_in.store(0, Ordering::Relaxed);
        self.total_bytes_out.store(0, Ordering::Relaxed);
        self.bandwidth_request_count.store(0, Ordering::Relaxed);
        self.max_request_size.store(0, Ordering::Relaxed);
        self.max_response_size.store(0, Ordering::Relaxed);

        self.anomalies_detected.write().clear();
        self.endpoint_stats.write().clear();
        self.schema_violations_total.write().clear();

        let mut timeline = self.bandwidth_timeline.write();
        *timeline = BandwidthTimeline::default();

        tracing::info!("ProfilingMetrics reset complete");
    }
}
774
/// Aggregated bandwidth figures returned by `ProfilingMetrics::get_bandwidth_stats`.
#[derive(Debug, Clone)]
pub struct BandwidthStats {
    /// `total_bytes_in + total_bytes_out`.
    pub total_bytes: u64,
    /// Total request bytes.
    pub total_bytes_in: u64,
    /// Total response bytes.
    pub total_bytes_out: u64,
    /// Mean of (in + out) bytes per counted request.
    pub avg_bytes_per_request: u64,
    /// Largest single request observed, in bytes.
    pub max_request_size: u64,
    /// Largest single response observed, in bytes.
    pub max_response_size: u64,
    /// Number of requests included in the averages.
    pub request_count: u64,
    /// Per-minute history, newest first; never-used slots omitted.
    pub timeline: Vec<BandwidthDataPoint>,
}
795
/// Request totals broken down by HTTP status class.
#[derive(Debug, Default)]
pub struct RequestCounters {
    /// All requests, regardless of status.
    pub total: AtomicU64,
    /// Responses with a 2xx status.
    pub success_2xx: AtomicU64,
    /// Responses with a 3xx status.
    pub redirect_3xx: AtomicU64,
    /// Responses with a 4xx status.
    pub client_error_4xx: AtomicU64,
    /// Responses with a 5xx status.
    pub server_error_5xx: AtomicU64,
    /// Blocked requests, counted separately via `record_blocked`.
    pub blocked: AtomicU64,
}
812
/// Fixed-bucket latency histogram measured in microseconds.
#[derive(Debug)]
pub struct LatencyHistogram {
    /// Inclusive upper boundary of each bucket, in µs.
    buckets: Vec<u64>,
    /// Observation count per bucket; the last bucket also absorbs overflow.
    counts: Vec<AtomicU64>,
    /// Running sum of all observed latencies, in µs.
    sum_us: AtomicU64,
    /// Total number of observations.
    count: AtomicU64,
}

impl Default for LatencyHistogram {
    /// Boundaries from 100 µs up to 1 s, roughly logarithmic.
    fn default() -> Self {
        let boundaries = vec![
            100, 500, 1000, 5000, 10000, 25000, 50000, 100000, 250000, 500000, 1000000,
        ];
        Self {
            counts: (0..boundaries.len()).map(|_| AtomicU64::new(0)).collect(),
            buckets: boundaries,
            sum_us: AtomicU64::new(0),
            count: AtomicU64::new(0),
        }
    }
}

impl LatencyHistogram {
    /// Records a single latency: updates the running sum/count and increments
    /// the first bucket whose boundary is >= `latency_us` (or the last bucket
    /// when the value exceeds every boundary).
    pub fn observe(&self, latency_us: u64) {
        self.sum_us.fetch_add(latency_us, Ordering::Relaxed);
        self.count.fetch_add(1, Ordering::Relaxed);

        let slot = self
            .buckets
            .iter()
            .position(|&boundary| latency_us <= boundary)
            .unwrap_or_else(|| self.buckets.len().saturating_sub(1));
        if let Some(cell) = self.counts.get(slot) {
            cell.fetch_add(1, Ordering::Relaxed);
        }
    }

    /// Mean observed latency in µs; 0.0 before any observation.
    pub fn average_us(&self) -> f64 {
        match self.count.load(Ordering::Relaxed) {
            0 => 0.0,
            n => self.sum_us.load(Ordering::Relaxed) as f64 / n as f64,
        }
    }

    /// Approximate percentile: returns the boundary of the bucket containing
    /// the target rank. `percentile` is clamped to [0, 1]; NaN is treated as 0.
    /// Returns 0 when no observations have been made.
    pub fn percentile_us(&self, percentile: f64) -> u64 {
        let total = self.count.load(Ordering::Relaxed);
        if total == 0 {
            return 0;
        }

        let p = if percentile.is_nan() { 0.0 } else { percentile }.clamp(0.0, 1.0);
        let target = ((total as f64) * p).ceil().max(1.0) as u64;

        let mut seen = 0u64;
        for (boundary, cell) in self.buckets.iter().zip(self.counts.iter()) {
            seen += cell.load(Ordering::Relaxed);
            if seen >= target {
                return *boundary;
            }
        }
        // Rank fell into the overflow tail: report the largest boundary.
        self.buckets.last().copied().unwrap_or(0)
    }

    /// Zeroes every bucket and the running totals.
    pub fn reset(&self) {
        self.counts
            .iter()
            .for_each(|cell| cell.store(0, Ordering::Relaxed));
        self.sum_us.store(0, Ordering::Relaxed);
        self.count.store(0, Ordering::Relaxed);
    }
}
906
/// Rolling per-second counter ring used for request-rate and latency windows.
#[derive(Debug)]
pub struct WindowedCounter {
    /// Request count per one-second slot.
    buckets: Vec<AtomicU64>,
    /// Summed latency (µs) per one-second slot, parallel to `buckets`.
    latency_buckets: Vec<AtomicU64>,
    /// Index of the slot covering the current second.
    current_index: AtomicU64,
    /// When the ring was last rotated forward.
    last_rotation: RwLock<Instant>,
    /// Window length in seconds (= number of slots).
    window_secs: usize,
}
922
impl Default for WindowedCounter {
    /// Defaults to a 60-second rolling window.
    fn default() -> Self {
        Self::new(60)
    }
}
928
929impl WindowedCounter {
930 pub fn new(window_secs: usize) -> Self {
932 let buckets = (0..window_secs).map(|_| AtomicU64::new(0)).collect();
933 let latency_buckets = (0..window_secs).map(|_| AtomicU64::new(0)).collect();
934 Self {
935 buckets,
936 latency_buckets,
937 current_index: AtomicU64::new(0),
938 last_rotation: RwLock::new(Instant::now()),
939 window_secs,
940 }
941 }
942
943 fn maybe_rotate(&self) {
945 let now = Instant::now();
946 let mut last = self.last_rotation.write();
947 let elapsed_secs = now.duration_since(*last).as_secs() as usize;
948
949 if elapsed_secs > 0 {
950 let current = self.current_index.load(Ordering::Relaxed) as usize;
951
952 for i in 1..=elapsed_secs.min(self.window_secs) {
954 let idx = (current + i) % self.window_secs;
955 self.buckets[idx].store(0, Ordering::Relaxed);
956 self.latency_buckets[idx].store(0, Ordering::Relaxed);
957 }
958
959 let new_index = (current + elapsed_secs) % self.window_secs;
961 self.current_index
962 .store(new_index as u64, Ordering::Relaxed);
963 *last = now;
964 }
965 }
966
967 pub fn record(&self, latency_us: u64) {
969 self.maybe_rotate();
970 let idx = self.current_index.load(Ordering::Relaxed) as usize;
971 self.buckets[idx].fetch_add(1, Ordering::Relaxed);
972 self.latency_buckets[idx].fetch_add(latency_us, Ordering::Relaxed);
973 }
974
975 pub fn count(&self) -> u64 {
977 self.maybe_rotate();
978 self.buckets.iter().map(|b| b.load(Ordering::Relaxed)).sum()
979 }
980
981 pub fn average_latency_us(&self) -> f64 {
983 self.maybe_rotate();
984 let total_count: u64 = self.buckets.iter().map(|b| b.load(Ordering::Relaxed)).sum();
985 if total_count == 0 {
986 return 0.0;
987 }
988 let total_latency: u64 = self
989 .latency_buckets
990 .iter()
991 .map(|b| b.load(Ordering::Relaxed))
992 .sum();
993 total_latency as f64 / total_count as f64
994 }
995
996 pub fn get_history(&self) -> Vec<u64> {
999 self.maybe_rotate();
1000 let current = self.current_index.load(Ordering::Relaxed) as usize;
1001 let mut history = Vec::with_capacity(self.window_secs);
1002
1003 for i in 1..=self.window_secs {
1005 let idx = (current + i) % self.window_secs;
1006 history.push(self.buckets[idx].load(Ordering::Relaxed));
1007 }
1008 history
1009 }
1010}
1011
/// Counters for WAF analysis outcomes.
#[derive(Debug, Default)]
pub struct WafMetrics {
    /// Requests run through WAF analysis.
    pub analyzed: AtomicU64,
    /// Requests blocked.
    pub blocked: AtomicU64,
    /// Requests challenged.
    pub challenged: AtomicU64,
    /// Requests merely logged.
    pub logged: AtomicU64,
    /// Cumulative detection time across all analyzed requests, in µs.
    pub detection_time_us: AtomicU64,
    /// Match count per rule id.
    rule_matches: Arc<RwLock<HashMap<String, u64>>>,
}
1028
1029impl WafMetrics {
1030 pub fn record(&self, blocked: bool, challenged: bool, logged: bool, detection_us: u64) {
1032 self.analyzed.fetch_add(1, Ordering::Relaxed);
1033 self.detection_time_us
1034 .fetch_add(detection_us, Ordering::Relaxed);
1035
1036 if blocked {
1037 self.blocked.fetch_add(1, Ordering::Relaxed);
1038 } else if challenged {
1039 self.challenged.fetch_add(1, Ordering::Relaxed);
1040 } else if logged {
1041 self.logged.fetch_add(1, Ordering::Relaxed);
1042 }
1043 }
1044
1045 pub fn record_rule_match(&self, rule_id: &str) {
1047 let mut matches = self.rule_matches.write();
1048 *matches.entry(rule_id.to_string()).or_insert(0) += 1;
1049 }
1050
1051 pub fn avg_detection_us(&self) -> f64 {
1053 let analyzed = self.analyzed.load(Ordering::Relaxed);
1054 if analyzed == 0 {
1055 0.0
1056 } else {
1057 self.detection_time_us.load(Ordering::Relaxed) as f64 / analyzed as f64
1058 }
1059 }
1060}
1061
/// Counters for the shadow-mirroring pipeline.
#[derive(Debug, Default)]
pub struct ShadowMetrics {
    /// Requests successfully mirrored.
    pub mirrored: AtomicU64,
    /// Mirror attempts skipped by rate limiting.
    pub rate_limited: AtomicU64,
    /// Mirror attempts that failed.
    pub failed: AtomicU64,
    /// Total payload bytes delivered to the mirror.
    pub bytes_sent: AtomicU64,
    /// Cumulative delivery time across successful mirrors, in µs.
    pub delivery_time_us: AtomicU64,
}

impl ShadowMetrics {
    /// Records one successful mirror delivery: its payload size and how long
    /// the delivery took.
    pub fn record_success(&self, bytes: u64, delivery_us: u64) {
        self.delivery_time_us
            .fetch_add(delivery_us, Ordering::Relaxed);
        self.bytes_sent.fetch_add(bytes, Ordering::Relaxed);
        self.mirrored.fetch_add(1, Ordering::Relaxed);
    }

    /// Records a mirror attempt dropped by the rate limiter.
    pub fn record_rate_limited(&self) {
        self.rate_limited.fetch_add(1, Ordering::Relaxed);
    }

    /// Records a failed mirror delivery.
    pub fn record_failed(&self) {
        self.failed.fetch_add(1, Ordering::Relaxed);
    }

    /// Mean delivery time in µs over all successful mirrors; 0.0 before the
    /// first success.
    pub fn avg_delivery_us(&self) -> f64 {
        match self.mirrored.load(Ordering::Relaxed) {
            0 => 0.0,
            n => self.delivery_time_us.load(Ordering::Relaxed) as f64 / n as f64,
        }
    }
}
1106
/// Per-backend request statistics; plain values cloned into snapshots.
#[derive(Debug, Default, Clone)]
pub struct BackendMetrics {
    /// Requests routed to this backend.
    pub requests: u64,
    /// Successful responses.
    pub successes: u64,
    /// Failed responses.
    pub failures: u64,
    /// Response time in microseconds — NOTE(review): the accumulation happens
    /// outside this chunk; confirm whether this is a running sum or a latest
    /// sample.
    pub response_time_us: u64,
    /// Last known health state of the backend.
    pub healthy: bool,
}
1121
1122impl MetricsRegistry {
1123 pub fn new() -> Self {
1125 Self {
1126 start_time: Some(Instant::now()),
1127 ..Default::default()
1128 }
1129 }
1130
1131 pub fn set_actor_manager(&self, manager: Arc<ActorManager>) {
1132 *self.actor_manager.write() = Some(manager);
1133 }
1134
1135 pub fn set_crawler_detector(&self, detector: Arc<CrawlerDetector>) {
1136 *self.crawler_detector.write() = Some(detector);
1137 }
1138
1139 pub fn set_tarpit_manager(&self, manager: Arc<TarpitManager>) {
1140 *self.tarpit_manager.write() = Some(manager);
1141 }
1142
1143 pub fn set_progression_manager(&self, manager: Arc<ProgressionManager>) {
1144 *self.progression_manager.write() = Some(manager);
1145 }
1146
1147 pub fn set_shadow_mirror_manager(&self, manager: Arc<ShadowMirrorManager>) {
1148 *self.shadow_mirror_manager.write() = Some(manager);
1149 }
1150
1151 pub fn set_trends_manager(&self, manager: Arc<TrendsManager>) {
1152 *self.trends_manager.write() = Some(manager);
1153 }
1154
1155 pub fn set_entity_manager(&self, manager: Arc<EntityManager>) {
1156 *self.entity_manager.write() = Some(manager);
1157 }
1158
1159 pub fn set_block_log(&self, log: Arc<BlockLog>) {
1160 *self.block_log.write() = Some(log);
1161 }
1162
1163 pub fn record_request(&self, status_code: u16, latency_us: u64) {
1165 self.request_counts.total.fetch_add(1, Ordering::Relaxed);
1166 self.latencies.observe(latency_us);
1167 self.windowed_requests.record(latency_us);
1168
1169 match status_code {
1170 200..=299 => self
1171 .request_counts
1172 .success_2xx
1173 .fetch_add(1, Ordering::Relaxed),
1174 300..=399 => self
1175 .request_counts
1176 .redirect_3xx
1177 .fetch_add(1, Ordering::Relaxed),
1178 400..=499 => self
1179 .request_counts
1180 .client_error_4xx
1181 .fetch_add(1, Ordering::Relaxed),
1182 500..=599 => self
1183 .request_counts
1184 .server_error_5xx
1185 .fetch_add(1, Ordering::Relaxed),
1186 _ => 0, };
1188 }
1189
1190 pub fn begin_request(&self) -> ActiveRequestGuard {
1192 self.active_requests.fetch_add(1, Ordering::Relaxed);
1193 ActiveRequestGuard {
1194 counter: Arc::clone(&self.active_requests),
1195 }
1196 }
1197
1198 pub fn active_requests(&self) -> u64 {
1200 self.active_requests.load(Ordering::Relaxed)
1201 }
1202
1203 pub fn total_blocked(&self) -> u64 {
1205 self.request_counts.blocked.load(Ordering::Relaxed)
1206 }
1207
1208 pub fn waf_blocked(&self) -> u64 {
1210 self.waf_metrics.blocked.load(Ordering::Relaxed)
1211 }
1212
1213 pub fn avg_waf_detection_us(&self) -> f64 {
1215 self.waf_metrics.avg_detection_us()
1216 }
1217
1218 pub fn record_blocked(&self) {
1220 self.request_counts.blocked.fetch_add(1, Ordering::Relaxed);
1221 }
1222
1223 pub fn requests_last_minute(&self) -> u64 {
1225 self.windowed_requests.count()
1226 }
1227
1228 pub fn total_requests(&self) -> u64 {
1230 self.request_counts.total.load(Ordering::Relaxed)
1231 }
1232
1233 pub fn error_requests(&self) -> u64 {
1235 self.request_counts.client_error_4xx.load(Ordering::Relaxed)
1236 + self.request_counts.server_error_5xx.load(Ordering::Relaxed)
1237 }
1238
1239 pub fn latency_percentile_ms(&self, percentile: f64) -> f64 {
1241 self.latencies.percentile_us(percentile) as f64 / 1000.0
1242 }
1243
1244 pub fn avg_latency_ms(&self) -> f64 {
1246 self.windowed_requests.average_latency_us() / 1000.0
1247 }
1248
1249 pub fn request_history(&self) -> Vec<u64> {
1251 self.windowed_requests.get_history()
1252 }
1253
1254 pub fn top_rules(&self, limit: usize) -> Vec<(String, u64)> {
1256 let matches = self.waf_metrics.rule_matches.read();
1257 let mut rules: Vec<_> = matches.iter().map(|(k, v)| (k.clone(), *v)).collect();
1258 rules.sort_by_key(|s| std::cmp::Reverse(s.1));
1259 rules.truncate(limit);
1260 rules
1261 }
1262
1263 pub fn backend_status(&self) -> Vec<(String, BackendMetrics)> {
1265 let backends = self.backend_metrics.read();
1266 backends
1267 .iter()
1268 .map(|(k, v)| (k.clone(), v.clone()))
1269 .collect()
1270 }
1271
1272 pub fn top_crawlers(&self, limit: usize) -> Vec<(String, u64)> {
1274 self.crawler_detector
1275 .read()
1276 .as_ref()
1277 .map(|d| d.get_crawler_distribution(limit))
1278 .unwrap_or_default()
1279 }
1280
1281 pub fn top_bad_bots(&self, limit: usize) -> Vec<(String, u64)> {
1283 self.crawler_detector
1284 .read()
1285 .as_ref()
1286 .map(|d| d.get_bad_bot_distribution(limit))
1287 .unwrap_or_default()
1288 }
1289
1290 pub fn top_risky_actors(&self, limit: usize) -> Vec<crate::actor::ActorState> {
1292 self.actor_manager
1293 .read()
1294 .as_ref()
1295 .map(|m| m.list_by_min_risk(1.0, limit, 0))
1296 .unwrap_or_default()
1297 }
1298
1299 pub fn top_ja4_clusters(&self, limit: usize) -> Vec<(String, Vec<String>, f64)> {
1301 self.actor_manager
1302 .read()
1303 .as_ref()
1304 .map(|m| m.get_fingerprint_groups(limit))
1305 .unwrap_or_default()
1306 }
1307
1308 pub fn top_dlp_hits(&self, limit: usize) -> Vec<(String, u64)> {
1310 let matches = self.dlp_metrics.matches_by_type.read();
1311 let mut dist: Vec<_> = matches.iter().map(|(k, v)| (k.clone(), *v)).collect();
1312 dist.sort_by_key(|s| std::cmp::Reverse(s.1));
1313 dist.truncate(limit);
1314 dist
1315 }
1316
1317 pub fn tarpit_stats(&self) -> Option<crate::tarpit::TarpitStats> {
1319 self.tarpit_manager.read().as_ref().map(|m| m.stats())
1320 }
1321
1322 pub fn progression_stats(&self) -> Option<crate::interrogator::ProgressionStatsSnapshot> {
1324 self.progression_manager
1325 .read()
1326 .as_ref()
1327 .map(|m| m.stats().snapshot())
1328 }
1329
1330 pub fn shadow_stats(&self) -> Option<crate::shadow::ShadowMirrorStats> {
1332 self.shadow_mirror_manager
1333 .read()
1334 .as_ref()
1335 .map(|m| m.stats())
1336 }
1337
1338 pub fn recent_geo_anomalies(&self, limit: usize) -> Vec<crate::trends::Anomaly> {
1340 self.trends_manager
1341 .read()
1342 .as_ref()
1343 .map(|m| {
1344 m.get_anomalies(crate::trends::AnomalyQueryOptions {
1345 anomaly_type: Some(crate::trends::AnomalyType::ImpossibleTravel),
1346 limit: Some(limit),
1347 ..Default::default()
1348 })
1349 })
1350 .unwrap_or_default()
1351 }
1352
1353 pub fn set_status_message(&self, message: String) {
1355 *self.status_message.write() = Some(message);
1356 }
1357
1358 pub fn get_status_message(&self) -> Option<String> {
1360 self.status_message.write().take()
1361 }
1362
    /// Record one WAF decision: whether the request was blocked, challenged,
    /// or merely logged, plus the detection time in microseconds.
    pub fn record_waf(&self, blocked: bool, challenged: bool, logged: bool, detection_us: u64) {
        self.waf_metrics
            .record(blocked, challenged, logged, detection_us);
    }
1368
    /// Count one match of the WAF rule identified by `rule_id`.
    pub fn record_rule_match(&self, rule_id: &str) {
        self.waf_metrics.record_rule_match(rule_id);
    }
1373
    /// Record a successful shadow-mirror delivery of `bytes` bytes that took
    /// `delivery_us` microseconds.
    pub fn record_shadow_success(&self, bytes: u64, delivery_us: u64) {
        self.shadow_metrics.record_success(bytes, delivery_us);
    }
1378
    /// Count one shadow-mirror request dropped due to rate limiting.
    pub fn record_shadow_rate_limited(&self) {
        self.shadow_metrics.record_rate_limited();
    }
1383
    /// Count one failed shadow-mirror delivery.
    pub fn record_shadow_failed(&self) {
        self.shadow_metrics.record_failed();
    }
1388
1389 pub fn shadow_mirrored_total(&self) -> u64 {
1391 self.shadow_metrics.mirrored.load(Ordering::Relaxed)
1392 }
1393
1394 pub fn shadow_rate_limited_total(&self) -> u64 {
1396 self.shadow_metrics.rate_limited.load(Ordering::Relaxed)
1397 }
1398
1399 pub fn shadow_failed_total(&self) -> u64 {
1401 self.shadow_metrics.failed.load(Ordering::Relaxed)
1402 }
1403
1404 pub fn record_profile_metrics(&self, active_profiles: usize, anomalies: &[(String, f64)]) {
1406 self.profiling_metrics
1407 .set_active_profiles(active_profiles as u64);
1408 for (anomaly_type, score) in anomalies {
1409 self.profiling_metrics.record_anomaly(anomaly_type, *score);
1410 }
1411 }
1412
    /// Track one hit on `path` with HTTP `method` for endpoint profiling.
    pub fn record_endpoint(&self, path: &str, method: &str) {
        self.profiling_metrics.record_endpoint(path, method);
    }
1417
    /// Per-endpoint statistics collected by the profiler.
    pub fn get_endpoint_stats(&self) -> Vec<(String, EndpointStats)> {
        self.profiling_metrics.get_endpoint_stats()
    }
1422
    /// Record `bytes` of inbound (request) bandwidth.
    pub fn record_request_bandwidth(&self, bytes: u64) {
        self.profiling_metrics.record_request_bytes(bytes);
    }
1427
    /// Record `bytes` of outbound (response) bandwidth.
    pub fn record_response_bandwidth(&self, bytes: u64) {
        self.profiling_metrics.record_response_bytes(bytes);
    }
1432
    /// Aggregated bandwidth statistics from the profiler.
    pub fn get_bandwidth_stats(&self) -> BandwidthStats {
        self.profiling_metrics.get_bandwidth_stats()
    }
1437
1438 pub fn reset_profiles(&self) {
1440 self.profiling_metrics
1441 .profiles_active
1442 .store(0, Ordering::Relaxed);
1443 self.profiling_metrics
1444 .profiles_total
1445 .store(0, Ordering::Relaxed);
1446 self.profiling_metrics
1447 .profile_updates_total
1448 .store(0, Ordering::Relaxed);
1449 self.profiling_metrics.endpoint_stats.write().clear();
1450 }
1451
1452 pub fn reset_schemas(&self) {
1454 self.profiling_metrics
1455 .schemas_total
1456 .store(0, Ordering::Relaxed);
1457 self.profiling_metrics
1458 .schema_violations_total
1459 .write()
1460 .clear();
1461 }
1462
1463 pub fn record_backend(&self, backend: &str, success: bool, response_time_us: u64) {
1465 let mut backends = self.backend_metrics.write();
1466 if !backends.contains_key(backend) && backends.len() >= MAX_METRICS_MAP_SIZE {
1467 return;
1468 }
1469 let metrics = backends.entry(backend.to_string()).or_default();
1470 metrics.requests += 1;
1471 metrics.response_time_us += response_time_us;
1472 if success {
1473 metrics.successes += 1;
1474 } else {
1475 metrics.failures += 1;
1476 }
1477 metrics.healthy =
1479 metrics.requests == 0 || (metrics.successes as f64 / metrics.requests as f64) > 0.5;
1480 }
1481
1482 pub fn uptime_secs(&self) -> u64 {
1484 self.start_time.map(|t| t.elapsed().as_secs()).unwrap_or(0)
1485 }
1486
    /// Render all registry metrics in the Prometheus text exposition format.
    ///
    /// Every metric is emitted as a `# HELP` / `# TYPE` header pair followed by
    /// its sample line(s). Histograms are rendered with cumulative `_bucket`
    /// counts plus `_sum` and `_count`, as the exposition format requires.
    /// All atomic reads use `Ordering::Relaxed`, so the output is a loose
    /// point-in-time view, not a consistent snapshot.
    pub fn render_prometheus(&self) -> String {
        let mut output = String::with_capacity(4096);

        // --- request counters ---
        output.push_str("# HELP synapse_requests_total Total number of requests\n");
        output.push_str("# TYPE synapse_requests_total counter\n");
        output.push_str(&format!(
            "synapse_requests_total {}\n",
            self.request_counts.total.load(Ordering::Relaxed)
        ));

        output.push_str("# HELP synapse_requests_by_status Requests by status code class\n");
        output.push_str("# TYPE synapse_requests_by_status counter\n");
        output.push_str(&format!(
            "synapse_requests_by_status{{status=\"2xx\"}} {}\n",
            self.request_counts.success_2xx.load(Ordering::Relaxed)
        ));
        output.push_str(&format!(
            "synapse_requests_by_status{{status=\"3xx\"}} {}\n",
            self.request_counts.redirect_3xx.load(Ordering::Relaxed)
        ));
        output.push_str(&format!(
            "synapse_requests_by_status{{status=\"4xx\"}} {}\n",
            self.request_counts.client_error_4xx.load(Ordering::Relaxed)
        ));
        output.push_str(&format!(
            "synapse_requests_by_status{{status=\"5xx\"}} {}\n",
            self.request_counts.server_error_5xx.load(Ordering::Relaxed)
        ));

        output.push_str("# HELP synapse_requests_blocked Requests blocked by WAF\n");
        output.push_str("# TYPE synapse_requests_blocked counter\n");
        output.push_str(&format!(
            "synapse_requests_blocked {}\n",
            self.request_counts.blocked.load(Ordering::Relaxed)
        ));

        output.push_str("# HELP synapse_active_requests Current number of active requests\n");
        output.push_str("# TYPE synapse_active_requests gauge\n");
        output.push_str(&format!(
            "synapse_active_requests {}\n",
            self.active_requests.load(Ordering::Relaxed)
        ));

        // --- request latency histogram (bucket counts are cumulative per the format) ---
        output.push_str("# HELP synapse_request_duration_us Request duration in microseconds\n");
        output.push_str("# TYPE synapse_request_duration_us histogram\n");
        let mut cumulative = 0u64;
        for (i, &boundary) in self.latencies.buckets.iter().enumerate() {
            cumulative += self.latencies.counts[i].load(Ordering::Relaxed);
            output.push_str(&format!(
                "synapse_request_duration_us_bucket{{le=\"{}\"}} {}\n",
                boundary, cumulative
            ));
        }
        output.push_str(&format!(
            "synapse_request_duration_us_bucket{{le=\"+Inf\"}} {}\n",
            self.latencies.count.load(Ordering::Relaxed)
        ));
        output.push_str(&format!(
            "synapse_request_duration_us_sum {}\n",
            self.latencies.sum_us.load(Ordering::Relaxed)
        ));
        output.push_str(&format!(
            "synapse_request_duration_us_count {}\n",
            self.latencies.count.load(Ordering::Relaxed)
        ));

        // --- WAF counters ---
        output.push_str("# HELP synapse_waf_analyzed Requests analyzed by WAF\n");
        output.push_str("# TYPE synapse_waf_analyzed counter\n");
        output.push_str(&format!(
            "synapse_waf_analyzed {}\n",
            self.waf_metrics.analyzed.load(Ordering::Relaxed)
        ));

        output.push_str("# HELP synapse_waf_blocked Requests blocked by WAF\n");
        output.push_str("# TYPE synapse_waf_blocked counter\n");
        output.push_str(&format!(
            "synapse_waf_blocked {}\n",
            self.waf_metrics.blocked.load(Ordering::Relaxed)
        ));

        output.push_str("# HELP synapse_waf_detection_avg_us Average WAF detection time\n");
        output.push_str("# TYPE synapse_waf_detection_avg_us gauge\n");
        output.push_str(&format!(
            "synapse_waf_detection_avg_us {:.2}\n",
            self.waf_metrics.avg_detection_us()
        ));

        // --- profiling gauges / counters ---
        output
            .push_str("# HELP synapse_profiles_active_count Number of active endpoint profiles\n");
        output.push_str("# TYPE synapse_profiles_active_count gauge\n");
        output.push_str(&format!(
            "synapse_profiles_active_count {}\n",
            self.profiling_metrics
                .profiles_active
                .load(Ordering::Relaxed)
        ));

        output.push_str("# HELP synapse_profiles_total Total number of endpoint profiles\n");
        output.push_str("# TYPE synapse_profiles_total gauge\n");
        output.push_str(&format!(
            "synapse_profiles_total {}\n",
            self.profiling_metrics
                .profiles_total
                .load(Ordering::Relaxed)
        ));

        output.push_str("# HELP synapse_schemas_total Total number of learned schemas\n");
        output.push_str("# TYPE synapse_schemas_total gauge\n");
        output.push_str(&format!(
            "synapse_schemas_total {}\n",
            self.profiling_metrics.schemas_total.load(Ordering::Relaxed)
        ));

        output.push_str("# HELP synapse_profile_updates_total Total profile update operations\n");
        output.push_str("# TYPE synapse_profile_updates_total counter\n");
        output.push_str(&format!(
            "synapse_profile_updates_total {}\n",
            self.profiling_metrics
                .profile_updates_total
                .load(Ordering::Relaxed)
        ));

        output.push_str("# HELP synapse_schema_violations_total Schema violations by endpoint\n");
        output.push_str("# TYPE synapse_schema_violations_total counter\n");
        let violations = self.profiling_metrics.schema_violations_total.read();
        for (endpoint, count) in violations.iter() {
            output.push_str(&format!(
                "synapse_schema_violations_total{{endpoint=\"{}\"}} {}\n",
                endpoint, count
            ));
        }

        output.push_str("# HELP synapse_anomalies_detected_total Anomalies detected by type\n");
        output.push_str("# TYPE synapse_anomalies_detected_total counter\n");
        let anomalies = self.profiling_metrics.anomalies_detected.read();
        for (anomaly_type, count) in anomalies.iter() {
            output.push_str(&format!(
                "synapse_anomalies_detected_total{{type=\"{}\"}} {}\n",
                anomaly_type, count
            ));
        }

        output.push_str("# HELP synapse_avg_anomaly_score Average anomaly score (0-10)\n");
        output.push_str("# TYPE synapse_avg_anomaly_score gauge\n");
        output.push_str(&format!(
            "synapse_avg_anomaly_score {:.2}\n",
            // Score is stored as an integer scaled by 1000; undo the scaling here.
            self.profiling_metrics
                .avg_anomaly_score
                .load(Ordering::Relaxed) as f64
                / 1000.0
        ));

        // --- per-backend request counts and health ---
        output.push_str("# HELP synapse_backend_requests Backend request counts\n");
        output.push_str("# TYPE synapse_backend_requests counter\n");
        output.push_str("# HELP synapse_backend_healthy Backend health status\n");
        output.push_str("# TYPE synapse_backend_healthy gauge\n");

        let backends = self.backend_metrics.read();
        for (backend, metrics) in backends.iter() {
            output.push_str(&format!(
                "synapse_backend_requests{{backend=\"{}\"}} {}\n",
                backend, metrics.requests
            ));
            output.push_str(&format!(
                "synapse_backend_healthy{{backend=\"{}\"}} {}\n",
                backend,
                if metrics.healthy { 1 } else { 0 }
            ));
        }

        // --- shadow mirroring ---
        output.push_str("# HELP synapse_shadow_mirrored Requests mirrored to honeypots\n");
        output.push_str("# TYPE synapse_shadow_mirrored counter\n");
        output.push_str(&format!(
            "synapse_shadow_mirrored {}\n",
            self.shadow_metrics.mirrored.load(Ordering::Relaxed)
        ));

        output
            .push_str("# HELP synapse_shadow_rate_limited Requests rate-limited from mirroring\n");
        output.push_str("# TYPE synapse_shadow_rate_limited counter\n");
        output.push_str(&format!(
            "synapse_shadow_rate_limited {}\n",
            self.shadow_metrics.rate_limited.load(Ordering::Relaxed)
        ));

        output.push_str("# HELP synapse_shadow_failed Failed mirror deliveries\n");
        output.push_str("# TYPE synapse_shadow_failed counter\n");
        output.push_str(&format!(
            "synapse_shadow_failed {}\n",
            self.shadow_metrics.failed.load(Ordering::Relaxed)
        ));

        output.push_str("# HELP synapse_shadow_bytes_total Total bytes sent to honeypots\n");
        output.push_str("# TYPE synapse_shadow_bytes_total counter\n");
        output.push_str(&format!(
            "synapse_shadow_bytes_total {}\n",
            self.shadow_metrics.bytes_sent.load(Ordering::Relaxed)
        ));

        output.push_str("# HELP synapse_shadow_delivery_avg_us Average shadow delivery time\n");
        output.push_str("# TYPE synapse_shadow_delivery_avg_us gauge\n");
        output.push_str(&format!(
            "synapse_shadow_delivery_avg_us {:.2}\n",
            self.shadow_metrics.avg_delivery_us()
        ));

        // --- DLP scanning ---
        output.push_str("# HELP synapse_dlp_scans_total Total DLP scans performed\n");
        output.push_str("# TYPE synapse_dlp_scans_total counter\n");
        output.push_str(&format!(
            "synapse_dlp_scans_total {}\n",
            self.dlp_metrics.scans_total.load(Ordering::Relaxed)
        ));

        output.push_str("# HELP synapse_dlp_matches_total Total DLP matches found\n");
        output.push_str("# TYPE synapse_dlp_matches_total counter\n");
        output.push_str(&format!(
            "synapse_dlp_matches_total {}\n",
            self.dlp_metrics.matches_total.load(Ordering::Relaxed)
        ));

        output.push_str("# HELP synapse_dlp_matches_by_type DLP matches by pattern type\n");
        output.push_str("# TYPE synapse_dlp_matches_by_type counter\n");
        let matches_by_type = self.dlp_metrics.matches_by_type.read();
        for (pattern_type, count) in matches_by_type.iter() {
            output.push_str(&format!(
                "synapse_dlp_matches_by_type{{type=\"{}\"}} {}\n",
                pattern_type, count
            ));
        }

        output.push_str("# HELP synapse_dlp_matches_by_severity DLP matches by severity\n");
        output.push_str("# TYPE synapse_dlp_matches_by_severity counter\n");
        let matches_by_severity = self.dlp_metrics.matches_by_severity.read();
        for (severity, count) in matches_by_severity.iter() {
            output.push_str(&format!(
                "synapse_dlp_matches_by_severity{{severity=\"{}\"}} {}\n",
                severity, count
            ));
        }

        output.push_str(
            "# HELP synapse_dlp_violations_dropped Violations dropped due to buffer overflow\n",
        );
        output.push_str("# TYPE synapse_dlp_violations_dropped counter\n");
        output.push_str(&format!(
            "synapse_dlp_violations_dropped {}\n",
            self.dlp_metrics.violations_dropped.load(Ordering::Relaxed)
        ));

        // --- signal dispatch ---
        output.push_str("# HELP synapse_signal_dispatch_total Total signal dispatch attempts\n");
        output.push_str("# TYPE synapse_signal_dispatch_total counter\n");
        output.push_str(&format!(
            "synapse_signal_dispatch_total {}\n",
            self.signal_dispatch_metrics.total.load(Ordering::Relaxed)
        ));

        output.push_str("# HELP synapse_signal_dispatch_success Successful signal dispatches\n");
        output.push_str("# TYPE synapse_signal_dispatch_success counter\n");
        output.push_str(&format!(
            "synapse_signal_dispatch_success {}\n",
            self.signal_dispatch_metrics.success.load(Ordering::Relaxed)
        ));

        output.push_str("# HELP synapse_signal_dispatch_failure Failed signal dispatches\n");
        output.push_str("# TYPE synapse_signal_dispatch_failure counter\n");
        output.push_str(&format!(
            "synapse_signal_dispatch_failure {}\n",
            self.signal_dispatch_metrics.failure.load(Ordering::Relaxed)
        ));

        output.push_str("# HELP synapse_signal_dispatch_timeout Timed out signal dispatches\n");
        output.push_str("# TYPE synapse_signal_dispatch_timeout counter\n");
        output.push_str(&format!(
            "synapse_signal_dispatch_timeout {}\n",
            self.signal_dispatch_metrics.timeout.load(Ordering::Relaxed)
        ));

        // Dispatch latency histogram (cumulative buckets, then sum/count).
        output.push_str(
            "# HELP synapse_signal_dispatch_duration_us Signal dispatch duration in microseconds\n",
        );
        output.push_str("# TYPE synapse_signal_dispatch_duration_us histogram\n");
        let mut cumulative_dispatch = 0u64;
        for (i, &boundary) in self
            .signal_dispatch_metrics
            .latencies
            .buckets
            .iter()
            .enumerate()
        {
            cumulative_dispatch +=
                self.signal_dispatch_metrics.latencies.counts[i].load(Ordering::Relaxed);
            output.push_str(&format!(
                "synapse_signal_dispatch_duration_us_bucket{{le=\"{}\"}} {}\n",
                boundary, cumulative_dispatch
            ));
        }
        output.push_str(&format!(
            "synapse_signal_dispatch_duration_us_bucket{{le=\"+Inf\"}} {}\n",
            self.signal_dispatch_metrics
                .latencies
                .count
                .load(Ordering::Relaxed)
        ));
        output.push_str(&format!(
            "synapse_signal_dispatch_duration_us_sum {}\n",
            self.signal_dispatch_metrics
                .latencies
                .sum_us
                .load(Ordering::Relaxed)
        ));
        output.push_str(&format!(
            "synapse_signal_dispatch_duration_us_count {}\n",
            self.signal_dispatch_metrics
                .latencies
                .count
                .load(Ordering::Relaxed)
        ));

        // --- tunnel ---
        output.push_str("# HELP synapse_tunnel_connected Tunnel connection state (1=connected)\n");
        output.push_str("# TYPE synapse_tunnel_connected gauge\n");
        output.push_str(&format!(
            "synapse_tunnel_connected {}\n",
            self.tunnel_metrics.connected.load(Ordering::Relaxed)
        ));

        output.push_str("# HELP synapse_tunnel_messages_sent_total Tunnel messages sent\n");
        output.push_str("# TYPE synapse_tunnel_messages_sent_total counter\n");
        output.push_str(&format!(
            "synapse_tunnel_messages_sent_total {}\n",
            self.tunnel_metrics.messages_sent.load(Ordering::Relaxed)
        ));

        output.push_str("# HELP synapse_tunnel_messages_received_total Tunnel messages received\n");
        output.push_str("# TYPE synapse_tunnel_messages_received_total counter\n");
        output.push_str(&format!(
            "synapse_tunnel_messages_received_total {}\n",
            self.tunnel_metrics
                .messages_received
                .load(Ordering::Relaxed)
        ));

        output
            .push_str("# HELP synapse_tunnel_reconnect_attempts_total Tunnel reconnect attempts\n");
        output.push_str("# TYPE synapse_tunnel_reconnect_attempts_total counter\n");
        output.push_str(&format!(
            "synapse_tunnel_reconnect_attempts_total {}\n",
            self.tunnel_metrics
                .reconnect_attempts
                .load(Ordering::Relaxed)
        ));

        // Reconnect backoff histogram (cumulative buckets, then sum/count).
        output.push_str(
            "# HELP synapse_tunnel_reconnect_delay_ms Tunnel reconnect backoff in milliseconds\n",
        );
        output.push_str("# TYPE synapse_tunnel_reconnect_delay_ms histogram\n");
        let mut reconnect_cumulative = 0u64;
        for (idx, &boundary) in self
            .tunnel_metrics
            .reconnect_delay_ms
            .buckets
            .iter()
            .enumerate()
        {
            reconnect_cumulative +=
                self.tunnel_metrics.reconnect_delay_ms.counts[idx].load(Ordering::Relaxed);
            output.push_str(&format!(
                "synapse_tunnel_reconnect_delay_ms_bucket{{le=\"{}\"}} {}\n",
                boundary, reconnect_cumulative
            ));
        }
        output.push_str(&format!(
            "synapse_tunnel_reconnect_delay_ms_bucket{{le=\"+Inf\"}} {}\n",
            self.tunnel_metrics
                .reconnect_delay_ms
                .count
                .load(Ordering::Relaxed)
        ));
        output.push_str(&format!(
            "synapse_tunnel_reconnect_delay_ms_sum {}\n",
            self.tunnel_metrics
                .reconnect_delay_ms
                .sum_ms
                .load(Ordering::Relaxed)
        ));
        output.push_str(&format!(
            "synapse_tunnel_reconnect_delay_ms_count {}\n",
            self.tunnel_metrics
                .reconnect_delay_ms
                .count
                .load(Ordering::Relaxed)
        ));

        output.push_str("# HELP synapse_tunnel_auth_timeout_total Tunnel auth timeouts\n");
        output.push_str("# TYPE synapse_tunnel_auth_timeout_total counter\n");
        output.push_str(&format!(
            "synapse_tunnel_auth_timeout_total {}\n",
            self.tunnel_metrics.auth_timeouts.load(Ordering::Relaxed)
        ));

        output.push_str("# HELP synapse_tunnel_heartbeat_sent_total Tunnel heartbeats sent\n");
        output.push_str("# TYPE synapse_tunnel_heartbeat_sent_total counter\n");
        output.push_str(&format!(
            "synapse_tunnel_heartbeat_sent_total {}\n",
            self.tunnel_metrics.heartbeats_sent.load(Ordering::Relaxed)
        ));

        output
            .push_str("# HELP synapse_tunnel_heartbeat_timeout_total Tunnel heartbeat timeouts\n");
        output.push_str("# TYPE synapse_tunnel_heartbeat_timeout_total counter\n");
        output.push_str(&format!(
            "synapse_tunnel_heartbeat_timeout_total {}\n",
            self.tunnel_metrics
                .heartbeat_timeouts
                .load(Ordering::Relaxed)
        ));

        // Per-channel overflow counters.
        output.push_str(
            "# HELP synapse_tunnel_channel_buffer_overflow_total Tunnel channel buffer pressure events\n",
        );
        output.push_str("# TYPE synapse_tunnel_channel_buffer_overflow_total counter\n");
        for channel in TunnelChannel::ALL.iter().copied() {
            output.push_str(&format!(
                "synapse_tunnel_channel_buffer_overflow_total{{channel=\"{}\"}} {}\n",
                channel.as_str(),
                self.tunnel_metrics.channel_overflow_total(channel)
            ));
        }

        // Per-channel handler latency histograms.
        output.push_str(
            "# HELP synapse_tunnel_handler_latency_ms Tunnel handler latency in milliseconds\n",
        );
        output.push_str("# TYPE synapse_tunnel_handler_latency_ms histogram\n");
        for channel in TunnelChannel::ALL.iter().copied() {
            let hist = self.tunnel_metrics.handler_latency_hist(channel);
            let mut cumulative = 0u64;
            for (idx, &boundary) in hist.buckets.iter().enumerate() {
                cumulative += hist.counts[idx].load(Ordering::Relaxed);
                output.push_str(&format!(
                    "synapse_tunnel_handler_latency_ms_bucket{{channel=\"{}\",le=\"{}\"}} {}\n",
                    channel.as_str(),
                    boundary,
                    cumulative
                ));
            }
            output.push_str(&format!(
                "synapse_tunnel_handler_latency_ms_bucket{{channel=\"{}\",le=\"+Inf\"}} {}\n",
                channel.as_str(),
                hist.count.load(Ordering::Relaxed)
            ));
            output.push_str(&format!(
                "synapse_tunnel_handler_latency_ms_sum{{channel=\"{}\"}} {}\n",
                channel.as_str(),
                hist.sum_ms.load(Ordering::Relaxed)
            ));
            output.push_str(&format!(
                "synapse_tunnel_handler_latency_ms_count{{channel=\"{}\"}} {}\n",
                channel.as_str(),
                hist.count.load(Ordering::Relaxed)
            ));
        }

        // --- uptime ---
        output.push_str("# HELP synapse_uptime_seconds Service uptime in seconds\n");
        output.push_str("# TYPE synapse_uptime_seconds gauge\n");
        output.push_str(&format!("synapse_uptime_seconds {}\n", self.uptime_secs()));

        output
    }
1964
1965 pub fn reset(&self) {
1968 self.request_counts.total.store(0, Ordering::Relaxed);
1970 self.request_counts.success_2xx.store(0, Ordering::Relaxed);
1971 self.request_counts.redirect_3xx.store(0, Ordering::Relaxed);
1972 self.request_counts
1973 .client_error_4xx
1974 .store(0, Ordering::Relaxed);
1975 self.request_counts
1976 .server_error_5xx
1977 .store(0, Ordering::Relaxed);
1978 self.request_counts.blocked.store(0, Ordering::Relaxed);
1979
1980 for count in &self.latencies.counts {
1982 count.store(0, Ordering::Relaxed);
1983 }
1984 self.latencies.sum_us.store(0, Ordering::Relaxed);
1985 self.latencies.count.store(0, Ordering::Relaxed);
1986
1987 self.waf_metrics.analyzed.store(0, Ordering::Relaxed);
1989 self.waf_metrics.blocked.store(0, Ordering::Relaxed);
1990 self.waf_metrics.challenged.store(0, Ordering::Relaxed);
1991 self.waf_metrics.logged.store(0, Ordering::Relaxed);
1992 self.waf_metrics
1993 .detection_time_us
1994 .store(0, Ordering::Relaxed);
1995 self.waf_metrics.rule_matches.write().clear();
1996
1997 self.profiling_metrics
1999 .profiles_active
2000 .store(0, Ordering::Relaxed);
2001 self.profiling_metrics
2002 .profiles_total
2003 .store(0, Ordering::Relaxed);
2004 self.profiling_metrics
2005 .schemas_total
2006 .store(0, Ordering::Relaxed);
2007 self.profiling_metrics
2008 .profile_updates_total
2009 .store(0, Ordering::Relaxed);
2010 self.profiling_metrics
2011 .schema_violations_total
2012 .write()
2013 .clear();
2014 self.profiling_metrics.anomalies_detected.write().clear();
2015 self.profiling_metrics
2016 .avg_anomaly_score
2017 .store(0, Ordering::Relaxed);
2018 self.profiling_metrics
2019 .requests_with_anomalies
2020 .store(0, Ordering::Relaxed);
2021 self.profiling_metrics.endpoint_stats.write().clear();
2022
2023 self.shadow_metrics.mirrored.store(0, Ordering::Relaxed);
2025 self.shadow_metrics.rate_limited.store(0, Ordering::Relaxed);
2026 self.shadow_metrics.failed.store(0, Ordering::Relaxed);
2027 self.shadow_metrics.bytes_sent.store(0, Ordering::Relaxed);
2028 self.shadow_metrics
2029 .delivery_time_us
2030 .store(0, Ordering::Relaxed);
2031
2032 self.dlp_metrics.scans_total.store(0, Ordering::Relaxed);
2034 self.dlp_metrics.matches_total.store(0, Ordering::Relaxed);
2035 self.dlp_metrics.matches_by_type.write().clear();
2036 self.dlp_metrics.matches_by_severity.write().clear();
2037 self.dlp_metrics
2038 .violations_dropped
2039 .store(0, Ordering::Relaxed);
2040 self.dlp_metrics.graph_export_durations.write().clear();
2041
2042 self.signal_dispatch_metrics
2044 .total
2045 .store(0, Ordering::Relaxed);
2046 self.signal_dispatch_metrics
2047 .success
2048 .store(0, Ordering::Relaxed);
2049 self.signal_dispatch_metrics
2050 .failure
2051 .store(0, Ordering::Relaxed);
2052 self.signal_dispatch_metrics
2053 .timeout
2054 .store(0, Ordering::Relaxed);
2055 self.signal_dispatch_metrics.latencies.reset();
2056
2057 self.tunnel_metrics.connected.store(0, Ordering::Relaxed);
2059 self.tunnel_metrics
2060 .messages_sent
2061 .store(0, Ordering::Relaxed);
2062 self.tunnel_metrics
2063 .messages_received
2064 .store(0, Ordering::Relaxed);
2065 self.tunnel_metrics
2066 .reconnect_attempts
2067 .store(0, Ordering::Relaxed);
2068 self.tunnel_metrics.reconnect_delay_ms.reset();
2069 self.tunnel_metrics
2070 .auth_timeouts
2071 .store(0, Ordering::Relaxed);
2072 self.tunnel_metrics
2073 .heartbeats_sent
2074 .store(0, Ordering::Relaxed);
2075 self.tunnel_metrics
2076 .heartbeat_timeouts
2077 .store(0, Ordering::Relaxed);
2078 for channel in TunnelChannel::ALL.iter().copied() {
2079 let idx = tunnel_channel_index(channel);
2080 self.tunnel_metrics.channel_overflows[idx].store(0, Ordering::Relaxed);
2081 self.tunnel_metrics.handler_latency_ms[idx].reset();
2082 }
2083 self.active_requests.store(0, Ordering::Relaxed);
2084 }
2085
    /// Borrow the DLP metrics collector.
    pub fn dlp_metrics(&self) -> &DlpMetrics {
        &self.dlp_metrics
    }
2090
    /// Borrow the signal-dispatch metrics collector.
    pub fn signal_dispatch_metrics(&self) -> &SignalDispatchMetrics {
        &self.signal_dispatch_metrics
    }
2095
    /// Borrow the tunnel metrics collector.
    pub fn tunnel_metrics(&self) -> &TunnelMetrics {
        &self.tunnel_metrics
    }
2100
    /// Borrow the profiling metrics collector.
    pub fn profiling_metrics(&self) -> &ProfilingMetrics {
        &self.profiling_metrics
    }
2105}
2106
2107#[cfg(test)]
2108mod tests {
2109 use super::*;
2110
2111 #[test]
2112 fn test_request_counters() {
2113 let registry = MetricsRegistry::new();
2114
2115 registry.record_request(200, 1000);
2116 registry.record_request(201, 1500);
2117 registry.record_request(404, 500);
2118 registry.record_request(500, 2000);
2119
2120 assert_eq!(registry.request_counts.total.load(Ordering::Relaxed), 4);
2121 assert_eq!(
2122 registry.request_counts.success_2xx.load(Ordering::Relaxed),
2123 2
2124 );
2125 assert_eq!(
2126 registry
2127 .request_counts
2128 .client_error_4xx
2129 .load(Ordering::Relaxed),
2130 1
2131 );
2132 assert_eq!(
2133 registry
2134 .request_counts
2135 .server_error_5xx
2136 .load(Ordering::Relaxed),
2137 1
2138 );
2139 }
2140
2141 #[test]
2142 fn test_latency_histogram() {
2143 let histogram = LatencyHistogram::default();
2144
2145 histogram.observe(50); histogram.observe(150); histogram.observe(750); histogram.observe(5000); assert_eq!(histogram.count.load(Ordering::Relaxed), 4);
2151 assert_eq!(histogram.sum_us.load(Ordering::Relaxed), 5950);
2152 }
2153
2154 #[test]
2155 fn test_latency_average() {
2156 let histogram = LatencyHistogram::default();
2157
2158 histogram.observe(100);
2159 histogram.observe(200);
2160 histogram.observe(300);
2161
2162 assert_eq!(histogram.average_us(), 200.0);
2163 }
2164
2165 #[test]
2166 fn test_waf_metrics() {
2167 let registry = MetricsRegistry::new();
2168
2169 registry.record_waf(true, false, false, 50); registry.record_waf(false, true, false, 30); registry.record_waf(false, false, true, 20); assert_eq!(registry.waf_metrics.analyzed.load(Ordering::Relaxed), 3);
2174 assert_eq!(registry.waf_metrics.blocked.load(Ordering::Relaxed), 1);
2175 assert_eq!(registry.waf_metrics.challenged.load(Ordering::Relaxed), 1);
2176 assert_eq!(registry.waf_metrics.logged.load(Ordering::Relaxed), 1);
2177 }
2178
2179 #[test]
2180 fn test_backend_metrics() {
2181 let registry = MetricsRegistry::new();
2182
2183 registry.record_backend("127.0.0.1:8080", true, 1000);
2184 registry.record_backend("127.0.0.1:8080", true, 1500);
2185 registry.record_backend("127.0.0.1:8080", false, 5000);
2186
2187 let backends = registry.backend_metrics.read();
2188 let metrics = backends.get("127.0.0.1:8080").unwrap();
2189
2190 assert_eq!(metrics.requests, 3);
2191 assert_eq!(metrics.successes, 2);
2192 assert_eq!(metrics.failures, 1);
2193 assert!(metrics.healthy); }
2195
2196 #[test]
2197 fn test_prometheus_output() {
2198 let registry = MetricsRegistry::new();
2199
2200 registry.record_request(200, 1000);
2201 registry.record_blocked();
2202 registry.record_waf(true, false, false, 50);
2203
2204 let output = registry.render_prometheus();
2205
2206 assert!(output.contains("synapse_requests_total 1"));
2207 assert!(output.contains("synapse_requests_blocked 1"));
2208 assert!(output.contains("synapse_waf_analyzed 1"));
2209 assert!(output.contains("synapse_uptime_seconds"));
2210 }
2211
2212 #[test]
2213 fn test_rule_match_recording() {
2214 let registry = MetricsRegistry::new();
2215
2216 registry.record_rule_match("rule-123");
2217 registry.record_rule_match("rule-123");
2218 registry.record_rule_match("rule-456");
2219
2220 let matches = registry.waf_metrics.rule_matches.read();
2221 assert_eq!(matches.get("rule-123"), Some(&2));
2222 assert_eq!(matches.get("rule-456"), Some(&1));
2223 }
2224
2225 #[test]
2226 fn test_uptime() {
2227 let registry = MetricsRegistry::new();
2228
2229 assert!(registry.uptime_secs() < 1);
2231 }
2232
2233 #[test]
2238 fn test_bandwidth_timeline_default() {
2239 let timeline = BandwidthTimeline::default();
2240
2241 assert_eq!(timeline.points.len(), 60);
2243 assert_eq!(timeline.current_index, 0);
2244 assert_eq!(timeline.last_minute, 0);
2245
2246 for point in &timeline.points {
2248 assert_eq!(point.timestamp, 0);
2249 assert_eq!(point.bytes_in, 0);
2250 assert_eq!(point.bytes_out, 0);
2251 assert_eq!(point.request_count, 0);
2252 }
2253 }
2254
2255 #[test]
2256 fn test_bandwidth_timeline_circular_buffer_wrap() {
2257 let mut timeline = BandwidthTimeline::default();
2259
2260 for i in 0..65 {
2262 timeline.current_index = i % 60;
2263 timeline.points[timeline.current_index] = BandwidthDataPoint {
2264 timestamp: (i as u64) * 60_000,
2265 bytes_in: (i as u64) * 100,
2266 bytes_out: (i as u64) * 50,
2267 request_count: 1,
2268 };
2269 }
2270
2271 assert_eq!(timeline.current_index, 4); assert_eq!(timeline.points[4].bytes_in, 6400);
2276 }
2277
2278 #[test]
2279 fn test_bandwidth_data_point_default() {
2280 let point = BandwidthDataPoint::default();
2281
2282 assert_eq!(point.timestamp, 0);
2283 assert_eq!(point.bytes_in, 0);
2284 assert_eq!(point.bytes_out, 0);
2285 assert_eq!(point.request_count, 0);
2286 }
2287
2288 #[test]
2293 fn test_profiling_metrics_record_request_bytes() {
2294 let metrics = ProfilingMetrics::default();
2295
2296 metrics.record_request_bytes(1000);
2297 metrics.record_request_bytes(2000);
2298 metrics.record_request_bytes(500);
2299
2300 assert_eq!(metrics.total_bytes_in.load(Ordering::Relaxed), 3500);
2301 assert_eq!(metrics.bandwidth_request_count.load(Ordering::Relaxed), 3);
2302 }
2303
2304 #[test]
2305 fn test_profiling_metrics_record_request_bytes_zero() {
2306 let metrics = ProfilingMetrics::default();
2307
2308 metrics.record_request_bytes(0);
2309
2310 assert_eq!(metrics.total_bytes_in.load(Ordering::Relaxed), 0);
2311 assert_eq!(metrics.bandwidth_request_count.load(Ordering::Relaxed), 1);
2312 }
2313
2314 #[test]
2315 fn test_profiling_metrics_record_request_bytes_large_value() {
2316 let metrics = ProfilingMetrics::default();
2317
2318 metrics.record_request_bytes(10 * 1024 * 1024);
2320
2321 assert_eq!(
2322 metrics.total_bytes_in.load(Ordering::Relaxed),
2323 10 * 1024 * 1024
2324 );
2325 }
2326
2327 #[test]
2332 fn test_profiling_metrics_record_response_bytes() {
2333 let metrics = ProfilingMetrics::default();
2334
2335 metrics.record_response_bytes(5000);
2336 metrics.record_response_bytes(3000);
2337
2338 assert_eq!(metrics.total_bytes_out.load(Ordering::Relaxed), 8000);
2339 }
2340
2341 #[test]
2342 fn test_profiling_metrics_record_response_bytes_zero() {
2343 let metrics = ProfilingMetrics::default();
2344
2345 metrics.record_response_bytes(0);
2346
2347 assert_eq!(metrics.total_bytes_out.load(Ordering::Relaxed), 0);
2348 }
2349
2350 #[test]
2351 fn test_profiling_metrics_mixed_request_response() {
2352 let metrics = ProfilingMetrics::default();
2353
2354 metrics.record_request_bytes(100);
2355 metrics.record_response_bytes(500);
2356 metrics.record_request_bytes(200);
2357 metrics.record_response_bytes(1000);
2358
2359 assert_eq!(metrics.total_bytes_in.load(Ordering::Relaxed), 300);
2360 assert_eq!(metrics.total_bytes_out.load(Ordering::Relaxed), 1500);
2361 assert_eq!(metrics.bandwidth_request_count.load(Ordering::Relaxed), 2);
2362 }
2363
2364 #[test]
2369 fn test_profiling_metrics_max_request_size_tracking() {
2370 let metrics = ProfilingMetrics::default();
2371
2372 metrics.record_request_bytes(100);
2373 assert_eq!(metrics.max_request_size.load(Ordering::Relaxed), 100);
2374
2375 metrics.record_request_bytes(50); assert_eq!(metrics.max_request_size.load(Ordering::Relaxed), 100);
2377
2378 metrics.record_request_bytes(200); assert_eq!(metrics.max_request_size.load(Ordering::Relaxed), 200);
2380
2381 metrics.record_request_bytes(150); assert_eq!(metrics.max_request_size.load(Ordering::Relaxed), 200);
2383 }
2384
2385 #[test]
2386 fn test_profiling_metrics_max_response_size_tracking() {
2387 let metrics = ProfilingMetrics::default();
2388
2389 metrics.record_response_bytes(500);
2390 assert_eq!(metrics.max_response_size.load(Ordering::Relaxed), 500);
2391
2392 metrics.record_response_bytes(250); assert_eq!(metrics.max_response_size.load(Ordering::Relaxed), 500);
2394
2395 metrics.record_response_bytes(1000); assert_eq!(metrics.max_response_size.load(Ordering::Relaxed), 1000);
2397 }
2398
2399 #[test]
2400 fn test_profiling_metrics_max_size_from_zero() {
2401 let metrics = ProfilingMetrics::default();
2402
2403 assert_eq!(metrics.max_request_size.load(Ordering::Relaxed), 0);
2405 assert_eq!(metrics.max_response_size.load(Ordering::Relaxed), 0);
2406
2407 metrics.record_request_bytes(42);
2409 metrics.record_response_bytes(84);
2410
2411 assert_eq!(metrics.max_request_size.load(Ordering::Relaxed), 42);
2412 assert_eq!(metrics.max_response_size.load(Ordering::Relaxed), 84);
2413 }
2414
2415 #[test]
2420 fn test_profiling_metrics_get_bandwidth_stats_empty() {
2421 let metrics = ProfilingMetrics::default();
2422
2423 let stats = metrics.get_bandwidth_stats();
2424
2425 assert_eq!(stats.total_bytes, 0);
2426 assert_eq!(stats.total_bytes_in, 0);
2427 assert_eq!(stats.total_bytes_out, 0);
2428 assert_eq!(stats.avg_bytes_per_request, 0);
2429 assert_eq!(stats.max_request_size, 0);
2430 assert_eq!(stats.max_response_size, 0);
2431 assert_eq!(stats.request_count, 0);
2432 }
2433
2434 #[test]
2435 fn test_profiling_metrics_get_bandwidth_stats_with_data() {
2436 let metrics = ProfilingMetrics::default();
2437
2438 metrics.record_request_bytes(100);
2439 metrics.record_response_bytes(400);
2440 metrics.record_request_bytes(200);
2441 metrics.record_response_bytes(600);
2442
2443 let stats = metrics.get_bandwidth_stats();
2444
2445 assert_eq!(stats.total_bytes_in, 300);
2446 assert_eq!(stats.total_bytes_out, 1000);
2447 assert_eq!(stats.total_bytes, 1300);
2448 assert_eq!(stats.request_count, 2);
2449 assert_eq!(stats.avg_bytes_per_request, 650); assert_eq!(stats.max_request_size, 200);
2451 assert_eq!(stats.max_response_size, 600);
2452 }
2453
2454 #[test]
2455 fn test_profiling_metrics_get_bandwidth_stats_average_calculation() {
2456 let metrics = ProfilingMetrics::default();
2457
2458 metrics.record_request_bytes(1000);
2460 metrics.record_response_bytes(2000);
2461 metrics.record_request_bytes(500);
2462 metrics.record_response_bytes(1500);
2463 metrics.record_request_bytes(1500);
2464 metrics.record_response_bytes(3500);
2465
2466 let stats = metrics.get_bandwidth_stats();
2467
2468 assert_eq!(stats.total_bytes_in, 3000);
2472 assert_eq!(stats.total_bytes_out, 7000);
2473 assert_eq!(stats.request_count, 3);
2474 assert_eq!(stats.avg_bytes_per_request, 3333);
2475 }
2476
2477 #[test]
2482 fn test_registry_record_request_bandwidth() {
2483 let registry = MetricsRegistry::new();
2484
2485 registry.record_request_bandwidth(1024);
2486 registry.record_request_bandwidth(2048);
2487
2488 let stats = registry.get_bandwidth_stats();
2489 assert_eq!(stats.total_bytes_in, 3072);
2490 }
2491
2492 #[test]
2493 fn test_registry_record_response_bandwidth() {
2494 let registry = MetricsRegistry::new();
2495
2496 registry.record_response_bandwidth(4096);
2497 registry.record_response_bandwidth(8192);
2498
2499 let stats = registry.get_bandwidth_stats();
2500 assert_eq!(stats.total_bytes_out, 12288);
2501 }
2502
2503 #[test]
2504 fn test_registry_bandwidth_stats_integration() {
2505 let registry = MetricsRegistry::new();
2506
2507 registry.record_request_bandwidth(500);
2508 registry.record_response_bandwidth(1500);
2509 registry.record_request_bandwidth(1000);
2510 registry.record_response_bandwidth(3000);
2511
2512 let stats = registry.get_bandwidth_stats();
2513
2514 assert_eq!(stats.total_bytes_in, 1500);
2515 assert_eq!(stats.total_bytes_out, 4500);
2516 assert_eq!(stats.total_bytes, 6000);
2517 assert_eq!(stats.request_count, 2);
2518 assert_eq!(stats.max_request_size, 1000);
2519 assert_eq!(stats.max_response_size, 3000);
2520 }
2521
2522 #[test]
2527 fn test_profiling_metrics_record_endpoint() {
2528 let metrics = ProfilingMetrics::default();
2529
2530 metrics.record_endpoint("/api/users", "GET");
2531 metrics.record_endpoint("/api/users", "GET");
2532 metrics.record_endpoint("/api/users", "POST");
2533 metrics.record_endpoint("/api/products", "GET");
2534
2535 let stats = metrics.endpoint_stats.read();
2536
2537 assert_eq!(stats.len(), 2); let users_stats = stats.get("/api/users").unwrap();
2540 assert_eq!(users_stats.hit_count, 3);
2541 assert_eq!(users_stats.methods.len(), 2); assert!(users_stats.methods.contains(&"GET".to_string()));
2543 assert!(users_stats.methods.contains(&"POST".to_string()));
2544
2545 let products_stats = stats.get("/api/products").unwrap();
2546 assert_eq!(products_stats.hit_count, 1);
2547 assert_eq!(products_stats.methods.len(), 1);
2548 }
2549
2550 #[test]
2551 fn test_profiling_metrics_active_profiles_count() {
2552 let metrics = ProfilingMetrics::default();
2553
2554 assert_eq!(metrics.profiles_active.load(Ordering::Relaxed), 0);
2555
2556 metrics.record_endpoint("/api/v1/users", "GET");
2557 assert_eq!(metrics.profiles_active.load(Ordering::Relaxed), 1);
2558
2559 metrics.record_endpoint("/api/v1/products", "GET");
2560 assert_eq!(metrics.profiles_active.load(Ordering::Relaxed), 2);
2561
2562 metrics.record_endpoint("/api/v1/users", "POST");
2564 assert_eq!(metrics.profiles_active.load(Ordering::Relaxed), 2);
2565 }
2566
2567 #[test]
2568 fn test_profiling_metrics_get_endpoint_stats() {
2569 let metrics = ProfilingMetrics::default();
2570
2571 metrics.record_endpoint("/path1", "GET");
2572 metrics.record_endpoint("/path2", "POST");
2573
2574 let stats = metrics.get_endpoint_stats();
2575
2576 assert_eq!(stats.len(), 2);
2577
2578 let path_names: Vec<&String> = stats.iter().map(|(path, _)| path).collect();
2580 assert!(path_names.contains(&&"/path1".to_string()));
2581 assert!(path_names.contains(&&"/path2".to_string()));
2582 }
2583
2584 #[test]
2589 fn test_profiling_metrics_record_anomaly() {
2590 let metrics = ProfilingMetrics::default();
2591
2592 metrics.record_anomaly("sql_injection", 8.5);
2593 metrics.record_anomaly("xss_attempt", 6.0);
2594 metrics.record_anomaly("sql_injection", 9.0);
2595
2596 let anomalies = metrics.anomalies_detected.read();
2597 assert_eq!(anomalies.get("sql_injection"), Some(&2));
2598 assert_eq!(anomalies.get("xss_attempt"), Some(&1));
2599
2600 assert_eq!(metrics.requests_with_anomalies.load(Ordering::Relaxed), 3);
2601 }
2602
2603 #[test]
2604 fn test_profiling_metrics_avg_anomaly_score_ema() {
2605 let metrics = ProfilingMetrics::default();
2606
2607 metrics.record_anomaly("test", 10.0);
2609 let score1 = metrics.avg_anomaly_score.load(Ordering::Relaxed) as f64 / 1000.0;
2610 assert!((score1 - 10.0).abs() < 0.01);
2611
2612 metrics.record_anomaly("test", 5.0);
2615 let score2 = metrics.avg_anomaly_score.load(Ordering::Relaxed) as f64 / 1000.0;
2616 assert!((score2 - 9.5).abs() < 0.01);
2617 }
2618
2619 #[test]
2624 fn test_registry_reset_profiling_metrics() {
2625 let registry = MetricsRegistry::new();
2626
2627 registry.record_request_bandwidth(1000);
2629 registry.record_response_bandwidth(2000);
2630 registry.record_endpoint("/api/test", "GET");
2631 registry.profiling_metrics.record_anomaly("test", 5.0);
2632
2633 let stats_before = registry.get_bandwidth_stats();
2635 assert!(stats_before.total_bytes > 0);
2636
2637 registry.reset();
2639
2640 assert_eq!(
2642 registry
2643 .profiling_metrics
2644 .profiles_active
2645 .load(Ordering::Relaxed),
2646 0
2647 );
2648 assert_eq!(
2649 registry
2650 .profiling_metrics
2651 .avg_anomaly_score
2652 .load(Ordering::Relaxed),
2653 0
2654 );
2655 assert_eq!(
2656 registry
2657 .profiling_metrics
2658 .requests_with_anomalies
2659 .load(Ordering::Relaxed),
2660 0
2661 );
2662 assert!(registry
2663 .profiling_metrics
2664 .anomalies_detected
2665 .read()
2666 .is_empty());
2667 assert!(registry.profiling_metrics.endpoint_stats.read().is_empty());
2668 }
2669
2670 #[test]
2675 fn test_profiling_metrics_timeline_records_data() {
2676 let metrics = ProfilingMetrics::default();
2677
2678 metrics.record_request_bytes(1000);
2680 metrics.record_response_bytes(2000);
2681
2682 let stats = metrics.get_bandwidth_stats();
2684
2685 assert!(stats.timeline.len() <= 60);
2688 }
2689
2690 #[test]
2691 fn test_bandwidth_stats_struct_fields() {
2692 let stats = BandwidthStats {
2693 total_bytes: 100,
2694 total_bytes_in: 40,
2695 total_bytes_out: 60,
2696 avg_bytes_per_request: 50,
2697 max_request_size: 20,
2698 max_response_size: 30,
2699 request_count: 2,
2700 timeline: vec![],
2701 };
2702
2703 assert_eq!(stats.total_bytes, 100);
2704 assert_eq!(stats.total_bytes_in, 40);
2705 assert_eq!(stats.total_bytes_out, 60);
2706 assert_eq!(stats.avg_bytes_per_request, 50);
2707 assert_eq!(stats.max_request_size, 20);
2708 assert_eq!(stats.max_response_size, 30);
2709 assert_eq!(stats.request_count, 2);
2710 assert!(stats.timeline.is_empty());
2711 }
2712
2713 #[test]
2714 fn test_endpoint_stats_default() {
2715 let stats = EndpointStats::default();
2716
2717 assert_eq!(stats.hit_count, 0);
2718 assert!(stats.first_seen > 0); assert!(stats.last_seen > 0);
2720 assert!(stats.methods.is_empty());
2721 }
2722}