1use std::{error::Error, fmt::Display, panic::Location};
2
3use datafusion::{arrow::error::ArrowError, error::DataFusionError};
4use tokio::task::JoinError;
5
6pub type DistResult<T> = Result<T, DistError>;
7
8#[derive(Debug)]
9pub enum DistError {
10 Arrow(ArrowError, &'static Location<'static>),
11 DataFusion(DataFusionError, &'static Location<'static>),
12 Tokio(JoinError, &'static Location<'static>),
13 Network(Box<dyn Error + Send + Sync>, &'static Location<'static>),
14 Plan(String, &'static Location<'static>),
15 Schedule(String, &'static Location<'static>),
16 Internal(String, &'static Location<'static>),
17 Cluster(Box<dyn Error + Send + Sync>, &'static Location<'static>),
18}
19
20impl DistError {
21 #[track_caller]
22 pub fn network(err: Box<dyn Error + Send + Sync>) -> Self {
23 DistError::Network(err, Location::caller())
24 }
25
26 #[track_caller]
27 pub fn schedule(msg: impl Into<String>) -> Self {
28 DistError::Schedule(msg.into(), Location::caller())
29 }
30
31 #[track_caller]
32 pub fn internal(msg: impl Into<String>) -> Self {
33 DistError::Internal(msg.into(), Location::caller())
34 }
35
36 #[track_caller]
37 pub fn cluster(err: Box<dyn Error + Send + Sync>) -> Self {
38 DistError::Cluster(err, Location::caller())
39 }
40}
41
42impl Display for DistError {
43 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
44 match self {
45 DistError::Arrow(err, loc) => write!(f, "Arrow error: {err} at {loc}"),
46 DistError::DataFusion(err, loc) => write!(f, "DataFusion error: {err} at {loc}"),
47 DistError::Tokio(err, loc) => write!(f, "Tokio error: {err} at {loc}"),
48 DistError::Network(err, loc) => write!(f, "Network error: {err} at {loc}"),
49 DistError::Plan(msg, loc) => write!(f, "Plan error: {msg} at {loc}"),
50 DistError::Schedule(msg, loc) => write!(f, "Schedule error: {msg} at {loc}"),
51 DistError::Internal(msg, loc) => write!(f, "Internal error: {msg} at {loc}"),
52 DistError::Cluster(err, _) => write!(f, "Cluster error: {}", err),
53 }
54 }
55}
56
57impl Error for DistError {
58 fn source(&self) -> Option<&(dyn Error + 'static)> {
59 match self {
60 DistError::Arrow(err, _) => Some(err),
61 DistError::DataFusion(err, _) => Some(err),
62 DistError::Tokio(err, _) => Some(err),
63 DistError::Network(err, _) => Some(err.as_ref()),
64 DistError::Plan(_, _) => None,
65 DistError::Schedule(_, _) => None,
66 DistError::Internal(_, _) => None,
67 DistError::Cluster(err, _) => Some(err.as_ref()),
68 }
69 }
70}
71
72impl From<ArrowError> for DistError {
73 #[track_caller]
74 fn from(value: ArrowError) -> Self {
75 DistError::Arrow(value, Location::caller())
76 }
77}
78
79impl From<DataFusionError> for DistError {
80 #[track_caller]
81 fn from(err: DataFusionError) -> Self {
82 match err {
83 DataFusionError::External(external) if external.is::<DistError>() => {
84 let dist = external
85 .downcast::<DistError>()
86 .expect("Datafusion external error can not be downcasted to DistError");
87 *dist
88 }
89 e => DistError::DataFusion(e, Location::caller()),
90 }
91 }
92}
93
94impl From<JoinError> for DistError {
95 #[track_caller]
96 fn from(err: JoinError) -> Self {
97 DistError::Tokio(err, Location::caller())
98 }
99}
100
101impl From<DistError> for DataFusionError {
102 fn from(value: DistError) -> Self {
103 match value {
104 DistError::DataFusion(err, _) => err,
105 v => DataFusionError::External(Box::new(v)),
106 }
107 }
108}