ballista_executor/
lib.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#![doc = include_str!("../README.md")]
19
20pub mod collect;
21#[cfg(feature = "build-binary")]
22pub mod config;
23pub mod execution_engine;
24pub mod execution_loop;
25pub mod executor;
26pub mod executor_process;
27pub mod executor_server;
28pub mod flight_service;
29pub mod metrics;
30pub mod shutdown;
31pub mod terminate;
32
33mod cpu_bound_executor;
34mod standalone;
35
36use ballista_core::error::BallistaError;
37use std::net::SocketAddr;
38
39pub use standalone::new_standalone_executor;
40pub use standalone::new_standalone_executor_from_builder;
41pub use standalone::new_standalone_executor_from_state;
42
43use log::info;
44
45use crate::shutdown::Shutdown;
46use ballista_core::serde::protobuf::{
47    task_status, FailedTask, OperatorMetricsSet, ShuffleWritePartition, SuccessfulTask,
48    TaskStatus,
49};
50use ballista_core::serde::scheduler::PartitionId;
51use ballista_core::utils::GrpcServerConfig;
52
/// [ArrowFlightServerProvider] provides a function which creates a new Arrow Flight server.
///
/// The function should take three arguments:
/// [SocketAddr] - the address to bind the server to
/// [Shutdown] - a shutdown signal to gracefully shut down the server
/// [GrpcServerConfig] - the gRPC server configuration for timeout settings
///
/// Returns a [tokio::task::JoinHandle] which will be registered as service handler.
/// The handle resolves to `Ok(())` or to a [BallistaError].
///
/// The alias is an unsized `dyn Fn(..) + Send + Sync` trait object type, so it has
/// to be used behind a pointer type (a reference, `Box`, `Arc`, ...).
pub type ArrowFlightServerProvider = dyn Fn(
        SocketAddr,
        Shutdown,
        GrpcServerConfig,
    ) -> tokio::task::JoinHandle<Result<(), BallistaError>>
    + Send
    + Sync;
68
/// Timing values recorded for a single task execution.
///
/// [`as_task_status`] copies the three values unchanged into the fields of the
/// same name on the protobuf `TaskStatus`.
///
// NOTE(review): the unit and epoch of these values are not visible in this file —
// presumably milliseconds since the Unix epoch; confirm against the code that
// populates this struct.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct TaskExecutionTimes {
    /// Reported as `launch_time` in the task status.
    launch_time: u64,
    /// Reported as `start_exec_time` in the task status.
    start_exec_time: u64,
    /// Reported as `end_exec_time` in the task status.
    end_exec_time: u64,
}
75
76pub fn as_task_status(
77    execution_result: ballista_core::error::Result<Vec<ShuffleWritePartition>>,
78    executor_id: String,
79    task_id: usize,
80    stage_attempt_num: usize,
81    partition_id: PartitionId,
82    operator_metrics: Option<Vec<OperatorMetricsSet>>,
83    execution_times: TaskExecutionTimes,
84) -> TaskStatus {
85    let metrics = operator_metrics.unwrap_or_default();
86    match execution_result {
87        Ok(partitions) => {
88            info!(
89                "Task {:?} finished with operator_metrics array size {}",
90                task_id,
91                metrics.len()
92            );
93            TaskStatus {
94                task_id: task_id as u32,
95                job_id: partition_id.job_id,
96                stage_id: partition_id.stage_id as u32,
97                stage_attempt_num: stage_attempt_num as u32,
98                partition_id: partition_id.partition_id as u32,
99                launch_time: execution_times.launch_time,
100                start_exec_time: execution_times.start_exec_time,
101                end_exec_time: execution_times.end_exec_time,
102                metrics,
103                status: Some(task_status::Status::Successful(SuccessfulTask {
104                    executor_id,
105                    partitions,
106                })),
107            }
108        }
109        Err(e) => {
110            let error_msg = e.to_string();
111            info!("Task {task_id:?} failed: {error_msg}");
112
113            TaskStatus {
114                task_id: task_id as u32,
115                job_id: partition_id.job_id,
116                stage_id: partition_id.stage_id as u32,
117                stage_attempt_num: stage_attempt_num as u32,
118                partition_id: partition_id.partition_id as u32,
119                launch_time: execution_times.launch_time,
120                start_exec_time: execution_times.start_exec_time,
121                end_exec_time: execution_times.end_exec_time,
122                metrics,
123                status: Some(task_status::Status::Failed(FailedTask::from(e))),
124            }
125        }
126    }
127}