1use futures::future::{self, AbortHandle, abortable};
10
11use gst::prelude::*;
12
13use std::sync::LazyLock;
14
15use std::collections::VecDeque;
16use std::sync::Arc;
17use std::sync::Mutex;
18
19use crate::runtime::Context;
20
/// Debug category used by every log statement in this module.
///
/// Registered under the name `ts-dataqueue`; created lazily on first use.
static DATA_QUEUE_CAT: LazyLock<gst::DebugCategory> = LazyLock::new(|| {
    gst::DebugCategory::new(
        "ts-dataqueue",
        gst::DebugColorFlags::empty(),
        Some("Thread-sharing queue"),
    )
});
28
/// An item that can be stored in a [`DataQueue`].
#[derive(Debug, Clone)]
pub enum DataQueueItem {
    /// A single buffer.
    Buffer(gst::Buffer),
    /// A list of buffers, queued and dequeued as one item.
    BufferList(gst::BufferList),
    /// An event; it contributes neither buffers nor bytes to the queue levels.
    Event(gst::Event),
}
35
36impl DataQueueItem {
37 fn size(&self) -> (u32, u32) {
38 match *self {
39 DataQueueItem::Buffer(ref buffer) => (1, buffer.size() as u32),
40 DataQueueItem::BufferList(ref list) => (
41 list.len() as u32,
42 list.iter().map(|b| b.size() as u32).sum::<u32>(),
43 ),
44 DataQueueItem::Event(_) => (0, 0),
45 }
46 }
47
48 fn timestamp(&self) -> Option<gst::ClockTime> {
49 match *self {
50 DataQueueItem::Buffer(ref buffer) => buffer.dts_or_pts(),
51 DataQueueItem::BufferList(ref list) => list.iter().find_map(|b| b.dts_or_pts()),
52 DataQueueItem::Event(_) => None,
53 }
54 }
55}
56
/// Running state of a [`DataQueue`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum DataQueueState {
    /// Items are accepted by `push` and handed out by `next`.
    Started,
    /// `push` rejects every item and `next` returns `None`.
    Stopped,
}
62
/// Tracks how far a queue has got with respect to receiving buffers.
///
/// Returned by `DataQueue::push` so the caller can tell whether the push it
/// just performed queued the very first buffer.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum PushedStatus {
    /// The first buffer (or buffer list) has just been queued.
    FirstBuffer,
    /// At least one more item was queued after the first buffer.
    GotBuffers,
    /// No buffer has been queued yet; this is the default status.
    #[default]
    PendingBuffers,
}

impl PushedStatus {
    /// Returns `true` only for [`PushedStatus::FirstBuffer`].
    pub fn is_first_buffer(self) -> bool {
        self == Self::FirstBuffer
    }
}
76
/// Handle to a queue of [`DataQueueItem`]s.
///
/// The state lives behind an `Arc<Mutex<..>>`, so clones of a `DataQueue`
/// all operate on the same underlying queue.
#[derive(Clone, Debug)]
pub struct DataQueue(Arc<Mutex<DataQueueInner>>);
79
/// Mutex-protected state shared by all clones of a [`DataQueue`].
#[derive(Debug)]
struct DataQueueInner {
    // Element used as the log object for most debug messages.
    element: gst::Element,
    // `Context::current()` as captured when the first buffer was pushed.
    upstream_ctx: Option<Context>,
    // Pad on which pending sticky events are stored when the queue is cleared.
    src_pad: gst::Pad,

    state: DataQueueState,
    queue: VecDeque<DataQueueItem>,

    pushed_status: PushedStatus,

    // Current fill levels of the queue.
    cur_level_buffers: u32,
    cur_level_bytes: u32,
    cur_level_time: gst::ClockTime,
    // Limits enforced by `push`; `None` means no limit on that dimension.
    max_size_buffers: Option<u32>,
    max_size_bytes: Option<u32>,
    max_size_time: Option<gst::ClockTime>,

