Skip to main content

srt/protocol/
mod.rs

1use std::cmp::Ordering;
2use std::num::Wrapping;
3use std::ops::{Add, Div, Mul, Neg, Sub};
4use std::time::{Duration, Instant};
5use std::u32;
6
7pub mod connection;
8pub mod handshake;
9pub mod receiver;
10pub mod sender;
11
/// Timestamp in us after creation
/// These wrap every 2^32 microseconds (roughly 71.6 minutes)
///
/// The counter is stored as a `Wrapping<u32>`, so arithmetic on it is
/// performed modulo 2^32.
// NOTE(review): `Ord` is derived here (it compares the raw counter), while
// `PartialOrd` is implemented by hand in this file using wrap-aware
// subtraction. The two orderings can disagree near a wrap — confirm that no
// caller relies on `cmp`/`max`/`min`/sorting of timestamps.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Ord)]
pub struct TimeStamp(Wrapping<u32>);
16
/// Signed duration in us, e.g. RTT
///
/// Backed by an `i32`, so the representable range is about +/- 2^31
/// microseconds (roughly 35.8 minutes in either direction).
#[derive(Debug, Copy, Clone, PartialEq, Eq, Ord, PartialOrd)]
pub struct TimeSpan(i32);
20
/// Mask selecting the low 32 bits of a microsecond count; used to wrap an
/// elapsed time into the `u32` range of a `TimeStamp`.
const TIMESTAMP_MASK: u128 = u32::MAX as u128;
22
/// Origin `Instant` that `TimeStamp` values are measured from.
#[derive(Copy, Clone, Debug)]
pub struct TimeBase(Instant);
25
26impl TimeSpan {
27    pub fn from_micros(us: i32) -> Self {
28        Self(us)
29    }
30
31    pub fn as_micros(self) -> i32 {
32        self.0
33    }
34
35    pub fn abs(self) -> Self {
36        Self(self.0.abs())
37    }
38
39    pub fn as_secs_f64(self) -> f64 {
40        self.0 as f64 / 1e6
41    }
42}
43
44impl TimeStamp {
45    pub fn from_micros(us: u32) -> Self {
46        Self(Wrapping(us))
47    }
48
49    pub fn as_micros(self) -> u32 {
50        (self.0).0
51    }
52
53    pub fn as_secs_f64(self) -> f64 {
54        (self.0).0 as f64 / 1e6
55    }
56
57    pub fn as_duration(self) -> Duration {
58        Duration::from_micros(u64::from(self.as_micros()))
59    }
60}
61
62impl PartialOrd<TimeStamp> for TimeStamp {
63    fn partial_cmp(&self, other: &TimeStamp) -> Option<Ordering> {
64        // this is a "best effort" implementation, and goes for close
65        // if timestamps are very far apart, this will not work (and cannot)
66        Some((*self - *other).as_micros().cmp(&0))
67    }
68}
69
70impl Add<TimeSpan> for TimeStamp {
71    type Output = TimeStamp;
72
73    #[allow(clippy::suspicious_arithmetic_impl)]
74    fn add(self, rhs: TimeSpan) -> Self::Output {
75        TimeStamp(if rhs.0 > 0 {
76            self.0 + Wrapping(rhs.0 as u32)
77        } else {
78            self.0 - Wrapping(rhs.0.abs() as u32)
79        })
80    }
81}
82
83impl Sub<TimeSpan> for TimeStamp {
84    type Output = TimeStamp;
85
86    fn sub(self, rhs: TimeSpan) -> Self::Output {
87        self + -rhs
88    }
89}
90
91impl Sub<TimeStamp> for TimeStamp {
92    type Output = TimeSpan;
93
94    fn sub(self, rhs: TimeStamp) -> TimeSpan {
95        // This is also a "best effort" implementation, and cannot be precise
96        let pos_sub = self.0 - rhs.0;
97        let neg_sub = rhs.0 - self.0;
98        if pos_sub < neg_sub {
99            TimeSpan(pos_sub.0 as i32)
100        } else {
101            -TimeSpan(neg_sub.0 as i32)
102        }
103    }
104}
105
106impl Neg for TimeSpan {
107    type Output = TimeSpan;
108
109    fn neg(self) -> Self::Output {
110        Self(-self.0)
111    }
112}
113
114impl Mul<i32> for TimeSpan {
115    type Output = TimeSpan;
116
117    fn mul(self, rhs: i32) -> Self::Output {
118        Self(self.0 * rhs)
119    }
120}
121
122impl Add<TimeSpan> for TimeSpan {
123    type Output = TimeSpan;
124
125    fn add(self, rhs: TimeSpan) -> Self::Output {
126        Self(self.0 + rhs.0)
127    }
128}
129
130impl Div<i32> for TimeSpan {
131    type Output = TimeSpan;
132
133    fn div(self, rhs: i32) -> Self::Output {
134        Self(self.0 / rhs)
135    }
136}
137
138impl Sub<TimeSpan> for TimeSpan {
139    type Output = TimeSpan;
140
141    fn sub(self, rhs: TimeSpan) -> Self::Output {
142        Self(self.0 - rhs.0)
143    }
144}
145
146impl TimeBase {
147    pub fn new(start_time: Instant) -> Self {
148        Self(start_time)
149    }
150
151    #[allow(clippy::trivially_copy_pass_by_ref)]
152    pub fn timestamp_from(&self, instant: Instant) -> TimeStamp {
153        assert!(
154            self.0 <= instant,
155            "Timestamps are only valid after the timebase start time"
156        );
157        TimeStamp(Wrapping(
158            ((instant - self.0).as_micros() & TIMESTAMP_MASK) as u32,
159        ))
160    }
161
162    // Get Instant closest to `now` that is consistent with `timestamp`
163    #[allow(clippy::trivially_copy_pass_by_ref)]
164    pub fn instant_from(&self, now: Instant, timestamp: TimeStamp) -> Instant {
165        let wraps = ((now - self.0).as_micros() >> 32) as u64;
166        self.0
167            + Duration::from_micros(wraps * u64::from(std::u32::MAX) + timestamp.as_micros() as u64)
168    }
169
170    pub fn adjust(&mut self, delta: TimeSpan) {
171        if delta.0 > 0 {
172            self.0 += Duration::from_micros(delta.0 as u64);
173        } else {
174            self.0 -= Duration::from_micros(delta.0.abs() as u64);
175        }
176    }
177
178    pub fn origin_time(&self) -> Instant {
179        self.0
180    }
181}
182
#[cfg(test)]
mod timebase {
    use super::*;
    use proptest::prelude::*;

