Skip to main content

solti_prometheus/
subscriber.rs

1//! # Supervision-level Prometheus metrics.
2//!
3//! [`PrometheusSubscriber`] implements [`Subscribe`] and translates [`taskvisor`] events into Prometheus counters, gauges, and histograms.
4//!
5//! See the [crate root](crate) for architecture and namespace overview.
6
7use std::sync::Arc;
8
9use prometheus::{Counter, CounterVec, Gauge, Histogram, HistogramVec, Registry};
10use taskvisor::{BackoffSource, Event, EventKind, Subscribe};
11
12use crate::register::{Sub, ms_to_secs};
13
/// Queue capacity that [`PrometheusSubscriber::new`] applies when the caller does not choose one.
pub const DEFAULT_QUEUE_CAPACITY: usize = 2_048;
16
17/// Prometheus subscriber for supervision-level metrics.
18///
19/// Implements [`Subscribe`] and captures metrics from the [`taskvisor`] event stream.
20/// Must share a [`Registry`] with [`crate::PrometheusMetrics`] for a unified `/metrics` endpoint.
21///
22/// ## Event → metric mapping
23///
24/// ```text
25/// TaskStarting        → tasks_in_flight.inc()
26///                       + task_restarts.inc()  (if attempt > 1)
27/// TaskStopped         → tasks_in_flight.dec()
28/// TaskFailed          → tasks_in_flight.dec()
29/// TimeoutHit          → task_timeouts.inc()
30/// BackoffScheduled    → task_backoff_count{source}.inc()
31///                       + task_backoff_duration.observe(delay)
32/// ActorExhausted      → task_terminal{reason="exhausted"}.inc()
33///                       + attempts_to_finalize{outcome="exhausted"}.observe(attempt)
34/// ActorDead           → task_terminal{reason="fatal"}.inc()
35///                       + attempts_to_finalize{outcome="fatal"}.observe(attempt)
36/// SubscriberOverflow  → subscriber_overflow.inc() + tracing::warn
37/// SubscriberPanicked  → subscriber_panicked.inc() + tracing::warn
38/// ControllerSubmitted → controller_submissions.inc()
39/// ControllerRejected  → controller_rejections{reason}.inc()  (reason classified from Event.reason)
40/// ```
41///
42/// ## Supervision metrics (`solti_sv_*`)
43///
44/// | Metric                                   | Type      | Labels   | Description                  |
45/// |------------------------------------------|-----------|----------|------------------------------|
46/// | `solti_sv_tasks_in_flight`               | Gauge     | -        | Currently executing tasks    |
47/// | `solti_sv_task_restarts_total`           | Counter   | -        | Restarts (attempt > 1)       |
48/// | `solti_sv_task_backoff_count_total`      | Counter   | `source` | Backoff events               |
49/// | `solti_sv_task_backoff_duration_seconds` | Histogram | -        | Backoff delay duration       |
50/// | `solti_sv_task_terminal_total`           | Counter   | `reason` | Terminal task states         |
51/// | `solti_sv_attempts_to_finalize`          | Histogram | `outcome`| Attempts when task left loop |
52/// | `solti_sv_task_timeouts_total`           | Counter   | -        | Timeout events               |
53/// | `solti_sv_subscriber_overflow_total`     | Counter   | -        | Queue overflow (lost events) |
54/// | `solti_sv_subscriber_panicked_total`     | Counter   | -        | Subscriber panics            |
55///
56/// ## Controller metrics (`solti_ctrl_*`)
57///
58/// | Metric                         | Type      | Labels   | Description                             |
59/// |--------------------------------|-----------|----------|-----------------------------------------|
60/// | `solti_ctrl_submissions_total` | Counter   | -        | Controller submissions                  |
61/// | `solti_ctrl_rejections_total`  | CounterVec| `reason` | Controller rejections grouped by cause  |
62///
63/// ## Labels
64///
65/// | Label    | Values                                                                                                                           | Source                                                       |
66/// |----------|----------------------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------|
67/// | `source` | `failure`, `success`                                                                                                             | [`BackoffSource`] on the event                               |
68/// | `reason` (terminal)   | `exhausted`, `fatal`                                                                                                | Terminal event kind                                          |
69/// | `outcome` (attempts)  | `exhausted`, `fatal`                                                                                                | Attempts-to-finalize histogram                               |
70/// | `reason` (rejection)  | `slot_full`, `slot_busy`, `add_failed`, `remove_failed`, `queue_failed`, `recovery_failed`, `bus_lagged`, `controller_exited`, `other`, `unknown` | Classified from `Event.reason` by `classify_rejection_reason` (private)         |
71///
72/// ## Notes
73///
74/// - `tasks_in_flight` gauge is guarded against going negative: a [`TaskStopped`](EventKind::TaskStopped) without a preceding [`TaskStarting`](EventKind::TaskStarting) is a no-op on the gauge.
75/// - [`queue_capacity`](Subscribe::queue_capacity) defaults to [`DEFAULT_QUEUE_CAPACITY`].
76/// - Backoff duration is converted from milliseconds to seconds before observation.
77///
78/// ## Also
79///
80/// - [`PrometheusMetrics`](crate::PrometheusMetrics) is a runner-level metrics, complementary to this subscriber.
81/// - [`Event`](taskvisor::Event) and [`EventKind`](taskvisor::EventKind): event structure and classification.
82pub struct PrometheusSubscriber {
83    tasks_in_flight: Gauge,
84    task_restarts: Counter,
85    task_backoff_count: CounterVec,
86    task_backoff_duration: Histogram,
87    task_terminal: CounterVec,
88    attempts_to_finalize: HistogramVec,
89    task_timeouts: Counter,
90    subscriber_overflow: Counter,
91    subscriber_panicked: Counter,
92    controller_submissions: Counter,
93    controller_rejections: CounterVec,
94    queue_capacity: usize,
95}
96
/// Collapse the free-form reason of a `ControllerRejected` event into a fixed set of metric labels.
///
/// The raw `reason` on [`taskvisor::Event`] may embed error messages, queue depths and other unbounded text;
/// using it verbatim as a label value would explode Prometheus cardinality.
/// Only the leading prefix of the reason is inspected, and the result is always one of:
///
/// `slot_full`, `slot_busy`, `add_failed`, `remove_failed`, `queue_failed`, `recovery_failed`,
/// `bus_lagged`, `controller_exited`, `other` (unrecognized text, including the empty string)
/// or `unknown` (no reason supplied).
fn classify_rejection_reason(reason: Option<&str>) -> &'static str {
    /// `(reason prefix, metric label)` pairs, probed in order; the first matching prefix wins.
    const PREFIX_LABELS: [(&str, &str); 8] = [
        ("slot full", "slot_full"),
        ("dropped: slot busy", "slot_busy"),
        ("add_failed", "add_failed"),
        ("remove_failed", "remove_failed"),
        ("queue_start_failed", "queue_failed"),
        ("recovery_start_failed", "recovery_failed"),
        ("bus_lagged", "bus_lagged"),
        ("controller_loop_exited", "controller_exited"),
    ];

    match reason {
        None => "unknown",
        Some(raw) => PREFIX_LABELS
            .iter()
            .find(|(prefix, _)| raw.starts_with(*prefix))
            .map_or("other", |&(_, label)| label),
    }
}
127
128impl PrometheusSubscriber {
129    /// Create a new subscriber with the default event-bus queue capacity ([`DEFAULT_QUEUE_CAPACITY`]).
130    pub fn new(registry: Arc<Registry>) -> Result<Self, prometheus::Error> {
131        Self::with_queue_capacity(registry, DEFAULT_QUEUE_CAPACITY)
132    }
133
134    /// Create a new subscriber with a specific event-bus queue capacity.
135    pub fn with_queue_capacity(
136        registry: Arc<Registry>,
137        queue_capacity: usize,
138    ) -> Result<Self, prometheus::Error> {
139        let sv = Sub::new(&registry, "sv");
140        let ctrl = Sub::new(&registry, "ctrl");
141
142        let tasks_in_flight = sv.gauge("tasks_in_flight", "Number of tasks currently executing")?;
143        let task_restarts =
144            sv.counter("task_restarts_total", "Total task restarts (attempt > 1)")?;
145        let task_backoff_count = sv.counter_vec(
146            "task_backoff_count_total",
147            "Total backoff events",
148            &["source"],
149        )?;
150        let task_backoff_duration = sv.histogram(
151            "task_backoff_duration_seconds",
152            "Backoff delay duration in seconds",
153            vec![
154                0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0, 600.0, 1800.0,
155                3600.0,
156            ],
157        )?;
158        let task_terminal = sv.counter_vec(
159            "task_terminal_total",
160            "Total terminal task states",
161            &["reason"],
162        )?;
163        let attempts_to_finalize = sv.histogram_vec(
164            "attempts_to_finalize",
165            "Number of attempts observed when a task leaves the supervision loop",
166            vec![1.0, 2.0, 3.0, 5.0, 10.0, 20.0, 50.0, 100.0],
167            &["outcome"],
168        )?;
169        let task_timeouts = sv.counter("task_timeouts_total", "Total task timeout events")?;
170        let subscriber_overflow = sv.counter(
171            "subscriber_overflow_total",
172            "Total subscriber queue overflow events (events lost)",
173        )?;
174        let subscriber_panicked =
175            sv.counter("subscriber_panicked_total", "Total subscriber panic events")?;
176
177        let controller_submissions =
178            ctrl.counter("submissions_total", "Total controller submissions")?;
179        let controller_rejections = ctrl.counter_vec(
180            "rejections_total",
181            "Total controller rejections grouped by cause",
182            &["reason"],
183        )?;
184
185        Ok(Self {
186            tasks_in_flight,
187            task_restarts,
188            task_backoff_count,
189            task_backoff_duration,
190            task_terminal,
191            attempts_to_finalize,
192            task_timeouts,
193            subscriber_overflow,
194            subscriber_panicked,
195            controller_submissions,
196            controller_rejections,
197            queue_capacity,
198        })
199    }
200}
201
202impl std::fmt::Debug for PrometheusSubscriber {
203    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
204        f.debug_struct("PrometheusSubscriber").finish()
205    }
206}
207
208impl Subscribe for PrometheusSubscriber {
209    /// Translates a [`taskvisor`] event into prometheus metric updates.
210    fn on_event(&self, event: &Event) {
211        match event.kind {
212            EventKind::TaskStarting => {
213                self.tasks_in_flight.inc();
214                if event.attempt.unwrap_or(1) > 1 {
215                    self.task_restarts.inc();
216                }
217            }
218            EventKind::TaskStopped | EventKind::TaskFailed => {
219                if self.tasks_in_flight.get() > 0.0 {
220                    self.tasks_in_flight.dec();
221                }
222            }
223            EventKind::TimeoutHit => {
224                self.task_timeouts.inc();
225            }
226            EventKind::SubscriberOverflow => {
227                tracing::warn!(
228                    task = event.task.as_deref().unwrap_or("unknown"),
229                    "subscriber queue overflow: events are being dropped"
230                );
231                self.subscriber_overflow.inc();
232            }
233            EventKind::SubscriberPanicked => {
234                tracing::warn!(
235                    task = event.task.as_deref().unwrap_or("unknown"),
236                    reason = event.reason.as_deref().unwrap_or("unknown"),
237                    "subscriber panicked while processing an event"
238                );
239                self.subscriber_panicked.inc();
240            }
241            EventKind::BackoffScheduled => {
242                let source = match event.backoff_source {
243                    Some(BackoffSource::Failure) => "failure",
244                    Some(BackoffSource::Success) => "success",
245                    None => "unknown",
246                };
247                self.task_backoff_count.with_label_values(&[source]).inc();
248
249                if let Some(delay_ms) = event.delay_ms {
250                    self.task_backoff_duration
251                        .observe(ms_to_secs(delay_ms.into()));
252                }
253            }
254            EventKind::ActorExhausted => {
255                self.task_terminal.with_label_values(&["exhausted"]).inc();
256                self.attempts_to_finalize
257                    .with_label_values(&["exhausted"])
258                    .observe(f64::from(event.attempt.unwrap_or(1)));
259            }
260            EventKind::ActorDead => {
261                self.task_terminal.with_label_values(&["fatal"]).inc();
262                self.attempts_to_finalize
263                    .with_label_values(&["fatal"])
264                    .observe(f64::from(event.attempt.unwrap_or(1)));
265            }
266            EventKind::ControllerSubmitted => {
267                self.controller_submissions.inc();
268            }
269            EventKind::ControllerRejected => {
270                let reason = classify_rejection_reason(event.reason.as_deref());
271                self.controller_rejections
272                    .with_label_values(&[reason])
273                    .inc();
274            }
275            EventKind::TaskAdded
276            | EventKind::TaskRemoved
277            | EventKind::TaskAddRequested
278            | EventKind::TaskRemoveRequested
279            | EventKind::ShutdownRequested
280            | EventKind::AllStoppedWithinGrace
281            | EventKind::GraceExceeded
282            | EventKind::ControllerSlotTransition => {}
283        }
284    }
285
286    /// Returns `"prometheus"`
287    fn name(&self) -> &'static str {
288        "prometheus"
289    }
290
291    /// Returns the per-subscriber queue capacity configured via [`PrometheusSubscriber::new`] or [`PrometheusSubscriber::with_queue_capacity`].
292    fn queue_capacity(&self) -> usize {
293        self.queue_capacity
294    }
295}
296
#[cfg(test)]
mod tests {
    use super::*;
    use prometheus::Encoder;
    use solti_runner::MetricsBackend;
    use std::time::Duration;

