arcly_stream/protocol/rtsp/
message.rs1use crate::Result;
4use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
5
/// The RTSP request methods supported by this module.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RtspMethod {
    /// The `OPTIONS` method.
    Options,
    /// The `DESCRIBE` method.
    Describe,
    /// The `SETUP` method.
    Setup,
    /// The `PLAY` method.
    Play,
    /// The `TEARDOWN` method.
    Teardown,
}

impl RtspMethod {
    /// Returns the upper-case token that represents this method on the wire.
    pub fn as_str(self) -> &'static str {
        match self {
            Self::Options => "OPTIONS",
            Self::Describe => "DESCRIBE",
            Self::Setup => "SETUP",
            Self::Play => "PLAY",
            Self::Teardown => "TEARDOWN",
        }
    }
}
33
/// A parsed RTSP request head.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RtspRequest {
    /// First space-separated token of the request line, kept verbatim.
    pub method: String,
    /// Second space-separated token of the request line.
    pub uri: String,
    /// Numeric value of the `CSeq` header.
    pub cseq: u32,
    /// Every header field up to the blank line, in wire order, as trimmed
    /// `(name, value)` pairs (the `CSeq` header is included).
    pub headers: Vec<(String, String)>,
}

impl RtspRequest {
    /// Parses the head of an RTSP request whose lines are separated by `\r\n`.
    ///
    /// Header parsing stops at the first empty line; anything after it is
    /// ignored. The protocol-version token of the request line is not checked.
    ///
    /// Returns `None` when the request line has no second token, when a header
    /// line has no `:`, or when the last `CSeq` header is missing or is not a
    /// valid `u32`.
    pub fn parse(text: &str) -> Option<RtspRequest> {
        let mut lines = text.split("\r\n");

        let mut tokens = lines.next()?.split(' ');
        let method = tokens.next()?;
        let uri = tokens.next()?;

        // Collecting into `Option<Vec<_>>` aborts on the first header line
        // that has no colon.
        let headers = lines
            .take_while(|line| !line.is_empty())
            .map(|line| {
                let (name, value) = line.split_once(':')?;
                Some((name.trim().to_string(), value.trim().to_string()))
            })
            .collect::<Option<Vec<_>>>()?;

        // When the header is repeated the last occurrence decides, even if
        // that occurrence is unparsable.
        let cseq = headers
            .iter()
            .rev()
            .find(|(name, _)| name.eq_ignore_ascii_case("cseq"))
            .and_then(|(_, value)| value.parse::<u32>().ok())?;

        Some(RtspRequest {
            method: method.to_string(),
            uri: uri.to_string(),
            cseq,
            headers,
        })
    }
}
77
/// A parsed RTSP response: status code, header fields and body text.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RtspResponse {
    /// Numeric status code taken from the status line.
    pub status: u16,
    /// Header fields in wire order, as trimmed `(name, value)` pairs.
    pub headers: Vec<(String, String)>,
    /// Message body exactly as handed to [`RtspResponse::parse`].
    pub body: String,
}

impl RtspResponse {
    /// Parses the head of an RTSP response and attaches `body` to the result.
    ///
    /// `text` is split on `\r\n`. The second space-separated token of the
    /// first line must be a valid `u16` status code, otherwise `None` is
    /// returned. Header parsing stops at the first empty line, so content
    /// following the header terminator is never mistaken for a header.
    /// Header lines without a `:` are skipped.
    pub fn parse(text: &str, body: String) -> Option<RtspResponse> {
        let mut lines = text.split("\r\n");
        let status = lines.next()?.split(' ').nth(1)?.parse().ok()?;

        let headers = lines
            // The empty line terminates the header section.
            .take_while(|line| !line.is_empty())
            .filter_map(|line| line.split_once(':'))
            .map(|(name, value)| (name.trim().to_string(), value.trim().to_string()))
            .collect();

        Some(RtspResponse {
            status,
            headers,
            body,
        })
    }

