use std::net::SocketAddr;
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use std::{env, io};
use anyhow::{Context, Result};
use arrow_flight::flight_service_server::FlightServiceServer;
use ballista_executor::{execution_loop, executor_server};
use log::{error, info, warn};
use tempfile::TempDir;
use tokio::fs::DirEntry;
use tokio::signal;
use tokio::{fs, time};
use uuid::Uuid;
use ballista_core::config::{LogRotationPolicy, TaskSchedulingPolicy};
use ballista_core::error::BallistaError;
use ballista_core::serde::protobuf::{
executor_registration, scheduler_grpc_client::SchedulerGrpcClient,
ExecutorRegistration, ExecutorStoppedParams, PhysicalPlanNode,
};
use ballista_core::serde::scheduler::ExecutorSpecification;
use ballista_core::serde::BallistaCodec;
use ballista_core::utils::{
create_grpc_client_connection, create_grpc_server, with_object_store_provider,
};
use ballista_core::{print_version, BALLISTA_VERSION};
use ballista_executor::executor::Executor;
use ballista_executor::flight_service::BallistaFlightService;
use ballista_executor::metrics::LoggingMetricsCollector;
use ballista_executor::shutdown::Shutdown;
use ballista_executor::shutdown::ShutdownNotifier;
use ballista_executor::terminate;
use config::prelude::*;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion_proto::protobuf::LogicalPlanNode;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tracing_subscriber::EnvFilter;
// configure_me generates the `Config` struct used in `main` from the build
// script output; the macro_use import brings its helper macros into scope.
#[macro_use]
extern crate configure_me;

// Generated CLI/config-file parsing code; lints are silenced because the
// code is machine-generated and not held to this crate's style.
#[allow(clippy::all, warnings)]
mod config {
include!(concat!(env!("OUT_DIR"), "/executor_configure_me_config.rs"));
}

