use std::error::Error;
use std::fmt::{Display, Formatter};
use std::io;
use std::result;
use std::sync::Arc;
use crate::{Column, DFSchema};
#[cfg(feature = "avro")]
use apache_avro::Error as AvroError;
use arrow::error::ArrowError;
#[cfg(feature = "jit")]
use cranelift_module::ModuleError;
#[cfg(feature = "parquet")]
use parquet::errors::ParquetError;
use sqlparser::parser::ParserError;
/// Result type for fallible operations; the error type defaults to [`DataFusionError`].
pub type Result<T, E = DataFusionError> = result::Result<T, E>;
/// Result type whose error is a reference-counted ([`Arc`]) [`DataFusionError`].
pub type SharedResult<T> = result::Result<T, Arc<DataFusionError>>;
/// Boxed error trait object that is `Send + Sync`.
pub type GenericError = Box<dyn Error + Send + Sync>;
/// Error type used throughout this crate.
///
/// Each variant either wraps an error value from another library or carries a
/// plain message; see the `Display` implementation for the rendered text.
#[derive(Debug)]
pub enum DataFusionError {
/// Wraps an [`ArrowError`].
ArrowError(ArrowError),
/// Wraps a `ParquetError` (only with the `parquet` feature).
#[cfg(feature = "parquet")]
ParquetError(ParquetError),
/// Wraps an `AvroError` (only with the `avro` feature).
#[cfg(feature = "avro")]
AvroError(AvroError),
/// Wraps an `object_store::Error` (only with the `object_store` feature).
#[cfg(feature = "object_store")]
ObjectStore(object_store::Error),
/// Wraps an [`io::Error`].
IoError(io::Error),
/// Wraps a [`ParserError`] from `sqlparser`.
SQL(ParserError),
/// Message-only variant, displayed as "This feature is not implemented: ...".
NotImplemented(String),
/// Message-only variant, displayed as "Internal error: ..." followed by a
/// note that it was likely caused by a bug in DataFusion.
Internal(String),
/// Message-only variant, displayed as "Error during planning: ...".
Plan(String),
/// Wraps a [`SchemaError`].
SchemaError(SchemaError),
/// Message-only variant, displayed as "Execution error: ...".
Execution(String),
/// Message-only variant, displayed as "Resources exhausted: ...".
ResourcesExhausted(String),
/// Wraps an arbitrary boxed error ([`GenericError`]).
External(GenericError),
/// Wraps a cranelift `ModuleError` (only with the `jit` feature).
#[cfg(feature = "jit")]
JITError(ModuleError),
/// A description paired with the boxed [`DataFusionError`] it annotates;
/// displayed as the description, "caused by", then the inner error.
Context(String, Box<DataFusionError>),
/// Message-only variant, displayed as "Substrait error: ...".
Substrait(String),
}
/// Wraps `$err` in a `DataFusionError::Context` whose description is
/// `"<$desc> at <file>:<line>"`, where file and line are those of the call site.
///
/// The error type is named through `$crate`, so the macro expands correctly
/// both inside this crate and in downstream crates (including ones that
/// depend on this crate under a different name).
#[macro_export]
macro_rules! context {
    ($desc:expr, $err:expr) => {
        $crate::DataFusionError::Context(
            format!("{} at {}:{}", $desc, file!(), line!()),
            Box::new($err),
        )
    };
}
/// Expands to `Err(DataFusionError::Plan(..))` whose message is
/// `"<$desc> at <file>:<line>"`, where file and line are those of the call site.
///
/// The error type is named through `$crate`, so the macro expands correctly
/// both inside this crate and in downstream crates (including ones that
/// depend on this crate under a different name).
#[macro_export]
macro_rules! plan_err {
    ($desc:expr) => {
        Err($crate::DataFusionError::Plan(format!(
            "{} at {}:{}",
            $desc,
            file!(),
            line!()
        )))
    };
}
/// Schema-related errors; see the `Display` implementation for the rendered messages.
#[derive(Debug)]
pub enum SchemaError {
/// A field reference is ambiguous. With a qualifier, the message reports a
/// clash between the qualified and the unqualified field of the same name;
/// without one, it reports an ambiguous unqualified reference.
AmbiguousReference {
qualifier: Option<String>,
name: String,
},
/// The schema contains a duplicate qualified field name.
DuplicateQualifiedField { qualifier: String, name: String },
/// The schema contains a duplicate unqualified field name.
DuplicateUnqualifiedField { name: String },
/// No field matched `field`; `valid_fields` is listed in the message when non-empty.
FieldNotFound {
field: Column,
valid_fields: Vec<Column>,
},
}
/// Builds a [`DataFusionError::SchemaError`] holding
/// [`SchemaError::FieldNotFound`] for the column `qualifier`/`name`.
///
/// `valid_fields` is filled with the qualified column of every field
/// in `schema`.
pub fn field_not_found(
    qualifier: Option<String>,
    name: &str,
    schema: &DFSchema,
) -> DataFusionError {
    let field = Column::new(qualifier, name);
    let valid_fields = schema
        .fields()
        .iter()
        .map(|schema_field| schema_field.qualified_column())
        .collect();
    let not_found = SchemaError::FieldNotFound {
        field,
        valid_fields,
    };
    DataFusionError::SchemaError(not_found)
}
/// Renders each [`SchemaError`] variant as a human-readable message.
impl Display for SchemaError {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        // Formats a column reference as `'relation'.'name'`, or `'name'`
        // when there is no relation.
        fn quoted<Q: Display, N: Display>(relation: Option<&Q>, name: &N) -> String {
            match relation {
                Some(q) => format!("'{}'.'{}'", q, name),
                None => format!("'{}'", name),
            }
        }

