1use std::net::SocketAddr;
4use std::sync::OnceLock;
5use std::time::Duration;
6
7use metrics::{counter, gauge, histogram};
8use metrics_exporter_prometheus::PrometheusBuilder;
9use rustvello_core::error::{RustvelloError, RustvelloResult};
10use rustvello_core::observability::EventEmitter;
11use rustvello_proto::identifiers::{InvocationId, RunnerId, TaskId};
12
/// Set once a `PrometheusSink` has successfully installed the process-global
/// recorder. `PrometheusSink::new` checks it first so that a second sink is
/// rejected with a descriptive configuration error.
static RECORDER_INSTALLED: OnceLock<()> = OnceLock::new();
15
/// [`EventEmitter`] that turns rustvello runtime events into `metrics`
/// counters, gauges and histograms (all named with a `rustvello_` prefix),
/// renderable in Prometheus exposition format via a `PrometheusHandle`.
///
/// Construct it with [`PrometheusSink::new`], which refuses to create more than
/// one sink per process.
pub struct PrometheusSink {
    // Owns the handle returned when the recorder was installed. The visible code
    // never reads it (hence the leading underscore); tests build a sink
    // around a handle from a local, non-global recorder.
    _handle: metrics_exporter_prometheus::PrometheusHandle,
}
43
44impl PrometheusSink {
45 pub fn new(bind: SocketAddr) -> RustvelloResult<Self> {
52 if RECORDER_INSTALLED.get().is_some() {
53 return Err(RustvelloError::Configuration {
54 message:
55 "Prometheus recorder already installed — only one PrometheusSink per process"
56 .to_owned(),
57 });
58 }
59 let builder = PrometheusBuilder::new().with_http_listener(bind);
60 let handle = builder
61 .install_recorder()
62 .map_err(|e| RustvelloError::Configuration {
63 message: format!("failed to install Prometheus recorder: {e}"),
64 })?;
65 let _ = RECORDER_INSTALLED.set(());
66 Ok(Self { _handle: handle })
67 }
68}
69
70impl EventEmitter for PrometheusSink {
71 fn on_worker_started(&self, runner_id: &RunnerId) {
72 gauge!("rustvello_workers_active", "runner_id" => runner_id.as_str().to_owned())
73 .increment(1.0);
74 }
75
76 fn on_worker_shutdown(&self, runner_id: &RunnerId) {
77 gauge!("rustvello_workers_active", "runner_id" => runner_id.as_str().to_owned())
78 .decrement(1.0);
79 }
80
81 fn on_task_submitted(&self, task_id: &TaskId, _inv_id: &InvocationId) {
82 counter!("rustvello_tasks_submitted_total", "task_id" => task_id.to_string()).increment(1);
83 }
84
85 fn on_task_started(&self, task_id: &TaskId, _inv_id: &InvocationId) {
86 counter!("rustvello_tasks_started_total", "task_id" => task_id.to_string()).increment(1);
87 }
88
89 fn on_task_succeeded(&self, task_id: &TaskId, _inv_id: &InvocationId, duration: Duration) {
90 let tid = task_id.to_string();
91 counter!("rustvello_tasks_succeeded_total", "task_id" => tid.clone()).increment(1);
92 histogram!("rustvello_task_duration_seconds", "task_id" => tid)
93 .record(duration.as_secs_f64());
94 }
95
96 fn on_task_failed(
97 &self,
98 task_id: &TaskId,
99 _inv_id: &InvocationId,
100 _error: &str,
101 duration: Duration,
102 ) {
103 let tid = task_id.to_string();
104 counter!("rustvello_tasks_failed_total", "task_id" => tid.clone()).increment(1);
105 histogram!("rustvello_task_duration_seconds", "task_id" => tid)
106 .record(duration.as_secs_f64());
107 }
108
109 fn on_task_retried(&self, task_id: &TaskId, _inv_id: &InvocationId, _attempt: u32) {
110 counter!("rustvello_tasks_retried_total", "task_id" => task_id.to_string()).increment(1);
111 }
112
113 fn on_queue_depth(&self, queue: &str, depth: usize) {
114 gauge!("rustvello_queue_depth", "queue" => queue.to_owned()).set(depth as f64);
115 }
116
117 fn on_cc_rejected(&self, task_id: &TaskId) {
118 counter!("rustvello_cc_rejected_total", "task_id" => task_id.to_string()).increment(1);
119 }
120
121 fn on_cc_slot_acquired(&self, task_id: &TaskId) {
122 let tid = task_id.to_string();
123 counter!("rustvello_cc_slot_acquired_total", "task_id" => tid.clone()).increment(1);
124 gauge!("rustvello_cc_slots_active", "task_id" => tid).increment(1.0);
125 }
126
127 fn on_cc_slot_released(&self, task_id: &TaskId) {
128 let tid = task_id.to_string();
129 counter!("rustvello_cc_slot_released_total", "task_id" => tid.clone()).increment(1);
130 gauge!("rustvello_cc_slots_active", "task_id" => tid).decrement(1.0);
131 }
132}
133
#[cfg(test)]
mod tests {
    use super::*;
    use metrics::Recorder;

    /// Builds a recorder that is NOT installed globally, plus a handle for
    /// rendering whatever it captures.
    fn test_recorder() -> (impl Recorder, metrics_exporter_prometheus::PrometheusHandle) {
        let recorder = PrometheusBuilder::new().build_recorder();
        let handle = recorder.handle();
        (recorder, handle)
    }

