use clap::Parser;
use futures::prelude::*;
use gst::glib;
use gst::prelude::*;
use std::str::FromStr;
use std::sync::LazyLock;
static CAT: LazyLock<gst::DebugCategory> = LazyLock::new(|| {
gst::DebugCategory::new(
"gst-fallbacksrc-example",
gst::DebugColorFlags::empty(),
Some("gst-fallbacksrc-example"),
)
});
#[derive(Parser, Debug)]
struct Cli {
#[clap(long, short, action)]
uri: String,
#[clap(long, short, action)]
fallback_uri: String,
#[clap(long, short, action)]
audio_caps: Option<String>,
#[clap(long, short, action)]
video_caps: Option<String>,
}
fn sink_bin(is_encoded: bool, is_video: bool) -> gst::Bin {
let bin = gst::Bin::new();
let queue = gst::ElementFactory::make("queue")
.property_from_str("leaky", "downstream")
.property("max-size-bytes", 0u32)
.property("max-size-buffers", 0u32)
.property("max-size-time", 5.seconds())
.build()
.unwrap();
let decoder = if is_encoded {
gst::ElementFactory::make("decodebin").build().unwrap()
} else {
gst::ElementFactory::make("identity").build().unwrap()
};
let decoder_queue = gst::ElementFactory::make("queue")
.property_from_str("leaky", "downstream")
.property("max-size-bytes", 0u32)
.property("max-size-buffers", 0u32)
.property("max-size-time", 5.seconds())
.build()
.unwrap();
let (convert, sink) = if is_video {
(
gst::ElementFactory::make("videoconvert").build().unwrap(),
gst::ElementFactory::make("autovideosink").build().unwrap(),
)
} else {
(
gst::ElementFactory::make("audioconvert").build().unwrap(),
gst::ElementFactory::make("autoaudiosink").build().unwrap(),
)
};
bin.add_many([&queue, &decoder, &decoder_queue, &convert, &sink])
.unwrap();
if is_encoded {
let decoder_queue_sink_pad = decoder_queue
.static_pad("sink")
.expect("queue must have a sink pad");
decoder.connect_pad_added(glib::clone!(move |decodebin, pad| {
pad.link(&decoder_queue_sink_pad).unwrap();
let _ = decodebin.sync_state_with_parent();
}));
}
queue
.link(&decoder)
.expect("Failed to link queue to decoder");
if !is_encoded {
decoder
.link(&decoder_queue)
.expect("Failed to link decoder to queue");
}
decoder_queue
.link(&convert)
.expect("Failed to link queue to converter");
convert
.link(&sink)
.expect("Failed to link converter to sink");
let queue_sink_pad = queue
.static_pad("sink")
.expect("queue must have a sink pad");
let sink_pad_tmpl = queue
.pad_template("sink")
.expect("Failed to get sink pad template");
let sink_pad =
gst::GhostPad::builder_from_template_with_target(&sink_pad_tmpl, &queue_sink_pad)
.unwrap()
.build();
bin.add_pad(&sink_pad).expect("Failed to add sink pad");
bin
}
fn handle_caps(pipeline: &gst::Pipeline, src_pad: &gst::Pad) {
let caps = match src_pad.stream().and_then(|stream| stream.caps()) {
Some(caps) => caps,
None => {
return;
}
};
let s = match caps.structure(0) {
Some(s) => s,
None => {
return;
}
};
let src_pad_name = src_pad.name();
let codec = s.name().as_str();
let bin = if codec.starts_with("audio/x-raw") {
gst::info!(CAT, "Handling raw audio for pad {}", src_pad_name);
sink_bin(false, false)
} else if codec.starts_with("video/x-raw") {
gst::info!(CAT, "Handling raw video for pad {}", src_pad_name);
sink_bin(false, true)
} else if codec.starts_with("video/") {
gst::info!(CAT, "Handling encoded video for pad {}", src_pad_name);
sink_bin(true, true)
} else {
gst::info!(CAT, "Handling encoded audio for pad {}", src_pad_name);
sink_bin(true, false)
};
pipeline.add(&bin).unwrap();
bin.sync_state_with_parent().unwrap();
let sink_pad = bin.static_pad("sink").unwrap();
src_pad.link(&sink_pad).unwrap();
let src_pad_name = src_pad.name();
glib::timeout_add_seconds_once(
1,
glib::clone!(
#[weak]
pipeline,
move || {
pipeline.debug_to_dot_file(
gst::DebugGraphDetails::all(),
format!("caps-handled-on-pad-{}", src_pad_name),
);
}
),
);
}
fn main() {
gst::init().unwrap();
gstfallbackswitch::plugin_register_static().expect("Failed to register fallbacksrc plugin");
let cli = Cli::parse();
let pipeline = gst::Pipeline::new();
let context = glib::MainContext::default();
let fallbacksrc = gst::ElementFactory::make("fallbacksrc")
.property("timeout", gst::ClockTime::from_seconds(5))
.property("min-latency", gst::ClockTime::from_mseconds(100))
.property("uri", cli.uri)
.property("fallback-uri", cli.fallback_uri)
.property("restart-on-eos", true)
.build()
.unwrap();
if let Some(caps) = cli.audio_caps {
match gst::Caps::from_str(&caps) {
Ok(c) => {
fallbacksrc.set_property("audio-caps", c);
}
_ => {
gst::error!(CAT, "Invalid audio caps");
return;
}
}
}
if let Some(caps) = cli.video_caps {
match gst::Caps::from_str(&caps) {
Ok(c) => {
fallbacksrc.set_property("video-caps", c);
}
_ => {
gst::error!(CAT, "Invalid video caps");
return;
}
}
}
fallbacksrc.connect_pad_added(glib::clone!(
#[weak]
pipeline,
move |_, src_pad| {
let caps = src_pad
.stream()
.and_then(|stream| stream.caps())
.expect("Expect stream caps to be valid here");
gst::info!(CAT, "{} added with caps {caps:?}", src_pad.name());
if !src_pad.is_linked() {
handle_caps(&pipeline, src_pad);
}
src_pad.connect_caps_notify(glib::clone!(
#[weak]
pipeline,
move |src_pad| {
gst::info!(CAT, "{} caps changed {caps:?}", src_pad.name());
if let Some(peer_pad) = src_pad.peer() {
if src_pad.unlink(&peer_pad).is_err() {
gst::error!(
CAT,
"Failed to unlink {} and {}",
src_pad.name(),
peer_pad.name()
);
}
if let Some(el) = peer_pad
.parent()
.and_then(|p| p.downcast::<gst::Element>().ok())
{
let _ = pipeline.remove(&el);
let _ = el.set_state(gst::State::Null);
}
}
handle_caps(&pipeline, src_pad);
}
));
}
));
pipeline.add(&fallbacksrc).unwrap();
pipeline.set_state(gst::State::Playing).unwrap();
context.block_on(async {
let bus = pipeline.bus().expect("Pipeline should have a bus");
let mut messages = bus.stream();
while let Some(msg) = messages.next().await {
use gst::MessageView;
match msg.view() {
MessageView::Eos(..) => {
gst::info!(
CAT,
"EOS from {}",
msg.src()
.map(|s| String::from(s.path_string()))
.unwrap_or_else(|| "None".into())
);
break;
}
MessageView::StateChanged(sc) if msg.src() == Some(pipeline.upcast_ref()) => {
pipeline.debug_to_dot_file(
gst::DebugGraphDetails::all(),
format!("{}-{:?}-{:?}", pipeline.name(), sc.old(), sc.current()),
);
}
MessageView::Error(err) => {
pipeline.debug_to_dot_file(gst::DebugGraphDetails::ALL, "fallbacksrc-error");
gst::info!(
CAT,
"Got error from {}: {} ({})",
msg.src()
.map(|s| String::from(s.path_string()))
.unwrap_or_else(|| "None".into()),
err.error(),
err.debug().unwrap_or_else(|| "".into()),
);
break;
}
_ => (),
};
}
});
pipeline.set_state(gst::State::Null).unwrap();
}