use crate::scheduler_server::event::QueryStageSchedulerEvent;
use crate::scheduler_server::SessionBuilder;
use crate::state::backend::Keyspace::{CompletedJobs, FailedJobs};
use crate::state::backend::{Keyspace, Operation, StateBackendClient};
use crate::state::execution_graph::{
    ExecutionGraph, ExecutionStage, RunningTaskInfo, TaskDescription,
};
use crate::state::executor_manager::{ExecutorManager, ExecutorReservation};
use crate::state::session_manager::create_datafusion_context;
use crate::state::{decode_protobuf, encode_protobuf, with_lock, with_locks};
use ballista_core::config::BallistaConfig;
use ballista_core::error::{BallistaError, Result};
use ballista_core::serde::protobuf::{
    self, job_status, FailedJob, JobStatus, MultiTaskDefinition, TaskDefinition, TaskId,
    TaskStatus,
};
use ballista_core::serde::scheduler::to_proto::hash_partitioning_to_proto;
use ballista_core::serde::scheduler::ExecutorMetadata;
use ballista_core::serde::{AsExecutionPlan, BallistaCodec};
use dashmap::DashMap;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionContext;
use datafusion_proto::logical_plan::AsLogicalPlan;
use log::{debug, error, info, warn};
use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng};
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::sync::RwLock;
use tracing::trace;
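/// Map from job ID to the cached in-memory state of an active job.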
type ActiveJobCache = Arc<DashMap<String, JobInfoCache>>;
/// Maximum number of times a task may fail before its job is failed.
pub const TASK_MAX_FAILURES: usize = 4;
/// Maximum number of times a stage may fail before its job is failed.
pub const STAGE_MAX_FAILURES: usize = 4;
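/// Abstraction over how tasks are launched on an executor. The indirection
/// allows alternative implementations, e.g. mocks in tests, via
/// [`TaskManager::with_launcher`].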
#[async_trait::async_trait]
pub(crate) trait TaskLauncher: Send + Sync + 'static {
async fn launch_tasks(
&self,
executor: &ExecutorMetadata,
tasks: Vec<MultiTaskDefinition>,
executor_manager: &ExecutorManager,
) -> Result<()>;
}
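/// Default [`TaskLauncher`] that sends a `LaunchMultiTask` RPC to the
/// target executor.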
struct DefaultTaskLauncher {
scheduler_id: String,
}
impl DefaultTaskLauncher {
pub fn new(scheduler_id: String) -> Self {
Self { scheduler_id }
}
}
#[async_trait::async_trait]
impl TaskLauncher for DefaultTaskLauncher {
async fn launch_tasks(
&self,
executor: &ExecutorMetadata,
tasks: Vec<MultiTaskDefinition>,
executor_manager: &ExecutorManager,
) -> Result<()> {
info!("Launching multi task on executor {:?}", executor.id);
let mut client = executor_manager.get_client(&executor.id).await?;
client
.launch_multi_task(protobuf::LaunchMultiTaskParams {
multi_tasks: tasks,
scheduler_id: self.scheduler_id.clone(),
})
.await
.map_err(|e| {
BallistaError::Internal(format!(
"Failed to connect to executor {}: {:?}",
executor.id, e
))
})?;
Ok(())
}
}
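/// Manages the task lifecycle of all jobs curated by this scheduler: caches
/// each active job's [`ExecutionGraph`], persists graph changes to the state
/// backend, and hands runnable tasks to a [`TaskLauncher`].
///
/// A construction sketch (assumes `state`, `session_builder` and `codec` are
/// configured elsewhere; the scheduler id is illustrative):
///
/// ```ignore
/// let task_manager =
///     TaskManager::new(state, session_builder, codec, "scheduler-1".to_string());
/// ```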
#[derive(Clone)]
pub struct TaskManager<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> {
state: Arc<dyn StateBackendClient>,
session_builder: SessionBuilder,
codec: BallistaCodec<T, U>,
scheduler_id: String,
active_job_cache: ActiveJobCache,
launcher: Arc<dyn TaskLauncher>,
}
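/// Cache entry for an active job: the shared execution graph plus the
/// encoded physical plan of each stage, cached so a stage plan is serialized
/// at most once per scheduler.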
#[derive(Clone)]
struct JobInfoCache {
execution_graph: Arc<RwLock<ExecutionGraph>>,
encoded_stage_plans: HashMap<usize, Vec<u8>>,
}
impl JobInfoCache {
fn new(graph: ExecutionGraph) -> Self {
Self {
execution_graph: Arc::new(RwLock::new(graph)),
encoded_stage_plans: HashMap::new(),
}
}
}
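/// Stage state transitions produced while processing a batch of task status
/// updates: stages that became resolved or successful, stages that failed
/// (with the failure message), running stages to roll back, and successful
/// stages to resubmit.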
#[derive(Clone)]
pub struct UpdatedStages {
pub resolved_stages: HashSet<usize>,
pub successful_stages: HashSet<usize>,
pub failed_stages: HashMap<usize, String>,
pub rollback_running_stages: HashMap<usize, HashSet<String>>,
pub resubmit_successful_stages: HashSet<usize>,
}
impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U> {
pub fn new(
state: Arc<dyn StateBackendClient>,
session_builder: SessionBuilder,
codec: BallistaCodec<T, U>,
scheduler_id: String,
) -> Self {
Self {
state,
session_builder,
codec,
scheduler_id: scheduler_id.clone(),
active_job_cache: Arc::new(DashMap::new()),
launcher: Arc::new(DefaultTaskLauncher::new(scheduler_id)),
}
}
#[allow(dead_code)]
pub(crate) fn with_launcher(
state: Arc<dyn StateBackendClient>,
session_builder: SessionBuilder,
codec: BallistaCodec<T, U>,
scheduler_id: String,
launcher: Arc<dyn TaskLauncher>,
) -> Self {
Self {
state,
session_builder,
codec,
scheduler_id,
active_job_cache: Arc::new(DashMap::new()),
launcher,
}
}
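    /// Generate an [`ExecutionGraph`] for the job, persist it under the
    /// `ActiveJobs` keyspace, and keep a revived copy in the active job
    /// cache so that its initial stages can start producing tasks.
    ///
    /// A minimal usage sketch (identifiers are illustrative):
    ///
    /// ```ignore
    /// task_manager
    ///     .submit_job(&job_id, "my job", &session_id, plan, queued_at)
    ///     .await?;
    /// ```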
pub async fn submit_job(
&self,
job_id: &str,
job_name: &str,
session_id: &str,
plan: Arc<dyn ExecutionPlan>,
queued_at: u64,
) -> Result<()> {
let mut graph = ExecutionGraph::new(
&self.scheduler_id,
job_id,
job_name,
session_id,
plan,
queued_at,
)?;
info!("Submitting execution graph: {:?}", graph);
self.state
.put(
Keyspace::ActiveJobs,
job_id.to_owned(),
self.encode_execution_graph(graph.clone())?,
)
.await?;
graph.revive();
self.active_job_cache
.insert(job_id.to_owned(), JobInfoCache::new(graph));
Ok(())
}
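    /// Return a [`JobOverview`] for every job found in the `ActiveJobs`,
    /// `CompletedJobs` and `FailedJobs` keyspaces.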
pub async fn get_jobs(&self) -> Result<Vec<JobOverview>> {
let mut job_ids = vec![];
for job_id in self.state.scan_keys(Keyspace::ActiveJobs).await? {
job_ids.push(job_id);
}
for job_id in self.state.scan_keys(Keyspace::CompletedJobs).await? {
job_ids.push(job_id);
}
for job_id in self.state.scan_keys(Keyspace::FailedJobs).await? {
job_ids.push(job_id);
}
let mut jobs = vec![];
for job_id in &job_ids {
let graph = self.get_execution_graph(job_id).await?;
let mut completed_stages = 0;
for stage in graph.stages().values() {
if let ExecutionStage::Successful(_) = stage {
completed_stages += 1;
}
}
jobs.push(JobOverview {
job_id: job_id.clone(),
job_name: graph.job_name().to_string(),
status: graph.status(),
start_time: graph.start_time(),
end_time: graph.end_time(),
num_stages: graph.stage_count(),
completed_stages,
});
}
Ok(jobs)
}
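    /// Get the status of a job: check the active cache first, then the
    /// persisted execution graph, and finally the `FailedJobs` keyspace.
    /// Returns `None` for an unknown job.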
pub async fn get_job_status(&self, job_id: &str) -> Result<Option<JobStatus>> {
if let Some(graph) = self.get_active_execution_graph(job_id).await {
let status = graph.read().await.status();
Ok(Some(status))
} else if let Ok(graph) = self.get_execution_graph(job_id).await {
Ok(Some(graph.status()))
} else {
let value = self.state.get(Keyspace::FailedJobs, job_id).await?;
if !value.is_empty() {
let status = decode_protobuf(&value)?;
Ok(Some(status))
} else {
Ok(None)
}
}
}
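    /// Get the execution graph of a job, checking the active cache first and
    /// then persistent state. Returns `None` if no graph can be found.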
pub async fn get_job_execution_graph(
&self,
job_id: &str,
) -> Result<Option<Arc<ExecutionGraph>>> {
if let Some(graph) = self.get_active_execution_graph(job_id).await {
Ok(Some(Arc::new(graph.read().await.clone())))
} else if let Ok(graph) = self.get_execution_graph(job_id).await {
Ok(Some(Arc::new(graph)))
} else {
Ok(None)
}
}
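    /// Apply a batch of task status updates reported by an executor to the
    /// corresponding active jobs and return the resulting
    /// [`QueryStageSchedulerEvent`]s for the scheduler event loop.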
pub(crate) async fn update_task_statuses(
&self,
executor: &ExecutorMetadata,
task_status: Vec<TaskStatus>,
) -> Result<Vec<QueryStageSchedulerEvent>> {
let mut job_updates: HashMap<String, Vec<TaskStatus>> = HashMap::new();
for status in task_status {
trace!("Task Update\n{:?}", status);
let job_id = status.job_id.clone();
let job_task_statuses = job_updates.entry(job_id).or_insert_with(Vec::new);
job_task_statuses.push(status);
}
let mut events: Vec<QueryStageSchedulerEvent> = vec![];
for (job_id, statuses) in job_updates {
let num_tasks = statuses.len();
debug!("Updating {} tasks in job {}", num_tasks, job_id);
let graph = self.get_active_execution_graph(&job_id).await;
let job_events = if let Some(graph) = graph {
let mut graph = graph.write().await;
graph.update_task_status(
executor,
statuses,
TASK_MAX_FAILURES,
STAGE_MAX_FAILURES,
)?
} else {
error!("Fail to find job {} in the active cache and it may not be curated by this scheduler", job_id);
vec![]
};
for event in job_events {
events.push(event);
}
}
Ok(events)
}
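    /// Attempt to fill the given reservations with tasks that are ready to
    /// run, walking the active jobs and popping runnable tasks until either
    /// every reservation is filled or no job has tasks left. Returns the
    /// `(executor_id, task)` assignments, the reservations that could not be
    /// filled, and the number of tasks still pending in the job being
    /// scanned when the reservations ran out.
    ///
    /// A usage sketch (assumes `reservations` came from the
    /// [`ExecutorManager`]):
    ///
    /// ```ignore
    /// let (assignments, unassigned, pending) =
    ///     task_manager.fill_reservations(&reservations).await?;
    /// ```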
pub async fn fill_reservations(
&self,
reservations: &[ExecutorReservation],
) -> Result<(
Vec<(String, TaskDescription)>,
Vec<ExecutorReservation>,
usize,
)> {
let free_reservations: Vec<ExecutorReservation> = reservations
.iter()
.map(|reservation| {
ExecutorReservation::new_free(reservation.executor_id.clone())
})
.collect();
let mut assignments: Vec<(String, TaskDescription)> = vec![];
let mut pending_tasks = 0usize;
let mut assign_tasks = 0usize;
for pairs in self.active_job_cache.iter() {
let (_job_id, job_info) = pairs.pair();
let mut graph = job_info.execution_graph.write().await;
for reservation in free_reservations.iter().skip(assign_tasks) {
if let Some(task) = graph.pop_next_task(&reservation.executor_id)? {
assignments.push((reservation.executor_id.clone(), task));
assign_tasks += 1;
} else {
break;
}
}
if assign_tasks >= free_reservations.len() {
pending_tasks += graph.available_tasks();
break;
}
}
let mut unassigned = vec![];
for reservation in free_reservations.iter().skip(assign_tasks) {
unassigned.push(reservation.clone());
}
Ok((assignments, unassigned, pending_tasks))
}
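    /// Move a job from the `ActiveJobs` keyspace to `CompletedJobs` once its
    /// execution graph reports success, dropping it from the active cache.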
pub(crate) async fn succeed_job(&self, job_id: &str) -> Result<()> {
debug!("Moving job {} from Active to Success", job_id);
let lock = self.state.lock(Keyspace::ActiveJobs, "").await?;
with_lock(lock, self.state.delete(Keyspace::ActiveJobs, job_id)).await?;
if let Some(graph) = self.remove_active_execution_graph(job_id).await {
let graph = graph.read().await.clone();
if graph.is_successful() {
let value = self.encode_execution_graph(graph)?;
self.state
.put(Keyspace::CompletedJobs, job_id.to_owned(), value)
.await?;
} else {
error!("Job {} has not finished and cannot be completed", job_id);
return Ok(());
}
} else {
warn!("Fail to find job {} in the cache", job_id);
}
Ok(())
}
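    /// Cancel a job. Equivalent to aborting it with the reason "Cancelled".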
pub(crate) async fn cancel_job(
&self,
job_id: &str,
) -> Result<(Vec<RunningTaskInfo>, usize)> {
self.abort_job(job_id, "Cancelled".to_owned()).await
}
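    /// Abort a job with the given failure reason. Returns the running tasks
    /// that should be cancelled and the number of tasks still pending.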
pub(crate) async fn abort_job(
&self,
job_id: &str,
failure_reason: String,
) -> Result<(Vec<RunningTaskInfo>, usize)> {
let locks = self
.state
.acquire_locks(vec![
(Keyspace::ActiveJobs, job_id),
(Keyspace::FailedJobs, job_id),
])
.await?;
let (tasks_to_cancel, pending_tasks) = if let Some(graph) =
self.get_active_execution_graph(job_id).await
{
let (pending_tasks, running_tasks) = {
let guard = graph.read().await;
(guard.available_tasks(), guard.running_tasks())
};
info!(
"Cancelling {} running tasks for job {}",
running_tasks.len(),
job_id
);
            with_locks(locks, self.fail_job_state(job_id, failure_reason)).await?;
(running_tasks, pending_tasks)
} else {
warn!("Fail to find job {} in the cache, unable to cancel tasks for job, fail the job state only.", job_id);
with_locks(locks, self.fail_job_state(job_id, failure_reason)).await?;
(vec![], 0)
};
Ok((tasks_to_cancel, pending_tasks))
}
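    /// Mark a job that has not been scheduled yet as failed, moving its
    /// state from `ActiveJobs` to `FailedJobs` under the appropriate locks.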
pub async fn fail_unscheduled_job(
&self,
job_id: &str,
failure_reason: String,
) -> Result<()> {
debug!("Moving job {} from Active or Queue to Failed", job_id);
let locks = self
.state
.acquire_locks(vec![
(Keyspace::ActiveJobs, job_id),
(Keyspace::FailedJobs, job_id),
])
.await?;
with_locks(locks, self.fail_job_state(job_id, failure_reason)).await?;
Ok(())
}
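    /// Move a job's persisted state to failed by atomically deleting its
    /// `ActiveJobs` entry and writing the failure status to `FailedJobs`.
    /// If the transaction fails, the in-memory graph status is rolled back.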
async fn fail_job_state(&self, job_id: &str, failure_reason: String) -> Result<()> {
let txn_operations = |value: Vec<u8>| -> Vec<(Operation, Keyspace, String)> {
vec![
(Operation::Delete, Keyspace::ActiveJobs, job_id.to_string()),
(
Operation::Put(value),
Keyspace::FailedJobs,
job_id.to_string(),
),
]
};
if let Some(graph) = self.remove_active_execution_graph(job_id).await {
let mut graph = graph.write().await;
let previous_status = graph.status();
graph.fail_job(failure_reason);
let value = encode_protobuf(&graph.status())?;
let txn_ops = txn_operations(value);
let result = self.state.apply_txn(txn_ops).await;
if result.is_err() {
graph.update_status(previous_status);
warn!("Rollback Execution Graph state change since it did not persisted due to a possible connection error.")
};
} else {
info!("Fail to find job {} in the cache", job_id);
let status = JobStatus {
status: Some(job_status::Status::Failed(FailedJob {
error: failure_reason.clone(),
})),
};
let value = encode_protobuf(&status)?;
let txn_ops = txn_operations(value);
self.state.apply_txn(txn_ops).await?;
};
Ok(())
}
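    /// Revive the job's execution graph, persist it, and return the number
    /// of tasks that became newly available.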
pub async fn update_job(&self, job_id: &str) -> Result<usize> {
debug!("Update job {} in Active", job_id);
if let Some(graph) = self.get_active_execution_graph(job_id).await {
let mut graph = graph.write().await;
let curr_available_tasks = graph.available_tasks();
graph.revive();
let graph = graph.clone();
let new_tasks = graph.available_tasks() - curr_available_tasks;
let value = self.encode_execution_graph(graph)?;
self.state
.put(Keyspace::ActiveJobs, job_id.to_owned(), value)
.await?;
Ok(new_tasks)
} else {
warn!("Fail to find job {} in the cache", job_id);
Ok(0)
}
}
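    /// Handle a lost executor: reset the affected stages in every active
    /// job, persist the updated graphs in one transaction, and return the
    /// running tasks that should be cancelled.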
pub async fn executor_lost(&self, executor_id: &str) -> Result<Vec<RunningTaskInfo>> {
let mut running_tasks_to_cancel: Vec<RunningTaskInfo> = vec![];
let updated_graphs: DashMap<String, ExecutionGraph> = DashMap::new();
{
for pairs in self.active_job_cache.iter() {
let (job_id, job_info) = pairs.pair();
let mut graph = job_info.execution_graph.write().await;
let reset = graph.reset_stages_on_lost_executor(executor_id)?;
if !reset.0.is_empty() {
updated_graphs.insert(job_id.to_owned(), graph.clone());
running_tasks_to_cancel.extend(reset.1);
}
}
}
let lock = self.state.lock(Keyspace::ActiveJobs, "").await?;
with_lock(lock, async {
let txn_ops: Vec<(Operation, Keyspace, String)> = updated_graphs
.into_iter()
.map(|(job_id, graph)| {
let value = self.encode_execution_graph(graph)?;
Ok((Operation::Put(value), Keyspace::ActiveJobs, job_id))
})
.collect::<Result<Vec<_>>>()?;
self.state.apply_txn(txn_ops).await?;
Ok(running_tasks_to_cancel)
})
.await
}
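    /// Return the number of currently runnable tasks for the given job, or
    /// 0 if the job is not in the active cache.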
pub async fn get_available_task_count(&self, job_id: &str) -> Result<usize> {
if let Some(graph) = self.get_active_execution_graph(job_id).await {
let available_tasks = graph.read().await.available_tasks();
Ok(available_tasks)
} else {
warn!("Fail to find job {} in the cache", job_id);
Ok(0)
}
}
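    /// Build the protobuf [`TaskDefinition`] for a single task. The encoded
    /// stage plan is cached in the active job cache so each stage is
    /// serialized at most once.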
#[allow(dead_code)]
pub fn prepare_task_definition(
&self,
task: TaskDescription,
) -> Result<TaskDefinition> {
debug!("Preparing task definition for {:?}", task);
let job_id = task.partition.job_id.clone();
let stage_id = task.partition.stage_id;
if let Some(mut job_info) = self.active_job_cache.get_mut(&job_id) {
let plan = if let Some(plan) = job_info.encoded_stage_plans.get(&stage_id) {
plan.clone()
} else {
let mut plan_buf: Vec<u8> = vec![];
let plan_proto = U::try_from_physical_plan(
task.plan,
self.codec.physical_extension_codec(),
)?;
plan_proto.try_encode(&mut plan_buf)?;
job_info
.encoded_stage_plans
.insert(stage_id, plan_buf.clone());
plan_buf
};
let output_partitioning =
hash_partitioning_to_proto(task.output_partitioning.as_ref())?;
let task_definition = TaskDefinition {
task_id: task.task_id as u32,
task_attempt_num: task.task_attempt as u32,
job_id,
stage_id: stage_id as u32,
stage_attempt_num: task.stage_attempt_num as u32,
partition_id: task.partition.partition_id as u32,
plan,
output_partitioning,
session_id: task.session_id,
launch_time: SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis() as u64,
props: vec![],
};
Ok(task_definition)
} else {
Err(BallistaError::General(format!(
"Cannot prepare task definition for job {} which is not in active cache",
job_id
)))
}
}
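    /// Launch the given task groups (one group per job stage) on the
    /// specified executor through the configured [`TaskLauncher`].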
pub(crate) async fn launch_multi_task(
&self,
executor: &ExecutorMetadata,
tasks: Vec<Vec<TaskDescription>>,
executor_manager: &ExecutorManager,
) -> Result<()> {
let multi_tasks: Result<Vec<MultiTaskDefinition>> = tasks
.into_iter()
.map(|stage_tasks| self.prepare_multi_task_definition(stage_tasks))
.collect();
self.launcher
.launch_tasks(executor, multi_tasks?, executor_manager)
.await
}
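    /// Build a [`MultiTaskDefinition`] for a batch of tasks that all belong
    /// to the same job stage, so the stage plan and output partitioning are
    /// encoded only once for the whole batch.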
#[allow(dead_code)]
fn prepare_multi_task_definition(
&self,
tasks: Vec<TaskDescription>,
) -> Result<MultiTaskDefinition> {
        if let Some(task) = tasks.first() {
let session_id = task.session_id.clone();
let job_id = task.partition.job_id.clone();
let stage_id = task.partition.stage_id;
let stage_attempt_num = task.stage_attempt_num;
if log::max_level() >= log::Level::Debug {
let task_ids: Vec<usize> = tasks
.iter()
.map(|task| task.partition.partition_id)
.collect();
debug!("Preparing multi task definition for tasks {:?} belonging to job stage {}/{}", task_ids, job_id, stage_id);
trace!("With task details {:?}", tasks);
}
if let Some(mut job_info) = self.active_job_cache.get_mut(&job_id) {
let plan = if let Some(plan) = job_info.encoded_stage_plans.get(&stage_id)
{
plan.clone()
} else {
let mut plan_buf: Vec<u8> = vec![];
let plan_proto = U::try_from_physical_plan(
task.plan.clone(),
self.codec.physical_extension_codec(),
)?;
plan_proto.try_encode(&mut plan_buf)?;
job_info
.encoded_stage_plans
.insert(stage_id, plan_buf.clone());
plan_buf
};
let output_partitioning =
hash_partitioning_to_proto(task.output_partitioning.as_ref())?;
let task_ids = tasks
.iter()
.map(|task| TaskId {
task_id: task.task_id as u32,
task_attempt_num: task.task_attempt as u32,
partition_id: task.partition.partition_id as u32,
})
.collect();
let multi_task_definition = MultiTaskDefinition {
task_ids,
job_id,
stage_id: stage_id as u32,
stage_attempt_num: stage_attempt_num as u32,
plan,
output_partitioning,
session_id,
launch_time: SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis() as u64,
props: vec![],
};
Ok(multi_task_definition)
} else {
Err(BallistaError::General(format!("Cannot prepare multi task definition for job {} which is not in active cache", job_id)))
}
} else {
Err(BallistaError::General(
"Cannot prepare multi task definition for an empty vec".to_string(),
))
}
}
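    /// Get the [`ExecutionGraph`] for the given job ID from the active
    /// cache, if present.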
pub(crate) async fn get_active_execution_graph(
&self,
job_id: &str,
) -> Option<Arc<RwLock<ExecutionGraph>>> {
self.active_job_cache
.get(job_id)
.map(|value| value.execution_graph.clone())
}
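    /// Remove the [`ExecutionGraph`] for the given job ID from the active
    /// cache and return it, if present.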
pub(crate) async fn remove_active_execution_graph(
&self,
job_id: &str,
) -> Option<Arc<RwLock<ExecutionGraph>>> {
self.active_job_cache
.remove(job_id)
.map(|value| value.1.execution_graph)
}
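    /// Load the [`ExecutionGraph`] for the given job ID from persistent
    /// state, looking first in `ActiveJobs` and then in `CompletedJobs`.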
pub(crate) async fn get_execution_graph(
&self,
job_id: &str,
) -> Result<ExecutionGraph> {
        let value = self.state.get(Keyspace::ActiveJobs, job_id).await?;
        let value = if value.is_empty() {
            self.state.get(Keyspace::CompletedJobs, job_id).await?
        } else {
            value
        };
        self.decode_execution_graph(value).await
}
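    /// Rebuild the DataFusion [`SessionContext`] for the given session ID
    /// from the session settings persisted in the `Sessions` keyspace.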
async fn get_session(&self, session_id: &str) -> Result<Arc<SessionContext>> {
let value = self.state.get(Keyspace::Sessions, session_id).await?;
let settings: protobuf::SessionSettings = decode_protobuf(&value)?;
let mut config_builder = BallistaConfig::builder();
for kv_pair in &settings.configs {
config_builder = config_builder.set(&kv_pair.key, &kv_pair.value);
}
let config = config_builder.build()?;
Ok(create_datafusion_context(&config, self.session_builder))
}
async fn decode_execution_graph(&self, value: Vec<u8>) -> Result<ExecutionGraph> {
let proto: protobuf::ExecutionGraph = decode_protobuf(&value)?;
let session_id = &proto.session_id;
let session_ctx = self.get_session(session_id).await?;
ExecutionGraph::decode_execution_graph(proto, &self.codec, &session_ctx).await
}
fn encode_execution_graph(&self, graph: ExecutionGraph) -> Result<Vec<u8>> {
let proto = ExecutionGraph::encode_execution_graph(graph, &self.codec)?;
encode_protobuf(&proto)
}
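    /// Generate a random 7-character alphanumeric job ID.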
pub fn generate_job_id(&self) -> String {
let mut rng = thread_rng();
std::iter::repeat(())
.map(|()| rng.sample(Alphanumeric))
.map(char::from)
.take(7)
.collect()
}
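    /// Schedule removal of a failed job's state after `clean_up_interval`
    /// seconds; an interval of 0 disables clean-up.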
pub(crate) fn clean_up_failed_job_delayed(
&self,
job_id: String,
clean_up_interval: u64,
) {
if clean_up_interval == 0 {
info!("The interval is 0 and the clean up for the failed job state {} will not triggered", job_id);
return;
}
self.delete_from_state_backend_delayed(FailedJobs, job_id, clean_up_interval)
}
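    /// Schedule removal of a successful job's state after
    /// `clean_up_interval` seconds; an interval of 0 disables clean-up.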
pub(crate) fn delete_successful_job_delayed(
&self,
job_id: String,
clean_up_interval: u64,
) {
if clean_up_interval == 0 {
info!("The interval is 0 and the clean up for the successful job state {} will not triggered", job_id);
return;
}
self.delete_from_state_backend_delayed(CompletedJobs, job_id, clean_up_interval)
}
fn delete_from_state_backend_delayed(
&self,
keyspace: Keyspace,
key: String,
clean_up_interval: u64,
) {
let state = self.state.clone();
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(clean_up_interval)).await;
Self::delete_from_state_backend(state, keyspace, &key).await
});
}
async fn delete_from_state_backend(
state: Arc<dyn StateBackendClient>,
keyspace: Keyspace,
key: &str,
) -> Result<()> {
let lock = state.lock(keyspace.clone(), "").await?;
with_lock(lock, state.delete(keyspace, key)).await?;
Ok(())
}
}
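/// A lightweight summary of a job for listings, avoiding a clone of the
/// full [`ExecutionGraph`].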
pub struct JobOverview {
pub job_id: String,
pub job_name: String,
pub status: JobStatus,
pub start_time: u64,
pub end_time: u64,
pub num_stages: usize,
pub completed_stages: usize,
}