mod channel;
mod common;
mod payload_channel;
mod utils;
use crate::data_structures::AppData;
use crate::messages::{
RouterInternal, WorkerCreateRouterRequest, WorkerDumpRequest, WorkerGetResourceRequest,
WorkerUpdateSettingsRequest,
};
use crate::ortc;
use crate::ortc::RtpCapabilitiesError;
use crate::router::{Router, RouterId, RouterOptions};
use crate::worker_manager::WorkerManager;
use async_executor::Executor;
use async_process::{Child, Command, ExitStatus, Stdio};
pub(crate) use channel::Channel;
pub(crate) use common::SubscriptionHandler;
use event_listener_primitives::{Bag, HandlerId};
use futures_lite::io::BufReader;
use futures_lite::{future, AsyncBufReadExt, StreamExt};
use log::*;
pub(crate) use payload_channel::{NotificationError, NotificationMessage, PayloadChannel};
use serde::ser::Serializer;
use serde::{Deserialize, Serialize};
use std::cell::Cell;
use std::error::Error;
use std::ffi::OsString;
use std::path::PathBuf;
use std::sync::Arc;
use std::{env, io};
use thiserror::Error;
use utils::SpawnResult;
#[derive(Debug, Error)]
pub enum RequestError {
#[error("Channel already closed")]
ChannelClosed,
#[error("Message is too long")]
MessageTooLong,
#[error("Payload is too long")]
PayloadTooLong,
#[error("Request timed out")]
TimedOut,
#[error("Received response error: {reason}")]
Response { reason: String },
#[error("Failed to parse response from worker: {error}")]
FailedToParse {
#[from]
error: Box<dyn Error>,
},
#[error("Worker did not return any data in response")]
NoData,
}
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub enum WorkerLogLevel {
Debug,
Warn,
Error,
None,
}
impl Default for WorkerLogLevel {
fn default() -> Self {
Self::Error
}
}
impl Serialize for WorkerLogLevel {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(self.as_str())
}
}
impl WorkerLogLevel {
pub fn as_str(&self) -> &'static str {
match self {
Self::Debug => "debug",
Self::Warn => "warn",
Self::Error => "error",
Self::None => "none",
}
}
}
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub enum WorkerLogTag {
Info,
Ice,
Dtls,
Rtp,
Srtp,
Rtcp,
Rtx,
Bwe,
Score,
Simulcast,
Svc,
Sctp,
Message,
}
impl Serialize for WorkerLogTag {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(self.as_str())
}
}
impl WorkerLogTag {
pub fn as_str(&self) -> &'static str {
match self {
Self::Info => "info",
Self::Ice => "ice",
Self::Dtls => "dtls",
Self::Rtp => "rtp",
Self::Srtp => "srtp",
Self::Rtcp => "rtcp",
Self::Rtx => "rtx",
Self::Bwe => "bwe",
Self::Score => "score",
Self::Simulcast => "simulcast",
Self::Svc => "svc",
Self::Sctp => "sctp",
Self::Message => "message",
}
}
}
#[derive(Debug, Clone)]
pub struct WorkerDtlsFiles {
pub certificate: PathBuf,
pub private_key: PathBuf,
}
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct WorkerSettings {
pub app_data: AppData,
pub log_level: WorkerLogLevel,
pub log_tags: Vec<WorkerLogTag>,
pub rtc_min_port: u16,
pub rtc_max_port: u16,
pub dtls_files: Option<WorkerDtlsFiles>,
}
impl Default for WorkerSettings {
fn default() -> Self {
Self {
app_data: AppData::default(),
log_level: WorkerLogLevel::default(),
log_tags: Vec::new(),
rtc_min_port: 10000,
rtc_max_port: 59999,
dtls_files: None,
}
}
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct WorkerUpdateSettings {
pub log_level: WorkerLogLevel,
pub log_tags: Vec<WorkerLogTag>,
}
#[derive(Debug, Copy, Clone, Deserialize)]
pub struct WorkerResourceUsage {
pub ru_utime: u64,
pub ru_stime: u64,
pub ru_maxrss: u64,
pub ru_ixrss: u64,
pub ru_idrss: u64,
pub ru_isrss: u64,
pub ru_minflt: u64,
pub ru_majflt: u64,
pub ru_nswap: u64,
pub ru_inblock: u64,
pub ru_oublock: u64,
pub ru_msgsnd: u64,
pub ru_msgrcv: u64,
pub ru_nsignals: u64,
pub ru_nvcsw: u64,
pub ru_nivcsw: u64,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
#[doc(hidden)]
pub struct WorkerDump {
pub pid: u32,
pub router_ids: Vec<RouterId>,
}
#[derive(Debug, Error)]
pub enum CreateRouterError {
#[error("RTP capabilities generation error: {0}")]
FailedRtpCapabilitiesGeneration(RtpCapabilitiesError),
#[error("Request to worker failed: {0}")]
Request(RequestError),
}
#[derive(Default)]
struct Handlers {
new_router: Bag<'static, dyn Fn(&Router) + Send>,
dead: Bag<'static, dyn FnOnce(ExitStatus) + Send>,
close: Bag<'static, dyn FnOnce() + Send>,
}
struct Inner {
channel: Channel,
payload_channel: PayloadChannel,
child: Child,
executor: Arc<Executor<'static>>,
pid: u32,
handlers: Handlers,
app_data: AppData,
_worker_manager: WorkerManager,
}
impl Drop for Inner {
fn drop(&mut self) {
debug!("drop()");
self.handlers.close.call_once_simple();
if matches!(self.child.try_status(), Ok(None)) {
unsafe {
libc::kill(self.pid as libc::pid_t, libc::SIGTERM);
}
}
}
}
impl Inner {
async fn new(
executor: Arc<Executor<'static>>,
worker_binary: PathBuf,
WorkerSettings {
app_data,
log_level,
log_tags,
rtc_min_port,
rtc_max_port,
dtls_files,
}: WorkerSettings,
worker_manager: WorkerManager,
) -> io::Result<Arc<Self>> {
debug!("new()");
let mut spawn_args: Vec<OsString> = Vec::new();
let spawn_bin: PathBuf = match env::var("MEDIASOUP_USE_VALGRIND") {
Ok(value) if value.as_str() == "true" => {
let binary = match env::var("MEDIASOUP_VALGRIND_BIN") {
Ok(binary) => binary.into(),
_ => "valgrind".into(),
};
spawn_args.push(worker_binary.into_os_string());
binary
}
_ => worker_binary,
};
spawn_args.push(format!("--logLevel={}", log_level.as_str()).into());
if !log_tags.is_empty() {
let log_tags = log_tags
.iter()
.map(|log_tag| log_tag.as_str())
.collect::<Vec<_>>()
.join(",");
spawn_args.push(format!("--logTags={}", log_tags).into());
}
spawn_args.push(format!("--rtcMinPort={}", rtc_min_port).into());
spawn_args.push(format!("--rtcMaxPort={}", rtc_max_port).into());
if let Some(dtls_files) = dtls_files {
{
let mut arg = OsString::new();
arg.push("--dtlsCertificateFile=");
arg.push(dtls_files.certificate);
spawn_args.push(arg);
}
{
let mut arg = OsString::new();
arg.push("--dtlsPrivateKeyFile=");
arg.push(dtls_files.private_key);
spawn_args.push(arg);
}
}
debug!(
"spawning worker process: {} {}",
spawn_bin.to_string_lossy(),
spawn_args
.iter()
.map(|arg| arg.to_string_lossy())
.collect::<Vec<_>>()
.join(" ")
);
let mut command = Command::new(spawn_bin);
command
.args(spawn_args)
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.env("MEDIASOUP_VERSION", env!("CARGO_PKG_VERSION"));
let SpawnResult {
child,
channel,
payload_channel,
} = utils::spawn_with_worker_channels(Arc::clone(&executor), &mut command)?;
let pid = child.id();
let handlers = Handlers::default();
let mut inner = Self {
channel,
payload_channel,
child,
executor,
pid,
handlers,
app_data,
_worker_manager: worker_manager,
};
inner.setup_output_forwarding();
inner.wait_for_worker_process().await?;
inner.setup_message_handling();
let status_fut = inner.child.status();
let inner = Arc::new(inner);
{
let inner_weak = Arc::downgrade(&inner);
inner
.executor
.spawn(async move {
let status = status_fut.await;
if let Some(inner) = inner_weak.upgrade() {
if let Ok(exit_status) = status {
warn!("exit status {}", exit_status);
inner.handlers.dead.call_once(|callback| {
callback(exit_status);
});
}
}
})
.detach();
}
Ok(inner)
}
fn setup_output_forwarding(&mut self) {
let stdout = self.child.stdout.take().unwrap();
self.executor
.spawn(async move {
let mut lines = BufReader::new(stdout).lines();
while let Some(Ok(line)) = lines.next().await {
debug!("(stdout) {}", line);
}
})
.detach();
let stderr = self.child.stderr.take().unwrap();
self.executor
.spawn(async move {
let mut lines = BufReader::new(stderr).lines();
while let Some(Ok(line)) = lines.next().await {
error!("(stderr) {}", line);
}
})
.detach();
}
async fn wait_for_worker_process(&mut self) -> io::Result<()> {
let status = self.child.status();
future::or(
async move {
status.await?;
Err(io::Error::new(
io::ErrorKind::NotFound,
"worker process exited before being ready",
))
},
self.wait_for_worker_ready(),
)
.await
}
async fn wait_for_worker_ready(&mut self) -> io::Result<()> {
#[derive(Deserialize)]
#[serde(tag = "event", rename_all = "lowercase")]
enum Notification {
Running,
}
let (sender, receiver) = async_oneshot::oneshot();
let pid = self.pid;
let sender = Cell::new(Some(sender));
let _handler = self
.channel
.subscribe_to_notifications(self.pid.to_string(), move |notification| {
let result = match serde_json::from_value(notification.clone()) {
Ok(Notification::Running) => {
debug!("worker process running [pid:{}]", pid);
Ok(())
}
Err(error) => Err(io::Error::new(
io::ErrorKind::Other,
format!(
"unexpected first notification from worker [pid:{}]: {:?}; error = {}",
pid, notification, error
),
)),
};
let _ = sender
.take()
.take()
.expect("Receiving more than one worker notification")
.send(result);
})
.await;
receiver.await.unwrap()
}
fn setup_message_handling(&mut self) {
let channel_receiver = self.channel.get_internal_message_receiver();
let payload_channel_receiver = self.payload_channel.get_internal_message_receiver();
let pid = self.pid;
self.executor
.spawn(async move {
while let Ok(message) = channel_receiver.recv().await {
match message {
channel::InternalMessage::Debug(text) => debug!("[pid:{}] {}", pid, text),
channel::InternalMessage::Warn(text) => warn!("[pid:{}] {}", pid, text),
channel::InternalMessage::Error(text) => error!("[pid:{}] {}", pid, text),
channel::InternalMessage::Dump(text) => eprintln!("{}", text),
channel::InternalMessage::Unexpected(data) => error!(
"worker[pid:{}] unexpected channel data: {}",
pid,
String::from_utf8_lossy(&data)
),
}
}
})
.detach();
self.executor
.spawn(async move {
while let Ok(message) = payload_channel_receiver.recv().await {
match message {
payload_channel::InternalMessage::UnexpectedData(data) => error!(
"worker[pid:{}] unexpected payload channel data: {}",
pid,
String::from_utf8_lossy(&data)
),
}
}
})
.detach();
}
}
#[derive(Clone)]
pub struct Worker {
inner: Arc<Inner>,
}
impl Worker {
pub(super) async fn new(
executor: Arc<Executor<'static>>,
worker_binary: PathBuf,
worker_settings: WorkerSettings,
worker_manager: WorkerManager,
) -> io::Result<Self> {
let inner = Inner::new(executor, worker_binary, worker_settings, worker_manager).await?;
Ok(Self { inner })
}
pub fn pid(&self) -> u32 {
self.inner.pid
}
pub fn app_data(&self) -> &AppData {
&self.inner.app_data
}
#[doc(hidden)]
pub async fn dump(&self) -> Result<WorkerDump, RequestError> {
debug!("dump()");
self.inner.channel.request(WorkerDumpRequest {}).await
}
pub async fn get_resource_usage(&self) -> Result<WorkerResourceUsage, RequestError> {
debug!("get_resource_usage()");
self.inner
.channel
.request(WorkerGetResourceRequest {})
.await
}
pub async fn update_settings(&self, data: WorkerUpdateSettings) -> Result<(), RequestError> {
debug!("update_settings()");
self.inner
.channel
.request(WorkerUpdateSettingsRequest { data })
.await
}
pub async fn create_router(
&self,
router_options: RouterOptions,
) -> Result<Router, CreateRouterError> {
debug!("create_router()");
let RouterOptions {
app_data,
media_codecs,
} = router_options;
let rtp_capabilities = ortc::generate_router_rtp_capabilities(media_codecs)
.map_err(CreateRouterError::FailedRtpCapabilitiesGeneration)?;
let router_id = RouterId::new();
let internal = RouterInternal { router_id };
self.inner
.channel
.request(WorkerCreateRouterRequest { internal })
.await
.map_err(CreateRouterError::Request)?;
let router = Router::new(
router_id,
Arc::clone(&self.inner.executor),
self.inner.channel.clone(),
self.inner.payload_channel.clone(),
rtp_capabilities,
app_data,
self.clone(),
);
self.inner.handlers.new_router.call(|callback| {
callback(&router);
});
Ok(router)
}
pub fn on_new_router<F: Fn(&Router) + Send + 'static>(
&self,
callback: F,
) -> HandlerId<'static> {
self.inner.handlers.new_router.add(Box::new(callback))
}
pub fn on_dead<F: FnOnce(ExitStatus) + Send + 'static>(
&self,
callback: F,
) -> HandlerId<'static> {
self.inner.handlers.dead.add(Box::new(callback))
}
pub fn on_close<F: FnOnce() + Send + 'static>(&self, callback: F) -> HandlerId<'static> {
self.inner.handlers.close.add(Box::new(callback))
}
}
#[cfg(test)]
mod tests {
use super::*;
use futures_lite::future;
use std::thread;
fn init() {
let _ = env_logger::builder().is_test(true).try_init();
}
#[test]
fn worker_test() {
init();
let executor = Arc::new(Executor::new());
let (_stop_sender, stop_receiver) = async_oneshot::oneshot::<()>();
{
let executor = Arc::clone(&executor);
thread::spawn(move || {
let _ = future::block_on(executor.run(stop_receiver));
});
}
let worker_settings = WorkerSettings::default();
let worker_binary: PathBuf = env::var("MEDIASOUP_WORKER_BIN")
.map(Into::into)
.unwrap_or_else(|_| "../worker/out/Release/mediasoup-worker".into());
future::block_on(async move {
let worker = Worker::new(
executor,
worker_binary.clone(),
worker_settings,
WorkerManager::new(worker_binary),
)
.await
.unwrap();
println!("Worker dump: {:#?}", worker.dump().await.unwrap());
println!(
"Resource usage: {:#?}",
worker.get_resource_usage().await.unwrap()
);
println!(
"Update settings: {:?}",
worker
.update_settings(WorkerUpdateSettings {
log_level: WorkerLogLevel::Debug,
log_tags: Vec::new(),
})
.await
.unwrap()
);
thread::sleep(std::time::Duration::from_millis(200));
});
thread::sleep(std::time::Duration::from_millis(200));
}
}