use std::convert::TryFrom;
use std::io;
use std::mem::MaybeUninit;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::num::NonZeroU32;
use std::sync::{Arc, Mutex};
use std::task::Poll;
use std::time::Instant;
use std::{fmt::Debug, num::NonZeroU16, pin::Pin};
use self::channel_mapping::*;
pub use self::timeline::Timeline;
use crate::rtsp::msg::{self as msg, OwnedMessage, StatusCode};
use bytes::Bytes;
use futures::{Future, SinkExt, StreamExt, ready};
use log::{debug, trace, warn};
use pin_project::pin_project;
use tokio::net::UdpSocket;
use tokio::sync::Notify;
use url::Url;
use crate::client::parse::SessionHeader;
use crate::codec::CodecItem;
use crate::{
Error, ErrorInt, RtspMessageContext, StreamContext, StreamContextInner, TcpStreamContext,
UdpStreamContext,
};
mod channel_mapping;
mod parse;
#[doc(hidden)]
pub mod rtp;
mod teardown;
mod timeline;
/// Number of seconds after which a stale session recorded by
/// `note_stale_live555_data` is presumed expired and removed from its group.
const LIVE555_EXPIRATION_SEC: u64 = 65;
/// Bookkeeping entry for one session tracked by a [`SessionGroup`].
struct StaleSession {
/// Identifier handed out from `SessionGroupInner::next_seqnum`;
/// `SessionGroup::try_remove_seqnum` uses it to locate this entry.
seqnum: u64,
/// Receiver for the teardown result, if there is one;
/// `SessionGroup::await_teardown` waits on it. `None` for entries created by
/// `note_stale_live555_data`.
teardown_rx: Option<tokio::sync::watch::Receiver<Option<Result<(), Error>>>>,
/// Only entries with this flag set are counted by
/// `SessionGroup::stale_sessions` and `SessionGroup::await_stale_sessions`.
maybe_playing: bool,
/// When set on any entry, `note_stale_live555_data` treats unexpected
/// interleaved data as explained by that entry and adds no new one.
has_tcp: bool,
/// Expiration time reported through `StaleSessionStatus::max_expires`.
expires: tokio::time::Instant,
}
/// A shared collection of [`StaleSession`] entries.
///
/// A group is attached to a session through `SessionOptions::session_group`.
#[derive(Default)]
pub struct SessionGroup {
/// Optional name set via `named`; used by `debug_id` in log messages.
name: Option<String>,
/// Tracked entries and the next sequence number, guarded by a mutex.
sessions: Mutex<SessionGroupInner>,
/// Signalled by `try_remove_seqnum` so `await_stale_sessions` re-checks.
notify: Notify,
}
/// Mutex-protected state of a [`SessionGroup`].
#[derive(Default)]
struct SessionGroupInner {
/// Sequence number to assign to the next entry pushed onto `sessions`.
next_seqnum: u64,
/// Currently tracked entries, in no particular order (removal uses
/// `swap_remove`).
sessions: Vec<StaleSession>,
}
/// Snapshot returned by `SessionGroup::stale_sessions`.
pub struct StaleSessionStatus {
/// Latest `expires` among the counted entries, or `None` if there were none.
pub max_expires: Option<tokio::time::Instant>,
/// Number of entries with `maybe_playing` set at snapshot time.
pub num_sessions: usize,
/// Value of `SessionGroupInner::next_seqnum` at snapshot time;
/// `SessionGroup::await_stale_sessions` only waits for entries numbered
/// below it.
next_seqnum: u64,
}
impl SessionGroup {
    /// Returns this group with its name set to `name`.
    pub fn named(self, name: String) -> Self {
        SessionGroup {
            name: Some(name),
            ..self
        }
    }

    /// Returns the name set via [`SessionGroup::named`], if any.
    pub fn name(&self) -> Option<&str> {
        self.name.as_deref()
    }

    /// Returns a value identifying this group in log messages: the name if
    /// one is set, otherwise the address of the group itself.
    fn debug_id(&self) -> impl Debug + use<> {
        // Format `self` (a `&SessionGroup`), not `&self`: the latter is a
        // `&&SessionGroup` and `{:p}` would print the address of a temporary
        // reference on the stack, which differs between calls.
        self.name.clone().unwrap_or_else(|| format!("{:p}", self))
    }

    /// Returns a snapshot of the entries that may still be playing.
    ///
    /// The snapshot can later be passed to
    /// [`SessionGroup::await_stale_sessions`].
    pub fn stale_sessions(&self) -> StaleSessionStatus {
        let l = self.sessions.lock().unwrap();
        let playing = l.sessions.iter().filter(|s| s.maybe_playing);
        StaleSessionStatus {
            max_expires: playing.clone().map(|s| s.expires).max(),
            num_sessions: playing.count(),
            next_seqnum: l.next_seqnum,
        }
    }

    /// Waits for every teardown currently tracked by this group to finish.
    ///
    /// Returns `Ok(())` if all of them succeeded, otherwise the first error
    /// in tracking order. Entries added after this call starts are not
    /// considered.
    ///
    /// # Panics
    ///
    /// Panics if a teardown sender is dropped before publishing a result.
    pub async fn await_teardown(&self) -> Result<(), Error> {
        let mut watches: Vec<_>;
        {
            // Clone the receivers inside a short scope so the mutex guard is
            // released before any `.await`.
            let l = self.sessions.lock().unwrap();
            watches = l
                .sessions
                .iter()
                .filter_map(|s| s.teardown_rx.clone())
                .collect();
        }

        let mut overall_result = Ok(());
        for w in &mut watches {
            let mut r = (*w.borrow_and_update()).clone();
            if r.is_none() {
                // No result yet: wait for the sender to publish one.
                w.changed().await.expect(
                    "teardown Sender shouldn't be dropped; \
                     ensure the Session's tokio runtime is still alive",
                );
                r.clone_from(&*w.borrow())
            }
            let r = r.expect("teardown result should be populated after change");
            overall_result = overall_result.and(r);
        }
        overall_result
    }

    /// Waits until no entry counted by `status` remains in the group.
    ///
    /// Only entries with `maybe_playing` set and a sequence number below the
    /// one recorded in `status` are considered.
    pub async fn await_stale_sessions(&self, status: &StaleSessionStatus) {
        loop {
            // Create the `Notified` future before checking, so a removal
            // that happens between the check and the await is not missed.
            let notified = self.notify.notified();
            {
                let l = self.sessions.lock().unwrap();
                let left = l
                    .sessions
                    .iter()
                    .filter(|s| s.maybe_playing && s.seqnum < status.next_seqnum)
                    .count();
                log::trace!(
                    "Session group {:?} has {} relevant sessions numbered < {}",
                    self.debug_id(),
                    left,
                    status.next_seqnum
                );
                if left == 0 {
                    return;
                }
            }
            notified.await;
        }
    }

    /// Removes the entry numbered `seqnum` and wakes all waiters.
    ///
    /// Returns `false` (and wakes nobody) if there is no such entry.
    fn try_remove_seqnum(&self, seqnum: u64) -> bool {
        let mut l = self.sessions.lock().unwrap();
        let i = l.sessions.iter().position(|s| s.seqnum == seqnum);
        match i {
            Some(i) => {
                l.sessions.swap_remove(i);
                // Release the lock before waking waiters, which take it again.
                drop(l);
                self.notify.notify_waiters();
                true
            }
            None => false,
        }
    }
}
/// Teardown policy stored in `SessionOptions::teardown`.
///
/// How each variant is acted upon is decided outside this part of the file.
#[derive(Copy, Clone, Debug, Default, derive_more::Display)]
pub enum TeardownPolicy {
/// The default; displayed and parsed as `auto`.
#[default]
#[display("auto")]
Auto,
/// Displayed and parsed as `always`.
#[display("always")]
Always,
/// Displayed and parsed as `never`.
#[display("never")]
Never,
}
impl std::str::FromStr for TeardownPolicy {
type Err = Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(match s {
"auto" => TeardownPolicy::Auto,
"never" => TeardownPolicy::Never,
"always" => TeardownPolicy::Always,
_ => bail!(ErrorInt::InvalidArgument(format!(
"bad TeardownPolicy {s}; expected auto, never, or always"
))),
})
}
}
/// Policy for the initial RTP timestamp (`rtptime`) supplied with a `PLAY`
/// response; consulted by `Session::play`.
#[derive(Copy, Clone, Debug, Default, derive_more::Display)]
pub enum InitialTimestampPolicy {
/// The default. `play` currently treats this exactly like `Require`.
#[default]
#[display("default")]
Default,
/// When more than one stream was set up, every set-up stream must have an
/// `rtptime` or `play` fails. With a single stream the value is not used.
#[display("require")]
Require,
/// Never use the `rtptime` from the `PLAY` response.
#[display("ignore")]
Ignore,
/// Use the `rtptime` values only when more than one stream was set up and
/// all of them have one; otherwise use none.
#[display("permissive")]
Permissive,
}
impl std::str::FromStr for InitialTimestampPolicy {
type Err = Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(match s {
"default" => InitialTimestampPolicy::Default,
"require" => InitialTimestampPolicy::Require,
"ignore" => InitialTimestampPolicy::Ignore,
"permissive" => InitialTimestampPolicy::Permissive,
_ => bail!(ErrorInt::InvalidArgument(format!(
"bad InitialTimestampPolicy {s}; \
expected default, require, ignore or permissive"
))),
})
}
}
/// Policy for the initial sequence number (`seq`) supplied with a `PLAY`
/// response; consulted by `Session::play`.
#[derive(Copy, Clone, Debug, Default, derive_more::Display)]
#[non_exhaustive]
pub enum InitialSequenceNumberPolicy {
/// The default. `play` currently treats this exactly like
/// `IgnoreSuspiciousValues`.
#[default]
#[display("default")]
Default,
/// Always use the supplied `seq`, if any.
#[display("respect")]
Respect,
/// Ignore a supplied `seq` of 0 or 1; use any other value.
#[display("ignore-suspicious-values")]
IgnoreSuspiciousValues,
/// Never use the supplied `seq`.
#[display("ignore")]
Ignore,
}
impl std::str::FromStr for InitialSequenceNumberPolicy {
type Err = Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(match s {
"default" => InitialSequenceNumberPolicy::Default,
"respect" => InitialSequenceNumberPolicy::Respect,
"ignore-suspicious-values" => InitialSequenceNumberPolicy::IgnoreSuspiciousValues,
"ignore" => InitialSequenceNumberPolicy::Ignore,
_ => bail!(ErrorInt::InvalidArgument(format!(
"bad InitialSequenceNumberPolicy {s}; \
expected default, respect, ignore-suspicious-values, or ignore"
))),
})
}
}
/// Policy handed to `rtp::InorderParser::new` by `Session::play`.
///
/// NOTE(review): judging by the names, this governs RTCP packets whose SSRC
/// is not the expected one; the handling itself lives in the `rtp` module and
/// is not visible here.
#[derive(Copy, Clone, Debug, Default, derive_more::Display)]
#[non_exhaustive]
pub enum UnknownRtcpSsrcPolicy {
/// The default; displayed and parsed as `default`.
#[default]
#[display("default")]
Default,
/// Displayed and parsed as `abort-session`.
#[display("abort-session")]
AbortSession,
/// Displayed and parsed as `drop-packets`.
#[display("drop-packets")]
DropPackets,
/// Displayed and parsed as `process-packets`.
#[display("process-packets")]
ProcessPackets,
}
impl std::str::FromStr for UnknownRtcpSsrcPolicy {
type Err = Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(match s {
"default" => UnknownRtcpSsrcPolicy::Default,
"abort-session" => UnknownRtcpSsrcPolicy::AbortSession,
"drop-packets" => UnknownRtcpSsrcPolicy::DropPackets,
"process-packets" => UnknownRtcpSsrcPolicy::ProcessPackets,
_ => bail!(ErrorInt::InvalidArgument(format!(
"bad UnknownRtcpSsrcPolicy {s}; \
expected default, abort-session, drop-packets, or process-packets"
))),
})
}
}
/// Returns the interval between keepalives: half of the session timeout,
/// where the timeout is first capped at 60 seconds (so at most 30 seconds).
fn keepalive_interval(session: &SessionHeader) -> std::time::Duration {
    let capped_timeout_sec = u64::from(session.timeout_sec).min(60);
    std::time::Duration::from_secs(capped_timeout_sec) / 2
}
/// Options that apply to a whole session; built with the chaining setters on
/// `impl SessionOptions`.
#[derive(Default)]
pub struct SessionOptions {
/// Credentials used to answer a `401 Unauthorized` challenge.
creds: Option<Credentials>,
/// Custom `User-Agent` header value; `None` selects `DEFAULT_USER_AGENT`.
user_agent: Option<Box<str>>,
/// Group in which stale sessions are recorded, if any.
session_group: Option<Arc<SessionGroup>>,
/// Teardown policy.
teardown: TeardownPolicy,
/// What to do with interleaved data on a channel that is not assigned.
unassigned_channel_data: UnassignedChannelDataPolicy,
/// What to do when a later `SETUP` response carries a different session id.
session_id: SessionIdPolicy,
}
/// Policy for RTSP interleaved data received on a channel that is not
/// assigned to a stream; applied by `RtspConnection::handle_unassigned_data`.
#[derive(Copy, Clone, Default, derive_more::Display)]
pub enum UnassignedChannelDataPolicy {
/// The default. Behaves like `AssumeStaleSession` when the server's tool
/// string matches `Tool::has_live555_tcp_bug`, otherwise like `Ignore`.
#[default]
#[display("auto")]
Auto,
/// Record a stale session via `note_stale_live555_data`, then fail with
/// `RtspUnassignedChannelError`.
#[display("assume-stale-session")]
AssumeStaleSession,
/// Fail with `RtspUnassignedChannelError`.
#[display("error")]
Error,
/// Log the data (the first occurrence at warn level, later ones at trace
/// level) and carry on.
#[display("ignore")]
Ignore,
}
impl std::str::FromStr for UnassignedChannelDataPolicy {
type Err = Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(match s {
"auto" => UnassignedChannelDataPolicy::Auto,
"assume-stale-session" => UnassignedChannelDataPolicy::AssumeStaleSession,
"error" => UnassignedChannelDataPolicy::Error,
"ignore" => UnassignedChannelDataPolicy::Ignore,
_ => bail!(ErrorInt::InvalidArgument(format!(
"bad UnassignedChannelDataPolicy {s}; expected auto, assume-stale-session, error, \
or ignore"
))),
})
}
}
/// Policy for a `SETUP` response whose session id differs from the one
/// already established; consulted by `Session::setup`.
#[derive(Copy, Clone, Debug, Default, derive_more::Display)]
pub enum SessionIdPolicy {
/// The default. `setup` currently treats this exactly like `RequireMatch`.
#[default]
#[display("default")]
Default,
/// A changed session id is an error.
#[display("require-match")]
RequireMatch,
/// Keep the first session id and tolerate later responses that differ.
#[display("use-first")]
UseFirst,
}
impl std::str::FromStr for SessionIdPolicy {
type Err = Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(match s {
"default" => SessionIdPolicy::Default,
"require-match" => SessionIdPolicy::RequireMatch,
"use-first" => SessionIdPolicy::UseFirst,
_ => bail!(ErrorInt::InvalidArgument(format!(
"bad SessionIdPolicy {s}; \
expected default, require-match or use-first"
))),
})
}
}
/// Transport requested for a stream in `Session::setup`.
#[derive(Clone, Debug, derive_more::Display)]
#[non_exhaustive]
pub enum Transport {
/// Interleaved in the RTSP TCP connection (`RTP/AVP/TCP`). This is the
/// `Default`.
#[display("tcp")]
Tcp(TcpTransportOptions),
/// Over a separate UDP socket pair (`RTP/AVP/UDP`).
#[display("udp")]
Udp(UdpTransportOptions),
}
impl Default for Transport {
fn default() -> Self {
Transport::Tcp(TcpTransportOptions::default())
}
}
impl std::str::FromStr for Transport {
    type Err = Error;

