use crate::Result;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
/// The RTSP request methods that can be written by [`write_request`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RtspMethod {
    /// The `OPTIONS` method.
    Options,
    /// The `DESCRIBE` method.
    Describe,
    /// The `SETUP` method.
    Setup,
    /// The `PLAY` method.
    Play,
    /// The `TEARDOWN` method.
    Teardown,
}

impl RtspMethod {
    /// Returns the upper-case token used for this method on the request line.
    pub fn as_str(self) -> &'static str {
        match self {
            Self::Options => "OPTIONS",
            Self::Describe => "DESCRIBE",
            Self::Setup => "SETUP",
            Self::Play => "PLAY",
            Self::Teardown => "TEARDOWN",
        }
    }
}
/// An RTSP request broken into its request-line fields and headers.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RtspRequest {
    /// First space-separated token of the request line.
    pub method: String,
    /// Second space-separated token of the request line.
    pub uri: String,
    /// Numeric value of the `CSeq` header.
    pub cseq: u32,
    /// Every header in wire order, with name and value trimmed.
    pub headers: Vec<(String, String)>,
}

impl RtspRequest {
    /// Parses the textual head of an RTSP request.
    ///
    /// Lines are separated by `\r\n`; header parsing stops at the first empty
    /// line. Returns `None` when the request line has fewer than two
    /// space-separated tokens, when a header line contains no `:`, or when no
    /// usable `CSeq` header (matched case-insensitively) is present.
    pub fn parse(text: &str) -> Option<RtspRequest> {
        let mut lines = text.split("\r\n");

        let mut tokens = lines.next()?.split(' ');
        let method = tokens.next()?;
        let uri = tokens.next()?;

        let mut cseq: Option<u32> = None;
        let mut headers = Vec::new();
        for raw in lines.take_while(|l| !l.is_empty()) {
            let (key, val) = raw.split_once(':')?;
            let (key, val) = (key.trim(), val.trim());
            if key.eq_ignore_ascii_case("cseq") {
                // A later CSeq header replaces an earlier one, even if it
                // fails to parse.
                cseq = val.parse().ok();
            }
            headers.push((key.to_string(), val.to_string()));
        }

        let cseq = cseq?;
        Some(RtspRequest {
            method: method.to_string(),
            uri: uri.to_string(),
            cseq,
            headers,
        })
    }
}
/// An RTSP response: numeric status, headers and the message body.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RtspResponse {
    /// Status code taken from the second token of the status line.
    pub status: u16,
    /// Headers in wire order, with name and value trimmed.
    pub headers: Vec<(String, String)>,
    /// Message body as supplied by the caller of [`RtspResponse::parse`].
    pub body: String,
}

impl RtspResponse {
    /// Parses the textual head of an RTSP response and attaches `body`.
    ///
    /// Lines are separated by `\r\n`. The status code is the second
    /// space-separated token of the first line. Header parsing stops at the
    /// first empty line, so anything after the header terminator (for example
    /// body text that happens to contain `:`) is never mistaken for a header.
    /// Header lines without a `:` are skipped.
    ///
    /// Returns `None` when the status line is missing its second token or that
    /// token is not a valid `u16`.
    pub fn parse(text: &str, body: String) -> Option<RtspResponse> {
        let mut lines = text.split("\r\n");
        let status = lines.next()?.split(' ').nth(1)?.parse().ok()?;

        let headers = lines
            // The empty line terminates the header block.
            .take_while(|line| !line.is_empty())
            .filter_map(|line| line.split_once(':'))
            .map(|(name, value)| (name.trim().to_string(), value.trim().to_string()))
            .collect();

        Some(RtspResponse {
            status,
            headers,
            body,
        })
    }

    /// Returns the value of the first header whose name matches `name`,
    /// compared ASCII case-insensitively.
    pub fn header(&self, name: &str) -> Option<&str> {
        self.headers
            .iter()
            .find(|(n, _)| n.eq_ignore_ascii_case(name))
            .map(|(_, v)| v.as_str())
    }
}
/// A `$`-framed binary packet borrowed from a receive buffer.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct InterleavedFrame<'a> {
    /// Channel byte that follows the `$` marker.
    pub channel: u8,
    /// Payload bytes, borrowed from the input buffer.
    pub payload: &'a [u8],
}