        match self {
            Self::AmbiguousReference {
                qualifier: Some(q),
                name,
            } => {
                write!(f, "Schema contains qualified field name '{q}'.'{name}' and unqualified field name '{name}' which would be ambiguous")
            }
            Self::AmbiguousReference {
                qualifier: None,
                name,
            } => {
                write!(f, "Ambiguous reference to unqualified field '{name}'")
            }
            Self::DuplicateQualifiedField { qualifier, name } => {
                write!(
                    f,
                    "Schema contains duplicate qualified field name '{qualifier}'.'{name}'"
                )
            }
            Self::DuplicateUnqualifiedField { name } => {
                write!(
                    f,
                    "Schema contains duplicate unqualified field name '{name}'"
                )
            }
            Self::FieldNotFound {
                field,
                valid_fields,
            } => {
                let missing = quoted(field.relation.as_ref(), &field.name);
                write!(f, "No field named {missing}")?;
                // The list of candidates is only appended when there is one.
                if !valid_fields.is_empty() {
                    let listing = valid_fields
                        .iter()
                        .map(|col| quoted(col.relation.as_ref(), &col.name))
                        .collect::<Vec<String>>()
                        .join(", ");
                    write!(f, ". Valid fields are {listing}")?;
                }
                write!(f, ".")
            }
        }
    }
}
// The impl body is empty, so `SchemaError` uses the default `Error` methods.
impl Error for SchemaError {}
impl From<io::Error> for DataFusionError {
fn from(e: io::Error) -> Self {
DataFusionError::IoError(e)
}
}
impl From<ArrowError> for DataFusionError {
fn from(e: ArrowError) -> Self {
DataFusionError::ArrowError(e)
}
}
/// Converts a [`DataFusionError`] into an [`ArrowError`].
///
/// A wrapped `ArrowError` is returned as-is, the payload of `External` is
/// moved into `ArrowError::ExternalError`, and every other variant is boxed
/// whole into `ArrowError::ExternalError`.
impl From<DataFusionError> for ArrowError {
    fn from(err: DataFusionError) -> Self {
        match err {
            DataFusionError::ArrowError(arrow) => arrow,
            DataFusionError::External(boxed) => Self::ExternalError(boxed),
            wrapped => Self::ExternalError(Box::new(wrapped)),
        }
    }
}
/// Wraps a `ParquetError` in `DataFusionError::ParquetError`.
#[cfg(feature = "parquet")]
impl From<ParquetError> for DataFusionError {
    fn from(parquet_err: ParquetError) -> Self {
        Self::ParquetError(parquet_err)
    }
}
/// Wraps an `AvroError` in `DataFusionError::AvroError`.
#[cfg(feature = "avro")]
impl From<AvroError> for DataFusionError {
    fn from(avro_err: AvroError) -> Self {
        Self::AvroError(avro_err)
    }
}
/// Wraps an `object_store::Error` in `DataFusionError::ObjectStore`.
#[cfg(feature = "object_store")]
impl From<object_store::Error> for DataFusionError {
    fn from(store_err: object_store::Error) -> Self {
        Self::ObjectStore(store_err)
    }
}
/// Converts an `object_store::path::Error` into the payload type of
/// `DataFusionError::ObjectStore` and wraps it in that variant.
#[cfg(feature = "object_store")]
impl From<object_store::path::Error> for DataFusionError {
    fn from(path_err: object_store::path::Error) -> Self {
        Self::ObjectStore(path_err.into())
    }
}
impl From<ParserError> for DataFusionError {
fn from(e: ParserError) -> Self {
DataFusionError::SQL(e)
}
}
/// Wraps a `ModuleError` in `DataFusionError::JITError`.
#[cfg(feature = "jit")]
impl From<ModuleError> for DataFusionError {
    fn from(module_err: ModuleError) -> Self {
        Self::JITError(module_err)
    }
}
impl From<GenericError> for DataFusionError {
fn from(err: GenericError) -> Self {
DataFusionError::External(err)
}
}
impl Display for DataFusionError {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
match *self {
DataFusionError::ArrowError(ref desc) => write!(f, "Arrow error: {desc}"),
#[cfg(feature = "parquet")]
DataFusionError::ParquetError(ref desc) => {
write!(f, "Parquet error: {desc}")
}
#[cfg(feature = "avro")]
DataFusionError::AvroError(ref desc) => {
write!(f, "Avro error: {desc}")
}
DataFusionError::IoError(ref desc) => write!(f, "IO error: {desc}"),
DataFusionError::SQL(ref desc) => {
write!(f, "SQL error: {desc:?}")
}
DataFusionError::NotImplemented(ref desc) => {
write!(f, "This feature is not implemented: {desc}")
}
DataFusionError::Internal(ref desc) => {
write!(f, "Internal error: {desc}. This was likely caused by a bug in DataFusion's \
code and we would welcome that you file an bug report in our issue tracker")
}
DataFusionError::Plan(ref desc) => {
write!(f, "Error during planning: {desc}")
}
DataFusionError::SchemaError(ref desc) => {
write!(f, "Schema error: {desc}")
}
DataFusionError::Execution(ref desc) => {
write!(f, "Execution error: {desc}")
}
DataFusionError::ResourcesExhausted(ref desc) => {
write!(f, "Resources exhausted: {desc}")
}
DataFusionError::External(ref desc) => {
write!(f, "External error: {desc}")
}
#[cfg(feature = "jit")]
DataFusionError::JITError(ref desc) => {
write!(f, "JIT error: {desc}")
}
#[cfg(feature = "object_store")]
DataFusionError::ObjectStore(ref desc) => {
write!(f, "Object Store error: {desc}")
}
DataFusionError::Context(ref desc, ref err) => {
write!(f, "{}\ncaused by\n{}", desc, *err)
}
DataFusionError::Substrait(ref desc) => {
write!(f, "Substrait error: {desc}")
}
}
}
}
impl Error for DataFusionError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
match self {
DataFusionError::ArrowError(e) => Some(e),
#[cfg(feature = "parquet")]
DataFusionError::ParquetError(e) => Some(e),
#[cfg(feature = "avro")]
DataFusionError::AvroError(e) => Some(e),
#[cfg(feature = "object_store")]
DataFusionError::ObjectStore(e) => Some(e),
DataFusionError::IoError(e) => Some(e),
DataFusionError::SQL(e) => Some(e),
DataFusionError::NotImplemented(_) => None,
DataFusionError::Internal(_) => None,
DataFusionError::Plan(_) => None,
DataFusionError::SchemaError(e) => Some(e),
DataFusionError::Execution(_) => None,
DataFusionError::ResourcesExhausted(_) => None,
DataFusionError::External(e) => Some(e.as_ref()),
#[cfg(feature = "jit")]
DataFusionError::JITError(e) => Some(e),
DataFusionError::Context(_, e) => Some(e.as_ref()),
DataFusionError::Substrait(_) => None,
}
}
}
/// Wraps a [`DataFusionError`] in an [`io::Error`] of kind
/// [`io::ErrorKind::Other`].
impl From<DataFusionError> for io::Error {
    fn from(df_err: DataFusionError) -> Self {
        let kind = io::ErrorKind::Other;
        Self::new(kind, df_err)
    }
}
impl DataFusionError {
    /// Follows the chain of sources starting at `self` (as reported by
    /// `find_source`) and returns the last error in that chain that is a
    /// [`DataFusionError`].
    ///
    /// Returns `self` when no error further down the chain is a
    /// `DataFusionError`.
    pub fn find_root(&self) -> &Self {
        let mut deepest = self;
        let mut current: &dyn Error = self;
        while let Some(next) = find_source(current) {
            // Errors of other types are stepped over, not returned.
            if let Some(df_err) = next.downcast_ref::<DataFusionError>() {
                deepest = df_err;
            }
            current = next;
        }
        deepest
    }
}
/// Returns the error wrapped by `e`, if any.
///
/// Several concrete types are checked before falling back to
/// [`Error::source`]:
/// * `ArrowError`: only `ExternalError` yields its payload; every other
///   variant yields `None` without consulting `source()`.
/// * `Arc<dyn Error>`, `Arc<ArrowError>`, `Arc<DataFusionError>`: yield the
///   value behind the `Arc`.
fn find_source<'a>(e: &'a (dyn Error + 'static)) -> Option<&'a (dyn Error + 'static)> {
    if let Some(arrow) = e.downcast_ref::<ArrowError>() {
        return match arrow {
            ArrowError::ExternalError(inner) => Some(inner.as_ref()),
            _ => None,
        };
    }
    if let Some(shared) = e.downcast_ref::<Arc<dyn Error + 'static>>() {
        return Some(shared.as_ref());
    }
    if let Some(shared_arrow) = e.downcast_ref::<Arc<ArrowError>>() {
        return Some(shared_arrow.as_ref());
    }
    if let Some(shared_df) = e.downcast_ref::<Arc<DataFusionError>>() {
        return Some(shared_df.as_ref());
    }
    // Not one of the special-cased wrappers: use the standard source chain.
    e.source()
}
// Unit tests for the error conversions and for `DataFusionError::find_root`.
#[cfg(test)]
mod test {
use std::sync::Arc;
use crate::error::DataFusionError;
use arrow::error::ArrowError;
// A DataFusionError propagated with `?` out of a function returning an arrow
// result is displayed with the "External error" prefix.
#[test]
fn arrow_error_to_datafusion() {
let res = return_arrow_error().unwrap_err();
assert_eq!(
res.to_string(),
"External error: Error during planning: foo"
);
}
// An ArrowError propagated with `?` out of a function returning a DataFusion
// result is displayed with the "Arrow error" prefix.
#[test]
fn datafusion_error_to_arrow() {
let res = return_datafusion_error().unwrap_err();
assert_eq!(res.to_string(), "Arrow error: Schema error: bar");
}
// `find_root` must reach the innermost DataFusionError through every
// supported wrapper: Context, ArrowError::ExternalError, External, and Arc.
#[test]
fn test_find_root_error() {
// Context -> ResourcesExhausted
do_root_test(
DataFusionError::Context(
"it happened!".to_string(),
Box::new(DataFusionError::ResourcesExhausted("foo".to_string())),
),
DataFusionError::ResourcesExhausted("foo".to_string()),
);
// ArrowError(ExternalError) -> ResourcesExhausted
do_root_test(
DataFusionError::ArrowError(ArrowError::ExternalError(Box::new(
DataFusionError::ResourcesExhausted("foo".to_string()),
))),
DataFusionError::ResourcesExhausted("foo".to_string()),
);
// External -> ResourcesExhausted
do_root_test(
DataFusionError::External(Box::new(DataFusionError::ResourcesExhausted(
"foo".to_string(),
))),
DataFusionError::ResourcesExhausted("foo".to_string()),
);
// External -> ArrowError::ExternalError -> ResourcesExhausted
do_root_test(
DataFusionError::External(Box::new(ArrowError::ExternalError(Box::new(
DataFusionError::ResourcesExhausted("foo".to_string()),
)))),
DataFusionError::ResourcesExhausted("foo".to_string()),
);
// ArrowError(ExternalError) -> ArrowError::ExternalError -> ResourcesExhausted
do_root_test(
DataFusionError::ArrowError(ArrowError::ExternalError(Box::new(
ArrowError::ExternalError(Box::new(DataFusionError::ResourcesExhausted(
"foo".to_string(),
))),
))),
DataFusionError::ResourcesExhausted("foo".to_string()),
);
// External -> Arc<DataFusionError>
do_root_test(
DataFusionError::External(Box::new(Arc::new(
DataFusionError::ResourcesExhausted("foo".to_string()),
))),
DataFusionError::ResourcesExhausted("foo".to_string()),
);
// External -> Arc<ArrowError::ExternalError> -> ResourcesExhausted
do_root_test(
DataFusionError::External(Box::new(Arc::new(ArrowError::ExternalError(
Box::new(DataFusionError::ResourcesExhausted("foo".to_string())),
)))),
DataFusionError::ResourcesExhausted("foo".to_string()),
);
}
// Returns a DataFusionError converted to an ArrowError by the `?` operator.
#[allow(clippy::try_err)]
fn return_arrow_error() -> arrow::error::Result<()> {
Err(DataFusionError::Plan("foo".to_string()))?;
Ok(())
}
// Returns an ArrowError converted to a DataFusionError by the `?` operator.
#[allow(clippy::try_err)]
fn return_datafusion_error() -> crate::error::Result<()> {
Err(ArrowError::SchemaError("bar".to_string()))?;
Ok(())
}
// Asserts that `e.find_root()` has the same display text and the same enum
// variant (discriminant) as `exp`.
fn do_root_test(e: DataFusionError, exp: DataFusionError) {
let e = e.find_root();
assert_eq!(e.to_string(), exp.to_string(),);
assert_eq!(std::mem::discriminant(e), std::mem::discriminant(&exp),)
}
}
/// Expands to `Err(DataFusionError::Internal(..))` with a message built from
/// `format!`-style arguments.
///
/// The error type is named through `$crate`, so call sites do not need
/// `DataFusionError` in scope.
#[macro_export]
macro_rules! internal_err {
    ($($arg:tt)*) => {
        Err($crate::DataFusionError::Internal(format!($($arg)*)))
    };
}
/// Unwraps the `Option` bound to the identifier `$Value`. When it is `None`,
/// the `?` operator returns early from the enclosing function with
/// `DataFusionError::Internal("<identifier> should not be None")`.
///
/// The enclosing function must therefore return a `Result` whose error type
/// can be built from `DataFusionError`. The error type is named through
/// `$crate`, so call sites do not need `DataFusionError` in scope.
#[macro_export]
macro_rules! unwrap_or_internal_err {
    ($Value: ident) => {
        $Value.ok_or_else(|| {
            $crate::DataFusionError::Internal(format!(
                "{} should not be None",
                stringify!($Value)
            ))
        })?
    };
}