Skip to main content

retina/client/
timeline.rs

1// Copyright (C) The Retina Authors
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use std::convert::TryFrom;
5use std::num::{NonZeroI32, NonZeroU32};
6
7use crate::Timestamp;
8
9const MAX_FORWARD_TIME_JUMP_SECS: u32 = 10;
10
11/// Creates [Timestamp]s (which don't wrap and can be converted to NPT aka normal play time)
12/// from 32-bit (wrapping) RTP timestamps. Unstable, exposed for benchmark.
13#[doc(hidden)]
14#[derive(Debug)]
15pub struct Timeline {
16    timestamp: i64,
17    clock_rate: NonZeroU32,
18    start: Option<u32>,
19
20    /// The maximum forward jump to allow, in clock rate units.
21    /// If this is absent, don't do any enforcement of sane time units.
22    max_forward_jump: Option<NonZeroI32>,
23
24    /// The same in seconds, for logging.
25    max_forward_jump_secs: u32,
26}
27
28impl Timeline {
29    /// Creates a new timeline, erroring on crazy clock rates.
30    pub fn new(
31        start: Option<u32>,
32        clock_rate: u32,
33        enforce_with_max_forward_jump_secs: Option<NonZeroU32>,
34    ) -> Result<Self, String> {
35        let clock_rate = NonZeroU32::new(clock_rate)
36            .ok_or_else(|| "clock_rate=0 rejected to prevent division by zero".to_string())?;
37        let max_forward_jump = enforce_with_max_forward_jump_secs
38            .map(|j| i32::try_from(u64::from(j.get()) * u64::from(clock_rate.get())))
39            .transpose()
40            .map_err(|_| {
41                format!(
42                    "clock_rate={clock_rate} rejected because max forward jump of {MAX_FORWARD_TIME_JUMP_SECS} sec exceeds i32::MAX"
43                )
44            })?
45            .map(|j| NonZeroI32::new(j).expect("non-zero times non-zero must be non-zero"));
46        Ok(Timeline {
47            timestamp: i64::from(start.unwrap_or(0)),
48            start,
49            clock_rate,
50            max_forward_jump,
51            max_forward_jump_secs: enforce_with_max_forward_jump_secs
52                .map(NonZeroU32::get)
53                .unwrap_or(0),
54        })
55    }
56
57    /// Advances to the given (wrapping) RTP timestamp.
58    ///
59    /// If enforcement was enabled, this produces a monotonically increasing
60    /// [Timestamp], erroring on excessive or backward time jumps.
61    pub fn advance_to(&mut self, rtp_timestamp: u32) -> Result<Timestamp, String> {
62        let (timestamp, delta) = self.ts_and_delta(rtp_timestamp)?;
63        if matches!(self.max_forward_jump, Some(j) if !(0..j.get()).contains(&delta)) {
64            return Err(format!(
65                "Timestamp jumped {} ({:.03} sec) from {} to {}; \
66                   policy is to allow 0..{} sec only",
67                delta,
68                (delta as f64) / f64::from(self.clock_rate.get()),
69                self.timestamp,
70                timestamp,
71                self.max_forward_jump_secs
72            ));
73        }
74        self.timestamp = timestamp.timestamp;
75        Ok(timestamp)
76    }
77
78    /// Places `rtp_timestamp` on the timeline without advancing the timeline
79    /// or applying time jump policy. Will set the NPT epoch if unset.
80    ///
81    /// This is useful for RTP timestamps in RTCP packets. They commonly refer
82    /// to time slightly before the most timestamp of the matching RTP stream.
83    pub fn place(&mut self, rtp_timestamp: u32) -> Result<Timestamp, String> {
84        Ok(self.ts_and_delta(rtp_timestamp)?.0)
85    }
86
87    fn ts_and_delta(&mut self, rtp_timestamp: u32) -> Result<(Timestamp, i32), String> {
88        let start = match self.start {
89            None => {
90                self.start = Some(rtp_timestamp);
91                self.timestamp = i64::from(rtp_timestamp);
92                rtp_timestamp
93            }
94            Some(start) => start,
95        };
96        let delta = (rtp_timestamp as i32).wrapping_sub(self.timestamp as i32);
97        let timestamp = self
98            .timestamp
99            .checked_add(i64::from(delta))
100            .ok_or_else(|| {
101                // This probably won't happen even with a hostile server. It'd
102                // take ~2^32 packets (~ 4 billion) to advance the time this far
103                // forward or backward even with no limits on time jump per
104                // packet.
105                format!(
106                    "timestamp {} + delta {} won't fit in i64!",
107                    self.timestamp, delta
108                )
109            })?;
110
111        // Also error in similarly-unlikely NPT underflow.
112        if timestamp.checked_sub(i64::from(start)).is_none() {
113            return Err(format!(
114                "timestamp {} + delta {} - start {} underflows i64!",
115                self.timestamp, delta, start
116            ));
117        }
118        Ok((
119            Timestamp {
120                timestamp,
121                clock_rate: self.clock_rate,
122                start,
123            },
124            delta,
125        ))
126    }
127}
128
129#[cfg(test)]
130mod tests {
131    use std::num::NonZeroU32;
132
133    use super::Timeline;
134
135    #[test]
136    fn timeline() {
137        // Don't allow crazy clock rates that will get us into trouble.
138        Timeline::new(Some(0), 0, None).unwrap_err();
139        Timeline::new(Some(0), u32::MAX, NonZeroU32::new(10)).unwrap_err();
140
141        // Don't allow excessive forward jumps when enforcement is enabled.
142        let mut t = Timeline::new(Some(100), 90_000, NonZeroU32::new(10)).unwrap();
143        t.advance_to(100 + (10 * 90_000) + 1).unwrap_err();
144
145        // Or any backward jump when enforcement is enabled.
146        let mut t = Timeline::new(Some(100), 90_000, NonZeroU32::new(10)).unwrap();
147        t.advance_to(99).unwrap_err();
148
149        // ...but do allow backward RTP timestamps in RTCP.
150        let mut t = Timeline::new(Some(100), 90_000, NonZeroU32::new(10)).unwrap();
151        assert_eq!(t.place(99).unwrap().elapsed(), -1);
152        assert_eq!(t.advance_to(101).unwrap().elapsed(), 1);
153
154        // ...and be more permissive when enforcement is disabled.
155        let mut t = Timeline::new(Some(100), 90_000, None).unwrap();
156        t.advance_to(100 + (10 * 90_000) + 1).unwrap();
157        let mut t = Timeline::new(Some(100), 90_000, None).unwrap();
158        t.advance_to(99).unwrap();
159
160        // Normal usage.
161        let mut t = Timeline::new(Some(42), 90_000, NonZeroU32::new(10)).unwrap();
162        assert_eq!(t.advance_to(83).unwrap().elapsed(), 83 - 42);
163        assert_eq!(t.advance_to(453).unwrap().elapsed(), 453 - 42);
164
165        // Wraparound is normal too.
166        let mut t = Timeline::new(Some(u32::MAX), 90_000, NonZeroU32::new(10)).unwrap();
167        assert_eq!(t.advance_to(5).unwrap().elapsed(), 5 + 1);
168
169        // No initial rtptime.
170        let mut t = Timeline::new(None, 90_000, NonZeroU32::new(10)).unwrap();
171        assert_eq!(t.advance_to(218250000).unwrap().elapsed(), 0);
172    }
173
174    #[test]
175    fn cast() {
176        let a = 0x1_FFFF_FFFFi64;
177        let b = 0x1_0000_0000i64;
178        assert_eq!(a as i32, -1);
179        assert_eq!(b as i32, 0);
180    }
181}