1use futures::future::{self, abortable, AbortHandle};
10
11use gst::prelude::*;
12
13use std::sync::LazyLock;
14
15use std::collections::VecDeque;
16use std::sync::Arc;
17use std::sync::Mutex;
18
19use crate::runtime::Context;
20
21static DATA_QUEUE_CAT: LazyLock<gst::DebugCategory> = LazyLock::new(|| {
22 gst::DebugCategory::new(
23 "ts-dataqueue",
24 gst::DebugColorFlags::empty(),
25 Some("Thread-sharing queue"),
26 )
27});
28
29#[derive(Debug, Clone)]
30pub enum DataQueueItem {
31 Buffer(gst::Buffer),
32 BufferList(gst::BufferList),
33 Event(gst::Event),
34}
35
36impl DataQueueItem {
37 fn size(&self) -> (u32, u32) {
38 match *self {
39 DataQueueItem::Buffer(ref buffer) => (1, buffer.size() as u32),
40 DataQueueItem::BufferList(ref list) => (
41 list.len() as u32,
42 list.iter().map(|b| b.size() as u32).sum::<u32>(),
43 ),
44 DataQueueItem::Event(_) => (0, 0),
45 }
46 }
47
48 fn timestamp(&self) -> Option<gst::ClockTime> {
49 match *self {
50 DataQueueItem::Buffer(ref buffer) => buffer.dts_or_pts(),
51 DataQueueItem::BufferList(ref list) => list.iter().find_map(|b| b.dts_or_pts()),
52 DataQueueItem::Event(_) => None,
53 }
54 }
55}
56
/// Running state of a [`DataQueue`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DataQueueState {
    /// Items can be pushed to and popped from the queue.
    Started,
    /// Pushed items are rejected and `next` returns `None`.
    Stopped,
}
62
/// Tells a pusher whether buffers were already pushed to the queue.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum PushedStatus {
    /// The item just pushed was the first buffer (or buffer list).
    FirstBuffer,
    /// Buffers were already pushed before.
    GotBuffers,
    /// No buffer was pushed yet; this is the default status.
    #[default]
    PendingBuffers,
}