    /// Subscriber with default queue capacity on a fresh, private registry.
    fn new_subscriber() -> PrometheusSubscriber {
        PrometheusSubscriber::new(Arc::new(Registry::new())).unwrap()
    }

    /// Fresh subscriber that has already processed `event`.
    fn observed(event: &Event) -> PrometheusSubscriber {
        let sub = new_subscriber();
        sub.on_event(event);
        sub
    }

    /// Current value of the controller-rejection counter for `label`.
    fn rejection_count(sub: &PrometheusSubscriber, label: &str) -> f64 {
        sub.controller_rejections.with_label_values(&[label]).get()
    }

    /// Everything gathered from `registry`, rendered in the Prometheus text format.
    fn metrics_text(registry: &Registry) -> String {
        let mut out = Vec::new();
        prometheus::TextEncoder::new()
            .encode(&registry.gather(), &mut out)
            .unwrap();
        String::from_utf8(out).unwrap()
    }

    // ---- task lifecycle -------------------------------------------------

    #[test]
    fn task_starting_increments_in_flight() {
        let sub = observed(
            &Event::new(EventKind::TaskStarting)
                .with_task("t")
                .with_attempt(1),
        );

        assert_eq!(sub.tasks_in_flight.get(), 1.0);
    }

    #[test]
    fn task_stopped_decrements_in_flight() {
        let sub = observed(
            &Event::new(EventKind::TaskStarting)
                .with_task("t")
                .with_attempt(1),
        );

        sub.on_event(&Event::new(EventKind::TaskStopped).with_task("t"));

        assert_eq!(sub.tasks_in_flight.get(), 0.0);
    }

