Skip to main content

arcly_stream/protocol/rtsp/
message.rs

1//! RTSP/1.0 message parsing and TCP-interleaved framing (RFC 2326).
2
3use crate::Result;
4use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
5
6/// An RTSP request method this client issues (RFC 2326 §10).
7#[derive(Debug, Clone, Copy, PartialEq, Eq)]
8pub enum RtspMethod {
9    /// `OPTIONS` — discover supported methods.
10    Options,
11    /// `DESCRIBE` — fetch the SDP session description.
12    Describe,
13    /// `SETUP` — negotiate transport for one media track.
14    Setup,
15    /// `PLAY` — start the media flow.
16    Play,
17    /// `TEARDOWN` — stop and release the session.
18    Teardown,
19}
20
21impl RtspMethod {
22    /// The on-the-wire method token.
23    pub fn as_str(self) -> &'static str {
24        match self {
25            RtspMethod::Options => "OPTIONS",
26            RtspMethod::Describe => "DESCRIBE",
27            RtspMethod::Setup => "SETUP",
28            RtspMethod::Play => "PLAY",
29            RtspMethod::Teardown => "TEARDOWN",
30        }
31    }
32}
33
34/// A parsed RTSP request (server-side view, for completeness/testing).
35#[derive(Debug, Clone, PartialEq, Eq)]
36pub struct RtspRequest {
37    /// The request method token.
38    pub method: String,
39    /// The request URI.
40    pub uri: String,
41    /// `CSeq` sequence number echoed in the response.
42    pub cseq: u32,
43    /// Header name/value pairs (verbatim order).
44    pub headers: Vec<(String, String)>,
45}
46
47impl RtspRequest {
48    /// Parse a complete request (headers terminated by a blank line). Returns
49    /// `None` if the request line or `CSeq` is missing/malformed.
50    pub fn parse(text: &str) -> Option<RtspRequest> {
51        let mut lines = text.split("\r\n");
52        let request_line = lines.next()?;
53        let mut parts = request_line.split(' ');
54        let method = parts.next()?.to_string();
55        let uri = parts.next()?.to_string();
56        let mut headers = Vec::new();
57        let mut cseq = None;
58        for line in lines {
59            if line.is_empty() {
60                break;
61            }
62            let (name, value) = line.split_once(':')?;
63            let (name, value) = (name.trim().to_string(), value.trim().to_string());
64            if name.eq_ignore_ascii_case("cseq") {
65                cseq = value.parse().ok();
66            }
67            headers.push((name, value));
68        }
69        Some(RtspRequest {
70            method,
71            uri,
72            cseq: cseq?,
73            headers,
74        })
75    }
76}
77
78/// A parsed RTSP response.
79#[derive(Debug, Clone, PartialEq, Eq)]
80pub struct RtspResponse {
81    /// Numeric status code (200, 401, 461, …).
82    pub status: u16,
83    /// Header name/value pairs (verbatim order).
84    pub headers: Vec<(String, String)>,
85    /// Message body (the SDP for a `DESCRIBE` response).
86    pub body: String,
87}
88
89impl RtspResponse {
90    /// Parse `text` (status line + headers) and attach `body`.
91    pub fn parse(text: &str, body: String) -> Option<RtspResponse> {
92        let mut lines = text.split("\r\n");
93        let status_line = lines.next()?;
94        // "RTSP/1.0 200 OK"
95        let status = status_line.split(' ').nth(1)?.parse().ok()?;
96        let mut headers = Vec::new();
97        for line in lines {
98            if line.is_empty() {
99                continue;
100            }
101            if let Some((name, value)) = line.split_once(':') {
102                headers.push((name.trim().to_string(), value.trim().to_string()));
103            }
104        }
105        Some(RtspResponse {
106            status,
107            headers,
108            body,
109        })
110    }
111
112    /// First header value matching `name`, case-insensitively.
113    pub fn header(&self, name: &str) -> Option<&str> {
114        self.headers
115            .iter()
116            .find(|(n, _)| n.eq_ignore_ascii_case(name))
117            .map(|(_, v)| v.as_str())
118    }
119}
120
121/// A TCP-interleaved binary frame: `$`, channel id, 16-bit length, then payload
122/// (RFC 2326 §10.12). Used to multiplex RTP/RTCP over the RTSP TCP connection.
123#[derive(Debug, Clone, PartialEq, Eq)]
124pub struct InterleavedFrame<'a> {
125    /// Interleaved channel id (even = RTP, odd = RTCP, per the SETUP transport).
126    pub channel: u8,
127    /// The embedded RTP/RTCP packet.
128    pub payload: &'a [u8],
129}
130
131impl<'a> InterleavedFrame<'a> {
132    /// Parse one interleaved frame from the front of `buf`. Returns the frame and
133    /// the total bytes consumed (`4 + length`), or `None` if `buf` does not yet
134    /// hold a `$`-prefixed frame in full.
135    pub fn parse(buf: &'a [u8]) -> Option<(InterleavedFrame<'a>, usize)> {
136        if buf.len() < 4 || buf[0] != b'$' {
137            return None;
138        }
139        let channel = buf[1];
140        let length = u16::from_be_bytes([buf[2], buf[3]]) as usize;
141        if buf.len() < 4 + length {
142            return None;
143        }
144        Some((
145            InterleavedFrame {
146                channel,
147                payload: &buf[4..4 + length],
148            },
149            4 + length,
150        ))
151    }
152}
153
154/// Split `rtsp://host[:port]/path` into host and port (default 554).
155pub fn host_port(url: &str) -> Option<(String, u16)> {
156    let rest = url.strip_prefix("rtsp://")?;
157    let authority = rest.split('/').next()?;
158    // Strip any embedded userinfo (`user:pass@host`).
159    let authority = authority.rsplit('@').next()?;
160    match authority.rsplit_once(':') {
161        Some((host, port)) => Some((host.to_string(), port.parse().ok()?)),
162        None => Some((authority.to_string(), 554)),
163    }
164}
165
166/// Extract the `Session` id (trimming any `;timeout=` parameter) from a response.
167pub fn session_id(resp: &RtspResponse) -> Option<String> {
168    let raw = resp.header("Session")?;
169    Some(raw.split(';').next().unwrap_or(raw).trim().to_string())
170}
171
172/// Write an RTSP request with the standard `CSeq`/`User-Agent` headers plus
173/// `extra` header pairs.
174pub async fn write_request<W: AsyncWrite + Unpin>(
175    w: &mut W,
176    method: RtspMethod,
177    uri: &str,
178    cseq: u32,
179    extra: &[(&str, &str)],
180) -> Result<()> {
181    let mut msg = format!(
182        "{} {} RTSP/1.0\r\nCSeq: {}\r\nUser-Agent: arcly-stream\r\n",
183        method.as_str(),
184        uri,
185        cseq
186    );
187    for (name, value) in extra {
188        msg.push_str(name);
189        msg.push_str(": ");
190        msg.push_str(value);
191        msg.push_str("\r\n");
192    }
193    msg.push_str("\r\n");
194    w.write_all(msg.as_bytes()).await?;
195    w.flush().await?;
196    Ok(())
197}
198
199/// Read one RTSP response (headers to the blank line, then any `Content-Length`
200/// body). Skips leading interleaved data that may arrive before the response.
201pub async fn read_response<R: AsyncRead + Unpin>(r: &mut R) -> Result<RtspResponse> {
202    let mut head = Vec::new();
203    let mut byte = [0u8; 1];
204    // Read until the CRLF CRLF header terminator.
205    loop {
206        let n = r.read(&mut byte).await?;
207        if n == 0 {
208            return Err(crate::StreamError::ConnectionClosed);
209        }
210        head.push(byte[0]);
211        if head.ends_with(b"\r\n\r\n") {
212            break;
213        }
214        if head.len() > 64 * 1024 {
215            return Err(crate::StreamError::protocol("rtsp header too large"));
216        }
217    }
218    let text = String::from_utf8_lossy(&head).into_owned();
219    let content_length = text
220        .split("\r\n")
221        .find_map(|l| {
222            l.split_once(':')
223                .filter(|(n, _)| n.eq_ignore_ascii_case("content-length"))
224        })
225        .and_then(|(_, v)| v.trim().parse::<usize>().ok())
226        .unwrap_or(0);
227
228    let mut body = vec![0u8; content_length];
229    if content_length > 0 {
230        r.read_exact(&mut body).await?;
231    }
232    RtspResponse::parse(&text, String::from_utf8_lossy(&body).into_owned())
233        .ok_or_else(|| crate::StreamError::protocol("malformed rtsp response"))
234}
235
236#[cfg(test)]
237mod tests {
238    use super::*;
239
240    #[test]
241    fn parses_request_and_cseq() {
242        let req = "OPTIONS rtsp://cam/stream RTSP/1.0\r\nCSeq: 3\r\nUser-Agent: x\r\n\r\n";
243        let parsed = RtspRequest::parse(req).unwrap();
244        assert_eq!(parsed.method, "OPTIONS");
245        assert_eq!(parsed.uri, "rtsp://cam/stream");
246        assert_eq!(parsed.cseq, 3);
247    }
248
249    #[test]
250    fn parses_response_status_and_headers() {
251        let resp = RtspResponse::parse(
252            "RTSP/1.0 200 OK\r\nCSeq: 2\r\nSession: 12345678;timeout=60\r\n",
253            String::new(),
254        )
255        .unwrap();
256        assert_eq!(resp.status, 200);
257        assert_eq!(resp.header("session"), Some("12345678;timeout=60"));
258        assert_eq!(session_id(&resp).as_deref(), Some("12345678"));
259    }
260
261    #[test]
262    fn host_port_defaults_and_explicit() {
263        assert_eq!(host_port("rtsp://cam/stream"), Some(("cam".into(), 554)));
264        assert_eq!(host_port("rtsp://cam:8554/s"), Some(("cam".into(), 8554)));
265        assert_eq!(
266            host_port("rtsp://user:pw@cam:9000/s"),
267            Some(("cam".into(), 9000))
268        );
269        assert!(host_port("http://cam/s").is_none());
270    }
271
272    #[test]
273    fn interleaved_frame_parses_and_reports_consumed() {
274        // $, channel 0, length 3, payload AA BB CC, then a trailing byte.
275        let buf = [b'$', 0, 0, 3, 0xAA, 0xBB, 0xCC, 0xFF];
276        let (frame, len) = InterleavedFrame::parse(&buf).unwrap();
277        assert_eq!(frame.channel, 0);
278        assert_eq!(frame.payload, &[0xAA, 0xBB, 0xCC]);
279        assert_eq!(len, 7);
280    }
281
282    #[test]
283    fn interleaved_frame_needs_full_payload() {
284        assert!(InterleavedFrame::parse(&[b'$', 0, 0, 9, 1, 2]).is_none());
285        assert!(InterleavedFrame::parse(&[0x80, 0, 0, 0]).is_none()); // not a '$' frame
286    }
287}