// gst-plugin-inter 0.15.0
//
// GStreamer Inter Plugin
// Documentation
use gst::prelude::*;
use gst_utils::streamproducer::{ConsumerSettings, ProducerSettings};

use std::collections::HashMap;
use std::sync::Mutex;

use anyhow::{Error, anyhow};
use std::sync::LazyLock;

/// State of one named entry in the inter stream registry.
///
/// An entry is either waiting for a producer to be acquired under its name
/// (`Pending`) or backed by a stream producer (`Active`).
pub enum InterStreamProducer {
    /// No producer has been acquired under this name yet; consumers that
    /// subscribed so far are remembered together with their settings.
    Pending {
        /// Subscribed `appsrc` elements, mapped to the settings they subscribed with.
        consumers: HashMap<gst_app::AppSrc, ConsumerSettings>,
    },
    /// A producer has been acquired under this name.
    Active {
        /// The stream producer created from the acquiring `appsink`.
        producer: gst_utils::StreamProducer,
        /// The link obtained from `add_consumer_with` for each subscribed `appsrc`.
        links: HashMap<gst_app::AppSrc, gst_utils::ConsumptionLink>,
    },
}

/// Global table of inter streams keyed by producer name.
///
/// Created empty on first access; every access goes through the mutex.
static PRODUCERS: LazyLock<Mutex<HashMap<String, InterStreamProducer>>> =
    LazyLock::new(|| Mutex::new(HashMap::new()));

/// Returns the top-most ancestor of `obj`.
///
/// Follows `parent()` upwards until an object without a parent is reached;
/// when `obj` itself has no parent, a clone of `obj` is returned.
fn toplevel(obj: &gst::Object) -> gst::Object {
    let mut current = obj.clone();

    while let Some(parent) = current.parent() {
        current = parent;
    }

    current
}

/// Emits a GLib critical message when `producer` and `consumer` share the
/// same top-most ancestor.
///
/// The check is purely diagnostic: nothing is returned and the caller
/// proceeds regardless of the outcome.
fn ensure_different_toplevel(producer: &gst_app::AppSink, consumer: &gst_app::AppSrc) {
    let producer_root = toplevel(producer.upcast_ref());
    let consumer_root = toplevel(consumer.upcast_ref());

    // Distinct hierarchies are the expected setup; nothing to report.
    if producer_root != consumer_root {
        return;
    }

    gst::glib::g_critical!(
        "gstrsinter",
        "Intersink with appsink {} should not share the same toplevel bin \
         as intersrc with appsrc {}, this results in loops in latency calculation",
        producer.name(),
        consumer.name()
    );
}

impl InterStreamProducer {
    /// Registers `appsink` as the producer for `name`.
    ///
    /// A stream producer is created from `appsink` and `settings`. Consumers
    /// that subscribed to `name` before any producer existed are linked to it
    /// with the settings they subscribed with, and the entry becomes `Active`.
    ///
    /// # Errors
    ///
    /// Fails when `name` already has an active producer; the existing entry is
    /// left in place.
    ///
    /// # Panics
    ///
    /// Panics if the registry mutex is poisoned, or if linking a waiting
    /// consumer to the new producer fails.
    pub fn acquire(
        name: &str,
        appsink: &gst_app::AppSink,
        settings: ProducerSettings,
    ) -> Result<gst_utils::StreamProducer, Error> {
        let mut producers = PRODUCERS.lock().unwrap();

        // Take the entry out so waiting consumers can be moved into the new
        // producer; an active entry is put back untouched.
        let waiting = match producers.remove(name) {
            Some(InterStreamProducer::Pending { consumers }) => consumers,
            Some(active @ InterStreamProducer::Active { .. }) => {
                producers.insert(name.to_string(), active);

                return Err(anyhow!(
                    "An active producer already exists with name {}",
                    name
                ));
            }
            None => HashMap::new(),
        };

        let producer = gst_utils::StreamProducer::with(appsink, settings);
        let mut links = HashMap::with_capacity(waiting.len());

        for (consumer, consumer_settings) in waiting {
            ensure_different_toplevel(appsink, &consumer);

            let link = producer
                .add_consumer_with(&consumer, consumer_settings)
                .expect("consumer should not have already been added");
            links.insert(consumer, link);
        }

        producers.insert(
            name.to_string(),
            InterStreamProducer::Active {
                producer: producer.clone(),
                links,
            },
        );

        Ok(producer)
    }

