1use std::sync::Arc;
8
9use prometheus::{Counter, CounterVec, Gauge, Histogram, HistogramVec, Registry};
10use taskvisor::{BackoffSource, Event, EventKind, Subscribe};
11
12use crate::register::{Sub, ms_to_secs};
13
/// Queue capacity that [`PrometheusSubscriber::new`] passes to
/// [`PrometheusSubscriber::with_queue_capacity`] when the caller does not pick one.
pub const DEFAULT_QUEUE_CAPACITY: usize = 2048;
16
/// Event subscriber that turns supervisor events into Prometheus metrics.
///
/// Each field is a handle to a metric registered at construction time, plus the
/// queue capacity reported back through `Subscribe::queue_capacity`.
pub struct PrometheusSubscriber {
    /// Gauge: incremented on `TaskStarting`, decremented on `TaskStopped` / `TaskFailed`
    /// (only while the gauge is above zero).
    tasks_in_flight: Gauge,
    /// Counter: incremented on `TaskStarting` when the event's attempt is greater than 1.
    task_restarts: Counter,
    /// Counter by `source` label (`failure`, `success`, `unknown`), bumped on `BackoffScheduled`.
    task_backoff_count: CounterVec,
    /// Histogram of backoff delays, observed in seconds when the event carries `delay_ms`.
    task_backoff_duration: Histogram,
    /// Counter by `reason` label (`exhausted`, `fatal`), bumped on `ActorExhausted` / `ActorDead`.
    task_terminal: CounterVec,
    /// Histogram by `outcome` label of the attempt number seen on `ActorExhausted` / `ActorDead`
    /// (a missing attempt is recorded as 1).
    attempts_to_finalize: HistogramVec,
    /// Counter: incremented on `TimeoutHit`.
    task_timeouts: Counter,
    /// Counter: incremented on `SubscriberOverflow`.
    subscriber_overflow: Counter,
    /// Counter: incremented on `SubscriberPanicked`.
    subscriber_panicked: Counter,
    /// Counter: incremented on `ControllerSubmitted`.
    controller_submissions: Counter,
    /// Counter by `reason` label, bumped on `ControllerRejected`; the label value comes from
    /// `classify_rejection_reason`.
    controller_rejections: CounterVec,
    /// Value returned by `Subscribe::queue_capacity` for this subscriber.
    queue_capacity: usize,
}
96
/// Maps a free-form controller rejection reason onto a small, fixed set of label values.
///
/// Returns `"unknown"` when no reason is present, the label of the first matching
/// prefix in the table below when one matches, and `"other"` for everything else
/// (including the empty string).
fn classify_rejection_reason(reason: Option<&str>) -> &'static str {
    // (reason prefix, metric label) pairs, checked in order; first match wins.
    const PREFIX_LABELS: [(&str, &str); 8] = [
        ("slot full", "slot_full"),
        ("dropped: slot busy", "slot_busy"),
        ("add_failed", "add_failed"),
        ("remove_failed", "remove_failed"),
        ("queue_start_failed", "queue_failed"),
        ("recovery_start_failed", "recovery_failed"),
        ("bus_lagged", "bus_lagged"),
        ("controller_loop_exited", "controller_exited"),
    ];

    match reason {
        None => "unknown",
        Some(text) => PREFIX_LABELS
            .iter()
            .find(|(prefix, _)| text.starts_with(*prefix))
            .map_or("other", |&(_, label)| label),
    }
}
127
128impl PrometheusSubscriber {
129 pub fn new(registry: Arc<Registry>) -> Result<Self, prometheus::Error> {
131 Self::with_queue_capacity(registry, DEFAULT_QUEUE_CAPACITY)
132 }
133
134 pub fn with_queue_capacity(
136 registry: Arc<Registry>,
137 queue_capacity: usize,
138 ) -> Result<Self, prometheus::Error> {
139 let sv = Sub::new(®istry, "sv");
140 let ctrl = Sub::new(®istry, "ctrl");
141
142 let tasks_in_flight = sv.gauge("tasks_in_flight", "Number of tasks currently executing")?;
143 let task_restarts =
144 sv.counter("task_restarts_total", "Total task restarts (attempt > 1)")?;
145 let task_backoff_count = sv.counter_vec(
146 "task_backoff_count_total",
147 "Total backoff events",
148 &["source"],
149 )?;
150 let task_backoff_duration = sv.histogram(
151 "task_backoff_duration_seconds",
152 "Backoff delay duration in seconds",
153 vec![
154 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0, 600.0, 1800.0,
155 3600.0,
156 ],
157 )?;
158 let task_terminal = sv.counter_vec(
159 "task_terminal_total",
160 "Total terminal task states",
161 &["reason"],
162 )?;
163 let attempts_to_finalize = sv.histogram_vec(
164 "attempts_to_finalize",
165 "Number of attempts observed when a task leaves the supervision loop",
166 vec![1.0, 2.0, 3.0, 5.0, 10.0, 20.0, 50.0, 100.0],
167 &["outcome"],
168 )?;
169 let task_timeouts = sv.counter("task_timeouts_total", "Total task timeout events")?;
170 let subscriber_overflow = sv.counter(
171 "subscriber_overflow_total",
172 "Total subscriber queue overflow events (events lost)",
173 )?;
174 let subscriber_panicked =
175 sv.counter("subscriber_panicked_total", "Total subscriber panic events")?;
176
177 let controller_submissions =
178 ctrl.counter("submissions_total", "Total controller submissions")?;
179 let controller_rejections = ctrl.counter_vec(
180 "rejections_total",
181 "Total controller rejections grouped by cause",
182 &["reason"],
183 )?;
184
185 Ok(Self {
186 tasks_in_flight,
187 task_restarts,
188 task_backoff_count,
189 task_backoff_duration,
190 task_terminal,
191 attempts_to_finalize,
192 task_timeouts,
193 subscriber_overflow,
194 subscriber_panicked,
195 controller_submissions,
196 controller_rejections,
197 queue_capacity,
198 })
199 }
200}
201
202impl std::fmt::Debug for PrometheusSubscriber {
203 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
204 f.debug_struct("PrometheusSubscriber").finish()
205 }
206}
207
208impl Subscribe for PrometheusSubscriber {
209 fn on_event(&self, event: &Event) {
211 match event.kind {
212 EventKind::TaskStarting => {
213 self.tasks_in_flight.inc();
214 if event.attempt.unwrap_or(1) > 1 {
215 self.task_restarts.inc();
216 }
217 }
218 EventKind::TaskStopped | EventKind::TaskFailed => {
219 if self.tasks_in_flight.get() > 0.0 {
220 self.tasks_in_flight.dec();
221 }
222 }
223 EventKind::TimeoutHit => {
224 self.task_timeouts.inc();
225 }
226 EventKind::SubscriberOverflow => {
227 tracing::warn!(
228 task = event.task.as_deref().unwrap_or("unknown"),
229 "subscriber queue overflow: events are being dropped"
230 );
231 self.subscriber_overflow.inc();
232 }
233 EventKind::SubscriberPanicked => {
234 tracing::warn!(
235 task = event.task.as_deref().unwrap_or("unknown"),
236 reason = event.reason.as_deref().unwrap_or("unknown"),
237 "subscriber panicked while processing an event"
238 );
239 self.subscriber_panicked.inc();
240 }
241 EventKind::BackoffScheduled => {
242 let source = match event.backoff_source {
243 Some(BackoffSource::Failure) => "failure",
244 Some(BackoffSource::Success) => "success",
245 None => "unknown",
246 };
247 self.task_backoff_count.with_label_values(&[source]).inc();
248
249 if let Some(delay_ms) = event.delay_ms {
250 self.task_backoff_duration
251 .observe(ms_to_secs(delay_ms.into()));
252 }
253 }
254 EventKind::ActorExhausted => {
255 self.task_terminal.with_label_values(&["exhausted"]).inc();
256 self.attempts_to_finalize
257 .with_label_values(&["exhausted"])
258 .observe(f64::from(event.attempt.unwrap_or(1)));
259 }
260 EventKind::ActorDead => {
261 self.task_terminal.with_label_values(&["fatal"]).inc();
262 self.attempts_to_finalize
263 .with_label_values(&["fatal"])
264 .observe(f64::from(event.attempt.unwrap_or(1)));
265 }
266 EventKind::ControllerSubmitted => {
267 self.controller_submissions.inc();
268 }
269 EventKind::ControllerRejected => {
270 let reason = classify_rejection_reason(event.reason.as_deref());
271 self.controller_rejections
272 .with_label_values(&[reason])
273 .inc();
274 }
275 EventKind::TaskAdded
276 | EventKind::TaskRemoved
277 | EventKind::TaskAddRequested
278 | EventKind::TaskRemoveRequested
279 | EventKind::ShutdownRequested
280 | EventKind::AllStoppedWithinGrace
281 | EventKind::GraceExceeded
282 | EventKind::ControllerSlotTransition => {}
283 }
284 }
285
286 fn name(&self) -> &'static str {
288 "prometheus"
289 }
290
291 fn queue_capacity(&self) -> usize {
293 self.queue_capacity
294 }
295}
296
#[cfg(test)]
mod tests {
    use super::*;
    use prometheus::Encoder;
    use solti_runner::MetricsBackend;
    use std::time::Duration;

