1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
// The contents of this module are inspired by https://github.com/Srinivasa314/alcro/tree/master/src/chrome
use crate::worker::channel::BufferMessagesGuard;
use crate::worker::{Channel, PayloadChannel, WorkerId};
use async_executor::Executor;
use async_fs::File;
use async_oneshot::Receiver;
use std::ffi::CString;
use std::mem;
use std::os::raw::{c_char, c_int};
use std::os::unix::io::FromRawFd;
use std::sync::Arc;
use thiserror::Error;

/// Reason why a worker terminated with a failure.
///
/// `Generic`, `Settings` and `Unknown` are produced from the integer status code returned by the
/// worker; the mapping lives in `run_worker_with_channels`.
#[derive(Debug, Copy, Clone, Error)]
pub enum ExitError {
    /// Generic error (the worker returned status code `1`).
    #[error("Worker exited with generic error")]
    Generic,
    /// Settings error (the worker returned status code `42`).
    #[error("Worker exited with settings error")]
    Settings,
    /// Unknown error (the worker returned a non-zero status code other than `1` and `42`).
    #[error("Worker exited with unknown error and status code {status_code}")]
    Unknown {
        /// Status code returned by worker
        status_code: i32,
    },
    // NOTE(review): this variant is never constructed in this file; presumably it is reported by
    // callers when the worker goes away without delivering a status code - confirm.
    /// Unexpected error.
    #[error("Worker exited unexpectedly")]
    Unexpected,
}

fn pipe() -> [c_int; 2] {
    unsafe {
        let mut fds = mem::MaybeUninit::<[c_int; 2]>::uninit();

        if libc::pipe(fds.as_mut_ptr() as *mut c_int) != 0 {
            panic!(
                "libc::pipe() failed with code {}",
                *libc::__errno_location()
            );
        }

        fds.assume_init()
    }
}

/// Everything `run_worker_with_channels` hands back to its caller once the worker thread has been
/// spawned.
pub(super) struct WorkerRunResult {
    /// Channel created on top of the first pair of pipes shared with the worker.
    pub(super) channel: Channel,
    /// Payload channel created on top of the second pair of pipes shared with the worker.
    pub(super) payload_channel: PayloadChannel,
    /// Receives `Ok(())` when the worker returns status code `0` and the corresponding
    /// `ExitError` for any other status code.
    pub(super) status_receiver: Receiver<Result<(), ExitError>>,
    /// Guard returned by `Channel::buffer_messages_for()` for the current process id.
    // NOTE(review): presumably messages stay buffered for as long as this guard is alive -
    // confirm against `BufferMessagesGuard`.
    pub(super) buffer_worker_messages_guard: BufferMessagesGuard,
}

/// Starts a worker on a dedicated OS thread and connects it to the caller through pipes.
///
/// Four pipes are created: one per direction for the regular channel and one per direction for
/// the payload channel. The ends kept on this side are wrapped into `Channel` and
/// `PayloadChannel`, the opposite ends are given to `mediasoup_sys::run_worker()` as raw file
/// descriptors.
///
/// * `id` - only used to name the spawned thread (`mediasoup-worker-{id}`).
/// * `executor` - shared by both created channels.
/// * `args` - arguments forwarded to the worker as `argc`/`argv`.
///
/// The status code returned by the worker is translated into `Result<(), ExitError>` (`0` is
/// success, `1` is `Generic`, `42` is `Settings`, anything else is `Unknown`) and delivered through
/// `WorkerRunResult::status_receiver`.
///
/// # Panics
///
/// Panics if one of the pipes can't be created or if the worker thread can't be spawned. An
/// argument containing an interior NUL byte makes the worker thread itself panic before the worker
/// is started, in which case no status is sent.
pub(super) fn run_worker_with_channels(
    id: WorkerId,
    executor: Arc<Executor<'static>>,
    args: Vec<String>,
) -> WorkerRunResult {
    // "producer"/"consumer" are named from the point of view of this side: we write into producer
    // pipes and read from consumer pipes, the worker gets the opposite ends.
    let [producer_fd_read, producer_fd_write] = pipe();
    let [consumer_fd_read, consumer_fd_write] = pipe();
    let [producer_payload_fd_read, producer_payload_fd_write] = pipe();
    let [consumer_payload_fd_read, consumer_payload_fd_write] = pipe();
    let (mut status_sender, status_receiver) = async_oneshot::oneshot();

    // SAFETY: each descriptor was just returned by `pipe()`, is open and is not used for anything
    // else on this side, so every `File` becomes the only owner of its descriptor.
    let producer_file = unsafe { File::from_raw_fd(producer_fd_write) };
    let consumer_file = unsafe { File::from_raw_fd(consumer_fd_read) };
    let producer_payload_file = unsafe { File::from_raw_fd(producer_payload_fd_write) };
    let consumer_payload_file = unsafe { File::from_raw_fd(consumer_payload_fd_read) };

    let channel = Channel::new(Arc::clone(&executor), consumer_file, producer_file);
    let payload_channel =
        PayloadChannel::new(executor, consumer_payload_file, producer_payload_file);
    // The guard is created before the thread is spawned and is returned to the caller.
    // NOTE(review): presumably this prevents losing messages the worker emits before the caller
    // had a chance to subscribe - confirm against `Channel::buffer_messages_for()`.
    let buffer_worker_messages_guard = channel.buffer_messages_for(std::process::id().into());

    std::thread::Builder::new()
        .name(format!("mediasoup-worker-{}", id))
        .spawn(move || {
            // `run_worker()` takes a C `int`; the list would need more than `c_int::MAX` entries
            // for this cast to truncate.
            let argc = args.len() as c_int;
            // Owned C strings must stay alive for as long as `argv` points into them.
            let args_cstring = args
                .into_iter()
                .map(|arg| {
                    CString::new(arg).expect("Worker argument must not contain NUL bytes")
                })
                .collect::<Vec<_>>();
            // C code is allowed to rely on `argv[argc]` being a null pointer, so the array is
            // terminated accordingly; `argc` still counts real arguments only.
            let argv: Vec<*const c_char> = args_cstring
                .iter()
                .map(|arg| arg.as_ptr())
                .chain(std::iter::once(std::ptr::null()))
                .collect();
            let version = CString::new(env!("CARGO_PKG_VERSION"))
                .expect("Package version must not contain NUL bytes");
            // SAFETY: `argv` holds `argc` pointers to NUL-terminated strings followed by a null
            // pointer; `args_cstring`, `argv` and `version` all outlive the call. The descriptors
            // are the worker-side ends of the pipes created above and are not used by this side.
            let status_code = unsafe {
                mediasoup_sys::run_worker(
                    argc,
                    argv.as_ptr(),
                    version.as_ptr(),
                    producer_fd_read,
                    consumer_fd_write,
                    producer_payload_fd_read,
                    consumer_payload_fd_write,
                )
            };

            // Sending is best effort: the result is ignored on purpose, nobody may be listening
            // for the exit status anymore.
            let _ = status_sender.send(match status_code {
                0 => Ok(()),
                1 => Err(ExitError::Generic),
                42 => Err(ExitError::Settings),
                status_code => Err(ExitError::Unknown { status_code }),
            });
        })
        .expect("Failed to spawn mediasoup-worker thread");

    WorkerRunResult {
        channel,
        payload_channel,
        status_receiver,
        buffer_worker_messages_guard,
    }
}