datafusion_dist/executor.rs
1use std::{fmt::Debug, sync::Arc};
2
3use log::{debug, error};
4use tokio::{runtime::Handle, sync::Notify};
5
6pub trait DistExecutor: Debug + Send + Sync {
7 fn handle(&self) -> &Handle;
8}
9
10/// Creates a Tokio [`Runtime`] for use with CPU bound tasks
11///
12/// Tokio forbids dropping `Runtime`s in async contexts, so creating a separate
13/// `Runtime` correctly is somewhat tricky. This structure manages the creation
14/// and shutdown of a separate thread.
15///
16/// # Notes
17/// On drop, the thread will wait for all remaining tasks to complete.
18///
19/// Depending on your application, more sophisticated shutdown logic may be
20/// required, such as ensuring that no new tasks are added to the runtime.
21///
22/// # Credits
23/// This code is derived from code originally written for [InfluxDB 3.0]
24///
25/// [InfluxDB 3.0]: https://github.com/influxdata/influxdb3_core/tree/6fcbb004232738d55655f32f4ad2385523d10696/executor
26#[derive(Debug)]
27pub struct DefaultExecutor {
28 /// Handle is the tokio structure for interacting with a Runtime.
29 handle: Handle,
30 /// Signal to start shutting down
31 notify_shutdown: Arc<Notify>,
32 /// When thread is active, is Some
33 thread_join_handle: Option<std::thread::JoinHandle<()>>,
34}
35
36impl Drop for DefaultExecutor {
37 fn drop(&mut self) {
38 // Notify the thread to shutdown.
39 self.notify_shutdown.notify_one();
40 // In a production system you also need to ensure your code stops adding
41 // new tasks to the underlying runtime after this point to allow the
42 // thread to complete its work and exit cleanly.
43 if let Some(thread_join_handle) = self.thread_join_handle.take() {
44 // If the thread is still running, we wait for it to finish
45 debug!("Shutting down default executor thread...");
46 if let Err(e) = thread_join_handle.join() {
47 error!("Error joining default executor thread: {e:?}",);
48 } else {
49 debug!("Default executor thread shutdown successfully.");
50 }
51 }
52 }
53}
54
55impl DefaultExecutor {
56 /// Create a new Tokio Runtime for CPU bound tasks
57 pub fn new() -> Self {
58 let runtime = tokio::runtime::Builder::new_multi_thread()
59 .enable_all()
60 .thread_name("default-executor-worker")
61 .build()
62 .expect("Creating tokio runtime");
63 let handle = runtime.handle().clone();
64 let notify_shutdown = Arc::new(Notify::new());
65 let notify_shutdown_captured = Arc::clone(¬ify_shutdown);
66
67 // The runtime runs and is dropped on a separate thread
68 let thread_join_handle = std::thread::spawn(move || {
69 runtime.block_on(async move {
70 notify_shutdown_captured.notified().await;
71 });
72 // Note: runtime is dropped here, which will wait for all tasks
73 // to complete
74 });
75
76 Self {
77 handle,
78 notify_shutdown,
79 thread_join_handle: Some(thread_join_handle),
80 }
81 }
82
83 /// Return a handle suitable for spawning CPU bound tasks
84 ///
85 /// # Notes
86 ///
87 /// If a task spawned on this handle attempts to do IO, it will error with a
88 /// message such as:
89 ///
90 /// ```text
91 /// A Tokio 1.x context was found, but IO is disabled.
92 /// ```
93 pub fn handle(&self) -> &Handle {
94 &self.handle
95 }
96}
97
98impl DistExecutor for DefaultExecutor {
99 fn handle(&self) -> &Handle {
100 self.handle()
101 }
102}
103
104impl Default for DefaultExecutor {
105 fn default() -> Self {
106 Self::new()
107 }
108}
109
110pub fn logging_executor_metrics(handle: &Handle) {
111 let metrics = handle.metrics();
112 debug!(
113 "Executor metrics num_workers: {}, num_alive_tasks: {}, global_queue_depth: {}",
114 metrics.num_workers(),
115 metrics.num_alive_tasks(),
116 metrics.global_queue_depth()
117 );
118}