vortex_io/runtime/
pool.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::sync::Arc;
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::time::Duration;
7
8use parking_lot::Mutex;
9use smol::block_on;
10use vortex_error::VortexExpect;
11
12#[derive(Clone)]
13pub struct CurrentThreadWorkerPool {
14    executor: Arc<smol::Executor<'static>>,
15    state: Arc<Mutex<PoolState>>,
16}
17
18impl CurrentThreadWorkerPool {
19    pub(super) fn new(executor: Arc<smol::Executor<'static>>) -> Self {
20        Self {
21            executor,
22            state: Arc::new(Mutex::new(PoolState::default())),
23        }
24    }
25
26    /// Set the number of worker threads to the available system parallelism as reported by
27    /// `std::thread::available_parallelism()` minus 1, to leave a slot open for the calling thread.
28    pub fn set_workers_to_available_parallelism(&self) {
29        let n = std::thread::available_parallelism()
30            .map(|n| n.get().saturating_sub(1).max(1))
31            .unwrap_or(1);
32        self.set_workers(n);
33    }
34
35    /// Set the number of worker threads
36    /// - If n > current: spawns additional workers
37    /// - If n < current: signals extra workers to shut down
38    pub fn set_workers(&self, n: usize) {
39        let mut state = self.state.lock();
40        let current = state.workers.len();
41
42        if n > current {
43            // Spawn new workers
44            for _ in current..n {
45                let shutdown = Arc::new(AtomicBool::new(false));
46                let executor = self.executor.clone();
47                let shutdown_clone = shutdown.clone();
48
49                std::thread::Builder::new()
50                    .name("vortex-current-thread-worker".to_string())
51                    .spawn(move || {
52                        // Run the executor with a sleeping future that checks for shutdown
53                        block_on(executor.run(async move {
54                            while !shutdown_clone.load(Ordering::Relaxed) {
55                                smol::Timer::after(Duration::from_millis(100)).await;
56                            }
57                        }))
58                    })
59                    .vortex_expect("Failed to spawn current thread worker");
60
61                state.workers.push(WorkerHandle { shutdown });
62            }
63        } else if n < current {
64            // Signal extra workers to shutdown and remove them
65            while state.workers.len() > n {
66                if let Some(worker) = state.workers.pop() {
67                    worker.shutdown.store(true, Ordering::Relaxed);
68                }
69            }
70        }
71    }
72
73    /// Get the current number of worker threads
74    pub fn worker_count(&self) -> usize {
75        self.state.lock().workers.len()
76    }
77}
78
79#[derive(Default)]
80struct PoolState {
81    /// The set of worker handles for the background threads.
82    workers: Vec<WorkerHandle>,
83}
84
/// The pool-side handle used to ask a single worker thread to stop.
struct WorkerHandle {
    /// Set to `true` to request that the worker stop; the worker holds a clone of this flag.
    shutdown: Arc<AtomicBool>,
}
89
90impl Drop for CurrentThreadWorkerPool {
91    fn drop(&mut self) {
92        let mut state = self.state.lock();
93
94        // Signal all workers to shut down
95        for worker in state.workers.drain(..) {
96            worker.shutdown.store(true, Ordering::Relaxed);
97        }
98    }
99}