    /// Builds a subscriber on a fresh, private registry with the default queue capacity.
    fn new_subscriber() -> PrometheusSubscriber {
        let registry = Arc::new(Registry::new());
        PrometheusSubscriber::new(registry).unwrap()
    }

    /// Renders everything gathered from `registry` with the Prometheus text encoder.
    fn metrics_text(registry: &Registry) -> String {
        let encoder = prometheus::TextEncoder::new();
        let families = registry.gather();
        let mut buf = Vec::new();
        encoder.encode(&families, &mut buf).unwrap();
        String::from_utf8(buf).unwrap()
    }

    // --- in-flight gauge and restart counter ---

    #[test]
    fn task_starting_increments_in_flight() {
        let sub = new_subscriber();

        sub.on_event(
            &Event::new(EventKind::TaskStarting)
                .with_task("t")
                .with_attempt(1),
        );

        assert_eq!(sub.tasks_in_flight.get(), 1.0);
    }

    #[test]
    fn task_stopped_decrements_in_flight() {
        let sub = new_subscriber();

        sub.on_event(
            &Event::new(EventKind::TaskStarting)
                .with_task("t")
                .with_attempt(1),
        );
        sub.on_event(&Event::new(EventKind::TaskStopped).with_task("t"));

        assert_eq!(sub.tasks_in_flight.get(), 0.0);
    }