    /// Parses `tcp` or `udp` into the matching variant with default options;
    /// anything else is an `InvalidArgument` error.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let transport = match s {
            "udp" => Self::Udp(UdpTransportOptions::default()),
            "tcp" => Self::Tcp(TcpTransportOptions::default()),
            _ => bail!(ErrorInt::InvalidArgument(format!(
                "bad Transport {s}; \
                 expected tcp or udp"
            ))),
        };
        Ok(transport)
    }
}
/// Options for [`Transport::Tcp`]. Currently has no fields; `#[non_exhaustive]`
/// leaves room to add some.
#[derive(Clone, Default, Debug)]
#[non_exhaustive]
pub struct TcpTransportOptions;
/// Options for [`Transport::Udp`]. Currently has no fields; `#[non_exhaustive]`
/// leaves room to add some.
#[derive(Clone, Default, Debug)]
#[non_exhaustive]
pub struct UdpTransportOptions;
impl SessionOptions {
    /// Sets the credentials used to answer an authentication challenge.
    #[inline]
    pub fn creds(self, creds: Option<Credentials>) -> Self {
        Self { creds, ..self }
    }

    /// Sets the `User-Agent` header value. An empty string selects the
    /// built-in default instead.
    pub fn user_agent(self, user_agent: String) -> Self {
        let user_agent = match user_agent.is_empty() {
            true => None,
            false => Some(user_agent.into_boxed_str()),
        };
        Self { user_agent, ..self }
    }

    /// Sets the group in which stale sessions are tracked.
    pub fn session_group(self, session_group: Arc<SessionGroup>) -> Self {
        Self {
            session_group: Some(session_group),
            ..self
        }
    }

    /// Sets the teardown policy.
    pub fn teardown(self, teardown: TeardownPolicy) -> Self {
        Self { teardown, ..self }
    }

    /// Sets the policy for interleaved data on unassigned channels.
    pub fn unassigned_channel_data(self, policy: UnassignedChannelDataPolicy) -> Self {
        Self {
            unassigned_channel_data: policy,
            ..self
        }
    }

    /// Sets the policy for session ids that change between `SETUP` responses.
    pub fn session_id(self, policy: SessionIdPolicy) -> Self {
        Self {
            session_id: policy,
            ..self
        }
    }
}
/// Per-stream options passed to `Session::setup`.
#[derive(Default)]
pub struct SetupOptions {
/// Transport to request for the stream.
transport: Transport,
/// If set, applied to the stream's depacketizer via `set_frame_format`
/// once `SETUP` succeeds.
frame_format: Option<crate::codec::FrameFormat>,
}
impl SetupOptions {
    /// Sets the transport to request for the stream.
    #[inline]
    pub fn transport(self, transport: Transport) -> Self {
        Self { transport, ..self }
    }

    /// Sets the frame format to apply to the stream's depacketizer.
    #[inline]
    pub fn frame_format(self, format: crate::codec::FrameFormat) -> Self {
        Self {
            frame_format: Some(format),
            ..self
        }
    }
}
/// Options passed to `Session::play`.
#[derive(Default)]
pub struct PlayOptions {
/// How to treat the `rtptime` from the `PLAY` response.
initial_timestamp: InitialTimestampPolicy,
/// How to treat the `seq` from the `PLAY` response.
initial_seq: InitialSequenceNumberPolicy,
/// Forwarded to `Timeline::new` for every stream; `None` by default.
enforce_timestamps_with_max_jump_secs: Option<NonZeroU32>,
/// Forwarded to `rtp::InorderParser::new` for every stream.
unknown_rtcp_ssrc: UnknownRtcpSsrcPolicy,
}
impl PlayOptions {
    /// Sets the policy for the initial RTP timestamp.
    pub fn initial_timestamp(mut self, initial_timestamp: InitialTimestampPolicy) -> Self {
        self.initial_timestamp = initial_timestamp;
        self
    }

    /// Sets the policy for the initial sequence number.
    pub fn initial_seq(mut self, initial_seq: InitialSequenceNumberPolicy) -> Self {
        self.initial_seq = initial_seq;
        self
    }

    /// Sets the policy passed on to each stream's RTP parser.
    pub fn unknown_rtcp_ssrc(mut self, unknown_rtcp_ssrc: UnknownRtcpSsrcPolicy) -> Self {
        self.unknown_rtcp_ssrc = unknown_rtcp_ssrc;
        self
    }

    /// Deprecated shorthand for [`PlayOptions::initial_seq`]: `true` selects
    /// `IgnoreSuspiciousValues`, `false` selects `Respect`.
    #[deprecated]
    pub fn ignore_zero_seq(mut self, ignore_zero_seq: bool) -> Self {
        self.initial_seq = if ignore_zero_seq {
            InitialSequenceNumberPolicy::IgnoreSuspiciousValues
        } else {
            InitialSequenceNumberPolicy::Respect
        };
        self
    }

    /// Sets the maximum-jump value (in seconds) passed to each stream's
    /// `Timeline`.
    pub fn enforce_timestamps_with_max_jump_secs(mut self, secs: NonZeroU32) -> Self {
        self.enforce_timestamps_with_max_jump_secs = Some(secs);
        self
    }
}
/// The presentation produced by `parse::parse_describe` from a `DESCRIBE`
/// response.
#[derive(Debug)]
pub(crate) struct Presentation {
/// The streams of the presentation, indexed by the `stream_i` given to
/// `Session::setup`.
pub streams: Box<[Stream]>,
// NOTE(review): not read in this part of the file; presumably the base for
// resolving relative control URLs — confirm in `parse`.
base_url: Url,
/// Control URL: the `PLAY` request URI, and the `SETUP` request URI for
/// streams that have no control URL of their own.
pub control: Url,
/// The server's tool string, if one was found.
tool: Option<Tool>,
}
/// The server's self-reported tool string, e.g. `LIVE555 Streaming Media v…`.
#[derive(Eq, PartialEq)]
pub struct Tool(Box<str>);

impl Tool {
    /// Wraps the raw tool string.
    pub fn new(raw: &str) -> Self {
        Self(Box::from(raw))
    }

    /// Returns true if the string names a LIVE555 version strictly between
    /// `0000.00.00` and `2017.06.04`.
    ///
    /// Versions are compared as strings, which orders the fixed-width
    /// `YYYY.MM.DD` form chronologically.
    pub fn has_live555_tcp_bug(&self) -> bool {
        const PREFIX: &str = "LIVE555 Streaming Media v";
        match self.0.strip_prefix(PREFIX) {
            Some(version) => "0000.00.00" < version && version < "2017.06.04",
            None => false,
        }
    }
}

impl std::fmt::Debug for Tool {
    /// Formats exactly like the wrapped string.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let raw: &str = &self.0;
        <str as std::fmt::Debug>::fmt(raw, f)
    }
}

impl std::ops::Deref for Tool {
    type Target = str;

    /// Gives access to the wrapped string.
    fn deref(&self) -> &Self::Target {
        &*self.0
    }
}
/// One media stream of a presentation.
pub struct Stream {
/// The depacketizer for this stream, or a message explaining why none is
/// available.
depacketizer: Result<crate::codec::Depacketizer, String>,
/// Where this stream is in its lifecycle: `Uninit`, `Init` (after `SETUP`),
/// or `Playing` (after `PLAY`).
state: StreamState,
/// Media type string; see [`Stream::media`].
media: Box<str>,
/// Encoding name; see [`Stream::encoding_name`].
encoding_name: Box<str>,
/// RTP payload type.
rtp_payload_type: u8,
/// RTP clock rate in Hz; passed to `Timeline::new` by `Session::play`.
clock_rate_hz: u32,
/// Channel count, if known.
channels: Option<NonZeroU16>,
/// Frame rate, if known.
framerate: Option<f32>,
/// Stream-specific control URL; when `None`, `Session::setup` uses the
/// presentation's control URL instead.
control: Option<Url>,
}
impl std::fmt::Debug for Stream {
    /// Formats the stream as a struct, showing the control URL as a plain
    /// string and the clock rate under the name `clock_rate`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
        let control = self.control.as_ref().map(Url::as_str);
        let mut out = f.debug_struct("Stream");
        out.field("media", &self.media);
        out.field("control", &control);
        out.field("encoding_name", &self.encoding_name);
        out.field("rtp_payload_type", &self.rtp_payload_type);
        out.field("clock_rate", &self.clock_rate_hz);
        out.field("channels", &self.channels);
        out.field("framerate", &self.framerate);
        out.field("depacketizer", &self.depacketizer);
        out.field("state", &self.state);
        out.finish()
    }
}
impl Stream {
    /// Returns the media type string.
    #[inline]
    pub fn media(&self) -> &str {
        &*self.media
    }

    /// Returns the encoding name.
    #[inline]
    pub fn encoding_name(&self) -> &str {
        &*self.encoding_name
    }

    /// Returns the RTP payload type.
    #[inline]
    pub fn rtp_payload_type(&self) -> u8 {
        let Self {
            rtp_payload_type, ..
        } = self;
        *rtp_payload_type
    }

    /// Returns the RTP clock rate in Hz.
    #[inline]
    pub fn clock_rate_hz(&self) -> u32 {
        let Self { clock_rate_hz, .. } = self;
        *clock_rate_hz
    }

    /// Returns the channel count, if known.
    #[inline]
    pub fn channels(&self) -> Option<NonZeroU16> {
        let Self { channels, .. } = self;
        *channels
    }

    /// Returns the frame rate, if known.
    #[inline]
    pub fn framerate(&self) -> Option<f32> {
        let Self { framerate, .. } = self;
        *framerate
    }

