Skip to main content

gstthreadshare/
dataqueue.rs

1// Copyright (C) 2018 Sebastian Dröge <sebastian@centricular.com>
2//
3// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
4// If a copy of the MPL was not distributed with this file, You can obtain one at
5// <https://mozilla.org/MPL/2.0/>.
6//
7// SPDX-License-Identifier: MPL-2.0
8
9use futures::future::{self, AbortHandle, abortable};
10
11use gst::prelude::*;
12
13use std::sync::LazyLock;
14
15use std::collections::VecDeque;
16use std::sync::Arc;
17use std::sync::Mutex;
18
19use crate::runtime::Context;
20
21static DATA_QUEUE_CAT: LazyLock<gst::DebugCategory> = LazyLock::new(|| {
22    gst::DebugCategory::new(
23        "ts-dataqueue",
24        gst::DebugColorFlags::empty(),
25        Some("Thread-sharing queue"),
26    )
27});
28
29#[derive(Debug, Clone)]
30pub enum DataQueueItem {
31    Buffer(gst::Buffer),
32    BufferList(gst::BufferList),
33    Event(gst::Event),
34}
35
36impl DataQueueItem {
37    fn size(&self) -> (u32, u32) {
38        match *self {
39            DataQueueItem::Buffer(ref buffer) => (1, buffer.size() as u32),
40            DataQueueItem::BufferList(ref list) => (
41                list.len() as u32,
42                list.iter().map(|b| b.size() as u32).sum::<u32>(),
43            ),
44            DataQueueItem::Event(_) => (0, 0),
45        }
46    }
47
48    fn timestamp(&self) -> Option<gst::ClockTime> {
49        match *self {
50            DataQueueItem::Buffer(ref buffer) => buffer.dts_or_pts(),
51            DataQueueItem::BufferList(ref list) => list.iter().find_map(|b| b.dts_or_pts()),
52            DataQueueItem::Event(_) => None,
53        }
54    }
55}
56
/// Whether a [`DataQueue`] currently accepts and hands out items.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum DataQueueState {
    /// Items can be pushed and popped.
    Started,
    /// Pushes are rejected and `next()` yields `None`.
    Stopped,
}
62
/// Progress of a queue with regard to the buffers pushed into it, as
/// reported back to the caller of `DataQueue::push`.
#[derive(Debug, Default, Clone, Copy, Eq, PartialEq)]
pub enum PushedStatus {
    /// The item just pushed was the first buffer (or buffer list).
    FirstBuffer,
    /// At least one buffer was already pushed before.
    GotBuffers,
    /// No buffer was pushed so far; this is the default.
    #[default]
    PendingBuffers,
}

