1#![forbid(clippy::print_stderr, clippy::print_stdout)]
10#![allow(clippy::from_str_radix_10)]
12
13use bytes::Bytes;
14use log::trace;
15use rand::Rng;
16use rtsp_types::Message;
17use std::fmt::{Debug, Display};
18use std::net::{IpAddr, SocketAddr, UdpSocket};
19use std::num::NonZeroU32;
20use std::ops::Range;
21
22mod error;
23
24mod hex;
25pub mod rtcp;
26pub mod rtp;
27
28#[cfg(test)]
29mod testutil;
30
31pub use error::Error;
32
33macro_rules! bail {
35 ($e:expr) => {
36 return Err(crate::error::Error(std::sync::Arc::new($e)))
37 };
38}
39
40macro_rules! wrap {
41 ($e:expr) => {
42 crate::error::Error(std::sync::Arc::new($e))
43 };
44}
45
46pub mod client;
47pub mod codec;
48mod tokio;
50
51use error::ErrorInt;
52
53#[derive(Debug)]
55struct ReceivedMessage {
56 ctx: RtspMessageContext,
57 msg: Message<Bytes>,
58}
59
60#[derive(Copy, Clone, PartialEq, Eq)]
95pub struct Timestamp {
96 timestamp: i64,
98
99 clock_rate: NonZeroU32,
101
102 start: u32,
104}
105
106impl Timestamp {
107 #[inline]
109 pub fn new(timestamp: i64, clock_rate: NonZeroU32, start: u32) -> Option<Self> {
110 timestamp.checked_sub(i64::from(start)).map(|_| Timestamp {
111 timestamp,
112 clock_rate,
113 start,
114 })
115 }
116
117 #[inline]
119 pub fn timestamp(&self) -> i64 {
120 self.timestamp
121 }
122
123 #[inline]
125 pub fn start(&self) -> u32 {
126 self.start
127 }
128
129 #[inline]
131 pub fn clock_rate(&self) -> NonZeroU32 {
132 self.clock_rate
133 }
134
135 #[inline]
137 pub fn elapsed(&self) -> i64 {
138 self.timestamp - i64::from(self.start)
139 }
140
141 #[inline]
144 pub fn elapsed_secs(&self) -> f64 {
145 (self.elapsed() as f64) / (self.clock_rate.get() as f64)
146 }
147
148 pub fn try_add(&self, delta: u32) -> Option<Self> {
150 self.timestamp
153 .checked_add(i64::from(delta))
154 .map(|timestamp| Timestamp {
155 timestamp,
156 clock_rate: self.clock_rate,
157 start: self.start,
158 })
159 }
160}
161
162impl Display for Timestamp {
163 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
164 write!(
165 f,
166 "{} (mod-2^32: {}), npt {:.03}",
167 self.timestamp,
168 self.timestamp as u32,
169 self.elapsed_secs()
170 )
171 }
172}
173
174impl Debug for Timestamp {
175 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
176 Display::fmt(self, f)
177 }
178}
179
180pub const UNIX_EPOCH: NtpTimestamp = NtpTimestamp((2_208_988_800) << 32);
182
183#[derive(Copy, Clone, PartialEq, PartialOrd, Eq, Ord)]
201pub struct NtpTimestamp(pub u64);
202
203impl std::fmt::Display for NtpTimestamp {
204 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
205 let since_epoch = self.0.wrapping_sub(UNIX_EPOCH.0);
206 let sec_since_epoch = (since_epoch >> 32) as u32;
207 let ns = i32::try_from(((since_epoch & 0xFFFF_FFFF) * 1_000_000_000) >> 32)
208 .expect("should be < 1_000_000_000");
209 let tm = jiff::Timestamp::new(i64::from(sec_since_epoch), ns)
210 .expect("u32 sec should be valid Timestamp");
211 std::fmt::Display::fmt(&tm, f)
212 }
213}
214
215impl std::fmt::Debug for NtpTimestamp {
216 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
217 write!(f, "{} /* {} */", self.0, self)
219 }
220}
221
222#[derive(Copy, Clone, Debug)]
226pub struct WallTime(jiff::Timestamp);
227
228impl WallTime {
229 fn now() -> Self {
230 Self(jiff::Timestamp::now())
231 }
232}
233
234impl Display for WallTime {
235 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
236 std::fmt::Display::fmt(&self.0, f)
237 }
238}
239
240#[derive(Copy, Clone, Debug)]
244pub struct ConnectionContext {
245 local_addr: std::net::SocketAddr,
246 peer_addr: std::net::SocketAddr,
247 established_wall: WallTime,
248}
249
250impl ConnectionContext {
251 #[doc(hidden)]
252 pub fn dummy() -> Self {
253 let addr = SocketAddr::new(IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED), 0);
254 Self {
255 local_addr: addr,
256 peer_addr: addr,
257 established_wall: WallTime::now(),
258 }
259 }
260}
261
262impl Display for ConnectionContext {
263 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
264 write!(
267 f,
268 "{}(me)->{}@{}",
269 &self.local_addr, &self.peer_addr, &self.established_wall,
270 )
271 }
272}
273
274#[derive(Copy, Clone, Debug)]
279pub struct RtspMessageContext {
280 pos: u64,
283
284 received_wall: WallTime,
288 received: std::time::Instant,
289}
290
291impl RtspMessageContext {
292 #[doc(hidden)]
293 pub fn dummy() -> Self {
294 Self {
295 pos: 0,
296 received_wall: WallTime::now(),
297 received: std::time::Instant::now(),
298 }
299 }
300
301 pub fn received(&self) -> std::time::Instant {
302 self.received
303 }
304
305 pub fn pos(&self) -> u64 {
306 self.pos
307 }
308}
309
310impl Display for RtspMessageContext {
311 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
312 write!(f, "{}@{}", self.pos, &self.received_wall)
313 }
314}
315
316#[derive(Copy, Clone, Debug)]
318pub struct StreamContext(StreamContextInner);
319
320impl StreamContext {
321 #[doc(hidden)]
322 pub fn dummy() -> Self {
323 StreamContext(StreamContextInner::Dummy)
324 }
325}
326
327impl Display for StreamContext {
328 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
329 match &self.0 {
330 StreamContextInner::Tcp(tcp) => {
331 write!(
332 f,
333 "TCP, interleaved channel ids {}-{}",
334 tcp.rtp_channel_id,
335 tcp.rtp_channel_id + 1
336 )
337 }
338 StreamContextInner::Udp(udp) => Display::fmt(udp, f),
339 StreamContextInner::Dummy => write!(f, "dummy"),
340 }
341 }
342}
343
344#[derive(Copy, Clone, Debug)]
345enum StreamContextInner {
346 Tcp(TcpStreamContext),
347 Udp(UdpStreamContext),
348 Dummy,
349}
350
351#[doc(hidden)]
356#[derive(Copy, Clone, Debug)]
357pub struct UdpStreamContext {
358 local_ip: IpAddr,
359 peer_ip: IpAddr,
360 local_rtp_port: u16,
361 peer_rtp_port: u16,
362}
363
364#[doc(hidden)]
368#[derive(Copy, Clone, Debug)]
369pub struct TcpStreamContext {
370 rtp_channel_id: u8,
371}
372
373impl Display for UdpStreamContext {
374 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
375 write!(
377 f,
378 "{}:{}-{}(me) -> {}:{}-{}",
379 self.local_ip,
380 self.local_rtp_port,
381 self.local_rtp_port + 1,
382 self.peer_ip,
383 self.peer_rtp_port,
384 self.peer_rtp_port + 1
385 )
386 }
387}
388
389#[derive(Copy, Clone, Debug)]
395pub struct PacketContext(PacketContextInner);
396
397impl PacketContext {
398 #[doc(hidden)]
399 pub fn dummy() -> PacketContext {
400 Self(PacketContextInner::Dummy)
401 }
402}
403
404#[derive(Copy, Clone, Debug)]
405enum PacketContextInner {
406 Tcp { msg_ctx: RtspMessageContext },
407 Udp { received_wall: WallTime },
408 Dummy,
409}
410
411impl Display for PacketContext {
412 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
413 match self.0 {
414 PacketContextInner::Udp { received_wall } => std::fmt::Display::fmt(&received_wall, f),
415 PacketContextInner::Tcp { msg_ctx } => std::fmt::Display::fmt(&msg_ctx, f),
416 PacketContextInner::Dummy => write!(f, "dummy"),
417 }
418 }
419}
420
421pub(crate) fn as_range(buf: &[u8], subset: &[u8]) -> Option<std::ops::Range<usize>> {
424 if subset.is_empty() {
425 return None;
426 }
427 let subset_p = subset.as_ptr() as usize;
428 let buf_p = buf.as_ptr() as usize;
429 let off = match subset_p.checked_sub(buf_p) {
430 Some(off) => off,
431 None => panic!(
432 "{}-byte subset not within {}-byte buf",
433 subset.len(),
434 buf.len()
435 ),
436 };
437 let end = off + subset.len();
438 assert!(end <= buf.len());
439 Some(off..end)
440}
441
442struct UdpPair {
446 rtp_port: u16,
447 rtp_socket: UdpSocket,
448 rtcp_socket: UdpSocket,
449}
450
451impl UdpPair {
452 fn for_ip(ip_addr: IpAddr) -> Result<Self, std::io::Error> {
453 const MAX_TRIES: usize = 10;
454 const ALLOWED_RTP_RANGE: Range<u16> = 5000..65000; let mut rng = rand::thread_rng();
456 for i in 0..MAX_TRIES {
457 let rtp_port = rng.gen_range(ALLOWED_RTP_RANGE) & !0b1;
458 debug_assert!(ALLOWED_RTP_RANGE.contains(&rtp_port));
459 let rtp_addr = SocketAddr::new(ip_addr, rtp_port);
460 let rtp_socket = match UdpSocket::bind(rtp_addr) {
461 Ok(s) => s,
462 Err(e) if e.kind() == std::io::ErrorKind::AddrInUse => {
463 trace!(
464 "Try {}/{}: unable to bind RTP addr {:?}",
465 i, MAX_TRIES, rtp_addr
466 );
467 continue;
468 }
469 Err(e) => return Err(e),
470 };
471 let rtcp_addr = SocketAddr::new(ip_addr, rtp_port + 1);
472 let rtcp_socket = match UdpSocket::bind(rtcp_addr) {
473 Ok(s) => s,
474 Err(e) if e.kind() == std::io::ErrorKind::AddrInUse => {
475 trace!(
476 "Try {}/{}: unable to bind RTCP addr {:?}",
477 i, MAX_TRIES, rtcp_addr
478 );
479 continue;
480 }
481 Err(e) => return Err(e),
482 };
483 return Ok(Self {
484 rtp_port,
485 rtp_socket,
486 rtcp_socket,
487 });
488 }
489 Err(std::io::Error::new(
490 std::io::ErrorKind::AddrInUse,
491 format!(
492 "Unable to find even/odd pair in {}:{}..{} after {} tries",
493 ip_addr, ALLOWED_RTP_RANGE.start, ALLOWED_RTP_RANGE.end, MAX_TRIES
494 ),
495 ))
496 }
497}
498
499fn to_usize<V: Into<u32>>(v: V) -> usize {
502 const {
503 assert!(std::mem::size_of::<u32>() <= std::mem::size_of::<usize>());
504 }
505 v.into() as usize
506}
507fn to_u64(v: usize) -> u64 {
508 const {
509 assert!(std::mem::size_of::<usize>() <= std::mem::size_of::<u64>());
510 }
511 v as u64
512}
513
514#[cfg(test)]
515mod test {
516 use std::net::Ipv4Addr;
517
518 use super::*;
519
520 #[test]
521 fn local_udp_pair() {
522 UdpPair::for_ip(IpAddr::V4(Ipv4Addr::LOCALHOST)).unwrap();
524 }
525}