    #[test]
    fn task_failed_decrements_in_flight() {
        let sub = observed(
            &Event::new(EventKind::TaskStarting)
                .with_task("t")
                .with_attempt(1),
        );

        sub.on_event(
            &Event::new(EventKind::TaskFailed)
                .with_task("t")
                .with_reason("boom"),
        );

        assert_eq!(sub.tasks_in_flight.get(), 0.0);
    }

    #[test]
    fn first_attempt_is_not_a_restart() {
        let sub = observed(
            &Event::new(EventKind::TaskStarting)
                .with_task("t")
                .with_attempt(1),
        );

        assert_eq!(sub.task_restarts.get(), 0.0);
    }

    #[test]
    fn second_attempt_is_a_restart() {
        let sub = observed(
            &Event::new(EventKind::TaskStarting)
                .with_task("t")
                .with_attempt(2),
        );

        assert_eq!(sub.task_restarts.get(), 1.0);
    }

    #[test]
    fn in_flight_does_not_go_negative() {
        // A stop with no preceding start must leave the gauge at zero.
        let sub = observed(&Event::new(EventKind::TaskStopped).with_task("t"));

        assert_eq!(sub.tasks_in_flight.get(), 0.0);
    }

    #[test]
    fn timeout_hit_increments_counter() {
        let sub = observed(
            &Event::new(EventKind::TimeoutHit)
                .with_task("t")
                .with_timeout(Duration::from_secs(30)),
        );

        assert_eq!(sub.task_timeouts.get(), 1.0);
    }

