gstreamer_utils/
streamproducer.rs

1use std::{
2    collections::HashMap,
3    mem,
4    sync::{atomic, Arc, Mutex, MutexGuard},
5};
6
7use gst::{glib, prelude::*};
8use std::sync::LazyLock;
9use thiserror::Error;
10
/// Default value of [`ProducerSettings::sync`]: the producer `appsink` syncs on the clock.
pub const DEFAULT_PRODUCER_SYNC: bool = true;

/// Default value of [`ConsumerSettings::max_buffer`]; `0` means unlimited.
pub const DEFAULT_CONSUMER_MAX_BUFFERS: u64 = 0;
/// Default value of [`ConsumerSettings::max_bytes`]; `0` means unlimited.
pub const DEFAULT_CONSUMER_MAX_BYTES: gst::format::Bytes = gst::format::Bytes::ZERO;
/// Default value of [`ConsumerSettings::max_time`]: 500 milliseconds.
pub const DEFAULT_CONSUMER_MAX_TIME: gst::ClockTime = gst::ClockTime::from_mseconds(500);
16
// Small wrapper around AtomicU64 and a Mutex, to allow it to run regular AtomicU64
// operations where supported, and fall back to a mutex where it is not. The wrapper methods
// are the ones that are needed, and not all are exposed.
//
// Both implementations expose the same API and the same observable behavior; in
// particular `fetch_add` wraps around on overflow in both of them.
#[derive(Debug)]
struct WrappedAtomicU64 {
    #[cfg(not(target_has_atomic = "64"))]
    atomic: Mutex<u64>,
    #[cfg(target_has_atomic = "64")]
    atomic: atomic::AtomicU64,
}

// Native implementation: thin forwarding to `AtomicU64`.
#[cfg(target_has_atomic = "64")]
impl WrappedAtomicU64 {
    /// Creates a new counter initialized to `value`.
    fn new(value: u64) -> WrappedAtomicU64 {
        WrappedAtomicU64 {
            atomic: atomic::AtomicU64::new(value),
        }
    }

    /// Adds `value` to the counter (wrapping on overflow) and returns the previous value.
    fn fetch_add(&self, value: u64, order: atomic::Ordering) -> u64 {
        self.atomic.fetch_add(value, order)
    }

    /// Overwrites the counter with `value`.
    fn store(&self, value: u64, order: atomic::Ordering) {
        self.atomic.store(value, order);
    }

    /// Returns the current value of the counter.
    fn load(&self, order: atomic::Ordering) -> u64 {
        self.atomic.load(order)
    }
}

// Fallback implementation for targets without 64-bit atomics: every operation takes
// the mutex, and the requested memory ordering is ignored.
#[cfg(not(target_has_atomic = "64"))]
impl WrappedAtomicU64 {
    /// Creates a new counter initialized to `value`.
    fn new(value: u64) -> WrappedAtomicU64 {
        WrappedAtomicU64 {
            atomic: Mutex::new(value),
        }
    }

    /// Adds `value` to the counter (wrapping on overflow) and returns the previous value.
    fn fetch_add(&self, value: u64, _order: atomic::Ordering) -> u64 {
        let mut guard = self.atomic.lock().unwrap();
        let old = *guard;
        // `AtomicU64::fetch_add` wraps around on overflow; a plain `+=` would panic
        // in debug builds, so wrap explicitly to keep both implementations in sync.
        *guard = old.wrapping_add(value);
        old
    }

    /// Overwrites the counter with `value`.
    fn store(&self, value: u64, _order: atomic::Ordering) {
        *self.atomic.lock().unwrap() = value;
    }

