Skip to main content

gstthreadshare/runtime/
pad.rs

1// Copyright (C) 2019-2022 François Laignel <fengalin@free.fr>
2// Copyright (C) 2020 Sebastian Dröge <sebastian@centricular.com>
3//
4// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
5// If a copy of the MPL was not distributed with this file, You can obtain one at
6// <https://mozilla.org/MPL/2.0/>.
7//
8// SPDX-License-Identifier: MPL-2.0
9
10//! An implementation of `Pad`s to run asynchronous processings.
11//!
12//! [`PadSink`] & [`PadSrc`] provide an asynchronous API to ease the development of `Element`s in
13//! the `threadshare` GStreamer plugins framework.
14//!
15//! The diagram below shows how the [`PadSrc`] & [`PadSink`] and the related `struct`s integrate in
16//! `ts` `Element`s.
17//!
18//! Note: [`PadSrc`] & [`PadSink`] only support `gst::PadMode::Push` at the moment.
19//!
20//! ```text
21//!    ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓          ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━
22//!                    Element A               ┃          ┃          Element B
23//!                                            ┃          ┃
24//!                 ╭─────────────────╮        ┃          ┃       ╭──────────────────╮
25//!                 │     PadSrc      │        ┃          ┃       │      PadSink     │
26//!                 │     Handler     │        ┃          ┃       │      Handler     │
27//!                 │─────────────────│        ┃          ┃       │──────────────────│
28//!                 │ - src_activate* │     ╭──┸──╮    ╭──┸──╮    │ - sink_activate* │
29//!                 │ - src_event*    │<────│     │<╌╌╌│     │───>│ - sink_chain*    │
30//!                 │ - src_query     │<────│ gst │    │ gst │───>│ - sink_event*    │
31//!                 ╰─────────────────╯     │     │    │     │───>│ - sink_query     │
32//!                          │              │ Pad │    │ Pad │    ╰──────────────────╯
33//!                          │           ╭─>│     │╌╌╌>│     │─╮            │
34//!                          │           │  ╰──┰──╯    ╰──┰──╯ ╰───────╮    │
35//!      ╭─────────╮    ╭────────╮ push* │     ┃          ┃          ╭─────────╮
36//!      │ Task ↺  │───>│ PadSrc │───────╯     ┃          ┃          │ PadSink │
37//!      ╰─────────╯    ╰────────╯             ┃          ┃          ╰─────────╯
38//!    ━━━━━━━│━━━━━━━━━━━━━━│━━━━━━━━━━━━━━━━━┛          ┗━━━━━━━━━━━━━━━│━━━━━━━━━━━━
39//!           ╰──────────────┴───────────────────╮      ╭─────────────────╯
40//!                                           ╭────────────╮
41//!                                           │  Context ↺ │
42//!                                           ╰────────────╯
43//! ```
44//!
45//! Asynchronous operations for both [`PadSrc`] in `Element A` and [`PadSink`] in `Element B` run on
46//!  the same [`Context`], which can also be shared by other `Element`s or instances of the same
47//! `Element`s in multiple `Pipeline`s.
48//!
49//! `Element A` & `Element B` can also be linked to non-threadshare `Element`s in which case, they
50//! operate in a regular synchronous way.
51//!
52//! Note that only operations on the streaming thread (serialized events, buffers, serialized
53//! queries) are handled from the `PadContext` and asynchronously, everything else operates
54//! blocking.
55//!
56//! [`PadSink`]: struct.PadSink.html
57//! [`PadSrc`]: struct.PadSrc.html
58//! [`Context`]: ../executor/struct.Context.html
59
60use futures::future;
61use futures::prelude::*;
62
63use gst::prelude::*;
64use gst::subclass::prelude::*;
65use gst::{FlowError, FlowSuccess};
66
67use std::marker::PhantomData;
68use std::ops::Deref;
69use std::sync::{Arc, Weak};
70
71use super::RUNTIME_CAT;
72use super::executor::{self, Context};
73
74#[inline]
75fn event_ret_to_event_full_res(
76    ret: bool,
77    event_type: gst::EventType,
78) -> Result<FlowSuccess, FlowError> {
79    if ret {
80        Ok(FlowSuccess::Ok)
81    } else if event_type == gst::EventType::Caps {
82        Err(FlowError::NotNegotiated)
83    } else {
84        Err(FlowError::Error)
85    }
86}
87
88#[inline]
89fn event_to_event_full(ret: bool, event_type: gst::EventType) -> Result<FlowSuccess, FlowError> {
90    event_ret_to_event_full_res(ret, event_type)
91}
92
93#[inline]
94fn event_to_event_full_serialized(
95    ret: impl Future<Output = bool> + Send,
96    event_type: gst::EventType,
97) -> impl Future<Output = Result<FlowSuccess, FlowError>> + Send {
98    ret.map(move |ret| event_ret_to_event_full_res(ret, event_type))
99}
100
101/// A trait to define `handler`s for [`PadSrc`] callbacks.
102///
103/// *See the [`pad` module] documentation for a description of the model.*
104///
105/// [`PadSrc`]: struct.PadSrc.html
106/// [`pad` module]: index.html
107pub trait PadSrcHandler: Clone + Send + Sync + 'static {
108    type ElementImpl: ElementImpl<Type: Send>;
109
110    fn src_activate(
111        self,
112        pad: &gst::Pad,
113        _imp: &Self::ElementImpl,
114    ) -> Result<(), gst::LoggableError> {
115        if pad.is_active() {
116            gst::debug!(
117                RUNTIME_CAT,
118                obj = pad,
119                "Already activated in {:?} mode ",
120                pad.mode()
121            );
122            return Ok(());
123        }
124
125        pad.activate_mode(gst::PadMode::Push, true).map_err(|err| {
126            gst::error!(
127                RUNTIME_CAT,
128                obj = pad,
129                "Error in PadSrc activate: {:?}",
130                err
131            );
132            gst::loggable_error!(RUNTIME_CAT, "Error in PadSrc activate: {:?}", err)
133        })
134    }
135
136    fn src_activatemode(
137        self,
138        _pad: &gst::Pad,
139        _imp: &Self::ElementImpl,
140        _mode: gst::PadMode,
141        _active: bool,
142    ) -> Result<(), gst::LoggableError> {
143        Ok(())
144    }
145
146    fn src_event(self, pad: &gst::Pad, imp: &Self::ElementImpl, event: gst::Event) -> bool {
147        gst::log!(RUNTIME_CAT, obj = pad, "Handling {:?}", event);
148        gst::Pad::event_default(pad, Some(imp.obj().as_ref()), event)
149    }
150
151    fn src_event_full(
152        self,
153        pad: &gst::Pad,
154        imp: &Self::ElementImpl,
155        event: gst::Event,
156    ) -> Result<FlowSuccess, FlowError> {
157        // default is to dispatch to `src_event`
158        // (as implemented in `gst_pad_send_event_unchecked`)
159        let event_type = event.type_();
160        event_to_event_full(self.src_event(pad, imp, event), event_type)
161    }
162
163    fn src_query(self, pad: &gst::Pad, imp: &Self::ElementImpl, query: &mut gst::QueryRef) -> bool {
164        gst::log!(RUNTIME_CAT, obj = pad, "Handling {:?}", query);
165        if query.is_serialized() {
166            // FIXME serialized queries should be handled with the dataflow
167            // but we can't return a `Future` because we couldn't honor QueryRef's lifetime
168            false
169        } else {
170            gst::log!(RUNTIME_CAT, obj = pad, "Handling {:?}", query);
171            gst::Pad::query_default(pad, Some(imp.obj().as_ref()), query)
172        }
173    }
174}
175
176#[derive(Debug)]
177pub struct PadSrcInner {
178    gst_pad: gst::Pad,
179}
180
181impl PadSrcInner {
182    fn new(gst_pad: gst::Pad) -> Self {
183        if gst_pad.direction() != gst::PadDirection::Src {
184            panic!("Wrong pad direction for PadSrc");
185        }
186
187        PadSrcInner { gst_pad }
188    }
189
190    pub fn gst_pad(&self) -> &gst::Pad {
191        &self.gst_pad
192    }
193
194    pub async fn push(&self, buffer: gst::Buffer) -> Result<FlowSuccess, FlowError> {
195        gst::log!(RUNTIME_CAT, obj = self.gst_pad, "Pushing {buffer:?}");
196
197        let success = self.gst_pad.push(buffer).inspect_err(|&err| {
198            if err == gst::FlowError::Flushing {
199                gst::debug!(
200                    RUNTIME_CAT,
201                    obj = self.gst_pad,
202                    "Failed to push Buffer to PadSrc: Flushing"
203                );
204            } else {
205                gst::error!(
206                    RUNTIME_CAT,
207                    obj = self.gst_pad,
208                    "Failed to push Buffer to PadSrc: {err:?}"
209                );
210            }
211        })?;
212
213        gst::log!(
214            RUNTIME_CAT,
215            obj = self.gst_pad,
216            "Processing any pending sub tasks"
217        );
218        Context::drain_sub_tasks().await?;
219
220        Ok(success)
221    }
222
223    pub async fn push_list(&self, list: gst::BufferList) -> Result<FlowSuccess, FlowError> {
224        gst::log!(RUNTIME_CAT, obj = self.gst_pad, "Pushing {:?}", list);
225
226        let success = self.gst_pad.push_list(list).inspect_err(|&err| {
227            if err == gst::FlowError::Flushing {
228                gst::debug!(
229                    RUNTIME_CAT,
230                    obj = self.gst_pad,
231                    "Failed to push BufferList to PadSrc: Flushing"
232                );
233            } else {
234                gst::error!(
235                    RUNTIME_CAT,
236                    obj = self.gst_pad,
237                    "Failed to push BufferList to PadSrc: {err:?}"
238                );
239            }
240        })?;
241
242        gst::log!(
243            RUNTIME_CAT,
244            obj = self.gst_pad,
245            "Processing any pending sub tasks"
246        );
247        Context::drain_sub_tasks().await?;
248
249        Ok(success)
250    }
251
252    pub async fn push_event(&self, event: gst::Event) -> bool {
253        gst::log!(RUNTIME_CAT, obj = self.gst_pad, "Pushing {:?}", event);
254
255        let was_handled = self.gst_pad.push_event(event);
256
257        gst::log!(
258            RUNTIME_CAT,
259            obj = self.gst_pad,
260            "Processing any pending sub tasks"
261        );
262        if Context::drain_sub_tasks().await.is_err() {
263            return false;
264        }
265
266        was_handled
267    }
268}
269
270/// A [`PadSrc`] which can be moved in [`handler`]s functions and `Future`s.
271///
272/// Call [`upgrade`] to use the [`PadSrc`].
273///
274/// *See the [`pad` module] documentation for a description of the model.*
275///
276/// [`PadSrc`]: struct.PadSrc.html
277/// [`handler`]: trait.PadSrcHandler.html
278/// [`upgrade`]: struct.PadSrcWeak.html#method.upgrade
279/// [`pad` module]: index.html
280#[derive(Clone, Debug)]
281pub struct PadSrcWeak(Weak<PadSrcInner>);
282
283impl PadSrcWeak {
284    pub fn upgrade(&self) -> Option<PadSrcRef<'_>> {
285        self.0.upgrade().map(PadSrcRef::new)
286    }
287}
288
289/// A [`PadSrc`] to be used in `Handler`s functions and `Future`s.
290///
291/// Call [`downgrade`] if you need to `clone` the [`PadSrc`].
292///
293/// *See the [`pad` module] documentation for a description of the model.*
294///
295/// [`PadSrc`]: struct.PadSrc.html
296/// [`PadSrcWeak`]: struct.PadSrcWeak.html
297/// [`downgrade`]: struct.PadSrcRef.html#method.downgrade
298/// [`pad` module]: index.html
299#[derive(Debug)]
300pub struct PadSrcRef<'a> {
301    strong: Arc<PadSrcInner>,
302    phantom: PhantomData<&'a Self>,
303}
304
305impl PadSrcRef<'_> {
306    fn new(inner_arc: Arc<PadSrcInner>) -> Self {
307        PadSrcRef {
308            strong: inner_arc,
309            phantom: PhantomData,
310        }
311    }
312
313    pub fn downgrade(&self) -> PadSrcWeak {
314        PadSrcWeak(Arc::downgrade(&self.strong))
315    }
316}
317
318impl Deref for PadSrcRef<'_> {
319    type Target = PadSrcInner;
320
321    fn deref(&self) -> &Self::Target {
322        &self.strong
323    }
324}
325
326/// The `PadSrc` which `Element`s must own.
327///
328/// Call [`downgrade`] if you need to `clone` the `PadSrc`.
329///
330/// *See the [`pad` module] documentation for a description of the model.*
331///
332/// [`downgrade`]: struct.PadSrc.html#method.downgrade
333/// [`pad` module]: index.html
334#[derive(Debug)]
335pub struct PadSrc(Arc<PadSrcInner>);
336
337impl PadSrc {
338    pub fn new(gst_pad: gst::Pad, handler: impl PadSrcHandler) -> Self {
339        let this = PadSrc(Arc::new(PadSrcInner::new(gst_pad)));
340        this.init_pad_functions(handler);
341
342        this
343    }
344
345    pub fn downgrade(&self) -> PadSrcWeak {
346        PadSrcWeak(Arc::downgrade(&self.0))
347    }
348
349    pub fn as_ref(&self) -> PadSrcRef<'_> {
350        PadSrcRef::new(Arc::clone(&self.0))
351    }
352
353    pub fn check_reconfigure(&self) -> bool {
354        self.0.gst_pad.check_reconfigure()
355    }
356
357    fn init_pad_functions<H: PadSrcHandler>(&self, handler: H) {
358        // FIXME: Do this better
359        unsafe {
360            let handler_clone = handler.clone();
361            self.0
362                .gst_pad
363                .set_activate_function(move |gst_pad, parent| {
364                    let handler = handler_clone.clone();
365                    H::ElementImpl::catch_panic_pad_function(
366                        parent,
367                        || {
368                            gst::error!(RUNTIME_CAT, obj = gst_pad, "Panic in PadSrc activate");
369                            Err(gst::loggable_error!(
370                                RUNTIME_CAT,
371                                "Panic in PadSrc activate"
372                            ))
373                        },
374                        move |imp| H::src_activate(handler, gst_pad, imp),
375                    )
376                });
377
378            let handler_clone = handler.clone();
379            self.0
380                .gst_pad
381                .set_activatemode_function(move |gst_pad, parent, mode, active| {
382                    let handler = handler_clone.clone();
383                    H::ElementImpl::catch_panic_pad_function(
384                        parent,
385                        || {
386                            gst::error!(RUNTIME_CAT, obj = gst_pad, "Panic in PadSrc activatemode");
387                            Err(gst::loggable_error!(
388                                RUNTIME_CAT,
389                                "Panic in PadSrc activatemode"
390                            ))
391                        },
392                        move |imp| {
393                            gst::log!(
394                                RUNTIME_CAT,
395                                obj = gst_pad,
396                                "ActivateMode {:?}, {}",
397                                mode,
398                                active
399                            );
400
401                            if mode == gst::PadMode::Pull {
402                                gst::error!(
403                                    RUNTIME_CAT,
404                                    obj = gst_pad,
405                                    "Pull mode not supported by PadSrc"
406                                );
407                                return Err(gst::loggable_error!(
408                                    RUNTIME_CAT,
409                                    "Pull mode not supported by PadSrc"
410                                ));
411                            }
412
413                            H::src_activatemode(handler, gst_pad, imp, mode, active)
414                        },
415                    )
416                });
417
418            // No need to `set_event_function` since `set_event_full_function`
419            // overrides it and dispatches to `src_event` when necessary
420            let handler_clone = handler.clone();
421            self.0
422                .gst_pad
423                .set_event_full_function(move |gst_pad, parent, event| {
424                    let handler = handler_clone.clone();
425                    H::ElementImpl::catch_panic_pad_function(
426                        parent,
427                        || Err(FlowError::Error),
428                        move |imp| H::src_event_full(handler, gst_pad, imp, event),
429                    )
430                });
431
432            self.0
433                .gst_pad
434                .set_query_function(move |gst_pad, parent, query| {
435                    let handler = handler.clone();
436                    H::ElementImpl::catch_panic_pad_function(
437                        parent,
438                        || false,
439                        move |imp| {
440                            if !query.is_serialized() {
441                                H::src_query(handler, gst_pad, imp, query)
442                            } else {
443                                gst::fixme!(
444                                    RUNTIME_CAT,
445                                    obj = gst_pad,
446                                    "Serialized Query not supported"
447                                );
448                                false
449                            }
450                        },
451                    )
452                });
453        }
454    }
455}
456
457impl Drop for PadSrc {
458    fn drop(&mut self) {
459        // FIXME: Do this better
460        unsafe {
461            self.0
462                .gst_pad
463                .set_activate_function(move |_gst_pad, _parent| {
464                    Err(gst::loggable_error!(RUNTIME_CAT, "PadSrc no longer exists"))
465                });
466            self.0
467                .gst_pad
468                .set_activatemode_function(move |_gst_pad, _parent, _mode, _active| {
469                    Err(gst::loggable_error!(RUNTIME_CAT, "PadSrc no longer exists"))
470                });
471            self.0
472                .gst_pad
473                .set_event_function(move |_gst_pad, _parent, _event| false);
474            self.0
475                .gst_pad
476                .set_event_full_function(move |_gst_pad, _parent, _event| Err(FlowError::Flushing));
477            self.0
478                .gst_pad
479                .set_query_function(move |_gst_pad, _parent, _query| false);
480        }
481    }
482}
483
484impl Deref for PadSrc {
485    type Target = PadSrcInner;
486
487    fn deref(&self) -> &Self::Target {
488        &self.0
489    }
490}
491
492/// A trait to define `handler`s for [`PadSink`] callbacks.
493///
494/// *See the [`pad` module] documentation for a description of the model.*
495///
496/// [`PadSink`]: struct.PadSink.html
497/// [`pad` module]: index.html
498pub trait PadSinkHandler: Clone + Send + Sync + 'static {
499    type ElementImpl: ElementImpl<Type: Send>;
500
501    fn sink_activate(
502        self,
503        pad: &gst::Pad,
504        _imp: &Self::ElementImpl,
505    ) -> Result<(), gst::LoggableError> {
506        if pad.is_active() {
507            gst::debug!(
508                RUNTIME_CAT,
509                obj = pad,
510                "Already activated in {:?} mode ",
511                pad.mode()
512            );
513            return Ok(());
514        }
515
516        pad.activate_mode(gst::PadMode::Push, true).map_err(|err| {
517            gst::error!(
518                RUNTIME_CAT,
519                obj = pad,
520                "Error in PadSink activate: {:?}",
521                err
522            );
523            gst::loggable_error!(RUNTIME_CAT, "Error in PadSink activate: {:?}", err)
524        })
525    }
526
527    fn sink_activatemode(
528        self,
529        _pad: &gst::Pad,
530        _imp: &Self::ElementImpl,
531        _mode: gst::PadMode,
532        _active: bool,
533    ) -> Result<(), gst::LoggableError> {
534        Ok(())
535    }
536
537    fn sink_chain(
538        self,
539        _pad: gst::Pad,
540        _elem: <Self::ElementImpl as ObjectSubclass>::Type,
541        _buffer: gst::Buffer,
542    ) -> impl Future<Output = Result<FlowSuccess, FlowError>> + Send {
543        future::err(FlowError::NotSupported)
544    }
545
546    fn sink_chain_list(
547        self,
548        _pad: gst::Pad,
549        _elem: <Self::ElementImpl as ObjectSubclass>::Type,
550        _buffer_list: gst::BufferList,
551    ) -> impl Future<Output = Result<FlowSuccess, FlowError>> + Send {
552        future::err(FlowError::NotSupported)
553    }
554
555    fn sink_event(self, pad: &gst::Pad, imp: &Self::ElementImpl, event: gst::Event) -> bool {
556        assert!(!event.is_serialized());
557        gst::log!(RUNTIME_CAT, obj = pad, "Handling {:?}", event);
558        gst::Pad::event_default(pad, Some(imp.obj().as_ref()), event)
559    }
560
561    fn sink_event_serialized(
562        self,
563        pad: gst::Pad,
564        elem: <Self::ElementImpl as ObjectSubclass>::Type,
565        event: gst::Event,
566    ) -> impl Future<Output = bool> + Send {
567        assert!(event.is_serialized());
568
569        async move {
570            gst::log!(RUNTIME_CAT, obj = pad, "Handling {:?}", event);
571            gst::Pad::event_default(&pad, Some(&elem), event)
572        }
573    }
574
575    fn sink_event_full(
576        self,
577        pad: &gst::Pad,
578        imp: &Self::ElementImpl,
579        event: gst::Event,
580    ) -> Result<FlowSuccess, FlowError> {
581        assert!(!event.is_serialized());
582        // default is to dispatch to `sink_event`
583        // (as implemented in `gst_pad_send_event_unchecked`)
584        let event_type = event.type_();
585        event_to_event_full(self.sink_event(pad, imp, event), event_type)
586    }
587
588    fn sink_event_full_serialized(
589        self,
590        pad: gst::Pad,
591        elem: <Self::ElementImpl as ObjectSubclass>::Type,
592        event: gst::Event,
593    ) -> impl Future<Output = Result<FlowSuccess, FlowError>> + Send {
594        assert!(event.is_serialized());
595        // default is to dispatch to `sink_event`
596        // (as implemented in `gst_pad_send_event_unchecked`)
597        let event_type = event.type_();
598        event_to_event_full_serialized(
599            Self::sink_event_serialized(self, pad, elem, event),
600            event_type,
601        )
602    }
603
604    fn sink_query(
605        self,
606        pad: &gst::Pad,
607        imp: &Self::ElementImpl,
608        query: &mut gst::QueryRef,
609    ) -> bool {
610        if query.is_serialized() {
611            gst::log!(RUNTIME_CAT, obj = pad, "Dropping {:?}", query);
612            // FIXME serialized queries should be handled with the dataflow
613            // but we can't return a `Future` because we couldn't honor QueryRef's lifetime
614            false
615        } else {
616            gst::log!(RUNTIME_CAT, obj = pad, "Handling {:?}", query);
617            gst::Pad::query_default(pad, Some(imp.obj().as_ref()), query)
618        }
619    }
620}
621
622#[derive(Debug)]
623pub struct PadSinkInner {
624    gst_pad: gst::Pad,
625}
626
627impl PadSinkInner {
628    fn new(gst_pad: gst::Pad) -> Self {
629        if gst_pad.direction() != gst::PadDirection::Sink {
630            panic!("Wrong pad direction for PadSink");
631        }
632
633        PadSinkInner { gst_pad }
634    }
635
636    pub fn gst_pad(&self) -> &gst::Pad {
637        &self.gst_pad
638    }
639}
640
641/// A [`PadSink`] which can be moved in `Handler`s functions and `Future`s.
642///
643/// Call [`upgrade`] to use the [`PadSink`].
644///
645/// *See the [`pad` module] documentation for a description of the model.*
646///
647/// [`PadSink`]: struct.PadSink.html
648/// [`upgrade`]: struct.PadSinkWeak.html#method.upgrade
649/// [`pad` module]: index.html
650#[derive(Clone, Debug)]
651pub struct PadSinkWeak(Weak<PadSinkInner>);
652
653impl PadSinkWeak {
654    pub fn upgrade(&self) -> Option<PadSinkRef<'_>> {
655        self.0.upgrade().map(PadSinkRef::new)
656    }
657}
658
659/// A [`PadSink`] to be used in [`handler`]s functions and `Future`s.
660///
661/// Call [`downgrade`] if you need to `clone` the [`PadSink`].
662///
663/// *See the [`pad` module] documentation for a description of the model.*
664///
665/// [`PadSink`]: struct.PadSink.html
666/// [`handler`]: trait.PadSinkHandler.html
667/// [`downgrade`]: struct.PadSinkRef.html#method.downgrade
668/// [`pad` module]: index.html
669#[derive(Debug)]
670pub struct PadSinkRef<'a> {
671    strong: Arc<PadSinkInner>,
672    phantom: PhantomData<&'a Self>,
673}
674
675impl PadSinkRef<'_> {
676    fn new(inner_arc: Arc<PadSinkInner>) -> Self {
677        PadSinkRef {
678            strong: inner_arc,
679            phantom: PhantomData,
680        }
681    }
682
683    pub fn downgrade(&self) -> PadSinkWeak {
684        PadSinkWeak(Arc::downgrade(&self.strong))
685    }
686}
687
688impl Deref for PadSinkRef<'_> {
689    type Target = PadSinkInner;
690
691    fn deref(&self) -> &Self::Target {
692        &self.strong
693    }
694}
695
696/// The `PadSink` which `Element`s must own.
697///
698/// Call [`downgrade`] if you need to `clone` the `PadSink`.
699///
700/// *See the [`pad` module] documentation for a description of the model.*
701///
702/// [`downgrade`]: struct.PadSink.html#method.downgrade
703/// [`pad` module]: index.html
704#[derive(Debug)]
705pub struct PadSink(Arc<PadSinkInner>);
706
707impl PadSink {
708    pub fn downgrade(&self) -> PadSinkWeak {
709        PadSinkWeak(Arc::downgrade(&self.0))
710    }
711
712    pub fn as_ref(&self) -> PadSinkRef<'_> {
713        PadSinkRef::new(Arc::clone(&self.0))
714    }
715}
716
717impl PadSink {
718    pub fn new<H>(gst_pad: gst::Pad, handler: H) -> Self
719    where
720        H: PadSinkHandler,
721        <H::ElementImpl as ObjectSubclass>::Type: IsA<gst::Element> + Send,
722    {
723        let this = PadSink(Arc::new(PadSinkInner::new(gst_pad)));
724        this.init_pad_functions(handler);
725
726        this
727    }
728
729    fn init_pad_functions<H>(&self, handler: H)
730    where
731        H: PadSinkHandler,
732        <H::ElementImpl as ObjectSubclass>::Type: IsA<gst::Element> + Send,
733    {
734        unsafe {
735            let handler_clone = handler.clone();
736            self.0
737                .gst_pad
738                .set_activate_function(move |gst_pad, parent| {
739                    let handler = handler_clone.clone();
740                    H::ElementImpl::catch_panic_pad_function(
741                        parent,
742                        || {
743                            gst::error!(RUNTIME_CAT, obj = gst_pad, "Panic in PadSink activate");
744                            Err(gst::loggable_error!(
745                                RUNTIME_CAT,
746                                "Panic in PadSink activate"
747                            ))
748                        },
749                        move |imp| H::sink_activate(handler, gst_pad, imp),
750                    )
751                });
752
753            let handler_clone = handler.clone();
754            self.0
755                .gst_pad
756                .set_activatemode_function(move |gst_pad, parent, mode, active| {
757                    let handler = handler_clone.clone();
758                    H::ElementImpl::catch_panic_pad_function(
759                        parent,
760                        || {
761                            gst::error!(
762                                RUNTIME_CAT,
763                                obj = gst_pad,
764                                "Panic in PadSink activatemode"
765                            );
766                            Err(gst::loggable_error!(
767                                RUNTIME_CAT,
768                                "Panic in PadSink activatemode"
769                            ))
770                        },
771                        move |imp| {
772                            gst::log!(
773                                RUNTIME_CAT,
774                                obj = gst_pad,
775                                "ActivateMode {:?}, {}",
776                                mode,
777                                active
778                            );
779
780                            if mode == gst::PadMode::Pull {
781                                gst::error!(
782                                    RUNTIME_CAT,
783                                    obj = gst_pad,
784                                    "Pull mode not supported by PadSink"
785                                );
786                                return Err(gst::loggable_error!(
787                                    RUNTIME_CAT,
788                                    "Pull mode not supported by PadSink"
789                                ));
790                            }
791
792                            H::sink_activatemode(handler, gst_pad, imp, mode, active)
793                        },
794                    )
795                });
796
797            let handler_clone = handler.clone();
798            self.0
799                .gst_pad
800                .set_chain_function(move |gst_pad, parent, buffer| {
801                    let handler = handler_clone.clone();
802                    H::ElementImpl::catch_panic_pad_function(
803                        parent,
804                        || Err(FlowError::Error),
805                        move |imp| {
806                            let elem = imp.obj().clone();
807                            let gst_pad = gst_pad.clone();
808
809                            match Context::current_task() {
810                                Some((ctx, task_id)) => {
811                                    let delayed_fut = async move {
812                                        H::sink_chain(handler, gst_pad, elem, buffer).await
813                                    };
814                                    let _ = ctx.add_sub_task(
815                                        task_id,
816                                        delayed_fut.map(|res| res.map(drop)),
817                                    );
818
819                                    Ok(gst::FlowSuccess::Ok)
820                                }
821                                _ => {
822                                    let chain_fut = H::sink_chain(handler, gst_pad, elem, buffer);
823                                    executor::block_on(chain_fut)
824                                }
825                            }
826                        },
827                    )
828                });
829
830            let handler_clone = handler.clone();
831            self.0
832                .gst_pad
833                .set_chain_list_function(move |gst_pad, parent, list| {
834                    let handler = handler_clone.clone();
835                    H::ElementImpl::catch_panic_pad_function(
836                        parent,
837                        || Err(FlowError::Error),
838                        move |imp| {
839                            let elem = imp.obj().clone();
840                            let gst_pad = gst_pad.clone();
841
842                            match Context::current_task() {
843                                Some((ctx, task_id)) => {
844                                    let delayed_fut = async move {
845                                        H::sink_chain_list(handler, gst_pad, elem, list).await
846                                    };
847                                    let _ = ctx.add_sub_task(
848                                        task_id,
849                                        delayed_fut.map(|res| res.map(drop)),
850                                    );
851
852                                    Ok(gst::FlowSuccess::Ok)
853                                }
854                                _ => {
855                                    let chain_list_fut =
856                                        H::sink_chain_list(handler, gst_pad, elem, list);
857                                    executor::block_on(chain_list_fut)
858                                }
859                            }
860                        },
861                    )
862                });
863
864            // No need to `set_event_function` since `set_event_full_function`
865            // overrides it and dispatches to `sink_event` when necessary
866            let handler_clone = handler.clone();
867            self.0
868                .gst_pad
869                .set_event_full_function(move |gst_pad, parent, event| {
870                    let handler = handler_clone.clone();
871                    H::ElementImpl::catch_panic_pad_function(
872                        parent,
873                        || Err(FlowError::Error),
874                        move |imp| {
875                            if event.is_serialized() {
876                                let elem = imp.obj().clone();
877                                let gst_pad = gst_pad.clone();
878
879                                match Context::current_task() {
880                                    Some((ctx, task_id)) => {
881                                        let delayed_fut = async move {
882                                            H::sink_event_full_serialized(
883                                                handler, gst_pad, elem, event,
884                                            )
885                                            .await
886                                        };
887                                        let _ = ctx.add_sub_task(
888                                            task_id,
889                                            delayed_fut.map(|res| res.map(drop)),
890                                        );
891
892                                        Ok(gst::FlowSuccess::Ok)
893                                    }
894                                    _ => {
895                                        let event_fut = H::sink_event_full_serialized(
896                                            handler, gst_pad, elem, event,
897                                        );
898                                        executor::block_on(event_fut)
899                                    }
900                                }
901                            } else {
902                                handler.sink_event_full(gst_pad, imp, event)
903                            }
904                        },
905                    )
906                });
907
908            self.0
909                .gst_pad
910                .set_query_function(move |gst_pad, parent, query| {
911                    let handler = handler.clone();
912                    H::ElementImpl::catch_panic_pad_function(
913                        parent,
914                        || false,
915                        move |imp| {
916                            if !query.is_serialized() {
917                                H::sink_query(handler, gst_pad, imp, query)
918                            } else {
919                                gst::fixme!(
920                                    RUNTIME_CAT,
921                                    obj = gst_pad,
922                                    "Serialized Query not supported {query:?}"
923                                );
924                                false
925                            }
926                        },
927                    )
928                });
929        }
930    }
931}
932
933impl Drop for PadSink {
934    fn drop(&mut self) {
935        // FIXME: Do this better
936        unsafe {
937            self.0
938                .gst_pad
939                .set_activate_function(move |_gst_pad, _parent| {
940                    Err(gst::loggable_error!(
941                        RUNTIME_CAT,
942                        "PadSink no longer exists"
943                    ))
944                });
945            self.0
946                .gst_pad
947                .set_activatemode_function(move |_gst_pad, _parent, _mode, _active| {
948                    Err(gst::loggable_error!(
949                        RUNTIME_CAT,
950                        "PadSink no longer exists"
951                    ))
952                });
953            self.0
954                .gst_pad
955                .set_chain_function(move |_gst_pad, _parent, _buffer| Err(FlowError::Flushing));
956            self.0
957                .gst_pad
958                .set_chain_list_function(move |_gst_pad, _parent, _list| Err(FlowError::Flushing));
959            self.0
960                .gst_pad
961                .set_event_function(move |_gst_pad, _parent, _event| false);
962            self.0
963                .gst_pad
964                .set_event_full_function(move |_gst_pad, _parent, _event| Err(FlowError::Flushing));
965            self.0
966                .gst_pad
967                .set_query_function(move |_gst_pad, _parent, _query| false);
968        }
969    }
970}
971
972impl Deref for PadSink {
973    type Target = PadSinkInner;
974
975    fn deref(&self) -> &Self::Target {
976        &self.0
977    }
978}