1use std::{error::Error, fmt::Display, panic::Location};
2
3use arrow::error::ArrowError;
4use datafusion_common::DataFusionError;
5use tokio::task::JoinError;
6
7pub type DistResult<T> = Result<T, DistError>;
8
9#[derive(Debug)]
10pub enum DistError {
11 Arrow(ArrowError, &'static Location<'static>),
12 DataFusion(DataFusionError, &'static Location<'static>),
13 Tokio(JoinError, &'static Location<'static>),
14 Network(Box<dyn Error + Send + Sync>, &'static Location<'static>),
15 Plan(String, &'static Location<'static>),
16 Schedule(String, &'static Location<'static>),
17 Internal(String, &'static Location<'static>),
18 Cluster(Box<dyn Error + Send + Sync>, &'static Location<'static>),
19}
20
21impl DistError {
22 #[track_caller]
23 pub fn network(err: Box<dyn Error + Send + Sync>) -> Self {
24 DistError::Network(err, Location::caller())
25 }
26
27 #[track_caller]
28 pub fn schedule(msg: impl Into<String>) -> Self {
29 DistError::Schedule(msg.into(), Location::caller())
30 }
31
32 #[track_caller]
33 pub fn internal(msg: impl Into<String>) -> Self {
34 DistError::Internal(msg.into(), Location::caller())
35 }
36
37 #[track_caller]
38 pub fn cluster(err: Box<dyn Error + Send + Sync>) -> Self {
39 DistError::Cluster(err, Location::caller())
40 }
41}
42
43impl Display for DistError {
44 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45 match self {
46 DistError::Arrow(err, loc) => write!(f, "Arrow error: {err} at {loc}"),
47 DistError::DataFusion(err, loc) => write!(f, "DataFusion error: {err} at {loc}"),
48 DistError::Tokio(err, loc) => write!(f, "Tokio error: {err} at {loc}"),
49 DistError::Network(err, loc) => write!(f, "Network error: {err} at {loc}"),
50 DistError::Plan(msg, loc) => write!(f, "Plan error: {msg} at {loc}"),
51 DistError::Schedule(msg, loc) => write!(f, "Schedule error: {msg} at {loc}"),
52 DistError::Internal(msg, loc) => write!(f, "Internal error: {msg} at {loc}"),
53 DistError::Cluster(err, _) => write!(f, "Cluster error: {}", err),
54 }
55 }
56}
57
58impl Error for DistError {
59 fn source(&self) -> Option<&(dyn Error + 'static)> {
60 match self {
61 DistError::Arrow(err, _) => Some(err),
62 DistError::DataFusion(err, _) => Some(err),
63 DistError::Tokio(err, _) => Some(err),
64 DistError::Network(err, _) => Some(err.as_ref()),
65 DistError::Plan(_, _) => None,
66 DistError::Schedule(_, _) => None,
67 DistError::Internal(_, _) => None,
68 DistError::Cluster(err, _) => Some(err.as_ref()),
69 }
70 }
71}
72
73impl From<ArrowError> for DistError {
74 #[track_caller]
75 fn from(value: ArrowError) -> Self {
76 DistError::Arrow(value, Location::caller())
77 }
78}
79
80impl From<DataFusionError> for DistError {
81 #[track_caller]
82 fn from(err: DataFusionError) -> Self {
83 match err {
84 DataFusionError::External(external) if external.is::<DistError>() => {
85 let dist = external
86 .downcast::<DistError>()
87 .expect("Datafusion external error can not be downcasted to DistError");
88 *dist
89 }
90 e => DistError::DataFusion(e, Location::caller()),
91 }
92 }
93}
94
95impl From<JoinError> for DistError {
96 #[track_caller]
97 fn from(err: JoinError) -> Self {
98 DistError::Tokio(err, Location::caller())
99 }
100}
101
102impl From<DistError> for DataFusionError {
103 fn from(value: DistError) -> Self {
104 match value {
105 DistError::DataFusion(err, _) => err,
106 v => DataFusionError::External(Box::new(v)),
107 }
108 }
109}