    #[test]
    fn task_failed_decrements_in_flight() {
        let sub = new_subscriber();

        sub.on_event(
            &Event::new(EventKind::TaskStarting)
                .with_task("t")
                .with_attempt(1),
        );
        sub.on_event(
            &Event::new(EventKind::TaskFailed)
                .with_task("t")
                .with_reason("boom"),
        );

        assert_eq!(sub.tasks_in_flight.get(), 0.0);
    }

    #[test]
    fn first_attempt_is_not_a_restart() {
        let sub = new_subscriber();

        sub.on_event(
            &Event::new(EventKind::TaskStarting)
                .with_task("t")
                .with_attempt(1),
        );

        assert_eq!(sub.task_restarts.get(), 0.0);
    }

    #[test]
    fn second_attempt_is_a_restart() {
        let sub = new_subscriber();

        sub.on_event(
            &Event::new(EventKind::TaskStarting)
                .with_task("t")
                .with_attempt(2),
        );

        assert_eq!(sub.task_restarts.get(), 1.0);
    }

    // --- backoff and timeout counters ---

    #[test]
    fn backoff_failure_increments_counter() {
        let sub = new_subscriber();

        sub.on_event(
            &Event::new(EventKind::BackoffScheduled)
                .with_task("t")
                .with_delay(Duration::from_secs(5))
                .with_backoff_failure(),
        );

        assert_eq!(
            sub.task_backoff_count.with_label_values(&["failure"]).get(),
            1.0
        );
    }

