use crate::error::IngestError;
use bytes::Bytes;
use rml_rtmp::handshake::{Handshake, HandshakeProcessResult, PeerType};
use rml_rtmp::sessions::{ServerSession, ServerSessionConfig, ServerSessionEvent, ServerSessionResult};
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
/// Settings for the RTMP ingest listener.
#[derive(Debug, Clone)]
pub struct RtmpConfig {
    /// Socket address the listener binds to.
    pub bind_addr: SocketAddr,
}

impl Default for RtmpConfig {
    /// Listens on every IPv4 interface (`0.0.0.0`) at port 1935.
    fn default() -> Self {
        let bind_addr = SocketAddr::from((std::net::Ipv4Addr::UNSPECIFIED, 1935));
        Self { bind_addr }
    }
}
/// Handler for a received audio or video payload, invoked as
/// `(app_name, stream_key, data, timestamp)` where `timestamp` is the value
/// carried by the RTMP message.
pub type MediaCallback = Arc<dyn Fn(&str, &str, Bytes, u32) + Send + Sync>;
/// Handler for publish / unpublish notifications, invoked as
/// `(app_name, stream_key)`.
pub type StreamCallback = Arc<dyn Fn(&str, &str) + Send + Sync>;
/// Publish gate invoked as `(app_name, stream_key)`; returning `false`
/// rejects the publish request and ends the session.
pub type AuthCallback = Arc<dyn Fn(&str, &str) -> bool + Send + Sync>;
/// Handler for a decoded SCTE-35 payload extracted from an `onCuePoint`
/// message, invoked as `(app_name, stream_key, section_bytes)`.
pub type Scte35Callback = Arc<dyn Fn(&str, &str, Bytes) + Send + Sync>;
/// Hook called once a publish has been accepted, with the session name
/// (formatted as `"{app}/{key}"`), the peer address, and a token whose
/// cancellation makes the session loop exit.
pub type SessionRegisterFn = Arc<dyn Fn(String, SocketAddr, CancellationToken) + Send + Sync>;
/// Hook called with the previously registered session name when the session
/// handler finishes.
pub type SessionDeregisterFn = Arc<dyn Fn(&str) + Send + Sync>;
/// Pair of hooks used to track live publisher sessions outside this module.
///
/// `register` receives a cancellation token for the session, so the holder
/// can terminate that publisher; `deregister` is called with the same name
/// when the session handler exits.
#[derive(Clone)]
pub struct SessionRegistrar {
/// Invoked after a publish request has been accepted.
pub register: SessionRegisterFn,
/// Invoked with the registered name when the session ends.
pub deregister: SessionDeregisterFn,
}
/// RTMP ingest server: accepts TCP connections and forwards publisher media
/// and lifecycle events to the configured callbacks.
pub struct RtmpServer {
// Listener settings (bind address).
config: RtmpConfig,
// Called for every received video payload.
on_video: MediaCallback,
// Called for every received audio payload.
on_audio: MediaCallback,
// Called after a publish request has been accepted.
on_publish: StreamCallback,
// Called when a publish finishes or its session ends.
on_unpublish: StreamCallback,
// Optional publish gate; when `None`, every publish request is accepted.
validate_publish: Option<AuthCallback>,
// Optional SCTE-35 sink; when `None`, parsed cue points are counted and dropped.
on_scte35: Option<Scte35Callback>,
// Optional hooks for registering/deregistering active sessions.
session_registrar: Option<SessionRegistrar>,
}
impl RtmpServer {
pub fn new(
config: RtmpConfig,
on_video: impl Fn(&str, &str, Bytes, u32) + Send + Sync + 'static,
on_audio: impl Fn(&str, &str, Bytes, u32) + Send + Sync + 'static,
on_publish: impl Fn(&str, &str) + Send + Sync + 'static,
on_unpublish: impl Fn(&str, &str) + Send + Sync + 'static,
) -> Self {
Self {
config,
on_video: Arc::new(on_video),
on_audio: Arc::new(on_audio),
on_publish: Arc::new(on_publish),
on_unpublish: Arc::new(on_unpublish),
validate_publish: None,
on_scte35: None,
session_registrar: None,
}
}
pub fn from_callbacks(
config: RtmpConfig,
on_video: MediaCallback,
on_audio: MediaCallback,
on_publish: StreamCallback,
on_unpublish: StreamCallback,
) -> Self {
Self {
config,
on_video,
on_audio,
on_publish,
on_unpublish,
validate_publish: None,
on_scte35: None,
session_registrar: None,
}
}
pub fn set_validate_publish(&mut self, validate: AuthCallback) {
self.validate_publish = Some(validate);
}
pub fn set_scte35_callback(&mut self, cb: Scte35Callback) {
self.on_scte35 = Some(cb);
}
pub fn set_session_registrar(&mut self, registrar: SessionRegistrar) {
self.session_registrar = Some(registrar);
}
pub fn config(&self) -> &RtmpConfig {
&self.config
}
pub async fn run(&self, shutdown: CancellationToken) -> Result<(), IngestError> {
let listener = TcpListener::bind(self.config.bind_addr).await?;
info!(addr = %self.config.bind_addr, "RTMP ingest listening");
self.run_with_listener(listener, shutdown).await
}
pub async fn run_with_listener(
&self,
listener: TcpListener,
shutdown: CancellationToken,
) -> Result<(), IngestError> {
loop {
tokio::select! {
result = listener.accept() => {
let (stream, peer_addr) = result?;
info!(%peer_addr, "RTMP connection accepted");
metrics::counter!("lvqr_rtmp_connections_total").increment(1);
let on_video = self.on_video.clone();
let on_audio = self.on_audio.clone();
let on_publish = self.on_publish.clone();
let on_unpublish = self.on_unpublish.clone();
let validate_publish = self.validate_publish.clone();
let on_scte35 = self.on_scte35.clone();
let session_registrar = self.session_registrar.clone();
tokio::spawn(async move {
if let Err(e) = handle_rtmp_connection(
stream,
peer_addr,
on_video,
on_audio,
on_publish,
on_unpublish,
validate_publish,
on_scte35,
session_registrar,
)
.await
{
error!(%peer_addr, error = %e, "RTMP session error");
}
});
}
_ = shutdown.cancelled() => {
info!("RTMP shutdown signal received");
break;
}
}
}
Ok(())
}
}
#[allow(clippy::too_many_arguments)]
async fn handle_rtmp_connection(
mut stream: TcpStream,
peer_addr: SocketAddr,
on_video: MediaCallback,
on_audio: MediaCallback,
on_publish: StreamCallback,
on_unpublish: StreamCallback,
validate_publish: Option<AuthCallback>,
on_scte35: Option<Scte35Callback>,
session_registrar: Option<SessionRegistrar>,
) -> Result<(), IngestError> {
let mut handshake = Handshake::new(PeerType::Server);
let mut buf = vec![0u8; 4096];
let p0_and_p1 = handshake
.generate_outbound_p0_and_p1()
.map_err(|e| IngestError::Protocol(format!("handshake generate error: {e:?}")))?;
stream.write_all(&p0_and_p1).await?;
loop {
let n = stream.read(&mut buf).await?;
if n == 0 {
return Err(IngestError::Protocol("connection closed during handshake".into()));
}
match handshake
.process_bytes(&buf[..n])
.map_err(|e| IngestError::Protocol(format!("handshake error: {e:?}")))?
{
HandshakeProcessResult::InProgress { response_bytes } => {
if !response_bytes.is_empty() {
stream.write_all(&response_bytes).await?;
}
}
HandshakeProcessResult::Completed {
response_bytes,
remaining_bytes,
} => {
if !response_bytes.is_empty() {
stream.write_all(&response_bytes).await?;
}
debug!("RTMP handshake complete");
return handle_rtmp_session(
stream,
peer_addr,
remaining_bytes,
on_video,
on_audio,
on_publish,
on_unpublish,
validate_publish,
on_scte35,
session_registrar,
)
.await;
}
}
}
}
/// Drop guard that reports the end of a registered session.
///
/// When dropped, the `deregister` hook is called with `name`, but only if both
/// are present; a guard with either one missing does nothing.
struct DeregisterGuard {
    /// Name the session was registered under, once it has been registered.
    name: Option<String>,
    /// Hook to invoke on drop, if a registrar was supplied.
    deregister: Option<SessionDeregisterFn>,
}

