datafusion_dist/
error.rs

1use std::{error::Error, fmt::Display, panic::Location};
2
3use arrow::error::ArrowError;
4use datafusion_common::DataFusionError;
5use tokio::task::JoinError;
6
7pub type DistResult<T> = Result<T, DistError>;
8
9#[derive(Debug)]
10pub enum DistError {
11    Arrow(ArrowError, &'static Location<'static>),
12    DataFusion(DataFusionError, &'static Location<'static>),
13    Tokio(JoinError, &'static Location<'static>),
14    Network(Box<dyn Error + Send + Sync>, &'static Location<'static>),
15    Plan(String, &'static Location<'static>),
16    Schedule(String, &'static Location<'static>),
17    Internal(String, &'static Location<'static>),
18    Cluster(Box<dyn Error + Send + Sync>, &'static Location<'static>),
19}
20
21impl DistError {
22    #[track_caller]
23    pub fn network(err: Box<dyn Error + Send + Sync>) -> Self {
24        DistError::Network(err, Location::caller())
25    }
26
27    #[track_caller]
28    pub fn schedule(msg: impl Into<String>) -> Self {
29        DistError::Schedule(msg.into(), Location::caller())
30    }
31
32    #[track_caller]
33    pub fn internal(msg: impl Into<String>) -> Self {
34        DistError::Internal(msg.into(), Location::caller())
35    }
36
37    #[track_caller]
38    pub fn cluster(err: Box<dyn Error + Send + Sync>) -> Self {
39        DistError::Cluster(err, Location::caller())
40    }
41}
42
43impl Display for DistError {
44    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45        match self {
46            DistError::Arrow(err, loc) => write!(f, "Arrow error: {err} at {loc}"),
47            DistError::DataFusion(err, loc) => write!(f, "DataFusion error: {err} at {loc}"),
48            DistError::Tokio(err, loc) => write!(f, "Tokio error: {err} at {loc}"),
49            DistError::Network(err, loc) => write!(f, "Network error: {err} at {loc}"),
50            DistError::Plan(msg, loc) => write!(f, "Plan error: {msg} at {loc}"),
51            DistError::Schedule(msg, loc) => write!(f, "Schedule error: {msg} at {loc}"),
52            DistError::Internal(msg, loc) => write!(f, "Internal error: {msg} at {loc}"),
53            DistError::Cluster(err, _) => write!(f, "Cluster error: {}", err),
54        }
55    }
56}
57
58impl Error for DistError {
59    fn source(&self) -> Option<&(dyn Error + 'static)> {
60        match self {
61            DistError::Arrow(err, _) => Some(err),
62            DistError::DataFusion(err, _) => Some(err),
63            DistError::Tokio(err, _) => Some(err),
64            DistError::Network(err, _) => Some(err.as_ref()),
65            DistError::Plan(_, _) => None,
66            DistError::Schedule(_, _) => None,
67            DistError::Internal(_, _) => None,
68            DistError::Cluster(err, _) => Some(err.as_ref()),
69        }
70    }
71}
72
73impl From<ArrowError> for DistError {
74    #[track_caller]
75    fn from(value: ArrowError) -> Self {
76        DistError::Arrow(value, Location::caller())
77    }
78}
79
80impl From<DataFusionError> for DistError {
81    #[track_caller]
82    fn from(err: DataFusionError) -> Self {
83        match err {
84            DataFusionError::External(external) if external.is::<DistError>() => {
85                let dist = external
86                    .downcast::<DistError>()
87                    .expect("Datafusion external error can not be downcasted to DistError");
88                *dist
89            }
90            e => DistError::DataFusion(e, Location::caller()),
91        }
92    }
93}
94
95impl From<JoinError> for DistError {
96    #[track_caller]
97    fn from(err: JoinError) -> Self {
98        DistError::Tokio(err, Location::caller())
99    }
100}
101
102impl From<DistError> for DataFusionError {
103    fn from(value: DistError) -> Self {
104        match value {
105            DistError::DataFusion(err, _) => err,
106            v => DataFusionError::External(Box::new(v)),
107        }
108    }
109}