impl<'a> InterleavedFrame<'a> {
    /// Attempts to decode one frame from the start of `buf`.
    ///
    /// The layout is: `$`, one channel byte, a big-endian `u16` payload length,
    /// then the payload. On success returns the frame together with the total
    /// number of bytes it occupies (4-byte header plus payload).
    ///
    /// Returns `None` when `buf` holds fewer than four bytes, does not start
    /// with `$`, or does not yet contain the whole payload.
    pub fn parse(buf: &'a [u8]) -> Option<(InterleavedFrame<'a>, usize)> {
        const HEADER_LEN: usize = 4;

        let (channel, length_bytes, rest) = match buf {
            [b'$', channel, hi, lo, rest @ ..] => (*channel, [*hi, *lo], rest),
            _ => return None,
        };
        let length = usize::from(u16::from_be_bytes(length_bytes));
        // `get` yields `None` while the payload is still incomplete.
        let payload = rest.get(..length)?;

        Some((InterleavedFrame { channel, payload }, HEADER_LEN + length))
    }
}
/// Extracts the host and TCP port from an `rtsp://` URL.
///
/// Everything up to the first `/` after the scheme is treated as the
/// authority; any `user:password@` prefix is discarded. When no port is given
/// the default of 554 is used.
///
/// Bracketed IPv6 literals such as `rtsp://[::1]/s` are recognised; the
/// brackets are kept in the returned host so that the result matches what is
/// returned when a port follows the literal.
///
/// Returns `None` when the URL does not start with `rtsp://`, the host is
/// empty, a bracketed host is not closed or is followed by anything other than
/// `:port`, or the port is not a valid `u16`.
pub fn host_port(url: &str) -> Option<(String, u16)> {
    const DEFAULT_PORT: u16 = 554;

    let rest = url.strip_prefix("rtsp://")?;
    let authority = rest.split('/').next()?;
    let authority = authority.rsplit('@').next()?;

    let (host, port) = if authority.starts_with('[') {
        // IPv6 literal: the colons inside the brackets are part of the host,
        // so only a `:` directly after `]` introduces a port.
        let close = authority.find(']')?;
        let (host, tail) = authority.split_at(close + 1);
        let port = match tail.strip_prefix(':') {
            Some(port) => port.parse().ok()?,
            None if tail.is_empty() => DEFAULT_PORT,
            None => return None,
        };
        (host, port)
    } else {
        match authority.rsplit_once(':') {
            Some((host, port)) => (host, port.parse().ok()?),
            None => (authority, DEFAULT_PORT),
        }
    };

    if host.is_empty() {
        return None;
    }
    Some((host.to_string(), port))
}
/// Returns the session identifier carried by the response's `Session` header.
///
/// Only the part before the first `;` is kept (dropping parameters such as
/// `timeout=60`), with surrounding whitespace trimmed. Returns `None` when the
/// response has no `Session` header.
pub fn session_id(resp: &RtspResponse) -> Option<String> {
    resp.header("Session").map(|raw| {
        let id = match raw.split_once(';') {
            Some((id, _params)) => id,
            None => raw,
        };
        id.trim().to_string()
    })
}
/// Serialises an RTSP/1.0 request and writes it to `w`, then flushes.
///
/// The message consists of the request line, a `CSeq` header, a fixed
/// `User-Agent: arcly-stream` header, each `(name, value)` pair from `extra`
/// in order, and the terminating empty line. No body is written.
///
/// `uri` and the `extra` pairs are inserted verbatim, so callers must make
/// sure they contain no CR or LF characters.
///
/// # Errors
///
/// Propagates any error from writing to or flushing `w`.
pub async fn write_request<W: AsyncWrite + Unpin>(
    w: &mut W,
    method: RtspMethod,
    uri: &str,
    cseq: u32,
    extra: &[(&str, &str)],
) -> Result<()> {
    let mut request = format!(
        "{} {} RTSP/1.0\r\nCSeq: {}\r\nUser-Agent: arcly-stream\r\n",
        method.as_str(),
        uri,
        cseq
    );
    for &(key, val) in extra {
        request.extend([key, ": ", val, "\r\n"]);
    }
    // Empty line marks the end of the header block.
    request.push_str("\r\n");

    w.write_all(request.as_bytes()).await?;
    w.flush().await?;
    Ok(())
}
/// Reads a single RTSP response (head plus optional body) from `r`.
///
/// The head is read one byte at a time until `\r\n\r\n` is seen, so no bytes
/// beyond the head are consumed before the body length is known. The body
/// length comes from the first `Content-Length` header (name matched
/// case-insensitively); a missing or unparsable value is treated as zero.
/// Head and body are decoded as UTF-8 with lossy replacement.
///
/// # Errors
///
/// * `StreamError::ConnectionClosed` if the reader reaches EOF before the head
///   is complete.
/// * A protocol error if the head exceeds 64 KiB, if the declared body length
///   exceeds 4 MiB, or if the head cannot be parsed as a response.
/// * Any error propagated from the underlying reads.
pub async fn read_response<R: AsyncRead + Unpin>(r: &mut R) -> Result<RtspResponse> {
    const MAX_HEAD_LEN: usize = 64 * 1024;
    // Upper bound on the body so that a peer cannot force an arbitrarily large
    // allocation through the Content-Length header.
    const MAX_BODY_LEN: usize = 4 * 1024 * 1024;

    let mut head = Vec::new();
    let mut byte = [0u8; 1];
    loop {
        let n = r.read(&mut byte).await?;
        if n == 0 {
            return Err(crate::StreamError::ConnectionClosed);
        }
        head.push(byte[0]);
        if head.ends_with(b"\r\n\r\n") {
            break;
        }
        if head.len() > MAX_HEAD_LEN {
            return Err(crate::StreamError::protocol("rtsp header too large"));
        }
    }

    let text = String::from_utf8_lossy(&head).into_owned();
    let content_length = text
        .split("\r\n")
        .find_map(|l| {
            l.split_once(':')
                // Trim the name so matching agrees with `RtspResponse::parse`.
                .filter(|(n, _)| n.trim().eq_ignore_ascii_case("content-length"))
        })
        .and_then(|(_, v)| v.trim().parse::<usize>().ok())
        .unwrap_or(0);
    if content_length > MAX_BODY_LEN {
        return Err(crate::StreamError::protocol("rtsp body too large"));
    }

    let mut body = vec![0u8; content_length];
    if content_length > 0 {
        r.read_exact(&mut body).await?;
    }

    RtspResponse::parse(&text, String::from_utf8_lossy(&body).into_owned())
        .ok_or_else(|| crate::StreamError::protocol("malformed rtsp response"))
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A well-formed request yields its method, URI and numeric CSeq.
    #[test]
    fn parses_request_and_cseq() {
        let wire = "OPTIONS rtsp://cam/stream RTSP/1.0\r\nCSeq: 3\r\nUser-Agent: x\r\n\r\n";
        let request = RtspRequest::parse(wire).unwrap();
        assert_eq!(
            (request.method.as_str(), request.uri.as_str(), request.cseq),
            ("OPTIONS", "rtsp://cam/stream", 3)
        );
    }

    /// Status code, case-insensitive header lookup and session-id extraction.
    #[test]
    fn parses_response_status_and_headers() {
        let head = "RTSP/1.0 200 OK\r\nCSeq: 2\r\nSession: 12345678;timeout=60\r\n";
        let response = RtspResponse::parse(head, String::new()).unwrap();
        assert_eq!(response.status, 200);
        assert_eq!(response.header("session"), Some("12345678;timeout=60"));
        assert_eq!(session_id(&response), Some(String::from("12345678")));
    }

    /// Default port, explicit port, credentials, and a non-RTSP scheme.
    #[test]
    fn host_port_defaults_and_explicit() {
        let cases = [
            ("rtsp://cam/stream", Some(("cam", 554))),
            ("rtsp://cam:8554/s", Some(("cam", 8554))),
            ("rtsp://user:pw@cam:9000/s", Some(("cam", 9000))),
            ("http://cam/s", None),
        ];
        for &(url, expected) in cases.iter() {
            let expected = expected.map(|(host, port)| (host.to_string(), port));
            assert_eq!(host_port(url), expected);
        }
    }

    /// A complete frame is decoded and the consumed length excludes trailing bytes.
    #[test]
    fn interleaved_frame_parses_and_reports_consumed() {
        let wire = [b'$', 0, 0, 3, 0xAA, 0xBB, 0xCC, 0xFF];
        let (frame, consumed) = InterleavedFrame::parse(&wire).unwrap();
        assert_eq!(
            frame,
            InterleavedFrame {
                channel: 0,
                payload: &[0xAA, 0xBB, 0xCC],
            }
        );
        assert_eq!(consumed, 7);
    }

    /// Truncated payloads and buffers without the `$` marker are rejected.
    #[test]
    fn interleaved_frame_needs_full_payload() {
        let truncated = [b'$', 0, 0, 9, 1, 2];
        let wrong_marker = [0x80_u8, 0, 0, 0];
        assert_eq!(InterleavedFrame::parse(&truncated), None);
        assert_eq!(InterleavedFrame::parse(&wrong_marker), None);
    }
}