// datafusion_dist/network.rs

1use std::{
2    collections::{HashMap, HashSet},
3    fmt::Debug,
4    sync::Arc,
5};
6
7use datafusion_physical_plan::ExecutionPlan;
8use serde::{Deserialize, Serialize};
9use uuid::Uuid;
10
11use crate::{
12    DistError, DistResult, RecordBatchStream,
13    cluster::NodeId,
14    planner::{StageId, TaskId},
15    runtime::{StageState, TaskMetrics, TaskSet},
16};
17
18#[async_trait::async_trait]
19pub trait DistNetwork: Debug + Send + Sync {
20    fn local_node(&self) -> NodeId;
21
22    // Send task plan
23    async fn send_tasks(&self, node_id: NodeId, scheduled_tasks: ScheduledTasks) -> DistResult<()>;
24
25    // Execute task plan
26    async fn execute_task(&self, node_id: NodeId, task_id: TaskId)
27    -> DistResult<RecordBatchStream>;
28
29    async fn get_job_status(
30        &self,
31        node_id: NodeId,
32        job_id: Option<Uuid>,
33    ) -> DistResult<HashMap<StageId, StageInfo>>;
34
35    async fn cleanup_job(&self, node_id: NodeId, job_id: Uuid) -> DistResult<()>;
36}
37
38pub struct ScheduledTasks {
39    pub stage_plans: HashMap<StageId, Arc<dyn ExecutionPlan>>,
40    pub task_ids: Vec<TaskId>,
41    pub job_task_distribution: Arc<HashMap<TaskId, NodeId>>,
42}
43
44impl ScheduledTasks {
45    pub fn new(
46        stage_plans: HashMap<StageId, Arc<dyn ExecutionPlan>>,
47        task_ids: Vec<TaskId>,
48        job_task_distribution: Arc<HashMap<TaskId, NodeId>>,
49    ) -> Self {
50        ScheduledTasks {
51            stage_plans,
52            task_ids,
53            job_task_distribution,
54        }
55    }
56
57    pub fn job_id(&self) -> DistResult<Uuid> {
58        self.task_ids
59            .first()
60            .map(|task_id| task_id.job_id)
61            .ok_or_else(|| DistError::internal("ScheduledTasks has no task_ids".to_string()))
62    }
63}
64
65#[derive(Debug, Clone, Serialize, Deserialize)]
66pub struct StageInfo {
67    pub create_at_ms: i64,
68    pub assigned_partitions: HashSet<usize>,
69    pub task_set_infos: Vec<TaskSetInfo>,
70}
71
72impl StageInfo {
73    pub fn from_stage_state(stage_state: &StageState) -> Self {
74        let task_set_infos = stage_state
75            .task_sets
76            .iter()
77            .map(TaskSetInfo::from_task_set)
78            .collect();
79
80        StageInfo {
81            create_at_ms: stage_state.create_at_ms,
82            assigned_partitions: stage_state.assigned_partitions.clone(),
83            task_set_infos,
84        }
85    }
86}
87
88#[derive(Debug, Clone, Serialize, Deserialize)]
89pub struct TaskSetInfo {
90    pub running_partitions: HashSet<usize>,
91    pub dropped_partitions: HashMap<usize, TaskMetrics>,
92}
93
impl TaskSetInfo {
    /// Builds a `TaskSetInfo` by cloning the running and dropped partition
    /// collections of `task_set`; the source is left untouched.
    pub fn from_task_set(task_set: &TaskSet) -> Self {
        TaskSetInfo {
            running_partitions: task_set.running_partitions.clone(),
            dropped_partitions: task_set.dropped_partitions.clone(),
        }
    }
}