// Optionally replace the global allocator with mimalloc for better
// multi-threaded allocation performance.
#[cfg(feature = "mimalloc")]
#[global_allocator]
static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
/// Ballista executor entry point.
///
/// Parses configuration, initializes logging, registers the executor with the
/// scheduler, starts the task-execution service (push or poll mode) and the
/// Arrow Flight server, then runs until a shutdown signal (ctrl-c / SIGTERM),
/// a service failure, or an internal stop request occurs.
#[tokio::main]
async fn main() -> Result<()> {
    let (opt, _remaining_args) =
        Config::including_optional_config_files(&["/etc/ballista/executor.toml"])
            .unwrap_or_exit();

    if opt.version {
        print_version();
        std::process::exit(0);
    }

    let special_mod_log_level = opt.log_level_setting;
    let external_host = opt.external_host;
    let bind_host = opt.bind_host;
    let port = opt.bind_port;
    let grpc_port = opt.bind_grpc_port;
    let log_dir = opt.log_dir;
    let print_thread_info = opt.print_thread_info;

    let log_file_name_prefix = format!(
        "executor_{}_{}",
        external_host
            .clone()
            .unwrap_or_else(|| "localhost".to_string()),
        port
    );

    // RUST_LOG (if set in the environment) takes precedence over the
    // configured log level.
    let rust_log = env::var(EnvFilter::DEFAULT_ENV);
    let log_filter = EnvFilter::new(rust_log.unwrap_or(special_mod_log_level));

    // Log to rolling files under `log_dir` when configured, otherwise stdout.
    if let Some(log_dir) = log_dir {
        let log_file = match opt.log_rotation_policy {
            LogRotationPolicy::Minutely => {
                tracing_appender::rolling::minutely(log_dir, &log_file_name_prefix)
            }
            LogRotationPolicy::Hourly => {
                tracing_appender::rolling::hourly(log_dir, &log_file_name_prefix)
            }
            LogRotationPolicy::Daily => {
                tracing_appender::rolling::daily(log_dir, &log_file_name_prefix)
            }
            LogRotationPolicy::Never => {
                tracing_appender::rolling::never(log_dir, &log_file_name_prefix)
            }
        };
        tracing_subscriber::fmt()
            .with_ansi(true)
            .with_thread_names(print_thread_info)
            .with_thread_ids(print_thread_info)
            .with_writer(log_file)
            .with_env_filter(log_filter)
            .init();
    } else {
        tracing_subscriber::fmt()
            .with_ansi(true)
            .with_thread_names(print_thread_info)
            .with_thread_ids(print_thread_info)
            .with_writer(io::stdout)
            .with_env_filter(log_filter)
            .init();
    }

    let addr = format!("{}:{}", bind_host, port);
    let addr = addr
        .parse()
        .with_context(|| format!("Could not parse address: {}", addr))?;

    let scheduler_host = opt.scheduler_host;
    let scheduler_port = opt.scheduler_port;
    let scheduler_url = format!("http://{}:{}", scheduler_host, scheduler_port);

    // Only create a temporary work dir when none was configured. The previous
    // `unwrap_or(TempDir::new()?...)` evaluated its argument eagerly, creating
    // (and leaking) a temp dir even when `work_dir` was explicitly set.
    let work_dir = match opt.work_dir {
        Some(dir) => dir,
        None => TempDir::new()?
            .into_path()
            .into_os_string()
            .into_string()
            .unwrap(),
    };

    // 0 concurrent tasks means "use one slot per available CPU core".
    let concurrent_tasks = if opt.concurrent_tasks == 0 {
        num_cpus::get()
    } else {
        opt.concurrent_tasks
    };

    info!("Running with config:");
    info!("work_dir: {}", work_dir);
    info!("concurrent_tasks: {}", concurrent_tasks);

    // Registration payload sent to the scheduler; identifies this executor
    // and advertises its task capacity.
    let executor_id = Uuid::new_v4().to_string();
    let executor_meta = ExecutorRegistration {
        id: executor_id.clone(),
        optional_host: external_host
            .clone()
            .map(executor_registration::OptionalHost::Host),
        port: port as u32,
        grpc_port: grpc_port as u32,
        specification: Some(
            ExecutorSpecification {
                task_slots: concurrent_tasks as u32,
            }
            .into(),
        ),
    };

    let config = with_object_store_provider(
        RuntimeConfig::new().with_temp_file_path(work_dir.clone()),
    );
    let runtime = Arc::new(RuntimeEnv::new(config).map_err(|_| {
        BallistaError::Internal("Failed to init Executor RuntimeEnv".to_owned())
    })?);

    let metrics_collector = Arc::new(LoggingMetricsCollector::default());

    let executor = Arc::new(Executor::new(
        executor_meta,
        &work_dir,
        runtime,
        metrics_collector,
        concurrent_tasks,
    ));

    // Connect to the scheduler, retrying until `connect_timeout` seconds have
    // elapsed (0 disables retrying). The previous implementation compared
    // `Instant::now().elapsed()` (always ~0) against the timeout, so it never
    // timed out, and used `std::thread::sleep`, which blocks the async
    // runtime worker thread. Both are fixed here.
    let connect_timeout = opt.scheduler_connect_timeout_seconds as u64;
    let connection = if connect_timeout == 0 {
        create_grpc_client_connection(scheduler_url)
            .await
            .context("Could not connect to scheduler")
    } else {
        let start_time = Instant::now();
        let mut connection = None;
        while connection.is_none() && start_time.elapsed().as_secs() < connect_timeout {
            match create_grpc_client_connection(scheduler_url.clone())
                .await
                .context("Could not connect to scheduler")
            {
                Ok(conn) => {
                    info!("Connected to scheduler at {}", scheduler_url);
                    connection = Some(conn);
                }
                Err(e) => {
                    warn!(
                        "Failed to connect to scheduler at {} ({}); retrying ...",
                        scheduler_url, e
                    );
                    // Async sleep so the retry loop does not block the runtime.
                    time::sleep(Duration::from_millis(500)).await;
                }
            }
        }
        match connection {
            Some(conn) => Ok(conn),
            _ => Err(BallistaError::General(format!(
                "Timed out attempting to connect to scheduler at {}",
                scheduler_url
            ))
            .into()),
        }
    }?;

    let mut scheduler = SchedulerGrpcClient::new(connection);

    let default_codec: BallistaCodec<LogicalPlanNode, PhysicalPlanNode> =
        BallistaCodec::default();

    let scheduler_policy = opt.task_scheduling_policy;
    let job_data_ttl_seconds = opt.job_data_ttl_seconds;

    // Graceful-shutdown coordinator shared by all background services.
    let shutdown_noti = ShutdownNotifier::new();

    // Periodic shuffle-data cleanup task (disabled when the interval is 0).
    if opt.job_data_clean_up_interval_seconds > 0 {
        let mut interval_time =
            time::interval(Duration::from_secs(opt.job_data_clean_up_interval_seconds));
        let mut shuffle_cleaner_shutdown = shutdown_noti.subscribe_for_shutdown();
        let shuffle_cleaner_complete = shutdown_noti.shutdown_complete_tx.clone();
        tokio::spawn(async move {
            while !shuffle_cleaner_shutdown.is_shutdown() {
                tokio::select! {
                    _ = interval_time.tick() => {
                        if let Err(e) = clean_shuffle_data_loop(&work_dir, job_data_ttl_seconds).await
                        {
                            error!("Ballista executor fail to clean_shuffle_data {:?}", e)
                        }
                    },
                    _ = shuffle_cleaner_shutdown.recv() => {
                        // On shutdown, remove all remaining shuffle data, then
                        // drop the completion sender so main's final wait can
                        // observe this task finishing.
                        if let Err(e) = clean_all_shuffle_data(&work_dir).await
                        {
                            error!("Ballista executor fail to clean_shuffle_data {:?}", e)
                        } else {
                            info!("Shuffle data cleaned.");
                        }
                        drop(shuffle_cleaner_complete);
                        return;
                    }
                };
            }
        });
    }

    let mut service_handlers: FuturesUnordered<JoinHandle<Result<(), BallistaError>>> =
        FuturesUnordered::new();
    let (stop_send, mut stop_recv) = mpsc::channel::<bool>(10);

    // Start the task-execution service: in PushStaged mode the scheduler
    // pushes tasks to the executor server; otherwise we poll for work.
    match scheduler_policy {
        TaskSchedulingPolicy::PushStaged => {
            service_handlers.push(
                executor_server::startup(
                    scheduler.clone(),
                    bind_host,
                    executor.clone(),
                    default_codec,
                    stop_send,
                    &shutdown_noti,
                )
                .await?,
            );
        }
        _ => {
            service_handlers.push(tokio::spawn(execution_loop::poll_loop(
                scheduler.clone(),
                executor.clone(),
                default_codec,
            )));
        }
    };
    service_handlers.push(tokio::spawn(flight_server_run(
        addr,
        shutdown_noti.subscribe_for_shutdown(),
    )));

    // Run until one of: a service stops (error or completion), ctrl-c,
    // SIGTERM, or an internal stop request via `stop_recv`. The boolean says
    // whether the scheduler should be notified about the stop.
    let (notify_scheduler, stop_reason) = tokio::select! {
        service_val = check_services(&mut service_handlers) => {
            let msg = format!("executor services stopped with reason {:?}", service_val);
            info!("{:?}", msg);
            (true, msg)
        },
        _ = signal::ctrl_c() => {
            let msg = "executor received ctrl-c event.".to_string();
            info!("{:?}", msg);
            (true, msg)
        },
        _ = terminate::sig_term() => {
            let msg = "executor received terminate signal.".to_string();
            info!("{:?}", msg);
            (true, msg)
        },
        _ = stop_recv.recv() => {
            (false, "".to_string())
        },
    };

    // Best-effort notification; a failure here is logged but not fatal.
    if notify_scheduler {
        if let Err(error) = scheduler
            .executor_stopped(ExecutorStoppedParams {
                executor_id,
                reason: stop_reason,
            })
            .await
        {
            error!("ExecutorStopped grpc failed: {:?}", error);
        }
    }

    // Broadcast shutdown (by dropping `notify_shutdown`) and wait for every
    // subscribed background task to drop its completion sender.
    let ShutdownNotifier {
        mut shutdown_complete_rx,
        shutdown_complete_tx,
        notify_shutdown,
        ..
    } = shutdown_noti;
    drop(notify_shutdown);
    drop(shutdown_complete_tx);
    let _ = shutdown_complete_rx.recv().await;

    info!("Executor stopped.");
    Ok(())
}
/// Serve the Arrow Flight endpoint on `addr` until `grpc_shutdown` fires,
/// at which point the server drains gracefully.
async fn flight_server_run(
    addr: SocketAddr,
    mut grpc_shutdown: Shutdown,
) -> Result<(), BallistaError> {
    let flight_service = FlightServiceServer::new(BallistaFlightService::new());

    info!(
        "Ballista v{} Rust Executor Flight Server listening on {:?}",
        BALLISTA_VERSION, addr
    );

    // `serve_with_shutdown` resolves either when the server fails or when the
    // shutdown notification is received.
    create_grpc_server()
        .add_service(flight_service)
        .serve_with_shutdown(addr, grpc_shutdown.recv())
        .await
        .map_err(|e| {
            error!("Tonic error, Could not start Executor Flight Server.");
            BallistaError::TonicError(e)
        })
}
/// Await the executor's service tasks, returning the first error (either a
/// task panic/cancellation wrapped as `TokioError`, or the service's own
/// error). Returns `Ok(())` once every service has finished cleanly.
async fn check_services(
    service_handlers: &mut FuturesUnordered<JoinHandle<Result<(), BallistaError>>>,
) -> Result<(), BallistaError> {
    while let Some(join_result) = service_handlers.next().await {
        // Outer `?` propagates join failures, inner `?` the service's error.
        join_result.map_err(BallistaError::TokioError)??;
    }
    info!("service handlers are all done with their work!");
    Ok(())
}
async fn clean_shuffle_data_loop(work_dir: &str, seconds: u64) -> Result<()> {
let mut dir = fs::read_dir(work_dir).await?;
let mut to_deleted = Vec::new();
while let Some(child) = dir.next_entry().await? {
if let Ok(metadata) = child.metadata().await {
let child_path = child.path().into_os_string();
if metadata.is_dir() {
match satisfy_dir_ttl(child, seconds).await {
Err(e) => {
error!(
"Fail to check ttl for the directory {:?} due to {:?}",
child_path, e
)
}
Ok(false) => to_deleted.push(child_path),
Ok(_) => {}
}
} else {
warn!("{:?} under the working directory is a not a directory and will be ignored when doing cleanup", child_path)
}
} else {
error!("Fail to get metadata for file {:?}", child.path())
}
}
info!(
"The directories {:?} that have not been modified for {:?} seconds so that they will be deleted",
to_deleted, seconds
);
for del in to_deleted {
if let Err(e) = fs::remove_dir_all(&del).await {
error!("Fail to remove the directory {:?} due to {}", del, e);
}
}
Ok(())
}
async fn clean_all_shuffle_data(work_dir: &str) -> Result<()> {
let mut dir = fs::read_dir(work_dir).await?;
let mut to_deleted = Vec::new();
while let Some(child) = dir.next_entry().await? {
if let Ok(metadata) = child.metadata().await {
if metadata.is_dir() {
to_deleted.push(child.path().into_os_string())
}
} else {
error!("Can not get metadata from file: {:?}", child)
}
}
info!("The work_dir {:?} will be deleted", &to_deleted);
for del in to_deleted {
if let Err(e) = fs::remove_dir_all(&del).await {
error!("Fail to remove the directory {:?} due to {}", del, e);
}
}
Ok(())
}
/// Return `Ok(true)` if `dir` itself, or any file/directory nested under it,
/// was modified within the last `ttl_seconds` seconds — i.e. the directory is
/// still "fresh" and should be kept. `Ok(false)` means every entry checked is
/// older than the TTL, so the caller may delete the directory.
///
/// The traversal is an explicit-stack DFS; it short-circuits with `Ok(true)`
/// on the first fresh entry found.
///
/// # Errors
/// Propagates I/O errors from reading metadata or listing directories.
///
/// # Panics
/// Panics if the system clock reports a time before the Unix epoch, or if
/// `ttl_seconds` exceeds the current time since the epoch (so the cutoff
/// subtraction underflows).
pub async fn satisfy_dir_ttl(dir: DirEntry, ttl_seconds: u64) -> Result<bool> {
    // Cutoff is `now - ttl_seconds`, expressed as a Duration since the epoch;
    // any mtime later than this counts as "recently modified".
    let cutoff = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("Time went backwards")
        .checked_sub(Duration::from_secs(ttl_seconds))
        .expect("The cut off time went backwards");

    let mut to_check = vec![dir];
    while let Some(dir) = to_check.pop() {
        // A directory's own mtime changes when entries are added/removed.
        if dir
            .metadata()
            .await?
            .modified()?
            .duration_since(UNIX_EPOCH)
            .expect("Time went backwards")
            > cutoff
        {
            return Ok(true);
        }
        // Recurse into subdirectories; check files' mtimes directly.
        let mut children = fs::read_dir(dir.path()).await?;
        while let Some(child) = children.next_entry().await? {
            let metadata = child.metadata().await?;
            if metadata.is_dir() {
                to_check.push(child);
            } else if metadata
                .modified()?
                .duration_since(UNIX_EPOCH)
                .expect("Time went backwards")
                > cutoff
            {
                return Ok(true);
            };
        }
    }
    Ok(false)
}
#[cfg(test)]
mod tests {
    use crate::clean_shuffle_data_loop;
    use std::fs;
    use std::fs::File;
    use std::io::Write;
    use std::time::Duration;
    use tempfile::TempDir;