    /// Runs `emit` against a fresh sink while a thread-local recorder is active
    /// and returns the rendered exposition text. Using a local recorder keeps
    /// tests independent of the global recorder and of one another.
    fn render_with(emit: impl FnOnce(&PrometheusSink)) -> String {
        let (rec, handle) = test_recorder();
        let sink = PrometheusSink {
            _handle: handle.clone(),
        };
        metrics::with_local_recorder(&rec, || emit(&sink));
        handle.render()
    }

    fn task_id(s: &str) -> TaskId {
        s.parse().unwrap()
    }

    fn inv_id(s: &str) -> InvocationId {
        InvocationId::from(s)
    }

    fn runner_id(s: &str) -> RunnerId {
        RunnerId::from(s)
    }

    #[test]
    fn worker_started_increments_gauge() {
        let output = render_with(|sink| sink.on_worker_started(&runner_id("r1")));
        assert!(
            output.contains("rustvello_workers_active{runner_id=\"r1\"} 1"),
            "expected gauge at 1:\n{output}"
        );
    }

    #[test]
    fn worker_shutdown_decrements_gauge() {
        let output = render_with(|sink| {
            sink.on_worker_started(&runner_id("r1"));
            sink.on_worker_shutdown(&runner_id("r1"));
        });
        assert!(
            output.contains("rustvello_workers_active{runner_id=\"r1\"} 0"),
            "expected gauge at 0:\n{output}"
        );
    }

    #[test]
    fn task_submitted_increments_counter() {
        let output = render_with(|sink| {
            sink.on_task_submitted(&task_id("my_app.add"), &inv_id("inv-001"));
            sink.on_task_submitted(&task_id("my_app.add"), &inv_id("inv-002"));
        });
        // Pin the count to this metric and label, not to any `} 2` in the output.
        assert!(
            output.contains("rustvello_tasks_submitted_total{task_id=\"my_app.add\"} 2"),
            "expected count 2:\n{output}"
        );
    }

    #[test]
    fn task_started_increments_counter() {
        let output =
            render_with(|sink| sink.on_task_started(&task_id("my_app.run"), &inv_id("inv-001")));
        assert!(
            output.contains("rustvello_tasks_started_total{task_id=\"my_app.run\"} 1"),
            "expected count 1:\n{output}"
        );
    }

    #[test]
    fn task_succeeded_records_counter_and_histogram() {
        let output = render_with(|sink| {
            sink.on_task_succeeded(
                &task_id("my_app.calc"),
                &inv_id("inv-001"),
                Duration::from_millis(150),
            );
        });
        assert!(
            output.contains("rustvello_tasks_succeeded_total{task_id=\"my_app.calc\"} 1"),
            "missing succeeded counter:\n{output}"
        );
        assert!(
            output.contains("rustvello_task_duration_seconds"),
            "missing duration histogram:\n{output}"
        );
    }

    #[test]
    fn task_failed_records_counter_and_histogram() {
        let output = render_with(|sink| {
            sink.on_task_failed(
                &task_id("my_app.calc"),
                &inv_id("inv-001"),
                "boom",
                Duration::from_secs(1),
            );
        });
        assert!(
            output.contains("rustvello_tasks_failed_total{task_id=\"my_app.calc\"} 1"),
            "missing failed counter:\n{output}"
        );
        assert!(
            output.contains("rustvello_task_duration_seconds"),
            "missing duration histogram:\n{output}"
        );
    }

    #[test]
    fn task_retried_increments_counter() {
        let output = render_with(|sink| {
            sink.on_task_retried(&task_id("my_app.flaky"), &inv_id("inv-001"), 3);
        });
        assert!(
            output.contains("rustvello_tasks_retried_total{task_id=\"my_app.flaky\"} 1"),
            "expected count 1:\n{output}"
        );
    }

    #[test]
    fn queue_depth_sets_gauge() {
        let output = render_with(|sink| sink.on_queue_depth("default", 42));
        assert!(
            output.contains("rustvello_queue_depth{queue=\"default\"} 42"),
            "expected depth 42:\n{output}"
        );
    }

    #[test]
    fn cc_rejected_increments_counter() {
        let output = render_with(|sink| sink.on_cc_rejected(&task_id("my_app.heavy")));
        assert!(
            output.contains("rustvello_cc_rejected_total{task_id=\"my_app.heavy\"} 1"),
            "expected count 1:\n{output}"
        );
    }

    #[test]
    fn cc_slot_acquired_increments_counter_and_gauge() {
        let output = render_with(|sink| sink.on_cc_slot_acquired(&task_id("my_app.limited")));
        assert!(
            output.contains("rustvello_cc_slot_acquired_total{task_id=\"my_app.limited\"} 1"),
            "missing acquired counter:\n{output}"
        );
        assert!(
            output.contains("rustvello_cc_slots_active{task_id=\"my_app.limited\"} 1"),
            "expected active gauge at 1:\n{output}"
        );
    }

    #[test]
    fn cc_slot_released_decrements_gauge() {
        let output = render_with(|sink| {
            sink.on_cc_slot_acquired(&task_id("my_app.limited"));
            sink.on_cc_slot_released(&task_id("my_app.limited"));
        });
        assert!(
            output.contains("rustvello_cc_slot_released_total{task_id=\"my_app.limited\"} 1"),
            "missing released counter:\n{output}"
        );
        // One acquire followed by one release must leave the gauge back at zero.
        assert!(
            output.contains("rustvello_cc_slots_active{task_id=\"my_app.limited\"} 0"),
            "expected gauge at 0:\n{output}"
        );
    }
}