gst-plugin-threadshare 0.9.3

GStreamer Threadshare Plugin
use gst::glib;
use once_cell::sync::Lazy;

mod args;
use args::*;

#[macro_use]
mod macros;

mod sink;
mod src;

use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;

/// Debug category used by the log statements of this file.
///
/// Wrapped in a `Lazy`, so the category is only created the first time it is dereferenced.
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
    let name = "ts-standalone-main";
    let description = "Thread-sharing standalone test main";

    gst::DebugCategory::new(name, gst::DebugColorFlags::empty(), Some(description))
});

/// Registers the source element and the three sink variants with `plugin`.
///
/// Registration happens in this order: `src`, `sink::async_mutex`, `sink::sync_mutex`,
/// `sink::task`. The first failure is returned and the remaining registrations are skipped.
fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
    // All sink flavours, grouped so the overall sequence reads "source, then sinks".
    let register_sinks = || -> Result<(), glib::BoolError> {
        sink::async_mutex::register(plugin)?;
        sink::sync_mutex::register(plugin)?;
        sink::task::register(plugin)?;

        Ok(())
    };

    src::register(plugin)?;
    register_sinks()
}

// Declares the `threadshare_standalone_test` plugin, with `plugin_init` as its init function.
//
// Most of the metadata is taken from Cargo package variables. `COMMIT_ID` and `BUILD_REL_DATE`
// are read with `env!`, so they must be defined in the build environment or compilation fails
// (presumably they are exported by a build script — TODO confirm).
//
// NOTE(review): `main` calls `self::plugin_register_static()`, which is presumably generated by
// this macro — confirm against the `gst::plugin_define!` documentation.
gst::plugin_define!(
    threadshare_standalone_test,
    env!("CARGO_PKG_DESCRIPTION"),
    plugin_init,
    // Version string: "<package version>-<commit id>".
    concat!(env!("CARGO_PKG_VERSION"), "-", env!("COMMIT_ID")),
    "LGPL",
    env!("CARGO_PKG_NAME"),
    env!("CARGO_PKG_NAME"),
    env!("CARGO_PKG_REPOSITORY"),
    env!("BUILD_REL_DATE")
);

fn main() {
    use gst::prelude::*;
    use std::time::Instant;

    gst::init().unwrap();
    self::plugin_register_static().unwrap();

    #[cfg(debug_assertions)]
    gst::warning!(CAT, "RUNNING DEBUG BUILD");

    let args = args();

    let pipeline = gst::Pipeline::default();

    for i in 0..args.streams {
        let ctx_name = format!("standalone {}", i % args.groups);

        let src = gst::ElementFactory::make(src::ELEMENT_NAME)
            .name(format!("src-{i}").as_str())
            .property("context", &ctx_name)
            .property("context-wait", args.wait)
            .property("push-period", args.push_period)
            .property("num-buffers", args.num_buffers)
            .build()
            .unwrap();

        let sink = gst::ElementFactory::make(args.sink.element_name())
            .name(format!("sink-{i}").as_str())
            .property("context", &ctx_name)
            .property("context-wait", args.wait)
            .build()
            .unwrap();

        if i == 0 {
            src.set_property("main-elem", true);
            sink.set_property("main-elem", true);

            if !args.disable_stats_log {
                // Don't use the last 5 secs in stats
                // otherwise we get outliers when reaching EOS.
                // Note that stats don't start before the 20 first seconds
                // and we get 50 buffers per sec.
                const BUFFERS_BEFORE_LOGS: i32 = 20 * 50;
                const BUFFERS_TO_SKIP: i32 = BUFFERS_BEFORE_LOGS + 5 * 50;
                if args.num_buffers > BUFFERS_TO_SKIP {
                    sink.set_property("push-period", args.push_period);
                    sink.set_property("logs-stats", true);
                    let max_buffers = args.num_buffers - BUFFERS_TO_SKIP;
                    sink.set_property("max-buffers", max_buffers);
                } else {
                    gst::warning!(CAT, "Not enough buffers to log, disabling stats");
                }
            }
        }

        let elements = &[&src, &sink];
        pipeline.add_many(elements).unwrap();
        gst::Element::link_many(elements).unwrap();
    }

    let l = glib::MainLoop::new(None, false);

    let bus = pipeline.bus().unwrap();
    let terminated_count = Arc::new(AtomicU32::new(0));
    let pipeline_clone = pipeline.clone();
    let l_clone = l.clone();
    bus.add_watch(move |_, msg| {
        use gst::MessageView;
        match msg.view() {
            MessageView::Eos(_) => {
                // Actually, we don't post EOS (see sinks impl).
                gst::info!(CAT, "Received eos");
                l_clone.quit();

                glib::Continue(false)
            }
            MessageView::Error(msg) => {
                if let gst::MessageView::Error(msg) = msg.message().view() {
                    if msg.error().matches(gst::LibraryError::Shutdown) {
                        if terminated_count.fetch_add(1, Ordering::SeqCst) == args.streams - 1 {
                            gst::info!(CAT, "Received all shutdown requests");
                            l_clone.quit();

                            return glib::Continue(false);
                        } else {
                            return glib::Continue(true);
                        }
                    }
                }

                gst::error!(
                    CAT,
                    "Error from {:?}: {} ({:?})",
                    msg.src().map(|s| s.path_string()),
                    msg.error(),
                    msg.debug()
                );
                l_clone.quit();

                glib::Continue(false)
            }
            _ => glib::Continue(true),
        }
    })
    .expect("Failed to add bus watch");

    gst::info!(CAT, "Switching to Ready");
    let start = Instant::now();
    pipeline.set_state(gst::State::Ready).unwrap();
    gst::info!(CAT, "Switching to Ready took {:.2?}", start.elapsed());

    gst::info!(CAT, "Switching to Playing");
    let start = Instant::now();
    pipeline.set_state(gst::State::Playing).unwrap();
    gst::info!(CAT, "Switching to Playing took {:.2?}", start.elapsed());

    l.run();

    gst::info!(CAT, "Switching to Ready");
    let stop = Instant::now();
    pipeline_clone.set_state(gst::State::Ready).unwrap();
    gst::info!(CAT, "Switching to Ready took {:.2?}", stop.elapsed());

    gst::info!(CAT, "Shutting down");
    let stop = Instant::now();
    pipeline_clone.set_state(gst::State::Null).unwrap();
    gst::info!(CAT, "Shutting down took {:.2?}", stop.elapsed());
}