impl PushedStatus {
    /// Returns `true` only for [`PushedStatus::FirstBuffer`].
    pub fn is_first_buffer(self) -> bool {
        self == PushedStatus::FirstBuffer
    }
}
76
/// Handle to a queue of [`DataQueueItem`]s with optional size limits.
///
/// The state lives behind an `Arc<Mutex<..>>`, so cloning a `DataQueue`
/// yields another handle to the same underlying queue.
#[derive(Clone, Debug)]
pub struct DataQueue(Arc<Mutex<DataQueueInner>>);
79
80#[derive(Debug)]
81struct DataQueueInner {
82 element: gst::Element,
83 upstream_ctx: Option<Context>,
84 src_pad: gst::Pad,
85
86 state: DataQueueState,
87 queue: VecDeque<DataQueueItem>,
88
89 pushed_status: PushedStatus,
90
91 cur_level_buffers: u32,
92 cur_level_bytes: u32,
93 cur_level_time: gst::ClockTime,
94 max_size_buffers: Option<u32>,
95 max_size_bytes: Option<u32>,
96 max_size_time: Option<gst::ClockTime>,
97
98 pending_handle: Option<AbortHandle>,
99}
100
101impl DataQueueInner {
102 fn wake(&mut self) {
103 if let Some(pending_handle) = self.pending_handle.take() {
104 pending_handle.abort();
105 }
106 }
107}
108
109impl DataQueue {
110 pub fn builder(element: &gst::Element, src_pad: &gst::Pad) -> DataQueueBuilder {
111 DataQueueBuilder::new(element, src_pad)
112 }
113
114 pub fn state(&self) -> DataQueueState {
115 self.0.lock().unwrap().state
116 }
117
118 pub fn upstream_context(&self) -> Option<Context> {
119 self.0.lock().unwrap().upstream_ctx.clone()
120 }
121
122 pub fn cur_level_buffers(&self) -> u32 {
123 self.0.lock().unwrap().cur_level_buffers
124 }
125
126 pub fn cur_level_bytes(&self) -> u32 {
127 self.0.lock().unwrap().cur_level_bytes
128 }
129
130 pub fn cur_level_time(&self) -> gst::ClockTime {
131 self.0.lock().unwrap().cur_level_time
132 }
133
134 pub fn start(&self) {
135 let mut inner = self.0.lock().unwrap();
136 if inner.state == DataQueueState::Started {
137 gst::debug!(
138 DATA_QUEUE_CAT,
139 obj = inner.element,
140 "Data queue already Started"
141 );
142 return;
143 }
144 gst::debug!(DATA_QUEUE_CAT, obj = inner.element, "Starting data queue");
145 inner.state = DataQueueState::Started;
146 inner.wake();
147 }
148
149 pub fn stop(&self) {
150 let mut inner = self.0.lock().unwrap();
151 if inner.state == DataQueueState::Stopped {
152 gst::debug!(
153 DATA_QUEUE_CAT,
154 obj = inner.element,
155 "Data queue already Stopped"
156 );
157 return;
158 }
159 gst::debug!(DATA_QUEUE_CAT, obj = inner.element, "Stopping data queue");
160 inner.state = DataQueueState::Stopped;
161 inner.wake();
162 }
163
164 pub fn clear(&self) {
165 let mut inner = self.0.lock().unwrap();
166
167 gst::debug!(DATA_QUEUE_CAT, obj = inner.element, "Clearing data queue");
168
169 let src_pad = inner.src_pad.clone();
170 for item in inner.queue.drain(..) {
171 if let DataQueueItem::Event(event) = item {
172 if event.is_sticky()
173 && event.type_() != gst::EventType::Segment
174 && event.type_() != gst::EventType::Eos
175 {
176 let _ = src_pad.store_sticky_event(&event);
177 }
178 }
179 }
180
181 inner.pushed_status = PushedStatus::default();
182
183 inner.cur_level_buffers = 0;
184 inner.cur_level_bytes = 0;
185 inner.cur_level_time = gst::ClockTime::ZERO;
186
187 gst::debug!(DATA_QUEUE_CAT, obj = inner.element, "Data queue cleared");
188 }
189
190 pub fn push(
191 &self,
192 obj: &gst::glib::Object,
193 item: DataQueueItem,
194 ) -> Result<PushedStatus, DataQueueItem> {
195 let mut inner = self.0.lock().unwrap();
196
197 if inner.state == DataQueueState::Stopped {
198 gst::debug!(
199 DATA_QUEUE_CAT,
200 obj = obj,
201 "Rejecting item {item:?} in state {:?}",
202 inner.state
203 );
204 return Err(item);
205 }
206
207 gst::debug!(DATA_QUEUE_CAT, obj = obj, "Pushing item {item:?}");
208
209 let (count, bytes) = item.size();
210 let queue_ts = inner.queue.iter().find_map(|i| i.timestamp());
211 let ts = item.timestamp();
212
213 if let Some(max) = inner.max_size_buffers {
214 if max <= inner.cur_level_buffers {
215 gst::debug!(
216 DATA_QUEUE_CAT,
217 obj = obj,
218 "Queue is full (buffers): {max} <= {}",
219 inner.cur_level_buffers
220 );
221 return Err(item);
222 }
223 }
224
225 if let Some(max) = inner.max_size_bytes {
226 if max <= inner.cur_level_bytes {
227 gst::debug!(
228 DATA_QUEUE_CAT,
229 obj = obj,
230 "Queue is full (bytes): {max} <= {}",
231 inner.cur_level_bytes
232 );
233 return Err(item);
234 }
235 }
236
237 let level = if let (Some(queue_ts), Some(ts)) = (queue_ts, ts) {
239 let level = if queue_ts > ts {
240 queue_ts - ts
241 } else {
242 ts - queue_ts
243 };
244
245 if inner.max_size_time.opt_le(level).unwrap_or(false) {
246 gst::debug!(
247 DATA_QUEUE_CAT,
248 obj = obj,
249 "Queue is full (time): {} <= {level}",
250 inner.max_size_time.display(),
251 );
252 return Err(item);
253 }
254
255 level
256 } else {
257 gst::ClockTime::ZERO
258 };
259
260 use PushedStatus::*;
261 match inner.pushed_status {
262 GotBuffers => (),
263 FirstBuffer => inner.pushed_status = GotBuffers,
264 PendingBuffers => {
265 if let DataQueueItem::Buffer(_) | DataQueueItem::BufferList(_) = item {
266 inner.pushed_status = PushedStatus::FirstBuffer;
267 inner.upstream_ctx = Context::current();
268 }
269 }
270 }
271
272 inner.queue.push_back(item);
273 inner.cur_level_buffers += count;
274 inner.cur_level_bytes += bytes;
275 inner.cur_level_time = level;
276
277 inner.wake();
278
279 Ok(inner.pushed_status)
280 }
281
282 #[allow(clippy::should_implement_trait)]
284 pub async fn next(&mut self) -> Option<DataQueueItem> {
285 loop {
286 let pending_fut = {
287 let mut inner = self.0.lock().unwrap();
288 match inner.state {
289 DataQueueState::Started => match inner.queue.pop_front() {
290 None => {
291 gst::debug!(DATA_QUEUE_CAT, obj = inner.element, "Data queue is empty");
292 }
293 Some(item) => {
294 gst::debug!(
295 DATA_QUEUE_CAT,
296 obj = inner.element,
297 "Popped item {:?}",
298 item
299 );
300
301 let (count, bytes) = item.size();
302 inner.cur_level_buffers -= count;
303 inner.cur_level_bytes -= bytes;
304
305 return Some(item);
306 }
307 },
308 DataQueueState::Stopped => {
309 gst::debug!(DATA_QUEUE_CAT, obj = inner.element, "Data queue Stopped");
310 return None;
311 }
312 }
313
314 let (pending_fut, abort_handle) = abortable(future::pending::<()>());
315 inner.pending_handle = Some(abort_handle);
316
317 pending_fut
318 };
319
320 let _ = pending_fut.await;
321 }
322 }
323}
324
325#[derive(Debug)]
326pub struct DataQueueBuilder(DataQueueInner);
327
328impl DataQueueBuilder {
329 fn new(element: &gst::Element, src_pad: &gst::Pad) -> DataQueueBuilder {
330 DataQueueBuilder(DataQueueInner {
331 element: element.clone(),
332 upstream_ctx: None,
333 src_pad: src_pad.clone(),
334 state: DataQueueState::Stopped,
335 queue: VecDeque::new(),
336 pushed_status: PushedStatus::default(),
337 cur_level_buffers: 0,
338 cur_level_bytes: 0,
339 cur_level_time: gst::ClockTime::ZERO,
340 max_size_buffers: None,
341 max_size_bytes: None,
342 max_size_time: None,
343 pending_handle: None,
344 })
345 }
346
347 pub fn max_size_buffers(mut self, max_size_buffers: u32) -> Self {
348 self.0.max_size_buffers = (max_size_buffers > 0).then_some(max_size_buffers);
349 self
350 }
351
352 pub fn max_size_bytes(mut self, max_size_bytes: u32) -> Self {
353 self.0.max_size_bytes = (max_size_bytes > 0).then_some(max_size_bytes);
354 self
355 }
356
357 pub fn max_size_time(mut self, max_size_time: gst::ClockTime) -> Self {
358 self.0.max_size_time = (!max_size_time.is_zero()).then_some(max_size_time);
359 self
360 }
361
362 pub fn build(self) -> DataQueue {
363 DataQueue(Arc::new(Mutex::new(self.0)))
364 }
365}