#![allow(dead_code)]
use std::collections::HashMap;
pub type RtspSessionId = String;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u16)]
pub enum RtspStatus {
Ok = 200,
BadRequest = 400,
NotFound = 404,
MethodNotAllowed = 405,
SessionNotFound = 454,
MethodNotValidInState = 455,
InternalError = 500,
NotImplemented = 501,
}
impl RtspStatus {
#[must_use]
pub fn reason_phrase(self) -> &'static str {
match self {
Self::Ok => "OK",
Self::BadRequest => "Bad Request",
Self::NotFound => "Not Found",
Self::MethodNotAllowed => "Method Not Allowed",
Self::SessionNotFound => "Session Not Found",
Self::MethodNotValidInState => "Method Not Valid In This State",
Self::InternalError => "Internal Server Error",
Self::NotImplemented => "Not Implemented",
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RtspTransport {
UdpUnicast {
client_rtp_port: u16,
client_rtcp_port: u16,
server_rtp_port: u16,
server_rtcp_port: u16,
},
UdpMulticast {
group_addr: String,
rtp_port: u16,
ttl: u8,
},
TcpInterleaved {
rtp_channel: u8,
rtcp_channel: u8,
},
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RtspSessionState {
Init,
Ready,
Playing,
TearingDown,
}
#[derive(Debug, Clone)]
pub struct RtspSession {
pub id: RtspSessionId,
pub state: RtspSessionState,
pub transport: Option<RtspTransport>,
pub url_path: String,
pub last_cseq: u32,
pub created_at_ms: u64,
pub packets_sent: u64,
pub bytes_sent: u64,
}
impl RtspSession {
#[must_use]
pub fn new(id: RtspSessionId, url_path: String, created_at_ms: u64) -> Self {
Self {
id,
state: RtspSessionState::Init,
transport: None,
url_path,
last_cseq: 0,
created_at_ms,
packets_sent: 0,
bytes_sent: 0,
}
}
#[must_use]
pub fn is_playing(&self) -> bool {
self.state == RtspSessionState::Playing
}
}
#[derive(Debug, Clone, PartialEq, thiserror::Error)]
pub enum RtspError {
#[error("RTSP session '{0}' not found")]
SessionNotFound(String),
#[error("method not valid in state {state:?}: {method}")]
InvalidState {
state: RtspSessionState,
method: String,
},
#[error("resource not found: {0}")]
ResourceNotFound(String),
#[error("transport error: {0}")]
TransportError(String),
}
pub type RtspResult<T> = Result<T, RtspError>;
#[derive(Debug, Clone)]
pub struct RtspResponse {
pub status: RtspStatus,
pub cseq: u32,
pub session_id: Option<String>,
pub headers: HashMap<String, String>,
pub body: Option<String>,
}
impl RtspResponse {
#[must_use]
pub fn ok(cseq: u32) -> Self {
Self {
status: RtspStatus::Ok,
cseq,
session_id: None,
headers: HashMap::new(),
body: None,
}
}
#[must_use]
pub fn error(status: RtspStatus, cseq: u32) -> Self {
Self {
status,
cseq,
session_id: None,
headers: HashMap::new(),
body: None,
}
}
#[must_use]
pub fn to_wire(&self) -> String {
let mut s = format!(
"RTSP/1.0 {} {}\r\nCSeq: {}\r\n",
self.status as u16,
self.status.reason_phrase(),
self.cseq,
);
if let Some(sid) = &self.session_id {
s.push_str(&format!("Session: {sid}\r\n"));
}
for (k, v) in &self.headers {
s.push_str(&format!("{k}: {v}\r\n"));
}
if let Some(body) = &self.body {
s.push_str(&format!("Content-Length: {}\r\n", body.len()));
s.push_str("Content-Type: application/sdp\r\n");
s.push_str("\r\n");
s.push_str(body);
} else {
s.push_str("\r\n");
}
s
}
}
#[derive(Debug, Clone)]
pub struct MediaTrack {
pub media_type: String,
pub payload_type: u8,
pub codec: String,
pub clock_rate: u32,
pub channels: Option<u8>,
pub control: String,
}
impl MediaTrack {
#[must_use]
pub fn to_sdp_block(&self) -> String {
let channel_str = self.channels.map(|c| format!("/{c}")).unwrap_or_default();
let port = if self.media_type == "video" {
0u16
} else {
0u16
};
format!(
"m={media} {port} RTP/AVP {pt}\r\n\
a=rtpmap:{pt} {codec}/{rate}{ch}\r\n\
a=control:{ctrl}\r\n",
media = self.media_type,
port = port,
pt = self.payload_type,
codec = self.codec,
rate = self.clock_rate,
ch = channel_str,
ctrl = self.control,
)
}
}
#[derive(Debug, Default)]
pub struct RtspServer {
sessions: HashMap<RtspSessionId, RtspSession>,
tracks: Vec<MediaTrack>,
stream_name: String,
next_server_port: u16,
now_ms: u64,
}
impl RtspServer {
#[must_use]
pub fn new(stream_name: impl Into<String>, tracks: Vec<MediaTrack>) -> Self {
Self {
sessions: HashMap::new(),
tracks,
stream_name: stream_name.into(),
next_server_port: 10000,
now_ms: 0,
}
}
pub fn set_time_ms(&mut self, ms: u64) {
self.now_ms = ms;
}
#[must_use]
pub fn handle_options(&self, cseq: u32) -> RtspResponse {
let mut resp = RtspResponse::ok(cseq);
resp.headers.insert(
"Public".to_owned(),
"OPTIONS, DESCRIBE, SETUP, PLAY, TEARDOWN".to_owned(),
);
resp
}
#[must_use]
pub fn handle_describe(&self, cseq: u32, url: &str) -> RtspResponse {
let sdp = self.build_sdp(url);
let mut resp = RtspResponse::ok(cseq);
resp.body = Some(sdp);
resp
}
pub fn handle_setup(
&mut self,
cseq: u32,
url: &str,
session_id: Option<&str>,
client_rtp_port: u16,
client_rtcp_port: u16,
) -> RtspResponse {
let srv_rtp = self.next_server_port;
let srv_rtcp = srv_rtp + 1;
self.next_server_port = srv_rtp + 2;
let transport = RtspTransport::UdpUnicast {
client_rtp_port,
client_rtcp_port,
server_rtp_port: srv_rtp,
server_rtcp_port: srv_rtcp,
};
let sid = match session_id {
Some(id) => id.to_owned(),
None => format!("{:016x}", self.now_ms ^ (cseq as u64)),
};
let session = self
.sessions
.entry(sid.clone())
.or_insert_with(|| RtspSession::new(sid.clone(), url.to_owned(), self.now_ms));
session.transport = Some(transport.clone());
session.last_cseq = cseq;
if session.state == RtspSessionState::Init {
session.state = RtspSessionState::Ready;
}
let transport_header = match &transport {
RtspTransport::UdpUnicast {
client_rtp_port,
client_rtcp_port,
server_rtp_port,
server_rtcp_port,
} => format!(
"RTP/AVP;unicast;client_port={crtp}-{crtcp};server_port={srtp}-{srtcp}",
crtp = client_rtp_port,
crtcp = client_rtcp_port,
srtp = server_rtp_port,
srtcp = server_rtcp_port,
),
_ => "RTP/AVP".to_owned(),
};
let mut resp = RtspResponse::ok(cseq);
resp.session_id = Some(sid);
resp.headers
.insert("Transport".to_owned(), transport_header);
resp
}
pub fn handle_play(&mut self, cseq: u32, session_id: &str) -> RtspResponse {
match self.sessions.get_mut(session_id) {
None => RtspResponse::error(RtspStatus::SessionNotFound, cseq),
Some(sess) => {
if sess.state != RtspSessionState::Ready && sess.state != RtspSessionState::Playing
{
return RtspResponse::error(RtspStatus::MethodNotValidInState, cseq);
}
sess.state = RtspSessionState::Playing;
sess.last_cseq = cseq;
let mut resp = RtspResponse::ok(cseq);
resp.session_id = Some(session_id.to_owned());
resp.headers
.insert("Range".to_owned(), "npt=0.000-".to_owned());
resp
}
}
}
pub fn handle_teardown(&mut self, cseq: u32, session_id: &str) -> RtspResponse {
match self.sessions.get_mut(session_id) {
None => RtspResponse::error(RtspStatus::SessionNotFound, cseq),
Some(sess) => {
sess.state = RtspSessionState::TearingDown;
sess.last_cseq = cseq;
let mut resp = RtspResponse::ok(cseq);
resp.session_id = Some(session_id.to_owned());
resp
}
}
}
pub fn get_session(&self, id: &str) -> RtspResult<&RtspSession> {
self.sessions
.get(id)
.ok_or_else(|| RtspError::SessionNotFound(id.to_owned()))
}
#[must_use]
pub fn playing_count(&self) -> usize {
self.sessions.values().filter(|s| s.is_playing()).count()
}
pub fn gc_sessions(&mut self) {
self.sessions
.retain(|_, s| s.state != RtspSessionState::TearingDown);
}
pub fn record_sent(&mut self, session_id: &str, packets: u64, bytes: u64) {
if let Some(sess) = self.sessions.get_mut(session_id) {
sess.packets_sent += packets;
sess.bytes_sent += bytes;
}
}
fn build_sdp(&self, base_url: &str) -> String {
let mut sdp = format!(
"v=0\r\n\
o=- 0 0 IN IP4 0.0.0.0\r\n\
s={name}\r\n\
t=0 0\r\n\
a=control:{url}\r\n",
name = self.stream_name,
url = base_url,
);
for track in &self.tracks {
sdp.push_str(&track.to_sdp_block());
}
sdp
}
}
#[cfg(test)]
mod tests {
use super::*;
fn make_server() -> RtspServer {
let tracks = vec![
MediaTrack {
media_type: "video".to_owned(),
payload_type: 96,
codec: "VP9".to_owned(),
clock_rate: 90000,
channels: None,
control: "track1".to_owned(),
},
MediaTrack {
media_type: "audio".to_owned(),
payload_type: 97,
codec: "opus".to_owned(),
clock_rate: 48000,
channels: Some(2),
control: "track2".to_owned(),
},
];
RtspServer::new("Test Stream", tracks)
}
#[test]
fn test_options_response() {
let server = make_server();
let resp = server.handle_options(1);
assert_eq!(resp.status, RtspStatus::Ok);
assert_eq!(resp.cseq, 1);
assert!(resp
.headers
.get("Public")
.expect("Public header is always set in OPTIONS response")
.contains("DESCRIBE"));
}
#[test]
fn test_describe_returns_sdp() {
let server = make_server();
let resp = server.handle_describe(2, "rtsp://localhost/live");
assert_eq!(resp.status, RtspStatus::Ok);
let sdp = resp.body.expect("DESCRIBE response always has an SDP body");
assert!(sdp.contains("m=video"));
assert!(sdp.contains("VP9"));
assert!(sdp.contains("m=audio"));
}
#[test]
fn test_setup_creates_session() {
let mut server = make_server();
server.set_time_ms(1000);
let resp = server.handle_setup(3, "rtsp://localhost/live/track1", None, 50000, 50001);
assert_eq!(resp.status, RtspStatus::Ok);
let sid = resp
.session_id
.expect("SETUP response must include a session ID");
let sess = server
.get_session(&sid)
.expect("session was just created by SETUP");
assert_eq!(sess.state, RtspSessionState::Ready);
}
#[test]
fn test_play_transitions_to_playing() {
let mut server = make_server();
let setup = server.handle_setup(1, "rtsp://localhost/live/track1", None, 50000, 50001);
let sid = setup
.session_id
.expect("SETUP response must include a session ID");
let resp = server.handle_play(2, &sid);
assert_eq!(resp.status, RtspStatus::Ok);
let sess = server
.get_session(&sid)
.expect("session was created by SETUP");
assert!(sess.is_playing());
}
#[test]
fn test_teardown() {
let mut server = make_server();
let setup = server.handle_setup(1, "rtsp://localhost/live/track1", None, 50000, 50001);
let sid = setup
.session_id
.expect("SETUP response must include a session ID");
server.handle_play(2, &sid);
let resp = server.handle_teardown(3, &sid);
assert_eq!(resp.status, RtspStatus::Ok);
let sess = server
.get_session(&sid)
.expect("session was created by SETUP");
assert_eq!(sess.state, RtspSessionState::TearingDown);
}
#[test]
fn test_play_unknown_session_returns_error() {
let mut server = make_server();
let resp = server.handle_play(1, "nosession");
assert_eq!(resp.status, RtspStatus::SessionNotFound);
}
#[test]
fn test_playing_count() {
let mut server = make_server();
let setup = server.handle_setup(1, "rtsp://localhost/live/track1", None, 50000, 50001);
let sid = setup
.session_id
.expect("SETUP response must include a session ID");
assert_eq!(server.playing_count(), 0);
server.handle_play(2, &sid);
assert_eq!(server.playing_count(), 1);
}
#[test]
fn test_gc_sessions() {
let mut server = make_server();
let setup = server.handle_setup(1, "rtsp://localhost/live/track1", None, 50000, 50001);
let sid = setup
.session_id
.expect("SETUP response must include a session ID");
server.handle_teardown(2, &sid);
server.gc_sessions();
assert!(server.get_session(&sid).is_err());
}
#[test]
fn test_wire_format_ok() {
let resp = RtspResponse::ok(5);
let wire = resp.to_wire();
assert!(wire.starts_with("RTSP/1.0 200 OK"));
assert!(wire.contains("CSeq: 5"));
}
#[test]
fn test_record_sent() {
let mut server = make_server();
let setup = server.handle_setup(1, "rtsp://localhost/live/track1", None, 50000, 50001);
let sid = setup
.session_id
.expect("SETUP response must include a session ID");
server.record_sent(&sid, 100, 153600);
let sess = server
.get_session(&sid)
.expect("session was created by SETUP");
assert_eq!(sess.packets_sent, 100);
assert_eq!(sess.bytes_sent, 153600);
}
}