Skip to main content

gstthreadshare/
dataqueue.rs

1// Copyright (C) 2018 Sebastian Dröge <sebastian@centricular.com>
2//
3// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
4// If a copy of the MPL was not distributed with this file, You can obtain one at
5// <https://mozilla.org/MPL/2.0/>.
6//
7// SPDX-License-Identifier: MPL-2.0
8
9use futures::future::{self, AbortHandle, abortable};
10use smallvec::SmallVec;
11
12use gst::glib;
13use gst::prelude::*;
14
15use std::sync::LazyLock;
16
17use std::collections::VecDeque;
18use std::sync::Arc;
19use std::sync::Mutex;
20
21use crate::runtime::Context;
22
23static DATA_QUEUE_CAT: LazyLock<gst::DebugCategory> = LazyLock::new(|| {
24    gst::DebugCategory::new(
25        "ts-dataqueue",
26        gst::DebugColorFlags::empty(),
27        Some("Thread-sharing queue"),
28    )
29});
30
31#[derive(Debug, Default, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy, glib::Enum)]
32#[repr(u32)]
33#[enum_type(name = "GstTsQueueLeakyMode")]
34pub enum QueueLeakyMode {
35    #[default]
36    #[enum_value(name = "Not Leaky", nick = "no")]
37    NotLeaky,
38    #[enum_value(name = "Leaky on upstream (new buffers)", nick = "upstream")]
39    Upstream,
40    #[enum_value(name = "Leaky on downstream (old buffers)", nick = "downstream")]
41    Downstream,
42}
43
44impl QueueLeakyMode {
45    pub fn is_leaky(self) -> bool {
46        !matches!(self, QueueLeakyMode::NotLeaky)
47    }
48}
49
50#[derive(Debug, Clone)]
51pub enum DataQueueItem {
52    Buffer(gst::Buffer),
53    BufferList(gst::BufferList),
54    Event(gst::Event),
55}
56
57impl DataQueueItem {
58    fn sizes(&self) -> (u32, u32, Option<gst::ClockTime>) {
59        match *self {
60            DataQueueItem::Buffer(ref buffer) => (1, buffer.size() as u32, buffer.dts_or_pts()),
61            DataQueueItem::BufferList(ref list) => {
62                let (size, ts) = list
63                    .iter()
64                    .fold((0, gst::ClockTime::NONE), |(size, first_ts), buf| {
65                        (size + buf.size(), first_ts.or(buf.dts_or_pts()))
66                    });
67                (list.len() as u32, size as u32, ts)
68            }
69            DataQueueItem::Event(_) => (0, 0, None),
70        }
71    }
72
73    fn timestamp(&self) -> Option<gst::ClockTime> {
74        match *self {
75            DataQueueItem::Buffer(ref buffer) => buffer.dts_or_pts(),
76            DataQueueItem::BufferList(ref list) => list.iter().find_map(|b| b.dts_or_pts()),
77            DataQueueItem::Event(_) => None,
78        }
79    }
80
81    fn last_timestamp(&self) -> Option<gst::ClockTime> {
82        match *self {
83            DataQueueItem::Buffer(ref buffer) => buffer.dts_or_pts(),
84            DataQueueItem::BufferList(ref list) => list.iter().rev().find_map(|b| b.dts_or_pts()),
85            DataQueueItem::Event(_) => None,
86        }
87    }
88}
89
90#[derive(Clone, Copy, Debug, PartialEq, Eq)]
91pub enum DataQueueState {
92    Started,
93    Stopped,
94}
95
96#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
97pub enum PushedStatus {
98    FirstBuffer,
99    GotBuffers,
100    #[default]
101    PendingBuffers,
102}
103
104impl PushedStatus {
105    pub fn is_first_buffer(self) -> bool {
106        matches!(self, PushedStatus::FirstBuffer)
107    }
108}
109
110#[derive(Clone, Debug)]
111pub struct DataQueue(Arc<Mutex<DataQueueInner>>);
112
113#[derive(Debug)]
114struct DataQueueInner {
115    element: gst::Element,
116    upstream_ctx: Option<Context>,
117    src_pad: gst::Pad,
118
119    state: DataQueueState,
120    queue: VecDeque<DataQueueItem>,
121
122    pushed_status: PushedStatus,
123
124    leaky_mode: QueueLeakyMode,
125    cur_level_buffers: u32,
126    cur_level_bytes: u32,
127    cur_level_time: gst::ClockTime,
128    max_size_buffers: Option<u32>,
129    max_size_bytes: Option<u32>,
130    max_size_time: Option<gst::ClockTime>,
131
132    pending_handle: Option<AbortHandle>,
133}
134
135impl DataQueueInner {
136    fn wake(&mut self) {
137        if let Some(pending_handle) = self.pending_handle.take() {
138            pending_handle.abort();
139        }
140    }
141
142    fn update_cur_time_level(&mut self) {
143        if let Some((first_ts, last_ts)) = Option::zip(
144            self.queue.iter().find_map(|i| i.timestamp()),
145            self.queue.iter().rev().find_map(|i| i.last_timestamp()),
146        ) {
147            self.cur_level_time = if last_ts >= first_ts {
148                last_ts - first_ts
149            } else {
150                first_ts - last_ts
151            };
152        } else {
153            self.cur_level_time = gst::ClockTime::ZERO;
154        }
155    }
156
157    /// Dequeues oldest buffer or buffer list, keeping events if any.
158    ///
159    /// Updates current levels.
160    ///
161    /// # Panic
162    ///
163    /// Panics if called when leaky_mode is not Downstream.
164    #[track_caller]
165    fn dequeue_oldest_buffer_or_list(&mut self, obj: &gst::glib::Object) {
166        assert_eq!(self.leaky_mode, QueueLeakyMode::Downstream);
167
168        let mut items_to_restore = SmallVec::<[DataQueueItem; 4]>::new();
169
170        let oldest_buf_or_list = loop {
171            let item = self.queue.pop_front();
172            if let Some(DataQueueItem::Event(evt)) = item {
173                items_to_restore.push(DataQueueItem::Event(evt));
174            } else {
175                break item.expect("if queue is full, there ought to be a buffer to dequeue");
176            }
177        };
178
179        items_to_restore
180            .drain(..)
181            .rev()
182            .for_each(|item| self.queue.push_front(item));
183
184        let (count, bytes, ts) = oldest_buf_or_list.sizes();
185        self.cur_level_buffers = self.cur_level_buffers.saturating_sub(count);
186        self.cur_level_bytes = self.cur_level_bytes.saturating_sub(bytes);
187        if ts.is_some() {
188            self.update_cur_time_level();
189        }
190
191        gst::debug!(
192            DATA_QUEUE_CAT,
193            obj = obj,
194            "Dropped oldest item {oldest_buf_or_list:?}",
195        );
196    }
197}
198
199impl DataQueue {
200    pub fn builder(element: &gst::Element, src_pad: &gst::Pad) -> DataQueueBuilder {
201        DataQueueBuilder::new(element, src_pad)
202    }
203
204    pub fn state(&self) -> DataQueueState {
205        self.0.lock().unwrap().state
206    }
207
208    pub fn upstream_context(&self) -> Option<Context> {
209        self.0.lock().unwrap().upstream_ctx.clone()
210    }
211
212    pub fn cur_level_buffers(&self) -> u32 {
213        self.0.lock().unwrap().cur_level_buffers
214    }
215
216    pub fn cur_level_bytes(&self) -> u32 {
217        self.0.lock().unwrap().cur_level_bytes
218    }
219
220    pub fn cur_level_time(&self) -> gst::ClockTime {
221        self.0.lock().unwrap().cur_level_time
222    }
223
224    pub fn is_empty(&self) -> bool {
225        self.0.lock().unwrap().queue.is_empty()
226    }
227
228    pub fn start(&self) {
229        let mut inner = self.0.lock().unwrap();
230        if inner.state == DataQueueState::Started {
231            gst::debug!(
232                DATA_QUEUE_CAT,
233                obj = inner.element,
234                "Data queue already Started"
235            );
236            return;
237        }
238        gst::debug!(DATA_QUEUE_CAT, obj = inner.element, "Starting data queue");
239        inner.state = DataQueueState::Started;
240        inner.wake();
241    }
242
243    pub fn stop(&self) {
244        let mut inner = self.0.lock().unwrap();
245        if inner.state == DataQueueState::Stopped {
246            gst::debug!(
247                DATA_QUEUE_CAT,
248                obj = inner.element,
249                "Data queue already Stopped"
250            );
251            return;
252        }
253        gst::debug!(DATA_QUEUE_CAT, obj = inner.element, "Stopping data queue");
254        inner.state = DataQueueState::Stopped;
255        inner.wake();
256    }
257
258    pub fn clear(&self) {
259        let mut inner = self.0.lock().unwrap();
260
261        gst::debug!(DATA_QUEUE_CAT, obj = inner.element, "Clearing data queue");
262
263        let src_pad = inner.src_pad.clone();
264        for item in inner.queue.drain(..) {
265            if let DataQueueItem::Event(event) = item
266                && event.is_sticky()
267                && event.type_() != gst::EventType::Segment
268                && event.type_() != gst::EventType::Eos
269            {
270                let _ = src_pad.store_sticky_event(&event);
271            }
272        }
273
274        inner.pushed_status = PushedStatus::default();
275
276        inner.cur_level_buffers = 0;
277        inner.cur_level_bytes = 0;
278        inner.cur_level_time = gst::ClockTime::ZERO;
279
280        gst::debug!(DATA_QUEUE_CAT, obj = inner.element, "Data queue cleared");
281    }
282
283    pub fn push(
284        &self,
285        obj: &gst::glib::Object,
286        item: DataQueueItem,
287    ) -> Result<PushedStatus, DataQueueItem> {
288        use QueueLeakyMode::*;
289
290        let mut inner = self.0.lock().unwrap();
291
292        if inner.state == DataQueueState::Stopped {
293            gst::debug!(
294                DATA_QUEUE_CAT,
295                obj = obj,
296                "Rejecting item {item:?} in state {:?}",
297                inner.state
298            );
299            return Err(item);
300        }
301
302        let (buffer_count, bytes, ts) = item.sizes();
303
304        if buffer_count > 0 {
305            if let Some(max) = inner.max_size_buffers
306                && max <= inner.cur_level_buffers
307            {
308                gst::debug!(
309                    DATA_QUEUE_CAT,
310                    obj = obj,
311                    "Queue is full (buffers): {max} <= {} (leaky mode {:?}), {item:?}",
312                    inner.cur_level_buffers,
313                    inner.leaky_mode,
314                );
315
316                match inner.leaky_mode {
317                    NotLeaky => return Err(item),
318                    Upstream => return Ok(inner.pushed_status),
319                    Downstream => inner.dequeue_oldest_buffer_or_list(obj),
320                }
321            }
322
323            if let Some(max) = inner.max_size_bytes
324                && max <= inner.cur_level_bytes
325            {
326                gst::debug!(
327                    DATA_QUEUE_CAT,
328                    obj = obj,
329                    "Queue is full (bytes): {max} <= {} (leaky mode {:?}), {item:?}",
330                    inner.cur_level_bytes,
331                    inner.leaky_mode,
332                );
333
334                match inner.leaky_mode {
335                    NotLeaky => return Err(item),
336                    Upstream => return Ok(inner.pushed_status),
337                    Downstream => inner.dequeue_oldest_buffer_or_list(obj),
338                }
339            }
340
341            if ts.is_some() {
342                // FIXME: Use running time
343                if let Some(max) = inner.max_size_time
344                    && max <= inner.cur_level_time
345                {
346                    gst::debug!(
347                        DATA_QUEUE_CAT,
348                        obj = obj,
349                        "Queue is full (time): {max} <= {} (leaky mode {:?}), {item:?}",
350                        inner.cur_level_time,
351                        inner.leaky_mode,
352                    );
353
354                    match inner.leaky_mode {
355                        NotLeaky => return Err(item),
356                        Upstream => return Ok(inner.pushed_status),
357                        Downstream => inner.dequeue_oldest_buffer_or_list(obj),
358                    }
359                }
360            }
361        }
362
363        gst::debug!(DATA_QUEUE_CAT, obj = obj, "Pushing item {item:?}");
364
365        use PushedStatus::*;
366        match inner.pushed_status {
367            GotBuffers => (),
368            FirstBuffer => inner.pushed_status = GotBuffers,
369            PendingBuffers => {
370                if let DataQueueItem::Buffer(_) | DataQueueItem::BufferList(_) = item {
371                    inner.pushed_status = PushedStatus::FirstBuffer;
372                    inner.upstream_ctx = Context::current();
373                }
374            }
375        }
376
377        inner.queue.push_back(item);
378
379        inner.cur_level_buffers += buffer_count;
380        inner.cur_level_bytes += bytes;
381        if ts.is_some() {
382            inner.update_cur_time_level();
383        }
384
385        inner.wake();
386
387        Ok(inner.pushed_status)
388    }
389
390    // TODO: implement as a Stream now that we use a StdMutex
391    #[allow(clippy::should_implement_trait)]
392    pub async fn next(&mut self) -> Option<DataQueueItem> {
393        loop {
394            let pending_fut = {
395                let mut inner = self.0.lock().unwrap();
396                match inner.state {
397                    DataQueueState::Started => match inner.queue.pop_front() {
398                        None => {
399                            gst::debug!(DATA_QUEUE_CAT, obj = inner.element, "Data queue is empty");
400                        }
401                        Some(item) => {
402                            gst::debug!(
403                                DATA_QUEUE_CAT,
404                                obj = inner.element,
405                                "Popped item {:?}",
406                                item
407                            );
408
409                            let (buffer_count, bytes, ts) = item.sizes();
410                            if buffer_count > 0 {
411                                inner.cur_level_buffers -= buffer_count;
412                                inner.cur_level_bytes -= bytes;
413
414                                if ts.is_some() {
415                                    inner.update_cur_time_level();
416                                }
417                            }
418
419                            return Some(item);
420                        }
421                    },
422                    DataQueueState::Stopped => {
423                        gst::debug!(DATA_QUEUE_CAT, obj = inner.element, "Data queue Stopped");
424                        return None;
425                    }
426                }
427
428                let (pending_fut, abort_handle) = abortable(future::pending::<()>());
429                inner.pending_handle = Some(abort_handle);
430
431                pending_fut
432            };
433
434            let _ = pending_fut.await;
435        }
436    }
437}
438
439#[derive(Debug)]
440pub struct DataQueueBuilder(DataQueueInner);
441
442impl DataQueueBuilder {
443    fn new(element: &gst::Element, src_pad: &gst::Pad) -> DataQueueBuilder {
444        DataQueueBuilder(DataQueueInner {
445            element: element.clone(),
446            upstream_ctx: None,
447            src_pad: src_pad.clone(),
448            state: DataQueueState::Stopped,
449            queue: VecDeque::new(),
450            pushed_status: PushedStatus::default(),
451            leaky_mode: QueueLeakyMode::NotLeaky,
452            cur_level_buffers: 0,
453            cur_level_bytes: 0,
454            cur_level_time: gst::ClockTime::ZERO,
455            max_size_buffers: None,
456            max_size_bytes: None,
457            max_size_time: None,
458            pending_handle: None,
459        })
460    }
461
462    pub fn leaky_mode(mut self, leaky_mode: QueueLeakyMode) -> Self {
463        self.0.leaky_mode = leaky_mode;
464        self
465    }
466
467    pub fn max_size_buffers(mut self, max_size_buffers: u32) -> Self {
468        self.0.max_size_buffers = (max_size_buffers > 0).then_some(max_size_buffers);
469        self
470    }
471
472    pub fn max_size_bytes(mut self, max_size_bytes: u32) -> Self {
473        self.0.max_size_bytes = (max_size_bytes > 0).then_some(max_size_bytes);
474        self
475    }
476
477    pub fn max_size_time(mut self, max_size_time: gst::ClockTime) -> Self {
478        self.0.max_size_time = (!max_size_time.is_zero()).then_some(max_size_time);
479        self
480    }
481
482    pub fn build(self) -> DataQueue {
483        DataQueue(Arc::new(Mutex::new(self.0)))
484    }
485}
486
487#[cfg(test)]
488mod tests {
489    use super::*;
490
491    fn init(test: &str) -> (gst::Element, gst::Pad) {
492        use std::sync::Once;
493        static INIT: Once = Once::new();
494
495        INIT.call_once(|| {
496            gst::init().unwrap();
497        });
498
499        let elem = gst::ElementFactory::make("fakesrc")
500            .name(test)
501            .build()
502            .unwrap();
503        let src_pad = elem.static_pad("src").unwrap();
504
505        (elem, src_pad)
506    }
507
508    #[track_caller]
509    fn push_initial_events(elem: &gst::Element, dq: &DataQueue) {
510        dq.push(
511            elem.upcast_ref(),
512            DataQueueItem::Event(
513                gst::event::Caps::builder(&gst::Caps::new_empty_simple("test/raw")).build(),
514            ),
515        )
516        .unwrap();
517        dq.push(
518            elem.upcast_ref(),
519            DataQueueItem::Event(gst::event::StreamStart::builder(&elem.name()).build()),
520        )
521        .unwrap();
522        dq.push(
523            elem.upcast_ref(),
524            DataQueueItem::Event(
525                gst::event::Segment::builder(&gst::FormattedSegment::<gst::format::Time>::new())
526                    .build(),
527            ),
528        )
529        .unwrap();
530    }
531
532    #[track_caller]
533    fn push_segment_done(elem: &gst::Element, dq: &DataQueue) {
534        dq.push(
535            elem.upcast_ref(),
536            DataQueueItem::Event(gst::event::SegmentDone::builder(2.seconds()).build()),
537        )
538        .unwrap();
539    }
540
541    fn pop_item(dq: &mut DataQueue) -> Option<DataQueueItem> {
542        futures::executor::block_on(dq.next())
543    }
544
545    #[track_caller]
546    fn pop_event(dq: &mut DataQueue) -> gst::Event {
547        match pop_item(dq) {
548            Some(DataQueueItem::Event(evt)) => evt,
549            other => panic!("Unexpected {other:?}"),
550        }
551    }
552
553    #[track_caller]
554    fn pop_intial_events(dq: &mut DataQueue) {
555        assert_eq!(pop_event(dq).type_(), gst::EventType::Caps);
556        assert_eq!(pop_event(dq).type_(), gst::EventType::StreamStart);
557        assert_eq!(pop_event(dq).type_(), gst::EventType::Segment);
558    }
559
560    #[track_caller]
561    fn pop_segment_done(dq: &mut DataQueue) {
562        assert_eq!(pop_event(dq).type_(), gst::EventType::SegmentDone);
563    }
564
565    fn push_buffer(
566        elem: &gst::Element,
567        dq: &DataQueue,
568        ts: gst::ClockTime,
569    ) -> Result<PushedStatus, DataQueueItem> {
570        let mut buf = gst::Buffer::from_slice([0u8]);
571        {
572            let buf_mut = buf.make_mut();
573            buf_mut.set_pts(ts);
574        }
575
576        dq.push(elem.upcast_ref(), DataQueueItem::Buffer(buf))
577    }
578
579    #[track_caller]
580    fn pop_buffer(dq: &mut DataQueue) -> gst::Buffer {
581        match pop_item(dq) {
582            Some(DataQueueItem::Buffer(buf)) => buf,
583            other => panic!("Unexpected {other:?}"),
584        }
585    }
586
587    fn push_buffer_list(
588        elem: &gst::Element,
589        dq: &DataQueue,
590        ts: gst::ClockTime,
591    ) -> Result<PushedStatus, DataQueueItem> {
592        let mut buf1 = gst::Buffer::from_slice([0u8]);
593        {
594            let buf_mut = buf1.make_mut();
595            buf_mut.set_pts(ts);
596        }
597        let mut buf2 = gst::Buffer::from_slice([0u8]);
598        {
599            let buf_mut = buf2.make_mut();
600            buf_mut.set_pts(ts + 1.seconds());
601        }
602
603        dq.push(
604            elem.upcast_ref(),
605            DataQueueItem::BufferList(gst::BufferList::from([buf1, buf2])),
606        )
607    }
608
609    #[track_caller]
610    fn pop_buffer_list(dq: &mut DataQueue) -> gst::BufferList {
611        match pop_item(dq) {
612            Some(DataQueueItem::BufferList(buf_list)) => buf_list,
613            other => panic!("Unexpected {other:?}"),
614        }
615    }
616
617    fn test_not_leaky(elem: &gst::Element, dq: &mut DataQueue) {
618        dq.start();
619
620        // Buffers
621        push_initial_events(elem, dq);
622        assert_eq!(dq.cur_level_buffers(), 0);
623        assert_eq!(dq.cur_level_bytes(), 0);
624        assert_eq!(dq.cur_level_time(), 0.seconds());
625
626        push_buffer(elem, dq, 0.seconds()).unwrap();
627        push_buffer(elem, dq, 1.seconds()).unwrap();
628        assert_eq!(dq.cur_level_buffers(), 2);
629        assert_eq!(dq.cur_level_bytes(), 2);
630        assert_eq!(dq.cur_level_time(), 1.seconds());
631        let rejected_buf = match push_buffer(elem, dq, 2.seconds()).unwrap_err() {
632            DataQueueItem::Buffer(buf) => buf,
633            other => panic!("Unexpected {other:?}"),
634        };
635        assert_eq!(rejected_buf.pts(), Some(2.seconds()));
636        assert_eq!(dq.cur_level_buffers(), 2);
637        assert_eq!(dq.cur_level_bytes(), 2);
638        assert_eq!(dq.cur_level_time(), 1.seconds());
639        push_segment_done(elem, dq);
640
641        pop_intial_events(dq);
642        let buf = pop_buffer(dq);
643        assert_eq!(buf.pts(), Some(0.seconds()));
644        assert_eq!(dq.cur_level_buffers(), 1);
645        assert_eq!(dq.cur_level_bytes(), 1);
646        assert_eq!(dq.cur_level_time(), 0.seconds());
647        let buf = pop_buffer(dq);
648        assert_eq!(buf.pts(), Some(1.seconds()));
649        assert_eq!(dq.cur_level_buffers(), 0);
650        assert_eq!(dq.cur_level_bytes(), 0);
651        assert_eq!(dq.cur_level_time(), 0.seconds());
652        pop_segment_done(dq);
653        assert!(dq.is_empty());
654
655        // Buffer list
656        push_initial_events(elem, dq);
657        assert_eq!(dq.cur_level_buffers(), 0);
658        assert_eq!(dq.cur_level_bytes(), 0);
659        assert_eq!(dq.cur_level_time(), 0.seconds());
660
661        push_buffer_list(elem, dq, 0.seconds()).unwrap();
662        assert_eq!(dq.cur_level_buffers(), 2);
663        assert_eq!(dq.cur_level_bytes(), 2);
664        assert_eq!(dq.cur_level_time(), 1.seconds());
665
666        let rejected_buf_list = match push_buffer_list(elem, dq, 2.seconds()).unwrap_err() {
667            DataQueueItem::BufferList(buf_list) => buf_list,
668            other => panic!("Unexpected {other:?}"),
669        };
670        assert_eq!(rejected_buf_list.len(), 2);
671        let buf = rejected_buf_list.get(0).unwrap();
672        assert_eq!(buf.pts(), Some(2.seconds()));
673
674        let rejected_buf = match push_buffer(elem, dq, 2.seconds()).unwrap_err() {
675            DataQueueItem::Buffer(buf) => buf,
676            other => panic!("Unexpected {other:?}"),
677        };
678        assert_eq!(rejected_buf.pts(), Some(2.seconds()));
679        assert_eq!(dq.cur_level_buffers(), 2);
680        assert_eq!(dq.cur_level_bytes(), 2);
681        assert_eq!(dq.cur_level_time(), 1.seconds());
682
683        push_segment_done(elem, dq);
684
685        pop_intial_events(dq);
686        let buf_list = pop_buffer_list(dq);
687        let buf = buf_list.get(0).unwrap();
688        assert_eq!(buf.pts(), Some(0.seconds()));
689        assert_eq!(dq.cur_level_buffers(), 0);
690        assert_eq!(dq.cur_level_bytes(), 0);
691        assert_eq!(dq.cur_level_time(), 0.seconds());
692        pop_segment_done(dq);
693        assert!(dq.is_empty());
694    }
695
696    #[test]
697    fn not_leaky_max_size_buffers() {
698        let (elem, src_pad) = init("not_leaky - max_size_buffers");
699
700        let mut dq = DataQueueBuilder::new(&elem, &src_pad)
701            .max_size_buffers(2)
702            .max_size_bytes(0)
703            .max_size_time(gst::ClockTime::ZERO)
704            .build();
705
706        test_not_leaky(&elem, &mut dq);
707    }
708
709    #[test]
710    fn not_leaky_max_size_bytes() {
711        let (elem, src_pad) = init("not_leaky - max_size_bytes");
712
713        let mut dq = DataQueueBuilder::new(&elem, &src_pad)
714            .max_size_buffers(0)
715            .max_size_bytes(2)
716            .max_size_time(gst::ClockTime::ZERO)
717            .build();
718
719        test_not_leaky(&elem, &mut dq);
720    }
721
722    #[test]
723    fn not_leaky_max_size_time() {
724        let (elem, src_pad) = init("not_leaky - max_size_time");
725
726        let mut dq = DataQueueBuilder::new(&elem, &src_pad)
727            .max_size_buffers(0)
728            .max_size_bytes(0)
729            .max_size_time(1.seconds())
730            .build();
731
732        test_not_leaky(&elem, &mut dq);
733    }
734
735    fn test_leaky_upstream(elem: &gst::Element, dq: &mut DataQueue) {
736        dq.start();
737
738        // Buffers
739        push_initial_events(elem, dq);
740
741        push_buffer(elem, dq, 0.seconds()).unwrap();
742        assert_eq!(dq.cur_level_buffers(), 1);
743        assert_eq!(dq.cur_level_bytes(), 1);
744        assert_eq!(dq.cur_level_time(), 0.seconds());
745        push_buffer(elem, dq, 1.seconds()).unwrap();
746        assert_eq!(dq.cur_level_buffers(), 2);
747        assert_eq!(dq.cur_level_bytes(), 2);
748        assert_eq!(dq.cur_level_time(), 1.seconds());
749        push_buffer(elem, dq, 2.seconds()).unwrap();
750        assert_eq!(dq.cur_level_buffers(), 2);
751        assert_eq!(dq.cur_level_bytes(), 2);
752        assert_eq!(dq.cur_level_time(), 1.seconds());
753        push_segment_done(elem, dq);
754
755        pop_intial_events(dq);
756        let buf = pop_buffer(dq);
757        assert_eq!(buf.pts(), Some(0.seconds()));
758        let buf = pop_buffer(dq);
759        assert_eq!(buf.pts(), Some(1.seconds()));
760        assert_eq!(dq.cur_level_buffers(), 0);
761        assert_eq!(dq.cur_level_bytes(), 0);
762        assert_eq!(dq.cur_level_time(), 0.seconds());
763        pop_segment_done(dq);
764        assert!(dq.is_empty());
765
766        // Buffer list
767        push_initial_events(elem, dq);
768
769        push_buffer_list(elem, dq, 0.seconds()).unwrap();
770        assert_eq!(dq.cur_level_buffers(), 2);
771        assert_eq!(dq.cur_level_bytes(), 2);
772        assert_eq!(dq.cur_level_time(), 1.seconds());
773
774        push_buffer_list(elem, dq, 2.seconds()).unwrap();
775        assert_eq!(dq.cur_level_buffers(), 2);
776        assert_eq!(dq.cur_level_bytes(), 2);
777        assert_eq!(dq.cur_level_time(), 1.seconds());
778
779        push_buffer(elem, dq, 4.seconds()).unwrap();
780        assert_eq!(dq.cur_level_buffers(), 2);
781        assert_eq!(dq.cur_level_bytes(), 2);
782        assert_eq!(dq.cur_level_time(), 1.seconds());
783
784        push_segment_done(elem, dq);
785
786        pop_intial_events(dq);
787        let buf_list = pop_buffer_list(dq);
788        let buf = buf_list.get(0).unwrap();
789        assert_eq!(buf.pts(), Some(0.seconds()));
790        assert_eq!(dq.cur_level_buffers(), 0);
791        assert_eq!(dq.cur_level_bytes(), 0);
792        assert_eq!(dq.cur_level_time(), 0.seconds());
793        pop_segment_done(dq);
794        assert!(dq.is_empty());
795    }
796
797    #[test]
798    fn leaky_upstream_max_size_buffers() {
799        let (elem, src_pad) = init("leaky_upstream - max_size_buffers");
800
801        let mut dq = DataQueueBuilder::new(&elem, &src_pad)
802            .leaky_mode(QueueLeakyMode::Upstream)
803            .max_size_buffers(2)
804            .max_size_bytes(0)
805            .max_size_time(gst::ClockTime::ZERO)
806            .build();
807
808        test_leaky_upstream(&elem, &mut dq);
809    }
810
811    #[test]
812    fn leaky_upstream_max_size_bytes() {
813        let (elem, src_pad) = init("leaky_upstream - max_size_bytes");
814
815        let mut dq = DataQueueBuilder::new(&elem, &src_pad)
816            .leaky_mode(QueueLeakyMode::Upstream)
817            .max_size_buffers(0)
818            .max_size_bytes(2)
819            .max_size_time(gst::ClockTime::ZERO)
820            .build();
821
822        test_leaky_upstream(&elem, &mut dq);
823    }
824
825    #[test]
826    fn leaky_upstream_max_size_time() {
827        let (elem, src_pad) = init("leaky_upstream - max_size_time");
828
829        let mut dq = DataQueueBuilder::new(&elem, &src_pad)
830            .leaky_mode(QueueLeakyMode::Upstream)
831            .max_size_buffers(0)
832            .max_size_bytes(0)
833            .max_size_time(1.seconds())
834            .build();
835
836        test_leaky_upstream(&elem, &mut dq);
837    }
838
839    fn test_leaky_downstream(elem: &gst::Element, dq: &mut DataQueue) {
840        dq.start();
841
842        // Buffers
843        push_initial_events(elem, dq);
844
845        push_buffer(elem, dq, 0.seconds()).unwrap();
846        assert_eq!(dq.cur_level_buffers(), 1);
847        assert_eq!(dq.cur_level_bytes(), 1);
848        assert_eq!(dq.cur_level_time(), 0.seconds());
849        push_buffer(elem, dq, 1.seconds()).unwrap();
850        assert_eq!(dq.cur_level_buffers(), 2);
851        assert_eq!(dq.cur_level_bytes(), 2);
852        assert_eq!(dq.cur_level_time(), 1.seconds());
853        push_buffer(elem, dq, 2.seconds()).unwrap();
854        assert_eq!(dq.cur_level_buffers(), 2);
855        assert_eq!(dq.cur_level_bytes(), 2);
856        assert_eq!(dq.cur_level_time(), 1.seconds());
857        push_segment_done(elem, dq);
858
859        pop_intial_events(dq);
860        let buf = pop_buffer(dq);
861        assert_eq!(buf.pts(), Some(1.seconds()));
862        let buf = pop_buffer(dq);
863        assert_eq!(buf.pts(), Some(2.seconds()));
864        assert_eq!(dq.cur_level_buffers(), 0);
865        assert_eq!(dq.cur_level_bytes(), 0);
866        assert_eq!(dq.cur_level_time(), 0.seconds());
867        pop_segment_done(dq);
868        assert!(dq.is_empty());
869
870        // Buffer list
871        push_initial_events(elem, dq);
872
873        push_buffer(elem, dq, 0.seconds()).unwrap();
874        push_buffer(elem, dq, 1.seconds()).unwrap();
875        // The next list causes previously enqueued first buffers to be dequeued
876        push_buffer_list(elem, dq, 2.seconds()).unwrap();
877        assert_eq!(dq.cur_level_buffers(), 3);
878        assert_eq!(dq.cur_level_bytes(), 3);
879        assert_eq!(dq.cur_level_time(), 2.seconds());
880        // The next list causes previously enqueued second buffer to be dequeued
881        push_buffer_list(elem, dq, 4.seconds()).unwrap();
882        assert_eq!(dq.cur_level_buffers(), 4);
883        assert_eq!(dq.cur_level_bytes(), 4);
884        assert_eq!(dq.cur_level_time(), 3.seconds());
885        // The next buffer causes previously enqueued first buffer list to be dequeued
886        push_buffer(elem, dq, 6.seconds()).unwrap();
887        assert_eq!(dq.cur_level_buffers(), 3);
888        assert_eq!(dq.cur_level_bytes(), 3);
889        assert_eq!(dq.cur_level_time(), 2.seconds());
890
891        push_segment_done(elem, dq);
892
893        pop_intial_events(dq);
894        let buf_list = pop_buffer_list(dq);
895        let buf = buf_list.get(0).unwrap();
896        assert_eq!(buf.pts(), Some(4.seconds()));
897        assert_eq!(dq.cur_level_buffers(), 1);
898        assert_eq!(dq.cur_level_bytes(), 1);
899        assert_eq!(dq.cur_level_time(), 0.seconds());
900        let buf = pop_buffer(dq);
901        assert_eq!(buf.pts(), Some(6.seconds()));
902        assert_eq!(dq.cur_level_buffers(), 0);
903        assert_eq!(dq.cur_level_bytes(), 0);
904        assert_eq!(dq.cur_level_time(), 0.seconds());
905        pop_segment_done(dq);
906        assert!(dq.is_empty());
907    }
908
909    #[test]
910    fn leaky_downstream_max_size_buffers() {
911        let (elem, src_pad) = init("leaky_downstream - max_size_buffers");
912
913        let mut dq = DataQueueBuilder::new(&elem, &src_pad)
914            .leaky_mode(QueueLeakyMode::Downstream)
915            .max_size_buffers(2)
916            .max_size_bytes(0)
917            .max_size_time(gst::ClockTime::ZERO)
918            .build();
919
920        test_leaky_downstream(&elem, &mut dq);
921    }
922
923    #[test]
924    fn leaky_downstream_max_size_bytes() {
925        let (elem, src_pad) = init("leaky_downstream - max_size_bytes");
926
927        let mut dq = DataQueueBuilder::new(&elem, &src_pad)
928            .leaky_mode(QueueLeakyMode::Downstream)
929            .max_size_buffers(0)
930            .max_size_bytes(2)
931            .max_size_time(gst::ClockTime::ZERO)
932            .build();
933
934        test_leaky_downstream(&elem, &mut dq);
935    }
936
937    #[test]
938    fn leaky_downstream_max_size_time() {
939        let (elem, src_pad) = init("leaky_downstream - max_size_time");
940
941        let mut dq = DataQueueBuilder::new(&elem, &src_pad)
942            .leaky_mode(QueueLeakyMode::Downstream)
943            .max_size_buffers(0)
944            .max_size_bytes(0)
945            .max_size_time(1.seconds())
946            .build();
947
948        test_leaky_downstream(&elem, &mut dq);
949    }
950}