    // ---- backoff --------------------------------------------------------

    #[test]
    fn backoff_failure_increments_counter() {
        let sub = observed(
            &Event::new(EventKind::BackoffScheduled)
                .with_task("t")
                .with_delay(Duration::from_secs(5))
                .with_backoff_failure(),
        );

        let failures = sub.task_backoff_count.with_label_values(&["failure"]).get();
        assert_eq!(failures, 1.0);
    }

    #[test]
    fn backoff_success_increments_counter() {
        let sub = observed(
            &Event::new(EventKind::BackoffScheduled)
                .with_task("t")
                .with_delay(Duration::from_secs(10))
                .with_backoff_success(),
        );

        let successes = sub.task_backoff_count.with_label_values(&["success"]).get();
        assert_eq!(successes, 1.0);
    }

    // ---- terminal outcomes ----------------------------------------------

    #[test]
    fn actor_exhausted_increments_terminal() {
        let sub = observed(
            &Event::new(EventKind::ActorExhausted)
                .with_task("t")
                .with_reason("policy done"),
        );

        let exhausted = sub.task_terminal.with_label_values(&["exhausted"]).get();
        assert_eq!(exhausted, 1.0);
    }

    #[test]
    fn actor_exhausted_observes_attempts_to_finalize() {
        let sub = observed(
            &Event::new(EventKind::ActorExhausted)
                .with_task("t")
                .with_attempt(3),
        );

        let histogram = sub.attempts_to_finalize.with_label_values(&["exhausted"]);
        assert_eq!(histogram.get_sample_count(), 1);
        assert_eq!(histogram.get_sample_sum(), 3.0);
    }

