Skip to main content

gstreamer_app/
app_sink.rs

1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{
4    mem, panic,
5    pin::Pin,
6    ptr,
7    sync::{Arc, Mutex},
8    task::{Context, Poll, Waker},
9};
10
11#[cfg(not(panic = "abort"))]
12use std::sync::atomic::{AtomicBool, Ordering};
13
14use futures_core::Stream;
15use glib::{ffi::gpointer, prelude::*, translate::*};
16
17use crate::{AppSink, ffi};
18
// rustdoc-stripper-ignore-next
/// Set of Rust closures to be installed on an [`AppSink`], together with the
/// C callback table that dispatches to them.
///
/// Instances are created through [`AppSinkCallbacks::builder`] and handed to
/// `AppSink::set_callbacks`.
#[allow(clippy::type_complexity)]
pub struct AppSinkCallbacks {
    // Closure run by `trampoline_eos`.
    eos: Option<Box<dyn FnMut(&AppSink) + Send + 'static>>,
    // Closure run by `trampoline_new_preroll`; its result becomes the flow return.
    new_preroll: Option<
        Box<dyn FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static>,
    >,
    // Closure run by `trampoline_new_sample`; its result becomes the flow return.
    new_sample: Option<
        Box<dyn FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static>,
    >,
    // Closure run by `trampoline_new_event`.
    new_event: Option<Box<dyn FnMut(&AppSink) -> bool + Send + 'static>>,
    // Closure run by `trampoline_propose_allocation` with the allocation query.
    propose_allocation:
        Option<Box<dyn FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static>>,
    // Set once any closure has panicked; the trampolines then refuse to call
    // further closures and only post an error message.
    #[cfg(not(panic = "abort"))]
    panicked: AtomicBool,
    // C-level callback table; an entry is `Some(trampoline)` only when the
    // matching closure above is present.
    callbacks: ffi::GstAppSinkCallbacks,
}
35
// NOTE(review): every stored closure is bounded by `Send`. The manual impls
// are presumably needed because `ffi::GstAppSinkCallbacks` contains raw
// pointers (see `_gst_reserved`), which suppress the auto traits. The closures
// are not required to be `Sync`, so the `Sync` impl appears to rely on them
// only ever being reached through the trampolines; confirm against the
// appsink calling conventions.
unsafe impl Send for AppSinkCallbacks {}
unsafe impl Sync for AppSinkCallbacks {}
38
39impl AppSinkCallbacks {
40    pub fn builder() -> AppSinkCallbacksBuilder {
41        skip_assert_initialized!();
42        AppSinkCallbacksBuilder {
43            eos: None,
44            new_preroll: None,
45            new_sample: None,
46            new_event: None,
47            propose_allocation: None,
48        }
49    }
50}
51
// rustdoc-stripper-ignore-next
/// Builder collecting the optional closures that make up an
/// [`AppSinkCallbacks`]; finish with `build()`.
#[allow(clippy::type_complexity)]
#[must_use = "The builder must be built to be used"]
pub struct AppSinkCallbacksBuilder {
    // Pending end-of-stream closure, if any.
    eos: Option<Box<dyn FnMut(&AppSink) + Send + 'static>>,
    // Pending new-preroll closure, if any.
    new_preroll: Option<
        Box<dyn FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static>,
    >,
    // Pending new-sample closure, if any.
    new_sample: Option<
        Box<dyn FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static>,
    >,
    // Pending new-event closure, if any (its setter is gated on `v1_20`).
    new_event: Option<Box<dyn FnMut(&AppSink) -> bool + Send + 'static>>,
    // Pending propose-allocation closure, if any (its setter is gated on `v1_24`).
    propose_allocation:
        Option<Box<dyn FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static>>,
}
66
67impl AppSinkCallbacksBuilder {
68    pub fn eos<F: FnMut(&AppSink) + Send + 'static>(self, eos: F) -> Self {
69        Self {
70            eos: Some(Box::new(eos)),
71            ..self
72        }
73    }
74
75    pub fn eos_if<F: FnMut(&AppSink) + Send + 'static>(self, eos: F, predicate: bool) -> Self {
76        if predicate { self.eos(eos) } else { self }
77    }
78
79    pub fn eos_if_some<F: FnMut(&AppSink) + Send + 'static>(self, eos: Option<F>) -> Self {
80        if let Some(eos) = eos {
81            self.eos(eos)
82        } else {
83            self
84        }
85    }
86
87    pub fn new_preroll<
88        F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
89    >(
90        self,
91        new_preroll: F,
92    ) -> Self {
93        Self {
94            new_preroll: Some(Box::new(new_preroll)),
95            ..self
96        }
97    }
98
99    pub fn new_preroll_if<
100        F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
101    >(
102        self,
103        new_preroll: F,
104        predicate: bool,
105    ) -> Self {
106        if predicate {
107            self.new_preroll(new_preroll)
108        } else {
109            self
110        }
111    }
112
113    pub fn new_preroll_if_some<
114        F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
115    >(
116        self,
117        new_preroll: Option<F>,
118    ) -> Self {
119        if let Some(new_preroll) = new_preroll {
120            self.new_preroll(new_preroll)
121        } else {
122            self
123        }
124    }
125
126    pub fn new_sample<
127        F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
128    >(
129        self,
130        new_sample: F,
131    ) -> Self {
132        Self {
133            new_sample: Some(Box::new(new_sample)),
134            ..self
135        }
136    }
137
138    pub fn new_sample_if<
139        F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
140    >(
141        self,
142        new_sample: F,
143        predicate: bool,
144    ) -> Self {
145        if predicate {
146            self.new_sample(new_sample)
147        } else {
148            self
149        }
150    }
151
152    pub fn new_sample_if_some<
153        F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
154    >(
155        self,
156        new_sample: Option<F>,
157    ) -> Self {
158        if let Some(new_sample) = new_sample {
159            self.new_sample(new_sample)
160        } else {
161            self
162        }
163    }
164
165    #[cfg(feature = "v1_20")]
166    #[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
167    pub fn new_event<F: FnMut(&AppSink) -> bool + Send + 'static>(self, new_event: F) -> Self {
168        Self {
169            new_event: Some(Box::new(new_event)),
170            ..self
171        }
172    }
173
174    #[cfg(feature = "v1_20")]
175    #[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
176    pub fn new_event_if<F: FnMut(&AppSink) -> bool + Send + 'static>(
177        self,
178        new_event: F,
179        predicate: bool,
180    ) -> Self {
181        if predicate {
182            self.new_event(new_event)
183        } else {
184            self
185        }
186    }
187
188    #[cfg(feature = "v1_20")]
189    #[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
190    pub fn new_event_if_some<F: FnMut(&AppSink) -> bool + Send + 'static>(
191        self,
192        new_event: Option<F>,
193    ) -> Self {
194        if let Some(new_event) = new_event {
195            self.new_event(new_event)
196        } else {
197            self
198        }
199    }
200
201    #[cfg(feature = "v1_24")]
202    #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
203    pub fn propose_allocation<
204        F: FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static,
205    >(
206        self,
207        propose_allocation: F,
208    ) -> Self {
209        Self {
210            propose_allocation: Some(Box::new(propose_allocation)),
211            ..self
212        }
213    }
214
215    #[cfg(feature = "v1_24")]
216    #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
217    pub fn propose_allocation_if<
218        F: FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static,
219    >(
220        self,
221        propose_allocation: F,
222        predicate: bool,
223    ) -> Self {
224        if predicate {
225            self.propose_allocation(propose_allocation)
226        } else {
227            self
228        }
229    }
230
231    #[cfg(feature = "v1_24")]
232    #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
233    pub fn propose_allocation_if_some<
234        F: FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static,
235    >(
236        self,
237        propose_allocation: Option<F>,
238    ) -> Self {
239        if let Some(propose_allocation) = propose_allocation {
240            self.propose_allocation(propose_allocation)
241        } else {
242            self
243        }
244    }
245
246    #[must_use = "Building the callbacks without using them has no effect"]
247    pub fn build(self) -> AppSinkCallbacks {
248        let have_eos = self.eos.is_some();
249        let have_new_preroll = self.new_preroll.is_some();
250        let have_new_sample = self.new_sample.is_some();
251        let have_new_event = self.new_event.is_some();
252        let have_propose_allocation = self.propose_allocation.is_some();
253
254        AppSinkCallbacks {
255            eos: self.eos,
256            new_preroll: self.new_preroll,
257            new_sample: self.new_sample,
258            new_event: self.new_event,
259            propose_allocation: self.propose_allocation,
260            #[cfg(not(panic = "abort"))]
261            panicked: AtomicBool::new(false),
262            callbacks: ffi::GstAppSinkCallbacks {
263                eos: if have_eos { Some(trampoline_eos) } else { None },
264                new_preroll: if have_new_preroll {
265                    Some(trampoline_new_preroll)
266                } else {
267                    None
268                },
269                new_sample: if have_new_sample {
270                    Some(trampoline_new_sample)
271                } else {
272                    None
273                },
274                new_event: if have_new_event {
275                    Some(trampoline_new_event)
276                } else {
277                    None
278                },
279                propose_allocation: if have_propose_allocation {
280                    Some(trampoline_propose_allocation)
281                } else {
282                    None
283                },
284                _gst_reserved: [ptr::null_mut(), ptr::null_mut()],
285            },
286        }
287    }
288}
289
290unsafe extern "C" fn trampoline_eos(appsink: *mut ffi::GstAppSink, callbacks: gpointer) {
291    unsafe {
292        let callbacks = callbacks as *mut AppSinkCallbacks;
293        let element: Borrowed<AppSink> = from_glib_borrow(appsink);
294
295        #[cfg(not(panic = "abort"))]
296        if (*callbacks).panicked.load(Ordering::Relaxed) {
297            let element: Borrowed<AppSink> = from_glib_borrow(appsink);
298            gst::subclass::post_panic_error_message(
299                element.upcast_ref(),
300                element.upcast_ref(),
301                None,
302            );
303            return;
304        }
305
306        if let Some(ref mut eos) = (*callbacks).eos {
307            let result = panic::catch_unwind(panic::AssertUnwindSafe(|| eos(&element)));
308            match result {
309                Ok(result) => result,
310                Err(err) => {
311                    #[cfg(panic = "abort")]
312                    {
313                        unreachable!("{err:?}");
314                    }
315                    #[cfg(not(panic = "abort"))]
316                    {
317                        (*callbacks).panicked.store(true, Ordering::Relaxed);
318                        gst::subclass::post_panic_error_message(
319                            element.upcast_ref(),
320                            element.upcast_ref(),
321                            Some(err),
322                        );
323                    }
324                }
325            }
326        }
327    }
328}
329
330unsafe extern "C" fn trampoline_new_preroll(
331    appsink: *mut ffi::GstAppSink,
332    callbacks: gpointer,
333) -> gst::ffi::GstFlowReturn {
334    unsafe {
335        let callbacks = callbacks as *mut AppSinkCallbacks;
336        let element: Borrowed<AppSink> = from_glib_borrow(appsink);
337
338        #[cfg(not(panic = "abort"))]
339        if (*callbacks).panicked.load(Ordering::Relaxed) {
340            let element: Borrowed<AppSink> = from_glib_borrow(appsink);
341            gst::subclass::post_panic_error_message(
342                element.upcast_ref(),
343                element.upcast_ref(),
344                None,
345            );
346            return gst::FlowReturn::Error.into_glib();
347        }
348
349        let ret = if let Some(ref mut new_preroll) = (*callbacks).new_preroll {
350            let result =
351                panic::catch_unwind(panic::AssertUnwindSafe(|| new_preroll(&element).into()));
352            match result {
353                Ok(result) => result,
354                Err(err) => {
355                    #[cfg(panic = "abort")]
356                    {
357                        unreachable!("{err:?}");
358                    }
359                    #[cfg(not(panic = "abort"))]
360                    {
361                        (*callbacks).panicked.store(true, Ordering::Relaxed);
362                        gst::subclass::post_panic_error_message(
363                            element.upcast_ref(),
364                            element.upcast_ref(),
365                            Some(err),
366                        );
367
368                        gst::FlowReturn::Error
369                    }
370                }
371            }
372        } else {
373            gst::FlowReturn::Error
374        };
375
376        ret.into_glib()
377    }
378}
379
380unsafe extern "C" fn trampoline_new_sample(
381    appsink: *mut ffi::GstAppSink,
382    callbacks: gpointer,
383) -> gst::ffi::GstFlowReturn {
384    unsafe {
385        let callbacks = callbacks as *mut AppSinkCallbacks;
386        let element: Borrowed<AppSink> = from_glib_borrow(appsink);
387
388        #[cfg(not(panic = "abort"))]
389        if (*callbacks).panicked.load(Ordering::Relaxed) {
390            let element: Borrowed<AppSink> = from_glib_borrow(appsink);
391            gst::subclass::post_panic_error_message(
392                element.upcast_ref(),
393                element.upcast_ref(),
394                None,
395            );
396            return gst::FlowReturn::Error.into_glib();
397        }
398
399        let ret = if let Some(ref mut new_sample) = (*callbacks).new_sample {
400            let result =
401                panic::catch_unwind(panic::AssertUnwindSafe(|| new_sample(&element).into()));
402            match result {
403                Ok(result) => result,
404                Err(err) => {
405                    #[cfg(panic = "abort")]
406                    {
407                        unreachable!("{err:?}");
408                    }
409                    #[cfg(not(panic = "abort"))]
410                    {
411                        (*callbacks).panicked.store(true, Ordering::Relaxed);
412                        gst::subclass::post_panic_error_message(
413                            element.upcast_ref(),
414                            element.upcast_ref(),
415                            Some(err),
416                        );
417
418                        gst::FlowReturn::Error
419                    }
420                }
421            }
422        } else {
423            gst::FlowReturn::Error
424        };
425
426        ret.into_glib()
427    }
428}
429
430unsafe extern "C" fn trampoline_new_event(
431    appsink: *mut ffi::GstAppSink,
432    callbacks: gpointer,
433) -> glib::ffi::gboolean {
434    unsafe {
435        let callbacks = callbacks as *mut AppSinkCallbacks;
436        let element: Borrowed<AppSink> = from_glib_borrow(appsink);
437
438        #[cfg(not(panic = "abort"))]
439        if (*callbacks).panicked.load(Ordering::Relaxed) {
440            let element: Borrowed<AppSink> = from_glib_borrow(appsink);
441            gst::subclass::post_panic_error_message(
442                element.upcast_ref(),
443                element.upcast_ref(),
444                None,
445            );
446            return false.into_glib();
447        }
448
449        let ret = if let Some(ref mut new_event) = (*callbacks).new_event {
450            let result = panic::catch_unwind(panic::AssertUnwindSafe(|| new_event(&element)));
451            match result {
452                Ok(result) => result,
453                Err(err) => {
454                    #[cfg(panic = "abort")]
455                    {
456                        unreachable!("{err:?}");
457                    }
458                    #[cfg(not(panic = "abort"))]
459                    {
460                        (*callbacks).panicked.store(true, Ordering::Relaxed);
461                        gst::subclass::post_panic_error_message(
462                            element.upcast_ref(),
463                            element.upcast_ref(),
464                            Some(err),
465                        );
466
467                        false
468                    }
469                }
470            }
471        } else {
472            false
473        };
474
475        ret.into_glib()
476    }
477}
478
479unsafe extern "C" fn trampoline_propose_allocation(
480    appsink: *mut ffi::GstAppSink,
481    query: *mut gst::ffi::GstQuery,
482    callbacks: gpointer,
483) -> glib::ffi::gboolean {
484    unsafe {
485        let callbacks = callbacks as *mut AppSinkCallbacks;
486        let element: Borrowed<AppSink> = from_glib_borrow(appsink);
487
488        #[cfg(not(panic = "abort"))]
489        if (*callbacks).panicked.load(Ordering::Relaxed) {
490            let element: Borrowed<AppSink> = from_glib_borrow(appsink);
491            gst::subclass::post_panic_error_message(
492                element.upcast_ref(),
493                element.upcast_ref(),
494                None,
495            );
496            return false.into_glib();
497        }
498
499        let ret = if let Some(ref mut propose_allocation) = (*callbacks).propose_allocation {
500            let query = match gst::QueryRef::from_mut_ptr(query).view_mut() {
501                gst::QueryViewMut::Allocation(allocation) => allocation,
502                _ => unreachable!(),
503            };
504            let result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
505                propose_allocation(&element, query)
506            }));
507            match result {
508                Ok(result) => result,
509                Err(err) => {
510                    #[cfg(panic = "abort")]
511                    {
512                        unreachable!("{err:?}");
513                    }
514                    #[cfg(not(panic = "abort"))]
515                    {
516                        (*callbacks).panicked.store(true, Ordering::Relaxed);
517                        gst::subclass::post_panic_error_message(
518                            element.upcast_ref(),
519                            element.upcast_ref(),
520                            Some(err),
521                        );
522                        false
523                    }
524                }
525            }
526        } else {
527            false
528        };
529
530        ret.into_glib()
531    }
532}
533
534unsafe extern "C" fn destroy_callbacks(ptr: gpointer) {
535    unsafe {
536        let _ = Box::<AppSinkCallbacks>::from_raw(ptr as *mut _);
537    }
538}
539
540impl AppSink {
541    // rustdoc-stripper-ignore-next
542    /// Creates a new builder-pattern struct instance to construct [`AppSink`] objects.
543    ///
544    /// This method returns an instance of [`AppSinkBuilder`](crate::builders::AppSinkBuilder) which can be used to create [`AppSink`] objects.
545    pub fn builder<'a>() -> AppSinkBuilder<'a> {
546        assert_initialized_main_thread!();
547        AppSinkBuilder {
548            builder: gst::Object::builder(),
549            callbacks: None,
550            drop_out_of_segment: None,
551        }
552    }
553
554    #[doc(alias = "gst_app_sink_set_callbacks")]
555    pub fn set_callbacks(&self, callbacks: AppSinkCallbacks) {
556        unsafe {
557            let sink = self.to_glib_none().0;
558
559            #[allow(clippy::manual_dangling_ptr)]
560            #[cfg(not(feature = "v1_18"))]
561            {
562                static SET_ONCE_QUARK: std::sync::OnceLock<glib::Quark> =
563                    std::sync::OnceLock::new();
564
565                let set_once_quark = SET_ONCE_QUARK
566                    .get_or_init(|| glib::Quark::from_str("gstreamer-rs-app-sink-callbacks"));
567
568                // This is not thread-safe before 1.16.3, see
569                // https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/merge_requests/570
570                if gst::version() < (1, 16, 3, 0) {
571                    if !glib::gobject_ffi::g_object_get_qdata(
572                        sink as *mut _,
573                        set_once_quark.into_glib(),
574                    )
575                    .is_null()
576                    {
577                        panic!("AppSink callbacks can only be set once");
578                    }
579
580                    glib::gobject_ffi::g_object_set_qdata(
581                        sink as *mut _,
582                        set_once_quark.into_glib(),
583                        1 as *mut _,
584                    );
585                }
586            }
587
588            ffi::gst_app_sink_set_callbacks(
589                sink,
590                mut_override(&callbacks.callbacks),
591                Box::into_raw(Box::new(callbacks)) as *mut _,
592                Some(destroy_callbacks),
593            );
594        }
595    }
596
597    #[doc(alias = "drop-out-of-segment")]
598    pub fn drops_out_of_segment(&self) -> bool {
599        unsafe {
600            from_glib(gst_base::ffi::gst_base_sink_get_drop_out_of_segment(
601                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
602            ))
603        }
604    }
605
606    #[doc(alias = "max-bitrate")]
607    #[doc(alias = "gst_base_sink_get_max_bitrate")]
608    pub fn max_bitrate(&self) -> u64 {
609        unsafe {
610            gst_base::ffi::gst_base_sink_get_max_bitrate(
611                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
612            )
613        }
614    }
615
616    #[doc(alias = "max-lateness")]
617    #[doc(alias = "gst_base_sink_get_max_lateness")]
618    pub fn max_lateness(&self) -> i64 {
619        unsafe {
620            gst_base::ffi::gst_base_sink_get_max_lateness(
621                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
622            )
623        }
624    }
625
626    #[doc(alias = "processing-deadline")]
627    #[cfg(feature = "v1_16")]
628    #[cfg_attr(docsrs, doc(cfg(feature = "v1_16")))]
629    #[doc(alias = "gst_base_sink_get_processing_deadline")]
630    pub fn processing_deadline(&self) -> gst::ClockTime {
631        unsafe {
632            try_from_glib(gst_base::ffi::gst_base_sink_get_processing_deadline(
633                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
634            ))
635            .expect("undefined processing_deadline")
636        }
637    }
638
639    #[doc(alias = "render-delay")]
640    #[doc(alias = "gst_base_sink_get_render_delay")]
641    pub fn render_delay(&self) -> gst::ClockTime {
642        unsafe {
643            try_from_glib(gst_base::ffi::gst_base_sink_get_render_delay(
644                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
645            ))
646            .expect("undefined render_delay")
647        }
648    }
649
650    #[cfg(feature = "v1_18")]
651    #[cfg_attr(docsrs, doc(cfg(feature = "v1_18")))]
652    #[doc(alias = "gst_base_sink_get_stats")]
653    pub fn stats(&self) -> gst::Structure {
654        unsafe {
655            from_glib_full(gst_base::ffi::gst_base_sink_get_stats(
656                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
657            ))
658        }
659    }
660
661    #[doc(alias = "sync")]
662    pub fn is_sync(&self) -> bool {
663        unsafe {
664            from_glib(gst_base::ffi::gst_base_sink_get_sync(
665                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
666            ))
667        }
668    }
669
670    #[doc(alias = "throttle-time")]
671    #[doc(alias = "gst_base_sink_get_throttle_time")]
672    pub fn throttle_time(&self) -> u64 {
673        unsafe {
674            gst_base::ffi::gst_base_sink_get_throttle_time(
675                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
676            )
677        }
678    }
679
680    #[doc(alias = "ts-offset")]
681    #[doc(alias = "gst_base_sink_get_ts_offset")]
682    pub fn ts_offset(&self) -> gst::ClockTimeDiff {
683        unsafe {
684            gst_base::ffi::gst_base_sink_get_ts_offset(
685                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
686            )
687        }
688    }
689
690    #[doc(alias = "async")]
691    #[doc(alias = "gst_base_sink_is_async_enabled")]
692    pub fn is_async(&self) -> bool {
693        unsafe {
694            from_glib(gst_base::ffi::gst_base_sink_is_async_enabled(
695                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
696            ))
697        }
698    }
699
700    #[doc(alias = "last-sample")]
701    pub fn enables_last_sample(&self) -> bool {
702        unsafe {
703            from_glib(gst_base::ffi::gst_base_sink_is_last_sample_enabled(
704                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
705            ))
706        }
707    }
708
709    #[doc(alias = "qos")]
710    #[doc(alias = "gst_base_sink_is_qos_enabled")]
711    pub fn is_qos(&self) -> bool {
712        unsafe {
713            from_glib(gst_base::ffi::gst_base_sink_is_qos_enabled(
714                self.as_ptr() as *mut gst_base::ffi::GstBaseSink
715            ))
716        }
717    }
718
719    #[doc(alias = "async")]
720    #[doc(alias = "gst_base_sink_set_async_enabled")]
721    pub fn set_async(&self, enabled: bool) {
722        unsafe {
723            gst_base::ffi::gst_base_sink_set_async_enabled(
724                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
725                enabled.into_glib(),
726            );
727        }
728    }
729
730    #[doc(alias = "drop-out-of-segment")]
731    #[doc(alias = "gst_base_sink_set_drop_out_of_segment")]
732    pub fn set_drop_out_of_segment(&self, drop_out_of_segment: bool) {
733        unsafe {
734            gst_base::ffi::gst_base_sink_set_drop_out_of_segment(
735                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
736                drop_out_of_segment.into_glib(),
737            );
738        }
739    }
740
741    #[doc(alias = "last-sample")]
742    pub fn set_enable_last_sample(&self, enabled: bool) {
743        unsafe {
744            gst_base::ffi::gst_base_sink_set_last_sample_enabled(
745                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
746                enabled.into_glib(),
747            );
748        }
749    }
750
751    #[doc(alias = "max-bitrate")]
752    #[doc(alias = "gst_base_sink_set_max_bitrate")]
753    pub fn set_max_bitrate(&self, max_bitrate: u64) {
754        unsafe {
755            gst_base::ffi::gst_base_sink_set_max_bitrate(
756                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
757                max_bitrate,
758            );
759        }
760    }
761
762    #[doc(alias = "max-lateness")]
763    #[doc(alias = "gst_base_sink_set_max_lateness")]
764    pub fn set_max_lateness(&self, max_lateness: i64) {
765        unsafe {
766            gst_base::ffi::gst_base_sink_set_max_lateness(
767                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
768                max_lateness,
769            );
770        }
771    }
772
773    #[doc(alias = "processing-deadline")]
774    #[cfg(feature = "v1_16")]
775    #[cfg_attr(docsrs, doc(cfg(feature = "v1_16")))]
776    #[doc(alias = "gst_base_sink_set_processing_deadline")]
777    pub fn set_processing_deadline(&self, processing_deadline: gst::ClockTime) {
778        unsafe {
779            gst_base::ffi::gst_base_sink_set_processing_deadline(
780                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
781                processing_deadline.into_glib(),
782            );
783        }
784    }
785
786    #[doc(alias = "qos")]
787    #[doc(alias = "gst_base_sink_set_qos_enabled")]
788    pub fn set_qos(&self, enabled: bool) {
789        unsafe {
790            gst_base::ffi::gst_base_sink_set_qos_enabled(
791                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
792                enabled.into_glib(),
793            );
794        }
795    }
796
797    #[doc(alias = "render-delay")]
798    #[doc(alias = "gst_base_sink_set_render_delay")]
799    pub fn set_render_delay(&self, delay: gst::ClockTime) {
800        unsafe {
801            gst_base::ffi::gst_base_sink_set_render_delay(
802                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
803                delay.into_glib(),
804            );
805        }
806    }
807
808    #[doc(alias = "sync")]
809    #[doc(alias = "gst_base_sink_set_sync")]
810    pub fn set_sync(&self, sync: bool) {
811        unsafe {
812            gst_base::ffi::gst_base_sink_set_sync(
813                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
814                sync.into_glib(),
815            );
816        }
817    }
818
819    #[doc(alias = "throttle-time")]
820    #[doc(alias = "gst_base_sink_set_throttle_time")]
821    pub fn set_throttle_time(&self, throttle: u64) {
822        unsafe {
823            gst_base::ffi::gst_base_sink_set_throttle_time(
824                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
825                throttle,
826            );
827        }
828    }
829
830    #[doc(alias = "ts-offset")]
831    #[doc(alias = "gst_base_sink_set_ts_offset")]
832    pub fn set_ts_offset(&self, offset: gst::ClockTimeDiff) {
833        unsafe {
834            gst_base::ffi::gst_base_sink_set_ts_offset(
835                self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
836                offset,
837            );
838        }
839    }
840
841    #[doc(alias = "async")]
842    pub fn connect_async_notify<F: Fn(&Self) + Send + Sync + 'static>(
843        &self,
844        f: F,
845    ) -> glib::SignalHandlerId {
846        unsafe extern "C" fn notify_async_trampoline<F: Fn(&AppSink) + Send + Sync + 'static>(
847            this: *mut ffi::GstAppSink,
848            _param_spec: glib::ffi::gpointer,
849            f: glib::ffi::gpointer,
850        ) {
851            unsafe {
852                let f: &F = &*(f as *const F);
853                f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
854            }
855        }
856        unsafe {
857            let f: Box<F> = Box::new(f);
858            glib::signal::connect_raw(
859                self.as_ptr() as *mut _,
860                b"notify::async\0".as_ptr() as *const _,
861                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
862                    notify_async_trampoline::<F> as *const (),
863                )),
864                Box::into_raw(f),
865            )
866        }
867    }
868
869    #[doc(alias = "blocksize")]
870    pub fn connect_blocksize_notify<F: Fn(&Self) + Send + Sync + 'static>(
871        &self,
872        f: F,
873    ) -> glib::SignalHandlerId {
874        unsafe extern "C" fn notify_blocksize_trampoline<
875            F: Fn(&AppSink) + Send + Sync + 'static,
876        >(
877            this: *mut ffi::GstAppSink,
878            _param_spec: glib::ffi::gpointer,
879            f: glib::ffi::gpointer,
880        ) {
881            unsafe {
882                let f: &F = &*(f as *const F);
883                f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
884            }
885        }
886        unsafe {
887            let f: Box<F> = Box::new(f);
888            glib::signal::connect_raw(
889                self.as_ptr() as *mut _,
890                b"notify::blocksize\0".as_ptr() as *const _,
891                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
892                    notify_blocksize_trampoline::<F> as *const (),
893                )),
894                Box::into_raw(f),
895            )
896        }
897    }
898
899    #[doc(alias = "enable-last-sample")]
900    pub fn connect_enable_last_sample_notify<F: Fn(&Self) + Send + Sync + 'static>(
901        &self,
902        f: F,
903    ) -> glib::SignalHandlerId {
904        unsafe extern "C" fn notify_enable_last_sample_trampoline<
905            F: Fn(&AppSink) + Send + Sync + 'static,
906        >(
907            this: *mut ffi::GstAppSink,
908            _param_spec: glib::ffi::gpointer,
909            f: glib::ffi::gpointer,
910        ) {
911            unsafe {
912                let f: &F = &*(f as *const F);
913                f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
914            }
915        }
916        unsafe {
917            let f: Box<F> = Box::new(f);
918            glib::signal::connect_raw(
919                self.as_ptr() as *mut _,
920                b"notify::enable-last-sample\0".as_ptr() as *const _,
921                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
922                    notify_enable_last_sample_trampoline::<F> as *const (),
923                )),
924                Box::into_raw(f),
925            )
926        }
927    }
928
929    #[doc(alias = "last-sample")]
930    pub fn connect_last_sample_notify<F: Fn(&Self) + Send + Sync + 'static>(
931        &self,
932        f: F,
933    ) -> glib::SignalHandlerId {
934        unsafe extern "C" fn notify_last_sample_trampoline<
935            F: Fn(&AppSink) + Send + Sync + 'static,
936        >(
937            this: *mut ffi::GstAppSink,
938            _param_spec: glib::ffi::gpointer,
939            f: glib::ffi::gpointer,
940        ) {
941            unsafe {
942                let f: &F = &*(f as *const F);
943                f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
944            }
945        }
946        unsafe {
947            let f: Box<F> = Box::new(f);
948            glib::signal::connect_raw(
949                self.as_ptr() as *mut _,
950                b"notify::last-sample\0".as_ptr() as *const _,
951                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
952                    notify_last_sample_trampoline::<F> as *const (),
953                )),
954                Box::into_raw(f),
955            )
956        }
957    }
958
959    #[doc(alias = "max-bitrate")]
960    pub fn connect_max_bitrate_notify<F: Fn(&Self) + Send + Sync + 'static>(
961        &self,
962        f: F,
963    ) -> glib::SignalHandlerId {
964        unsafe extern "C" fn notify_max_bitrate_trampoline<
965            F: Fn(&AppSink) + Send + Sync + 'static,
966        >(
967            this: *mut ffi::GstAppSink,
968            _param_spec: glib::ffi::gpointer,
969            f: glib::ffi::gpointer,
970        ) {
971            unsafe {
972                let f: &F = &*(f as *const F);
973                f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
974            }
975        }
976        unsafe {
977            let f: Box<F> = Box::new(f);
978            glib::signal::connect_raw(
979                self.as_ptr() as *mut _,
980                b"notify::max-bitrate\0".as_ptr() as *const _,
981                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
982                    notify_max_bitrate_trampoline::<F> as *const (),
983                )),
984                Box::into_raw(f),
985            )
986        }
987    }
988
989    #[doc(alias = "max-lateness")]
990    pub fn connect_max_lateness_notify<F: Fn(&Self) + Send + Sync + 'static>(
991        &self,
992        f: F,
993    ) -> glib::SignalHandlerId {
994        unsafe extern "C" fn notify_max_lateness_trampoline<
995            F: Fn(&AppSink) + Send + Sync + 'static,
996        >(
997            this: *mut ffi::GstAppSink,
998            _param_spec: glib::ffi::gpointer,
999            f: glib::ffi::gpointer,
1000        ) {
1001            unsafe {
1002                let f: &F = &*(f as *const F);
1003                f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1004            }
1005        }
1006        unsafe {
1007            let f: Box<F> = Box::new(f);
1008            glib::signal::connect_raw(
1009                self.as_ptr() as *mut _,
1010                b"notify::max-lateness\0".as_ptr() as *const _,
1011                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1012                    notify_max_lateness_trampoline::<F> as *const (),
1013                )),
1014                Box::into_raw(f),
1015            )
1016        }
1017    }
1018
1019    #[cfg(feature = "v1_16")]
1020    #[cfg_attr(docsrs, doc(cfg(feature = "v1_16")))]
1021    #[doc(alias = "processing-deadline")]
1022    pub fn connect_processing_deadline_notify<F: Fn(&Self) + Send + Sync + 'static>(
1023        &self,
1024        f: F,
1025    ) -> glib::SignalHandlerId {
1026        unsafe extern "C" fn notify_processing_deadline_trampoline<
1027            F: Fn(&AppSink) + Send + Sync + 'static,
1028        >(
1029            this: *mut ffi::GstAppSink,
1030            _param_spec: glib::ffi::gpointer,
1031            f: glib::ffi::gpointer,
1032        ) {
1033            unsafe {
1034                let f: &F = &*(f as *const F);
1035                f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1036            }
1037        }
1038        unsafe {
1039            let f: Box<F> = Box::new(f);
1040            glib::signal::connect_raw(
1041                self.as_ptr() as *mut _,
1042                b"notify::processing-deadline\0".as_ptr() as *const _,
1043                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1044                    notify_processing_deadline_trampoline::<F> as *const (),
1045                )),
1046                Box::into_raw(f),
1047            )
1048        }
1049    }
1050
1051    #[doc(alias = "qos")]
1052    pub fn connect_qos_notify<F: Fn(&Self) + Send + Sync + 'static>(
1053        &self,
1054        f: F,
1055    ) -> glib::SignalHandlerId {
1056        unsafe extern "C" fn notify_qos_trampoline<F: Fn(&AppSink) + Send + Sync + 'static>(
1057            this: *mut ffi::GstAppSink,
1058            _param_spec: glib::ffi::gpointer,
1059            f: glib::ffi::gpointer,
1060        ) {
1061            unsafe {
1062                let f: &F = &*(f as *const F);
1063                f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1064            }
1065        }
1066        unsafe {
1067            let f: Box<F> = Box::new(f);
1068            glib::signal::connect_raw(
1069                self.as_ptr() as *mut _,
1070                b"notify::qos\0".as_ptr() as *const _,
1071                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1072                    notify_qos_trampoline::<F> as *const (),
1073                )),
1074                Box::into_raw(f),
1075            )
1076        }
1077    }
1078
1079    #[doc(alias = "render-delay")]
1080    pub fn connect_render_delay_notify<F: Fn(&Self) + Send + Sync + 'static>(
1081        &self,
1082        f: F,
1083    ) -> glib::SignalHandlerId {
1084        unsafe extern "C" fn notify_render_delay_trampoline<
1085            F: Fn(&AppSink) + Send + Sync + 'static,
1086        >(
1087            this: *mut ffi::GstAppSink,
1088            _param_spec: glib::ffi::gpointer,
1089            f: glib::ffi::gpointer,
1090        ) {
1091            unsafe {
1092                let f: &F = &*(f as *const F);
1093                f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1094            }
1095        }
1096        unsafe {
1097            let f: Box<F> = Box::new(f);
1098            glib::signal::connect_raw(
1099                self.as_ptr() as *mut _,
1100                b"notify::render-delay\0".as_ptr() as *const _,
1101                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1102                    notify_render_delay_trampoline::<F> as *const (),
1103                )),
1104                Box::into_raw(f),
1105            )
1106        }
1107    }
1108
1109    #[cfg(feature = "v1_18")]
1110    #[cfg_attr(docsrs, doc(cfg(feature = "v1_18")))]
1111    #[doc(alias = "stats")]
1112    pub fn connect_stats_notify<F: Fn(&Self) + Send + Sync + 'static>(
1113        &self,
1114        f: F,
1115    ) -> glib::SignalHandlerId {
1116        unsafe extern "C" fn notify_stats_trampoline<F: Fn(&AppSink) + Send + Sync + 'static>(
1117            this: *mut ffi::GstAppSink,
1118            _param_spec: glib::ffi::gpointer,
1119            f: glib::ffi::gpointer,
1120        ) {
1121            unsafe {
1122                let f: &F = &*(f as *const F);
1123                f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1124            }
1125        }
1126        unsafe {
1127            let f: Box<F> = Box::new(f);
1128            glib::signal::connect_raw(
1129                self.as_ptr() as *mut _,
1130                b"notify::stats\0".as_ptr() as *const _,
1131                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1132                    notify_stats_trampoline::<F> as *const (),
1133                )),
1134                Box::into_raw(f),
1135            )
1136        }
1137    }
1138
1139    #[doc(alias = "sync")]
1140    pub fn connect_sync_notify<F: Fn(&Self) + Send + Sync + 'static>(
1141        &self,
1142        f: F,
1143    ) -> glib::SignalHandlerId {
1144        unsafe extern "C" fn notify_sync_trampoline<F: Fn(&AppSink) + Send + Sync + 'static>(
1145            this: *mut ffi::GstAppSink,
1146            _param_spec: glib::ffi::gpointer,
1147            f: glib::ffi::gpointer,
1148        ) {
1149            unsafe {
1150                let f: &F = &*(f as *const F);
1151                f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1152            }
1153        }
1154        unsafe {
1155            let f: Box<F> = Box::new(f);
1156            glib::signal::connect_raw(
1157                self.as_ptr() as *mut _,
1158                b"notify::sync\0".as_ptr() as *const _,
1159                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1160                    notify_sync_trampoline::<F> as *const (),
1161                )),
1162                Box::into_raw(f),
1163            )
1164        }
1165    }
1166
1167    #[doc(alias = "throttle-time")]
1168    pub fn connect_throttle_time_notify<F: Fn(&Self) + Send + Sync + 'static>(
1169        &self,
1170        f: F,
1171    ) -> glib::SignalHandlerId {
1172        unsafe extern "C" fn notify_throttle_time_trampoline<
1173            F: Fn(&AppSink) + Send + Sync + 'static,
1174        >(
1175            this: *mut ffi::GstAppSink,
1176            _param_spec: glib::ffi::gpointer,
1177            f: glib::ffi::gpointer,
1178        ) {
1179            unsafe {
1180                let f: &F = &*(f as *const F);
1181                f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1182            }
1183        }
1184        unsafe {
1185            let f: Box<F> = Box::new(f);
1186            glib::signal::connect_raw(
1187                self.as_ptr() as *mut _,
1188                b"notify::throttle-time\0".as_ptr() as *const _,
1189                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1190                    notify_throttle_time_trampoline::<F> as *const (),
1191                )),
1192                Box::into_raw(f),
1193            )
1194        }
1195    }
1196
1197    #[doc(alias = "ts-offset")]
1198    pub fn connect_ts_offset_notify<F: Fn(&Self) + Send + Sync + 'static>(
1199        &self,
1200        f: F,
1201    ) -> glib::SignalHandlerId {
1202        unsafe extern "C" fn notify_ts_offset_trampoline<
1203            F: Fn(&AppSink) + Send + Sync + 'static,
1204        >(
1205            this: *mut ffi::GstAppSink,
1206            _param_spec: glib::ffi::gpointer,
1207            f: glib::ffi::gpointer,
1208        ) {
1209            unsafe {
1210                let f: &F = &*(f as *const F);
1211                f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1212            }
1213        }
1214        unsafe {
1215            let f: Box<F> = Box::new(f);
1216            glib::signal::connect_raw(
1217                self.as_ptr() as *mut _,
1218                b"notify::ts-offset\0".as_ptr() as *const _,
1219                Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1220                    notify_ts_offset_trampoline::<F> as *const (),
1221                )),
1222                Box::into_raw(f),
1223            )
1224        }
1225    }
1226
    // rustdoc-stripper-ignore-next
    /// Returns an [`AppSinkStream`] that yields the samples of this sink.
    ///
    /// Creating the stream calls `set_callbacks` on this sink with fresh
    /// `new_sample` and `eos` callbacks (see `AppSinkStream::new`).
    pub fn stream(&self) -> AppSinkStream {
        AppSinkStream::new(self)
    }
