worldinterface_daemon/metrics/
registry.rs1use prometheus::{
4 Counter, CounterVec, Encoder, Gauge, HistogramOpts, HistogramVec, Opts, Registry, TextEncoder,
5};
6
7#[derive(Debug, thiserror::Error)]
9pub enum MetricsError {
10 #[error("prometheus error: {0}")]
11 Prometheus(#[from] prometheus::Error),
12
13 #[error("encoding error: {0}")]
14 Encoding(#[from] std::string::FromUtf8Error),
15}
16
17pub struct WiMetrics {
19 pub flow_runs_total: CounterVec,
21
22 pub flow_runs_active: Gauge,
24
25 pub step_runs_total: CounterVec,
28
29 pub step_duration_seconds: HistogramVec,
32
33 pub contextstore_writes_total: Counter,
35
36 pub connector_invocations_total: CounterVec,
39
40 pub webhook_invocations_total: CounterVec,
43
44 pub webhook_errors_total: Counter,
46}
47
48pub struct WiMetricsRegistry {
50 registry: Registry,
51 collectors: WiMetrics,
52}
53
54impl WiMetricsRegistry {
55 pub fn new() -> Result<Self, MetricsError> {
57 let registry = Registry::new_custom(Some("wi".into()), None)?;
58
59 let flow_runs_total = CounterVec::new(
60 Opts::new("flow_runs_total", "Total flow runs by terminal status"),
61 &["status"],
62 )?;
63 registry.register(Box::new(flow_runs_total.clone()))?;
64
65 let flow_runs_active = Gauge::with_opts(Opts::new(
66 "flow_runs_active",
67 "Currently active (non-terminal) flow runs",
68 ))?;
69 registry.register(Box::new(flow_runs_active.clone()))?;
70
71 let step_runs_total = CounterVec::new(
72 Opts::new("step_runs_total", "Total step executions by connector and status"),
73 &["connector", "status"],
74 )?;
75 registry.register(Box::new(step_runs_total.clone()))?;
76
77 let step_duration_seconds = HistogramVec::new(
78 HistogramOpts::new("step_duration_seconds", "Step execution duration in seconds"),
79 &["connector"],
80 )?;
81 registry.register(Box::new(step_duration_seconds.clone()))?;
82
83 let contextstore_writes_total = Counter::with_opts(Opts::new(
84 "contextstore_writes_total",
85 "Total ContextStore write operations",
86 ))?;
87 registry.register(Box::new(contextstore_writes_total.clone()))?;
88
89 let connector_invocations_total = CounterVec::new(
90 Opts::new("connector_invocations_total", "Total connector invocations"),
91 &["connector"],
92 )?;
93 registry.register(Box::new(connector_invocations_total.clone()))?;
94
95 let webhook_invocations_total = CounterVec::new(
96 Opts::new("webhook_invocations_total", "Total webhook invocations by path"),
97 &["path"],
98 )?;
99 registry.register(Box::new(webhook_invocations_total.clone()))?;
100
101 let webhook_errors_total = Counter::with_opts(Opts::new(
102 "webhook_errors_total",
103 "Total webhook invocation errors",
104 ))?;
105 registry.register(Box::new(webhook_errors_total.clone()))?;
106
107 Ok(Self {
108 registry,
109 collectors: WiMetrics {
110 flow_runs_total,
111 flow_runs_active,
112 step_runs_total,
113 step_duration_seconds,
114 contextstore_writes_total,
115 connector_invocations_total,
116 webhook_invocations_total,
117 webhook_errors_total,
118 },
119 })
120 }
121
122 pub fn collectors(&self) -> &WiMetrics {
124 &self.collectors
125 }
126
127 pub fn encode_text(&self) -> Result<String, MetricsError> {
129 let metric_families = self.registry.gather();
130 let encoder = TextEncoder::new();
131 let mut buffer = Vec::new();
132 encoder.encode(&metric_families, &mut buffer)?;
133 Ok(String::from_utf8(buffer)?)
134 }
135}
136
#[cfg(test)]
mod tests {
    use super::*;

    /// Construction succeeds and every collector field is reachable.
    ///
    /// The field accesses are a compile-time check only; runtime behaviour of
    /// each collector is covered by the tests below.
    #[test]
    fn registry_creates_all_metrics() {
        let registry = WiMetricsRegistry::new().unwrap();
        let c = registry.collectors();
        let _ = &c.flow_runs_total;
        let _ = &c.flow_runs_active;
        let _ = &c.step_runs_total;
        let _ = &c.step_duration_seconds;
        let _ = &c.contextstore_writes_total;
        let _ = &c.connector_invocations_total;
        let _ = &c.webhook_invocations_total;
        let _ = &c.webhook_errors_total;
    }

    /// All eight metric families appear in the encoded text.
    #[test]
    fn registry_encodes_text() {
        let registry = WiMetricsRegistry::new().unwrap();
        let c = registry.collectors();
        // Create one child per labelled vector so each family has a sample to
        // export. NOTE(review): the prometheus registry appears to omit
        // families that have no children yet - confirm against the crate docs.
        c.flow_runs_total.with_label_values(&["completed"]);
        c.step_runs_total.with_label_values(&["delay", "success"]);
        c.step_duration_seconds.with_label_values(&["delay"]);
        c.connector_invocations_total.with_label_values(&["delay"]);
        c.webhook_invocations_total.with_label_values(&["test"]);

        let text = registry.encode_text().unwrap();
        for family in [
            "wi_flow_runs_total",
            "wi_flow_runs_active",
            "wi_step_runs_total",
            "wi_step_duration_seconds",
            "wi_contextstore_writes_total",
            "wi_connector_invocations_total",
            "wi_webhook_invocations_total",
            "wi_webhook_errors_total",
        ] {
            assert!(text.contains(family), "missing {}", family);
        }
    }

    /// Incrementing a labelled flow-run counter shows up in the text output.
    #[test]
    fn flow_runs_total_increments() {
        let registry = WiMetricsRegistry::new().unwrap();
        registry.collectors().flow_runs_total.with_label_values(&["completed"]).inc();
        let text = registry.encode_text().unwrap();
        assert!(text.contains("wi_flow_runs_total{status=\"completed\"} 1"));
    }

    /// The active-flow gauge moves both up and down.
    #[test]
    fn flow_runs_active_gauge_tracks_inc_dec() {
        let registry = WiMetricsRegistry::new().unwrap();
        let c = registry.collectors();
        c.flow_runs_active.inc();
        c.flow_runs_active.inc();
        c.flow_runs_active.dec();
        let text = registry.encode_text().unwrap();
        assert!(text.contains("wi_flow_runs_active 1"));
    }

    /// Distinct label combinations are counted independently.
    #[test]
    fn step_runs_total_increments_by_label() {
        let registry = WiMetricsRegistry::new().unwrap();
        let c = registry.collectors();
        c.step_runs_total.with_label_values(&["delay", "success"]).inc();
        c.step_runs_total.with_label_values(&["fs.write", "success"]).inc();
        let text = registry.encode_text().unwrap();
        assert!(text.contains("wi_step_runs_total{connector=\"delay\",status=\"success\"} 1"));
        assert!(text.contains("wi_step_runs_total{connector=\"fs.write\",status=\"success\"} 1"));
    }

    /// A single observation is reflected in the histogram's sample count.
    #[test]
    fn step_duration_histogram_observes() {
        let registry = WiMetricsRegistry::new().unwrap();
        registry.collectors().step_duration_seconds.with_label_values(&["delay"]).observe(0.5);
        let text = registry.encode_text().unwrap();
        assert!(text.contains("wi_step_duration_seconds"));
        assert!(text.contains("wi_step_duration_seconds_count{connector=\"delay\"} 1"));
    }

    /// The unlabelled ContextStore counter accumulates increments.
    #[test]
    fn contextstore_writes_counter() {
        let registry = WiMetricsRegistry::new().unwrap();
        let c = registry.collectors();
        c.contextstore_writes_total.inc();
        c.contextstore_writes_total.inc();
        c.contextstore_writes_total.inc();
        let text = registry.encode_text().unwrap();
        assert!(text.contains("wi_contextstore_writes_total 3"));
    }

    /// Repeated invocations on the same path accumulate on one series.
    #[test]
    fn webhook_invocations_by_path() {
        let registry = WiMetricsRegistry::new().unwrap();
        let c = registry.collectors();
        c.webhook_invocations_total.with_label_values(&["github/push"]).inc();
        c.webhook_invocations_total.with_label_values(&["github/push"]).inc();
        let text = registry.encode_text().unwrap();
        assert!(text.contains("wi_webhook_invocations_total{path=\"github/push\"} 2"));
    }

    /// The unlabelled webhook error counter is exported after an increment.
    #[test]
    fn webhook_errors_counter() {
        let registry = WiMetricsRegistry::new().unwrap();
        registry.collectors().webhook_errors_total.inc();
        let text = registry.encode_text().unwrap();
        assert!(text.contains("wi_webhook_errors_total 1"));
    }
}