1use futures::future::{self, AbortHandle, abortable};
10use smallvec::SmallVec;
11
12use gst::glib;
13use gst::prelude::*;
14
15use std::sync::LazyLock;
16
17use std::collections::VecDeque;
18use std::sync::Arc;
19use std::sync::Mutex;
20
21use crate::runtime::Context;
22
23static DATA_QUEUE_CAT: LazyLock<gst::DebugCategory> = LazyLock::new(|| {
24 gst::DebugCategory::new(
25 "ts-dataqueue",
26 gst::DebugColorFlags::empty(),
27 Some("Thread-sharing queue"),
28 )
29});
30
31#[derive(Debug, Default, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy, glib::Enum)]
32#[repr(u32)]
33#[enum_type(name = "GstTsQueueLeakyMode")]
34pub enum QueueLeakyMode {
35 #[default]
36 #[enum_value(name = "Not Leaky", nick = "no")]
37 NotLeaky,
38 #[enum_value(name = "Leaky on upstream (new buffers)", nick = "upstream")]
39 Upstream,
40 #[enum_value(name = "Leaky on downstream (old buffers)", nick = "downstream")]
41 Downstream,
42}
43
44impl QueueLeakyMode {
45 pub fn is_leaky(self) -> bool {
46 !matches!(self, QueueLeakyMode::NotLeaky)
47 }
48}
49
50#[derive(Debug, Clone)]
51pub enum DataQueueItem {
52 Buffer(gst::Buffer),
53 BufferList(gst::BufferList),
54 Event(gst::Event),
55}
56
57impl DataQueueItem {
58 fn sizes(&self) -> (u32, u32, Option<gst::ClockTime>) {
59 match *self {
60 DataQueueItem::Buffer(ref buffer) => (1, buffer.size() as u32, buffer.dts_or_pts()),
61 DataQueueItem::BufferList(ref list) => {
62 let (size, ts) = list
63 .iter()
64 .fold((0, gst::ClockTime::NONE), |(size, first_ts), buf| {
65 (size + buf.size(), first_ts.or(buf.dts_or_pts()))
66 });
67 (list.len() as u32, size as u32, ts)
68 }
69 DataQueueItem::Event(_) => (0, 0, None),
70 }
71 }
72
73 fn timestamp(&self) -> Option<gst::ClockTime> {
74 match *self {
75 DataQueueItem::Buffer(ref buffer) => buffer.dts_or_pts(),
76 DataQueueItem::BufferList(ref list) => list.iter().find_map(|b| b.dts_or_pts()),
77 DataQueueItem::Event(_) => None,
78 }
79 }
80
81 fn last_timestamp(&self) -> Option<gst::ClockTime> {
82 match *self {
83 DataQueueItem::Buffer(ref buffer) => buffer.dts_or_pts(),
84 DataQueueItem::BufferList(ref list) => list.iter().rev().find_map(|b| b.dts_or_pts()),
85 DataQueueItem::Event(_) => None,
86 }
87 }
88}
89
90#[derive(Clone, Copy, Debug, PartialEq, Eq)]
91pub enum DataQueueState {
92 Started,
93 Stopped,
94}
95
96#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
97pub enum PushedStatus {
98 FirstBuffer,
99 GotBuffers,
100 #[default]
101 PendingBuffers,
102}
103
104impl PushedStatus {
105 pub fn is_first_buffer(self) -> bool {
106 matches!(self, PushedStatus::FirstBuffer)
107 }
108}
109
110#[derive(Clone, Debug)]
111pub struct DataQueue(Arc<Mutex<DataQueueInner>>);
112
113#[derive(Debug)]
114struct DataQueueInner {
115 element: gst::Element,
116 upstream_ctx: Option<Context>,
117 src_pad: gst::Pad,
118
119 state: DataQueueState,
120 queue: VecDeque<DataQueueItem>,
121
122 pushed_status: PushedStatus,
123
124 leaky_mode: QueueLeakyMode,
125 cur_level_buffers: u32,
126 cur_level_bytes: u32,
127 cur_level_time: gst::ClockTime,
128 max_size_buffers: Option<u32>,
129 max_size_bytes: Option<u32>,
130 max_size_time: Option<gst::ClockTime>,
131
132 pending_handle: Option<AbortHandle>,
133}
134
135impl DataQueueInner {
136 fn wake(&mut self) {
137 if let Some(pending_handle) = self.pending_handle.take() {
138 pending_handle.abort();
139 }
140 }
141
142 fn update_cur_time_level(&mut self) {
143 if let Some((first_ts, last_ts)) = Option::zip(
144 self.queue.iter().find_map(|i| i.timestamp()),
145 self.queue.iter().rev().find_map(|i| i.last_timestamp()),
146 ) {
147 self.cur_level_time = if last_ts >= first_ts {
148 last_ts - first_ts
149 } else {
150 first_ts - last_ts
151 };
152 } else {
153 self.cur_level_time = gst::ClockTime::ZERO;
154 }
155 }
156
157 #[track_caller]
165 fn dequeue_oldest_buffer_or_list(&mut self, obj: &gst::glib::Object) {
166 assert_eq!(self.leaky_mode, QueueLeakyMode::Downstream);
167
168 let mut items_to_restore = SmallVec::<[DataQueueItem; 4]>::new();
169
170 let oldest_buf_or_list = loop {
171 let item = self.queue.pop_front();
172 if let Some(DataQueueItem::Event(evt)) = item {
173 items_to_restore.push(DataQueueItem::Event(evt));
174 } else {
175 break item.expect("if queue is full, there ought to be a buffer to dequeue");
176 }
177 };
178
179 items_to_restore
180 .drain(..)
181 .rev()
182 .for_each(|item| self.queue.push_front(item));
183
184 let (count, bytes, ts) = oldest_buf_or_list.sizes();
185 self.cur_level_buffers = self.cur_level_buffers.saturating_sub(count);
186 self.cur_level_bytes = self.cur_level_bytes.saturating_sub(bytes);
187 if ts.is_some() {
188 self.update_cur_time_level();
189 }
190
191 gst::debug!(
192 DATA_QUEUE_CAT,
193 obj = obj,
194 "Dropped oldest item {oldest_buf_or_list:?}",
195 );
196 }
197}
198
199impl DataQueue {
200 pub fn builder(element: &gst::Element, src_pad: &gst::Pad) -> DataQueueBuilder {
201 DataQueueBuilder::new(element, src_pad)
202 }
203
204 pub fn state(&self) -> DataQueueState {
205 self.0.lock().unwrap().state
206 }
207
208 pub fn upstream_context(&self) -> Option<Context> {
209 self.0.lock().unwrap().upstream_ctx.clone()
210 }
211
212 pub fn cur_level_buffers(&self) -> u32 {
213 self.0.lock().unwrap().cur_level_buffers
214 }
215
216 pub fn cur_level_bytes(&self) -> u32 {
217 self.0.lock().unwrap().cur_level_bytes
218 }
219
220 pub fn cur_level_time(&self) -> gst::ClockTime {
221 self.0.lock().unwrap().cur_level_time
222 }
223
224 pub fn is_empty(&self) -> bool {
225 self.0.lock().unwrap().queue.is_empty()
226 }
227
228 pub fn start(&self) {
229 let mut inner = self.0.lock().unwrap();
230 if inner.state == DataQueueState::Started {
231 gst::debug!(
232 DATA_QUEUE_CAT,
233 obj = inner.element,
234 "Data queue already Started"
235 );
236 return;
237 }
238 gst::debug!(DATA_QUEUE_CAT, obj = inner.element, "Starting data queue");
239 inner.state = DataQueueState::Started;
240 inner.wake();
241 }
242
243 pub fn stop(&self) {
244 let mut inner = self.0.lock().unwrap();
245 if inner.state == DataQueueState::Stopped {
246 gst::debug!(
247 DATA_QUEUE_CAT,
248 obj = inner.element,
249 "Data queue already Stopped"
250 );
251 return;
252 }
253 gst::debug!(DATA_QUEUE_CAT, obj = inner.element, "Stopping data queue");
254 inner.state = DataQueueState::Stopped;
255 inner.wake();
256 }
257
258 pub fn clear(&self) {
259 let mut inner = self.0.lock().unwrap();
260
261 gst::debug!(DATA_QUEUE_CAT, obj = inner.element, "Clearing data queue");
262
263 let src_pad = inner.src_pad.clone();
264 for item in inner.queue.drain(..) {
265 if let DataQueueItem::Event(event) = item
266 && event.is_sticky()
267 && event.type_() != gst::EventType::Segment
268 && event.type_() != gst::EventType::Eos
269 {
270 let _ = src_pad.store_sticky_event(&event);
271 }
272 }
273
274 inner.pushed_status = PushedStatus::default();
275
276 inner.cur_level_buffers = 0;
277 inner.cur_level_bytes = 0;
278 inner.cur_level_time = gst::ClockTime::ZERO;
279
280 gst::debug!(DATA_QUEUE_CAT, obj = inner.element, "Data queue cleared");
281 }
282
283 pub fn push(
284 &self,
285 obj: &gst::glib::Object,
286 item: DataQueueItem,
287 ) -> Result<PushedStatus, DataQueueItem> {
288 use QueueLeakyMode::*;
289
290 let mut inner = self.0.lock().unwrap();
291
292 if inner.state == DataQueueState::Stopped {
293 gst::debug!(
294 DATA_QUEUE_CAT,
295 obj = obj,
296 "Rejecting item {item:?} in state {:?}",
297 inner.state
298 );
299 return Err(item);
300 }
301
302 let (buffer_count, bytes, ts) = item.sizes();
303
304 if buffer_count > 0 {
305 if let Some(max) = inner.max_size_buffers
306 && max <= inner.cur_level_buffers
307 {
308 gst::debug!(
309 DATA_QUEUE_CAT,
310 obj = obj,
311 "Queue is full (buffers): {max} <= {} (leaky mode {:?}), {item:?}",
312 inner.cur_level_buffers,
313 inner.leaky_mode,
314 );
315
316 match inner.leaky_mode {
317 NotLeaky => return Err(item),
318 Upstream => return Ok(inner.pushed_status),
319 Downstream => inner.dequeue_oldest_buffer_or_list(obj),
320 }
321 }
322
323 if let Some(max) = inner.max_size_bytes
324 && max <= inner.cur_level_bytes
325 {
326 gst::debug!(
327 DATA_QUEUE_CAT,
328 obj = obj,
329 "Queue is full (bytes): {max} <= {} (leaky mode {:?}), {item:?}",
330 inner.cur_level_bytes,
331 inner.leaky_mode,
332 );
333
334 match inner.leaky_mode {
335 NotLeaky => return Err(item),
336 Upstream => return Ok(inner.pushed_status),
337 Downstream => inner.dequeue_oldest_buffer_or_list(obj),
338 }
339 }
340
341 if ts.is_some() {
342 if let Some(max) = inner.max_size_time
344 && max <= inner.cur_level_time
345 {
346 gst::debug!(
347 DATA_QUEUE_CAT,
348 obj = obj,
349 "Queue is full (time): {max} <= {} (leaky mode {:?}), {item:?}",
350 inner.cur_level_time,
351 inner.leaky_mode,
352 );
353
354 match inner.leaky_mode {
355 NotLeaky => return Err(item),
356 Upstream => return Ok(inner.pushed_status),
357 Downstream => inner.dequeue_oldest_buffer_or_list(obj),
358 }
359 }
360 }
361 }
362
363 gst::debug!(DATA_QUEUE_CAT, obj = obj, "Pushing item {item:?}");
364
365 use PushedStatus::*;
366 match inner.pushed_status {
367 GotBuffers => (),
368 FirstBuffer => inner.pushed_status = GotBuffers,
369 PendingBuffers => {
370 if let DataQueueItem::Buffer(_) | DataQueueItem::BufferList(_) = item {
371 inner.pushed_status = PushedStatus::FirstBuffer;
372 inner.upstream_ctx = Context::current();
373 }
374 }
375 }
376
377 inner.queue.push_back(item);
378
379 inner.cur_level_buffers += buffer_count;
380 inner.cur_level_bytes += bytes;
381 if ts.is_some() {
382 inner.update_cur_time_level();
383 }
384
385 inner.wake();
386
387 Ok(inner.pushed_status)
388 }
389
390 #[allow(clippy::should_implement_trait)]
392 pub async fn next(&mut self) -> Option<DataQueueItem> {
393 loop {
394 let pending_fut = {
395 let mut inner = self.0.lock().unwrap();
396 match inner.state {
397 DataQueueState::Started => match inner.queue.pop_front() {
398 None => {
399 gst::debug!(DATA_QUEUE_CAT, obj = inner.element, "Data queue is empty");
400 }
401 Some(item) => {
402 gst::debug!(
403 DATA_QUEUE_CAT,
404 obj = inner.element,
405 "Popped item {:?}",
406 item
407 );
408
409 let (buffer_count, bytes, ts) = item.sizes();
410 if buffer_count > 0 {
411 inner.cur_level_buffers -= buffer_count;
412 inner.cur_level_bytes -= bytes;
413
414 if ts.is_some() {
415 inner.update_cur_time_level();
416 }
417 }
418
419 return Some(item);
420 }
421 },
422 DataQueueState::Stopped => {
423 gst::debug!(DATA_QUEUE_CAT, obj = inner.element, "Data queue Stopped");
424 return None;
425 }
426 }
427
428 let (pending_fut, abort_handle) = abortable(future::pending::<()>());
429 inner.pending_handle = Some(abort_handle);
430
431 pending_fut
432 };
433
434 let _ = pending_fut.await;
435 }
436 }
437}
438
439#[derive(Debug)]
440pub struct DataQueueBuilder(DataQueueInner);
441
442impl DataQueueBuilder {
443 fn new(element: &gst::Element, src_pad: &gst::Pad) -> DataQueueBuilder {
444 DataQueueBuilder(DataQueueInner {
445 element: element.clone(),
446 upstream_ctx: None,
447 src_pad: src_pad.clone(),
448 state: DataQueueState::Stopped,
449 queue: VecDeque::new(),
450 pushed_status: PushedStatus::default(),
451 leaky_mode: QueueLeakyMode::NotLeaky,
452 cur_level_buffers: 0,
453 cur_level_bytes: 0,
454 cur_level_time: gst::ClockTime::ZERO,
455 max_size_buffers: None,
456 max_size_bytes: None,
457 max_size_time: None,
458 pending_handle: None,
459 })
460 }
461
462 pub fn leaky_mode(mut self, leaky_mode: QueueLeakyMode) -> Self {
463 self.0.leaky_mode = leaky_mode;
464 self
465 }
466
467 pub fn max_size_buffers(mut self, max_size_buffers: u32) -> Self {
468 self.0.max_size_buffers = (max_size_buffers > 0).then_some(max_size_buffers);
469 self
470 }
471
472 pub fn max_size_bytes(mut self, max_size_bytes: u32) -> Self {
473 self.0.max_size_bytes = (max_size_bytes > 0).then_some(max_size_bytes);
474 self
475 }
476
477 pub fn max_size_time(mut self, max_size_time: gst::ClockTime) -> Self {
478 self.0.max_size_time = (!max_size_time.is_zero()).then_some(max_size_time);
479 self
480 }
481
482 pub fn build(self) -> DataQueue {
483 DataQueue(Arc::new(Mutex::new(self.0)))
484 }
485}
486
487#[cfg(test)]
488mod tests {
489 use super::*;
490
491 fn init(test: &str) -> (gst::Element, gst::Pad) {
492 use std::sync::Once;
493 static INIT: Once = Once::new();
494
495 INIT.call_once(|| {
496 gst::init().unwrap();
497 });
498
499 let elem = gst::ElementFactory::make("fakesrc")
500 .name(test)
501 .build()
502 .unwrap();
503 let src_pad = elem.static_pad("src").unwrap();
504
505 (elem, src_pad)
506 }
507
508 #[track_caller]
509 fn push_initial_events(elem: &gst::Element, dq: &DataQueue) {
510 dq.push(
511 elem.upcast_ref(),
512 DataQueueItem::Event(
513 gst::event::Caps::builder(&gst::Caps::new_empty_simple("test/raw")).build(),
514 ),
515 )
516 .unwrap();
517 dq.push(
518 elem.upcast_ref(),
519 DataQueueItem::Event(gst::event::StreamStart::builder(&elem.name()).build()),
520 )
521 .unwrap();
522 dq.push(
523 elem.upcast_ref(),
524 DataQueueItem::Event(
525 gst::event::Segment::builder(&gst::FormattedSegment::<gst::format::Time>::new())
526 .build(),
527 ),
528 )
529 .unwrap();
530 }
531
532 #[track_caller]
533 fn push_segment_done(elem: &gst::Element, dq: &DataQueue) {
534 dq.push(
535 elem.upcast_ref(),
536 DataQueueItem::Event(gst::event::SegmentDone::builder(2.seconds()).build()),
537 )
538 .unwrap();
539 }
540
541 fn pop_item(dq: &mut DataQueue) -> Option<DataQueueItem> {
542 futures::executor::block_on(dq.next())
543 }
544
545 #[track_caller]
546 fn pop_event(dq: &mut DataQueue) -> gst::Event {
547 match pop_item(dq) {
548 Some(DataQueueItem::Event(evt)) => evt,
549 other => panic!("Unexpected {other:?}"),
550 }
551 }
552
553 #[track_caller]
554 fn pop_intial_events(dq: &mut DataQueue) {
555 assert_eq!(pop_event(dq).type_(), gst::EventType::Caps);
556 assert_eq!(pop_event(dq).type_(), gst::EventType::StreamStart);
557 assert_eq!(pop_event(dq).type_(), gst::EventType::Segment);
558 }
559
560 #[track_caller]
561 fn pop_segment_done(dq: &mut DataQueue) {
562 assert_eq!(pop_event(dq).type_(), gst::EventType::SegmentDone);
563 }
564
565 fn push_buffer(
566 elem: &gst::Element,
567 dq: &DataQueue,
568 ts: gst::ClockTime,
569 ) -> Result<PushedStatus, DataQueueItem> {
570 let mut buf = gst::Buffer::from_slice([0u8]);
571 {
572 let buf_mut = buf.make_mut();
573 buf_mut.set_pts(ts);
574 }
575
576 dq.push(elem.upcast_ref(), DataQueueItem::Buffer(buf))
577 }
578
579 #[track_caller]
580 fn pop_buffer(dq: &mut DataQueue) -> gst::Buffer {
581 match pop_item(dq) {
582 Some(DataQueueItem::Buffer(buf)) => buf,
583 other => panic!("Unexpected {other:?}"),
584 }
585 }
586
587 fn push_buffer_list(
588 elem: &gst::Element,
589 dq: &DataQueue,
590 ts: gst::ClockTime,
591 ) -> Result<PushedStatus, DataQueueItem> {
592 let mut buf1 = gst::Buffer::from_slice([0u8]);
593 {
594 let buf_mut = buf1.make_mut();
595 buf_mut.set_pts(ts);
596 }
597 let mut buf2 = gst::Buffer::from_slice([0u8]);
598 {
599 let buf_mut = buf2.make_mut();
600 buf_mut.set_pts(ts + 1.seconds());
601 }
602
603 dq.push(
604 elem.upcast_ref(),
605 DataQueueItem::BufferList(gst::BufferList::from([buf1, buf2])),
606 )
607 }
608
609 #[track_caller]
610 fn pop_buffer_list(dq: &mut DataQueue) -> gst::BufferList {
611 match pop_item(dq) {
612 Some(DataQueueItem::BufferList(buf_list)) => buf_list,
613 other => panic!("Unexpected {other:?}"),
614 }
615 }
616
617 fn test_not_leaky(elem: &gst::Element, dq: &mut DataQueue) {
618 dq.start();
619
620 push_initial_events(elem, dq);
622 assert_eq!(dq.cur_level_buffers(), 0);
623 assert_eq!(dq.cur_level_bytes(), 0);
624 assert_eq!(dq.cur_level_time(), 0.seconds());
625
626 push_buffer(elem, dq, 0.seconds()).unwrap();
627 push_buffer(elem, dq, 1.seconds()).unwrap();
628 assert_eq!(dq.cur_level_buffers(), 2);
629 assert_eq!(dq.cur_level_bytes(), 2);
630 assert_eq!(dq.cur_level_time(), 1.seconds());
631 let rejected_buf = match push_buffer(elem, dq, 2.seconds()).unwrap_err() {
632 DataQueueItem::Buffer(buf) => buf,
633 other => panic!("Unexpected {other:?}"),
634 };
635 assert_eq!(rejected_buf.pts(), Some(2.seconds()));
636 assert_eq!(dq.cur_level_buffers(), 2);
637 assert_eq!(dq.cur_level_bytes(), 2);
638 assert_eq!(dq.cur_level_time(), 1.seconds());
639 push_segment_done(elem, dq);
640
641 pop_intial_events(dq);
642 let buf = pop_buffer(dq);
643 assert_eq!(buf.pts(), Some(0.seconds()));
644 assert_eq!(dq.cur_level_buffers(), 1);
645 assert_eq!(dq.cur_level_bytes(), 1);
646 assert_eq!(dq.cur_level_time(), 0.seconds());
647 let buf = pop_buffer(dq);
648 assert_eq!(buf.pts(), Some(1.seconds()));
649 assert_eq!(dq.cur_level_buffers(), 0);
650 assert_eq!(dq.cur_level_bytes(), 0);
651 assert_eq!(dq.cur_level_time(), 0.seconds());
652 pop_segment_done(dq);
653 assert!(dq.is_empty());
654
655 push_initial_events(elem, dq);
657 assert_eq!(dq.cur_level_buffers(), 0);
658 assert_eq!(dq.cur_level_bytes(), 0);
659 assert_eq!(dq.cur_level_time(), 0.seconds());
660
661 push_buffer_list(elem, dq, 0.seconds()).unwrap();
662 assert_eq!(dq.cur_level_buffers(), 2);
663 assert_eq!(dq.cur_level_bytes(), 2);
664 assert_eq!(dq.cur_level_time(), 1.seconds());
665
666 let rejected_buf_list = match push_buffer_list(elem, dq, 2.seconds()).unwrap_err() {
667 DataQueueItem::BufferList(buf_list) => buf_list,
668 other => panic!("Unexpected {other:?}"),
669 };
670 assert_eq!(rejected_buf_list.len(), 2);
671 let buf = rejected_buf_list.get(0).unwrap();
672 assert_eq!(buf.pts(), Some(2.seconds()));
673
674 let rejected_buf = match push_buffer(elem, dq, 2.seconds()).unwrap_err() {
675 DataQueueItem::Buffer(buf) => buf,
676 other => panic!("Unexpected {other:?}"),
677 };
678 assert_eq!(rejected_buf.pts(), Some(2.seconds()));
679 assert_eq!(dq.cur_level_buffers(), 2);
680 assert_eq!(dq.cur_level_bytes(), 2);
681 assert_eq!(dq.cur_level_time(), 1.seconds());
682
683 push_segment_done(elem, dq);
684
685 pop_intial_events(dq);
686 let buf_list = pop_buffer_list(dq);
687 let buf = buf_list.get(0).unwrap();
688 assert_eq!(buf.pts(), Some(0.seconds()));
689 assert_eq!(dq.cur_level_buffers(), 0);
690 assert_eq!(dq.cur_level_bytes(), 0);
691 assert_eq!(dq.cur_level_time(), 0.seconds());
692 pop_segment_done(dq);
693 assert!(dq.is_empty());
694 }
695
696 #[test]
697 fn not_leaky_max_size_buffers() {
698 let (elem, src_pad) = init("not_leaky - max_size_buffers");
699
700 let mut dq = DataQueueBuilder::new(&elem, &src_pad)
701 .max_size_buffers(2)
702 .max_size_bytes(0)
703 .max_size_time(gst::ClockTime::ZERO)
704 .build();
705
706 test_not_leaky(&elem, &mut dq);
707 }
708
709 #[test]
710 fn not_leaky_max_size_bytes() {
711 let (elem, src_pad) = init("not_leaky - max_size_bytes");
712
713 let mut dq = DataQueueBuilder::new(&elem, &src_pad)
714 .max_size_buffers(0)
715 .max_size_bytes(2)
716 .max_size_time(gst::ClockTime::ZERO)
717 .build();
718
719 test_not_leaky(&elem, &mut dq);
720 }
721
722 #[test]
723 fn not_leaky_max_size_time() {
724 let (elem, src_pad) = init("not_leaky - max_size_time");
725
726 let mut dq = DataQueueBuilder::new(&elem, &src_pad)
727 .max_size_buffers(0)
728 .max_size_bytes(0)
729 .max_size_time(1.seconds())
730 .build();
731
732 test_not_leaky(&elem, &mut dq);
733 }
734
735 fn test_leaky_upstream(elem: &gst::Element, dq: &mut DataQueue) {
736 dq.start();
737
738 push_initial_events(elem, dq);
740
741 push_buffer(elem, dq, 0.seconds()).unwrap();
742 assert_eq!(dq.cur_level_buffers(), 1);
743 assert_eq!(dq.cur_level_bytes(), 1);
744 assert_eq!(dq.cur_level_time(), 0.seconds());
745 push_buffer(elem, dq, 1.seconds()).unwrap();
746 assert_eq!(dq.cur_level_buffers(), 2);
747 assert_eq!(dq.cur_level_bytes(), 2);
748 assert_eq!(dq.cur_level_time(), 1.seconds());
749 push_buffer(elem, dq, 2.seconds()).unwrap();
750 assert_eq!(dq.cur_level_buffers(), 2);
751 assert_eq!(dq.cur_level_bytes(), 2);
752 assert_eq!(dq.cur_level_time(), 1.seconds());
753 push_segment_done(elem, dq);
754
755 pop_intial_events(dq);
756 let buf = pop_buffer(dq);
757 assert_eq!(buf.pts(), Some(0.seconds()));
758 let buf = pop_buffer(dq);
759 assert_eq!(buf.pts(), Some(1.seconds()));
760 assert_eq!(dq.cur_level_buffers(), 0);
761 assert_eq!(dq.cur_level_bytes(), 0);
762 assert_eq!(dq.cur_level_time(), 0.seconds());
763 pop_segment_done(dq);
764 assert!(dq.is_empty());
765
766 push_initial_events(elem, dq);
768
769 push_buffer_list(elem, dq, 0.seconds()).unwrap();
770 assert_eq!(dq.cur_level_buffers(), 2);
771 assert_eq!(dq.cur_level_bytes(), 2);
772 assert_eq!(dq.cur_level_time(), 1.seconds());
773
774 push_buffer_list(elem, dq, 2.seconds()).unwrap();
775 assert_eq!(dq.cur_level_buffers(), 2);
776 assert_eq!(dq.cur_level_bytes(), 2);
777 assert_eq!(dq.cur_level_time(), 1.seconds());
778
779 push_buffer(elem, dq, 4.seconds()).unwrap();
780 assert_eq!(dq.cur_level_buffers(), 2);
781 assert_eq!(dq.cur_level_bytes(), 2);
782 assert_eq!(dq.cur_level_time(), 1.seconds());
783
784 push_segment_done(elem, dq);
785
786 pop_intial_events(dq);
787 let buf_list = pop_buffer_list(dq);
788 let buf = buf_list.get(0).unwrap();
789 assert_eq!(buf.pts(), Some(0.seconds()));
790 assert_eq!(dq.cur_level_buffers(), 0);
791 assert_eq!(dq.cur_level_bytes(), 0);
792 assert_eq!(dq.cur_level_time(), 0.seconds());
793 pop_segment_done(dq);
794 assert!(dq.is_empty());
795 }
796
797 #[test]
798 fn leaky_upstream_max_size_buffers() {
799 let (elem, src_pad) = init("leaky_upstream - max_size_buffers");
800
801 let mut dq = DataQueueBuilder::new(&elem, &src_pad)
802 .leaky_mode(QueueLeakyMode::Upstream)
803 .max_size_buffers(2)
804 .max_size_bytes(0)
805 .max_size_time(gst::ClockTime::ZERO)
806 .build();
807
808 test_leaky_upstream(&elem, &mut dq);
809 }
810
811 #[test]
812 fn leaky_upstream_max_size_bytes() {
813 let (elem, src_pad) = init("leaky_upstream - max_size_bytes");
814
815 let mut dq = DataQueueBuilder::new(&elem, &src_pad)
816 .leaky_mode(QueueLeakyMode::Upstream)
817 .max_size_buffers(0)
818 .max_size_bytes(2)
819 .max_size_time(gst::ClockTime::ZERO)
820 .build();
821
822 test_leaky_upstream(&elem, &mut dq);
823 }
824
825 #[test]
826 fn leaky_upstream_max_size_time() {
827 let (elem, src_pad) = init("leaky_upstream - max_size_time");
828
829 let mut dq = DataQueueBuilder::new(&elem, &src_pad)
830 .leaky_mode(QueueLeakyMode::Upstream)
831 .max_size_buffers(0)
832 .max_size_bytes(0)
833 .max_size_time(1.seconds())
834 .build();
835
836 test_leaky_upstream(&elem, &mut dq);
837 }
838
839 fn test_leaky_downstream(elem: &gst::Element, dq: &mut DataQueue) {
840 dq.start();
841
842 push_initial_events(elem, dq);
844
845 push_buffer(elem, dq, 0.seconds()).unwrap();
846 assert_eq!(dq.cur_level_buffers(), 1);
847 assert_eq!(dq.cur_level_bytes(), 1);
848 assert_eq!(dq.cur_level_time(), 0.seconds());
849 push_buffer(elem, dq, 1.seconds()).unwrap();
850 assert_eq!(dq.cur_level_buffers(), 2);
851 assert_eq!(dq.cur_level_bytes(), 2);
852 assert_eq!(dq.cur_level_time(), 1.seconds());
853 push_buffer(elem, dq, 2.seconds()).unwrap();
854 assert_eq!(dq.cur_level_buffers(), 2);
855 assert_eq!(dq.cur_level_bytes(), 2);
856 assert_eq!(dq.cur_level_time(), 1.seconds());
857 push_segment_done(elem, dq);
858
859 pop_intial_events(dq);
860 let buf = pop_buffer(dq);
861 assert_eq!(buf.pts(), Some(1.seconds()));
862 let buf = pop_buffer(dq);
863 assert_eq!(buf.pts(), Some(2.seconds()));
864 assert_eq!(dq.cur_level_buffers(), 0);
865 assert_eq!(dq.cur_level_bytes(), 0);
866 assert_eq!(dq.cur_level_time(), 0.seconds());
867 pop_segment_done(dq);
868 assert!(dq.is_empty());
869
870 push_initial_events(elem, dq);
872
873 push_buffer(elem, dq, 0.seconds()).unwrap();
874 push_buffer(elem, dq, 1.seconds()).unwrap();
875 push_buffer_list(elem, dq, 2.seconds()).unwrap();
877 assert_eq!(dq.cur_level_buffers(), 3);
878 assert_eq!(dq.cur_level_bytes(), 3);
879 assert_eq!(dq.cur_level_time(), 2.seconds());
880 push_buffer_list(elem, dq, 4.seconds()).unwrap();
882 assert_eq!(dq.cur_level_buffers(), 4);
883 assert_eq!(dq.cur_level_bytes(), 4);
884 assert_eq!(dq.cur_level_time(), 3.seconds());
885 push_buffer(elem, dq, 6.seconds()).unwrap();
887 assert_eq!(dq.cur_level_buffers(), 3);
888 assert_eq!(dq.cur_level_bytes(), 3);
889 assert_eq!(dq.cur_level_time(), 2.seconds());
890
891 push_segment_done(elem, dq);
892
893 pop_intial_events(dq);
894 let buf_list = pop_buffer_list(dq);
895 let buf = buf_list.get(0).unwrap();
896 assert_eq!(buf.pts(), Some(4.seconds()));
897 assert_eq!(dq.cur_level_buffers(), 1);
898 assert_eq!(dq.cur_level_bytes(), 1);
899 assert_eq!(dq.cur_level_time(), 0.seconds());
900 let buf = pop_buffer(dq);
901 assert_eq!(buf.pts(), Some(6.seconds()));
902 assert_eq!(dq.cur_level_buffers(), 0);
903 assert_eq!(dq.cur_level_bytes(), 0);
904 assert_eq!(dq.cur_level_time(), 0.seconds());
905 pop_segment_done(dq);
906 assert!(dq.is_empty());
907 }
908
909 #[test]
910 fn leaky_downstream_max_size_buffers() {
911 let (elem, src_pad) = init("leaky_downstream - max_size_buffers");
912
913 let mut dq = DataQueueBuilder::new(&elem, &src_pad)
914 .leaky_mode(QueueLeakyMode::Downstream)
915 .max_size_buffers(2)
916 .max_size_bytes(0)
917 .max_size_time(gst::ClockTime::ZERO)
918 .build();
919
920 test_leaky_downstream(&elem, &mut dq);
921 }
922
923 #[test]
924 fn leaky_downstream_max_size_bytes() {
925 let (elem, src_pad) = init("leaky_downstream - max_size_bytes");
926
927 let mut dq = DataQueueBuilder::new(&elem, &src_pad)
928 .leaky_mode(QueueLeakyMode::Downstream)
929 .max_size_buffers(0)
930 .max_size_bytes(2)
931 .max_size_time(gst::ClockTime::ZERO)
932 .build();
933
934 test_leaky_downstream(&elem, &mut dq);
935 }
936
937 #[test]
938 fn leaky_downstream_max_size_time() {
939 let (elem, src_pad) = init("leaky_downstream - max_size_time");
940
941 let mut dq = DataQueueBuilder::new(&elem, &src_pad)
942 .leaky_mode(QueueLeakyMode::Downstream)
943 .max_size_buffers(0)
944 .max_size_bytes(0)
945 .max_size_time(1.seconds())
946 .build();
947
948 test_leaky_downstream(&elem, &mut dq);
949 }
950}