1use std::{
2 collections::HashMap,
3 mem,
4 sync::{atomic, Arc, Mutex, MutexGuard},
5};
6
7use gst::{glib, prelude::*};
8use std::sync::LazyLock;
9use thiserror::Error;
10
11pub const DEFAULT_PRODUCER_SYNC: bool = true;
12
13pub const DEFAULT_CONSUMER_MAX_BUFFERS: u64 = 0;
14pub const DEFAULT_CONSUMER_MAX_BYTES: gst::format::Bytes = gst::format::Bytes::ZERO;
15pub const DEFAULT_CONSUMER_MAX_TIME: gst::ClockTime = gst::ClockTime::from_mseconds(500);
16
17#[derive(Debug)]
21struct WrappedAtomicU64 {
22 #[cfg(not(target_has_atomic = "64"))]
23 atomic: Mutex<u64>,
24 #[cfg(target_has_atomic = "64")]
25 atomic: atomic::AtomicU64,
26}
27
28#[cfg(target_has_atomic = "64")]
29impl WrappedAtomicU64 {
30 fn new(value: u64) -> WrappedAtomicU64 {
31 WrappedAtomicU64 {
32 atomic: atomic::AtomicU64::new(value),
33 }
34 }
35 fn fetch_add(&self, value: u64, order: atomic::Ordering) -> u64 {
36 self.atomic.fetch_add(value, order)
37 }
38 fn store(&self, value: u64, order: atomic::Ordering) {
39 self.atomic.store(value, order);
40 }
41
42 fn load(&self, order: atomic::Ordering) -> u64 {
43 self.atomic.load(order)
44 }
45}
46
47#[cfg(not(target_has_atomic = "64"))]
48impl WrappedAtomicU64 {
49 fn new(value: u64) -> WrappedAtomicU64 {
50 WrappedAtomicU64 {
51 atomic: Mutex::new(value),
52 }
53 }
54 fn fetch_add(&self, value: u64, _order: atomic::Ordering) -> u64 {
55 let mut guard = self.atomic.lock().unwrap();
56 let old = *guard;
57 *guard += value;
58 old
59 }
60 fn store(&self, value: u64, _order: atomic::Ordering) {
61 *self.atomic.lock().unwrap() = value;
62 }
63 fn load(&self, _order: atomic::Ordering) -> u64 {
64 *self.atomic.lock().unwrap()
65 }
66}
67
68static CAT: LazyLock<gst::DebugCategory> = LazyLock::new(|| {
69 gst::DebugCategory::new(
70 "utilsrs-stream-producer",
71 gst::DebugColorFlags::empty(),
72 Some("gst_app Stream Producer interface"),
73 )
74});
75
76#[derive(Debug, Clone)]
82pub struct StreamProducer(Arc<StreamProducerInner>);
83
84impl PartialEq for StreamProducer {
85 fn eq(&self, other: &Self) -> bool {
86 self.0.appsink.eq(&other.0.appsink)
87 }
88}
89
90impl Eq for StreamProducer {}
91
92#[derive(Debug)]
93struct StreamProducerInner {
94 appsink: gst_app::AppSink,
96 appsink_probe_id: Option<gst::PadProbeId>,
98 consumers: Arc<Mutex<StreamConsumers>>,
100}
101
102impl Drop for StreamProducerInner {
103 fn drop(&mut self) {
104 if let Some(probe_id) = self.appsink_probe_id.take() {
105 let pad = self.appsink.static_pad("sink").unwrap();
106 pad.remove_probe(probe_id);
107 }
108
109 self.appsink
110 .set_callbacks(gst_app::AppSinkCallbacks::builder().build());
111 }
112}
113
114#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
122pub struct ProducerSettings {
123 pub sync: bool,
124}
125
126impl Default for ProducerSettings {
127 fn default() -> Self {
128 ProducerSettings {
129 sync: DEFAULT_PRODUCER_SYNC,
130 }
131 }
132}
133
134#[derive(Debug)]
137#[must_use]
138pub struct ConsumptionLink {
139 consumer: gst_app::AppSrc,
140 settings: ConsumerSettings,
141 producer: Option<StreamProducer>,
142 dropped: Arc<WrappedAtomicU64>,
144 pushed: Arc<WrappedAtomicU64>,
146 discard: Arc<atomic::AtomicBool>,
148 wait_for_keyframe: Arc<atomic::AtomicBool>,
150}
151
152impl ConsumptionLink {
153 pub fn disconnected(consumer: gst_app::AppSrc) -> ConsumptionLink {
158 StreamProducer::configure_consumer(&consumer);
159
160 ConsumptionLink {
161 consumer,
162 settings: ConsumerSettings::default(),
163 producer: None,
164 dropped: Arc::new(WrappedAtomicU64::new(0)),
165 pushed: Arc::new(WrappedAtomicU64::new(0)),
166 discard: Arc::new(atomic::AtomicBool::new(false)),
167 wait_for_keyframe: Arc::new(atomic::AtomicBool::new(true)),
168 }
169 }
170
171 pub fn disconnected_with(
173 consumer: gst_app::AppSrc,
174 settings: ConsumerSettings,
175 ) -> ConsumptionLink {
176 StreamProducer::configure_consumer(&consumer);
177
178 ConsumptionLink {
179 consumer,
180 settings,
181 producer: None,
182 dropped: Arc::new(WrappedAtomicU64::new(0)),
183 pushed: Arc::new(WrappedAtomicU64::new(0)),
184 discard: Arc::new(atomic::AtomicBool::new(false)),
185 wait_for_keyframe: Arc::new(atomic::AtomicBool::new(true)),
186 }
187 }
188
189 pub fn change_producer(
191 &mut self,
192 new_producer: &StreamProducer,
193 reset_stats: bool,
194 ) -> Result<(), AddConsumerError> {
195 self.disconnect();
196 if reset_stats {
197 self.dropped.store(0, atomic::Ordering::SeqCst);
198 self.pushed.store(0, atomic::Ordering::SeqCst);
199 }
200 new_producer.add_consumer_internal(
201 &self.consumer,
202 self.settings,
203 self.dropped.clone(),
204 self.pushed.clone(),
205 self.discard.clone(),
206 self.wait_for_keyframe.clone(),
207 )?;
208 self.producer = Some(new_producer.clone());
209 Ok(())
210 }
211
212 pub fn disconnect(&mut self) {
214 if let Some(producer) = self.producer.take() {
215 producer.remove_consumer(&self.consumer);
216 }
217 }
218
219 pub fn dropped(&self) -> u64 {
221 self.dropped.load(atomic::Ordering::SeqCst)
222 }
223
224 pub fn pushed(&self) -> u64 {
226 self.pushed.load(atomic::Ordering::SeqCst)
227 }
228
229 pub fn discard(&self) -> bool {
231 self.discard.load(atomic::Ordering::SeqCst)
232 }
233
234 pub fn set_discard(&self, discard: bool) {
236 self.discard.store(discard, atomic::Ordering::SeqCst)
237 }
238
239 pub fn wait_for_keyframe(&self) -> bool {
241 self.wait_for_keyframe.load(atomic::Ordering::SeqCst)
242 }
243
244 pub fn set_wait_for_keyframe(&self, wait: bool) {
247 self.wait_for_keyframe.store(wait, atomic::Ordering::SeqCst)
248 }
249
250 pub fn appsrc(&self) -> &gst_app::AppSrc {
252 &self.consumer
253 }
254
255 pub fn stream_producer(&self) -> Option<&StreamProducer> {
257 self.producer.as_ref()
258 }
259
260 pub fn settings(&self) -> ConsumerSettings {
262 self.settings
263 }
264}
265
266impl Drop for ConsumptionLink {
267 fn drop(&mut self) {
268 self.disconnect();
269 }
270}
271
272#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
282pub struct ConsumerSettings {
283 pub max_buffer: u64,
284 pub max_bytes: gst::format::Bytes,
285 pub max_time: gst::ClockTime,
286}
287
288impl Default for ConsumerSettings {
289 fn default() -> Self {
290 ConsumerSettings {
291 max_buffer: DEFAULT_CONSUMER_MAX_BUFFERS,
292 max_bytes: DEFAULT_CONSUMER_MAX_BYTES,
293 max_time: DEFAULT_CONSUMER_MAX_TIME,
294 }
295 }
296}
297
298#[derive(Debug, Error)]
299pub enum AddConsumerError {
301 #[error("Consumer already added")]
302 AlreadyAdded,
304}
305
306impl StreamProducer {
307 pub fn configure_consumer(consumer: &gst_app::AppSrc) {
320 Self::configure_consumer_with(consumer, ConsumerSettings::default());
321 }
322
323 pub fn configure_consumer_with(consumer: &gst_app::AppSrc, settings: ConsumerSettings) {
330 consumer.set_latency(gst::ClockTime::ZERO, gst::ClockTime::NONE);
333 consumer.set_format(gst::Format::Time);
334 consumer.set_is_live(true);
335 consumer.set_handle_segment_change(true);
336 consumer.set_leaky_type(gst_app::AppLeakyType::Downstream);
337 consumer.set_automatic_eos(false);
338
339 consumer.set_max_buffers(settings.max_buffer);
340 consumer.set_max_bytes(settings.max_bytes.into());
341 consumer.set_max_time(settings.max_time);
342 }
343
344 pub fn add_consumer(
357 &self,
358 consumer: &gst_app::AppSrc,
359 ) -> Result<ConsumptionLink, AddConsumerError> {
360 let dropped = Arc::new(WrappedAtomicU64::new(0));
361 let pushed = Arc::new(WrappedAtomicU64::new(0));
362 let discard = Arc::new(atomic::AtomicBool::new(false));
363 let wait_for_keyframe = Arc::new(atomic::AtomicBool::new(true));
364
365 self.add_consumer_internal(
366 consumer,
367 ConsumerSettings::default(),
368 dropped.clone(),
369 pushed.clone(),
370 discard.clone(),
371 wait_for_keyframe.clone(),
372 )?;
373
374 Ok(ConsumptionLink {
375 consumer: consumer.clone(),
376 settings: ConsumerSettings::default(),
377 producer: Some(self.clone()),
378 dropped,
379 pushed,
380 discard,
381 wait_for_keyframe,
382 })
383 }
384
385 pub fn add_consumer_with(
394 &self,
395 consumer: &gst_app::AppSrc,
396 settings: ConsumerSettings,
397 ) -> Result<ConsumptionLink, AddConsumerError> {
398 let dropped = Arc::new(WrappedAtomicU64::new(0));
399 let pushed = Arc::new(WrappedAtomicU64::new(0));
400 let discard = Arc::new(atomic::AtomicBool::new(false));
401 let wait_for_keyframe = Arc::new(atomic::AtomicBool::new(true));
402
403 self.add_consumer_internal(
404 consumer,
405 settings,
406 dropped.clone(),
407 pushed.clone(),
408 discard.clone(),
409 wait_for_keyframe.clone(),
410 )?;
411
412 Ok(ConsumptionLink {
413 consumer: consumer.clone(),
414 settings,
415 producer: Some(self.clone()),
416 dropped,
417 pushed,
418 discard,
419 wait_for_keyframe,
420 })
421 }
422
423 fn add_consumer_internal(
424 &self,
425 consumer: &gst_app::AppSrc,
426 settings: ConsumerSettings,
427 dropped: Arc<WrappedAtomicU64>,
428 pushed: Arc<WrappedAtomicU64>,
429 discard: Arc<atomic::AtomicBool>,
430 wait_for_keyframe: Arc<atomic::AtomicBool>,
431 ) -> Result<(), AddConsumerError> {
432 let mut consumers = self.0.consumers.lock().unwrap();
433 if consumers.consumers.contains_key(consumer) {
434 gst::error!(
435 CAT,
436 obj = &self.0.appsink,
437 "Consumer {} ({:?}) already added",
438 consumer.name(),
439 consumer
440 );
441 return Err(AddConsumerError::AlreadyAdded);
442 }
443
444 gst::debug!(
445 CAT,
446 obj = &self.0.appsink,
447 "Adding consumer {} ({:?})",
448 consumer.name(),
449 consumer
450 );
451
452 Self::configure_consumer_with(consumer, settings);
453
454 let srcpad = consumer.static_pad("src").unwrap();
456 let fku_probe_id = srcpad
457 .add_probe(
458 gst::PadProbeType::EVENT_UPSTREAM,
459 glib::clone!(
460 #[weak(rename_to = appsink)]
461 self.0.appsink,
462 #[upgrade_or_panic]
463 move |_pad, info| {
464 let Some(event) = info.event() else {
465 return gst::PadProbeReturn::Ok;
466 };
467
468 if gst_video::UpstreamForceKeyUnitEvent::parse(event).is_ok() {
469 gst::debug!(CAT, obj = &appsink, "Requesting keyframe");
470 let pad = appsink.static_pad("sink").unwrap();
472 let _ = pad.push_event(event.clone());
473 }
474
475 gst::PadProbeReturn::Ok
476 }
477 ),
478 )
479 .unwrap();
480
481 let stream_consumer = StreamConsumer::new(
482 consumer,
483 fku_probe_id,
484 dropped,
485 pushed,
486 discard,
487 wait_for_keyframe,
488 );
489
490 consumers
491 .consumers
492 .insert(consumer.clone(), stream_consumer);
493
494 let events_to_forward = consumers.events_to_forward.clone();
497 drop(consumers);
499
500 let appsink_pad = self.0.appsink.static_pad("sink").unwrap();
501 appsink_pad.sticky_events_foreach(|event| {
502 if events_to_forward.contains(&event.type_()) {
503 gst::debug!(
504 CAT,
505 obj = &self.0.appsink,
506 "forward sticky event {:?}",
507 event
508 );
509 consumer.send_event(event.clone());
510 }
511
512 std::ops::ControlFlow::Continue(gst::EventForeachAction::Keep)
513 });
514
515 Ok(())
516 }
517
518 fn process_sample(
519 sample: gst::Sample,
520 appsink: &gst_app::AppSink,
521 mut consumers: MutexGuard<StreamConsumers>,
522 ) -> Result<gst::FlowSuccess, gst::FlowError> {
523 let (is_discont, is_keyframe) = if let Some(buf) = sample.buffer() {
524 let flags = buf.flags();
525
526 (
527 flags.contains(gst::BufferFlags::DISCONT),
528 !flags.contains(gst::BufferFlags::DELTA_UNIT),
529 )
530 } else {
531 (false, true)
532 };
533
534 gst::trace!(
535 CAT,
536 obj = appsink,
537 "processing sample {:?}",
538 sample.buffer()
539 );
540
541 let latency = consumers.current_latency;
542 let latency_updated = mem::replace(&mut consumers.latency_updated, false);
543
544 let mut needs_keyframe_request = false;
545
546 let current_consumers = consumers
547 .consumers
548 .values()
549 .filter_map(|consumer| {
550 if let Some(latency) = latency {
551 if consumer
552 .forwarded_latency
553 .compare_exchange(
554 false,
555 true,
556 atomic::Ordering::SeqCst,
557 atomic::Ordering::SeqCst,
558 )
559 .is_ok()
560 || latency_updated
561 {
562 gst::info!(CAT, obj = appsink, "setting new latency: {latency}");
563 consumer.appsrc.set_latency(latency, gst::ClockTime::NONE);
564 }
565 }
566
567 if consumer.discard.load(atomic::Ordering::SeqCst) {
568 consumer
569 .needs_keyframe
570 .store(true, atomic::Ordering::SeqCst);
571 return None;
572 }
573
574 if is_discont
575 && !is_keyframe
576 && consumer.wait_for_keyframe.load(atomic::Ordering::SeqCst)
577 {
578 consumer
580 .needs_keyframe
581 .store(true, atomic::Ordering::SeqCst);
582 }
583
584 if !is_keyframe && consumer.needs_keyframe.load(atomic::Ordering::SeqCst) {
585 if !needs_keyframe_request {
587 gst::debug!(CAT, obj = appsink, "Requesting keyframe for first buffer");
588 needs_keyframe_request = true;
589 }
590
591 consumer.dropped.fetch_add(1, atomic::Ordering::SeqCst);
592
593 gst::error!(
594 CAT,
595 obj = appsink,
596 "Ignoring frame for {} while waiting for a keyframe",
597 consumer.appsrc.name()
598 );
599 None
600 } else {
601 consumer
602 .needs_keyframe
603 .store(false, atomic::Ordering::SeqCst);
604 consumer.pushed.fetch_add(1, atomic::Ordering::SeqCst);
605
606 Some(consumer.appsrc.clone())
607 }
608 })
609 .collect::<Vec<_>>();
610
611 drop(consumers);
612
613 if needs_keyframe_request {
614 let pad = appsink.static_pad("sink").unwrap();
616 pad.push_event(
617 gst_video::UpstreamForceKeyUnitEvent::builder()
618 .all_headers(true)
619 .build(),
620 );
621 }
622
623 for consumer in current_consumers {
624 if let Err(err) = consumer.push_sample(&sample) {
625 gst::warning!(CAT, obj = appsink, "Failed to push sample: {}", err);
626 }
627 }
628 Ok(gst::FlowSuccess::Ok)
629 }
630
631 pub fn remove_consumer(&self, consumer: &gst_app::AppSrc) {
633 let name = consumer.name();
634 if self
635 .0
636 .consumers
637 .lock()
638 .unwrap()
639 .consumers
640 .remove(consumer)
641 .is_some()
642 {
643 gst::debug!(
644 CAT,
645 obj = &self.0.appsink,
646 "Removed consumer {} ({:?})",
647 name,
648 consumer
649 );
650 consumer.set_callbacks(gst_app::AppSrcCallbacks::builder().build());
651 } else {
652 gst::debug!(
653 CAT,
654 obj = &self.0.appsink,
655 "Consumer {} ({:?}) not found",
656 name,
657 consumer
658 );
659 }
660 }
661
662 pub fn set_forward_events(&self, events_to_forward: impl IntoIterator<Item = gst::EventType>) {
664 self.0.consumers.lock().unwrap().events_to_forward =
665 events_to_forward.into_iter().collect();
666 }
667
668 pub fn get_forwarded_events(&self) -> Vec<gst::EventType> {
670 self.0.consumers.lock().unwrap().events_to_forward.clone()
671 }
672
673 pub fn set_forward_preroll(&self, forward_preroll: bool) {
675 self.0.consumers.lock().unwrap().forward_preroll = forward_preroll;
676 }
677
678 pub fn appsink(&self) -> &gst_app::AppSink {
680 &self.0.appsink
681 }
682
683 pub fn error(&self, error: &gst::glib::Error, debug: Option<&str>) {
685 let consumers = self.0.consumers.lock().unwrap();
686
687 for consumer in consumers.consumers.keys() {
688 let mut msg_builder =
689 gst::message::Error::builder_from_error(error.clone()).src(consumer);
690 if let Some(debug) = debug {
691 msg_builder = msg_builder.debug(debug);
692 }
693
694 let _ = consumer.post_message(msg_builder.build());
695 }
696 }
697
698 pub fn last_sample(&self) -> Option<gst::Sample> {
700 self.0.appsink.property("last-sample")
701 }
702}
703
704impl StreamProducer {
705 pub fn from(appsink: &gst_app::AppSink) -> Self {
714 Self::with(appsink, ProducerSettings::default())
715 }
716
717 pub fn with(appsink: &gst_app::AppSink, settings: ProducerSettings) -> Self {
724 let consumers = Arc::new(Mutex::new(StreamConsumers {
725 current_latency: None,
726 latency_updated: false,
727 consumers: HashMap::new(),
728 events_to_forward: vec![gst::EventType::Eos, gst::EventType::Gap],
731 forward_preroll: true,
732 just_forwarded_preroll: false,
733 }));
734
735 appsink.set_sync(settings.sync);
736
737 appsink.set_callbacks(
738 gst_app::AppSinkCallbacks::builder()
739 .new_sample(glib::clone!(
740 #[strong]
741 consumers,
742 move |appsink| {
743 let mut consumers = consumers.lock().unwrap();
744
745 let sample = match appsink.pull_sample() {
746 Ok(sample) => sample,
747 Err(_err) => {
748 gst::debug!(CAT, obj = appsink, "Failed to pull sample");
749 return Err(gst::FlowError::Flushing);
750 }
751 };
752
753 let just_forwarded_preroll =
754 mem::replace(&mut consumers.just_forwarded_preroll, false);
755
756 if just_forwarded_preroll {
757 return Ok(gst::FlowSuccess::Ok);
758 }
759
760 StreamProducer::process_sample(sample, appsink, consumers)
761 }
762 ))
763 .new_preroll(glib::clone!(
764 #[strong]
765 consumers,
766 move |appsink| {
767 let mut consumers = consumers.lock().unwrap();
768
769 let sample = match appsink.pull_preroll() {
770 Ok(sample) => sample,
771 Err(_err) => {
772 gst::debug!(CAT, obj = appsink, "Failed to pull preroll");
773 return Err(gst::FlowError::Flushing);
774 }
775 };
776
777 if consumers.forward_preroll {
778 consumers.just_forwarded_preroll = true;
779
780 StreamProducer::process_sample(sample, appsink, consumers)
781 } else {
782 Ok(gst::FlowSuccess::Ok)
783 }
784 }
785 ))
786 .new_event(glib::clone!(
787 #[strong]
788 consumers,
789 move |appsink| {
790 match appsink
791 .pull_object()
792 .map(|obj| obj.downcast::<gst::Event>())
793 {
794 Ok(Ok(event)) => {
795 let (events_to_forward, appsrcs) = {
796 let consumers = consumers.lock().unwrap();
798 let events = consumers.events_to_forward.clone();
799 let appsrcs =
800 consumers.consumers.keys().cloned().collect::<Vec<_>>();
801
802 (events, appsrcs)
803 };
804
805 if events_to_forward.contains(&event.type_()) {
806 for appsrc in appsrcs {
807 appsrc.send_event(event.clone());
808 }
809 }
810 }
811 Ok(Err(_)) => {} Err(_err) => gst::warning!(CAT, obj = appsink, "Failed to pull event"),
813 }
814
815 false
816 }
817 ))
818 .eos(glib::clone!(
819 #[strong]
820 consumers,
821 move |appsink| {
822 let stream_consumers = consumers.lock().unwrap();
823
824 if stream_consumers
825 .events_to_forward
826 .contains(&gst::EventType::Eos)
827 {
828 let current_consumers = stream_consumers
829 .consumers
830 .values()
831 .map(|c| c.appsrc.clone())
832 .collect::<Vec<_>>();
833 drop(stream_consumers);
834
835 for consumer in current_consumers {
836 gst::debug!(
837 CAT,
838 obj = appsink,
839 "set EOS on consumer {}",
840 consumer.name()
841 );
842 let _ = consumer.end_of_stream();
843 }
844 } else {
845 gst::debug!(CAT, obj = appsink, "don't forward EOS to consumers");
846 }
847 }
848 ))
849 .build(),
850 );
851
852 let sinkpad = appsink.static_pad("sink").unwrap();
853 let appsink_probe_id = if settings.sync {
854 sinkpad
860 .add_probe(
861 gst::PadProbeType::EVENT_UPSTREAM,
862 glib::clone!(
863 #[strong]
864 consumers,
865 move |_pad, info| {
866 let Some(event) = info.event() else {
867 unreachable!();
868 };
869 let gst::EventView::Latency(event) = event.view() else {
870 return gst::PadProbeReturn::Ok;
871 };
872
873 let mut consumers = consumers.lock().unwrap();
874 consumers.current_latency = Some(event.latency());
875 consumers.latency_updated = true;
876
877 gst::PadProbeReturn::Ok
878 }
879 ),
880 )
881 .unwrap()
882 } else {
883 sinkpad
896 .add_probe(
897 gst::PadProbeType::QUERY_UPSTREAM | gst::PadProbeType::PULL,
898 glib::clone!(
899 #[strong]
900 consumers,
901 move |_pad, info| {
902 let Some(query) = info.query() else {
903 unreachable!();
904 };
905 let gst::QueryView::Latency(query) = query.view() else {
906 return gst::PadProbeReturn::Ok;
907 };
908
909 let mut consumers = consumers.lock().unwrap();
910 consumers.current_latency = Some(query.result().1);
911 consumers.latency_updated = true;
912
913 gst::PadProbeReturn::Ok
914 }
915 ),
916 )
917 .unwrap()
918 };
919
920 StreamProducer(Arc::new(StreamProducerInner {
921 appsink: appsink.clone(),
922 appsink_probe_id: Some(appsink_probe_id),
923 consumers,
924 }))
925 }
926}
927
928#[derive(Debug)]
931struct StreamConsumers {
932 current_latency: Option<gst::ClockTime>,
934 latency_updated: bool,
936 consumers: HashMap<gst_app::AppSrc, StreamConsumer>,
938 events_to_forward: Vec<gst::EventType>,
940 forward_preroll: bool,
942 just_forwarded_preroll: bool,
946}
947
948#[derive(Debug)]
950struct StreamConsumer {
951 appsrc: gst_app::AppSrc,
953 fku_probe_id: Option<gst::PadProbeId>,
955 forwarded_latency: atomic::AtomicBool,
957 needs_keyframe: Arc<atomic::AtomicBool>,
961 dropped: Arc<WrappedAtomicU64>,
963 pushed: Arc<WrappedAtomicU64>,
965 discard: Arc<atomic::AtomicBool>,
967 wait_for_keyframe: Arc<atomic::AtomicBool>,
969}
970
971impl StreamConsumer {
972 fn new(
974 appsrc: &gst_app::AppSrc,
975 fku_probe_id: gst::PadProbeId,
976 dropped: Arc<WrappedAtomicU64>,
977 pushed: Arc<WrappedAtomicU64>,
978 discard: Arc<atomic::AtomicBool>,
979 wait_for_keyframe: Arc<atomic::AtomicBool>,
980 ) -> Self {
981 let needs_keyframe = Arc::new(atomic::AtomicBool::new(
982 wait_for_keyframe.load(atomic::Ordering::SeqCst),
983 ));
984 let needs_keyframe_clone = needs_keyframe.clone();
985 let wait_for_keyframe_clone = wait_for_keyframe.clone();
986 let dropped_clone = dropped.clone();
987
988 appsrc.set_callbacks(
989 gst_app::AppSrcCallbacks::builder()
990 .enough_data(move |appsrc| {
991 gst::debug!(
992 CAT,
993 obj = appsrc,
994 "consumer {} ({appsrc:?}) is not consuming fast enough, old samples are getting dropped",
995 appsrc.name(),
996 );
997
998 needs_keyframe_clone.store(wait_for_keyframe_clone.load(atomic::Ordering::SeqCst), atomic::Ordering::SeqCst);
999 dropped_clone.fetch_add(1, atomic::Ordering::SeqCst);
1000
1001 let _ = appsrc.post_message(gst::message::Element::builder(
1002 gst::Structure::new_empty("dropped-buffer")).src(appsrc).build()
1003 );
1004 })
1005 .build(),
1006 );
1007
1008 StreamConsumer {
1009 appsrc: appsrc.clone(),
1010 fku_probe_id: Some(fku_probe_id),
1011 forwarded_latency: atomic::AtomicBool::new(false),
1012 needs_keyframe,
1013 dropped,
1014 pushed,
1015 discard,
1016 wait_for_keyframe,
1017 }
1018 }
1019}
1020
1021impl Drop for StreamConsumer {
1022 fn drop(&mut self) {
1023 if let Some(fku_probe_id) = self.fku_probe_id.take() {
1024 let srcpad = self.appsrc.static_pad("src").unwrap();
1025 srcpad.remove_probe(fku_probe_id);
1026 }
1027 }
1028}
1029
1030impl PartialEq for StreamConsumer {
1031 fn eq(&self, other: &Self) -> bool {
1032 self.appsrc.eq(&other.appsrc)
1033 }
1034}
1035
1036impl Eq for StreamConsumer {}
1037
1038impl std::hash::Hash for StreamConsumer {
1039 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
1040 std::hash::Hash::hash(&self.appsrc, state);
1041 }
1042}
1043
1044impl std::borrow::Borrow<gst_app::AppSrc> for StreamConsumer {
1045 #[inline]
1046 fn borrow(&self) -> &gst_app::AppSrc {
1047 &self.appsrc
1048 }
1049}
1050
1051#[cfg(test)]
1052mod tests {
1053 use std::{
1054 str::FromStr,
1055 sync::{Arc, Mutex},
1056 };
1057
1058 use futures::{
1059 channel::{mpsc, mpsc::Receiver},
1060 SinkExt, StreamExt,
1061 };
1062 use gst::prelude::*;
1063
1064 use crate::{streamproducer::ConsumerSettings, ConsumptionLink, StreamProducer};
1065
1066 fn create_producer() -> (
1067 gst::Pipeline,
1068 gst_app::AppSrc,
1069 gst_app::AppSink,
1070 StreamProducer,
1071 ) {
1072 let producer_pipe =
1073 gst::parse::launch("appsrc name=producer_src ! appsink name=producer_sink")
1074 .unwrap()
1075 .downcast::<gst::Pipeline>()
1076 .unwrap();
1077 let producer_sink = producer_pipe
1078 .by_name("producer_sink")
1079 .unwrap()
1080 .downcast::<gst_app::AppSink>()
1081 .unwrap();
1082
1083 (
1084 producer_pipe.clone(),
1085 producer_pipe
1086 .by_name("producer_src")
1087 .unwrap()
1088 .downcast::<gst_app::AppSrc>()
1089 .unwrap(),
1090 producer_sink.clone(),
1091 StreamProducer::from(&producer_sink),
1092 )
1093 }
1094
1095 struct Consumer {
1096 pipeline: gst::Pipeline,
1097 src: gst_app::AppSrc,
1098 sink: gst_app::AppSink,
1099 receiver: Mutex<Receiver<gst::Sample>>,
1100 connected: Mutex<bool>,
1101 }
1102
1103 impl Consumer {
1104 fn new(id: &str) -> Self {
1105 let pipeline = gst::parse::launch(&format!("appsrc name={id} ! appsink name=sink"))
1106 .unwrap()
1107 .downcast::<gst::Pipeline>()
1108 .unwrap();
1109
1110 let (sender, receiver) = mpsc::channel::<gst::Sample>(1000);
1111 let sender = Arc::new(Mutex::new(sender));
1112 let sink = pipeline
1113 .by_name("sink")
1114 .unwrap()
1115 .downcast::<gst_app::AppSink>()
1116 .unwrap();
1117
1118 sink.set_callbacks(
1119 gst_app::AppSinkCallbacks::builder()
1120 .new_sample(move |appsink| {
1122 let sender_clone = sender.clone();
1124 futures::executor::block_on(
1125 sender_clone
1126 .lock()
1127 .unwrap()
1128 .send(appsink.pull_sample().unwrap()),
1129 )
1130 .unwrap();
1131
1132 Ok(gst::FlowSuccess::Ok)
1133 })
1134 .build(),
1135 );
1136
1137 Self {
1138 pipeline: pipeline.clone(),
1139 src: pipeline
1140 .by_name(id)
1141 .unwrap()
1142 .downcast::<gst_app::AppSrc>()
1143 .unwrap(),
1144 sink,
1145 receiver: Mutex::new(receiver),
1146 connected: Mutex::new(false),
1147 }
1148 }
1149
1150 fn connect(&self, producer: &StreamProducer) -> ConsumptionLink {
1151 {
1152 let mut connected = self.connected.lock().unwrap();
1153 *connected = true;
1154 }
1155
1156 producer.add_consumer(&self.src).unwrap()
1157 }
1158
1159 fn disconnect(&self, producer: &StreamProducer) {
1160 {
1161 let mut connected = self.connected.lock().unwrap();
1162 *connected = false;
1163 }
1164
1165 producer.remove_consumer(&self.src);
1166 }
1167 }
1168
1169 #[test]
1170 fn simple() {
1171 gst::init().unwrap();
1172
1173 let (producer_pipe, producer_src, _producer_sink, producer) = create_producer();
1174 producer_pipe
1175 .set_state(gst::State::Playing)
1176 .expect("Couldn't set producer pipeline state");
1177
1178 let mut consumers: Vec<Consumer> = Vec::new();
1179 let consumer = Consumer::new("consumer1");
1180 let link1 = consumer.connect(&producer);
1181 consumer
1182 .pipeline
1183 .set_state(gst::State::Playing)
1184 .expect("Couldn't set producer pipeline state");
1185 consumers.push(consumer);
1186
1187 let consumer = Consumer::new("consumer2");
1188 let link2 = consumer.connect(&producer);
1189 consumer
1190 .pipeline
1191 .set_state(gst::State::Playing)
1192 .expect("Couldn't set producer pipeline state");
1193 consumers.push(consumer);
1194
1195 assert!(producer.last_sample().is_none());
1196
1197 for i in 0..10 {
1198 let caps = gst::Caps::from_str(&format!("test,n={i}")).unwrap();
1199 producer_src.set_caps(Some(&caps));
1200 producer_src.push_buffer(gst::Buffer::new()).unwrap();
1201
1202 for consumer in &consumers {
1203 if *consumer.connected.lock().unwrap() {
1204 let sample =
1205 futures::executor::block_on(consumer.receiver.lock().unwrap().next())
1206 .expect("Received an empty buffer?");
1207 sample.buffer().expect("No buffer on the sample?");
1208 assert_eq!(sample.caps(), Some(caps.as_ref()));
1209 } else {
1210 debug_assert!(
1211 consumer
1212 .sink
1213 .try_pull_sample(gst::ClockTime::from_nseconds(0))
1214 .is_none(),
1215 "Disconnected consumer got a new sample?!"
1216 );
1217 }
1218 }
1219
1220 if i == 5 {
1221 consumers.first().unwrap().disconnect(&producer);
1222 }
1223 }
1224
1225 assert!(producer.last_sample().is_some());
1226
1227 assert_eq!(link1.pushed(), 6);
1228 assert_eq!(link1.dropped(), 0);
1229 assert_eq!(link2.pushed(), 10);
1230 assert_eq!(link2.dropped(), 0);
1231 }
1232
1233 fn check_consumer_commons(consumer: &gst_app::AppSrc) {
1234 assert_eq!(
1235 consumer.latency(),
1236 (Some(gst::ClockTime::ZERO), gst::ClockTime::NONE)
1237 );
1238 assert_eq!(consumer.format(), gst::Format::Time);
1239 assert!(consumer.is_live());
1240 assert!(consumer.is_handle_segment_change());
1241 assert_eq!(consumer.leaky_type(), gst_app::AppLeakyType::Downstream);
1242 assert!(!consumer.property::<bool>("automatic-eos"));
1243 }
1244
1245 #[test]
1246 fn configure_consumer_defaults() {
1247 gst::init().unwrap();
1248
1249 let consumer = gst_app::AppSrc::builder().build();
1250 StreamProducer::configure_consumer(&consumer);
1251 check_consumer_commons(&consumer);
1252
1253 assert_eq!(consumer.max_buffers(), 0);
1254 assert_eq!(consumer.max_bytes(), 0);
1255 assert_eq!(consumer.max_time().unwrap(), 500.mseconds());
1256 }
1257
1258 #[test]
1259 fn configure_consumer_with_defaults() {
1260 gst::init().unwrap();
1261
1262 let consumer = gst_app::AppSrc::builder().build();
1263 StreamProducer::configure_consumer_with(&consumer, ConsumerSettings::default());
1264 check_consumer_commons(&consumer);
1265
1266 assert_eq!(consumer.max_buffers(), 0);
1267 assert_eq!(consumer.max_bytes(), 0);
1268 assert_eq!(consumer.max_time().unwrap(), 500.mseconds());
1269 }
1270
1271 #[test]
1272 fn configure_consumer_with_specifics() {
1273 gst::init().unwrap();
1274
1275 let consumer = gst_app::AppSrc::builder().build();
1276
1277 StreamProducer::configure_consumer_with(
1278 &consumer,
1279 ConsumerSettings {
1280 max_buffer: 50,
1281 ..Default::default()
1282 },
1283 );
1284 check_consumer_commons(&consumer);
1285
1286 assert_eq!(consumer.max_buffers(), 50);
1287 assert_eq!(consumer.max_bytes(), 0);
1288 assert_eq!(consumer.max_time().unwrap(), 500.mseconds());
1289
1290 StreamProducer::configure_consumer_with(
1291 &consumer,
1292 ConsumerSettings {
1293 max_buffer: 10,
1294 max_bytes: 2.mebibytes(),
1295 ..Default::default()
1296 },
1297 );
1298 check_consumer_commons(&consumer);
1299
1300 assert_eq!(consumer.max_buffers(), 10);
1301 assert_eq!(consumer.max_bytes(), 2 * 1024 * 1024);
1302 assert_eq!(consumer.max_time().unwrap(), 500.mseconds());
1303
1304 StreamProducer::configure_consumer_with(
1305 &consumer,
1306 ConsumerSettings {
1307 max_time: gst::ClockTime::ZERO,
1308 ..Default::default()
1309 },
1310 );
1311 check_consumer_commons(&consumer);
1312
1313 assert_eq!(consumer.max_buffers(), 0);
1314 assert_eq!(consumer.max_bytes(), 0);
1315 assert!(consumer.max_time().unwrap().is_zero());
1316
1317 StreamProducer::configure_consumer_with(
1318 &consumer,
1319 ConsumerSettings {
1320 max_time: 750.mseconds(),
1321 ..Default::default()
1322 },
1323 );
1324 check_consumer_commons(&consumer);
1325
1326 assert_eq!(consumer.max_buffers(), 0);
1327 assert_eq!(consumer.max_bytes(), 0);
1328 assert_eq!(consumer.max_time().unwrap(), 750.mseconds());
1329 }
1330}