    #[test]
    fn actor_dead_observes_attempts_to_finalize() {
        let sub = observed(
            &Event::new(EventKind::ActorDead)
                .with_task("t")
                .with_attempt(7)
                .with_reason("fatal"),
        );

        let histogram = sub.attempts_to_finalize.with_label_values(&["fatal"]);
        assert_eq!(histogram.get_sample_count(), 1);
        assert_eq!(histogram.get_sample_sum(), 7.0);
    }

    #[test]
    fn actor_exhausted_without_attempt_observes_one() {
        let sub = observed(&Event::new(EventKind::ActorExhausted).with_task("t"));

        let histogram = sub.attempts_to_finalize.with_label_values(&["exhausted"]);
        assert_eq!(histogram.get_sample_count(), 1);
        assert_eq!(histogram.get_sample_sum(), 1.0);
    }

    #[test]
    fn actor_dead_increments_terminal() {
        let sub = observed(
            &Event::new(EventKind::ActorDead)
                .with_task("t")
                .with_reason("fatal error"),
        );

        assert_eq!(sub.task_terminal.with_label_values(&["fatal"]).get(), 1.0);
    }

    // ---- subscriber health ----------------------------------------------

    #[test]
    fn subscriber_overflow_increments_counter() {
        let sub = observed(
            &Event::new(EventKind::SubscriberOverflow)
                .with_task("t")
                .with_reason("queue full"),
        );

        assert_eq!(sub.subscriber_overflow.get(), 1.0);
    }

