use futures::prelude::*;
use gst::prelude::*;
use serial_test::serial;
use pretty_assertions::assert_eq;
use std::sync::{
Arc,
atomic::{AtomicBool, Ordering},
};
fn init() {
use std::sync::Once;
static INIT: Once = Once::new();
INIT.call_once(|| {
gst::init().unwrap();
gstrsinter::plugin_register_static().unwrap();
});
}
fn start_consumer(producer_name: &str) -> gst_check::Harness {
let mut hc = gst_check::Harness::new("intersrc");
hc.element()
.unwrap()
.set_property("producer-name", producer_name);
hc.play();
hc
}
fn start_producer(producer_name: &str) -> (gst::Pad, gst::Element) {
let element = gst::ElementFactory::make("intersink")
.property("producer-name", producer_name)
.build()
.unwrap();
element.set_state(gst::State::Playing).unwrap();
let sinkpad = element.static_pad("sink").unwrap();
let srcpad = gst::Pad::new(gst::PadDirection::Src);
srcpad.set_active(true).unwrap();
srcpad.link(&sinkpad).unwrap();
srcpad.push_event(gst::event::StreamStart::builder("foo").build());
srcpad
.push_event(gst::event::Caps::builder(&gst::Caps::builder("video/x-raw").build()).build());
srcpad.push_event(
gst::event::Segment::builder(&gst::FormattedSegment::<gst::format::Time>::new()).build(),
);
(srcpad, element)
}
fn push_one(srcpad: &gst::Pad, pts: gst::ClockTime) {
let mut inbuf = gst::Buffer::with_size(1).unwrap();
{
let buf = inbuf.get_mut().unwrap();
buf.set_pts(pts);
}
srcpad.push(inbuf).unwrap();
}
#[test]
#[serial]
fn test_forward_one_buffer() {
init();
let mut hc = start_consumer("p1");
let (srcpad, element) = start_producer("p1");
push_one(&srcpad, gst::ClockTime::from_nseconds(1));
let outbuf = hc.pull().unwrap();
assert_eq!(outbuf.pts(), Some(gst::ClockTime::from_nseconds(1)));
element.set_state(gst::State::Null).unwrap();
}
#[test]
#[serial]
fn test_change_name_of_producer() {
init();
let mut hc1 = start_consumer("p1");
let mut hc2 = start_consumer("p2");
let (srcpad, element) = start_producer("p1");
push_one(&srcpad, gst::ClockTime::from_nseconds(1));
let outbuf = hc1.pull().unwrap();
assert_eq!(outbuf.pts(), Some(gst::ClockTime::from_nseconds(1)));
element.set_property("producer-name", "p2");
push_one(&srcpad, gst::ClockTime::from_nseconds(2));
let outbuf = hc2.pull().unwrap();
assert_eq!(outbuf.pts(), Some(gst::ClockTime::from_nseconds(2)));
element.set_property("producer-name", "p1");
push_one(&srcpad, gst::ClockTime::from_nseconds(3));
let outbuf = hc1.pull().unwrap();
assert_eq!(outbuf.pts(), Some(gst::ClockTime::from_nseconds(3)));
element.set_state(gst::State::Null).unwrap();
}
#[test]
#[serial]
fn test_change_producer_name() {
init();
let mut hc = start_consumer("p1");
let (srcpad1, element1) = start_producer("p1");
let (srcpad2, element2) = start_producer("p2");
push_one(&srcpad2, gst::ClockTime::from_nseconds(1));
push_one(&srcpad1, gst::ClockTime::from_nseconds(2));
let outbuf = hc.pull().unwrap();
assert_eq!(outbuf.pts(), Some(gst::ClockTime::from_nseconds(2)));
hc.element().unwrap().set_property("producer-name", "p2");
push_one(&srcpad1, gst::ClockTime::from_nseconds(3));
push_one(&srcpad2, gst::ClockTime::from_nseconds(4));
let outbuf = hc.pull().unwrap();
assert_eq!(outbuf.pts(), Some(gst::ClockTime::from_nseconds(4)));
element1.set_state(gst::State::Null).unwrap();
element2.set_state(gst::State::Null).unwrap();
}
#[test]
#[serial]
fn test_event_forwarding() {
init();
let mut hc = start_consumer("p");
let (srcpad, intersink) = start_producer("p");
intersink.set_state(gst::State::Null).unwrap();
intersink.set_property(
"event-types",
gst::Array::new(vec![gst::EventType::Eos, gst::EventType::CustomDownstream]),
);
intersink.set_state(gst::State::Playing).unwrap();
push_one(&srcpad, gst::ClockTime::from_nseconds(1));
let s = gst::Structure::builder("MyEvent")
.field("unsigned", 100u64)
.build();
assert!(srcpad.push_event(gst::event::CustomDownstream::new(s)));
assert!(srcpad.push_event(gst::event::Eos::new()));
let mut found = false;
loop {
use gst::EventView;
if let Ok(event) = hc.pull_event() {
match event.view() {
EventView::CustomDownstream(e) => {
let v = e.structure().unwrap().get::<u64>("unsigned");
assert!(v.is_ok_and(|v| v == 100u64));
found = true;
break;
}
EventView::Eos(..) => {
break;
}
_ => (),
};
}
}
intersink.set_state(gst::State::Null).unwrap();
assert!(found);
}
#[test]
#[serial]
fn test_intersrc_upstream_event_forwarding() {
init();
let pipe_up = gst::parse::launch("fakesrc name=src ! intersink producer-name=p")
.unwrap()
.downcast::<gst::Pipeline>()
.unwrap();
let pipe_down =
gst::parse::launch("intersrc name=intersrc producer-name=p ! fakesink name=sink")
.unwrap()
.downcast::<gst::Pipeline>()
.unwrap();
let sink_pad = pipe_down
.by_name("sink")
.unwrap()
.static_pad("sink")
.unwrap();
let intersrc = pipe_down.by_name("intersrc").unwrap();
intersrc.set_property(
"event-types",
gst::Array::new(vec![
gst::EventType::Navigation,
gst::EventType::CustomUpstream,
]),
);
let src_pad = pipe_up.by_name("src").unwrap().static_pad("src").unwrap();
pipe_up.set_state(gst::State::Playing).unwrap();
pipe_down.set_state(gst::State::Playing).unwrap();
let got_custom_upstream = Arc::new(AtomicBool::new(false));
let got_custom_upstream_clone = got_custom_upstream.clone();
let srcpad_clone = src_pad.clone();
src_pad.add_probe(gst::PadProbeType::EVENT_UPSTREAM, move |_pad, info| {
if let Some(e) = info.event() {
#[allow(clippy::single_match)]
match e.type_() {
gst::EventType::CustomUpstream => {
got_custom_upstream_clone.store(true, Ordering::SeqCst);
srcpad_clone.push_event(gst::event::Eos::new());
}
_ => {}
}
}
gst::PadProbeReturn::Ok
});
let s = gst::Structure::builder("MyEvent")
.field("unsigned", 100u64)
.build();
sink_pad.push_event(gst::event::CustomUpstream::new(s));
let bus = pipe_down.bus().unwrap();
while let Some(msg) = bus.timed_pop(gst::ClockTime::NONE) {
use gst::MessageView;
match msg.view() {
MessageView::Eos(..) => {
break;
}
MessageView::Error(..) => unreachable!(),
_ => (),
}
}
assert!(got_custom_upstream.load(Ordering::SeqCst));
pipe_up.set_state(gst::State::Null).unwrap();
pipe_down.set_state(gst::State::Null).unwrap();
}
fn test_latency_propagation_with(sync: bool) {
init();
let pipe_up = gst::parse::launch(&format!(
"videotestsrc is-live=true ! intersink sync={sync} name=producer"
))
.unwrap()
.downcast::<gst::Pipeline>()
.unwrap();
let pipe_down = gst::parse::launch("intersrc ! fakesink name=sink")
.unwrap()
.downcast::<gst::Pipeline>()
.unwrap();
let got_latency = Arc::new(AtomicBool::new(false));
let sink_pad = pipe_down
.by_name("sink")
.unwrap()
.static_pad("sink")
.unwrap();
sink_pad.add_probe(
gst::PadProbeType::QUERY_UPSTREAM | gst::PadProbeType::PULL,
{
let got_latency = got_latency.clone();
move |_, info| {
let Some(q) = info.query() else {
unreachable!();
};
if let gst::QueryView::Latency(q) = q.view() {
println!(
"==> Downstream: latency query (sync {sync}): {}",
q.result().1
);
if q.result().1 > gst::ClockTime::ZERO {
got_latency.store(true, Ordering::SeqCst);
}
}
gst::PadProbeReturn::Ok
}
},
);
let clock = gst::SystemClock::obtain();
let base_time = clock.time();
pipe_up.set_clock(Some(&clock)).unwrap();
pipe_up.set_start_time(gst::ClockTime::NONE);
pipe_up.set_base_time(base_time);
pipe_down.set_clock(Some(&clock)).unwrap();
pipe_down.set_start_time(gst::ClockTime::NONE);
pipe_down.set_base_time(base_time);
pipe_up.set_state(gst::State::Playing).unwrap();
pipe_down.set_state(gst::State::Playing).unwrap();
futures::executor::block_on(async {
use gst::MessageView::*;
let mut bus_up_stream = pipe_up.bus().unwrap().stream();
let mut bus_down_stream = pipe_down.bus().unwrap().stream();
loop {
if got_latency.load(Ordering::SeqCst) {
break;
}
futures::select! {
msg = bus_down_stream.next() => {
let Some(msg) = msg else { continue };
match msg.view() {
Latency(_) => {
println!("\n==> Got downstream pipeline latency message (sync {sync})");
let _ = pipe_down.recalculate_latency();
}
Error(err) => unreachable!("inter::latency_propagation (sync {sync}) {err:?}"),
_ => (),
}
}
msg = bus_up_stream.next() => {
let Some(msg) = msg else { continue };
match msg.view() {
Latency(_) => {
println!("\n==> Got upstream pipeline latency message (sync {sync})");
let _ = pipe_up.recalculate_latency();
}
Error(err) => unreachable!("inter::latency_propagation (sync {sync}) {err:?}"),
_ => (),
}
}
};
}
});
let producer_pad = pipe_up
.by_name("producer")
.unwrap()
.static_pad("sink")
.unwrap();
let mut q_lat_prod = gst::query::Latency::new();
assert!(producer_pad.peer_query(&mut q_lat_prod));
let mut q_lat_sink = gst::query::Latency::new();
assert!(sink_pad.peer_query(&mut q_lat_sink));
let expected = if sync {
q_lat_prod.result().1 + 20.mseconds() } else {
q_lat_prod.result().1
};
assert_eq!(expected, q_lat_sink.result().1);
pipe_up.set_state(gst::State::Null).unwrap();
pipe_down.set_state(gst::State::Null).unwrap();
}
#[test]
#[serial]
fn test_latency_propagation_sync() {
test_latency_propagation_with(true);
}
#[test]
#[serial]
fn test_latency_propagation_non_sync() {
test_latency_propagation_with(false);
}