gstreamer_utils/
streamproducer.rs

1use std::{
2    collections::HashMap,
3    mem,
4    sync::{atomic, Arc, Mutex, MutexGuard},
5};
6
7use gst::{glib, prelude::*};
8use std::sync::LazyLock;
9use thiserror::Error;
10
11pub const DEFAULT_PRODUCER_SYNC: bool = true;
12
13pub const DEFAULT_CONSUMER_MAX_BUFFERS: u64 = 0;
14pub const DEFAULT_CONSUMER_MAX_BYTES: gst::format::Bytes = gst::format::Bytes::ZERO;
15pub const DEFAULT_CONSUMER_MAX_TIME: gst::ClockTime = gst::ClockTime::from_mseconds(500);
16
17// Small wrapper around AtomicU64 and a Mutex, to allow it to run regular AtomicU64
18// operations where supported, and fallback to a mutex where it is not. The wrapper methods
19// are the ones that are needed, and not all are exposed.
20#[derive(Debug)]
21struct WrappedAtomicU64 {
22    #[cfg(not(target_has_atomic = "64"))]
23    atomic: Mutex<u64>,
24    #[cfg(target_has_atomic = "64")]
25    atomic: atomic::AtomicU64,
26}
27
28#[cfg(target_has_atomic = "64")]
29impl WrappedAtomicU64 {
30    fn new(value: u64) -> WrappedAtomicU64 {
31        WrappedAtomicU64 {
32            atomic: atomic::AtomicU64::new(value),
33        }
34    }
35    fn fetch_add(&self, value: u64, order: atomic::Ordering) -> u64 {
36        self.atomic.fetch_add(value, order)
37    }
38    fn store(&self, value: u64, order: atomic::Ordering) {
39        self.atomic.store(value, order);
40    }
41
42    fn load(&self, order: atomic::Ordering) -> u64 {
43        self.atomic.load(order)
44    }
45}
46
47#[cfg(not(target_has_atomic = "64"))]
48impl WrappedAtomicU64 {
49    fn new(value: u64) -> WrappedAtomicU64 {
50        WrappedAtomicU64 {
51            atomic: Mutex::new(value),
52        }
53    }
54    fn fetch_add(&self, value: u64, _order: atomic::Ordering) -> u64 {
55        let mut guard = self.atomic.lock().unwrap();
56        let old = *guard;
57        *guard += value;
58        old
59    }
60    fn store(&self, value: u64, _order: atomic::Ordering) {
61        *self.atomic.lock().unwrap() = value;
62    }
63    fn load(&self, _order: atomic::Ordering) -> u64 {
64        *self.atomic.lock().unwrap()
65    }
66}
67
68static CAT: LazyLock<gst::DebugCategory> = LazyLock::new(|| {
69    gst::DebugCategory::new(
70        "utilsrs-stream-producer",
71        gst::DebugColorFlags::empty(),
72        Some("gst_app Stream Producer interface"),
73    )
74});
75
76/// The interface for transporting media data from one node
77/// to another.
78///
79/// A producer is essentially a GStreamer `appsink` whose output
80/// is sent to a set of consumers, who are essentially `appsrc` wrappers
81#[derive(Debug, Clone)]
82pub struct StreamProducer(Arc<StreamProducerInner>);
83
84impl PartialEq for StreamProducer {
85    fn eq(&self, other: &Self) -> bool {
86        self.0.appsink.eq(&other.0.appsink)
87    }
88}
89
90impl Eq for StreamProducer {}
91
92#[derive(Debug)]
93struct StreamProducerInner {
94    /// The appsink to dispatch data for
95    appsink: gst_app::AppSink,
96    /// The pad probe on the appsink=
97    appsink_probe_id: Option<gst::PadProbeId>,
98    /// The consumers to dispatch data to
99    consumers: Arc<Mutex<StreamConsumers>>,
100}
101
102impl Drop for StreamProducerInner {
103    fn drop(&mut self) {
104        if let Some(probe_id) = self.appsink_probe_id.take() {
105            let pad = self.appsink.static_pad("sink").unwrap();
106            pad.remove_probe(probe_id);
107        }
108
109        self.appsink
110            .set_callbacks(gst_app::AppSinkCallbacks::builder().build());
111    }
112}
113
114/// User defined producer settings
115///
116/// Defaults to:
117///
118/// * `sync` <- `true` (sync on the clock)
119///
120/// Use `ConsumerSettings::builder()` if you need different values.
121#[derive(Debug, Clone, Eq, PartialEq, Hash)]
122pub struct ProducerSettings {
123    pub sync: bool,
124}
125
126impl Default for ProducerSettings {
127    fn default() -> Self {
128        ProducerSettings {
129            sync: DEFAULT_PRODUCER_SYNC,
130        }
131    }
132}
133
134/// Link between a `StreamProducer` and a consumer, disconnecting the link on `Drop`.
135/// The producer and consumer will stay alive while the link is.
136#[derive(Debug)]
137#[must_use]
138pub struct ConsumptionLink {
139    consumer: gst_app::AppSrc,
140    settings: ConsumerSettings,
141    producer: Option<StreamProducer>,
142    /// number of buffers dropped because `consumer` internal queue was full
143    dropped: Arc<WrappedAtomicU64>,
144    /// number of buffers pushed through `consumer`
145    pushed: Arc<WrappedAtomicU64>,
146    /// if buffers should not be pushed to the `consumer` right now
147    discard: Arc<atomic::AtomicBool>,
148    /// whether the link will drop delta frames until next keyframe on discont
149    wait_for_keyframe: Arc<atomic::AtomicBool>,
150}
151
152impl ConsumptionLink {
153    /// Create a new disconnected `ConsumptionLink`.
154    ///
155    /// The consumer will use the default configuration (see [StreamProducer::configure_consumer]).
156    /// If you need different settings, call [ConsumptionLink::disconnected_with] instead.
157    pub fn disconnected(consumer: gst_app::AppSrc) -> ConsumptionLink {
158        StreamProducer::configure_consumer(&consumer);
159
160        ConsumptionLink {
161            consumer,
162            settings: ConsumerSettings::default(),
163            producer: None,
164            dropped: Arc::new(WrappedAtomicU64::new(0)),
165            pushed: Arc::new(WrappedAtomicU64::new(0)),
166            discard: Arc::new(atomic::AtomicBool::new(false)),
167            wait_for_keyframe: Arc::new(atomic::AtomicBool::new(true)),
168        }
169    }
170
171    /// Create a new disconnected `ConsumptionLink`.
172    pub fn disconnected_with(
173        consumer: gst_app::AppSrc,
174        settings: ConsumerSettings,
175    ) -> ConsumptionLink {
176        StreamProducer::configure_consumer(&consumer);
177
178        ConsumptionLink {
179            consumer,
180            settings,
181            producer: None,
182            dropped: Arc::new(WrappedAtomicU64::new(0)),
183            pushed: Arc::new(WrappedAtomicU64::new(0)),
184            discard: Arc::new(atomic::AtomicBool::new(false)),
185            wait_for_keyframe: Arc::new(atomic::AtomicBool::new(true)),
186        }
187    }
188
189    /// Replace the producer by a new one, keeping the existing consumer.
190    pub fn change_producer(
191        &mut self,
192        new_producer: &StreamProducer,
193        reset_stats: bool,
194    ) -> Result<(), AddConsumerError> {
195        self.disconnect();
196        if reset_stats {
197            self.dropped.store(0, atomic::Ordering::SeqCst);
198            self.pushed.store(0, atomic::Ordering::SeqCst);
199        }
200        new_producer.add_consumer_internal(
201            &self.consumer,
202            self.settings.clone(),
203            self.dropped.clone(),
204            self.pushed.clone(),
205            self.discard.clone(),
206            self.wait_for_keyframe.clone(),
207        )?;
208        self.producer = Some(new_producer.clone());
209        Ok(())
210    }
211
212    /// Disconnect the consumer from the producer
213    pub fn disconnect(&mut self) {
214        if let Some(producer) = self.producer.take() {
215            producer.remove_consumer(&self.consumer);
216        }
217    }
218
219    /// number of dropped buffers because the consumer internal queue was full
220    pub fn dropped(&self) -> u64 {
221        self.dropped.load(atomic::Ordering::SeqCst)
222    }
223
224    /// number of buffers pushed through this link
225    pub fn pushed(&self) -> u64 {
226        self.pushed.load(atomic::Ordering::SeqCst)
227    }
228
229    /// if buffers are currently pushed through this link
230    pub fn discard(&self) -> bool {
231        self.discard.load(atomic::Ordering::SeqCst)
232    }
233
234    /// If set to `true` then no buffers will be pushed through this link
235    pub fn set_discard(&self, discard: bool) {
236        self.discard.store(discard, atomic::Ordering::SeqCst)
237    }
238
239    /// if the link will drop frames until the next keyframe on discont
240    pub fn wait_for_keyframe(&self) -> bool {
241        self.wait_for_keyframe.load(atomic::Ordering::SeqCst)
242    }
243
244    /// If set to `true` then the link will drop delta-frames until the next
245    /// keyframe on discont (default behavior).
246    pub fn set_wait_for_keyframe(&self, wait: bool) {
247        self.wait_for_keyframe.store(wait, atomic::Ordering::SeqCst)
248    }
249
250    /// Get the GStreamer `appsrc` wrapped by this link
251    pub fn appsrc(&self) -> &gst_app::AppSrc {
252        &self.consumer
253    }
254
255    /// Get the `StreamProducer` currently by this link, if any.
256    pub fn stream_producer(&self) -> Option<&StreamProducer> {
257        self.producer.as_ref()
258    }
259
260    /// Get the settings for this Consumer.
261    pub fn settings(&self) -> ConsumerSettings {
262        self.settings.clone()
263    }
264}
265
266impl Drop for ConsumptionLink {
267    fn drop(&mut self) {
268        self.disconnect();
269    }
270}
271
272/// User defined consumer settings
273///
274/// Defaults to:
275///
276/// * `max-buffers` <- `0` (unlimited)
277/// * `max-bytes` <- `0` (unlimited)
278/// * `max-time` <- `500ms`
279/// * `event-types` <- `gst::UpstreamForceKeyUnitEvent`
280///   Note: force-key-unit events are always forwarded in addition to any
281///   of the `gst::EventType` set through this property
282///
283/// Use `ConsumerSettings::builder()` if you need different values.
284#[derive(Debug, Clone, Eq, PartialEq, Hash)]
285pub struct ConsumerSettings {
286    pub max_buffer: u64,
287    pub max_bytes: gst::format::Bytes,
288    pub max_time: gst::ClockTime,
289    pub event_types: Vec<gst::EventType>,
290}
291
292impl Default for ConsumerSettings {
293    fn default() -> Self {
294        ConsumerSettings {
295            max_buffer: DEFAULT_CONSUMER_MAX_BUFFERS,
296            max_bytes: DEFAULT_CONSUMER_MAX_BYTES,
297            max_time: DEFAULT_CONSUMER_MAX_TIME,
298            event_types: Vec::new(),
299        }
300    }
301}
302
303#[derive(Debug, Error)]
304/// Error type returned when adding consumers to producers.
305pub enum AddConsumerError {
306    #[error("Consumer already added")]
307    /// Consumer has already been added to this producer.
308    AlreadyAdded,
309}
310
311impl StreamProducer {
312    /// Configures a consumer `appsrc` for later use in a `StreamProducer`.
313    ///
314    /// This function configures the `appsrc` in a suitable state to act as a consumer
315    /// and also sets the internal queue properties as follows:
316    ///
317    /// * `max-buffers` <- `0` (unlimited)
318    /// * `max-bytes` <- `0` (unlimited)
319    /// * `max-time` <- `500ms`
320    ///
321    /// If you need different settings, call [`StreamProducer::configure_consumer_with`] instead.
322    ///
323    /// This is automatically invoked when calling [`StreamProducer::add_consumer`].
324    pub fn configure_consumer(consumer: &gst_app::AppSrc) {
325        Self::configure_consumer_with(consumer, ConsumerSettings::default());
326    }
327
328    /// Configures a consumer `appsrc` for later use in a `StreamProducer`.
329    ///
330    /// This function configures the `appsrc` in a suitable state to act as a consumer
331    /// and applies the provided settings.
332    ///
333    /// If unsure, call [`StreamProducer::configure_consumer`] instead.
334    pub fn configure_consumer_with(consumer: &gst_app::AppSrc, settings: ConsumerSettings) {
335        // Latency on the appsrc is set by the publisher before the first buffer
336        // and whenever it changes
337        consumer.set_latency(gst::ClockTime::ZERO, gst::ClockTime::NONE);
338        consumer.set_format(gst::Format::Time);
339        consumer.set_is_live(true);
340        consumer.set_handle_segment_change(true);
341        consumer.set_leaky_type(gst_app::AppLeakyType::Downstream);
342        consumer.set_automatic_eos(false);
343
344        consumer.set_max_buffers(settings.max_buffer);
345        consumer.set_max_bytes(settings.max_bytes.into());
346        consumer.set_max_time(settings.max_time);
347    }
348
349    /// Adds an `appsrc` to dispatch data to.
350    ///
351    /// This function configures the `appsrc` in a suitable state to act as a consumer
352    /// and also sets the internal queue properties as follows:
353    ///
354    /// * `max-buffers` <- `0` (unlimited)
355    /// * `max-bytes` <- `0` (unlimited)
356    /// * `max-time` <- `500ms`
357    ///
358    /// If you need different values, call [`StreamProducer::add_consumer_with`] instead.
359    ///
360    /// Dropping the returned `ConsumptionLink` will automatically disconnect the consumer from the producer.
361    pub fn add_consumer(
362        &self,
363        consumer: &gst_app::AppSrc,
364    ) -> Result<ConsumptionLink, AddConsumerError> {
365        let dropped = Arc::new(WrappedAtomicU64::new(0));
366        let pushed = Arc::new(WrappedAtomicU64::new(0));
367        let discard = Arc::new(atomic::AtomicBool::new(false));
368        let wait_for_keyframe = Arc::new(atomic::AtomicBool::new(true));
369
370        self.add_consumer_internal(
371            consumer,
372            ConsumerSettings::default(),
373            dropped.clone(),
374            pushed.clone(),
375            discard.clone(),
376            wait_for_keyframe.clone(),
377        )?;
378
379        Ok(ConsumptionLink {
380            consumer: consumer.clone(),
381            settings: ConsumerSettings::default(),
382            producer: Some(self.clone()),
383            dropped,
384            pushed,
385            discard,
386            wait_for_keyframe,
387        })
388    }
389
390    /// Adds a configured `appsrc` to dispatch data to.
391    ///
392    /// This function configures the `appsrc` in a suitable state to act as a consumer
393    /// and applies the provided settings.
394    ///
395    /// If unsure, call [`StreamProducer::add_consumer`] instead.
396    ///
397    /// Dropping the returned `ConsumptionLink` will automatically disconnect the consumer from the producer.
398    pub fn add_consumer_with(
399        &self,
400        consumer: &gst_app::AppSrc,
401        settings: ConsumerSettings,
402    ) -> Result<ConsumptionLink, AddConsumerError> {
403        let dropped = Arc::new(WrappedAtomicU64::new(0));
404        let pushed = Arc::new(WrappedAtomicU64::new(0));
405        let discard = Arc::new(atomic::AtomicBool::new(false));
406        let wait_for_keyframe = Arc::new(atomic::AtomicBool::new(true));
407
408        self.add_consumer_internal(
409            consumer,
410            settings.clone(),
411            dropped.clone(),
412            pushed.clone(),
413            discard.clone(),
414            wait_for_keyframe.clone(),
415        )?;
416
417        Ok(ConsumptionLink {
418            consumer: consumer.clone(),
419            settings,
420            producer: Some(self.clone()),
421            dropped,
422            pushed,
423            discard,
424            wait_for_keyframe,
425        })
426    }
427
428    fn add_consumer_internal(
429        &self,
430        consumer: &gst_app::AppSrc,
431        settings: ConsumerSettings,
432        dropped: Arc<WrappedAtomicU64>,
433        pushed: Arc<WrappedAtomicU64>,
434        discard: Arc<atomic::AtomicBool>,
435        wait_for_keyframe: Arc<atomic::AtomicBool>,
436    ) -> Result<(), AddConsumerError> {
437        let mut consumers = self.0.consumers.lock().unwrap();
438        if consumers.consumers.contains_key(consumer) {
439            gst::error!(
440                CAT,
441                obj = &self.0.appsink,
442                "Consumer {} ({:?}) already added",
443                consumer.name(),
444                consumer
445            );
446            return Err(AddConsumerError::AlreadyAdded);
447        }
448
449        gst::debug!(
450            CAT,
451            obj = &self.0.appsink,
452            "Adding consumer {} ({:?})",
453            consumer.name(),
454            consumer
455        );
456
457        let settings_clone = settings.clone();
458        Self::configure_consumer_with(consumer, settings);
459
460        // Forward force-keyunit events upstream to the appsink
461        let srcpad = consumer.static_pad("src").unwrap();
462        let fku_probe_id = srcpad
463            .add_probe(
464                gst::PadProbeType::EVENT_UPSTREAM,
465                glib::clone!(
466                    #[weak(rename_to = appsink)]
467                    self.0.appsink,
468                    #[upgrade_or_panic]
469                    move |_pad, info| {
470                        let Some(event) = info.event() else {
471                            return gst::PadProbeReturn::Ok;
472                        };
473
474                        if gst_video::UpstreamForceKeyUnitEvent::parse(event).is_ok()
475                            || settings_clone.event_types.contains(&event.type_())
476                        {
477                            if gst_video::ForceKeyUnitEvent::is(event) {
478                                gst::debug!(CAT, obj = &appsink, "Requesting keyframe");
479                            } else {
480                                gst::debug!(
481                                    CAT,
482                                    obj = &appsink,
483                                    "pushing upstream event {:?}",
484                                    event
485                                );
486                            }
487                            // Do not use `gst_element_send_event()` as it takes the state lock which may lead to dead locks.
488                            let pad = appsink.static_pad("sink").unwrap();
489                            let _ = pad.push_event(event.clone());
490                        }
491
492                        gst::PadProbeReturn::Ok
493                    }
494                ),
495            )
496            .unwrap();
497
498        let stream_consumer = StreamConsumer::new(
499            consumer,
500            fku_probe_id,
501            dropped,
502            pushed,
503            discard,
504            wait_for_keyframe,
505        );
506
507        consumers
508            .consumers
509            .insert(consumer.clone(), stream_consumer);
510
511        // forward selected sticky events. We can send those now as appsrc will delay the events
512        // until stream-start, caps and segment are sent.
513        let events_to_forward = consumers.events_to_forward.clone();
514        // drop the lock before sending events
515        drop(consumers);
516
517        let appsink_pad = self.0.appsink.static_pad("sink").unwrap();
518        appsink_pad.sticky_events_foreach(|event| {
519            if events_to_forward.contains(&event.type_()) {
520                gst::debug!(
521                    CAT,
522                    obj = &self.0.appsink,
523                    "forward sticky event {:?}",
524                    event
525                );
526                consumer.send_event(event.clone());
527            }
528
529            std::ops::ControlFlow::Continue(gst::EventForeachAction::Keep)
530        });
531
532        Ok(())
533    }
534
535    fn process_sample(
536        sample: gst::Sample,
537        appsink: &gst_app::AppSink,
538        mut consumers: MutexGuard<StreamConsumers>,
539    ) -> Result<gst::FlowSuccess, gst::FlowError> {
540        let (is_discont, is_keyframe) = if let Some(buf) = sample.buffer() {
541            let flags = buf.flags();
542
543            (
544                flags.contains(gst::BufferFlags::DISCONT),
545                !flags.contains(gst::BufferFlags::DELTA_UNIT),
546            )
547        } else {
548            (false, true)
549        };
550
551        gst::trace!(
552            CAT,
553            obj = appsink,
554            "processing sample {:?}",
555            sample.buffer()
556        );
557
558        let latency = consumers.current_latency;
559        let latency_updated = mem::replace(&mut consumers.latency_updated, false);
560
561        let mut needs_keyframe_request = false;
562
563        let current_consumers = consumers
564            .consumers
565            .values()
566            .filter_map(|consumer| {
567                if let Some(latency) = latency {
568                    if consumer
569                        .forwarded_latency
570                        .compare_exchange(
571                            false,
572                            true,
573                            atomic::Ordering::SeqCst,
574                            atomic::Ordering::SeqCst,
575                        )
576                        .is_ok()
577                        || latency_updated
578                    {
579                        gst::info!(CAT, obj = appsink, "setting new latency: {latency}");
580                        consumer.appsrc.set_latency(latency, gst::ClockTime::NONE);
581                    }
582                }
583
584                if consumer.discard.load(atomic::Ordering::SeqCst) {
585                    consumer
586                        .needs_keyframe
587                        .store(true, atomic::Ordering::SeqCst);
588                    return None;
589                }
590
591                if is_discont
592                    && !is_keyframe
593                    && consumer.wait_for_keyframe.load(atomic::Ordering::SeqCst)
594                {
595                    // Whenever we have a discontinuity, we need a new keyframe
596                    consumer
597                        .needs_keyframe
598                        .store(true, atomic::Ordering::SeqCst);
599                }
600
601                if !is_keyframe && consumer.needs_keyframe.load(atomic::Ordering::SeqCst) {
602                    // If we need a keyframe (and this one isn't) request a keyframe upstream
603                    if !needs_keyframe_request {
604                        gst::debug!(CAT, obj = appsink, "Requesting keyframe for first buffer");
605                        needs_keyframe_request = true;
606                    }
607
608                    consumer.dropped.fetch_add(1, atomic::Ordering::SeqCst);
609
610                    gst::error!(
611                        CAT,
612                        obj = appsink,
613                        "Ignoring frame for {} while waiting for a keyframe",
614                        consumer.appsrc.name()
615                    );
616                    None
617                } else {
618                    consumer
619                        .needs_keyframe
620                        .store(false, atomic::Ordering::SeqCst);
621                    consumer.pushed.fetch_add(1, atomic::Ordering::SeqCst);
622
623                    Some(consumer.appsrc.clone())
624                }
625            })
626            .collect::<Vec<_>>();
627
628        drop(consumers);
629
630        if needs_keyframe_request {
631            // Do not use `gst_element_send_event()` as it takes the state lock which may lead to dead locks.
632            let pad = appsink.static_pad("sink").unwrap();
633            pad.push_event(
634                gst_video::UpstreamForceKeyUnitEvent::builder()
635                    .all_headers(true)
636                    .build(),
637            );
638        }
639
640        for consumer in current_consumers {
641            if let Err(err) = consumer.push_sample(&sample) {
642                gst::warning!(CAT, obj = appsink, "Failed to push sample: {}", err);
643            }
644        }
645        Ok(gst::FlowSuccess::Ok)
646    }
647
648    /// Remove a consumer appsrc by id
649    pub fn remove_consumer(&self, consumer: &gst_app::AppSrc) {
650        let name = consumer.name();
651        if self
652            .0
653            .consumers
654            .lock()
655            .unwrap()
656            .consumers
657            .remove(consumer)
658            .is_some()
659        {
660            gst::debug!(
661                CAT,
662                obj = &self.0.appsink,
663                "Removed consumer {} ({:?})",
664                name,
665                consumer
666            );
667            consumer.set_callbacks(gst_app::AppSrcCallbacks::builder().build());
668        } else {
669            gst::debug!(
670                CAT,
671                obj = &self.0.appsink,
672                "Consumer {} ({:?}) not found",
673                name,
674                consumer
675            );
676        }
677    }
678
679    /// configure event types the appsink should forward to all its consumers (default: `Eos`).
680    pub fn set_forward_events(&self, events_to_forward: impl IntoIterator<Item = gst::EventType>) {
681        self.0.consumers.lock().unwrap().events_to_forward =
682            events_to_forward.into_iter().collect();
683    }
684
685    /// get event types the appsink should forward to all its consumers
686    pub fn get_forwarded_events(&self) -> Vec<gst::EventType> {
687        self.0.consumers.lock().unwrap().events_to_forward.clone()
688    }
689
690    /// configure whether the preroll sample should be forwarded (default: `true`)
691    pub fn set_forward_preroll(&self, forward_preroll: bool) {
692        self.0.consumers.lock().unwrap().forward_preroll = forward_preroll;
693    }
694
695    /// Get the GStreamer `appsink` wrapped by this producer
696    pub fn appsink(&self) -> &gst_app::AppSink {
697        &self.0.appsink
698    }
699
700    /// Signals an error on all consumers
701    pub fn error(&self, error: &gst::glib::Error, debug: Option<&str>) {
702        let consumers = self.0.consumers.lock().unwrap();
703
704        for consumer in consumers.consumers.keys() {
705            let mut msg_builder =
706                gst::message::Error::builder_from_error(error.clone()).src(consumer);
707            if let Some(debug) = debug {
708                msg_builder = msg_builder.debug(debug);
709            }
710
711            let _ = consumer.post_message(msg_builder.build());
712        }
713    }
714
715    /// The last sample produced by this producer.
716    pub fn last_sample(&self) -> Option<gst::Sample> {
717        self.0.appsink.property("last-sample")
718    }
719}
720
721impl StreamProducer {
722    /// Adds an `appsink` to dispatch data from.
723    ///
724    /// This function configures the `appsink` in a suitable state to act as a producer
725    /// and also sets the properties as follows:
726    ///
727    /// * `sync` <- `true` (sync on the clock)
728    ///
729    /// If you need a different value, use [`StreamProducer::with`] instead.
730    pub fn from(appsink: &gst_app::AppSink) -> Self {
731        Self::with(appsink, ProducerSettings::default())
732    }
733
734    /// Adds an `appsink` to dispatch data from.
735    ///
736    /// This function configures the `appsink` in a suitable state to act as a producer
737    /// and applies the provided settings.
738    ///
739    /// If unsure, use [`StreamProducer::from`] instead.
740    pub fn with(appsink: &gst_app::AppSink, settings: ProducerSettings) -> Self {
741        let consumers = Arc::new(Mutex::new(StreamConsumers {
742            current_latency: None,
743            latency_updated: false,
744            consumers: HashMap::new(),
745            // it would make sense to automatically forward more events such as Tag but that would break
746            // with older GStreamer, see https://gitlab.freedesktop.org/gstreamer/gstreamer/-/merge_requests/4297
747            events_to_forward: vec![gst::EventType::Eos, gst::EventType::Gap],
748            forward_preroll: true,
749            just_forwarded_preroll: false,
750        }));
751
752        appsink.set_sync(settings.sync);
753
754        appsink.set_callbacks(
755            gst_app::AppSinkCallbacks::builder()
756                .new_sample(glib::clone!(
757                    #[strong]
758                    consumers,
759                    move |appsink| {
760                        let mut consumers = consumers.lock().unwrap();
761
762                        let sample = match appsink.pull_sample() {
763                            Ok(sample) => sample,
764                            Err(_err) => {
765                                gst::debug!(CAT, obj = appsink, "Failed to pull sample");
766                                return Err(gst::FlowError::Flushing);
767                            }
768                        };
769
770                        let just_forwarded_preroll =
771                            mem::replace(&mut consumers.just_forwarded_preroll, false);
772
773                        if just_forwarded_preroll {
774                            return Ok(gst::FlowSuccess::Ok);
775                        }
776
777                        StreamProducer::process_sample(sample, appsink, consumers)
778                    }
779                ))
780                .new_preroll(glib::clone!(
781                    #[strong]
782                    consumers,
783                    move |appsink| {
784                        let mut consumers = consumers.lock().unwrap();
785
786                        let sample = match appsink.pull_preroll() {
787                            Ok(sample) => sample,
788                            Err(_err) => {
789                                gst::debug!(CAT, obj = appsink, "Failed to pull preroll");
790                                return Err(gst::FlowError::Flushing);
791                            }
792                        };
793
794                        if consumers.forward_preroll {
795                            consumers.just_forwarded_preroll = true;
796
797                            StreamProducer::process_sample(sample, appsink, consumers)
798                        } else {
799                            Ok(gst::FlowSuccess::Ok)
800                        }
801                    }
802                ))
803                .new_event(glib::clone!(
804                    #[strong]
805                    consumers,
806                    move |appsink| {
807                        match appsink
808                            .pull_object()
809                            .map(|obj| obj.downcast::<gst::Event>())
810                        {
811                            Ok(Ok(event)) => {
812                                let (events_to_forward, appsrcs) = {
813                                    // clone so we don't keep the lock while pushing events
814                                    let consumers = consumers.lock().unwrap();
815                                    let events = consumers.events_to_forward.clone();
816                                    let appsrcs =
817                                        consumers.consumers.keys().cloned().collect::<Vec<_>>();
818
819                                    (events, appsrcs)
820                                };
821
822                                if events_to_forward.contains(&event.type_()) {
823                                    for appsrc in appsrcs {
824                                        appsrc.send_event(event.clone());
825                                    }
826                                }
827                            }
828                            Ok(Err(_)) => {} // pulled another unsupported object type, ignore
829                            Err(_err) => gst::warning!(CAT, obj = appsink, "Failed to pull event"),
830                        }
831
832                        false
833                    }
834                ))
835                .eos(glib::clone!(
836                    #[strong]
837                    consumers,
838                    move |appsink| {
839                        let stream_consumers = consumers.lock().unwrap();
840
841                        if stream_consumers
842                            .events_to_forward
843                            .contains(&gst::EventType::Eos)
844                        {
845                            let current_consumers = stream_consumers
846                                .consumers
847                                .values()
848                                .map(|c| c.appsrc.clone())
849                                .collect::<Vec<_>>();
850                            drop(stream_consumers);
851
852                            for consumer in current_consumers {
853                                gst::debug!(
854                                    CAT,
855                                    obj = appsink,
856                                    "set EOS on consumer {}",
857                                    consumer.name()
858                                );
859                                let _ = consumer.end_of_stream();
860                            }
861                        } else {
862                            gst::debug!(CAT, obj = appsink, "don't forward EOS to consumers");
863                        }
864                    }
865                ))
866                .build(),
867        );
868
869        let sinkpad = appsink.static_pad("sink").unwrap();
870        let appsink_probe_id = if settings.sync {
871            // If appsink syncs on the clock,
872            // we need to propagate the latency consolidated by the producer's pipeline,
873            // which uses the max latency among all the branches. This value can be
874            // captured by observing the Latency event posted by the pipeline
875            // upon latency recalculation.
876            sinkpad
877                .add_probe(
878                    gst::PadProbeType::EVENT_UPSTREAM,
879                    glib::clone!(
880                        #[strong]
881                        consumers,
882                        move |_pad, info| {
883                            let Some(event) = info.event() else {
884                                unreachable!();
885                            };
886                            let gst::EventView::Latency(event) = event.view() else {
887                                return gst::PadProbeReturn::Ok;
888                            };
889
890                            let mut consumers = consumers.lock().unwrap();
891                            consumers.current_latency = Some(event.latency());
892                            consumers.latency_updated = true;
893
894                            gst::PadProbeReturn::Ok
895                        }
896                    ),
897                )
898                .unwrap()
899        } else {
900            // If appsink doesn't sync on the clock,
901            // only the latency from current branch needs to be considered.
902            // In this case, the Latency event doesn't take into account current branch.
903            // We can still capture current branch's latency by observing the result
904            // of the Latency query set by upstream elements.
905            //
906            // For reference:
907            //
908            // * BaseSink reports being non-live if the sink doesn't sync on the clock:
909            //   https://gitlab.freedesktop.org/gstreamer/gstreamer/-/blob/9669136207c6a2d35c45c9460f9a838cb5a97379/subprojects/gstreamer/libs/gst/base/gstbasesink.c#L1297
910            // * Bin-level consolidation skips non-live branches:
911            //   https://gitlab.freedesktop.org/gstreamer/gstreamer/-/blob/9669136207c6a2d35c45c9460f9a838cb5a97379/subprojects/gstreamer/gst/gstbin.c#L4166
912            sinkpad
913                .add_probe(
914                    gst::PadProbeType::QUERY_UPSTREAM | gst::PadProbeType::PULL,
915                    glib::clone!(
916                        #[strong]
917                        consumers,
918                        move |_pad, info| {
919                            let Some(query) = info.query() else {
920                                unreachable!();
921                            };
922                            let gst::QueryView::Latency(query) = query.view() else {
923                                return gst::PadProbeReturn::Ok;
924                            };
925
926                            let mut consumers = consumers.lock().unwrap();
927                            consumers.current_latency = Some(query.result().1);
928                            consumers.latency_updated = true;
929
930                            gst::PadProbeReturn::Ok
931                        }
932                    ),
933                )
934                .unwrap()
935        };
936
937        StreamProducer(Arc::new(StreamProducerInner {
938            appsink: appsink.clone(),
939            appsink_probe_id: Some(appsink_probe_id),
940            consumers,
941        }))
942    }
943}
944
945/// Wrapper around a HashMap of consumers, exists for thread safety
946/// and also protects some of the producer state
947#[derive(Debug)]
948struct StreamConsumers {
949    /// The currently-observed latency
950    current_latency: Option<gst::ClockTime>,
951    /// Whether the consumers' appsrc latency needs updating
952    latency_updated: bool,
953    /// The consumers, AppSrc pointer value -> consumer
954    consumers: HashMap<gst_app::AppSrc, StreamConsumer>,
955    /// What events should be forwarded to consumers
956    events_to_forward: Vec<gst::EventType>,
957    /// Whether the preroll sample should be forwarded at all
958    forward_preroll: bool,
959    /// Whether we just forwarded the preroll sample. When we did we want to
960    /// discard the next sample from on_new_sample as it would cause us to
961    /// otherwise push out the same sample twice to consumers.
962    just_forwarded_preroll: bool,
963}
964
965/// Wrapper around a consumer's `appsrc`
966#[derive(Debug)]
967struct StreamConsumer {
968    /// The GStreamer `appsrc` of the consumer
969    appsrc: gst_app::AppSrc,
970    /// The id of a pad probe that intercepts force-key-unit events
971    fku_probe_id: Option<gst::PadProbeId>,
972    /// Whether an initial latency was forwarded to the `appsrc`
973    forwarded_latency: atomic::AtomicBool,
974    /// Whether a first buffer has made it through, used to determine
975    /// whether a new key unit should be requested. Only useful for encoded
976    /// streams.
977    needs_keyframe: Arc<atomic::AtomicBool>,
978    /// number of buffers dropped because `appsrc` internal queue was full
979    dropped: Arc<WrappedAtomicU64>,
980    /// number of buffers pushed through `appsrc`
981    pushed: Arc<WrappedAtomicU64>,
982    /// if buffers should not be pushed to the `appsrc` right now
983    discard: Arc<atomic::AtomicBool>,
984    /// whether the consumer should drop delta frames until next keyframe on discont
985    wait_for_keyframe: Arc<atomic::AtomicBool>,
986}
987
988impl StreamConsumer {
989    /// Create a new consumer
990    fn new(
991        appsrc: &gst_app::AppSrc,
992        fku_probe_id: gst::PadProbeId,
993        dropped: Arc<WrappedAtomicU64>,
994        pushed: Arc<WrappedAtomicU64>,
995        discard: Arc<atomic::AtomicBool>,
996        wait_for_keyframe: Arc<atomic::AtomicBool>,
997    ) -> Self {
998        let needs_keyframe = Arc::new(atomic::AtomicBool::new(
999            wait_for_keyframe.load(atomic::Ordering::SeqCst),
1000        ));
1001        let needs_keyframe_clone = needs_keyframe.clone();
1002        let wait_for_keyframe_clone = wait_for_keyframe.clone();
1003        let dropped_clone = dropped.clone();
1004
1005        appsrc.set_callbacks(
1006            gst_app::AppSrcCallbacks::builder()
1007                .enough_data(move |appsrc| {
1008                    gst::debug!(
1009                        CAT,
1010                        obj = appsrc,
1011                        "consumer {} ({appsrc:?}) is not consuming fast enough, old samples are getting dropped",
1012                        appsrc.name(),
1013                    );
1014
1015                    needs_keyframe_clone.store(wait_for_keyframe_clone.load(atomic::Ordering::SeqCst), atomic::Ordering::SeqCst);
1016                    dropped_clone.fetch_add(1, atomic::Ordering::SeqCst);
1017
1018                    let _  = appsrc.post_message(gst::message::Element::builder(
1019                        gst::Structure::new_empty("dropped-buffer")).src(appsrc).build()
1020                    );
1021                })
1022                .build(),
1023        );
1024
1025        StreamConsumer {
1026            appsrc: appsrc.clone(),
1027            fku_probe_id: Some(fku_probe_id),
1028            forwarded_latency: atomic::AtomicBool::new(false),
1029            needs_keyframe,
1030            dropped,
1031            pushed,
1032            discard,
1033            wait_for_keyframe,
1034        }
1035    }
1036}
1037
1038impl Drop for StreamConsumer {
1039    fn drop(&mut self) {
1040        if let Some(fku_probe_id) = self.fku_probe_id.take() {
1041            let srcpad = self.appsrc.static_pad("src").unwrap();
1042            srcpad.remove_probe(fku_probe_id);
1043        }
1044    }
1045}
1046
1047impl PartialEq for StreamConsumer {
1048    fn eq(&self, other: &Self) -> bool {
1049        self.appsrc.eq(&other.appsrc)
1050    }
1051}
1052
1053impl Eq for StreamConsumer {}
1054
1055impl std::hash::Hash for StreamConsumer {
1056    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
1057        std::hash::Hash::hash(&self.appsrc, state);
1058    }
1059}
1060
1061impl std::borrow::Borrow<gst_app::AppSrc> for StreamConsumer {
1062    #[inline]
1063    fn borrow(&self) -> &gst_app::AppSrc {
1064        &self.appsrc
1065    }
1066}
1067
1068#[cfg(test)]
1069mod tests {
1070    use std::{
1071        str::FromStr,
1072        sync::{Arc, Mutex},
1073    };
1074
1075    use futures::{
1076        channel::{mpsc, mpsc::Receiver},
1077        SinkExt, StreamExt,
1078    };
1079    use gst::prelude::*;
1080
1081    use crate::{streamproducer::ConsumerSettings, ConsumptionLink, StreamProducer};
1082
1083    fn create_producer() -> (
1084        gst::Pipeline,
1085        gst_app::AppSrc,
1086        gst_app::AppSink,
1087        StreamProducer,
1088    ) {
1089        let producer_pipe =
1090            gst::parse::launch("appsrc name=producer_src ! appsink name=producer_sink")
1091                .unwrap()
1092                .downcast::<gst::Pipeline>()
1093                .unwrap();
1094        let producer_sink = producer_pipe
1095            .by_name("producer_sink")
1096            .unwrap()
1097            .downcast::<gst_app::AppSink>()
1098            .unwrap();
1099
1100        (
1101            producer_pipe.clone(),
1102            producer_pipe
1103                .by_name("producer_src")
1104                .unwrap()
1105                .downcast::<gst_app::AppSrc>()
1106                .unwrap(),
1107            producer_sink.clone(),
1108            StreamProducer::from(&producer_sink),
1109        )
1110    }
1111
1112    struct Consumer {
1113        pipeline: gst::Pipeline,
1114        src: gst_app::AppSrc,
1115        sink: gst_app::AppSink,
1116        receiver: Mutex<Receiver<gst::Sample>>,
1117        connected: Mutex<bool>,
1118    }
1119
1120    impl Consumer {
1121        fn new(id: &str) -> Self {
1122            let pipeline = gst::parse::launch(&format!("appsrc name={id} ! appsink name=sink"))
1123                .unwrap()
1124                .downcast::<gst::Pipeline>()
1125                .unwrap();
1126
1127            let (sender, receiver) = mpsc::channel::<gst::Sample>(1000);
1128            let sender = Arc::new(Mutex::new(sender));
1129            let sink = pipeline
1130                .by_name("sink")
1131                .unwrap()
1132                .downcast::<gst_app::AppSink>()
1133                .unwrap();
1134
1135            sink.set_callbacks(
1136                gst_app::AppSinkCallbacks::builder()
1137                    // Add a handler to the "new-sample" signal.
1138                    .new_sample(move |appsink| {
1139                        // Pull the sample in question out of the appsink's buffer.
1140                        let sender_clone = sender.clone();
1141                        futures::executor::block_on(
1142                            sender_clone
1143                                .lock()
1144                                .unwrap()
1145                                .send(appsink.pull_sample().unwrap()),
1146                        )
1147                        .unwrap();
1148
1149                        Ok(gst::FlowSuccess::Ok)
1150                    })
1151                    .build(),
1152            );
1153
1154            Self {
1155                pipeline: pipeline.clone(),
1156                src: pipeline
1157                    .by_name(id)
1158                    .unwrap()
1159                    .downcast::<gst_app::AppSrc>()
1160                    .unwrap(),
1161                sink,
1162                receiver: Mutex::new(receiver),
1163                connected: Mutex::new(false),
1164            }
1165        }
1166
1167        fn connect(&self, producer: &StreamProducer) -> ConsumptionLink {
1168            {
1169                let mut connected = self.connected.lock().unwrap();
1170                *connected = true;
1171            }
1172
1173            producer.add_consumer(&self.src).unwrap()
1174        }
1175
1176        fn disconnect(&self, producer: &StreamProducer) {
1177            {
1178                let mut connected = self.connected.lock().unwrap();
1179                *connected = false;
1180            }
1181
1182            producer.remove_consumer(&self.src);
1183        }
1184    }
1185
1186    #[test]
1187    fn simple() {
1188        gst::init().unwrap();
1189
1190        let (producer_pipe, producer_src, _producer_sink, producer) = create_producer();
1191        producer_pipe
1192            .set_state(gst::State::Playing)
1193            .expect("Couldn't set producer pipeline state");
1194
1195        let mut consumers: Vec<Consumer> = Vec::new();
1196        let consumer = Consumer::new("consumer1");
1197        let link1 = consumer.connect(&producer);
1198        consumer
1199            .pipeline
1200            .set_state(gst::State::Playing)
1201            .expect("Couldn't set producer pipeline state");
1202        consumers.push(consumer);
1203
1204        let consumer = Consumer::new("consumer2");
1205        let link2 = consumer.connect(&producer);
1206        consumer
1207            .pipeline
1208            .set_state(gst::State::Playing)
1209            .expect("Couldn't set producer pipeline state");
1210        consumers.push(consumer);
1211
1212        assert!(producer.last_sample().is_none());
1213
1214        for i in 0..10 {
1215            let caps = gst::Caps::from_str(&format!("test,n={i}")).unwrap();
1216            producer_src.set_caps(Some(&caps));
1217            producer_src.push_buffer(gst::Buffer::new()).unwrap();
1218
1219            for consumer in &consumers {
1220                if *consumer.connected.lock().unwrap() {
1221                    let sample =
1222                        futures::executor::block_on(consumer.receiver.lock().unwrap().next())
1223                            .expect("Received an empty buffer?");
1224                    sample.buffer().expect("No buffer on the sample?");
1225                    assert_eq!(sample.caps(), Some(caps.as_ref()));
1226                } else {
1227                    debug_assert!(
1228                        consumer
1229                            .sink
1230                            .try_pull_sample(gst::ClockTime::from_nseconds(0))
1231                            .is_none(),
1232                        "Disconnected consumer got a new sample?!"
1233                    );
1234                }
1235            }
1236
1237            if i == 5 {
1238                consumers.first().unwrap().disconnect(&producer);
1239            }
1240        }
1241
1242        assert!(producer.last_sample().is_some());
1243
1244        assert_eq!(link1.pushed(), 6);
1245        assert_eq!(link1.dropped(), 0);
1246        assert_eq!(link2.pushed(), 10);
1247        assert_eq!(link2.dropped(), 0);
1248    }
1249
1250    fn check_consumer_commons(consumer: &gst_app::AppSrc) {
1251        assert_eq!(
1252            consumer.latency(),
1253            (Some(gst::ClockTime::ZERO), gst::ClockTime::NONE)
1254        );
1255        assert_eq!(consumer.format(), gst::Format::Time);
1256        assert!(consumer.is_live());
1257        assert!(consumer.is_handle_segment_change());
1258        assert_eq!(consumer.leaky_type(), gst_app::AppLeakyType::Downstream);
1259        assert!(!consumer.property::<bool>("automatic-eos"));
1260    }
1261
1262    #[test]
1263    fn configure_consumer_defaults() {
1264        gst::init().unwrap();
1265
1266        let consumer = gst_app::AppSrc::builder().build();
1267        StreamProducer::configure_consumer(&consumer);
1268        check_consumer_commons(&consumer);
1269
1270        assert_eq!(consumer.max_buffers(), 0);
1271        assert_eq!(consumer.max_bytes(), 0);
1272        assert_eq!(consumer.max_time(), 500.mseconds());
1273    }
1274
1275    #[test]
1276    fn configure_consumer_with_defaults() {
1277        gst::init().unwrap();
1278
1279        let consumer = gst_app::AppSrc::builder().build();
1280        StreamProducer::configure_consumer_with(&consumer, ConsumerSettings::default());
1281        check_consumer_commons(&consumer);
1282
1283        assert_eq!(consumer.max_buffers(), 0);
1284        assert_eq!(consumer.max_bytes(), 0);
1285        assert_eq!(consumer.max_time(), 500.mseconds());
1286    }
1287
1288    #[test]
1289    fn configure_consumer_with_specifics() {
1290        gst::init().unwrap();
1291
1292        let consumer = gst_app::AppSrc::builder().build();
1293
1294        StreamProducer::configure_consumer_with(
1295            &consumer,
1296            ConsumerSettings {
1297                max_buffer: 50,
1298                ..Default::default()
1299            },
1300        );
1301        check_consumer_commons(&consumer);
1302
1303        assert_eq!(consumer.max_buffers(), 50);
1304        assert_eq!(consumer.max_bytes(), 0);
1305        assert_eq!(consumer.max_time(), 500.mseconds());
1306
1307        StreamProducer::configure_consumer_with(
1308            &consumer,
1309            ConsumerSettings {
1310                max_buffer: 10,
1311                max_bytes: 2.mebibytes(),
1312                ..Default::default()
1313            },
1314        );
1315        check_consumer_commons(&consumer);
1316
1317        assert_eq!(consumer.max_buffers(), 10);
1318        assert_eq!(consumer.max_bytes(), 2 * 1024 * 1024);
1319        assert_eq!(consumer.max_time(), 500.mseconds());
1320
1321        StreamProducer::configure_consumer_with(
1322            &consumer,
1323            ConsumerSettings {
1324                max_time: gst::ClockTime::ZERO,
1325                ..Default::default()
1326            },
1327        );
1328        check_consumer_commons(&consumer);
1329
1330        assert_eq!(consumer.max_buffers(), 0);
1331        assert_eq!(consumer.max_bytes(), 0);
1332        assert!(consumer.max_time().is_zero());
1333
1334        StreamProducer::configure_consumer_with(
1335            &consumer,
1336            ConsumerSettings {
1337                max_time: 750.mseconds(),
1338                ..Default::default()
1339            },
1340        );
1341        check_consumer_commons(&consumer);
1342
1343        assert_eq!(consumer.max_buffers(), 0);
1344        assert_eq!(consumer.max_bytes(), 0);
1345        assert_eq!(consumer.max_time(), 750.mseconds());
1346    }
1347}