    proptest! {
        // A timestamp converted to an instant and back is unchanged.
        #[test]
        fn timestamp_roundtrip(expected_ts: u32) {
            let timebase = TimeBase::new(Instant::now());
            let expected_ts = TimeStamp::from_micros(expected_ts);

            let resolved = timebase.instant_from(Instant::now(), expected_ts);
            let actual_ts = timebase.timestamp_from(resolved);
            assert_eq!(actual_ts, expected_ts);
        }

        // An instant `n` full wraps plus `expected_ts` microseconds after the
        // origin maps to `expected_ts`.
        #[test]
        fn timestamp_from(expected_ts: u32, n in 0u64..10) {
            let origin = Instant::now();
            let timebase = TimeBase::new(origin);
            let wrap_len = u64::from(std::u32::MAX) + 1;
            let offset_us = wrap_len * n + u64::from(expected_ts);
            let target = origin + Duration::from_micros(offset_us);
            let actual_ts = timebase.timestamp_from(target);
            assert_eq!(actual_ts, TimeStamp::from_micros(expected_ts));
        }

        // Adjusting the origin by `drift` shifts later timestamps by `-drift`.
        #[test]
        fn adjust(drift: i16) {
            let origin = Instant::now();
            let mut timebase = TimeBase::new(origin);
            let drift = TimeSpan::from_micros(i32::from(drift));
            let one_second = TimeSpan::from_micros(1_000_000);

            let before = timebase.timestamp_from(origin);
            timebase.adjust(drift);
            let after = timebase.timestamp_from(origin + Duration::from_micros(1_000_000));

            assert_eq!(after, before - drift + one_second);
        }

    }
}
223
#[cfg(test)]
mod timestamp {
    use super::*;

