use super::router::RouterConfig;
use super::turn::TurnConfig;
use webrtc::api::setting_engine::SettingEngine;
use webrtc::peer_connection::configuration::RTCConfiguration;
use super::data_channel::DataChannel;
use super::session::{Session, SessionLocal};
use anyhow::Result;
use std::collections::HashMap;
use std::time::Duration;
use tokio::net::UdpSocket;
use tokio::sync::Mutex;
use turn::server::Server as TurnServer;
use webrtc::ice_transport::ice_candidate_type::RTCIceCandidateType;
use webrtc::ice_transport::ice_credential_type::RTCIceCredentialType;
use webrtc::ice_transport::ice_server::RTCIceServer;
use super::peer::SessionProvider;
use crate::buffer::factory::AtomicFactory;
use serde::Deserialize;
use std::sync::Arc;
use turn::auth::AuthHandler;
use webrtc::peer_connection::policy::sdp_semantics::RTCSdpSemantics;
use webrtc_ice::mdns::MulticastDnsMode;
use webrtc_ice::udp_mux::*;
use webrtc_ice::udp_network::*;
use super::errors::ConfigError;
use async_trait::async_trait;
use std::fs;
/// One ICE server entry as deserialized from the configuration.
///
/// `WebRTCTransportConfig::new` copies these values into an `RTCIceServer`.
#[derive(Clone, Deserialize)]
struct ICEServerConfig {
/// Server URLs; copied into `RTCIceServer::urls`.
urls: Vec<String>,
/// Copied into `RTCIceServer::username`.
user_name: String,
/// Copied into `RTCIceServer::credential`.
credential: String,
}
/// ICE candidate options (the `candidates` field of [`WebRTCConfig`]).
#[derive(Clone, Default, Deserialize)]
struct Candidates {
/// Deserialized from the key `icelite`. `Some(true)` enables ICE lite on the
/// setting engine built by `WebRTCTransportConfig::new`.
#[serde(rename = "icelite")]
ice_lite: Option<bool>,
/// Deserialized from the key `nat1to1ips`. When present and non-empty, the list
/// is handed to `SettingEngine::set_nat_1to1_ips` with candidate type `Host`.
#[serde(rename = "nat1to1ips")]
nat1_to_1ips: Option<Vec<String>>,
}
/// WebRTC settings derived from [`Config`] by `WebRTCTransportConfig::new`.
///
/// The SFU keeps one instance behind an `Arc` and hands clones of that `Arc`
/// to sessions and to `get_session` callers.
#[derive(Default)]
pub struct WebRTCTransportConfig {
/// Peer connection configuration; carries the ICE servers taken from the config.
pub configuration: RTCConfiguration,
/// Setting engine holding the UDP network, ICE lite, timeout, NAT 1:1 and mDNS settings.
pub setting: SettingEngine,
/// Copy of the router section of the config; `with_stats` is forced to `true`
/// when the SFU section enables stats.
pub router: RouterConfig,
/// Shared buffer factory, created with `AtomicFactory::new(1000, 1000)`.
// NOTE(review): the meaning of the two `1000` arguments is defined in
// `crate::buffer::factory` and is not visible from this file.
pub factory: Arc<Mutex<AtomicFactory>>,
}
/// ICE timeout settings.
///
/// Each value is converted with `Duration::from_secs`, i.e. it is a number of
/// whole seconds. When all three values are `0`, no ICE timeouts are applied
/// to the setting engine at all.
#[derive(Clone, Default, Deserialize)]
struct WebRTCTimeoutsConfig {
/// Deserialized from the key `disconnected`; first argument of `set_ice_timeouts`.
#[serde(rename = "disconnected")]
ice_disconnected_timeout: i32,
/// Deserialized from the key `failed`; second argument of `set_ice_timeouts`.
#[serde(rename = "failed")]
ice_failed_timeout: i32,
/// Deserialized from the key `keepalive`; third argument of `set_ice_timeouts`.
#[serde(rename = "keepalive")]
ice_keepalive_interval: i32,
}
/// WebRTC section of the configuration, consumed by `WebRTCTransportConfig::new`.
#[derive(Clone, Default, Deserialize)]
pub struct WebRTCConfig {
/// When set, a single UDP socket is bound on `0.0.0.0` at this port and all
/// ICE traffic is multiplexed over it; the port range settings are then ignored.
ice_single_port: Option<i32>,
/// Deserialized from the key `portrange`. Only honored when it holds exactly
/// two entries, read as `[start, end]`, and no single port is configured.
/// An enabled TURN server without its own port range takes precedence.
#[serde(rename = "portrange")]
pub ice_port_range: Option<Vec<u16>>,
/// ICE servers to offer to peers; not used when ICE lite is enabled.
ice_servers: Option<Vec<ICEServerConfig>>,
/// ICE lite and NAT 1:1 options.
candidates: Candidates,
/// Deserialized from the key `sdpsemantics`. The values
/// `"unified-plan-with-fallback"` and `"plan-b"` are recognized; anything
/// else is treated as unified plan.
// NOTE(review): `WebRTCTransportConfig::new` parses this value but does not
// apply it to the configuration it builds.
#[serde(rename = "sdpsemantics")]
pub sdp_semantics: String,
/// Multicast DNS switch, evaluated by `WebRTCTransportConfig::new` when it
/// decides whether to change the ICE multicast DNS mode.
#[serde(rename = "mdns")]
mdns: bool,
/// ICE timeouts in seconds.
timeouts: WebRTCTimeoutsConfig,
}
/// SFU section of the configuration.
#[derive(Clone, Default, Deserialize)]
struct SFUConfig {
/// Deserialized from the key `ballast`; not read anywhere in this file
/// (hence `allow(dead_code)`).
#[allow(dead_code)]
#[serde(rename = "ballast")]
ballast: i64,
/// Deserialized from the key `withstats`. When `true`, the router
/// configuration gets `with_stats = true`.
#[serde(rename = "withstats")]
with_stats: bool,
}
/// Top-level configuration, as produced by [`load`].
#[derive(Clone, Default, Deserialize)]
pub struct Config {
/// SFU-wide switches.
sfu: SFUConfig,
/// Router settings; cloned into the `WebRTCTransportConfig`.
router: RouterConfig,
/// WebRTC / ICE settings.
pub webrtc: WebRTCConfig,
/// TURN server settings; `SFU::new` starts a TURN server when `turn.enabled` is set.
turn: TurnConfig,
/// Optional TURN authentication handler. It is skipped during
/// deserialization, so a freshly loaded config holds `None` here. `SFU::new`
/// forwards it to `init_turn_server`.
#[serde(skip_deserializing)]
turn_auth: Option<Arc<dyn AuthHandler + Send + Sync>>,
}
/// Reads the TOML file at `cfg_path` and deserializes it into a [`Config`].
///
/// # Errors
///
/// Returns a [`ConfigError`] when the file cannot be read, or when its
/// contents cannot be deserialized into a [`Config`]. A parse failure is
/// wrapped in a `std::io::Error` of kind `InvalidData`, so it is converted
/// into `ConfigError` the same way as a read failure instead of panicking.
pub fn load(cfg_path: &String) -> Result<Config, ConfigError> {
    let content = fs::read_to_string(cfg_path)?;
    // A malformed config file is an expected runtime condition, not a bug:
    // report it to the caller rather than unwinding.
    let decoded_config: Config = toml::from_str(&content)
        .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
    Ok(decoded_config)
}
/// The selective forwarding unit: owns the shared WebRTC settings, the
/// optional TURN server, and the registries of sessions and data channels.
#[derive(Default)]
pub struct SFU {
/// WebRTC settings shared with every session.
webrtc: Arc<WebRTCTransportConfig>,
/// TURN server handle; `Some` only when TURN is enabled in the configuration.
turn: Option<TurnServer>,
/// Live sessions keyed by session id. A session's close handler removes its entry.
sessions: Arc<Mutex<HashMap<String, Arc<dyn Session + Send + Sync>>>>,
/// Data channels registered through `new_data_channel`; the list is shared
/// with every session created afterwards.
data_channels: Arc<Mutex<Vec<Arc<DataChannel>>>>,
/// Copy of `router.with_stats` taken at construction; not read in this file.
#[allow(dead_code)]
with_status: bool,
}
impl WebRTCTransportConfig {
async fn new(c: &Config) -> Result<Self> {
let mut se = SettingEngine::default();
se.disable_media_engine_copy(true);
if let Some(ice_single_port) = c.webrtc.ice_single_port {
let rv = UdpSocket::bind(("0.0.0.0", ice_single_port as u16)).await;
let udp_socket: UdpSocket = match rv {
Ok(sock) => sock,
Err(_) => {
std::process::exit(0);
}
};
let udp_mux = UDPMuxDefault::new(UDPMuxParams::new(udp_socket));
se.set_udp_network(UDPNetwork::Muxed(udp_mux));
} else {
let mut ice_port_start: u16 = 0;
let mut ice_port_end: u16 = 0;
if c.turn.enabled && c.turn.port_range.is_none() {
ice_port_start = super::turn::SFU_MIN_PORT;
ice_port_end = super::turn::SFU_MAX_PORT;
} else if let Some(ice_port_range) = &c.webrtc.ice_port_range {
if ice_port_range.len() == 2 {
ice_port_start = ice_port_range[0];
ice_port_end = ice_port_range[1];
}
}
if ice_port_start != 0 || ice_port_end != 0 {
let ephemeral_udp =
UDPNetwork::Ephemeral(EphemeralUDP::new(ice_port_start, ice_port_end).unwrap());
se.set_udp_network(ephemeral_udp);
}
}
let mut ice_servers: Vec<RTCIceServer> = Vec::new();
if let Some(ice_lite) = c.webrtc.candidates.ice_lite {
if ice_lite {
se.set_lite(ice_lite);
} else if let Some(ice_servers_cfg) = &c.webrtc.ice_servers {
for ice_server in ice_servers_cfg {
let s = RTCIceServer {
urls: ice_server.urls.clone(),
username: ice_server.user_name.clone(),
credential: ice_server.credential.clone(),
credential_type: RTCIceCredentialType::Unspecified,
};
ice_servers.push(s);
}
}
}
let mut _sdp_semantics = RTCSdpSemantics::UnifiedPlan;
match c.webrtc.sdp_semantics.as_str() {
"unified-plan-with-fallback" => {
_sdp_semantics = RTCSdpSemantics::UnifiedPlanWithFallback;
}
"plan-b" => {
_sdp_semantics = RTCSdpSemantics::PlanB;
}
_ => {}
}
if c.webrtc.timeouts.ice_disconnected_timeout == 0
&& c.webrtc.timeouts.ice_failed_timeout == 0
&& c.webrtc.timeouts.ice_keepalive_interval == 0
{
} else {
se.set_ice_timeouts(
Some(Duration::from_secs(
c.webrtc.timeouts.ice_disconnected_timeout as u64,
)),
Some(Duration::from_secs(
c.webrtc.timeouts.ice_failed_timeout as u64,
)),
Some(Duration::from_secs(
c.webrtc.timeouts.ice_keepalive_interval as u64,
)),
);
}
let mut w = WebRTCTransportConfig {
configuration: RTCConfiguration {
ice_servers,
..Default::default()
},
setting: se,
router: c.router.clone(),
factory: Arc::new(Mutex::new(AtomicFactory::new(1000, 1000))),
};
if let Some(nat1toiips) = &c.webrtc.candidates.nat1_to_1ips {
if !nat1toiips.is_empty() {
w.setting
.set_nat_1to1_ips(nat1toiips.clone(), RTCIceCandidateType::Host);
}
}
if c.webrtc.mdns {
w.setting
.set_ice_multicast_dns_mode(MulticastDnsMode::Disabled);
}
if c.sfu.with_stats {
w.router.with_stats = true;
}
Ok(w)
}
}
impl SFU {
    /// Creates an SFU from the loaded configuration.
    ///
    /// Builds the shared [`WebRTCTransportConfig`], starts with an empty
    /// session map and data-channel list, and starts the TURN server when
    /// `c.turn.enabled` is set.
    ///
    /// # Errors
    ///
    /// Returns an error when the WebRTC transport configuration cannot be
    /// built or when the TURN server fails to initialize.
    pub async fn new(c: Config) -> Result<Self> {
        // Propagate configuration failures to the caller instead of panicking.
        let w = Arc::new(WebRTCTransportConfig::new(&c).await?);
        let with_status = w.router.with_stats;
        let mut sfu = SFU {
            webrtc: w,
            sessions: Arc::new(Mutex::new(HashMap::new())),
            with_status,
            ..Default::default()
        };
        if c.turn.enabled {
            let turn_server = super::turn::init_turn_server(c.turn, c.turn_auth).await?;
            sfu.turn = Some(turn_server);
        }
        Ok(sfu)
    }

