use std::net::SocketAddr;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::{Duration, Instant, UNIX_EPOCH};
use std::{env, io};
use anyhow::{Context, Result};
use arrow_flight::flight_service_server::FlightServiceServer;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use kapot_core::object_store_registry::KapotObjectStoreRegistry;
use log::{error, info, warn};
use tempfile::TempDir;
use tokio::fs::DirEntry;
use tokio::signal;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tokio::{fs, time};
use tracing_subscriber::EnvFilter;
use uuid::Uuid;
use datafusion::execution::runtime_env::{RuntimeEnv, RuntimeEnvBuilder};
use datafusion_proto::protobuf::{LogicalPlanNode, PhysicalPlanNode};
#[cfg(not(windows))]
use kapot_core::cache_layer::{
medium::local_disk::LocalDiskMedium, policy::file::FileCacheLayer, CacheLayer,
};
use kapot_core::config::{DataCachePolicy, LogRotationPolicy, TaskSchedulingPolicy};
use kapot_core::error::KapotError;
#[cfg(not(windows))]
use kapot_core::object_store_registry::cache::CachedBasedObjectStoreRegistry;
use kapot_core::serde::protobuf::executor_resource::Resource;
use kapot_core::serde::protobuf::executor_status::Status;
use kapot_core::serde::protobuf::{
executor_registration, scheduler_grpc_client::SchedulerGrpcClient,
ExecutorRegistration, ExecutorResource, ExecutorSpecification, ExecutorStatus,
ExecutorStoppedParams, HeartBeatParams,
};
use kapot_core::serde::KapotCodec;
use kapot_core::utils::{
create_grpc_client_connection, create_grpc_server, get_time_before,
};
use kapot_core::KAPOT_VERSION;
use crate::execution_engine::ExecutionEngine;
use crate::executor::{Executor, TasksDrainedFuture};
use crate::executor_server::TERMINATING;
use crate::flight_service::KapotFlightService;
use crate::metrics::LoggingMetricsCollector;
use crate::shutdown::Shutdown;
use crate::shutdown::ShutdownNotifier;
use crate::terminate;
use crate::{execution_loop, executor_server};
/// Static configuration for one executor process.
///
/// Assembled by the launcher and handed to `start_executor_process`, which
/// reads every field exactly once during startup.
pub struct ExecutorProcessConfig {
    /// Interface the Arrow Flight server binds to (combined with `port`).
    pub bind_host: String,
    /// Host advertised to the scheduler in the registration message, when it
    /// differs from `bind_host` (e.g. behind NAT). `None` omits the host.
    pub external_host: Option<String>,
    /// Arrow Flight server port; also reported in the executor registration.
    pub port: u16,
    /// Executor gRPC port reported in the executor registration.
    pub grpc_port: u16,
    /// Scheduler hostname used to build the `http://host:port` scheduler URL.
    pub scheduler_host: String,
    /// Scheduler port used to build the scheduler URL.
    pub scheduler_port: u16,
    /// How long to keep retrying the initial scheduler connection.
    /// 0 means a single attempt with no retry.
    pub scheduler_connect_timeout_seconds: u16,
    /// Task slots advertised to the scheduler. 0 means "use `num_cpus::get()`".
    pub concurrent_tasks: usize,
    /// Push-based (`PushStaged`) vs pull-based task dispatch.
    pub task_scheduling_policy: TaskSchedulingPolicy,
    /// When set, logs go to rolling files in this directory; otherwise stdout.
    pub log_dir: Option<String>,
    /// Shuffle/work directory. When `None` a temporary directory is created.
    pub work_dir: Option<String>,
    /// Default `EnvFilter` directive used when `RUST_LOG` is not set.
    pub special_mod_log_level: String,
    /// Include thread names and ids in log output.
    pub print_thread_info: bool,
    /// File name prefix for rolling log files (used only with `log_dir`).
    pub log_file_name_prefix: String,
    /// Rotation cadence for the rolling log files (minutely/hourly/daily/never).
    pub log_rotation_policy: LogRotationPolicy,
    /// Job shuffle data older than this is deleted by the cleanup loop.
    pub job_data_ttl_seconds: u64,
    /// Interval between cleanup passes. 0 disables the cleanup task entirely.
    pub job_data_clean_up_interval_seconds: u64,
    /// Optional data cache policy (non-Windows only).
    pub data_cache_policy: Option<DataCachePolicy>,
    /// Cache directory; required when `data_cache_policy` is `LocalDiskFile`.
    pub cache_dir: Option<String>,
    /// Cache capacity in bytes (passed through to the cache layer).
    pub cache_capacity: u64,
    /// I/O concurrency for the cache layer.
    pub cache_io_concurrency: u32,
    /// Max inbound gRPC message size for the scheduler client.
    pub grpc_max_decoding_message_size: u32,
    /// Max outbound gRPC message size for the scheduler client.
    pub grpc_max_encoding_message_size: u32,
    // NOTE(review): not read anywhere in this file — presumably consumed by
    // executor_server::startup; confirm against that module.
    pub executor_heartbeat_interval_seconds: u64,
    /// Optional custom execution engine, forwarded to `Executor::new`.
    pub execution_engine: Option<Arc<dyn ExecutionEngine>>,
}
/// Bootstraps and runs a single executor process until shutdown.
///
/// Initializes logging, connects and registers with the scheduler, starts the
/// Arrow Flight server plus the task-dispatch service (push or poll), and then
/// blocks until a service exits, a shutdown signal (ctrl-c / SIGTERM) arrives,
/// or an internal stop is requested. On the way out it notifies the scheduler,
/// drains in-flight tasks, and waits for background loops to complete.
///
/// # Errors
/// Returns an error if the bind address cannot be parsed, the work directory
/// cannot be created, the runtime environment cannot be built, or the
/// scheduler cannot be reached within the configured timeout.
pub async fn start_executor_process(opt: Arc<ExecutorProcessConfig>) -> Result<()> {
    // `RUST_LOG` takes precedence over the configured module log level.
    let rust_log = env::var(EnvFilter::DEFAULT_ENV);
    let log_filter = EnvFilter::new(rust_log.unwrap_or(opt.special_mod_log_level.clone()));
    if let Some(log_dir) = opt.log_dir.clone() {
        // Log to rolling files under `log_dir`.
        let log_file = match opt.log_rotation_policy {
            LogRotationPolicy::Minutely => {
                tracing_appender::rolling::minutely(log_dir, &opt.log_file_name_prefix)
            }
            LogRotationPolicy::Hourly => {
                tracing_appender::rolling::hourly(log_dir, &opt.log_file_name_prefix)
            }
            LogRotationPolicy::Daily => {
                tracing_appender::rolling::daily(log_dir, &opt.log_file_name_prefix)
            }
            LogRotationPolicy::Never => {
                tracing_appender::rolling::never(log_dir, &opt.log_file_name_prefix)
            }
        };
        tracing_subscriber::fmt()
            .with_ansi(false)
            .with_thread_names(opt.print_thread_info)
            .with_thread_ids(opt.print_thread_info)
            .with_writer(log_file)
            .with_env_filter(log_filter)
            .init();
    } else {
        // No log directory configured: log to stdout.
        tracing_subscriber::fmt()
            .with_ansi(false)
            .with_thread_names(opt.print_thread_info)
            .with_thread_ids(opt.print_thread_info)
            .with_writer(io::stdout)
            .with_env_filter(log_filter)
            .init();
    }

    let addr = format!("{}:{}", opt.bind_host, opt.port);
    let addr: SocketAddr = addr
        .parse()
        .with_context(|| format!("Could not parse address: {addr}"))?;

    let scheduler_host = opt.scheduler_host.clone();
    let scheduler_port = opt.scheduler_port;
    let scheduler_url = format!("http://{scheduler_host}:{scheduler_port}");

    // Only create a temporary directory when no work dir was configured.
    // (The previous `unwrap_or(TempDir::new()?...)` form evaluated its
    // argument eagerly, creating and leaking a temp dir even when
    // `work_dir` was set.) `into_path` deliberately keeps the directory
    // alive for the lifetime of the process.
    let work_dir = match opt.work_dir.clone() {
        Some(dir) => dir,
        None => TempDir::new()?
            .into_path()
            .into_os_string()
            .into_string()
            .unwrap(),
    };

    // 0 concurrent tasks means "one slot per logical CPU".
    let concurrent_tasks = if opt.concurrent_tasks == 0 {
        num_cpus::get()
    } else {
        opt.concurrent_tasks
    };

    info!("Running with config:");
    info!("work_dir: {}", work_dir);
    info!("concurrent_tasks: {}", concurrent_tasks);

    // Every executor instance registers under a fresh unique id.
    let executor_id = Uuid::new_v4().to_string();
    let executor_meta = ExecutorRegistration {
        id: executor_id.clone(),
        optional_host: opt
            .external_host
            .clone()
            .map(executor_registration::OptionalHost::Host),
        port: opt.port as u32,
        grpc_port: opt.grpc_port as u32,
        specification: Some(ExecutorSpecification {
            resources: vec![ExecutorResource {
                resource: Some(Resource::TaskSlots(concurrent_tasks as u32)),
            }],
        }),
    };

    let runtime_env = RuntimeEnvBuilder::new()
        .with_object_store_registry(Arc::new(KapotObjectStoreRegistry::new()))
        .with_temp_file_path(work_dir.clone())
        .build()
        .context("Could not build executor runtime environment")?;

    // Optionally wrap the object store registry with a data-cache layer.
    // The cache layer is not available on Windows.
    #[cfg(not(windows))]
    let runtime_with_data_cache = {
        let cache_dir = opt.cache_dir.clone();
        let cache_capacity = opt.cache_capacity;
        let cache_io_concurrency = opt.cache_io_concurrency;
        let cache_layer =
            opt.data_cache_policy
                .map(|data_cache_policy| match data_cache_policy {
                    DataCachePolicy::LocalDiskFile => {
                        let cache_dir = cache_dir.expect(
                            "cache_dir must be set when data_cache_policy is LocalDiskFile",
                        );
                        let cache_layer = FileCacheLayer::new(
                            cache_capacity as usize,
                            cache_io_concurrency,
                            LocalDiskMedium::new(cache_dir),
                        );
                        CacheLayer::LocalDiskFile(Arc::new(cache_layer))
                    }
                });
        if let Some(cache_layer) = cache_layer {
            let registry = Arc::new(CachedBasedObjectStoreRegistry::new(
                runtime_env.object_store_registry.clone(),
                cache_layer,
            ));
            // Reuse the existing pools and managers; only the registry differs.
            Some(Arc::new(RuntimeEnv {
                memory_pool: runtime_env.memory_pool.clone(),
                disk_manager: runtime_env.disk_manager.clone(),
                cache_manager: runtime_env.cache_manager.clone(),
                object_store_registry: registry,
            }))
        } else {
            None
        }
    };
    // On Windows the cache layer is compiled out; this binding keeps the
    // unconditional use below compiling under cfg(windows).
    #[cfg(windows)]
    let runtime_with_data_cache: Option<Arc<RuntimeEnv>> = None;

    let metrics_collector = Arc::new(LoggingMetricsCollector::default());

    let executor = Arc::new(Executor::new(
        executor_meta,
        &work_dir,
        Arc::new(runtime_env),
        runtime_with_data_cache,
        metrics_collector,
        concurrent_tasks,
        opt.execution_engine.clone(),
    ));

    // Connect to the scheduler. A zero timeout means a single attempt;
    // otherwise retry every 500ms until the timeout window elapses.
    let connect_timeout = opt.scheduler_connect_timeout_seconds as u64;
    let connection = if connect_timeout == 0 {
        create_grpc_client_connection(scheduler_url)
            .await
            .context("Could not connect to scheduler")
    } else {
        // Fix: the previous code compared `Instant::now().elapsed()` (always
        // ~0) against a saved "elapsed" start value (also ~0), so the retry
        // loop never timed out. Measure elapsed time from a real start point.
        let start_time = Instant::now();
        let mut connection = None;
        while connection.is_none() && start_time.elapsed().as_secs() < connect_timeout {
            match create_grpc_client_connection(scheduler_url.clone())
                .await
                .context("Could not connect to scheduler")
            {
                Ok(conn) => {
                    info!("Connected to scheduler at {}", scheduler_url);
                    connection = Some(conn);
                }
                Err(e) => {
                    warn!(
                        "Failed to connect to scheduler at {} ({}); retrying ...",
                        scheduler_url, e
                    );
                    // Fix: use an async sleep — `std::thread::sleep` would
                    // block this tokio worker thread for the whole backoff.
                    time::sleep(Duration::from_millis(500)).await;
                }
            }
        }
        match connection {
            Some(conn) => Ok(conn),
            _ => Err(KapotError::General(format!(
                "Timed out attempting to connect to scheduler at {scheduler_url}"
            ))
            .into()),
        }
    }?;

    let mut scheduler = SchedulerGrpcClient::new(connection)
        .max_encoding_message_size(opt.grpc_max_encoding_message_size as usize)
        .max_decoding_message_size(opt.grpc_max_decoding_message_size as usize);

    let default_codec: KapotCodec<LogicalPlanNode, PhysicalPlanNode> =
        KapotCodec::default();

    let scheduler_policy = opt.task_scheduling_policy;
    let job_data_ttl_seconds = opt.job_data_ttl_seconds;

    // Graceful-shutdown plumbing shared by all background services.
    let shutdown_noti = ShutdownNotifier::new();

    if opt.job_data_clean_up_interval_seconds > 0 {
        // Background task: periodically expire stale job shuffle data; on
        // shutdown, remove all shuffle data and signal completion.
        let mut interval_time =
            time::interval(Duration::from_secs(opt.job_data_clean_up_interval_seconds));
        let mut shuffle_cleaner_shutdown = shutdown_noti.subscribe_for_shutdown();
        let shuffle_cleaner_complete = shutdown_noti.shutdown_complete_tx.clone();
        tokio::spawn(async move {
            while !shuffle_cleaner_shutdown.is_shutdown() {
                tokio::select! {
                    _ = interval_time.tick() => {
                        if let Err(e) = clean_shuffle_data_loop(&work_dir, job_data_ttl_seconds).await
                        {
                            error!("kapot executor fail to clean_shuffle_data {:?}", e)
                        }
                    },
                    _ = shuffle_cleaner_shutdown.recv() => {
                        if let Err(e) = clean_all_shuffle_data(&work_dir).await
                        {
                            error!("kapot executor fail to clean_shuffle_data {:?}", e)
                        } else {
                            info!("Shuffle data cleaned.");
                        }
                        // Dropping this sender marks the cleaner as complete.
                        drop(shuffle_cleaner_complete);
                        return;
                    }
                };
            }
        });
    }

    let mut service_handlers: FuturesUnordered<JoinHandle<Result<(), KapotError>>> =
        FuturesUnordered::new();
    let (stop_send, mut stop_recv) = mpsc::channel::<bool>(10);

    // Start the task-dispatch service: push-based (scheduler pushes staged
    // tasks to the executor gRPC server) or pull-based (executor polls).
    match scheduler_policy {
        TaskSchedulingPolicy::PushStaged => {
            service_handlers.push(
                executor_server::startup(
                    scheduler.clone(),
                    opt.clone(),
                    executor.clone(),
                    default_codec,
                    stop_send,
                    &shutdown_noti,
                )
                .await?,
            );
        }
        _ => {
            service_handlers.push(tokio::spawn(execution_loop::poll_loop(
                scheduler.clone(),
                executor.clone(),
                default_codec,
            )));
        }
    };
    service_handlers.push(tokio::spawn(flight_server_run(
        addr,
        shutdown_noti.subscribe_for_shutdown(),
    )));

    // Resolves once all in-flight tasks have drained.
    let tasks_drained = TasksDrainedFuture(executor);

    // Block until one of: a service exits, ctrl-c, SIGTERM, or an internal
    // stop request on `stop_recv` (which skips scheduler notification).
    let (notify_scheduler, stop_reason) = tokio::select! {
        service_val = check_services(&mut service_handlers) => {
            let msg = format!("executor services stopped with reason {service_val:?}");
            info!("{:?}", msg);
            (true, msg)
        },
        _ = signal::ctrl_c() => {
            let msg = "executor received ctrl-c event.".to_string();
            info!("{:?}", msg);
            (true, msg)
        },
        _ = terminate::sig_term() => {
            let msg = "executor received terminate signal.".to_string();
            info!("{:?}", msg);
            (true, msg)
        },
        _ = stop_recv.recv() => {
            (false, "".to_string())
        },
    };

    info!("setting executor to TERMINATING status");
    TERMINATING.store(true, Ordering::Release);

    if notify_scheduler {
        // Report TERMINATING via heartbeat so the scheduler stops assigning
        // new tasks to this executor while existing work drains.
        if let Err(error) = scheduler
            .heart_beat_from_executor(HeartBeatParams {
                executor_id: executor_id.clone(),
                metrics: vec![],
                status: Some(ExecutorStatus {
                    status: Some(Status::Terminating(String::default())),
                }),
                metadata: Some(ExecutorRegistration {
                    id: executor_id.clone(),
                    optional_host: opt
                        .external_host
                        .clone()
                        .map(executor_registration::OptionalHost::Host),
                    port: opt.port as u32,
                    grpc_port: opt.grpc_port as u32,
                    specification: Some(ExecutorSpecification {
                        resources: vec![ExecutorResource {
                            resource: Some(Resource::TaskSlots(concurrent_tasks as u32)),
                        }],
                    }),
                }),
            })
            .await
        {
            error!("error sending heartbeat with fenced status: {:?}", error);
        }

        if let Err(error) = scheduler
            .executor_stopped(ExecutorStoppedParams {
                executor_id,
                reason: stop_reason,
            })
            .await
        {
            error!("ExecutorStopped grpc failed: {:?}", error);
        }

        // Wait for all running tasks to finish before tearing anything down.
        tasks_drained.await;
    }

    // Broadcast shutdown to background services, then wait until every
    // holder of a `shutdown_complete_tx` clone has dropped it.
    let ShutdownNotifier {
        mut shutdown_complete_rx,
        shutdown_complete_tx,
        notify_shutdown,
        ..
    } = shutdown_noti;
    drop(notify_shutdown);
    drop(shutdown_complete_tx);
    let _ = shutdown_complete_rx.recv().await;

    info!("Executor stopped.");
    Ok(())
}
/// Runs the Arrow Flight server on `addr` until the shutdown notifier fires.
///
/// Returns `KapotError::TonicError` if the server fails to start or serve.
async fn flight_server_run(
    addr: SocketAddr,
    mut grpc_shutdown: Shutdown,
) -> Result<(), KapotError> {
    let flight_service = FlightServiceServer::new(KapotFlightService::new());
    info!(
        "kapot v{} Rust Executor Flight Server listening on {:?}",
        KAPOT_VERSION, addr
    );
    // Serve until `grpc_shutdown.recv()` resolves, then drain gracefully.
    let outcome = create_grpc_server()
        .add_service(flight_service)
        .serve_with_shutdown(addr, grpc_shutdown.recv())
        .await;
    match outcome {
        Ok(()) => Ok(()),
        Err(e) => {
            error!("Tonic error, Could not start Executor Flight Server.");
            Err(KapotError::TonicError(e))
        }
    }
}
/// Awaits the spawned service tasks, returning the first error encountered.
///
/// Resolves `Ok(())` only once every service handle has completed cleanly.
/// A task that panicked or was cancelled surfaces as `KapotError::TokioError`.
async fn check_services(
    service_handlers: &mut FuturesUnordered<JoinHandle<Result<(), KapotError>>>,
) -> Result<(), KapotError> {
    while let Some(joined) = service_handlers.next().await {
        match joined {
            // Service finished without error: keep waiting on the rest.
            Ok(Ok(())) => {}
            // Service returned an error of its own.
            Ok(Err(service_err)) => return Err(service_err),
            // Join failure (panic/cancellation) from the tokio runtime.
            Err(join_err) => return Err(KapotError::TokioError(join_err)),
        }
    }
    info!("service handlers are all done with their work!");
    Ok(())
}
async fn clean_shuffle_data_loop(work_dir: &str, seconds: u64) -> Result<()> {
let mut dir = fs::read_dir(work_dir).await?;
let mut to_deleted = Vec::new();
while let Some(child) = dir.next_entry().await? {
if let Ok(metadata) = child.metadata().await {
let child_path = child.path().into_os_string();
if metadata.is_dir() {
match satisfy_dir_ttl(child, seconds).await {
Err(e) => {
error!(
"Fail to check ttl for the directory {:?} due to {:?}",
child_path, e
)
}
Ok(false) => to_deleted.push(child_path),
Ok(_) => {}
}
} else {
warn!("{:?} under the working directory is a not a directory and will be ignored when doing cleanup", child_path)
}
} else {
error!("Fail to get metadata for file {:?}", child.path())
}
}
info!(
"The directories {:?} that have not been modified for {:?} seconds so that they will be deleted",
to_deleted, seconds
);
for del in to_deleted {
if let Err(e) = fs::remove_dir_all(&del).await {
error!("Fail to remove the directory {:?} due to {}", del, e);
}
}
Ok(())
}
async fn clean_all_shuffle_data(work_dir: &str) -> Result<()> {
let mut dir = fs::read_dir(work_dir).await?;
let mut to_deleted = Vec::new();
while let Some(child) = dir.next_entry().await? {
if let Ok(metadata) = child.metadata().await {
if metadata.is_dir() {
to_deleted.push(child.path().into_os_string())
}
} else {
error!("Can not get metadata from file: {:?}", child)
}
}
info!("The work_dir {:?} will be deleted", &to_deleted);
for del in to_deleted {
if let Err(e) = fs::remove_dir_all(&del).await {
error!("Fail to remove the directory {:?} due to {}", del, e);
}
}
Ok(())
}
/// Returns `Ok(true)` if `dir` or anything inside it (recursively) was
/// modified within the last `ttl_seconds`, i.e. the directory is still live
/// and must NOT be cleaned up. `Ok(false)` means everything is stale.
///
/// Short-circuits on the first fresh entry found during an iterative
/// depth-first walk.
pub async fn satisfy_dir_ttl(dir: DirEntry, ttl_seconds: u64) -> Result<bool> {
    // True when the entry's mtime (seconds since the epoch) is newer than
    // the cutoff timestamp.
    fn touched_after(meta: &std::fs::Metadata, cutoff: u64) -> Result<bool> {
        let mtime_secs = meta
            .modified()?
            .duration_since(UNIX_EPOCH)
            .expect("Time went backwards")
            .as_secs();
        Ok(mtime_secs > cutoff)
    }

    let cutoff = get_time_before(ttl_seconds);
    let mut pending = vec![dir];
    while let Some(entry) = pending.pop() {
        if touched_after(&entry.metadata().await?, cutoff)? {
            return Ok(true);
        }
        let mut children = fs::read_dir(entry.path()).await?;
        while let Some(child) = children.next_entry().await? {
            let metadata = child.metadata().await?;
            if metadata.is_dir() {
                // Defer subdirectories; their own mtime is checked when popped.
                pending.push(child);
            } else if touched_after(&metadata, cutoff)? {
                return Ok(true);
            }
        }
    }
    Ok(false)
}
#[cfg(test)]
mod tests {
    use super::clean_shuffle_data_loop;
    use std::fs;
    use std::fs::File;
    use std::io::Write;
    use std::time::Duration;
    use tempfile::TempDir;

    /// A job directory older than the 1-second TTL is removed by one
    /// cleanup pass.
    #[tokio::test]
    async fn test_executor_clean_up() {
        // Lay out <work_dir>/job_id/tmp.csv for the cleaner to find.
        let work_dir = TempDir::new().unwrap().into_path();
        let job_dir = work_dir.as_path().join("job_id");
        let file_path = job_dir.as_path().join("tmp.csv");
        let data = "Jorge,2018-12-13T12:12:10.011Z\n\
        Andrew,2018-11-13T17:11:10.011Z";
        fs::create_dir(job_dir).unwrap();
        File::create(&file_path)
            .expect("creating temp file")
            .write_all(data.as_bytes())
            .expect("writing data");

        assert_eq!(fs::read_dir(&work_dir).unwrap().count(), 1);

        // Let the job data age past the 1-second TTL, then run one pass.
        tokio::time::sleep(Duration::from_secs(2)).await;
        clean_shuffle_data_loop(work_dir.to_str().unwrap(), 1)
            .await
            .unwrap();

        assert_eq!(fs::read_dir(&work_dir).unwrap().count(), 0);
    }
}