Skip to main content

rtc_media/io/sample_builder/
mod.rs

1#[cfg(test)]
2mod sample_builder_test;
3#[cfg(test)]
4mod sample_sequence_location_test;
5
6pub mod sample_sequence_location;
7
8use self::sample_sequence_location::{Comparison, SampleSequenceLocation};
9use crate::Sample;
10use bytes::Bytes;
11use rtp::Packet;
12use rtp::packetizer::Depacketizer;
13use shared::time::SystemInstant;
14use std::time::Duration;
15
16/// SampleBuilder buffers packets until media frames are complete.
17pub struct SampleBuilder<T: Depacketizer> {
18    /// how many packets to wait until we get a valid Sample
19    max_late: u16,
20    /// max timestamp between old and new timestamps before dropping packets
21    max_late_timestamp: u32,
22    buffer: Vec<Option<Packet>>,
23    prepared_samples: Vec<Option<Sample>>,
24    last_sample_timestamp: Option<u32>,
25
26    /// Interface that allows us to take RTP packets to samples
27    depacketizer: T,
28
29    /// sample_rate allows us to compute duration of media.SamplecA
30    sample_rate: u32,
31
32    /// filled contains the head/tail of the packets inserted into the buffer
33    filled: SampleSequenceLocation,
34
35    /// active contains the active head/tail of the timestamp being actively processed
36    active: SampleSequenceLocation,
37
38    /// prepared contains the samples that have been processed to date
39    prepared: SampleSequenceLocation,
40
41    /// number of packets forced to be dropped
42    dropped_packets: u16,
43
44    /// number of padding packets detected and dropped. This number will be a subset of
45    /// `dropped_packets`
46    padding_packets: u16,
47}
48
49impl<T: Depacketizer> SampleBuilder<T> {
50    /// Constructs a new SampleBuilder.
51    /// `max_late` is how long to wait until we can construct a completed [`Sample`].
52    /// `max_late` is measured in RTP packet sequence numbers.
53    /// A large max_late will result in less packet loss but higher latency.
54    /// The depacketizer extracts media samples from RTP packets.
55    pub fn new(max_late: u16, depacketizer: T, sample_rate: u32) -> Self {
56        Self {
57            max_late,
58            max_late_timestamp: 0,
59            buffer: vec![None; u16::MAX as usize + 1],
60            prepared_samples: (0..=u16::MAX as usize).map(|_| None).collect(),
61            last_sample_timestamp: None,
62            depacketizer,
63            sample_rate,
64            filled: SampleSequenceLocation::new(),
65            active: SampleSequenceLocation::new(),
66            prepared: SampleSequenceLocation::new(),
67            dropped_packets: 0,
68            padding_packets: 0,
69        }
70    }
71
72    pub fn with_max_time_delay(mut self, max_late_duration: Duration) -> Self {
73        self.max_late_timestamp =
74            (self.sample_rate as u128 * max_late_duration.as_millis() / 1000) as u32;
75        self
76    }
77
78    fn too_old(&self, location: &SampleSequenceLocation) -> bool {
79        if self.max_late_timestamp == 0 {
80            return false;
81        }
82
83        let mut found_head: Option<u32> = None;
84        let mut found_tail: Option<u32> = None;
85
86        let mut i = location.head;
87        while i != location.tail {
88            if let Some(ref packet) = self.buffer[i as usize] {
89                found_head = Some(packet.header.timestamp);
90                break;
91            }
92            i = i.wrapping_add(1);
93        }
94
95        if found_head.is_none() {
96            return false;
97        }
98
99        let mut i = location.tail.wrapping_sub(1);
100        while i != location.head {
101            if let Some(ref packet) = self.buffer[i as usize] {
102                found_tail = Some(packet.header.timestamp);
103                break;
104            }
105            i = i.wrapping_sub(1);
106        }
107
108        if found_tail.is_none() {
109            return false;
110        }
111
112        found_tail.unwrap().wrapping_sub(found_head.unwrap()) > self.max_late_timestamp
113    }
114
115    /// Returns the timestamp associated with a given sample location
116    fn fetch_timestamp(&self, location: &SampleSequenceLocation) -> Option<u32> {
117        if location.empty() {
118            None
119        } else {
120            Some(
121                (self.buffer[location.head as usize])
122                    .as_ref()?
123                    .header
124                    .timestamp,
125            )
126        }
127    }
128
129    fn release_packet(&mut self, i: u16) {
130        self.buffer[i as usize] = None;
131    }
132
133    /// Clears all buffers that have already been consumed by
134    /// popping.
135    fn purge_consumed_buffers(&mut self) {
136        let active = self.active;
137        self.purge_consumed_location(&active, false);
138    }
139
140    /// Clears all buffers that have already been consumed
141    /// during a sample building method.
142    fn purge_consumed_location(&mut self, consume: &SampleSequenceLocation, force_consume: bool) {
143        if !self.filled.has_data() {
144            return;
145        }
146        match consume.compare(self.filled.head) {
147            Comparison::Inside if force_consume => {
148                self.release_packet(self.filled.head);
149                self.filled.head = self.filled.head.wrapping_add(1);
150            }
151            Comparison::Before => {
152                self.release_packet(self.filled.head);
153                self.filled.head = self.filled.head.wrapping_add(1);
154            }
155            _ => {}
156        }
157    }
158
159    /// Flushes all buffers that are already consumed or those buffers
160    /// that are too late to consume.
161    fn purge_buffers(&mut self) {
162        self.purge_consumed_buffers();
163
164        while (self.too_old(&self.filled) || (self.filled.count() > self.max_late))
165            && self.filled.has_data()
166        {
167            if self.active.empty() {
168                // refill the active based on the filled packets
169                self.active = self.filled;
170            }
171
172            if self.active.has_data() && (self.active.head == self.filled.head) {
173                // attempt to force the active packet to be consumed even though
174                // outstanding data may be pending arrival
175                let err = match self.build_sample(true) {
176                    Ok(_) => continue,
177                    Err(e) => e,
178                };
179
180                if !matches!(err, BuildError::InvalidPartition(_)) {
181                    // In the InvalidPartition case `build_sample` will have already adjusted `dropped_packets`.
182                    self.dropped_packets += 1;
183                }
184
185                // could not build the sample so drop it
186                self.active.head = self.active.head.wrapping_add(1);
187            }
188
189            self.release_packet(self.filled.head);
190            self.filled.head = self.filled.head.wrapping_add(1);
191        }
192    }
193
194    /// Adds an RTP Packet to self's buffer.
195    ///
196    /// Push does not copy the input. If you wish to reuse
197    /// this memory make sure to copy before calling push
198    pub fn push(&mut self, p: Packet) {
199        let sequence_number = p.header.sequence_number;
200        self.buffer[sequence_number as usize] = Some(p);
201        match self.filled.compare(sequence_number) {
202            Comparison::Void => {
203                self.filled.head = sequence_number;
204                self.filled.tail = sequence_number.wrapping_add(1);
205            }
206            Comparison::Before => {
207                self.filled.head = sequence_number;
208            }
209            Comparison::After => {
210                self.filled.tail = sequence_number.wrapping_add(1);
211            }
212            _ => {}
213        }
214        self.purge_buffers();
215    }
216
217    /// Creates a sample from a valid collection of RTP Packets by
218    /// walking forwards building a sample if everything looks good clear and
219    /// update buffer+values
220    fn build_sample(
221        &mut self,
222        purging_buffers: bool,
223    ) -> Result<SampleSequenceLocation, BuildError> {
224        if self.active.empty() {
225            self.active = self.filled;
226        }
227
228        if self.active.empty() {
229            return Err(BuildError::NoActiveSegment);
230        }
231
232        if self.filled.compare(self.active.tail) == Comparison::Inside {
233            self.active.tail = self.filled.tail;
234        }
235
236        let mut consume = SampleSequenceLocation::new();
237
238        let mut i = self.active.head;
239        // `self.active` isn't modified in the loop, fetch the timestamp once and cache it.
240        let head_timestamp = self.fetch_timestamp(&self.active);
241        while let Some(ref packet) = self.buffer[i as usize] {
242            if self.active.compare(i) == Comparison::After {
243                break;
244            }
245            let is_same_timestamp = head_timestamp.map(|t| packet.header.timestamp == t);
246            let is_different_timestamp = is_same_timestamp.map(std::ops::Not::not);
247            let is_partition_tail = self
248                .depacketizer
249                .is_partition_tail(packet.header.marker, &packet.payload);
250
251            // If the timestamp is not the same it might be because the next packet is both a start
252            // and end of the next partition in which case a sample should be generated now. This
253            // can happen when padding packets are used .e.g:
254            //
255            // p1(t=1), p2(t=1), p3(t=1), p4(t=2, marker=true, start=true)
256            //
257            // In thic case the generated sample should be p1 through p3, but excluding p4 which is
258            // its own sample.
259            if is_partition_tail && is_same_timestamp.unwrap_or(true) {
260                consume.head = self.active.head;
261                consume.tail = i.wrapping_add(1);
262                break;
263            }
264
265            if is_different_timestamp.unwrap_or(false) {
266                consume.head = self.active.head;
267                consume.tail = i;
268                break;
269            }
270            i = i.wrapping_add(1);
271        }
272
273        if consume.empty() {
274            return Err(BuildError::NothingToConsume);
275        }
276
277        if !purging_buffers && self.buffer[consume.tail as usize].is_none() {
278            // wait for the next packet after this set of packets to arrive
279            // to ensure at least one post sample timestamp is known
280            // (unless we have to release right now)
281            return Err(BuildError::PendingTimestampPacket);
282        }
283
284        let sample_timestamp = self.fetch_timestamp(&self.active).unwrap_or(0);
285        let mut after_timestamp = sample_timestamp;
286
287        // scan for any packet after the current and use that time stamp as the diff point
288        for i in consume.tail..self.active.tail {
289            if let Some(ref packet) = self.buffer[i as usize] {
290                after_timestamp = packet.header.timestamp;
291                break;
292            }
293        }
294
295        // prior to decoding all the packets, check if this packet
296        // would end being disposed anyway
297        let head_payload = self.buffer[consume.head as usize]
298            .as_ref()
299            .map(|p| &p.payload)
300            .ok_or(BuildError::GapInSegment)?;
301        if !self.depacketizer.is_partition_head(head_payload) {
302            // libWebRTC will sometimes send several empty padding packets to smooth out send
303            // rate. These packets don't carry any media payloads.
304            let is_padding = consume.range(&self.buffer).all(|p| {
305                p.map(|p| {
306                    self.last_sample_timestamp == Some(p.header.timestamp) && p.payload.is_empty()
307                })
308                .unwrap_or(false)
309            });
310
311            self.dropped_packets += consume.count();
312            if is_padding {
313                self.padding_packets += consume.count();
314            }
315            self.purge_consumed_location(&consume, true);
316            self.purge_consumed_buffers();
317
318            self.active.head = consume.tail;
319            return Err(BuildError::InvalidPartition(consume));
320        }
321
322        // the head set of packets is now fully consumed
323        self.active.head = consume.tail;
324
325        // merge all the buffers into a sample
326        let mut data: Vec<u8> = Vec::new();
327        let mut i = consume.head;
328        while i != consume.tail {
329            let payload = self.buffer[i as usize]
330                .as_ref()
331                .map(|p| &p.payload)
332                .ok_or(BuildError::GapInSegment)?;
333
334            let p = self
335                .depacketizer
336                .depacketize(payload)
337                .map_err(|_| BuildError::DepacketizerFailed)?;
338
339            data.extend_from_slice(&p);
340            i = i.wrapping_add(1);
341        }
342        let samples = after_timestamp.wrapping_sub(sample_timestamp);
343
344        let sample = Sample {
345            data: Bytes::copy_from_slice(&data),
346            timestamp: SystemInstant::now(),
347            duration: Duration::from_secs_f64((samples as f64) / (self.sample_rate as f64)),
348            packet_timestamp: sample_timestamp,
349            prev_dropped_packets: self.dropped_packets,
350            prev_padding_packets: self.padding_packets,
351        };
352
353        self.dropped_packets = 0;
354        self.padding_packets = 0;
355        self.last_sample_timestamp = Some(sample_timestamp);
356
357        self.prepared_samples[self.prepared.tail as usize] = Some(sample);
358        self.prepared.tail = self.prepared.tail.wrapping_add(1);
359
360        self.purge_consumed_location(&consume, true);
361        self.purge_consumed_buffers();
362
363        Ok(consume)
364    }
365
366    /// Compiles pushed RTP packets into media samples and then
367    /// returns the next valid sample (or None if no sample is compiled).
368    pub fn pop(&mut self) -> Option<Sample> {
369        let _ = self.build_sample(false);
370
371        if self.prepared.empty() {
372            return None;
373        }
374        let result = self.prepared_samples[self.prepared.head as usize].take();
375        self.prepared.head = self.prepared.head.wrapping_add(1);
376        result
377    }
378
379    /// Compiles pushed RTP packets into media samples and then
380    /// returns the next valid sample with its associated RTP timestamp (or `None` if
381    /// no sample is compiled).
382    pub fn pop_with_timestamp(&mut self) -> Option<(Sample, u32)> {
383        if let Some(sample) = self.pop() {
384            let timestamp = sample.packet_timestamp;
385            Some((sample, timestamp))
386        } else {
387            None
388        }
389    }
390}
391
392// Computes the distance between two sequence numbers
393/*pub(crate) fn seqnum_distance(head: u16, tail: u16) -> u16 {
394    if head > tail {
395        head.wrapping_add(tail)
396    } else {
397        tail - head
398    }
399}*/
400
401pub(crate) fn seqnum_distance(x: u16, y: u16) -> u16 {
402    let diff = x.wrapping_sub(y);
403    if diff > 0xFFFF / 2 {
404        0xFFFF - diff + 1
405    } else {
406        diff
407    }
408}
409
410#[derive(Debug)]
411enum BuildError {
412    /// There's no active segment of RTP packets to consider yet.
413    NoActiveSegment,
414
415    /// No sample partition could be found in the active segment.
416    NothingToConsume,
417
418    /// A segment to consume was identified, but a subsequent packet is needed to determine the
419    /// duration of the sample.
420    PendingTimestampPacket,
421
422    /// The active segment's head was not aligned with a sample partition head. Some packets were
423    /// dropped.
424    InvalidPartition(SampleSequenceLocation),
425
426    /// There was a gap in the active segment because of one or more missing RTP packets.
427    GapInSegment,
428
429    /// We failed to depacketize an RTP packet.
430    DepacketizerFailed,
431}