use gst::prelude::*;
use crate::RUNTIME;
use crate::signaller::{Signallable, Signaller, prelude::*};
use crate::utils::{
self, AUDIO_CAPS, CONTROL_DATA_CHANNEL_LABEL, Codec, Codecs, INPUT_DATA_CHANNEL_LABEL,
NavigationEvent, RTP_CAPS, VIDEO_CAPS,
};
use crate::webrtcsrc::WebRTCSrcPad;
use anyhow::{Context, Error};
use gst::glib;
use gst::subclass::prelude::*;
use gst_webrtc::WebRTCDataChannel;
use itertools::Itertools;
use std::borrow::BorrowMut;
use std::collections::{HashMap, HashSet};
use std::str::FromStr;
use std::sync::atomic::AtomicU16;
use std::sync::atomic::Ordering;
use std::sync::{Arc, Condvar, LazyLock, Mutex};
use url::Url;
const DEFAULT_STUN_SERVER: Option<&str> = Some("stun://stun.l.google.com:19302");
const DEFAULT_ENABLE_DATA_CHANNEL_NAVIGATION: bool = false;
const DEFAULT_ENABLE_CONTROL_DATA_CHANNEL: bool = false;
const DEFAULT_DO_RETRANSMISSION: bool = true;
const AUDIO_REQUEST_PAD_NAME_PREFIX: &str = "req_audio_";
const VIDEO_REQUEST_PAD_NAME_PREFIX: &str = "req_video_";
static CAT: LazyLock<gst::DebugCategory> = LazyLock::new(|| {
gst::DebugCategory::new(
"webrtcsrc",
gst::DebugColorFlags::empty(),
Some("WebRTC src"),
)
});
struct Settings {
stun_server: Option<String>,
turn_servers: gst::Array,
signaller: Signallable,
meta: Option<gst::Structure>,
video_codecs: Vec<Codec>,
audio_codecs: Vec<Codec>,
enable_data_channel_navigation: bool,
enable_control_data_channel: bool,
do_retransmission: bool,
}
#[derive(Default)]
pub struct BaseWebRTCSrc {
settings: Mutex<Settings>,
state: Mutex<State>,
n_request_video_pads: AtomicU16,
n_request_audio_pads: AtomicU16,
}
#[glib::object_subclass]
impl ObjectSubclass for BaseWebRTCSrc {
const NAME: &'static str = "GstBaseWebRTCSrc";
type Type = super::BaseWebRTCSrc;
type ParentType = gst::Bin;
type Interfaces = (gst::ChildProxy,);
}
unsafe impl<T: BaseWebRTCSrcImpl> IsSubclassable<T> for super::BaseWebRTCSrc {
fn class_init(class: &mut glib::Class<Self>) {
Self::parent_class_init::<T>(class);
}
}
pub(crate) trait BaseWebRTCSrcImpl:
BinImpl + ObjectSubclass<Type: IsA<super::BaseWebRTCSrc>>
{
}
impl ObjectImpl for BaseWebRTCSrc {
fn properties() -> &'static [glib::ParamSpec] {
static PROPS: LazyLock<Vec<glib::ParamSpec>> = LazyLock::new(|| {
vec![
glib::ParamSpecString::builder("stun-server")
.nick("The STUN server to use")
.blurb("The STUN server of the form stun://host:port")
.flags(glib::ParamFlags::READWRITE)
.default_value(DEFAULT_STUN_SERVER)
.mutable_ready()
.build(),
gst::ParamSpecArray::builder("turn-servers")
.nick("List of TURN servers to use")
.blurb("The TURN servers of the form <\"turn(s)://username:password@host:port\", \"turn(s)://username1:password1@host1:port1\">")
.element_spec(&glib::ParamSpecString::builder("turn-server")
.nick("TURN Server")
.blurb("The TURN server of the form turn(s)://username:password@host:port.")
.build()
)
.mutable_ready()
.build(),
glib::ParamSpecObject::builder::<Signallable>("signaller")
.flags(glib::ParamFlags::READWRITE | glib::ParamFlags::CONSTRUCT_ONLY)
.blurb("The Signallable object to use to handle WebRTC Signalling")
.build(),
glib::ParamSpecBoxed::builder::<gst::Structure>("meta")
.flags(glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY)
.blurb("Free form metadata about the consumer")
.build(),
gst::ParamSpecArray::builder("video-codecs")
.flags(glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY)
.blurb(&format!("Names of video codecs to be be used during the SDP negotiation. Valid values: [{}]",
Codecs::video_codecs()
.map(|c| c.name.as_str())
.join(", ")
))
.element_spec(&glib::ParamSpecString::builder("video-codec-name").build())
.build(),
gst::ParamSpecArray::builder("audio-codecs")
.flags(glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY)
.blurb(&format!("Names of audio codecs to be be used during the SDP negotiation. Valid values: [{}]",
Codecs::audio_codecs()
.map(|c| c.name.as_str())
.join(", ")
))
.element_spec(&glib::ParamSpecString::builder("audio-codec-name").build())
.build(),
glib::ParamSpecBoolean::builder("enable-data-channel-navigation")
.nick("Enable data channel navigation")
.blurb("Enable navigation events through a dedicated WebRTCDataChannel")
.default_value(DEFAULT_ENABLE_DATA_CHANNEL_NAVIGATION)
.mutable_ready()
.build(),
glib::ParamSpecBoolean::builder("enable-control-data-channel")
.nick("Enable control data channel")
.blurb("Enable sending control requests through a dedicated WebRTCDataChannel")
.default_value(DEFAULT_ENABLE_CONTROL_DATA_CHANNEL)
.mutable_ready()
.build(),
glib::ParamSpecBoolean::builder("do-retransmission")
.nick("Enable retransmission")
.blurb("Send retransmission events upstream when a packet is late")
.default_value(DEFAULT_DO_RETRANSMISSION)
.mutable_ready()
.build(),
]
});
PROPS.as_ref()
}
fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
match pspec.name() {
"signaller" => {
let signaller = value
.get::<Option<Signallable>>()
.expect("type checked upstream");
if let Some(signaller) = signaller {
self.settings.lock().unwrap().signaller = signaller;
}
}
"video-codecs" => {
self.settings.lock().unwrap().video_codecs = value
.get::<gst::ArrayRef>()
.expect("type checked upstream")
.as_slice()
.iter()
.filter_map(|codec_name| {
Codecs::find(codec_name.get::<&str>().expect("type checked upstream"))
})
.collect::<Vec<Codec>>()
}
"audio-codecs" => {
self.settings.lock().unwrap().audio_codecs = value
.get::<gst::ArrayRef>()
.expect("type checked upstream")
.as_slice()
.iter()
.filter_map(|codec_name| {
Codecs::find(codec_name.get::<&str>().expect("type checked upstream"))
})
.collect::<Vec<Codec>>()
}
"stun-server" => {
self.settings.lock().unwrap().stun_server = value
.get::<Option<String>>()
.expect("type checked upstream");
}
"turn-servers" => {
let mut settings = self.settings.lock().unwrap();
settings.turn_servers = value.get::<gst::Array>().expect("type checked upstream");
}
"meta" => {
self.settings.lock().unwrap().meta = value
.get::<Option<gst::Structure>>()
.expect("type checked upstream")
}
"enable-data-channel-navigation" => {
let mut settings = self.settings.lock().unwrap();
settings.enable_data_channel_navigation = value.get::<bool>().unwrap();
}
"enable-control-data-channel" => {
let mut settings = self.settings.lock().unwrap();
settings.enable_control_data_channel = value.get::<bool>().unwrap();
}
"do-retransmission" => {
let mut settings = self.settings.lock().unwrap();
settings.do_retransmission = value.get::<bool>().unwrap();
}
_ => unimplemented!(),
}
}
fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
match pspec.name() {
"signaller" => self.settings.lock().unwrap().signaller.to_value(),
"video-codecs" => gst::Array::new(
self.settings
.lock()
.unwrap()
.video_codecs
.iter()
.map(|v| &v.name),
)
.to_value(),
"audio-codecs" => gst::Array::new(
self.settings
.lock()
.unwrap()
.audio_codecs
.iter()
.map(|v| &v.name),
)
.to_value(),
"stun-server" => self.settings.lock().unwrap().stun_server.to_value(),
"turn-servers" => self.settings.lock().unwrap().turn_servers.to_value(),
"meta" => self.settings.lock().unwrap().meta.to_value(),
"enable-data-channel-navigation" => {
let settings = self.settings.lock().unwrap();
settings.enable_data_channel_navigation.to_value()
}
"enable-control-data-channel" => {
let settings = self.settings.lock().unwrap();
settings.enable_control_data_channel.to_value()
}
"do-retransmission" => self.settings.lock().unwrap().do_retransmission.to_value(),
name => panic!("{name} getter not implemented"),
}
}
fn signals() -> &'static [glib::subclass::Signal] {
static SIGNALS: LazyLock<Vec<glib::subclass::Signal>> = LazyLock::new(|| {
vec![
glib::subclass::Signal::builder("request-encoded-filter")
.param_types([
Option::<String>::static_type(),
String::static_type(),
Option::<gst::Caps>::static_type(),
])
.return_type::<gst::Element>()
.build(),
]
});
SIGNALS.as_ref()
}
fn constructed(&self) {
self.parent_constructed();
let signaller = self.settings.lock().unwrap().signaller.clone();
self.connect_signaller(&signaller);
let obj = &*self.obj();
obj.set_suppressed_flags(gst::ElementFlags::SINK | gst::ElementFlags::SOURCE);
obj.set_element_flags(gst::ElementFlags::SOURCE);
}
}
impl Default for Settings {
fn default() -> Self {
let signaller = Signaller::default();
Self {
stun_server: DEFAULT_STUN_SERVER.map(|v| v.to_string()),
turn_servers: Default::default(),
signaller: signaller.upcast(),
meta: Default::default(),
audio_codecs: Codecs::audio_codecs()
.filter(|codec| !codec.is_raw)
.cloned()
.collect(),
video_codecs: Codecs::video_codecs()
.filter(|codec| !codec.is_raw)
.cloned()
.collect(),
enable_data_channel_navigation: DEFAULT_ENABLE_DATA_CHANNEL_NAVIGATION,
enable_control_data_channel: DEFAULT_ENABLE_CONTROL_DATA_CHANNEL,
do_retransmission: DEFAULT_DO_RETRANSMISSION,
}
}
}
#[allow(dead_code)]
struct SignallerSignals {
error: glib::SignalHandlerId,
session_started: glib::SignalHandlerId,
session_ended: glib::SignalHandlerId,
request_meta: glib::SignalHandlerId,
session_description: glib::SignalHandlerId,
handle_ice: glib::SignalHandlerId,
session_requested: glib::SignalHandlerId,
}
impl SessionInner {
fn webrtcbin(&self) -> gst::Bin {
self.webrtcbin.clone().downcast::<gst::Bin>().unwrap()
}
fn new(session_id: &str) -> Result<Self, Error> {
let webrtcbin = gst::ElementFactory::make("webrtcbin")
.property("bundle-policy", gst_webrtc::WebRTCBundlePolicy::MaxBundle)
.build()
.with_context(|| "Failed to make element webrtcbin".to_string())?;
Ok(Self {
id: session_id.to_string(),
webrtcbin,
navigation_data_channel: None,
control_data_channel: None,
n_video_pads: AtomicU16::new(0),
n_audio_pads: AtomicU16::new(0),
flow_combiner: Mutex::new(gst_base::UniqueFlowCombiner::new()),
request_counter: 0,
pending_srcpads: HashMap::new(),
})
}
fn get_stream_id(
&self,
transceiver: Option<gst_webrtc::WebRTCRTPTransceiver>,
mline: Option<u32>,
) -> Option<String> {
let mline = transceiver.map_or(mline, |t| Some(t.mlineindex()));
mline.map(|mline| {
let cs = glib::Checksum::new(glib::ChecksumType::Sha256).unwrap();
format!("{}:{mline}", cs.string().unwrap())
})
}
fn take_pending_src_pad(
&mut self,
webrtcbin_src: &gst::Pad,
) -> Option<(WebRTCSrcPad, gst::Caps)> {
self.get_stream_id(
Some(webrtcbin_src.property::<gst_webrtc::WebRTCRTPTransceiver>("transceiver")),
None,
)
.and_then(|stream_id| self.pending_srcpads.remove(&stream_id))
}
fn get_src_pad_from_webrtcbin_pad(
&self,
webrtcbin_src: &gst::Pad,
element: &super::BaseWebRTCSrc,
) -> Option<WebRTCSrcPad> {
self.get_stream_id(
Some(webrtcbin_src.property::<gst_webrtc::WebRTCRTPTransceiver>("transceiver")),
None,
)
.and_then(|stream_id| {
element.iterate_src_pads().into_iter().find_map(|s| {
let pad = s.ok()?.downcast::<WebRTCSrcPad>().unwrap();
if pad.imp().stream_id() == stream_id {
Some(pad)
} else {
None
}
})
})
}
fn send_navigation_event(
&mut self,
evt: gst_video::NavigationEvent,
element: &super::BaseWebRTCSrc,
) {
if let Some(data_channel) = &self.navigation_data_channel.borrow_mut() {
let nav_event = NavigationEvent {
mid: None,
event: evt,
};
match serde_json::to_string(&nav_event).ok() {
Some(str) => {
gst::trace!(CAT, obj = element, "Sending navigation event to peer",);
if let Err(err) = data_channel.send_string_full(Some(str.as_str())) {
gst::error!(
CAT,
obj = element,
"Failed sending navigation event to peer: {err}",
);
}
}
None => {
gst::error!(CAT, obj = element, "Could not serialize navigation event",);
}
}
}
}
fn send_control_request(
&mut self,
request: utils::ControlRequest,
element: &super::BaseWebRTCSrc,
mid: Option<String>,
) {
if let Some(data_channel) = &self.control_data_channel.borrow_mut() {
let msg = utils::ControlRequestMessage {
id: self.request_counter,
mid,
request: utils::StringOrRequest::Request(request),
};
self.request_counter += 1;
match serde_json::to_string(&msg).ok() {
Some(str) => {
gst::trace!(CAT, obj = element, "Sending control request to peer",);
if let Err(err) = data_channel.send_string_full(Some(str.as_str())) {
gst::error!(
CAT,
obj = element,
"Failed sending control request to peer: {err}",
);
}
}
None => {
gst::error!(CAT, obj = element, "Could not serialize navigation event",);
}
}
}
}
fn handle_webrtc_src_pad(
&mut self,
bin: &gst::Bin,
webrtcbin_pad: &gst::Pad,
element: &super::BaseWebRTCSrc,
) -> gst::GhostPad {
let srcpad_and_caps = self.take_pending_src_pad(webrtcbin_pad);
if let Some((ref srcpad, ref caps)) = srcpad_and_caps {
let stream_id = srcpad.imp().stream_id();
let mut builder = gst::event::StreamStart::builder(&stream_id);
if let Some(stream_start) = webrtcbin_pad.sticky_event::<gst::event::StreamStart>(0) {
builder = builder
.seqnum(stream_start.seqnum())
.group_id(stream_start.group_id().unwrap_or_else(gst::GroupId::next));
}
gst::debug!(
CAT,
obj = element,
"Storing id {stream_id} on {webrtcbin_pad:?}",
);
webrtcbin_pad.store_sticky_event(&builder.build()).ok();
srcpad.imp().set_webrtc_pad(webrtcbin_pad.downgrade());
if srcpad.parent().is_none() {
element
.add_pad(srcpad)
.expect("Adding ghost pad should never fail");
}
let media_type = caps
.structure(0)
.expect("Passing empty caps is invalid")
.get::<&str>("media")
.expect("Only caps with a `media` field are expected when creating the pad");
let settings = element.imp().settings.lock().unwrap();
let (raw_caps, encoded_caps) = if media_type == "video" {
let encoded_video_caps = settings
.video_codecs
.iter()
.map(|c| c.caps.clone())
.collect::<gst::Caps>();
(VIDEO_CAPS.clone(), encoded_video_caps)
} else if media_type == "audio" {
let encoded_audio_caps = settings
.audio_codecs
.iter()
.map(|c| c.caps.clone())
.collect::<gst::Caps>();
(AUDIO_CAPS.clone(), encoded_audio_caps)
} else {
unreachable!()
};
drop(settings);
let caps_with_raw = [caps.clone(), raw_caps.clone(), encoded_caps.clone()]
.into_iter()
.collect::<gst::Caps>();
let downstream_caps = srcpad.peer_query_caps(Some(&caps_with_raw));
if let Some(first_struct) = downstream_caps.structure(0)
&& (first_struct.has_name(raw_caps.structure(0).unwrap().name())
|| !first_struct.has_name("application/x-rtp"))
{
srcpad.imp().set_needs_decoding(true)
}
}
let ghostpad = gst::GhostPad::builder(gst::PadDirection::Src)
.proxy_pad_chain_function(glib::clone!(
#[weak]
element,
#[upgrade_or_panic]
move |pad, parent, buffer| {
let padret = gst::ProxyPad::chain_default(pad, parent, buffer);
let state = element.imp().state.lock().unwrap();
if let Some(ref session) = state.session {
let session = session.0.lock().unwrap();
session.flow_combiner.lock().unwrap().update_flow(padret)
} else {
padret
}
}
))
.proxy_pad_event_function(glib::clone!(
#[weak]
element,
#[weak(rename_to = webrtcpad)]
webrtcbin_pad,
#[upgrade_or_panic]
move |pad, parent, event| {
let event = if let gst::EventView::StreamStart(stream_start) = event.view() {
let state = element.imp().state.lock().unwrap();
if let Some(ref session) = state.session {
session
.0
.lock()
.unwrap()
.get_src_pad_from_webrtcbin_pad(&webrtcpad, &element)
.map(|srcpad| {
gst::event::StreamStart::builder(&srcpad.imp().stream_id())
.seqnum(stream_start.seqnum())
.group_id(
stream_start
.group_id()
.unwrap_or_else(gst::GroupId::next),
)
.build()
})
.unwrap_or(event)
} else {
gst::error!(CAT, obj = element, "no current session");
event
}
} else {
event
};
gst::Pad::event_default(pad, parent, event)
}
))
.build();
let (enable_data_channel_navigation, enable_control_data_channel) = {
let settings = element.imp().settings.lock().unwrap();
(
settings.enable_data_channel_navigation,
settings.enable_control_data_channel,
)
};
if enable_data_channel_navigation || enable_control_data_channel {
webrtcbin_pad.add_probe(
gst::PadProbeType::EVENT_UPSTREAM,
glib::clone!(
#[weak]
element,
#[upgrade_or]
gst::PadProbeReturn::Remove,
move |pad, info| {
let Some(ev) = info.event() else {
return gst::PadProbeReturn::Ok;
};
match ev.view() {
gst::EventView::Navigation(nav) => {
let mut state = element.imp().state.lock().unwrap();
if let Some(ref mut session) = state.session {
let mut session = session.0.lock().unwrap();
if enable_data_channel_navigation {
session.send_navigation_event(
gst_video::NavigationEvent::parse(nav).unwrap(),
&element,
);
}
if enable_control_data_channel {
let transceiver = pad
.property::<gst_webrtc::WebRTCRTPTransceiver>(
"transceiver",
);
let request = utils::ControlRequest::NavigationEvent {
event: gst_video::NavigationEvent::parse(nav).unwrap(),
};
session.send_control_request(
request,
&element,
transceiver.mid().map(|s| s.to_string()),
);
}
}
}
gst::EventView::CustomUpstream(cup) => {
let transceiver =
pad.property::<gst_webrtc::WebRTCRTPTransceiver>("transceiver");
let mut state = element.imp().state.lock().unwrap();
if let Some(ref mut session) = state.session {
let mut session = session.0.lock().unwrap();
if enable_control_data_channel
&& let Some(s) = cup.structure()
&& let Some(structure) =
utils::gvalue_to_json(&s.to_value())
{
let request = utils::ControlRequest::CustomUpstreamEvent {
structure_name: s.name().to_string(),
structure,
};
session.send_control_request(
request,
&element,
transceiver.mid().map(|s| s.to_string()),
);
}
}
}
_ => (),
}
gst::PadProbeReturn::Ok
}
),
);
}
if let Some((srcpad, _)) = srcpad_and_caps {
let signaller = element.imp().signaller();
let producer_id = if signaller
.has_property_with_type("producer-peer-id", Option::<String>::static_type())
{
signaller
.property::<Option<String>>("producer-peer-id")
.or_else(|| webrtcbin_pad.property("msid"))
} else {
None
};
let encoded_filter = element.emit_by_name::<Option<gst::Element>>(
"request-encoded-filter",
&[&producer_id, &srcpad.name(), &srcpad.allowed_caps()],
);
if srcpad.imp().needs_decoding() {
let decodebin = gst::ElementFactory::make("decodebin3")
.build()
.expect("decodebin3 needs to be present!");
let downstream_caps = srcpad.peer_query_caps(None);
gst::debug!(
CAT,
obj = element,
"Stopping decoding at caps {downstream_caps:#?}"
);
decodebin.set_property("caps", &downstream_caps);
bin.add(&decodebin).unwrap();
decodebin.sync_state_with_parent().unwrap();
decodebin.connect_pad_added(glib::clone!(
#[weak]
ghostpad,
move |_webrtcbin, pad| {
if pad.direction() == gst::PadDirection::Sink {
return;
}
ghostpad.set_target(Some(pad)).unwrap();
}
));
gst::debug!(
CAT,
obj = element,
"Decoding for {}",
srcpad.imp().stream_id()
);
if let Some(encoded_filter) = encoded_filter {
let filter_sink_pad = encoded_filter
.static_pad("sink")
.expect("encoded filter must expose a static sink pad");
let parsebin = gst::ElementFactory::make("parsebin")
.build()
.expect("parsebin needs to be present!");
bin.add_many([&parsebin, &encoded_filter]).unwrap();
parsebin.connect_pad_added(move |_, pad| {
pad.link(&filter_sink_pad)
.expect("parsebin ! encoded_filter linking failed");
encoded_filter
.link(&decodebin)
.expect("encoded_filter ! decodebin3 linking failed");
encoded_filter.sync_state_with_parent().unwrap();
});
webrtcbin_pad
.link(&parsebin.static_pad("sink").unwrap())
.expect("webrtcbin ! parsebin linking failed");
parsebin.sync_state_with_parent().unwrap();
} else {
let sinkpad = decodebin
.static_pad("sink")
.expect("decodebin has a sink pad");
webrtcbin_pad
.link(&sinkpad)
.expect("webrtcbin ! decodebin3 linking failed");
}
} else {
gst::debug!(
CAT,
obj = element,
"NO decoding for {}",
srcpad.imp().stream_id()
);
if let Some(encoded_filter) = encoded_filter {
let filter_sink_pad = encoded_filter
.static_pad("sink")
.expect("encoded filter must expose a static sink pad");
let filter_src_pad = encoded_filter
.static_pad("src")
.expect("encoded filter must expose a static src pad");
bin.add(&encoded_filter).unwrap();
webrtcbin_pad
.link(&filter_sink_pad)
.expect("webrtcbin ! encoded_filter linking failed");
encoded_filter.sync_state_with_parent().unwrap();
ghostpad.set_target(Some(&filter_src_pad)).unwrap();
} else {
ghostpad.set_target(Some(webrtcbin_pad)).unwrap();
}
}
srcpad.set_target(Some(&ghostpad)).unwrap();
} else {
gst::debug!(
CAT,
obj = element,
"Unused webrtcbin pad {} {:?}",
webrtcbin_pad.name(),
webrtcbin_pad.current_caps(),
);
}
ghostpad
}
fn get_rtp_caps_for_codecs(
element: &super::BaseWebRTCSrc,
pt: &mut std::ops::Range<i32>,
codecs: &[Codec],
media: &str,
default_clock_rate: i32,
) -> Option<gst::Caps> {
let mut rtp_caps = gst::Caps::new_empty();
{
let caps = rtp_caps.get_mut().unwrap();
for c in codecs.iter() {
let Some(pt) = pt.next() else {
gst::warning!(
CAT,
obj = element,
"exhausted the list of dynamic payload types"
);
return None;
};
let mut st = gst::Structure::builder("application/x-rtp")
.field("media", media)
.field("payload", pt)
.field("clock-rate", c.clock_rate().unwrap_or(default_clock_rate))
.field("encoding-name", c.name.as_str())
.build();
if c.name.eq_ignore_ascii_case("H264") {
st.set("profile-level-id", "42e016");
}
caps.append_structure(st);
}
}
Some(rtp_caps)
}
fn get_rtp_caps_for_pad(
element: &super::BaseWebRTCSrc,
pt: &mut std::ops::Range<i32>,
p: &gst::Pad,
peer_caps: &gst::Caps,
) -> Option<gst::Caps> {
gst::trace!(CAT, obj = element, "{peer_caps:#?}");
let rtp_caps = if peer_caps
.structure(0)
.unwrap()
.has_name("application/x-rtp")
{
let mut p_caps = peer_caps.clone();
p_caps.fixate();
let p_caps = p_caps.make_mut();
gst::trace!(CAT, obj = element, "{p_caps:#?}");
for i in 0..p_caps.size() {
if let Some(s) = p_caps.structure_mut(i) {
let Some(pt) = pt.next() else {
gst::warning!(
CAT,
obj = element,
"exhaused the list of dynamic payload types, not adding transceiver for pad {}",
p.name()
);
return None;
};
s.set("payload", pt);
}
}
p_caps.to_owned()
} else {
let mut caps = gst::Caps::new_empty();
let Some(pt) = pt.next() else {
gst::warning!(
CAT,
obj = element,
"exhaused the list of dynamic payload types, not adding transceiver for pad {}",
p.name()
);
return None;
};
if p.name().contains("video") {
let video_caps = caps.get_mut().unwrap();
let mut s = gst::Structure::builder("application/x-rtp")
.field("media", "video")
.field("payload", pt)
.field("clock-rate", 90000)
.build();
match peer_caps.structure(0).unwrap().name().as_str() {
"video/x-h264" => {
s.set("encoding-name", "H264");
s.set("profile-level-id", "42e016");
}
"video/x-vp8" => {
s.set("encoding-name", "VP8");
}
"video/x-vp9" => {
s.set("encoding-name", "VP9");
}
"video/x-av1" => {
s.set("encoding-name", "AV1");
}
"video/x-h265" => {
s.set("encoding-name", "H265");
}
_ => unimplemented!(),
}
video_caps.append_structure(s);
} else {
let audio_caps = caps.get_mut().unwrap();
let mut s = gst::Structure::builder("application/x-rtp")
.field("media", "audio")
.field("payload", pt)
.field("clock-rate", 48000)
.build();
match peer_caps.structure(0).unwrap().name().as_str() {
"audio/x-opus" => {
s.set("encoding-name", "OPUS");
}
_ => unimplemented!(),
}
audio_caps.append_structure(s);
}
caps
};
Some(rtp_caps)
}
fn generate_offer(&mut self, element: &super::BaseWebRTCSrc) {
let webrtcbin = self.webrtcbin();
let direction = gst_webrtc::WebRTCRTPTransceiverDirection::Recvonly;
let mut pt = 96..127;
let settings = element.imp().settings.lock().unwrap();
let encoded_caps = settings
.video_codecs
.iter()
.chain(settings.audio_codecs.iter())
.map(|c| c.caps.to_owned())
.collect::<gst::Caps>();
let filter_caps = [
encoded_caps.clone(),
RTP_CAPS.clone(),
VIDEO_CAPS.clone(),
AUDIO_CAPS.clone(),
]
.into_iter()
.collect::<gst::Caps>();
if element.src_pads().is_empty() {
if let Some(video_caps) = Self::get_rtp_caps_for_codecs(
element,
&mut pt,
&settings.video_codecs,
"video",
90000,
) {
gst::info!(
CAT,
obj = element,
"Adding video transceiver with caps: {video_caps:#?}"
);
let transceiver = webrtcbin.emit_by_name::<gst_webrtc::WebRTCRTPTransceiver>(
"add-transceiver",
&[&direction, &video_caps],
);
transceiver.set_property("do_nack", settings.do_retransmission);
transceiver.set_property("fec-type", gst_webrtc::WebRTCFECType::UlpRed);
}
if let Some(audio_caps) = Self::get_rtp_caps_for_codecs(
element,
&mut pt,
&settings.audio_codecs,
"audio",
48000,
) {
gst::info!(
CAT,
obj = element,
"Adding video transceiver with caps: {audio_caps:#?}"
);
let transceiver = webrtcbin.emit_by_name::<gst_webrtc::WebRTCRTPTransceiver>(
"add-transceiver",
&[&direction, &audio_caps],
);
transceiver.set_property("do_nack", settings.do_retransmission);
transceiver.set_property("fec-type", gst_webrtc::WebRTCFECType::UlpRed);
}
} else {
for (i, p) in element.src_pads().iter().enumerate() {
gst::trace!(CAT, obj = element, "pad {}", p.name());
if p.peer().is_some() {
let peer_caps = p.peer_query_caps(Some(&filter_caps));
let rtp_caps = if peer_caps.is_any()
|| peer_caps.structure(0).unwrap().has_name("video/x-raw")
|| peer_caps.structure(0).unwrap().has_name("audio/x-raw")
{
if p.name().contains("video") {
Self::get_rtp_caps_for_codecs(
element,
&mut pt,
&settings.video_codecs,
"video",
90000,
)
} else {
Self::get_rtp_caps_for_codecs(
element,
&mut pt,
&settings.audio_codecs,
"audio",
48000,
)
}
} else {
Self::get_rtp_caps_for_pad(element, &mut pt, p, &peer_caps)
};
if let Some(rtp_caps) = rtp_caps {
gst::info!(
CAT,
obj = element,
"Adding transceiver with caps: {rtp_caps:#?}"
);
let transceiver = webrtcbin
.emit_by_name::<gst_webrtc::WebRTCRTPTransceiver>(
"add-transceiver",
&[&direction, &rtp_caps],
);
transceiver.set_property("do_nack", settings.do_retransmission);
transceiver.set_property("fec-type", gst_webrtc::WebRTCFECType::UlpRed);
if let Ok(ghost) = p.clone().downcast::<WebRTCSrcPad>() {
let stream_id = self.get_stream_id(None, Some(i as u32)).unwrap();
ghost.imp().set_stream_id(&stream_id);
self.pending_srcpads
.insert(stream_id, (ghost.clone(), rtp_caps.to_owned()));
} else {
gst::error!(
CAT,
obj = element,
"failed to get downcasted version of ghost, not adding to pending_srcpads"
);
}
}
}
}
}
let webrtcbin_weak = webrtcbin.downgrade();
let promise = gst::Promise::with_change_func(glib::clone!(
#[weak (rename_to = ele)]
element,
#[strong(rename_to = sess_id)]
self.id,
move |reply| {
let Some(webrtcbin) = webrtcbin_weak.upgrade() else {
gst::error!(CAT, obj = ele, "generate offer::failed to get webrtcbin");
ele.imp().signaller().end_session(sess_id.as_str());
return;
};
let reply = match reply {
Ok(Some(reply)) => reply,
Ok(None) => {
gst::error!(
CAT,
obj = ele,
"generate offer::Promise returned with no reply"
);
ele.imp().signaller().end_session(sess_id.as_str());
return;
}
Err(e) => {
gst::error!(
CAT,
obj = ele,
"generate offer::Promise returned with error {:?}",
e
);
ele.imp().signaller().end_session(sess_id.as_str());
return;
}
};
match reply
.value("offer")
.map(|offer| offer.get::<gst_webrtc::WebRTCSessionDescription>().unwrap())
{
Ok(offer_sdp) => {
webrtcbin.emit_by_name::<()>(
"set-local-description",
&[&offer_sdp, &None::<gst::Promise>],
);
gst::log!(
CAT,
obj = ele,
"Sending SDP, {}",
offer_sdp.sdp().to_string()
);
let signaller = ele.imp().signaller();
signaller.send_sdp(sess_id.as_str(), &offer_sdp);
}
_ => {
let error = reply
.value("error")
.expect("structure must have an error value")
.get::<glib::Error>()
.expect("value must be a GLib error");
gst::error!(
CAT,
obj = ele,
"generate offer::Promise returned with error: {}",
error
);
ele.imp().signaller().end_session(sess_id.as_str());
}
}
}
));
webrtcbin
.clone()
.emit_by_name::<()>("create-offer", &[&None::<gst::Structure>, &promise]);
}
fn remote_description_set(
&mut self,
element: &super::BaseWebRTCSrc,
desc: &gst_webrtc::WebRTCSessionDescription,
) -> (gst::Promise, gst::Bin) {
let sdp = desc.sdp();
let desc_type = desc.type_();
let webrtcbin = self.webrtcbin();
let request_src_pads = element.src_pads().clone();
for (i, media) in sdp.medias().enumerate() {
if media.attributes().any(|attr| attr.key() == "inactive") {
gst::warning!(
CAT,
obj = element,
"Skipping inactive media: {i} in the remote description"
);
if i < request_src_pads.len() {
gst::info!(
CAT,
obj = element,
"Sending EOS on pad {}",
request_src_pads[i].name()
);
if !request_src_pads[i].push_event(gst::event::Eos::new()) {
gst::error!(
CAT,
obj = element,
"Failed to push EOS on {}",
request_src_pads[i].name()
);
}
}
continue;
}
if !request_src_pads.is_empty() && desc_type == gst_webrtc::WebRTCSDPType::Answer {
gst::info!(
CAT,
obj = element,
"webrtcsrc is the offerer and has src pads already, so not consuming remote answer medias",
);
break;
}
let (codec_names, do_retransmission) = {
let settings = element.imp().settings.lock().unwrap();
(
settings
.video_codecs
.iter()
.chain(settings.audio_codecs.iter())
.map(|codec| codec.name.clone())
.collect::<HashSet<String>>(),
settings.do_retransmission,
)
};
let caps = media
.formats()
.filter_map(|format| {
format.parse::<i32>().ok().and_then(|pt| {
let mut mediacaps = media.caps_from_media(pt)?;
let s = mediacaps.structure(0).unwrap();
if !codec_names.contains(s.get::<&str>("encoding-name").ok()?) {
return None;
}
let mut filtered_s = gst::Structure::new_empty("application/x-rtp");
filtered_s.extend(s.iter().filter_map(|(key, value)| {
if key.starts_with("rtcp-") {
None
} else {
Some((key, value.to_owned()))
}
}));
if media
.attributes_to_caps(mediacaps.get_mut().unwrap())
.is_err()
{
gst::warning!(
CAT,
obj = element,
"Failed to retrieve attributes from media!"
);
return None;
}
let s = mediacaps.structure(0).unwrap();
filtered_s.extend(s.iter().filter_map(|(key, value)| {
if key.starts_with("extmap-") {
return Some((key, value.to_owned()));
}
None
}));
Some(filtered_s)
})
})
.collect::<gst::Caps>();
if !caps.is_empty() {
let stream_id = self.get_stream_id(None, Some(i as u32)).unwrap();
let mut matching_pad_found = false;
let media_name = media.media().unwrap_or_else(|| {
caps.structure(0)
.and_then(|s| s.get::<&str>("media").ok())
.unwrap_or("none")
});
if i < request_src_pads.len() && request_src_pads[i].name().contains(media_name) {
let mut peer_caps = request_src_pads[i].peer_query_caps(None);
peer_caps.fixate();
if peer_caps.is_any() {
matching_pad_found = true;
} else if let Some(s) = peer_caps.structure(0) {
for c in caps.iter() {
gst::trace!(
CAT,
obj = element,
"Comparing media caps: {c:#?} with peer caps: {peer_caps:#?} for pad {}",
request_src_pads[i].name()
);
let codec_name = c.get::<&str>("encoding-name").unwrap();
let media_type = c.get::<&str>("media").unwrap();
gst::trace!(
CAT,
obj = element,
"Comparing with codec name: {codec_name} and {s:?}",
);
if s.name().contains(codec_name.to_lowercase().as_str())
|| (s.name().contains(media_type))
|| s.has_field("encoding-name")
&& s.get::<gst::List>("encoding-name").map_or_else(
|_| match s.get::<&str>("encoding-name") {
Ok(encoding_name) => encoding_name == codec_name,
_ => false,
},
|encoding_names| {
encoding_names.iter().any(|v| {
v.get::<&str>().is_ok_and(|encoding_name| {
encoding_name == codec_name
})
})
},
)
{
gst::trace!(
CAT,
obj = element,
"Using media: {media:#?} for pad {} with peer caps: {peer_caps:#?}",
request_src_pads[i].name()
);
let ws_pad =
request_src_pads[i].downcast_ref::<WebRTCSrcPad>().unwrap();
ws_pad.imp().set_stream_id(&stream_id);
self.pending_srcpads
.insert(stream_id.clone(), (ws_pad.clone(), caps.to_owned()));
matching_pad_found = true;
break;
}
}
}
}
if !request_src_pads.is_empty()
|| element
.imp()
.create_and_probe_src_pad(&caps, &stream_id, self)
{
if desc_type == gst_webrtc::WebRTCSDPType::Offer {
gst::info!(
CAT,
obj = element,
"Getting transceiver for {stream_id} and index {i} with caps: {caps:#?}"
);
let mut transceiver = None;
let mut idx = 0i32;
loop {
let Some(to_check) = webrtcbin
.emit_by_name::<Option<gst_webrtc::WebRTCRTPTransceiver>>(
"get-transceiver",
&[&idx],
)
else {
break;
};
let mline = to_check.property::<u32>("mlineindex");
if mline as usize == i {
transceiver = Some(to_check);
break;
}
idx += 1;
}
let transceiver = transceiver.unwrap_or_else(|| {
gst::warning!(CAT, "Transceiver for idx {i} does not exist, GStreamer <= 1.24, adding it ourself");
webrtcbin.emit_by_name::<gst_webrtc::WebRTCRTPTransceiver>(
"add-transceiver",
&[&gst_webrtc::WebRTCRTPTransceiverDirection::Recvonly, &caps])
});
if !request_src_pads.is_empty() && !matching_pad_found {
gst::warning!(
CAT,
obj = element,
"Setting transceiver for media {i} as inactive, no matching pad found",
);
transceiver
.set_direction(gst_webrtc::WebRTCRTPTransceiverDirection::Inactive);
if i < request_src_pads.len() {
gst::info!(
CAT,
obj = element,
"Sending EOS on pad {}",
request_src_pads[i].name()
);
if !request_src_pads[i].push_event(gst::event::Eos::new()) {
gst::error!(
CAT,
obj = element,
"Failed to push EOS on {}",
request_src_pads[i].name()
);
}
}
} else {
transceiver.set_property("do_nack", do_retransmission);
transceiver.set_property("fec-type", gst_webrtc::WebRTCFECType::UlpRed);
transceiver.set_property("codec-preferences", caps);
}
} else {
}
} else {
gst::error!(
CAT,
obj = element,
"Failed to create src pad with caps {:?}",
caps
);
}
} else {
gst::info!(
CAT,
obj = element,
"Not using media: {media:#?} as it doesn't match our codec restrictions"
);
}
}
let promise = gst::Promise::with_change_func(glib::clone!(
#[weak]
element,
move |reply| {
if desc_type == gst_webrtc::WebRTCSDPType::Offer {
let state = element.imp().state.lock().unwrap();
gst::info!(CAT, obj = element, "got answer");
let Some(ref session) = state.session else {
gst::error!(CAT, obj = element, "no current session");
return;
};
session.0.lock().unwrap().on_answer_created(reply, &element);
} else {
gst::log!(
CAT,
obj = element,
"Nothing to do in the promise in case of an answer"
);
return;
}
}
));
(promise, webrtcbin.clone())
}
fn handle_remote_description(
&mut self,
desc: &gst_webrtc::WebRTCSessionDescription,
element: &super::BaseWebRTCSrc,
) -> (gst::Promise, gst::Bin) {
gst::debug!(
CAT,
obj = element,
"Got remote description: {}",
desc.sdp().to_string()
);
let promise = gst::Promise::with_change_func(glib::clone!(
#[weak]
element,
#[strong]
desc,
move |_| {
let mut state = element.imp().state.lock().unwrap();
gst::info!(CAT, obj = element, "got {:?}", desc.type_());
let Some(ref mut session) = state.session else {
gst::error!(CAT, obj = element, "no current session");
return;
};
let (promise, webrtcbin) = session
.0
.lock()
.unwrap()
.remote_description_set(&element, &desc);
drop(state);
if desc.type_() == gst_webrtc::WebRTCSDPType::Offer {
webrtcbin
.emit_by_name::<()>("create-answer", &[&None::<gst::Structure>, &promise]);
} else {
promise.reply(None);
}
}
));
(promise, self.webrtcbin().clone())
}
fn on_answer_created(
&self,
reply: Result<Option<&gst::StructureRef>, gst::PromiseError>,
element: &super::BaseWebRTCSrc,
) {
let reply = match reply {
Ok(Some(reply)) => {
if !reply.has_field_with_type(
"answer",
gst_webrtc::WebRTCSessionDescription::static_type(),
) {
gst::element_error!(
element,
gst::StreamError::Failed,
["create-answer::Promise returned with no reply"]
);
return;
} else if reply.has_field_with_type("error", glib::Error::static_type()) {
gst::element_error!(
element,
gst::LibraryError::Failed,
["create-offer::Promise returned with error: {:?}", reply]
);
return;
}
reply
}
Ok(None) => {
gst::element_error!(
element,
gst::StreamError::Failed,
["create-answer::Promise returned with no reply"]
);
return;
}
Err(err) => {
gst::element_error!(
element,
gst::LibraryError::Failed,
["create-answer::Promise returned with error {:?}", err]
);
return;
}
};
let answer = reply
.value("answer")
.unwrap()
.get::<gst_webrtc::WebRTCSessionDescription>()
.expect("Invalid argument");
self.webrtcbin
.emit_by_name::<()>("set-local-description", &[&answer, &None::<gst::Promise>]);
gst::log!(
CAT,
obj = element,
"Sending SDP, {}",
answer.sdp().to_string()
);
let signaller = element.imp().signaller();
signaller.send_sdp(&self.id, &answer);
}
fn on_data_channel(&mut self, data_channel: glib::Object, element: &super::BaseWebRTCSrc) {
gst::info!(CAT, obj = element, "Received data channel {data_channel:?}");
let Ok(data_channel) = data_channel.dynamic_cast::<WebRTCDataChannel>() else {
return;
};
let Some(label) = data_channel.label() else {
return;
};
if label == INPUT_DATA_CHANNEL_LABEL {
self.navigation_data_channel = Some(data_channel);
} else if label == CONTROL_DATA_CHANNEL_LABEL {
self.control_data_channel = Some(data_channel);
} else {
gst::debug!(
CAT,
obj = element,
"Not adding data channel with unknown label: {label}"
)
}
}
fn on_ice_candidate(
&self,
sdp_m_line_index: u32,
candidate: String,
element: &super::BaseWebRTCSrc,
) {
let signaller = element.imp().signaller();
signaller.add_ice(&self.id, &candidate, sdp_m_line_index, None::<String>);
}
fn handle_ice(
&self,
sdp_m_line_index: Option<u32>,
_sdp_mid: Option<String>,
candidate: &str,
element: &super::BaseWebRTCSrc,
) {
let sdp_m_line_index = match sdp_m_line_index {
Some(m_line) => m_line,
None => {
gst::error!(CAT, obj = element, "No mandatory mline");
return;
}
};
gst::log!(CAT, obj = element, "Got ice candidate: {candidate}",);
self.webrtcbin()
.emit_by_name::<()>("add-ice-candidate", &[&sdp_m_line_index, &candidate]);
}
}
impl BaseWebRTCSrc {
fn signaller(&self) -> Signallable {
self.settings.lock().unwrap().signaller.clone()
}
fn unprepare(&self) -> Result<(), Error> {
gst::info!(CAT, imp = self, "unpreparing");
let mut state = self.state.lock().unwrap();
let session = state.end_session(&self.obj());
drop(state);
gst::debug!(CAT, imp = self, "Ending sessions");
if let Some(session) = session {
self.signaller().end_session(&session.0.lock().unwrap().id);
}
gst::debug!(CAT, imp = self, "All sessions have started finalizing");
self.maybe_stop_signaller();
let finalizing_session_opt = self.state.lock().unwrap().finalizing_session.clone();
let (finalizing_session, cvar) = &*finalizing_session_opt;
let mut finalizing_session = finalizing_session.lock().unwrap();
while finalizing_session.is_some() {
finalizing_session = cvar.wait(finalizing_session).unwrap();
}
gst::debug!(CAT, imp = self, "All sessions are done finalizing");
Ok(())
}
fn connect_signaller(&self, signaller: &Signallable) {
let instance = &*self.obj();
let _ = self
.state
.lock()
.unwrap()
.signaller_signals
.insert(SignallerSignals {
error: signaller.connect_closure(
"error",
false,
glib::closure!(
#[watch]
instance,
move |_signaller: glib::Object, error: String| {
gst::element_error!(
instance,
gst::StreamError::Failed,
["Signalling error: {}", error]
);
}
),
),
session_started: signaller.connect_closure(
"session-started",
false,
glib::closure!(
#[watch]
instance,
move |_signaller: glib::Object, session_id: &str, _peer_id: &str| {
let imp = instance.imp();
gst::info!(CAT, imp = imp, "session started");
let _ = imp.start_session(session_id);
}
),
),
session_ended: signaller.connect_closure(
"session-ended",
false,
glib::closure!(
#[watch]
instance,
move |_signaller: glib::Object, _session_id: &str| {
let this = instance.imp();
if let Err(e) = this.remove_session() {
gst::error!(
CAT,
imp = this,
" Failed to end session: {e}"
);
return false;
}
gst::log!(CAT, imp = this, "Session cleaned up");
true
}
),
),
session_requested: signaller.connect_closure(
"session-requested",
false,
glib::closure!(#[watch] instance, move |_signaler: glib::Object, _session_id: &str, _peer_id: &str, offer: Option<&gst_webrtc::WebRTCSessionDescription>|{
if offer.is_none() {
let this = instance.imp();
let state = this.state.lock().unwrap();
let Some(ref session) = state.session else {
gst::error!(CAT, imp = this, "no current session");
return
};
session.0.lock().unwrap().generate_offer(&this.obj());
}
}),
),
request_meta: signaller.connect_closure(
"request-meta",
false,
glib::closure!(
#[watch]
instance,
move |_signaller: glib::Object| -> Option<gst::Structure> {
instance.imp().settings.lock().unwrap().meta.clone()
}
),
),
session_description: signaller.connect_closure(
"session-description",
false,
glib::closure!(
#[watch]
instance,
move |_signaller: glib::Object,
_session_id: &str,
desc: &gst_webrtc::WebRTCSessionDescription| {
match desc.type_() {
gst_webrtc::WebRTCSDPType::Offer | gst_webrtc::WebRTCSDPType::Answer => {
let this = instance.imp();
gst::info!(CAT, imp = this, "got sdp : {:?}", desc.type_());
let mut state = this.state.lock().unwrap();
let Some(ref mut session) = state.session else {
gst::error!(CAT, imp = this, "no current session");
return;
};
let (promise, webrtcbin) = session.0.lock().unwrap().handle_remote_description(desc, &this.obj());
drop(state);
webrtcbin
.emit_by_name::<()>("set-remote-description", &[&desc, &promise]);
},
_ => {
unimplemented!("{:?} type remote description not handled", desc.type_());
},
}
}
),
),
handle_ice: signaller.connect_closure(
"handle-ice",
false,
glib::closure!(
#[watch]
instance,
move |_signaller: glib::Object,
_session_id: &str,
sdp_m_line_index: u32,
_sdp_mid: Option<String>,
candidate: &str| {
let this = instance.imp();
let state = this.state.lock().unwrap();
let Some(ref session) = state.session else {
gst::error!(CAT, imp = this, "no current session");
return;
};
session.0.lock().unwrap().handle_ice(
Some(sdp_m_line_index),
None,
candidate,
&this.obj(),
);
}
),
),
});
}
fn create_and_probe_src_pad(
&self,
caps: &gst::Caps,
stream_id: &str,
session: &mut SessionInner,
) -> bool {
gst::log!(CAT, "Creating pad for {caps:?}, stream: {stream_id}");
let obj = self.obj();
let media_type = caps
.structure(0)
.expect("Passing empty caps is invalid")
.get::<&str>("media")
.expect("Only caps with a `media` field are expected when creating the pad");
let (template, name) = if media_type == "video" {
(
obj.pad_template("video_%u").unwrap(),
format!(
"video_{}",
session.n_video_pads.fetch_add(1, Ordering::SeqCst)
),
)
} else if media_type == "audio" {
(
obj.pad_template("audio_%u").unwrap(),
format!(
"audio_{}",
session.n_audio_pads.fetch_add(1, Ordering::SeqCst)
),
)
} else {
gst::info!(
CAT,
imp = self,
"Not an audio or video media {media_type:?}"
);
return false;
};
let ghost = gst::GhostPad::builder_from_template(&template)
.name(name)
.build()
.downcast::<WebRTCSrcPad>()
.unwrap();
ghost.imp().set_stream_id(stream_id);
session
.pending_srcpads
.insert(stream_id.to_string(), (ghost.clone(), caps.clone()));
true
}
fn maybe_start_signaller(&self) {
let obj = self.obj();
let mut state = self.state.lock().unwrap();
if state.signaller_state == SignallerState::Stopped
&& obj.current_state() >= gst::State::Paused
{
self.signaller().start();
gst::info!(CAT, imp = self, "Started signaller");
state.signaller_state = SignallerState::Started;
}
}
fn maybe_stop_signaller(&self) {
let mut state = self.state.lock().unwrap();
if state.signaller_state == SignallerState::Started {
state.signaller_state = SignallerState::Stopped;
drop(state);
self.signaller().stop();
gst::info!(CAT, imp = self, "Stopped signaller");
}
}
pub fn set_signaller(&self, signaller: Signallable) -> Result<(), Error> {
let sigobj = signaller.clone();
let mut settings = self.settings.lock().unwrap();
self.connect_signaller(&sigobj);
settings.signaller = signaller;
Ok(())
}
fn start_session(&self, session_id: &str) -> Result<(), Error> {
let state = self.state.lock().unwrap();
if state.session.is_some() {
return Err(anyhow::anyhow!("webrtcsrc only supports a single session"));
};
drop(state);
let session = SessionInner::new(session_id)?;
let webrtcbin = session.webrtcbin();
{
let settings = self.settings.lock().unwrap();
if let Some(stun_server) = settings.stun_server.as_ref() {
webrtcbin.set_property("stun-server", stun_server);
}
for turn_server in settings.turn_servers.iter() {
webrtcbin.emit_by_name::<bool>("add-turn-server", &[&turn_server]);
}
}
let bin = gst::Bin::new();
bin.connect_pad_removed(glib::clone!(
#[weak(rename_to = this)]
self,
move |_, pad| {
let mut state = this.state.lock().unwrap();
if let Some(ref mut session) = state.session {
let session = session.0.lock().unwrap();
session.flow_combiner.lock().unwrap().remove_pad(pad);
};
}
));
bin.connect_pad_added(glib::clone!(
#[weak(rename_to = this)]
self,
move |_, pad| {
let mut state = this.state.lock().unwrap();
if let Some(ref mut session) = state.session {
let session = session.0.lock().unwrap();
session.flow_combiner.lock().unwrap().add_pad(pad);
};
}
));
webrtcbin.connect_pad_added(glib::clone!(
#[weak(rename_to = this)]
self,
#[weak]
bin,
move |_webrtcbin, pad| {
if pad.direction() == gst::PadDirection::Sink {
return;
}
let mut state = this.state.lock().unwrap();
let Some(ref mut session) = state.session else {
gst::error!(CAT, imp = this, "no current session");
return;
};
let bin_ghostpad =
session
.0
.lock()
.unwrap()
.handle_webrtc_src_pad(&bin, pad, &this.obj());
drop(state);
bin.add_pad(&bin_ghostpad)
.expect("Adding ghostpad to the bin should always work");
}
));
webrtcbin.connect_pad_removed(glib::clone!(
#[weak(rename_to = this)]
self,
move |_webrtcbin, pad| {
let mut state = this.state.lock().unwrap();
if let Some(ref mut session) = state.session {
let session = session.0.lock().unwrap();
session.flow_combiner.lock().unwrap().remove_pad(pad);
};
}
));
webrtcbin.connect_closure(
"on-ice-candidate",
false,
glib::closure!(
#[weak(rename_to = this)]
self,
move |_webrtcbin: gst::Bin, sdp_m_line_index: u32, candidate: String| {
let mut state = this.state.lock().unwrap();
let Some(ref mut session) = state.session else {
gst::error!(CAT, imp = this, "no current session");
return;
};
let session = session.0.lock().unwrap();
session.on_ice_candidate(sdp_m_line_index, candidate, &this.obj());
}
),
);
webrtcbin.connect_closure(
"on-data-channel",
false,
glib::closure!(
#[weak(rename_to = this)]
self,
move |_webrtcbin: gst::Bin, data_channel: glib::Object| {
let mut state = this.state.lock().unwrap();
let Some(ref mut session) = state.session else {
gst::error!(CAT, imp = this, "no current session");
return;
};
let mut session = session.0.lock().unwrap();
session.on_data_channel(data_channel, &this.obj());
}
),
);
webrtcbin.connect_notify(
Some("ice-connection-state"),
glib::clone!(
#[weak(rename_to = this)]
self,
move |webrtcbin, _pspec| {
let state = webrtcbin
.property::<gst_webrtc::WebRTCICEConnectionState>("ice-connection-state");
match state {
gst_webrtc::WebRTCICEConnectionState::Failed => {
gst::warning!(CAT, imp = this, "Ice connection state failed");
let _ = this.remove_session();
}
_ => {
gst::log!(CAT, imp = this, "Ice connection state changed: {:?}", state);
}
}
}
),
);
bin.add(&webrtcbin).unwrap();
self.obj().add(&bin).context("Could not add `webrtcbin`")?;
bin.sync_state_with_parent().unwrap();
self.signaller()
.emit_by_name::<()>("webrtcbin-ready", &[&session_id, &webrtcbin]);
let mut state = self.state.lock().unwrap();
state.session = Some(Session(Arc::new(Mutex::new(session))));
Ok(())
}
fn remove_session(&self) -> Result<(), Error> {
let mut state = self.state.lock().unwrap();
if state.session.is_none() {
return Err(anyhow::anyhow!("No current session"));
}
self.flush_src_pad()?;
if let Some(session) = state.end_session(&self.obj()) {
drop(state);
let session = session.0.lock().unwrap();
self.signaller().end_session(&session.id);
}
self.remove_src_pad()?;
Ok(())
}
fn flush_src_pad(&self) -> Result<(), Error> {
let obj = self.obj();
for pad in obj.src_pads().iter() {
if !pad.push_event(gst::event::FlushStart::new()) {
return Err(anyhow::anyhow!(
"Failed to send flush-start for pad: {}",
pad.name()
));
}
}
Ok(())
}
fn remove_src_pad(&self) -> Result<(), Error> {
let obj = self.obj();
for pad in obj.src_pads().iter() {
obj.remove_pad(pad)
.map_err(|err| anyhow::anyhow!("Couldn't remove pad? {err:?}"))?;
gst::log!(CAT, imp = self, "Removed pad {}", pad.name());
}
Ok(())
}
}
impl ElementImpl for BaseWebRTCSrc {
fn pad_templates() -> &'static [gst::PadTemplate] {
static PAD_TEMPLATES: LazyLock<Vec<gst::PadTemplate>> = LazyLock::new(|| {
let mut video_caps_builder = gst::Caps::builder_full()
.structure_with_any_features(VIDEO_CAPS.structure(0).unwrap().to_owned())
.structure(RTP_CAPS.structure(0).unwrap().to_owned());
for codec in Codecs::video_codecs().filter(|codec| !codec.is_raw) {
video_caps_builder =
video_caps_builder.structure(codec.caps.structure(0).unwrap().to_owned());
}
let mut audio_caps_builder = gst::Caps::builder_full()
.structure_with_any_features(AUDIO_CAPS.structure(0).unwrap().to_owned())
.structure(RTP_CAPS.structure(0).unwrap().to_owned());
for codec in Codecs::audio_codecs().filter(|codec| !codec.is_raw) {
audio_caps_builder =
audio_caps_builder.structure(codec.caps.structure(0).unwrap().to_owned());
}
let video_caps = video_caps_builder.build();
let audio_caps = audio_caps_builder.build();
vec![
gst::PadTemplate::with_gtype(
"video_%u",
gst::PadDirection::Src,
gst::PadPresence::Sometimes,
&video_caps,
WebRTCSrcPad::static_type(),
)
.unwrap(),
gst::PadTemplate::with_gtype(
format!("{}%u", VIDEO_REQUEST_PAD_NAME_PREFIX).as_str(),
gst::PadDirection::Src,
gst::PadPresence::Request,
&video_caps,
WebRTCSrcPad::static_type(),
)
.unwrap(),
gst::PadTemplate::with_gtype(
"audio_%u",
gst::PadDirection::Src,
gst::PadPresence::Sometimes,
&audio_caps,
WebRTCSrcPad::static_type(),
)
.unwrap(),
gst::PadTemplate::with_gtype(
format!("{}%u", AUDIO_REQUEST_PAD_NAME_PREFIX).as_str(),
gst::PadDirection::Src,
gst::PadPresence::Request,
&audio_caps,
WebRTCSrcPad::static_type(),
)
.unwrap(),
]
});
PAD_TEMPLATES.as_ref()
}
fn change_state(
&self,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
let obj = &*self.obj();
let mut ret = self.parent_change_state(transition);
match transition {
gst::StateChange::PausedToReady => {
if let Err(err) = self.unprepare() {
gst::element_error!(
obj,
gst::StreamError::Failed,
["Failed to unprepare: {}", err]
);
return Err(gst::StateChangeError);
}
}
gst::StateChange::ReadyToPaused => {
ret = Ok(gst::StateChangeSuccess::NoPreroll);
}
gst::StateChange::PlayingToPaused => {
ret = Ok(gst::StateChangeSuccess::NoPreroll);
}
gst::StateChange::PausedToPlaying => {
self.maybe_start_signaller();
}
_ => (),
}
ret
}
fn send_event(&self, event: gst::Event) -> bool {
match event.view() {
gst::EventView::Navigation(ev) => {
let settings = self.settings.lock().unwrap();
let mut state = self.state.lock().unwrap();
if !settings.enable_data_channel_navigation && !settings.enable_control_data_channel
{
return true;
}
if let Some(ref mut session) = state.session {
if settings.enable_data_channel_navigation {
session.0.lock().unwrap().send_navigation_event(
gst_video::NavigationEvent::parse(ev).unwrap(),
&self.obj(),
);
}
if settings.enable_control_data_channel {
let request = utils::ControlRequest::NavigationEvent {
event: gst_video::NavigationEvent::parse(ev).unwrap(),
};
session
.0
.lock()
.unwrap()
.send_control_request(request, &self.obj(), None);
}
}
true
}
_ => true,
}
}
fn request_new_pad(
&self,
templ: &gst::PadTemplate,
name: Option<&str>,
caps: Option<&gst::Caps>,
) -> Option<gst::Pad> {
gst::trace!(CAT, imp = self, "{:?} {:?} {:?}", templ.name(), caps, name);
let pad_name = if templ.name().starts_with(VIDEO_REQUEST_PAD_NAME_PREFIX) {
format!(
"{}{}",
VIDEO_REQUEST_PAD_NAME_PREFIX,
self.n_request_video_pads.fetch_add(1, Ordering::SeqCst)
)
} else if templ.name().starts_with(AUDIO_REQUEST_PAD_NAME_PREFIX) {
format!(
"{}{}",
AUDIO_REQUEST_PAD_NAME_PREFIX,
self.n_request_audio_pads.fetch_add(1, Ordering::SeqCst)
)
} else {
return None;
};
if let Some(n) = name {
gst::info!(
CAT,
imp = self,
"not using the pad name {n} requested, pad serial is autoenumerated internally"
);
}
gst::info!(CAT, imp = self, "Creating new pad {pad_name}");
let src_pad = gst::GhostPad::builder_from_template(templ)
.name(pad_name)
.build()
.downcast::<WebRTCSrcPad>()
.unwrap();
src_pad.set_active(true).unwrap();
src_pad.use_fixed_caps();
self.obj().add_pad(&src_pad).unwrap();
Some(src_pad.into())
}
fn release_pad(&self, pad: &gst::Pad) {
let element = self.obj();
let ws = element
.upcast_ref::<crate::webrtcsrc::BaseWebRTCSrc>()
.imp();
gst::info!(CAT, obj = element, "releasing {pad:?}");
let state = ws.state.lock().unwrap();
if let Some(s) = &state.session {
let mut session = s.0.lock().unwrap();
let _ = session.take_pending_src_pad(pad);
}
drop(state);
if let Err(e) = self.obj().remove_pad(pad) {
gst::error!(CAT, obj = element, "could remove {pad:?}: {e:?}");
};
}
}
impl GstObjectImpl for BaseWebRTCSrc {}
impl BinImpl for BaseWebRTCSrc {}
impl ChildProxyImpl for BaseWebRTCSrc {
fn child_by_index(&self, index: u32) -> Option<glib::Object> {
if index == 0 {
Some(self.signaller().upcast())
} else {
None
}
}
fn children_count(&self) -> u32 {
1
}
fn child_by_name(&self, name: &str) -> Option<glib::Object> {
match name {
"signaller" => {
gst::info!(CAT, imp = self, "Getting signaller");
Some(self.signaller().upcast())
}
_ => None,
}
}
}
#[derive(PartialEq)]
enum SignallerState {
Started,
Stopped,
}
struct SessionInner {
id: String,
webrtcbin: gst::Element,
navigation_data_channel: Option<WebRTCDataChannel>,
control_data_channel: Option<WebRTCDataChannel>,
n_video_pads: AtomicU16,
n_audio_pads: AtomicU16,
flow_combiner: Mutex<gst_base::UniqueFlowCombiner>,
request_counter: u64,
pending_srcpads: HashMap<String, (WebRTCSrcPad, gst::Caps)>,
}
#[derive(Clone)]
struct Session(Arc<Mutex<SessionInner>>);
struct State {
session: Option<Session>,
signaller_state: SignallerState,
signaller_signals: Option<SignallerSignals>,
finalizing_session: Arc<(Mutex<Option<Session>>, Condvar)>,
}
impl Default for State {
fn default() -> Self {
Self {
signaller_state: SignallerState::Stopped,
session: None,
signaller_signals: Default::default(),
finalizing_session: Arc::new((Mutex::new(None), Condvar::new())),
}
}
}
impl State {
fn finalize_session(&mut self, element: &super::BaseWebRTCSrc, session: Session) {
let inner = session.0.lock().unwrap();
drop(inner);
gst::info!(CAT, "Ending session");
let finalizing_session_opt = self.finalizing_session.clone();
let (finalizing_session, _cvar) = &*finalizing_session_opt;
*finalizing_session.lock().unwrap() = Some(session);
let element = element.clone();
RUNTIME.spawn_blocking(move || {
let (finalizing_session, cvar) = &*finalizing_session_opt;
let mut finalizing_session = finalizing_session.lock().unwrap();
let session = finalizing_session.take().unwrap();
let session = session.0.lock().unwrap();
let bin = session
.webrtcbin()
.parent()
.and_downcast::<gst::Bin>()
.unwrap();
let _ = bin.set_state(gst::State::Null);
if let Some(webrtcsrc) = bin.parent()
&& let Err(e) = webrtcsrc.downcast_ref::<gst::Bin>().unwrap().remove(&bin)
{
gst::error!(CAT, obj = bin, "Failed to remove bin for session: {e}",);
}
cvar.notify_one();
gst::debug!(CAT, obj = element, "Session ended");
});
}
fn end_session(&mut self, element: &super::BaseWebRTCSrc) -> Option<Session> {
if let Some(session) = self.session.take() {
self.finalize_session(element, session.clone());
return Some(session);
}
None
}
}
#[derive(Default)]
pub struct WebRTCSrc {}
impl ObjectImpl for WebRTCSrc {
fn properties() -> &'static [glib::ParamSpec] {
static PROPERTIES: LazyLock<Vec<glib::ParamSpec>> = LazyLock::new(|| {
vec![glib::ParamSpecBoolean::builder("connect-to-first-producer")
.nick("Connect to first peer")
.blurb(
"When enabled, automatically connect to the first peer that becomes available \
if no 'peer-id' is specified.",
)
.default_value(false)
.mutable_ready()
.build()]
});
PROPERTIES.as_ref()
}
fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
let obj = self.obj();
let base = obj.upcast_ref::<super::BaseWebRTCSrc>().imp();
match pspec.name() {
"connect-to-first-producer" => base
.signaller()
.downcast::<Signaller>()
.unwrap()
.imp()
.set_connect_to_first_producer(value.get().unwrap()),
_ => unimplemented!(),
}
}
fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
let obj = self.obj();
let base = obj.upcast_ref::<super::BaseWebRTCSrc>().imp();
match pspec.name() {
"connect-to-first-producer" => base
.signaller()
.downcast::<Signaller>()
.unwrap()
.imp()
.connect_to_first_producer()
.into(),
_ => unimplemented!(),
}
}
}
impl GstObjectImpl for WebRTCSrc {}
impl BinImpl for WebRTCSrc {}
impl ElementImpl for WebRTCSrc {
fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
static ELEMENT_METADATA: LazyLock<gst::subclass::ElementMetadata> = LazyLock::new(|| {
gst::subclass::ElementMetadata::new(
"WebRTCSrc",
"Source/Network/WebRTC",
"WebRTC src",
"Thibault Saunier <tsaunier@igalia.com>",
)
});
Some(&*ELEMENT_METADATA)
}
}
impl BaseWebRTCSrcImpl for WebRTCSrc {}
impl URIHandlerImpl for WebRTCSrc {
const URI_TYPE: gst::URIType = gst::URIType::Src;
fn protocols() -> &'static [&'static str] {
&["gstwebrtc", "gstwebrtcs"]
}
fn uri(&self) -> Option<String> {
let obj = self.obj();
let base = obj.upcast_ref::<super::BaseWebRTCSrc>().imp();
base.signaller().property::<Option<String>>("uri")
}
fn set_uri(&self, uri: &str) -> Result<(), glib::Error> {
let uri = Url::from_str(uri)
.map_err(|err| glib::Error::new(gst::URIError::BadUri, &format!("{err:?}")))?;
let socket_scheme = match uri.scheme() {
"gstwebrtc" => Ok("ws"),
"gstwebrtcs" => Ok("wss"),
_ => Err(glib::Error::new(
gst::URIError::BadUri,
&format!("Invalid protocol: {}", uri.scheme()),
)),
}?;
let mut url_str = uri.to_string();
url_str.replace_range(0..uri.scheme().len(), socket_scheme);
let obj = self.obj();
let base = obj.upcast_ref::<super::BaseWebRTCSrc>().imp();
base.signaller().set_property("uri", &url_str);
Ok(())
}
}
#[glib::object_subclass]
impl ObjectSubclass for WebRTCSrc {
const NAME: &'static str = "GstWebRTCSrc";
type Type = super::WebRTCSrc;
type ParentType = super::BaseWebRTCSrc;
type Interfaces = (gst::URIHandler,);
}
#[cfg(feature = "whip")]
pub(super) mod whip {
use super::*;
use crate::whip_signaller::WhipServerSignaller;
#[derive(Default)]
pub struct WhipServerSrc {}
impl ObjectImpl for WhipServerSrc {
fn constructed(&self) {
self.parent_constructed();
let element = self.obj();
let ws = element
.upcast_ref::<crate::webrtcsrc::BaseWebRTCSrc>()
.imp();
let _ = ws.set_signaller(WhipServerSignaller::default().upcast());
let settings = ws.settings.lock().unwrap();
element
.bind_property("stun-server", &settings.signaller, "stun-server")
.build();
element
.bind_property("turn-servers", &settings.signaller, "turn-servers")
.build();
}
}
impl GstObjectImpl for WhipServerSrc {}
impl BinImpl for WhipServerSrc {}
impl ElementImpl for WhipServerSrc {
fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
static ELEMENT_METADATA: LazyLock<gst::subclass::ElementMetadata> =
LazyLock::new(|| {
gst::subclass::ElementMetadata::new(
"WhipServerSrc",
"Source/Network/WebRTC",
"WebRTC source element using WHIP Server as the signaller",
"Taruntej Kanakamalla <taruntej@asymptotic.io>",
)
});
Some(&*ELEMENT_METADATA)
}
}
impl BaseWebRTCSrcImpl for WhipServerSrc {}
#[glib::object_subclass]
impl ObjectSubclass for WhipServerSrc {
const NAME: &'static str = "GstWhipServerSrc";
type Type = crate::webrtcsrc::WhipServerSrc;
type ParentType = crate::webrtcsrc::BaseWebRTCSrc;
}
}
#[cfg(feature = "livekit")]
pub(super) mod livekit {
use super::*;
use crate::{livekit_signaller::LiveKitSignaller, webrtcsrc::pad::WebRTCSrcPadImpl};
#[derive(Default)]
pub struct LiveKitWebRTCSrc;
impl ObjectImpl for LiveKitWebRTCSrc {
fn constructed(&self) {
self.parent_constructed();
let element = self.obj();
let ws = element
.upcast_ref::<crate::webrtcsrc::BaseWebRTCSrc>()
.imp();
let _ = ws.set_signaller(LiveKitSignaller::new_consumer().upcast());
}
}
impl GstObjectImpl for LiveKitWebRTCSrc {}
impl BinImpl for LiveKitWebRTCSrc {}
impl ElementImpl for LiveKitWebRTCSrc {
fn pad_templates() -> &'static [gst::PadTemplate] {
static PAD_TEMPLATES: LazyLock<Vec<gst::PadTemplate>> = LazyLock::new(|| {
super::BaseWebRTCSrc::pad_templates()
.iter()
.map(|pad_templ| {
gst::PadTemplate::with_gtype(
pad_templ.name_template(),
pad_templ.direction(),
pad_templ.presence(),
pad_templ.caps(),
super::super::LiveKitWebRTCSrcPad::static_type(),
)
.unwrap()
})
.collect()
});
PAD_TEMPLATES.as_ref()
}
fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
static ELEMENT_METADATA: LazyLock<gst::subclass::ElementMetadata> =
LazyLock::new(|| {
gst::subclass::ElementMetadata::new(
"LiveKitWebRTCSrc",
"Source/Network/WebRTC",
"WebRTC source with LiveKit signaller",
"Jordan Yelloz <jordan.yelloz@collabora.com>",
)
});
Some(&*ELEMENT_METADATA)
}
}
impl BaseWebRTCSrcImpl for LiveKitWebRTCSrc {}
#[glib::object_subclass]
impl ObjectSubclass for LiveKitWebRTCSrc {
const NAME: &'static str = "GstLiveKitWebRTCSrc";
type Type = crate::webrtcsrc::LiveKitWebRTCSrc;
type ParentType = crate::webrtcsrc::BaseWebRTCSrc;
}
#[derive(Default)]
pub struct LiveKitWebRTCSrcPad {}
fn participant_permission_to_structure(
participant_permission: &livekit_protocol::ParticipantPermission,
) -> gst::Structure {
gst::Structure::builder("livekit/participant-permission")
.field("can-subscribe", participant_permission.can_subscribe)
.field("can-publish", participant_permission.can_publish)
.field("can-publish-data", participant_permission.can_publish_data)
.field_from_iter::<gst::Array, _>(
"can-publish-sources",
participant_permission.can_publish_sources.iter().map(|f| {
livekit_protocol::TrackSource::try_from(*f)
.map(|s| s.as_str_name())
.unwrap_or("unknown")
}),
)
.field("hidden", participant_permission.hidden)
.field(
"can-update-metadata",
participant_permission.can_update_metadata,
)
.field(
"can-subscribe-metrics",
participant_permission.can_subscribe_metrics,
)
.build()
}
fn track_info_to_structure(track_info: &livekit_protocol::TrackInfo) -> gst::Structure {
gst::Structure::builder("livekit/track-info")
.field("sid", &track_info.sid)
.field(
"type",
livekit_protocol::TrackType::try_from(track_info.r#type)
.map(|t| t.as_str_name())
.unwrap_or("unknown"),
)
.field("name", &track_info.name)
.field("muted", track_info.muted)
.field("width", track_info.width)
.field("height", track_info.height)
.field(
"source",
livekit_protocol::TrackSource::try_from(track_info.source)
.map(|s| s.as_str_name())
.unwrap_or("unknown"),
)
.field("mime-type", &track_info.mime_type)
.field("mid", &track_info.mid)
.field("disable-red", track_info.disable_red)
.field(
"encryption",
livekit_protocol::encryption::Type::try_from(track_info.encryption)
.map(|e| e.as_str_name())
.unwrap_or("unknown"),
)
.field("stream", &track_info.stream)
.field_from_iter::<gst::Array, _>(
"audio-features",
track_info.audio_features.iter().map(|f| {
livekit_protocol::AudioTrackFeature::try_from(*f)
.map(|s| s.as_str_name())
.unwrap_or("unknown")
}),
)
.field(
"backup-codec-policy",
livekit_protocol::BackupCodecPolicy::try_from(track_info.backup_codec_policy)
.map(|s| s.as_str_name())
.unwrap_or("unknown"),
)
.build()
}
fn participant_info_to_structure(
participant_info: &livekit_protocol::ParticipantInfo,
) -> gst::Structure {
gst::Structure::builder("livekit/participant-info")
.field("sid", &participant_info.sid)
.field("identity", &participant_info.identity)
.field(
"state",
livekit_protocol::participant_info::State::try_from(participant_info.state)
.map(|s| s.as_str_name())
.unwrap_or("unknown"),
)
.field_from_iter::<gst::Array, _>(
"tracks",
participant_info.tracks.iter().map(track_info_to_structure),
)
.field("metadata", &participant_info.metadata)
.field("joined-at", participant_info.joined_at)
.field("name", &participant_info.name)
.field("version", participant_info.version)
.field_if_some(
"permission",
participant_info
.permission
.as_ref()
.map(participant_permission_to_structure),
)
.field("region", &participant_info.region)
.field("is-publisher", participant_info.is_publisher)
.field(
"kind",
livekit_protocol::participant_info::Kind::try_from(participant_info.kind)
.map(|k| k.as_str_name())
.unwrap_or("unknown"),
)
.field(
"disconnect-reason",
livekit_protocol::DisconnectReason::try_from(participant_info.disconnect_reason)
.map(|d| d.as_str_name())
.unwrap_or("unknown"),
)
.field_from_iter::<gst::Array, _>(
"kind-details",
participant_info.kind_details.iter().map(|k| {
livekit_protocol::participant_info::KindDetail::try_from(*k)
.map(|s| s.as_str_name())
.unwrap_or("unknown")
}),
)
.build()
}
impl LiveKitWebRTCSrcPad {
fn participant_track_sid(&self) -> Option<(String, String)> {
let msid = self.obj().property::<Option<String>>("msid")?;
let (participant_sid, track_sid) = msid.split_once('|')?;
Some((String::from(participant_sid), String::from(track_sid)))
}
fn participant_info(&self) -> Option<gst::Structure> {
let participant_sid = self.participant_sid()?;
let webrtcbin = self
.obj()
.parent()
.and_downcast::<super::super::BaseWebRTCSrc>()?;
let signaller = webrtcbin.property::<LiveKitSignaller>("signaller");
let participant_info = signaller.imp().participant_info(&participant_sid)?;
Some(participant_info_to_structure(&participant_info))
}
fn track_info(&self) -> Option<gst::Structure> {
let (participant_sid, track_sid) = self.participant_track_sid()?;
let webrtcbin = self
.obj()
.parent()
.and_downcast::<super::super::BaseWebRTCSrc>()?;
let signaller = webrtcbin.property::<LiveKitSignaller>("signaller");
let participant_info = signaller.imp().participant_info(&participant_sid)?;
participant_info
.tracks
.iter()
.find(|t| t.sid == track_sid)
.map(track_info_to_structure)
}
fn participant_sid(&self) -> Option<String> {
self.participant_track_sid()
.map(|(participant_sid, _track_sid)| participant_sid)
}
fn track_sid(&self) -> Option<String> {
self.participant_track_sid()
.map(|(_participant_sid, track_sid)| track_sid)
}
fn set_track_disabled(&self, disabled: bool) {
let Some(track_sid) = self.track_sid() else {
return;
};
let Some(webrtcbin) = self
.obj()
.parent()
.and_downcast::<super::super::BaseWebRTCSrc>()
else {
return;
};
let signaller = webrtcbin.property::<LiveKitSignaller>("signaller");
signaller.imp().set_track_disabled(&track_sid, disabled)
}
}
#[glib::object_subclass]
impl ObjectSubclass for LiveKitWebRTCSrcPad {
const NAME: &'static str = "GstLiveKitWebRTCSrcPad";
type Type = super::super::LiveKitWebRTCSrcPad;
type ParentType = super::super::WebRTCSrcPad;
}
impl ObjectImpl for LiveKitWebRTCSrcPad {
fn signals() -> &'static [glib::subclass::Signal] {
static SIGNALS: LazyLock<Vec<glib::subclass::Signal>> = LazyLock::new(|| {
vec![
glib::subclass::Signal::builder("set-track-disabled")
.param_types([bool::static_type()])
.action()
.class_handler(|values| {
let pad = values[0]
.get::<&super::super::LiveKitWebRTCSrcPad>()
.unwrap();
let disabled = values[1].get::<bool>().unwrap();
pad.imp().set_track_disabled(disabled);
None
})
.build(),
]
});
SIGNALS.as_ref()
}
fn properties() -> &'static [glib::ParamSpec] {
static PROPS: LazyLock<Vec<glib::ParamSpec>> = LazyLock::new(|| {
vec![
glib::ParamSpecBoxed::builder::<gst::Structure>("participant-info")
.flags(glib::ParamFlags::READABLE)
.blurb("Participant Information")
.build(),
glib::ParamSpecBoxed::builder::<gst::Structure>("track-info")
.flags(glib::ParamFlags::READABLE)
.blurb("Track Information")
.build(),
glib::ParamSpecString::builder("participant-sid")
.flags(glib::ParamFlags::READABLE)
.blurb("Participant ID")
.build(),
glib::ParamSpecString::builder("track-sid")
.flags(glib::ParamFlags::READABLE)
.blurb("Track ID")
.build(),
]
});
PROPS.as_ref()
}
fn property(&self, _sid: usize, pspec: &glib::ParamSpec) -> glib::Value {
match pspec.name() {
"participant-info" => self.participant_info().into(),
"track-info" => self.track_info().into(),
"participant-sid" => self.participant_sid().into(),
"track-sid" => self.track_sid().into(),
name => panic!("no readable property {name:?}"),
}
}
}
impl GstObjectImpl for LiveKitWebRTCSrcPad {}
impl PadImpl for LiveKitWebRTCSrcPad {}
impl ProxyPadImpl for LiveKitWebRTCSrcPad {}
impl GhostPadImpl for LiveKitWebRTCSrcPad {}
impl WebRTCSrcPadImpl for LiveKitWebRTCSrcPad {}
}
#[cfg(feature = "janus")]
pub(super) mod janus {
use super::*;
use crate::{
janusvr_signaller::{JanusVRSignallerStr, JanusVRSignallerU64},
webrtcsink::JanusVRSignallerState,
webrtcsrc::WebRTCSignallerRole,
};
static CAT: LazyLock<gst::DebugCategory> = LazyLock::new(|| {
gst::DebugCategory::new(
"janusvrwebrtcsrc",
gst::DebugColorFlags::empty(),
Some("WebRTC Janus Video Room src"),
)
});
#[derive(Debug, Clone, Default)]
struct JanusSettings {
use_string_ids: bool,
}
#[derive(Debug, Default)]
struct JanusState {
janus_state: JanusVRSignallerState,
}
#[derive(Default, glib::Properties)]
#[properties(wrapper_type = crate::webrtcsrc::JanusVRWebRTCSrc)]
pub struct JanusVRWebRTCSrc {
#[property(name="use-string-ids", get, construct_only, type = bool, member = use_string_ids, blurb = "Use strings instead of u64 for Janus IDs, see strings_ids config option in janus.plugin.videoroom.jcfg")]
settings: Mutex<JanusSettings>,
#[property(
name = "janus-state",
get,
member = janus_state,
type = JanusVRSignallerState,
blurb = "The current state of the signaller",
builder(JanusVRSignallerState::Initialized)
)]
state: Mutex<JanusState>,
}
#[glib::derived_properties]
impl ObjectImpl for JanusVRWebRTCSrc {
fn constructed(&self) {
self.parent_constructed();
let settings = self.settings.lock().unwrap();
let element = self.obj();
let ws = element
.upcast_ref::<crate::webrtcsrc::BaseWebRTCSrc>()
.imp();
let signaller: Signallable = if settings.use_string_ids {
JanusVRSignallerStr::new(WebRTCSignallerRole::Consumer).upcast()
} else {
JanusVRSignallerU64::new(WebRTCSignallerRole::Consumer).upcast()
};
let self_weak = self.downgrade();
signaller.connect("state-updated", false, move |args| {
let self_ = self_weak.upgrade()?;
let janus_state = args[1].get::<JanusVRSignallerState>().unwrap();
{
let mut state = self_.state.lock().unwrap();
state.janus_state = janus_state;
}
gst::debug!(
CAT,
imp = self_,
"signaller state updated: {:?}",
janus_state
);
self_.obj().notify("janus-state");
None
});
let _ = ws.set_signaller(signaller);
}
}
impl GstObjectImpl for JanusVRWebRTCSrc {}
impl ElementImpl for JanusVRWebRTCSrc {
fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
static ELEMENT_METADATA: LazyLock<gst::subclass::ElementMetadata> =
LazyLock::new(|| {
gst::subclass::ElementMetadata::new(
"JanusVRWebRTCSrc",
"Source/Network/WebRTC",
"WebRTC source with Janus Video Room signaller",
"Eva Pace <epace@igalia.com>",
)
});
Some(&*ELEMENT_METADATA)
}
}
impl BinImpl for JanusVRWebRTCSrc {}
impl BaseWebRTCSrcImpl for JanusVRWebRTCSrc {}
#[glib::object_subclass]
impl ObjectSubclass for JanusVRWebRTCSrc {
const NAME: &'static str = "GstJanusVRWebRTCSrc";
type Type = crate::webrtcsrc::JanusVRWebRTCSrc;
type ParentType = crate::webrtcsrc::BaseWebRTCSrc;
type Interfaces = (gst::URIHandler,);
}
impl URIHandlerImpl for JanusVRWebRTCSrc {
const URI_TYPE: gst::URIType = gst::URIType::Src;
fn protocols() -> &'static [&'static str] {
&["gstjanusvr", "gstjanusvrs"]
}
fn uri(&self) -> Option<String> {
{
let settings = self.settings.lock().unwrap();
if settings.use_string_ids {
return None;
}
}
let obj = self.obj();
let base = obj.upcast_ref::<crate::webrtcsrc::BaseWebRTCSrc>().imp();
let signaller = base.signaller();
let janus_endpoint = signaller.property::<String>("janus-endpoint");
let uri = janus_endpoint
.replace("wss://", "gstjansvrs://")
.replace("ws://", "gstjanusvr://");
let room_id = signaller.property::<u64>("room-id");
let producer_peer_id = signaller.property::<u64>("producer-peer-id");
Some(format!(
"{uri}?use-string-ids=false&room-id={room_id}&producer-peer-id={producer_peer_id}"
))
}
fn set_uri(&self, uri: &str) -> Result<(), glib::Error> {
gst::debug!(CAT, imp = self, "parsing URI {uri}");
let uri = Url::from_str(uri)
.map_err(|err| glib::Error::new(gst::URIError::BadUri, &format!("{err:?}")))?;
let socket_scheme = match uri.scheme() {
"gstjanusvr" => Ok("ws"),
"gstjanusvrs" => Ok("wss"),
_ => Err(glib::Error::new(
gst::URIError::BadUri,
&format!("Invalid protocol: {}", uri.scheme()),
)),
}?;
let port = uri
.port()
.map(|port| format!(":{port}"))
.unwrap_or_default();
let janus_endpoint = format!(
"{socket_scheme}://{}{port}{}",
uri.host_str().unwrap_or("127.0.0.1"),
uri.path()
);
let use_strings_ids = uri
.query_pairs()
.find(|(k, _v)| k == "use-string-ids")
.map(|(_k, v)| v.to_lowercase() == "true")
.unwrap_or_default();
if use_strings_ids {
return Err(glib::Error::new(
gst::URIError::BadUri,
"use-string-ids=true not yet supported in URI",
));
}
let room_id = uri
.query_pairs()
.find(|(k, _v)| k == "room-id")
.map(|(_k, v)| v)
.ok_or(glib::Error::new(gst::URIError::BadUri, "room-id missing"))?;
let producer_peer_id = uri
.query_pairs()
.find(|(k, _v)| k == "producer-peer-id")
.map(|(_k, v)| v)
.ok_or(glib::Error::new(
gst::URIError::BadUri,
"producer-peer-id missing",
))?;
let obj = self.obj();
let base = obj.upcast_ref::<crate::webrtcsrc::BaseWebRTCSrc>().imp();
let signaller = base.signaller();
let room_id = room_id.parse::<u64>().map_err(|err| {
glib::Error::new(gst::URIError::BadUri, &format!("Invalid room-id: {err}"))
})?;
let producer_peer_id = producer_peer_id.parse::<u64>().map_err(|err| {
glib::Error::new(
gst::URIError::BadUri,
&format!("Invalid producer-peer-id: {err}"),
)
})?;
signaller.set_property("janus-endpoint", &janus_endpoint);
signaller.set_property("room-id", room_id);
signaller.set_property("producer-peer-id", producer_peer_id);
Ok(())
}
}
}
#[cfg(feature = "whep")]
pub(super) mod whep {
use super::*;
use crate::whep_signaller::WhepClientSignaller;
#[derive(Default)]
pub struct WhepClientSrc {}
impl ObjectImpl for WhepClientSrc {
fn constructed(&self) {
self.parent_constructed();
let element = self.obj();
let ws = element
.upcast_ref::<crate::webrtcsrc::BaseWebRTCSrc>()
.imp();
let _ = ws.set_signaller(WhepClientSignaller::default().upcast());
let obj = &*self.obj();
obj.set_suppressed_flags(gst::ElementFlags::SINK | gst::ElementFlags::SOURCE);
obj.set_element_flags(gst::ElementFlags::SOURCE);
}
}
impl GstObjectImpl for WhepClientSrc {}
impl BinImpl for WhepClientSrc {}
impl ElementImpl for WhepClientSrc {
fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
static ELEMENT_METADATA: LazyLock<gst::subclass::ElementMetadata> =
LazyLock::new(|| {
gst::subclass::ElementMetadata::new(
"WhepClientSrc",
"Source/Network/WebRTC",
"WebRTC source element using WHEP Client as the signaller",
"Sanchayan Maity <sanchayan@asymptotic.io>",
)
});
Some(&*ELEMENT_METADATA)
}
}
impl BaseWebRTCSrcImpl for WhepClientSrc {}
#[glib::object_subclass]
impl ObjectSubclass for WhepClientSrc {
const NAME: &'static str = "GstWhepClientSrc";
type Type = crate::webrtcsrc::WhepClientSrc;
type ParentType = crate::webrtcsrc::BaseWebRTCSrc;
}
}