impl PushedStatus {
    /// Returns `true` only for [`PushedStatus::FirstBuffer`].
    pub fn is_first_buffer(self) -> bool {
        self == Self::FirstBuffer
    }
}
76
77#[derive(Clone, Debug)]
78pub struct DataQueue(Arc<Mutex<DataQueueInner>>);
79
80#[derive(Debug)]
81struct DataQueueInner {
82    element: gst::Element,
83    upstream_ctx: Option<Context>,
84    src_pad: gst::Pad,
85
86    state: DataQueueState,
87    queue: VecDeque<DataQueueItem>,
88
89    pushed_status: PushedStatus,
90
91    cur_level_buffers: u32,
92    cur_level_bytes: u32,
93    cur_level_time: gst::ClockTime,
94    max_size_buffers: Option<u32>,
95    max_size_bytes: Option<u32>,
96    max_size_time: Option<gst::ClockTime>,
97
98    pending_handle: Option<AbortHandle>,
99}
100
101impl DataQueueInner {
102    fn wake(&mut self) {
103        if let Some(pending_handle) = self.pending_handle.take() {
104            pending_handle.abort();
105        }
106    }
107}
108
109impl DataQueue {
110    pub fn builder(element: &gst::Element, src_pad: &gst::Pad) -> DataQueueBuilder {
111        DataQueueBuilder::new(element, src_pad)
112    }
113
114    pub fn state(&self) -> DataQueueState {
115        self.0.lock().unwrap().state
116    }
117
118    pub fn upstream_context(&self) -> Option<Context> {
119        self.0.lock().unwrap().upstream_ctx.clone()
120    }
121
122    pub fn cur_level_buffers(&self) -> u32 {
123        self.0.lock().unwrap().cur_level_buffers
124    }
125
126    pub fn cur_level_bytes(&self) -> u32 {
127        self.0.lock().unwrap().cur_level_bytes
128    }
129
130    pub fn cur_level_time(&self) -> gst::ClockTime {
131        self.0.lock().unwrap().cur_level_time
132    }
133
134    pub fn start(&self) {
135        let mut inner = self.0.lock().unwrap();
136        if inner.state == DataQueueState::Started {
137            gst::debug!(
138                DATA_QUEUE_CAT,
139                obj = inner.element,
140                "Data queue already Started"
141            );
142            return;
143        }
144        gst::debug!(DATA_QUEUE_CAT, obj = inner.element, "Starting data queue");
145        inner.state = DataQueueState::Started;
146        inner.wake();
147    }
148
149    pub fn stop(&self) {
150        let mut inner = self.0.lock().unwrap();
151        if inner.state == DataQueueState::Stopped {
152            gst::debug!(
153                DATA_QUEUE_CAT,
154                obj = inner.element,
155                "Data queue already Stopped"
156            );
157            return;
158        }
159        gst::debug!(DATA_QUEUE_CAT, obj = inner.element, "Stopping data queue");
160        inner.state = DataQueueState::Stopped;
161        inner.wake();
162    }
163
164    pub fn clear(&self) {
165        let mut inner = self.0.lock().unwrap();
166
167        gst::debug!(DATA_QUEUE_CAT, obj = inner.element, "Clearing data queue");
168
169        let src_pad = inner.src_pad.clone();
170        for item in inner.queue.drain(..) {
171            if let DataQueueItem::Event(event) = item
172                && event.is_sticky()
173                && event.type_() != gst::EventType::Segment
174                && event.type_() != gst::EventType::Eos
175            {
176                let _ = src_pad.store_sticky_event(&event);
177            }
178        }
179
180        inner.pushed_status = PushedStatus::default();
181
182        inner.cur_level_buffers = 0;
183        inner.cur_level_bytes = 0;
184        inner.cur_level_time = gst::ClockTime::ZERO;
185
186        gst::debug!(DATA_QUEUE_CAT, obj = inner.element, "Data queue cleared");
187    }
188
189    pub fn push(
190        &self,
191        obj: &gst::glib::Object,
192        item: DataQueueItem,
193    ) -> Result<PushedStatus, DataQueueItem> {
194        let mut inner = self.0.lock().unwrap();
195
196        if inner.state == DataQueueState::Stopped {
197            gst::debug!(
198                DATA_QUEUE_CAT,
199                obj = obj,
200                "Rejecting item {item:?} in state {:?}",
201                inner.state
202            );
203            return Err(item);
204        }
205
206        gst::debug!(DATA_QUEUE_CAT, obj = obj, "Pushing item {item:?}");
207
208        let (count, bytes) = item.size();
209        let queue_ts = inner.queue.iter().find_map(|i| i.timestamp());
210        let ts = item.timestamp();
211
212        if let Some(max) = inner.max_size_buffers
213            && max <= inner.cur_level_buffers
214        {
215            gst::debug!(
216                DATA_QUEUE_CAT,
217                obj = obj,
218                "Queue is full (buffers): {max} <= {}",
219                inner.cur_level_buffers
220            );
221            return Err(item);
222        }
223
224        if let Some(max) = inner.max_size_bytes
225            && max <= inner.cur_level_bytes
226        {
227            gst::debug!(
228                DATA_QUEUE_CAT,
229                obj = obj,
230                "Queue is full (bytes): {max} <= {}",
231                inner.cur_level_bytes
232            );
233            return Err(item);
234        }
235
236        // FIXME: Use running time
237        let level = if let (Some(queue_ts), Some(ts)) = (queue_ts, ts) {
238            let level = if queue_ts > ts {
239                queue_ts - ts
240            } else {
241                ts - queue_ts
242            };
243
244            if inner.max_size_time.opt_le(level).unwrap_or(false) {
245                gst::debug!(
246                    DATA_QUEUE_CAT,
247                    obj = obj,
248                    "Queue is full (time): {} <= {level}",
249                    inner.max_size_time.display(),
250                );
251                return Err(item);
252            }
253
254            level
255        } else {
256            gst::ClockTime::ZERO
257        };
258
259        use PushedStatus::*;
260        match inner.pushed_status {
261            GotBuffers => (),
262            FirstBuffer => inner.pushed_status = GotBuffers,
263            PendingBuffers => {
264                if let DataQueueItem::Buffer(_) | DataQueueItem::BufferList(_) = item {
265                    inner.pushed_status = PushedStatus::FirstBuffer;
266                    inner.upstream_ctx = Context::current();
267                }
268            }
269        }
270
271        inner.queue.push_back(item);
272        inner.cur_level_buffers += count;
273        inner.cur_level_bytes += bytes;
274        inner.cur_level_time = level;
275
276        inner.wake();
277
278        Ok(inner.pushed_status)
279    }
280
281    // TODO: implement as a Stream now that we use a StdMutex
282    #[allow(clippy::should_implement_trait)]
283    pub async fn next(&mut self) -> Option<DataQueueItem> {
284        loop {
285            let pending_fut = {
286                let mut inner = self.0.lock().unwrap();
287                match inner.state {
288                    DataQueueState::Started => match inner.queue.pop_front() {
289                        None => {
290                            gst::debug!(DATA_QUEUE_CAT, obj = inner.element, "Data queue is empty");
291                        }
292                        Some(item) => {
293                            gst::debug!(
294                                DATA_QUEUE_CAT,
295                                obj = inner.element,
296                                "Popped item {:?}",
297                                item
298                            );
299
300                            let (count, bytes) = item.size();
301                            inner.cur_level_buffers -= count;
302                            inner.cur_level_bytes -= bytes;
303
304                            return Some(item);
305                        }
306                    },
307                    DataQueueState::Stopped => {
308                        gst::debug!(DATA_QUEUE_CAT, obj = inner.element, "Data queue Stopped");
309                        return None;
310                    }
311                }
312
313                let (pending_fut, abort_handle) = abortable(future::pending::<()>());
314                inner.pending_handle = Some(abort_handle);
315
316                pending_fut
317            };
318
319            let _ = pending_fut.await;
320        }
321    }
322}
323
324#[derive(Debug)]
325pub struct DataQueueBuilder(DataQueueInner);
326
327impl DataQueueBuilder {
328    fn new(element: &gst::Element, src_pad: &gst::Pad) -> DataQueueBuilder {
329        DataQueueBuilder(DataQueueInner {
330            element: element.clone(),
331            upstream_ctx: None,
332            src_pad: src_pad.clone(),
333            state: DataQueueState::Stopped,
334            queue: VecDeque::new(),
335            pushed_status: PushedStatus::default(),
336            cur_level_buffers: 0,
337            cur_level_bytes: 0,
338            cur_level_time: gst::ClockTime::ZERO,
339            max_size_buffers: None,
340            max_size_bytes: None,
341            max_size_time: None,
342            pending_handle: None,
343        })
344    }
345
346    pub fn max_size_buffers(mut self, max_size_buffers: u32) -> Self {
347        self.0.max_size_buffers = (max_size_buffers > 0).then_some(max_size_buffers);
348        self
349    }
350
351    pub fn max_size_bytes(mut self, max_size_bytes: u32) -> Self {
352        self.0.max_size_bytes = (max_size_bytes > 0).then_some(max_size_bytes);
353        self
354    }
355
356    pub fn max_size_time(mut self, max_size_time: gst::ClockTime) -> Self {
357        self.0.max_size_time = (!max_size_time.is_zero()).then_some(max_size_time);
358        self
359    }
360
361    pub fn build(self) -> DataQueue {
362        DataQueue(Arc::new(Mutex::new(self.0)))
363    }
364}