1#![doc = include_str!("../README.md")]
19
20pub mod collect;
21#[cfg(feature = "build-binary")]
22pub mod config;
23pub mod execution_engine;
24pub mod execution_loop;
25pub mod executor;
26pub mod executor_process;
27pub mod executor_server;
28pub mod flight_service;
29pub mod metrics;
30pub mod shutdown;
31pub mod terminate;
32
33mod cpu_bound_executor;
34mod standalone;
35
36use ballista_core::error::BallistaError;
37use std::net::SocketAddr;
38
39pub use standalone::new_standalone_executor;
40pub use standalone::new_standalone_executor_from_builder;
41pub use standalone::new_standalone_executor_from_state;
42
43use log::info;
44
45use crate::shutdown::Shutdown;
46use ballista_core::serde::protobuf::{
47 task_status, FailedTask, OperatorMetricsSet, ShuffleWritePartition, SuccessfulTask,
48 TaskStatus,
49};
50use ballista_core::serde::scheduler::PartitionId;
51use ballista_core::utils::GrpcServerConfig;
52
53pub type ArrowFlightServerProvider = dyn Fn(
62 SocketAddr,
63 Shutdown,
64 GrpcServerConfig,
65 ) -> tokio::task::JoinHandle<Result<(), BallistaError>>
66 + Send
67 + Sync;
68
69#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
70pub struct TaskExecutionTimes {
71 launch_time: u64,
72 start_exec_time: u64,
73 end_exec_time: u64,
74}
75
76pub fn as_task_status(
77 execution_result: ballista_core::error::Result<Vec<ShuffleWritePartition>>,
78 executor_id: String,
79 task_id: usize,
80 stage_attempt_num: usize,
81 partition_id: PartitionId,
82 operator_metrics: Option<Vec<OperatorMetricsSet>>,
83 execution_times: TaskExecutionTimes,
84) -> TaskStatus {
85 let metrics = operator_metrics.unwrap_or_default();
86 match execution_result {
87 Ok(partitions) => {
88 info!(
89 "Task {:?} finished with operator_metrics array size {}",
90 task_id,
91 metrics.len()
92 );
93 TaskStatus {
94 task_id: task_id as u32,
95 job_id: partition_id.job_id,
96 stage_id: partition_id.stage_id as u32,
97 stage_attempt_num: stage_attempt_num as u32,
98 partition_id: partition_id.partition_id as u32,
99 launch_time: execution_times.launch_time,
100 start_exec_time: execution_times.start_exec_time,
101 end_exec_time: execution_times.end_exec_time,
102 metrics,
103 status: Some(task_status::Status::Successful(SuccessfulTask {
104 executor_id,
105 partitions,
106 })),
107 }
108 }
109 Err(e) => {
110 let error_msg = e.to_string();
111 info!("Task {task_id:?} failed: {error_msg}");
112
113 TaskStatus {
114 task_id: task_id as u32,
115 job_id: partition_id.job_id,
116 stage_id: partition_id.stage_id as u32,
117 stage_attempt_num: stage_attempt_num as u32,
118 partition_id: partition_id.partition_id as u32,
119 launch_time: execution_times.launch_time,
120 start_exec_time: execution_times.start_exec_time,
121 end_exec_time: execution_times.end_exec_time,
122 metrics,
123 status: Some(task_status::Status::Failed(FailedTask::from(e))),
124 }
125 }
126 }
127}