use crate::bus::StreamEvent;
use crate::observe::Observer;
use crate::{MediaFrame, StreamKey};
use prometheus::{IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry};
pub struct PrometheusObserver {
registry: Registry,
active_streams: IntGaugeVec,
frames_total: IntCounterVec,
publishers_total: IntGauge,
subscriber_lagged_total: IntCounterVec,
}
impl PrometheusObserver {
pub fn new() -> prometheus::Result<Self> {
Self::with_registry(Registry::new())
}
pub fn with_registry(registry: Registry) -> prometheus::Result<Self> {
let active_streams = IntGaugeVec::new(
Opts::new("arcly_active_streams", "Currently publishing streams"),
&["app"],
)?;
let frames_total = IntCounterVec::new(
Opts::new("arcly_frames_total", "Total media frames published"),
&["app"],
)?;
let publishers_total = IntGauge::new(
"arcly_publishers_total",
"Total active publishers across all applications",
)?;
let subscriber_lagged_total = IntCounterVec::new(
Opts::new(
"arcly_subscriber_lagged_total",
"Total frames dropped from slow subscribers' views due to broadcast lag",
),
&["app"],
)?;
registry.register(Box::new(active_streams.clone()))?;
registry.register(Box::new(frames_total.clone()))?;
registry.register(Box::new(publishers_total.clone()))?;
registry.register(Box::new(subscriber_lagged_total.clone()))?;
Ok(Self {
registry,
active_streams,
frames_total,
publishers_total,
subscriber_lagged_total,
})
}
pub fn registry(&self) -> &Registry {
&self.registry
}
}
impl Observer for PrometheusObserver {
fn on_publish_started(&self, app: &str) {
self.active_streams.with_label_values(&[app]).inc();
self.publishers_total.inc();
}
fn on_publish_ended(&self, app: &str) {
self.active_streams.with_label_values(&[app]).dec();
self.publishers_total.dec();
}
fn on_frame(&self, key: &StreamKey, _frame: &MediaFrame) {
self.frames_total
.with_label_values(&[key.app.as_str()])
.inc();
}
fn on_subscriber_lagged(&self, key: &StreamKey, skipped: u64) {
self.subscriber_lagged_total
.with_label_values(&[key.app.as_str()])
.inc_by(skipped);
}
fn on_event(&self, _event: &StreamEvent) {}
}