arcly_stream/auth.rs
1//! Stream-level authentication and authorization.
2//!
3//! Like [`Observer`](crate::Observer), authorization is an **injected trait with
4//! a permit-all default** — the engine never bakes in an identity scheme. A host
5//! supplies a [`StreamAuthenticator`] (validating stream keys, signed tokens,
6//! IP allow-lists, an external auth service, …) and the engine enforces it on
7//! the publish and play paths.
8//!
9//! ```no_run
10//! use arcly_stream::auth::{Credentials, StreamAuthenticator};
11//! use arcly_stream::prelude::*;
12//! use std::sync::Arc;
13//!
14//! struct KeyAuth { secret: String }
15//!
16//! #[async_trait]
17//! impl StreamAuthenticator for KeyAuth {
18//! async fn authorize_publish(&self, _key: &StreamKey, creds: &Credentials) -> Result<()> {
19//! match creds.token.as_deref() {
20//! Some(t) if t == self.secret => Ok(()),
21//! _ => Err(StreamError::Unauthorized("bad publish key".into())),
22//! }
23//! }
24//! }
25//!
26//! let engine = Engine::builder()
27//! .application(AppSpec::new("live"))
28//! .authenticator(KeyAuth { secret: "s3cr3t".into() })
29//! .build();
30//! # let _ = engine;
31//! ```
32
33use crate::{Result, StreamKey};
34use async_trait::async_trait;
35use std::net::SocketAddr;
36
37/// Credentials presented by a connecting publisher or player.
38///
39/// Protocol handlers populate whichever fields their transport carries (an RTMP
40/// stream key in `token`, a WHIP bearer in `token`, the peer address in `addr`,
41/// query parameters in `params`).
42#[derive(Debug, Default, Clone)]
43pub struct Credentials {
44 /// Opaque secret: a stream key, signed URL token, or bearer credential.
45 pub token: Option<String>,
46 /// Remote peer address, when the transport exposes one.
47 pub addr: Option<SocketAddr>,
48 /// Arbitrary protocol-supplied key/value parameters (e.g. query string).
49 pub params: Vec<(String, String)>,
50}
51
52impl Credentials {
53 /// Credentials carrying only a token (the common stream-key case).
54 pub fn token(token: impl Into<String>) -> Self {
55 Self {
56 token: Some(token.into()),
57 ..Self::default()
58 }
59 }
60
61 /// Look up a protocol parameter by key.
62 pub fn param(&self, key: &str) -> Option<&str> {
63 self.params
64 .iter()
65 .find(|(k, _)| k == key)
66 .map(|(_, v)| v.as_str())
67 }
68}
69
70/// Authorizes publish and play attempts. Both methods **default to permit**, so
71/// an implementor overrides only the side it gates.
72#[async_trait]
73pub trait StreamAuthenticator: Send + Sync + 'static {
74 /// Decide whether `creds` may publish to `key`. Return
75 /// [`StreamError::Unauthorized`](crate::StreamError::Unauthorized) to reject.
76 async fn authorize_publish(&self, _key: &StreamKey, _creds: &Credentials) -> Result<()> {
77 Ok(())
78 }
79
80 /// Decide whether `creds` may subscribe to `key`.
81 async fn authorize_play(&self, _key: &StreamKey, _creds: &Credentials) -> Result<()> {
82 Ok(())
83 }
84}
85
86/// The default authenticator: permits everything. Selected when the builder is
87/// given none, preserving the engine's zero-policy default.
88#[derive(Debug, Default, Clone, Copy)]
89pub struct AllowAll;
90
91impl StreamAuthenticator for AllowAll {}
92
93/// A production-ready, time-limited **signed-token** authenticator
94/// (`feature = "auth-token"`).
95///
96/// A token binds a [`StreamKey`] to an expiry under an HMAC-SHA-256 signature,
97/// so it cannot be forged without the shared secret nor replayed past its
98/// deadline. The wire form is:
99///
100/// ```text
101/// <expiry_unix_seconds>:<hex(hmac_sha256(secret, "app/stream:expiry"))>
102/// ```
103///
104/// Mint tokens out-of-band (e.g. in your sign-in / "get publish URL" endpoint)
105/// with [`sign`](Self::sign); the engine verifies them on the publish path —
106/// and, when [`gate_playback`](Self::gate_playback) is set, the play path too.
107/// Verification is constant-time and pulls in **no crypto dependency** (the
108/// HMAC is a small, test-vector-checked in-crate implementation).
109///
110/// ```
111/// use arcly_stream::auth::TokenAuthenticator;
112/// use arcly_stream::StreamKey;
113///
114/// let auth = TokenAuthenticator::new("super-secret");
115/// let key = StreamKey::new("live", "cam-1");
116/// // Mint a token valid until some absolute Unix time:
117/// let token = auth.sign(&key, 9_999_999_999);
118/// assert!(auth.verify(&key, &token).is_ok());
119/// // A token for a different stream is rejected:
120/// assert!(auth.verify(&StreamKey::new("live", "other"), &token).is_err());
121/// ```
122#[cfg(feature = "auth-token")]
123#[cfg_attr(docsrs, doc(cfg(feature = "auth-token")))]
124pub struct TokenAuthenticator {
125 secret: Vec<u8>,
126 gate_play: bool,
127}
128
129#[cfg(feature = "auth-token")]
130impl TokenAuthenticator {
131 /// New authenticator keyed by `secret`. Gates **publish** only by default;
132 /// call [`gate_playback`](Self::gate_playback) to gate play as well.
133 pub fn new(secret: impl Into<Vec<u8>>) -> Self {
134 Self {
135 secret: secret.into(),
136 gate_play: false,
137 }
138 }
139
140 /// Also require a valid token to *play* (subscribe), not just publish.
141 pub fn gate_playback(mut self, gate: bool) -> Self {
142 self.gate_play = gate;
143 self
144 }
145
146 /// The signed message a token covers: `"app/stream:expiry"`.
147 fn message(key: &StreamKey, expiry: u64) -> String {
148 format!("{}/{}:{}", key.app.as_str(), key.stream_id.as_str(), expiry)
149 }
150
151 /// Mint a token authorizing `key` until `expires_at` (Unix seconds).
152 pub fn sign(&self, key: &StreamKey, expires_at: u64) -> String {
153 let mac =
154 crate::crypto::hmac_sha256(&self.secret, Self::message(key, expires_at).as_bytes());
155 format!("{}:{}", expires_at, crate::crypto::to_hex(&mac))
156 }
157
158 /// Verify `token` against `key` and the current wall clock. Returns
159 /// [`StreamError::Unauthorized`](crate::StreamError::Unauthorized) on a
160 /// malformed, expired, or mis-signed token.
161 pub fn verify(&self, key: &StreamKey, token: &str) -> Result<()> {
162 let (exp_str, sig) = token
163 .split_once(':')
164 .ok_or_else(|| crate::StreamError::Unauthorized("malformed token".into()))?;
165 let expiry: u64 = exp_str
166 .parse()
167 .map_err(|_| crate::StreamError::Unauthorized("malformed token expiry".into()))?;
168 if now_unix_secs() > expiry {
169 return Err(crate::StreamError::Unauthorized("token expired".into()));
170 }
171 let expected = crate::crypto::to_hex(&crate::crypto::hmac_sha256(
172 &self.secret,
173 Self::message(key, expiry).as_bytes(),
174 ));
175 if crate::crypto::constant_time_eq(expected.as_bytes(), sig.as_bytes()) {
176 Ok(())
177 } else {
178 Err(crate::StreamError::Unauthorized(
179 "invalid token signature".into(),
180 ))
181 }
182 }
183
184 fn token_for(creds: &Credentials) -> Result<&str> {
185 creds
186 .token
187 .as_deref()
188 .ok_or_else(|| crate::StreamError::Unauthorized("missing token".into()))
189 }
190}
191
192/// Seconds since the Unix epoch, saturating to 0 before 1970.
193#[cfg(feature = "auth-token")]
194fn now_unix_secs() -> u64 {
195 std::time::SystemTime::now()
196 .duration_since(std::time::UNIX_EPOCH)
197 .map(|d| d.as_secs())
198 .unwrap_or(0)
199}
200
201#[cfg(feature = "auth-token")]
202#[async_trait]
203impl StreamAuthenticator for TokenAuthenticator {
204 async fn authorize_publish(&self, key: &StreamKey, creds: &Credentials) -> Result<()> {
205 self.verify(key, Self::token_for(creds)?)
206 }
207
208 async fn authorize_play(&self, key: &StreamKey, creds: &Credentials) -> Result<()> {
209 if self.gate_play {
210 self.verify(key, Self::token_for(creds)?)
211 } else {
212 Ok(())
213 }
214 }
215}
216
217#[cfg(all(test, feature = "auth-token"))]
218mod token_tests {
219 use super::*;
220
221 #[tokio::test]
222 async fn signed_token_authorizes_its_stream() {
223 let auth = TokenAuthenticator::new("s3cr3t");
224 let key = StreamKey::new("live", "cam");
225 let token = auth.sign(&key, now_unix_secs() + 3600);
226 let creds = Credentials::token(token);
227 assert!(auth.authorize_publish(&key, &creds).await.is_ok());
228 // Play is open unless gated.
229 assert!(auth
230 .authorize_play(&key, &Credentials::default())
231 .await
232 .is_ok());
233 }
234
235 #[tokio::test]
236 async fn rejects_wrong_stream_secret_and_expiry() {
237 let auth = TokenAuthenticator::new("s3cr3t");
238 let key = StreamKey::new("live", "cam");
239 let token = auth.sign(&key, now_unix_secs() + 3600);
240
241 // Wrong stream.
242 assert!(auth
243 .verify(&StreamKey::new("live", "other"), &token)
244 .is_err());
245 // Wrong secret.
246 let other = TokenAuthenticator::new("different");
247 assert!(other.verify(&key, &token).is_err());
248 // Expired.
249 let stale = auth.sign(&key, now_unix_secs().saturating_sub(1));
250 assert!(auth.verify(&key, &stale).is_err());
251 // Malformed.
252 assert!(auth.verify(&key, "not-a-token").is_err());
253 }
254
255 #[tokio::test]
256 async fn gated_playback_requires_a_token() {
257 let auth = TokenAuthenticator::new("s3cr3t").gate_playback(true);
258 let key = StreamKey::new("live", "cam");
259 assert!(auth
260 .authorize_play(&key, &Credentials::default())
261 .await
262 .is_err());
263 let token = auth.sign(&key, now_unix_secs() + 60);
264 assert!(auth
265 .authorize_play(&key, &Credentials::token(token))
266 .await
267 .is_ok());
268 }
269}