1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
//! Container that creates [`Worker`] instances.

#[cfg(test)]
mod tests;

use crate::worker::{Worker, WorkerId, WorkerSettings};
use async_executor::Executor;
use async_oneshot::Sender;
use event_listener_primitives::{Bag, HandlerId};
use futures_lite::future;
use log::debug;
use parking_lot::Mutex;
use std::collections::HashMap;
use std::ops::DerefMut;
use std::sync::mpsc;
use std::sync::Arc;
use std::{fmt, io, mem};

#[derive(Default)]
#[allow(clippy::type_complexity)]
struct Handlers {
    new_worker: Bag<Arc<dyn Fn(&Worker) + Send + Sync>, Worker>,
}

struct Inner {
    executor: Arc<Executor<'static>>,
    handlers: Handlers,
    /// Mapping from worker ID to the close event receiver
    workers: Arc<Mutex<HashMap<WorkerId, mpsc::Receiver<()>>>>,
    /// This field is only used in order to be dropped with the worker manager itself to stop the
    /// thread created with `WorkerManager::new()` call
    _stop_sender: Option<Sender<()>>,
}

impl Drop for Inner {
    fn drop(&mut self) {
        let workers = mem::take(self.workers.lock().deref_mut());
        for exit_receiver in workers.into_values() {
            let _ = exit_receiver.recv();
        }
    }
}

/// Container that creates [`Worker`] instances.
///
/// # Examples
/// ```no_run
/// use futures_lite::future;
/// use mediasoup::worker::WorkerSettings;
/// use mediasoup::worker_manager::WorkerManager;
///
/// // Create a manager that will use specified binary for spawning new worker thread
/// let worker_manager = WorkerManager::new();
///
/// future::block_on(async move {
///     // Create a new worker with default settings
///     let worker = worker_manager
///         .create_worker(WorkerSettings::default())
///         .await
///         .unwrap();
/// })
/// ```
///
/// If you already happen to have [`async_executor::Executor`] instance available or need a
/// multi-threaded executor, [`WorkerManager::with_executor()`] can be used to create an instance
/// instead.
#[derive(Clone)]
#[must_use]
pub struct WorkerManager {
    inner: Arc<Inner>,
}

impl fmt::Debug for WorkerManager {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("WorkerManager").finish()
    }
}

impl Default for WorkerManager {
    fn default() -> Self {
        Self::new()
    }
}

impl WorkerManager {
    /// Create new worker manager, internally a new single-threaded executor will be created.
    pub fn new() -> Self {
        let executor = Arc::new(Executor::new());
        let (stop_sender, stop_receiver) = async_oneshot::oneshot::<()>();
        {
            let executor = Arc::clone(&executor);
            std::thread::spawn(move || {
                // Will return Err(Closed) when `WorkerManager` struct is dropped
                let _ = future::block_on(executor.run(stop_receiver));
            });
        }

        let handlers = Handlers::default();

        let inner = Arc::new(Inner {
            executor,
            handlers,
            workers: Arc::default(),
            _stop_sender: Some(stop_sender),
        });

        Self { inner }
    }

    /// Create new worker manager, uses externally provided executor.
    pub fn with_executor(executor: Arc<Executor<'static>>) -> Self {
        let handlers = Handlers::default();

        let inner = Arc::new(Inner {
            executor,
            handlers,
            workers: Arc::default(),
            _stop_sender: None,
        });

        Self { inner }
    }

    /// Creates a new worker with the given settings.
    ///
    /// Worker manager will be kept alive as long as at least one worker instance is alive.
    pub async fn create_worker(&self, worker_settings: WorkerSettings) -> io::Result<Worker> {
        debug!("create_worker()");

        let (exit_sender, exit_receiver) = mpsc::channel();
        let id = Arc::new(Mutex::new(None));
        let worker = Worker::new(
            Arc::clone(&self.inner.executor),
            worker_settings,
            self.clone(),
            {
                let id = Arc::clone(&id);
                let workers = Arc::clone(&self.inner.workers);

                move || {
                    let _ = exit_sender.send(());
                    if let Some(id) = id.lock().take() {
                        workers.lock().remove(&id);
                    }
                }
            },
        )
        .await?;

        self.inner.handlers.new_worker.call_simple(&worker);

        id.lock().replace(worker.id());

        self.inner.workers.lock().insert(worker.id(), exit_receiver);

        Ok(worker)
    }

    /// Callback is called when a new worker is created.
    pub fn on_new_worker<F: Fn(&Worker) + Send + Sync + 'static>(&self, callback: F) -> HandlerId {
        self.inner.handlers.new_worker.add(Arc::new(callback))
    }
}