spark_connect_core/
errors.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
//! Defines a [SparkError] for representing failures in various Spark operations.
//! Most of these are wrappers for tonic or arrow error messages
use std::fmt::{Debug, Display, Formatter};
use std::io::Write;

use std::error::Error;

use arrow::error::ArrowError;

#[cfg(feature = "datafusion")]
use datafusion::error::DataFusionError;
#[cfg(feature = "polars")]
use polars::error::PolarsError;

/// Different `Spark` types
#[derive(Debug)]
pub enum SparkError {
    /// Returned when functionality is not yet available.
    NotYetImplemented(String),
    ExternalError(Box<dyn Error + Send + Sync>),
    AnalysisException(String),
    IoError(String, std::io::Error),
    ArrowError(ArrowError),
    InvalidConnectionUrl(String),
}

impl SparkError {
    /// Wraps an external error in an `SparkError`.
    pub fn from_external_error(error: Box<dyn Error + Send + Sync>) -> Self {
        Self::ExternalError(error)
    }
}

impl From<std::io::Error> for SparkError {
    fn from(error: std::io::Error) -> Self {
        SparkError::IoError(error.to_string(), error)
    }
}

impl From<std::str::Utf8Error> for SparkError {
    fn from(error: std::str::Utf8Error) -> Self {
        SparkError::AnalysisException(error.to_string())
    }
}

impl From<std::string::FromUtf8Error> for SparkError {
    fn from(error: std::string::FromUtf8Error) -> Self {
        SparkError::AnalysisException(error.to_string())
    }
}

impl From<ArrowError> for SparkError {
    fn from(error: ArrowError) -> Self {
        SparkError::ArrowError(error)
    }
}

impl From<tonic::Status> for SparkError {
    fn from(status: tonic::Status) -> Self {
        SparkError::AnalysisException(status.message().to_string())
    }
}

impl From<serde_json::Error> for SparkError {
    fn from(value: serde_json::Error) -> Self {
        SparkError::AnalysisException(value.to_string())
    }
}

#[cfg(feature = "datafusion")]
impl From<DataFusionError> for SparkError {
    fn from(_value: DataFusionError) -> Self {
        SparkError::AnalysisException("Error converting to DataFusion DataFrame".to_string())
    }
}

#[cfg(feature = "polars")]
impl From<PolarsError> for SparkError {
    fn from(_value: PolarsError) -> Self {
        SparkError::AnalysisException("Error converting to Polars DataFrame".to_string())
    }
}

impl<W: Write> From<std::io::IntoInnerError<W>> for SparkError {
    fn from(error: std::io::IntoInnerError<W>) -> Self {
        SparkError::IoError(error.to_string(), error.into())
    }
}

impl Display for SparkError {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            SparkError::ExternalError(source) => write!(f, "External error: {}", &source),
            SparkError::AnalysisException(desc) => write!(f, "Analysis error: {desc}"),
            SparkError::IoError(desc, _) => write!(f, "Io error: {desc}"),
            SparkError::ArrowError(desc) => write!(f, "Apache Arrow error: {desc}"),
            SparkError::NotYetImplemented(source) => write!(f, "Not yet implemented: {source}"),
            SparkError::InvalidConnectionUrl(val) => write!(f, "Invalid URL error: {val}"),
        }
    }
}

impl Error for SparkError {
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        if let Self::ExternalError(e) = self {
            Some(e.as_ref())
        } else {
            None
        }
    }
}