    /// Returns the value of the first header whose name matches `name`,
    /// compared ASCII case-insensitively, or `None` if there is no such header.
    pub fn header(&self, name: &str) -> Option<&str> {
        self.headers
            .iter()
            .find(|(n, _)| n.eq_ignore_ascii_case(name))
            .map(|(_, v)| v.as_str())
    }
}
120
/// A `$`-prefixed binary frame borrowed from a receive buffer.
///
/// Layout as read by [`InterleavedFrame::parse`]: the byte `$`, one channel
/// byte, a big-endian `u16` payload length, then the payload itself.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct InterleavedFrame<'a> {
    /// Channel identifier (second byte of the frame).
    pub channel: u8,
    /// Payload bytes, borrowed from the buffer passed to `parse`.
    pub payload: &'a [u8],
}

impl<'a> InterleavedFrame<'a> {
    /// Tries to decode one frame from the start of `buf`.
    ///
    /// On success returns the frame together with the total number of bytes
    /// it occupies (4-byte header plus payload). Returns `None` when `buf`
    /// does not start with `$`, or when it does not yet hold the full header
    /// and payload.
    pub fn parse(buf: &'a [u8]) -> Option<(InterleavedFrame<'a>, usize)> {
        match buf {
            [b'$', channel, len_hi, len_lo, rest @ ..] => {
                let payload_len = usize::from(u16::from_be_bytes([*len_hi, *len_lo]));
                // `get` yields `None` while the payload is still incomplete.
                let payload = rest.get(..payload_len)?;
                let frame = InterleavedFrame {
                    channel: *channel,
                    payload,
                };
                Some((frame, 4 + payload_len))
            }
            _ => None,
        }
    }
}
153
/// Extracts the host and TCP port from an `rtsp://` URL.
///
/// Any `userinfo@` prefix is discarded and the port defaults to 554 when the
/// URL does not name one. Bracketed IPv6 literals are supported with or
/// without a port; the brackets are kept in the returned host, so
/// `format!("{host}:{port}")` yields a well-formed address string.
///
/// Returns `None` when the URL does not start with `rtsp://`, when the host
/// is empty, when an IPv6 literal is not closed by `]` or is followed by
/// anything other than `:port`, or when the port is not a valid `u16`.
pub fn host_port(url: &str) -> Option<(String, u16)> {
    const DEFAULT_PORT: u16 = 554;

    let rest = url.strip_prefix("rtsp://")?;
    let authority = rest.split('/').next()?;
    // Everything up to the last '@' is userinfo.
    let host_and_port = authority.rsplit('@').next()?;

    let (host, port) = if host_and_port.starts_with('[') {
        // IPv6 literal: the colons inside the brackets belong to the address,
        // so only text after the closing bracket may carry a port.
        let close = host_and_port.find(']')?;
        let (host, tail) = host_and_port.split_at(close + 1);
        let port = match tail.strip_prefix(':') {
            Some(port) => Some(port),
            None if tail.is_empty() => None,
            None => return None,
        };
        (host, port)
    } else {
        match host_and_port.rsplit_once(':') {
            Some((host, port)) => (host, Some(port)),
            None => (host_and_port, None),
        }
    };

    if host.is_empty() {
        return None;
    }
    let port = match port {
        Some(port) => port.parse::<u16>().ok()?,
        None => DEFAULT_PORT,
    };
    Some((host.to_string(), port))
}
165
166pub fn session_id(resp: &RtspResponse) -> Option<String> {
168 let raw = resp.header("Session")?;
169 Some(raw.split(';').next().unwrap_or(raw).trim().to_string())
170}
171
172pub async fn write_request<W: AsyncWrite + Unpin>(
175 w: &mut W,
176 method: RtspMethod,
177 uri: &str,
178 cseq: u32,
179 extra: &[(&str, &str)],
180) -> Result<()> {
181 let mut msg = format!(
182 "{} {} RTSP/1.0\r\nCSeq: {}\r\nUser-Agent: arcly-stream\r\n",
183 method.as_str(),
184 uri,
185 cseq
186 );
187 for (name, value) in extra {
188 msg.push_str(name);
189 msg.push_str(": ");
190 msg.push_str(value);
191 msg.push_str("\r\n");
192 }
193 msg.push_str("\r\n");
194 w.write_all(msg.as_bytes()).await?;
195 w.flush().await?;
196 Ok(())
197}
198
199pub async fn read_response<R: AsyncRead + Unpin>(r: &mut R) -> Result<RtspResponse> {
202 let mut head = Vec::new();
203 let mut byte = [0u8; 1];
204 loop {
206 let n = r.read(&mut byte).await?;
207 if n == 0 {
208 return Err(crate::StreamError::ConnectionClosed);
209 }
210 head.push(byte[0]);
211 if head.ends_with(b"\r\n\r\n") {
212 break;
213 }
214 if head.len() > 64 * 1024 {
215 return Err(crate::StreamError::protocol("rtsp header too large"));
216 }
217 }
218 let text = String::from_utf8_lossy(&head).into_owned();
219 let content_length = text
220 .split("\r\n")
221 .find_map(|l| {
222 l.split_once(':')
223 .filter(|(n, _)| n.eq_ignore_ascii_case("content-length"))
224 })
225 .and_then(|(_, v)| v.trim().parse::<usize>().ok())
226 .unwrap_or(0);
227
228 let mut body = vec![0u8; content_length];
229 if content_length > 0 {
230 r.read_exact(&mut body).await?;
231 }
232 RtspResponse::parse(&text, String::from_utf8_lossy(&body).into_owned())
233 .ok_or_else(|| crate::StreamError::protocol("malformed rtsp response"))
234}
235
#[cfg(test)]
mod tests {
    use super::*;

