use std::{fmt::Debug, sync::Arc};
use log::{debug, error};
use tokio::{runtime::Handle, sync::Notify};
pub trait DistExecutor: Debug + Send + Sync {
fn handle(&self) -> &Handle;
}
#[derive(Debug)]
pub struct DefaultExecutor {
handle: Handle,
notify_shutdown: Arc<Notify>,
thread_join_handle: Option<std::thread::JoinHandle<()>>,
}
impl Drop for DefaultExecutor {
fn drop(&mut self) {
self.notify_shutdown.notify_one();
if let Some(thread_join_handle) = self.thread_join_handle.take() {
debug!("Shutting down default executor thread...");
if let Err(e) = thread_join_handle.join() {
error!("Error joining default executor thread: {e:?}",);
} else {
debug!("Default executor thread shutdown successfully.");
}
}
}
}
impl DefaultExecutor {
pub fn new() -> Self {
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.thread_name("default-executor-worker")
.build()
.expect("Creating tokio runtime");
let handle = runtime.handle().clone();
let notify_shutdown = Arc::new(Notify::new());
let notify_shutdown_captured = Arc::clone(¬ify_shutdown);
let thread_join_handle = std::thread::spawn(move || {
runtime.block_on(async move {
notify_shutdown_captured.notified().await;
});
});
Self {
handle,
notify_shutdown,
thread_join_handle: Some(thread_join_handle),
}
}
pub fn handle(&self) -> &Handle {
&self.handle
}
}
impl DistExecutor for DefaultExecutor {
fn handle(&self) -> &Handle {
self.handle()
}
}
impl Default for DefaultExecutor {
fn default() -> Self {
Self::new()
}
}
pub fn logging_executor_metrics(handle: &Handle) {
let metrics = handle.metrics();
debug!(
"Executor metrics num_workers: {}, num_alive_tasks: {}, global_queue_depth: {}",
metrics.num_workers(),
metrics.num_alive_tasks(),
metrics.global_queue_depth()
);
}