mod amf;
mod chunk;
mod flv;
mod handshake;
use crate::bus::{PlaybackRegistry, PublishRegistry, StreamHandle};
use crate::{MediaFrame, Result, StreamError, StreamKey};
use amf::Amf0Value;
use async_trait::async_trait;
use chunk::{ChunkReader, ChunkWriter, RtmpMessage};
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::net::TcpStream;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, warn};
/// Default TCP port for RTMP listeners.
pub const DEFAULT_RTMP_PORT: u16 = 1935;
/// Chunk size this server announces in `connect` and then uses for all
/// outgoing messages.
const OUT_CHUNK_SIZE: usize = 4096;
// RTMP message type ids.
/// Protocol control: Set Chunk Size.
const MSG_SET_CHUNK_SIZE: u8 = 1;
/// Protocol control: Acknowledgement.
const MSG_ACK: u8 = 3;
/// User Control message (e.g. Stream Begin).
const MSG_USER_CONTROL: u8 = 4;
/// Protocol control: Window Acknowledgement Size.
const MSG_WINDOW_ACK_SIZE: u8 = 5;
/// Protocol control: Set Peer Bandwidth.
const MSG_SET_PEER_BANDWIDTH: u8 = 6;
/// Audio data (FLV audio tag body).
const MSG_AUDIO: u8 = 8;
/// Video data (FLV video tag body).
const MSG_VIDEO: u8 = 9;
/// Data/metadata message, AMF3 encoding.
const MSG_DATA_AMF3: u8 = 15;
/// Data/metadata message, AMF0 encoding.
const MSG_DATA_AMF0: u8 = 18;
/// Command message, AMF3 encoding.
const MSG_COMMAND_AMF3: u8 = 17;
/// Command message, AMF0 encoding.
const MSG_COMMAND_AMF0: u8 = 20;
// Chunk stream ids used for outgoing messages.
/// Chunk stream for protocol control / user control messages.
const CSID_CONTROL: u8 = 2;
/// Chunk stream for AMF command replies and `onStatus`.
const CSID_COMMAND: u8 = 3;
/// Chunk stream for outgoing audio.
const CSID_AUDIO: u8 = 4;
/// Chunk stream for outgoing video.
const CSID_VIDEO: u8 = 6;
/// The single message stream id handed out by `createStream` and used for
/// media and status messages.
const STREAM_ID: u32 = 1;
/// RTMP protocol handler: accepts publishers and, when a playback registry is
/// attached, players.
///
/// Create with [`RtmpHandler::new`] and adjust with the builder-style methods.
pub struct RtmpHandler {
    /// Address the TCP listener binds to.
    addr: SocketAddr,
    /// Connection cap handed to the shared TCP ingest server.
    max_connections: usize,
    /// Source of streams for `play`; `None` means play requests are refused.
    playback: Option<Arc<dyn PlaybackRegistry>>,
}

impl RtmpHandler {
    /// Creates a handler for `addr` with a cap of 1024 connections and
    /// playback disabled.
    pub fn new(addr: SocketAddr) -> Self {
        let max_connections = 1024;
        Self {
            playback: None,
            max_connections,
            addr,
        }
    }

    /// Replaces the maximum number of concurrent connections.
    pub fn max_connections(self, max: usize) -> Self {
        Self {
            max_connections: max,
            ..self
        }
    }