    /// Returns the stream-specific control URL, if any.
    #[inline]
    pub fn control(&self) -> Option<&Url> {
        match &self.control {
            Some(url) => Some(url),
            None => None,
        }
    }
}
/// The UDP socket pair of a stream set up with `Transport::Udp`.
struct UdpSockets {
/// RTP socket; `Session::setup` connects it to the server's RTP port.
rtp: UdpSocket,
/// RTCP socket; `Session::setup` connects it to the port after the RTP one.
rtcp: UdpSocket,
}
impl std::fmt::Debug for UdpSockets {
    /// Writes only the type name; the sockets themselves are not shown.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("UdpSockets");
        out.finish()
    }
}
impl Stream {
    /// Returns the depacketizer's parameters, or `None` if there is no
    /// usable depacketizer or it has no parameters.
    pub fn parameters(&self) -> Option<crate::codec::ParametersRef<'_>> {
        match &self.depacketizer {
            Ok(depacketizer) => depacketizer.parameters(),
            Err(_) => None,
        }
    }

    /// Returns the stream's context, or `None` if it has not been set up.
    pub fn ctx(&self) -> Option<&StreamContext> {
        let ctx = match &self.state {
            StreamState::Playing { ctx, .. } => ctx,
            StreamState::Init(init) => &init.ctx,
            StreamState::Uninit => return None,
        };
        Some(ctx)
    }
}
/// Lifecycle state of a [`Stream`].
#[derive(Debug)]
enum StreamState {
/// No `SETUP` has succeeded for the stream.
Uninit,
/// `SETUP` succeeded; `PLAY` has not completed yet.
Init(StreamStateInit),
/// `PLAY` succeeded.
Playing {
/// Timeline created in `Session::play` from the initial `rtptime` (subject
/// to policy) and the stream's clock rate.
timeline: Timeline,
/// RTP parser created in `Session::play`.
rtp_handler: rtp::InorderParser,
/// Stream context carried over from the `Init` state.
ctx: StreamContext,
/// UDP sockets carried over from the `Init` state; `None` for TCP.
udp_sockets: Option<UdpSockets>,
},
}
/// Payload of [`StreamState::Init`].
#[derive(Debug)]
struct StreamStateInit {
/// SSRC from the `SETUP` response, if any.
ssrc: Option<u32>,
/// Initial sequence number; `None` right after `SETUP`.
// NOTE(review): presumably filled in by `parse::parse_play` — confirm there.
initial_seq: Option<u16>,
/// Initial RTP timestamp; `None` right after `SETUP`.
// NOTE(review): presumably filled in by `parse::parse_play` — confirm there.
initial_rtptime: Option<u32>,
/// TCP or UDP context established by `SETUP`.
ctx: StreamContext,
/// Connected UDP sockets; `None` for TCP.
udp_sockets: Option<UdpSockets>,
}
/// Username and password used to answer an RTSP authentication challenge.
///
/// Note that the derived `Debug` output includes the password.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Credentials {
/// User name sent in the `Authorization` header.
pub username: String,
/// Password used to compute the `Authorization` header.
pub password: String,
}
/// Marker trait for the type-level states of a [`Session`]
/// (`Described` and `Playing`).
#[doc(hidden)]
pub trait State {}
/// State of a [`Session`] after a successful `DESCRIBE` and before `PLAY`.
#[doc(hidden)]
pub struct Described {
/// Body of the `DESCRIBE` response, returned by `Session::sdp`.
sdp: Bytes,
}
impl State for Described {}
/// State of the keepalive request cycle; a session starts out `Idle`.
// NOTE(review): the transitions between variants are not visible in this part
// of the file; the variant notes below are inferred from the names — confirm.
enum KeepaliveState {
/// No keepalive request is outstanding.
Idle,
/// Presumably: a keepalive with this `cseq` is still being written out.
Flushing { cseq: u32, method: KeepaliveMethod },
/// Presumably: a keepalive with this `cseq` awaits its response.
Waiting { cseq: u32, method: KeepaliveMethod },
}
/// RTSP method used for a keepalive request; converts into `msg::Method`.
#[repr(u8)]
#[derive(Copy, Clone, Debug)]
enum KeepaliveMethod {
/// `OPTIONS`.
Options,
/// `SET_PARAMETER`.
SetParameter,
/// `GET_PARAMETER`.
GetParameter,
}
impl From<KeepaliveMethod> for msg::Method {
    /// Maps each keepalive method onto the RTSP method of the same name.
    fn from(method: KeepaliveMethod) -> Self {
        match method {
            KeepaliveMethod::GetParameter => Self::GET_PARAMETER,
            KeepaliveMethod::SetParameter => Self::SET_PARAMETER,
            KeepaliveMethod::Options => Self::OPTIONS,
        }
    }
}
/// State of a [`Session`] after a successful `PLAY`.
///
/// The private unit field prevents construction outside this module.
#[doc(hidden)]
pub struct Playing(());
impl State for Playing {}
/// An RTSP connection plus the per-connection request state.
struct RtspConnection {
/// The underlying message-framed connection.
inner: crate::tokio::Connection,
/// Interleaved channel assignments made by `SETUP`.
channels: ChannelMappings,
/// `CSeq` value for the next request; starts at 1.
next_cseq: u32,
/// Whether data on an unassigned channel has already been logged at warn
/// level; later occurrences are logged at trace level.
seen_unassigned: bool,
}
/// Tells `RtspConnection::send` how tolerant to be of unrelated messages
/// while waiting for a response.
enum ResponseMode {
/// Interleaved data is passed to `handle_unassigned_data`; a response with
/// a different `CSeq` is a framing error.
Normal,
/// Like `Normal`, except interleaved data on an assigned channel is
/// skipped.
Play,
/// Interleaved data and responses with a different `CSeq` are skipped.
Teardown,
}
/// An RTSP session in type-level state `S` (`Described` or `Playing`).
///
/// Field 0 is the pinned, state-independent session data; field 1 is the
/// state-specific data.
pub struct Session<S: State>(Pin<Box<SessionInner>>, S);
/// State-independent data of a [`Session`].
#[pin_project(PinnedDrop)]
struct SessionInner {
/// The RTSP connection; `setup` and `play` fail with `FailedPrecondition`
/// when it is `None`.
conn: Option<RtspConnection>,
/// Options given when the session was created.
options: SessionOptions,
/// Authentication client built from the server's `WWW-Authenticate`
/// challenge, once one has been received.
requested_auth: Option<http_auth::PasswordClient>,
/// Presentation parsed from the `DESCRIBE` response.
presentation: Presentation,
/// Session header from the first successful `SETUP`; `None` before that.
session: Option<parse::SessionHeader>,
/// Message context of the `DESCRIBE` response.
describe_ctx: RtspMessageContext,
/// `CSeq` of the `DESCRIBE` request.
describe_cseq: u32,
/// Status code of the `DESCRIBE` response.
describe_status: StatusCode,
/// Keepalive cycle state; starts `Idle`.
keepalive_state: KeepaliveState,
/// Keepalive timer; armed by `play` with `keepalive_interval`.
keepalive_timer: Option<Pin<Box<tokio::time::Sleep>>>,
/// Bitwise OR of `SessionFlag` values.
flags: u8,
// NOTE(review): not used in this part of the file; presumably the index of
// the next UDP stream to poll — confirm against the polling code.
udp_next_poll_i: usize,
}
/// Bit values stored in `SessionInner::flags`.
#[derive(Copy, Clone)]
#[repr(u8)]
enum SessionFlag {
/// Set by `play` just before the `PLAY` request is sent.
MaybePlaying = 0x1,
/// Set by `setup` when a stream requests TCP transport.
TcpStreams = 0x2,
/// Set by `setup` when a stream requests UDP transport.
UdpStreams = 0x4,
// NOTE(review): the two flags below are not set in this part of the file;
// presumably they record which keepalive methods the server supports.
SetParameterSupported = 0x8,
GetParameterSupported = 0x10,
}
impl RtspConnection {
/// Validates `url` and opens an RTSP connection to its host.
///
/// The port defaults to 554 when the URL has none. Fails with
/// `InvalidArgument` for an unacceptable URL and `ConnectError` when the
/// connection cannot be established.
async fn connect(url: &Url) -> Result<Self, Error> {
    let host = match RtspConnection::validate_url(url) {
        Ok(host) => host,
        Err(e) => return Err(wrap!(ErrorInt::InvalidArgument(e))),
    };
    let port = url.port().unwrap_or(554);
    let inner = match crate::tokio::Connection::connect(host, port).await {
        Ok(inner) => inner,
        Err(e) => return Err(wrap!(ErrorInt::ConnectError(e))),
    };
    Ok(Self {
        inner,
        channels: ChannelMappings::default(),
        next_cseq: 1,
        seen_unassigned: false,
    })
}
/// Checks that `url` is usable for RTSP and returns its host.
///
/// The scheme must be `rtsp`, the URL must carry no username or password,
/// and a host must be present; otherwise a description of the problem is
/// returned.
fn validate_url(url: &Url) -> Result<url::Host<&str>, String> {
    if url.scheme() != "rtsp" {
        return Err(format!(
            "Bad URL {}; only scheme rtsp supported",
            url.as_str()
        ));
    }
    let has_credentials = !url.username().is_empty() || url.password().is_some();
    if has_credentials {
        return Err("URL must not contain credentials".to_owned());
    }
    match url.host() {
        Some(host) => Ok(host),
        None => Err(format!("Must specify host in rtsp url {}", &url)),
    }
}
/// Sends `req` and waits for the response with the matching `CSeq`.
///
/// Every attempt first stamps the request through `fill_req`, sends it, and
/// then reads messages until the matching response arrives. A
/// `401 Unauthorized` response with a parseable `WWW-Authenticate` header
/// populates `requested_auth` and the request is sent again; a `401` received
/// when `requested_auth` is already populated is an error.
///
/// `mode` controls what happens to other traffic seen while waiting:
/// * `Teardown`: interleaved data and responses with another `CSeq` are
///   skipped.
/// * `Play`: interleaved data on an assigned channel is skipped; other
///   interleaved data goes to `handle_unassigned_data`.
/// * `Normal`: all interleaved data goes to `handle_unassigned_data`.
///
/// In every mode, a request from the server or a response without a parseable
/// `CSeq` is a framing error, and end-of-stream is a read error.
///
/// On success returns the response's message context, the `CSeq` used, the
/// response head, and the response body.
async fn send(
&mut self,
mode: ResponseMode,
options: &SessionOptions,
tool: Option<&Tool>,
requested_auth: &mut Option<http_auth::PasswordClient>,
req: &mut OwnedMessage,
) -> Result<(RtspMessageContext, u32, msg::Response, Bytes), Error> {
// Outer loop: one iteration per attempt; only the auth retry loops back.
loop {
let cseq = self.fill_req(options, requested_auth, req)?;
self.inner.send(req.clone()).await.map_err(|e| wrap!(e))?;
let method: &str = req.method_str();
// Inner loop: read messages until the matching response is found.
let (resp, resp_body, msg_ctx) = loop {
let msg = self.inner.next().await.unwrap_or_else(|| {
bail!(ErrorInt::RtspReadError {
conn_ctx: *self.inner.ctx(),
msg_ctx: self.inner.eof_ctx(),
source: std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
format!("EOF while expecting response to {method} CSeq {cseq}"),
),
})
})?;
let msg_ctx = msg.ctx;
// Each arm either breaks with the response, continues to the next
// message, or yields a description for the framing error below.
let description = match msg.msg {
msg::Message::Response(r) => {
if let Some(response_cseq) = parse::get_cseq(&r) {
if response_cseq == cseq {
break (r, msg.body, msg_ctx);
}
if matches!(mode, ResponseMode::Teardown) {
debug!("ignoring unrelated response during TEARDOWN");
continue;
}
format!("{} response with CSeq {}", r.reason_phrase, response_cseq)
} else {
format!("{} response with no/unparseable cseq", r.reason_phrase)
}
}
msg::Message::Data(d) => {
if matches!(mode, ResponseMode::Teardown) {
debug!("ignoring RTSP interleaved data during TEARDOWN");
continue;
} else if let (ResponseMode::Play, Some(m)) =
(&mode, self.channels.lookup(d.channel_id))
{
debug!(
"ignoring interleaved data message on {:?} channel {} while \
waiting for response to {} CSeq {}",
m.channel_type, d.channel_id, method, cseq
);
continue;
}
// Returns an error unless policy says to ignore the data.
self.handle_unassigned_data(
msg_ctx,
options,
tool,
d.channel_id,
msg.body,
)?;
continue;
}
msg::Message::Request(r) => format!("{:?} request", r.method),
};
bail!(ErrorInt::RtspFramingError {
conn_ctx: *self.inner.ctx(),
msg_ctx,
description: format!(
"Expected response to {method} CSeq {cseq}, got {description}",
),
});
};
if resp.status_code == StatusCode::UNAUTHORIZED {
// Already answered a challenge once: don't loop forever.
if requested_auth.is_some() {
bail!(ErrorInt::RtspResponseError {
conn_ctx: *self.inner.ctx(),
msg_ctx,
method: req.method().clone(),
cseq,
status: resp.status_code,
description: "Received Unauthorized after trying digest auth".into(),
})
}
let www_authenticate = match resp.headers.get("WWW-Authenticate") {
None => bail!(ErrorInt::RtspResponseError {
conn_ctx: *self.inner.ctx(),
msg_ctx,
method: req.method().clone(),
cseq,
status: resp.status_code,
description: "Unauthorized without WWW-Authenticate header".into(),
}),
Some(h) => h,
};
// `fill_req` relies on credentials being present whenever
// `requested_auth` is populated, so check before populating it.
if options.creds.is_none() {
bail!(ErrorInt::RtspResponseError {
conn_ctx: *self.inner.ctx(),
msg_ctx,
method: req.method().clone(),
cseq,
status: resp.status_code,
description: "Authentication requested and no credentials supplied"
.to_owned(),
})
}
let www_authenticate: &str = www_authenticate;
*requested_auth = match http_auth::PasswordClient::try_from(www_authenticate) {
Ok(c) => Some(c),
Err(e) => bail!(ErrorInt::RtspResponseError {
conn_ctx: *self.inner.ctx(),
msg_ctx,
method: req.method().clone(),
cseq,
status: resp.status_code,
description: format!("Can't understand WWW-Authenticate header: {e}"),
}),
};
// Retry, now with an `Authorization` header.
continue;
} else if !resp.status_code.is_success() {
bail!(ErrorInt::RtspResponseError {
conn_ctx: *self.inner.ctx(),
msg_ctx,
method: req.method().clone(),
cseq,
status: resp.status_code,
description: "Unexpected RTSP response status".into(),
});
}
return Ok((msg_ctx, cseq, resp, resp_body));
}
}
/// Applies `options.unassigned_channel_data` to interleaved data that
/// arrived on a channel with no stream assigned.
///
/// Returns `Ok(())` when the data is to be ignored (logging it at warn
/// level the first time and at trace level afterwards). Otherwise returns
/// `RtspUnassignedChannelError`, first recording a stale session when the
/// policy presumes the live555 stale-session bug.
fn handle_unassigned_data(
    &mut self,
    msg_ctx: RtspMessageContext,
    options: &SessionOptions,
    tool: Option<&Tool>,
    channel_id: u8,
    data: Bytes,
) -> Result<(), Error> {
    let presumed_stale_session = match options.unassigned_channel_data {
        UnassignedChannelDataPolicy::AssumeStaleSession => true,
        UnassignedChannelDataPolicy::Error => false,
        UnassignedChannelDataPolicy::Auto if tool.is_some_and(Tool::has_live555_tcp_bug) => {
            true
        }
        UnassignedChannelDataPolicy::Auto | UnassignedChannelDataPolicy::Ignore => {
            if self.seen_unassigned {
                log::trace!(
                    "Ignoring data on unassigned RTSP interleaved data channel {}.\n\n\
                     conn: {}\nmsg: {}\ndata: {:#?}",
                    channel_id,
                    self.inner.ctx(),
                    &msg_ctx,
                    crate::hex::LimitedHex::new(&data, 128),
                );
            } else {
                log::warn!(
                    "Ignoring data on unassigned RTSP interleaved data channel {}. \
                     This is the first such message. Following messages will be logged \
                     at trace priority only.\n\n\
                     conn: {}\nmsg: {}\ndata: {:#?}",
                    channel_id,
                    self.inner.ctx(),
                    &msg_ctx,
                    crate::hex::LimitedHex::new(&data, 128),
                );
                self.seen_unassigned = true;
            }
            return Ok(());
        }
    };

    if presumed_stale_session {
        note_stale_live555_data(tool, options, self.inner.ctx(), channel_id, &msg_ctx);
    }
    let conn_ctx = *self.inner.ctx();
    bail!(ErrorInt::RtspUnassignedChannelError {
        conn_ctx,
        msg_ctx,
        channel_id,
        data,
    });
}
fn fill_req(
&mut self,
options: &SessionOptions,
requested_auth: &mut Option<http_auth::PasswordClient>,
req: &mut OwnedMessage,
) -> Result<u32, Error> {
let cseq = self.next_cseq;
self.next_cseq += 1;
if let Some(auth) = requested_auth {
let creds = options
.creds
.as_ref()
.expect("creds were checked when filling request_auth");
let authorization = auth
.respond(&http_auth::PasswordParams {
username: &creds.username,
password: &creds.password,
uri: req.request_uri_str(),
method: req.method_str(),
body: Some(&[]),
})
.map_err(|e| wrap!(ErrorInt::Internal(e.into())))?;
let headers = req.headers_mut();
headers.insert(
msg::HeaderName::AUTHORIZATION,
msg::HeaderValue::try_from(authorization).unwrap(),
);
}
let headers = req.headers_mut();
headers.insert(
msg::HeaderName::CSEQ,
msg::HeaderValue::try_from(cseq.to_string()).unwrap(),
);
let user_agent = if let Some(ref u) = options.user_agent {
u
} else {
DEFAULT_USER_AGENT
};
headers.insert(
msg::HeaderName::USER_AGENT,
msg::HeaderValue::try_from(user_agent.to_string()).unwrap(),
);
Ok(cseq)
}
}
/// `User-Agent` header value used when `SessionOptions::user_agent` is unset:
/// `retina_` followed by this crate's version at compile time.
const DEFAULT_USER_AGENT: &str = concat!("retina_", env!("CARGO_PKG_VERSION"));
impl<S: State> Session<S> {
    /// Returns the streams of the presentation, in the order used by the
    /// `stream_i` argument of `setup`.
    pub fn streams(&self) -> &[Stream] {
        let presentation = &self.0.presentation;
        &presentation.streams
    }

