use std::{
collections::{HashMap, HashSet},
fmt::Debug,
sync::Arc,
};
use datafusion_execution::SendableRecordBatchStream;
use datafusion_physical_plan::ExecutionPlan;
use serde::{Deserialize, Serialize};
use crate::{
DistError, DistResult, JobId,
cluster::NodeId,
planner::{StageId, TaskId},
runtime::{StageState, TaskMetrics, TaskSet},
};
/// Network-facing operations addressed to nodes identified by a [`NodeId`].
///
/// Implementors must be `Debug + Send + Sync`.
///
/// NOTE(review): only the method signatures are visible in this file; the
/// per-method descriptions below are inferred from names and types and should
/// be confirmed against a concrete implementation.
#[async_trait::async_trait]
pub trait DistNetwork: Debug + Send + Sync {
    /// Returns a [`NodeId`]; presumably the identity of the node this instance
    /// runs on (TODO confirm).
    fn local_node(&self) -> NodeId;

    /// Hands `scheduled_tasks` over for the node `node_id`.
    ///
    /// # Errors
    /// Returns a `DistResult` error as defined by the implementation.
    async fn send_tasks(&self, node_id: NodeId, scheduled_tasks: ScheduledTasks) -> DistResult<()>;

    /// Requests execution of `task_id` on `node_id` and yields the resulting
    /// record batch stream.
    ///
    /// # Errors
    /// Returns a `DistResult` error as defined by the implementation.
    async fn execute_task(
        &self,
        node_id: NodeId,
        task_id: TaskId,
    ) -> DistResult<SendableRecordBatchStream>;

    /// Fetches a [`StageInfo`] per [`StageId`] from `node_id`.
    ///
    /// NOTE(review): `job_ids` looks like an optional filter, with `None`
    /// presumably meaning "no filtering" — confirm against implementors.
    ///
    /// # Errors
    /// Returns a `DistResult` error as defined by the implementation.
    async fn get_jobs(
        &self,
        node_id: NodeId,
        job_ids: Option<Vec<JobId>>,
    ) -> DistResult<HashMap<StageId, StageInfo>>;

    /// Asks `node_id` to clean up the jobs listed in `job_ids`.
    ///
    /// # Errors
    /// Returns a `DistResult` error as defined by the implementation.
    async fn cleanup_jobs(&self, node_id: NodeId, job_ids: Vec<JobId>) -> DistResult<()>;
}
/// A bundle of tasks together with the plans and job-level data that accompany
/// them; this is the payload type accepted by `DistNetwork::send_tasks`.
pub struct ScheduledTasks {
    /// Execution plan for each stage, keyed by [`StageId`].
    pub stage_plans: HashMap<StageId, Arc<dyn ExecutionPlan>>,
    /// Identifiers of the tasks carried by this bundle.
    pub task_ids: Vec<TaskId>,
    /// Shared mapping from each [`TaskId`] to a [`NodeId`].
    ///
    /// NOTE(review): the name suggests this covers every task of the job, not
    /// only the ones in `task_ids` — confirm with the scheduler.
    pub job_task_distribution: Arc<HashMap<TaskId, NodeId>>,
    /// Shared string key/value metadata attached to the job.
    pub job_meta: Arc<HashMap<String, String>>,
}
impl ScheduledTasks {
    /// Builds a `ScheduledTasks` from its four parts, storing each argument in
    /// the field of the same name without further processing.
    pub fn new(
        stage_plans: HashMap<StageId, Arc<dyn ExecutionPlan>>,
        task_ids: Vec<TaskId>,
        job_task_distribution: Arc<HashMap<TaskId, NodeId>>,
        job_meta: Arc<HashMap<String, String>>,
    ) -> Self {
        Self {
            stage_plans,
            task_ids,
            job_task_distribution,
            job_meta,
        }
    }

    /// Returns a clone of the `job_id` of the first entry in `task_ids`.
    ///
    /// Only the first task is inspected; the remaining tasks are presumably
    /// expected to belong to the same job (not checked here).
    ///
    /// # Errors
    /// Returns an internal [`DistError`] when `task_ids` is empty.
    pub fn job_id(&self) -> DistResult<JobId> {
        match self.task_ids.first() {
            Some(first_task) => Ok(first_task.job_id.clone()),
            None => Err(DistError::internal(
                "ScheduledTasks has no task_ids".to_string(),
            )),
        }
    }
}
/// Serializable snapshot of a stage, built from a `StageState` via
/// `StageInfo::from_stage_state`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StageInfo {
    /// Creation timestamp copied from the stage state.
    ///
    /// NOTE(review): the `_ms` suffix suggests milliseconds; the epoch is not
    /// visible here — confirm at the producer.
    pub created_at_ms: i64,
    /// Partition indices recorded as assigned for the stage.
    pub assigned_partitions: HashSet<usize>,
    /// One [`TaskSetInfo`] per task set of the stage.
    pub task_set_infos: Vec<TaskSetInfo>,
    /// Shared string key/value metadata attached to the job.
    pub job_meta: Arc<HashMap<String, String>>,
}
impl StageInfo {
pub fn from_stage_state(stage_state: &StageState) -> Self {
let task_set_infos = stage_state
.task_sets
.iter()
.map(TaskSetInfo::from_task_set)
.collect();
StageInfo {
created_at_ms: stage_state.created_at_ms,
assigned_partitions: stage_state.assigned_partitions.clone(),
task_set_infos,
job_meta: stage_state.job_meta.clone(),
}
}
pub fn merge(&mut self, other: &StageInfo) {
self.assigned_partitions.extend(&other.assigned_partitions);
self.task_set_infos.extend(other.task_set_infos.clone());
}
}
/// Serializable summary of a task set, built from a `TaskSet` via
/// `TaskSetInfo::from_task_set`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskSetInfo {
    /// Partition indices taken from the keys of the task set's
    /// `running_partitions` map.
    pub running_partitions: HashSet<usize>,
    /// [`TaskMetrics`] keyed by partition index, copied from the task set's
    /// `dropped_partitions`.
    ///
    /// NOTE(review): "dropped" presumably means the partition's task has
    /// finished or been released — confirm in the runtime module.
    pub dropped_partitions: HashMap<usize, TaskMetrics>,
}
impl TaskSetInfo {
pub fn from_task_set(task_set: &TaskSet) -> Self {
TaskSetInfo {
running_partitions: task_set.running_partitions.keys().copied().collect(),
dropped_partitions: task_set.dropped_partitions.clone(),
}
}
}