    /// Enables RTMP `play` by attaching a playback registry.
    pub fn with_playback(self, playback: Arc<dyn PlaybackRegistry>) -> Self {
        Self {
            playback: Some(playback),
            ..self
        }
    }
}
#[async_trait]
impl crate::ProtocolHandler for RtmpHandler {
    /// Protocol name used for identification.
    fn name(&self) -> &'static str {
        "rtmp"
    }

    /// Serves RTMP on the configured address until `shutdown` is cancelled.
    ///
    /// For every accepted socket the closure below builds a future that runs
    /// [`handle_connection`]; a session error is logged at `warn` level and is
    /// not propagated, so one bad client cannot stop the listener.
    async fn run(
        &self,
        registry: Arc<dyn PublishRegistry>,
        shutdown: CancellationToken,
    ) -> Result<()> {
        info!(addr = %self.addr, "rtmp handler listening");
        let playback = self.playback.clone();
        crate::protocol::run_tcp_ingest_server(
            self.addr,
            self.max_connections,
            shutdown,
            move |sock, peer| {
                // Each connection owns its own handles to the shared registries.
                let registry = Arc::clone(&registry);
                let playback = playback.clone();
                async move {
                    if let Err(e) = handle_connection(sock, peer, registry, playback).await {
                        warn!(%peer, error = %e, "rtmp session ended with error");
                    }
                }
            },
        )
        .await
    }
}
/// Runs one RTMP connection: completes the server side of the handshake, then
/// drives a [`Session`] over the split socket halves until it finishes.
///
/// # Errors
/// Returns handshake failures and any non-disconnect error from the session.
async fn handle_connection(
    mut sock: TcpStream,
    peer: SocketAddr,
    registry: Arc<dyn PublishRegistry>,
    playback: Option<Arc<dyn PlaybackRegistry>>,
) -> Result<()> {
    handshake::accept(&mut sock).await?;
    debug!(%peer, "rtmp handshake complete");

    let (rx, tx) = sock.into_split();
    let reader = ChunkReader::new(rx);
    let writer = ChunkWriter::new(tx);
    let mut conn = Session::new(reader, writer, registry, playback);
    conn.run().await
}
/// What the peer has asked this session to do.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Role {
    /// Connected, but no `publish` or `play` accepted yet.
    Pending,
    /// Ingesting media after a successful `publish`.
    Publishing,
    /// Sending media after a successful `play`.
    Playing,
}
/// Per-connection RTMP state machine, generic over the transport halves.
struct Session<R, W> {
    /// Decodes incoming chunks into whole RTMP messages.
    reader: ChunkReader<R>,
    /// Encodes outgoing messages into chunks.
    writer: ChunkWriter<W>,
    /// Where `publish` registers streams (`start_publish` / `end_publish`).
    registry: Arc<dyn PublishRegistry>,
    /// Source of streams for `play`; `None` makes `play` fail with an error status.
    playback: Option<Arc<dyn PlaybackRegistry>>,
    /// Application name from `connect` (query string stripped); required before
    /// `publish`/`play`.
    app: Option<String>,
    /// Current mode of the session.
    role: Role,
    /// Key of the registered publication; taken and ended when the session ends.
    publish_key: Option<StreamKey>,
    /// Handle that ingested frames are published into.
    handle: Option<StreamHandle>,
    /// AVC decoder config from the last video sequence header; raw video is
    /// dropped until one arrives.
    avc: Option<flv::AvcConfig>,
    /// AAC config from the last audio sequence header; raw AAC is dropped until
    /// one arrives.
    aac: Option<flv::AudioConfig>,
    /// Playback side: whether an AAC sequence header has been sent to the player.
    audio_seq_sent: bool,
    /// Set to end the event loop (after a play session finishes or is refused).
    stop: bool,
}
impl<R, W> Session<R, W>
where
R: tokio::io::AsyncRead + Unpin,
W: tokio::io::AsyncWrite + Unpin,
{
fn new(
reader: ChunkReader<R>,
writer: ChunkWriter<W>,
registry: Arc<dyn PublishRegistry>,
playback: Option<Arc<dyn PlaybackRegistry>>,
) -> Self {
Self {
reader,
writer,
registry,
playback,
app: None,
role: Role::Pending,
publish_key: None,
handle: None,
avc: None,
aac: None,
audio_seq_sent: false,
stop: false,
}
}
async fn run(&mut self) -> Result<()> {
let result = self.event_loop().await;
if let Some(key) = self.publish_key.take() {
let _ = self.registry.end_publish(&key).await;
}
match result {
Ok(()) => Ok(()),
Err(e) if is_disconnect(&e) => Ok(()),
Err(e) => Err(e),
}
}
async fn event_loop(&mut self) -> Result<()> {
while !self.stop {
let msg = self.reader.read_message().await?;
self.process(msg).await?;
}
Ok(())
}
async fn process(&mut self, msg: RtmpMessage) -> Result<()> {
match msg.type_id {
MSG_SET_CHUNK_SIZE => {
if let Ok(b) = <[u8; 4]>::try_from(&msg.payload[..msg.payload.len().min(4)]) {
self.reader.set_chunk_size(u32::from_be_bytes(b) as usize);
}
}
MSG_COMMAND_AMF0 => self.handle_command(&msg.payload).await?,
MSG_COMMAND_AMF3 => {
self.handle_command(&msg.payload[1.min(msg.payload.len())..])
.await?
}
MSG_VIDEO if self.role == Role::Publishing => self.on_video(&msg)?,
MSG_AUDIO if self.role == Role::Publishing => self.on_audio(&msg)?,
MSG_DATA_AMF0 | MSG_DATA_AMF3 => { }
MSG_ACK | MSG_WINDOW_ACK_SIZE | MSG_SET_PEER_BANDWIDTH | MSG_USER_CONTROL => {}
other => debug!(
type_id = other,
stream = msg.msg_stream_id,
"ignoring RTMP message"
),
}
Ok(())
}
async fn handle_command(&mut self, payload: &[u8]) -> Result<()> {
let values = amf::decode_all(payload);
let Some(name) = values.first().and_then(Amf0Value::as_str) else {
return Ok(());
};
let txn = values.get(1).and_then(Amf0Value::as_number).unwrap_or(0.0);
match name {
"connect" => self.on_connect(txn, values.get(2)).await,
"createStream" => self.on_create_stream(txn).await,
"publish" => self.on_publish(values.get(3)).await,
"play" => self.on_play(values.get(3)).await,
"releaseStream" | "FCPublish" | "FCUnpublish" | "deleteStream" | "closeStream"
| "FCSubscribe" => Ok(()),
other => {
debug!(command = other, "unhandled RTMP command");
Ok(())
}
}
}
async fn on_connect(&mut self, txn: f64, cmd_obj: Option<&Amf0Value>) -> Result<()> {
let app = cmd_obj
.and_then(|o| o.get("app"))
.and_then(Amf0Value::as_str)
.unwrap_or("")
.split('?')
.next()
.unwrap_or("")
.to_string();
self.app = Some(app);
self.send_control(MSG_WINDOW_ACK_SIZE, &2_500_000u32.to_be_bytes())
.await?;
let mut bw = Vec::from(2_500_000u32.to_be_bytes());
bw.push(2); self.send_control(MSG_SET_PEER_BANDWIDTH, &bw).await?;
self.send_control(MSG_SET_CHUNK_SIZE, &(OUT_CHUNK_SIZE as u32).to_be_bytes())
.await?;
self.writer.set_chunk_size(OUT_CHUNK_SIZE);
let props = amf::object(vec![
("fmsVer", amf::string("FMS/3,0,1,123")),
("capabilities", Amf0Value::Number(31.0)),
]);
let info = amf::object(vec![
("level", amf::string("status")),
("code", amf::string("NetConnection.Connect.Success")),
("description", amf::string("Connection succeeded.")),
("objectEncoding", Amf0Value::Number(0.0)),
]);
self.send_command(
0,
&[amf::string("_result"), Amf0Value::Number(txn), props, info],
)
.await
}
async fn on_create_stream(&mut self, txn: f64) -> Result<()> {
self.send_command(
0,
&[
amf::string("_result"),
Amf0Value::Number(txn),
Amf0Value::Null,
Amf0Value::Number(STREAM_ID as f64),
],
)
.await
}
async fn on_publish(&mut self, name: Option<&Amf0Value>) -> Result<()> {
let key = self.stream_key(name)?;
let handle = self.registry.start_publish(&key).await?;
info!(stream = %key, "rtmp publish started");
self.handle = Some(handle);
self.publish_key = Some(key);
self.role = Role::Publishing;
self.send_status("status", "NetStream.Publish.Start", "Publishing started.")
.await
}
async fn on_play(&mut self, name: Option<&Amf0Value>) -> Result<()> {
let key = self.stream_key(name)?;
let Some(playback) = self.playback.clone() else {
self.send_status("error", "NetStream.Play.Failed", "Playback not enabled.")
.await?;
self.stop = true;
return Ok(());
};
let handle = playback.get_stream(&key)?;
info!(stream = %key, "rtmp play started");
self.role = Role::Playing;
let mut begin = Vec::from(0u16.to_be_bytes()); begin.extend_from_slice(&STREAM_ID.to_be_bytes());
self.send_control(MSG_USER_CONTROL, &begin).await?;
self.send_status("status", "NetStream.Play.Reset", "Playing and resetting.")
.await?;
self.send_status("status", "NetStream.Play.Start", "Started playing.")
.await?;
self.serve_play(handle).await?;
self.stop = true;
Ok(())
}
fn on_video(&mut self, msg: &RtmpMessage) -> Result<()> {
let Some(header) = flv::parse_video_header(&msg.payload) else {
return Ok(()); };
let body = &msg.payload[header.body_offset..];
let ts = msg.timestamp as i64;
if header.avc_packet_type == flv::PKT_SEQUENCE_HEADER {
let cfg = flv::AvcConfig::parse(body)
.ok_or_else(|| StreamError::protocol("bad AVCDecoderConfigurationRecord"))?;
let mut frame =
MediaFrame::new_video(ts, ts, cfg.to_annexb(), crate::CodecId::H264, false);
frame.flags |= crate::FrameFlags::CONFIG;
self.avc = Some(cfg);
self.publish(frame);
} else if header.avc_packet_type == flv::PKT_RAW {
let Some(cfg) = self.avc.as_ref() else {
return Ok(()); };
let annexb = cfg
.avcc_to_annexb(body, header.is_keyframe)
.ok_or_else(|| StreamError::protocol("malformed AVCC NAL"))?;
let pts = ts + header.composition_time as i64;
let frame =
MediaFrame::new_video(pts, ts, annexb, crate::CodecId::H264, header.is_keyframe);
self.publish(frame);
}
Ok(())
}
fn on_audio(&mut self, msg: &RtmpMessage) -> Result<()> {
let payload = &msg.payload;
let Some(&b0) = payload.first() else {
return Ok(());
};
if b0 >> 4 != flv::AUDIO_FORMAT_AAC {
return Ok(()); }
let aac_packet_type = *payload.get(1).unwrap_or(&0);
let body = &payload[2.min(payload.len())..];
let ts = msg.timestamp as i64;
if aac_packet_type == flv::PKT_SEQUENCE_HEADER {
let cfg = flv::AudioConfig::parse(body)
.ok_or_else(|| StreamError::protocol("bad AudioSpecificConfig"))?;
self.aac = Some(cfg);
let mut frame =
MediaFrame::new_audio(ts, bytes::Bytes::copy_from_slice(body), crate::CodecId::AAC);
frame.flags |= crate::FrameFlags::CONFIG;
self.publish(frame);
} else if let Some(cfg) = self.aac.as_ref() {
let adts = cfg.to_adts(body);
self.publish(MediaFrame::new_audio(ts, adts, crate::CodecId::AAC));
}
Ok(())
}
fn publish(&self, frame: MediaFrame) {
if let Some(handle) = self.handle.as_ref() {
let _ = handle.publish_frame(frame);
}
}
async fn serve_play(&mut self, handle: StreamHandle) -> Result<()> {
for frame in handle.replay_buffer() {
self.send_media(&frame).await?;
}
let mut sub = handle.subscribe_resilient();
while let Some(frame) = sub.recv().await {
self.send_media(&frame).await?;
}
Ok(())
}
async fn send_media(&mut self, frame: &MediaFrame) -> Result<()> {
let is_config = frame.flags.contains(crate::FrameFlags::CONFIG);
match frame.codec {
crate::CodecId::H264 => {
let ts = frame.dts.max(0) as u32;
let tag = if is_config {
let cfg = annexb_to_avc_config(&frame.data);
flv::build_video_tag(false, flv::PKT_SEQUENCE_HEADER, 0, &cfg.to_avc_record())
} else {
let avcc = crate::codec::h264::annexb_to_avcc(&frame.data);
let cts = (frame.pts - frame.dts) as i32;
flv::build_video_tag(frame.is_keyframe(), flv::PKT_RAW, cts, &avcc)
};
self.writer
.write_message(CSID_VIDEO, MSG_VIDEO, ts, STREAM_ID, &tag)
.await
}
crate::CodecId::AAC => {
let ts = frame.pts.max(0) as u32;
if is_config {
self.audio_seq_sent = true;
let tag = flv::build_audio_tag(flv::PKT_SEQUENCE_HEADER, &frame.data);
return self
.writer
.write_message(CSID_AUDIO, MSG_AUDIO, ts, STREAM_ID, &tag)
.await;
}
if !self.audio_seq_sent {
if let Some(cfg) = flv::AudioConfig::from_adts(&frame.data) {
let seq = flv::build_audio_tag(flv::PKT_SEQUENCE_HEADER, &cfg.to_asc());
self.writer
.write_message(CSID_AUDIO, MSG_AUDIO, ts, STREAM_ID, &seq)
.await?;
self.audio_seq_sent = true;
}
}
let raw = frame.data.get(7..).unwrap_or(&[]);
let tag = flv::build_audio_tag(flv::PKT_RAW, raw);
self.writer
.write_message(CSID_AUDIO, MSG_AUDIO, ts, STREAM_ID, &tag)
.await
}
_ => Ok(()), }
}
fn stream_key(&self, name: Option<&Amf0Value>) -> Result<StreamKey> {
let app = self
.app
.clone()
.ok_or_else(|| StreamError::protocol("stream command before connect"))?;
let stream = name
.and_then(Amf0Value::as_str)
.ok_or_else(|| StreamError::protocol("missing stream name"))?
.split('?')
.next()
.unwrap_or("")
.to_string();
Ok(StreamKey::new(app, stream))
}
async fn send_control(&mut self, type_id: u8, payload: &[u8]) -> Result<()> {
self.writer
.write_message(CSID_CONTROL, type_id, 0, 0, payload)
.await
}
async fn send_command(&mut self, msg_stream_id: u32, values: &[Amf0Value]) -> Result<()> {
let mut buf = bytes::BytesMut::new();
for v in values {
v.encode(&mut buf);
}
self.writer
.write_message(CSID_COMMAND, MSG_COMMAND_AMF0, 0, msg_stream_id, &buf)
.await
}
async fn send_status(&mut self, level: &str, code: &str, description: &str) -> Result<()> {
let info = amf::object(vec![
("level", amf::string(level)),
("code", amf::string(code)),
("description", amf::string(description)),
]);
self.send_command(
STREAM_ID,
&[
amf::string("onStatus"),
Amf0Value::Number(0.0),
Amf0Value::Null,
info,
],
)
.await
}
}
/// Rebuilds an [`flv::AvcConfig`] from Annex-B data.
///
/// The config uses a 4-byte NAL length prefix. It collects every SPS (NAL
/// type 7) and PPS (NAL type 8) found; other NAL units, including empty
/// ones, are skipped.
fn annexb_to_avc_config(annexb: &[u8]) -> flv::AvcConfig {
    const NAL_SPS: u8 = 7;
    const NAL_PPS: u8 = 8;
    let mut config = flv::AvcConfig {
        nal_length_size: 4,
        ..Default::default()
    };
    for unit in crate::codec::h264::iter_nals_annexb(annexb) {
        let Some(&first) = unit.first() else {
            continue;
        };
        match first & 0x1F {
            NAL_SPS => config.sps.push(unit.to_vec()),
            NAL_PPS => config.pps.push(unit.to_vec()),
            _ => {}
        }
    }
    config
}
/// Reports whether `e` is an I/O error meaning the peer simply went away
/// (EOF, reset, broken pipe or aborted connection), rather than a real failure.
fn is_disconnect(e: &StreamError) -> bool {
    use std::io::ErrorKind;
    let StreamError::Io(io) = e else {
        return false;
    };
    matches!(
        io.kind(),
        ErrorKind::UnexpectedEof
            | ErrorKind::ConnectionReset
            | ErrorKind::BrokenPipe
            | ErrorKind::ConnectionAborted
    )
}
// Unit and loopback tests for the RTMP session state machine.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{AppSpec, Engine};
    use bytes::Bytes;
    // AVCC-framed (4-byte big-endian length prefix) single slice NAL; 0x65 has
    // NAL type 5 (IDR).
    fn avcc_idr() -> Vec<u8> {
        let slice = [0x65u8, 0x88, 0x84, 0x00];
        let mut v = (slice.len() as u32).to_be_bytes().to_vec();
        v.extend_from_slice(&slice);
        v
    }
    // Minimal AVCDecoderConfigurationRecord with one SPS and one PPS.
    fn avc_decoder_config() -> Vec<u8> {
        let sps = [0x67u8, 0x42, 0x00, 0x1F, 0xAA];
        let pps = [0x68u8, 0xCE, 0x3C, 0x80];
        // version 1, profile/compat/level copied from the SPS, 0xFF => 4-byte
        // NAL lengths, 0xE1 => one SPS follows.
        let mut r = vec![1, sps[1], sps[2], sps[3], 0xFF, 0xE1];
        r.extend_from_slice(&(sps.len() as u16).to_be_bytes());
        r.extend_from_slice(&sps);
        // one PPS follows
        r.push(1);
        r.extend_from_slice(&(pps.len() as u16).to_be_bytes());
        r.extend_from_slice(&pps);
        r
    }
    // Builds a session already publishing live/cam, bypassing the connect and
    // publish commands: the reader is empty, the writer discards output, and
    // the publish handle and role are injected directly.
    async fn publishing_session() -> (Session<tokio::io::Empty, tokio::io::Sink>, Arc<Engine>) {
        let engine = Engine::builder()
            .application(AppSpec::new("live").gop_cache(64))
            .build();
        let key = StreamKey::new("live", "cam");
        let handle = engine.start_publish(&key).await.unwrap();
        let mut session = Session::new(
            ChunkReader::new(tokio::io::empty()),
            ChunkWriter::new(tokio::io::sink()),
            engine.clone(),
            Some(engine.clone()),
        );
        session.app = Some("live".into());
        session.handle = Some(handle);
        session.role = Role::Publishing;
        (session, engine)
    }
    // A video sequence header followed by a keyframe should cache an Annex-B
    // config and place the keyframe in the replay buffer.
    #[tokio::test]
    async fn publish_video_seq_header_then_keyframe_reaches_gop_cache() {
        let (mut session, _engine) = publishing_session().await;
        let seq = flv::build_video_tag(false, flv::PKT_SEQUENCE_HEADER, 0, &avc_decoder_config());
        session
            .on_video(&RtmpMessage {
                type_id: MSG_VIDEO,
                timestamp: 0,
                msg_stream_id: STREAM_ID,
                payload: seq,
            })
            .unwrap();
        let kf = flv::build_video_tag(true, flv::PKT_RAW, 0, &avcc_idr());
        session
            .on_video(&RtmpMessage {
                type_id: MSG_VIDEO,
                timestamp: 40,
                msg_stream_id: STREAM_ID,
                payload: kf,
            })
            .unwrap();
        let handle = session.handle.as_ref().unwrap();
        let (vcfg, _) = handle.cached_configs();
        let vcfg = vcfg.expect("video config cached");
        assert!(vcfg.flags.contains(crate::FrameFlags::CONFIG));
        // Annex-B start code followed by the SPS NAL header.
        assert_eq!(&vcfg.data[..5], &[0, 0, 0, 1, 0x67]);
        let replay = handle.replay_buffer();
        let key = replay
            .iter()
            .find(|f| f.is_keyframe())
            .expect("keyframe in GOP");
        // The keyframe also starts with the SPS (presumably avcc_to_annexb
        // prepends parameter sets to keyframes — see flv module).
        assert_eq!(&key.data[..5], &[0, 0, 0, 1, 0x67]);
    }
    // An AAC sequence header should be cached verbatim as the audio config.
    #[tokio::test]
    async fn publish_aac_builds_adts_frames() {
        let (mut session, _engine) = publishing_session().await;
        let asc = [0x12u8, 0x10];
        let seq = flv::build_audio_tag(flv::PKT_SEQUENCE_HEADER, &asc);
        session
            .on_audio(&RtmpMessage {
                type_id: MSG_AUDIO,
                timestamp: 0,
                msg_stream_id: STREAM_ID,
                payload: seq,
            })
            .unwrap();
        let raw = flv::build_audio_tag(flv::PKT_RAW, &[0x01, 0x02, 0x03]);
        session
            .on_audio(&RtmpMessage {
                type_id: MSG_AUDIO,
                timestamp: 23,
                msg_stream_id: STREAM_ID,
                payload: raw,
            })
            .unwrap();
        let handle = session.handle.as_ref().unwrap();
        let (_, acfg) = handle.cached_configs();
        assert_eq!(&acfg.unwrap().data[..], &asc);
    }
    // End-to-end over loopback TCP: the client speaks chunked RTMP (the
    // handshake is skipped) and the server runs a real Session until EOF.
    #[tokio::test]
    async fn full_publish_session_drives_real_chunk_reader() {
        use tokio::io::AsyncReadExt;
        let engine = Engine::builder()
            .application(AppSpec::new("live").gop_cache(64))
            .build();
        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        let engine_for_server = engine.clone();
        let server = tokio::spawn(async move {
            let (sock, _) = listener.accept().await.unwrap();
            let (r, w) = sock.into_split();
            let mut session = Session::new(
                ChunkReader::new(r),
                ChunkWriter::new(w),
                engine_for_server,
                None,
            );
            // run() returns once the client side closes; the publish handle
            // remains on the session for inspection.
            session.run().await.unwrap();
            let handle = session.handle.as_ref().expect("publish handle");
            let has_config = handle.cached_configs().0.is_some();
            let has_keyframe = handle.replay_buffer().iter().any(|f| f.is_keyframe());
            (has_config, has_keyframe)
        });
        let client = tokio::spawn(async move {
            let stream = tokio::net::TcpStream::connect(addr).await.unwrap();
            let (mut read_half, write_half) = stream.into_split();
            // Drain the server's replies so its writes never stall.
            tokio::spawn(async move {
                let mut buf = [0u8; 4096];
                while let Ok(n) = read_half.read(&mut buf).await {
                    if n == 0 {
                        break;
                    }
                }
            });
            // Encodes a sequence of AMF0 values into one command payload.
            let cmd = |values: &[Amf0Value]| {
                let mut b = bytes::BytesMut::new();
                for v in values {
                    v.encode(&mut b);
                }
                b
            };
            let mut w = ChunkWriter::new(write_half);
            let connect = cmd(&[
                amf::string("connect"),
                Amf0Value::Number(1.0),
                amf::object(vec![("app", amf::string("live"))]),
            ]);
            w.write_message(CSID_COMMAND, MSG_COMMAND_AMF0, 0, 0, &connect)
                .await
                .unwrap();
            let create = cmd(&[
                amf::string("createStream"),
                Amf0Value::Number(2.0),
                Amf0Value::Null,
            ]);
            w.write_message(CSID_COMMAND, MSG_COMMAND_AMF0, 0, 0, &create)
                .await
                .unwrap();
            let publish = cmd(&[
                amf::string("publish"),
                Amf0Value::Number(3.0),
                Amf0Value::Null,
                amf::string("cam"),
                amf::string("live"),
            ]);
            w.write_message(CSID_COMMAND, MSG_COMMAND_AMF0, 0, 0, &publish)
                .await
                .unwrap();
            let seq =
                flv::build_video_tag(false, flv::PKT_SEQUENCE_HEADER, 0, &avc_decoder_config());
            w.write_message(CSID_VIDEO, MSG_VIDEO, 0, STREAM_ID, &seq)
                .await
                .unwrap();
            let kf = flv::build_video_tag(true, flv::PKT_RAW, 0, &avcc_idr());
            w.write_message(CSID_VIDEO, MSG_VIDEO, 40, STREAM_ID, &kf)
                .await
                .unwrap();
            // When this task ends `w` is dropped, which presumably closes the
            // write side and gives the server EOF.
        });
        client.await.unwrap();
        let (has_config, has_keyframe) = server.await.unwrap();
        assert!(has_config, "video config reached the engine over the wire");
        assert!(has_keyframe, "keyframe reached the GOP cache over the wire");
    }
    // Annex-B config frames are split back into separate SPS and PPS lists.
    #[test]
    fn annexb_config_splits_sps_and_pps() {
        let annexb = Bytes::from_static(&[0, 0, 0, 1, 0x67, 0x42, 0x00, 1, 0, 0, 0, 1, 0x68, 0xCE]);
        let cfg = annexb_to_avc_config(&annexb);
        assert_eq!(cfg.sps.len(), 1);
        assert_eq!(cfg.pps.len(), 1);
        assert_eq!(cfg.sps[0][0] & 0x1F, 7);
        assert_eq!(cfg.pps[0][0] & 0x1F, 8);
    }
}