Skip to main content

arcly_stream/
auth.rs

1//! Stream-level authentication and authorization.
2//!
3//! Like [`Observer`](crate::Observer), authorization is an **injected trait with
4//! a permit-all default** — the engine never bakes in an identity scheme. A host
5//! supplies a [`StreamAuthenticator`] (validating stream keys, signed tokens,
6//! IP allow-lists, an external auth service, …) and the engine enforces it on
7//! the publish and play paths.
8//!
9//! ```no_run
10//! use arcly_stream::auth::{Credentials, StreamAuthenticator};
11//! use arcly_stream::prelude::*;
12//! use std::sync::Arc;
13//!
14//! struct KeyAuth { secret: String }
15//!
16//! #[async_trait]
17//! impl StreamAuthenticator for KeyAuth {
18//!     async fn authorize_publish(&self, _key: &StreamKey, creds: &Credentials) -> Result<()> {
19//!         match creds.token.as_deref() {
20//!             Some(t) if t == self.secret => Ok(()),
21//!             _ => Err(StreamError::Unauthorized("bad publish key".into())),
22//!         }
23//!     }
24//! }
25//!
26//! let engine = Engine::builder()
27//!     .application(AppSpec::new("live"))
28//!     .authenticator(KeyAuth { secret: "s3cr3t".into() })
29//!     .build();
30//! # let _ = engine;
31//! ```
32
33use crate::{Result, StreamKey};
34use async_trait::async_trait;
35use std::net::SocketAddr;
36
37/// Credentials presented by a connecting publisher or player.
38///
39/// Protocol handlers populate whichever fields their transport carries (an RTMP
40/// stream key in `token`, a WHIP bearer in `token`, the peer address in `addr`,
41/// query parameters in `params`).
42#[derive(Debug, Default, Clone)]
43pub struct Credentials {
44    /// Opaque secret: a stream key, signed URL token, or bearer credential.
45    pub token: Option<String>,
46    /// Remote peer address, when the transport exposes one.
47    pub addr: Option<SocketAddr>,
48    /// Arbitrary protocol-supplied key/value parameters (e.g. query string).
49    pub params: Vec<(String, String)>,
50}
51
52impl Credentials {
53    /// Credentials carrying only a token (the common stream-key case).
54    pub fn token(token: impl Into<String>) -> Self {
55        Self {
56            token: Some(token.into()),
57            ..Self::default()
58        }
59    }
60
61    /// Look up a protocol parameter by key.
62    pub fn param(&self, key: &str) -> Option<&str> {
63        self.params
64            .iter()
65            .find(|(k, _)| k == key)
66            .map(|(_, v)| v.as_str())
67    }
68}
69
70/// Authorizes publish and play attempts. Both methods **default to permit**, so
71/// an implementor overrides only the side it gates.
72#[async_trait]
73pub trait StreamAuthenticator: Send + Sync + 'static {
74    /// Decide whether `creds` may publish to `key`. Return
75    /// [`StreamError::Unauthorized`](crate::StreamError::Unauthorized) to reject.
76    async fn authorize_publish(&self, _key: &StreamKey, _creds: &Credentials) -> Result<()> {
77        Ok(())
78    }
79
80    /// Decide whether `creds` may subscribe to `key`.
81    async fn authorize_play(&self, _key: &StreamKey, _creds: &Credentials) -> Result<()> {
82        Ok(())
83    }
84}
85
86/// The default authenticator: permits everything. Selected when the builder is
87/// given none, preserving the engine's zero-policy default.
88#[derive(Debug, Default, Clone, Copy)]
89pub struct AllowAll;
90
91impl StreamAuthenticator for AllowAll {}
92
93/// A production-ready, time-limited **signed-token** authenticator
94/// (`feature = "auth-token"`).
95///
96/// A token binds a [`StreamKey`] to an expiry under an HMAC-SHA-256 signature,
97/// so it cannot be forged without the shared secret nor replayed past its
98/// deadline. The wire form is:
99///
100/// ```text
101/// <expiry_unix_seconds>:<hex(hmac_sha256(secret, "app/stream:expiry"))>
102/// ```
103///
104/// Mint tokens out-of-band (e.g. in your sign-in / "get publish URL" endpoint)
105/// with [`sign`](Self::sign); the engine verifies them on the publish path —
106/// and, when [`gate_playback`](Self::gate_playback) is set, the play path too.
107/// Verification is constant-time and pulls in **no crypto dependency** (the
108/// HMAC is a small, test-vector-checked in-crate implementation).
109///
110/// ```
111/// use arcly_stream::auth::TokenAuthenticator;
112/// use arcly_stream::StreamKey;
113///
114/// let auth = TokenAuthenticator::new("super-secret");
115/// let key = StreamKey::new("live", "cam-1");
116/// // Mint a token valid until some absolute Unix time:
117/// let token = auth.sign(&key, 9_999_999_999);
118/// assert!(auth.verify(&key, &token).is_ok());
119/// // A token for a different stream is rejected:
120/// assert!(auth.verify(&StreamKey::new("live", "other"), &token).is_err());
121/// ```
122#[cfg(feature = "auth-token")]
123#[cfg_attr(docsrs, doc(cfg(feature = "auth-token")))]
124pub struct TokenAuthenticator {
125    secret: Vec<u8>,
126    gate_play: bool,
127}
128
129#[cfg(feature = "auth-token")]
130impl TokenAuthenticator {
131    /// New authenticator keyed by `secret`. Gates **publish** only by default;
132    /// call [`gate_playback`](Self::gate_playback) to gate play as well.
133    pub fn new(secret: impl Into<Vec<u8>>) -> Self {
134        Self {
135            secret: secret.into(),
136            gate_play: false,
137        }
138    }
139
140    /// Also require a valid token to *play* (subscribe), not just publish.
141    pub fn gate_playback(mut self, gate: bool) -> Self {
142        self.gate_play = gate;
143        self
144    }
145
146    /// The signed message a token covers: `"app/stream:expiry"`.
147    fn message(key: &StreamKey, expiry: u64) -> String {
148        format!("{}/{}:{}", key.app.as_str(), key.stream_id.as_str(), expiry)
149    }
150
151    /// Mint a token authorizing `key` until `expires_at` (Unix seconds).
152    pub fn sign(&self, key: &StreamKey, expires_at: u64) -> String {
153        let mac =
154            crate::crypto::hmac_sha256(&self.secret, Self::message(key, expires_at).as_bytes());
155        format!("{}:{}", expires_at, crate::crypto::to_hex(&mac))
156    }
157
158    /// Verify `token` against `key` and the current wall clock. Returns
159    /// [`StreamError::Unauthorized`](crate::StreamError::Unauthorized) on a
160    /// malformed, expired, or mis-signed token.
161    pub fn verify(&self, key: &StreamKey, token: &str) -> Result<()> {
162        let (exp_str, sig) = token
163            .split_once(':')
164            .ok_or_else(|| crate::StreamError::Unauthorized("malformed token".into()))?;
165        let expiry: u64 = exp_str
166            .parse()
167            .map_err(|_| crate::StreamError::Unauthorized("malformed token expiry".into()))?;
168        if now_unix_secs() > expiry {
169            return Err(crate::StreamError::Unauthorized("token expired".into()));
170        }
171        let expected = crate::crypto::to_hex(&crate::crypto::hmac_sha256(
172            &self.secret,
173            Self::message(key, expiry).as_bytes(),
174        ));
175        if crate::crypto::constant_time_eq(expected.as_bytes(), sig.as_bytes()) {
176            Ok(())
177        } else {
178            Err(crate::StreamError::Unauthorized(
179                "invalid token signature".into(),
180            ))
181        }
182    }
183
184    fn token_for(creds: &Credentials) -> Result<&str> {
185        creds
186            .token
187            .as_deref()
188            .ok_or_else(|| crate::StreamError::Unauthorized("missing token".into()))
189    }
190}
191
192/// Seconds since the Unix epoch, saturating to 0 before 1970.
193#[cfg(feature = "auth-token")]
194fn now_unix_secs() -> u64 {
195    std::time::SystemTime::now()
196        .duration_since(std::time::UNIX_EPOCH)
197        .map(|d| d.as_secs())
198        .unwrap_or(0)
199}
200
201#[cfg(feature = "auth-token")]
202#[async_trait]
203impl StreamAuthenticator for TokenAuthenticator {
204    async fn authorize_publish(&self, key: &StreamKey, creds: &Credentials) -> Result<()> {
205        self.verify(key, Self::token_for(creds)?)
206    }
207
208    async fn authorize_play(&self, key: &StreamKey, creds: &Credentials) -> Result<()> {
209        if self.gate_play {
210            self.verify(key, Self::token_for(creds)?)
211        } else {
212            Ok(())
213        }
214    }
215}
216
217#[cfg(all(test, feature = "auth-token"))]
218mod token_tests {
219    use super::*;
220
221    #[tokio::test]
222    async fn signed_token_authorizes_its_stream() {
223        let auth = TokenAuthenticator::new("s3cr3t");
224        let key = StreamKey::new("live", "cam");
225        let token = auth.sign(&key, now_unix_secs() + 3600);
226        let creds = Credentials::token(token);
227        assert!(auth.authorize_publish(&key, &creds).await.is_ok());
228        // Play is open unless gated.
229        assert!(auth
230            .authorize_play(&key, &Credentials::default())
231            .await
232            .is_ok());
233    }
234
235    #[tokio::test]
236    async fn rejects_wrong_stream_secret_and_expiry() {
237        let auth = TokenAuthenticator::new("s3cr3t");
238        let key = StreamKey::new("live", "cam");
239        let token = auth.sign(&key, now_unix_secs() + 3600);
240
241        // Wrong stream.
242        assert!(auth
243            .verify(&StreamKey::new("live", "other"), &token)
244            .is_err());
245        // Wrong secret.
246        let other = TokenAuthenticator::new("different");
247        assert!(other.verify(&key, &token).is_err());
248        // Expired.
249        let stale = auth.sign(&key, now_unix_secs().saturating_sub(1));
250        assert!(auth.verify(&key, &stale).is_err());
251        // Malformed.
252        assert!(auth.verify(&key, "not-a-token").is_err());
253    }
254
255    #[tokio::test]
256    async fn gated_playback_requires_a_token() {
257        let auth = TokenAuthenticator::new("s3cr3t").gate_playback(true);
258        let key = StreamKey::new("live", "cam");
259        assert!(auth
260            .authorize_play(&key, &Credentials::default())
261            .await
262            .is_err());
263        let token = auth.sign(&key, now_unix_secs() + 60);
264        assert!(auth
265            .authorize_play(&key, &Credentials::token(token))
266            .await
267            .is_ok());
268    }
269}