mod channel;
mod common;
mod payload_channel;
mod utils;
use crate::data_structures::AppData;
use crate::messages::{
RouterInternal, WorkerCloseRequest, WorkerCreateRouterRequest, WorkerDumpRequest,
WorkerUpdateSettingsRequest,
};
use crate::ortc::RtpCapabilitiesError;
use crate::router::{Router, RouterId, RouterOptions};
use crate::worker::channel::BufferMessagesGuard;
pub use crate::worker::utils::ExitError;
use crate::worker_manager::WorkerManager;
use crate::{ortc, uuid_based_wrapper_type};
use async_executor::Executor;
pub(crate) use channel::Channel;
pub(crate) use common::{SubscriptionHandler, SubscriptionTarget};
use event_listener_primitives::{Bag, BagOnce, HandlerId};
use futures_lite::FutureExt;
use log::*;
use parking_lot::Mutex;
pub(crate) use payload_channel::{NotificationError, NotificationMessage, PayloadChannel};
use serde::{Deserialize, Serialize};
use std::ops::RangeInclusive;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::{fmt, io};
use thiserror::Error;
use utils::WorkerRunResult;
uuid_based_wrapper_type!(
WorkerId
);
#[derive(Debug, Error, Eq, PartialEq)]
pub enum RequestError {
#[error("Channel already closed")]
ChannelClosed,
#[error("Message is too long")]
MessageTooLong,
#[error("Payload is too long")]
PayloadTooLong,
#[error("Request timed out")]
TimedOut,
#[error("Received response error: {reason}")]
Response {
reason: String,
},
#[error("Failed to parse response from worker: {error}")]
FailedToParse {
error: String,
},
#[error("Worker did not return any data in response")]
NoData,
}
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum WorkerLogLevel {
Debug,
Warn,
Error,
None,
}
impl Default for WorkerLogLevel {
fn default() -> Self {
Self::Error
}
}
impl WorkerLogLevel {
fn as_str(&self) -> &'static str {
match self {
Self::Debug => "debug",
Self::Warn => "warn",
Self::Error => "error",
Self::None => "none",
}
}
}
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum WorkerLogTag {
Info,
Ice,
Dtls,
Rtp,
Srtp,
Rtcp,
Rtx,
Bwe,
Score,
Simulcast,
Svc,
Sctp,
Message,
}
impl WorkerLogTag {
fn as_str(&self) -> &'static str {
match self {
Self::Info => "info",
Self::Ice => "ice",
Self::Dtls => "dtls",
Self::Rtp => "rtp",
Self::Srtp => "srtp",
Self::Rtcp => "rtcp",
Self::Rtx => "rtx",
Self::Bwe => "bwe",
Self::Score => "score",
Self::Simulcast => "simulcast",
Self::Svc => "svc",
Self::Sctp => "sctp",
Self::Message => "message",
}
}
}
#[derive(Debug, Clone)]
pub struct WorkerDtlsFiles {
pub certificate: PathBuf,
pub private_key: PathBuf,
}
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct WorkerSettings {
pub log_level: WorkerLogLevel,
pub log_tags: Vec<WorkerLogTag>,
pub rtc_ports_range: RangeInclusive<u16>,
pub dtls_files: Option<WorkerDtlsFiles>,
pub app_data: AppData,
}
impl Default for WorkerSettings {
fn default() -> Self {
Self {
log_level: WorkerLogLevel::default(),
log_tags: Vec::new(),
rtc_ports_range: 10000..=59999,
dtls_files: None,
app_data: AppData::default(),
}
}
}
#[derive(Default, Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
#[non_exhaustive]
pub struct WorkerUpdateSettings {
pub log_level: Option<WorkerLogLevel>,
pub log_tags: Option<Vec<WorkerLogTag>>,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
#[doc(hidden)]
#[non_exhaustive]
pub struct WorkerDump {
pub router_ids: Vec<RouterId>,
}
#[derive(Debug, Error, Eq, PartialEq)]
pub enum CreateRouterError {
#[error("RTP capabilities generation error: {0}")]
FailedRtpCapabilitiesGeneration(RtpCapabilitiesError),
#[error("Request to worker failed: {0}")]
Request(RequestError),
}
#[derive(Default)]
struct Handlers {
new_router: Bag<Box<dyn Fn(&Router) + Send + Sync>>,
#[allow(clippy::type_complexity)]
dead: BagOnce<Box<dyn FnOnce(Result<(), ExitError>) + Send>>,
close: BagOnce<Box<dyn FnOnce() + Send>>,
}
struct Inner {
id: WorkerId,
channel: Channel,
payload_channel: PayloadChannel,
executor: Arc<Executor<'static>>,
handlers: Handlers,
app_data: AppData,
closed: Arc<AtomicBool>,
_worker_manager: WorkerManager,
}
impl Drop for Inner {
fn drop(&mut self) {
debug!("drop()");
self.close();
}
}
impl Inner {
async fn new(
executor: Arc<Executor<'static>>,
WorkerSettings {
app_data,
log_level,
log_tags,
rtc_ports_range,
dtls_files,
}: WorkerSettings,
worker_manager: WorkerManager,
) -> io::Result<Arc<Self>> {
debug!("new()");
let mut spawn_args: Vec<String> = vec!["".to_string()];
spawn_args.push(format!("--logLevel={}", log_level.as_str()));
for log_tag in log_tags {
spawn_args.push(format!("--logTag={}", log_tag.as_str()));
}
if rtc_ports_range.is_empty() {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"Invalid RTC ports range",
));
}
spawn_args.push(format!("--rtcMinPort={}", rtc_ports_range.start()));
spawn_args.push(format!("--rtcMaxPort={}", rtc_ports_range.end()));
if let Some(dtls_files) = dtls_files {
spawn_args.push(format!(
"--dtlsCertificateFile={}",
dtls_files
.certificate
.to_str()
.expect("Paths are only expected to be utf8")
));
spawn_args.push(format!(
"--dtlsPrivateKeyFile={}",
dtls_files
.private_key
.to_str()
.expect("Paths are only expected to be utf8")
));
}
let id = WorkerId::new();
debug!(
"spawning worker with arguments [id:{}]: {}",
id,
spawn_args.join(" ")
);
let WorkerRunResult {
channel,
payload_channel,
status_receiver,
buffer_worker_messages_guard,
} = utils::run_worker_with_channels(id, Arc::clone(&executor), spawn_args);
let handlers = Handlers::default();
let mut inner = Self {
id,
channel,
payload_channel,
executor,
handlers,
app_data,
closed: Arc::new(AtomicBool::new(false)),
_worker_manager: worker_manager,
};
inner.setup_message_handling();
let (mut early_status_sender, early_status_receiver) = async_oneshot::oneshot();
let inner = Arc::new(inner);
{
let inner_weak = Arc::downgrade(&inner);
inner
.executor
.spawn(async move {
let status = status_receiver.await.unwrap_or(Err(ExitError::Unexpected));
let _ = early_status_sender.send(status);
if let Some(inner) = inner_weak.upgrade() {
warn!("worker exited [id:{}]: {:?}", id, status);
if !inner.closed.swap(true, Ordering::SeqCst) {
inner.handlers.dead.call(|callback| {
callback(status);
});
inner.handlers.close.call_simple();
}
}
})
.detach();
}
inner
.wait_for_worker_ready(buffer_worker_messages_guard)
.or(async {
let status = early_status_receiver
.await
.unwrap_or(Err(ExitError::Unexpected));
let error_message = format!(
"worker thread exited before being ready [id:{}]: exit status {:?}",
inner.id, status,
);
Err(io::Error::new(io::ErrorKind::NotFound, error_message))
})
.await?;
Ok(inner)
}
async fn wait_for_worker_ready(
&self,
buffer_worker_messages_guard: BufferMessagesGuard,
) -> io::Result<()> {
#[derive(Deserialize)]
#[serde(tag = "event", rename_all = "lowercase")]
enum Notification {
Running,
}
let (sender, receiver) = async_oneshot::oneshot();
let id = self.id;
let sender = Mutex::new(Some(sender));
let _handler = self.channel.subscribe_to_notifications(
std::process::id().into(),
move |notification| {
let result = match serde_json::from_value(notification.clone()) {
Ok(Notification::Running) => {
debug!("worker thread running [id:{}]", id);
Ok(())
}
Err(error) => Err(io::Error::new(
io::ErrorKind::Other,
format!(
"unexpected first notification from worker [id:{}]: {:?}; error = {}",
id, notification, error
),
)),
};
let _ = sender
.lock()
.take()
.expect("Receiving more than one worker notification")
.send(result);
},
);
drop(buffer_worker_messages_guard);
receiver.await.map_err(|_closed| {
io::Error::new(io::ErrorKind::Other, "Worker dropped before it is ready")
})?
}
fn setup_message_handling(&mut self) {
let channel_receiver = self.channel.get_internal_message_receiver();
let payload_channel_receiver = self.payload_channel.get_internal_message_receiver();
let id = self.id;
let closed = Arc::clone(&self.closed);
self.executor
.spawn(async move {
while let Ok(message) = channel_receiver.recv().await {
match message {
channel::InternalMessage::Debug(text) => debug!("[id:{}] {}", id, text),
channel::InternalMessage::Warn(text) => warn!("[id:{}] {}", id, text),
channel::InternalMessage::Error(text) => {
if !closed.load(Ordering::SeqCst) {
error!("[id:{}] {}", id, text)
}
}
channel::InternalMessage::Dump(text) => eprintln!("{}", text),
channel::InternalMessage::Unexpected(data) => error!(
"worker[id:{}] unexpected channel data: {}",
id,
String::from_utf8_lossy(&data)
),
}
}
})
.detach();
self.executor
.spawn(async move {
while let Ok(message) = payload_channel_receiver.recv().await {
match message {
payload_channel::InternalMessage::UnexpectedData(data) => error!(
"worker[id:{}] unexpected payload channel data: {}",
id,
String::from_utf8_lossy(&data)
),
}
}
})
.detach();
}
fn close(&self) {
let already_closed = self.closed.swap(true, Ordering::SeqCst);
let channel = self.channel.clone();
let payload_channel = self.payload_channel.clone();
self.executor
.spawn(async move {
let _ = channel.request(WorkerCloseRequest {}).await;
drop(channel);
drop(payload_channel);
})
.detach();
if !already_closed {
self.handlers.close.call_simple();
}
}
}
#[derive(Clone)]
pub struct Worker {
inner: Arc<Inner>,
}
impl fmt::Debug for Worker {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Worker")
.field("id", &self.inner.id)
.field("closed", &self.inner.closed)
.finish()
}
}
impl Worker {
pub(super) async fn new(
executor: Arc<Executor<'static>>,
worker_settings: WorkerSettings,
worker_manager: WorkerManager,
) -> io::Result<Self> {
let inner = Inner::new(executor, worker_settings, worker_manager).await?;
Ok(Self { inner })
}
pub fn id(&self) -> WorkerId {
self.inner.id
}
pub fn app_data(&self) -> &AppData {
&self.inner.app_data
}
pub fn closed(&self) -> bool {
self.inner.closed.load(Ordering::SeqCst)
}
#[doc(hidden)]
pub async fn dump(&self) -> Result<WorkerDump, RequestError> {
debug!("dump()");
self.inner.channel.request(WorkerDumpRequest {}).await
}
pub async fn update_settings(&self, data: WorkerUpdateSettings) -> Result<(), RequestError> {
debug!("update_settings()");
self.inner
.channel
.request(WorkerUpdateSettingsRequest { data })
.await
}
pub async fn create_router(
&self,
router_options: RouterOptions,
) -> Result<Router, CreateRouterError> {
debug!("create_router()");
let RouterOptions {
app_data,
media_codecs,
} = router_options;
let rtp_capabilities = ortc::generate_router_rtp_capabilities(media_codecs)
.map_err(CreateRouterError::FailedRtpCapabilitiesGeneration)?;
let router_id = RouterId::new();
let internal = RouterInternal { router_id };
let _buffer_guard = self.inner.channel.buffer_messages_for(router_id.into());
self.inner
.channel
.request(WorkerCreateRouterRequest { internal })
.await
.map_err(CreateRouterError::Request)?;
let router = Router::new(
router_id,
Arc::clone(&self.inner.executor),
self.inner.channel.clone(),
self.inner.payload_channel.clone(),
rtp_capabilities,
app_data,
self.clone(),
);
self.inner.handlers.new_router.call(|callback| {
callback(&router);
});
Ok(router)
}
pub fn on_new_router<F: Fn(&Router) + Send + Sync + 'static>(&self, callback: F) -> HandlerId {
self.inner.handlers.new_router.add(Box::new(callback))
}
pub fn on_dead<F: FnOnce(Result<(), ExitError>) + Send + Sync + 'static>(
&self,
callback: F,
) -> HandlerId {
self.inner.handlers.dead.add(Box::new(callback))
}
pub fn on_close<F: FnOnce() + Send + 'static>(&self, callback: F) -> HandlerId {
let handler_id = self.inner.handlers.close.add(Box::new(callback));
if self.inner.closed.load(Ordering::Relaxed) {
self.inner.handlers.close.call_simple();
}
handler_id
}
#[cfg(test)]
pub(crate) fn close(&self) {
self.inner.close();
}
}