    /// Releases the producer registered under `name`.
    ///
    /// * Active producer with consumers: the entry becomes `Pending`, keeping
    ///   each consumer together with the settings of its link, and the
    ///   producer's `appsink` is returned.
    /// * Active producer without consumers: the entry is removed and `None`
    ///   is returned.
    /// * No active producer: `None` is returned and consumers still waiting
    ///   under `name` are kept, so a later [`acquire`](Self::acquire) links
    ///   them.
    ///
    /// # Panics
    ///
    /// Panics if the registry mutex is poisoned.
    pub fn release(name: &str) -> Option<gst_app::AppSink> {
        let mut producers = PRODUCERS.lock().unwrap();

        match producers.remove(name)? {
            InterStreamProducer::Pending { consumers } => {
                // Nothing was active under this name. Waiting consumers must
                // survive the call; an empty set is not worth keeping.
                if !consumers.is_empty() {
                    producers.insert(
                        name.to_string(),
                        InterStreamProducer::Pending { consumers },
                    );
                }

                None
            }
            InterStreamProducer::Active { links, .. } if links.is_empty() => None,
            InterStreamProducer::Active { links, producer } => {
                let consumers: HashMap<_, _> = links
                    .iter()
                    .map(|(consumer, link)| (consumer.clone(), link.settings()))
                    .collect();

                producers.insert(
                    name.to_string(),
                    InterStreamProducer::Pending { consumers },
                );

                Some(producer.appsink().clone())
            }
        }
    }

    /// Subscribes `consumer` to the stream called `name`.
    ///
    /// With an active producer the consumer is linked to it right away.
    /// Otherwise it is recorded, together with `settings`, as waiting; an
    /// entry for `name` is created if there is none.
    ///
    /// # Panics
    ///
    /// Panics if the registry mutex is poisoned, or if linking `consumer` to
    /// the active producer fails.
    pub fn subscribe(name: &str, consumer: &gst_app::AppSrc, settings: ConsumerSettings) {
        let mut producers = PRODUCERS.lock().unwrap();

        match producers.get_mut(name) {
            Some(InterStreamProducer::Pending { consumers }) => {
                consumers.insert(consumer.clone(), settings);
            }
            Some(InterStreamProducer::Active { producer, links }) => {
                ensure_different_toplevel(producer.appsink(), consumer);

                let link = producer
                    .add_consumer_with(consumer, settings)
                    .expect("consumer should not already have been added");
                links.insert(consumer.clone(), link);
            }
            None => {
                producers.insert(
                    name.to_string(),
                    InterStreamProducer::Pending {
                        consumers: [(consumer.clone(), settings)].into(),
                    },
                );
            }
        }
    }

    /// Removes `consumer` from the stream called `name`.
    ///
    /// Returns `true` when the consumer was subscribed (waiting or linked) and
    /// has been removed, `false` otherwise. A `Pending` entry left without any
    /// waiting consumer is dropped from the registry so unused names do not
    /// accumulate.
    ///
    /// # Panics
    ///
    /// Panics if the registry mutex is poisoned.
    pub fn unsubscribe(name: &str, consumer: &gst_app::AppSrc) -> bool {
        let mut producers = PRODUCERS.lock().unwrap();

        let (removed, entry_unused) = match producers.get_mut(name) {
            Some(InterStreamProducer::Pending { consumers }) => {
                let removed = consumers.remove(consumer).is_some();
                (removed, consumers.is_empty())
            }
            Some(InterStreamProducer::Active { links, .. }) => {
                (links.remove(consumer).is_some(), false)
            }
            None => (false, false),
        };

        // An empty pending entry behaves exactly like a missing one.
        if entry_unused {
            producers.remove(name);
        }

        removed
    }
}