1use std::{
2 collections::HashMap,
3 mem,
4 sync::{atomic, Arc, Mutex, MutexGuard},
5};
6
7use gst::{glib, prelude::*};
8use std::sync::LazyLock;
9use thiserror::Error;
10
/// Value used for [`ProducerSettings::sync`] by `ProducerSettings::default()`.
pub const DEFAULT_PRODUCER_SYNC: bool = true;

/// Value used for [`ConsumerSettings::max_buffer`] by `ConsumerSettings::default()`.
pub const DEFAULT_CONSUMER_MAX_BUFFERS: u64 = 0;
/// Value used for [`ConsumerSettings::max_bytes`] by `ConsumerSettings::default()`.
pub const DEFAULT_CONSUMER_MAX_BYTES: gst::format::Bytes = gst::format::Bytes::ZERO;
/// Value used for [`ConsumerSettings::max_time`] by `ConsumerSettings::default()` (500 ms).
pub const DEFAULT_CONSUMER_MAX_TIME: gst::ClockTime = gst::ClockTime::from_mseconds(500);
16
/// A `u64` counter exposing the subset of the atomic API this module needs.
///
/// On targets with native 64-bit atomics it wraps [`atomic::AtomicU64`].
/// On other targets the value lives behind a [`Mutex`] and the requested
/// memory ordering is ignored.
#[derive(Debug)]
struct WrappedAtomicU64 {
    #[cfg(not(target_has_atomic = "64"))]
    atomic: Mutex<u64>,
    #[cfg(target_has_atomic = "64")]
    atomic: atomic::AtomicU64,
}

#[cfg(target_has_atomic = "64")]
impl WrappedAtomicU64 {
    /// Creates a counter initialised to `value`.
    fn new(value: u64) -> WrappedAtomicU64 {
        WrappedAtomicU64 {
            atomic: atomic::AtomicU64::new(value),
        }
    }

    /// Adds `value` to the counter (wrapping on overflow) and returns the previous value.
    fn fetch_add(&self, value: u64, order: atomic::Ordering) -> u64 {
        self.atomic.fetch_add(value, order)
    }

    /// Overwrites the counter with `value`.
    fn store(&self, value: u64, order: atomic::Ordering) {
        self.atomic.store(value, order);
    }

    /// Returns the current value of the counter.
    fn load(&self, order: atomic::Ordering) -> u64 {
        self.atomic.load(order)
    }
}

#[cfg(not(target_has_atomic = "64"))]
impl WrappedAtomicU64 {
    /// Creates a counter initialised to `value`.
    fn new(value: u64) -> WrappedAtomicU64 {
        WrappedAtomicU64 {
            atomic: Mutex::new(value),
        }
    }

    /// Adds `value` to the counter and returns the previous value.
    ///
    /// Uses wrapping arithmetic so that overflow behaves like
    /// `AtomicU64::fetch_add` instead of panicking in debug builds.
    fn fetch_add(&self, value: u64, _order: atomic::Ordering) -> u64 {
        let mut guard = self.atomic.lock().unwrap();
        let previous = *guard;
        *guard = previous.wrapping_add(value);
        previous
    }

    /// Overwrites the counter with `value`.
    fn store(&self, value: u64, _order: atomic::Ordering) {
        *self.atomic.lock().unwrap() = value;
    }

