mod channel;
mod common;
mod payload_channel;
mod utils;
use crate::data_structures::AppData;
use crate::messages::{
RouterInternal, WorkerCreateRouterRequest, WorkerDumpRequest, WorkerGetResourceRequest,
WorkerUpdateSettingsRequest,
};
use crate::ortc;
use crate::ortc::RtpCapabilitiesError;
use crate::router::{Router, RouterId, RouterOptions};
use crate::worker_manager::WorkerManager;
use async_executor::Executor;
use async_process::{Child, Command, ExitStatus, Stdio};
pub(crate) use channel::Channel;
pub(crate) use common::{SubscriptionHandler, SubscriptionTarget};
use event_listener_primitives::{Bag, BagOnce, HandlerId};
use futures_lite::io::BufReader;
use futures_lite::{future, AsyncBufReadExt, StreamExt};
use log::*;
use parking_lot::Mutex;
pub(crate) use payload_channel::{NotificationError, NotificationMessage, PayloadChannel};
use serde::{Deserialize, Serialize};
use std::ffi::OsString;
use std::ops::RangeInclusive;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::{env, io};
use thiserror::Error;
use utils::SpawnResult;
/// Error returned when a request sent to the worker over a channel fails.
///
/// This is the error type of the `channel.request(..)` calls made by [`Worker`].
#[derive(Debug, Error, Eq, PartialEq)]
pub enum RequestError {
/// The channel was already closed.
#[error("Channel already closed")]
ChannelClosed,
/// The message is too long.
#[error("Message is too long")]
MessageTooLong,
/// The payload is too long.
#[error("Payload is too long")]
PayloadTooLong,
/// The request timed out.
#[error("Request timed out")]
TimedOut,
/// An error response was received; `reason` carries the reported reason.
#[error("Received response error: {reason}")]
Response { reason: String },
/// The response from the worker could not be parsed; `error` describes the parse failure.
#[error("Failed to parse response from worker: {error}")]
FailedToParse { error: String },
/// The worker did not return any data in its response.
#[error("Worker did not return any data in response")]
NoData,
}
/// Log level handed to the worker process via the `--logLevel=` command-line argument.
///
/// Serialized in lowercase. Variant declaration order determines the derived `Ord`/`PartialOrd`
/// ordering, so variants must not be reordered casually.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum WorkerLogLevel {
/// Passed to the worker as `debug`.
Debug,
/// Passed to the worker as `warn`.
Warn,
/// Passed to the worker as `error`. This is the default level.
Error,
/// Passed to the worker as `none`.
None,
}
impl Default for WorkerLogLevel {
fn default() -> Self {
Self::Error
}
}
impl WorkerLogLevel {
fn as_str(&self) -> &'static str {
match self {
Self::Debug => "debug",
Self::Warn => "warn",
Self::Error => "error",
Self::None => "none",
}
}
}
/// Log tag handed to the worker process; each tag becomes one `--logTag=` command-line argument.
///
/// Serialized in lowercase. Variant declaration order determines the derived `Ord`/`PartialOrd`
/// ordering, so variants must not be reordered casually.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum WorkerLogTag {
/// Passed to the worker as `info`.
Info,
/// Passed to the worker as `ice`.
Ice,
/// Passed to the worker as `dtls`.
Dtls,
/// Passed to the worker as `rtp`.
Rtp,
/// Passed to the worker as `srtp`.
Srtp,
/// Passed to the worker as `rtcp`.
Rtcp,
/// Passed to the worker as `rtx`.
Rtx,
/// Passed to the worker as `bwe`.
Bwe,
/// Passed to the worker as `score`.
Score,
/// Passed to the worker as `simulcast`.
Simulcast,
/// Passed to the worker as `svc`.
Svc,
/// Passed to the worker as `sctp`.
Sctp,
/// Passed to the worker as `message`.
Message,
}
impl WorkerLogTag {
fn as_str(&self) -> &'static str {
match self {
Self::Info => "info",
Self::Ice => "ice",
Self::Dtls => "dtls",
Self::Rtp => "rtp",
Self::Srtp => "srtp",
Self::Rtcp => "rtcp",
Self::Rtx => "rtx",
Self::Bwe => "bwe",
Self::Score => "score",
Self::Simulcast => "simulcast",
Self::Svc => "svc",
Self::Sctp => "sctp",
Self::Message => "message",
}
}
}
/// Paths of the DTLS certificate and private key files handed to the worker process.
#[derive(Debug, Clone)]
pub struct WorkerDtlsFiles {
/// Path passed to the worker via `--dtlsCertificateFile=`.
pub certificate: PathBuf,
/// Path passed to the worker via `--dtlsPrivateKeyFile=`.
pub private_key: PathBuf,
}
/// Settings used when spawning a worker process.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct WorkerSettings {
/// Custom application data stored alongside the worker and returned by `Worker::app_data()`.
pub app_data: AppData,
/// Log level passed to the worker via `--logLevel=`.
pub log_level: WorkerLogLevel,
/// Log tags, each passed to the worker as a separate `--logTag=` argument.
pub log_tags: Vec<WorkerLogTag>,
/// Port range passed to the worker via `--rtcMinPort=` / `--rtcMaxPort=`.
/// An empty range is rejected when the worker is created.
pub rtc_ports_range: RangeInclusive<u16>,
/// Optional DTLS certificate/private key files; when `None` no DTLS file arguments are passed.
pub dtls_files: Option<WorkerDtlsFiles>,
}
impl Default for WorkerSettings {
fn default() -> Self {
Self {
app_data: AppData::default(),
log_level: WorkerLogLevel::default(),
log_tags: Vec::new(),
rtc_ports_range: 10000..=59999,
dtls_files: None,
}
}
}
/// Settings that can be changed on a running worker via `Worker::update_settings()`.
///
/// Serialized with camelCase field names.
// NOTE(review): a `None` field presumably means "leave unchanged" on the worker side — confirm.
#[derive(Default, Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
#[non_exhaustive]
pub struct WorkerUpdateSettings {
/// New log level, if any.
pub log_level: Option<WorkerLogLevel>,
/// New set of log tags, if any.
pub log_tags: Option<Vec<WorkerLogTag>>,
}
/// Resource usage of the worker process, as returned by `Worker::get_resource_usage()`.
///
/// Deserialized from the worker's response using the field names below verbatim.
// NOTE(review): field names mirror the members of `struct rusage` from `getrusage(2)`; the
// per-field meanings below are inferred from those names and the units are not visible in this
// file — confirm against the worker implementation.
#[derive(Debug, Copy, Clone, Deserialize)]
#[non_exhaustive]
pub struct WorkerResourceUsage {
/// Presumably user CPU time used.
pub ru_utime: u64,
/// Presumably system CPU time used.
pub ru_stime: u64,
/// Presumably maximum resident set size.
pub ru_maxrss: u64,
/// Presumably integral shared memory size.
pub ru_ixrss: u64,
/// Presumably integral unshared data size.
pub ru_idrss: u64,
/// Presumably integral unshared stack size.
pub ru_isrss: u64,
/// Presumably page reclaims (soft page faults).
pub ru_minflt: u64,
/// Presumably page faults (hard page faults).
pub ru_majflt: u64,
/// Presumably number of swaps.
pub ru_nswap: u64,
/// Presumably block input operations.
pub ru_inblock: u64,
/// Presumably block output operations.
pub ru_oublock: u64,
/// Presumably IPC messages sent.
pub ru_msgsnd: u64,
/// Presumably IPC messages received.
pub ru_msgrcv: u64,
/// Presumably signals received.
pub ru_nsignals: u64,
/// Presumably voluntary context switches.
pub ru_nvcsw: u64,
/// Presumably involuntary context switches.
pub ru_nivcsw: u64,
}
/// Dump of the worker's internal state, as returned by `Worker::dump()`.
///
/// (De)serialized with camelCase field names; hidden from the public documentation.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
#[doc(hidden)]
#[non_exhaustive]
pub struct WorkerDump {
/// Process id reported by the worker.
pub pid: u32,
/// Ids of the routers reported by the worker.
pub router_ids: Vec<RouterId>,
}
/// Error returned by `Worker::create_router()`.
#[derive(Debug, Error, Eq, PartialEq)]
pub enum CreateRouterError {
/// Generating the router's RTP capabilities from the supplied media codecs failed.
#[error("RTP capabilities generation error: {0}")]
FailedRtpCapabilitiesGeneration(RtpCapabilitiesError),
/// The router creation request sent to the worker failed.
#[error("Request to worker failed: {0}")]
Request(RequestError),
}
/// Event handlers registered on a worker.
#[derive(Default)]
struct Handlers {
// Called with every router created through `Worker::create_router()`; may fire many times.
new_router: Bag<Box<dyn Fn(&Router) + Send + Sync>>,
// Called at most once, with the exit status, when the worker process exits on its own.
dead: BagOnce<Box<dyn FnOnce(ExitStatus) + Send>>,
// Called at most once when the worker is closed (process exit or `Inner` being dropped).
close: BagOnce<Box<dyn FnOnce() + Send>>,
}
/// Shared state behind a [`Worker`] handle; owns the spawned worker process.
struct Inner {
// Channel used for requests to the worker and for its notifications/log messages.
channel: Channel,
// Second channel; cloned into every router created by this worker.
payload_channel: PayloadChannel,
// Handle of the spawned worker process.
child: Child,
// Executor on which all background tasks of this worker are spawned.
executor: Arc<Executor<'static>>,
// Process id of `child`, captured right after spawning.
pid: u32,
// Registered event handlers.
handlers: Handlers,
// Custom application data supplied through `WorkerSettings`.
app_data: AppData,
// Set to `true` once the worker process has exited or this value is being dropped;
// shared with the background tasks to silence error logging after close.
closed: Arc<AtomicBool>,
// Never read in this file.
// NOTE(review): presumably held only to keep the worker manager alive — confirm.
_worker_manager: WorkerManager,
}
impl Drop for Inner {
    /// Marks the worker as closed, asks a still-running worker process to terminate and fires
    /// the `close` handlers unless the worker had already been marked as closed.
    fn drop(&mut self) {
        debug!("drop()");

        // Flip the flag first so background tasks stop reporting errors from a dying process.
        let was_closed = self.closed.swap(true, Ordering::SeqCst);

        // `Ok(None)` means the process has not exited yet.
        let still_running = matches!(self.child.try_status(), Ok(None));
        if still_running {
            // SAFETY: `kill` only takes plain integers and touches no memory owned by this
            // program. `self.pid` is the id of the child process spawned by `Inner::new`, which
            // `try_status` has just reported as not yet exited.
            unsafe {
                libc::kill(self.pid as libc::pid_t, libc::SIGTERM);
            }
        }

        if was_closed {
            // The close handlers were already triggered when the flag was first set.
            return;
        }
        self.handlers.close.call_simple();
    }
}
impl Inner {
async fn new(
executor: Arc<Executor<'static>>,
worker_binary: PathBuf,
WorkerSettings {
app_data,
log_level,
log_tags,
rtc_ports_range,
dtls_files,
}: WorkerSettings,
worker_manager: WorkerManager,
) -> io::Result<Arc<Self>> {
debug!("new()");
let mut spawn_args: Vec<OsString> = Vec::new();
let spawn_bin: PathBuf = match env::var("MEDIASOUP_USE_VALGRIND") {
Ok(value) if value.as_str() == "true" => {
let binary = match env::var("MEDIASOUP_VALGRIND_BIN") {
Ok(binary) => binary.into(),
_ => "valgrind".into(),
};
spawn_args.push(worker_binary.into_os_string());
binary
}
_ => worker_binary,
};
spawn_args.push(format!("--logLevel={}", log_level.as_str()).into());
for log_tag in log_tags {
spawn_args.push(format!("--logTag={}", log_tag.as_str()).into());
}
if rtc_ports_range.is_empty() {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"Invalid RTC ports range",
));
}
spawn_args.push(format!("--rtcMinPort={}", rtc_ports_range.start()).into());
spawn_args.push(format!("--rtcMaxPort={}", rtc_ports_range.end()).into());
if let Some(dtls_files) = dtls_files {
{
let mut arg = OsString::new();
arg.push("--dtlsCertificateFile=");
arg.push(dtls_files.certificate);
spawn_args.push(arg);
}
{
let mut arg = OsString::new();
arg.push("--dtlsPrivateKeyFile=");
arg.push(dtls_files.private_key);
spawn_args.push(arg);
}
}
debug!(
"spawning worker process: {} {}",
spawn_bin.to_string_lossy(),
spawn_args
.iter()
.map(|arg| arg.to_string_lossy())
.collect::<Vec<_>>()
.join(" ")
);
let mut command = Command::new(spawn_bin);
command
.args(spawn_args)
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.env("MEDIASOUP_VERSION", env!("CARGO_PKG_VERSION"));
let SpawnResult {
child,
channel,
payload_channel,
} = utils::spawn_with_worker_channels(Arc::clone(&executor), &mut command)?;
let pid = child.id();
let handlers = Handlers::default();
let mut inner = Self {
channel,
payload_channel,
child,
executor,
pid,
handlers,
app_data,
closed: Arc::new(AtomicBool::new(false)),
_worker_manager: worker_manager,
};
inner.setup_output_forwarding();
inner.setup_message_handling();
inner.wait_for_worker_process().await?;
let status_fut = inner.child.status();
let inner = Arc::new(inner);
{
let inner_weak = Arc::downgrade(&inner);
inner
.executor
.spawn(async move {
let status = status_fut.await;
if let Some(inner) = inner_weak.upgrade() {
if let Ok(exit_status) = status {
warn!("exit status {}", exit_status);
if !inner.closed.swap(true, Ordering::SeqCst) {
inner.handlers.dead.call(|callback| {
callback(exit_status);
});
inner.handlers.close.call_simple();
}
}
}
})
.detach();
}
Ok(inner)
}
/// Starts two detached tasks that forward the child's stdout and stderr to the logger,
/// line by line.
///
/// Stdout lines are logged at debug level. Stderr lines are logged at error level, but only
/// while the worker has not been marked as closed.
///
/// # Panics
///
/// Panics if the child's stdout or stderr handle is not available (not piped or already taken).
fn setup_output_forwarding(&mut self) {
    let child_stdout = self.child.stdout.take().unwrap();
    let forward_stdout = async move {
        let mut stdout_lines = BufReader::new(child_stdout).lines();
        // Stops at end of stream or on the first read error.
        while let Some(Ok(line)) = stdout_lines.next().await {
            debug!("(stdout) {}", line);
        }
    };
    self.executor.spawn(forward_stdout).detach();

    let child_stderr = self.child.stderr.take().unwrap();
    let closed = Arc::clone(&self.closed);
    let forward_stderr = async move {
        let mut stderr_lines = BufReader::new(child_stderr).lines();
        // Stops at end of stream or on the first read error.
        while let Some(Ok(line)) = stderr_lines.next().await {
            if closed.load(Ordering::SeqCst) {
                // Keep draining, but stay quiet once the worker is closed.
                continue;
            }
            error!("(stderr) {}", line);
        }
    };
    self.executor.spawn(forward_stderr).detach();
}
/// Waits until the worker reports that it is running, or fails if the process exits first.
///
/// # Errors
///
/// * [`io::ErrorKind::NotFound`] if the process exits before it reports being ready.
/// * Any error from querying the process status or from waiting for the ready notification.
async fn wait_for_worker_process(&mut self) -> io::Result<()> {
    let exit_status = self.child.status();

    // Resolves (always with an error) only once the process has exited.
    let exited_early = async move {
        let status = exit_status.await?;
        Err(io::Error::new(
            io::ErrorKind::NotFound,
            format!(
                "worker process exited before being ready, exit status {}, code {:?}",
                status,
                status.code(),
            ),
        ))
    };

    future::or(exited_early, self.wait_for_worker_ready()).await
}
/// Waits for the first notification addressed to this worker's pid and checks that it is the
/// `running` event.
///
/// # Errors
///
/// * Returns an error if the first notification is not a `running` event.
/// * Returns an error if the notification subscription goes away before any notification has
///   been delivered (previously this case panicked).
///
/// # Panics
///
/// The notification callback panics if more than one notification is delivered while this
/// function is still waiting.
async fn wait_for_worker_ready(&mut self) -> io::Result<()> {
    #[derive(Deserialize)]
    #[serde(tag = "event", rename_all = "lowercase")]
    enum Notification {
        Running,
    }

    let (sender, receiver) = async_oneshot::oneshot();
    let pid = self.pid;
    // The callback may be invoked through a shared reference, so the one-shot sender is parked
    // in a mutex and taken out on first use.
    let sender = Mutex::new(Some(sender));
    // The subscription must stay alive until the receiver below has resolved.
    let _handler =
        self.channel
            .subscribe_to_notifications(self.pid.into(), move |notification| {
                let result = match serde_json::from_value(notification.clone()) {
                    Ok(Notification::Running) => {
                        debug!("worker process running [pid:{}]", pid);
                        Ok(())
                    }
                    Err(error) => Err(io::Error::new(
                        io::ErrorKind::Other,
                        format!(
                            "unexpected first notification from worker [pid:{}]: {:?}; error = {}",
                            pid, notification, error
                        ),
                    )),
                };
                let _ = sender
                    .lock()
                    .take()
                    .expect("Receiving more than one worker notification")
                    .send(result);
            });

    // If the sender is dropped without sending (for instance because the channel discarded the
    // callback), report an error instead of panicking inside a fallible function.
    receiver.await.map_err(|_closed| {
        io::Error::new(
            io::ErrorKind::Other,
            "worker notification channel closed before the worker was ready",
        )
    })?
}
fn setup_message_handling(&mut self) {
let channel_receiver = self.channel.get_internal_message_receiver();
let payload_channel_receiver = self.payload_channel.get_internal_message_receiver();
let pid = self.pid;
let closed = Arc::clone(&self.closed);
self.executor
.spawn(async move {
while let Ok(message) = channel_receiver.recv().await {
match message {
channel::InternalMessage::Debug(text) => debug!("[pid:{}] {}", pid, text),
channel::InternalMessage::Warn(text) => warn!("[pid:{}] {}", pid, text),
channel::InternalMessage::Error(text) => {
if !closed.load(Ordering::SeqCst) {
error!("[pid:{}] {}", pid, text)
}
}
channel::InternalMessage::Dump(text) => eprintln!("{}", text),
channel::InternalMessage::Unexpected(data) => error!(
"worker[pid:{}] unexpected channel data: {}",
pid,
String::from_utf8_lossy(&data)
),
}
}
})
.detach();
self.executor
.spawn(async move {
while let Ok(message) = payload_channel_receiver.recv().await {
match message {
payload_channel::InternalMessage::UnexpectedData(data) => error!(
"worker[pid:{}] unexpected payload channel data: {}",
pid,
String::from_utf8_lossy(&data)
),
}
}
})
.detach();
}
}
/// Handle to a spawned worker process.
///
/// Cloning is cheap: all clones share the same underlying state. When the last reference to
/// that state is dropped, a still-running worker process is sent `SIGTERM`.
#[derive(Clone)]
pub struct Worker {
// Shared state that owns the worker process.
inner: Arc<Inner>,
}
impl Worker {
/// Spawns a new worker process from `worker_binary` using `worker_settings`.
///
/// # Errors
///
/// Propagates any I/O error produced while spawning the process or waiting for it to
/// become ready.
pub(super) async fn new(
    executor: Arc<Executor<'static>>,
    worker_binary: PathBuf,
    worker_settings: WorkerSettings,
    worker_manager: WorkerManager,
) -> io::Result<Self> {
    Inner::new(executor, worker_binary, worker_settings, worker_manager)
        .await
        .map(|inner| Self { inner })
}
/// Process id of the worker process, captured when it was spawned.
pub fn pid(&self) -> u32 {
self.inner.pid
}
/// Custom application data supplied through `WorkerSettings::app_data`.
pub fn app_data(&self) -> &AppData {
&self.inner.app_data
}
/// Whether the worker has been marked as closed (its process exited or its state was dropped).
pub fn closed(&self) -> bool {
self.inner.closed.load(Ordering::SeqCst)
}
/// Sends a `WorkerDumpRequest` to the worker and returns the resulting [`WorkerDump`].
///
/// # Errors
///
/// Returns a [`RequestError`] if the request fails.
#[doc(hidden)]
pub async fn dump(&self) -> Result<WorkerDump, RequestError> {
    debug!("dump()");

    let request = WorkerDumpRequest {};
    self.inner.channel.request(request).await
}
/// Sends a `WorkerGetResourceRequest` to the worker and returns the reported
/// [`WorkerResourceUsage`].
///
/// # Errors
///
/// Returns a [`RequestError`] if the request fails.
pub async fn get_resource_usage(&self) -> Result<WorkerResourceUsage, RequestError> {
    debug!("get_resource_usage()");

    let request = WorkerGetResourceRequest {};
    self.inner.channel.request(request).await
}
/// Sends the given settings to the worker in a `WorkerUpdateSettingsRequest`.
///
/// # Errors
///
/// Returns a [`RequestError`] if the request fails.
pub async fn update_settings(&self, data: WorkerUpdateSettings) -> Result<(), RequestError> {
    debug!("update_settings()");

    let request = WorkerUpdateSettingsRequest { data };
    self.inner.channel.request(request).await
}
pub async fn create_router(
&self,
router_options: RouterOptions,
) -> Result<Router, CreateRouterError> {
debug!("create_router()");
let RouterOptions {
app_data,
media_codecs,
} = router_options;
let rtp_capabilities = ortc::generate_router_rtp_capabilities(media_codecs)
.map_err(CreateRouterError::FailedRtpCapabilitiesGeneration)?;
let router_id = RouterId::new();
let internal = RouterInternal { router_id };
let _buffer_guard = self.inner.channel.buffer_messages_for(router_id.into());
self.inner
.channel
.request(WorkerCreateRouterRequest { internal })
.await
.map_err(CreateRouterError::Request)?;
let router = Router::new(
router_id,
Arc::clone(&self.inner.executor),
self.inner.channel.clone(),
self.inner.payload_channel.clone(),
rtp_capabilities,
app_data,
self.clone(),
);
self.inner.handlers.new_router.call(|callback| {
callback(&router);
});
Ok(router)
}
/// Registers a callback that is invoked with every router created through
/// `create_router()`. Returns the id of the registered handler.
pub fn on_new_router<F: Fn(&Router) + Send + Sync + 'static>(&self, callback: F) -> HandlerId {
    let handler: Box<dyn Fn(&Router) + Send + Sync> = Box::new(callback);
    self.inner.handlers.new_router.add(handler)
}
/// Registers a callback that is invoked at most once, with the exit status, when the worker
/// process exits on its own. Returns the id of the registered handler.
///
/// The callback only needs to be `Send`: it is stored as
/// `Box<dyn FnOnce(ExitStatus) + Send>`, so the previous `Sync` bound was not required.
pub fn on_dead<F: FnOnce(ExitStatus) + Send + 'static>(&self, callback: F) -> HandlerId {
    self.inner.handlers.dead.add(Box::new(callback))
}
/// Registers a callback that is invoked at most once when the worker is closed. Returns the
/// id of the registered handler.
///
/// If the worker is already closed when this is called, the pending `close` handlers
/// (including the one just registered) are invoked immediately.
pub fn on_close<F: FnOnce() + Send + 'static>(&self, callback: F) -> HandlerId {
    let handler_id = self.inner.handlers.close.add(Box::new(callback));
    // The handler is added before the flag is checked so that a concurrent close either sees
    // the new handler or is observed here. `SeqCst` matches every other access to `closed`.
    if self.inner.closed.load(Ordering::SeqCst) {
        self.inner.handlers.close.call_simple();
    }
    handler_id
}
}