studiole_command/services/
worker_pool.rs1use crate::prelude::*;
2use tokio::task::yield_now;
3
4pub struct WorkerPool<T: ICommandInfo> {
6 mediator: Arc<CommandMediator<T>>,
7 latest_worker_index: Arc<Mutex<WorkerId>>,
11 workers: Arc<Mutex<Vec<Worker>>>,
13}
14
15impl<T: ICommandInfo + 'static> Service for WorkerPool<T> {
16 type Error = ServiceError;
17
18 async fn from_services(services: &ServiceProvider) -> Result<Self, Report<Self::Error>> {
19 Ok(Self::new(services.get_service().await?))
20 }
21}
22
23impl<T: ICommandInfo + 'static> WorkerPool<T> {
24 #[must_use]
26 pub fn new(mediator: Arc<CommandMediator<T>>) -> Self {
27 Self {
28 mediator,
29 latest_worker_index: Arc::default(),
30 workers: Arc::default(),
31 }
32 }
33
34 pub(super) async fn start(&self, worker_count: usize) {
40 trace!(count = %worker_count, "Creating workers");
41 let mut index_guard = self.latest_worker_index.lock().await;
42 let start = *index_guard + 1;
43 let end = start + worker_count - 1;
44 *index_guard = end;
45 drop(index_guard);
46 self.mediator.set_runner_status(RunnerStatus::Running).await;
47 let mut handles = Vec::with_capacity(worker_count);
48 for worker_id in start..=end {
49 trace!(worker = worker_id, "Creating worker");
50 let handle = Worker::new(worker_id, self.mediator.clone());
51 handles.push(handle);
52 }
53 let mut workers_guard = self.workers.lock().await;
54 workers_guard.append(&mut handles);
55 drop(workers_guard);
56 trace!("Yielding until workers have started");
57 yield_now().await;
58 trace!("Workers have started");
59 }
60
61 pub async fn drain(&self) {
63 self.mediator
64 .set_runner_status(RunnerStatus::Draining)
65 .await;
66 self.wait_for_stop().await;
67 }
68
69 pub async fn stop(&self) {
71 self.mediator
72 .set_runner_status(RunnerStatus::Stopping)
73 .await;
74 self.wait_for_stop().await;
75 }
76
77 pub(super) async fn wait_for_stop(&self) {
78 let mut workers_guard = self.workers.lock().await;
79 let workers = take(&mut *workers_guard);
80 drop(workers_guard);
81 for worker in workers {
82 worker.wait_for_stop().await;
83 }
84 }
85}