use std::{
error::Error,
fmt::{Display, Formatter},
io, result,
};
use crate::serde::protobuf::failed_task::FailedReason;
use crate::serde::protobuf::{ExecutionError, FailedTask, FetchPartitionError, IoError};
use datafusion::arrow::error::ArrowError;
use datafusion::error::DataFusionError;
use futures::future::Aborted;
use sqlparser::parser;
pub type Result<T> = result::Result<T, BallistaError>;
#[derive(Debug)]
pub enum BallistaError {
NotImplemented(String),
General(String),
Internal(String),
ArrowError(ArrowError),
DataFusionError(DataFusionError),
SqlError(parser::ParserError),
IoError(io::Error),
TonicError(tonic::transport::Error),
GrpcError(tonic::Status),
GrpcConnectionError(String),
TokioError(tokio::task::JoinError),
GrpcActionError(String),
FetchFailed(String, usize, usize, String),
Cancelled,
}
#[allow(clippy::from_over_into)]
impl<T> Into<Result<T>> for BallistaError {
fn into(self) -> Result<T> {
Err(self)
}
}
pub fn ballista_error(message: &str) -> BallistaError {
BallistaError::General(message.to_owned())
}
impl From<String> for BallistaError {
fn from(e: String) -> Self {
BallistaError::General(e)
}
}
impl From<ArrowError> for BallistaError {
fn from(e: ArrowError) -> Self {
match e {
ArrowError::ExternalError(e)
if e.downcast_ref::<BallistaError>().is_some() =>
{
*e.downcast::<BallistaError>().unwrap()
}
ArrowError::ExternalError(e)
if e.downcast_ref::<DataFusionError>().is_some() =>
{
BallistaError::DataFusionError(*e.downcast::<DataFusionError>().unwrap())
}
other => BallistaError::ArrowError(other),
}
}
}
impl From<parser::ParserError> for BallistaError {
fn from(e: parser::ParserError) -> Self {
BallistaError::SqlError(e)
}
}
impl From<DataFusionError> for BallistaError {
fn from(e: DataFusionError) -> Self {
BallistaError::DataFusionError(e)
}
}
impl From<io::Error> for BallistaError {
fn from(e: io::Error) -> Self {
BallistaError::IoError(e)
}
}
impl From<tonic::transport::Error> for BallistaError {
fn from(e: tonic::transport::Error) -> Self {
BallistaError::TonicError(e)
}
}
impl From<tonic::Status> for BallistaError {
fn from(e: tonic::Status) -> Self {
BallistaError::GrpcError(e)
}
}
impl From<tokio::task::JoinError> for BallistaError {
fn from(e: tokio::task::JoinError) -> Self {
BallistaError::TokioError(e)
}
}
impl From<datafusion_proto::from_proto::Error> for BallistaError {
fn from(e: datafusion_proto::from_proto::Error) -> Self {
BallistaError::General(e.to_string())
}
}
impl From<datafusion_proto::to_proto::Error> for BallistaError {
fn from(e: datafusion_proto::to_proto::Error) -> Self {
BallistaError::General(e.to_string())
}
}
impl From<futures::future::Aborted> for BallistaError {
fn from(_: Aborted) -> Self {
BallistaError::Cancelled
}
}
impl Display for BallistaError {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
match self {
BallistaError::NotImplemented(ref desc) => {
write!(f, "Not implemented: {}", desc)
}
BallistaError::General(ref desc) => write!(f, "General error: {}", desc),
BallistaError::ArrowError(ref desc) => write!(f, "Arrow error: {}", desc),
BallistaError::DataFusionError(ref desc) => {
write!(f, "DataFusion error: {:?}", desc)
}
BallistaError::SqlError(ref desc) => write!(f, "SQL error: {:?}", desc),
BallistaError::IoError(ref desc) => write!(f, "IO error: {}", desc),
BallistaError::TonicError(desc) => write!(f, "Tonic error: {}", desc),
BallistaError::GrpcError(desc) => write!(f, "Grpc error: {}", desc),
BallistaError::GrpcConnectionError(desc) => {
write!(f, "Grpc connection error: {}", desc)
}
BallistaError::Internal(desc) => {
write!(f, "Internal Ballista error: {}", desc)
}
BallistaError::TokioError(desc) => write!(f, "Tokio join error: {}", desc),
BallistaError::GrpcActionError(desc) => {
write!(f, "Grpc Execute Action error: {}", desc)
}
BallistaError::FetchFailed(executor_id, map_stage, map_partition, desc) => {
write!(
f,
"Shuffle fetch partition error from Executor {}, map_stage {}, \
map_partition {}, error desc: {}",
executor_id, map_stage, map_partition, desc
)
}
BallistaError::Cancelled => write!(f, "Task cancelled"),
}
}
}
impl From<BallistaError> for FailedTask {
fn from(e: BallistaError) -> Self {
match e {
BallistaError::FetchFailed(
executor_id,
map_stage_id,
map_partition_id,
desc,
) => {
FailedTask {
error: desc,
retryable: false,
count_to_failures: false,
failed_reason: Some(FailedReason::FetchPartitionError(
FetchPartitionError {
executor_id,
map_stage_id: map_stage_id as u32,
map_partition_id: map_partition_id as u32,
},
)),
}
}
BallistaError::IoError(io) => {
FailedTask {
error: format!("Task failed due to Ballista IO error: {:?}", io),
retryable: true,
count_to_failures: true,
failed_reason: Some(FailedReason::IoError(IoError {})),
}
}
BallistaError::DataFusionError(DataFusionError::IoError(io)) => {
FailedTask {
error: format!("Task failed due to DataFusion IO error: {:?}", io),
retryable: true,
count_to_failures: true,
failed_reason: Some(FailedReason::IoError(IoError {})),
}
}
other => FailedTask {
error: format!("Task failed due to runtime execution error: {:?}", other),
retryable: false,
count_to_failures: false,
failed_reason: Some(FailedReason::ExecutionError(ExecutionError {})),
},
}
}
}
impl Error for BallistaError {}