gstthreadshare/
dataqueue.rs

1// Copyright (C) 2018 Sebastian Dröge <sebastian@centricular.com>
2//
3// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
4// If a copy of the MPL was not distributed with this file, You can obtain one at
5// <https://mozilla.org/MPL/2.0/>.
6//
7// SPDX-License-Identifier: MPL-2.0
8
9use futures::future::{self, abortable, AbortHandle};
10
11use gst::prelude::*;
12
13use std::sync::LazyLock;
14
15use std::collections::VecDeque;
16use std::sync::Arc;
17use std::sync::Mutex;
18
19use crate::runtime::Context;
20
21static DATA_QUEUE_CAT: LazyLock<gst::DebugCategory> = LazyLock::new(|| {
22    gst::DebugCategory::new(
23        "ts-dataqueue",
24        gst::DebugColorFlags::empty(),
25        Some("Thread-sharing queue"),
26    )
27});
28
29#[derive(Debug, Clone)]
30pub enum DataQueueItem {
31    Buffer(gst::Buffer),
32    BufferList(gst::BufferList),
33    Event(gst::Event),
34}
35
36impl DataQueueItem {
37    fn size(&self) -> (u32, u32) {
38        match *self {
39            DataQueueItem::Buffer(ref buffer) => (1, buffer.size() as u32),
40            DataQueueItem::BufferList(ref list) => (
41                list.len() as u32,
42                list.iter().map(|b| b.size() as u32).sum::<u32>(),
43            ),
44            DataQueueItem::Event(_) => (0, 0),
45        }
46    }
47
48    fn timestamp(&self) -> Option<gst::ClockTime> {
49        match *self {
50            DataQueueItem::Buffer(ref buffer) => buffer.dts_or_pts(),
51            DataQueueItem::BufferList(ref list) => list.iter().find_map(|b| b.dts_or_pts()),
52            DataQueueItem::Event(_) => None,
53        }
54    }
55}
56
57#[derive(Clone, Copy, Debug, PartialEq, Eq)]
58pub enum DataQueueState {
59    Started,
60    Stopped,
61}
62
63#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
64pub enum PushedStatus {
65    FirstBuffer,
66    GotBuffers,
67    #[default]
68    PendingBuffers,
69}
70
71impl PushedStatus {
72    pub fn is_first_buffer(self) -> bool {
73        matches!(self, PushedStatus::FirstBuffer)
74    }
75}
76
77#[derive(Clone, Debug)]
78pub struct DataQueue(Arc<Mutex<DataQueueInner>>);
79
80#[derive(Debug)]
81struct DataQueueInner {
82    element: gst::Element,
83    upstream_ctx: Option<Context>,
84    src_pad: gst::Pad,
85
86    state: DataQueueState,
87    queue: VecDeque<DataQueueItem>,
88
89    pushed_status: PushedStatus,
90
91    cur_level_buffers: u32,
92    cur_level_bytes: u32,
93    cur_level_time: gst::ClockTime,
94    max_size_buffers: Option<u32>,
95    max_size_bytes: Option<u32>,
96    max_size_time: Option<gst::ClockTime>,
97
98    pending_handle: Option<AbortHandle>,
99}
100
101impl DataQueueInner {
102    fn wake(&mut self) {
103        if let Some(pending_handle) = self.pending_handle.take() {
104            pending_handle.abort();
105        }
106    }
107}
108
109impl DataQueue {
110    pub fn builder(element: &gst::Element, src_pad: &gst::Pad) -> DataQueueBuilder {
111        DataQueueBuilder::new(element, src_pad)
112    }
113
114    pub fn state(&self) -> DataQueueState {
115        self.0.lock().unwrap().state
116    }
117
118    pub fn upstream_context(&self) -> Option<Context> {
119        self.0.lock().unwrap().upstream_ctx.clone()
120    }
121
122    pub fn cur_level_buffers(&self) -> u32 {
123        self.0.lock().unwrap().cur_level_buffers
124    }
125
126    pub fn cur_level_bytes(&self) -> u32 {
127        self.0.lock().unwrap().cur_level_bytes
128    }
129
130    pub fn cur_level_time(&self) -> gst::ClockTime {
131        self.0.lock().unwrap().cur_level_time
132    }
133
134    pub fn start(&self) {
135        let mut inner = self.0.lock().unwrap();
136        if inner.state == DataQueueState::Started {
137            gst::debug!(
138                DATA_QUEUE_CAT,
139                obj = inner.element,
140                "Data queue already Started"
141            );
142            return;
143        }
144        gst::debug!(DATA_QUEUE_CAT, obj = inner.element, "Starting data queue");
145        inner.state = DataQueueState::Started;
146        inner.wake();
147    }
148
149    pub fn stop(&self) {
150        let mut inner = self.0.lock().unwrap();
151        if inner.state == DataQueueState::Stopped {
152            gst::debug!(
153                DATA_QUEUE_CAT,
154                obj = inner.element,
155                "Data queue already Stopped"
156            );
157            return;
158        }
159        gst::debug!(DATA_QUEUE_CAT, obj = inner.element, "Stopping data queue");
160        inner.state = DataQueueState::Stopped;
161        inner.wake();
162    }
163
164    pub fn clear(&self) {
165        let mut inner = self.0.lock().unwrap();
166
167        gst::debug!(DATA_QUEUE_CAT, obj = inner.element, "Clearing data queue");
168
169        let src_pad = inner.src_pad.clone();
170        for item in inner.queue.drain(..) {
171            if let DataQueueItem::Event(event) = item {
172                if event.is_sticky()
173                    && event.type_() != gst::EventType::Segment
174                    && event.type_() != gst::EventType::Eos
175                {
176                    let _ = src_pad.store_sticky_event(&event);
177                }
178            }
179        }
180
181        inner.pushed_status = PushedStatus::default();
182
183        inner.cur_level_buffers = 0;
184        inner.cur_level_bytes = 0;
185        inner.cur_level_time = gst::ClockTime::ZERO;
186
187        gst::debug!(DATA_QUEUE_CAT, obj = inner.element, "Data queue cleared");
188    }
189
190    pub fn push(
191        &self,
192        obj: &gst::glib::Object,
193        item: DataQueueItem,
194    ) -> Result<PushedStatus, DataQueueItem> {
195        let mut inner = self.0.lock().unwrap();
196
197        if inner.state == DataQueueState::Stopped {
198            gst::debug!(
199                DATA_QUEUE_CAT,
200                obj = obj,
201                "Rejecting item {item:?} in state {:?}",
202                inner.state
203            );
204            return Err(item);
205        }
206
207        gst::debug!(DATA_QUEUE_CAT, obj = obj, "Pushing item {item:?}");
208
209        let (count, bytes) = item.size();
210        let queue_ts = inner.queue.iter().find_map(|i| i.timestamp());
211        let ts = item.timestamp();
212
213        if let Some(max) = inner.max_size_buffers {
214            if max <= inner.cur_level_buffers {
215                gst::debug!(
216                    DATA_QUEUE_CAT,
217                    obj = obj,
218                    "Queue is full (buffers): {max} <= {}",
219                    inner.cur_level_buffers
220                );
221                return Err(item);
222            }
223        }
224
225        if let Some(max) = inner.max_size_bytes {
226            if max <= inner.cur_level_bytes {
227                gst::debug!(
228                    DATA_QUEUE_CAT,
229                    obj = obj,
230                    "Queue is full (bytes): {max} <= {}",
231                    inner.cur_level_bytes
232                );
233                return Err(item);
234            }
235        }
236
237        // FIXME: Use running time
238        let level = if let (Some(queue_ts), Some(ts)) = (queue_ts, ts) {
239            let level = if queue_ts > ts {
240                queue_ts - ts
241            } else {
242                ts - queue_ts
243            };
244
245            if inner.max_size_time.opt_le(level).unwrap_or(false) {
246                gst::debug!(
247                    DATA_QUEUE_CAT,
248                    obj = obj,
249                    "Queue is full (time): {} <= {level}",
250                    inner.max_size_time.display(),
251                );
252                return Err(item);
253            }
254
255            level
256        } else {
257            gst::ClockTime::ZERO
258        };
259
260        use PushedStatus::*;
261        match inner.pushed_status {
262            GotBuffers => (),
263            FirstBuffer => inner.pushed_status = GotBuffers,
264            PendingBuffers => {
265                if let DataQueueItem::Buffer(_) | DataQueueItem::BufferList(_) = item {
266                    inner.pushed_status = PushedStatus::FirstBuffer;
267                    inner.upstream_ctx = Context::current();
268                }
269            }
270        }
271
272        inner.queue.push_back(item);
273        inner.cur_level_buffers += count;
274        inner.cur_level_bytes += bytes;
275        inner.cur_level_time = level;
276
277        inner.wake();
278
279        Ok(inner.pushed_status)
280    }
281
282    // TODO: implement as a Stream now that we use a StdMutex
283    #[allow(clippy::should_implement_trait)]
284    pub async fn next(&mut self) -> Option<DataQueueItem> {
285        loop {
286            let pending_fut = {
287                let mut inner = self.0.lock().unwrap();
288                match inner.state {
289                    DataQueueState::Started => match inner.queue.pop_front() {
290                        None => {
291                            gst::debug!(DATA_QUEUE_CAT, obj = inner.element, "Data queue is empty");
292                        }
293                        Some(item) => {
294                            gst::debug!(
295                                DATA_QUEUE_CAT,
296                                obj = inner.element,
297                                "Popped item {:?}",
298                                item
299                            );
300
301                            let (count, bytes) = item.size();
302                            inner.cur_level_buffers -= count;
303                            inner.cur_level_bytes -= bytes;
304
305                            return Some(item);
306                        }
307                    },
308                    DataQueueState::Stopped => {
309                        gst::debug!(DATA_QUEUE_CAT, obj = inner.element, "Data queue Stopped");
310                        return None;
311                    }
312                }
313
314                let (pending_fut, abort_handle) = abortable(future::pending::<()>());
315                inner.pending_handle = Some(abort_handle);
316
317                pending_fut
318            };
319
320            let _ = pending_fut.await;
321        }
322    }
323}
324
325#[derive(Debug)]
326pub struct DataQueueBuilder(DataQueueInner);
327
328impl DataQueueBuilder {
329    fn new(element: &gst::Element, src_pad: &gst::Pad) -> DataQueueBuilder {
330        DataQueueBuilder(DataQueueInner {
331            element: element.clone(),
332            upstream_ctx: None,
333            src_pad: src_pad.clone(),
334            state: DataQueueState::Stopped,
335            queue: VecDeque::new(),
336            pushed_status: PushedStatus::default(),
337            cur_level_buffers: 0,
338            cur_level_bytes: 0,
339            cur_level_time: gst::ClockTime::ZERO,
340            max_size_buffers: None,
341            max_size_bytes: None,
342            max_size_time: None,
343            pending_handle: None,
344        })
345    }
346
347    pub fn max_size_buffers(mut self, max_size_buffers: u32) -> Self {
348        self.0.max_size_buffers = (max_size_buffers > 0).then_some(max_size_buffers);
349        self
350    }
351
352    pub fn max_size_bytes(mut self, max_size_bytes: u32) -> Self {
353        self.0.max_size_bytes = (max_size_bytes > 0).then_some(max_size_bytes);
354        self
355    }
356
357    pub fn max_size_time(mut self, max_size_time: gst::ClockTime) -> Self {
358        self.0.max_size_time = (!max_size_time.is_zero()).then_some(max_size_time);
359        self
360    }
361
362    pub fn build(self) -> DataQueue {
363        DataQueue(Arc::new(Mutex::new(self.0)))
364    }
365}