use crate::{Result, StreamKey};
use async_trait::async_trait;
use std::net::SocketAddr;
/// Credentials presented by a client and handed to a [`StreamAuthenticator`].
///
/// All fields are optional in practice: [`Credentials::default`] yields no
/// token, no address and an empty parameter list.
#[derive(Debug, Default, Clone)]
pub struct Credentials {
    /// Token string supplied by the client, if any.
    pub token: Option<String>,
    /// Socket address associated with the request, if known.
    // NOTE(review): presumably the remote peer address; nothing in this file reads it.
    pub addr: Option<SocketAddr>,
    /// Additional key/value pairs, kept in insertion order.
    pub params: Vec<(String, String)>,
}

impl Credentials {
    /// Builds credentials that carry only `token`; the address is `None` and
    /// the parameter list is empty.
    pub fn token(token: impl Into<String>) -> Self {
        Self {
            token: Some(token.into()),
            addr: None,
            params: Vec::new(),
        }
    }

    /// Returns the value of the first parameter named exactly `key`, or
    /// `None` when no parameter has that name.
    pub fn param(&self, key: &str) -> Option<&str> {
        self.params
            .iter()
            .find_map(|(name, value)| (name == key).then(|| value.as_str()))
    }
}
/// Authorization hook consulted for publish and play requests on a stream.
///
/// Both methods have default bodies that return `Ok(())`, so an implementor
/// only needs to override the operation it wants to restrict. Implementors
/// must be `Send + Sync + 'static`.
#[async_trait]
pub trait StreamAuthenticator: Send + Sync + 'static {
/// Decides whether the holder of `_creds` may publish to `_key`.
///
/// The default implementation allows every request. Overriding
/// implementations reject a request by returning `Err`.
async fn authorize_publish(&self, _key: &StreamKey, _creds: &Credentials) -> Result<()> {
Ok(())
}
/// Decides whether the holder of `_creds` may play `_key`.
///
/// The default implementation allows every request. Overriding
/// implementations reject a request by returning `Err`.
async fn authorize_play(&self, _key: &StreamKey, _creds: &Credentials) -> Result<()> {
Ok(())
}
}
/// Authenticator that permits every publish and play request.
#[derive(Debug, Default, Clone, Copy)]
pub struct AllowAll;
// Intentionally empty: both checks fall through to the trait's default
// bodies, which return `Ok(())`.
impl StreamAuthenticator for AllowAll {}
/// Authenticator that checks expiring, MAC-signed tokens bound to a stream key.
///
/// Publishing always requires a valid token; playback requires one only when
/// enabled through `gate_playback`. Compiled only with the `auth-token`
/// feature.
#[cfg(feature = "auth-token")]
#[cfg_attr(docsrs, doc(cfg(feature = "auth-token")))]
pub struct TokenAuthenticator {
/// Key bytes passed to `crate::crypto::hmac_sha256` when signing and verifying.
secret: Vec<u8>,
/// When `true`, `authorize_play` verifies a token; when `false`, it allows
/// all playback. `new` initialises this to `false`.
gate_play: bool,
}
#[cfg(feature = "auth-token")]
impl TokenAuthenticator {
    /// Creates an authenticator keyed with `secret`.
    ///
    /// Playback gating starts disabled; enable it with [`Self::gate_playback`].
    pub fn new(secret: impl Into<Vec<u8>>) -> Self {
        Self {
            secret: secret.into(),
            gate_play: false,
        }
    }

    /// Sets whether playback requires a valid token and returns the updated
    /// authenticator (builder style).
    pub fn gate_playback(mut self, gate: bool) -> Self {
        self.gate_play = gate;
        self
    }

    /// Builds the message that gets MACed: `<app>/<stream_id>:<expiry>`.
    // NOTE(review): if `app` or `stream_id` may themselves contain `/`, two
    // different keys can render to the same message — confirm that `StreamKey`
    // forbids it. The format is left untouched so issued tokens stay valid.
    fn message(key: &StreamKey, expiry: u64) -> String {
        format!("{}/{}:{}", key.app.as_str(), key.stream_id.as_str(), expiry)
    }

    /// Issues a token for `key` that carries the expiry `expires_at`.
    ///
    /// The token has the form `<expires_at>:<hex>`, where `<hex>` is the
    /// `crate::crypto::to_hex` rendering of `crate::crypto::hmac_sha256` over
    /// the message for `key` and `expires_at`. [`Self::verify`] compares the
    /// expiry against the current Unix time in seconds.
    pub fn sign(&self, key: &StreamKey, expires_at: u64) -> String {
        let mac =
            crate::crypto::hmac_sha256(&self.secret, Self::message(key, expires_at).as_bytes());
        format!("{}:{}", expires_at, crate::crypto::to_hex(&mac))
    }