    /// Returns the current value of the counter.
    fn load(&self, _order: atomic::Ordering) -> u64 {
        *self.atomic.lock().unwrap()
    }
}
67
68static CAT: LazyLock<gst::DebugCategory> = LazyLock::new(|| {
69    gst::DebugCategory::new(
70        "utilsrs-stream-producer",
71        gst::DebugColorFlags::empty(),
72        Some("gst_app Stream Producer interface"),
73    )
74});
75
/// The interface for transporting media data from one node
/// to another.
///
/// A producer is essentially a GStreamer `appsink` whose output
/// is sent to a set of consumers, which are essentially `appsrc` wrappers.
///
/// Cloning a `StreamProducer` is cheap: all clones share the same inner state
/// through an `Arc`.
#[derive(Debug, Clone)]
pub struct StreamProducer(Arc<StreamProducerInner>);
83
84impl PartialEq for StreamProducer {
85    fn eq(&self, other: &Self) -> bool {
86        self.0.appsink.eq(&other.0.appsink)
87    }
88}
89
90impl Eq for StreamProducer {}
91
/// Shared state behind a [`StreamProducer`]; cleaned up in its `Drop` implementation.
#[derive(Debug)]
struct StreamProducerInner {
    /// The appsink to dispatch data for
    appsink: gst_app::AppSink,
    /// The pad probe installed on the appsink's `sink` pad, if any.
    /// It is removed again when this struct is dropped.
    appsink_probe_id: Option<gst::PadProbeId>,
    /// The consumers to dispatch data to
    consumers: Arc<Mutex<StreamConsumers>>,
}
101
102impl Drop for StreamProducerInner {
103    fn drop(&mut self) {
104        if let Some(probe_id) = self.appsink_probe_id.take() {
105            let pad = self.appsink.static_pad("sink").unwrap();
106            pad.remove_probe(probe_id);
107        }
108
109        self.appsink
110            .set_callbacks(gst_app::AppSinkCallbacks::builder().build());
111    }
112}
113
/// User defined producer settings
///
/// Defaults to:
///
/// * `sync` <- `true` (sync on the clock)
///
/// The `sync` field is public: construct the struct directly if you need a
/// different value.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct ProducerSettings {
    /// Whether the producer `appsink` should sync on the clock.
    pub sync: bool,
}
125
126impl Default for ProducerSettings {
127    fn default() -> Self {
128        ProducerSettings {
129            sync: DEFAULT_PRODUCER_SYNC,
130        }
131    }
132}
133
/// Link between a `StreamProducer` and a consumer, disconnecting the link on `Drop`.
/// The producer and consumer will stay alive while the link is.
#[derive(Debug)]
#[must_use]
pub struct ConsumptionLink {
    /// the `appsrc` acting as the consumer end of this link
    consumer: gst_app::AppSrc,
    /// settings applied to `consumer` whenever it is attached to a producer
    settings: ConsumerSettings,
    /// the producer currently feeding `consumer`, `None` while disconnected
    producer: Option<StreamProducer>,
    /// number of buffers dropped because `consumer` internal queue was full
    dropped: Arc<WrappedAtomicU64>,
    /// number of buffers pushed through `consumer`
    pushed: Arc<WrappedAtomicU64>,
    /// if buffers should not be pushed to the `consumer` right now
    discard: Arc<atomic::AtomicBool>,
    /// whether the link will drop delta frames until next keyframe on discont
    wait_for_keyframe: Arc<atomic::AtomicBool>,
}
151
152impl ConsumptionLink {
153    /// Create a new disconnected `ConsumptionLink`.
154    ///
155    /// The consumer will use the default configuration (see [StreamProducer::configure_consumer]).
156    /// If you need different settings, call [ConsumptionLink::disconnected_with] instead.
157    pub fn disconnected(consumer: gst_app::AppSrc) -> ConsumptionLink {
158        StreamProducer::configure_consumer(&consumer);
159
160        ConsumptionLink {
161            consumer,
162            settings: ConsumerSettings::default(),
163            producer: None,
164            dropped: Arc::new(WrappedAtomicU64::new(0)),
165            pushed: Arc::new(WrappedAtomicU64::new(0)),
166            discard: Arc::new(atomic::AtomicBool::new(false)),
167            wait_for_keyframe: Arc::new(atomic::AtomicBool::new(true)),
168        }
169    }
170
171    /// Create a new disconnected `ConsumptionLink`.
172    pub fn disconnected_with(
173        consumer: gst_app::AppSrc,
174        settings: ConsumerSettings,
175    ) -> ConsumptionLink {
176        StreamProducer::configure_consumer(&consumer);
177
178        ConsumptionLink {
179            consumer,
180            settings,
181            producer: None,
182            dropped: Arc::new(WrappedAtomicU64::new(0)),
183            pushed: Arc::new(WrappedAtomicU64::new(0)),
184            discard: Arc::new(atomic::AtomicBool::new(false)),
185            wait_for_keyframe: Arc::new(atomic::AtomicBool::new(true)),
186        }
187    }
188
189    /// Replace the producer by a new one, keeping the existing consumer.
190    pub fn change_producer(
191        &mut self,
192        new_producer: &StreamProducer,
193        reset_stats: bool,
194    ) -> Result<(), AddConsumerError> {
195        self.disconnect();
196        if reset_stats {
197            self.dropped.store(0, atomic::Ordering::SeqCst);
198            self.pushed.store(0, atomic::Ordering::SeqCst);
199        }
200        new_producer.add_consumer_internal(
201            &self.consumer,
202            self.settings,
203            self.dropped.clone(),
204            self.pushed.clone(),
205            self.discard.clone(),
206            self.wait_for_keyframe.clone(),
207        )?;
208        self.producer = Some(new_producer.clone());
209        Ok(())
210    }
211
212    /// Disconnect the consumer from the producer
213    pub fn disconnect(&mut self) {
214        if let Some(producer) = self.producer.take() {
215            producer.remove_consumer(&self.consumer);
216        }
217    }
218
219    /// number of dropped buffers because the consumer internal queue was full
220    pub fn dropped(&self) -> u64 {
221        self.dropped.load(atomic::Ordering::SeqCst)
222    }
223
224    /// number of buffers pushed through this link
225    pub fn pushed(&self) -> u64 {
226        self.pushed.load(atomic::Ordering::SeqCst)
227    }
228
229    /// if buffers are currently pushed through this link
230    pub fn discard(&self) -> bool {
231        self.discard.load(atomic::Ordering::SeqCst)
232    }
233
234    /// If set to `true` then no buffers will be pushed through this link
235    pub fn set_discard(&self, discard: bool) {
236        self.discard.store(discard, atomic::Ordering::SeqCst)
237    }
238
239    /// if the link will drop frames until the next keyframe on discont
240    pub fn wait_for_keyframe(&self) -> bool {
241        self.wait_for_keyframe.load(atomic::Ordering::SeqCst)
242    }
243
244    /// If set to `true` then the link will drop delta-frames until the next
245    /// keyframe on discont (default behavior).
246    pub fn set_wait_for_keyframe(&self, wait: bool) {
247        self.wait_for_keyframe.store(wait, atomic::Ordering::SeqCst)
248    }
249
250    /// Get the GStreamer `appsrc` wrapped by this link
251    pub fn appsrc(&self) -> &gst_app::AppSrc {
252        &self.consumer
253    }
254
255    /// Get the `StreamProducer` currently by this link, if any.
256    pub fn stream_producer(&self) -> Option<&StreamProducer> {
257        self.producer.as_ref()
258    }
259
260    /// Get the settings for this Consumer.
261    pub fn settings(&self) -> ConsumerSettings {
262        self.settings
263    }
264}
265
266impl Drop for ConsumptionLink {
267    fn drop(&mut self) {
268        self.disconnect();
269    }
270}
271
/// User defined consumer settings
///
/// Defaults to:
///
/// * `max-buffers` <- `0` (unlimited)
/// * `max-bytes` <- `0` (unlimited)
/// * `max-time` <- `500ms`
///
/// Use `ConsumerSettings::builder()` if you need different values.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct ConsumerSettings {
    /// Value applied to the consumer `appsrc` through `set_max_buffers`.
    pub max_buffer: u64,
    /// Value applied to the consumer `appsrc` through `set_max_bytes`.
    pub max_bytes: gst::format::Bytes,
    /// Value applied to the consumer `appsrc` through `set_max_time`.
    pub max_time: gst::ClockTime,
}
287
288impl Default for ConsumerSettings {
289    fn default() -> Self {
290        ConsumerSettings {
291            max_buffer: DEFAULT_CONSUMER_MAX_BUFFERS,
292            max_bytes: DEFAULT_CONSUMER_MAX_BYTES,
293            max_time: DEFAULT_CONSUMER_MAX_TIME,
294        }
295    }
296}
297
#[derive(Debug, Error)]
/// Error type returned when adding consumers to producers.
pub enum AddConsumerError {
    #[error("Consumer already added")]
    /// Consumer has already been added to this producer.
    ///
    /// Returned when the `appsrc` is already present in the producer's consumer map.
    AlreadyAdded,
}
305
306impl StreamProducer {
307    /// Configures a consumer `appsrc` for later use in a `StreamProducer`.
308    ///
309    /// This function configures the `appsrc` in a suitable state to act as a consumer
310    /// and also sets the internal queue properties as follows:
311    ///
312    /// * `max-buffers` <- `0` (unlimited)
313    /// * `max-bytes` <- `0` (unlimited)
314    /// * `max-time` <- `500ms`
315    ///
316    /// If you need different settings, call [`StreamProducer::configure_consumer_with`] instead.
317    ///
318    /// This is automatically invoked when calling [`StreamProducer::add_consumer`].
319    pub fn configure_consumer(consumer: &gst_app::AppSrc) {
320        Self::configure_consumer_with(consumer, ConsumerSettings::default());
321    }
322
323    /// Configures a consumer `appsrc` for later use in a `StreamProducer`.
324    ///
325    /// This function configures the `appsrc` in a suitable state to act as a consumer
326    /// and applies the provided settings.
327    ///
328    /// If unsure, call [`StreamProducer::configure_consumer`] instead.
329    pub fn configure_consumer_with(consumer: &gst_app::AppSrc, settings: ConsumerSettings) {
330        // Latency on the appsrc is set by the publisher before the first buffer
331        // and whenever it changes
332        consumer.set_latency(gst::ClockTime::ZERO, gst::ClockTime::NONE);
333        consumer.set_format(gst::Format::Time);
334        consumer.set_is_live(true);
335        consumer.set_handle_segment_change(true);
336        consumer.set_leaky_type(gst_app::AppLeakyType::Downstream);
337        consumer.set_automatic_eos(false);
338
339        consumer.set_max_buffers(settings.max_buffer);
340        consumer.set_max_bytes(settings.max_bytes.into());
341        consumer.set_max_time(settings.max_time);
342    }
343
344    /// Adds an `appsrc` to dispatch data to.
345    ///
346    /// This function configures the `appsrc` in a suitable state to act as a consumer
347    /// and also sets the internal queue properties as follows:
348    ///
349    /// * `max-buffers` <- `0` (unlimited)
350    /// * `max-bytes` <- `0` (unlimited)
351    /// * `max-time` <- `500ms`
352    ///
353    /// If you need different values, call [`StreamProducer::add_consumer_with`] instead.
354    ///
355    /// Dropping the returned `ConsumptionLink` will automatically disconnect the consumer from the producer.
356    pub fn add_consumer(
357        &self,
358        consumer: &gst_app::AppSrc,
359    ) -> Result<ConsumptionLink, AddConsumerError> {
360        let dropped = Arc::new(WrappedAtomicU64::new(0));
361        let pushed = Arc::new(WrappedAtomicU64::new(0));
362        let discard = Arc::new(atomic::AtomicBool::new(false));
363        let wait_for_keyframe = Arc::new(atomic::AtomicBool::new(true));
364
365        self.add_consumer_internal(
366            consumer,
367            ConsumerSettings::default(),
368            dropped.clone(),
369            pushed.clone(),
370            discard.clone(),
371            wait_for_keyframe.clone(),
372        )?;
373
374        Ok(ConsumptionLink {
375            consumer: consumer.clone(),
376            settings: ConsumerSettings::default(),
377            producer: Some(self.clone()),
378            dropped,
379            pushed,
380            discard,
381            wait_for_keyframe,
382        })
383    }
384
385    /// Adds a configured `appsrc` to dispatch data to.
386    ///
387    /// This function configures the `appsrc` in a suitable state to act as a consumer
388    /// and applies the provided settings.
389    ///
390    /// If unsure, call [`StreamProducer::add_consumer`] instead.
391    ///
392    /// Dropping the returned `ConsumptionLink` will automatically disconnect the consumer from the producer.
393    pub fn add_consumer_with(
394        &self,
395        consumer: &gst_app::AppSrc,
396        settings: ConsumerSettings,
397    ) -> Result<ConsumptionLink, AddConsumerError> {
398        let dropped = Arc::new(WrappedAtomicU64::new(0));
399        let pushed = Arc::new(WrappedAtomicU64::new(0));
400        let discard = Arc::new(atomic::AtomicBool::new(false));
401        let wait_for_keyframe = Arc::new(atomic::AtomicBool::new(true));
402
403        self.add_consumer_internal(
404            consumer,
405            settings,
406            dropped.clone(),
407            pushed.clone(),
408            discard.clone(),
409            wait_for_keyframe.clone(),
410        )?;
411
412        Ok(ConsumptionLink {
413            consumer: consumer.clone(),
414            settings,
415            producer: Some(self.clone()),
416            dropped,
417            pushed,
418            discard,
419            wait_for_keyframe,
420        })
421    }
422
    /// Registers `consumer` in this producer's consumer map.
    ///
    /// Steps, in order:
    /// 1. fail with [`AddConsumerError::AlreadyAdded`] if the `appsrc` is already registered,
    /// 2. configure the `appsrc` with `settings`,
    /// 3. install an upstream event probe on the `appsrc` `src` pad that forwards
    ///    force-key-unit events to the `appsink`,
    /// 4. store the resulting `StreamConsumer` (sharing the given counters and flags),
    /// 5. release the lock and forward the selected sticky events currently set
    ///    on the `appsink` `sink` pad to the new consumer.
    fn add_consumer_internal(
        &self,
        consumer: &gst_app::AppSrc,
        settings: ConsumerSettings,
        dropped: Arc<WrappedAtomicU64>,
        pushed: Arc<WrappedAtomicU64>,
        discard: Arc<atomic::AtomicBool>,
        wait_for_keyframe: Arc<atomic::AtomicBool>,
    ) -> Result<(), AddConsumerError> {
        let mut consumers = self.0.consumers.lock().unwrap();
        if consumers.consumers.contains_key(consumer) {
            gst::error!(
                CAT,
                obj = &self.0.appsink,
                "Consumer {} ({:?}) already added",
                consumer.name(),
                consumer
            );
            return Err(AddConsumerError::AlreadyAdded);
        }

        gst::debug!(
            CAT,
            obj = &self.0.appsink,
            "Adding consumer {} ({:?})",
            consumer.name(),
            consumer
        );

        Self::configure_consumer_with(consumer, settings);

        // Forward force-keyunit events upstream to the appsink
        let srcpad = consumer.static_pad("src").unwrap();
        let fku_probe_id = srcpad
            .add_probe(
                gst::PadProbeType::EVENT_UPSTREAM,
                glib::clone!(
                    // Only a weak reference to the appsink is captured by the probe;
                    // the closure panics if the appsink is gone when it runs.
                    #[weak(rename_to = appsink)]
                    self.0.appsink,
                    #[upgrade_or_panic]
                    move |_pad, info| {
                        let Some(event) = info.event() else {
                            return gst::PadProbeReturn::Ok;
                        };

                        if gst_video::UpstreamForceKeyUnitEvent::parse(event).is_ok() {
                            gst::debug!(CAT, obj = &appsink, "Requesting keyframe");
                            // Do not use `gst_element_send_event()` as it takes the state lock which may lead to dead locks.
                            let pad = appsink.static_pad("sink").unwrap();
                            let _ = pad.push_event(event.clone());
                        }

                        // The event itself is always let through.
                        gst::PadProbeReturn::Ok
                    }
                ),
            )
            .unwrap();

        let stream_consumer = StreamConsumer::new(
            consumer,
            fku_probe_id,
            dropped,
            pushed,
            discard,
            wait_for_keyframe,
        );

        consumers
            .consumers
            .insert(consumer.clone(), stream_consumer);

        // forward selected sticky events. We can send those now as appsrc will delay the events
        // until stream-start, caps and segment are sent.
        let events_to_forward = consumers.events_to_forward.clone();
        // drop the lock before sending events
        drop(consumers);

        let appsink_pad = self.0.appsink.static_pad("sink").unwrap();
        appsink_pad.sticky_events_foreach(|event| {
            if events_to_forward.contains(&event.type_()) {
                gst::debug!(
                    CAT,
                    obj = &self.0.appsink,
                    "forward sticky event {:?}",
                    event
                );
                consumer.send_event(event.clone());
            }

            // Keep every sticky event on the appsink pad and continue iterating.
            std::ops::ControlFlow::Continue(gst::EventForeachAction::Keep)
        });

        Ok(())
    }
517
518    fn process_sample(
519        sample: gst::Sample,
520        appsink: &gst_app::AppSink,
521        mut consumers: MutexGuard<StreamConsumers>,
522    ) -> Result<gst::FlowSuccess, gst::FlowError> {
523        let (is_discont, is_keyframe) = if let Some(buf) = sample.buffer() {
524            let flags = buf.flags();
525
526            (
527                flags.contains(gst::BufferFlags::DISCONT),
528                !flags.contains(gst::BufferFlags::DELTA_UNIT),
529            )
530        } else {
531            (false, true)
532        };
533
534        gst::trace!(
535            CAT,
536            obj = appsink,
537            "processing sample {:?}",
538            sample.buffer()
539        );
540
541        let latency = consumers.current_latency;
542        let latency_updated = mem::replace(&mut consumers.latency_updated, false);
543
544        let mut needs_keyframe_request = false;
545
546        let current_consumers = consumers
547            .consumers
548            .values()
549            .filter_map(|consumer| {
550                if let Some(latency) = latency {
551                    if consumer
552                        .forwarded_latency
553                        .compare_exchange(
554                            false,
555                            true,
556                            atomic::Ordering::SeqCst,
557                            atomic::Ordering::SeqCst,
558                        )
559                        .is_ok()
560                        || latency_updated
561                    {
562                        gst::info!(CAT, obj = appsink, "setting new latency: {latency}");
563                        consumer.appsrc.set_latency(latency, gst::ClockTime::NONE);
564                    }
565                }
566
567                if consumer.discard.load(atomic::Ordering::SeqCst) {
568                    consumer
569                        .needs_keyframe
570                        .store(true, atomic::Ordering::SeqCst);
571                    return None;
572                }
573
574                if is_discont
575                    && !is_keyframe
576                    && consumer.wait_for_keyframe.load(atomic::Ordering::SeqCst)
577                {
578                    // Whenever we have a discontinuity, we need a new keyframe
579                    consumer
580                        .needs_keyframe
581                        .store(true, atomic::Ordering::SeqCst);
582                }
583
584                if !is_keyframe && consumer.needs_keyframe.load(atomic::Ordering::SeqCst) {
585                    // If we need a keyframe (and this one isn't) request a keyframe upstream
586                    if !needs_keyframe_request {
587                        gst::debug!(CAT, obj = appsink, "Requesting keyframe for first buffer");
588                        needs_keyframe_request = true;
589                    }
590
591                    consumer.dropped.fetch_add(1, atomic::Ordering::SeqCst);
592
593                    gst::error!(
594                        CAT,
595                        obj = appsink,
596                        "Ignoring frame for {} while waiting for a keyframe",
597                        consumer.appsrc.name()
598                    );
599                    None
600                } else {
601                    consumer
602                        .needs_keyframe
603                        .store(false, atomic::Ordering::SeqCst);
604                    consumer.pushed.fetch_add(1, atomic::Ordering::SeqCst);
605
606                    Some(consumer.appsrc.clone())
607                }
608            })
609            .collect::<Vec<_>>();
610
611        drop(consumers);
612
613        if needs_keyframe_request {
614            // Do not use `gst_element_send_event()` as it takes the state lock which may lead to dead locks.
615            let pad = appsink.static_pad("sink").unwrap();
616            pad.push_event(
617                gst_video::UpstreamForceKeyUnitEvent::builder()
618                    .all_headers(true)
619                    .build(),
620            );
621        }
622
623        for consumer in current_consumers {
624            if let Err(err) = consumer.push_sample(&sample) {
625                gst::warning!(CAT, obj = appsink, "Failed to push sample: {}", err);
626            }
627        }
628        Ok(gst::FlowSuccess::Ok)
629    }
630
631    /// Remove a consumer appsrc by id
632    pub fn remove_consumer(&self, consumer: &gst_app::AppSrc) {
633        let name = consumer.name();
634        if self
635            .0
636            .consumers
637            .lock()
638            .unwrap()
639            .consumers
640            .remove(consumer)
641            .is_some()
642        {
643            gst::debug!(
644                CAT,
645                obj = &self.0.appsink,
646                "Removed consumer {} ({:?})",
647                name,
648                consumer
649            );
650            consumer.set_callbacks(gst_app::AppSrcCallbacks::builder().build());
651        } else {
652            gst::debug!(
653                CAT,
654                obj = &self.0.appsink,
655                "Consumer {} ({:?}) not found",
656                name,
657                consumer
658            );
659        }
660    }
661
    /// configure event types the appsink should forward to all its consumers
    /// (default: `Eos` and `Gap`).
    ///
    /// The provided event types replace the previously configured ones.
    pub fn set_forward_events(&self, events_to_forward: impl IntoIterator<Item = gst::EventType>) {
        self.0.consumers.lock().unwrap().events_to_forward =
            events_to_forward.into_iter().collect();
    }
667
668    /// get event types the appsink should forward to all its consumers
669    pub fn get_forwarded_events(&self) -> Vec<gst::EventType> {
670        self.0.consumers.lock().unwrap().events_to_forward.clone()
671    }
672
673    /// configure whether the preroll sample should be forwarded (default: `true`)
674    pub fn set_forward_preroll(&self, forward_preroll: bool) {
675        self.0.consumers.lock().unwrap().forward_preroll = forward_preroll;
676    }
677
678    /// Get the GStreamer `appsink` wrapped by this producer
679    pub fn appsink(&self) -> &gst_app::AppSink {
680        &self.0.appsink
681    }
682
683    /// Signals an error on all consumers
684    pub fn error(&self, error: &gst::glib::Error, debug: Option<&str>) {
685        let consumers = self.0.consumers.lock().unwrap();
686
687        for consumer in consumers.consumers.keys() {
688            let mut msg_builder =
689                gst::message::Error::builder_from_error(error.clone()).src(consumer);
690            if let Some(debug) = debug {
691                msg_builder = msg_builder.debug(debug);
692            }
693
694            let _ = consumer.post_message(msg_builder.build());
695        }
696    }
697
698    /// The last sample produced by this producer.
699    pub fn last_sample(&self) -> Option<gst::Sample> {
700        self.0.appsink.property("last-sample")
701    }
702}
703
704impl StreamProducer {
705    /// Adds an `appsink` to dispatch data from.
706    ///
707    /// This function configures the `appsink` in a suitable state to act as a producer
708    /// and also sets the properties as follows:
709    ///
710    /// * `sync` <- `true` (sync on the clock)
711    ///
712    /// If you need a different value, use [`StreamProducer::with`] instead.
713    pub fn from(appsink: &gst_app::AppSink) -> Self {
714        Self::with(appsink, ProducerSettings::default())
715    }
716
717    /// Adds an `appsink` to dispatch data from.
718    ///
719    /// This function configures the `appsink` in a suitable state to act as a producer
720    /// and applies the provided settings.
721    ///
722    /// If unsure, use [`StreamProducer::from`] instead.
723    pub fn with(appsink: &gst_app::AppSink, settings: ProducerSettings) -> Self {
724        let consumers = Arc::new(Mutex::new(StreamConsumers {
725            current_latency: None,
726            latency_updated: false,
727            consumers: HashMap::new(),
728            // it would make sense to automatically forward more events such as Tag but that would break
729            // with older GStreamer, see https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/4297
730            events_to_forward: vec![gst::EventType::Eos, gst::EventType::Gap],
731            forward_preroll: true,
732            just_forwarded_preroll: false,
733        }));
734
735        appsink.set_sync(settings.sync);
736
737        appsink.set_callbacks(
738            gst_app::AppSinkCallbacks::builder()
739                .new_sample(glib::clone!(
740                    #[strong]
741                    consumers,
742                    move |appsink| {
743                        let mut consumers = consumers.lock().unwrap();
744
745                        let sample = match appsink.pull_sample() {
746                            Ok(sample) => sample,
747                            Err(_err) => {
748                                gst::debug!(CAT, obj = appsink, "Failed to pull sample");
749                                return Err(gst::FlowError::Flushing);
750                            }
751                        };
752
753                        let just_forwarded_preroll =
754                            mem::replace(&mut consumers.just_forwarded_preroll, false);
755
756                        if just_forwarded_preroll {
757                            return Ok(gst::FlowSuccess::Ok);
758                        }
759
760                        StreamProducer::process_sample(sample, appsink, consumers)
761                    }
762                ))
763                .new_preroll(glib::clone!(
764                    #[strong]
765                    consumers,
766                    move |appsink| {
767                        let mut consumers = consumers.lock().unwrap();
768
769                        let sample = match appsink.pull_preroll() {
770                            Ok(sample) => sample,
771                            Err(_err) => {
772                                gst::debug!(CAT, obj = appsink, "Failed to pull preroll");
773                                return Err(gst::FlowError::Flushing);
774                            }
775                        };
776
777                        if consumers.forward_preroll {
778                            consumers.just_forwarded_preroll = true;
779
780                            StreamProducer::process_sample(sample, appsink, consumers)
781                        } else {
782                            Ok(gst::FlowSuccess::Ok)
783                        }
784                    }
785                ))
786                .new_event(glib::clone!(
787                    #[strong]
788                    consumers,
789                    move |appsink| {
790                        match appsink
791                            .pull_object()
792                            .map(|obj| obj.downcast::<gst::Event>())
793                        {
794                            Ok(Ok(event)) => {
795                                let (events_to_forward, appsrcs) = {
796                                    // clone so we don't keep the lock while pushing events
797                                    let consumers = consumers.lock().unwrap();
798                                    let events = consumers.events_to_forward.clone();
799                                    let appsrcs =
800                                        consumers.consumers.keys().cloned().collect::<Vec<_>>();
801
802                                    (events, appsrcs)
803                                };
804
805                                if events_to_forward.contains(&event.type_()) {
806                                    for appsrc in appsrcs {
807                                        appsrc.send_event(event.clone());
808                                    }
809                                }
810                            }
811                            Ok(Err(_)) => {} // pulled another unsupported object type, ignore
812                            Err(_err) => gst::warning!(CAT, obj = appsink, "Failed to pull event"),
813                        }
814
815                        false
816                    }
817                ))
818                .eos(glib::clone!(
819                    #[strong]
820                    consumers,
821                    move |appsink| {
822                        let stream_consumers = consumers.lock().unwrap();
823
824                        if stream_consumers
825                            .events_to_forward
826                            .contains(&gst::EventType::Eos)
827                        {
828                            let current_consumers = stream_consumers
829                                .consumers
830                                .values()
831                                .map(|c| c.appsrc.clone())
832                                .collect::<Vec<_>>();
833                            drop(stream_consumers);
834
835                            for consumer in current_consumers {
836                                gst::debug!(
837                                    CAT,
838                                    obj = appsink,
839                                    "set EOS on consumer {}",
840                                    consumer.name()
841                                );
842                                let _ = consumer.end_of_stream();
843                            }
844                        } else {
845                            gst::debug!(CAT, obj = appsink, "don't forward EOS to consumers");
846                        }
847                    }
848                ))
849                .build(),
850        );
851
852        let sinkpad = appsink.static_pad("sink").unwrap();
853        let appsink_probe_id = if settings.sync {
854            // If appsink syncs on the clock,
855            // we need to propagate the latency consolidated by the producer's pipeline,
856            // which uses the max latency among all the branches. This value can be
857            // captured by observing the Latency event posted by the pipeline
858            // upon latency recalculation.
859            sinkpad
860                .add_probe(
861                    gst::PadProbeType::EVENT_UPSTREAM,
862                    glib::clone!(
863                        #[strong]
864                        consumers,
865                        move |_pad, info| {
866                            let Some(event) = info.event() else {
867                                unreachable!();
868                            };
869                            let gst::EventView::Latency(event) = event.view() else {
870                                return gst::PadProbeReturn::Ok;
871                            };
872
873                            let mut consumers = consumers.lock().unwrap();
874                            consumers.current_latency = Some(event.latency());
875                            consumers.latency_updated = true;
876
877                            gst::PadProbeReturn::Ok
878                        }
879                    ),
880                )
881                .unwrap()
882        } else {
883            // If appsink doesn't sync on the clock,
884            // only the latency from current branch needs to be considered.
885            // In this case, the Latency event doesn't take into account current branch.
886            // We can still capture current branch's latency by observing the result
887            // of the Latency query set by upstream elements.
888            //
889            // For reference:
890            //
891            // * BaseSink reports being non-live if the sink doesn't sync on the clock:
892            //   https://gitlab.freedesktop.org/gstreamer/gstreamer/-/blob/9669136207c6a2d35c45c9460f9a838cb5a97379/subprojects/gstreamer/libs/gst/base/gstbasesink.c#L1297
893            // * Bin-level consolidation skips non-live branches:
894            //   https://gitlab.freedesktop.org/gstreamer/gstreamer/-/blob/9669136207c6a2d35c45c9460f9a838cb5a97379/subprojects/gstreamer/gst/gstbin.c#L4166
895            sinkpad
896                .add_probe(
897                    gst::PadProbeType::QUERY_UPSTREAM | gst::PadProbeType::PULL,
898                    glib::clone!(
899                        #[strong]
900                        consumers,
901                        move |_pad, info| {
902                            let Some(query) = info.query() else {
903                                unreachable!();
904                            };
905                            let gst::QueryView::Latency(query) = query.view() else {
906                                return gst::PadProbeReturn::Ok;
907                            };
908
909                            let mut consumers = consumers.lock().unwrap();
910                            consumers.current_latency = Some(query.result().1);
911                            consumers.latency_updated = true;
912
913                            gst::PadProbeReturn::Ok
914                        }
915                    ),
916                )
917                .unwrap()
918        };
919
920        StreamProducer(Arc::new(StreamProducerInner {
921            appsink: appsink.clone(),
922            appsink_probe_id: Some(appsink_probe_id),
923            consumers,
924        }))
925    }
926}
927
928/// Wrapper around a HashMap of consumers, exists for thread safety
929/// and also protects some of the producer state
930#[derive(Debug)]
931struct StreamConsumers {
932    /// The currently-observed latency
933    current_latency: Option<gst::ClockTime>,
934    /// Whether the consumers' appsrc latency needs updating
935    latency_updated: bool,
936    /// The consumers, AppSrc pointer value -> consumer
937    consumers: HashMap<gst_app::AppSrc, StreamConsumer>,
938    /// What events should be forwarded to consumers
939    events_to_forward: Vec<gst::EventType>,
940    /// Whether the preroll sample should be forwarded at all
941    forward_preroll: bool,
942    /// Whether we just forwarded the preroll sample. When we did we want to
943    /// discard the next sample from on_new_sample as it would cause us to
944    /// otherwise push out the same sample twice to consumers.
945    just_forwarded_preroll: bool,
946}
947
948/// Wrapper around a consumer's `appsrc`
949#[derive(Debug)]
950struct StreamConsumer {
951    /// The GStreamer `appsrc` of the consumer
952    appsrc: gst_app::AppSrc,
953    /// The id of a pad probe that intercepts force-key-unit events
954    fku_probe_id: Option<gst::PadProbeId>,
955    /// Whether an initial latency was forwarded to the `appsrc`
956    forwarded_latency: atomic::AtomicBool,
957    /// Whether a first buffer has made it through, used to determine
958    /// whether a new key unit should be requested. Only useful for encoded
959    /// streams.
960    needs_keyframe: Arc<atomic::AtomicBool>,
961    /// number of buffers dropped because `appsrc` internal queue was full
962    dropped: Arc<WrappedAtomicU64>,
963    /// number of buffers pushed through `appsrc`
964    pushed: Arc<WrappedAtomicU64>,
965    /// if buffers should not be pushed to the `appsrc` right now
966    discard: Arc<atomic::AtomicBool>,
967    /// whether the consumer should drop delta frames until next keyframe on discont
968    wait_for_keyframe: Arc<atomic::AtomicBool>,
969}
970
971impl StreamConsumer {
972    /// Create a new consumer
973    fn new(
974        appsrc: &gst_app::AppSrc,
975        fku_probe_id: gst::PadProbeId,
976        dropped: Arc<WrappedAtomicU64>,
977        pushed: Arc<WrappedAtomicU64>,
978        discard: Arc<atomic::AtomicBool>,
979        wait_for_keyframe: Arc<atomic::AtomicBool>,
980    ) -> Self {
981        let needs_keyframe = Arc::new(atomic::AtomicBool::new(
982            wait_for_keyframe.load(atomic::Ordering::SeqCst),
983        ));
984        let needs_keyframe_clone = needs_keyframe.clone();
985        let wait_for_keyframe_clone = wait_for_keyframe.clone();
986        let dropped_clone = dropped.clone();
987
988        appsrc.set_callbacks(
989            gst_app::AppSrcCallbacks::builder()
990                .enough_data(move |appsrc| {
991                    gst::debug!(
992                        CAT,
993                        obj = appsrc,
994                        "consumer {} ({appsrc:?}) is not consuming fast enough, old samples are getting dropped",
995                        appsrc.name(),
996                    );
997
998                    needs_keyframe_clone.store(wait_for_keyframe_clone.load(atomic::Ordering::SeqCst), atomic::Ordering::SeqCst);
999                    dropped_clone.fetch_add(1, atomic::Ordering::SeqCst);
1000
1001                    let _  = appsrc.post_message(gst::message::Element::builder(
1002                        gst::Structure::new_empty("dropped-buffer")).src(appsrc).build()
1003                    );
1004                })
1005                .build(),
1006        );
1007
1008        StreamConsumer {
1009            appsrc: appsrc.clone(),
1010            fku_probe_id: Some(fku_probe_id),
1011            forwarded_latency: atomic::AtomicBool::new(false),
1012            needs_keyframe,
1013            dropped,
1014            pushed,
1015            discard,
1016            wait_for_keyframe,
1017        }
1018    }
1019}
1020
1021impl Drop for StreamConsumer {
1022    fn drop(&mut self) {
1023        if let Some(fku_probe_id) = self.fku_probe_id.take() {
1024            let srcpad = self.appsrc.static_pad("src").unwrap();
1025            srcpad.remove_probe(fku_probe_id);
1026        }
1027    }
1028}
1029
1030impl PartialEq for StreamConsumer {
1031    fn eq(&self, other: &Self) -> bool {
1032        self.appsrc.eq(&other.appsrc)
1033    }
1034}
1035
1036impl Eq for StreamConsumer {}
1037
1038impl std::hash::Hash for StreamConsumer {
1039    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
1040        std::hash::Hash::hash(&self.appsrc, state);
1041    }
1042}
1043
1044impl std::borrow::Borrow<gst_app::AppSrc> for StreamConsumer {
1045    #[inline]
1046    fn borrow(&self) -> &gst_app::AppSrc {
1047        &self.appsrc
1048    }
1049}
1050
#[cfg(test)]
mod tests {
    use std::{
        str::FromStr,
        sync::{Arc, Mutex},
    };

