use crate::RUNTIME;
use crate::signaller::{Signallable, SignallableImpl};
use crate::utils::{WaitError, build_link_header, wait_async};
use gst::glib::{self, RustClosure};
use gst::prelude::*;
use gst::subclass::prelude::*;
use gst_sdp::SDPMessage;
use gst_webrtc::{WebRTCICEGatheringState, WebRTCSessionDescription};
use std::collections::HashMap;
use std::sync::LazyLock;
use std::sync::Mutex;
use http::{HeaderMap, HeaderName, HeaderValue};
use std::net::SocketAddr;
use tokio::sync::mpsc;
use url::Url;
use warp::{Filter, Reply, http};
// Debug category used by every `gst::debug!`/`gst::info!`/`gst::error!` call in this file.
// It is created lazily on first use.
static CAT: LazyLock<gst::DebugCategory> = LazyLock::new(|| {
gst::DebugCategory::new(
"whep-server-signaller",
gst::DebugColorFlags::empty(),
Some("WHEP Server Signaller"),
)
});
/// Default value of the `timeout` property (documented there as seconds).
const DEFAULT_TIMEOUT: u32 = 30;
/// Default value of the `send-counter-offer` property.
const DEFAULT_SEND_COUNTER_OFFER: bool = false;
/// First path segment shared by every route served by the signaller.
const ROOT: &str = "whep";
/// Path segment of the endpoint that accepts `POST` and `OPTIONS` requests.
const ENDPOINT_PATH: &str = "endpoint";
/// Path segment under which per-session resources (`PATCH`, `DELETE`) live.
const RESOURCE_PATH: &str = "resource";
/// Default value of the `host-addr` property.
const DEFAULT_HOST_ADDR: &str = "http://127.0.0.1:9090";
/// Default value of the `stun-server` property.
///
/// Google's public STUN service listens on port 19302.
const DEFAULT_STUN_SERVER: Option<&str> = Some("stun://stun.l.google.com:19302");
/// Media type of SDP request and response bodies.
const CONTENT_SDP: &str = "application/sdp";
// Mutable state of the signaller, guarded by the `Mutex` in `WhepServer`.
struct Settings {
// STUN server advertised to clients through `Link` headers.
stun_server: Option<String>,
// TURN servers advertised to clients through `Link` headers.
turn_servers: gst::Array,
// Address the HTTP server binds to; `None` means no server is started.
host_addr: Option<Url>,
// Timeout passed to `wait_async` while waiting for the SDP of a new session.
timeout: u32,
// Sender used by `stop()` to request a graceful shutdown of the HTTP server.
shutdown_signal: Option<tokio::sync::oneshot::Sender<()>>,
// Handle of the task running the HTTP server.
server_handle: Option<tokio::task::JoinHandle<()>>,
// Per-session senders, keyed by session id, used to hand the local SDP back to
// the request that created the session.
sdp_response: HashMap<String, mpsc::Sender<Option<SDPMessage>>>,
// Whether incoming offers are ignored in favour of a counter offer.
send_counter_offer: bool,
}
impl Default for Settings {
fn default() -> Self {
Self {
host_addr: Some(Url::parse(DEFAULT_HOST_ADDR).unwrap()),
stun_server: DEFAULT_STUN_SERVER.map(String::from),
turn_servers: gst::Array::new(Vec::new() as Vec<glib::SendValue>),
timeout: DEFAULT_TIMEOUT,
shutdown_signal: None,
server_handle: None,
sdp_response: HashMap::new(),
send_counter_offer: false,
}
}
}
// Private implementation struct of the WHEP server signaller GObject.
#[derive(Default)]
pub struct WhepServer {
// All mutable state; locked briefly by HTTP handlers, property accessors and
// the webrtcbin notify callback.
settings: Mutex<Settings>,
}
impl WhepServer {
/// Returns the closure to run when a session's webrtcbin is ready.
///
/// The closure watches the `ice-gathering-state` property of the webrtcbin.
/// When gathering completes, it takes the session's pending sender out of
/// `sdp_response` and sends it the current `local-description` SDP (or `None`
/// when there is no local description).
///
/// A missing sender is logged instead of panicking, because a panic here would
/// unwind out of a GObject notify handler. The sender is missing whenever
/// `Complete` is notified a second time for a session (it is removed on first use).
pub fn on_webrtcbin_ready(&self) -> RustClosure {
    glib::closure!(|signaller: &super::WhepServerSignaller,
                    session_id: &str,
                    webrtcbin: &gst::Element| {
        webrtcbin.connect_notify(
            Some("ice-gathering-state"),
            glib::clone!(
                #[weak]
                signaller,
                #[to_owned]
                session_id,
                move |webrtcbin, _pspec| {
                    let state =
                        webrtcbin.property::<WebRTCICEGatheringState>("ice-gathering-state");
                    match state {
                        WebRTCICEGatheringState::Gathering => {
                            gst::info!(CAT, obj = signaller, "ICE gathering started");
                        }
                        WebRTCICEGatheringState::Complete => {
                            gst::info!(
                                CAT,
                                obj = signaller,
                                "ICE gathering complete for {session_id}"
                            );
                            // Read the property before taking the lock so the
                            // mutex is never held while calling into webrtcbin.
                            let ans: Option<gst_sdp::SDPMessage> = webrtcbin
                                .property::<Option<WebRTCSessionDescription>>(
                                    "local-description",
                                )
                                .map(|answer_desc| answer_desc.sdp().to_owned());
                            // The guard is a temporary and is released at the
                            // end of this statement.
                            let tx = signaller
                                .imp()
                                .settings
                                .lock()
                                .unwrap()
                                .sdp_response
                                .remove(&session_id);
                            match tx {
                                Some(tx) => {
                                    RUNTIME.spawn(glib::clone!(
                                        #[strong]
                                        signaller,
                                        async move {
                                            if let Err(e) = tx.send(ans).await {
                                                gst::error!(
                                                    CAT,
                                                    obj = signaller,
                                                    "Failed to send SDP {e}"
                                                );
                                            }
                                        }
                                    ));
                                }
                                None => {
                                    gst::warning!(
                                        CAT,
                                        obj = signaller,
                                        "No pending SDP request for session {session_id}, ignoring"
                                    );
                                }
                            }
                        }
                        _ => (),
                    }
                }
            ),
        );
    })
}
/// Handles a PATCH for session `id` carrying the client's SDP answer.
///
/// Returns `NOT_IMPLEMENTED` unless `send-counter-offer` is enabled. Otherwise
/// `body` is parsed as SDP: on success the answer is forwarded through the
/// `session-description` signal and `NO_CONTENT` is returned, on failure
/// `NOT_ACCEPTABLE` is returned. Headers are only inspected for logging.
async fn patch_session(
    &self,
    id: String,
    body: &[u8],
    headers: Vec<(Option<HeaderName>, HeaderValue)>,
) -> http::StatusCode {
    let counter_offer_enabled = self.settings.lock().unwrap().send_counter_offer;

    // NOTE(review): the content type never influences the outcome; a matching
    // `application/sdp` value is accepted silently and everything else is logged.
    for (name, value) in headers {
        let Some(name) = name else {
            continue;
        };
        if name == http::header::CONTENT_TYPE {
            match value.to_str() {
                Ok(CONTENT_SDP) => {}
                Ok(t) => gst::info!(CAT, imp = self, "Unhandled content type: {t}"),
                Err(e) => gst::error!(CAT, imp = self, "Error getting content type {e}"),
            }
        } else {
            gst::info!(CAT, imp = self, "Unhandled header: {:?}", name);
        }
    }

    if !counter_offer_enabled {
        return http::StatusCode::NOT_IMPLEMENTED;
    }

    let answer_sdp = match gst_sdp::SDPMessage::parse_buffer(body) {
        Ok(sdp) => sdp,
        Err(err) => {
            gst::error!(CAT, imp = self, "Could not parse answer SDP: {err}");
            return http::StatusCode::NOT_ACCEPTABLE;
        }
    };
    let answer = gst_webrtc::WebRTCSessionDescription::new(
        gst_webrtc::WebRTCSDPType::Answer,
        answer_sdp,
    );
    self.obj()
        .emit_by_name::<()>("session-description", &[&id, &answer]);
    http::StatusCode::NO_CONTENT
}
async fn patch_handler(
&self,
id: String,
body: &[u8],
headermap: HeaderMap,
) -> Result<impl warp::Reply + use<>, warp::Rejection> {
let headers = headermap.into_iter().map(|h| (h.0, h.1)).collect();
Ok(self.patch_session(id, body, headers).await.into_response())
}
/// Emits `session-ended` for `id` and always answers `200 OK`.
///
/// The boolean returned by the signal is deliberately ignored.
async fn delete_session(&self, id: String) -> http::StatusCode {
    let _ended = self
        .obj()
        .emit_by_name::<bool>("session-ended", &[&id.as_str()]);
    gst::info!(CAT, imp = self, "Ended session {id}");
    http::StatusCode::OK
}
/// warp adapter for DELETE: replies with the status produced by `delete_session`.
async fn delete_handler(
    &self,
    id: String,
) -> Result<impl warp::Reply + use<>, warp::Rejection> {
    let status = self.delete_session(id).await;
    Ok(status.into_response())
}
async fn options_handler(&self) -> Result<impl warp::reply::Reply + use<>, warp::Rejection> {
let mut links = http::HeaderMap::new();
let settings = self.settings.lock().unwrap();
if let Some(stun) = &settings.stun_server {
match build_link_header(stun.as_str()) {
Ok(stun_link) => {
links.append(
http::header::LINK,
http::HeaderValue::from_str(stun_link.as_str()).unwrap(),
);
}
Err(e) => {
gst::error!(CAT, imp = self, "Failed to parse {stun:?} : {e:?}");
}
}
}
if !settings.turn_servers.is_empty() {
for turn_server in settings.turn_servers.iter() {
if let Ok(turn) = turn_server.get::<String>() {
gst::debug!(CAT, imp = self, "turn server: {}", turn.as_str());
match build_link_header(turn.as_str()) {
Ok(turn_link) => {
links.append(
http::header::LINK,
http::HeaderValue::from_str(turn_link.as_str()).unwrap(),
);
}
Err(e) => {
gst::error!(CAT, imp = self, "Failed to parse {turn_server:?} : {e:?}");
}
}
} else {
gst::debug!(
CAT,
imp = self,
"Failed to get String value of {turn_server:?}"
);
}
}
}
let mut res = http::Response::builder()
.header("Access-Post", CONTENT_SDP)
.body(bytes::Bytes::new())
.unwrap();
let headers = res.headers_mut();
headers.extend(links);
Ok(res)
}
async fn create_session(
&self,
body: &[u8],
id: Option<String>,
) -> (
http::StatusCode,
Vec<(HeaderName, HeaderValue)>,
Option<bytes::Bytes>,
) {
let mut status = http::StatusCode::CREATED;
let mut headermap = Vec::new();
let session_id = match id {
Some(id) => {
gst::debug!(CAT, imp = self, "got session id {id} from the URL");
id
}
None => {
gst::info!(CAT, imp = self, "no session id in the URL, generating UUID");
uuid::Uuid::new_v4().to_string()
}
};
let (tx, mut rx) = mpsc::channel::<Option<SDPMessage>>(1);
let (wait_timeout, send_counter_offer) = {
let mut settings = self.settings.lock().unwrap();
let wait_timeout = settings.timeout;
let send_counter_offer = settings.send_counter_offer;
settings.sdp_response.insert(session_id.clone(), tx);
drop(settings);
(wait_timeout, send_counter_offer)
};
if send_counter_offer {
self.obj().emit_by_name::<()>(
"session-requested",
&[
&session_id,
&session_id,
&None::<gst_webrtc::WebRTCSessionDescription>,
],
);
status = http::StatusCode::NOT_ACCEPTABLE;
} else {
match gst_sdp::SDPMessage::parse_buffer(body) {
Ok(offer_sdp) => {
let offer = gst_webrtc::WebRTCSessionDescription::new(
gst_webrtc::WebRTCSDPType::Offer,
offer_sdp,
);
self.obj().emit_by_name::<()>(
"session-requested",
&[&session_id, &session_id, &offer],
);
}
Err(err) => {
gst::error!(CAT, imp = self, "Could not parse offer SDP: {err}");
status = http::StatusCode::NOT_ACCEPTABLE;
return (status, headermap, None);
}
}
}
let canceller = Mutex::new(None);
let result = wait_async(&canceller, rx.recv(), wait_timeout).await;
let response_sdp = match result {
Ok(resp) => match resp {
Some(a) => a,
None => {
let err = "Channel closed, can't receive SDP".to_owned();
status = http::StatusCode::INTERNAL_SERVER_ERROR;
return (status, headermap, Some(err.into()));
}
},
Err(e) => {
let err = match e {
WaitError::FutureAborted => "Aborted".to_owned(),
WaitError::FutureError(err) => err.to_string(),
};
status = http::StatusCode::INTERNAL_SERVER_ERROR;
return (status, headermap, Some(err.into()));
}
};
let settings = self.settings.lock().unwrap();
let mut links = vec![];
if let Some(stun) = &settings.stun_server {
match build_link_header(stun.as_str()) {
Ok(stun_link) => {
links.push((
http::header::LINK,
http::HeaderValue::from_str(stun_link.as_str()).unwrap(),
));
}
Err(e) => {
gst::error!(CAT, imp = self, "Failed to parse {stun:?} : {e:?}");
}
}
}
if !settings.turn_servers.is_empty() {
for turn_server in settings.turn_servers.iter() {
if let Ok(turn) = turn_server.get::<String>() {
gst::debug!(CAT, imp = self, "turn server: {}", turn.as_str());
match build_link_header(turn.as_str()) {
Ok(turn_link) => {
links.push((
http::header::LINK,
http::HeaderValue::from_str(turn_link.as_str()).unwrap(),
));
}
Err(e) => {
gst::error!(CAT, imp = self, "Failed to parse {turn_server:?} : {e:?}");
}
}
} else {
gst::error!(
CAT,
imp = self,
"Failed to get String value of {turn_server:?}"
);
}
}
}
let sdp_text = if let Some(sdp) = response_sdp {
match sdp.as_text() {
Ok(text) => {
gst::debug!(CAT, imp = self, "{text:?}");
Ok(text)
}
Err(e) => {
gst::error!(CAT, imp = self, "{e:?}");
Err(format!("Failed to get SDP answer: {e:?}"))
}
}
} else {
let e = "SDP Answer is empty!".to_string();
gst::error!(CAT, imp = self, "{e:?}");
Err(e)
};
match sdp_text {
Ok(sdp) => {
let resource_url = "/".to_owned() + ROOT + "/" + RESOURCE_PATH + "/" + &session_id;
headermap.push((
http::header::CONTENT_TYPE,
HeaderValue::from_str(CONTENT_SDP).unwrap(),
));
headermap.push((
HeaderName::from_static("location"),
HeaderValue::from_str(resource_url.as_str()).unwrap(),
));
headermap.append(&mut links);
(status, headermap, Some(sdp.into()))
}
Err(e) => {
status = http::StatusCode::INTERNAL_SERVER_ERROR;
(status, headermap, Some(e.into()))
}
}
}
/// warp adapter for POST: runs `create_session` and assembles the HTTP
/// response from the returned status, headers and optional body.
async fn post_handler(
    &self,
    body: &[u8],
    id: Option<String>,
) -> Result<impl warp::reply::Reply + use<>, warp::Rejection> {
    let (status, headers, payload) = self.create_session(body, id).await;
    let builder = headers.into_iter().fold(
        http::Response::builder().status(status),
        |builder, (name, value)| builder.header(name, value),
    );
    Ok(builder.body(payload).unwrap())
}
fn serve(&self) -> Option<tokio::task::JoinHandle<()>> {
let mut settings = self.settings.lock().unwrap();
let host_addr = settings.host_addr.as_ref()?;
let addr: SocketAddr;
match host_addr.socket_addrs(|| None) {
Ok(v) => {
addr = v[0];
gst::info!(CAT, imp = self, "using {addr:?} as address");
}
Err(e) => {
gst::error!(CAT, imp = self, "error getting addr from uri {e:?}");
self.obj()
.emit_by_name::<()>("error", &[&format!("Unable to start WHEP Server: {e:?}")]);
return None;
}
}
let (tx, rx) = tokio::sync::oneshot::channel::<()>();
settings.shutdown_signal = Some(tx);
drop(settings);
let api = self.filter();
let jh = RUNTIME.spawn(async move {
warp::serve(api)
.bind(addr)
.await
.graceful(async move {
match rx.await {
Ok(_) => gst::debug!(CAT, "Server shut down signal received"),
Err(e) => gst::error!(CAT, "{e:?}: Sender dropped"),
}
})
.run()
.await;
gst::debug!(CAT, "Stopped the server task...");
});
gst::debug!(CAT, imp = self, "Started the server...");
Some(jh)
}
/// Stores the parsed `host_addr`, or clears it when given `None` or `""`.
///
/// # Errors
///
/// Returns the parse error when a non-empty address is not a valid URL; the
/// stored address is left untouched in that case.
fn set_host_addr(&self, host_addr: Option<&str>) -> Result<(), url::ParseError> {
    let mut settings = self.settings.lock().unwrap();
    settings.host_addr = match host_addr {
        Some(addr) if !addr.is_empty() => Some(Url::parse(addr)?),
        _ => None,
    };
    Ok(())
}
// Builds the warp filter tree served by `serve()`.
//
// Every route lives under the `/whep` prefix (`ROOT`). Each handler closure holds a
// weak reference to `self` and panics (`upgrade_or_panic`) if the object is gone
// when a request comes in.
fn filter(
&self,
) -> impl Filter<Extract = impl warp::Reply + use<>> + Clone + Send + Sync + 'static + use<>
{
let prefix = warp::path(ROOT);
// POST /whep/endpoint with `Content-Type: application/sdp`; the session id is generated.
let post_filter = warp::post()
.and(warp::path(ENDPOINT_PATH))
.and(warp::path::end())
.and(warp::header::exact(
http::header::CONTENT_TYPE.as_str(),
CONTENT_SDP,
))
.and(warp::body::bytes())
.and_then(glib::clone!(
#[weak(rename_to = s)]
self,
#[upgrade_or_panic]
move |body: warp::hyper::body::Bytes| async move {
s.post_handler(body.as_ref(), None).await
}
));
// POST /whep/endpoint/<id> with `Content-Type: application/sdp`; the session id comes from the URL.
let post_filter_with_id = warp::post()
.and(warp::path(ENDPOINT_PATH))
.and(warp::path::param::<String>())
.and(warp::path::end())
.and(warp::header::exact(
http::header::CONTENT_TYPE.as_str(),
CONTENT_SDP,
))
.and(warp::body::bytes())
.and_then(glib::clone!(
#[weak(rename_to = self_)]
self,
#[upgrade_or_panic]
move |id, body: warp::hyper::body::Bytes| async move {
self_.post_handler(body.as_ref(), Some(id)).await
}
));
// OPTIONS /whep/endpoint
let options_filter = warp::options()
.and(warp::path(ENDPOINT_PATH))
.and(warp::path::end())
.and_then(glib::clone!(
#[weak(rename_to = s)]
self,
#[upgrade_or_panic]
move || async move { s.options_handler().await }
));
// PATCH /whep/resource/<id>; the body and a copy of all request headers are passed on.
let patch_filter = warp::patch()
.and(warp::path(RESOURCE_PATH))
.and(warp::path::param::<String>())
.and(warp::path::end())
.and(warp::body::bytes())
.and(warp::header::headers_cloned())
.and_then(glib::clone!(
#[weak(rename_to = s)]
self,
#[upgrade_or_panic]
move |id, body: warp::hyper::body::Bytes, headers| async move {
s.patch_handler(id, body.as_ref(), headers).await
}
));
// DELETE /whep/resource/<id>
let delete_filter = warp::delete()
.and(warp::path(RESOURCE_PATH))
.and(warp::path::param::<String>())
.and(warp::path::end())
.and_then(glib::clone!(
#[weak(rename_to = s)]
self,
#[upgrade_or_panic]
move |id| async move { s.delete_handler(id).await }
));
// Combine the routes; each alternative is tried under the common prefix.
prefix
.and(post_filter.or(post_filter_with_id))
.or(prefix.and(options_filter))
.or(prefix.and(patch_filter))
.or(prefix.and(delete_filter))
}
}
impl SignallableImpl for WhepServer {
    /// Starts the HTTP server and stores the handle of its task, if any.
    fn start(&self) {
        gst::info!(CAT, imp = self, "starting the WHEP server");
        let jh = self.serve();
        let mut settings = self.settings.lock().unwrap();
        settings.server_handle = jh;
    }

