vortex_layout/scan/executor/
threads.rs

1use std::future::Future;
2use std::num::NonZeroUsize;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicBool, Ordering};
5
6use futures::channel::oneshot;
7use futures::future::BoxFuture;
8use futures::{FutureExt as _, TryFutureExt as _};
9use vortex_error::{VortexResult, VortexUnwrap, vortex_err};
10
11use super::Executor;
12
/// A type-erased unit of work that a worker thread can execute.
///
/// Implementors are boxed and sent through the executor's channel, which is
/// why `run` takes `self` as a `Box<Self>`.
trait Task {
    /// Consumes the boxed task and runs it on the calling thread.
    fn run(self: Box<Self>);
}
16
/// A future paired with the channel used to hand its output back to the
/// caller that submitted it.
struct ExecutorTask<F, R> {
    /// The future to run.
    task: F,
    /// Sending half of the one-shot channel that carries the future's output.
    result: oneshot::Sender<R>,
}
21
22impl<F, R> Task for ExecutorTask<F, R>
23where
24    F: Future<Output = R> + Send,
25    R: Send,
26{
27    fn run(self: Box<Self>) {
28        let Self { task, result } = *self;
29        futures::executor::block_on(async move {
30            let output = task.await;
31            _ = result.send(output);
32        })
33    }
34}
35
/// Multithreaded task executor, runs tasks on a dedicated thread pool.
///
/// Cloning is cheap: clones share the same pool state through an [`Arc`].
/// The derived [`Default`] builds the state via `Inner::default()`.
#[derive(Clone, Default)]
pub struct ThreadsExecutor {
    /// Shared pool state (task channel and running flag).
    inner: Arc<Inner>,
}
41
42impl ThreadsExecutor {
43    pub fn new(num_threads: NonZeroUsize) -> Self {
44        Self {
45            inner: Arc::new(Inner::new(num_threads)),
46        }
47    }
48}
49
/// Pool state shared by every clone of the executor.
struct Inner {
    /// Sending half of the channel the worker threads receive tasks from.
    submitter: flume::Sender<Box<dyn Task + Send>>,
    /// True as long as the runtime should be running.
    ///
    /// Workers read this flag before waiting for each task; it is set to
    /// `false` when this struct is dropped.
    is_running: Arc<AtomicBool>,
}
55
56impl Default for Inner {
57    fn default() -> Self {
58        // Safety:
59        // 1 isn't 0
60        Self::new(unsafe { NonZeroUsize::new_unchecked(1) })
61    }
62}
63
64impl Inner {
65    fn new(num_threads: NonZeroUsize) -> Self {
66        let (tx, rx) = flume::unbounded::<Box<dyn Task + Send>>();
67        let shutdown_signal = Arc::new(AtomicBool::new(true));
68        (0..num_threads.get()).for_each(|_| {
69            let rx = rx.clone();
70            let shutdown_signal = shutdown_signal.clone();
71            std::thread::spawn(move || {
72                // The channel errors if all senders are dropped, which means we probably don't care about the task anymore,
73                // and we can break and let the thread end.
74                while shutdown_signal.load(Ordering::Relaxed) {
75                    if let Ok(task) = rx.recv() {
76                        task.run()
77                    } else {
78                        break;
79                    }
80                }
81            });
82        });
83
84        Self {
85            submitter: tx,
86            is_running: shutdown_signal,
87        }
88    }
89}
90
91impl Executor for ThreadsExecutor {
92    fn spawn<F>(&self, f: F) -> BoxFuture<'static, VortexResult<F::Output>>
93    where
94        F: Future + Send + 'static,
95        <F as Future>::Output: Send + 'static,
96    {
97        let (tx, rx) = oneshot::channel();
98        let task = Box::new(ExecutorTask {
99            task: f,
100            result: tx,
101        });
102        self.inner
103            .submitter
104            .send(task)
105            .map_err(|e| vortex_err!("Failed to submit work to executor: {e}"))
106            .vortex_unwrap();
107
108        rx.map_err(|e| vortex_err!("Future canceled: {e}")).boxed()
109    }
110}
111
/// Signals the worker threads to stop when the pool state is dropped.
impl Drop for Inner {
    fn drop(&mut self) {
        // Workers read this flag at the top of their loop, so a worker that is
        // currently running a task exits once that task finishes.
        // NOTE(review): workers blocked waiting on the channel presumably wake
        // when `submitter` is dropped after this method returns — confirm
        // against flume's disconnect semantics.
        self.is_running.store(false, Ordering::SeqCst);
    }
}
116}