fuse_rule/
metrics.rs

1use std::collections::HashMap;
2use std::sync::atomic::{AtomicU64, Ordering};
3use std::sync::Mutex;
4
/// Fixed-bucket latency histogram that renders itself in the Prometheus text
/// exposition format.
///
/// Every observation is counted in exactly one slot: the first finite bucket whose
/// upper bound is `>=` the value, or `overflow` when the value is above every finite
/// bound. The cumulative `le` counts that Prometheus expects are computed at render
/// time, so recording stays a single atomic increment.
#[derive(Debug)]
pub struct Histogram {
    /// `(upper_bound, count)` pairs in ascending bound order; counts are per-bucket
    /// (non-cumulative).
    buckets: Vec<(f64, AtomicU64)>,
    /// Observations that did not fit any finite bucket (including NaN). They are only
    /// visible through the `+Inf` bucket and the `_count` series.
    overflow: AtomicU64,
    /// Bit pattern of the `f64` running sum of all finite observations.
    sum_bits: AtomicU64,
}

impl Histogram {
    /// Standard Prometheus latency buckets, in seconds.
    const DEFAULT_BOUNDS: [f64; 11] = [
        0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
    ];

    /// Creates an empty histogram using [`Self::DEFAULT_BOUNDS`].
    fn new() -> Self {
        Self {
            buckets: Self::DEFAULT_BOUNDS
                .iter()
                .map(|&bound| (bound, AtomicU64::new(0)))
                .collect(),
            overflow: AtomicU64::new(0),
            sum_bits: AtomicU64::new(0.0_f64.to_bits()),
        }
    }

