1use std::collections::HashMap;
2use std::sync::atomic::{AtomicU64, Ordering};
3use std::sync::Mutex;
4
/// Fixed-bucket duration histogram rendered in the Prometheus text exposition
/// format.
///
/// Observations are recorded with relaxed atomics, so `record` never blocks.
/// A render taken while other threads are recording is not an atomic snapshot:
/// individual series may each reflect a slightly different set of observations.
#[derive(Debug)]
pub struct Histogram {
    /// `(upper_bound, count)` pairs in ascending bound order. Each count holds
    /// only the observations that landed in that bucket; the cumulative `le`
    /// values Prometheus expects are computed at render time.
    buckets: Vec<(f64, AtomicU64)>,
    /// Observations larger than every finite bound. They are reported only in
    /// the `+Inf` bucket (and in `_count` / `_sum`).
    overflow: AtomicU64,
    /// Sum of all recorded values, stored as the bit pattern of an `f64`
    /// because the standard library has no atomic float type.
    sum_bits: AtomicU64,
}

impl Histogram {
    /// Finite bucket upper bounds in seconds, ascending.
    const DEFAULT_BOUNDS: [f64; 11] = [
        0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
    ];

    /// Creates an empty histogram using [`Self::DEFAULT_BOUNDS`].
    fn new() -> Self {
        Self {
            buckets: Self::DEFAULT_BOUNDS
                .iter()
                .map(|&bound| (bound, AtomicU64::new(0)))
                .collect(),
            overflow: AtomicU64::new(0),
            sum_bits: AtomicU64::new(0f64.to_bits()),
        }
    }

