Skip to main content

arcly_stream/protocol/rtsp/
mod.rs

1//! Native RTSP ingest handler (feature `rtsp`).
2//!
3//! Pulls live media from RTSP sources — IP cameras, hardware encoders, and
4//! restreamers — and bridges it onto the engine bus through the standard
5//! [`InboundProtocol`] seam. The handler acts as an **RTSP client**: for each
//! configured [`RtspSource`] it drives the session state machine
//! (`OPTIONS → DESCRIBE → SETUP → PLAY → TEARDOWN`), then depacketizes the RTP
//! media — H.264 video into Annex-B access units and, when the session offers
//! one, an AAC audio track — and publishes the resulting frames.
9//!
10//! # Transport
11//!
//! TCP-interleaved transport (RTP-over-RTSP, RFC 2326 §10.12) is the only
//! transport requested, because it is the most camera-compatible: media and
//! control share one TCP connection, so it traverses NAT and firewalls that
//! block the classic UDP transport. The [interleaved framing][InterleavedFrame]
//! (`$ channel length …`) is parsed here: channel 0 carries video RTP for the
//! shared [`H264Depacketizer`], channel 2 carries AAC RTP for the
//! [`AacDepacketizer`](crate::protocol::rtp::AacDepacketizer), and the odd
//! (RTCP) channels are ignored.
17//!
18//! [`InboundProtocol`]: crate::inbound::InboundProtocol
19//! [`H264Depacketizer`]: crate::protocol::rtp::H264Depacketizer
20//!
21//! # Async behavior & teardown
22//!
23//! [`serve`](crate::inbound::InboundProtocol::serve) spawns one pull task per
24//! source and supervises them until `shutdown` fires, at which point each task
25//! issues `TEARDOWN` and releases its [`PublishSession`](crate::inbound::PublishSession). A source that drops is
26//! retried with backoff so a flaky camera link self-heals.
27//!
28//! # Scope
29//!
30//! The message, SDP, and interleaved-framing parsers are complete and unit
31//! tested. Digest authentication and ONVIF discovery are out of scope for the
32//! kernel — a host that needs them resolves the authenticated URL and passes it
33//! in via [`RtspSource`].
34
35mod message;
36mod sdp;
37
38pub use message::{InterleavedFrame, RtspMethod, RtspRequest, RtspResponse};
39pub use sdp::{MediaDescription, Sdp};
40
41use crate::inbound::{InboundProtocol, IngestContext};
42use crate::protocol::rtp::{AacDepacketizer, H264Depacketizer, RtpHeader};
43use crate::{CodecId, MediaFrame, Result, StreamKey};
44use async_trait::async_trait;
45use std::time::Duration;
46use tokio_util::sync::CancellationToken;
47use tracing::{debug, warn};
48
49/// One RTSP source to pull and the stream key it publishes to.
50#[derive(Debug, Clone)]
51pub struct RtspSource {
52    /// Absolute `rtsp://` URL (credentials, if any, embedded by the host).
53    pub url: String,
54    /// Engine stream key the pulled media is published under.
55    pub key: StreamKey,
56}
57
58impl RtspSource {
59    /// A source pulling `url` and publishing it as `key`.
60    pub fn new(url: impl Into<String>, key: StreamKey) -> Self {
61        Self {
62            url: url.into(),
63            key,
64        }
65    }
66}
67
68/// RTSP ingest worker — pulls every configured [`RtspSource`] concurrently.
69#[derive(Debug)]
70pub struct RtspHandler {
71    sources: Vec<RtspSource>,
72    retry_backoff: Duration,
73}
74
75impl Default for RtspHandler {
76    fn default() -> Self {
77        Self::new()
78    }
79}
80
81impl RtspHandler {
82    /// A handler with no sources. Add them with [`source`](Self::source).
83    pub fn new() -> Self {
84        Self {
85            sources: Vec::new(),
86            retry_backoff: Duration::from_secs(3),
87        }
88    }
89
90    /// Register a source to pull.
91    pub fn source(mut self, source: RtspSource) -> Self {
92        self.sources.push(source);
93        self
94    }
95
96    /// Override the reconnect backoff applied after a source drops (default 3s).
97    pub fn retry_backoff(mut self, backoff: Duration) -> Self {
98        self.retry_backoff = backoff;
99        self
100    }
101
102    /// Pull one source until `shutdown`, reconnecting on failure. Owned
103    /// arguments so each source runs on its own spawned task.
104    async fn run_source(
105        source: RtspSource,
106        ctx: IngestContext,
107        shutdown: CancellationToken,
108        backoff: Duration,
109    ) {
110        loop {
111            if shutdown.is_cancelled() {
112                return;
113            }
114            if let Err(e) = Self::pull_once(&source, &ctx, &shutdown).await {
115                warn!(url = %source.url, error = %e, "rtsp source dropped; will retry");
116            }
117            tokio::select! {
118                _ = shutdown.cancelled() => return,
119                _ = tokio::time::sleep(backoff) => {}
120            }
121        }
122    }
123
124    /// One full pull session for `source`. Connects, negotiates, and streams
125    /// interleaved RTP until the link drops or `shutdown` fires.
126    async fn pull_once(
127        source: &RtspSource,
128        ctx: &IngestContext,
129        shutdown: &CancellationToken,
130    ) -> Result<()> {
131        use tokio::io::{AsyncReadExt, AsyncWriteExt};
132        use tokio::net::TcpStream;
133
134        let (host, port) = message::host_port(&source.url)
135            .ok_or_else(|| crate::StreamError::protocol("malformed rtsp url"))?;
136        let mut stream = TcpStream::connect((host.as_str(), port)).await?;
137        let mut cseq = 1u32;
138
139        // OPTIONS → DESCRIBE → SETUP → PLAY.
140        message::write_request(&mut stream, RtspMethod::Options, &source.url, cseq, &[]).await?;
141        let _ = message::read_response(&mut stream).await?;
142        cseq += 1;
143
144        message::write_request(
145            &mut stream,
146            RtspMethod::Describe,
147            &source.url,
148            cseq,
149            &[("Accept", "application/sdp")],
150        )
151        .await?;
152        let describe = message::read_response(&mut stream).await?;
153        let sdp = Sdp::parse(&describe.body);
154        debug!(url = %source.url, media = sdp.media.len(), "rtsp DESCRIBE parsed");
155        cseq += 1;
156
157        // SETUP the first video track over interleaved channels 0/1.
158        let setup_url = sdp
159            .first_video_control(&source.url)
160            .unwrap_or_else(|| source.url.clone());
161        message::write_request(
162            &mut stream,
163            RtspMethod::Setup,
164            &setup_url,
165            cseq,
166            &[("Transport", "RTP/AVP/TCP;unicast;interleaved=0-1")],
167        )
168        .await?;
169        let setup = message::read_response(&mut stream).await?;
170        let session_id = message::session_id(&setup).unwrap_or_default();
171        cseq += 1;
172
173        // If the session offers an AAC audio track, SETUP it on channels 2/3.
174        let mut audio_clock = None;
175        if sdp.has_aac_audio() {
176            if let Some(audio_url) = sdp.first_audio_control(&source.url) {
177                message::write_request(
178                    &mut stream,
179                    RtspMethod::Setup,
180                    &audio_url,
181                    cseq,
182                    &[
183                        ("Transport", "RTP/AVP/TCP;unicast;interleaved=2-3"),
184                        ("Session", &session_id),
185                    ],
186                )
187                .await?;
188                let _ = message::read_response(&mut stream).await?;
189                cseq += 1;
190                audio_clock = sdp
191                    .media
192                    .iter()
193                    .find(|m| m.media == "audio")
194                    .and_then(|m| m.clock_rate)
195                    .or(Some(48_000));
196                debug!(url = %source.url, "rtsp AAC audio track set up on ch 2/3");
197            }
198        }
199
200        message::write_request(
201            &mut stream,
202            RtspMethod::Play,
203            &source.url,
204            cseq,
205            &[("Session", &session_id)],
206        )
207        .await?;
208        let _ = message::read_response(&mut stream).await?;
209
210        // Stream interleaved RTP → depacketize → publish.
211        let session = ctx.open_publish(source.key.clone()).await?;
212        let mut depack = H264Depacketizer::new();
213        let (size_len, index_len) = sdp.audio_aac_lengths();
214        let aac = AacDepacketizer::with_lengths(size_len, index_len);
215        let mut buf = Vec::with_capacity(64 * 1024);
216        let mut read = [0u8; 16 * 1024];
217
218        loop {
219            tokio::select! {
220                _ = shutdown.cancelled() => break,
221                n = stream.read(&mut read) => {
222                    let n = n?;
223                    if n == 0 { break; }
224                    buf.extend_from_slice(&read[..n]);
225                    Self::drain_interleaved(&mut buf, &mut depack, &aac, audio_clock, &session)?;
226                }
227            }
228        }
229
230        // Best-effort TEARDOWN, then release the publish slot.
231        cseq += 1;
232        let _ = message::write_request(
233            &mut stream,
234            RtspMethod::Teardown,
235            &source.url,
236            cseq,
237            &[("Session", &session_id)],
238        )
239        .await;
240        let _ = stream.shutdown().await;
241        session.finish().await
242    }
243
244    /// Consume whole interleaved frames from `buf`, depacketizing channel-0 video
245    /// RTP and (when `audio_clock` is set) channel-2 AAC RTP into frames and
246    /// publishing them. Leaves any partial frame in `buf`.
247    fn drain_interleaved(
248        buf: &mut Vec<u8>,
249        depack: &mut H264Depacketizer,
250        aac: &AacDepacketizer,
251        audio_clock: Option<u32>,
252        session: &crate::inbound::PublishSession,
253    ) -> Result<()> {
254        let mut consumed = 0;
255        while let Some((frame, len)) = InterleavedFrame::parse(&buf[consumed..]) {
256            consumed += len;
257            let Some(header) = RtpHeader::parse(frame.payload) else {
258                continue;
259            };
260            let payload = &frame.payload[header.payload_offset..];
261            // Channel 0 = video RTP, channel 2 = audio RTP; odd channels are
262            // RTCP, ignored on ingest.
263            match frame.channel {
264                0 => {
265                    match depack.push(payload, header.marker, header.timestamp, header.sequence) {
266                        Ok(Some(au)) => {
267                            let pts = (au.timestamp / 90) as i64; // 90 kHz → ms
268                            let mf = MediaFrame::new_video(
269                                pts,
270                                pts,
271                                au.data,
272                                CodecId::H264,
273                                au.keyframe,
274                            );
275                            let _ = session.publish_frame(mf)?;
276                        }
277                        Ok(None) => {}
278                        Err(e) => debug!(?e, "rtp depacketize skip"),
279                    }
280                }
281                2 => {
282                    if let Some(clock) = audio_clock {
283                        match aac.push(payload) {
284                            Ok(units) => {
285                                for au in units {
286                                    let pts =
287                                        (header.timestamp as i64 * 1000) / clock.max(1) as i64;
288                                    let mf = MediaFrame::new_audio(pts, au, CodecId::AAC);
289                                    let _ = session.publish_frame(mf)?;
290                                }
291                            }
292                            Err(e) => debug!(?e, "aac depacketize skip"),
293                        }
294                    }
295                }
296                _ => {}
297            }
298        }
299        buf.drain(..consumed);
300        Ok(())
301    }
302}
303
304#[async_trait]
305impl InboundProtocol for RtspHandler {
306    fn name(&self) -> &'static str {
307        "rtsp"
308    }
309
310    async fn serve(&self, ctx: IngestContext, shutdown: CancellationToken) -> Result<()> {
311        // Pull every source concurrently on its own task; await them all so the
312        // worker only returns once every source has drained on shutdown.
313        let mut tasks = tokio::task::JoinSet::new();
314        for source in &self.sources {
315            tasks.spawn(Self::run_source(
316                source.clone(),
317                ctx.clone(),
318                shutdown.clone(),
319                self.retry_backoff,
320            ));
321        }
322        while tasks.join_next().await.is_some() {}
323        Ok(())
324    }
325}
326
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn source_builder_sets_url_and_key() {
        let s = RtspSource::new("rtsp://cam/stream", StreamKey::new("live", "cam1"));
        assert_eq!(s.url, "rtsp://cam/stream");
        assert_eq!(s.key.stream_id.as_str(), "cam1");
    }

    #[test]
    fn handler_collects_sources() {
        let h = RtspHandler::new()
            .source(RtspSource::new("rtsp://a/1", StreamKey::new("live", "a")))
            .source(RtspSource::new("rtsp://b/2", StreamKey::new("live", "b")));
        assert_eq!(h.sources.len(), 2);
        assert_eq!(h.name(), "rtsp");
    }

    #[test]
    fn handler_backoff_defaults_and_overrides() {
        // `Default` mirrors `new`: no sources, 3s backoff.
        let d = RtspHandler::default();
        assert!(d.sources.is_empty());
        assert_eq!(d.retry_backoff, Duration::from_secs(3));

        let h = RtspHandler::new().retry_backoff(Duration::from_millis(250));
        assert_eq!(h.retry_backoff, Duration::from_millis(250));
    }
}