vortex_io/runtime/
pool.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::sync::Arc;
5use std::sync::atomic::AtomicBool;
6use std::sync::atomic::Ordering;
7use std::time::Duration;
8
9use parking_lot::Mutex;
10use smol::block_on;
11use vortex_error::VortexExpect;
12
13#[derive(Clone)]
14pub struct CurrentThreadWorkerPool {
15    executor: Arc<smol::Executor<'static>>,
16    state: Arc<Mutex<PoolState>>,
17}
18
19impl CurrentThreadWorkerPool {
20    pub(super) fn new(executor: Arc<smol::Executor<'static>>) -> Self {
21        Self {
22            executor,
23            state: Arc::new(Mutex::new(PoolState::default())),
24        }
25    }
26
27    /// Set the number of worker threads to the available system parallelism as reported by
28    /// `std::thread::available_parallelism()` minus 1, to leave a slot open for the calling thread.
29    pub fn set_workers_to_available_parallelism(&self) {
30        let n = std::thread::available_parallelism()
31            .map(|n| n.get().saturating_sub(1).max(1))
32            .unwrap_or(1);
33        self.set_workers(n);
34    }
35
36    /// Set the number of worker threads
37    /// - If n > current: spawns additional workers
38    /// - If n < current: signals extra workers to shut down
39    pub fn set_workers(&self, n: usize) {
40        let mut state = self.state.lock();
41        let current = state.workers.len();
42
43        if n > current {
44            // Spawn new workers
45            for _ in current..n {
46                let shutdown = Arc::new(AtomicBool::new(false));
47                let executor = self.executor.clone();
48                let shutdown_clone = shutdown.clone();
49
50                std::thread::Builder::new()
51                    .name("vortex-current-thread-worker".to_string())
52                    .spawn(move || {
53                        // Run the executor with a sleeping future that checks for shutdown
54                        block_on(executor.run(async move {
55                            while !shutdown_clone.load(Ordering::Relaxed) {
56                                smol::Timer::after(Duration::from_millis(100)).await;
57                            }
58                        }))
59                    })
60                    .vortex_expect("Failed to spawn current thread worker");
61
62                state.workers.push(WorkerHandle { shutdown });
63            }
64        } else if n < current {
65            // Signal extra workers to shutdown and remove them
66            while state.workers.len() > n {
67                if let Some(worker) = state.workers.pop() {
68                    worker.shutdown.store(true, Ordering::Relaxed);
69                }
70            }
71        }
72    }
73
74    /// Get the current number of worker threads
75    pub fn worker_count(&self) -> usize {
76        self.state.lock().workers.len()
77    }
78}
79
/// Bookkeeping shared by every clone of a `CurrentThreadWorkerPool`.
#[derive(Default)]
struct PoolState {
    /// The set of worker handles for the background threads.
    workers: Vec<WorkerHandle>,
}

/// The pool's handle onto a single background worker thread.
struct WorkerHandle {
    /// The shutdown flag indicating that the worker should stop.
    shutdown: Arc<AtomicBool>,
}

/// Stops all remaining workers once the pool state itself is destroyed.
///
/// This lives on `PoolState` rather than on `CurrentThreadWorkerPool` on purpose: the pool handle
/// is `Clone` and its clones share one `PoolState` through an `Arc`. A `Drop` impl on the handle
/// would run for every clone, so dropping any single clone would stop the workers still in use
/// by the others. `PoolState` is dropped exactly once, after the last handle is gone.
impl Drop for PoolState {
    fn drop(&mut self) {
        // Only the flags need raising; the handles are freed when `workers` is dropped right
        // after this returns.
        for worker in &self.workers {
            worker.shutdown.store(true, Ordering::Relaxed);
        }
    }
}