1use std::{
4    mem, panic,
5    pin::Pin,
6    ptr,
7    sync::{Arc, Mutex},
8    task::{Context, Poll, Waker},
9};
10
11#[cfg(not(panic = "abort"))]
12use std::sync::atomic::{AtomicBool, Ordering};
13
14use futures_core::Stream;
15use glib::{ffi::gpointer, prelude::*, translate::*};
16
17use crate::{ffi, AppSink};
18
/// Set of Rust closures to be installed on an [`AppSink`], together with the C callback table
/// that dispatches to them.
///
/// Instances are created through [`AppSinkCallbacks::builder`].
#[allow(clippy::type_complexity)]
pub struct AppSinkCallbacks {
    /// Closure invoked by `trampoline_eos`, if set.
    eos: Option<Box<dyn FnMut(&AppSink) + Send + 'static>>,
    /// Closure invoked by `trampoline_new_preroll`, if set.
    new_preroll: Option<
        Box<dyn FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static>,
    >,
    /// Closure invoked by `trampoline_new_sample`, if set.
    new_sample: Option<
        Box<dyn FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static>,
    >,
    /// Closure invoked by `trampoline_new_event`, if set.
    new_event: Option<Box<dyn FnMut(&AppSink) -> bool + Send + 'static>>,
    /// Closure invoked by `trampoline_propose_allocation`, if set.
    propose_allocation:
        Option<Box<dyn FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static>>,
    /// Set when one of the closures panicked; the trampolines check it and stop calling into
    /// the closures once it is set. Only present when panics unwind.
    #[cfg(not(panic = "abort"))]
    panicked: AtomicBool,
    /// C callback table passed to `gst_app_sink_set_callbacks`; a slot is only filled when the
    /// matching closure above was provided.
    callbacks: ffi::GstAppSinkCallbacks,
}
35
// NOTE(review): these manual impls are presumably needed because the embedded
// `ffi::GstAppSinkCallbacks` table holds raw pointers — confirm. Every stored closure is
// required to be `Send` by its field type; `Sync` is not backed by any bound visible here and
// presumably relies on the closures only ever being reached through the trampolines — confirm.
unsafe impl Send for AppSinkCallbacks {}
unsafe impl Sync for AppSinkCallbacks {}
38
39impl AppSinkCallbacks {
40    pub fn builder() -> AppSinkCallbacksBuilder {
41        skip_assert_initialized!();
42        AppSinkCallbacksBuilder {
43            eos: None,
44            new_preroll: None,
45            new_sample: None,
46            new_event: None,
47            propose_allocation: None,
48        }
49    }
50}
51
/// Builder that collects the optional closures for an [`AppSinkCallbacks`].
///
/// Each field holds the closure registered for the callback of the same name, or `None` when
/// that callback was not registered.
#[allow(clippy::type_complexity)]
#[must_use = "The builder must be built to be used"]
pub struct AppSinkCallbacksBuilder {
    /// Closure registered through `eos`, if any.
    eos: Option<Box<dyn FnMut(&AppSink) + Send + 'static>>,
    /// Closure registered through `new_preroll`, if any.
    new_preroll: Option<
        Box<dyn FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static>,
    >,
    /// Closure registered through `new_sample`, if any.
    new_sample: Option<
        Box<dyn FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static>,
    >,
    /// Closure registered through `new_event`, if any.
    new_event: Option<Box<dyn FnMut(&AppSink) -> bool + Send + 'static>>,
    /// Closure registered through `propose_allocation`, if any.
    propose_allocation:
        Option<Box<dyn FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static>>,
}
66
67impl AppSinkCallbacksBuilder {
68    pub fn eos<F: FnMut(&AppSink) + Send + 'static>(self, eos: F) -> Self {
69        Self {
70            eos: Some(Box::new(eos)),
71            ..self
72        }
73    }
74
75    pub fn eos_if<F: FnMut(&AppSink) + Send + 'static>(self, eos: F, predicate: bool) -> Self {
76        if predicate {
77            self.eos(eos)
78        } else {
79            self
80        }
81    }
82
83    pub fn eos_if_some<F: FnMut(&AppSink) + Send + 'static>(self, eos: Option<F>) -> Self {
84        if let Some(eos) = eos {
85            self.eos(eos)
86        } else {
87            self
88        }
89    }
90
91    pub fn new_preroll<
92        F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
93    >(
94        self,
95        new_preroll: F,
96    ) -> Self {
97        Self {
98            new_preroll: Some(Box::new(new_preroll)),
99            ..self
100        }
101    }
102
103    pub fn new_preroll_if<
104        F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
105    >(
106        self,
107        new_preroll: F,
108        predicate: bool,
109    ) -> Self {
110        if predicate {
111            self.new_preroll(new_preroll)
112        } else {
113            self
114        }
115    }
116
117    pub fn new_preroll_if_some<
118        F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
119    >(
120        self,
121        new_preroll: Option<F>,
122    ) -> Self {
123        if let Some(new_preroll) = new_preroll {
124            self.new_preroll(new_preroll)
125        } else {
126            self
127        }
128    }
129
130    pub fn new_sample<
131        F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
132    >(
133        self,
134        new_sample: F,
135    ) -> Self {
136        Self {
137            new_sample: Some(Box::new(new_sample)),
138            ..self
139        }
140    }
141
142    pub fn new_sample_if<
143        F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
144    >(
145        self,
146        new_sample: F,
147        predicate: bool,
148    ) -> Self {
149        if predicate {
150            self.new_sample(new_sample)
151        } else {
152            self
153        }
154    }
155
156    pub fn new_sample_if_some<
157        F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
158    >(
159        self,
160        new_sample: Option<F>,
161    ) -> Self {
162        if let Some(new_sample) = new_sample {
163            self.new_sample(new_sample)
164        } else {
165            self
166        }
167    }
168
169    #[cfg(feature = "v1_20")]
170    #[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
171    pub fn new_event<F: FnMut(&AppSink) -> bool + Send + 'static>(self, new_event: F) -> Self {
172        Self {
173            new_event: Some(Box::new(new_event)),
174            ..self
175        }
176    }
177
178    #[cfg(feature = "v1_20")]
179    #[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
180    pub fn new_event_if<F: FnMut(&AppSink) -> bool + Send + 'static>(
181        self,
182        new_event: F,
183        predicate: bool,
184    ) -> Self {
185        if predicate {
186            self.new_event(new_event)
187        } else {
188            self
189        }
190    }
191
192    #[cfg(feature = "v1_20")]
193    #[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
194    pub fn new_event_if_some<F: FnMut(&AppSink) -> bool + Send + 'static>(
195        self,
196        new_event: Option<F>,
197    ) -> Self {
198        if let Some(new_event) = new_event {
199            self.new_event(new_event)
200        } else {
201            self
202        }
203    }
204
205    #[cfg(feature = "v1_24")]
206    #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
207    pub fn propose_allocation<
208        F: FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static,
209    >(
210        self,
211        propose_allocation: F,
212    ) -> Self {
213        Self {
214            propose_allocation: Some(Box::new(propose_allocation)),
215            ..self
216        }
217    }
218
219    #[cfg(feature = "v1_24")]
220    #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
221    pub fn propose_allocation_if<
222        F: FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static,
223    >(
224        self,
225        propose_allocation: F,
226        predicate: bool,
227    ) -> Self {
228        if predicate {
229            self.propose_allocation(propose_allocation)
230        } else {
231            self
232        }
233    }
234
235    #[cfg(feature = "v1_24")]
236    #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
237    pub fn propose_allocation_if_some<
238        F: FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static,
239    >(
240        self,
241        propose_allocation: Option<F>,
242    ) -> Self {
243        if let Some(propose_allocation) = propose_allocation {
244            self.propose_allocation(propose_allocation)
245        } else {
246            self
247        }
248    }
249
250    #[must_use = "Building the callbacks without using them has no effect"]
251    pub fn build(self) -> AppSinkCallbacks {
252        let have_eos = self.eos.is_some();
253        let have_new_preroll = self.new_preroll.is_some();
254        let have_new_sample = self.new_sample.is_some();
255        let have_new_event = self.new_event.is_some();
256        let have_propose_allocation = self.propose_allocation.is_some();
257
258        AppSinkCallbacks {
259            eos: self.eos,
260            new_preroll: self.new_preroll,
261            new_sample: self.new_sample,
262            new_event: self.new_event,
263            propose_allocation: self.propose_allocation,
264            #[cfg(not(panic = "abort"))]
265            panicked: AtomicBool::new(false),
266            callbacks: ffi::GstAppSinkCallbacks {
267                eos: if have_eos { Some(trampoline_eos) } else { None },
268                new_preroll: if have_new_preroll {
269                    Some(trampoline_new_preroll)
270                } else {
271                    None
272                },
273                new_sample: if have_new_sample {
274                    Some(trampoline_new_sample)
275                } else {
276                    None
277                },
278                new_event: if have_new_event {
279                    Some(trampoline_new_event)
280                } else {
281                    None
282                },
283                propose_allocation: if have_propose_allocation {
284                    Some(trampoline_propose_allocation)
285                } else {
286                    None
287                },
288                _gst_reserved: [ptr::null_mut(), ptr::null_mut()],
289            },
290        }
291    }
292}
293
294unsafe extern "C" fn trampoline_eos(appsink: *mut ffi::GstAppSink, callbacks: gpointer) {
295    let callbacks = callbacks as *mut AppSinkCallbacks;
296    let element: Borrowed<AppSink> = from_glib_borrow(appsink);
297
298    #[cfg(not(panic = "abort"))]
299    if (*callbacks).panicked.load(Ordering::Relaxed) {
300        let element: Borrowed<AppSink> = from_glib_borrow(appsink);
301        gst::subclass::post_panic_error_message(element.upcast_ref(), element.upcast_ref(), None);
302        return;
303    }
304
305    if let Some(ref mut eos) = (*callbacks).eos {
306        let result = panic::catch_unwind(panic::AssertUnwindSafe(|| eos(&element)));
307        match result {
308            Ok(result) => result,
309            Err(err) => {
310                #[cfg(panic = "abort")]
311                {
312                    unreachable!("{err:?}");
313                }
314                #[cfg(not(panic = "abort"))]
315                {
316                    (*callbacks).panicked.store(true, Ordering::Relaxed);
317                    gst::subclass::post_panic_error_message(
318                        element.upcast_ref(),
319                        element.upcast_ref(),
320                        Some(err),
321                    );
322                }
323            }
324        }
325    }
326}
327
328unsafe extern "C" fn trampoline_new_preroll(
329    appsink: *mut ffi::GstAppSink,
330    callbacks: gpointer,
331) -> gst::ffi::GstFlowReturn {
332    let callbacks = callbacks as *mut AppSinkCallbacks;
333    let element: Borrowed<AppSink> = from_glib_borrow(appsink);
334
335    #[cfg(not(panic = "abort"))]
336    if (*callbacks).panicked.load(Ordering::Relaxed) {
337        let element: Borrowed<AppSink> = from_glib_borrow(appsink);
338        gst::subclass::post_panic_error_message(element.upcast_ref(), element.upcast_ref(), None);
339        return gst::FlowReturn::Error.into_glib();
340    }
341
342    let ret = if let Some(ref mut new_preroll) = (*callbacks).new_preroll {
343        let result = panic::catch_unwind(panic::AssertUnwindSafe(|| new_preroll(&element).into()));
344        match result {
345            Ok(result) => result,
346            Err(err) => {
347                #[cfg(panic = "abort")]
348                {
349                    unreachable!("{err:?}");
350                }
351                #[cfg(not(panic = "abort"))]
352                {
353                    (*callbacks).panicked.store(true, Ordering::Relaxed);
354                    gst::subclass::post_panic_error_message(
355                        element.upcast_ref(),
356                        element.upcast_ref(),
357                        Some(err),
358                    );
359
360                    gst::FlowReturn::Error
361                }
362            }
363        }
364    } else {
365        gst::FlowReturn::Error
366    };
367
368    ret.into_glib()
369}
370
371unsafe extern "C" fn trampoline_new_sample(
372    appsink: *mut ffi::GstAppSink,
373    callbacks: gpointer,
374) -> gst::ffi::GstFlowReturn {
375    let callbacks = callbacks as *mut AppSinkCallbacks;
376    let element: Borrowed<AppSink> = from_glib_borrow(appsink);
377
378    #[cfg(not(panic = "abort"))]
379    if (*callbacks).panicked.load(Ordering::Relaxed) {
380        let element: Borrowed<AppSink> = from_glib_borrow(appsink);
381        gst::subclass::post_panic_error_message(element.upcast_ref(), element.upcast_ref(), None);
382        return gst::FlowReturn::Error.into_glib();
383    }
384
385    let ret = if let Some(ref mut new_sample) = (*callbacks).new_sample {
386        let result = panic::catch_unwind(panic::AssertUnwindSafe(|| new_sample(&element).into()));
387        match result {
388            Ok(result) => result,
389            Err(err) => {
390                #[cfg(panic = "abort")]
391                {
392                    unreachable!("{err:?}");
393                }
394                #[cfg(not(panic = "abort"))]
395                {
396                    (*callbacks).panicked.store(true, Ordering::Relaxed);
397                    gst::subclass::post_panic_error_message(
398                        element.upcast_ref(),
399                        element.upcast_ref(),
400                        Some(err),
401                    );
402
403                    gst::FlowReturn::Error
404                }
405            }
406        }
407    } else {
408        gst::FlowReturn::Error
409    };
410
411    ret.into_glib()
412}
413
414unsafe extern "C" fn trampoline_new_event(
415    appsink: *mut ffi::GstAppSink,
416    callbacks: gpointer,
417) -> glib::ffi::gboolean {
418    let callbacks = callbacks as *mut AppSinkCallbacks;
419    let element: Borrowed<AppSink> = from_glib_borrow(appsink);
420
421    #[cfg(not(panic = "abort"))]
422    if (*callbacks).panicked.load(Ordering::Relaxed) {
423        let element: Borrowed<AppSink> = from_glib_borrow(appsink);
424        gst::subclass::post_panic_error_message(element.upcast_ref(), element.upcast_ref(), None);
425        return false.into_glib();
426    }
427
428    let ret = if let Some(ref mut new_event) = (*callbacks).new_event {
429        let result = panic::catch_unwind(panic::AssertUnwindSafe(|| new_event(&element)));
430        match result {
431            Ok(result) => result,
432            Err(err) => {
433                #[cfg(panic = "abort")]
434                {
435                    unreachable!("{err:?}");
436                }
437                #[cfg(not(panic = "abort"))]
438                {
439                    (*callbacks).panicked.store(true, Ordering::Relaxed);
440                    gst::subclass::post_panic_error_message(
441                        element.upcast_ref(),
442                        element.upcast_ref(),
443                        Some(err),
444                    );
445
446                    false
447                }
448            }
449        }
450    } else {
451        false
452    };
453
454    ret.into_glib()
455}
456
457unsafe extern "C" fn trampoline_propose_allocation(
458    appsink: *mut ffi::GstAppSink,
459    query: *mut gst::ffi::GstQuery,
460    callbacks: gpointer,
461) -> glib::ffi::gboolean {
462    let callbacks = callbacks as *mut AppSinkCallbacks;
463    let element: Borrowed<AppSink> = from_glib_borrow(appsink);
464
465    #[cfg(not(panic = "abort"))]
466    if (*callbacks).panicked.load(Ordering::Relaxed) {
467        let element: Borrowed<AppSink> = from_glib_borrow(appsink);
468        gst::subclass::post_panic_error_message(element.upcast_ref(), element.upcast_ref(), None);
469        return false.into_glib();
470    }
471
472    let ret = if let Some(ref mut propose_allocation) = (*callbacks).propose_allocation {
473        let query = match gst::QueryRef::from_mut_ptr(query).view_mut() {
474            gst::QueryViewMut::Allocation(allocation) => allocation,
475            _ => unreachable!(),
476        };
477        let result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
478            propose_allocation(&element, query)
479        }));
480        match result {
481            Ok(result) => result,
482            Err(err) => {
483                #[cfg(panic = "abort")]
484                {
485                    unreachable!("{err:?}");
486                }
487                #[cfg(not(panic = "abort"))]
488                {
489                    (*callbacks).panicked.store(true, Ordering::Relaxed);
490                    gst::subclass::post_panic_error_message(
491                        element.upcast_ref(),
492                        element.upcast_ref(),
493                        Some(err),
494                    );
495                    false
496                }
497            }
498        }
499    } else {
500        false
501    };
502
503    ret.into_glib()
504}
505
506unsafe extern "C" fn destroy_callbacks(ptr: gpointer) {
507    let _ = Box::<AppSinkCallbacks>::from_raw(ptr as *mut _);
508}
509
510impl AppSink {
511    pub fn builder<'a>() -> AppSinkBuilder<'a> {
516        assert_initialized_main_thread!();
517        AppSinkBuilder {
518            builder: gst::Object::builder(),
519            callbacks: None,
520            drop_out_of_segment: None,
521        }
522    }
523
524    #[doc(alias = "gst_app_sink_set_callbacks")]
525    pub fn set_callbacks(&self, callbacks: AppSinkCallbacks) {
526        unsafe {
527            let sink = self.to_glib_none().0;
528
529            #[cfg(not(feature = "v1_18"))]
530            {
531                static SET_ONCE_QUARK: std::sync::OnceLock<glib::Quark> =
532                    std::sync::OnceLock::new();
533
534                let set_once_quark = SET_ONCE_QUARK
535                    .get_or_init(|| glib::Quark::from_str("gstreamer-rs-app-sink-callbacks"));
536
537                if gst::version() < (1, 16, 3, 0) {
540                    if !glib::gobject_ffi::g_object_get_qdata(
541                        sink as *mut _,
542                        set_once_quark.into_glib(),
543                    )
544                    .is_null()
545                    {
546                        panic!("AppSink callbacks can only be set once");
547                    }
548
549                    glib::gobject_ffi::g_object_set_qdata(
550                        sink as *mut _,
551                        set_once_quark.into_glib(),
552                        1 as *mut _,
553                    );
554                }
555            }
556
557            ffi::gst_app_sink_set_callbacks(
558                sink,
559                mut_override(&callbacks.callbacks),
560                Box::into_raw(Box::new(callbacks)) as *mut _,
561                Some(destroy_callbacks),
562            );
563        }
564    }
565
566    #[doc(alias = "drop-out-of-segment")]
567    pub fn drops_out_of_segment(&self) -> bool {
568        unsafe {
569            from_glib(gst_base::ffi::gst_base_sink_get_drop_out_of_segment(
570                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
571            ))
572        }
573    }
574
575    #[doc(alias = "max-bitrate")]
576    #[doc(alias = "gst_base_sink_get_max_bitrate")]
577    pub fn max_bitrate(&self) -> u64 {
578        unsafe {
579            gst_base::ffi::gst_base_sink_get_max_bitrate(
580                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
581            )
582        }
583    }
584
585    #[doc(alias = "max-lateness")]
586    #[doc(alias = "gst_base_sink_get_max_lateness")]
587    pub fn max_lateness(&self) -> i64 {
588        unsafe {
589            gst_base::ffi::gst_base_sink_get_max_lateness(
590                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
591            )
592        }
593    }
594
595    #[doc(alias = "processing-deadline")]
596    #[cfg(feature = "v1_16")]
597    #[cfg_attr(docsrs, doc(cfg(feature = "v1_16")))]
598    #[doc(alias = "gst_base_sink_get_processing_deadline")]
599    pub fn processing_deadline(&self) -> gst::ClockTime {
600        unsafe {
601            try_from_glib(gst_base::ffi::gst_base_sink_get_processing_deadline(
602                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
603            ))
604            .expect("undefined processing_deadline")
605        }
606    }
607
608    #[doc(alias = "render-delay")]
609    #[doc(alias = "gst_base_sink_get_render_delay")]
610    pub fn render_delay(&self) -> gst::ClockTime {
611        unsafe {
612            try_from_glib(gst_base::ffi::gst_base_sink_get_render_delay(
613                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
614            ))
615            .expect("undefined render_delay")
616        }
617    }
618
619    #[cfg(feature = "v1_18")]
620    #[cfg_attr(docsrs, doc(cfg(feature = "v1_18")))]
621    #[doc(alias = "gst_base_sink_get_stats")]
622    pub fn stats(&self) -> gst::Structure {
623        unsafe {
624            from_glib_full(gst_base::ffi::gst_base_sink_get_stats(
625                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
626            ))
627        }
628    }
629
630    #[doc(alias = "sync")]
631    pub fn is_sync(&self) -> bool {
632        unsafe {
633            from_glib(gst_base::ffi::gst_base_sink_get_sync(
634                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
635            ))
636        }
637    }
638
639    #[doc(alias = "throttle-time")]
640    #[doc(alias = "gst_base_sink_get_throttle_time")]
641    pub fn throttle_time(&self) -> u64 {
642        unsafe {
643            gst_base::ffi::gst_base_sink_get_throttle_time(
644                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
645            )
646        }
647    }
648
649    #[doc(alias = "ts-offset")]
650    #[doc(alias = "gst_base_sink_get_ts_offset")]
651    pub fn ts_offset(&self) -> gst::ClockTimeDiff {
652        unsafe {
653            gst_base::ffi::gst_base_sink_get_ts_offset(
654                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
655            )
656        }
657    }
658
659    #[doc(alias = "async")]
660    #[doc(alias = "gst_base_sink_is_async_enabled")]
661    pub fn is_async(&self) -> bool {
662        unsafe {
663            from_glib(gst_base::ffi::gst_base_sink_is_async_enabled(
664                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
665            ))
666        }
667    }
668
669    #[doc(alias = "last-sample")]
670    pub fn enables_last_sample(&self) -> bool {
671        unsafe {
672            from_glib(gst_base::ffi::gst_base_sink_is_last_sample_enabled(
673                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
674            ))
675        }
676    }
677
678    #[doc(alias = "qos")]
679    #[doc(alias = "gst_base_sink_is_qos_enabled")]
680    pub fn is_qos(&self) -> bool {
681        unsafe {
682            from_glib(gst_base::ffi::gst_base_sink_is_qos_enabled(
683                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
684            ))
685        }
686    }
687
688    #[doc(alias = "async")]
689    #[doc(alias = "gst_base_sink_set_async_enabled")]
690    pub fn set_async(&self, enabled: bool) {
691        unsafe {
692            gst_base::ffi::gst_base_sink_set_async_enabled(
693                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
694                enabled.into_glib(),
695            );
696        }
697    }
698
699    #[doc(alias = "drop-out-of-segment")]
700    #[doc(alias = "gst_base_sink_set_drop_out_of_segment")]
701    pub fn set_drop_out_of_segment(&self, drop_out_of_segment: bool) {
702        unsafe {
703            gst_base::ffi::gst_base_sink_set_drop_out_of_segment(
704                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
705                drop_out_of_segment.into_glib(),
706            );
707        }
708    }
709
710    #[doc(alias = "last-sample")]
711    pub fn set_enable_last_sample(&self, enabled: bool) {
712        unsafe {
713            gst_base::ffi::gst_base_sink_set_last_sample_enabled(
714                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
715                enabled.into_glib(),
716            );
717        }
718    }
719
720    #[doc(alias = "max-bitrate")]
721    #[doc(alias = "gst_base_sink_set_max_bitrate")]
722    pub fn set_max_bitrate(&self, max_bitrate: u64) {
723        unsafe {
724            gst_base::ffi::gst_base_sink_set_max_bitrate(
725                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
726                max_bitrate,
727            );
728        }
729    }
730
731    #[doc(alias = "max-lateness")]
732    #[doc(alias = "gst_base_sink_set_max_lateness")]
733    pub fn set_max_lateness(&self, max_lateness: i64) {
734        unsafe {
735            gst_base::ffi::gst_base_sink_set_max_lateness(
736                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
737                max_lateness,
738            );
739        }
740    }
741
742    #[doc(alias = "processing-deadline")]
743    #[cfg(feature = "v1_16")]
744    #[cfg_attr(docsrs, doc(cfg(feature = "v1_16")))]
745    #[doc(alias = "gst_base_sink_set_processing_deadline")]
746    pub fn set_processing_deadline(&self, processing_deadline: gst::ClockTime) {
747        unsafe {
748            gst_base::ffi::gst_base_sink_set_processing_deadline(
749                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
750                processing_deadline.into_glib(),
751            );
752        }
753    }
754
755    #[doc(alias = "qos")]
756    #[doc(alias = "gst_base_sink_set_qos_enabled")]
757    pub fn set_qos(&self, enabled: bool) {
758        unsafe {
759            gst_base::ffi::gst_base_sink_set_qos_enabled(
760                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
761                enabled.into_glib(),
762            );
763        }
764    }
765
766    #[doc(alias = "render-delay")]
767    #[doc(alias = "gst_base_sink_set_render_delay")]
768    pub fn set_render_delay(&self, delay: gst::ClockTime) {
769        unsafe {
770            gst_base::ffi::gst_base_sink_set_render_delay(
771                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
772                delay.into_glib(),
773            );
774        }
775    }
776
777    #[doc(alias = "sync")]
778    #[doc(alias = "gst_base_sink_set_sync")]
779    pub fn set_sync(&self, sync: bool) {
780        unsafe {
781            gst_base::ffi::gst_base_sink_set_sync(
782                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
783                sync.into_glib(),
784            );
785        }
786    }
787
788    #[doc(alias = "throttle-time")]
789    #[doc(alias = "gst_base_sink_set_throttle_time")]
790    pub fn set_throttle_time(&self, throttle: u64) {
791        unsafe {
792            gst_base::ffi::gst_base_sink_set_throttle_time(
793                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
794                throttle,
795            );
796        }
797    }
798
799    #[doc(alias = "ts-offset")]
800    #[doc(alias = "gst_base_sink_set_ts_offset")]
801    pub fn set_ts_offset(&self, offset: gst::ClockTimeDiff) {
802        unsafe {
803            gst_base::ffi::gst_base_sink_set_ts_offset(
804                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
805                offset,
806            );
807        }
808    }
809
810    #[doc(alias = "async")]
811    pub fn connect_async_notify<F: Fn(&Self) + Send + Sync + 'static>(
812        &self,
813        f: F,
814    ) -> glib::SignalHandlerId {
815        unsafe extern "C" fn notify_async_trampoline<F: Fn(&AppSink) + Send + Sync + 'static>(
816            this: *mut ffi::GstAppSink,
817            _param_spec: glib::ffi::gpointer,
818            f: glib::ffi::gpointer,
819        ) {
820            let f: &F = &*(f as *const F);
821            f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
822        }
823        unsafe {
824            let f: Box<F> = Box::new(f);
825            glib::signal::connect_raw(
826                self.as_ptr() as *mut _,
827                b"notify::async\0".as_ptr() as *const _,
828                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
829                    notify_async_trampoline::<F> as *const (),
830                )),
831                Box::into_raw(f),
832            )
833        }
834    }
835
836    #[doc(alias = "blocksize")]
837    pub fn connect_blocksize_notify<F: Fn(&Self) + Send + Sync + 'static>(
838        &self,
839        f: F,
840    ) -> glib::SignalHandlerId {
841        unsafe extern "C" fn notify_blocksize_trampoline<
842            F: Fn(&AppSink) + Send + Sync + 'static,
843        >(
844            this: *mut ffi::GstAppSink,
845            _param_spec: glib::ffi::gpointer,
846            f: glib::ffi::gpointer,
847        ) {
848            let f: &F = &*(f as *const F);
849            f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
850        }
851        unsafe {
852            let f: Box<F> = Box::new(f);
853            glib::signal::connect_raw(
854                self.as_ptr() as *mut _,
855                b"notify::blocksize\0".as_ptr() as *const _,
856                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
857                    notify_blocksize_trampoline::<F> as *const (),
858                )),
859                Box::into_raw(f),
860            )
861        }
862    }
863
864    #[doc(alias = "enable-last-sample")]
865    pub fn connect_enable_last_sample_notify<F: Fn(&Self) + Send + Sync + 'static>(
866        &self,
867        f: F,
868    ) -> glib::SignalHandlerId {
869        unsafe extern "C" fn notify_enable_last_sample_trampoline<
870            F: Fn(&AppSink) + Send + Sync + 'static,
871        >(
872            this: *mut ffi::GstAppSink,
873            _param_spec: glib::ffi::gpointer,
874            f: glib::ffi::gpointer,
875        ) {
876            let f: &F = &*(f as *const F);
877            f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
878        }
879        unsafe {
880            let f: Box<F> = Box::new(f);
881            glib::signal::connect_raw(
882                self.as_ptr() as *mut _,
883                b"notify::enable-last-sample\0".as_ptr() as *const _,
884                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
885                    notify_enable_last_sample_trampoline::<F> as *const (),
886                )),
887                Box::into_raw(f),
888            )
889        }
890    }
891
892    #[doc(alias = "last-sample")]
893    pub fn connect_last_sample_notify<F: Fn(&Self) + Send + Sync + 'static>(
894        &self,
895        f: F,
896    ) -> glib::SignalHandlerId {
897        unsafe extern "C" fn notify_last_sample_trampoline<
898            F: Fn(&AppSink) + Send + Sync + 'static,
899        >(
900            this: *mut ffi::GstAppSink,
901            _param_spec: glib::ffi::gpointer,
902            f: glib::ffi::gpointer,
903        ) {
904            let f: &F = &*(f as *const F);
905            f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
906        }
907        unsafe {
908            let f: Box<F> = Box::new(f);
909            glib::signal::connect_raw(
910                self.as_ptr() as *mut _,
911                b"notify::last-sample\0".as_ptr() as *const _,
912                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
913                    notify_last_sample_trampoline::<F> as *const (),
914                )),
915                Box::into_raw(f),
916            )
917        }
918    }
919
920    #[doc(alias = "max-bitrate")]
921    pub fn connect_max_bitrate_notify<F: Fn(&Self) + Send + Sync + 'static>(
922        &self,
923        f: F,
924    ) -> glib::SignalHandlerId {
925        unsafe extern "C" fn notify_max_bitrate_trampoline<
926            F: Fn(&AppSink) + Send + Sync + 'static,
927        >(
928            this: *mut ffi::GstAppSink,
929            _param_spec: glib::ffi::gpointer,
930            f: glib::ffi::gpointer,
931        ) {
932            let f: &F = &*(f as *const F);
933            f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
934        }
935        unsafe {
936            let f: Box<F> = Box::new(f);
937            glib::signal::connect_raw(
938                self.as_ptr() as *mut _,
939                b"notify::max-bitrate\0".as_ptr() as *const _,
940                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
941                    notify_max_bitrate_trampoline::<F> as *const (),
942                )),
943                Box::into_raw(f),
944            )
945        }
946    }
947
948    #[doc(alias = "max-lateness")]
949    pub fn connect_max_lateness_notify<F: Fn(&Self) + Send + Sync + 'static>(
950        &self,
951        f: F,
952    ) -> glib::SignalHandlerId {
953        unsafe extern "C" fn notify_max_lateness_trampoline<
954            F: Fn(&AppSink) + Send + Sync + 'static,
955        >(
956            this: *mut ffi::GstAppSink,
957            _param_spec: glib::ffi::gpointer,
958            f: glib::ffi::gpointer,
959        ) {
960            let f: &F = &*(f as *const F);
961            f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
962        }
963        unsafe {
964            let f: Box<F> = Box::new(f);
965            glib::signal::connect_raw(
966                self.as_ptr() as *mut _,
967                b"notify::max-lateness\0".as_ptr() as *const _,
968                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
969                    notify_max_lateness_trampoline::<F> as *const (),
970                )),
971                Box::into_raw(f),
972            )
973        }
974    }
975
976    #[cfg(feature = "v1_16")]
977    #[cfg_attr(docsrs, doc(cfg(feature = "v1_16")))]
978    #[doc(alias = "processing-deadline")]
979    pub fn connect_processing_deadline_notify<F: Fn(&Self) + Send + Sync + 'static>(
980        &self,
981        f: F,
982    ) -> glib::SignalHandlerId {
983        unsafe extern "C" fn notify_processing_deadline_trampoline<
984            F: Fn(&AppSink) + Send + Sync + 'static,
985        >(
986            this: *mut ffi::GstAppSink,
987            _param_spec: glib::ffi::gpointer,
988            f: glib::ffi::gpointer,
989        ) {
990            let f: &F = &*(f as *const F);
991            f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
992        }
993        unsafe {
994            let f: Box<F> = Box::new(f);
995            glib::signal::connect_raw(
996                self.as_ptr() as *mut _,
997                b"notify::processing-deadline\0".as_ptr() as *const _,
998                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
999                    notify_processing_deadline_trampoline::<F> as *const (),
1000                )),
1001                Box::into_raw(f),
1002            )
1003        }
1004    }
1005
1006    #[doc(alias = "qos")]
1007    pub fn connect_qos_notify<F: Fn(&Self) + Send + Sync + 'static>(
1008        &self,
1009        f: F,
1010    ) -> glib::SignalHandlerId {
1011        unsafe extern "C" fn notify_qos_trampoline<F: Fn(&AppSink) + Send + Sync + 'static>(
1012            this: *mut ffi::GstAppSink,
1013            _param_spec: glib::ffi::gpointer,
1014            f: glib::ffi::gpointer,
1015        ) {
1016            let f: &F = &*(f as *const F);
1017            f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1018        }
1019        unsafe {
1020            let f: Box<F> = Box::new(f);
1021            glib::signal::connect_raw(
1022                self.as_ptr() as *mut _,
1023                b"notify::qos\0".as_ptr() as *const _,
1024                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1025                    notify_qos_trampoline::<F> as *const (),
1026                )),
1027                Box::into_raw(f),
1028            )
1029        }
1030    }
1031
1032    #[doc(alias = "render-delay")]
1033    pub fn connect_render_delay_notify<F: Fn(&Self) + Send + Sync + 'static>(
1034        &self,
1035        f: F,
1036    ) -> glib::SignalHandlerId {
1037        unsafe extern "C" fn notify_render_delay_trampoline<
1038            F: Fn(&AppSink) + Send + Sync + 'static,
1039        >(
1040            this: *mut ffi::GstAppSink,
1041            _param_spec: glib::ffi::gpointer,
1042            f: glib::ffi::gpointer,
1043        ) {
1044            let f: &F = &*(f as *const F);
1045            f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1046        }
1047        unsafe {
1048            let f: Box<F> = Box::new(f);
1049            glib::signal::connect_raw(
1050                self.as_ptr() as *mut _,
1051                b"notify::render-delay\0".as_ptr() as *const _,
1052                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1053                    notify_render_delay_trampoline::<F> as *const (),
1054                )),
1055                Box::into_raw(f),
1056            )
1057        }
1058    }
1059
1060    #[cfg(feature = "v1_18")]
1061    #[cfg_attr(docsrs, doc(cfg(feature = "v1_18")))]
1062    #[doc(alias = "stats")]
1063    pub fn connect_stats_notify<F: Fn(&Self) + Send + Sync + 'static>(
1064        &self,
1065        f: F,
1066    ) -> glib::SignalHandlerId {
1067        unsafe extern "C" fn notify_stats_trampoline<F: Fn(&AppSink) + Send + Sync + 'static>(
1068            this: *mut ffi::GstAppSink,
1069            _param_spec: glib::ffi::gpointer,
1070            f: glib::ffi::gpointer,
1071        ) {
1072            let f: &F = &*(f as *const F);
1073            f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1074        }
1075        unsafe {
1076            let f: Box<F> = Box::new(f);
1077            glib::signal::connect_raw(
1078                self.as_ptr() as *mut _,
1079                b"notify::stats\0".as_ptr() as *const _,
1080                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1081                    notify_stats_trampoline::<F> as *const (),
1082                )),
1083                Box::into_raw(f),
1084            )
1085        }
1086    }
1087
1088    #[doc(alias = "sync")]
1089    pub fn connect_sync_notify<F: Fn(&Self) + Send + Sync + 'static>(
1090        &self,
1091        f: F,
1092    ) -> glib::SignalHandlerId {
1093        unsafe extern "C" fn notify_sync_trampoline<F: Fn(&AppSink) + Send + Sync + 'static>(
1094            this: *mut ffi::GstAppSink,
1095            _param_spec: glib::ffi::gpointer,
1096            f: glib::ffi::gpointer,
1097        ) {
1098            let f: &F = &*(f as *const F);
1099            f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1100        }
1101        unsafe {
1102            let f: Box<F> = Box::new(f);
1103            glib::signal::connect_raw(
1104                self.as_ptr() as *mut _,
1105                b"notify::sync\0".as_ptr() as *const _,
1106                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1107                    notify_sync_trampoline::<F> as *const (),
1108                )),
1109                Box::into_raw(f),
1110            )
1111        }
1112    }
1113
1114    #[doc(alias = "throttle-time")]
1115    pub fn connect_throttle_time_notify<F: Fn(&Self) + Send + Sync + 'static>(
1116        &self,
1117        f: F,
1118    ) -> glib::SignalHandlerId {
1119        unsafe extern "C" fn notify_throttle_time_trampoline<
1120            F: Fn(&AppSink) + Send + Sync + 'static,
1121        >(
1122            this: *mut ffi::GstAppSink,
1123            _param_spec: glib::ffi::gpointer,
1124            f: glib::ffi::gpointer,
1125        ) {
1126            let f: &F = &*(f as *const F);
1127            f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1128        }
1129        unsafe {
1130            let f: Box<F> = Box::new(f);
1131            glib::signal::connect_raw(
1132                self.as_ptr() as *mut _,
1133                b"notify::throttle-time\0".as_ptr() as *const _,
1134                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1135                    notify_throttle_time_trampoline::<F> as *const (),
1136                )),
1137                Box::into_raw(f),
1138            )
1139        }
1140    }
1141
1142    #[doc(alias = "ts-offset")]
1143    pub fn connect_ts_offset_notify<F: Fn(&Self) + Send + Sync + 'static>(
1144        &self,
1145        f: F,
1146    ) -> glib::SignalHandlerId {
1147        unsafe extern "C" fn notify_ts_offset_trampoline<
1148            F: Fn(&AppSink) + Send + Sync + 'static,
1149        >(
1150            this: *mut ffi::GstAppSink,
1151            _param_spec: glib::ffi::gpointer,
1152            f: glib::ffi::gpointer,
1153        ) {
1154            let f: &F = &*(f as *const F);
1155            f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1156        }
1157        unsafe {
1158            let f: Box<F> = Box::new(f);
1159            glib::signal::connect_raw(
1160                self.as_ptr() as *mut _,
1161                b"notify::ts-offset\0".as_ptr() as *const _,
1162                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1163                    notify_ts_offset_trampoline::<F> as *const (),
1164                )),
1165                Box::into_raw(f),
1166            )
1167        }
1168    }
1169
    /// Creates an [`AppSinkStream`] that yields the samples of this sink.
    ///
    /// This delegates to `AppSinkStream::new`, which installs its own
    /// `new_sample` and `eos` callbacks on the sink through `set_callbacks`.
    pub fn stream(&self) -> AppSinkStream {
        AppSinkStream::new(self)
    }
