datafusion_dist/
executor.rs

1use std::{fmt::Debug, sync::Arc};
2
3use log::{debug, error};
4use tokio::{runtime::Handle, sync::Notify};
5
/// Abstraction over an executor that owns a Tokio runtime for running
/// distributed / CPU bound work.
pub trait DistExecutor: Debug + Send + Sync {
    /// Returns a [`Handle`] to the executor's underlying Tokio runtime,
    /// suitable for spawning tasks onto it.
    fn handle(&self) -> &Handle;
}
9
/// Creates a Tokio [`Runtime`] for use with CPU bound tasks
///
/// Tokio forbids dropping `Runtime`s in async contexts, so creating a separate
/// `Runtime` correctly is somewhat tricky. This structure manages the creation
/// and shutdown of a separate thread.
///
/// # Notes
/// On drop, the thread will wait for all remaining tasks to complete.
///
/// Depending on your application, more sophisticated shutdown logic may be
/// required, such as ensuring that no new tasks are added to the runtime.
///
/// # Credits
/// This code is derived from code originally written for [InfluxDB 3.0]
///
/// [InfluxDB 3.0]: https://github.com/influxdata/influxdb3_core/tree/6fcbb004232738d55655f32f4ad2385523d10696/executor
#[derive(Debug)]
pub struct DefaultExecutor {
    /// Handle is the tokio structure for interacting with a Runtime;
    /// cloned from the runtime before it is moved to the driver thread.
    handle: Handle,
    /// Signal to start shutting down. `Drop` notifies it; the driver
    /// thread awaits it before letting the runtime drop.
    notify_shutdown: Arc<Notify>,
    /// While the driver thread is active this is `Some`; `Drop` takes it
    /// and joins the thread.
    thread_join_handle: Option<std::thread::JoinHandle<()>>,
}
35
36impl Drop for DefaultExecutor {
37    fn drop(&mut self) {
38        // Notify the thread to shutdown.
39        self.notify_shutdown.notify_one();
40        // In a production system you also need to ensure your code stops adding
41        // new tasks to the underlying runtime after this point to allow the
42        // thread to complete its work and exit cleanly.
43        if let Some(thread_join_handle) = self.thread_join_handle.take() {
44            // If the thread is still running, we wait for it to finish
45            debug!("Shutting down default executor thread...");
46            if let Err(e) = thread_join_handle.join() {
47                error!("Error joining default executor thread: {e:?}",);
48            } else {
49                debug!("Default executor thread shutdown successfully.");
50            }
51        }
52    }
53}
54
55impl DefaultExecutor {
56    /// Create a new Tokio Runtime for CPU bound tasks
57    pub fn new() -> Self {
58        let runtime = tokio::runtime::Builder::new_multi_thread()
59            .enable_all()
60            .thread_name("default-executor-worker")
61            .build()
62            .expect("Creating tokio runtime");
63        let handle = runtime.handle().clone();
64        let notify_shutdown = Arc::new(Notify::new());
65        let notify_shutdown_captured = Arc::clone(&notify_shutdown);
66
67        // The runtime runs and is dropped on a separate thread
68        let thread_join_handle = std::thread::spawn(move || {
69            runtime.block_on(async move {
70                notify_shutdown_captured.notified().await;
71            });
72            // Note: runtime is dropped here, which will wait for all tasks
73            // to complete
74        });
75
76        Self {
77            handle,
78            notify_shutdown,
79            thread_join_handle: Some(thread_join_handle),
80        }
81    }
82
83    /// Return a handle suitable for spawning CPU bound tasks
84    ///
85    /// # Notes
86    ///
87    /// If a task spawned on this handle attempts to do IO, it will error with a
88    /// message such as:
89    ///
90    /// ```text
91    /// A Tokio 1.x context was found, but IO is disabled.
92    /// ```
93    pub fn handle(&self) -> &Handle {
94        &self.handle
95    }
96}
97
98impl DistExecutor for DefaultExecutor {
99    fn handle(&self) -> &Handle {
100        self.handle()
101    }
102}
103
104impl Default for DefaultExecutor {
105    fn default() -> Self {
106        Self::new()
107    }
108}
109
110pub fn logging_executor_metrics(handle: &Handle) {
111    let metrics = handle.metrics();
112    debug!(
113        "Executor metrics num_workers: {}, num_alive_tasks: {}, global_queue_depth: {}",
114        metrics.num_workers(),
115        metrics.num_alive_tasks(),
116        metrics.global_queue_depth()
117    );
118}