    use futures::{
        channel::{mpsc, mpsc::Receiver},
        SinkExt, StreamExt,
    };
    use gst::prelude::*;

    use crate::{streamproducer::ConsumerSettings, ConsumptionLink, StreamProducer};

    /// Build an `appsrc ! appsink` producer pipeline and wrap its `appsink`
    /// in a `StreamProducer`.
    ///
    /// Returns the pipeline, its `appsrc`, its `appsink` and the producer.
    fn create_producer() -> (
        gst::Pipeline,
        gst_app::AppSrc,
        gst_app::AppSink,
        StreamProducer,
    ) {
        let producer_pipe =
            gst::parse::launch("appsrc name=producer_src ! appsink name=producer_sink")
                .unwrap()
                .downcast::<gst::Pipeline>()
                .unwrap();
        let producer_sink = producer_pipe
            .by_name("producer_sink")
            .unwrap()
            .downcast::<gst_app::AppSink>()
            .unwrap();

        (
            producer_pipe.clone(),
            producer_pipe
                .by_name("producer_src")
                .unwrap()
                .downcast::<gst_app::AppSrc>()
                .unwrap(),
            producer_sink.clone(),
            StreamProducer::from(&producer_sink),
        )
    }

    /// Test consumer: an `appsrc ! appsink` pipeline whose `appsink` relays
    /// every sample it receives into a channel read by the test.
    struct Consumer {
        pipeline: gst::Pipeline,
        src: gst_app::AppSrc,
        sink: gst_app::AppSink,
        receiver: Mutex<Receiver<gst::Sample>>,
        /// Whether the consumer is currently attached to a producer
        connected: Mutex<bool>,
    }