impl Drop for DeregisterGuard {
    fn drop(&mut self) {
        let registration = self.name.as_deref().zip(self.deregister.as_ref());
        if let Some((name, deregister)) = registration {
            (deregister)(name);
        }
    }
}
#[allow(clippy::too_many_arguments)]
async fn handle_rtmp_session(
mut stream: TcpStream,
peer_addr: SocketAddr,
remaining_bytes: Vec<u8>,
on_video: MediaCallback,
on_audio: MediaCallback,
on_publish: StreamCallback,
on_unpublish: StreamCallback,
validate_publish: Option<AuthCallback>,
on_scte35: Option<Scte35Callback>,
session_registrar: Option<SessionRegistrar>,
) -> Result<(), IngestError> {
let config = ServerSessionConfig::new();
let (mut session, initial_results) =
ServerSession::new(config).map_err(|e| IngestError::Protocol(format!("session init error: {e:?}")))?;
let session_cancel = CancellationToken::new();
let mut dereg = DeregisterGuard {
name: None,
deregister: session_registrar.as_ref().map(|r| r.deregister.clone()),
};
for result in initial_results {
if let ServerSessionResult::OutboundResponse(packet) = result {
stream.write_all(&packet.bytes).await?;
}
}
if !remaining_bytes.is_empty() {
let results = session
.handle_input(&remaining_bytes)
.map_err(|e| IngestError::Protocol(format!("session input error: {e:?}")))?;
process_session_results(
&mut stream,
&session,
&results,
&on_video,
&on_audio,
&on_publish,
&on_unpublish,
)
.await?;
}
let mut buf = vec![0u8; 65536]; let mut current_app = String::new();
let mut current_key = String::new();
loop {
let n = tokio::select! {
biased;
_ = session_cancel.cancelled() => {
info!(%peer_addr, "RTMP publisher session cancelled by admin");
if !current_app.is_empty() && !current_key.is_empty() {
(on_unpublish)(¤t_app, ¤t_key);
}
return Ok(());
}
r = stream.read(&mut buf) => r?,
};
if n == 0 {
if !current_app.is_empty() && !current_key.is_empty() {
(on_unpublish)(¤t_app, ¤t_key);
}
return Ok(());
}
let results = session
.handle_input(&buf[..n])
.map_err(|e| IngestError::Protocol(format!("session input error: {e:?}")))?;
for result in &results {
match result {
ServerSessionResult::OutboundResponse(packet) => {
stream.write_all(&packet.bytes).await?;
}
ServerSessionResult::RaisedEvent(event) => match event {
ServerSessionEvent::ConnectionRequested { request_id, app_name } => {
info!(app = %app_name, "RTMP connection requested");
current_app = app_name.clone();
let accept_results = session
.accept_request(*request_id)
.map_err(|e| IngestError::Protocol(format!("accept error: {e:?}")))?;
for r in &accept_results {
if let ServerSessionResult::OutboundResponse(p) = r {
stream.write_all(&p.bytes).await?;
}
}
}
ServerSessionEvent::PublishStreamRequested {
request_id,
app_name,
stream_key,
..
} => {
info!(app = %app_name, key = %stream_key, "RTMP publish requested");
if let Some(ref validate) = validate_publish {
if !(validate)(app_name, stream_key) {
info!(
app = %app_name,
"RTMP publish rejected by auth provider"
);
metrics::counter!("lvqr_auth_failures_total", "entry" => "rtmp").increment(1);
return Ok(());
}
}
current_key = stream_key.clone();
let accept_results = session
.accept_request(*request_id)
.map_err(|e| IngestError::Protocol(format!("accept error: {e:?}")))?;
for r in &accept_results {
if let ServerSessionResult::OutboundResponse(p) = r {
stream.write_all(&p.bytes).await?;
}
}
(on_publish)(app_name, stream_key);
if let Some(reg) = &session_registrar {
let name = format!("{}/{}", app_name, stream_key);
(reg.register)(name.clone(), peer_addr, session_cancel.clone());
dereg.name = Some(name);
}
}
ServerSessionEvent::VideoDataReceived {
app_name,
stream_key,
data,
timestamp,
} => {
(on_video)(app_name, stream_key, data.clone(), timestamp.value);
}
ServerSessionEvent::AudioDataReceived {
app_name,
stream_key,
data,
timestamp,
} => {
(on_audio)(app_name, stream_key, data.clone(), timestamp.value);
}
ServerSessionEvent::PublishStreamFinished { app_name, stream_key } => {
info!(app = %app_name, key = %stream_key, "RTMP publish finished");
(on_unpublish)(app_name, stream_key);
current_key.clear();
}
ServerSessionEvent::StreamMetadataChanged {
app_name,
stream_key,
metadata,
} => {
debug!(
app = %app_name,
key = %stream_key,
video_width = ?metadata.video_width,
video_height = ?metadata.video_height,
video_codec_id = ?metadata.video_codec_id,
audio_codec_id = ?metadata.audio_codec_id,
"stream metadata received"
);
if let Some(id) = metadata.video_codec_id
&& id != 7
{
warn!(
app = %app_name,
key = %stream_key,
video_codec_id = id,
"RTMP publisher advertises non-H.264 video codec; rejecting publish"
);
metrics::counter!(
"lvqr_rtmp_unsupported_codec_total",
"kind" => "video",
"codec_id" => id.to_string(),
)
.increment(1);
let description =
format!("unsupported video codec_id {id}; lvqr ingests H.264 (codec_id 7) only");
match session.finish_publishing_with_error("NetStream.Publish.BadName", &description) {
Ok(Some((packet, _key))) => {
let _ = stream.write_all(&packet.bytes).await;
}
Ok(None) => {
debug!(app = %app_name, key = %stream_key, "no active publish to reject");
}
Err(e) => {
warn!(error = ?e, "failed to serialize onStatus reject");
}
}
metrics::counter!(
"lvqr_rtmp_publish_rejected_total",
"kind" => "video",
"codec_id" => id.to_string(),
)
.increment(1);
if !current_app.is_empty() && !current_key.is_empty() {
(on_unpublish)(¤t_app, ¤t_key);
}
return Ok(());
}
if let Some(id) = metadata.audio_codec_id
&& id != 10
{
warn!(
app = %app_name,
key = %stream_key,
audio_codec_id = id,
"RTMP publisher advertises non-AAC audio codec; downstream depacketization will corrupt"
);
metrics::counter!(
"lvqr_rtmp_unsupported_codec_total",
"kind" => "audio",
"codec_id" => id.to_string(),
)
.increment(1);
}
}
ServerSessionEvent::Amf0DataReceived {
app_name,
stream_key,
data,
} => {
if let Some(section) = parse_oncuepoint_scte35(data) {
metrics::counter!(
"lvqr_scte35_events_total",
"ingest" => "rtmp",
"command" => "oncuepoint",
)
.increment(1);
if let Some(ref cb) = on_scte35 {
(cb)(app_name, stream_key, section);
} else {
debug!(
app = %app_name,
key = %stream_key,
"RTMP scte35-bin64 onCuePoint received but no callback installed; dropping"
);
}
} else {
debug!(
app = %app_name,
key = %stream_key,
first = ?data.first(),
"RTMP AMF0 data not an scte35-bin64 onCuePoint; ignoring"
);
}
}
_ => {
debug!(event = ?event, "unhandled RTMP event");
}
},
ServerSessionResult::UnhandleableMessageReceived(_) => {
debug!("received unhandleable RTMP message");
}
}
}
}
}
/// Writes every outbound packet found in `results` to `stream`, in order.
///
/// Raised events and unhandleable-message notices are skipped. The session
/// and callback parameters are accepted but not used.
///
/// # Errors
/// Propagates the first write error.
async fn process_session_results(
    stream: &mut TcpStream,
    _session: &ServerSession,
    results: &[ServerSessionResult],
    _on_video: &MediaCallback,
    _on_audio: &MediaCallback,
    _on_publish: &StreamCallback,
    _on_unpublish: &StreamCallback,
) -> Result<(), IngestError> {
    let outbound = results.iter().filter_map(|entry| match entry {
        ServerSessionResult::OutboundResponse(packet) => Some(packet),
        _ => None,
    });
    for packet in outbound {
        stream.write_all(&packet.bytes).await?;
    }
    Ok(())
}
/// Extracts a base64-encoded SCTE-35 payload from an AMF0 `onCuePoint` message.
///
/// Returns the decoded bytes only when all of the following hold:
/// - `values` has at least two entries, the first being the string
///   `"onCuePoint"` and the second an object;
/// - the object's `"name"` property is the string `"scte35-bin64"`;
/// - the object's `"data"` property is a string that decodes as standard
///   base64 to a non-empty byte sequence.
///
/// Any other input yields `None`.
fn parse_oncuepoint_scte35(values: &[rml_amf0::Amf0Value]) -> Option<Bytes> {
    use base64::{Engine as _, engine::general_purpose::STANDARD};
    use rml_amf0::Amf0Value;

    let [Amf0Value::Utf8String(method), Amf0Value::Object(props), ..] = values else {
        return None;
    };
    if method != "onCuePoint" {
        return None;
    }

    let is_scte35 = matches!(
        props.get("name"),
        Some(Amf0Value::Utf8String(name)) if name == "scte35-bin64"
    );
    if !is_scte35 {
        return None;
    }

    let Some(Amf0Value::Utf8String(encoded)) = props.get("data") else {
        return None;
    };
    let payload = STANDARD.decode(encoded.as_str()).ok()?;
    if payload.is_empty() {
        None
    } else {
        Some(Bytes::from(payload))
    }
}
/// Reports whether a video payload starts with a keyframe marker.
///
/// The upper four bits of the first byte are compared with `1`; an empty
/// payload is never a keyframe.
pub fn is_keyframe(data: &[u8]) -> bool {
    match data.first() {
        Some(&head) => head >> 4 == 1,
        None => false,
    }
}
// Unit tests for the pure helpers in this module: keyframe detection, the
// default configuration, and SCTE-35 `onCuePoint` parsing.
#[cfg(test)]
mod tests {
use super::*;
// The high nibble of the first byte decides the result; empty input is false.
#[test]
fn keyframe_detection() {
assert!(is_keyframe(&[0x17, 0x01, 0x00, 0x00]));
assert!(!is_keyframe(&[0x27, 0x01, 0x00, 0x00]));
assert!(!is_keyframe(&[]));
assert!(is_keyframe(&[0x14, 0x00]));
}
// The default configuration listens on port 1935.
#[test]
fn default_config() {
let config = RtmpConfig::default();
assert_eq!(config.bind_addr.port(), 1935);
}
// Builds the AMF0 value list for an `onCuePoint` message whose object has
// name "scte35-bin64", the given base64 `data`, and type "scte-35".
fn mk_scte35_oncuepoint(b64: &str) -> Vec<rml_amf0::Amf0Value> {
use rml_amf0::Amf0Value;
use std::collections::HashMap;
let mut obj = HashMap::new();
obj.insert("name".into(), Amf0Value::Utf8String("scte35-bin64".into()));
obj.insert("data".into(), Amf0Value::Utf8String(b64.into()));
obj.insert("type".into(), Amf0Value::Utf8String("scte-35".into()));
vec![Amf0Value::Utf8String("onCuePoint".into()), Amf0Value::Object(obj)]
}
// A well-formed message yields the decoded bytes of its base64 payload.
#[test]
fn parses_well_formed_oncuepoint_scte35_bin64() {
let b64 = "/DARAA=="; let values = mk_scte35_oncuepoint(b64);
let raw = parse_oncuepoint_scte35(&values).expect("parses");
assert_eq!(&raw[..], &[0xFC, 0x30, 0x11, 0x00]);
}
// An `onCuePoint` whose name is not "scte35-bin64" is ignored.
#[test]
fn rejects_oncuepoint_without_scte35_name() {
use rml_amf0::Amf0Value;
use std::collections::HashMap;
let mut obj = HashMap::new();
obj.insert("name".into(), Amf0Value::Utf8String("other-cue".into()));
obj.insert("data".into(), Amf0Value::Utf8String("/DARAA==".into()));
let values = vec![Amf0Value::Utf8String("onCuePoint".into()), Amf0Value::Object(obj)];
assert!(parse_oncuepoint_scte35(&values).is_none());
}
// Messages with a different method name are ignored.
#[test]
fn rejects_non_oncuepoint_method() {
use rml_amf0::Amf0Value;
let values = vec![Amf0Value::Utf8String("onMetaData".into()), Amf0Value::Null];
assert!(parse_oncuepoint_scte35(&values).is_none());
}
// An empty payload decodes to zero bytes and is rejected.
#[test]
fn rejects_empty_base64_payload() {
let values = mk_scte35_oncuepoint("");
assert!(parse_oncuepoint_scte35(&values).is_none());
}
// A payload that is not valid base64 is rejected.
#[test]
fn rejects_invalid_base64() {
let values = mk_scte35_oncuepoint("!!!not-valid-base64!!!");
assert!(parse_oncuepoint_scte35(&values).is_none());
}
}