arcly-stream 0.1.0

A high-performance live-media streaming kernel: lock-free zero-copy frame fan-out, instant-start GOP cache, pluggable HLS/recording, and trait-driven protocol/storage/auth/observer extension points — runtime, config, and metrics free.
Documentation
//! A Prometheus implementation of the [`Observer`] trait.
//!
//! Gated behind `metrics`. This is the *adapter* that re-creates
//! `stream-center`'s old behaviour — but as an injected observer the host opts
//! into, not a global singleton the engine reaches for. The engine core never
//! depends on `prometheus`.

use crate::bus::StreamEvent;
use crate::observe::Observer;
use crate::{MediaFrame, StreamKey};
use prometheus::{IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry};

/// Prometheus-backed [`Observer`]. Construct it, register it on the engine
/// builder, and expose its [`registry`](Self::registry) on your `/metrics`
/// endpoint.
///
/// ```
/// use arcly_stream::observability::PrometheusObserver;
/// use arcly_stream::prelude::*;
///
/// let metrics = PrometheusObserver::new().expect("register metrics");
/// let registry = metrics.registry().clone();
/// let engine = Engine::builder()
///     .application(AppSpec::new("live"))
///     .observer(metrics)
///     .build();
/// assert!(!registry.gather().is_empty());
/// # let _ = engine;
/// ```
pub struct PrometheusObserver {
    registry: Registry,
    active_streams: IntGaugeVec,
    frames_total: IntCounterVec,
    publishers_total: IntGauge,
    subscriber_lagged_total: IntCounterVec,
}

impl PrometheusObserver {
    /// Create an observer with a fresh registry.
    pub fn new() -> prometheus::Result<Self> {
        Self::with_registry(Registry::new())
    }

    /// Create an observer registering into an existing `registry` (e.g. the
    /// process-wide default).
    pub fn with_registry(registry: Registry) -> prometheus::Result<Self> {
        let active_streams = IntGaugeVec::new(
            Opts::new("arcly_active_streams", "Currently publishing streams"),
            &["app"],
        )?;
        let frames_total = IntCounterVec::new(
            Opts::new("arcly_frames_total", "Total media frames published"),
            &["app"],
        )?;
        let publishers_total = IntGauge::new(
            "arcly_publishers_total",
            "Total active publishers across all applications",
        )?;
        let subscriber_lagged_total = IntCounterVec::new(
            Opts::new(
                "arcly_subscriber_lagged_total",
                "Total frames dropped from slow subscribers' views due to broadcast lag",
            ),
            &["app"],
        )?;
        registry.register(Box::new(active_streams.clone()))?;
        registry.register(Box::new(frames_total.clone()))?;
        registry.register(Box::new(publishers_total.clone()))?;
        registry.register(Box::new(subscriber_lagged_total.clone()))?;
        Ok(Self {
            registry,
            active_streams,
            frames_total,
            publishers_total,
            subscriber_lagged_total,
        })
    }

    /// The registry to gather and expose on `/metrics`.
    pub fn registry(&self) -> &Registry {
        &self.registry
    }
}

impl Observer for PrometheusObserver {
    fn on_publish_started(&self, app: &str) {
        self.active_streams.with_label_values(&[app]).inc();
        self.publishers_total.inc();
    }

    fn on_publish_ended(&self, app: &str) {
        self.active_streams.with_label_values(&[app]).dec();
        self.publishers_total.dec();
    }

    fn on_frame(&self, key: &StreamKey, _frame: &MediaFrame) {
        self.frames_total
            .with_label_values(&[key.app.as_str()])
            .inc();
    }

    fn on_subscriber_lagged(&self, key: &StreamKey, skipped: u64) {
        self.subscriber_lagged_total
            .with_label_values(&[key.app.as_str()])
            .inc_by(skipped);
    }

    fn on_event(&self, _event: &StreamEvent) {}
}