    impl Consumer {
        /// Create a consumer pipeline whose `appsrc` is named `id`.
        fn new(id: &str) -> Self {
            let pipeline = gst::parse::launch(&format!("appsrc name={id} ! appsink name=sink"))
                .unwrap()
                .downcast::<gst::Pipeline>()
                .unwrap();

            let (sender, receiver) = mpsc::channel::<gst::Sample>(1000);
            let sender = Arc::new(Mutex::new(sender));
            let sink = pipeline
                .by_name("sink")
                .unwrap()
                .downcast::<gst_app::AppSink>()
                .unwrap();

            sink.set_callbacks(
                gst_app::AppSinkCallbacks::builder()
                    // Relay every sample reaching the appsink to the test
                    // through the channel.
                    .new_sample(move |appsink| {
                        // Pull the sample in question out of the appsink's buffer.
                        let sample = appsink.pull_sample().unwrap();
                        futures::executor::block_on(sender.lock().unwrap().send(sample)).unwrap();

                        Ok(gst::FlowSuccess::Ok)
                    })
                    .build(),
            );

            Self {
                pipeline: pipeline.clone(),
                src: pipeline
                    .by_name(id)
                    .unwrap()
                    .downcast::<gst_app::AppSrc>()
                    .unwrap(),
                sink,
                receiver: Mutex::new(receiver),
                connected: Mutex::new(false),
            }
        }

