arcly-stream 0.1.2

An open-extensible live-media streaming kernel: lock-free zero-copy frame fan-out, instant-start GOP cache, a pluggable multi-protocol ingestion layer (RTMP shipped), and a feature-gated pure-Rust media plane (MPEG-TS/HLS) — runtime-, config-, and metrics-free.
Documentation
//! Stream-level authentication and authorization.
//!
//! Like [`Observer`](crate::Observer), authorization is an **injected trait with
//! a permit-all default** — the engine never bakes in an identity scheme. A host
//! supplies a [`StreamAuthenticator`] (validating stream keys, signed tokens,
//! IP allow-lists, an external auth service, …) and the engine enforces it on
//! the publish and play paths.
//!
//! ```no_run
//! use arcly_stream::auth::{Credentials, StreamAuthenticator};
//! use arcly_stream::prelude::*;
//! use std::sync::Arc;
//!
//! struct KeyAuth { secret: String }
//!
//! #[async_trait]
//! impl StreamAuthenticator for KeyAuth {
//!     async fn authorize_publish(&self, _key: &StreamKey, creds: &Credentials) -> Result<()> {
//!         match creds.token.as_deref() {
//!             Some(t) if t == self.secret => Ok(()),
//!             _ => Err(StreamError::Unauthorized("bad publish key".into())),
//!         }
//!     }
//! }
//!
//! let engine = Engine::builder()
//!     .application(AppSpec::new("live"))
//!     .authenticator(KeyAuth { secret: "s3cr3t".into() })
//!     .build();
//! # let _ = engine;
//! ```

use crate::{Result, StreamKey};
use async_trait::async_trait;
use std::net::SocketAddr;

/// Credentials presented by a connecting publisher or player.
///
/// Protocol handlers populate whichever fields their transport carries (an RTMP
/// stream key in `token`, a WHIP bearer in `token`, the peer address in `addr`,
/// query parameters in `params`).
///
/// The [`Debug`](std::fmt::Debug) representation **redacts the `token` value**
/// so that logging a `Credentials` with `{:?}` does not write the secret into
/// logs. `addr` and `params` are printed verbatim — a host that carries secrets
/// in `params` should avoid logging them.
#[derive(Default, Clone)]
pub struct Credentials {
    /// Opaque secret: a stream key, signed URL token, or bearer credential.
    pub token: Option<String>,
    /// Remote peer address, when the transport exposes one.
    pub addr: Option<SocketAddr>,
    /// Arbitrary protocol-supplied key/value parameters (e.g. query string).
    pub params: Vec<(String, String)>,
}

impl std::fmt::Debug for Credentials {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Keep the Some/None distinction (useful when diagnosing a missing
        // token) without ever printing the secret itself.
        let token = self.token.as_ref().map(|_| "<redacted>");
        f.debug_struct("Credentials")
            .field("token", &token)
            .field("addr", &self.addr)
            .field("params", &self.params)
            .finish()
    }
}

impl Credentials {
    /// Credentials carrying only a token (the common stream-key case).
    ///
    /// `addr` is `None` and `params` is empty.
    pub fn token(token: impl Into<String>) -> Self {
        Self {
            token: Some(token.into()),
            ..Self::default()
        }
    }

    /// Look up a protocol parameter by key.
    ///
    /// Returns the value of the **first** entry in `params` whose key equals
    /// `key` exactly, or `None` when no entry matches.
    pub fn param(&self, key: &str) -> Option<&str> {
        self.params
            .iter()
            .find(|(k, _)| k.as_str() == key)
            .map(|(_, v)| v.as_str())
    }
}

/// Authorizes publish and play attempts. Both methods **default to permit**, so
/// an implementor overrides only the side it gates.
///
/// Implementors must be `Send + Sync + 'static` (the trait's supertraits).
#[async_trait]
pub trait StreamAuthenticator: Send + Sync + 'static {
    /// Decide whether `creds` may publish to `key`. Return
    /// [`StreamError::Unauthorized`](crate::StreamError::Unauthorized) to reject.
    ///
    /// The provided implementation ignores both arguments and returns `Ok(())`.
    async fn authorize_publish(&self, _key: &StreamKey, _creds: &Credentials) -> Result<()> {
        Ok(())
    }

    /// Decide whether `creds` may subscribe to `key`. Return an error to reject.
    ///
    /// The provided implementation ignores both arguments and returns `Ok(())`.
    async fn authorize_play(&self, _key: &StreamKey, _creds: &Credentials) -> Result<()> {
        Ok(())
    }
}

/// The default authenticator: permits everything. Selected when the builder is
/// given none, preserving the engine's zero-policy default.
#[derive(Debug, Default, Clone, Copy)]
pub struct AllowAll;