    /// Returns the current value of the counter.
    fn load(&self, _order: atomic::Ordering) -> u64 {
        *self.atomic.lock().unwrap()
    }
}
67
68static CAT: LazyLock<gst::DebugCategory> = LazyLock::new(|| {
69 gst::DebugCategory::new(
70 "utilsrs-stream-producer",
71 gst::DebugColorFlags::empty(),
72 Some("gst_app Stream Producer interface"),
73 )
74});
75
76#[derive(Debug, Clone)]
82pub struct StreamProducer(Arc<StreamProducerInner>);
83
84impl PartialEq for StreamProducer {
85 fn eq(&self, other: &Self) -> bool {
86 self.0.appsink.eq(&other.0.appsink)
87 }
88}
89
90impl Eq for StreamProducer {}
91
92#[derive(Debug)]
93struct StreamProducerInner {
94 appsink: gst_app::AppSink,
96 appsink_probe_id: Option<gst::PadProbeId>,
98 consumers: Arc<Mutex<StreamConsumers>>,
100}
101
102impl Drop for StreamProducerInner {
103 fn drop(&mut self) {
104 if let Some(probe_id) = self.appsink_probe_id.take() {
105 let pad = self.appsink.static_pad("sink").unwrap();
106 pad.remove_probe(probe_id);
107 }
108
109 self.appsink
110 .set_callbacks(gst_app::AppSinkCallbacks::builder().build());
111 }
112}
113
/// Options applied to the producer's `appsink` when a [`StreamProducer`] is created.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct ProducerSettings {
    /// Value passed to the appsink's `sync` setting.
    pub sync: bool,
}
125
126impl Default for ProducerSettings {
127 fn default() -> Self {
128 ProducerSettings {
129 sync: DEFAULT_PRODUCER_SYNC,
130 }
131 }
132}
133
134#[derive(Debug)]
137#[must_use]
138pub struct ConsumptionLink {
139 consumer: gst_app::AppSrc,
140 settings: ConsumerSettings,
141 producer: Option<StreamProducer>,
142 dropped: Arc<WrappedAtomicU64>,
144 pushed: Arc<WrappedAtomicU64>,
146 discard: Arc<atomic::AtomicBool>,
148 wait_for_keyframe: Arc<atomic::AtomicBool>,
150}
151
152impl ConsumptionLink {
153 pub fn disconnected(consumer: gst_app::AppSrc) -> ConsumptionLink {
158 StreamProducer::configure_consumer(&consumer);
159
160 ConsumptionLink {
161 consumer,
162 settings: ConsumerSettings::default(),
163 producer: None,
164 dropped: Arc::new(WrappedAtomicU64::new(0)),
165 pushed: Arc::new(WrappedAtomicU64::new(0)),
166 discard: Arc::new(atomic::AtomicBool::new(false)),
167 wait_for_keyframe: Arc::new(atomic::AtomicBool::new(true)),
168 }
169 }
170
171 pub fn disconnected_with(
173 consumer: gst_app::AppSrc,
174 settings: ConsumerSettings,
175 ) -> ConsumptionLink {
176 StreamProducer::configure_consumer(&consumer);
177
178 ConsumptionLink {
179 consumer,
180 settings,
181 producer: None,
182 dropped: Arc::new(WrappedAtomicU64::new(0)),
183 pushed: Arc::new(WrappedAtomicU64::new(0)),
184 discard: Arc::new(atomic::AtomicBool::new(false)),
185 wait_for_keyframe: Arc::new(atomic::AtomicBool::new(true)),
186 }
187 }
188
189 pub fn change_producer(
191 &mut self,
192 new_producer: &StreamProducer,
193 reset_stats: bool,
194 ) -> Result<(), AddConsumerError> {
195 self.disconnect();
196 if reset_stats {
197 self.dropped.store(0, atomic::Ordering::SeqCst);
198 self.pushed.store(0, atomic::Ordering::SeqCst);
199 }
200 new_producer.add_consumer_internal(
201 &self.consumer,
202 self.settings.clone(),
203 self.dropped.clone(),
204 self.pushed.clone(),
205 self.discard.clone(),
206 self.wait_for_keyframe.clone(),
207 )?;
208 self.producer = Some(new_producer.clone());
209 Ok(())
210 }
211
212 pub fn disconnect(&mut self) {
214 if let Some(producer) = self.producer.take() {
215 producer.remove_consumer(&self.consumer);
216 }
217 }
218
219 pub fn dropped(&self) -> u64 {
221 self.dropped.load(atomic::Ordering::SeqCst)
222 }
223
224 pub fn pushed(&self) -> u64 {
226 self.pushed.load(atomic::Ordering::SeqCst)
227 }
228
229 pub fn discard(&self) -> bool {
231 self.discard.load(atomic::Ordering::SeqCst)
232 }
233
234 pub fn set_discard(&self, discard: bool) {
236 self.discard.store(discard, atomic::Ordering::SeqCst)
237 }
238
239 pub fn wait_for_keyframe(&self) -> bool {
241 self.wait_for_keyframe.load(atomic::Ordering::SeqCst)
242 }
243
244 pub fn set_wait_for_keyframe(&self, wait: bool) {
247 self.wait_for_keyframe.store(wait, atomic::Ordering::SeqCst)
248 }
249
250 pub fn appsrc(&self) -> &gst_app::AppSrc {
252 &self.consumer
253 }
254
255 pub fn stream_producer(&self) -> Option<&StreamProducer> {
257 self.producer.as_ref()
258 }
259
260 pub fn settings(&self) -> ConsumerSettings {
262 self.settings.clone()
263 }
264}
265
266impl Drop for ConsumptionLink {
267 fn drop(&mut self) {
268 self.disconnect();
269 }
270}
271
272#[derive(Debug, Clone, Eq, PartialEq, Hash)]
285pub struct ConsumerSettings {
286 pub max_buffer: u64,
287 pub max_bytes: gst::format::Bytes,
288 pub max_time: gst::ClockTime,
289 pub event_types: Vec<gst::EventType>,
290}
291
292impl Default for ConsumerSettings {
293 fn default() -> Self {
294 ConsumerSettings {
295 max_buffer: DEFAULT_CONSUMER_MAX_BUFFERS,
296 max_bytes: DEFAULT_CONSUMER_MAX_BYTES,
297 max_time: DEFAULT_CONSUMER_MAX_TIME,
298 event_types: Vec::new(),
299 }
300 }
301}
302
303#[derive(Debug, Error)]
304pub enum AddConsumerError {
306 #[error("Consumer already added")]
307 AlreadyAdded,
309}
310
311impl StreamProducer {
312 pub fn configure_consumer(consumer: &gst_app::AppSrc) {
325 Self::configure_consumer_with(consumer, ConsumerSettings::default());
326 }
327
328 pub fn configure_consumer_with(consumer: &gst_app::AppSrc, settings: ConsumerSettings) {
335 consumer.set_latency(gst::ClockTime::ZERO, gst::ClockTime::NONE);
338 consumer.set_format(gst::Format::Time);
339 consumer.set_is_live(true);
340 consumer.set_handle_segment_change(true);
341 consumer.set_leaky_type(gst_app::AppLeakyType::Downstream);
342 consumer.set_automatic_eos(false);
343
344 consumer.set_max_buffers(settings.max_buffer);
345 consumer.set_max_bytes(settings.max_bytes.into());
346 consumer.set_max_time(settings.max_time);
347 }
348
349 pub fn add_consumer(
362 &self,
363 consumer: &gst_app::AppSrc,
364 ) -> Result<ConsumptionLink, AddConsumerError> {
365 let dropped = Arc::new(WrappedAtomicU64::new(0));
366 let pushed = Arc::new(WrappedAtomicU64::new(0));
367 let discard = Arc::new(atomic::AtomicBool::new(false));
368 let wait_for_keyframe = Arc::new(atomic::AtomicBool::new(true));
369
370 self.add_consumer_internal(
371 consumer,
372 ConsumerSettings::default(),
373 dropped.clone(),
374 pushed.clone(),
375 discard.clone(),
376 wait_for_keyframe.clone(),
377 )?;
378
379 Ok(ConsumptionLink {
380 consumer: consumer.clone(),
381 settings: ConsumerSettings::default(),
382 producer: Some(self.clone()),
383 dropped,
384 pushed,
385 discard,
386 wait_for_keyframe,
387 })
388 }
389
390 pub fn add_consumer_with(
399 &self,
400 consumer: &gst_app::AppSrc,
401 settings: ConsumerSettings,
402 ) -> Result<ConsumptionLink, AddConsumerError> {
403 let dropped = Arc::new(WrappedAtomicU64::new(0));
404 let pushed = Arc::new(WrappedAtomicU64::new(0));
405 let discard = Arc::new(atomic::AtomicBool::new(false));
406 let wait_for_keyframe = Arc::new(atomic::AtomicBool::new(true));
407
408 self.add_consumer_internal(
409 consumer,
410 settings.clone(),
411 dropped.clone(),
412 pushed.clone(),
413 discard.clone(),
414 wait_for_keyframe.clone(),
415 )?;
416
417 Ok(ConsumptionLink {
418 consumer: consumer.clone(),
419 settings,
420 producer: Some(self.clone()),
421 dropped,
422 pushed,
423 discard,
424 wait_for_keyframe,
425 })
426 }
427
    /// Registers `consumer` with this producer and wires up event forwarding.
    ///
    /// The steps are, in order:
    /// 1. fail if `consumer` is already registered,
    /// 2. configure the `appsrc` with `settings`,
    /// 3. install an upstream-event probe on the consumer's src pad,
    /// 4. store a `StreamConsumer` carrying the shared counters and flags,
    /// 5. replay the appsink's matching sticky events to the new consumer.
    ///
    /// # Errors
    ///
    /// Returns [`AddConsumerError::AlreadyAdded`] if `consumer` is already
    /// registered.
    ///
    /// # Panics
    ///
    /// Panics if the consumers mutex is poisoned, if a static pad lookup
    /// fails, or if the probe cannot be installed.
    fn add_consumer_internal(
        &self,
        consumer: &gst_app::AppSrc,
        settings: ConsumerSettings,
        dropped: Arc<WrappedAtomicU64>,
        pushed: Arc<WrappedAtomicU64>,
        discard: Arc<atomic::AtomicBool>,
        wait_for_keyframe: Arc<atomic::AtomicBool>,
    ) -> Result<(), AddConsumerError> {
        let mut consumers = self.0.consumers.lock().unwrap();
        if consumers.consumers.contains_key(consumer) {
            gst::error!(
                CAT,
                obj = &self.0.appsink,
                "Consumer {} ({:?}) already added",
                consumer.name(),
                consumer
            );
            return Err(AddConsumerError::AlreadyAdded);
        }

        gst::debug!(
            CAT,
            obj = &self.0.appsink,
            "Adding consumer {} ({:?})",
            consumer.name(),
            consumer
        );

        // Keep a copy for the probe closure; `settings` itself is consumed by
        // the configuration call.
        let settings_clone = settings.clone();
        Self::configure_consumer_with(consumer, settings);

        let srcpad = consumer.static_pad("src").unwrap();
        // Forward force-key-unit requests, plus any event types listed in the
        // settings, from the consumer to the producer's appsink.
        let fku_probe_id = srcpad
            .add_probe(
                gst::PadProbeType::EVENT_UPSTREAM,
                glib::clone!(
                    #[weak(rename_to = appsink)]
                    self.0.appsink,
                    #[upgrade_or_panic]
                    move |_pad, info| {
                        let Some(event) = info.event() else {
                            return gst::PadProbeReturn::Ok;
                        };

                        if gst_video::UpstreamForceKeyUnitEvent::parse(event).is_ok()
                            || settings_clone.event_types.contains(&event.type_())
                        {
                            if gst_video::ForceKeyUnitEvent::is(event) {
                                gst::debug!(CAT, obj = &appsink, "Requesting keyframe");
                            } else {
                                gst::debug!(
                                    CAT,
                                    obj = &appsink,
                                    "pushing upstream event {:?}",
                                    event
                                );
                            }
                            // The event is pushed on the sink pad directly; the
                            // result is deliberately ignored.
                            let pad = appsink.static_pad("sink").unwrap();
                            let _ = pad.push_event(event.clone());
                        }

                        gst::PadProbeReturn::Ok
                    }
                ),
            )
            .unwrap();

        let stream_consumer = StreamConsumer::new(
            consumer,
            fku_probe_id,
            dropped,
            pushed,
            discard,
            wait_for_keyframe,
        );

        consumers
            .consumers
            .insert(consumer.clone(), stream_consumer);

        // Copy the list and release the lock before sending events to the
        // consumer below.
        let events_to_forward = consumers.events_to_forward.clone();
        drop(consumers);

        // Replay sticky events already seen by the appsink so the new
        // consumer receives them as well.
        let appsink_pad = self.0.appsink.static_pad("sink").unwrap();
        appsink_pad.sticky_events_foreach(|event| {
            if events_to_forward.contains(&event.type_()) {
                gst::debug!(
                    CAT,
                    obj = &self.0.appsink,
                    "forward sticky event {:?}",
                    event
                );
                consumer.send_event(event.clone());
            }

            std::ops::ControlFlow::Continue(gst::EventForeachAction::Keep)
        });

        Ok(())
    }
534
    /// Dispatches `sample` to every consumer that should currently receive it.
    ///
    /// `consumers` is the already-locked consumer state. The lock is released
    /// before the keyframe request and the sample pushes happen.
    ///
    /// For each consumer this:
    /// - forwards the current latency if it was never forwarded to that
    ///   consumer or has just been updated,
    /// - skips the consumer while its `discard` flag is set (and marks it as
    ///   needing a keyframe),
    /// - drops non-keyframes while the consumer needs a keyframe, counting
    ///   them in `dropped`,
    /// - otherwise counts the sample in `pushed` and selects the consumer.
    ///
    /// Always returns `Ok(gst::FlowSuccess::Ok)`; push failures are only logged.
    fn process_sample(
        sample: gst::Sample,
        appsink: &gst_app::AppSink,
        mut consumers: MutexGuard<StreamConsumers>,
    ) -> Result<gst::FlowSuccess, gst::FlowError> {
        // A sample without a buffer is treated as a non-discont keyframe.
        let (is_discont, is_keyframe) = if let Some(buf) = sample.buffer() {
            let flags = buf.flags();

            (
                flags.contains(gst::BufferFlags::DISCONT),
                !flags.contains(gst::BufferFlags::DELTA_UNIT),
            )
        } else {
            (false, true)
        };

        gst::trace!(
            CAT,
            obj = appsink,
            "processing sample {:?}",
            sample.buffer()
        );

        let latency = consumers.current_latency;
        // Read and clear the flag in one step so an update is forwarded once.
        let latency_updated = mem::replace(&mut consumers.latency_updated, false);

        let mut needs_keyframe_request = false;

        let current_consumers = consumers
            .consumers
            .values()
            .filter_map(|consumer| {
                if let Some(latency) = latency {
                    // The compare-exchange succeeds only the first time, i.e.
                    // when this consumer never had a latency forwarded.
                    if consumer
                        .forwarded_latency
                        .compare_exchange(
                            false,
                            true,
                            atomic::Ordering::SeqCst,
                            atomic::Ordering::SeqCst,
                        )
                        .is_ok()
                        || latency_updated
                    {
                        gst::info!(CAT, obj = appsink, "setting new latency: {latency}");
                        consumer.appsrc.set_latency(latency, gst::ClockTime::NONE);
                    }
                }

                // A discarding consumer receives nothing and will need a
                // keyframe once discarding stops.
                if consumer.discard.load(atomic::Ordering::SeqCst) {
                    consumer
                        .needs_keyframe
                        .store(true, atomic::Ordering::SeqCst);
                    return None;
                }

                // A discontinuity on a delta unit re-arms keyframe waiting for
                // consumers that asked for it.
                if is_discont
                    && !is_keyframe
                    && consumer.wait_for_keyframe.load(atomic::Ordering::SeqCst)
                {
                    consumer
                        .needs_keyframe
                        .store(true, atomic::Ordering::SeqCst);
                }

                if !is_keyframe && consumer.needs_keyframe.load(atomic::Ordering::SeqCst) {
                    // Only one upstream keyframe request is sent per sample,
                    // no matter how many consumers are waiting.
                    if !needs_keyframe_request {
                        gst::debug!(CAT, obj = appsink, "Requesting keyframe for first buffer");
                        needs_keyframe_request = true;
                    }

                    consumer.dropped.fetch_add(1, atomic::Ordering::SeqCst);

                    gst::error!(
                        CAT,
                        obj = appsink,
                        "Ignoring frame for {} while waiting for a keyframe",
                        consumer.appsrc.name()
                    );
                    None
                } else {
                    consumer
                        .needs_keyframe
                        .store(false, atomic::Ordering::SeqCst);
                    // Counted when selected, before the push is attempted.
                    consumer.pushed.fetch_add(1, atomic::Ordering::SeqCst);

                    Some(consumer.appsrc.clone())
                }
            })
            .collect::<Vec<_>>();

        // Release the lock before pushing events and samples.
        drop(consumers);

        if needs_keyframe_request {
            let pad = appsink.static_pad("sink").unwrap();
            pad.push_event(
                gst_video::UpstreamForceKeyUnitEvent::builder()
                    .all_headers(true)
                    .build(),
            );
        }

        for consumer in current_consumers {
            if let Err(err) = consumer.push_sample(&sample) {
                gst::warning!(CAT, obj = appsink, "Failed to push sample: {}", err);
            }
        }
        Ok(gst::FlowSuccess::Ok)
    }
647
648 pub fn remove_consumer(&self, consumer: &gst_app::AppSrc) {
650 let name = consumer.name();
651 if self
652 .0
653 .consumers
654 .lock()
655 .unwrap()
656 .consumers
657 .remove(consumer)
658 .is_some()
659 {
660 gst::debug!(
661 CAT,
662 obj = &self.0.appsink,
663 "Removed consumer {} ({:?})",
664 name,
665 consumer
666 );
667 consumer.set_callbacks(gst_app::AppSrcCallbacks::builder().build());
668 } else {
669 gst::debug!(
670 CAT,
671 obj = &self.0.appsink,
672 "Consumer {} ({:?}) not found",
673 name,
674 consumer
675 );
676 }
677 }
678
679 pub fn set_forward_events(&self, events_to_forward: impl IntoIterator<Item = gst::EventType>) {
681 self.0.consumers.lock().unwrap().events_to_forward =
682 events_to_forward.into_iter().collect();
683 }
684
685 pub fn get_forwarded_events(&self) -> Vec<gst::EventType> {
687 self.0.consumers.lock().unwrap().events_to_forward.clone()
688 }
689
690 pub fn set_forward_preroll(&self, forward_preroll: bool) {
692 self.0.consumers.lock().unwrap().forward_preroll = forward_preroll;
693 }
694
695 pub fn appsink(&self) -> &gst_app::AppSink {
697 &self.0.appsink
698 }
699
700 pub fn error(&self, error: &gst::glib::Error, debug: Option<&str>) {
702 let consumers = self.0.consumers.lock().unwrap();
703
704 for consumer in consumers.consumers.keys() {
705 let mut msg_builder =
706 gst::message::Error::builder_from_error(error.clone()).src(consumer);
707 if let Some(debug) = debug {
708 msg_builder = msg_builder.debug(debug);
709 }
710
711 let _ = consumer.post_message(msg_builder.build());
712 }
713 }
714
715 pub fn last_sample(&self) -> Option<gst::Sample> {
717 self.0.appsink.property("last-sample")
718 }
719}
720
721impl StreamProducer {
722 pub fn from(appsink: &gst_app::AppSink) -> Self {
731 Self::with(appsink, ProducerSettings::default())
732 }
733
    /// Creates a producer around `appsink` using `settings`.
    ///
    /// This sets the appsink's `sync` value, installs the sample, preroll,
    /// event and EOS callbacks that dispatch to the registered consumers, and
    /// adds a sink-pad probe that records the latency to forward.
    ///
    /// Initially EOS and GAP events are forwarded and preroll forwarding is
    /// enabled.
    ///
    /// # Panics
    ///
    /// Panics if the appsink has no static `sink` pad or the probe cannot be
    /// installed.
    pub fn with(appsink: &gst_app::AppSink, settings: ProducerSettings) -> Self {
        let consumers = Arc::new(Mutex::new(StreamConsumers {
            current_latency: None,
            latency_updated: false,
            consumers: HashMap::new(),
            events_to_forward: vec![gst::EventType::Eos, gst::EventType::Gap],
            forward_preroll: true,
            just_forwarded_preroll: false,
        }));

        appsink.set_sync(settings.sync);

        appsink.set_callbacks(
            gst_app::AppSinkCallbacks::builder()
                .new_sample(glib::clone!(
                    #[strong]
                    consumers,
                    move |appsink| {
                        // The lock is taken before pulling and handed over to
                        // `process_sample`.
                        let mut consumers = consumers.lock().unwrap();

                        let sample = match appsink.pull_sample() {
                            Ok(sample) => sample,
                            Err(_err) => {
                                gst::debug!(CAT, obj = appsink, "Failed to pull sample");
                                return Err(gst::FlowError::Flushing);
                            }
                        };

                        // Skip the sample that follows a forwarded preroll.
                        // NOTE(review): presumably because it is the same
                        // buffer that was already forwarded as preroll — confirm.
                        let just_forwarded_preroll =
                            mem::replace(&mut consumers.just_forwarded_preroll, false);

                        if just_forwarded_preroll {
                            return Ok(gst::FlowSuccess::Ok);
                        }

                        StreamProducer::process_sample(sample, appsink, consumers)
                    }
                ))
                .new_preroll(glib::clone!(
                    #[strong]
                    consumers,
                    move |appsink| {
                        let mut consumers = consumers.lock().unwrap();

                        let sample = match appsink.pull_preroll() {
                            Ok(sample) => sample,
                            Err(_err) => {
                                gst::debug!(CAT, obj = appsink, "Failed to pull preroll");
                                return Err(gst::FlowError::Flushing);
                            }
                        };

                        if consumers.forward_preroll {
                            // Remember the forward so the next `new_sample`
                            // call skips its sample.
                            consumers.just_forwarded_preroll = true;

                            StreamProducer::process_sample(sample, appsink, consumers)
                        } else {
                            Ok(gst::FlowSuccess::Ok)
                        }
                    }
                ))
                .new_event(glib::clone!(
                    #[strong]
                    consumers,
                    move |appsink| {
                        match appsink
                            .pull_object()
                            .map(|obj| obj.downcast::<gst::Event>())
                        {
                            Ok(Ok(event)) => {
                                // Copy what is needed so the lock is not held
                                // while events are sent.
                                let (events_to_forward, appsrcs) = {
                                    let consumers = consumers.lock().unwrap();
                                    let events = consumers.events_to_forward.clone();
                                    let appsrcs =
                                        consumers.consumers.keys().cloned().collect::<Vec<_>>();

                                    (events, appsrcs)
                                };

                                if events_to_forward.contains(&event.type_()) {
                                    for appsrc in appsrcs {
                                        appsrc.send_event(event.clone());
                                    }
                                }
                            }
                            // The pulled object was not an event: ignore it.
                            Ok(Err(_)) => {}
                            Err(_err) => gst::warning!(CAT, obj = appsink, "Failed to pull event"),
                        }

                        // NOTE(review): `false` presumably tells appsink the
                        // event was not consumed here — confirm against the
                        // appsink callback documentation.
                        false
                    }
                ))
                .eos(glib::clone!(
                    #[strong]
                    consumers,
                    move |appsink| {
                        let stream_consumers = consumers.lock().unwrap();

                        if stream_consumers
                            .events_to_forward
                            .contains(&gst::EventType::Eos)
                        {
                            // Collect the appsrcs and release the lock before
                            // signalling EOS on them.
                            let current_consumers = stream_consumers
                                .consumers
                                .values()
                                .map(|c| c.appsrc.clone())
                                .collect::<Vec<_>>();
                            drop(stream_consumers);

                            for consumer in current_consumers {
                                gst::debug!(
                                    CAT,
                                    obj = appsink,
                                    "set EOS on consumer {}",
                                    consumer.name()
                                );
                                let _ = consumer.end_of_stream();
                            }
                        } else {
                            gst::debug!(CAT, obj = appsink, "don't forward EOS to consumers");
                        }
                    }
                ))
                .build(),
        );

        let sinkpad = appsink.static_pad("sink").unwrap();
        // With `sync` enabled the latency is taken from upstream latency
        // events; otherwise it is taken from latency queries seen on the pad.
        let appsink_probe_id = if settings.sync {
            sinkpad
                .add_probe(
                    gst::PadProbeType::EVENT_UPSTREAM,
                    glib::clone!(
                        #[strong]
                        consumers,
                        move |_pad, info| {
                            let Some(event) = info.event() else {
                                unreachable!();
                            };
                            let gst::EventView::Latency(event) = event.view() else {
                                return gst::PadProbeReturn::Ok;
                            };

                            let mut consumers = consumers.lock().unwrap();
                            consumers.current_latency = Some(event.latency());
                            consumers.latency_updated = true;

                            gst::PadProbeReturn::Ok
                        }
                    ),
                )
                .unwrap()
        } else {
            sinkpad
                .add_probe(
                    gst::PadProbeType::QUERY_UPSTREAM | gst::PadProbeType::PULL,
                    glib::clone!(
                        #[strong]
                        consumers,
                        move |_pad, info| {
                            let Some(query) = info.query() else {
                                unreachable!();
                            };
                            let gst::QueryView::Latency(query) = query.view() else {
                                return gst::PadProbeReturn::Ok;
                            };

                            let mut consumers = consumers.lock().unwrap();
                            // Second element of the query result; presumably
                            // the minimum latency — confirm.
                            consumers.current_latency = Some(query.result().1);
                            consumers.latency_updated = true;

                            gst::PadProbeReturn::Ok
                        }
                    ),
                )
                .unwrap()
        };

        StreamProducer(Arc::new(StreamProducerInner {
            appsink: appsink.clone(),
            appsink_probe_id: Some(appsink_probe_id),
            consumers,
        }))
    }
943}
944
945#[derive(Debug)]
948struct StreamConsumers {
949 current_latency: Option<gst::ClockTime>,
951 latency_updated: bool,
953 consumers: HashMap<gst_app::AppSrc, StreamConsumer>,
955 events_to_forward: Vec<gst::EventType>,
957 forward_preroll: bool,
959 just_forwarded_preroll: bool,
963}
964
965#[derive(Debug)]
967struct StreamConsumer {
968 appsrc: gst_app::AppSrc,
970 fku_probe_id: Option<gst::PadProbeId>,
972 forwarded_latency: atomic::AtomicBool,
974 needs_keyframe: Arc<atomic::AtomicBool>,
978 dropped: Arc<WrappedAtomicU64>,
980 pushed: Arc<WrappedAtomicU64>,
982 discard: Arc<atomic::AtomicBool>,
984 wait_for_keyframe: Arc<atomic::AtomicBool>,
986}
987
988impl StreamConsumer {
989 fn new(
991 appsrc: &gst_app::AppSrc,
992 fku_probe_id: gst::PadProbeId,
993 dropped: Arc<WrappedAtomicU64>,
994 pushed: Arc<WrappedAtomicU64>,
995 discard: Arc<atomic::AtomicBool>,
996 wait_for_keyframe: Arc<atomic::AtomicBool>,
997 ) -> Self {
998 let needs_keyframe = Arc::new(atomic::AtomicBool::new(
999 wait_for_keyframe.load(atomic::Ordering::SeqCst),
1000 ));
1001 let needs_keyframe_clone = needs_keyframe.clone();
1002 let wait_for_keyframe_clone = wait_for_keyframe.clone();
1003 let dropped_clone = dropped.clone();
1004
1005 appsrc.set_callbacks(
1006 gst_app::AppSrcCallbacks::builder()
1007 .enough_data(move |appsrc| {
1008 gst::debug!(
1009 CAT,
1010 obj = appsrc,
1011 "consumer {} ({appsrc:?}) is not consuming fast enough, old samples are getting dropped",
1012 appsrc.name(),
1013 );
1014
1015 needs_keyframe_clone.store(wait_for_keyframe_clone.load(atomic::Ordering::SeqCst), atomic::Ordering::SeqCst);
1016 dropped_clone.fetch_add(1, atomic::Ordering::SeqCst);
1017
1018 let _ = appsrc.post_message(gst::message::Element::builder(
1019 gst::Structure::new_empty("dropped-buffer")).src(appsrc).build()
1020 );
1021 })
1022 .build(),
1023 );
1024
1025 StreamConsumer {
1026 appsrc: appsrc.clone(),
1027 fku_probe_id: Some(fku_probe_id),
1028 forwarded_latency: atomic::AtomicBool::new(false),
1029 needs_keyframe,
1030 dropped,
1031 pushed,
1032 discard,
1033 wait_for_keyframe,
1034 }
1035 }
1036}
1037
1038impl Drop for StreamConsumer {
1039 fn drop(&mut self) {
1040 if let Some(fku_probe_id) = self.fku_probe_id.take() {
1041 let srcpad = self.appsrc.static_pad("src").unwrap();
1042 srcpad.remove_probe(fku_probe_id);
1043 }
1044 }
1045}
1046
1047impl PartialEq for StreamConsumer {
1048 fn eq(&self, other: &Self) -> bool {
1049 self.appsrc.eq(&other.appsrc)
1050 }
1051}
1052
1053impl Eq for StreamConsumer {}
1054
1055impl std::hash::Hash for StreamConsumer {
1056 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
1057 std::hash::Hash::hash(&self.appsrc, state);
1058 }
1059}
1060
1061impl std::borrow::Borrow<gst_app::AppSrc> for StreamConsumer {
1062 #[inline]
1063 fn borrow(&self) -> &gst_app::AppSrc {
1064 &self.appsrc
1065 }
1066}
1067
1068#[cfg(test)]
1069mod tests {
1070 use std::{
1071 str::FromStr,
1072 sync::{Arc, Mutex},
1073 };
1074
1075 use futures::{
1076 channel::{mpsc, mpsc::Receiver},
1077 SinkExt, StreamExt,
1078 };
1079 use gst::prelude::*;
1080
1081 use crate::{streamproducer::ConsumerSettings, ConsumptionLink, StreamProducer};
1082
1083 fn create_producer() -> (
1084 gst::Pipeline,
1085 gst_app::AppSrc,
1086 gst_app::AppSink,
1087 StreamProducer,
1088 ) {
1089 let producer_pipe =
1090 gst::parse::launch("appsrc name=producer_src ! appsink name=producer_sink")
1091 .unwrap()
1092 .downcast::<gst::Pipeline>()
1093 .unwrap();
1094 let producer_sink = producer_pipe
1095 .by_name("producer_sink")
1096 .unwrap()
1097 .downcast::<gst_app::AppSink>()
1098 .unwrap();
1099
1100 (
1101 producer_pipe.clone(),
1102 producer_pipe
1103 .by_name("producer_src")
1104 .unwrap()
1105 .downcast::<gst_app::AppSrc>()
1106 .unwrap(),
1107 producer_sink.clone(),
1108 StreamProducer::from(&producer_sink),
1109 )
1110 }
1111
1112 struct Consumer {
1113 pipeline: gst::Pipeline,
1114 src: gst_app::AppSrc,
1115 sink: gst_app::AppSink,
1116 receiver: Mutex<Receiver<gst::Sample>>,
1117 connected: Mutex<bool>,
1118 }
1119
1120 impl Consumer {
1121 fn new(id: &str) -> Self {
1122 let pipeline = gst::parse::launch(&format!("appsrc name={id} ! appsink name=sink"))
1123 .unwrap()
1124 .downcast::<gst::Pipeline>()
1125 .unwrap();
1126
1127 let (sender, receiver) = mpsc::channel::<gst::Sample>(1000);
1128 let sender = Arc::new(Mutex::new(sender));
1129 let sink = pipeline
1130 .by_name("sink")
1131 .unwrap()
1132 .downcast::<gst_app::AppSink>()
1133 .unwrap();
1134
1135 sink.set_callbacks(
1136 gst_app::AppSinkCallbacks::builder()
1137 .new_sample(move |appsink| {
1139 let sender_clone = sender.clone();
1141 futures::executor::block_on(
1142 sender_clone
1143 .lock()
1144 .unwrap()
1145 .send(appsink.pull_sample().unwrap()),
1146 )
1147 .unwrap();
1148
1149 Ok(gst::FlowSuccess::Ok)
1150 })
1151 .build(),
1152 );
1153
1154 Self {
1155 pipeline: pipeline.clone(),
1156 src: pipeline
1157 .by_name(id)
1158 .unwrap()
1159 .downcast::<gst_app::AppSrc>()
1160 .unwrap(),
1161 sink,
1162 receiver: Mutex::new(receiver),
1163 connected: Mutex::new(false),
1164 }
1165 }
1166
1167 fn connect(&self, producer: &StreamProducer) -> ConsumptionLink {
1168 {
1169 let mut connected = self.connected.lock().unwrap();
1170 *connected = true;
1171 }
1172
1173 producer.add_consumer(&self.src).unwrap()
1174 }
1175
1176 fn disconnect(&self, producer: &StreamProducer) {
1177 {
1178 let mut connected = self.connected.lock().unwrap();
1179 *connected = false;
1180 }
1181
1182 producer.remove_consumer(&self.src);
1183 }
1184 }
1185
1186 #[test]
1187 fn simple() {
1188 gst::init().unwrap();
1189
1190 let (producer_pipe, producer_src, _producer_sink, producer) = create_producer();
1191 producer_pipe
1192 .set_state(gst::State::Playing)
1193 .expect("Couldn't set producer pipeline state");
1194
1195 let mut consumers: Vec<Consumer> = Vec::new();
1196 let consumer = Consumer::new("consumer1");
1197 let link1 = consumer.connect(&producer);
1198 consumer
1199 .pipeline
1200 .set_state(gst::State::Playing)
1201 .expect("Couldn't set producer pipeline state");
1202 consumers.push(consumer);
1203
1204 let consumer = Consumer::new("consumer2");
1205 let link2 = consumer.connect(&producer);
1206 consumer
1207 .pipeline
1208 .set_state(gst::State::Playing)
1209 .expect("Couldn't set producer pipeline state");
1210 consumers.push(consumer);
1211
1212 assert!(producer.last_sample().is_none());
1213
1214 for i in 0..10 {
1215 let caps = gst::Caps::from_str(&format!("test,n={i}")).unwrap();
1216 producer_src.set_caps(Some(&caps));
1217 producer_src.push_buffer(gst::Buffer::new()).unwrap();
1218
1219 for consumer in &consumers {
1220 if *consumer.connected.lock().unwrap() {
1221 let sample =
1222 futures::executor::block_on(consumer.receiver.lock().unwrap().next())
1223 .expect("Received an empty buffer?");
1224 sample.buffer().expect("No buffer on the sample?");
1225 assert_eq!(sample.caps(), Some(caps.as_ref()));
1226 } else {
1227 debug_assert!(
1228 consumer
1229 .sink
1230 .try_pull_sample(gst::ClockTime::from_nseconds(0))
1231 .is_none(),
1232 "Disconnected consumer got a new sample?!"
1233 );
1234 }
1235 }
1236
1237 if i == 5 {
1238 consumers.first().unwrap().disconnect(&producer);
1239 }
1240 }
1241
1242 assert!(producer.last_sample().is_some());
1243
1244 assert_eq!(link1.pushed(), 6);
1245 assert_eq!(link1.dropped(), 0);
1246 assert_eq!(link2.pushed(), 10);
1247 assert_eq!(link2.dropped(), 0);
1248 }
1249
1250 fn check_consumer_commons(consumer: &gst_app::AppSrc) {
1251 assert_eq!(
1252 consumer.latency(),
1253 (Some(gst::ClockTime::ZERO), gst::ClockTime::NONE)
1254 );
1255 assert_eq!(consumer.format(), gst::Format::Time);
1256 assert!(consumer.is_live());
1257 assert!(consumer.is_handle_segment_change());
1258 assert_eq!(consumer.leaky_type(), gst_app::AppLeakyType::Downstream);
1259 assert!(!consumer.property::<bool>("automatic-eos"));
1260 }
1261
1262 #[test]
1263 fn configure_consumer_defaults() {
1264 gst::init().unwrap();
1265
1266 let consumer = gst_app::AppSrc::builder().build();
1267 StreamProducer::configure_consumer(&consumer);
1268 check_consumer_commons(&consumer);
1269
1270 assert_eq!(consumer.max_buffers(), 0);
1271 assert_eq!(consumer.max_bytes(), 0);
1272 assert_eq!(consumer.max_time(), 500.mseconds());
1273 }
1274
1275 #[test]
1276 fn configure_consumer_with_defaults() {
1277 gst::init().unwrap();
1278
1279 let consumer = gst_app::AppSrc::builder().build();
1280 StreamProducer::configure_consumer_with(&consumer, ConsumerSettings::default());
1281 check_consumer_commons(&consumer);
1282
1283 assert_eq!(consumer.max_buffers(), 0);
1284 assert_eq!(consumer.max_bytes(), 0);
1285 assert_eq!(consumer.max_time(), 500.mseconds());
1286 }
1287
1288 #[test]
1289 fn configure_consumer_with_specifics() {
1290 gst::init().unwrap();
1291
1292 let consumer = gst_app::AppSrc::builder().build();
1293
1294 StreamProducer::configure_consumer_with(
1295 &consumer,
1296 ConsumerSettings {
1297 max_buffer: 50,
1298 ..Default::default()
1299 },
1300 );
1301 check_consumer_commons(&consumer);
1302
1303 assert_eq!(consumer.max_buffers(), 50);
1304 assert_eq!(consumer.max_bytes(), 0);
1305 assert_eq!(consumer.max_time(), 500.mseconds());
1306
1307 StreamProducer::configure_consumer_with(
1308 &consumer,
1309 ConsumerSettings {
1310 max_buffer: 10,
1311 max_bytes: 2.mebibytes(),
1312 ..Default::default()
1313 },
1314 );
1315 check_consumer_commons(&consumer);
1316
1317 assert_eq!(consumer.max_buffers(), 10);
1318 assert_eq!(consumer.max_bytes(), 2 * 1024 * 1024);
1319 assert_eq!(consumer.max_time(), 500.mseconds());
1320
1321 StreamProducer::configure_consumer_with(
1322 &consumer,
1323 ConsumerSettings {
1324 max_time: gst::ClockTime::ZERO,
1325 ..Default::default()
1326 },
1327 );
1328 check_consumer_commons(&consumer);
1329
1330 assert_eq!(consumer.max_buffers(), 0);
1331 assert_eq!(consumer.max_bytes(), 0);
1332 assert!(consumer.max_time().is_zero());
1333
1334 StreamProducer::configure_consumer_with(
1335 &consumer,
1336 ConsumerSettings {
1337 max_time: 750.mseconds(),
1338 ..Default::default()
1339 },
1340 );
1341 check_consumer_commons(&consumer);
1342
1343 assert_eq!(consumer.max_buffers(), 0);
1344 assert_eq!(consumer.max_bytes(), 0);
1345 assert_eq!(consumer.max_time(), 750.mseconds());
1346 }
1347}