        /// Mark the consumer as connected and register its `appsrc` with
        /// `producer`.
        fn connect(&self, producer: &StreamProducer) -> ConsumptionLink {
            {
                let mut connected = self.connected.lock().unwrap();
                *connected = true;
            }

            producer.add_consumer(&self.src).unwrap()
        }

        /// Mark the consumer as disconnected and remove its `appsrc` from
        /// `producer`.
        fn disconnect(&self, producer: &StreamProducer) {
            {
                let mut connected = self.connected.lock().unwrap();
                *connected = false;
            }

            producer.remove_consumer(&self.src);
        }
    }

    /// Push ten buffers through a producer with two consumers, disconnecting
    /// the first consumer after the sixth buffer, and check the per-link
    /// pushed/dropped counters.
    #[test]
    fn simple() {
        gst::init().unwrap();

        let (producer_pipe, producer_src, _producer_sink, producer) = create_producer();
        producer_pipe
            .set_state(gst::State::Playing)
            .expect("Couldn't set producer pipeline state");

        let mut consumers: Vec<Consumer> = Vec::new();
        let consumer = Consumer::new("consumer1");
        let link1 = consumer.connect(&producer);
        consumer
            .pipeline
            .set_state(gst::State::Playing)
            .expect("Couldn't set consumer pipeline state");
        consumers.push(consumer);

        let consumer = Consumer::new("consumer2");
        let link2 = consumer.connect(&producer);
        consumer
            .pipeline
            .set_state(gst::State::Playing)
            .expect("Couldn't set consumer pipeline state");
        consumers.push(consumer);

        assert!(producer.last_sample().is_none());

        for i in 0..10 {
            let caps = gst::Caps::from_str(&format!("test,n={i}")).unwrap();
            producer_src.set_caps(Some(&caps));
            producer_src.push_buffer(gst::Buffer::new()).unwrap();

            for consumer in &consumers {
                if *consumer.connected.lock().unwrap() {
                    let sample =
                        futures::executor::block_on(consumer.receiver.lock().unwrap().next())
                            .expect("Received an empty buffer?");
                    sample.buffer().expect("No buffer on the sample?");
                    assert_eq!(sample.caps(), Some(caps.as_ref()));
                } else {
                    // `assert!` rather than `debug_assert!` so the check is
                    // also enforced when the tests are built in release mode.
                    assert!(
                        consumer
                            .sink
                            .try_pull_sample(gst::ClockTime::from_nseconds(0))
                            .is_none(),
                        "Disconnected consumer got a new sample?!"
                    );
                }
            }

            // The first consumer is detached once it has seen buffers 0..=5.
            if i == 5 {
                consumers.first().unwrap().disconnect(&producer);
            }
        }

        assert!(producer.last_sample().is_some());

        assert_eq!(link1.pushed(), 6);
        assert_eq!(link1.dropped(), 0);
        assert_eq!(link2.pushed(), 10);
        assert_eq!(link2.dropped(), 0);
    }