1173}
1174
/// Builder for [`AppSink`] that gathers construction properties together with
/// settings that are applied to the sink right after it has been created.
#[must_use = "The builder must be built to be used"]
pub struct AppSinkBuilder<'a> {
    // Underlying object builder that accumulates the properties to set.
    builder: gst::gobject::GObjectBuilder<'a, AppSink>,
    // Callbacks installed through `set_callbacks` in `build`, if provided.
    callbacks: Option<AppSinkCallbacks>,
    // Value handed to `set_drop_out_of_segment` in `build`, if provided.
    drop_out_of_segment: Option<bool>,
}
1185
1186impl<'a> AppSinkBuilder<'a> {
1187    #[must_use = "Building the object from the builder is usually expensive and is not expected to have side effects"]
1195    pub fn build(self) -> AppSink {
1196        let appsink = self.builder.build().unwrap();
1197
1198        if let Some(callbacks) = self.callbacks {
1199            appsink.set_callbacks(callbacks);
1200        }
1201
1202        if let Some(drop_out_of_segment) = self.drop_out_of_segment {
1203            appsink.set_drop_out_of_segment(drop_out_of_segment);
1204        }
1205
1206        appsink
1207    }
1208
1209    pub fn async_(self, async_: bool) -> Self {
1210        Self {
1211            builder: self.builder.property("async", async_),
1212            ..self
1213        }
1214    }
1215
1216    pub fn buffer_list(self, buffer_list: bool) -> Self {
1217        Self {
1218            builder: self.builder.property("buffer-list", buffer_list),
1219            ..self
1220        }
1221    }
1222
1223    pub fn callbacks(self, callbacks: AppSinkCallbacks) -> Self {
1224        Self {
1225            callbacks: Some(callbacks),
1226            ..self
1227        }
1228    }
1229
1230    pub fn caps(self, caps: &'a gst::Caps) -> Self {
1231        Self {
1232            builder: self.builder.property("caps", caps),
1233            ..self
1234        }
1235    }
1236
1237    #[cfg_attr(feature = "v1_28", deprecated = "Since 1.28")]
1238    #[allow(deprecated)]
1239    pub fn drop(self, drop: bool) -> Self {
1240        Self {
1241            builder: self.builder.property("drop", drop),
1242            ..self
1243        }
1244    }
1245
1246    pub fn drop_out_of_segment(self, drop_out_of_segment: bool) -> Self {
1247        Self {
1248            builder: self
1249                .builder
1250                .property("drop-out-of-segment", drop_out_of_segment),
1251            ..self
1252        }
1253    }
1254
1255    pub fn enable_last_sample(self, enable_last_sample: bool) -> Self {
1256        Self {
1257            builder: self
1258                .builder
1259                .property("enable-last-sample", enable_last_sample),
1260            ..self
1261        }
1262    }
1263
1264    pub fn max_bitrate(self, max_bitrate: u64) -> Self {
1265        Self {
1266            builder: self.builder.property("max-bitrate", max_bitrate),
1267            ..self
1268        }
1269    }
1270
1271    pub fn max_buffers(self, max_buffers: u32) -> Self {
1272        Self {
1273            builder: self.builder.property("max-buffers", max_buffers),
1274            ..self
1275        }
1276    }
1277
1278    pub fn max_lateness(self, max_lateness: i64) -> Self {
1279        Self {
1280            builder: self.builder.property("max-lateness", max_lateness),
1281            ..self
1282        }
1283    }
1284
1285    #[cfg(feature = "v1_16")]
1286    #[cfg_attr(docsrs, doc(cfg(feature = "v1_16")))]
1287    pub fn processing_deadline(self, processing_deadline: gst::ClockTime) -> Self {
1288        Self {
1289            builder: self
1290                .builder
1291                .property("processing-deadline", processing_deadline),
1292            ..self
1293        }
1294    }
1295
1296    pub fn qos(self, qos: bool) -> Self {
1297        Self {
1298            builder: self.builder.property("qos", qos),
1299            ..self
1300        }
1301    }
1302
1303    pub fn render_delay(self, render_delay: Option<gst::ClockTime>) -> Self {
1304        Self {
1305            builder: self.builder.property("render-delay", render_delay),
1306            ..self
1307        }
1308    }
1309
1310    pub fn sync(self, sync: bool) -> Self {
1311        Self {
1312            builder: self.builder.property("sync", sync),
1313            ..self
1314        }
1315    }
1316
1317    pub fn throttle_time(self, throttle_time: u64) -> Self {
1318        Self {
1319            builder: self.builder.property("throttle-time", throttle_time),
1320            ..self
1321        }
1322    }
1323
1324    pub fn ts_offset(self, ts_offset: gst::ClockTimeDiff) -> Self {
1325        Self {
1326            builder: self.builder.property("ts-offset", ts_offset),
1327            ..self
1328        }
1329    }
1330
1331    pub fn wait_on_eos(self, wait_on_eos: bool) -> Self {
1332        Self {
1333            builder: self.builder.property("wait-on-eos", wait_on_eos),
1334            ..self
1335        }
1336    }
1337
1338    #[cfg(feature = "v1_24")]
1339    #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
1340    pub fn max_time(self, max_time: Option<gst::ClockTime>) -> Self {
1341        Self {
1342            builder: self.builder.property("max-time", max_time),
1343            ..self
1344        }
1345    }
1346
1347    #[cfg(feature = "v1_24")]
1348    #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
1349    pub fn max_bytes(self, max_bytes: u64) -> Self {
1350        Self {
1351            builder: self.builder.property("max-bytes", max_bytes),
1352            ..self
1353        }
1354    }
1355
1356    #[cfg(feature = "v1_28")]
1357    #[cfg_attr(docsrs, doc(cfg(feature = "v1_28")))]
1358    pub fn leaky_type(self, leaky_type: crate::AppLeakyType) -> Self {
1359        Self {
1360            builder: self.builder.property("leaky-type", leaky_type),
1361            ..self
1362        }
1363    }
1364
1365    #[cfg(feature = "v1_28")]
1366    #[cfg_attr(docsrs, doc(cfg(feature = "v1_28")))]
1367    pub fn silent(self, silent: bool) -> Self {
1368        Self {
1369            builder: self.builder.property("silent", silent),
1370            ..self
1371        }
1372    }
1373
1374    #[inline]
1379    pub fn property(self, name: &'a str, value: impl Into<glib::Value> + 'a) -> Self {
1380        Self {
1381            builder: self.builder.property(name, value),
1382            ..self
1383        }
1384    }
1385
1386    #[inline]
1389    pub fn property_from_str(self, name: &'a str, value: &'a str) -> Self {
1390        Self {
1391            builder: self.builder.property_from_str(name, value),
1392            ..self
1393        }
1394    }
1395
1396    gst::impl_builder_gvalue_extra_setters!(property_and_name);
1397}
1398
/// Adapter that exposes the samples of an [`AppSink`] as a [`Stream`].
#[derive(Debug)]
pub struct AppSinkStream {
    // Weak handle to the sink; the stream ends once it can no longer be upgraded.
    app_sink: glib::WeakRef<AppSink>,
    // Waker stored by `poll_next` when it returns `Poll::Pending`; it is shared
    // with the sink callbacks, which take and wake it.
    waker_reference: Arc<Mutex<Option<Waker>>>,
}
1404
1405impl AppSinkStream {
1406    fn new(app_sink: &AppSink) -> Self {
1407        skip_assert_initialized!();
1408
1409        let waker_reference = Arc::new(Mutex::new(None as Option<Waker>));
1410
1411        app_sink.set_callbacks(
1412            AppSinkCallbacks::builder()
1413                .new_sample({
1414                    let waker_reference = Arc::clone(&waker_reference);
1415
1416                    move |_| {
1417                        if let Some(waker) = waker_reference.lock().unwrap().take() {
1418                            waker.wake();
1419                        }
1420
1421                        Ok(gst::FlowSuccess::Ok)
1422                    }
1423                })
1424                .eos({
1425                    let waker_reference = Arc::clone(&waker_reference);
1426
1427                    move |_| {
1428                        if let Some(waker) = waker_reference.lock().unwrap().take() {
1429                            waker.wake();
1430                        }
1431                    }
1432                })
1433                .build(),
1434        );
1435
1436        Self {
1437            app_sink: app_sink.downgrade(),
1438            waker_reference,
1439        }
1440    }
1441}
1442
1443impl Drop for AppSinkStream {
1444    fn drop(&mut self) {
1445        #[cfg(not(feature = "v1_18"))]
1446        {
1447            if gst::version() >= (1, 16, 3, 0) {
1450                if let Some(app_sink) = self.app_sink.upgrade() {
1451                    app_sink.set_callbacks(AppSinkCallbacks::builder().build());
1452                }
1453            }
1454        }
1455    }
1456}
1457
1458impl Stream for AppSinkStream {
1459    type Item = gst::Sample;
1460
1461    fn poll_next(self: Pin<&mut Self>, context: &mut Context) -> Poll<Option<Self::Item>> {
1462        let mut waker = self.waker_reference.lock().unwrap();
1463
1464        let Some(app_sink) = self.app_sink.upgrade() else {
1465            return Poll::Ready(None);
1466        };
1467
1468        app_sink
1469            .try_pull_sample(gst::ClockTime::ZERO)
1470            .map(|sample| Poll::Ready(Some(sample)))
1471            .unwrap_or_else(|| {
1472                if app_sink.is_eos() {
1473                    return Poll::Ready(None);
1474                }
1475
1476                waker.replace(context.waker().to_owned());
1477
1478                Poll::Pending
1479            })
1480    }
1481}
1482
#[cfg(test)]
mod tests {
    use futures_util::StreamExt;
    use gst::prelude::*;

    use super::*;

    /// Number of buffers the test source is asked to produce.
    const NUM_BUFFERS: i32 = 5;

    /// Runs `videotestsrc ! appsink` and checks that the stream yields exactly
    /// as many samples as buffers were requested before it ends.
    #[test]
    fn test_app_sink_stream() {
        gst::init().unwrap();

        let source = gst::ElementFactory::make("videotestsrc")
            .property("num-buffers", NUM_BUFFERS)
            .build()
            .unwrap();
        let sink = gst::ElementFactory::make("appsink").build().unwrap();

        let pipeline = gst::Pipeline::new();
        pipeline.add(&source).unwrap();
        pipeline.add(&sink).unwrap();

        source.link(&sink).unwrap();

        // Create the stream (and its collecting future) before the pipeline starts.
        let typed_sink = sink.dynamic_cast::<AppSink>().unwrap();
        let collected = typed_sink.stream().collect::<Vec<gst::Sample>>();

        pipeline.set_state(gst::State::Playing).unwrap();
        let samples = futures_executor::block_on(collected);
        pipeline.set_state(gst::State::Null).unwrap();

        assert_eq!(samples.len(), usize::try_from(NUM_BUFFERS).unwrap());
    }
}