rtc_media/io/sample_builder/
mod.rs1#[cfg(test)]
2mod sample_builder_test;
3#[cfg(test)]
4mod sample_sequence_location_test;
5
6pub mod sample_sequence_location;
7
8use self::sample_sequence_location::{Comparison, SampleSequenceLocation};
9use crate::Sample;
10use bytes::Bytes;
11use rtp::Packet;
12use rtp::packetizer::Depacketizer;
13use shared::time::SystemInstant;
14use std::time::Duration;
15
16pub struct SampleBuilder<T: Depacketizer> {
18 max_late: u16,
20 max_late_timestamp: u32,
22 buffer: Vec<Option<Packet>>,
23 prepared_samples: Vec<Option<Sample>>,
24 last_sample_timestamp: Option<u32>,
25
26 depacketizer: T,
28
29 sample_rate: u32,
31
32 filled: SampleSequenceLocation,
34
35 active: SampleSequenceLocation,
37
38 prepared: SampleSequenceLocation,
40
41 dropped_packets: u16,
43
44 padding_packets: u16,
47}
48
49impl<T: Depacketizer> SampleBuilder<T> {
50 pub fn new(max_late: u16, depacketizer: T, sample_rate: u32) -> Self {
56 Self {
57 max_late,
58 max_late_timestamp: 0,
59 buffer: vec![None; u16::MAX as usize + 1],
60 prepared_samples: (0..=u16::MAX as usize).map(|_| None).collect(),
61 last_sample_timestamp: None,
62 depacketizer,
63 sample_rate,
64 filled: SampleSequenceLocation::new(),
65 active: SampleSequenceLocation::new(),
66 prepared: SampleSequenceLocation::new(),
67 dropped_packets: 0,
68 padding_packets: 0,
69 }
70 }
71
72 pub fn with_max_time_delay(mut self, max_late_duration: Duration) -> Self {
73 self.max_late_timestamp =
74 (self.sample_rate as u128 * max_late_duration.as_millis() / 1000) as u32;
75 self
76 }
77
78 fn too_old(&self, location: &SampleSequenceLocation) -> bool {
79 if self.max_late_timestamp == 0 {
80 return false;
81 }
82
83 let mut found_head: Option<u32> = None;
84 let mut found_tail: Option<u32> = None;
85
86 let mut i = location.head;
87 while i != location.tail {
88 if let Some(ref packet) = self.buffer[i as usize] {
89 found_head = Some(packet.header.timestamp);
90 break;
91 }
92 i = i.wrapping_add(1);
93 }
94
95 if found_head.is_none() {
96 return false;
97 }
98
99 let mut i = location.tail.wrapping_sub(1);
100 while i != location.head {
101 if let Some(ref packet) = self.buffer[i as usize] {
102 found_tail = Some(packet.header.timestamp);
103 break;
104 }
105 i = i.wrapping_sub(1);
106 }
107
108 if found_tail.is_none() {
109 return false;
110 }
111
112 found_tail.unwrap().wrapping_sub(found_head.unwrap()) > self.max_late_timestamp
113 }
114
115 fn fetch_timestamp(&self, location: &SampleSequenceLocation) -> Option<u32> {
117 if location.empty() {
118 None
119 } else {
120 Some(
121 (self.buffer[location.head as usize])
122 .as_ref()?
123 .header
124 .timestamp,
125 )
126 }
127 }
128
129 fn release_packet(&mut self, i: u16) {
130 self.buffer[i as usize] = None;
131 }
132
133 fn purge_consumed_buffers(&mut self) {
136 let active = self.active;
137 self.purge_consumed_location(&active, false);
138 }
139
140 fn purge_consumed_location(&mut self, consume: &SampleSequenceLocation, force_consume: bool) {
143 if !self.filled.has_data() {
144 return;
145 }
146 match consume.compare(self.filled.head) {
147 Comparison::Inside if force_consume => {
148 self.release_packet(self.filled.head);
149 self.filled.head = self.filled.head.wrapping_add(1);
150 }
151 Comparison::Before => {
152 self.release_packet(self.filled.head);
153 self.filled.head = self.filled.head.wrapping_add(1);
154 }
155 _ => {}
156 }
157 }
158
159 fn purge_buffers(&mut self) {
162 self.purge_consumed_buffers();
163
164 while (self.too_old(&self.filled) || (self.filled.count() > self.max_late))
165 && self.filled.has_data()
166 {
167 if self.active.empty() {
168 self.active = self.filled;
170 }
171
172 if self.active.has_data() && (self.active.head == self.filled.head) {
173 let err = match self.build_sample(true) {
176 Ok(_) => continue,
177 Err(e) => e,
178 };
179
180 if !matches!(err, BuildError::InvalidPartition(_)) {
181 self.dropped_packets += 1;
183 }
184
185 self.active.head = self.active.head.wrapping_add(1);
187 }
188
189 self.release_packet(self.filled.head);
190 self.filled.head = self.filled.head.wrapping_add(1);
191 }
192 }
193
194 pub fn push(&mut self, p: Packet) {
199 let sequence_number = p.header.sequence_number;
200 self.buffer[sequence_number as usize] = Some(p);
201 match self.filled.compare(sequence_number) {
202 Comparison::Void => {
203 self.filled.head = sequence_number;
204 self.filled.tail = sequence_number.wrapping_add(1);
205 }
206 Comparison::Before => {
207 self.filled.head = sequence_number;
208 }
209 Comparison::After => {
210 self.filled.tail = sequence_number.wrapping_add(1);
211 }
212 _ => {}
213 }
214 self.purge_buffers();
215 }
216
217 fn build_sample(
221 &mut self,
222 purging_buffers: bool,
223 ) -> Result<SampleSequenceLocation, BuildError> {
224 if self.active.empty() {
225 self.active = self.filled;
226 }
227
228 if self.active.empty() {
229 return Err(BuildError::NoActiveSegment);
230 }
231
232 if self.filled.compare(self.active.tail) == Comparison::Inside {
233 self.active.tail = self.filled.tail;
234 }
235
236 let mut consume = SampleSequenceLocation::new();
237
238 let mut i = self.active.head;
239 let head_timestamp = self.fetch_timestamp(&self.active);
241 while let Some(ref packet) = self.buffer[i as usize] {
242 if self.active.compare(i) == Comparison::After {
243 break;
244 }
245 let is_same_timestamp = head_timestamp.map(|t| packet.header.timestamp == t);
246 let is_different_timestamp = is_same_timestamp.map(std::ops::Not::not);
247 let is_partition_tail = self
248 .depacketizer
249 .is_partition_tail(packet.header.marker, &packet.payload);
250
251 if is_partition_tail && is_same_timestamp.unwrap_or(true) {
260 consume.head = self.active.head;
261 consume.tail = i.wrapping_add(1);
262 break;
263 }
264
265 if is_different_timestamp.unwrap_or(false) {
266 consume.head = self.active.head;
267 consume.tail = i;
268 break;
269 }
270 i = i.wrapping_add(1);
271 }
272
273 if consume.empty() {
274 return Err(BuildError::NothingToConsume);
275 }
276
277 if !purging_buffers && self.buffer[consume.tail as usize].is_none() {
278 return Err(BuildError::PendingTimestampPacket);
282 }
283
284 let sample_timestamp = self.fetch_timestamp(&self.active).unwrap_or(0);
285 let mut after_timestamp = sample_timestamp;
286
287 for i in consume.tail..self.active.tail {
289 if let Some(ref packet) = self.buffer[i as usize] {
290 after_timestamp = packet.header.timestamp;
291 break;
292 }
293 }
294
295 let head_payload = self.buffer[consume.head as usize]
298 .as_ref()
299 .map(|p| &p.payload)
300 .ok_or(BuildError::GapInSegment)?;
301 if !self.depacketizer.is_partition_head(head_payload) {
302 let is_padding = consume.range(&self.buffer).all(|p| {
305 p.map(|p| {
306 self.last_sample_timestamp == Some(p.header.timestamp) && p.payload.is_empty()
307 })
308 .unwrap_or(false)
309 });
310
311 self.dropped_packets += consume.count();
312 if is_padding {
313 self.padding_packets += consume.count();
314 }
315 self.purge_consumed_location(&consume, true);
316 self.purge_consumed_buffers();
317
318 self.active.head = consume.tail;
319 return Err(BuildError::InvalidPartition(consume));
320 }
321
322 self.active.head = consume.tail;
324
325 let mut data: Vec<u8> = Vec::new();
327 let mut i = consume.head;
328 while i != consume.tail {
329 let payload = self.buffer[i as usize]
330 .as_ref()
331 .map(|p| &p.payload)
332 .ok_or(BuildError::GapInSegment)?;
333
334 let p = self
335 .depacketizer
336 .depacketize(payload)
337 .map_err(|_| BuildError::DepacketizerFailed)?;
338
339 data.extend_from_slice(&p);
340 i = i.wrapping_add(1);
341 }
342 let samples = after_timestamp.wrapping_sub(sample_timestamp);
343
344 let sample = Sample {
345 data: Bytes::copy_from_slice(&data),
346 timestamp: SystemInstant::now(),
347 duration: Duration::from_secs_f64((samples as f64) / (self.sample_rate as f64)),
348 packet_timestamp: sample_timestamp,
349 prev_dropped_packets: self.dropped_packets,
350 prev_padding_packets: self.padding_packets,
351 };
352
353 self.dropped_packets = 0;
354 self.padding_packets = 0;
355 self.last_sample_timestamp = Some(sample_timestamp);
356
357 self.prepared_samples[self.prepared.tail as usize] = Some(sample);
358 self.prepared.tail = self.prepared.tail.wrapping_add(1);
359
360 self.purge_consumed_location(&consume, true);
361 self.purge_consumed_buffers();
362
363 Ok(consume)
364 }
365
366 pub fn pop(&mut self) -> Option<Sample> {
369 let _ = self.build_sample(false);
370
371 if self.prepared.empty() {
372 return None;
373 }
374 let result = self.prepared_samples[self.prepared.head as usize].take();
375 self.prepared.head = self.prepared.head.wrapping_add(1);
376 result
377 }
378
379 pub fn pop_with_timestamp(&mut self) -> Option<(Sample, u32)> {
383 if let Some(sample) = self.pop() {
384 let timestamp = sample.packet_timestamp;
385 Some((sample, timestamp))
386 } else {
387 None
388 }
389 }
390}
391
392pub(crate) fn seqnum_distance(x: u16, y: u16) -> u16 {
402 let diff = x.wrapping_sub(y);
403 if diff > 0xFFFF / 2 {
404 0xFFFF - diff + 1
405 } else {
406 diff
407 }
408}
409
410#[derive(Debug)]
411enum BuildError {
412 NoActiveSegment,
414
415 NothingToConsume,
417
418 PendingTimestampPacket,
421
422 InvalidPartition(SampleSequenceLocation),
425
426 GapInSegment,
428
429 DepacketizerFailed,
431}