use crate::RUNTIME;
use crate::signaller::{Signallable, SignallableImpl};
use crate::utils::{
WaitError, build_reqwest_client, parse_redirect_location, set_ice_servers, wait, wait_async,
};
use bytes::Bytes;
use futures::future;
use gst::glib::RustClosure;
use gst::{glib, prelude::*, subclass::prelude::*};
use gst_sdp::*;
use gst_webrtc::*;
use reqwest::StatusCode;
use reqwest::header::{HeaderMap, HeaderValue};
use std::sync::LazyLock;
use std::sync::Mutex;
static CAT: LazyLock<gst::DebugCategory> = LazyLock::new(|| {
gst::DebugCategory::new(
"whep-client-signaller",
gst::DebugColorFlags::empty(),
Some("WHEP Client Signaller"),
)
});
const MAX_REDIRECTS: u8 = 10;
const DEFAULT_TIMEOUT: u32 = 15;
const WHEP_CLIENT_OFFER: &str = "whep-client-offer";
const WHEP_SSO: &str = "whep-server-sent-offer";
#[derive(Debug, Clone)]
struct Settings {
whep_endpoint: Option<String>,
auth_token: Option<String>,
use_link_headers: bool,
timeout: u32,
}
#[allow(clippy::derivable_impls)]
impl Default for Settings {
fn default() -> Self {
Self {
whep_endpoint: None,
auth_token: None,
use_link_headers: false,
timeout: DEFAULT_TIMEOUT,
}
}
}
#[derive(Debug, Clone)]
enum PatchType {
AnswerCounterOffer { whep_resource: String },
}
#[derive(Debug, Default, Clone)]
enum State {
#[default]
Stopped,
Post {
redirects: u8,
},
Running {
whep_resource: String,
},
Patch {
patch: PatchType,
},
}
impl std::fmt::Display for State {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
match self {
State::Stopped => write!(f, "Stopped"),
State::Post { redirects } => write!(f, "Post (redirects: {redirects})"),
State::Running { whep_resource } => {
write!(f, "Running (whep_resource: {whep_resource})")
}
State::Patch { patch } => match patch {
PatchType::AnswerCounterOffer { whep_resource } => write!(
f,
"Patch AnswerCounterOffer (whep_resource {whep_resource})"
),
},
}
}
}
pub struct WhepClient {
settings: Mutex<Settings>,
state: Mutex<State>,
canceller: Mutex<Option<future::AbortHandle>>,
client: reqwest::Client,
}
impl Default for WhepClient {
fn default() -> Self {
let pol = reqwest::redirect::Policy::none();
let client = build_reqwest_client(pol);
Self {
settings: Mutex::new(Settings::default()),
state: Mutex::new(State::default()),
canceller: Mutex::new(None),
client,
}
}
}
impl ObjectImpl for WhepClient {
fn properties() -> &'static [glib::ParamSpec] {
static PROPERTIES: LazyLock<Vec<glib::ParamSpec>> = LazyLock::new(|| {
vec![
glib::ParamSpecString::builder("whep-endpoint")
.nick("WHEP Endpoint")
.blurb("The WHEP server endpoint to POST SDP offer to.")
.build(),
glib::ParamSpecBoolean::builder("use-link-headers")
.nick("Use Link Headers")
.blurb("Use link headers to configure STUN/TURN servers if present in WHEP endpoint response.")
.build(),
glib::ParamSpecString::builder("auth-token")
.nick("Authorization Token")
.blurb("Authentication token to use, will be sent in the HTTP Header as 'Bearer <auth-token>'")
.build(),
glib::ParamSpecUInt::builder("timeout")
.nick("Timeout")
.blurb("Value in seconds to timeout WHEP endpoint requests (0 = No timeout).")
.maximum(3600)
.default_value(DEFAULT_TIMEOUT)
.readwrite()
.build(),
glib::ParamSpecBoolean::builder("manual-sdp-munging")
.nick("Manual SDP munging")
.blurb("Whether the signaller manages SDP munging itself")
.default_value(false)
.read_only()
.build(),
]
});
PROPERTIES.as_ref()
}
fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
match pspec.name() {
"whep-endpoint" => {
let mut settings = self.settings.lock().unwrap();
settings.whep_endpoint = value.get().expect("WHEP endpoint should be a string");
}
"use-link-headers" => {
let mut settings = self.settings.lock().unwrap();
settings.use_link_headers = value
.get()
.expect("use-link-headers should be a boolean value");
}
"auth-token" => {
let mut settings = self.settings.lock().unwrap();
settings.auth_token = value.get().expect("Auth token should be a string");
}
"timeout" => {
let mut settings = self.settings.lock().unwrap();
settings.timeout = value.get().expect("type checked upstream");
}
_ => unimplemented!(),
}
}
fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
match pspec.name() {
"whep-endpoint" => {
let settings = self.settings.lock().unwrap();
settings.whep_endpoint.to_value()
}
"use-link-headers" => {
let settings = self.settings.lock().unwrap();
settings.use_link_headers.to_value()
}
"auth-token" => {
let settings = self.settings.lock().unwrap();
settings.auth_token.to_value()
}
"timeout" => {
let settings = self.settings.lock().unwrap();
settings.timeout.to_value()
}
"manual-sdp-munging" => false.to_value(),
_ => unimplemented!(),
}
}
}
#[glib::object_subclass]
impl ObjectSubclass for WhepClient {
const NAME: &'static str = "GstWhepClientSignaller";
type Type = super::WhepClientSignaller;
type ParentType = glib::Object;
type Interfaces = (Signallable,);
}
impl WhepClient {
fn raise_error(&self, msg: String) {
self.obj()
.emit_by_name::<()>("error", &[&format!("Error: {msg}")]);
}
fn handle_future_error(&self, err: WaitError) {
match err {
WaitError::FutureAborted => {
gst::warning!(CAT, imp = self, "Future aborted")
}
WaitError::FutureError(err) => self.raise_error(err.to_string()),
};
}
fn sdp_message_parse(&self, sdp_bytes: Bytes, server_sent_offer: bool) {
let sdp = match sdp_message::SDPMessage::parse_buffer(&sdp_bytes) {
Ok(sdp) => sdp,
Err(_) => {
self.raise_error("Could not parse answer SDP".to_string());
return;
}
};
let remote_sdp = WebRTCSessionDescription::new(
if server_sent_offer {
WebRTCSDPType::Offer
} else {
WebRTCSDPType::Answer
},
sdp,
);
if !server_sent_offer {
self.obj()
.emit_by_name::<()>("session-description", &[&WHEP_CLIENT_OFFER, &remote_sdp]);
} else {
if self
.obj()
.emit_by_name::<bool>("session-ended", &[&WHEP_CLIENT_OFFER])
{
gst::debug!(CAT, imp = self, "Requesting new session");
self.obj()
.emit_by_name::<()>("session-started", &[&WHEP_SSO, &WHEP_SSO]);
gst::debug!(CAT, imp = self, "Sending new session description as Offer");
self.obj()
.emit_by_name::<()>("session-description", &[&WHEP_SSO, &remote_sdp]);
}
}
}
async fn parse_endpoint_response(
&self,
sess_desc: WebRTCSessionDescription,
resp: reqwest::Response,
redirects: u8,
webrtcbin: gst::Element,
) {
let endpoint;
let use_link_headers;
{
let settings = self.settings.lock().unwrap();
endpoint =
reqwest::Url::parse(settings.whep_endpoint.as_ref().unwrap().as_str()).unwrap();
use_link_headers = settings.use_link_headers;
drop(settings);
}
let status = resp.status();
match status {
StatusCode::OK | StatusCode::NO_CONTENT => {
gst::info!(CAT, imp = self, "SDP offer successfully sent");
}
StatusCode::CREATED | StatusCode::NOT_ACCEPTABLE => {
gst::debug!(CAT, imp = self, "Response headers: {:?}", resp.headers());
if use_link_headers && let Err(e) = set_ice_servers(&webrtcbin, resp.headers()) {
self.raise_error(e.to_string());
return;
};
let location = match resp.headers().get(reqwest::header::LOCATION) {
Some(location) => location,
None => {
self.raise_error(
"Location header field should be present for WHEP resource URL"
.to_string(),
);
return;
}
};
let location = match location.to_str() {
Ok(loc) => loc,
Err(e) => {
self.raise_error(format!("Failed to convert location to string: {e}"));
return;
}
};
let url = reqwest::Url::parse(endpoint.as_str()).unwrap();
gst::debug!(CAT, imp = self, "WHEP resource: {:?}", location);
let url = match url.join(location) {
Ok(joined_url) => joined_url,
Err(err) => {
self.raise_error(format!("URL join operation failed: {err:?}"));
return;
}
};
let server_sent_offer = status == StatusCode::NOT_ACCEPTABLE;
match resp.bytes().await {
Ok(sdp) => {
let mut state = self.state.lock().unwrap();
*state = match *state {
State::Post { redirects: _r } => {
if server_sent_offer {
State::Patch {
patch: PatchType::AnswerCounterOffer {
whep_resource: url.to_string(),
},
}
} else {
State::Running {
whep_resource: url.to_string(),
}
}
}
_ => {
self.raise_error("Expected to be in POST state".to_string());
return;
}
};
drop(state);
self.sdp_message_parse(sdp, server_sent_offer)
}
Err(err) => self.raise_error(err.to_string()),
}
}
status if status.is_redirection() => {
if redirects < MAX_REDIRECTS {
match parse_redirect_location(resp.headers(), &endpoint) {
Ok(redirect_url) => {
{
let mut state = self.state.lock().unwrap();
*state = match *state {
State::Post { redirects: _r } => State::Post {
redirects: redirects + 1,
},
State::Running { .. } => {
self.raise_error(
"Unexpected redirection in RUNNING state".to_string(),
);
return;
}
_ => unreachable!(),
};
drop(state);
}
gst::warning!(
CAT,
imp = self,
"Redirecting endpoint to {}",
redirect_url.as_str()
);
Box::pin(self.do_post(sess_desc, webrtcbin, redirect_url)).await
}
Err(e) => self.raise_error(e.to_string()),
}
} else {
self.raise_error("Too many redirects. Unable to connect.".to_string());
}
}
s => {
match resp.bytes().await {
Ok(r) => {
let res = r.escape_ascii().to_string();
self.raise_error(format!("Unexpected response: {} - {}", s.as_str(), res));
}
Err(err) => self.raise_error(err.to_string()),
}
}
}
}
async fn whep_offer(&self, webrtcbin: gst::Element) {
let local_desc =
webrtcbin.property::<Option<WebRTCSessionDescription>>("local-description");
let sess_desc = match local_desc {
None => {
self.raise_error("Local description is not set".to_string());
return;
}
Some(mut local_desc) => {
local_desc.set_type(WebRTCSDPType::Offer);
local_desc
}
};
gst::debug!(
CAT,
imp = self,
"Sending offer SDP: {}",
sess_desc.sdp().to_string()
);
let timeout;
let endpoint;
{
let settings = self.settings.lock().unwrap();
timeout = settings.timeout;
endpoint =
reqwest::Url::parse(settings.whep_endpoint.as_ref().unwrap().as_str()).unwrap();
drop(settings);
}
if let Err(e) = wait_async(
&self.canceller,
self.do_post(sess_desc, webrtcbin, endpoint),
timeout,
)
.await
{
self.handle_future_error(e);
}
}
async fn answer_sso(&self, webrtcbin: gst::Element) {
let local_desc =
webrtcbin.property::<Option<WebRTCSessionDescription>>("local-description");
let timeout = self.settings.lock().unwrap().timeout;
let sess_desc = match local_desc {
None => {
self.raise_error("Local description is not set".to_string());
return;
}
Some(mut local_desc) => {
local_desc.set_type(WebRTCSDPType::Answer);
local_desc
}
};
gst::debug!(
CAT,
imp = self,
"Sending answer SDP: {}",
sess_desc.sdp().to_string()
);
if let Err(e) = wait_async(&self.canceller, self.do_patch(sess_desc), timeout).await {
self.handle_future_error(e);
}
}
async fn do_patch(&self, offer: WebRTCSessionDescription) -> Result<(), gst::ErrorMessage> {
let resource_url = {
let state = self.state.lock().unwrap();
match &*state {
State::Patch { patch: p } => match p {
PatchType::AnswerCounterOffer { whep_resource } => whep_resource.clone(),
},
State::Running { whep_resource: _ } => {
return Err(gst::error_msg!(
gst::ResourceError::Failed,
["Trying to do PATCH in Running state"]
));
}
_ => {
gst::warning!(
CAT,
imp = self,
"Not allowed to do PATCH request in {} state",
&*state
);
return Ok(());
}
}
};
let auth_token = self.settings.lock().unwrap().auth_token.clone();
let sdp = offer.sdp();
let body = sdp.as_text().unwrap();
let mut headermap = HeaderMap::new();
headermap.insert(
reqwest::header::CONTENT_TYPE,
HeaderValue::from_static("application/sdp"),
);
if let Some(token) = auth_token.as_ref() {
let bearer_token = format!("Bearer {token}");
headermap.insert(
reqwest::header::AUTHORIZATION,
HeaderValue::from_str(bearer_token.as_str())
.expect("Failed to set auth token to header"),
);
}
gst::debug!(
CAT,
imp = self,
"Url for HTTP PATCH request: {resource_url}",
);
let resp = self
.client
.request(reqwest::Method::PATCH, resource_url)
.headers(headermap)
.body(body)
.send()
.await;
match resp {
Ok(r) => match r.status() {
StatusCode::OK | StatusCode::NO_CONTENT => {
let mut state = self.state.lock().unwrap();
if let State::Patch { patch } = &mut *state {
match patch {
PatchType::AnswerCounterOffer { whep_resource } => {
*state = State::Running {
whep_resource: whep_resource.clone(),
};
}
}
}
gst::debug!(CAT, imp = self, "Server accepted SDP answer",);
}
_ => {
return Err(gst::error_msg!(
gst::ResourceError::Failed,
["Unexpected response from the server: {}", r.status()]
));
}
},
Err(err) => return Err(gst::error_msg!(gst::ResourceError::Failed, ["{err}"])),
}
Ok(())
}
async fn do_post(
&self,
offer: WebRTCSessionDescription,
webrtcbin: gst::Element,
endpoint: reqwest::Url,
) {
let redirects = {
let state = self.state.lock().unwrap();
match *state {
State::Post { redirects } => redirects,
State::Running { whep_resource: _ } => {
let err = "Trying to do POST in Running state".to_string();
self.raise_error(err);
return;
}
_ => {
gst::warning!(CAT, imp = self, "Not allowed to POST in {} state", &*state);
return;
}
}
};
let auth_token = self.settings.lock().unwrap().auth_token.clone();
let sdp = offer.sdp();
let body = sdp.as_text().unwrap();
gst::info!(CAT, imp = self, "Using endpoint {}", endpoint.as_str());
let mut headermap = HeaderMap::new();
headermap.insert(
reqwest::header::CONTENT_TYPE,
HeaderValue::from_static("application/sdp"),
);
if let Some(token) = auth_token.as_ref() {
let bearer_token = "Bearer ".to_owned() + token.as_str();
headermap.insert(
reqwest::header::AUTHORIZATION,
HeaderValue::from_str(bearer_token.as_str())
.expect("Failed to set auth token to header"),
);
}
gst::debug!(
CAT,
imp = self,
"Url for HTTP POST request: {}",
endpoint.as_str()
);
let resp = self
.client
.request(reqwest::Method::POST, endpoint.clone())
.headers(headermap)
.body(body)
.send()
.await;
match resp {
Ok(r) => {
self.parse_endpoint_response(offer, r, redirects, webrtcbin)
.await
}
Err(err) => self.raise_error(err.to_string()),
}
}
fn terminate_session(&self, resource_url: &String) {
let settings = self.settings.lock().unwrap();
let timeout = settings.timeout;
let mut headermap = HeaderMap::new();
if let Some(token) = &settings.auth_token {
let bearer_token = "Bearer ".to_owned() + token.as_str();
headermap.insert(
reqwest::header::AUTHORIZATION,
HeaderValue::from_str(bearer_token.as_str())
.expect("Failed to set auth token to header"),
);
}
drop(settings);
gst::debug!(CAT, imp = self, "DELETE request on {}", resource_url);
let client = build_reqwest_client(reqwest::redirect::Policy::default());
let future = async {
client
.delete(resource_url.clone())
.headers(headermap)
.send()
.await
.map_err(|err| {
gst::error_msg!(
gst::ResourceError::Failed,
["DELETE request failed {}: {:?}", resource_url, err]
)
})
};
let res = wait(&self.canceller, future, timeout);
match res {
Ok(r) => {
gst::debug!(CAT, imp = self, "Response to DELETE : {}", r.status());
}
Err(e) => match e {
WaitError::FutureAborted => {
gst::warning!(CAT, imp = self, "DELETE request aborted")
}
WaitError::FutureError(e) => {
gst::error!(CAT, imp = self, "Error on DELETE request : {}", e)
}
},
};
}
pub fn on_webrtcbin_ready(&self) -> RustClosure {
glib::closure!(|signaller: &super::WhepClientSignaller,
_consumer_identifier: &str,
webrtcbin: &gst::Element| {
webrtcbin.connect_notify(
Some("ice-gathering-state"),
glib::clone!(
#[weak]
signaller,
move |webrtcbin, _pspec| {
let state =
webrtcbin.property::<WebRTCICEGatheringState>("ice-gathering-state");
match state {
WebRTCICEGatheringState::Gathering => {
gst::info!(CAT, obj = signaller, "ICE gathering started");
}
WebRTCICEGatheringState::Complete => {
gst::info!(CAT, obj = signaller, "ICE gathering complete");
let webrtcbin = webrtcbin.clone();
RUNTIME.spawn(async move {
signaller.imp().on_ice_gathering_complete(webrtcbin).await
});
}
_ => (),
}
}
),
);
})
}
async fn on_ice_gathering_complete(&self, webrtcbin: gst::Element) {
let state = self.state.lock().unwrap().clone();
match state {
State::Patch { patch: p } => match p.clone() {
PatchType::AnswerCounterOffer { whep_resource: _ } => {
self.answer_sso(webrtcbin).await
}
},
State::Post { redirects: _ } => self.whep_offer(webrtcbin).await,
_ => {}
}
}
}
impl SignallableImpl for WhepClient {
fn start(&self) {
if self.settings.lock().unwrap().whep_endpoint.is_none() {
self.raise_error("WHEP endpoint URL must be set".to_string());
return;
}
let mut state = self.state.lock().unwrap();
*state = State::Post { redirects: 0 };
drop(state);
let this_weak = self.downgrade();
RUNTIME.spawn(async move {
if let Some(this) = this_weak.upgrade() {
this.obj().emit_by_name::<()>(
"session-started",
&[&WHEP_CLIENT_OFFER, &WHEP_CLIENT_OFFER],
);
this.obj().emit_by_name::<()>(
"session-requested",
&[
&WHEP_CLIENT_OFFER,
&WHEP_CLIENT_OFFER,
&None::<gst_webrtc::WebRTCSessionDescription>,
],
);
}
});
}
fn stop(&self) {}
fn end_session(&self, session_id: &str) {
gst::debug!(CAT, imp = self, " ending session {session_id}");
if let Some(canceller) = &*self.canceller.lock().unwrap() {
canceller.abort();
}
let mut state = self.state.lock().unwrap();
match &*state {
State::Running { whep_resource } => {
self.terminate_session(whep_resource);
*state = State::Stopped;
}
State::Patch { patch } => match patch {
PatchType::AnswerCounterOffer { whep_resource } => {
if session_id == WHEP_SSO {
self.terminate_session(whep_resource);
*state = State::Stopped;
}
}
},
_ => {}
};
}
}