Skip to main content

arcly_stream/
observability.rs

1//! A Prometheus implementation of the [`Observer`] trait.
2//!
3//! Gated behind `metrics`. This is the *adapter* that re-creates
4//! `stream-center`'s old behaviour — but as an injected observer the host opts
5//! into, not a global singleton the engine reaches for. The engine core never
6//! depends on `prometheus`.
7
8use crate::bus::StreamEvent;
9use crate::observe::Observer;
10use crate::{MediaFrame, StreamKey};
11use prometheus::{IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry};
12
13/// Prometheus-backed [`Observer`]. Construct it, register it on the engine
14/// builder, and expose its [`registry`](Self::registry) on your `/metrics`
15/// endpoint.
16///
17/// ```
18/// use arcly_stream::observability::PrometheusObserver;
19/// use arcly_stream::prelude::*;
20///
21/// let metrics = PrometheusObserver::new().expect("register metrics");
22/// let registry = metrics.registry().clone();
23/// let engine = Engine::builder()
24///     .application(AppSpec::new("live"))
25///     .observer(metrics)
26///     .build();
27/// assert!(!registry.gather().is_empty());
28/// # let _ = engine;
29/// ```
30pub struct PrometheusObserver {
31    registry: Registry,
32    active_streams: IntGaugeVec,
33    frames_total: IntCounterVec,
34    publishers_total: IntGauge,
35    subscriber_lagged_total: IntCounterVec,
36}
37
38impl PrometheusObserver {
39    /// Create an observer with a fresh registry.
40    pub fn new() -> prometheus::Result<Self> {
41        Self::with_registry(Registry::new())
42    }
43
44    /// Create an observer registering into an existing `registry` (e.g. the
45    /// process-wide default).
46    pub fn with_registry(registry: Registry) -> prometheus::Result<Self> {
47        let active_streams = IntGaugeVec::new(
48            Opts::new("arcly_active_streams", "Currently publishing streams"),
49            &["app"],
50        )?;
51        let frames_total = IntCounterVec::new(
52            Opts::new("arcly_frames_total", "Total media frames published"),
53            &["app"],
54        )?;
55        let publishers_total = IntGauge::new(
56            "arcly_publishers_total",
57            "Total active publishers across all applications",
58        )?;
59        let subscriber_lagged_total = IntCounterVec::new(
60            Opts::new(
61                "arcly_subscriber_lagged_total",
62                "Total frames dropped from slow subscribers' views due to broadcast lag",
63            ),
64            &["app"],
65        )?;
66        registry.register(Box::new(active_streams.clone()))?;
67        registry.register(Box::new(frames_total.clone()))?;
68        registry.register(Box::new(publishers_total.clone()))?;
69        registry.register(Box::new(subscriber_lagged_total.clone()))?;
70        Ok(Self {
71            registry,
72            active_streams,
73            frames_total,
74            publishers_total,
75            subscriber_lagged_total,
76        })
77    }
78
79    /// The registry to gather and expose on `/metrics`.
80    pub fn registry(&self) -> &Registry {
81        &self.registry
82    }
83}
84
85impl Observer for PrometheusObserver {
86    fn on_publish_started(&self, app: &str) {
87        self.active_streams.with_label_values(&[app]).inc();
88        self.publishers_total.inc();
89    }
90
91    fn on_publish_ended(&self, app: &str) {
92        self.active_streams.with_label_values(&[app]).dec();
93        self.publishers_total.dec();
94    }
95
96    fn on_frame(&self, key: &StreamKey, _frame: &MediaFrame) {
97        self.frames_total
98            .with_label_values(&[key.app.as_str()])
99            .inc();
100    }
101
102    fn on_subscriber_lagged(&self, key: &StreamKey, skipped: u64) {
103        self.subscriber_lagged_total
104            .with_label_values(&[key.app.as_str()])
105            .inc_by(skipped);
106    }
107
108    fn on_event(&self, _event: &StreamEvent) {}
109}