    /// Differences and ordering of timestamps, including across the wrap.
    #[test]
    fn subtract_timestamp() {
        let ten = TimeStamp::from_micros(10);
        let eleven = TimeStamp::from_micros(11);
        // Stepping back 11 us from 10 wraps around to the largest counter value.
        let wrapped = ten - TimeSpan(11);

        assert_eq!(ten - ten, TimeSpan::from_micros(0));
        assert_eq!(eleven - ten, TimeSpan::from_micros(1));
        assert_eq!(ten - eleven, TimeSpan::from_micros(-1));
        assert!(wrapped < ten);
        assert!(eleven > ten);
        assert!(eleven > wrapped);
        assert_eq!(wrapped.as_micros(), u32::MAX);
    }
}
243
244//4. Timers
245//
246//   UDT uses four timers to trigger different periodical events. Each
247//   event has its own period and they are all independent. They use the
248//   system time as origins and should process wrapping if the system time
249//   wraps.
250//
251//   For a certain periodical event E in UDT, suppose the time variable is
252//   ET and its period is p. If E is set or reset at system time t0 (ET =
253//   t0), then at any time t1, (t1 - ET >= p) is the condition to check if
254//   E should be triggered.
255//
256//   The four timers are ACK, NAK, EXP and SND. SND is used in the sender
257//   only for rate-based packet sending (see Section 6.1), whereas the
258//   other three are used in the receiver only.
259//
260//   ACK is used to trigger an acknowledgement (ACK). Its period is set by
261//   the congestion control module. However, UDT will send an ACK no
262//   longer than every 0.01 second, even though the congestion control
263//   does not need timer-based ACK. Here, 0.01 second is defined as the
264//   SYN time, or synchronization time, and it affects many of the other
265//   timers used in UDT.
266//
267//   NAK is used to trigger a negative acknowledgement (NAK). Its period
//   is dynamically updated to 4 * RTT + RTTVar + SYN, where RTTVar is the
269//   variance of RTT samples.
270//
271//   EXP is used to trigger data packets retransmission and maintain
272//   connection status. Its period is dynamically updated to 4 * RTT +
273//   RTTVar + SYN.
274//
275//   The recommended granularity of their periods is microseconds. The
276//   system time is queried after each time bounded UDP receiving (there
277//   will be additional necessary data processing time if a UDP packet is
278//   received) to check if any of the ACK, NAK, or EXP event should be
279//   triggered. The timeout value of UDP receiving should be at least SYN.
280//
281//   In the rest of this document, a name of a time variable will be used
282//   to represent the associated event, the variable itself, or the value
283//   of its period, depending on the context. For example, ACK can mean
284//   either the ACK event or the value of ACK period.
285
/// Periodic event timer.
///
/// The timer remembers the instant it was last set, reset, or fired (`last`)
/// and its `period`. It is due once at least `period` has elapsed since
/// `last`.
pub struct Timer {
    period: Duration,
    last: Instant,
}

impl Timer {
    /// Creates a timer with the given `period`, started at `now`.
    pub fn new(period: Duration, now: Instant) -> Timer {
        Timer { period, last: now }
    }

    /// Returns the current period.
    pub fn period(&self) -> Duration {
        self.period
    }

    /// Returns the instant at which the timer is next due.
    pub fn next_instant(&self) -> Instant {
        self.last + self.period
    }

    /// Restarts the current period at `now`.
    pub fn reset(&mut self, now: Instant) {
        self.last = now;
    }

    /// Changes the period; the start of the current period is left untouched.
    pub fn set_period(&mut self, period: Duration) {
        self.period = period;
    }

    /// Checks whether the timer is due at `now`.
    ///
    /// Returns `Some(deadline)` and advances the timer by exactly one period
    /// when at least one full period has elapsed since it was last set
    /// (`now - last >= period`); returns `None` otherwise. If several periods
    /// have elapsed, each call reports one of them.
    pub fn check_expired(&mut self, now: Instant) -> Option<Instant> {
        let deadline = self.next_instant();
        // Inclusive comparison: the timer fires as soon as a full period has
        // elapsed, not only once `now` is strictly past the deadline.
        if now >= deadline {
            self.last = deadline;
            Some(deadline)
        } else {
            None
        }
    }
}