vortex_io/runtime/
pool.rs1use std::sync::Arc;
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::time::Duration;
7
8use parking_lot::Mutex;
9use smol::block_on;
10use vortex_error::VortexExpect;
11
12#[derive(Clone)]
13pub struct CurrentThreadWorkerPool {
14 executor: Arc<smol::Executor<'static>>,
15 state: Arc<Mutex<PoolState>>,
16}
17
18impl CurrentThreadWorkerPool {
19 pub(super) fn new(executor: Arc<smol::Executor<'static>>) -> Self {
20 Self {
21 executor,
22 state: Arc::new(Mutex::new(PoolState::default())),
23 }
24 }
25
26 pub fn set_workers_to_available_parallelism(&self) {
29 let n = std::thread::available_parallelism()
30 .map(|n| n.get().saturating_sub(1).max(1))
31 .unwrap_or(1);
32 self.set_workers(n);
33 }
34
35 pub fn set_workers(&self, n: usize) {
39 let mut state = self.state.lock();
40 let current = state.workers.len();
41
42 if n > current {
43 for _ in current..n {
45 let shutdown = Arc::new(AtomicBool::new(false));
46 let executor = self.executor.clone();
47 let shutdown_clone = shutdown.clone();
48
49 std::thread::Builder::new()
50 .name("vortex-current-thread-worker".to_string())
51 .spawn(move || {
52 block_on(executor.run(async move {
54 while !shutdown_clone.load(Ordering::Relaxed) {
55 smol::Timer::after(Duration::from_millis(100)).await;
56 }
57 }))
58 })
59 .vortex_expect("Failed to spawn current thread worker");
60
61 state.workers.push(WorkerHandle { shutdown });
62 }
63 } else if n < current {
64 while state.workers.len() > n {
66 if let Some(worker) = state.workers.pop() {
67 worker.shutdown.store(true, Ordering::Relaxed);
68 }
69 }
70 }
71 }
72
73 pub fn worker_count(&self) -> usize {
75 self.state.lock().workers.len()
76 }
77}
78
79#[derive(Default)]
80struct PoolState {
81 workers: Vec<WorkerHandle>,
83}
84
85struct WorkerHandle {
86 shutdown: Arc<AtomicBool>,
88}
89
90impl Drop for CurrentThreadWorkerPool {
91 fn drop(&mut self) {
92 let mut state = self.state.lock();
93
94 for worker in state.workers.drain(..) {
96 worker.shutdown.store(true, Ordering::Relaxed);
97 }
98 }
99}