vortex_io/runtime/
pool.rs1use std::sync::Arc;
5use std::sync::atomic::AtomicBool;
6use std::sync::atomic::Ordering;
7use std::time::Duration;
8
9use parking_lot::Mutex;
10use smol::block_on;
11use vortex_error::VortexExpect;
12
13#[derive(Clone)]
14pub struct CurrentThreadWorkerPool {
15 executor: Arc<smol::Executor<'static>>,
16 state: Arc<Mutex<PoolState>>,
17}
18
19impl CurrentThreadWorkerPool {
20 pub(super) fn new(executor: Arc<smol::Executor<'static>>) -> Self {
21 Self {
22 executor,
23 state: Arc::new(Mutex::new(PoolState::default())),
24 }
25 }
26
27 pub fn set_workers_to_available_parallelism(&self) {
30 let n = std::thread::available_parallelism()
31 .map(|n| n.get().saturating_sub(1).max(1))
32 .unwrap_or(1);
33 self.set_workers(n);
34 }
35
36 pub fn set_workers(&self, n: usize) {
40 let mut state = self.state.lock();
41 let current = state.workers.len();
42
43 if n > current {
44 for _ in current..n {
46 let shutdown = Arc::new(AtomicBool::new(false));
47 let executor = self.executor.clone();
48 let shutdown_clone = shutdown.clone();
49
50 std::thread::Builder::new()
51 .name("vortex-current-thread-worker".to_string())
52 .spawn(move || {
53 block_on(executor.run(async move {
55 while !shutdown_clone.load(Ordering::Relaxed) {
56 smol::Timer::after(Duration::from_millis(100)).await;
57 }
58 }))
59 })
60 .vortex_expect("Failed to spawn current thread worker");
61
62 state.workers.push(WorkerHandle { shutdown });
63 }
64 } else if n < current {
65 while state.workers.len() > n {
67 if let Some(worker) = state.workers.pop() {
68 worker.shutdown.store(true, Ordering::Relaxed);
69 }
70 }
71 }
72 }
73
74 pub fn worker_count(&self) -> usize {
76 self.state.lock().workers.len()
77 }
78}
79
80#[derive(Default)]
81struct PoolState {
82 workers: Vec<WorkerHandle>,
84}
85
/// Control handle for a single worker thread.
struct WorkerHandle {
    /// Flag shared with the worker thread; storing `true` asks it to exit.
    shutdown: Arc<AtomicBool>,
}
90
91impl Drop for CurrentThreadWorkerPool {
92 fn drop(&mut self) {
93 let mut state = self.state.lock();
94
95 for worker in state.workers.drain(..) {
97 worker.shutdown.store(true, Ordering::Relaxed);
98 }
99 }
100}