// No methods are overridden, so both trait defaults (which return `Ok(())`)
// apply to publish and play alike.
impl StreamAuthenticator for AllowAll {}

/// A production-ready, time-limited **signed-token** authenticator
/// (`feature = "auth-token"`).
///
/// A token binds a [`StreamKey`] to an expiry under an HMAC-SHA-256 signature,
/// so it cannot be forged without the shared secret nor replayed past its
/// deadline. The wire form is:
///
/// ```text
/// <expiry_unix_seconds>:<hex(hmac_sha256(secret, "app/stream:expiry"))>
/// ```
///
/// A token is accepted through the expiry second itself and rejected once the
/// wall clock is strictly past it.
///
/// Mint tokens out-of-band (e.g. in your sign-in / "get publish URL" endpoint)
/// with [`sign`](Self::sign); the engine verifies them on the publish path —
/// and, when [`gate_playback`](Self::gate_playback) is set, the play path too.
/// The signature comparison goes through the crate's `constant_time_eq`, and
/// verification pulls in **no crypto dependency** (the HMAC is a small,
/// test-vector-checked in-crate implementation).
///
/// ```
/// use arcly_stream::auth::TokenAuthenticator;
/// use arcly_stream::StreamKey;
///
/// let auth = TokenAuthenticator::new("super-secret");
/// let key = StreamKey::new("live", "cam-1");
/// // Mint a token valid until some absolute Unix time:
/// let token = auth.sign(&key, 9_999_999_999);
/// assert!(auth.verify(&key, &token).is_ok());
/// // A token for a different stream is rejected:
/// assert!(auth.verify(&StreamKey::new("live", "other"), &token).is_err());
/// ```
#[cfg(feature = "auth-token")]
#[cfg_attr(docsrs, doc(cfg(feature = "auth-token")))]
pub struct TokenAuthenticator {
    /// Shared HMAC key used both to mint (`sign`) and to check (`verify`) tokens.
    secret: Vec<u8>,
    /// When `true`, `authorize_play` requires a valid token as well; when
    /// `false` (the constructor's default), playback is permitted without one.
    gate_play: bool,
}

#[cfg(feature = "auth-token")]
impl TokenAuthenticator {
    /// New authenticator keyed by `secret`. Gates **publish** only by default;
    /// call [`gate_playback`](Self::gate_playback) to gate play as well.
    pub fn new(secret: impl Into<Vec<u8>>) -> Self {
        Self {
            secret: secret.into(),
            gate_play: false,
        }
    }

    /// Also require a valid token to *play* (subscribe), not just publish.
    ///
    /// This is a consuming builder method: the setting only exists on the
    /// returned value, so discarding the result silently loses it.
    #[must_use = "gate_playback consumes the authenticator and returns the updated one"]
    pub fn gate_playback(mut self, gate: bool) -> Self {
        self.gate_play = gate;
        self
    }

    /// The signed message a token covers: `"app/stream:expiry"`.
    ///
    /// NOTE(review): the two key components are joined with `/` without any
    /// escaping, so if a component may itself contain `/`, distinct keys can
    /// produce the same message — confirm `StreamKey` forbids that.
    fn message(key: &StreamKey, expiry: u64) -> String {
        format!("{}/{}:{}", key.app.as_str(), key.stream_id.as_str(), expiry)
    }

    /// Build the rejection error used by every failure path in this type.
    fn unauthorized(reason: &'static str) -> crate::StreamError {
        crate::StreamError::Unauthorized(reason.into())
    }

    /// Mint a token authorizing `key` until `expires_at` (Unix seconds).
    ///
    /// The result has the form `<expires_at>:<hex hmac>`, with `expires_at`
    /// rendered as plain decimal.
    #[must_use]
    pub fn sign(&self, key: &StreamKey, expires_at: u64) -> String {
        let mac =
            crate::crypto::hmac_sha256(&self.secret, Self::message(key, expires_at).as_bytes());
        format!("{}:{}", expires_at, crate::crypto::to_hex(&mac))
    }