    /// Checks that `token` was issued by [`Self::sign`] for `key` with this
    /// secret and has not expired.
    ///
    /// A token stays valid through its expiry second: it is rejected only
    /// once the current time is strictly greater than the expiry.
    ///
    /// # Errors
    ///
    /// Returns `StreamError::Unauthorized` when the token has no `:`
    /// separator, when the expiry is not a canonical decimal `u64`, when the
    /// token has expired, or when the signature does not match.
    pub fn verify(&self, key: &StreamKey, token: &str) -> Result<()> {
        let (exp_str, sig) = token
            .split_once(':')
            .ok_or_else(|| crate::StreamError::Unauthorized("malformed token".into()))?;
        let expiry: u64 = exp_str
            .parse()
            .map_err(|_| crate::StreamError::Unauthorized("malformed token expiry".into()))?;
        // `u64::from_str` also accepts a leading `+` and leading zeros. Only
        // the numeric value is signed, so without this check several distinct
        // strings (`123:…`, `+123:…`, `0123:…`) would all verify. Accept only
        // the exact rendering that `sign` produces.
        if exp_str != expiry.to_string() {
            return Err(crate::StreamError::Unauthorized(
                "malformed token expiry".into(),
            ));
        }
        if now_unix_secs() > expiry {
            return Err(crate::StreamError::Unauthorized("token expired".into()));
        }
        let expected = crate::crypto::to_hex(&crate::crypto::hmac_sha256(
            &self.secret,
            Self::message(key, expiry).as_bytes(),
        ));
        // Compared through the crate's `constant_time_eq` helper rather than
        // `==` on the hex strings.
        if crate::crypto::constant_time_eq(expected.as_bytes(), sig.as_bytes()) {
            Ok(())
        } else {
            Err(crate::StreamError::Unauthorized(
                "invalid token signature".into(),
            ))
        }
    }

    /// Borrows the token out of `creds`.
    ///
    /// # Errors
    ///
    /// Returns `StreamError::Unauthorized("missing token")` when `creds`
    /// carries no token.
    fn token_for(creds: &Credentials) -> Result<&str> {
        creds
            .token
            .as_deref()
            .ok_or_else(|| crate::StreamError::Unauthorized("missing token".into()))
    }
}
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// Returns `0` when the system clock reads earlier than the epoch. Because
/// `TokenAuthenticator::verify` rejects a token only when this value is
/// greater than the token's expiry, a `0` reading makes no token look expired.
#[cfg(feature = "auth-token")]
fn now_unix_secs() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        // Clock is set before 1970: fall back to zero instead of panicking.
        Err(_) => 0,
    }
}
/// Token-based authorization: publishing always needs a valid token, playback
/// only when gating has been switched on.
#[cfg(feature = "auth-token")]
#[async_trait]
impl StreamAuthenticator for TokenAuthenticator {
    /// Requires `creds` to carry a token that verifies for `key`.
    ///
    /// Fails when the token is missing or does not verify.
    async fn authorize_publish(&self, key: &StreamKey, creds: &Credentials) -> Result<()> {
        let token = Self::token_for(creds)?;
        self.verify(key, token)
    }

    /// Allows playback unconditionally unless gating is enabled, in which
    /// case the same token check as for publishing applies.
    async fn authorize_play(&self, key: &StreamKey, creds: &Credentials) -> Result<()> {
        if !self.gate_play {
            return Ok(());
        }
        let token = Self::token_for(creds)?;
        self.verify(key, token)
    }
}
#[cfg(all(test, feature = "auth-token"))]
mod token_tests {
    use super::*;

    /// Secret shared by the authenticator under test.
    const SECRET: &str = "s3cr3t";

    /// Stream key that the tests sign tokens for.
    fn cam() -> StreamKey {
        StreamKey::new("live", "cam")
    }

    /// Unix timestamp `secs` seconds from now.
    fn expires_in(secs: u64) -> u64 {
        now_unix_secs() + secs
    }

    #[tokio::test]
    async fn signed_token_authorizes_its_stream() {
        let auth = TokenAuthenticator::new(SECRET);
        let key = cam();
        let creds = Credentials::token(auth.sign(&key, expires_in(3600)));

        let publish = auth.authorize_publish(&key, &creds).await;
        assert!(publish.is_ok());

        // Playback is not gated by default, so empty credentials suffice.
        let play = auth.authorize_play(&key, &Credentials::default()).await;
        assert!(play.is_ok());
    }

    #[tokio::test]
    async fn rejects_wrong_stream_secret_and_expiry() {
        let auth = TokenAuthenticator::new(SECRET);
        let key = cam();
        let token = auth.sign(&key, expires_in(3600));

        // Same token presented for a different stream.
        let other_stream = StreamKey::new("live", "other");
        assert!(auth.verify(&other_stream, &token).is_err());

        // Same token checked against a different secret.
        let other_secret = TokenAuthenticator::new("different");
        assert!(other_secret.verify(&key, &token).is_err());

        // Correctly signed, but the expiry is already in the past.
        let expired = auth.sign(&key, now_unix_secs().saturating_sub(1));
        assert!(auth.verify(&key, &expired).is_err());

        // No `:` separator at all.
        assert!(auth.verify(&key, "not-a-token").is_err());
    }

    #[tokio::test]
    async fn gated_playback_requires_a_token() {
        let auth = TokenAuthenticator::new(SECRET).gate_playback(true);
        let key = cam();

        let without_token = auth.authorize_play(&key, &Credentials::default()).await;
        assert!(without_token.is_err());

        let creds = Credentials::token(auth.sign(&key, expires_in(60)));
        let with_token = auth.authorize_play(&key, &creds).await;
        assert!(with_token.is_ok());
    }
}