    /// A well-formed request yields its method, URI and CSeq.
    #[test]
    fn parses_request_and_cseq() {
        let wire = "OPTIONS rtsp://cam/stream RTSP/1.0\r\nCSeq: 3\r\nUser-Agent: x\r\n\r\n";
        let RtspRequest {
            method, uri, cseq, ..
        } = RtspRequest::parse(wire).unwrap();
        assert_eq!(method, "OPTIONS");
        assert_eq!(uri, "rtsp://cam/stream");
        assert_eq!(cseq, 3);
    }

    /// Status code, case-insensitive header lookup and session id extraction.
    #[test]
    fn parses_response_status_and_headers() {
        let head = "RTSP/1.0 200 OK\r\nCSeq: 2\r\nSession: 12345678;timeout=60\r\n";
        let resp = RtspResponse::parse(head, String::new()).unwrap();
        assert_eq!(resp.status, 200);
        assert_eq!(resp.header("session"), Some("12345678;timeout=60"));
        assert_eq!(session_id(&resp).as_deref(), Some("12345678"));
    }

    /// Default port, explicit port, userinfo stripping and scheme check.
    #[test]
    fn host_port_defaults_and_explicit() {
        let accepted = [
            ("rtsp://cam/stream", 554u16),
            ("rtsp://cam:8554/s", 8554),
            ("rtsp://user:pw@cam:9000/s", 9000),
        ];
        for &(url, port) in accepted.iter() {
            assert_eq!(host_port(url), Some(("cam".to_string(), port)));
        }
        assert!(host_port("http://cam/s").is_none());
    }

    /// A complete frame is decoded and trailing bytes are not counted.
    #[test]
    fn interleaved_frame_parses_and_reports_consumed() {
        let wire = [b'$', 0, 0, 3, 0xAA, 0xBB, 0xCC, 0xFF];
        let (frame, consumed) = InterleavedFrame::parse(&wire).unwrap();
        assert_eq!(frame.channel, 0);
        assert_eq!(frame.payload, &[0xAA, 0xBB, 0xCC]);
        assert_eq!(consumed, 7);
    }

    /// Truncated payloads and buffers without the `$` marker are rejected.
    #[test]
    fn interleaved_frame_needs_full_payload() {
        let truncated = [b'$', 0, 0, 9, 1, 2];
        let not_a_frame = [0x80, 0, 0, 0];
        assert!(InterleavedFrame::parse(&truncated).is_none());
        assert!(InterleavedFrame::parse(&not_a_frame).is_none());
    }
}