arcly_stream/observe.rs
1//! Telemetry hook — the injected replacement for `stream-center`'s global
2//! Prometheus singleton (`sc_metrics::Metrics::global()`).
3//!
4//! The engine calls these methods on lifecycle transitions and (opt-in) per
5//! frame. The **default no-op** means the pure engine has zero observability
6//! dependencies; wire a real sink only when you want one.
7
8use crate::bus::StreamEvent;
9use crate::{MediaFrame, StreamKey};
10
11/// Telemetry observer. All methods default to no-ops so implementors override
12/// only what they care about. Compare `arcly-http`'s `AuditSink` / `HealthCheck`
13/// injection points: behaviour is supplied by the host, never reached for
14/// through global state.
15pub trait Observer: Send + Sync + 'static {
16 /// A stream lifecycle event was emitted.
17 fn on_event(&self, _event: &StreamEvent) {}
18
19 /// A publish session started in application `app`.
20 fn on_publish_started(&self, _app: &str) {}
21
22 /// A publish session ended in application `app`.
23 fn on_publish_ended(&self, _app: &str) {}
24
25 /// A frame was published. **Hot-path hook — keep it cheap.** The default
26 /// is a no-op so the branch is trivially predicted away when no observer
27 /// is installed.
28 fn on_frame(&self, _key: &StreamKey, _frame: &MediaFrame) {}
29
30 /// A subscriber fell behind the broadcast buffer and `skipped` frames were
31 /// dropped from its view before it could read them. This is the canonical
32 /// slow-consumer signal: alert on it. Fired by
33 /// [`StreamHandle::subscribe_resilient`](crate::StreamHandle::subscribe_resilient).
34 fn on_subscriber_lagged(&self, _key: &StreamKey, _skipped: u64) {}
35
36 /// A chronically slow subscriber crossed its `max_lag` budget and was shed.
37 /// Fired by [`Subscription::recv`](crate::Subscription::recv).
38 fn on_subscriber_evicted(&self, _key: &StreamKey) {}
39
40 /// An ingress rate limit was exceeded for `key`; the protocol handler should
41 /// drop or backpressure the connection. Handlers call this from their
42 /// ingest loop (e.g. when [`IngestRateLimit`](crate::protocol::IngestRateLimit)
43 /// returns `false`).
44 fn on_rate_limited(&self, _key: &StreamKey) {}
45
46 /// A publish session was reaped for exceeding the engine's idle timeout.
47 fn on_stream_reaped(&self, _key: &StreamKey) {}
48}
49
50/// The default observer. Selected automatically when the builder gets none.
51#[derive(Debug, Default, Clone, Copy)]
52pub struct NoopObserver;
53
54impl Observer for NoopObserver {}
55
56/// A ready-to-use [`Observer`] that emits **structured `tracing` events** for
57/// every stream lifecycle transition — turnkey operational telemetry with no
58/// boilerplate and no extra dependency.
59///
60/// Each hook logs under the `arcly_stream::telemetry` target with structured
61/// fields (`app`, `stream`, `skipped`), so a `tracing-subscriber` can render it
62/// to stdout, JSON, or a rolling file appender. Routine transitions log at
63/// `INFO`; slow-consumer/rate-limit/reap signals log at `WARN` so they surface
64/// in alerting. The per-frame hook is intentionally **not** logged — it is
65/// far too high-cardinality for the hot path; use the
66/// [`PrometheusObserver`](crate::observability::PrometheusObserver) (`metrics`
67/// feature) for frame counters.
68///
69/// ```
70/// use arcly_stream::observe::StandardTelemetry;
71/// use arcly_stream::prelude::*;
72///
73/// let engine = Engine::builder()
74/// .application(AppSpec::new("live"))
75/// .observer(StandardTelemetry::new())
76/// .build();
77/// # let _ = engine;
78/// ```
79#[derive(Debug, Default, Clone, Copy)]
80pub struct StandardTelemetry;
81
82impl StandardTelemetry {
83 /// A new structured-telemetry observer.
84 pub fn new() -> Self {
85 Self
86 }
87}
88
89impl Observer for StandardTelemetry {
90 fn on_event(&self, event: &StreamEvent) {
91 tracing::info!(
92 target: "arcly_stream::telemetry",
93 app = %event.app,
94 stream = %event.stream_id,
95 kind = ?event.kind,
96 "stream event",
97 );
98 }
99 fn on_publish_started(&self, app: &str) {
100 tracing::info!(target: "arcly_stream::telemetry", app, "publish started");
101 }
102 fn on_publish_ended(&self, app: &str) {
103 tracing::info!(target: "arcly_stream::telemetry", app, "publish ended");
104 }
105 fn on_subscriber_lagged(&self, key: &StreamKey, skipped: u64) {
106 tracing::warn!(
107 target: "arcly_stream::telemetry",
108 app = %key.app, stream = %key.stream_id, skipped,
109 "subscriber lagged",
110 );
111 }
112 fn on_subscriber_evicted(&self, key: &StreamKey) {
113 tracing::warn!(
114 target: "arcly_stream::telemetry",
115 app = %key.app, stream = %key.stream_id,
116 "subscriber evicted",
117 );
118 }
119 fn on_rate_limited(&self, key: &StreamKey) {
120 tracing::warn!(
121 target: "arcly_stream::telemetry",
122 app = %key.app, stream = %key.stream_id,
123 "ingress rate limited",
124 );
125 }
126 fn on_stream_reaped(&self, key: &StreamKey) {
127 tracing::warn!(
128 target: "arcly_stream::telemetry",
129 app = %key.app, stream = %key.stream_id,
130 "stream reaped (idle)",
131 );
132 }
133 // on_frame intentionally not logged — hot path, far too high-cardinality.
134}