    /// Records one observation.
    ///
    /// Values larger than the biggest finite bound (and NaN, which compares false
    /// against every bound) go to the overflow slot rather than the last finite
    /// bucket, so `le="10"` never claims observations that took longer than 10s.
    /// Only finite values contribute to the running sum; a single NaN or infinity
    /// would otherwise poison it permanently.
    fn record(&self, value: f64) {
        let slot = self
            .buckets
            .iter()
            .find(|(bound, _)| value <= *bound)
            .map_or(&self.overflow, |(_, count)| count);
        slot.fetch_add(1, Ordering::Relaxed);

        if value.is_finite() {
            // There is no atomic f64, so the sum is kept as raw bits and updated with
            // a compare-and-swap loop. The closure always returns `Some`, hence the
            // result can never be `Err`.
            let _ = self
                .sum_bits
                .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |bits| {
                    Some((f64::from_bits(bits) + value).to_bits())
                });
        }
    }

    /// Returns the per-bucket (non-cumulative) counts for the finite buckets, in
    /// ascending bound order. Overflow observations are not included.
    fn snapshot(&self) -> Vec<(f64, u64)> {
        self.buckets
            .iter()
            .map(|(bound, count)| (*bound, count.load(Ordering::Relaxed)))
            .collect()
    }

    /// Renders the histogram as a Prometheus metric family named `{name}_seconds`.
    ///
    /// `labels` is a pre-formatted, already-escaped label list without braces (for
    /// example `rule_id="r1"`), or an empty string for an unlabelled series.
    ///
    /// The output consists of the `# HELP` / `# TYPE` header, one cumulative
    /// `{name}_seconds_bucket{le="..."}` line per finite bound, the mandatory
    /// `le="+Inf"` bucket, and the `{name}_seconds_sum` / `{name}_seconds_count`
    /// series. The `+Inf` bucket and `_count` are derived from the same loaded
    /// values, so they always agree within one rendering.
    fn to_prometheus(&self, name: &str, labels: &str) -> String {
        let family = format!("{}_seconds", name);
        // Avoid emitting a dangling comma or empty braces when there are no labels.
        let (bucket_suffix, series_labels) = if labels.is_empty() {
            (String::new(), String::new())
        } else {
            (format!(",{}", labels), format!("{{{}}}", labels))
        };

        let mut output = format!("# HELP {} Duration histogram.\n", family);
        output.push_str(&format!("# TYPE {} histogram\n", family));

        let mut cumulative: u64 = 0;
        for (bound, count) in self.snapshot() {
            cumulative += count;
            output.push_str(&format!(
                "{}_bucket{{le=\"{}\"{}}} {}\n",
                family, bound, bucket_suffix, cumulative
            ));
        }

        let total = cumulative + self.overflow.load(Ordering::Relaxed);
        output.push_str(&format!(
            "{}_bucket{{le=\"+Inf\"{}}} {}\n",
            family, bucket_suffix, total
        ));

        let sum = f64::from_bits(self.sum_bits.load(Ordering::Relaxed));
        output.push_str(&format!("{}_sum{} {}\n", family, series_labels, sum));
        output.push_str(&format!("{}_count{} {}\n", family, series_labels, total));
        output
    }
}
57
58pub struct SystemMetrics {
59    pub batches_processed: AtomicU64,
60    pub activations_total: AtomicU64,
61    pub deactivations_total: AtomicU64,
62    pub agent_failures: AtomicU64,
63    pub evaluation_errors: AtomicU64,
64    pub rule_evaluations: Mutex<HashMap<String, AtomicU64>>,
65    pub rule_activations: Mutex<HashMap<String, AtomicU64>>,
66    pub evaluation_duration: Histogram,
67    pub rule_evaluation_duration: Mutex<HashMap<String, Histogram>>, // Per-rule histogram metrics
68    pub agent_execution_duration: Mutex<HashMap<String, Histogram>>,
69}
70
71impl Default for SystemMetrics {
72    fn default() -> Self {
73        Self::new()
74    }
75}
76
77impl SystemMetrics {
78    pub fn new() -> Self {
79        Self {
80            batches_processed: AtomicU64::new(0),
81            activations_total: AtomicU64::new(0),
82            deactivations_total: AtomicU64::new(0),
83            agent_failures: AtomicU64::new(0),
84            evaluation_errors: AtomicU64::new(0),
85            rule_evaluations: Mutex::new(HashMap::new()),
86            rule_activations: Mutex::new(HashMap::new()),
87            evaluation_duration: Histogram::new(),
88            rule_evaluation_duration: Mutex::new(HashMap::new()),
89            agent_execution_duration: Mutex::new(HashMap::new()),
90        }
91    }
92
93    pub fn record_evaluation_duration(&self, duration_secs: f64) {
94        self.evaluation_duration.record(duration_secs);
95    }
96
97    pub fn record_agent_execution_duration(&self, agent_name: &str, duration_secs: f64) {
98        let mut map = self.agent_execution_duration.lock().unwrap();
99        let hist = map
100            .entry(agent_name.to_string())
101            .or_insert_with(Histogram::new);
102        hist.record(duration_secs);
103    }
104
105    pub fn record_rule_evaluation(&self, rule_id: &str) {
106        let mut map = self.rule_evaluations.lock().unwrap();
107        map.entry(rule_id.to_string())
108            .or_insert_with(|| AtomicU64::new(0))
109            .fetch_add(1, Ordering::Relaxed);
110    }
111
112    pub fn record_rule_activation(&self, rule_id: &str) {
113        let mut map = self.rule_activations.lock().unwrap();
114        map.entry(rule_id.to_string())
115            .or_insert_with(|| AtomicU64::new(0))
116            .fetch_add(1, Ordering::Relaxed);
117    }
118
119    pub fn record_deactivation(&self) {
120        self.deactivations_total.fetch_add(1, Ordering::Relaxed);
121    }
122
123    pub fn record_evaluation_error(&self) {
124        self.evaluation_errors.fetch_add(1, Ordering::Relaxed);
125    }
126
127    pub fn record_rule_evaluation_duration(&self, rule_id: &str, duration_secs: f64) {
128        let mut map = self.rule_evaluation_duration.lock().unwrap();
129        let hist = map
130            .entry(rule_id.to_string())
131            .or_insert_with(Histogram::new);
132        hist.record(duration_secs);
133    }
134
135    pub fn snapshot(&self) -> MetricsSnapshot {
136        let rule_evals: HashMap<String, u64> = self
137            .rule_evaluations
138            .lock()
139            .unwrap()
140            .iter()
141            .map(|(k, v)| (k.clone(), v.load(Ordering::Relaxed)))
142            .collect();
143        let rule_acts: HashMap<String, u64> = self
144            .rule_activations
145            .lock()
146            .unwrap()
147            .iter()
148            .map(|(k, v)| (k.clone(), v.load(Ordering::Relaxed)))
149            .collect();
150
151        MetricsSnapshot {
152            batches_processed: self.batches_processed.load(Ordering::Relaxed),
153            activations_total: self.activations_total.load(Ordering::Relaxed),
154            deactivations_total: self.deactivations_total.load(Ordering::Relaxed),
155            agent_failures: self.agent_failures.load(Ordering::Relaxed),
156            evaluation_errors: self.evaluation_errors.load(Ordering::Relaxed),
157            rule_evaluations: rule_evals,
158            rule_activations: rule_acts,
159        }
160    }
161
162    pub fn to_prometheus(&self) -> String {
163        let snapshot = self.snapshot();
164        let mut output = format!(
165            "# HELP fuserule_batches_processed_total Total number of data batches ingested.\n\
166             # TYPE fuserule_batches_processed_total counter\n\
167             fuserule_batches_processed_total {}\n\
168             # HELP fuserule_activations_total Total number of rule activations triggered.\n\
169             # TYPE fuserule_activations_total counter\n\
170             fuserule_activations_total {}\n\
171             # HELP fuserule_deactivations_total Total number of rule deactivations.\n\
172             # TYPE fuserule_deactivations_total counter\n\
173             fuserule_deactivations_total {}\n\
174             # HELP fuserule_agent_failures_total Total number of failed agent executions.\n\
175             # TYPE fuserule_agent_failures_total counter\n\
176             fuserule_agent_failures_total {}\n\
177             # HELP fuserule_evaluation_errors_total Total number of rule evaluation errors.\n\
178             # TYPE fuserule_evaluation_errors_total counter\n\
179             fuserule_evaluation_errors_total {}\n",
180            snapshot.batches_processed,
181            snapshot.activations_total,
182            snapshot.deactivations_total,
183            snapshot.agent_failures,
184            snapshot.evaluation_errors
185        );
186
187        // Add per-rule metrics
188        output.push_str("# HELP fuserule_rule_evaluations_total Total evaluations per rule.\n");
189        output.push_str("# TYPE fuserule_rule_evaluations_total counter\n");
190        for (rule_id, count) in &snapshot.rule_evaluations {
191            output.push_str(&format!(
192                "fuserule_rule_evaluations_total{{rule_id=\"{}\"}} {}\n",
193                rule_id, count
194            ));
195        }
196
197        output.push_str("# HELP fuserule_rule_activations_total Total activations per rule.\n");
198        output.push_str("# TYPE fuserule_rule_activations_total counter\n");
199        for (rule_id, count) in &snapshot.rule_activations {
200            output.push_str(&format!(
201                "fuserule_rule_activations_total{{rule_id=\"{}\"}} {}\n",
202                rule_id, count
203            ));
204        }
205
206        // Add histogram metrics
207        output.push_str(
208            &self
209                .evaluation_duration
210                .to_prometheus("fuserule_evaluation_duration", ""),
211        );
212
213        // Add per-rule evaluation duration histograms
214        let rule_durations = self.rule_evaluation_duration.lock().unwrap();
215        for (rule_id, hist) in rule_durations.iter() {
216            output.push_str(&hist.to_prometheus(
217                "fuserule_rule_evaluation_duration",
218                &format!("rule_id=\"{}\"", rule_id),
219            ));
220        }
221
222        let agent_durations = self.agent_execution_duration.lock().unwrap();
223        for (agent_name, hist) in agent_durations.iter() {
224            output.push_str(&hist.to_prometheus(
225                "fuserule_agent_execution_duration",
226                &format!("agent=\"{}\"", agent_name),
227            ));
228        }
229
230        output
231    }
232}
233
234#[derive(Debug, serde::Serialize)]
235pub struct MetricsSnapshot {
236    pub batches_processed: u64,
237    pub activations_total: u64,
238    pub deactivations_total: u64,
239    pub agent_failures: u64,
240    pub evaluation_errors: u64,
241    pub rule_evaluations: HashMap<String, u64>,
242    pub rule_activations: HashMap<String, u64>,
243}
244
245lazy_static::lazy_static! {
246    pub static ref METRICS: SystemMetrics = SystemMetrics::new();
247}