    /// Assert the `appsrc` properties that every configured consumer is
    /// expected to have, independently of the queue limits.
    fn check_consumer_commons(consumer: &gst_app::AppSrc) {
        assert_eq!(
            consumer.latency(),
            (Some(gst::ClockTime::ZERO), gst::ClockTime::NONE)
        );
        assert_eq!(consumer.format(), gst::Format::Time);
        assert!(consumer.is_live());
        assert!(consumer.is_handle_segment_change());
        assert_eq!(consumer.leaky_type(), gst_app::AppLeakyType::Downstream);
        assert!(!consumer.property::<bool>("automatic-eos"));
    }

    /// `configure_consumer` applies the default queue limits.
    #[test]
    fn configure_consumer_defaults() {
        gst::init().unwrap();

        let consumer = gst_app::AppSrc::builder().build();
        StreamProducer::configure_consumer(&consumer);
        check_consumer_commons(&consumer);

        assert_eq!(consumer.max_buffers(), 0);
        assert_eq!(consumer.max_bytes(), 0);
        assert_eq!(consumer.max_time().unwrap(), 500.mseconds());
    }

    /// `configure_consumer_with` and default settings yields the same limits
    /// as `configure_consumer`.
    #[test]
    fn configure_consumer_with_defaults() {
        gst::init().unwrap();

        let consumer = gst_app::AppSrc::builder().build();
        StreamProducer::configure_consumer_with(&consumer, ConsumerSettings::default());
        check_consumer_commons(&consumer);

        assert_eq!(consumer.max_buffers(), 0);
        assert_eq!(consumer.max_bytes(), 0);
        assert_eq!(consumer.max_time().unwrap(), 500.mseconds());
    }

