mod message;
mod sdp;
pub use message::{InterleavedFrame, RtspMethod, RtspRequest, RtspResponse};
pub use sdp::{MediaDescription, Sdp};
use crate::inbound::{InboundProtocol, IngestContext};
use crate::protocol::rtp::{AacDepacketizer, H264Depacketizer, RtpHeader};
use crate::{CodecId, MediaFrame, Result, StreamKey};
use async_trait::async_trait;
use std::time::Duration;
use tokio_util::sync::CancellationToken;
use tracing::{debug, warn};
/// One remote RTSP endpoint to pull from, paired with the stream key its
/// frames are published under.
#[derive(Debug, Clone)]
pub struct RtspSource {
/// RTSP URL used for the connection and as the request URI of the
/// OPTIONS/DESCRIBE/PLAY/TEARDOWN requests.
pub url: String,
/// Key passed to the ingest context when opening the publish session.
pub key: StreamKey,
}
impl RtspSource {
    /// Builds a source from anything convertible into a `String` URL and the
    /// stream key the pulled media should be published under.
    pub fn new(url: impl Into<String>, key: StreamKey) -> Self {
        let url: String = url.into();
        Self { url, key }
    }
}
/// Inbound protocol handler that pulls media from a set of RTSP sources.
#[derive(Debug)]
pub struct RtspHandler {
// Sources to pull; one task is spawned per entry when the handler is served.
sources: Vec<RtspSource>,
// Delay between consecutive pull attempts for the same source.
retry_backoff: Duration,
}
impl Default for RtspHandler {
fn default() -> Self {
Self::new()
}
}
impl RtspHandler {
/// Creates a handler with no sources and a three-second retry backoff.
pub fn new() -> Self {
    let sources = Vec::new();
    let retry_backoff = Duration::from_secs(3);
    Self {
        sources,
        retry_backoff,
    }
}
pub fn source(mut self, source: RtspSource) -> Self {
self.sources.push(source);
self
}
pub fn retry_backoff(mut self, backoff: Duration) -> Self {
self.retry_backoff = backoff;
self
}
/// Supervises a single source until `shutdown` is cancelled.
///
/// Each iteration performs one pull via `pull_once`; a failure is logged at
/// warn level. Whether the pull failed or ended cleanly, the task then waits
/// `backoff` before the next attempt, returning early if shutdown is
/// signalled during the wait.
async fn run_source(
    source: RtspSource,
    ctx: IngestContext,
    shutdown: CancellationToken,
    backoff: Duration,
) {
    while !shutdown.is_cancelled() {
        match Self::pull_once(&source, &ctx, &shutdown).await {
            Ok(()) => {}
            Err(e) => {
                warn!(url = %source.url, error = %e, "rtsp source dropped; will retry");
            }
        }

        // Race the backoff timer against shutdown so cancellation is not
        // delayed by a pending sleep.
        let stop_requested = tokio::select! {
            _ = shutdown.cancelled() => true,
            _ = tokio::time::sleep(backoff) => false,
        };
        if stop_requested {
            break;
        }
    }
}
async fn pull_once(
source: &RtspSource,
ctx: &IngestContext,
shutdown: &CancellationToken,
) -> Result<()> {
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
let (host, port) = message::host_port(&source.url)
.ok_or_else(|| crate::StreamError::protocol("malformed rtsp url"))?;
let mut stream = TcpStream::connect((host.as_str(), port)).await?;
let mut cseq = 1u32;
message::write_request(&mut stream, RtspMethod::Options, &source.url, cseq, &[]).await?;
let _ = message::read_response(&mut stream).await?;
cseq += 1;
message::write_request(
&mut stream,
RtspMethod::Describe,
&source.url,
cseq,
&[("Accept", "application/sdp")],
)
.await?;
let describe = message::read_response(&mut stream).await?;
let sdp = Sdp::parse(&describe.body);
debug!(url = %source.url, media = sdp.media.len(), "rtsp DESCRIBE parsed");
cseq += 1;
let setup_url = sdp
.first_video_control(&source.url)
.unwrap_or_else(|| source.url.clone());
message::write_request(
&mut stream,
RtspMethod::Setup,
&setup_url,
cseq,
&[("Transport", "RTP/AVP/TCP;unicast;interleaved=0-1")],
)
.await?;
let setup = message::read_response(&mut stream).await?;
let session_id = message::session_id(&setup).unwrap_or_default();
cseq += 1;
let mut audio_clock = None;
if sdp.has_aac_audio() {
if let Some(audio_url) = sdp.first_audio_control(&source.url) {
message::write_request(
&mut stream,
RtspMethod::Setup,
&audio_url,
cseq,
&[
("Transport", "RTP/AVP/TCP;unicast;interleaved=2-3"),
("Session", &session_id),
],
)
.await?;
let _ = message::read_response(&mut stream).await?;
cseq += 1;
audio_clock = sdp
.media
.iter()
.find(|m| m.media == "audio")
.and_then(|m| m.clock_rate)
.or(Some(48_000));
debug!(url = %source.url, "rtsp AAC audio track set up on ch 2/3");
}
}
message::write_request(
&mut stream,
RtspMethod::Play,
&source.url,
cseq,
&[("Session", &session_id)],
)
.await?;
let _ = message::read_response(&mut stream).await?;
let session = ctx.open_publish(source.key.clone()).await?;
let mut depack = H264Depacketizer::new();
let aac = AacDepacketizer::new();
let mut buf = Vec::with_capacity(64 * 1024);
let mut read = [0u8; 16 * 1024];
loop {
tokio::select! {
_ = shutdown.cancelled() => break,
n = stream.read(&mut read) => {
let n = n?;
if n == 0 { break; }
buf.extend_from_slice(&read[..n]);
Self::drain_interleaved(&mut buf, &mut depack, &aac, audio_clock, &session)?;
}
}
}
cseq += 1;
let _ = message::write_request(
&mut stream,
RtspMethod::Teardown,
&source.url,
cseq,
&[("Session", &session_id)],
)
.await;
let _ = stream.shutdown().await;
session.finish().await
}
/// Parses every complete interleaved frame at the front of `buf`, publishes
/// the media they carry, and removes the consumed bytes from `buf`.
///
/// * Channel 0 payloads are fed to the H.264 depacketizer; each completed
///   access unit is published as a video frame whose PTS/DTS is the access
///   unit timestamp divided by 90 (i.e. a 90 kHz clock converted to ms).
/// * Channel 2 payloads are fed to the AAC depacketizer when `audio_clock` is
///   set; each returned unit is published as an audio frame with the RTP
///   timestamp converted to milliseconds using that clock.
/// * All other channels are ignored.
///
/// Frames whose RTP header cannot be parsed, or whose header claims a payload
/// offset beyond the frame, are skipped. Depacketizer errors are logged at
/// debug level and skipped.
///
/// # Errors
///
/// Propagates the first `publish_frame` failure. In that case `buf` is left
/// untouched (nothing is drained).
fn drain_interleaved(
    buf: &mut Vec<u8>,
    depack: &mut H264Depacketizer,
    aac: &AacDepacketizer,
    audio_clock: Option<u32>,
    session: &crate::inbound::PublishSession,
) -> Result<()> {
    let mut consumed = 0;
    while let Some((frame, len)) = InterleavedFrame::parse(&buf[consumed..]) {
        consumed += len;
        let Some(header) = RtpHeader::parse(frame.payload) else {
            continue;
        };
        // The offset comes from network data: use checked slicing so a
        // malformed packet is dropped instead of panicking the task.
        let Some(payload) = frame.payload.get(header.payload_offset..) else {
            debug!("rtp payload offset exceeds interleaved frame; skipping");
            continue;
        };
        match frame.channel {
            0 => {
                match depack.push(payload, header.marker, header.timestamp, header.sequence) {
                    Ok(Some(au)) => {
                        let pts = (au.timestamp / 90) as i64;
                        let mf = MediaFrame::new_video(
                            pts,
                            pts,
                            au.data,
                            CodecId::H264,
                            au.keyframe,
                        );
                        let _ = session.publish_frame(mf)?;
                    }
                    Ok(None) => {}
                    Err(e) => debug!(?e, "rtp depacketize skip"),
                }
            }
            2 => {
                if let Some(clock) = audio_clock {
                    match aac.push(payload) {
                        Ok(units) => {
                            // `max(1)` guards against a zero clock rate.
                            // NOTE(review): every unit from one packet gets the
                            // same PTS; confirm whether per-unit offsets are
                            // needed when a packet carries several AUs.
                            let pts = (header.timestamp as i64 * 1000) / clock.max(1) as i64;
                            for au in units {
                                let mf = MediaFrame::new_audio(pts, au, CodecId::AAC);
                                let _ = session.publish_frame(mf)?;
                            }
                        }
                        Err(e) => debug!(?e, "aac depacketize skip"),
                    }
                }
            }
            _ => {}
        }
    }
    // Keep only the trailing partial frame (if any) for the next read.
    buf.drain(..consumed);
    Ok(())
}
}
#[async_trait]
impl InboundProtocol for RtspHandler {
fn name(&self) -> &'static str {
"rtsp"
}
async fn serve(&self, ctx: IngestContext, shutdown: CancellationToken) -> Result<()> {
let mut tasks = tokio::task::JoinSet::new();
for source in &self.sources {
tasks.spawn(Self::run_source(
source.clone(),
ctx.clone(),
shutdown.clone(),
self.retry_backoff,
));
}
while tasks.join_next().await.is_some() {}
Ok(())
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// `RtspSource::new` stores the URL and the stream key it was given.
    #[test]
    fn source_builder_sets_url_and_key() {
        let key = StreamKey::new("live", "cam1");
        let src = RtspSource::new("rtsp://cam/stream", key);

        assert_eq!(src.url, "rtsp://cam/stream");
        assert_eq!(src.key.stream_id.as_str(), "cam1");
    }

    /// Chained `source` calls accumulate, and the handler reports its name.
    #[test]
    fn handler_collects_sources() {
        let first = RtspSource::new("rtsp://a/1", StreamKey::new("live", "a"));
        let second = RtspSource::new("rtsp://b/2", StreamKey::new("live", "b"));

        let handler = RtspHandler::new().source(first).source(second);

        assert_eq!(handler.sources.len(), 2);
        assert_eq!(handler.name(), "rtsp");
    }
}