    #[test]
    fn backoff_success_increments_counter() {
        let sub = new_subscriber();

        sub.on_event(
            &Event::new(EventKind::BackoffScheduled)
                .with_task("t")
                .with_delay(Duration::from_secs(10))
                .with_backoff_success(),
        );

        assert_eq!(
            sub.task_backoff_count.with_label_values(&["success"]).get(),
            1.0
        );
    }

    #[test]
    fn timeout_hit_increments_counter() {
        let sub = new_subscriber();

        sub.on_event(
            &Event::new(EventKind::TimeoutHit)
                .with_task("t")
                .with_timeout(Duration::from_secs(30)),
        );

        assert_eq!(sub.task_timeouts.get(), 1.0);
    }

    // --- terminal outcomes ---

    #[test]
    fn actor_exhausted_increments_terminal() {
        let sub = new_subscriber();

        sub.on_event(
            &Event::new(EventKind::ActorExhausted)
                .with_task("t")
                .with_reason("policy done"),
        );

        assert_eq!(
            sub.task_terminal.with_label_values(&["exhausted"]).get(),
            1.0
        );
    }

    #[test]
    fn actor_exhausted_observes_attempts_to_finalize() {
        let sub = new_subscriber();

        sub.on_event(
            &Event::new(EventKind::ActorExhausted)
                .with_task("t")
                .with_attempt(3),
        );

        let h = sub.attempts_to_finalize.with_label_values(&["exhausted"]);
        assert_eq!(h.get_sample_count(), 1);
        assert_eq!(h.get_sample_sum(), 3.0);
    }

    #[test]
    fn actor_dead_observes_attempts_to_finalize() {
        let sub = new_subscriber();

        sub.on_event(
            &Event::new(EventKind::ActorDead)
                .with_task("t")
                .with_attempt(7)
                .with_reason("fatal"),
        );

        let h = sub.attempts_to_finalize.with_label_values(&["fatal"]);
        assert_eq!(h.get_sample_count(), 1);
        assert_eq!(h.get_sample_sum(), 7.0);
    }

    /// An event without an attempt number is recorded as a single attempt.
    #[test]
    fn actor_exhausted_without_attempt_observes_one() {
        let sub = new_subscriber();

        sub.on_event(&Event::new(EventKind::ActorExhausted).with_task("t"));

        let h = sub.attempts_to_finalize.with_label_values(&["exhausted"]);
        assert_eq!(h.get_sample_count(), 1);
        assert_eq!(h.get_sample_sum(), 1.0);
    }

    #[test]
    fn actor_dead_increments_terminal() {
        let sub = new_subscriber();

        sub.on_event(
            &Event::new(EventKind::ActorDead)
                .with_task("t")
                .with_reason("fatal error"),
        );

        assert_eq!(sub.task_terminal.with_label_values(&["fatal"]).get(), 1.0);
    }

    /// A stop without a preceding start must leave the gauge at zero.
    #[test]
    fn in_flight_does_not_go_negative() {
        let sub = new_subscriber();

        sub.on_event(&Event::new(EventKind::TaskStopped).with_task("t"));

        assert_eq!(sub.tasks_in_flight.get(), 0.0);
    }

    // --- subscriber health ---

    #[test]
    fn subscriber_overflow_increments_counter() {
        let sub = new_subscriber();

        sub.on_event(
            &Event::new(EventKind::SubscriberOverflow)
                .with_task("t")
                .with_reason("queue full"),
        );

        assert_eq!(sub.subscriber_overflow.get(), 1.0);
    }

    #[test]
    fn subscriber_panicked_increments_counter() {
        let sub = new_subscriber();

        sub.on_event(
            &Event::new(EventKind::SubscriberPanicked)
                .with_task("t")
                .with_reason("boom"),
        );

        assert_eq!(sub.subscriber_panicked.get(), 1.0);
    }

    // --- controller metrics ---

    #[test]
    fn controller_submitted_increments_counter() {
        let sub = new_subscriber();

        sub.on_event(&Event::new(EventKind::ControllerSubmitted).with_task("t"));

        assert_eq!(sub.controller_submissions.get(), 1.0);
    }