    /// Explicit settings are applied, and fields left at their default are
    /// reset to the default value on every reconfiguration.
    #[test]
    fn configure_consumer_with_specifics() {
        gst::init().unwrap();

        let consumer = gst_app::AppSrc::builder().build();

        StreamProducer::configure_consumer_with(
            &consumer,
            ConsumerSettings {
                max_buffer: 50,
                ..Default::default()
            },
        );
        check_consumer_commons(&consumer);

        assert_eq!(consumer.max_buffers(), 50);
        assert_eq!(consumer.max_bytes(), 0);
        assert_eq!(consumer.max_time().unwrap(), 500.mseconds());

        StreamProducer::configure_consumer_with(
            &consumer,
            ConsumerSettings {
                max_buffer: 10,
                max_bytes: 2.mebibytes(),
                ..Default::default()
            },
        );
        check_consumer_commons(&consumer);

        assert_eq!(consumer.max_buffers(), 10);
        assert_eq!(consumer.max_bytes(), 2 * 1024 * 1024);
        assert_eq!(consumer.max_time().unwrap(), 500.mseconds());

        StreamProducer::configure_consumer_with(
            &consumer,
            ConsumerSettings {
                max_time: gst::ClockTime::ZERO,
                ..Default::default()
            },
        );
        check_consumer_commons(&consumer);

        assert_eq!(consumer.max_buffers(), 0);
        assert_eq!(consumer.max_bytes(), 0);
        assert!(consumer.max_time().unwrap().is_zero());

        StreamProducer::configure_consumer_with(
            &consumer,
            ConsumerSettings {
                max_time: 750.mseconds(),
                ..Default::default()
            },
        );
        check_consumer_commons(&consumer);

        assert_eq!(consumer.max_buffers(), 0);
        assert_eq!(consumer.max_bytes(), 0);
        assert_eq!(consumer.max_time().unwrap(), 750.mseconds());
    }
}