Skip to main content

studiole_command/services/
worker_pool.rs

1use crate::prelude::*;
2use tokio::task::yield_now;
3
4/// Manage a dynamic set of [`Worker`] instances.
5pub struct WorkerPool<T: ICommandInfo> {
6    mediator: Arc<CommandMediator<T>>,
7    /// Current worker index
8    ///
9    /// Used to ensure each worker has a unique ID even if additional workers are started
10    latest_worker_index: Arc<Mutex<WorkerId>>,
11    /// Workers
12    workers: Arc<Mutex<Vec<Worker>>>,
13}
14
15impl<T: ICommandInfo + 'static> FromServices for WorkerPool<T> {
16    type Error = ResolveError;
17
18    fn from_services(services: &ServiceProvider) -> Result<Self, Report<Self::Error>> {
19        Ok(Self::new(services.get::<CommandMediator<T>>()?))
20    }
21}
22
23impl<T: ICommandInfo + 'static> WorkerPool<T> {
24    /// Create a new [`WorkerPool`] backed by a [`CommandMediator`].
25    #[must_use]
26    pub fn new(mediator: Arc<CommandMediator<T>>) -> Self {
27        Self {
28            mediator,
29            latest_worker_index: Arc::default(),
30            workers: Arc::default(),
31        }
32    }
33
34    /// Start any number of workers.
35    ///
36    /// Each worker will have a unique ID.
37    ///
38    /// Status will be set to `Running`.
39    pub(super) async fn start(&self, worker_count: usize) {
40        trace!(count = %worker_count, "Creating workers");
41        let mut index_guard = self.latest_worker_index.lock().await;
42        let start = *index_guard + 1;
43        let end = start + worker_count - 1;
44        *index_guard = end;
45        drop(index_guard);
46        self.mediator.set_runner_status(RunnerStatus::Running).await;
47        let mut handles = Vec::with_capacity(worker_count);
48        for worker_id in start..=end {
49            trace!(worker = worker_id, "Creating worker");
50            let handle = Worker::new(worker_id, self.mediator.clone());
51            handles.push(handle);
52        }
53        let mut workers_guard = self.workers.lock().await;
54        workers_guard.append(&mut handles);
55        drop(workers_guard);
56        trace!("Yielding until workers have started");
57        yield_now().await;
58        trace!("Workers have started");
59    }
60
61    /// Stop workers after draining the queue
62    pub async fn drain(&self) {
63        self.mediator
64            .set_runner_status(RunnerStatus::Draining)
65            .await;
66        self.wait_for_stop().await;
67    }
68
69    /// Stop workers after their current work is complete
70    pub async fn stop(&self) {
71        self.mediator
72            .set_runner_status(RunnerStatus::Stopping)
73            .await;
74        self.wait_for_stop().await;
75    }
76
77    pub(super) async fn wait_for_stop(&self) {
78        let mut workers_guard = self.workers.lock().await;
79        let workers = take(&mut *workers_guard);
80        drop(workers_guard);
81        for worker in workers {
82            worker.wait_for_stop().await;
83        }
84    }
85}