    #[test]
    fn controller_rejected_without_reason_labels_as_unknown() {
        let sub = new_subscriber();

        sub.on_event(&Event::new(EventKind::ControllerRejected).with_task("t"));

        assert_eq!(
            sub.controller_rejections
                .with_label_values(&["unknown"])
                .get(),
            1.0
        );
    }

    #[test]
    fn controller_rejected_slot_full_reason() {
        let sub = new_subscriber();

        sub.on_event(
            &Event::new(EventKind::ControllerRejected)
                .with_task("t")
                .with_reason("slot full at capacity cap=1 depth=1 admission=Queue"),
        );

        assert_eq!(
            sub.controller_rejections
                .with_label_values(&["slot_full"])
                .get(),
            1.0
        );
    }

    #[test]
    fn controller_rejected_slot_busy_reason() {
        let sub = new_subscriber();

        sub.on_event(
            &Event::new(EventKind::ControllerRejected)
                .with_task("t")
                .with_reason("dropped: slot busy (status=Running)"),
        );

        assert_eq!(
            sub.controller_rejections
                .with_label_values(&["slot_busy"])
                .get(),
            1.0
        );
    }

    // --- rejection reason classification ---

    #[test]
    fn classify_rejection_reason_recognizes_known_prefixes() {
        assert_eq!(
            classify_rejection_reason(Some("slot full at capacity cap=1 depth=1 admission=Queue")),
            "slot_full"
        );
        assert_eq!(
            classify_rejection_reason(Some("dropped: slot busy (status=Running)")),
            "slot_busy"
        );
        assert_eq!(
            classify_rejection_reason(Some("add_failed: something")),
            "add_failed"
        );
        assert_eq!(
            classify_rejection_reason(Some("remove_failed: boom")),
            "remove_failed"
        );
        assert_eq!(
            classify_rejection_reason(Some("queue_start_failed: oom")),
            "queue_failed"
        );
        assert_eq!(
            classify_rejection_reason(Some("recovery_start_failed: net")),
            "recovery_failed"
        );
        assert_eq!(
            classify_rejection_reason(Some("bus_lagged: missed 1 events, recovering slots")),
            "bus_lagged"
        );
        assert_eq!(
            classify_rejection_reason(Some("controller_loop_exited: channel closed")),
            "controller_exited"
        );
    }

    #[test]
    fn classify_rejection_reason_none_is_unknown() {
        assert_eq!(classify_rejection_reason(None), "unknown");
    }

    #[test]
    fn classify_rejection_reason_unrecognized_is_other() {
        assert_eq!(classify_rejection_reason(Some("something weird")), "other");
        assert_eq!(classify_rejection_reason(Some("")), "other");
    }

    // --- queue capacity ---

    #[test]
    fn queue_capacity_defaults_to_2048() {
        let sub = new_subscriber();
        assert_eq!(sub.queue_capacity(), DEFAULT_QUEUE_CAPACITY);
        assert_eq!(sub.queue_capacity(), 2048);
    }

    #[test]
    fn queue_capacity_is_overridable_via_constructor() {
        let registry = Arc::new(Registry::new());
        let sub = PrometheusSubscriber::with_queue_capacity(registry, 4096).unwrap();
        assert_eq!(sub.queue_capacity(), 4096);
    }

    /// The runner backend and the subscriber can register on the same registry,
    /// and metrics from both appear in a single text export.
    #[test]
    fn shared_registry_with_backend() {
        let registry = Arc::new(Registry::new());

        let backend = crate::PrometheusMetrics::new(registry.clone()).unwrap();
        let sub = PrometheusSubscriber::new(registry.clone()).unwrap();

        backend.record_task_started(solti_runner::RunnerType::Subprocess);
        sub.on_event(
            &Event::new(EventKind::TaskStarting)
                .with_task("t")
                .with_attempt(1),
        );

        let text = metrics_text(&registry);
        assert!(text.contains("solti_runner_tasks_started_total"));
        assert!(text.contains("solti_sv_tasks_in_flight"));
    }
}