    /// End-to-end check of `clean_shuffle_data_loop`: a job directory whose
    /// contents are older than the TTL must be removed from the work dir.
    #[tokio::test]
    async fn test_executor_clean_up() {
        // Lay out work_dir/job_id/tmp.csv as a fake shuffle-data directory.
        let work_dir = TempDir::new().unwrap().into_path();
        let job_dir = work_dir.as_path().join("job_id");
        let file_path = job_dir.as_path().join("tmp.csv");
        let data = "Jorge,2018-12-13T12:12:10.011Z\n\
        Andrew,2018-11-13T17:11:10.011Z";
        fs::create_dir(job_dir).unwrap();
        File::create(&file_path)
            .expect("creating temp file")
            .write_all(data.as_bytes())
            .expect("writing data");
        let work_dir_clone = work_dir.clone();

        // Exactly one entry (the job dir) exists before cleanup.
        let count1 = fs::read_dir(work_dir.clone()).unwrap().count();
        assert_eq!(count1, 1);

        let mut handles = vec![];
        handles.push(tokio::spawn(async move {
            // Sleep for 2s so the job dir's mtime is older than the 1s TTL
            // passed to the cleaner below.
            tokio::time::sleep(Duration::from_secs(2)).await;
            clean_shuffle_data_loop(work_dir_clone.to_str().unwrap(), 1)
                .await
                .unwrap();
        }));
        futures::future::join_all(handles).await;

        // The expired job dir must have been deleted.
        let count2 = fs::read_dir(work_dir.clone()).unwrap().count();
        assert_eq!(count2, 0);
    }
}