    /// Returns the server's tool string, if one is known.
    pub fn tool(&self) -> Option<&Tool> {
        let presentation = &self.0.presentation;
        presentation.tool.as_ref()
    }
}
impl Session<Described> {
/// Connects to the server named by `url`, sends `DESCRIBE`, and returns a
/// session in the `Described` state.
pub async fn describe(url: Url, options: SessionOptions) -> Result<Self, Error> {
    match RtspConnection::connect(&url).await {
        Ok(conn) => Self::describe_with_conn(conn, options, url).await,
        Err(e) => Err(e),
    }
}
/// Sends `DESCRIBE` for `url` over an existing connection and builds the
/// session from the response.
///
/// The response body is parsed into the presentation and also kept as the
/// raw SDP. A response that cannot be parsed yields `RtspResponseError`.
async fn describe_with_conn(
    mut conn: RtspConnection,
    options: SessionOptions,
    url: Url,
) -> Result<Self, Error> {
    let accept_sdp = (
        msg::HeaderName::ACCEPT,
        msg::HeaderValue::try_from("application/sdp").unwrap(),
    );
    let mut describe_req = OwnedMessage::Request {
        head: msg::Request {
            method: msg::Method::DESCRIBE,
            request_uri: Some(url.clone()),
            headers: [accept_sdp].into(),
        },
        body: Bytes::new(),
    };

    // No tool string is known before the first response has been parsed.
    let mut requested_auth = None;
    let (describe_ctx, describe_cseq, response, sdp) = conn
        .send(
            ResponseMode::Normal,
            &options,
            None,
            &mut requested_auth,
            &mut describe_req,
        )
        .await?;
    let describe_status = response.status_code;

    let presentation = match parse::parse_describe(url, &response, &sdp) {
        Ok(presentation) => presentation,
        Err(description) => {
            return Err(wrap!(ErrorInt::RtspResponseError {
                conn_ctx: *conn.inner.ctx(),
                msg_ctx: describe_ctx,
                method: msg::Method::DESCRIBE,
                cseq: describe_cseq,
                status: describe_status,
                description,
            }));
        }
    };

    let inner = SessionInner {
        conn: Some(conn),
        options,
        requested_auth,
        presentation,
        session: None,
        describe_ctx,
        describe_cseq,
        describe_status,
        keepalive_state: KeepaliveState::Idle,
        keepalive_timer: None,
        flags: 0,
        udp_next_poll_i: 0,
    };
    Ok(Session(Box::pin(inner), Described { sdp }))
}
pub fn sdp(&self) -> &[u8] {
&self.1.sdp
}
/// Sends a `SETUP` request for stream `stream_i` and moves that stream to
/// the `Init` state.
///
/// With `Transport::Tcp` the next unassigned interleaved channel pair is
/// proposed and the channel from the response is assigned to the stream.
/// With `Transport::Udp` a local socket pair is created, connected to the
/// source and ports from the response, and a firewall hole is punched.
///
/// The first successful `SETUP` establishes the session id. A later
/// response with a different id is an error unless the session id policy is
/// `UseFirst`.
///
/// # Errors
///
/// * `FailedPrecondition` if there is no connection, the stream is already
///   set up, or no interleaved channel is free.
/// * `RtspResponseError` if the response is unusable: unparseable, missing
///   the `interleaved` or `server_port` parameter, a `server_port` that
///   leaves no room for the RTCP port, or a changed session id.
/// * `ConnectError` if connecting the UDP sockets or punching the firewall
///   hole fails.
///
/// # Panics
///
/// Panics if `stream_i` is not a valid index into `streams()`.
pub async fn setup(&mut self, stream_i: usize, options: SetupOptions) -> Result<(), Error> {
    let inner = &mut self.0.as_mut().project();
    let conn = inner
        .conn
        .as_mut()
        .ok_or_else(|| wrap!(ErrorInt::FailedPrecondition("no connection".into())))?;
    let stream = &mut inner.presentation.streams[stream_i];
    if !matches!(stream.state, StreamState::Uninit) {
        bail!(ErrorInt::FailedPrecondition("stream already set up".into()));
    }
    // Fall back to the presentation's control URL for streams without one.
    let url = stream
        .control
        .as_ref()
        .unwrap_or(&inner.presentation.control)
        .clone();
    let mut headers = msg::Headers::default();
    let udp = match options.transport {
        Transport::Tcp(_) => {
            let proposed_channel_id = conn.channels.next_unassigned().ok_or_else(|| {
                wrap!(ErrorInt::FailedPrecondition(
                    "no unassigned channels".into()
                ))
            })?;
            headers.insert(
                msg::HeaderName::TRANSPORT,
                msg::HeaderValue::try_from(format!(
                    "RTP/AVP/TCP;unicast;interleaved={}-{}",
                    proposed_channel_id,
                    proposed_channel_id + 1
                ))
                .unwrap(),
            );
            *inner.flags |= SessionFlag::TcpStreams as u8;
            None
        }
        Transport::Udp(_) => {
            let local_ip = conn.inner.ctx().local_addr.ip();
            let pair = crate::tokio::UdpPair::for_ip(local_ip)
                .map_err(|e| wrap!(ErrorInt::Internal(e.into())))?;
            headers.insert(
                msg::HeaderName::TRANSPORT,
                msg::HeaderValue::try_from(format!(
                    "RTP/AVP/UDP;unicast;client_port={}-{}",
                    pair.rtp_port,
                    pair.rtp_port + 1,
                ))
                .unwrap(),
            );
            *inner.flags |= SessionFlag::UdpStreams as u8;
            // The peer address and port are placeholders until the
            // response arrives.
            Some((
                UdpStreamContext {
                    local_ip,
                    peer_ip: IpAddr::V4(Ipv4Addr::UNSPECIFIED),
                    local_rtp_port: pair.rtp_port,
                    peer_rtp_port: 0,
                },
                UdpSockets {
                    rtp: pair.rtp_socket,
                    rtcp: pair.rtcp_socket,
                },
            ))
        }
    };
    if let &mut Some(ref s) = inner.session {
        headers.insert(
            msg::HeaderName::SESSION,
            msg::HeaderValue::try_from(s.id.to_string()).unwrap(),
        );
    }
    let mut req = OwnedMessage::Request {
        head: msg::Request {
            method: msg::Method::SETUP,
            request_uri: Some(url),
            headers,
        },
        body: Bytes::new(),
    };
    let (msg_ctx, cseq, response, _resp_body) = conn
        .send(
            ResponseMode::Normal,
            inner.options,
            inner.presentation.tool.as_ref(),
            inner.requested_auth,
            &mut req,
        )
        .await?;
    debug!("SETUP response: {:#?}", &response);
    let conn_ctx = conn.inner.ctx();
    let status = response.status_code;
    let response = parse::parse_setup(&response).map_err(|description| {
        wrap!(ErrorInt::RtspResponseError {
            conn_ctx: *conn_ctx,
            msg_ctx,
            method: msg::Method::SETUP,
            cseq,
            status,
            description,
        })
    })?;
    match inner.session.as_ref() {
        Some(SessionHeader { id, .. }) if id.as_ref() != &*response.session.id => {
            match inner.options.session_id {
                SessionIdPolicy::UseFirst => (),
                _ => {
                    bail!(ErrorInt::RtspResponseError {
                        conn_ctx: *conn.inner.ctx(),
                        msg_ctx,
                        method: msg::Method::SETUP,
                        cseq,
                        status,
                        description: format!(
                            "session id changed from {:?} to {:?}",
                            id, response.session.id,
                        ),
                    });
                }
            }
        }
        Some(_) => {}
        None => {
            debug!(
                "established session {:?}, timeout={}s",
                response.session.id, response.session.timeout_sec
            );
            *inner.session = Some(response.session)
        }
    };
    let conn_ctx = conn.inner.ctx();
    let (stream_ctx, udp_sockets);
    match udp {
        None => {
            let channel_id = match response.channel_id {
                Some(id) => id,
                None => bail!(ErrorInt::RtspResponseError {
                    conn_ctx: *conn.inner.ctx(),
                    msg_ctx,
                    method: msg::Method::SETUP,
                    cseq,
                    status,
                    description: "Transport header is missing interleaved parameter".to_owned(),
                }),
            };
            conn.channels
                .assign(channel_id, stream_i)
                .map_err(|description| {
                    wrap!(ErrorInt::RtspResponseError {
                        conn_ctx: *conn_ctx,
                        msg_ctx,
                        method: msg::Method::SETUP,
                        cseq,
                        status,
                        description,
                    })
                })?;
            stream_ctx = StreamContext(StreamContextInner::Tcp(TcpStreamContext {
                rtp_channel_id: channel_id,
            }));
            udp_sockets = None;
        }
        Some((mut ctx, sockets)) => {
            // Without an explicit source, assume the RTSP peer sends the data.
            let source = match response.source {
                Some(s) => s,
                None => conn.inner.ctx().peer_addr.ip(),
            };
            let server_port = response.server_port.ok_or_else(|| {
                wrap!(ErrorInt::RtspResponseError {
                    conn_ctx: *conn_ctx,
                    msg_ctx,
                    method: msg::Method::SETUP,
                    cseq,
                    status,
                    description: "Transport header is missing server_port parameter".to_owned(),
                })
            })?;
            // The port comes from the server, so `server_port + 1` could
            // overflow (65535); report that as a bad response instead.
            let server_rtcp_port = server_port.checked_add(1).ok_or_else(|| {
                wrap!(ErrorInt::RtspResponseError {
                    conn_ctx: *conn_ctx,
                    msg_ctx,
                    method: msg::Method::SETUP,
                    cseq,
                    status,
                    description: format!(
                        "Transport header's server_port {server_port} leaves no room for \
                         an RTCP port"
                    ),
                })
            })?;
            ctx.peer_ip = source;
            ctx.peer_rtp_port = server_port;
            sockets
                .rtp
                .connect(SocketAddr::new(source, server_port))
                .await
                .map_err(|e| wrap!(ErrorInt::ConnectError(e)))?;
            sockets
                .rtcp
                .connect(SocketAddr::new(source, server_rtcp_port))
                .await
                .map_err(|e| wrap!(ErrorInt::ConnectError(e)))?;
            punch_firewall_hole(&sockets)
                .await
                .map_err(|e| wrap!(ErrorInt::ConnectError(e)))?;
            stream_ctx = StreamContext(StreamContextInner::Udp(ctx));
            udp_sockets = Some(sockets);
        }
    };
    if let Some(format) = options.frame_format
        && let Ok(d) = &mut stream.depacketizer
    {
        d.set_frame_format(format);
    }
    stream.state = StreamState::Init(StreamStateInit {
        ssrc: response.ssrc,
        initial_seq: None,
        initial_rtptime: None,
        ctx: stream_ctx,
        udp_sockets,
    });
    Ok(())
}
/// Sends `PLAY` and, on success, returns the session in the `Playing` state.
///
/// Every stream in the `Init` state moves to `Playing`, with a `Timeline` and
/// an `rtp::InorderParser` built from the `PLAY` response according to
/// `policy`; streams that were never set up are left `Uninit`. The keepalive
/// timer is armed with `keepalive_interval`.
///
/// Fails with `FailedPrecondition` if there is no connection or no stream has
/// been set up, and with `RtspResponseError` if the response cannot be
/// parsed, a required `rtptime` is missing, or a timeline cannot be created.
pub async fn play(mut self, policy: PlayOptions) -> Result<Session<Playing>, Error> {
let inner = self.0.as_mut().project();
let conn = inner
.conn
.as_mut()
.ok_or_else(|| wrap!(ErrorInt::FailedPrecondition("no connection".into())))?;
// The session header only exists once a SETUP has succeeded.
let session = inner.session.as_ref().ok_or_else(|| {
wrap!(ErrorInt::FailedPrecondition(
"must SETUP before PLAY".into()
))
})?;
if let Some(ref t) = inner.presentation.tool
&& (*inner.flags & (SessionFlag::TcpStreams as u8)) != 0
&& t.has_live555_tcp_bug()
{
warn!(
"Connecting via TCP to known-broken RTSP server {:?}. \
See <https://github.com/scottlamb/retina/issues/17>. \
Consider using UDP instead!",
t
);
}
trace!("PLAY with channel mappings: {:#?}", &conn.channels);
// Set before sending, so the flag is in place even if the request below
// fails.
*inner.flags |= SessionFlag::MaybePlaying as u8;
let (msg_ctx, cseq, response, _resp_body) = conn
.send(
ResponseMode::Play,
inner.options,
inner.presentation.tool.as_ref(),
inner.requested_auth,
&mut OwnedMessage::Request {
head: msg::Request {
method: msg::Method::PLAY,
request_uri: Some(inner.presentation.control.clone()),
headers: [
(
msg::HeaderName::SESSION,
msg::HeaderValue::try_from(&*session.id).unwrap(),
),
(
msg::HeaderName::RANGE,
msg::HeaderValue::try_from("npt=0.000-").unwrap(),
),
]
.into(),
},
body: Bytes::new(),
},
)
.await?;
parse::parse_play(&response, inner.presentation).map_err(|description| {
wrap!(ErrorInt::RtspResponseError {
conn_ctx: *conn.inner.ctx(),
msg_ctx,
method: msg::Method::PLAY,
cseq,
status: response.status_code,
description,
})
})?;
// Number of streams that were set up.
let setup_streams = inner
.presentation
.streams
.iter()
.filter(|s| matches!(s.state, StreamState::Init(_)))
.count();
// True if every set-up stream has an initial rtptime.
let all_have_time = inner.presentation.streams.iter().all(|s| match s.state {
StreamState::Init(StreamStateInit {
initial_rtptime, ..
}) => initial_rtptime.is_some(),
_ => true,
});
for (i, s) in inner.presentation.streams.iter_mut().enumerate() {
// Take the state by value so its parts can be moved into the new state.
match std::mem::replace(&mut s.state, StreamState::Uninit) {
StreamState::Init(StreamStateInit {
initial_rtptime,
initial_seq,
ssrc,
ctx,
udp_sockets,
}) => {
// The initial rtptime is only used when more than one stream was set
// up; otherwise the fallback arm yields `None`.
let initial_rtptime = match policy.initial_timestamp {
InitialTimestampPolicy::Require | InitialTimestampPolicy::Default
if setup_streams > 1 =>
{
if initial_rtptime.is_none() {
bail!(ErrorInt::RtspResponseError {
conn_ctx: *conn.inner.ctx(),
msg_ctx,
method: msg::Method::PLAY,
cseq,
status: response.status_code,
description: format!(
"Expected rtptime on PLAY with mode {:?}, missing on \
stream {} ({:?}). Consider setting initial timestamp \
mode permissive.",
policy.initial_timestamp, i, &s.control
),
});
}
initial_rtptime
}
InitialTimestampPolicy::Permissive
if setup_streams > 1 && all_have_time =>
{
initial_rtptime
}
_ => None,
};
// `Ignore` drops any seq; `Default` and `IgnoreSuspiciousValues` drop
// only 0 and 1; everything else is kept.
let initial_seq = match (initial_seq, policy.initial_seq) {
(Some(seq), InitialSequenceNumberPolicy::Ignore)
| (
Some(seq @ 0 | seq @ 1),
InitialSequenceNumberPolicy::Default
| InitialSequenceNumberPolicy::IgnoreSuspiciousValues,
) => {
log::info!(
"ignoring PLAY seq={} on stream {} due to policy {:?}",
seq,
i,
policy.initial_seq
);
None
}
(Some(seq), _) => {
log::debug!("setting PLAY seq={} on stream {}", seq, i);
Some(seq)
}
(None, _) => {
log::debug!("no PLAY seq on stream {}", i);
None
}
};
let conn_ctx = conn.inner.ctx();
s.state = StreamState::Playing {
timeline: Timeline::new(
initial_rtptime,
s.clock_rate_hz,
policy.enforce_timestamps_with_max_jump_secs,
)
.map_err(|description| {
wrap!(ErrorInt::RtspResponseError {
conn_ctx: *conn_ctx,
msg_ctx,
method: msg::Method::PLAY,
cseq,
status: response.status_code,
description,
})
})?,
rtp_handler: rtp::InorderParser::new(
ssrc,
initial_seq,
policy.unknown_rtcp_ssrc,
),
ctx,
udp_sockets,
};
}
// Never set up: the `replace` above already left it `Uninit`.
StreamState::Uninit => {}
// `play` consumes a `Session<Described>`, so nothing is playing yet.
StreamState::Playing { .. } => unreachable!(),
};
}
*inner.keepalive_timer = Some(Box::pin(tokio::time::sleep(keepalive_interval(session))));
Ok(Session(self.0, Playing(())))
}
}
/// Notes interleaved data that arrived on an unassigned channel, presumed to
/// be caused by the stale file descriptor bug in old live555 servers.
///
/// * Logs a warning unless `tool` is known to have the bug.
/// * If `options` has no session group, nothing is tracked.
/// * If the group already tracks a session with TCP streams, that session is
///   taken to explain the data and no entry is added.
/// * Otherwise a new `StaleSession` is added which expires after
///   `LIVE555_EXPIRATION_SEC` seconds, and a task is spawned (via
///   `tokio::spawn`) to remove the entry at that time.
fn note_stale_live555_data(
    tool: Option<&Tool>,
    options: &SessionOptions,
    conn_ctx: &crate::ConnectionContext,
    channel_id: u8,
    msg_ctx: &RtspMessageContext,
) {
    let known_to_have_live555_tcp_bug = tool.map(Tool::has_live555_tcp_bug).unwrap_or(false);
    if !known_to_have_live555_tcp_bug {
        // Every line of the message ends in a `\` continuation so that the
        // sentence is not broken by a raw newline from the source layout.
        log::warn!(
            "saw unexpected RTSP packet. This is presumed to be due to a bug in old \
             live555 servers' TCP handling, though tool attribute {tool:?} does not refer to a \
             known-buggy version. Consider switching to UDP.\n\n\
             conn: {conn_ctx:?}\n\
             channel: {channel_id}\n\
             msg: {msg_ctx:?}"
        );
    }
    let group = match options.session_group.as_ref() {
        Some(g) => g,
        None => {
            log::debug!("Not tracking stale session because there's no session group.");
            return;
        }
    };
    let expires =
        tokio::time::Instant::now() + std::time::Duration::from_secs(LIVE555_EXPIRATION_SEC);
    let seqnum;
    {
        // Keep the lock scope minimal: it must be released before spawning.
        let mut lock = group.sessions.lock().unwrap();
        for s in &lock.sessions {
            if s.has_tcp {
                log::debug!(
                    "Unexpected RTSP interleaved packet (live555 stale file \
                     descriptor bug) plausibly explained by known stale \
                     session {:?}/{}. Not adding a session entry.",
                    group.debug_id(),
                    s.seqnum,
                );
                return;
            }
        }
        seqnum = lock.next_seqnum;
        lock.next_seqnum += 1;
        log::debug!(
            "Unexpected RTSP interleaved packet, presumed due to live555 stale \
             file descriptor bug; adding stale session {:?}/{} that will \
             expire in {} seconds.",
            group.debug_id(),
            seqnum,
            LIVE555_EXPIRATION_SEC,
        );
        lock.sessions.push(StaleSession {
            seqnum,
            expires,
            teardown_rx: None,
            has_tcp: true,
            maybe_playing: true,
        });
    }
    // Remove the entry again once it has expired.
    let group = group.clone();
    tokio::spawn(async move {
        tokio::time::sleep_until(expires).await;
        if !group.try_remove_seqnum(seqnum) {
            log::warn!(
                "Unable to find stale live555 file descriptor session {:?}/{} at expiration",
                group.debug_id(),
                seqnum
            );
        } else {
            log::debug!(
                "Stale live555 file descriptor bug session {:?}/{} presumed expired.",
                group.debug_id(),
                seqnum
            );
        }
    });
}
/// A minimal 12-byte RTP packet (fixed header only, every field zero except
/// the version) sent by `punch_firewall_hole` on the RTP socket.
#[rustfmt::skip]
const HOLE_PUNCH_RTP: [u8; 12] = [
    0b1000_0000,            // version=2, padding=0, extension=0, CSRC count=0
    0x00,                   // marker=0, payload type=0
    0x00, 0x00,             // sequence number=0
    0x00, 0x00, 0x00, 0x00, // timestamp=0
    0x00, 0x00, 0x00, 0x00, // SSRC=0
];
/// A minimal 8-byte RTCP packet of type 201 (receiver report) with no report
/// blocks, sent by `punch_firewall_hole` on the RTCP socket.
#[rustfmt::skip]
const HOLE_PUNCH_RTCP: [u8; 8] = [
    0b1000_0000,            // version=2, padding=0, report count=0
    201,                    // packet type=201
    0x00, 0x01,             // length=1 (in 32-bit words, minus one)
    0x00, 0x00, 0x00, 0x00, // SSRC=0
];
/// Sends one `HOLE_PUNCH_RTP` packet on the RTP socket, then one
/// `HOLE_PUNCH_RTCP` packet on the RTCP socket.
///
/// Returns the first send error encountered, if any.
async fn punch_firewall_hole(sockets: &UdpSockets) -> Result<(), std::io::Error> {
    let rtp_probe: &[u8] = &HOLE_PUNCH_RTP;
    let rtcp_probe: &[u8] = &HOLE_PUNCH_RTCP;
    let _rtp_len = sockets.rtp.send(rtp_probe).await?;
    let _rtcp_len = sockets.rtcp.send(rtcp_probe).await?;
    Ok(())
}
/// An item yielded by the `futures::Stream` implementation of
/// `Session<Playing>`: either an RTP packet or an RTCP compound packet.
///
/// Marked `#[non_exhaustive]`, so callers outside this crate must include a
/// wildcard arm when matching.
#[derive(Debug)]
#[non_exhaustive]
pub enum PacketItem {
/// A received RTP packet.
Rtp(crate::rtp::ReceivedPacket),
/// A received RTCP compound packet.
Rtcp(crate::rtcp::ReceivedCompoundPacket),
}
impl Session<Playing> {
/// Converts this playing session into a [`Demuxed`] stream of codec items.
///
/// # Errors
///
/// * `FailedPrecondition` if the session has no connection.
/// * `RtspResponseError` (attributed to the `DESCRIBE` response) if any
///   stream in the `Playing` state has no usable depacketizer.
pub fn demuxed(mut self) -> Result<Demuxed, Error> {
    let inner = self.0.as_mut().project();
    let conn = inner
        .conn
        .as_ref()
        .ok_or_else(|| wrap!(ErrorInt::FailedPrecondition("no connection".into())))?;

    // Find the first playing stream whose depacketizer failed to initialize.
    let first_failure = inner
        .presentation
        .streams
        .iter()
        .find_map(|s| match (&s.state, &s.depacketizer) {
            (StreamState::Playing { .. }, Err(description)) => Some(description),
            _ => None,
        });
    if let Some(description) = first_failure {
        bail!(ErrorInt::RtspResponseError {
            conn_ctx: *conn.inner.ctx(),
            msg_ctx: *inner.describe_ctx,
            method: msg::Method::DESCRIBE,
            cseq: *inner.describe_cseq,
            status: *inner.describe_status,
            description: description.clone(),
        });
    }

    Ok(Demuxed {
        state: DemuxedState::Waiting,
        session: self,
    })
}
/// Handles expiry of the keepalive timer.
///
/// Fails if the previous keepalive is still outstanding; otherwise sends a
/// new keepalive request, updates `keepalive_state`, and re-arms the timer
/// for one more keepalive interval.
///
/// # Errors
///
/// * `FailedPrecondition` if there is no connection.
/// * `WriteError` if the previous keepalive is still being flushed.
/// * `RtspReadError` if the previous keepalive has not been answered.
/// * `Internal` if the connection's sink is not ready to accept a request.
/// * Any error from `fill_req` or from flushing the request.
///
/// # Panics
///
/// Panics if there is no session or no keepalive timer.
fn handle_keepalive_timer(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Result<(), Error> {
let inner = self.0.as_mut().project();
let session = inner
.session
.as_ref()
.expect("keepalive_timer can't fire without a session");
let keepalive_interval = keepalive_interval(session);
let conn = inner
.conn
.as_mut()
.ok_or_else(|| wrap!(ErrorInt::FailedPrecondition("no connection".into())))?;
// The previous keepalive must have completed within one interval;
// anything other than `Idle` is reported as a timeout.
match inner.keepalive_state {
KeepaliveState::Flushing { cseq, .. } => bail!(ErrorInt::WriteError {
conn_ctx: *conn.inner.ctx(),
source: std::io::Error::new(
std::io::ErrorKind::TimedOut,
format!("Unable to write keepalive {cseq} within {keepalive_interval:?}",),
),
}),
KeepaliveState::Waiting { cseq, .. } => bail!(ErrorInt::RtspReadError {
conn_ctx: *conn.inner.ctx(),
msg_ctx: conn.inner.eof_ctx(),
source: std::io::Error::new(
std::io::ErrorKind::TimedOut,
format!(
"Server failed to respond to keepalive {cseq} within {keepalive_interval:?}",
),
),
}),
KeepaliveState::Idle => {}
}
// With no keepalive in flight, the sink is expected to be ready.
if matches!(conn.inner.poll_ready_unpin(cx), Poll::Pending) {
bail!(ErrorInt::Internal(
"Unexpectedly not ready to send keepalive".into()
));
}
// Prefer SET_PARAMETER, then GET_PARAMETER, falling back to OPTIONS when
// neither support flag is set.
let method = if *inner.flags & (SessionFlag::SetParameterSupported as u8) != 0 {
KeepaliveMethod::SetParameter
} else if *inner.flags & (SessionFlag::GetParameterSupported as u8) != 0 {
KeepaliveMethod::GetParameter
} else {
KeepaliveMethod::Options
};
// Bodyless request against the presentation's base URL carrying only the
// Session header; `fill_req` supplies the rest and returns the CSeq.
let mut req = OwnedMessage::Request {
head: msg::Request {
method: method.into(),
request_uri: Some(inner.presentation.base_url.clone()),
headers: [(
msg::HeaderName::SESSION,
msg::HeaderValue::try_from(session.id.to_string()).unwrap(),
)]
.into(),
},
body: Bytes::new(),
};
let cseq = conn.fill_req(inner.options, inner.requested_auth, &mut req)?;
trace!("sending {:?} keepalive", method);
conn.inner
.start_send_unpin(req)
.expect("encoding is infallible");
// Try to flush immediately; if that can't complete now, remember that a
// flush is still pending.
*inner.keepalive_state = match conn.inner.poll_flush_unpin(cx) {
Poll::Ready(Ok(())) => KeepaliveState::Waiting { cseq, method },
Poll::Ready(Err(e)) => bail!(e),
Poll::Pending => KeepaliveState::Flushing { cseq, method },
};
// Re-arm the timer for the next keepalive.
inner
.keepalive_timer
.as_mut()
.expect("keepalive timer set in state Playing")
.as_mut()
.reset(tokio::time::Instant::now() + keepalive_interval);
Ok(())
}
fn handle_response(
mut self: Pin<&mut Self>,
msg_ctx: &crate::RtspMessageContext,
response: msg::Response,
) -> Result<(), Error> {
let inner = self.0.as_mut().project();
match inner.keepalive_state {
KeepaliveState::Waiting { cseq, method }
if parse::get_cseq(&response) == Some(*cseq) =>
{
if !response.status_code.is_success() {
warn!("keepalive failed with {:?}", response.status_code);
} else {
trace!("keepalive succeeded with {:?}", response.status_code);
if matches!(method, KeepaliveMethod::Options) {
match parse::parse_options(&response) {
Ok(r) => {
if r.set_parameter_supported {
*inner.flags |= SessionFlag::SetParameterSupported as u8;
}
if r.get_parameter_supported {
*inner.flags |= SessionFlag::GetParameterSupported as u8;
}
}
Err(e) => {
warn!("Unable to parse OPTIONS response: {}", e);
}
}
}
}
*inner.keepalive_state = KeepaliveState::Idle;
return Ok(());
}
_ => {}
}
bail!(ErrorInt::RtspFramingError {
conn_ctx: *inner
.conn
.as_ref()
.expect("have conn when handling response")
.inner
.ctx(),
msg_ctx: *msg_ctx,
description: format!("Unexpected RTSP response {response:#?}"),
})
}
/// Handles an interleaved data message received on the RTSP connection.
///
/// Looks up `channel_id` in the connection's channel mappings and passes
/// `body` to the mapped stream's RTP or RTCP handler.
///
/// Returns `Ok(None)` when the channel is unassigned and
/// `handle_unassigned_data` accepts the data; otherwise returns whatever the
/// handler produced.
///
/// # Errors
///
/// * `FailedPrecondition` if there is no connection.
/// * Any error from `handle_unassigned_data` or from the RTP handler.
/// * `PacketError` wrapping the description of an RTCP handling failure.
///
/// # Panics
///
/// Panics if the mapped stream is not in the `Playing` state.
fn handle_data(
mut self: Pin<&mut Self>,
msg_ctx: &RtspMessageContext,
channel_id: u8,
body: Bytes,
) -> Result<Option<PacketItem>, Error> {
let inner = self.0.as_mut().project();
let conn = inner
.conn
.as_mut()
.ok_or_else(|| wrap!(ErrorInt::FailedPrecondition("no connection".into())))?;
let pkt_ctx = crate::PacketContext(crate::PacketContextInner::Tcp { msg_ctx: *msg_ctx });
let m = match conn.channels.lookup(channel_id) {
Some(m) => m,
None => {
// No stream owns this channel; defer to the connection's policy.
conn.handle_unassigned_data(
*msg_ctx,
inner.options,
inner.presentation.tool.as_ref(),
channel_id,
body,
)?;
return Ok(None);
}
};
let stream = &mut inner.presentation.streams[m.stream_i];
let (timeline, rtp_handler, stream_ctx) = match &mut stream.state {
StreamState::Playing {
timeline,
rtp_handler,
ctx,
..
} => (timeline, rtp_handler, ctx),
_ => unreachable!(
"Session<Playing>'s {}->{:?} not in Playing state",
channel_id, m
),
};
match m.channel_type {
// The RTP handler already returns a crate `Error`.
ChannelType::Rtp => Ok(rtp_handler.rtp(
inner.options,
stream_ctx,
inner.presentation.tool.as_ref(),
conn.inner.ctx(),
&pkt_ctx,
timeline,
m.stream_i,
body,
)?),
ChannelType::Rtcp => {
// The RTCP handler returns only a description; wrap it with context.
match rtp_handler.rtcp(
inner.options,
stream_ctx,
inner.presentation.tool.as_ref(),
conn.inner.ctx(),
&pkt_ctx,
timeline,
m.stream_i,
body,
) {
Ok(p) => Ok(p),
Err(description) => Err(wrap!(ErrorInt::PacketError {
conn_ctx: *conn.inner.ctx(),
stream_ctx: stream.ctx().unwrap().to_owned(),
pkt_ctx,
stream_id: m.stream_i,
description,
})),
}
}
}
}
/// Polls the UDP sockets of stream `i` for the next packet.
///
/// The RTCP socket is drained first, then the RTP socket. Each received
/// datagram is copied out of `buf` and passed to the stream's handler; the
/// first item a handler produces is returned. Datagrams for which the
/// handler produces nothing are discarded and polling continues.
///
/// Returns `Poll::Pending` if the stream is not in the `Playing` state with
/// UDP sockets, or once neither socket has anything more to yield.
/// `ConnectionRefused` receive errors are logged and ignored; other receive
/// errors are returned as `UdpRecvError`.
///
/// `buf` is expected to be empty on entry (checked in debug builds only).
fn poll_udp_stream(
&mut self,
cx: &mut std::task::Context,
buf: &mut tokio::io::ReadBuf,
i: usize,
) -> Poll<Option<Result<PacketItem, Error>>> {
debug_assert!(buf.filled().is_empty());
let inner = self.0.as_mut().project();
let s = &mut inner.presentation.streams[i];
let (timeline, rtp_handler, stream_ctx, udp_sockets) = match &mut s.state {
StreamState::Playing {
timeline,
rtp_handler,
ctx,
udp_sockets: Some(udp_sockets),
..
} => (timeline, rtp_handler, ctx, udp_sockets),
// Not a playing UDP stream: nothing to poll here.
_ => return Poll::Pending,
};
let conn_ctx = inner
.conn
.as_ref()
.ok_or_else(|| wrap!(ErrorInt::FailedPrecondition("no connection".into())))?
.inner
.ctx();
// RTCP is polled before RTP.
while let Poll::Ready(r) = udp_sockets.rtcp.poll_recv(cx, buf) {
// Timestamp the datagram as soon as it has been received.
let when = Instant::now();
let when_wall = crate::WallTime::now();
match r {
Ok(()) => {
let pkt_ctx = crate::PacketContext(crate::PacketContextInner::Udp {
received: when,
received_wall: when_wall,
});
let msg = Bytes::copy_from_slice(buf.filled());
match rtp_handler.rtcp(
inner.options,
stream_ctx,
inner.presentation.tool.as_ref(),
conn_ctx,
&pkt_ctx,
timeline,
i,
msg,
) {
Ok(Some(p)) => return Poll::Ready(Some(Ok(p))),
// Nothing to yield; empty the buffer for the next receive.
Ok(None) => buf.clear(),
Err(description) => {
return Poll::Ready(Some(Err(wrap!(ErrorInt::PacketError {
conn_ctx: *conn_ctx,
stream_ctx: stream_ctx.to_owned(),
pkt_ctx,
stream_id: i,
description,
}))));
}
}
}
Err(source) if source.kind() == io::ErrorKind::ConnectionRefused => {
debug!("Ignoring UDP connection refused error");
}
Err(source) => {
return Poll::Ready(Some(Err(wrap!(ErrorInt::UdpRecvError {
conn_ctx: *conn_ctx,
stream_ctx: stream_ctx.to_owned(),
when: when_wall,
source,
}))));
}
}
}
// Then RTP, with the same structure as above.
while let Poll::Ready(r) = udp_sockets.rtp.poll_recv(cx, buf) {
let when = Instant::now();
let when_wall = crate::WallTime::now();
match r {
Ok(()) => {
let msg = Bytes::copy_from_slice(buf.filled());
let pkt_ctx = crate::PacketContext(crate::PacketContextInner::Udp {
received: when,
received_wall: when_wall,
});
match rtp_handler.rtp(
inner.options,
stream_ctx,
inner.presentation.tool.as_ref(),
conn_ctx,
&pkt_ctx,
timeline,
i,
msg,
) {
Ok(Some(p)) => return Poll::Ready(Some(Ok(p))),
Ok(None) => buf.clear(),
// Unlike RTCP, the RTP handler's error is returned unchanged.
Err(e) => return Poll::Ready(Some(Err(e))),
}
}
Err(source) if source.kind() == io::ErrorKind::ConnectionRefused => {
debug!("Ignoring UDP connection refused error");
}
Err(source) => {
return Poll::Ready(Some(Err(wrap!(ErrorInt::UdpRecvError {
conn_ctx: *conn_ctx,
stream_ctx: stream_ctx.to_owned(),
when: when_wall,
source,
}))));
}
}
}
Poll::Pending
}
/// Polls the UDP sockets of all streams for the next packet.
///
/// Streams are visited round-robin, starting from `udp_next_poll_i`, which
/// is advanced (wrapping at the number of streams) before each stream is
/// polled. Returns as soon as one stream yields an item or error; returns
/// `Poll::Pending` once every stream has been polled without result.
fn poll_udp(&mut self, cx: &mut std::task::Context) -> Poll<Option<Result<PacketItem, Error>>> {
    // Stack scratch space of 65,536 bytes, shared by all streams polled in
    // this call. `MaybeUninit<u8>` is `Copy`, so the array can be created
    // with a plain repeat expression; no `unsafe` is required.
    let mut storage = [MaybeUninit::<u8>::uninit(); 65_536];
    let mut buf = tokio::io::ReadBuf::uninit(&mut storage);
    let starting_i = *self.0.as_mut().project().udp_next_poll_i;
    loop {
        let inner = self.0.as_mut().project();
        let i = *inner.udp_next_poll_i;
        // Advance the cursor first so the next call starts after this stream.
        *inner.udp_next_poll_i += 1;
        if *inner.udp_next_poll_i == inner.presentation.streams.len() {
            *inner.udp_next_poll_i = 0;
        }
        if let Poll::Ready(r) = self.poll_udp_stream(cx, &mut buf, i) {
            return Poll::Ready(r);
        }
        // Back at the starting point: every stream has been polled once.
        if *self.0.as_mut().project().udp_next_poll_i == starting_i {
            break;
        }
    }
    Poll::Pending
}
}
// Drop handler for `SessionInner`.
//
// Unless the teardown policy is `Never` or there is no session, this spawns
// `teardown::background_teardown` via `tokio::spawn`. When a session group
// is configured, a `StaleSession` entry is pushed onto it first.
#[pin_project::pinned_drop]
impl PinnedDrop for SessionInner {
fn drop(self: Pin<&mut Self>) {
let this = self.project();
let has_tcp = (*this.flags & (SessionFlag::TcpStreams as u8)) != 0;
// `just_try_once` is true only for policy `Auto` with TCP streams whose
// server tool is not known to have the live555 TCP bug.
let just_try_once = match this.options.teardown {
TeardownPolicy::Auto if has_tcp => {
!this
.presentation
.tool
.as_ref()
.map(Tool::has_live555_tcp_bug)
.unwrap_or(false)
}
TeardownPolicy::Auto | TeardownPolicy::Always => false,
TeardownPolicy::Never => return,
};
// Without a session there is nothing to tear down.
let session = match this.session.take() {
Some(s) => s,
None => return,
};
// The entry expires one session timeout from now.
let expires = tokio::time::Instant::now()
+ std::time::Duration::from_secs(session.timeout_sec.into());
// The watch channel starts as `None`; the sender is handed to the
// teardown task and the receiver is stored in the group's entry.
let (teardown_tx, teardown_rx) = tokio::sync::watch::channel(None);
let seqnum = if let Some(session_group) = this.options.session_group.as_ref() {
let mut lock = session_group.sessions.lock().unwrap();
let seqnum = lock.next_seqnum;
lock.next_seqnum += 1;
lock.sessions.push(StaleSession {
seqnum,
expires,
teardown_rx: Some(teardown_rx),
has_tcp,
maybe_playing: *this.flags & (SessionFlag::MaybePlaying as u8) != 0,
});
log::debug!(
"{:?}/{} tracking TEARDOWN of session id {}",
session_group.debug_id(),
seqnum,
&session.id
);
Some(seqnum)
} else {
None
};
// Move everything the teardown needs out of `self` into the task.
tokio::spawn(teardown::background_teardown(
seqnum,
this.presentation.base_url.clone(),
this.presentation.tool.take(),
session.id,
just_try_once,
std::mem::take(this.options),
this.requested_auth.take(),
this.conn.take(),
teardown_tx,
expires,
));
}
}
// Yields RTP and RTCP packets from a playing session.
//
// Each poll works through its sources in order: the RTSP connection
// (interleaved data and responses), then the UDP sockets if any stream uses
// UDP, then the keepalive timer, then any unfinished keepalive flush.
impl futures::Stream for Session<Playing> {
    type Item = Result<PacketItem, Error>;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        loop {
            // First, messages on the RTSP connection.
            match Pin::new(&mut self.0.conn.as_mut().unwrap().inner).poll_next(cx) {
                Poll::Ready(Some(Ok(msg))) => match msg.msg {
                    msg::Message::Data(d) => {
                        match self.as_mut().handle_data(&msg.ctx, d.channel_id, msg.body) {
                            Err(e) => return Poll::Ready(Some(Err(e))),
                            Ok(Some(pkt)) => return Poll::Ready(Some(Ok(pkt))),
                            Ok(None) => continue,
                        };
                    }
                    msg::Message::Response(response) => {
                        if let Err(e) = self.as_mut().handle_response(&msg.ctx, response) {
                            return Poll::Ready(Some(Err(e)));
                        }
                        continue;
                    }
                    msg::Message::Request(request) => {
                        warn!(
                            "Received RTSP request in Playing state. Responding unimplemented.\n{:#?}",
                            request
                        );
                        // The connection returned `Ready`, so it has not
                        // registered the waker. Poll it again rather than
                        // falling through to `return Poll::Pending`, which
                        // could stall the stream while data is available.
                        continue;
                    }
                },
                Poll::Ready(Some(Err(e))) => {
                    debug!("RTSP connection error: {:?}", e);
                    return Poll::Ready(Some(Err(e)));
                }
                Poll::Ready(None) => {
                    debug!("Server closed RTSP connection");
                    return Poll::Ready(None);
                }
                std::task::Poll::Pending => {}
            }

            // Next, the UDP sockets, if any stream uses UDP.
            if self.0.flags & (SessionFlag::UdpStreams as u8) != 0
                && let Poll::Ready(result) = self.as_mut().poll_udp(cx)
            {
                return Poll::Ready(result);
            }

            // Then the keepalive timer, if one is set.
            if let Some(t) = self.0.keepalive_timer.as_mut()
                && matches!(t.as_mut().poll(cx), Poll::Ready(()))
            {
                self.as_mut().handle_keepalive_timer(cx)?;
            }

            // Finally, finish flushing an in-progress keepalive.
            if let KeepaliveState::Flushing { cseq, method } = self.0.keepalive_state {
                match self.0.conn.as_mut().unwrap().inner.poll_flush_unpin(cx) {
                    Poll::Ready(Ok(())) => {
                        self.0.keepalive_state = KeepaliveState::Waiting { cseq, method }
                    }
                    Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(Error(Arc::new(e))))),
                    Poll::Pending => {}
                }
            }

