1use std::{
4 mem, panic,
5 pin::Pin,
6 ptr,
7 sync::{Arc, Mutex},
8 task::{Context, Poll, Waker},
9};
10
11#[cfg(not(panic = "abort"))]
12use std::sync::atomic::{AtomicBool, Ordering};
13
14use futures_core::Stream;
15use glib::{ffi::gpointer, prelude::*, translate::*};
16
17use crate::{ffi, AppSink};
18
19#[allow(clippy::type_complexity)]
20pub struct AppSinkCallbacks {
21 eos: Option<Box<dyn FnMut(&AppSink) + Send + 'static>>,
22 new_preroll: Option<
23 Box<dyn FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static>,
24 >,
25 new_sample: Option<
26 Box<dyn FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static>,
27 >,
28 new_event: Option<Box<dyn FnMut(&AppSink) -> bool + Send + 'static>>,
29 propose_allocation:
30 Option<Box<dyn FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static>>,
31 #[cfg(not(panic = "abort"))]
32 panicked: AtomicBool,
33 callbacks: ffi::GstAppSinkCallbacks,
34}
35
36unsafe impl Send for AppSinkCallbacks {}
37unsafe impl Sync for AppSinkCallbacks {}
38
39impl AppSinkCallbacks {
40 pub fn builder() -> AppSinkCallbacksBuilder {
41 skip_assert_initialized!();
42 AppSinkCallbacksBuilder {
43 eos: None,
44 new_preroll: None,
45 new_sample: None,
46 new_event: None,
47 propose_allocation: None,
48 }
49 }
50}
51
52#[allow(clippy::type_complexity)]
53#[must_use = "The builder must be built to be used"]
54pub struct AppSinkCallbacksBuilder {
55 eos: Option<Box<dyn FnMut(&AppSink) + Send + 'static>>,
56 new_preroll: Option<
57 Box<dyn FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static>,
58 >,
59 new_sample: Option<
60 Box<dyn FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static>,
61 >,
62 new_event: Option<Box<dyn FnMut(&AppSink) -> bool + Send + 'static>>,
63 propose_allocation:
64 Option<Box<dyn FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static>>,
65}
66
67impl AppSinkCallbacksBuilder {
68 pub fn eos<F: FnMut(&AppSink) + Send + 'static>(self, eos: F) -> Self {
69 Self {
70 eos: Some(Box::new(eos)),
71 ..self
72 }
73 }
74
75 pub fn eos_if<F: FnMut(&AppSink) + Send + 'static>(self, eos: F, predicate: bool) -> Self {
76 if predicate {
77 self.eos(eos)
78 } else {
79 self
80 }
81 }
82
83 pub fn eos_if_some<F: FnMut(&AppSink) + Send + 'static>(self, eos: Option<F>) -> Self {
84 if let Some(eos) = eos {
85 self.eos(eos)
86 } else {
87 self
88 }
89 }
90
91 pub fn new_preroll<
92 F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
93 >(
94 self,
95 new_preroll: F,
96 ) -> Self {
97 Self {
98 new_preroll: Some(Box::new(new_preroll)),
99 ..self
100 }
101 }
102
103 pub fn new_preroll_if<
104 F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
105 >(
106 self,
107 new_preroll: F,
108 predicate: bool,
109 ) -> Self {
110 if predicate {
111 self.new_preroll(new_preroll)
112 } else {
113 self
114 }
115 }
116
117 pub fn new_preroll_if_some<
118 F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
119 >(
120 self,
121 new_preroll: Option<F>,
122 ) -> Self {
123 if let Some(new_preroll) = new_preroll {
124 self.new_preroll(new_preroll)
125 } else {
126 self
127 }
128 }
129
130 pub fn new_sample<
131 F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
132 >(
133 self,
134 new_sample: F,
135 ) -> Self {
136 Self {
137 new_sample: Some(Box::new(new_sample)),
138 ..self
139 }
140 }
141
142 pub fn new_sample_if<
143 F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
144 >(
145 self,
146 new_sample: F,
147 predicate: bool,
148 ) -> Self {
149 if predicate {
150 self.new_sample(new_sample)
151 } else {
152 self
153 }
154 }
155
156 pub fn new_sample_if_some<
157 F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
158 >(
159 self,
160 new_sample: Option<F>,
161 ) -> Self {
162 if let Some(new_sample) = new_sample {
163 self.new_sample(new_sample)
164 } else {
165 self
166 }
167 }
168
/// Stores `new_event` as the event closure, replacing any earlier one.
///
/// Compiled only with the `v1_20` feature.
#[cfg(feature = "v1_20")]
#[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
pub fn new_event<F>(mut self, new_event: F) -> Self
where
    F: FnMut(&AppSink) -> bool + Send + 'static,
{
    self.new_event = Some(Box::new(new_event));
    self
}
177
/// Stores `new_event` only when `predicate` is `true`; otherwise the builder
/// is returned untouched.
///
/// Compiled only with the `v1_20` feature.
#[cfg(feature = "v1_20")]
#[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
pub fn new_event_if<F>(self, new_event: F, predicate: bool) -> Self
where
    F: FnMut(&AppSink) -> bool + Send + 'static,
{
    if !predicate {
        return self;
    }
    self.new_event(new_event)
}
191
/// Stores the closure when `new_event` is `Some`; a `None` leaves the builder
/// untouched.
///
/// Compiled only with the `v1_20` feature.
#[cfg(feature = "v1_20")]
#[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
pub fn new_event_if_some<F>(self, new_event: Option<F>) -> Self
where
    F: FnMut(&AppSink) -> bool + Send + 'static,
{
    match new_event {
        Some(new_event) => self.new_event(new_event),
        None => self,
    }
}
204
/// Stores `propose_allocation` as the allocation-query closure, replacing any
/// earlier one.
///
/// Compiled only with the `v1_24` feature.
#[cfg(feature = "v1_24")]
#[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
pub fn propose_allocation<F>(mut self, propose_allocation: F) -> Self
where
    F: FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static,
{
    self.propose_allocation = Some(Box::new(propose_allocation));
    self
}
218
/// Stores `propose_allocation` only when `predicate` is `true`; otherwise the
/// builder is returned untouched.
///
/// Compiled only with the `v1_24` feature.
#[cfg(feature = "v1_24")]
#[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
pub fn propose_allocation_if<F>(self, propose_allocation: F, predicate: bool) -> Self
where
    F: FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static,
{
    if !predicate {
        return self;
    }
    self.propose_allocation(propose_allocation)
}
234
/// Stores the closure when `propose_allocation` is `Some`; a `None` leaves
/// the builder untouched.
///
/// Compiled only with the `v1_24` feature.
#[cfg(feature = "v1_24")]
#[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
pub fn propose_allocation_if_some<F>(self, propose_allocation: Option<F>) -> Self
where
    F: FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static,
{
    match propose_allocation {
        Some(propose_allocation) => self.propose_allocation(propose_allocation),
        None => self,
    }
}
249
250 #[must_use = "Building the callbacks without using them has no effect"]
251 pub fn build(self) -> AppSinkCallbacks {
252 let have_eos = self.eos.is_some();
253 let have_new_preroll = self.new_preroll.is_some();
254 let have_new_sample = self.new_sample.is_some();
255 let have_new_event = self.new_event.is_some();
256 let have_propose_allocation = self.propose_allocation.is_some();
257
258 AppSinkCallbacks {
259 eos: self.eos,
260 new_preroll: self.new_preroll,
261 new_sample: self.new_sample,
262 new_event: self.new_event,
263 propose_allocation: self.propose_allocation,
264 #[cfg(not(panic = "abort"))]
265 panicked: AtomicBool::new(false),
266 callbacks: ffi::GstAppSinkCallbacks {
267 eos: if have_eos { Some(trampoline_eos) } else { None },
268 new_preroll: if have_new_preroll {
269 Some(trampoline_new_preroll)
270 } else {
271 None
272 },
273 new_sample: if have_new_sample {
274 Some(trampoline_new_sample)
275 } else {
276 None
277 },
278 new_event: if have_new_event {
279 Some(trampoline_new_event)
280 } else {
281 None
282 },
283 propose_allocation: if have_propose_allocation {
284 Some(trampoline_propose_allocation)
285 } else {
286 None
287 },
288 _gst_reserved: [ptr::null_mut(), ptr::null_mut()],
289 },
290 }
291 }
292}
293
294unsafe extern "C" fn trampoline_eos(appsink: *mut ffi::GstAppSink, callbacks: gpointer) {
295 let callbacks = callbacks as *mut AppSinkCallbacks;
296 let element: Borrowed<AppSink> = from_glib_borrow(appsink);
297
298 #[cfg(not(panic = "abort"))]
299 if (*callbacks).panicked.load(Ordering::Relaxed) {
300 let element: Borrowed<AppSink> = from_glib_borrow(appsink);
301 gst::subclass::post_panic_error_message(element.upcast_ref(), element.upcast_ref(), None);
302 return;
303 }
304
305 if let Some(ref mut eos) = (*callbacks).eos {
306 let result = panic::catch_unwind(panic::AssertUnwindSafe(|| eos(&element)));
307 match result {
308 Ok(result) => result,
309 Err(err) => {
310 #[cfg(panic = "abort")]
311 {
312 unreachable!("{err:?}");
313 }
314 #[cfg(not(panic = "abort"))]
315 {
316 (*callbacks).panicked.store(true, Ordering::Relaxed);
317 gst::subclass::post_panic_error_message(
318 element.upcast_ref(),
319 element.upcast_ref(),
320 Some(err),
321 );
322 }
323 }
324 }
325 }
326}
327
328unsafe extern "C" fn trampoline_new_preroll(
329 appsink: *mut ffi::GstAppSink,
330 callbacks: gpointer,
331) -> gst::ffi::GstFlowReturn {
332 let callbacks = callbacks as *mut AppSinkCallbacks;
333 let element: Borrowed<AppSink> = from_glib_borrow(appsink);
334
335 #[cfg(not(panic = "abort"))]
336 if (*callbacks).panicked.load(Ordering::Relaxed) {
337 let element: Borrowed<AppSink> = from_glib_borrow(appsink);
338 gst::subclass::post_panic_error_message(element.upcast_ref(), element.upcast_ref(), None);
339 return gst::FlowReturn::Error.into_glib();
340 }
341
342 let ret = if let Some(ref mut new_preroll) = (*callbacks).new_preroll {
343 let result = panic::catch_unwind(panic::AssertUnwindSafe(|| new_preroll(&element).into()));
344 match result {
345 Ok(result) => result,
346 Err(err) => {
347 #[cfg(panic = "abort")]
348 {
349 unreachable!("{err:?}");
350 }
351 #[cfg(not(panic = "abort"))]
352 {
353 (*callbacks).panicked.store(true, Ordering::Relaxed);
354 gst::subclass::post_panic_error_message(
355 element.upcast_ref(),
356 element.upcast_ref(),
357 Some(err),
358 );
359
360 gst::FlowReturn::Error
361 }
362 }
363 }
364 } else {
365 gst::FlowReturn::Error
366 };
367
368 ret.into_glib()
369}
370
371unsafe extern "C" fn trampoline_new_sample(
372 appsink: *mut ffi::GstAppSink,
373 callbacks: gpointer,
374) -> gst::ffi::GstFlowReturn {
375 let callbacks = callbacks as *mut AppSinkCallbacks;
376 let element: Borrowed<AppSink> = from_glib_borrow(appsink);
377
378 #[cfg(not(panic = "abort"))]
379 if (*callbacks).panicked.load(Ordering::Relaxed) {
380 let element: Borrowed<AppSink> = from_glib_borrow(appsink);
381 gst::subclass::post_panic_error_message(element.upcast_ref(), element.upcast_ref(), None);
382 return gst::FlowReturn::Error.into_glib();
383 }
384
385 let ret = if let Some(ref mut new_sample) = (*callbacks).new_sample {
386 let result = panic::catch_unwind(panic::AssertUnwindSafe(|| new_sample(&element).into()));
387 match result {
388 Ok(result) => result,
389 Err(err) => {
390 #[cfg(panic = "abort")]
391 {
392 unreachable!("{err:?}");
393 }
394 #[cfg(not(panic = "abort"))]
395 {
396 (*callbacks).panicked.store(true, Ordering::Relaxed);
397 gst::subclass::post_panic_error_message(
398 element.upcast_ref(),
399 element.upcast_ref(),
400 Some(err),
401 );
402
403 gst::FlowReturn::Error
404 }
405 }
406 }
407 } else {
408 gst::FlowReturn::Error
409 };
410
411 ret.into_glib()
412}
413
414unsafe extern "C" fn trampoline_new_event(
415 appsink: *mut ffi::GstAppSink,
416 callbacks: gpointer,
417) -> glib::ffi::gboolean {
418 let callbacks = callbacks as *mut AppSinkCallbacks;
419 let element: Borrowed<AppSink> = from_glib_borrow(appsink);
420
421 #[cfg(not(panic = "abort"))]
422 if (*callbacks).panicked.load(Ordering::Relaxed) {
423 let element: Borrowed<AppSink> = from_glib_borrow(appsink);
424 gst::subclass::post_panic_error_message(element.upcast_ref(), element.upcast_ref(), None);
425 return false.into_glib();
426 }
427
428 let ret = if let Some(ref mut new_event) = (*callbacks).new_event {
429 let result = panic::catch_unwind(panic::AssertUnwindSafe(|| new_event(&element)));
430 match result {
431 Ok(result) => result,
432 Err(err) => {
433 #[cfg(panic = "abort")]
434 {
435 unreachable!("{err:?}");
436 }
437 #[cfg(not(panic = "abort"))]
438 {
439 (*callbacks).panicked.store(true, Ordering::Relaxed);
440 gst::subclass::post_panic_error_message(
441 element.upcast_ref(),
442 element.upcast_ref(),
443 Some(err),
444 );
445
446 false
447 }
448 }
449 }
450 } else {
451 false
452 };
453
454 ret.into_glib()
455}
456
457unsafe extern "C" fn trampoline_propose_allocation(
458 appsink: *mut ffi::GstAppSink,
459 query: *mut gst::ffi::GstQuery,
460 callbacks: gpointer,
461) -> glib::ffi::gboolean {
462 let callbacks = callbacks as *mut AppSinkCallbacks;
463 let element: Borrowed<AppSink> = from_glib_borrow(appsink);
464
465 #[cfg(not(panic = "abort"))]
466 if (*callbacks).panicked.load(Ordering::Relaxed) {
467 let element: Borrowed<AppSink> = from_glib_borrow(appsink);
468 gst::subclass::post_panic_error_message(element.upcast_ref(), element.upcast_ref(), None);
469 return false.into_glib();
470 }
471
472 let ret = if let Some(ref mut propose_allocation) = (*callbacks).propose_allocation {
473 let query = match gst::QueryRef::from_mut_ptr(query).view_mut() {
474 gst::QueryViewMut::Allocation(allocation) => allocation,
475 _ => unreachable!(),
476 };
477 let result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
478 propose_allocation(&element, query)
479 }));
480 match result {
481 Ok(result) => result,
482 Err(err) => {
483 #[cfg(panic = "abort")]
484 {
485 unreachable!("{err:?}");
486 }
487 #[cfg(not(panic = "abort"))]
488 {
489 (*callbacks).panicked.store(true, Ordering::Relaxed);
490 gst::subclass::post_panic_error_message(
491 element.upcast_ref(),
492 element.upcast_ref(),
493 Some(err),
494 );
495 false
496 }
497 }
498 }
499 } else {
500 false
501 };
502
503 ret.into_glib()
504}
505
506unsafe extern "C" fn destroy_callbacks(ptr: gpointer) {
507 let _ = Box::<AppSinkCallbacks>::from_raw(ptr as *mut _);
508}
509
510impl AppSink {
511 pub fn builder<'a>() -> AppSinkBuilder<'a> {
516 assert_initialized_main_thread!();
517 AppSinkBuilder {
518 builder: gst::Object::builder(),
519 callbacks: None,
520 drop_out_of_segment: None,
521 }
522 }
523
524 #[doc(alias = "gst_app_sink_set_callbacks")]
525 pub fn set_callbacks(&self, callbacks: AppSinkCallbacks) {
526 unsafe {
527 let sink = self.to_glib_none().0;
528
529 #[cfg(not(feature = "v1_18"))]
530 {
531 static SET_ONCE_QUARK: std::sync::OnceLock<glib::Quark> =
532 std::sync::OnceLock::new();
533
534 let set_once_quark = SET_ONCE_QUARK
535 .get_or_init(|| glib::Quark::from_str("gstreamer-rs-app-sink-callbacks"));
536
537 if gst::version() < (1, 16, 3, 0) {
540 if !glib::gobject_ffi::g_object_get_qdata(
541 sink as *mut _,
542 set_once_quark.into_glib(),
543 )
544 .is_null()
545 {
546 panic!("AppSink callbacks can only be set once");
547 }
548
549 glib::gobject_ffi::g_object_set_qdata(
550 sink as *mut _,
551 set_once_quark.into_glib(),
552 1 as *mut _,
553 );
554 }
555 }
556
557 ffi::gst_app_sink_set_callbacks(
558 sink,
559 mut_override(&callbacks.callbacks),
560 Box::into_raw(Box::new(callbacks)) as *mut _,
561 Some(destroy_callbacks),
562 );
563 }
564 }
565
566 #[doc(alias = "drop-out-of-segment")]
567 pub fn drops_out_of_segment(&self) -> bool {
568 unsafe {
569 from_glib(gst_base::ffi::gst_base_sink_get_drop_out_of_segment(
570 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
571 ))
572 }
573 }
574
575 #[doc(alias = "max-bitrate")]
576 #[doc(alias = "gst_base_sink_get_max_bitrate")]
577 pub fn max_bitrate(&self) -> u64 {
578 unsafe {
579 gst_base::ffi::gst_base_sink_get_max_bitrate(
580 self.as_ptr() as *mut gst_base::ffi::GstBaseSink
581 )
582 }
583 }
584
585 #[doc(alias = "max-lateness")]
586 #[doc(alias = "gst_base_sink_get_max_lateness")]
587 pub fn max_lateness(&self) -> i64 {
588 unsafe {
589 gst_base::ffi::gst_base_sink_get_max_lateness(
590 self.as_ptr() as *mut gst_base::ffi::GstBaseSink
591 )
592 }
593 }
594
/// Returns the processing deadline reported by
/// `gst_base_sink_get_processing_deadline`.
///
/// # Panics
///
/// Panics if the raw value cannot be converted into a `gst::ClockTime`.
#[doc(alias = "processing-deadline")]
#[cfg(feature = "v1_16")]
#[cfg_attr(docsrs, doc(cfg(feature = "v1_16")))]
#[doc(alias = "gst_base_sink_get_processing_deadline")]
pub fn processing_deadline(&self) -> gst::ClockTime {
    let base_sink = self.as_ptr() as *mut gst_base::ffi::GstBaseSink;
    unsafe {
        let raw = gst_base::ffi::gst_base_sink_get_processing_deadline(base_sink);
        try_from_glib(raw).expect("undefined processing_deadline")
    }
}
607
608 #[doc(alias = "render-delay")]
609 #[doc(alias = "gst_base_sink_get_render_delay")]
610 pub fn render_delay(&self) -> gst::ClockTime {
611 unsafe {
612 try_from_glib(gst_base::ffi::gst_base_sink_get_render_delay(
613 self.as_ptr() as *mut gst_base::ffi::GstBaseSink
614 ))
615 .expect("undefined render_delay")
616 }
617 }
618
/// Returns the structure produced by `gst_base_sink_get_stats`, taking
/// ownership of it.
///
/// Compiled only with the `v1_18` feature.
#[cfg(feature = "v1_18")]
#[cfg_attr(docsrs, doc(cfg(feature = "v1_18")))]
#[doc(alias = "gst_base_sink_get_stats")]
pub fn stats(&self) -> gst::Structure {
    let base_sink = self.as_ptr() as *mut gst_base::ffi::GstBaseSink;
    unsafe { from_glib_full(gst_base::ffi::gst_base_sink_get_stats(base_sink)) }
}
629
630 #[doc(alias = "sync")]
631 pub fn is_sync(&self) -> bool {
632 unsafe {
633 from_glib(gst_base::ffi::gst_base_sink_get_sync(
634 self.as_ptr() as *mut gst_base::ffi::GstBaseSink
635 ))
636 }
637 }
638
639 #[doc(alias = "throttle-time")]
640 #[doc(alias = "gst_base_sink_get_throttle_time")]
641 pub fn throttle_time(&self) -> u64 {
642 unsafe {
643 gst_base::ffi::gst_base_sink_get_throttle_time(
644 self.as_ptr() as *mut gst_base::ffi::GstBaseSink
645 )
646 }
647 }
648
649 #[doc(alias = "ts-offset")]
650 #[doc(alias = "gst_base_sink_get_ts_offset")]
651 pub fn ts_offset(&self) -> gst::ClockTimeDiff {
652 unsafe {
653 gst_base::ffi::gst_base_sink_get_ts_offset(
654 self.as_ptr() as *mut gst_base::ffi::GstBaseSink
655 )
656 }
657 }
658
659 #[doc(alias = "async")]
660 #[doc(alias = "gst_base_sink_is_async_enabled")]
661 pub fn is_async(&self) -> bool {
662 unsafe {
663 from_glib(gst_base::ffi::gst_base_sink_is_async_enabled(
664 self.as_ptr() as *mut gst_base::ffi::GstBaseSink
665 ))
666 }
667 }
668
669 #[doc(alias = "last-sample")]
670 pub fn enables_last_sample(&self) -> bool {
671 unsafe {
672 from_glib(gst_base::ffi::gst_base_sink_is_last_sample_enabled(
673 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
674 ))
675 }
676 }
677
678 #[doc(alias = "qos")]
679 #[doc(alias = "gst_base_sink_is_qos_enabled")]
680 pub fn is_qos(&self) -> bool {
681 unsafe {
682 from_glib(gst_base::ffi::gst_base_sink_is_qos_enabled(
683 self.as_ptr() as *mut gst_base::ffi::GstBaseSink
684 ))
685 }
686 }
687
688 #[doc(alias = "async")]
689 #[doc(alias = "gst_base_sink_set_async_enabled")]
690 pub fn set_async(&self, enabled: bool) {
691 unsafe {
692 gst_base::ffi::gst_base_sink_set_async_enabled(
693 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
694 enabled.into_glib(),
695 );
696 }
697 }
698
699 #[doc(alias = "drop-out-of-segment")]
700 #[doc(alias = "gst_base_sink_set_drop_out_of_segment")]
701 pub fn set_drop_out_of_segment(&self, drop_out_of_segment: bool) {
702 unsafe {
703 gst_base::ffi::gst_base_sink_set_drop_out_of_segment(
704 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
705 drop_out_of_segment.into_glib(),
706 );
707 }
708 }
709
710 #[doc(alias = "last-sample")]
711 pub fn set_enable_last_sample(&self, enabled: bool) {
712 unsafe {
713 gst_base::ffi::gst_base_sink_set_last_sample_enabled(
714 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
715 enabled.into_glib(),
716 );
717 }
718 }
719
720 #[doc(alias = "max-bitrate")]
721 #[doc(alias = "gst_base_sink_set_max_bitrate")]
722 pub fn set_max_bitrate(&self, max_bitrate: u64) {
723 unsafe {
724 gst_base::ffi::gst_base_sink_set_max_bitrate(
725 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
726 max_bitrate,
727 );
728 }
729 }
730
731 #[doc(alias = "max-lateness")]
732 #[doc(alias = "gst_base_sink_set_max_lateness")]
733 pub fn set_max_lateness(&self, max_lateness: i64) {
734 unsafe {
735 gst_base::ffi::gst_base_sink_set_max_lateness(
736 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
737 max_lateness,
738 );
739 }
740 }
741
/// Passes `processing_deadline` to `gst_base_sink_set_processing_deadline`
/// for this sink.
///
/// Compiled only with the `v1_16` feature.
#[doc(alias = "processing-deadline")]
#[cfg(feature = "v1_16")]
#[cfg_attr(docsrs, doc(cfg(feature = "v1_16")))]
#[doc(alias = "gst_base_sink_set_processing_deadline")]
pub fn set_processing_deadline(&self, processing_deadline: gst::ClockTime) {
    let base_sink = self.as_ptr() as *mut gst_base::ffi::GstBaseSink;
    let deadline = processing_deadline.into_glib();
    unsafe {
        gst_base::ffi::gst_base_sink_set_processing_deadline(base_sink, deadline);
    }
}
754
755 #[doc(alias = "qos")]
756 #[doc(alias = "gst_base_sink_set_qos_enabled")]
757 pub fn set_qos(&self, enabled: bool) {
758 unsafe {
759 gst_base::ffi::gst_base_sink_set_qos_enabled(
760 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
761 enabled.into_glib(),
762 );
763 }
764 }
765
766 #[doc(alias = "render-delay")]
767 #[doc(alias = "gst_base_sink_set_render_delay")]
768 pub fn set_render_delay(&self, delay: gst::ClockTime) {
769 unsafe {
770 gst_base::ffi::gst_base_sink_set_render_delay(
771 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
772 delay.into_glib(),
773 );
774 }
775 }
776
777 #[doc(alias = "sync")]
778 #[doc(alias = "gst_base_sink_set_sync")]
779 pub fn set_sync(&self, sync: bool) {
780 unsafe {
781 gst_base::ffi::gst_base_sink_set_sync(
782 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
783 sync.into_glib(),
784 );
785 }
786 }
787
788 #[doc(alias = "throttle-time")]
789 #[doc(alias = "gst_base_sink_set_throttle_time")]
790 pub fn set_throttle_time(&self, throttle: u64) {
791 unsafe {
792 gst_base::ffi::gst_base_sink_set_throttle_time(
793 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
794 throttle,
795 );
796 }
797 }
798
799 #[doc(alias = "ts-offset")]
800 #[doc(alias = "gst_base_sink_set_ts_offset")]
801 pub fn set_ts_offset(&self, offset: gst::ClockTimeDiff) {
802 unsafe {
803 gst_base::ffi::gst_base_sink_set_ts_offset(
804 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
805 offset,
806 );
807 }
808 }
809
810 #[doc(alias = "async")]
811 pub fn connect_async_notify<F: Fn(&Self) + Send + Sync + 'static>(
812 &self,
813 f: F,
814 ) -> glib::SignalHandlerId {
815 unsafe extern "C" fn notify_async_trampoline<F: Fn(&AppSink) + Send + Sync + 'static>(
816 this: *mut ffi::GstAppSink,
817 _param_spec: glib::ffi::gpointer,
818 f: glib::ffi::gpointer,
819 ) {
820 let f: &F = &*(f as *const F);
821 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
822 }
823 unsafe {
824 let f: Box<F> = Box::new(f);
825 glib::signal::connect_raw(
826 self.as_ptr() as *mut _,
827 b"notify::async\0".as_ptr() as *const _,
828 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
829 notify_async_trampoline::<F> as *const (),
830 )),
831 Box::into_raw(f),
832 )
833 }
834 }
835
836 #[doc(alias = "blocksize")]
837 pub fn connect_blocksize_notify<F: Fn(&Self) + Send + Sync + 'static>(
838 &self,
839 f: F,
840 ) -> glib::SignalHandlerId {
841 unsafe extern "C" fn notify_blocksize_trampoline<
842 F: Fn(&AppSink) + Send + Sync + 'static,
843 >(
844 this: *mut ffi::GstAppSink,
845 _param_spec: glib::ffi::gpointer,
846 f: glib::ffi::gpointer,
847 ) {
848 let f: &F = &*(f as *const F);
849 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
850 }
851 unsafe {
852 let f: Box<F> = Box::new(f);
853 glib::signal::connect_raw(
854 self.as_ptr() as *mut _,
855 b"notify::blocksize\0".as_ptr() as *const _,
856 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
857 notify_blocksize_trampoline::<F> as *const (),
858 )),
859 Box::into_raw(f),
860 )
861 }
862 }
863
864 #[doc(alias = "enable-last-sample")]
865 pub fn connect_enable_last_sample_notify<F: Fn(&Self) + Send + Sync + 'static>(
866 &self,
867 f: F,
868 ) -> glib::SignalHandlerId {
869 unsafe extern "C" fn notify_enable_last_sample_trampoline<
870 F: Fn(&AppSink) + Send + Sync + 'static,
871 >(
872 this: *mut ffi::GstAppSink,
873 _param_spec: glib::ffi::gpointer,
874 f: glib::ffi::gpointer,
875 ) {
876 let f: &F = &*(f as *const F);
877 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
878 }
879 unsafe {
880 let f: Box<F> = Box::new(f);
881 glib::signal::connect_raw(
882 self.as_ptr() as *mut _,
883 b"notify::enable-last-sample\0".as_ptr() as *const _,
884 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
885 notify_enable_last_sample_trampoline::<F> as *const (),
886 )),
887 Box::into_raw(f),
888 )
889 }
890 }
891
892 #[doc(alias = "last-sample")]
893 pub fn connect_last_sample_notify<F: Fn(&Self) + Send + Sync + 'static>(
894 &self,
895 f: F,
896 ) -> glib::SignalHandlerId {
897 unsafe extern "C" fn notify_last_sample_trampoline<
898 F: Fn(&AppSink) + Send + Sync + 'static,
899 >(
900 this: *mut ffi::GstAppSink,
901 _param_spec: glib::ffi::gpointer,
902 f: glib::ffi::gpointer,
903 ) {
904 let f: &F = &*(f as *const F);
905 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
906 }
907 unsafe {
908 let f: Box<F> = Box::new(f);
909 glib::signal::connect_raw(
910 self.as_ptr() as *mut _,
911 b"notify::last-sample\0".as_ptr() as *const _,
912 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
913 notify_last_sample_trampoline::<F> as *const (),
914 )),
915 Box::into_raw(f),
916 )
917 }
918 }
919
920 #[doc(alias = "max-bitrate")]
921 pub fn connect_max_bitrate_notify<F: Fn(&Self) + Send + Sync + 'static>(
922 &self,
923 f: F,
924 ) -> glib::SignalHandlerId {
925 unsafe extern "C" fn notify_max_bitrate_trampoline<
926 F: Fn(&AppSink) + Send + Sync + 'static,
927 >(
928 this: *mut ffi::GstAppSink,
929 _param_spec: glib::ffi::gpointer,
930 f: glib::ffi::gpointer,
931 ) {
932 let f: &F = &*(f as *const F);
933 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
934 }
935 unsafe {
936 let f: Box<F> = Box::new(f);
937 glib::signal::connect_raw(
938 self.as_ptr() as *mut _,
939 b"notify::max-bitrate\0".as_ptr() as *const _,
940 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
941 notify_max_bitrate_trampoline::<F> as *const (),
942 )),
943 Box::into_raw(f),
944 )
945 }
946 }
947
948 #[doc(alias = "max-lateness")]
949 pub fn connect_max_lateness_notify<F: Fn(&Self) + Send + Sync + 'static>(
950 &self,
951 f: F,
952 ) -> glib::SignalHandlerId {
953 unsafe extern "C" fn notify_max_lateness_trampoline<
954 F: Fn(&AppSink) + Send + Sync + 'static,
955 >(
956 this: *mut ffi::GstAppSink,
957 _param_spec: glib::ffi::gpointer,
958 f: glib::ffi::gpointer,
959 ) {
960 let f: &F = &*(f as *const F);
961 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
962 }
963 unsafe {
964 let f: Box<F> = Box::new(f);
965 glib::signal::connect_raw(
966 self.as_ptr() as *mut _,
967 b"notify::max-lateness\0".as_ptr() as *const _,
968 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
969 notify_max_lateness_trampoline::<F> as *const (),
970 )),
971 Box::into_raw(f),
972 )
973 }
974 }
975
/// Connects `f` to the `notify::processing-deadline` signal of this sink and
/// returns the handler id.
///
/// Compiled only with the `v1_16` feature.
#[cfg(feature = "v1_16")]
#[cfg_attr(docsrs, doc(cfg(feature = "v1_16")))]
#[doc(alias = "processing-deadline")]
pub fn connect_processing_deadline_notify<F>(&self, f: F) -> glib::SignalHandlerId
where
    F: Fn(&Self) + Send + Sync + 'static,
{
    // Recovers the boxed closure from the user-data pointer and calls it with
    // the borrowed sink.
    unsafe extern "C" fn notify_processing_deadline_trampoline<
        F: Fn(&AppSink) + Send + Sync + 'static,
    >(
        instance: *mut ffi::GstAppSink,
        _pspec: glib::ffi::gpointer,
        user_data: glib::ffi::gpointer,
    ) {
        let handler = &*(user_data as *const F);
        handler(AppSink::from_glib_borrow(instance).unsafe_cast_ref())
    }

    let closure: *mut F = Box::into_raw(Box::new(f));
    unsafe {
        let trampoline = mem::transmute::<*const (), unsafe extern "C" fn()>(
            notify_processing_deadline_trampoline::<F> as *const (),
        );
        glib::signal::connect_raw(
            self.as_ptr() as *mut _,
            b"notify::processing-deadline\0".as_ptr() as *const _,
            Some(trampoline),
            closure,
        )
    }
}
1005
1006 #[doc(alias = "qos")]
1007 pub fn connect_qos_notify<F: Fn(&Self) + Send + Sync + 'static>(
1008 &self,
1009 f: F,
1010 ) -> glib::SignalHandlerId {
1011 unsafe extern "C" fn notify_qos_trampoline<F: Fn(&AppSink) + Send + Sync + 'static>(
1012 this: *mut ffi::GstAppSink,
1013 _param_spec: glib::ffi::gpointer,
1014 f: glib::ffi::gpointer,
1015 ) {
1016 let f: &F = &*(f as *const F);
1017 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1018 }
1019 unsafe {
1020 let f: Box<F> = Box::new(f);
1021 glib::signal::connect_raw(
1022 self.as_ptr() as *mut _,
1023 b"notify::qos\0".as_ptr() as *const _,
1024 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1025 notify_qos_trampoline::<F> as *const (),
1026 )),
1027 Box::into_raw(f),
1028 )
1029 }
1030 }
1031
1032 #[doc(alias = "render-delay")]
1033 pub fn connect_render_delay_notify<F: Fn(&Self) + Send + Sync + 'static>(
1034 &self,
1035 f: F,
1036 ) -> glib::SignalHandlerId {
1037 unsafe extern "C" fn notify_render_delay_trampoline<
1038 F: Fn(&AppSink) + Send + Sync + 'static,
1039 >(
1040 this: *mut ffi::GstAppSink,
1041 _param_spec: glib::ffi::gpointer,
1042 f: glib::ffi::gpointer,
1043 ) {
1044 let f: &F = &*(f as *const F);
1045 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1046 }
1047 unsafe {
1048 let f: Box<F> = Box::new(f);
1049 glib::signal::connect_raw(
1050 self.as_ptr() as *mut _,
1051 b"notify::render-delay\0".as_ptr() as *const _,
1052 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1053 notify_render_delay_trampoline::<F> as *const (),
1054 )),
1055 Box::into_raw(f),
1056 )
1057 }
1058 }
1059
/// Connects `f` to the `notify::stats` signal of this sink and returns the
/// handler id.
///
/// Compiled only with the `v1_18` feature.
#[cfg(feature = "v1_18")]
#[cfg_attr(docsrs, doc(cfg(feature = "v1_18")))]
#[doc(alias = "stats")]
pub fn connect_stats_notify<F>(&self, f: F) -> glib::SignalHandlerId
where
    F: Fn(&Self) + Send + Sync + 'static,
{
    // Recovers the boxed closure from the user-data pointer and calls it with
    // the borrowed sink.
    unsafe extern "C" fn notify_stats_trampoline<F: Fn(&AppSink) + Send + Sync + 'static>(
        instance: *mut ffi::GstAppSink,
        _pspec: glib::ffi::gpointer,
        user_data: glib::ffi::gpointer,
    ) {
        let handler = &*(user_data as *const F);
        handler(AppSink::from_glib_borrow(instance).unsafe_cast_ref())
    }

    let closure: *mut F = Box::into_raw(Box::new(f));
    unsafe {
        let trampoline = mem::transmute::<*const (), unsafe extern "C" fn()>(
            notify_stats_trampoline::<F> as *const (),
        );
        glib::signal::connect_raw(
            self.as_ptr() as *mut _,
            b"notify::stats\0".as_ptr() as *const _,
            Some(trampoline),
            closure,
        )
    }
}
1087
1088 #[doc(alias = "sync")]
1089 pub fn connect_sync_notify<F: Fn(&Self) + Send + Sync + 'static>(
1090 &self,
1091 f: F,
1092 ) -> glib::SignalHandlerId {
1093 unsafe extern "C" fn notify_sync_trampoline<F: Fn(&AppSink) + Send + Sync + 'static>(
1094 this: *mut ffi::GstAppSink,
1095 _param_spec: glib::ffi::gpointer,
1096 f: glib::ffi::gpointer,
1097 ) {
1098 let f: &F = &*(f as *const F);
1099 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1100 }
1101 unsafe {
1102 let f: Box<F> = Box::new(f);
1103 glib::signal::connect_raw(
1104 self.as_ptr() as *mut _,
1105 b"notify::sync\0".as_ptr() as *const _,
1106 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1107 notify_sync_trampoline::<F> as *const (),
1108 )),
1109 Box::into_raw(f),
1110 )
1111 }
1112 }
1113
1114 #[doc(alias = "throttle-time")]
1115 pub fn connect_throttle_time_notify<F: Fn(&Self) + Send + Sync + 'static>(
1116 &self,
1117 f: F,
1118 ) -> glib::SignalHandlerId {
1119 unsafe extern "C" fn notify_throttle_time_trampoline<
1120 F: Fn(&AppSink) + Send + Sync + 'static,
1121 >(
1122 this: *mut ffi::GstAppSink,
1123 _param_spec: glib::ffi::gpointer,
1124 f: glib::ffi::gpointer,
1125 ) {
1126 let f: &F = &*(f as *const F);
1127 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1128 }
1129 unsafe {
1130 let f: Box<F> = Box::new(f);
1131 glib::signal::connect_raw(
1132 self.as_ptr() as *mut _,
1133 b"notify::throttle-time\0".as_ptr() as *const _,
1134 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1135 notify_throttle_time_trampoline::<F> as *const (),
1136 )),
1137 Box::into_raw(f),
1138 )
1139 }
1140 }
1141
1142 #[doc(alias = "ts-offset")]
1143 pub fn connect_ts_offset_notify<F: Fn(&Self) + Send + Sync + 'static>(
1144 &self,
1145 f: F,
1146 ) -> glib::SignalHandlerId {
1147 unsafe extern "C" fn notify_ts_offset_trampoline<
1148 F: Fn(&AppSink) + Send + Sync + 'static,
1149 >(
1150 this: *mut ffi::GstAppSink,
1151 _param_spec: glib::ffi::gpointer,
1152 f: glib::ffi::gpointer,
1153 ) {
1154 let f: &F = &*(f as *const F);
1155 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1156 }
1157 unsafe {
1158 let f: Box<F> = Box::new(f);
1159 glib::signal::connect_raw(
1160 self.as_ptr() as *mut _,
1161 b"notify::ts-offset\0".as_ptr() as *const _,
1162 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1163 notify_ts_offset_trampoline::<F> as *const (),
1164 )),
1165 Box::into_raw(f),
1166 )
1167 }
1168 }
1169
1170 pub fn stream(&self) -> AppSinkStream {
1171 AppSinkStream::new(self)
1172 }
1173}
1174
1175#[must_use = "The builder must be built to be used"]
1180pub struct AppSinkBuilder<'a> {
1181 builder: gst::gobject::GObjectBuilder<'a, AppSink>,
1182 callbacks: Option<AppSinkCallbacks>,
1183 drop_out_of_segment: Option<bool>,
1184}
1185
1186impl<'a> AppSinkBuilder<'a> {
1187 #[must_use = "Building the object from the builder is usually expensive and is not expected to have side effects"]
1195 pub fn build(self) -> AppSink {
1196 let appsink = self.builder.build().unwrap();
1197
1198 if let Some(callbacks) = self.callbacks {
1199 appsink.set_callbacks(callbacks);
1200 }
1201
1202 if let Some(drop_out_of_segment) = self.drop_out_of_segment {
1203 appsink.set_drop_out_of_segment(drop_out_of_segment);
1204 }
1205
1206 appsink
1207 }
1208
1209 pub fn async_(self, async_: bool) -> Self {
1210 Self {
1211 builder: self.builder.property("async", async_),
1212 ..self
1213 }
1214 }
1215
1216 pub fn buffer_list(self, buffer_list: bool) -> Self {
1217 Self {
1218 builder: self.builder.property("buffer-list", buffer_list),
1219 ..self
1220 }
1221 }
1222
1223 pub fn callbacks(self, callbacks: AppSinkCallbacks) -> Self {
1224 Self {
1225 callbacks: Some(callbacks),
1226 ..self
1227 }
1228 }
1229
1230 pub fn caps(self, caps: &'a gst::Caps) -> Self {
1231 Self {
1232 builder: self.builder.property("caps", caps),
1233 ..self
1234 }
1235 }
1236
1237 #[cfg_attr(feature = "v1_28", deprecated = "Since 1.28")]
1238 #[allow(deprecated)]
1239 pub fn drop(self, drop: bool) -> Self {
1240 Self {
1241 builder: self.builder.property("drop", drop),
1242 ..self
1243 }
1244 }
1245
1246 pub fn drop_out_of_segment(self, drop_out_of_segment: bool) -> Self {
1247 Self {
1248 builder: self
1249 .builder
1250 .property("drop-out-of-segment", drop_out_of_segment),
1251 ..self
1252 }
1253 }
1254
1255 pub fn enable_last_sample(self, enable_last_sample: bool) -> Self {
1256 Self {
1257 builder: self
1258 .builder
1259 .property("enable-last-sample", enable_last_sample),
1260 ..self
1261 }
1262 }
1263
1264 pub fn max_bitrate(self, max_bitrate: u64) -> Self {
1265 Self {
1266 builder: self.builder.property("max-bitrate", max_bitrate),
1267 ..self
1268 }
1269 }
1270
1271 pub fn max_buffers(self, max_buffers: u32) -> Self {
1272 Self {
1273 builder: self.builder.property("max-buffers", max_buffers),
1274 ..self
1275 }
1276 }
1277
1278 pub fn max_lateness(self, max_lateness: i64) -> Self {
1279 Self {
1280 builder: self.builder.property("max-lateness", max_lateness),
1281 ..self
1282 }
1283 }
1284
1285 #[cfg(feature = "v1_16")]
1286 #[cfg_attr(docsrs, doc(cfg(feature = "v1_16")))]
1287 pub fn processing_deadline(self, processing_deadline: gst::ClockTime) -> Self {
1288 Self {
1289 builder: self
1290 .builder
1291 .property("processing-deadline", processing_deadline),
1292 ..self
1293 }
1294 }
1295
1296 pub fn qos(self, qos: bool) -> Self {
1297 Self {
1298 builder: self.builder.property("qos", qos),
1299 ..self
1300 }
1301 }
1302
1303 pub fn render_delay(self, render_delay: Option<gst::ClockTime>) -> Self {
1304 Self {
1305 builder: self.builder.property("render-delay", render_delay),
1306 ..self
1307 }
1308 }
1309
1310 pub fn sync(self, sync: bool) -> Self {
1311 Self {
1312 builder: self.builder.property("sync", sync),
1313 ..self
1314 }
1315 }
1316
1317 pub fn throttle_time(self, throttle_time: u64) -> Self {
1318 Self {
1319 builder: self.builder.property("throttle-time", throttle_time),
1320 ..self
1321 }
1322 }
1323
1324 pub fn ts_offset(self, ts_offset: gst::ClockTimeDiff) -> Self {
1325 Self {
1326 builder: self.builder.property("ts-offset", ts_offset),
1327 ..self
1328 }
1329 }
1330
1331 pub fn wait_on_eos(self, wait_on_eos: bool) -> Self {
1332 Self {
1333 builder: self.builder.property("wait-on-eos", wait_on_eos),
1334 ..self
1335 }
1336 }
1337
1338 #[cfg(feature = "v1_24")]
1339 #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
1340 pub fn max_time(self, max_time: Option<gst::ClockTime>) -> Self {
1341 Self {
1342 builder: self.builder.property("max-time", max_time),
1343 ..self
1344 }
1345 }
1346
1347 #[cfg(feature = "v1_24")]
1348 #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
1349 pub fn max_bytes(self, max_bytes: u64) -> Self {
1350 Self {
1351 builder: self.builder.property("max-bytes", max_bytes),
1352 ..self
1353 }
1354 }
1355
1356 #[cfg(feature = "v1_28")]
1357 #[cfg_attr(docsrs, doc(cfg(feature = "v1_28")))]
1358 pub fn leaky_type(self, leaky_type: crate::AppLeakyType) -> Self {
1359 Self {
1360 builder: self.builder.property("leaky-type", leaky_type),
1361 ..self
1362 }
1363 }
1364
1365 #[cfg(feature = "v1_28")]
1366 #[cfg_attr(docsrs, doc(cfg(feature = "v1_28")))]
1367 pub fn silent(self, silent: bool) -> Self {
1368 Self {
1369 builder: self.builder.property("silent", silent),
1370 ..self
1371 }
1372 }
1373
1374 #[inline]
1379 pub fn property(self, name: &'a str, value: impl Into<glib::Value> + 'a) -> Self {
1380 Self {
1381 builder: self.builder.property(name, value),
1382 ..self
1383 }
1384 }
1385
1386 #[inline]
1389 pub fn property_from_str(self, name: &'a str, value: &'a str) -> Self {
1390 Self {
1391 builder: self.builder.property_from_str(name, value),
1392 ..self
1393 }
1394 }
1395
1396 gst::impl_builder_gvalue_extra_setters!(property_and_name);
1397}
1398
1399#[derive(Debug)]
1400pub struct AppSinkStream {
1401 app_sink: glib::WeakRef<AppSink>,
1402 waker_reference: Arc<Mutex<Option<Waker>>>,
1403}
1404
1405impl AppSinkStream {
1406 fn new(app_sink: &AppSink) -> Self {
1407 skip_assert_initialized!();
1408
1409 let waker_reference = Arc::new(Mutex::new(None as Option<Waker>));
1410
1411 app_sink.set_callbacks(
1412 AppSinkCallbacks::builder()
1413 .new_sample({
1414 let waker_reference = Arc::clone(&waker_reference);
1415
1416 move |_| {
1417 if let Some(waker) = waker_reference.lock().unwrap().take() {
1418 waker.wake();
1419 }
1420
1421 Ok(gst::FlowSuccess::Ok)
1422 }
1423 })
1424 .eos({
1425 let waker_reference = Arc::clone(&waker_reference);
1426
1427 move |_| {
1428 if let Some(waker) = waker_reference.lock().unwrap().take() {
1429 waker.wake();
1430 }
1431 }
1432 })
1433 .build(),
1434 );
1435
1436 Self {
1437 app_sink: app_sink.downgrade(),
1438 waker_reference,
1439 }
1440 }
1441}
1442
1443impl Drop for AppSinkStream {
1444 fn drop(&mut self) {
1445 #[cfg(not(feature = "v1_18"))]
1446 {
1447 if gst::version() >= (1, 16, 3, 0) {
1450 if let Some(app_sink) = self.app_sink.upgrade() {
1451 app_sink.set_callbacks(AppSinkCallbacks::builder().build());
1452 }
1453 }
1454 }
1455 }
1456}
1457
1458impl Stream for AppSinkStream {
1459 type Item = gst::Sample;
1460
1461 fn poll_next(self: Pin<&mut Self>, context: &mut Context) -> Poll<Option<Self::Item>> {
1462 let mut waker = self.waker_reference.lock().unwrap();
1463
1464 let Some(app_sink) = self.app_sink.upgrade() else {
1465 return Poll::Ready(None);
1466 };
1467
1468 app_sink
1469 .try_pull_sample(gst::ClockTime::ZERO)
1470 .map(|sample| Poll::Ready(Some(sample)))
1471 .unwrap_or_else(|| {
1472 if app_sink.is_eos() {
1473 return Poll::Ready(None);
1474 }
1475
1476 waker.replace(context.waker().to_owned());
1477
1478 Poll::Pending
1479 })
1480 }
1481}
1482
#[cfg(test)]
mod tests {
    use futures_util::StreamExt;
    use gst::prelude::*;

    use super::*;

    /// Runs `videotestsrc ! appsink` limited to five buffers and checks that
    /// the stream yields exactly five samples before ending.
    #[test]
    fn test_app_sink_stream() {
        gst::init().unwrap();

        let source = gst::ElementFactory::make("videotestsrc")
            .property("num-buffers", 5)
            .build()
            .unwrap();
        let sink = gst::ElementFactory::make("appsink").build().unwrap();

        let pipeline = gst::Pipeline::new();
        pipeline.add(&source).unwrap();
        pipeline.add(&sink).unwrap();

        source.link(&sink).unwrap();

        // The stream is created before the pipeline starts.
        let app_sink = sink.dynamic_cast::<AppSink>().unwrap();
        let collected = app_sink.stream().collect::<Vec<gst::Sample>>();

        pipeline.set_state(gst::State::Playing).unwrap();
        let samples = futures_executor::block_on(collected);
        pipeline.set_state(gst::State::Null).unwrap();

        assert_eq!(samples.len(), 5);
    }
}