1use std::{
4 mem, panic,
5 pin::Pin,
6 ptr,
7 sync::{Arc, Mutex},
8 task::{Context, Poll, Waker},
9};
10
11#[cfg(not(panic = "abort"))]
12use std::sync::atomic::{AtomicBool, Ordering};
13
14use futures_core::Stream;
15use glib::{ffi::gpointer, prelude::*, translate::*};
16
17use crate::{AppSink, ffi};
18
19#[allow(clippy::type_complexity)]
20pub struct AppSinkCallbacks {
21 eos: Option<Box<dyn FnMut(&AppSink) + Send + 'static>>,
22 new_preroll: Option<
23 Box<dyn FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static>,
24 >,
25 new_sample: Option<
26 Box<dyn FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static>,
27 >,
28 new_event: Option<Box<dyn FnMut(&AppSink) -> bool + Send + 'static>>,
29 propose_allocation:
30 Option<Box<dyn FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static>>,
31 #[cfg(not(panic = "abort"))]
32 panicked: AtomicBool,
33 callbacks: ffi::GstAppSinkCallbacks,
34}
35
36unsafe impl Send for AppSinkCallbacks {}
37unsafe impl Sync for AppSinkCallbacks {}
38
39impl AppSinkCallbacks {
40 pub fn builder() -> AppSinkCallbacksBuilder {
41 skip_assert_initialized!();
42 AppSinkCallbacksBuilder {
43 eos: None,
44 new_preroll: None,
45 new_sample: None,
46 new_event: None,
47 propose_allocation: None,
48 }
49 }
50}
51
52#[allow(clippy::type_complexity)]
53#[must_use = "The builder must be built to be used"]
54pub struct AppSinkCallbacksBuilder {
55 eos: Option<Box<dyn FnMut(&AppSink) + Send + 'static>>,
56 new_preroll: Option<
57 Box<dyn FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static>,
58 >,
59 new_sample: Option<
60 Box<dyn FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static>,
61 >,
62 new_event: Option<Box<dyn FnMut(&AppSink) -> bool + Send + 'static>>,
63 propose_allocation:
64 Option<Box<dyn FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static>>,
65}
66
67impl AppSinkCallbacksBuilder {
68 pub fn eos<F: FnMut(&AppSink) + Send + 'static>(self, eos: F) -> Self {
69 Self {
70 eos: Some(Box::new(eos)),
71 ..self
72 }
73 }
74
75 pub fn eos_if<F: FnMut(&AppSink) + Send + 'static>(self, eos: F, predicate: bool) -> Self {
76 if predicate { self.eos(eos) } else { self }
77 }
78
79 pub fn eos_if_some<F: FnMut(&AppSink) + Send + 'static>(self, eos: Option<F>) -> Self {
80 if let Some(eos) = eos {
81 self.eos(eos)
82 } else {
83 self
84 }
85 }
86
87 pub fn new_preroll<
88 F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
89 >(
90 self,
91 new_preroll: F,
92 ) -> Self {
93 Self {
94 new_preroll: Some(Box::new(new_preroll)),
95 ..self
96 }
97 }
98
99 pub fn new_preroll_if<
100 F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
101 >(
102 self,
103 new_preroll: F,
104 predicate: bool,
105 ) -> Self {
106 if predicate {
107 self.new_preroll(new_preroll)
108 } else {
109 self
110 }
111 }
112
113 pub fn new_preroll_if_some<
114 F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
115 >(
116 self,
117 new_preroll: Option<F>,
118 ) -> Self {
119 if let Some(new_preroll) = new_preroll {
120 self.new_preroll(new_preroll)
121 } else {
122 self
123 }
124 }
125
126 pub fn new_sample<
127 F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
128 >(
129 self,
130 new_sample: F,
131 ) -> Self {
132 Self {
133 new_sample: Some(Box::new(new_sample)),
134 ..self
135 }
136 }
137
138 pub fn new_sample_if<
139 F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
140 >(
141 self,
142 new_sample: F,
143 predicate: bool,
144 ) -> Self {
145 if predicate {
146 self.new_sample(new_sample)
147 } else {
148 self
149 }
150 }
151
152 pub fn new_sample_if_some<
153 F: FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static,
154 >(
155 self,
156 new_sample: Option<F>,
157 ) -> Self {
158 if let Some(new_sample) = new_sample {
159 self.new_sample(new_sample)
160 } else {
161 self
162 }
163 }
164
165 #[cfg(feature = "v1_20")]
166 #[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
167 pub fn new_event<F: FnMut(&AppSink) -> bool + Send + 'static>(self, new_event: F) -> Self {
168 Self {
169 new_event: Some(Box::new(new_event)),
170 ..self
171 }
172 }
173
174 #[cfg(feature = "v1_20")]
175 #[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
176 pub fn new_event_if<F: FnMut(&AppSink) -> bool + Send + 'static>(
177 self,
178 new_event: F,
179 predicate: bool,
180 ) -> Self {
181 if predicate {
182 self.new_event(new_event)
183 } else {
184 self
185 }
186 }
187
188 #[cfg(feature = "v1_20")]
189 #[cfg_attr(docsrs, doc(cfg(feature = "v1_20")))]
190 pub fn new_event_if_some<F: FnMut(&AppSink) -> bool + Send + 'static>(
191 self,
192 new_event: Option<F>,
193 ) -> Self {
194 if let Some(new_event) = new_event {
195 self.new_event(new_event)
196 } else {
197 self
198 }
199 }
200
201 #[cfg(feature = "v1_24")]
202 #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
203 pub fn propose_allocation<
204 F: FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static,
205 >(
206 self,
207 propose_allocation: F,
208 ) -> Self {
209 Self {
210 propose_allocation: Some(Box::new(propose_allocation)),
211 ..self
212 }
213 }
214
215 #[cfg(feature = "v1_24")]
216 #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
217 pub fn propose_allocation_if<
218 F: FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static,
219 >(
220 self,
221 propose_allocation: F,
222 predicate: bool,
223 ) -> Self {
224 if predicate {
225 self.propose_allocation(propose_allocation)
226 } else {
227 self
228 }
229 }
230
231 #[cfg(feature = "v1_24")]
232 #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
233 pub fn propose_allocation_if_some<
234 F: FnMut(&AppSink, &mut gst::query::Allocation) -> bool + Send + 'static,
235 >(
236 self,
237 propose_allocation: Option<F>,
238 ) -> Self {
239 if let Some(propose_allocation) = propose_allocation {
240 self.propose_allocation(propose_allocation)
241 } else {
242 self
243 }
244 }
245
246 #[must_use = "Building the callbacks without using them has no effect"]
247 pub fn build(self) -> AppSinkCallbacks {
248 let have_eos = self.eos.is_some();
249 let have_new_preroll = self.new_preroll.is_some();
250 let have_new_sample = self.new_sample.is_some();
251 let have_new_event = self.new_event.is_some();
252 let have_propose_allocation = self.propose_allocation.is_some();
253
254 AppSinkCallbacks {
255 eos: self.eos,
256 new_preroll: self.new_preroll,
257 new_sample: self.new_sample,
258 new_event: self.new_event,
259 propose_allocation: self.propose_allocation,
260 #[cfg(not(panic = "abort"))]
261 panicked: AtomicBool::new(false),
262 callbacks: ffi::GstAppSinkCallbacks {
263 eos: if have_eos { Some(trampoline_eos) } else { None },
264 new_preroll: if have_new_preroll {
265 Some(trampoline_new_preroll)
266 } else {
267 None
268 },
269 new_sample: if have_new_sample {
270 Some(trampoline_new_sample)
271 } else {
272 None
273 },
274 new_event: if have_new_event {
275 Some(trampoline_new_event)
276 } else {
277 None
278 },
279 propose_allocation: if have_propose_allocation {
280 Some(trampoline_propose_allocation)
281 } else {
282 None
283 },
284 _gst_reserved: [ptr::null_mut(), ptr::null_mut()],
285 },
286 }
287 }
288}
289
290unsafe extern "C" fn trampoline_eos(appsink: *mut ffi::GstAppSink, callbacks: gpointer) {
291 unsafe {
292 let callbacks = callbacks as *mut AppSinkCallbacks;
293 let element: Borrowed<AppSink> = from_glib_borrow(appsink);
294
295 #[cfg(not(panic = "abort"))]
296 if (*callbacks).panicked.load(Ordering::Relaxed) {
297 let element: Borrowed<AppSink> = from_glib_borrow(appsink);
298 gst::subclass::post_panic_error_message(
299 element.upcast_ref(),
300 element.upcast_ref(),
301 None,
302 );
303 return;
304 }
305
306 if let Some(ref mut eos) = (*callbacks).eos {
307 let result = panic::catch_unwind(panic::AssertUnwindSafe(|| eos(&element)));
308 match result {
309 Ok(result) => result,
310 Err(err) => {
311 #[cfg(panic = "abort")]
312 {
313 unreachable!("{err:?}");
314 }
315 #[cfg(not(panic = "abort"))]
316 {
317 (*callbacks).panicked.store(true, Ordering::Relaxed);
318 gst::subclass::post_panic_error_message(
319 element.upcast_ref(),
320 element.upcast_ref(),
321 Some(err),
322 );
323 }
324 }
325 }
326 }
327 }
328}
329
330unsafe extern "C" fn trampoline_new_preroll(
331 appsink: *mut ffi::GstAppSink,
332 callbacks: gpointer,
333) -> gst::ffi::GstFlowReturn {
334 unsafe {
335 let callbacks = callbacks as *mut AppSinkCallbacks;
336 let element: Borrowed<AppSink> = from_glib_borrow(appsink);
337
338 #[cfg(not(panic = "abort"))]
339 if (*callbacks).panicked.load(Ordering::Relaxed) {
340 let element: Borrowed<AppSink> = from_glib_borrow(appsink);
341 gst::subclass::post_panic_error_message(
342 element.upcast_ref(),
343 element.upcast_ref(),
344 None,
345 );
346 return gst::FlowReturn::Error.into_glib();
347 }
348
349 let ret = if let Some(ref mut new_preroll) = (*callbacks).new_preroll {
350 let result =
351 panic::catch_unwind(panic::AssertUnwindSafe(|| new_preroll(&element).into()));
352 match result {
353 Ok(result) => result,
354 Err(err) => {
355 #[cfg(panic = "abort")]
356 {
357 unreachable!("{err:?}");
358 }
359 #[cfg(not(panic = "abort"))]
360 {
361 (*callbacks).panicked.store(true, Ordering::Relaxed);
362 gst::subclass::post_panic_error_message(
363 element.upcast_ref(),
364 element.upcast_ref(),
365 Some(err),
366 );
367
368 gst::FlowReturn::Error
369 }
370 }
371 }
372 } else {
373 gst::FlowReturn::Error
374 };
375
376 ret.into_glib()
377 }
378}
379
380unsafe extern "C" fn trampoline_new_sample(
381 appsink: *mut ffi::GstAppSink,
382 callbacks: gpointer,
383) -> gst::ffi::GstFlowReturn {
384 unsafe {
385 let callbacks = callbacks as *mut AppSinkCallbacks;
386 let element: Borrowed<AppSink> = from_glib_borrow(appsink);
387
388 #[cfg(not(panic = "abort"))]
389 if (*callbacks).panicked.load(Ordering::Relaxed) {
390 let element: Borrowed<AppSink> = from_glib_borrow(appsink);
391 gst::subclass::post_panic_error_message(
392 element.upcast_ref(),
393 element.upcast_ref(),
394 None,
395 );
396 return gst::FlowReturn::Error.into_glib();
397 }
398
399 let ret = if let Some(ref mut new_sample) = (*callbacks).new_sample {
400 let result =
401 panic::catch_unwind(panic::AssertUnwindSafe(|| new_sample(&element).into()));
402 match result {
403 Ok(result) => result,
404 Err(err) => {
405 #[cfg(panic = "abort")]
406 {
407 unreachable!("{err:?}");
408 }
409 #[cfg(not(panic = "abort"))]
410 {
411 (*callbacks).panicked.store(true, Ordering::Relaxed);
412 gst::subclass::post_panic_error_message(
413 element.upcast_ref(),
414 element.upcast_ref(),
415 Some(err),
416 );
417
418 gst::FlowReturn::Error
419 }
420 }
421 }
422 } else {
423 gst::FlowReturn::Error
424 };
425
426 ret.into_glib()
427 }
428}
429
430unsafe extern "C" fn trampoline_new_event(
431 appsink: *mut ffi::GstAppSink,
432 callbacks: gpointer,
433) -> glib::ffi::gboolean {
434 unsafe {
435 let callbacks = callbacks as *mut AppSinkCallbacks;
436 let element: Borrowed<AppSink> = from_glib_borrow(appsink);
437
438 #[cfg(not(panic = "abort"))]
439 if (*callbacks).panicked.load(Ordering::Relaxed) {
440 let element: Borrowed<AppSink> = from_glib_borrow(appsink);
441 gst::subclass::post_panic_error_message(
442 element.upcast_ref(),
443 element.upcast_ref(),
444 None,
445 );
446 return false.into_glib();
447 }
448
449 let ret = if let Some(ref mut new_event) = (*callbacks).new_event {
450 let result = panic::catch_unwind(panic::AssertUnwindSafe(|| new_event(&element)));
451 match result {
452 Ok(result) => result,
453 Err(err) => {
454 #[cfg(panic = "abort")]
455 {
456 unreachable!("{err:?}");
457 }
458 #[cfg(not(panic = "abort"))]
459 {
460 (*callbacks).panicked.store(true, Ordering::Relaxed);
461 gst::subclass::post_panic_error_message(
462 element.upcast_ref(),
463 element.upcast_ref(),
464 Some(err),
465 );
466
467 false
468 }
469 }
470 }
471 } else {
472 false
473 };
474
475 ret.into_glib()
476 }
477}
478
479unsafe extern "C" fn trampoline_propose_allocation(
480 appsink: *mut ffi::GstAppSink,
481 query: *mut gst::ffi::GstQuery,
482 callbacks: gpointer,
483) -> glib::ffi::gboolean {
484 unsafe {
485 let callbacks = callbacks as *mut AppSinkCallbacks;
486 let element: Borrowed<AppSink> = from_glib_borrow(appsink);
487
488 #[cfg(not(panic = "abort"))]
489 if (*callbacks).panicked.load(Ordering::Relaxed) {
490 let element: Borrowed<AppSink> = from_glib_borrow(appsink);
491 gst::subclass::post_panic_error_message(
492 element.upcast_ref(),
493 element.upcast_ref(),
494 None,
495 );
496 return false.into_glib();
497 }
498
499 let ret = if let Some(ref mut propose_allocation) = (*callbacks).propose_allocation {
500 let query = match gst::QueryRef::from_mut_ptr(query).view_mut() {
501 gst::QueryViewMut::Allocation(allocation) => allocation,
502 _ => unreachable!(),
503 };
504 let result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
505 propose_allocation(&element, query)
506 }));
507 match result {
508 Ok(result) => result,
509 Err(err) => {
510 #[cfg(panic = "abort")]
511 {
512 unreachable!("{err:?}");
513 }
514 #[cfg(not(panic = "abort"))]
515 {
516 (*callbacks).panicked.store(true, Ordering::Relaxed);
517 gst::subclass::post_panic_error_message(
518 element.upcast_ref(),
519 element.upcast_ref(),
520 Some(err),
521 );
522 false
523 }
524 }
525 }
526 } else {
527 false
528 };
529
530 ret.into_glib()
531 }
532}
533
534unsafe extern "C" fn destroy_callbacks(ptr: gpointer) {
535 unsafe {
536 let _ = Box::<AppSinkCallbacks>::from_raw(ptr as *mut _);
537 }
538}
539
540impl AppSink {
541 pub fn builder<'a>() -> AppSinkBuilder<'a> {
546 assert_initialized_main_thread!();
547 AppSinkBuilder {
548 builder: gst::Object::builder(),
549 callbacks: None,
550 drop_out_of_segment: None,
551 }
552 }
553
554 #[doc(alias = "gst_app_sink_set_callbacks")]
555 pub fn set_callbacks(&self, callbacks: AppSinkCallbacks) {
556 unsafe {
557 let sink = self.to_glib_none().0;
558
559 #[allow(clippy::manual_dangling_ptr)]
560 #[cfg(not(feature = "v1_18"))]
561 {
562 static SET_ONCE_QUARK: std::sync::OnceLock<glib::Quark> =
563 std::sync::OnceLock::new();
564
565 let set_once_quark = SET_ONCE_QUARK
566 .get_or_init(|| glib::Quark::from_str("gstreamer-rs-app-sink-callbacks"));
567
568 if gst::version() < (1, 16, 3, 0) {
571 if !glib::gobject_ffi::g_object_get_qdata(
572 sink as *mut _,
573 set_once_quark.into_glib(),
574 )
575 .is_null()
576 {
577 panic!("AppSink callbacks can only be set once");
578 }
579
580 glib::gobject_ffi::g_object_set_qdata(
581 sink as *mut _,
582 set_once_quark.into_glib(),
583 1 as *mut _,
584 );
585 }
586 }
587
588 ffi::gst_app_sink_set_callbacks(
589 sink,
590 mut_override(&callbacks.callbacks),
591 Box::into_raw(Box::new(callbacks)) as *mut _,
592 Some(destroy_callbacks),
593 );
594 }
595 }
596
597 #[doc(alias = "drop-out-of-segment")]
598 pub fn drops_out_of_segment(&self) -> bool {
599 unsafe {
600 from_glib(gst_base::ffi::gst_base_sink_get_drop_out_of_segment(
601 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
602 ))
603 }
604 }
605
606 #[doc(alias = "max-bitrate")]
607 #[doc(alias = "gst_base_sink_get_max_bitrate")]
608 pub fn max_bitrate(&self) -> u64 {
609 unsafe {
610 gst_base::ffi::gst_base_sink_get_max_bitrate(
611 self.as_ptr() as *mut gst_base::ffi::GstBaseSink
612 )
613 }
614 }
615
616 #[doc(alias = "max-lateness")]
617 #[doc(alias = "gst_base_sink_get_max_lateness")]
618 pub fn max_lateness(&self) -> i64 {
619 unsafe {
620 gst_base::ffi::gst_base_sink_get_max_lateness(
621 self.as_ptr() as *mut gst_base::ffi::GstBaseSink
622 )
623 }
624 }
625
626 #[doc(alias = "processing-deadline")]
627 #[cfg(feature = "v1_16")]
628 #[cfg_attr(docsrs, doc(cfg(feature = "v1_16")))]
629 #[doc(alias = "gst_base_sink_get_processing_deadline")]
630 pub fn processing_deadline(&self) -> gst::ClockTime {
631 unsafe {
632 try_from_glib(gst_base::ffi::gst_base_sink_get_processing_deadline(
633 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
634 ))
635 .expect("undefined processing_deadline")
636 }
637 }
638
639 #[doc(alias = "render-delay")]
640 #[doc(alias = "gst_base_sink_get_render_delay")]
641 pub fn render_delay(&self) -> gst::ClockTime {
642 unsafe {
643 try_from_glib(gst_base::ffi::gst_base_sink_get_render_delay(
644 self.as_ptr() as *mut gst_base::ffi::GstBaseSink
645 ))
646 .expect("undefined render_delay")
647 }
648 }
649
650 #[cfg(feature = "v1_18")]
651 #[cfg_attr(docsrs, doc(cfg(feature = "v1_18")))]
652 #[doc(alias = "gst_base_sink_get_stats")]
653 pub fn stats(&self) -> gst::Structure {
654 unsafe {
655 from_glib_full(gst_base::ffi::gst_base_sink_get_stats(
656 self.as_ptr() as *mut gst_base::ffi::GstBaseSink
657 ))
658 }
659 }
660
661 #[doc(alias = "sync")]
662 pub fn is_sync(&self) -> bool {
663 unsafe {
664 from_glib(gst_base::ffi::gst_base_sink_get_sync(
665 self.as_ptr() as *mut gst_base::ffi::GstBaseSink
666 ))
667 }
668 }
669
670 #[doc(alias = "throttle-time")]
671 #[doc(alias = "gst_base_sink_get_throttle_time")]
672 pub fn throttle_time(&self) -> u64 {
673 unsafe {
674 gst_base::ffi::gst_base_sink_get_throttle_time(
675 self.as_ptr() as *mut gst_base::ffi::GstBaseSink
676 )
677 }
678 }
679
680 #[doc(alias = "ts-offset")]
681 #[doc(alias = "gst_base_sink_get_ts_offset")]
682 pub fn ts_offset(&self) -> gst::ClockTimeDiff {
683 unsafe {
684 gst_base::ffi::gst_base_sink_get_ts_offset(
685 self.as_ptr() as *mut gst_base::ffi::GstBaseSink
686 )
687 }
688 }
689
690 #[doc(alias = "async")]
691 #[doc(alias = "gst_base_sink_is_async_enabled")]
692 pub fn is_async(&self) -> bool {
693 unsafe {
694 from_glib(gst_base::ffi::gst_base_sink_is_async_enabled(
695 self.as_ptr() as *mut gst_base::ffi::GstBaseSink
696 ))
697 }
698 }
699
700 #[doc(alias = "last-sample")]
701 pub fn enables_last_sample(&self) -> bool {
702 unsafe {
703 from_glib(gst_base::ffi::gst_base_sink_is_last_sample_enabled(
704 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
705 ))
706 }
707 }
708
709 #[doc(alias = "qos")]
710 #[doc(alias = "gst_base_sink_is_qos_enabled")]
711 pub fn is_qos(&self) -> bool {
712 unsafe {
713 from_glib(gst_base::ffi::gst_base_sink_is_qos_enabled(
714 self.as_ptr() as *mut gst_base::ffi::GstBaseSink
715 ))
716 }
717 }
718
719 #[doc(alias = "async")]
720 #[doc(alias = "gst_base_sink_set_async_enabled")]
721 pub fn set_async(&self, enabled: bool) {
722 unsafe {
723 gst_base::ffi::gst_base_sink_set_async_enabled(
724 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
725 enabled.into_glib(),
726 );
727 }
728 }
729
730 #[doc(alias = "drop-out-of-segment")]
731 #[doc(alias = "gst_base_sink_set_drop_out_of_segment")]
732 pub fn set_drop_out_of_segment(&self, drop_out_of_segment: bool) {
733 unsafe {
734 gst_base::ffi::gst_base_sink_set_drop_out_of_segment(
735 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
736 drop_out_of_segment.into_glib(),
737 );
738 }
739 }
740
741 #[doc(alias = "last-sample")]
742 pub fn set_enable_last_sample(&self, enabled: bool) {
743 unsafe {
744 gst_base::ffi::gst_base_sink_set_last_sample_enabled(
745 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
746 enabled.into_glib(),
747 );
748 }
749 }
750
751 #[doc(alias = "max-bitrate")]
752 #[doc(alias = "gst_base_sink_set_max_bitrate")]
753 pub fn set_max_bitrate(&self, max_bitrate: u64) {
754 unsafe {
755 gst_base::ffi::gst_base_sink_set_max_bitrate(
756 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
757 max_bitrate,
758 );
759 }
760 }
761
762 #[doc(alias = "max-lateness")]
763 #[doc(alias = "gst_base_sink_set_max_lateness")]
764 pub fn set_max_lateness(&self, max_lateness: i64) {
765 unsafe {
766 gst_base::ffi::gst_base_sink_set_max_lateness(
767 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
768 max_lateness,
769 );
770 }
771 }
772
773 #[doc(alias = "processing-deadline")]
774 #[cfg(feature = "v1_16")]
775 #[cfg_attr(docsrs, doc(cfg(feature = "v1_16")))]
776 #[doc(alias = "gst_base_sink_set_processing_deadline")]
777 pub fn set_processing_deadline(&self, processing_deadline: gst::ClockTime) {
778 unsafe {
779 gst_base::ffi::gst_base_sink_set_processing_deadline(
780 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
781 processing_deadline.into_glib(),
782 );
783 }
784 }
785
786 #[doc(alias = "qos")]
787 #[doc(alias = "gst_base_sink_set_qos_enabled")]
788 pub fn set_qos(&self, enabled: bool) {
789 unsafe {
790 gst_base::ffi::gst_base_sink_set_qos_enabled(
791 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
792 enabled.into_glib(),
793 );
794 }
795 }
796
797 #[doc(alias = "render-delay")]
798 #[doc(alias = "gst_base_sink_set_render_delay")]
799 pub fn set_render_delay(&self, delay: gst::ClockTime) {
800 unsafe {
801 gst_base::ffi::gst_base_sink_set_render_delay(
802 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
803 delay.into_glib(),
804 );
805 }
806 }
807
808 #[doc(alias = "sync")]
809 #[doc(alias = "gst_base_sink_set_sync")]
810 pub fn set_sync(&self, sync: bool) {
811 unsafe {
812 gst_base::ffi::gst_base_sink_set_sync(
813 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
814 sync.into_glib(),
815 );
816 }
817 }
818
819 #[doc(alias = "throttle-time")]
820 #[doc(alias = "gst_base_sink_set_throttle_time")]
821 pub fn set_throttle_time(&self, throttle: u64) {
822 unsafe {
823 gst_base::ffi::gst_base_sink_set_throttle_time(
824 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
825 throttle,
826 );
827 }
828 }
829
830 #[doc(alias = "ts-offset")]
831 #[doc(alias = "gst_base_sink_set_ts_offset")]
832 pub fn set_ts_offset(&self, offset: gst::ClockTimeDiff) {
833 unsafe {
834 gst_base::ffi::gst_base_sink_set_ts_offset(
835 self.as_ptr() as *mut gst_base::ffi::GstBaseSink,
836 offset,
837 );
838 }
839 }
840
841 #[doc(alias = "async")]
842 pub fn connect_async_notify<F: Fn(&Self) + Send + Sync + 'static>(
843 &self,
844 f: F,
845 ) -> glib::SignalHandlerId {
846 unsafe extern "C" fn notify_async_trampoline<F: Fn(&AppSink) + Send + Sync + 'static>(
847 this: *mut ffi::GstAppSink,
848 _param_spec: glib::ffi::gpointer,
849 f: glib::ffi::gpointer,
850 ) {
851 unsafe {
852 let f: &F = &*(f as *const F);
853 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
854 }
855 }
856 unsafe {
857 let f: Box<F> = Box::new(f);
858 glib::signal::connect_raw(
859 self.as_ptr() as *mut _,
860 b"notify::async\0".as_ptr() as *const _,
861 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
862 notify_async_trampoline::<F> as *const (),
863 )),
864 Box::into_raw(f),
865 )
866 }
867 }
868
869 #[doc(alias = "blocksize")]
870 pub fn connect_blocksize_notify<F: Fn(&Self) + Send + Sync + 'static>(
871 &self,
872 f: F,
873 ) -> glib::SignalHandlerId {
874 unsafe extern "C" fn notify_blocksize_trampoline<
875 F: Fn(&AppSink) + Send + Sync + 'static,
876 >(
877 this: *mut ffi::GstAppSink,
878 _param_spec: glib::ffi::gpointer,
879 f: glib::ffi::gpointer,
880 ) {
881 unsafe {
882 let f: &F = &*(f as *const F);
883 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
884 }
885 }
886 unsafe {
887 let f: Box<F> = Box::new(f);
888 glib::signal::connect_raw(
889 self.as_ptr() as *mut _,
890 b"notify::blocksize\0".as_ptr() as *const _,
891 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
892 notify_blocksize_trampoline::<F> as *const (),
893 )),
894 Box::into_raw(f),
895 )
896 }
897 }
898
899 #[doc(alias = "enable-last-sample")]
900 pub fn connect_enable_last_sample_notify<F: Fn(&Self) + Send + Sync + 'static>(
901 &self,
902 f: F,
903 ) -> glib::SignalHandlerId {
904 unsafe extern "C" fn notify_enable_last_sample_trampoline<
905 F: Fn(&AppSink) + Send + Sync + 'static,
906 >(
907 this: *mut ffi::GstAppSink,
908 _param_spec: glib::ffi::gpointer,
909 f: glib::ffi::gpointer,
910 ) {
911 unsafe {
912 let f: &F = &*(f as *const F);
913 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
914 }
915 }
916 unsafe {
917 let f: Box<F> = Box::new(f);
918 glib::signal::connect_raw(
919 self.as_ptr() as *mut _,
920 b"notify::enable-last-sample\0".as_ptr() as *const _,
921 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
922 notify_enable_last_sample_trampoline::<F> as *const (),
923 )),
924 Box::into_raw(f),
925 )
926 }
927 }
928
929 #[doc(alias = "last-sample")]
930 pub fn connect_last_sample_notify<F: Fn(&Self) + Send + Sync + 'static>(
931 &self,
932 f: F,
933 ) -> glib::SignalHandlerId {
934 unsafe extern "C" fn notify_last_sample_trampoline<
935 F: Fn(&AppSink) + Send + Sync + 'static,
936 >(
937 this: *mut ffi::GstAppSink,
938 _param_spec: glib::ffi::gpointer,
939 f: glib::ffi::gpointer,
940 ) {
941 unsafe {
942 let f: &F = &*(f as *const F);
943 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
944 }
945 }
946 unsafe {
947 let f: Box<F> = Box::new(f);
948 glib::signal::connect_raw(
949 self.as_ptr() as *mut _,
950 b"notify::last-sample\0".as_ptr() as *const _,
951 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
952 notify_last_sample_trampoline::<F> as *const (),
953 )),
954 Box::into_raw(f),
955 )
956 }
957 }
958
959 #[doc(alias = "max-bitrate")]
960 pub fn connect_max_bitrate_notify<F: Fn(&Self) + Send + Sync + 'static>(
961 &self,
962 f: F,
963 ) -> glib::SignalHandlerId {
964 unsafe extern "C" fn notify_max_bitrate_trampoline<
965 F: Fn(&AppSink) + Send + Sync + 'static,
966 >(
967 this: *mut ffi::GstAppSink,
968 _param_spec: glib::ffi::gpointer,
969 f: glib::ffi::gpointer,
970 ) {
971 unsafe {
972 let f: &F = &*(f as *const F);
973 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
974 }
975 }
976 unsafe {
977 let f: Box<F> = Box::new(f);
978 glib::signal::connect_raw(
979 self.as_ptr() as *mut _,
980 b"notify::max-bitrate\0".as_ptr() as *const _,
981 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
982 notify_max_bitrate_trampoline::<F> as *const (),
983 )),
984 Box::into_raw(f),
985 )
986 }
987 }
988
989 #[doc(alias = "max-lateness")]
990 pub fn connect_max_lateness_notify<F: Fn(&Self) + Send + Sync + 'static>(
991 &self,
992 f: F,
993 ) -> glib::SignalHandlerId {
994 unsafe extern "C" fn notify_max_lateness_trampoline<
995 F: Fn(&AppSink) + Send + Sync + 'static,
996 >(
997 this: *mut ffi::GstAppSink,
998 _param_spec: glib::ffi::gpointer,
999 f: glib::ffi::gpointer,
1000 ) {
1001 unsafe {
1002 let f: &F = &*(f as *const F);
1003 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1004 }
1005 }
1006 unsafe {
1007 let f: Box<F> = Box::new(f);
1008 glib::signal::connect_raw(
1009 self.as_ptr() as *mut _,
1010 b"notify::max-lateness\0".as_ptr() as *const _,
1011 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1012 notify_max_lateness_trampoline::<F> as *const (),
1013 )),
1014 Box::into_raw(f),
1015 )
1016 }
1017 }
1018
1019 #[cfg(feature = "v1_16")]
1020 #[cfg_attr(docsrs, doc(cfg(feature = "v1_16")))]
1021 #[doc(alias = "processing-deadline")]
1022 pub fn connect_processing_deadline_notify<F: Fn(&Self) + Send + Sync + 'static>(
1023 &self,
1024 f: F,
1025 ) -> glib::SignalHandlerId {
1026 unsafe extern "C" fn notify_processing_deadline_trampoline<
1027 F: Fn(&AppSink) + Send + Sync + 'static,
1028 >(
1029 this: *mut ffi::GstAppSink,
1030 _param_spec: glib::ffi::gpointer,
1031 f: glib::ffi::gpointer,
1032 ) {
1033 unsafe {
1034 let f: &F = &*(f as *const F);
1035 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1036 }
1037 }
1038 unsafe {
1039 let f: Box<F> = Box::new(f);
1040 glib::signal::connect_raw(
1041 self.as_ptr() as *mut _,
1042 b"notify::processing-deadline\0".as_ptr() as *const _,
1043 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1044 notify_processing_deadline_trampoline::<F> as *const (),
1045 )),
1046 Box::into_raw(f),
1047 )
1048 }
1049 }
1050
1051 #[doc(alias = "qos")]
1052 pub fn connect_qos_notify<F: Fn(&Self) + Send + Sync + 'static>(
1053 &self,
1054 f: F,
1055 ) -> glib::SignalHandlerId {
1056 unsafe extern "C" fn notify_qos_trampoline<F: Fn(&AppSink) + Send + Sync + 'static>(
1057 this: *mut ffi::GstAppSink,
1058 _param_spec: glib::ffi::gpointer,
1059 f: glib::ffi::gpointer,
1060 ) {
1061 unsafe {
1062 let f: &F = &*(f as *const F);
1063 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1064 }
1065 }
1066 unsafe {
1067 let f: Box<F> = Box::new(f);
1068 glib::signal::connect_raw(
1069 self.as_ptr() as *mut _,
1070 b"notify::qos\0".as_ptr() as *const _,
1071 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1072 notify_qos_trampoline::<F> as *const (),
1073 )),
1074 Box::into_raw(f),
1075 )
1076 }
1077 }
1078
1079 #[doc(alias = "render-delay")]
1080 pub fn connect_render_delay_notify<F: Fn(&Self) + Send + Sync + 'static>(
1081 &self,
1082 f: F,
1083 ) -> glib::SignalHandlerId {
1084 unsafe extern "C" fn notify_render_delay_trampoline<
1085 F: Fn(&AppSink) + Send + Sync + 'static,
1086 >(
1087 this: *mut ffi::GstAppSink,
1088 _param_spec: glib::ffi::gpointer,
1089 f: glib::ffi::gpointer,
1090 ) {
1091 unsafe {
1092 let f: &F = &*(f as *const F);
1093 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1094 }
1095 }
1096 unsafe {
1097 let f: Box<F> = Box::new(f);
1098 glib::signal::connect_raw(
1099 self.as_ptr() as *mut _,
1100 b"notify::render-delay\0".as_ptr() as *const _,
1101 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1102 notify_render_delay_trampoline::<F> as *const (),
1103 )),
1104 Box::into_raw(f),
1105 )
1106 }
1107 }
1108
1109 #[cfg(feature = "v1_18")]
1110 #[cfg_attr(docsrs, doc(cfg(feature = "v1_18")))]
1111 #[doc(alias = "stats")]
1112 pub fn connect_stats_notify<F: Fn(&Self) + Send + Sync + 'static>(
1113 &self,
1114 f: F,
1115 ) -> glib::SignalHandlerId {
1116 unsafe extern "C" fn notify_stats_trampoline<F: Fn(&AppSink) + Send + Sync + 'static>(
1117 this: *mut ffi::GstAppSink,
1118 _param_spec: glib::ffi::gpointer,
1119 f: glib::ffi::gpointer,
1120 ) {
1121 unsafe {
1122 let f: &F = &*(f as *const F);
1123 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1124 }
1125 }
1126 unsafe {
1127 let f: Box<F> = Box::new(f);
1128 glib::signal::connect_raw(
1129 self.as_ptr() as *mut _,
1130 b"notify::stats\0".as_ptr() as *const _,
1131 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1132 notify_stats_trampoline::<F> as *const (),
1133 )),
1134 Box::into_raw(f),
1135 )
1136 }
1137 }
1138
1139 #[doc(alias = "sync")]
1140 pub fn connect_sync_notify<F: Fn(&Self) + Send + Sync + 'static>(
1141 &self,
1142 f: F,
1143 ) -> glib::SignalHandlerId {
1144 unsafe extern "C" fn notify_sync_trampoline<F: Fn(&AppSink) + Send + Sync + 'static>(
1145 this: *mut ffi::GstAppSink,
1146 _param_spec: glib::ffi::gpointer,
1147 f: glib::ffi::gpointer,
1148 ) {
1149 unsafe {
1150 let f: &F = &*(f as *const F);
1151 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1152 }
1153 }
1154 unsafe {
1155 let f: Box<F> = Box::new(f);
1156 glib::signal::connect_raw(
1157 self.as_ptr() as *mut _,
1158 b"notify::sync\0".as_ptr() as *const _,
1159 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1160 notify_sync_trampoline::<F> as *const (),
1161 )),
1162 Box::into_raw(f),
1163 )
1164 }
1165 }
1166
1167 #[doc(alias = "throttle-time")]
1168 pub fn connect_throttle_time_notify<F: Fn(&Self) + Send + Sync + 'static>(
1169 &self,
1170 f: F,
1171 ) -> glib::SignalHandlerId {
1172 unsafe extern "C" fn notify_throttle_time_trampoline<
1173 F: Fn(&AppSink) + Send + Sync + 'static,
1174 >(
1175 this: *mut ffi::GstAppSink,
1176 _param_spec: glib::ffi::gpointer,
1177 f: glib::ffi::gpointer,
1178 ) {
1179 unsafe {
1180 let f: &F = &*(f as *const F);
1181 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1182 }
1183 }
1184 unsafe {
1185 let f: Box<F> = Box::new(f);
1186 glib::signal::connect_raw(
1187 self.as_ptr() as *mut _,
1188 b"notify::throttle-time\0".as_ptr() as *const _,
1189 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1190 notify_throttle_time_trampoline::<F> as *const (),
1191 )),
1192 Box::into_raw(f),
1193 )
1194 }
1195 }
1196
1197 #[doc(alias = "ts-offset")]
1198 pub fn connect_ts_offset_notify<F: Fn(&Self) + Send + Sync + 'static>(
1199 &self,
1200 f: F,
1201 ) -> glib::SignalHandlerId {
1202 unsafe extern "C" fn notify_ts_offset_trampoline<
1203 F: Fn(&AppSink) + Send + Sync + 'static,
1204 >(
1205 this: *mut ffi::GstAppSink,
1206 _param_spec: glib::ffi::gpointer,
1207 f: glib::ffi::gpointer,
1208 ) {
1209 unsafe {
1210 let f: &F = &*(f as *const F);
1211 f(AppSink::from_glib_borrow(this).unsafe_cast_ref())
1212 }
1213 }
1214 unsafe {
1215 let f: Box<F> = Box::new(f);
1216 glib::signal::connect_raw(
1217 self.as_ptr() as *mut _,
1218 b"notify::ts-offset\0".as_ptr() as *const _,
1219 Some(mem::transmute::<*const (), unsafe extern "C" fn()>(
1220 notify_ts_offset_trampoline::<F> as *const (),
1221 )),
1222 Box::into_raw(f),
1223 )
1224 }
1225 }
1226
1227 pub fn stream(&self) -> AppSinkStream {
1228 AppSinkStream::new(self)
1229 }
1230}
1231
1232#[must_use = "The builder must be built to be used"]
1237pub struct AppSinkBuilder<'a> {
1238 builder: gst::gobject::GObjectBuilder<'a, AppSink>,
1239 callbacks: Option<AppSinkCallbacks>,
1240 drop_out_of_segment: Option<bool>,
1241}
1242
1243impl<'a> AppSinkBuilder<'a> {
1244 #[must_use = "Building the object from the builder is usually expensive and is not expected to have side effects"]
1252 pub fn build(self) -> AppSink {
1253 let appsink = self.builder.build().unwrap();
1254
1255 if let Some(callbacks) = self.callbacks {
1256 appsink.set_callbacks(callbacks);
1257 }
1258
1259 if let Some(drop_out_of_segment) = self.drop_out_of_segment {
1260 appsink.set_drop_out_of_segment(drop_out_of_segment);
1261 }
1262
1263 appsink
1264 }
1265
1266 pub fn async_(self, async_: bool) -> Self {
1267 Self {
1268 builder: self.builder.property("async", async_),
1269 ..self
1270 }
1271 }
1272
1273 pub fn buffer_list(self, buffer_list: bool) -> Self {
1274 Self {
1275 builder: self.builder.property("buffer-list", buffer_list),
1276 ..self
1277 }
1278 }
1279
1280 pub fn callbacks(self, callbacks: AppSinkCallbacks) -> Self {
1281 Self {
1282 callbacks: Some(callbacks),
1283 ..self
1284 }
1285 }
1286
1287 pub fn caps(self, caps: &'a gst::Caps) -> Self {
1288 Self {
1289 builder: self.builder.property("caps", caps),
1290 ..self
1291 }
1292 }
1293
1294 #[cfg_attr(feature = "v1_28", deprecated = "Since 1.28")]
1295 #[allow(deprecated)]
1296 pub fn drop(self, drop: bool) -> Self {
1297 Self {
1298 builder: self.builder.property("drop", drop),
1299 ..self
1300 }
1301 }
1302
1303 pub fn drop_out_of_segment(self, drop_out_of_segment: bool) -> Self {
1304 Self {
1305 drop_out_of_segment: Some(drop_out_of_segment),
1306 ..self
1307 }
1308 }
1309
1310 pub fn enable_last_sample(self, enable_last_sample: bool) -> Self {
1311 Self {
1312 builder: self
1313 .builder
1314 .property("enable-last-sample", enable_last_sample),
1315 ..self
1316 }
1317 }
1318
1319 pub fn max_bitrate(self, max_bitrate: u64) -> Self {
1320 Self {
1321 builder: self.builder.property("max-bitrate", max_bitrate),
1322 ..self
1323 }
1324 }
1325
1326 pub fn max_buffers(self, max_buffers: u32) -> Self {
1327 Self {
1328 builder: self.builder.property("max-buffers", max_buffers),
1329 ..self
1330 }
1331 }
1332
1333 pub fn max_lateness(self, max_lateness: i64) -> Self {
1334 Self {
1335 builder: self.builder.property("max-lateness", max_lateness),
1336 ..self
1337 }
1338 }
1339
1340 #[cfg(feature = "v1_16")]
1341 #[cfg_attr(docsrs, doc(cfg(feature = "v1_16")))]
1342 pub fn processing_deadline(self, processing_deadline: gst::ClockTime) -> Self {
1343 Self {
1344 builder: self
1345 .builder
1346 .property("processing-deadline", processing_deadline),
1347 ..self
1348 }
1349 }
1350
1351 pub fn qos(self, qos: bool) -> Self {
1352 Self {
1353 builder: self.builder.property("qos", qos),
1354 ..self
1355 }
1356 }
1357
1358 pub fn render_delay(self, render_delay: Option<gst::ClockTime>) -> Self {
1359 Self {
1360 builder: self.builder.property("render-delay", render_delay),
1361 ..self
1362 }
1363 }
1364
1365 pub fn sync(self, sync: bool) -> Self {
1366 Self {
1367 builder: self.builder.property("sync", sync),
1368 ..self
1369 }
1370 }
1371
1372 pub fn throttle_time(self, throttle_time: u64) -> Self {
1373 Self {
1374 builder: self.builder.property("throttle-time", throttle_time),
1375 ..self
1376 }
1377 }
1378
1379 pub fn ts_offset(self, ts_offset: gst::ClockTimeDiff) -> Self {
1380 Self {
1381 builder: self.builder.property("ts-offset", ts_offset),
1382 ..self
1383 }
1384 }
1385
1386 pub fn wait_on_eos(self, wait_on_eos: bool) -> Self {
1387 Self {
1388 builder: self.builder.property("wait-on-eos", wait_on_eos),
1389 ..self
1390 }
1391 }
1392
1393 #[cfg(feature = "v1_24")]
1394 #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
1395 pub fn max_time(self, max_time: Option<gst::ClockTime>) -> Self {
1396 Self {
1397 builder: self.builder.property("max-time", max_time),
1398 ..self
1399 }
1400 }
1401
1402 #[cfg(feature = "v1_24")]
1403 #[cfg_attr(docsrs, doc(cfg(feature = "v1_24")))]
1404 pub fn max_bytes(self, max_bytes: u64) -> Self {
1405 Self {
1406 builder: self.builder.property("max-bytes", max_bytes),
1407 ..self
1408 }
1409 }
1410
1411 #[cfg(feature = "v1_28")]
1412 #[cfg_attr(docsrs, doc(cfg(feature = "v1_28")))]
1413 pub fn leaky_type(self, leaky_type: crate::AppLeakyType) -> Self {
1414 Self {
1415 builder: self.builder.property("leaky-type", leaky_type),
1416 ..self
1417 }
1418 }
1419
1420 #[cfg(feature = "v1_28")]
1421 #[cfg_attr(docsrs, doc(cfg(feature = "v1_28")))]
1422 pub fn silent(self, silent: bool) -> Self {
1423 Self {
1424 builder: self.builder.property("silent", silent),
1425 ..self
1426 }
1427 }
1428
1429 #[inline]
1434 pub fn property(self, name: &'a str, value: impl Into<glib::Value> + 'a) -> Self {
1435 Self {
1436 builder: self.builder.property(name, value),
1437 ..self
1438 }
1439 }
1440
1441 #[inline]
1444 pub fn property_from_str(self, name: &'a str, value: &'a str) -> Self {
1445 Self {
1446 builder: self.builder.property_from_str(name, value),
1447 ..self
1448 }
1449 }
1450
1451 gst::impl_builder_gvalue_extra_setters!(property_and_name);
1452}
1453
1454#[derive(Debug)]
1455pub struct AppSinkStream {
1456 app_sink: glib::WeakRef<AppSink>,
1457 waker_reference: Arc<Mutex<Option<Waker>>>,
1458}
1459
1460impl AppSinkStream {
1461 fn new(app_sink: &AppSink) -> Self {
1462 skip_assert_initialized!();
1463
1464 let waker_reference = Arc::new(Mutex::new(None as Option<Waker>));
1465
1466 app_sink.set_callbacks(
1467 AppSinkCallbacks::builder()
1468 .new_sample({
1469 let waker_reference = Arc::clone(&waker_reference);
1470
1471 move |_| {
1472 if let Some(waker) = waker_reference.lock().unwrap().take() {
1473 waker.wake();
1474 }
1475
1476 Ok(gst::FlowSuccess::Ok)
1477 }
1478 })
1479 .eos({
1480 let waker_reference = Arc::clone(&waker_reference);
1481
1482 move |_| {
1483 if let Some(waker) = waker_reference.lock().unwrap().take() {
1484 waker.wake();
1485 }
1486 }
1487 })
1488 .build(),
1489 );
1490
1491 Self {
1492 app_sink: app_sink.downgrade(),
1493 waker_reference,
1494 }
1495 }
1496}
1497
1498impl Drop for AppSinkStream {
1499 fn drop(&mut self) {
1500 #[cfg(not(feature = "v1_18"))]
1501 {
1502 if gst::version() >= (1, 16, 3, 0)
1505 && let Some(app_sink) = self.app_sink.upgrade()
1506 {
1507 app_sink.set_callbacks(AppSinkCallbacks::builder().build());
1508 }
1509 }
1510 }
1511}
1512
1513impl Stream for AppSinkStream {
1514 type Item = gst::Sample;
1515
1516 fn poll_next(self: Pin<&mut Self>, context: &mut Context) -> Poll<Option<Self::Item>> {
1517 let mut waker = self.waker_reference.lock().unwrap();
1518
1519 let Some(app_sink) = self.app_sink.upgrade() else {
1520 return Poll::Ready(None);
1521 };
1522
1523 app_sink
1524 .try_pull_sample(gst::ClockTime::ZERO)
1525 .map(|sample| Poll::Ready(Some(sample)))
1526 .unwrap_or_else(|| {
1527 if app_sink.is_eos() {
1528 return Poll::Ready(None);
1529 }
1530
1531 waker.replace(context.waker().to_owned());
1532
1533 Poll::Pending
1534 })
1535 }
1536}
1537
1538#[cfg(test)]
1539mod tests {
1540 use futures_util::StreamExt;
1541 use gst::prelude::*;
1542
1543 use super::*;
1544
1545 #[test]
1546 fn test_app_sink_stream() {
1547 gst::init().unwrap();
1548
1549 let videotestsrc = gst::ElementFactory::make("videotestsrc")
1550 .property("num-buffers", 5)
1551 .build()
1552 .unwrap();
1553 let appsink = gst::ElementFactory::make("appsink").build().unwrap();
1554
1555 let pipeline = gst::Pipeline::new();
1556 pipeline.add(&videotestsrc).unwrap();
1557 pipeline.add(&appsink).unwrap();
1558
1559 videotestsrc.link(&appsink).unwrap();
1560
1561 let app_sink_stream = appsink.dynamic_cast::<AppSink>().unwrap().stream();
1562 let samples_future = app_sink_stream.collect::<Vec<gst::Sample>>();
1563
1564 pipeline.set_state(gst::State::Playing).unwrap();
1565 let samples = futures_executor::block_on(samples_future);
1566 pipeline.set_state(gst::State::Null).unwrap();
1567
1568 assert_eq!(samples.len(), 5);
1569 }
1570}