    #[test]
    fn subscriber_panicked_increments_counter() {
        let sub = observed(
            &Event::new(EventKind::SubscriberPanicked)
                .with_task("t")
                .with_reason("boom"),
        );

        assert_eq!(sub.subscriber_panicked.get(), 1.0);
    }

    // ---- controller -----------------------------------------------------

    #[test]
    fn controller_submitted_increments_counter() {
        let sub = observed(&Event::new(EventKind::ControllerSubmitted).with_task("t"));

        assert_eq!(sub.controller_submissions.get(), 1.0);
    }

    #[test]
    fn controller_rejected_without_reason_labels_as_unknown() {
        let sub = observed(&Event::new(EventKind::ControllerRejected).with_task("t"));

        assert_eq!(rejection_count(&sub, "unknown"), 1.0);
    }

    #[test]
    fn controller_rejected_slot_full_reason() {
        let sub = observed(
            &Event::new(EventKind::ControllerRejected)
                .with_task("t")
                .with_reason("slot full at capacity cap=1 depth=1 admission=Queue"),
        );

        assert_eq!(rejection_count(&sub, "slot_full"), 1.0);
    }

    #[test]
    fn controller_rejected_slot_busy_reason() {
        let sub = observed(
            &Event::new(EventKind::ControllerRejected)
                .with_task("t")
                .with_reason("dropped: slot busy (status=Running)"),
        );

        assert_eq!(rejection_count(&sub, "slot_busy"), 1.0);
    }

    // ---- rejection-reason classifier ------------------------------------

    #[test]
    fn classify_rejection_reason_recognizes_known_prefixes() {
        let cases = [
            (
                "slot full at capacity cap=1 depth=1 admission=Queue",
                "slot_full",
            ),
            ("dropped: slot busy (status=Running)", "slot_busy"),
            ("add_failed: something", "add_failed"),
            ("remove_failed: boom", "remove_failed"),
            ("queue_start_failed: oom", "queue_failed"),
            ("recovery_start_failed: net", "recovery_failed"),
            ("bus_lagged: missed 1 events, recovering slots", "bus_lagged"),
            ("controller_loop_exited: channel closed", "controller_exited"),
        ];

        for (raw, expected) in cases {
            assert_eq!(classify_rejection_reason(Some(raw)), expected);
        }
    }

    #[test]
    fn classify_rejection_reason_none_is_unknown() {
        assert_eq!(classify_rejection_reason(None), "unknown");
    }

    #[test]
    fn classify_rejection_reason_unrecognized_is_other() {
        for raw in ["something weird", ""] {
            assert_eq!(classify_rejection_reason(Some(raw)), "other");
        }
    }

    // ---- configuration and registry sharing -----------------------------

    #[test]
    fn queue_capacity_defaults_to_2048() {
        let capacity = new_subscriber().queue_capacity();

        assert_eq!(capacity, DEFAULT_QUEUE_CAPACITY);
        assert_eq!(capacity, 2048);
    }

    #[test]
    fn queue_capacity_is_overridable_via_constructor() {
        let sub =
            PrometheusSubscriber::with_queue_capacity(Arc::new(Registry::new()), 4096).unwrap();

        assert_eq!(sub.queue_capacity(), 4096);
    }

    #[test]
    fn shared_registry_with_backend() {
        let registry = Arc::new(Registry::new());
        let backend = crate::PrometheusMetrics::new(Arc::clone(&registry)).unwrap();
        let sub = PrometheusSubscriber::new(Arc::clone(&registry)).unwrap();

        backend.record_task_started(solti_runner::RunnerType::Subprocess);
        sub.on_event(
            &Event::new(EventKind::TaskStarting)
                .with_task("t")
                .with_attempt(1),
        );

        // Runner-level and supervision-level families must both show up in one scrape.
        let text = metrics_text(&registry);
        assert!(text.contains("solti_runner_tasks_started_total"));
        assert!(text.contains("solti_sv_tasks_in_flight"));
    }
}