arcly_stream/protocol/rtsp/
mod.rs1mod message;
36mod sdp;
37
38pub use message::{InterleavedFrame, RtspMethod, RtspRequest, RtspResponse};
39pub use sdp::{MediaDescription, Sdp};
40
41use crate::inbound::{InboundProtocol, IngestContext};
42use crate::protocol::rtp::{AacDepacketizer, H264Depacketizer, RtpHeader};
43use crate::{CodecId, MediaFrame, Result, StreamKey};
44use async_trait::async_trait;
45use std::time::Duration;
46use tokio_util::sync::CancellationToken;
47use tracing::{debug, warn};
48
49#[derive(Debug, Clone)]
51pub struct RtspSource {
52 pub url: String,
54 pub key: StreamKey,
56}
57
58impl RtspSource {
59 pub fn new(url: impl Into<String>, key: StreamKey) -> Self {
61 Self {
62 url: url.into(),
63 key,
64 }
65 }
66}
67
68#[derive(Debug)]
70pub struct RtspHandler {
71 sources: Vec<RtspSource>,
72 retry_backoff: Duration,
73}
74
75impl Default for RtspHandler {
76 fn default() -> Self {
77 Self::new()
78 }
79}
80
81impl RtspHandler {
82 pub fn new() -> Self {
84 Self {
85 sources: Vec::new(),
86 retry_backoff: Duration::from_secs(3),
87 }
88 }
89
90 pub fn source(mut self, source: RtspSource) -> Self {
92 self.sources.push(source);
93 self
94 }
95
96 pub fn retry_backoff(mut self, backoff: Duration) -> Self {
98 self.retry_backoff = backoff;
99 self
100 }
101
102 async fn run_source(
105 source: RtspSource,
106 ctx: IngestContext,
107 shutdown: CancellationToken,
108 backoff: Duration,
109 ) {
110 loop {
111 if shutdown.is_cancelled() {
112 return;
113 }
114 if let Err(e) = Self::pull_once(&source, &ctx, &shutdown).await {
115 warn!(url = %source.url, error = %e, "rtsp source dropped; will retry");
116 }
117 tokio::select! {
118 _ = shutdown.cancelled() => return,
119 _ = tokio::time::sleep(backoff) => {}
120 }
121 }
122 }
123
124 async fn pull_once(
127 source: &RtspSource,
128 ctx: &IngestContext,
129 shutdown: &CancellationToken,
130 ) -> Result<()> {
131 use tokio::io::{AsyncReadExt, AsyncWriteExt};
132 use tokio::net::TcpStream;
133
134 let (host, port) = message::host_port(&source.url)
135 .ok_or_else(|| crate::StreamError::protocol("malformed rtsp url"))?;
136 let mut stream = TcpStream::connect((host.as_str(), port)).await?;
137 let mut cseq = 1u32;
138
139 message::write_request(&mut stream, RtspMethod::Options, &source.url, cseq, &[]).await?;
141 let _ = message::read_response(&mut stream).await?;
142 cseq += 1;
143
144 message::write_request(
145 &mut stream,
146 RtspMethod::Describe,
147 &source.url,
148 cseq,
149 &[("Accept", "application/sdp")],
150 )
151 .await?;
152 let describe = message::read_response(&mut stream).await?;
153 let sdp = Sdp::parse(&describe.body);
154 debug!(url = %source.url, media = sdp.media.len(), "rtsp DESCRIBE parsed");
155 cseq += 1;
156
157 let setup_url = sdp
159 .first_video_control(&source.url)
160 .unwrap_or_else(|| source.url.clone());
161 message::write_request(
162 &mut stream,
163 RtspMethod::Setup,
164 &setup_url,
165 cseq,
166 &[("Transport", "RTP/AVP/TCP;unicast;interleaved=0-1")],
167 )
168 .await?;
169 let setup = message::read_response(&mut stream).await?;
170 let session_id = message::session_id(&setup).unwrap_or_default();
171 cseq += 1;
172
173 let mut audio_clock = None;
175 if sdp.has_aac_audio() {
176 if let Some(audio_url) = sdp.first_audio_control(&source.url) {
177 message::write_request(
178 &mut stream,
179 RtspMethod::Setup,
180 &audio_url,
181 cseq,
182 &[
183 ("Transport", "RTP/AVP/TCP;unicast;interleaved=2-3"),
184 ("Session", &session_id),
185 ],
186 )
187 .await?;
188 let _ = message::read_response(&mut stream).await?;
189 cseq += 1;
190 audio_clock = sdp
191 .media
192 .iter()
193 .find(|m| m.media == "audio")
194 .and_then(|m| m.clock_rate)
195 .or(Some(48_000));
196 debug!(url = %source.url, "rtsp AAC audio track set up on ch 2/3");
197 }
198 }
199
200 message::write_request(
201 &mut stream,
202 RtspMethod::Play,
203 &source.url,
204 cseq,
205 &[("Session", &session_id)],
206 )
207 .await?;
208 let _ = message::read_response(&mut stream).await?;
209
210 let session = ctx.open_publish(source.key.clone()).await?;
212 let mut depack = H264Depacketizer::new();
213 let (size_len, index_len) = sdp.audio_aac_lengths();
214 let aac = AacDepacketizer::with_lengths(size_len, index_len);
215 let mut buf = Vec::with_capacity(64 * 1024);
216 let mut read = [0u8; 16 * 1024];
217
218 loop {
219 tokio::select! {
220 _ = shutdown.cancelled() => break,
221 n = stream.read(&mut read) => {
222 let n = n?;
223 if n == 0 { break; }
224 buf.extend_from_slice(&read[..n]);
225 Self::drain_interleaved(&mut buf, &mut depack, &aac, audio_clock, &session)?;
226 }
227 }
228 }
229
230 cseq += 1;
232 let _ = message::write_request(
233 &mut stream,
234 RtspMethod::Teardown,
235 &source.url,
236 cseq,
237 &[("Session", &session_id)],
238 )
239 .await;
240 let _ = stream.shutdown().await;
241 session.finish().await
242 }
243
244 fn drain_interleaved(
248 buf: &mut Vec<u8>,
249 depack: &mut H264Depacketizer,
250 aac: &AacDepacketizer,
251 audio_clock: Option<u32>,
252 session: &crate::inbound::PublishSession,
253 ) -> Result<()> {
254 let mut consumed = 0;
255 while let Some((frame, len)) = InterleavedFrame::parse(&buf[consumed..]) {
256 consumed += len;
257 let Some(header) = RtpHeader::parse(frame.payload) else {
258 continue;
259 };
260 let payload = &frame.payload[header.payload_offset..];
261 match frame.channel {
264 0 => {
265 match depack.push(payload, header.marker, header.timestamp, header.sequence) {
266 Ok(Some(au)) => {
267 let pts = (au.timestamp / 90) as i64; let mf = MediaFrame::new_video(
269 pts,
270 pts,
271 au.data,
272 CodecId::H264,
273 au.keyframe,
274 );
275 let _ = session.publish_frame(mf)?;
276 }
277 Ok(None) => {}
278 Err(e) => debug!(?e, "rtp depacketize skip"),
279 }
280 }
281 2 => {
282 if let Some(clock) = audio_clock {
283 match aac.push(payload) {
284 Ok(units) => {
285 for au in units {
286 let pts =
287 (header.timestamp as i64 * 1000) / clock.max(1) as i64;
288 let mf = MediaFrame::new_audio(pts, au, CodecId::AAC);
289 let _ = session.publish_frame(mf)?;
290 }
291 }
292 Err(e) => debug!(?e, "aac depacketize skip"),
293 }
294 }
295 }
296 _ => {}
297 }
298 }
299 buf.drain(..consumed);
300 Ok(())
301 }
302}
303
304#[async_trait]
305impl InboundProtocol for RtspHandler {
306 fn name(&self) -> &'static str {
307 "rtsp"
308 }
309
310 async fn serve(&self, ctx: IngestContext, shutdown: CancellationToken) -> Result<()> {
311 let mut tasks = tokio::task::JoinSet::new();
314 for source in &self.sources {
315 tasks.spawn(Self::run_source(
316 source.clone(),
317 ctx.clone(),
318 shutdown.clone(),
319 self.retry_backoff,
320 ));
321 }
322 while tasks.join_next().await.is_some() {}
323 Ok(())
324 }
325}
326
327#[cfg(test)]
328mod tests {
329 use super::*;
330
331 #[test]
332 fn source_builder_sets_url_and_key() {
333 let s = RtspSource::new("rtsp://cam/stream", StreamKey::new("live", "cam1"));
334 assert_eq!(s.url, "rtsp://cam/stream");
335 assert_eq!(s.key.stream_id.as_str(), "cam1");
336 }
337
338 #[test]
339 fn handler_collects_sources() {
340 let h = RtspHandler::new()
341 .source(RtspSource::new("rtsp://a/1", StreamKey::new("live", "a")))
342 .source(RtspSource::new("rtsp://b/2", StreamKey::new("live", "b")));
343 assert_eq!(h.sources.len(), 2);
344 assert_eq!(h.name(), "rtsp");
345 }
346}