Skip to main content

arcly_stream/
observe.rs

1//! Telemetry hook — the injected replacement for `stream-center`'s global
2//! Prometheus singleton (`sc_metrics::Metrics::global()`).
3//!
4//! The engine calls these methods on lifecycle transitions and (opt-in) per
5//! frame. The **default no-op** means the pure engine has zero observability
6//! dependencies; wire a real sink only when you want one.
7
8use crate::bus::StreamEvent;
9use crate::{MediaFrame, StreamKey};
10
11/// Telemetry observer. All methods default to no-ops so implementors override
12/// only what they care about. Compare `arcly-http`'s `AuditSink` / `HealthCheck`
13/// injection points: behaviour is supplied by the host, never reached for
14/// through global state.
15pub trait Observer: Send + Sync + 'static {
16    /// A stream lifecycle event was emitted.
17    fn on_event(&self, _event: &StreamEvent) {}
18
19    /// A publish session started in application `app`.
20    fn on_publish_started(&self, _app: &str) {}
21
22    /// A publish session ended in application `app`.
23    fn on_publish_ended(&self, _app: &str) {}
24
25    /// A frame was published. **Hot-path hook — keep it cheap.** The default
26    /// is a no-op so the branch is trivially predicted away when no observer
27    /// is installed.
28    fn on_frame(&self, _key: &StreamKey, _frame: &MediaFrame) {}
29
30    /// A subscriber fell behind the broadcast buffer and `skipped` frames were
31    /// dropped from its view before it could read them. This is the canonical
32    /// slow-consumer signal: alert on it. Fired by
33    /// [`StreamHandle::subscribe_resilient`](crate::StreamHandle::subscribe_resilient).
34    fn on_subscriber_lagged(&self, _key: &StreamKey, _skipped: u64) {}
35
36    /// A chronically slow subscriber crossed its `max_lag` budget and was shed.
37    /// Fired by [`Subscription::recv`](crate::Subscription::recv).
38    fn on_subscriber_evicted(&self, _key: &StreamKey) {}
39
40    /// An ingress rate limit was exceeded for `key`; the protocol handler should
41    /// drop or backpressure the connection. Handlers call this from their
42    /// ingest loop (e.g. when [`IngestRateLimit`](crate::protocol::IngestRateLimit)
43    /// returns `false`).
44    fn on_rate_limited(&self, _key: &StreamKey) {}
45
46    /// A publish session was reaped for exceeding the engine's idle timeout.
47    fn on_stream_reaped(&self, _key: &StreamKey) {}
48
49    /// The GOP replay buffer filled before the next keyframe and began dropping
50    /// frames, so late joiners receive a *truncated* GOP (keyframe + a prefix of
51    /// its deltas). Fired **once per affected GOP**, not per dropped frame.
52    ///
53    /// This is the canonical "GOP cache too small for this stream" signal: it
54    /// means [`AppSpec::gop_cache`](crate::AppSpec::gop_cache) is undersized for
55    /// the stream's keyframe interval × frame rate. Raise the capacity to restore
56    /// full instant-start replay.
57    fn on_gop_truncated(&self, _key: &StreamKey, _capacity: usize) {}
58}
59
60/// The default observer. Selected automatically when the builder gets none.
61#[derive(Debug, Default, Clone, Copy)]
62pub struct NoopObserver;
63
64impl Observer for NoopObserver {}
65
66/// A ready-to-use [`Observer`] that emits **structured `tracing` events** for
67/// every stream lifecycle transition — turnkey operational telemetry with no
68/// boilerplate and no extra dependency.
69///
70/// Each hook logs under the `arcly_stream::telemetry` target with structured
71/// fields (`app`, `stream`, `skipped`), so a `tracing-subscriber` can render it
72/// to stdout, JSON, or a rolling file appender. Routine transitions log at
73/// `INFO`; slow-consumer/rate-limit/reap signals log at `WARN` so they surface
74/// in alerting. The per-frame hook is intentionally **not** logged — it is
75/// far too high-cardinality for the hot path; use the
76/// [`PrometheusObserver`](crate::observability::PrometheusObserver) (`metrics`
77/// feature) for frame counters.
78///
79/// ```
80/// use arcly_stream::observe::StandardTelemetry;
81/// use arcly_stream::prelude::*;
82///
83/// let engine = Engine::builder()
84///     .application(AppSpec::new("live"))
85///     .observer(StandardTelemetry::new())
86///     .build();
87/// # let _ = engine;
88/// ```
89#[derive(Debug, Default, Clone, Copy)]
90pub struct StandardTelemetry;
91
92impl StandardTelemetry {
93    /// A new structured-telemetry observer.
94    pub fn new() -> Self {
95        Self
96    }
97}
98
99impl Observer for StandardTelemetry {
100    fn on_event(&self, event: &StreamEvent) {
101        tracing::info!(
102            target: "arcly_stream::telemetry",
103            app = %event.app,
104            stream = %event.stream_id,
105            kind = ?event.kind,
106            "stream event",
107        );
108    }
109    fn on_publish_started(&self, app: &str) {
110        tracing::info!(target: "arcly_stream::telemetry", app, "publish started");
111    }
112    fn on_publish_ended(&self, app: &str) {
113        tracing::info!(target: "arcly_stream::telemetry", app, "publish ended");
114    }
115    fn on_subscriber_lagged(&self, key: &StreamKey, skipped: u64) {
116        tracing::warn!(
117            target: "arcly_stream::telemetry",
118            app = %key.app, stream = %key.stream_id, skipped,
119            "subscriber lagged",
120        );
121    }
122    fn on_subscriber_evicted(&self, key: &StreamKey) {
123        tracing::warn!(
124            target: "arcly_stream::telemetry",
125            app = %key.app, stream = %key.stream_id,
126            "subscriber evicted",
127        );
128    }
129    fn on_rate_limited(&self, key: &StreamKey) {
130        tracing::warn!(
131            target: "arcly_stream::telemetry",
132            app = %key.app, stream = %key.stream_id,
133            "ingress rate limited",
134        );
135    }
136    fn on_stream_reaped(&self, key: &StreamKey) {
137        tracing::warn!(
138            target: "arcly_stream::telemetry",
139            app = %key.app, stream = %key.stream_id,
140            "stream reaped (idle)",
141        );
142    }
143    fn on_gop_truncated(&self, key: &StreamKey, capacity: usize) {
144        tracing::warn!(
145            target: "arcly_stream::telemetry",
146            app = %key.app, stream = %key.stream_id, capacity,
147            "gop replay buffer truncated (cache too small for this stream)",
148        );
149    }
150    // on_frame intentionally not logged — hot path, far too high-cardinality.
151}