    /// Creates a session for `id`, registers it in the session map and
    /// returns it.
    ///
    /// The session gets a close handler that removes its entry from the map
    /// again. An existing entry with the same id is replaced by the insert.
    async fn new_session(&self, id: String) -> Arc<dyn Session + Send + Sync> {
        let session =
            SessionLocal::new(id.clone(), self.data_channels.clone(), self.webrtc.clone()).await;
        // The handler owns its own handles to the map and the id, so it does
        // not borrow from `self`.
        let sessions_out = self.sessions.clone();
        let id_out = id.clone();
        session
            .on_close(Box::new(move || {
                let sessions_in = sessions_out.clone();
                let id_in = id_out.clone();
                Box::pin(async move {
                    sessions_in.lock().await.remove(&id_in);
                })
            }))
            .await;
        self.sessions.lock().await.insert(id, session.clone());
        session
    }

    /// Creates a data channel with the given label, appends it to the SFU's
    /// data-channel list and returns it.
    pub async fn new_data_channel(&self, label: String) -> Arc<DataChannel> {
        let dc = Arc::new(DataChannel::new(label));
        self.data_channels.lock().await.push(dc.clone());
        dc
    }
}
#[async_trait]
impl SessionProvider for SFU {
    /// Returns the session registered under `sid`, creating and registering a
    /// new one when none exists, together with the shared WebRTC settings.
    ///
    /// The session half of the returned tuple is always `Some`.
    async fn get_session(
        &self,
        sid: String,
    ) -> (
        Option<Arc<dyn Session + Send + Sync>>,
        Arc<WebRTCTransportConfig>,
    ) {
        // Look the id up first; the map lock is released before a new session
        // is created, because `new_session` takes the same lock itself.
        let existing = self.sessions.lock().await.get(&sid).cloned();
        let session = match existing {
            Some(found) => found,
            None => self.new_session(sid).await,
        };
        (Some(session), self.webrtc.clone())
    }
}