    /// Verify `token` against `key` and the current wall clock.
    ///
    /// A token is accepted through its expiry second and rejected once the
    /// clock is strictly past it. If the system clock reads before the Unix
    /// epoch, `now_unix_secs` reports `0` and no token is treated as expired.
    ///
    /// # Errors
    ///
    /// Returns [`StreamError::Unauthorized`](crate::StreamError::Unauthorized)
    /// when the token
    /// - has no `:` separator,
    /// - carries an expiry that is not a canonical decimal `u64` (exactly the
    ///   form [`sign`](Self::sign) emits),
    /// - is expired, or
    /// - carries a signature that does not match `key` under this secret.
    pub fn verify(&self, key: &StreamKey, token: &str) -> Result<()> {
        let (exp_str, sig) = token
            .split_once(':')
            .ok_or_else(|| Self::unauthorized("malformed token"))?;
        let expiry: u64 = exp_str
            .parse()
            .map_err(|_| Self::unauthorized("malformed token expiry"))?;
        // `u64::from_str` also accepts a leading `+` and leading zeros, and the
        // signature only covers the *parsed* value. Without this round-trip
        // check, `+E:sig` and `00E:sig` would be accepted as aliases of the
        // minted `E:sig`, defeating exact-match token denylists or dedup.
        if expiry.to_string() != exp_str {
            return Err(Self::unauthorized("malformed token expiry"));
        }
        if now_unix_secs() > expiry {
            return Err(Self::unauthorized("token expired"));
        }
        let expected = crate::crypto::to_hex(&crate::crypto::hmac_sha256(
            &self.secret,
            Self::message(key, expiry).as_bytes(),
        ));
        if crate::crypto::constant_time_eq(expected.as_bytes(), sig.as_bytes()) {
            Ok(())
        } else {
            Err(Self::unauthorized("invalid token signature"))
        }
    }

    /// Borrow the token out of `creds`, rejecting credentials that carry none.
    fn token_for(creds: &Credentials) -> Result<&str> {
        creds
            .token
            .as_deref()
            .ok_or_else(|| Self::unauthorized("missing token"))
    }
}

/// Seconds since the Unix epoch, saturating to 0 before 1970.
#[cfg(feature = "auth-token")]
fn now_unix_secs() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    // `duration_since` fails only when the clock reads earlier than the epoch;
    // that case is reported as 0 rather than as an error.
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    }
}

#[cfg(feature = "auth-token")]
#[async_trait]
impl StreamAuthenticator for TokenAuthenticator {
    /// Publishing always requires a token that verifies for `key`.
    async fn authorize_publish(&self, key: &StreamKey, creds: &Credentials) -> Result<()> {
        let presented = Self::token_for(creds)?;
        self.verify(key, presented)
    }

    /// Playback is permitted outright unless `gate_play` is set, in which case
    /// it is checked exactly like publishing.
    async fn authorize_play(&self, key: &StreamKey, creds: &Credentials) -> Result<()> {
        if !self.gate_play {
            return Ok(());
        }
        let presented = Self::token_for(creds)?;
        self.verify(key, presented)
    }
}

#[cfg(all(test, feature = "auth-token"))]
mod token_tests {
    use super::*;

    /// A freshly minted token lets its own stream publish, and ungated
    /// playback needs no token at all.
    #[tokio::test]
    async fn signed_token_authorizes_its_stream() {
        let authenticator = TokenAuthenticator::new("s3cr3t");
        let stream = StreamKey::new("live", "cam");
        let expires_at = now_unix_secs() + 3600;
        let creds = Credentials::token(authenticator.sign(&stream, expires_at));

        let publish = authenticator.authorize_publish(&stream, &creds).await;
        assert!(publish.is_ok());

        // Play is open unless gated.
        let play = authenticator
            .authorize_play(&stream, &Credentials::default())
            .await;
        assert!(play.is_ok());
    }

    /// Each way a token can be wrong is rejected by `verify`.
    #[tokio::test]
    async fn rejects_wrong_stream_secret_and_expiry() {
        let issuer = TokenAuthenticator::new("s3cr3t");
        let stream = StreamKey::new("live", "cam");
        let valid = issuer.sign(&stream, now_unix_secs() + 3600);

        // Wrong stream.
        let sibling = StreamKey::new("live", "other");
        assert!(issuer.verify(&sibling, &valid).is_err());

        // Wrong secret.
        let stranger = TokenAuthenticator::new("different");
        assert!(stranger.verify(&stream, &valid).is_err());

        // Expired.
        let one_second_ago = now_unix_secs().saturating_sub(1);
        let stale = issuer.sign(&stream, one_second_ago);
        assert!(issuer.verify(&stream, &stale).is_err());

        // Malformed.
        assert!(issuer.verify(&stream, "not-a-token").is_err());
    }

    /// With playback gated, a missing token is refused and a valid one admitted.
    #[tokio::test]
    async fn gated_playback_requires_a_token() {
        let authenticator = TokenAuthenticator::new("s3cr3t").gate_playback(true);
        let stream = StreamKey::new("live", "cam");

        let anonymous = authenticator
            .authorize_play(&stream, &Credentials::default())
            .await;
        assert!(anonymous.is_err());

        let minted = authenticator.sign(&stream, now_unix_secs() + 60);
        let with_token = authenticator
            .authorize_play(&stream, &Credentials::token(minted))
            .await;
        assert!(with_token.is_ok());
    }
}