1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
use crate::worker::channel::BufferMessagesGuard;
use crate::worker::{Channel, PayloadChannel, WorkerId};
use async_executor::Executor;
use async_fs::File;
use async_oneshot::Receiver;
use std::ffi::CString;
use std::mem;
use std::os::raw::{c_char, c_int};
use std::os::unix::io::FromRawFd;
use std::sync::Arc;
use thiserror::Error;
#[derive(Debug, Copy, Clone, Error)]
pub enum ExitError {
#[error("Worker exited with generic error")]
Generic,
#[error("Worker exited with settings error")]
Settings,
#[error("Worker exited with unknown error and status code {status_code}")]
Unknown {
status_code: i32,
},
#[error("Worker exited unexpectedly")]
Unexpected,
}
fn pipe() -> [c_int; 2] {
unsafe {
let mut fds = mem::MaybeUninit::<[c_int; 2]>::uninit();
if libc::pipe(fds.as_mut_ptr() as *mut c_int) != 0 {
panic!(
"libc::pipe() failed with code {}",
*libc::__errno_location()
);
}
fds.assume_init()
}
}
pub(super) struct WorkerRunResult {
pub(super) channel: Channel,
pub(super) payload_channel: PayloadChannel,
pub(super) status_receiver: Receiver<Result<(), ExitError>>,
pub(super) buffer_worker_messages_guard: BufferMessagesGuard,
}
pub(super) fn run_worker_with_channels(
id: WorkerId,
executor: Arc<Executor<'static>>,
args: Vec<String>,
) -> WorkerRunResult {
let [producer_fd_read, producer_fd_write] = pipe();
let [consumer_fd_read, consumer_fd_write] = pipe();
let [producer_payload_fd_read, producer_payload_fd_write] = pipe();
let [consumer_payload_fd_read, consumer_payload_fd_write] = pipe();
let (mut status_sender, status_receiver) = async_oneshot::oneshot();
let producer_file = unsafe { File::from_raw_fd(producer_fd_write) };
let consumer_file = unsafe { File::from_raw_fd(consumer_fd_read) };
let producer_payload_file = unsafe { File::from_raw_fd(producer_payload_fd_write) };
let consumer_payload_file = unsafe { File::from_raw_fd(consumer_payload_fd_read) };
let channel = Channel::new(Arc::clone(&executor), consumer_file, producer_file);
let payload_channel =
PayloadChannel::new(executor, consumer_payload_file, producer_payload_file);
let buffer_worker_messages_guard = channel.buffer_messages_for(std::process::id().into());
std::thread::Builder::new()
.name(format!("mediasoup-worker-{}", id))
.spawn(move || {
let argc = args.len() as c_int;
let args_cstring = args
.into_iter()
.map(|s| -> CString { CString::new(s).unwrap() })
.collect::<Vec<_>>();
let argv = args_cstring
.iter()
.map(|arg| arg.as_ptr() as *const c_char)
.collect::<Vec<_>>();
let version = CString::new(env!("CARGO_PKG_VERSION")).unwrap();
let status_code = unsafe {
mediasoup_sys::run_worker(
argc,
argv.as_ptr(),
version.as_ptr(),
producer_fd_read,
consumer_fd_write,
producer_payload_fd_read,
consumer_payload_fd_write,
)
};
let _ = status_sender.send(match status_code {
0 => Ok(()),
1 => Err(ExitError::Generic),
42 => Err(ExitError::Settings),
status_code => Err(ExitError::Unknown { status_code }),
});
})
.expect("Failed to spawn mediasoup-worker thread");
WorkerRunResult {
channel,
payload_channel,
status_receiver,
buffer_worker_messages_guard,
}
}