arcly_stream/
observability.rs1use crate::bus::StreamEvent;
9use crate::observe::Observer;
10use crate::{MediaFrame, StreamKey};
11use prometheus::{IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry};
12
13pub struct PrometheusObserver {
31 registry: Registry,
32 active_streams: IntGaugeVec,
33 frames_total: IntCounterVec,
34 publishers_total: IntGauge,
35 subscriber_lagged_total: IntCounterVec,
36}
37
38impl PrometheusObserver {
39 pub fn new() -> prometheus::Result<Self> {
41 Self::with_registry(Registry::new())
42 }
43
44 pub fn with_registry(registry: Registry) -> prometheus::Result<Self> {
47 let active_streams = IntGaugeVec::new(
48 Opts::new("arcly_active_streams", "Currently publishing streams"),
49 &["app"],
50 )?;
51 let frames_total = IntCounterVec::new(
52 Opts::new("arcly_frames_total", "Total media frames published"),
53 &["app"],
54 )?;
55 let publishers_total = IntGauge::new(
56 "arcly_publishers_total",
57 "Total active publishers across all applications",
58 )?;
59 let subscriber_lagged_total = IntCounterVec::new(
60 Opts::new(
61 "arcly_subscriber_lagged_total",
62 "Total frames dropped from slow subscribers' views due to broadcast lag",
63 ),
64 &["app"],
65 )?;
66 registry.register(Box::new(active_streams.clone()))?;
67 registry.register(Box::new(frames_total.clone()))?;
68 registry.register(Box::new(publishers_total.clone()))?;
69 registry.register(Box::new(subscriber_lagged_total.clone()))?;
70 Ok(Self {
71 registry,
72 active_streams,
73 frames_total,
74 publishers_total,
75 subscriber_lagged_total,
76 })
77 }
78
79 pub fn registry(&self) -> &Registry {
81 &self.registry
82 }
83}
84
85impl Observer for PrometheusObserver {
86 fn on_publish_started(&self, app: &str) {
87 self.active_streams.with_label_values(&[app]).inc();
88 self.publishers_total.inc();
89 }
90
91 fn on_publish_ended(&self, app: &str) {
92 self.active_streams.with_label_values(&[app]).dec();
93 self.publishers_total.dec();
94 }
95
96 fn on_frame(&self, key: &StreamKey, _frame: &MediaFrame) {
97 self.frames_total
98 .with_label_values(&[key.app.as_str()])
99 .inc();
100 }
101
102 fn on_subscriber_lagged(&self, key: &StreamKey, skipped: u64) {
103 self.subscriber_lagged_total
104 .with_label_values(&[key.app.as_str()])
105 .inc_by(skipped);
106 }
107
108 fn on_event(&self, _event: &StreamEvent) {}
109}