    /// Requests a graceful shutdown of the HTTP server and waits for its task.
    ///
    /// The settings mutex is released before blocking on the server task.
    /// Request handlers lock the same mutex, so holding it here while waiting
    /// for them to finish could deadlock.
    fn stop(&self) {
        let (handle, shutdown_signal) = {
            let mut settings = self.settings.lock().unwrap();
            (
                settings.server_handle.take(),
                settings.shutdown_signal.take(),
            )
        };
        if let Some(tx) = shutdown_signal
            && tx.send(()).is_err()
        {
            gst::error!(
                CAT,
                imp = self,
                "Failed to send shutdown signal. Receiver dropped"
            );
        }
        if let Some(handle) = handle {
            gst::debug!(CAT, imp = self, "Await server handle to join");
            RUNTIME.block_on(async {
                if let Err(e) = handle.await {
                    gst::error!(CAT, imp = self, "Failed to join server handle: {e:?}");
                };
            });
            gst::info!(CAT, imp = self, "stopped the WHEP server");
        }
    }

    /// Intentionally a no-op for this signaller.
    fn end_session(&self, _session_id: &str) {}
}
// Registers `WhepServer` as the implementation of the `GstWhepServerSignaller`
// GObject type, deriving from `glib::Object` and implementing `Signallable`.
#[glib::object_subclass]
impl ObjectSubclass for WhepServer {
const NAME: &'static str = "GstWhepServerSignaller";
type Type = super::WhepServerSignaller;
type ParentType = glib::Object;
type Interfaces = (Signallable,);
}
impl ObjectImpl for WhepServer {
// Returns the GObject properties of the signaller; the list is built once and cached.
fn properties() -> &'static [glib::ParamSpec] {
static PROPERTIES: LazyLock<Vec<glib::ParamSpec>> = LazyLock::new(|| {
vec![
// Address the HTTP server binds to.
glib::ParamSpecString::builder("host-addr")
.nick("Host address")
.blurb("The host address of the WHEP endpoint e.g., http://127.0.0.1:9090")
.default_value(DEFAULT_HOST_ADDR)
.flags(glib::ParamFlags::READWRITE)
.build(),
// STUN server advertised to clients.
glib::ParamSpecString::builder("stun-server")
.nick("STUN Server")
.blurb("The STUN server of the form stun://hostname:port")
.default_value(DEFAULT_STUN_SERVER)
.build(),
// Array of TURN server strings advertised to clients.
gst::ParamSpecArray::builder("turn-servers")
.nick("List of TURN Servers to use")
.blurb("The TURN servers of the form <\"turn(s)://username:password@host:port\", \"turn(s)://username1:password1@host1:port1\">")
.element_spec(&glib::ParamSpecString::builder("turn-server")
.nick("TURN Server")
.blurb("The TURN server of the form turn(s)://username:password@host:port.")
.build()
)
.mutable_ready()
.build(),
// Timeout for endpoint requests, capped at 3600.
glib::ParamSpecUInt::builder("timeout")
.nick("Timeout")
.blurb("Value in seconds to timeout WHEP endpoint requests (0 = No timeout).")
.maximum(3600)
.default_value(DEFAULT_TIMEOUT)
.build(),
// Read-only; `property()` always reports `false`.
glib::ParamSpecBoolean::builder("manual-sdp-munging")
.nick("Manual SDP munging")
.blurb("Whether the signaller manages SDP munging itself")
.default_value(false)
.read_only()
.build(),
// Switches the POST/PATCH handlers to counter-offer mode.
glib::ParamSpecBoolean::builder("send-counter-offer")
.nick("Send Counter offer")
.blurb("Reject the offer sent by the WHEP player and propose a counter offer")
.default_value(DEFAULT_SEND_COUNTER_OFFER)
.build(),
]
});
PROPERTIES.as_ref()
}
fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
match pspec.name() {
"host-addr" => {
if let Err(e) =
self.set_host_addr(value.get::<Option<&str>>().expect("type checked upstream"))
{
gst::error!(
CAT,
"Couldn't set the host address as {e:?}, fallback to the default value {DEFAULT_HOST_ADDR:?}"
);
}
}
"stun-server" => {
let mut settings = self.settings.lock().unwrap();
settings.stun_server = value
.get::<Option<String>>()
.expect("type checked upstream")
}
"turn-servers" => {
let mut settings = self.settings.lock().unwrap();
settings.turn_servers = value.get::<gst::Array>().expect("type checked upstream")
}
"timeout" => {
let mut settings = self.settings.lock().unwrap();
settings.timeout = value.get().unwrap();
}
"send-counter-offer" => {
let mut settings = self.settings.lock().unwrap();
settings.send_counter_offer = value.get().unwrap();
}
_ => unimplemented!(),
}
}
/// Reads a property from the current settings.
///
/// `manual-sdp-munging` always reports `false`. Panics on a property name
/// that has no getter here.
fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
    let guard = self.settings.lock().unwrap();
    let current = &*guard;
    match pspec.name() {
        "host-addr" => {
            let addr: Option<String> = current.host_addr.as_ref().map(ToString::to_string);
            addr.to_value()
        }
        "stun-server" => current.stun_server.to_value(),
        "turn-servers" => current.turn_servers.to_value(),
        "timeout" => current.timeout.to_value(),
        "manual-sdp-munging" => false.to_value(),
        "send-counter-offer" => current.send_counter_offer.to_value(),
        _ => unimplemented!(),
    }
}
/// Returns the action signals that drive the signaller without its HTTP server.
///
/// * `post(id, body, promise)` runs `create_session`.
/// * `delete(id, promise)` runs `delete_session`.
/// * `patch(id, body, headers, promise)` runs `patch_session`.
///
/// Each handler does its work on `RUNTIME` and replies to the promise with a
/// `whep-signaller/response` structure holding `status`; `post` also adds
/// `headers` and, when there is one, `body`. In every handler `args[0]` is the
/// signaller itself, so the declared parameters start at `args[1]`.
fn signals() -> &'static [glib::subclass::Signal] {
    static SIGNALS: LazyLock<Vec<glib::subclass::Signal>> = LazyLock::new(|| {
        vec![
            glib::subclass::Signal::builder("post")
                .action()
                .class_handler(|args| {
                    let signaller = args[0]
                        .get::<super::WhepServerSignaller>()
                        .expect("signal arg");
                    let id = args[1]
                        .get::<Option<String>>()
                        .expect("ID as string as first parameter");
                    let request_body = args[2]
                        .get::<glib::Bytes>()
                        .expect("request body as GBytes as second parameter");
                    let promise = args[3]
                        .get::<gst::Promise>()
                        .expect("GstPromise as third parameter");
                    RUNTIME.spawn(async move {
                        let (status, mut headers, body) =
                            signaller.imp().create_session(&request_body, id).await;
                        let mut headers_builder =
                            gst::Structure::builder("whep-signaller/headers");
                        for (header, value) in headers.drain(..) {
                            let value = value.to_str().expect(
                                "Header value should contain only visible ASCII strings",
                            );
                            headers_builder = headers_builder.field(header.to_string(), value);
                        }
                        let mut reply_builder =
                            gst::Structure::builder("whep-signaller/response")
                                .field("status", status.as_u16() as u32)
                                .field("headers", headers_builder.build());
                        if let Some(body) = body {
                            reply_builder =
                                reply_builder.field("body", glib::Bytes::from(&body));
                        }
                        let reply = reply_builder.build();
                        gst::log!(CAT, obj = signaller, "replying to promise with {reply:?}");
                        promise.reply(Some(reply));
                    });
                    None
                })
                .param_types([
                    String::static_type(),
                    glib::Bytes::static_type(),
                    gst::Promise::static_type(),
                ])
                .return_type::<()>()
                .build(),
            glib::subclass::Signal::builder("delete")
                .action()
                .class_handler(|args| {
                    let signaller = args[0]
                        .get::<super::WhepServerSignaller>()
                        .expect("signal arg");
                    let id = args[1]
                        .get::<String>()
                        .expect("ID as string as first parameter");
                    let promise = args[2]
                        .get::<gst::Promise>()
                        .expect("GstPromise as second parameter");
                    RUNTIME.spawn(async move {
                        let status = signaller.imp().delete_session(id).await;
                        let reply = gst::Structure::builder("whep-signaller/response")
                            .field("status", status.as_u16() as u32)
                            .build();
                        gst::log!(CAT, obj = signaller, "replying to promise with {reply:?}");
                        promise.reply(Some(reply));
                    });
                    None
                })
                .param_types([
                    String::static_type(),
                    gst::Promise::static_type(),
                ])
                .return_type::<()>()
                .build(),
            glib::subclass::Signal::builder("patch")
                .action()
                .class_handler(|args| {
                    let signaller = args[0]
                        .get::<super::WhepServerSignaller>()
                        .expect("signal arg");
                    let id = args[1]
                        .get::<String>()
                        .expect("ID as string as first parameter");
                    let request_body = args[2]
                        .get::<glib::Bytes>()
                        .expect("request body as GBytes as second parameter");
                    let headers = args[3]
                        .get::<gst::Structure>()
                        .expect("request headers as GstStructure as third parameter");
                    // The promise is the fourth declared parameter, i.e.
                    // `args[4]`; `args[3]` holds the headers structure.
                    let promise = args[4]
                        .get::<gst::Promise>()
                        .expect("GstPromise as fourth parameter");
                    RUNTIME.spawn(async move {
                        // Fields that are not strings, or that do not form a
                        // valid header name/value, are skipped.
                        let mut headermap: Vec<(Option<HeaderName>, HeaderValue)> = vec![];
                        for (name, value) in headers.iter() {
                            if let Ok(value) = value.get::<String>() {
                                let Ok(headername) =
                                    HeaderName::from_bytes(name.as_str().as_bytes())
                                else {
                                    continue;
                                };
                                let Ok(headervalue) = HeaderValue::from_str(value.as_str())
                                else {
                                    continue;
                                };
                                headermap.push((Some(headername), headervalue));
                            }
                        }
                        let status = signaller
                            .imp()
                            .patch_session(id, &request_body, headermap)
                            .await;
                        let reply = gst::Structure::builder("whep-signaller/response")
                            .field("status", status.as_u16() as u32)
                            .build();
                        gst::log!(CAT, obj = signaller, "replying to promise with {reply:?}");
                        promise.reply(Some(reply));
                    });
                    None
                })
                .param_types([
                    String::static_type(),
                    glib::Bytes::static_type(),
                    gst::Structure::static_type(),
                    gst::Promise::static_type(),
                ])
                .return_type::<()>()
                .build(),
        ]
    });
    SIGNALS.as_ref()
}
}