datafusion_dist/
error.rs

1use std::{error::Error, fmt::Display, panic::Location};
2
3use datafusion::{arrow::error::ArrowError, error::DataFusionError};
4use tokio::task::JoinError;
5
6pub type DistResult<T> = Result<T, DistError>;
7
8#[derive(Debug)]
9pub enum DistError {
10    Arrow(ArrowError, &'static Location<'static>),
11    DataFusion(DataFusionError, &'static Location<'static>),
12    Tokio(JoinError, &'static Location<'static>),
13    Network(Box<dyn Error + Send + Sync>, &'static Location<'static>),
14    Plan(String, &'static Location<'static>),
15    Schedule(String, &'static Location<'static>),
16    Internal(String, &'static Location<'static>),
17    Cluster(Box<dyn Error + Send + Sync>, &'static Location<'static>),
18}
19
20impl DistError {
21    #[track_caller]
22    pub fn network(err: Box<dyn Error + Send + Sync>) -> Self {
23        DistError::Network(err, Location::caller())
24    }
25
26    #[track_caller]
27    pub fn schedule(msg: impl Into<String>) -> Self {
28        DistError::Schedule(msg.into(), Location::caller())
29    }
30
31    #[track_caller]
32    pub fn internal(msg: impl Into<String>) -> Self {
33        DistError::Internal(msg.into(), Location::caller())
34    }
35
36    #[track_caller]
37    pub fn cluster(err: Box<dyn Error + Send + Sync>) -> Self {
38        DistError::Cluster(err, Location::caller())
39    }
40}
41
42impl Display for DistError {
43    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
44        match self {
45            DistError::Arrow(err, loc) => write!(f, "Arrow error: {err} at {loc}"),
46            DistError::DataFusion(err, loc) => write!(f, "DataFusion error: {err} at {loc}"),
47            DistError::Tokio(err, loc) => write!(f, "Tokio error: {err} at {loc}"),
48            DistError::Network(err, loc) => write!(f, "Network error: {err} at {loc}"),
49            DistError::Plan(msg, loc) => write!(f, "Plan error: {msg} at {loc}"),
50            DistError::Schedule(msg, loc) => write!(f, "Schedule error: {msg} at {loc}"),
51            DistError::Internal(msg, loc) => write!(f, "Internal error: {msg} at {loc}"),
52            DistError::Cluster(err, _) => write!(f, "Cluster error: {}", err),
53        }
54    }
55}
56
57impl Error for DistError {
58    fn source(&self) -> Option<&(dyn Error + 'static)> {
59        match self {
60            DistError::Arrow(err, _) => Some(err),
61            DistError::DataFusion(err, _) => Some(err),
62            DistError::Tokio(err, _) => Some(err),
63            DistError::Network(err, _) => Some(err.as_ref()),
64            DistError::Plan(_, _) => None,
65            DistError::Schedule(_, _) => None,
66            DistError::Internal(_, _) => None,
67            DistError::Cluster(err, _) => Some(err.as_ref()),
68        }
69    }
70}
71
72impl From<ArrowError> for DistError {
73    #[track_caller]
74    fn from(value: ArrowError) -> Self {
75        DistError::Arrow(value, Location::caller())
76    }
77}
78
79impl From<DataFusionError> for DistError {
80    #[track_caller]
81    fn from(err: DataFusionError) -> Self {
82        match err {
83            DataFusionError::External(external) if external.is::<DistError>() => {
84                let dist = external
85                    .downcast::<DistError>()
86                    .expect("Datafusion external error can not be downcasted to DistError");
87                *dist
88            }
89            e => DistError::DataFusion(e, Location::caller()),
90        }
91    }
92}
93
94impl From<JoinError> for DistError {
95    #[track_caller]
96    fn from(err: JoinError) -> Self {
97        DistError::Tokio(err, Location::caller())
98    }
99}
100
101impl From<DistError> for DataFusionError {
102    fn from(value: DistError) -> Self {
103        match value {
104            DistError::DataFusion(err, _) => err,
105            v => DataFusionError::External(Box::new(v)),
106        }
107    }
108}