1230}
1231
1232// rustdoc-stripper-ignore-next
1233/// A [builder-pattern] type to construct [`AppSink`] objects.
1234///
1235/// [builder-pattern]: https://doc.rust-lang.org/1.0.0/style/ownership/builders.html
1236#[must_use = "The builder must be built to be used"]
1237pub struct AppSinkBuilder<'a> {
1238    builder: gst::gobject::GObjectBuilder<'a, AppSink>,
1239    callbacks: Option<AppSinkCallbacks>,
1240    drop_out_of_segment: Option<bool>,
1241}
1242
1243impl<'a> AppSinkBuilder<'a> {
1244    // rustdoc-stripper-ignore-next
1245    /// Build the [`AppSink`].
1246    ///
1247    /// # Panics
1248    ///
1249    /// This panics if the [`AppSink`] doesn't have all the given properties or
1250    /// property values of the wrong type are provided.
1251    #[must_use = "Building the object from the builder is usually expensive and is not expected to have side effects"]
1252    pub fn build(self) -> AppSink {
1253        let appsink = self.builder.build().unwrap();
1254
1255        if let Some(callbacks) = self.callbacks {
1256            appsink.set_callbacks(callbacks);
1257        }
1258
1259        if let Some(drop_out_of_segment) = self.drop_out_of_segment {
1260            appsink.set_drop_out_of_segment(drop_out_of_segment);
1261        }
1262
1263        appsink
1264    }
1265
1266    pub fn async_(self, async_: bool) -> Self {
1267        Self {
1268            builder: self.builder.property("async", async_),
1269            ..self
1270        }
1271    }
1272
1273    pub fn buffer_list(self, buffer_list: bool) -> Self {
1274        Self {
1275            builder: self.builder.property("buffer-list", buffer_list),
1276            ..self
1277        }
1278    }
1279
1280    pub fn callbacks(self, callbacks: AppSinkCallbacks) -> Self {
1281        Self {
1282            callbacks: Some(callbacks),
1283            ..self
1284        }
1285    }
1286
1287    pub fn caps(self, caps: &'a gst::Caps) -> Self {
1288        Self {
1289            builder: self.builder.property("caps", caps),
1290            ..self
1291        }
1292    }
1293
1294    #[cfg_attr(feature = "v1_28", deprecated = "Since 1.28")]
1295    #[allow(deprecated)]
1296    pub fn drop(self, drop: bool) -> Self {
1297        Self {
1298            builder: self.builder.property("drop", drop),
1299            ..self
1300        }
1301    }
1302
1303    pub fn drop_out_of_segment(self, drop_out_of_segment: bool) -> Self {
1304        Self {
1305            drop_out_of_segment: Some(drop_out_of_segment),
1306            ..self
1307        }
1308    }
1309
1310    pub fn enable_last_sample(self, enable_last_sample: bool) -> Self {
1311        Self {
1312            builder: self
1313                .builder
1314                .property("enable-last-sample", enable_last_sample),
1315            ..self
1316        }
1317    }
1318
1319    pub fn max_bitrate(self, max_bitrate: u64) -> Self {
1320        Self {
1321            builder: self.builder.property("max-bitrate", max_bitrate),
1322            ..self
1323        }
1324    }
1325
1326    pub fn max_buffers(self, max_buffers: u32) -> Self {
1327        Self {
1328            builder: self.builder.property("max-buffers", max_buffers),
1329            ..self
1330        }
1331    }
1332
1333    pub fn max_lateness(self, max_lateness: i64) -> Self {
1334        Self {
1335            builder: self.builder.property("max-lateness", max_lateness),
1336            ..self
1337        }
1338    }
1339
1340    #[cfg(feature = "v1_16")]
1341    #[cfg_attr(docsrs, doc(cfg(feature = "v1_16")))]
1342    pub fn processing_deadline(self, processing_deadline: gst::ClockTime) -> Self {
1343        Self {
1344            builder: self
1345                .builder
1346                .property("processing-deadline", processing_deadline),
1347            ..self
1348        }
1349    }
1350
1351    pub fn qos(self, qos: bool) -> Self {
1352        Self {
1353            builder: self.builder.property("qos", qos),
1354            ..self
1355        }
1356    }
1357
1358    pub fn render_delay(self, render_delay: Option<gst::ClockTime>) -> Self {
1359        Self {
1360            builder: self.builder.property("render-delay", render_delay),
1361            ..self
1362        }
1363    }
1364
1365    pub fn sync(self, sync: bool) -> Self {
1366        Self {
1367            builder: self.builder.property("sync", sync),
1368            ..self
1369        }
1370    }
1371
1372    pub fn throttle_time(self, throttle_time: u64) -> Self {
1373        Self {
1374            builder: self.builder.property("throttle-time", throttle_time),
1375            ..self
1376        }
1377    }
1378
1379    pub fn ts_offset(self, ts_offset: gst::ClockTimeDiff) -> Self {
1380        Self {
1381            builder: self.builder.property("ts-offset", ts_offset),
1382            ..self
1383        }
1384    }
1385
1386    pub fn wait_on_eos(self, wait_on_eos: bool) -> Self {
1387        Self {
1388            builder: self.builder.property("wait-on-eos", wait_on_eos),
1389            ..self
1390        }
1391    }
1392
1393    #[cfg(feature = "v1_24")]
1394    #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
1395    pub fn max_time(self, max_time: Option<gst::ClockTime>) -> Self {
1396        Self {
1397            builder: self.builder.property("max-time", max_time),
1398            ..self
1399        }
1400    }
1401
1402    #[cfg(feature = "v1_24")]
1403    #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
1404    pub fn max_bytes(self, max_bytes: u64) -> Self {
1405        Self {
1406            builder: self.builder.property("max-bytes", max_bytes),
1407            ..self
1408        }
1409    }
1410
1411    #[cfg(feature = "v1_28")]
1412    #[cfg_attr(docsrs, doc(cfg(feature = "v1_28")))]
1413    pub fn leaky_type(self, leaky_type: crate::AppLeakyType) -> Self {
1414        Self {
1415            builder: self.builder.property("leaky-type", leaky_type),
1416            ..self
1417        }
1418    }
1419
1420    #[cfg(feature = "v1_28")]
1421    #[cfg_attr(docsrs, doc(cfg(feature = "v1_28")))]
1422    pub fn silent(self, silent: bool) -> Self {
1423        Self {
1424            builder: self.builder.property("silent", silent),
1425            ..self
1426        }
1427    }
1428
1429    // rustdoc-stripper-ignore-next
1430    /// Sets property `name` to the given value `value`.
1431    ///
1432    /// Overrides any default or previously defined value for `name`.
1433    #[inline]
1434    pub fn property(self, name: &'a str, value: impl Into<glib::Value> + 'a) -> Self {
1435        Self {
1436            builder: self.builder.property(name, value),
1437            ..self
1438        }
1439    }
1440
1441    // rustdoc-stripper-ignore-next
1442    /// Sets property `name` to the given string value `value`.
1443    #[inline]
1444    pub fn property_from_str(self, name: &'a str, value: &'a str) -> Self {
1445        Self {
1446            builder: self.builder.property_from_str(name, value),
1447            ..self
1448        }
1449    }
1450
1451    gst::impl_builder_gvalue_extra_setters!(property_and_name);
1452}
1453
1454#[derive(Debug)]
1455pub struct AppSinkStream {
1456    app_sink: glib::WeakRef<AppSink>,
1457    waker_reference: Arc<Mutex<Option<Waker>>>,
1458}
1459
1460impl AppSinkStream {
1461    fn new(app_sink: &AppSink) -> Self {
1462        skip_assert_initialized!();
1463
1464        let waker_reference = Arc::new(Mutex::new(None as Option<Waker>));
1465
1466        app_sink.set_callbacks(
1467            AppSinkCallbacks::builder()
1468                .new_sample({
1469                    let waker_reference = Arc::clone(&waker_reference);
1470
1471                    move |_| {
1472                        if let Some(waker) = waker_reference.lock().unwrap().take() {
1473                            waker.wake();
1474                        }
1475
1476                        Ok(gst::FlowSuccess::Ok)
1477                    }
1478                })
1479                .eos({
1480                    let waker_reference = Arc::clone(&waker_reference);
1481
1482                    move |_| {
1483                        if let Some(waker) = waker_reference.lock().unwrap().take() {
1484                            waker.wake();
1485                        }
1486                    }
1487                })
1488                .build(),
1489        );
1490
1491        Self {
1492            app_sink: app_sink.downgrade(),
1493            waker_reference,
1494        }
1495    }
1496}
1497
1498impl Drop for AppSinkStream {
1499    fn drop(&mut self) {
1500        #[cfg(not(feature = "v1_18"))]
1501        {
1502            // This is not thread-safe before 1.16.3, see
1503            // https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/merge_requests/570
1504            if gst::version() >= (1, 16, 3, 0)
1505                && let Some(app_sink) = self.app_sink.upgrade()
1506            {
1507                app_sink.set_callbacks(AppSinkCallbacks::builder().build());
1508            }
1509        }
1510    }
1511}
1512
1513impl Stream for AppSinkStream {
1514    type Item = gst::Sample;
1515
1516    fn poll_next(self: Pin<&mut Self>, context: &mut Context) -> Poll<Option<Self::Item>> {
1517        let mut waker = self.waker_reference.lock().unwrap();
1518
1519        let Some(app_sink) = self.app_sink.upgrade() else {
1520            return Poll::Ready(None);
1521        };
1522
1523        app_sink
1524            .try_pull_sample(gst::ClockTime::ZERO)
1525            .map(|sample| Poll::Ready(Some(sample)))
1526            .unwrap_or_else(|| {
1527                if app_sink.is_eos() {
1528                    return Poll::Ready(None);
1529                }
1530
1531                waker.replace(context.waker().to_owned());
1532
1533                Poll::Pending
1534            })
1535    }
1536}
1537
#[cfg(test)]
mod tests {
    use futures_util::StreamExt;
    use gst::prelude::*;

    use super::*;

    // Runs a `videotestsrc ! appsink` pipeline limited to five buffers and
    // checks that the stream yields exactly five samples before ending.
    #[test]
    fn test_app_sink_stream() {
        gst::init().unwrap();

        let source = gst::ElementFactory::make("videotestsrc")
            .property("num-buffers", 5)
            .build()
            .unwrap();
        let sink = gst::ElementFactory::make("appsink").build().unwrap();

        let pipeline = gst::Pipeline::new();
        pipeline.add(&source).unwrap();
        pipeline.add(&sink).unwrap();

        source.link(&sink).unwrap();

        // The stream is created before the pipeline starts so its callbacks
        // are already installed when data begins to flow.
        let collected = sink
            .dynamic_cast::<AppSink>()
            .unwrap()
            .stream()
            .collect::<Vec<gst::Sample>>();

        pipeline.set_state(gst::State::Playing).unwrap();
        let samples = futures_executor::block_on(collected);
        pipeline.set_state(gst::State::Null).unwrap();

        assert_eq!(samples.len(), 5);
    }
}