retina/
lib.rs

1// Copyright (C) 2021 Scott Lamb <slamb@slamb.org>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! High-level RTSP library.
5//!
6//! Currently this is useful for clients; it will be extended to support
7//! servers and proxies.
8
9#![forbid(clippy::print_stderr, clippy::print_stdout)]
10// I prefer to use from_str_radix(..., 10) to explicitly note the base.
11#![allow(clippy::from_str_radix_10)]
12
13use bytes::Bytes;
14use log::trace;
15use rand::Rng;
16use rtsp_types::Message;
17use std::fmt::{Debug, Display};
18use std::net::{IpAddr, SocketAddr, UdpSocket};
19use std::num::NonZeroU32;
20use std::ops::Range;
21
22mod error;
23
24mod hex;
25pub mod rtcp;
26pub mod rtp;
27
28#[cfg(test)]
29mod testutil;
30
31pub use error::Error;
32
33/// Wraps the supplied `ErrorInt` and returns it as an `Err`.
34macro_rules! bail {
35    ($e:expr) => {
36        return Err(crate::error::Error(std::sync::Arc::new($e)))
37    };
38}
39
40macro_rules! wrap {
41    ($e:expr) => {
42        crate::error::Error(std::sync::Arc::new($e))
43    };
44}
45
46pub mod client;
47pub mod codec;
48//mod error;
49mod tokio;
50
51use error::ErrorInt;
52
53/// A received RTSP message.
54#[derive(Debug)]
55struct ReceivedMessage {
56    ctx: RtspMessageContext,
57    msg: Message<Bytes>,
58}
59
60/// An annotated RTP timestamp.
61///
62/// This couples together three pieces of information:
63///
64/// *   The stream's starting time. In client use, this is often as received in the RTSP
65///     `RTP-Info` header but may be controlled via [`crate::client::InitialTimestampPolicy`].
66///     According to [RFC 3550 section 5.1](https://datatracker.ietf.org/doc/html/rfc3550#section-5.1), "the initial
67///     value of the timestamp SHOULD be random".
68///
69/// *   The codec-specific clock rate.
70///
71/// *   The timestamp as an `i64`. In client use, its top bits should be inferred from wraparounds
72///     of 32-bit RTP timestamps. The Retina client's policy is that timestamps that differ by more
73///     than `i32::MAX` from previous timestamps are treated as backwards jumps. It's allowed for
74///     a timestamp to indicate a time *before* the stream's starting point.
75///
76/// In combination, these allow conversion to "normal play time" (NPT): seconds since start of
77/// the stream.
78///
79/// According to [RFC 3550 section 5.1](https://datatracker.ietf.org/doc/html/rfc3550#section-5.1),
80/// RTP timestamps "MUST be derived from a clock that increments monotonically". In practice,
81/// many RTP servers violate this. The Retina client allows such violations unless
82/// [`crate::client::PlayOptions::enforce_timestamps_with_max_jump_secs`] says otherwise.
83///
84/// [`Timestamp`] can't represent timestamps which overflow/underflow `i64` can't be constructed or
85/// elapsed times (`elapsed = timestamp - start`) which underflow `i64`. The client will return
86/// error in these cases. This should rarely cause problems. It'd take ~2^32 packets (~4 billion)
87/// to advance the time this far forward or backward even with a hostile server.
88///
89/// The [`Display`] and [`Debug`] implementations currently display:
90/// *   the bottom 32 bits, as seen in RTP packet headers. This advances at a
91///     codec-specified clock rate.
92/// *   the full timestamp.
93/// *   NPT
94#[derive(Copy, Clone, PartialEq, Eq)]
95pub struct Timestamp {
96    /// A timestamp which must be compared to `start`.
97    timestamp: i64,
98
99    /// The codec-specified clock rate, in Hz. Must be non-zero.
100    clock_rate: NonZeroU32,
101
102    /// The stream's starting time, as specified in the RTSP `RTP-Info` header.
103    start: u32,
104}
105
106impl Timestamp {
107    /// Creates a new timestamp unless `timestamp - start` underflows.
108    #[inline]
109    pub fn new(timestamp: i64, clock_rate: NonZeroU32, start: u32) -> Option<Self> {
110        timestamp.checked_sub(i64::from(start)).map(|_| Timestamp {
111            timestamp,
112            clock_rate,
113            start,
114        })
115    }
116
117    /// Returns time since some arbitrary point before the stream started.
118    #[inline]
119    pub fn timestamp(&self) -> i64 {
120        self.timestamp
121    }
122
123    /// Returns timestamp of the start of the stream.
124    #[inline]
125    pub fn start(&self) -> u32 {
126        self.start
127    }
128
129    /// Returns codec-specified clock rate, in Hz.
130    #[inline]
131    pub fn clock_rate(&self) -> NonZeroU32 {
132        self.clock_rate
133    }
134
135    /// Returns elapsed time since the stream start in clock rate units.
136    #[inline]
137    pub fn elapsed(&self) -> i64 {
138        self.timestamp - i64::from(self.start)
139    }
140
141    /// Returns elapsed time since the stream start in seconds, aka "normal play
142    /// time" (NPT).
143    #[inline]
144    pub fn elapsed_secs(&self) -> f64 {
145        (self.elapsed() as f64) / (self.clock_rate.get() as f64)
146    }
147
148    /// Returns `self + delta` unless it would overflow.
149    pub fn try_add(&self, delta: u32) -> Option<Self> {
150        // Check for `timestamp` overflow only. We don't need to check for
151        // `timestamp - start` underflow because delta is non-negative.
152        self.timestamp
153            .checked_add(i64::from(delta))
154            .map(|timestamp| Timestamp {
155                timestamp,
156                clock_rate: self.clock_rate,
157                start: self.start,
158            })
159    }
160}
161
162impl Display for Timestamp {
163    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
164        write!(
165            f,
166            "{} (mod-2^32: {}), npt {:.03}",
167            self.timestamp,
168            self.timestamp as u32,
169            self.elapsed_secs()
170        )
171    }
172}
173
174impl Debug for Timestamp {
175    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
176        Display::fmt(self, f)
177    }
178}
179
180/// The Unix epoch as an [`NtpTimestamp`].
181pub const UNIX_EPOCH: NtpTimestamp = NtpTimestamp((2_208_988_800) << 32);
182
183/// A wallclock time represented using the format of the Network Time Protocol.
184///
185/// NTP timestamps are in a fixed-point representation of seconds since
186/// 0h UTC on 1 January 1900. The top 32 bits represent the integer part
187/// (wrapping around every 68 years) and the bottom 32 bits represent the
188/// fractional part.
189///
190/// This is a simple wrapper around a `u64` in that format, with a `Display`
191/// impl that writes the timestamp as a human-readable string. Currently this
192/// assumes the time is within 68 years of 1970; the string will be incorrect
193/// after `2038-01-19T03:14:07Z`.
194///
195/// An `NtpTimestamp` isn't necessarily gathered from a real NTP server.
196/// Reported NTP timestamps are allowed to jump backwards and/or be complete
197/// nonsense.
198///
199/// The NTP timestamp of the Unix epoch is available via the constant [`UNIX_EPOCH`].
200#[derive(Copy, Clone, PartialEq, PartialOrd, Eq, Ord)]
201pub struct NtpTimestamp(pub u64);
202
203impl std::fmt::Display for NtpTimestamp {
204    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
205        let since_epoch = self.0.wrapping_sub(UNIX_EPOCH.0);
206        let sec_since_epoch = (since_epoch >> 32) as u32;
207        let ns = i32::try_from(((since_epoch & 0xFFFF_FFFF) * 1_000_000_000) >> 32)
208            .expect("should be < 1_000_000_000");
209        let tm = jiff::Timestamp::new(i64::from(sec_since_epoch), ns)
210            .expect("u32 sec should be valid Timestamp");
211        std::fmt::Display::fmt(&tm, f)
212    }
213}
214
215impl std::fmt::Debug for NtpTimestamp {
216    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
217        // Write both the raw and display forms.
218        write!(f, "{} /* {} */", self.0, self)
219    }
220}
221
222/// A wall time taken from the local machine's realtime clock, used in error reporting.
223///
224/// Currently this just allows formatting via `Debug` and `Display`.
225#[derive(Copy, Clone, Debug)]
226pub struct WallTime(jiff::Timestamp);
227
228impl WallTime {
229    fn now() -> Self {
230        Self(jiff::Timestamp::now())
231    }
232}
233
234impl Display for WallTime {
235    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
236        std::fmt::Display::fmt(&self.0, f)
237    }
238}
239
240/// RTSP connection context.
241///
242/// This gives enough information to pick out the flow in a packet capture.
243#[derive(Copy, Clone, Debug)]
244pub struct ConnectionContext {
245    local_addr: std::net::SocketAddr,
246    peer_addr: std::net::SocketAddr,
247    established_wall: WallTime,
248}
249
250impl ConnectionContext {
251    #[doc(hidden)]
252    pub fn dummy() -> Self {
253        let addr = SocketAddr::new(IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), 0);
254        Self {
255            local_addr: addr,
256            peer_addr: addr,
257            established_wall: WallTime::now(),
258        }
259    }
260}
261
262impl Display for ConnectionContext {
263    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
264        // TODO: this current hardcodes the assumption we are the client.
265        // Change if/when adding server code.
266        write!(
267            f,
268            "{}(me)->{}@{}",
269            &self.local_addr, &self.peer_addr, &self.established_wall,
270        )
271    }
272}
273
274/// Context of a received message (or read error) within an RTSP connection.
275///
276/// When paired with a [`ConnectionContext`], this should allow picking the
277/// message out of a packet capture.
278#[derive(Copy, Clone, Debug)]
279pub struct RtspMessageContext {
280    /// The starting byte position within the input stream. The bottom 32 bits
281    /// can be compared to the relative TCP sequence number.
282    pos: u64,
283
284    /// Time when the application parsed the message. Caveat: this may not
285    /// closely match the time on a packet capture if the application is
286    /// overloaded (or if `CLOCK_REALTIME` jumps).
287    received_wall: WallTime,
288    received: std::time::Instant,
289}
290
291impl RtspMessageContext {
292    #[doc(hidden)]
293    pub fn dummy() -> Self {
294        Self {
295            pos: 0,
296            received_wall: WallTime::now(),
297            received: std::time::Instant::now(),
298        }
299    }
300
301    pub fn received(&self) -> std::time::Instant {
302        self.received
303    }
304
305    pub fn pos(&self) -> u64 {
306        self.pos
307    }
308}
309
310impl Display for RtspMessageContext {
311    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
312        write!(f, "{}@{}", self.pos, &self.received_wall)
313    }
314}
315
316/// Context for an active stream (RTP+RTCP session), either TCP or UDP. Owned version.
317#[derive(Copy, Clone, Debug)]
318pub struct StreamContext(StreamContextInner);
319
320impl StreamContext {
321    #[doc(hidden)]
322    pub fn dummy() -> Self {
323        StreamContext(StreamContextInner::Dummy)
324    }
325}
326
327impl Display for StreamContext {
328    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
329        match &self.0 {
330            StreamContextInner::Tcp(tcp) => {
331                write!(
332                    f,
333                    "TCP, interleaved channel ids {}-{}",
334                    tcp.rtp_channel_id,
335                    tcp.rtp_channel_id + 1
336                )
337            }
338            StreamContextInner::Udp(udp) => Display::fmt(udp, f),
339            StreamContextInner::Dummy => write!(f, "dummy"),
340        }
341    }
342}
343
344#[derive(Copy, Clone, Debug)]
345enum StreamContextInner {
346    Tcp(TcpStreamContext),
347    Udp(UdpStreamContext),
348    Dummy,
349}
350
351/// Context for a UDP stream (aka UDP-based RTP transport). Unstable/internal. Exposed for benchmarks.
352///
353/// This stores only the RTP addresses; the RTCP addresses are assumed to use
354/// the same IP and one port higher.
355#[doc(hidden)]
356#[derive(Copy, Clone, Debug)]
357pub struct UdpStreamContext {
358    local_ip: IpAddr,
359    peer_ip: IpAddr,
360    local_rtp_port: u16,
361    peer_rtp_port: u16,
362}
363
364/// Context for a TCP stream. Unstable/internal. Exposed for benchmarks.
365///
366/// This stores only the RTP channel id; the RTCP channel id is assumed to be one higher.
367#[doc(hidden)]
368#[derive(Copy, Clone, Debug)]
369pub struct TcpStreamContext {
370    rtp_channel_id: u8,
371}
372
373impl Display for UdpStreamContext {
374    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
375        // TODO: this assumes we are the client. Revisit when adding server support.
376        write!(
377            f,
378            "{}:{}-{}(me) -> {}:{}-{}",
379            self.local_ip,
380            self.local_rtp_port,
381            self.local_rtp_port + 1,
382            self.peer_ip,
383            self.peer_rtp_port,
384            self.peer_rtp_port + 1
385        )
386    }
387}
388
389/// Context for an RTP or RTCP packet, received either via RTSP interleaved data or UDP.
390///
391/// Should be paired with an [`ConnectionContext`] of the RTSP connection that started
392/// the session. In the interleaved data case, it's assumed the packet was received over
393/// that same connection.
394#[derive(Copy, Clone, Debug)]
395pub struct PacketContext(PacketContextInner);
396
397impl PacketContext {
398    #[doc(hidden)]
399    pub fn dummy() -> PacketContext {
400        Self(PacketContextInner::Dummy)
401    }
402}
403
404#[derive(Copy, Clone, Debug)]
405enum PacketContextInner {
406    Tcp { msg_ctx: RtspMessageContext },
407    Udp { received_wall: WallTime },
408    Dummy,
409}
410
411impl Display for PacketContext {
412    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
413        match self.0 {
414            PacketContextInner::Udp { received_wall } => std::fmt::Display::fmt(&received_wall, f),
415            PacketContextInner::Tcp { msg_ctx } => std::fmt::Display::fmt(&msg_ctx, f),
416            PacketContextInner::Dummy => write!(f, "dummy"),
417        }
418    }
419}
420
421/// Returns the range within `buf` that represents `subset`.
422/// If `subset` is empty, returns None; otherwise panics if `subset` is not within `buf`.
423pub(crate) fn as_range(buf: &[u8], subset: &[u8]) -> Option<std::ops::Range<usize>> {
424    if subset.is_empty() {
425        return None;
426    }
427    let subset_p = subset.as_ptr() as usize;
428    let buf_p = buf.as_ptr() as usize;
429    let off = match subset_p.checked_sub(buf_p) {
430        Some(off) => off,
431        None => panic!(
432            "{}-byte subset not within {}-byte buf",
433            subset.len(),
434            buf.len()
435        ),
436    };
437    let end = off + subset.len();
438    assert!(end <= buf.len());
439    Some(off..end)
440}
441
442/// A pair of local UDP sockets used for RTP and RTCP transmission.
443///
444/// The RTP port is always even, and the RTCP port is always the following (odd) integer.
445struct UdpPair {
446    rtp_port: u16,
447    rtp_socket: UdpSocket,
448    rtcp_socket: UdpSocket,
449}
450
451impl UdpPair {
452    fn for_ip(ip_addr: IpAddr) -> Result<Self, std::io::Error> {
453        const MAX_TRIES: usize = 10;
454        const ALLOWED_RTP_RANGE: Range<u16> = 5000..65000; // stolen from ffmpeg's defaults.
455        let mut rng = rand::thread_rng();
456        for i in 0..MAX_TRIES {
457            let rtp_port = rng.gen_range(ALLOWED_RTP_RANGE) & !0b1;
458            debug_assert!(ALLOWED_RTP_RANGE.contains(&rtp_port));
459            let rtp_addr = SocketAddr::new(ip_addr, rtp_port);
460            let rtp_socket = match UdpSocket::bind(rtp_addr) {
461                Ok(s) => s,
462                Err(e) if e.kind() == std::io::ErrorKind::AddrInUse => {
463                    trace!(
464                        "Try {}/{}: unable to bind RTP addr {:?}",
465                        i, MAX_TRIES, rtp_addr
466                    );
467                    continue;
468                }
469                Err(e) => return Err(e),
470            };
471            let rtcp_addr = SocketAddr::new(ip_addr, rtp_port + 1);
472            let rtcp_socket = match UdpSocket::bind(rtcp_addr) {
473                Ok(s) => s,
474                Err(e) if e.kind() == std::io::ErrorKind::AddrInUse => {
475                    trace!(
476                        "Try {}/{}: unable to bind RTCP addr {:?}",
477                        i, MAX_TRIES, rtcp_addr
478                    );
479                    continue;
480                }
481                Err(e) => return Err(e),
482            };
483            return Ok(Self {
484                rtp_port,
485                rtp_socket,
486                rtcp_socket,
487            });
488        }
489        Err(std::io::Error::new(
490            std::io::ErrorKind::AddrInUse,
491            format!(
492                "Unable to find even/odd pair in {}:{}..{} after {} tries",
493                ip_addr, ALLOWED_RTP_RANGE.start, ALLOWED_RTP_RANGE.end, MAX_TRIES
494            ),
495        ))
496    }
497}
498
499// Let's assume pointers are either 32-bit or 64-bit so we can do the following
500// infallible conversions.
501fn to_usize<V: Into<u32>>(v: V) -> usize {
502    const {
503        assert!(std::mem::size_of::<u32>() <= std::mem::size_of::<usize>());
504    }
505    v.into() as usize
506}
507fn to_u64(v: usize) -> u64 {
508    const {
509        assert!(std::mem::size_of::<usize>() <= std::mem::size_of::<u64>());
510    }
511    v as u64
512}
513
514#[cfg(test)]
515mod test {
516    use std::net::Ipv4Addr;
517
518    use super::*;
519
520    #[test]
521    fn local_udp_pair() {
522        // Just test that it succeeds.
523        UdpPair::for_ip(IpAddr::V4(Ipv4Addr::LOCALHOST)).unwrap();
524    }
525}