datafusion_dist/
network.rs1use std::{
2 collections::{HashMap, HashSet},
3 fmt::Debug,
4 sync::Arc,
5};
6
7use datafusion_physical_plan::ExecutionPlan;
8use serde::{Deserialize, Serialize};
9use uuid::Uuid;
10
11use crate::{
12 DistError, DistResult, RecordBatchStream,
13 cluster::NodeId,
14 planner::{StageId, TaskId},
15 runtime::{StageState, TaskMetrics, TaskSet},
16};
17
18#[async_trait::async_trait]
19pub trait DistNetwork: Debug + Send + Sync {
20 fn local_node(&self) -> NodeId;
21
22 async fn send_tasks(&self, node_id: NodeId, scheduled_tasks: ScheduledTasks) -> DistResult<()>;
24
25 async fn execute_task(&self, node_id: NodeId, task_id: TaskId)
27 -> DistResult<RecordBatchStream>;
28
29 async fn get_job_status(
30 &self,
31 node_id: NodeId,
32 job_id: Option<Uuid>,
33 ) -> DistResult<HashMap<StageId, StageInfo>>;
34
35 async fn cleanup_job(&self, node_id: NodeId, job_id: Uuid) -> DistResult<()>;
36}
37
38pub struct ScheduledTasks {
39 pub stage_plans: HashMap<StageId, Arc<dyn ExecutionPlan>>,
40 pub task_ids: Vec<TaskId>,
41 pub job_task_distribution: Arc<HashMap<TaskId, NodeId>>,
42}
43
44impl ScheduledTasks {
45 pub fn new(
46 stage_plans: HashMap<StageId, Arc<dyn ExecutionPlan>>,
47 task_ids: Vec<TaskId>,
48 job_task_distribution: Arc<HashMap<TaskId, NodeId>>,
49 ) -> Self {
50 ScheduledTasks {
51 stage_plans,
52 task_ids,
53 job_task_distribution,
54 }
55 }
56
57 pub fn job_id(&self) -> DistResult<Uuid> {
58 self.task_ids
59 .first()
60 .map(|task_id| task_id.job_id)
61 .ok_or_else(|| DistError::internal("ScheduledTasks has no task_ids".to_string()))
62 }
63}
64
65#[derive(Debug, Clone, Serialize, Deserialize)]
66pub struct StageInfo {
67 pub create_at_ms: i64,
68 pub assigned_partitions: HashSet<usize>,
69 pub task_set_infos: Vec<TaskSetInfo>,
70}
71
72impl StageInfo {
73 pub fn from_stage_state(stage_state: &StageState) -> Self {
74 let task_set_infos = stage_state
75 .task_sets
76 .iter()
77 .map(TaskSetInfo::from_task_set)
78 .collect();
79
80 StageInfo {
81 create_at_ms: stage_state.create_at_ms,
82 assigned_partitions: stage_state.assigned_partitions.clone(),
83 task_set_infos,
84 }
85 }
86}
87
88#[derive(Debug, Clone, Serialize, Deserialize)]
89pub struct TaskSetInfo {
90 pub running_partitions: HashSet<usize>,
91 pub dropped_partitions: HashMap<usize, TaskMetrics>,
92}
93
94impl TaskSetInfo {
95 pub fn from_task_set(task_set: &TaskSet) -> Self {
96 TaskSetInfo {
97 running_partitions: task_set.running_partitions.clone(),
98 dropped_partitions: task_set.dropped_partitions.clone(),
99 }
100 }
101}