arcly-stream 0.1.4

An open-extensible live-media streaming kernel: lock-free zero-copy frame fan-out, instant-start GOP cache, a pluggable multi-protocol ingestion layer (RTMP, RTSP, SRT, WHIP/WHEP shipped), and a feature-gated pure-Rust media plane (MPEG-TS/HLS/fMP4) — runtime, config, and metrics free.
Documentation
//! Telemetry hook — the injected replacement for `stream-center`'s global
//! Prometheus singleton (`sc_metrics::Metrics::global()`).
//!
//! The engine calls these methods on lifecycle transitions and (opt-in) per
//! frame. The **default no-op** means the pure engine has zero observability
//! dependencies; wire a real sink only when you want one.

use crate::bus::StreamEvent;
use crate::{MediaFrame, StreamKey};

/// Telemetry observer. All methods default to no-ops so implementors override
/// only what they care about. Compare `arcly-http`'s `AuditSink` / `HealthCheck`
/// injection points: behaviour is supplied by the host, never reached for
/// through global state.
pub trait Observer: Send + Sync + 'static {
    /// A stream lifecycle event was emitted.
    fn on_event(&self, _event: &StreamEvent) {}

    /// A publish session started in application `app`.
    fn on_publish_started(&self, _app: &str) {}

    /// A publish session ended in application `app`.
    fn on_publish_ended(&self, _app: &str) {}

    /// A frame was published. **Hot-path hook — keep it cheap.** The default
    /// is a no-op so the branch is trivially predicted away when no observer
    /// is installed.
    fn on_frame(&self, _key: &StreamKey, _frame: &MediaFrame) {}

    /// A subscriber fell behind the broadcast buffer and `skipped` frames were
    /// dropped from its view before it could read them. This is the canonical
    /// slow-consumer signal: alert on it. Fired by
    /// [`StreamHandle::subscribe_resilient`](crate::StreamHandle::subscribe_resilient).
    fn on_subscriber_lagged(&self, _key: &StreamKey, _skipped: u64) {}

    /// A chronically slow subscriber crossed its `max_lag` budget and was shed.
    /// Fired by [`Subscription::recv`](crate::Subscription::recv).
    fn on_subscriber_evicted(&self, _key: &StreamKey) {}

    /// An ingress rate limit was exceeded for `key`; the protocol handler should
    /// drop or backpressure the connection. Handlers call this from their
    /// ingest loop (e.g. when [`IngestRateLimit`](crate::protocol::IngestRateLimit)
    /// returns `false`).
    fn on_rate_limited(&self, _key: &StreamKey) {}

    /// A publish session was reaped for exceeding the engine's idle timeout.
    fn on_stream_reaped(&self, _key: &StreamKey) {}
}

/// The default observer. Selected automatically when the builder gets none.
#[derive(Debug, Default, Clone, Copy)]
pub struct NoopObserver;

impl Observer for NoopObserver {}

/// A ready-to-use [`Observer`] that emits **structured `tracing` events** for
/// every stream lifecycle transition — turnkey operational telemetry with no
/// boilerplate and no extra dependency.
///
/// Each hook logs under the `arcly_stream::telemetry` target with structured
/// fields (`app`, `stream`, `skipped`), so a `tracing-subscriber` can render it
/// to stdout, JSON, or a rolling file appender. Routine transitions log at
/// `INFO`; slow-consumer/rate-limit/reap signals log at `WARN` so they surface
/// in alerting. The per-frame hook is intentionally **not** logged — it is
/// far too high-cardinality for the hot path; use the
/// [`PrometheusObserver`](crate::observability::PrometheusObserver) (`metrics`
/// feature) for frame counters.
///
/// ```
/// use arcly_stream::observe::StandardTelemetry;
/// use arcly_stream::prelude::*;
///
/// let engine = Engine::builder()
///     .application(AppSpec::new("live"))
///     .observer(StandardTelemetry::new())
///     .build();
/// # let _ = engine;
/// ```
#[derive(Debug, Default, Clone, Copy)]
pub struct StandardTelemetry;

impl StandardTelemetry {
    /// A new structured-telemetry observer.
    pub fn new() -> Self {
        Self
    }
}

impl Observer for StandardTelemetry {
    fn on_event(&self, event: &StreamEvent) {
        tracing::info!(
            target: "arcly_stream::telemetry",
            app = %event.app,
            stream = %event.stream_id,
            kind = ?event.kind,
            "stream event",
        );
    }
    fn on_publish_started(&self, app: &str) {
        tracing::info!(target: "arcly_stream::telemetry", app, "publish started");
    }
    fn on_publish_ended(&self, app: &str) {
        tracing::info!(target: "arcly_stream::telemetry", app, "publish ended");
    }
    fn on_subscriber_lagged(&self, key: &StreamKey, skipped: u64) {
        tracing::warn!(
            target: "arcly_stream::telemetry",
            app = %key.app, stream = %key.stream_id, skipped,
            "subscriber lagged",
        );
    }
    fn on_subscriber_evicted(&self, key: &StreamKey) {
        tracing::warn!(
            target: "arcly_stream::telemetry",
            app = %key.app, stream = %key.stream_id,
            "subscriber evicted",
        );
    }
    fn on_rate_limited(&self, key: &StreamKey) {
        tracing::warn!(
            target: "arcly_stream::telemetry",
            app = %key.app, stream = %key.stream_id,
            "ingress rate limited",
        );
    }
    fn on_stream_reaped(&self, key: &StreamKey) {
        tracing::warn!(
            target: "arcly_stream::telemetry",
            app = %key.app, stream = %key.stream_id,
            "stream reaped (idle)",
        );
    }
    // on_frame intentionally not logged — hot path, far too high-cardinality.
}