#![forbid(clippy::print_stderr, clippy::print_stdout)]
#![allow(clippy::from_str_radix_10)]
use bytes::Bytes;
use log::trace;
use rand::Rng;
use rtsp::msg::Message;
use std::fmt::{Debug, Display};
use std::net::{IpAddr, SocketAddr, UdpSocket};
use std::num::NonZeroU32;
use std::ops::Range;
use std::time::{Instant, SystemTime};
mod error;
mod hex;
mod mostly_ascii;
pub mod rtcp;
pub mod rtp;
#[doc(hidden)]
pub mod testutil;
pub use error::Error;
macro_rules! bail {
($e:expr) => {
return Err(crate::error::Error(std::sync::Arc::new($e)))
};
}
macro_rules! wrap {
($e:expr) => {
crate::error::Error(std::sync::Arc::new($e))
};
}
pub mod client;
pub mod codec;
#[doc(hidden)]
pub mod rtsp;
mod tokio;
use error::ErrorInt;
#[derive(Debug)]
struct ReceivedMessage {
ctx: RtspMessageContext,
msg: Message,
body: Bytes,
}
#[derive(Copy, Clone, PartialEq, Eq)]
pub struct Timestamp {
timestamp: i64,
clock_rate: NonZeroU32,
start: u32,
}
impl Timestamp {
#[inline]
pub fn new(timestamp: i64, clock_rate: NonZeroU32, start: u32) -> Option<Self> {
timestamp.checked_sub(i64::from(start)).map(|_| Timestamp {
timestamp,
clock_rate,
start,
})
}
#[inline]
pub fn timestamp(&self) -> i64 {
self.timestamp
}
#[inline]
pub fn start(&self) -> u32 {
self.start
}
#[inline]
pub fn clock_rate(&self) -> NonZeroU32 {
self.clock_rate
}
#[inline]
pub fn elapsed(&self) -> i64 {
self.timestamp - i64::from(self.start)
}
#[inline]
pub fn elapsed_secs(&self) -> f64 {
(self.elapsed() as f64) / (self.clock_rate.get() as f64)
}
pub fn try_add(&self, delta: u32) -> Option<Self> {
self.timestamp
.checked_add(i64::from(delta))
.map(|timestamp| Timestamp {
timestamp,
clock_rate: self.clock_rate,
start: self.start,
})
}
}
impl Display for Timestamp {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{} (mod-2^32: {}), npt {:.03}",
self.timestamp,
self.timestamp as u32,
self.elapsed_secs()
)
}
}
impl Debug for Timestamp {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Display::fmt(self, f)
}
}
pub const UNIX_EPOCH: NtpTimestamp = NtpTimestamp((2_208_988_800) << 32);
#[derive(Copy, Clone, PartialEq, PartialOrd, Eq, Ord)]
pub struct NtpTimestamp(pub u64);
impl std::fmt::Display for NtpTimestamp {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let since_epoch = self.0.wrapping_sub(UNIX_EPOCH.0);
let sec_since_epoch = (since_epoch >> 32) as u32;
let ns = i32::try_from(((since_epoch & 0xFFFF_FFFF) * 1_000_000_000) >> 32)
.expect("should be < 1_000_000_000");
let tm = jiff::Timestamp::new(i64::from(sec_since_epoch), ns)
.expect("u32 sec should be valid Timestamp");
std::fmt::Display::fmt(&tm, f)
}
}
impl std::fmt::Debug for NtpTimestamp {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{} /* {} */", self.0, self)
}
}
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct WallTime(jiff::Timestamp);
impl WallTime {
#[inline]
fn now() -> Self {
Self(jiff::Timestamp::now())
}
}
impl Display for WallTime {
#[inline]
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
std::fmt::Display::fmt(&self.0, f)
}
}
impl From<WallTime> for SystemTime {
#[inline]
fn from(wall_time: WallTime) -> Self {
wall_time.0.into()
}
}
#[derive(Copy, Clone, Debug)]
pub struct ConnectionContext {
local_addr: std::net::SocketAddr,
peer_addr: std::net::SocketAddr,
established_wall: WallTime,
}
impl ConnectionContext {
#[doc(hidden)]
pub fn dummy() -> Self {
let addr = SocketAddr::new(IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), 0);
Self {
local_addr: addr,
peer_addr: addr,
established_wall: WallTime::now(),
}
}
}
impl Display for ConnectionContext {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{}(me)->{}@{}",
&self.local_addr, &self.peer_addr, &self.established_wall,
)
}
}
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct RtspMessageContext {
pos: u64,
received_wall: WallTime,
received: std::time::Instant,
}
impl RtspMessageContext {
#[doc(hidden)]
pub fn dummy() -> Self {
Self {
pos: 0,
received_wall: WallTime::now(),
received: std::time::Instant::now(),
}
}
pub fn received(&self) -> std::time::Instant {
self.received
}
pub fn pos(&self) -> u64 {
self.pos
}
}
impl Display for RtspMessageContext {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}@{}", self.pos, &self.received_wall)
}
}
#[derive(Copy, Clone, Debug)]
pub struct StreamContext(StreamContextInner);
impl StreamContext {
#[doc(hidden)]
pub fn dummy() -> Self {
StreamContext(StreamContextInner::Dummy)
}
}
impl Display for StreamContext {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match &self.0 {
StreamContextInner::Tcp(tcp) => {
write!(
f,
"TCP, interleaved channel ids {}-{}",
tcp.rtp_channel_id,
tcp.rtp_channel_id + 1
)
}
StreamContextInner::Udp(udp) => Display::fmt(udp, f),
StreamContextInner::Dummy => write!(f, "dummy"),
}
}
}
#[derive(Copy, Clone, Debug)]
enum StreamContextInner {
Tcp(TcpStreamContext),
Udp(UdpStreamContext),
Dummy,
}
#[doc(hidden)]
#[derive(Copy, Clone, Debug)]
pub struct UdpStreamContext {
local_ip: IpAddr,
peer_ip: IpAddr,
local_rtp_port: u16,
peer_rtp_port: u16,
}
#[doc(hidden)]
#[derive(Copy, Clone, Debug)]
pub struct TcpStreamContext {
rtp_channel_id: u8,
}
impl Display for UdpStreamContext {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{}:{}-{}(me) -> {}:{}-{}",
self.local_ip,
self.local_rtp_port,
self.local_rtp_port + 1,
self.peer_ip,
self.peer_rtp_port,
self.peer_rtp_port + 1
)
}
}
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct PacketContext(PacketContextInner);
impl PacketContext {
#[inline]
pub fn received(&self) -> Instant {
match self.0 {
PacketContextInner::Tcp { msg_ctx } => msg_ctx.received,
PacketContextInner::Udp { received, .. } => received,
PacketContextInner::Dummy => Instant::now(),
}
}
#[inline]
pub fn received_wall(&self) -> WallTime {
match self.0 {
PacketContextInner::Tcp { msg_ctx } => msg_ctx.received_wall,
PacketContextInner::Udp { received_wall, .. } => received_wall,
PacketContextInner::Dummy => WallTime::now(),
}
}
#[doc(hidden)]
pub fn dummy() -> PacketContext {
Self(PacketContextInner::Dummy)
}
}
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum PacketContextInner {
Tcp {
msg_ctx: RtspMessageContext,
},
Udp {
received: Instant,
received_wall: WallTime,
},
Dummy,
}
impl Display for PacketContext {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self.0 {
PacketContextInner::Udp { received_wall, .. } => {
std::fmt::Display::fmt(&received_wall, f)
}
PacketContextInner::Tcp { msg_ctx } => std::fmt::Display::fmt(&msg_ctx, f),
PacketContextInner::Dummy => write!(f, "dummy"),
}
}
}
struct UdpPair {
rtp_port: u16,
rtp_socket: UdpSocket,
rtcp_socket: UdpSocket,
}
impl UdpPair {
fn for_ip(ip_addr: IpAddr) -> Result<Self, std::io::Error> {
const MAX_TRIES: usize = 10;
const ALLOWED_RTP_RANGE: Range<u16> = 5000..65000; let mut rng = rand::thread_rng();
for i in 0..MAX_TRIES {
let rtp_port = rng.gen_range(ALLOWED_RTP_RANGE) & !0b1;
debug_assert!(ALLOWED_RTP_RANGE.contains(&rtp_port));
let rtp_addr = SocketAddr::new(ip_addr, rtp_port);
let rtp_socket = match UdpSocket::bind(rtp_addr) {
Ok(s) => s,
Err(e) if e.kind() == std::io::ErrorKind::AddrInUse => {
trace!(
"Try {}/{}: unable to bind RTP addr {:?}",
i, MAX_TRIES, rtp_addr
);
continue;
}
Err(e) => return Err(e),
};
let rtcp_addr = SocketAddr::new(ip_addr, rtp_port + 1);
let rtcp_socket = match UdpSocket::bind(rtcp_addr) {
Ok(s) => s,
Err(e) if e.kind() == std::io::ErrorKind::AddrInUse => {
trace!(
"Try {}/{}: unable to bind RTCP addr {:?}",
i, MAX_TRIES, rtcp_addr
);
continue;
}
Err(e) => return Err(e),
};
return Ok(Self {
rtp_port,
rtp_socket,
rtcp_socket,
});
}
Err(std::io::Error::new(
std::io::ErrorKind::AddrInUse,
format!(
"Unable to find even/odd pair in {}:{}..{} after {} tries",
ip_addr, ALLOWED_RTP_RANGE.start, ALLOWED_RTP_RANGE.end, MAX_TRIES
),
))
}
}
fn to_usize<V: Into<u32>>(v: V) -> usize {
const {
assert!(std::mem::size_of::<u32>() <= std::mem::size_of::<usize>());
}
v.into() as usize
}
fn to_u64(v: usize) -> u64 {
const {
assert!(std::mem::size_of::<usize>() <= std::mem::size_of::<u64>());
}
v as u64
}
#[cfg(test)]
mod test {
use std::net::Ipv4Addr;
use super::*;
#[test]
fn local_udp_pair() {
UdpPair::for_ip(IpAddr::V4(Ipv4Addr::LOCALHOST)).unwrap();
}
}