    // Abort handle of the future `next` is waiting on while the queue is empty.
    pending_handle: Option<AbortHandle>,
}
100
101impl DataQueueInner {
102 fn wake(&mut self) {
103 if let Some(pending_handle) = self.pending_handle.take() {
104 pending_handle.abort();
105 }
106 }
107}
108
109impl DataQueue {
    /// Starts building a `DataQueue` for `element` and its `src_pad`.
    ///
    /// The returned builder has no size limits set.
    pub fn builder(element: &gst::Element, src_pad: &gst::Pad) -> DataQueueBuilder {
        DataQueueBuilder::new(element, src_pad)
    }
113
114 pub fn state(&self) -> DataQueueState {
115 self.0.lock().unwrap().state
116 }
117
118 pub fn upstream_context(&self) -> Option<Context> {
119 self.0.lock().unwrap().upstream_ctx.clone()
120 }
121
122 pub fn cur_level_buffers(&self) -> u32 {
123 self.0.lock().unwrap().cur_level_buffers
124 }
125
126 pub fn cur_level_bytes(&self) -> u32 {
127 self.0.lock().unwrap().cur_level_bytes
128 }
129
130 pub fn cur_level_time(&self) -> gst::ClockTime {
131 self.0.lock().unwrap().cur_level_time
132 }
133
134 pub fn start(&self) {
135 let mut inner = self.0.lock().unwrap();
136 if inner.state == DataQueueState::Started {
137 gst::debug!(
138 DATA_QUEUE_CAT,
139 obj = inner.element,
140 "Data queue already Started"
141 );
142 return;
143 }
144 gst::debug!(DATA_QUEUE_CAT, obj = inner.element, "Starting data queue");
145 inner.state = DataQueueState::Started;
146 inner.wake();
147 }
148
149 pub fn stop(&self) {
150 let mut inner = self.0.lock().unwrap();
151 if inner.state == DataQueueState::Stopped {
152 gst::debug!(
153 DATA_QUEUE_CAT,
154 obj = inner.element,
155 "Data queue already Stopped"
156 );
157 return;
158 }
159 gst::debug!(DATA_QUEUE_CAT, obj = inner.element, "Stopping data queue");
160 inner.state = DataQueueState::Stopped;
161 inner.wake();
162 }
163
164 pub fn clear(&self) {
165 let mut inner = self.0.lock().unwrap();
166
167 gst::debug!(DATA_QUEUE_CAT, obj = inner.element, "Clearing data queue");
168
169 let src_pad = inner.src_pad.clone();
170 for item in inner.queue.drain(..) {
171 if let DataQueueItem::Event(event) = item
172 && event.is_sticky()
173 && event.type_() != gst::EventType::Segment
174 && event.type_() != gst::EventType::Eos
175 {
176 let _ = src_pad.store_sticky_event(&event);
177 }
178 }
179
180 inner.pushed_status = PushedStatus::default();
181
182 inner.cur_level_buffers = 0;
183 inner.cur_level_bytes = 0;
184 inner.cur_level_time = gst::ClockTime::ZERO;
185
186 gst::debug!(DATA_QUEUE_CAT, obj = inner.element, "Data queue cleared");
187 }
188
189 pub fn push(
190 &self,
191 obj: &gst::glib::Object,
192 item: DataQueueItem,
193 ) -> Result<PushedStatus, DataQueueItem> {
194 let mut inner = self.0.lock().unwrap();
195
196 if inner.state == DataQueueState::Stopped {
197 gst::debug!(
198 DATA_QUEUE_CAT,
199 obj = obj,
200 "Rejecting item {item:?} in state {:?}",
201 inner.state
202 );
203 return Err(item);
204 }
205
206 gst::debug!(DATA_QUEUE_CAT, obj = obj, "Pushing item {item:?}");
207
208 let (count, bytes) = item.size();
209 let queue_ts = inner.queue.iter().find_map(|i| i.timestamp());
210 let ts = item.timestamp();
211
212 if let Some(max) = inner.max_size_buffers
213 && max <= inner.cur_level_buffers
214 {
215 gst::debug!(
216 DATA_QUEUE_CAT,
217 obj = obj,
218 "Queue is full (buffers): {max} <= {}",
219 inner.cur_level_buffers
220 );
221 return Err(item);
222 }
223
224 if let Some(max) = inner.max_size_bytes
225 && max <= inner.cur_level_bytes
226 {
227 gst::debug!(
228 DATA_QUEUE_CAT,
229 obj = obj,
230 "Queue is full (bytes): {max} <= {}",
231 inner.cur_level_bytes
232 );
233 return Err(item);
234 }
235
236 let level = if let (Some(queue_ts), Some(ts)) = (queue_ts, ts) {
238 let level = if queue_ts > ts {
239 queue_ts - ts
240 } else {
241 ts - queue_ts
242 };
243
244 if inner.max_size_time.opt_le(level).unwrap_or(false) {
245 gst::debug!(
246 DATA_QUEUE_CAT,
247 obj = obj,
248 "Queue is full (time): {} <= {level}",
249 inner.max_size_time.display(),
250 );
251 return Err(item);
252 }
253
254 level
255 } else {
256 gst::ClockTime::ZERO
257 };
258
259 use PushedStatus::*;
260 match inner.pushed_status {
261 GotBuffers => (),
262 FirstBuffer => inner.pushed_status = GotBuffers,
263 PendingBuffers => {
264 if let DataQueueItem::Buffer(_) | DataQueueItem::BufferList(_) = item {
265 inner.pushed_status = PushedStatus::FirstBuffer;
266 inner.upstream_ctx = Context::current();
267 }
268 }
269 }
270
271 inner.queue.push_back(item);
272 inner.cur_level_buffers += count;
273 inner.cur_level_bytes += bytes;
274 inner.cur_level_time = level;
275
276 inner.wake();
277
278 Ok(inner.pushed_status)
279 }
280
281 #[allow(clippy::should_implement_trait)]
283 pub async fn next(&mut self) -> Option<DataQueueItem> {
284 loop {
285 let pending_fut = {
286 let mut inner = self.0.lock().unwrap();
287 match inner.state {
288 DataQueueState::Started => match inner.queue.pop_front() {
289 None => {
290 gst::debug!(DATA_QUEUE_CAT, obj = inner.element, "Data queue is empty");
291 }
292 Some(item) => {
293 gst::debug!(
294 DATA_QUEUE_CAT,
295 obj = inner.element,
296 "Popped item {:?}",
297 item
298 );
299
300 let (count, bytes) = item.size();
301 inner.cur_level_buffers -= count;
302 inner.cur_level_bytes -= bytes;
303
304 return Some(item);
305 }
306 },
307 DataQueueState::Stopped => {
308 gst::debug!(DATA_QUEUE_CAT, obj = inner.element, "Data queue Stopped");
309 return None;
310 }
311 }
312
313 let (pending_fut, abort_handle) = abortable(future::pending::<()>());
314 inner.pending_handle = Some(abort_handle);
315
316 pending_fut
317 };
318
319 let _ = pending_fut.await;
320 }
321 }
322}
323
/// Builder for a [`DataQueue`]; obtained from `DataQueue::builder`.
///
/// Wraps the not-yet-shared queue state until `build` is called.
#[derive(Debug)]
pub struct DataQueueBuilder(DataQueueInner);
326
327impl DataQueueBuilder {
328 fn new(element: &gst::Element, src_pad: &gst::Pad) -> DataQueueBuilder {
329 DataQueueBuilder(DataQueueInner {
330 element: element.clone(),
331 upstream_ctx: None,
332 src_pad: src_pad.clone(),
333 state: DataQueueState::Stopped,
334 queue: VecDeque::new(),
335 pushed_status: PushedStatus::default(),
336 cur_level_buffers: 0,
337 cur_level_bytes: 0,
338 cur_level_time: gst::ClockTime::ZERO,
339 max_size_buffers: None,
340 max_size_bytes: None,
341 max_size_time: None,
342 pending_handle: None,
343 })
344 }
345
346 pub fn max_size_buffers(mut self, max_size_buffers: u32) -> Self {
347 self.0.max_size_buffers = (max_size_buffers > 0).then_some(max_size_buffers);
348 self
349 }
350
351 pub fn max_size_bytes(mut self, max_size_bytes: u32) -> Self {
352 self.0.max_size_bytes = (max_size_bytes > 0).then_some(max_size_bytes);
353 self
354 }
355
356 pub fn max_size_time(mut self, max_size_time: gst::ClockTime) -> Self {
357 self.0.max_size_time = (!max_size_time.is_zero()).then_some(max_size_time);
358 self
359 }
360
361 pub fn build(self) -> DataQueue {
362 DataQueue(Arc::new(Mutex::new(self.0)))
363 }
364}