            // Nothing is ready on this pass.
            return Poll::Pending;
        }
    }
}
/// Progress of a [`Demuxed`] stream between polls.
enum DemuxedState {
/// The next poll should fetch a packet from the underlying session.
Waiting,
/// The next poll should keep pulling items from the depacketizer of the
/// stream with this index.
Pulling(usize),
/// The stream has yielded a depacketization error; later polls return
/// `None`.
Fused,
}
/// A stream of [`CodecItem`]s produced from a playing session, created by
/// `Session<Playing>::demuxed`.
///
/// RTP packets from the session are fed to each stream's depacketizer; RTCP
/// packets are passed through as `CodecItem::Rtcp`.
pub struct Demuxed {
// Where the next poll should resume.
state: DemuxedState,
// The underlying session supplying packets.
session: Session<Playing>,
}
impl Demuxed {
    /// Returns the underlying session's `tool()`.
    pub fn tool(&self) -> Option<&Tool> {
        let session = &self.session;
        session.tool()
    }

    /// Returns the underlying session's `streams()`.
    pub fn streams(&self) -> &[Stream] {
        let session = &self.session;
        session.streams()
    }
}
// Yields codec items by feeding the session's RTP packets through the
// matching stream's depacketizer. RTCP packets are passed through directly.
impl futures::Stream for Demuxed {
type Item = Result<CodecItem, Error>;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
loop {
// Decide which stream to work on and whether there is a new packet to
// push into its depacketizer.
let (stream_id, pkt) = match self.state {
DemuxedState::Waiting => match ready!(Pin::new(&mut self.session).poll_next(cx)) {
Some(Ok(PacketItem::Rtp(p))) => (p.stream_id(), Some(p)),
Some(Ok(PacketItem::Rtcp(p))) => {
return Poll::Ready(Some(Ok(CodecItem::Rtcp(p))));
}
Some(Err(e)) => return Poll::Ready(Some(Err(e))),
None => return Poll::Ready(None),
},
// The last pull yielded an item; more may be ready.
DemuxedState::Pulling(stream_id) => (stream_id, None),
DemuxedState::Fused => return Poll::Ready(None),
};
let inner = self.session.0.as_mut().project();
let stream = &mut inner.presentation.streams[stream_id];
let stream_ctx = match stream.state {
StreamState::Playing { ref ctx, .. } => ctx,
_ => unreachable!(),
};
let depacketizer = match &mut stream.depacketizer {
Ok(d) => d,
Err(_) => unreachable!("depacketizer was Ok"),
};
let conn_ctx = inner
.conn
.as_ref()
.ok_or_else(|| wrap!(ErrorInt::FailedPrecondition("no connection".into())))?
.inner
.ctx();
if let Some(p) = pkt {
// Copy out what the error path needs before `p` is moved into `push`.
let pkt_ctx = *p.ctx();
let stream_id = p.stream_id();
let ssrc = p.ssrc();
let sequence_number = p.sequence_number();
depacketizer.push(p).map_err(|description| {
wrap!(ErrorInt::RtpPacketError {
conn_ctx: *conn_ctx,
stream_ctx: stream_ctx.to_owned(),
pkt_ctx,
stream_id,
ssrc,
sequence_number,
description,
})
})?;
}
match depacketizer.pull() {
Some(Ok(item)) => {
// Return to the same depacketizer on the next poll.
self.state = DemuxedState::Pulling(stream_id);
return Poll::Ready(Some(Ok(item)));
}
None => {
// Depacketizer has nothing more; fetch another packet.
self.state = DemuxedState::Waiting;
continue;
}
Some(Err(e)) => {
// Copy the contexts out before assigning to `self.state`.
let conn_ctx = *conn_ctx;
let stream_ctx = *stream_ctx;
// After this error, later polls return `None`.
self.state = DemuxedState::Fused;
return Poll::Ready(Some(Err(Error(Arc::new(ErrorInt::RtpPacketError {
conn_ctx,
stream_ctx,
pkt_ctx: e.pkt_ctx,
stream_id,
ssrc: e.ssrc,
sequence_number: e.sequence_number,
description: e.description,
})))));
}
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::testutil::{init_logging, response};
/// Creates a connected pair of loopback TCP streams, returned as
/// `(client, server)`.
async fn socketpair() -> (tokio::net::TcpStream, tokio::net::TcpStream) {
    let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
    let listen_addr = listener.local_addr().unwrap();
    let connecting = tokio::net::TcpStream::connect(listen_addr);
    let accepting = listener.accept();
    let client_side = connecting.await.unwrap();
    let (server_side, _peer_addr) = accepting.await.unwrap();
    (client_side, server_side)
}
/// Builds a client-side `RtspConnection` and the server-side connection a
/// test uses to play the role of the RTSP server.
async fn connect_to_mock() -> (RtspConnection, crate::tokio::Connection) {
    let (client_stream, server_stream) = socketpair().await;
    let client_inner = crate::tokio::Connection::from_stream(client_stream).unwrap();
    let server = crate::tokio::Connection::from_stream(server_stream).unwrap();
    (
        RtspConnection {
            inner: client_inner,
            channels: ChannelMappings::default(),
            next_cseq: 1,
            seen_unassigned: false,
        },
        server,
    )
}
/// Receives one message on `server`, asserts it is a request with
/// `expected_method`, and replies with the given response after copying the
/// request's `CSeq` header into it.
///
/// Panics if the message is not a request.
async fn req_response(
    server: &mut crate::tokio::Connection,
    expected_method: msg::Method,
    (mut response, resp_body): (msg::Response, Bytes),
) {
    let received = server.next().await.unwrap().unwrap();
    let msg::Message::Request(ref request) = received.msg else {
        panic!()
    };
    assert_eq!(request.method, expected_method);
    let cseq = request.headers.get("CSeq").unwrap().to_string();
    response.headers.insert(
        msg::HeaderName::CSEQ,
        msg::HeaderValue::try_from(cseq).unwrap(),
    );
    let reply = OwnedMessage::Response {
        head: response,
        body: resp_body,
    };
    server.send(reply).await.unwrap();
}
// Happy path over interleaved TCP: DESCRIBE, SETUP and PLAY against the mock
// server, receive one RTP packet (with data on an unassigned channel ignored
// by policy), then drop the session and check that the resulting stale
// session is cleared once TEARDOWN is answered.
#[tokio::test(start_paused = true)]
async fn simple() {
init_logging();
let (conn, mut server) = connect_to_mock().await;
let url = Url::parse("rtsp://192.168.5.206:554/h264Preview_01_main").unwrap();
let group = Arc::new(SessionGroup::default());
// DESCRIBE.
let (session, _) = tokio::join!(
Session::describe_with_conn(
conn,
SessionOptions::default()
.session_group(group.clone())
.unassigned_channel_data(UnassignedChannelDataPolicy::Ignore),
url
),
req_response(
&mut server,
msg::Method::DESCRIBE,
response(include_bytes!("testdata/reolink_describe.txt"))
),
);
let mut session = session.unwrap();
assert_eq!(session.streams().len(), 2);
// SETUP of stream 0.
tokio::join!(
async {
session.setup(0, SetupOptions::default()).await.unwrap();
},
req_response(
&mut server,
msg::Method::SETUP,
response(include_bytes!("testdata/reolink_setup.txt"))
),
);
// PLAY.
let (session, _) = tokio::join!(
session.play(PlayOptions::default()),
req_response(
&mut server,
msg::Method::PLAY,
response(include_bytes!("testdata/reolink_play.txt"))
),
);
{
let session = session.unwrap();
tokio::pin!(session);
// Disable the keepalive timer so it can't fire during this test.
session.0.keepalive_timer = None;
tokio::join!(
async {
match session.next().await {
Some(Ok(PacketItem::Rtp(p))) => {
assert_eq!(p.ssrc(), 0xdcc4a0d8);
assert_eq!(p.sequence_number(), 0x41d4);
assert_eq!(p.payload(), b"hello world");
}
o => panic!("unexpected item: {o:#?}"),
}
},
async {
// Channel 2 is unassigned; with policy `Ignore` this must be skipped.
let bad_pkt = b"data on unassigned channel";
server
.send(OwnedMessage::Data {
channel_id: 2,
body: Bytes::from_static(bad_pkt),
})
.await
.unwrap();
// A valid RTP packet on channel 0.
let good_pkt = b"\x80\x60\x41\xd4\x00\x00\x00\x00\xdc\xc4\xa0\xd8hello world";
server
.send(OwnedMessage::Data {
channel_id: 0,
body: Bytes::from_static(good_pkt),
})
.await
.unwrap();
},
);
};
// The session was dropped at the end of the block above, so it is now
// tracked as stale until TEARDOWN completes.
let stale_sessions = group.stale_sessions();
assert_eq!(stale_sessions.num_sessions, 1);
// Run on real time while the TEARDOWN exchange takes place.
tokio::time::resume();
tokio::join!(
group.await_stale_sessions(&stale_sessions),
req_response(
&mut server,
msg::Method::TEARDOWN,
response(include_bytes!("testdata/reolink_teardown.txt"))
),
);
tokio::time::pause();
assert_eq!(group.stale_sessions().num_sessions, 0);
}
// Like `simple`, but the server closes the connection instead of answering
// TEARDOWN. The stale session must still go away, and not before at least
// 60 seconds of (paused, auto-advancing) time have passed since the drop.
#[tokio::test(start_paused = true)]
async fn session_expiration() {
init_logging();
let (conn, mut server) = connect_to_mock().await;
let url = Url::parse("rtsp://192.168.5.206:554/h264Preview_01_main").unwrap();
let group = Arc::new(SessionGroup::default());
// DESCRIBE.
let (session, _) = tokio::join!(
Session::describe_with_conn(
conn,
SessionOptions::default().session_group(group.clone()),
url
),
req_response(
&mut server,
msg::Method::DESCRIBE,
response(include_bytes!("testdata/reolink_describe.txt"))
),
);
let mut session = session.unwrap();
assert_eq!(session.streams().len(), 2);
// SETUP of stream 0.
tokio::join!(
async {
session.setup(0, SetupOptions::default()).await.unwrap();
},
req_response(
&mut server,
msg::Method::SETUP,
response(include_bytes!("testdata/reolink_setup.txt"))
),
);
// PLAY.
let (session, _) = tokio::join!(
session.play(PlayOptions::default()),
req_response(
&mut server,
msg::Method::PLAY,
response(include_bytes!("testdata/reolink_play.txt"))
),
);
let drop_time;
{
let session = session.unwrap();
tokio::pin!(session);
tokio::join!(
async {
match session.next().await {
Some(Ok(PacketItem::Rtp(p))) => {
assert_eq!(p.ssrc(), 0xdcc4a0d8);
assert_eq!(p.sequence_number(), 0x41d4);
assert_eq!(p.payload(), b"hello world");
}
o => panic!("unexpected item: {o:#?}"),
}
},
async {
let pkt = b"\x80\x60\x41\xd4\x00\x00\x00\x00\xdc\xc4\xa0\xd8hello world";
server
.send(OwnedMessage::Data {
channel_id: 0,
body: Bytes::from_static(pkt),
})
.await
.unwrap();
},
);
// The session is dropped at the end of this block.
drop_time = tokio::time::Instant::now();
}
// Close the server side without ever answering TEARDOWN.
server.close().await.unwrap();
let stale_sessions = group.stale_sessions();
assert_eq!(stale_sessions.num_sessions, 1);
group.await_stale_sessions(&stale_sessions).await;
assert_eq!(group.stale_sessions().num_sessions, 0);
let elapsed = tokio::time::Instant::now() - drop_time;
assert!(
elapsed >= std::time::Duration::from_secs(60),
"elapsed={elapsed:?}"
);
}
// With policy `AssumeStaleSession`, interleaved data arriving during DESCRIBE
// must fail the request with `RtspUnassignedChannelError` and register a
// stale session that lasts at least `LIVE555_EXPIRATION_SEC` seconds.
#[tokio::test(start_paused = true)]
async fn stale_file_descriptor_session() {
init_logging();
let (conn, mut server) = connect_to_mock().await;
let url = Url::parse("rtsp://192.168.5.206:554/h264Preview_01_main").unwrap();
let group = Arc::new(SessionGroup::default());
let bogus_rtp = OwnedMessage::Data {
channel_id: 0,
body: Bytes::from_static(b"bogus pkt"),
};
let start = tokio::time::Instant::now();
tokio::join!(
async {
let e = Session::describe_with_conn(
conn,
SessionOptions::default()
.session_group(group.clone())
.unassigned_channel_data(UnassignedChannelDataPolicy::AssumeStaleSession),
url,
)
.await
.map(|_s| ())
.unwrap_err();
assert!(matches!(*e.0, ErrorInt::RtspUnassignedChannelError { .. }));
},
// The server sends data instead of a DESCRIBE response.
async { server.send(bogus_rtp).await.unwrap() },
);
let stale_sessions = group.stale_sessions();
assert_eq!(stale_sessions.num_sessions, 1);
group.await_stale_sessions(&stale_sessions).await;
let elapsed = tokio::time::Instant::now() - start;
assert_eq!(group.stale_sessions().num_sessions, 0);
assert!(
elapsed >= std::time::Duration::from_secs(LIVE555_EXPIRATION_SEC),
"elapsed={elapsed:?}"
);
}
// Interleaved data on channels 0 and 1 that arrives before the PLAY response
// must not make `play()` fail.
#[tokio::test]
async fn ignore_early_rtp_rtcp() {
init_logging();
let (conn, mut server) = connect_to_mock().await;
let url = Url::parse("rtsp://192.168.5.206:554/h264Preview_01_main").unwrap();
let bogus_rtp = OwnedMessage::Data {
channel_id: 0,
body: Bytes::from_static(b"bogus pkt"),
};
let bogus_rtcp = OwnedMessage::Data {
channel_id: 1,
body: Bytes::from_static(b"bogus pkt"),
};
// DESCRIBE.
let (session, _) = tokio::join!(
Session::describe_with_conn(conn, SessionOptions::default(), url),
async {
req_response(
&mut server,
msg::Method::DESCRIBE,
response(include_bytes!("testdata/reolink_describe.txt")),
)
.await;
},
);
let mut session = session.unwrap();
assert_eq!(session.streams().len(), 2);
// SETUP of stream 0.
tokio::join!(
async {
session.setup(0, SetupOptions::default()).await.unwrap();
},
req_response(
&mut server,
msg::Method::SETUP,
response(include_bytes!("testdata/reolink_setup.txt"))
),
);
// PLAY: the server sends both data messages before reading the request
// and responding.
let (session, _) = tokio::join!(session.play(PlayOptions::default()), async move {
server.send(bogus_rtp).await.unwrap();
server.send(bogus_rtcp).await.unwrap();
req_response(
&mut server,
msg::Method::PLAY,
response(include_bytes!("testdata/reolink_play.txt")),
)
.await
},);
let _session = session.unwrap();
}
// With `SessionIdPolicy::RequireMatch`, the second SETUP is expected to fail.
#[tokio::test]
async fn reject_session_id_change() {
session_id_change(SessionIdPolicy::RequireMatch, true).await
}
// With `SessionIdPolicy::UseFirst`, the second SETUP is expected to succeed.
#[tokio::test]
async fn ignore_session_id_change() {
session_id_change(SessionIdPolicy::UseFirst, false).await
}
// Shared body of the session id policy tests: DESCRIBE, then SETUP of
// streams 0 and 1 over UDP using the h264dvr test data. The second SETUP
// must fail with `RtspResponseError` when `expect_error` is true and succeed
// otherwise.
// NOTE(review): presumably the two SETUP responses carry different session
// ids; confirm against the testdata files.
async fn session_id_change(policy: SessionIdPolicy, expect_error: bool) {
init_logging();
let (conn, mut server) = connect_to_mock().await;
let url = Url::parse("rtsp://127.0.0.1:554/camera").unwrap();
let group = Arc::new(SessionGroup::default());
// DESCRIBE.
let (session, _) = tokio::join!(
Session::describe_with_conn(
conn,
SessionOptions::default()
.session_group(group.clone())
.session_id(policy),
url
),
req_response(
&mut server,
msg::Method::DESCRIBE,
response(include_bytes!("testdata/h264dvr_describe.txt"))
),
);
let mut session = session.unwrap();
assert_eq!(session.streams().len(), 2);
// First SETUP (stream 0): always expected to succeed.
tokio::join!(
async {
session
.setup(
0,
SetupOptions::default()
.transport(Transport::Udp(UdpTransportOptions::default())),
)
.await
.unwrap();
},
req_response(
&mut server,
msg::Method::SETUP,
response(include_bytes!("testdata/h264dvr_setup_video.txt"))
),
);
// Second SETUP (stream 1): the outcome depends on the policy.
tokio::join!(
async {
let r = session
.setup(
1,
SetupOptions::default()
.transport(Transport::Udp(UdpTransportOptions::default())),
)
.await;
if expect_error {
let e = r.unwrap_err();
assert!(matches!(*e.0, ErrorInt::RtspResponseError { .. }));
} else {
r.unwrap();
}
},
req_response(
&mut server,
msg::Method::SETUP,
response(include_bytes!("testdata/h264dvr_setup_audio.txt"))
),
);
}
// Logs the in-memory size of several key types; contains no assertions.
#[test]
fn print_sizes() {
    init_logging();
    let sizes = [
        ("PacketItem", std::mem::size_of::<PacketItem>()),
        ("Presentation", std::mem::size_of::<Presentation>()),
        ("RtspConnection", std::mem::size_of::<RtspConnection>()),
        ("Session", std::mem::size_of::<Session<Described>>()),
        ("SessionInner", std::mem::size_of::<SessionInner>()),
        ("SessionOptions", std::mem::size_of::<SessionOptions>()),
        ("Demuxed", std::mem::size_of::<Demuxed>()),
        ("Stream", std::mem::size_of::<Stream>()),
    ];
    for (name, size) in &sizes {
        log::info!("{name:-40} {size:4}");
    }
}
// Checks `Tool::has_live555_tcp_bug` against several tool strings: of these,
// only the LIVE555 string with version 2013.04.08 is expected to be
// reported as buggy.
#[test]
fn check_live555_tcp_bug() {
init_logging();
// Not LIVE555 at all.
assert!(!Tool::new("not live555").has_live555_tcp_bug());
// LIVE555 prefix with no version after the `v`.
assert!(!Tool::new("LIVE555 Streaming Media v").has_live555_tcp_bug());
assert!(Tool::new("LIVE555 Streaming Media v2013.04.08").has_live555_tcp_bug());
assert!(!Tool::new("LIVE555 Streaming Media v2017.06.04").has_live555_tcp_bug());
assert!(!Tool::new("LIVE555 Streaming Media v2020.01.01").has_live555_tcp_bug());
}
// Compile-time check that the future returned by `await_stale_sessions` is
// `Send`. The future is never polled.
#[test]
fn await_stale_sessions_is_send() {
fn assert_send<T: Send>(_: T) {}
let group = SessionGroup::default();
let stale_sessions = group.stale_sessions();
assert_send(group.await_stale_sessions(&stale_sessions));
}
// `HOLE_PUNCH_RTP` must parse as a raw RTP packet with payload type 0.
#[test]
fn validate_hole_punch_rtp() {
let (pkt_ref, _) = crate::rtp::RawPacket::new(Bytes::from_static(&HOLE_PUNCH_RTP)).unwrap();
assert_eq!(pkt_ref.payload_type(), 0);
}
// `HOLE_PUNCH_RTCP` must parse as an RTCP packet typed as a receiver report.
#[test]
fn validate_hole_punch_rtcp() {
let (pkt_ref, _) = crate::rtcp::PacketRef::parse(&HOLE_PUNCH_RTCP).unwrap();
assert!(matches!(
pkt_ref.as_typed().unwrap(),
Some(crate::rtcp::TypedPacketRef::ReceiverReport(_))
));
}
}