    /// Records one observation of `value` (seconds).
    ///
    /// NaN is ignored: it fits no bucket and would permanently turn `_sum`
    /// into NaN.
    fn record(&self, value: f64) {
        if value.is_nan() {
            return;
        }
        // First bucket whose bound covers the value; anything above the
        // largest bound belongs only to `+Inf`, never to the last finite bucket.
        let counter = self
            .buckets
            .iter()
            .find(|(bound, _)| value <= *bound)
            .map_or(&self.overflow, |(_, count)| count);
        counter.fetch_add(1, Ordering::Relaxed);
        // No atomic f64 add exists, so apply it as a compare-and-swap loop on
        // the stored bits. The closure always returns `Some`, so this cannot fail.
        let _ = self
            .sum_bits
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |bits| {
                Some((f64::from_bits(bits) + value).to_bits())
            });
    }

    /// Returns the per-bucket (non-cumulative) counts as `(upper_bound, count)`
    /// pairs, excluding observations above the largest bound.
    fn snapshot(&self) -> Vec<(f64, u64)> {
        self.buckets
            .iter()
            .map(|(bound, count)| (*bound, count.load(Ordering::Relaxed)))
            .collect()
    }

    /// Sum of all recorded (non-NaN) observations.
    fn sum(&self) -> f64 {
        f64::from_bits(self.sum_bits.load(Ordering::Relaxed))
    }

    /// Renders this histogram as one Prometheus metric family named
    /// `<name>_seconds`: a HELP/TYPE header, cumulative `_bucket` series ending
    /// with `le="+Inf"`, then `_sum` and `_count`.
    ///
    /// `labels` is a pre-rendered label list such as `rule_id="r1"` (no braces,
    /// no trailing comma), or `""` for none. It is inserted verbatim, so the
    /// caller is responsible for escaping label values.
    fn to_prometheus(&self, name: &str, labels: &str) -> String {
        let family = format!("{}_seconds", name);
        // `le` goes after the caller's labels; with no labels there must be no
        // stray comma inside the braces.
        let bucket_prefix = if labels.is_empty() {
            String::new()
        } else {
            format!("{},", labels)
        };
        let plain_labels = if labels.is_empty() {
            String::new()
        } else {
            format!("{{{}}}", labels)
        };

        let mut output = format!("# HELP {} Duration histogram.\n", family);
        output.push_str(&format!("# TYPE {} histogram\n", family));

        // Prometheus `le` buckets are cumulative: each one counts every
        // observation less than or equal to its bound.
        let mut cumulative: u64 = 0;
        for (bound, count) in self.snapshot() {
            cumulative += count;
            output.push_str(&format!(
                "{}_bucket{{{}le=\"{}\"}} {}\n",
                family, bucket_prefix, bound, cumulative
            ));
        }
        let total = cumulative + self.overflow.load(Ordering::Relaxed);
        output.push_str(&format!(
            "{}_bucket{{{}le=\"+Inf\"}} {}\n",
            family, bucket_prefix, total
        ));
        output.push_str(&format!("{}_sum{} {}\n", family, plain_labels, self.sum()));
        output.push_str(&format!("{}_count{} {}\n", family, plain_labels, total));
        output
    }
}
57
58pub struct SystemMetrics {
59 pub batches_processed: AtomicU64,
60 pub activations_total: AtomicU64,
61 pub deactivations_total: AtomicU64,
62 pub agent_failures: AtomicU64,
63 pub evaluation_errors: AtomicU64,
64 pub rule_evaluations: Mutex<HashMap<String, AtomicU64>>,
65 pub rule_activations: Mutex<HashMap<String, AtomicU64>>,
66 pub evaluation_duration: Histogram,
67 pub rule_evaluation_duration: Mutex<HashMap<String, Histogram>>, pub agent_execution_duration: Mutex<HashMap<String, Histogram>>,
69}
70
71impl Default for SystemMetrics {
72 fn default() -> Self {
73 Self::new()
74 }
75}
76
77impl SystemMetrics {
78 pub fn new() -> Self {
79 Self {
80 batches_processed: AtomicU64::new(0),
81 activations_total: AtomicU64::new(0),
82 deactivations_total: AtomicU64::new(0),
83 agent_failures: AtomicU64::new(0),
84 evaluation_errors: AtomicU64::new(0),
85 rule_evaluations: Mutex::new(HashMap::new()),
86 rule_activations: Mutex::new(HashMap::new()),
87 evaluation_duration: Histogram::new(),
88 rule_evaluation_duration: Mutex::new(HashMap::new()),
89 agent_execution_duration: Mutex::new(HashMap::new()),
90 }
91 }
92
93 pub fn record_evaluation_duration(&self, duration_secs: f64) {
94 self.evaluation_duration.record(duration_secs);
95 }
96
97 pub fn record_agent_execution_duration(&self, agent_name: &str, duration_secs: f64) {
98 let mut map = self.agent_execution_duration.lock().unwrap();
99 let hist = map
100 .entry(agent_name.to_string())
101 .or_insert_with(Histogram::new);
102 hist.record(duration_secs);
103 }
104
105 pub fn record_rule_evaluation(&self, rule_id: &str) {
106 let mut map = self.rule_evaluations.lock().unwrap();
107 map.entry(rule_id.to_string())
108 .or_insert_with(|| AtomicU64::new(0))
109 .fetch_add(1, Ordering::Relaxed);
110 }
111
112 pub fn record_rule_activation(&self, rule_id: &str) {
113 let mut map = self.rule_activations.lock().unwrap();
114 map.entry(rule_id.to_string())
115 .or_insert_with(|| AtomicU64::new(0))
116 .fetch_add(1, Ordering::Relaxed);
117 }
118
119 pub fn record_deactivation(&self) {
120 self.deactivations_total.fetch_add(1, Ordering::Relaxed);
121 }
122
123 pub fn record_evaluation_error(&self) {
124 self.evaluation_errors.fetch_add(1, Ordering::Relaxed);
125 }
126
127 pub fn record_rule_evaluation_duration(&self, rule_id: &str, duration_secs: f64) {
128 let mut map = self.rule_evaluation_duration.lock().unwrap();
129 let hist = map
130 .entry(rule_id.to_string())
131 .or_insert_with(Histogram::new);
132 hist.record(duration_secs);
133 }
134
135 pub fn snapshot(&self) -> MetricsSnapshot {
136 let rule_evals: HashMap<String, u64> = self
137 .rule_evaluations
138 .lock()
139 .unwrap()
140 .iter()
141 .map(|(k, v)| (k.clone(), v.load(Ordering::Relaxed)))
142 .collect();
143 let rule_acts: HashMap<String, u64> = self
144 .rule_activations
145 .lock()
146 .unwrap()
147 .iter()
148 .map(|(k, v)| (k.clone(), v.load(Ordering::Relaxed)))
149 .collect();
150
151 MetricsSnapshot {
152 batches_processed: self.batches_processed.load(Ordering::Relaxed),
153 activations_total: self.activations_total.load(Ordering::Relaxed),
154 deactivations_total: self.deactivations_total.load(Ordering::Relaxed),
155 agent_failures: self.agent_failures.load(Ordering::Relaxed),
156 evaluation_errors: self.evaluation_errors.load(Ordering::Relaxed),
157 rule_evaluations: rule_evals,
158 rule_activations: rule_acts,
159 }
160 }
161
162 pub fn to_prometheus(&self) -> String {
163 let snapshot = self.snapshot();
164 let mut output = format!(
165 "# HELP fuserule_batches_processed_total Total number of data batches ingested.\n\
166 # TYPE fuserule_batches_processed_total counter\n\
167 fuserule_batches_processed_total {}\n\
168 # HELP fuserule_activations_total Total number of rule activations triggered.\n\
169 # TYPE fuserule_activations_total counter\n\
170 fuserule_activations_total {}\n\
171 # HELP fuserule_deactivations_total Total number of rule deactivations.\n\
172 # TYPE fuserule_deactivations_total counter\n\
173 fuserule_deactivations_total {}\n\
174 # HELP fuserule_agent_failures_total Total number of failed agent executions.\n\
175 # TYPE fuserule_agent_failures_total counter\n\
176 fuserule_agent_failures_total {}\n\
177 # HELP fuserule_evaluation_errors_total Total number of rule evaluation errors.\n\
178 # TYPE fuserule_evaluation_errors_total counter\n\
179 fuserule_evaluation_errors_total {}\n",
180 snapshot.batches_processed,
181 snapshot.activations_total,
182 snapshot.deactivations_total,
183 snapshot.agent_failures,
184 snapshot.evaluation_errors
185 );
186
187 output.push_str("# HELP fuserule_rule_evaluations_total Total evaluations per rule.\n");
189 output.push_str("# TYPE fuserule_rule_evaluations_total counter\n");
190 for (rule_id, count) in &snapshot.rule_evaluations {
191 output.push_str(&format!(
192 "fuserule_rule_evaluations_total{{rule_id=\"{}\"}} {}\n",
193 rule_id, count
194 ));
195 }
196
197 output.push_str("# HELP fuserule_rule_activations_total Total activations per rule.\n");
198 output.push_str("# TYPE fuserule_rule_activations_total counter\n");
199 for (rule_id, count) in &snapshot.rule_activations {
200 output.push_str(&format!(
201 "fuserule_rule_activations_total{{rule_id=\"{}\"}} {}\n",
202 rule_id, count
203 ));
204 }
205
206 output.push_str(
208 &self
209 .evaluation_duration
210 .to_prometheus("fuserule_evaluation_duration", ""),
211 );
212
213 let rule_durations = self.rule_evaluation_duration.lock().unwrap();
215 for (rule_id, hist) in rule_durations.iter() {
216 output.push_str(&hist.to_prometheus(
217 "fuserule_rule_evaluation_duration",
218 &format!("rule_id=\"{}\"", rule_id),
219 ));
220 }
221
222 let agent_durations = self.agent_execution_duration.lock().unwrap();
223 for (agent_name, hist) in agent_durations.iter() {
224 output.push_str(&hist.to_prometheus(
225 "fuserule_agent_execution_duration",
226 &format!("agent=\"{}\"", agent_name),
227 ));
228 }
229
230 output
231 }
232}
233
234#[derive(Debug, serde::Serialize)]
235pub struct MetricsSnapshot {
236 pub batches_processed: u64,
237 pub activations_total: u64,
238 pub deactivations_total: u64,
239 pub agent_failures: u64,
240 pub evaluation_errors: u64,
241 pub rule_evaluations: HashMap<String, u64>,
242 pub rule_activations: HashMap<String, u64>,
243}
244
245lazy_static::lazy_static! {
246 pub static ref METRICS: SystemMetrics = SystemMetrics::new();
247}