vortex_layout/scan/executor/
threads.rs1use std::future::Future;
2use std::num::NonZeroUsize;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicBool, Ordering};
5
6use futures::channel::oneshot;
7use futures::future::BoxFuture;
8use futures::{FutureExt as _, TryFutureExt as _};
9use vortex_error::{VortexResult, VortexUnwrap, vortex_err};
10
11use super::Executor;
12
/// A type-erased unit of work that a worker thread can execute.
///
/// `run` takes `self: Box<Self>`, so it can be invoked on a
/// `Box<dyn Task>` and consumes the task: each task runs at most once.
trait Task {
    /// Executes the task to completion, consuming it.
    fn run(self: Box<Self>);
}
16
17struct ExecutorTask<F, R> {
18 task: F,
19 result: oneshot::Sender<R>,
20}
21
22impl<F, R> Task for ExecutorTask<F, R>
23where
24 F: Future<Output = R> + Send,
25 R: Send,
26{
27 fn run(self: Box<Self>) {
28 let Self { task, result } = *self;
29 futures::executor::block_on(async move {
30 let output = task.await;
31 _ = result.send(output);
32 })
33 }
34}
35
36#[derive(Clone, Default)]
38pub struct ThreadsExecutor {
39 inner: Arc<Inner>,
40}
41
42impl ThreadsExecutor {
43 pub fn new(num_threads: NonZeroUsize) -> Self {
44 Self {
45 inner: Arc::new(Inner::new(num_threads)),
46 }
47 }
48}
49
50struct Inner {
51 submitter: flume::Sender<Box<dyn Task + Send>>,
52 is_running: Arc<AtomicBool>,
54}
55
56impl Default for Inner {
57 fn default() -> Self {
58 Self::new(unsafe { NonZeroUsize::new_unchecked(1) })
61 }
62}
63
64impl Inner {
65 fn new(num_threads: NonZeroUsize) -> Self {
66 let (tx, rx) = flume::unbounded::<Box<dyn Task + Send>>();
67 let shutdown_signal = Arc::new(AtomicBool::new(true));
68 (0..num_threads.get()).for_each(|_| {
69 let rx = rx.clone();
70 let shutdown_signal = shutdown_signal.clone();
71 std::thread::spawn(move || {
72 while shutdown_signal.load(Ordering::Relaxed) {
75 if let Ok(task) = rx.recv() {
76 task.run()
77 } else {
78 break;
79 }
80 }
81 });
82 });
83
84 Self {
85 submitter: tx,
86 is_running: shutdown_signal,
87 }
88 }
89}
90
91impl Executor for ThreadsExecutor {
92 fn spawn<F>(&self, f: F) -> BoxFuture<'static, VortexResult<F::Output>>
93 where
94 F: Future + Send + 'static,
95 <F as Future>::Output: Send + 'static,
96 {
97 let (tx, rx) = oneshot::channel();
98 let task = Box::new(ExecutorTask {
99 task: f,
100 result: tx,
101 });
102 self.inner
103 .submitter
104 .send(task)
105 .map_err(|e| vortex_err!("Failed to submit work to executor: {e}"))
106 .vortex_unwrap();
107
108 rx.map_err(|e| vortex_err!("Future canceled: {e}")).boxed()
109 }
110}
111
112impl Drop for Inner {
113 fn drop(&mut self) {
114 self.is_running.store(false, Ordering::SeqCst);
115 }
116}