// datafusion-dist 0.3.0
//
// A distributed streaming execution library for Apache DataFusion
// Documentation
use std::{error::Error, fmt::Display, panic::Location};

use arrow::error::ArrowError;
use datafusion_common::DataFusionError;
use tokio::task::JoinError;

/// Shorthand for a [`Result`] whose error type is [`DistError`].
pub type DistResult<T> = Result<T, DistError>;

/// Error type carried by [`DistResult`].
///
/// Every variant pairs its payload with a `&'static Location` naming a place
/// in the source code. The constructors and `From` conversions in this file
/// fill it in with `Location::caller()` from `#[track_caller]` functions.
#[derive(Debug)]
pub enum DistError {
    /// Wraps an [`ArrowError`]; produced by `From<ArrowError>`.
    Arrow(ArrowError, &'static Location<'static>),
    /// Wraps a [`DataFusionError`]; produced by `From<DataFusionError>`.
    DataFusion(DataFusionError, &'static Location<'static>),
    /// Wraps a Tokio [`JoinError`]; produced by `From<JoinError>`.
    Tokio(JoinError, &'static Location<'static>),
    /// Wraps an arbitrary boxed error; displayed with the `Network error:` prefix.
    Network(Box<dyn Error + Send + Sync>, &'static Location<'static>),
    /// Carries a message; displayed with the `Plan error:` prefix.
    Plan(String, &'static Location<'static>),
    /// Carries a message; displayed with the `Schedule error:` prefix.
    Schedule(String, &'static Location<'static>),
    /// Carries a message; displayed with the `Internal error:` prefix.
    Internal(String, &'static Location<'static>),
    /// Wraps an arbitrary boxed error; displayed with the `Cluster error:` prefix.
    Cluster(Box<dyn Error + Send + Sync>, &'static Location<'static>),
}

/// Convenience constructors for the variants that have no `From` conversion.
///
/// Each constructor is `#[track_caller]`, so the stored [`Location`] is the
/// call site of the constructor rather than a line inside this file.
impl DistError {
    /// Builds a [`DistError::Network`] around `err`, recording the caller's location.
    #[track_caller]
    pub fn network(err: Box<dyn Error + Send + Sync>) -> Self {
        DistError::Network(err, Location::caller())
    }

    /// Builds a [`DistError::Plan`] from `msg`, recording the caller's location.
    ///
    /// Mirrors [`DistError::schedule`] and [`DistError::internal`] so that
    /// callers never need to capture the location by hand.
    #[track_caller]
    pub fn plan(msg: impl Into<String>) -> Self {
        DistError::Plan(msg.into(), Location::caller())
    }

    /// Builds a [`DistError::Schedule`] from `msg`, recording the caller's location.
    #[track_caller]
    pub fn schedule(msg: impl Into<String>) -> Self {
        DistError::Schedule(msg.into(), Location::caller())
    }

    /// Builds a [`DistError::Internal`] from `msg`, recording the caller's location.
    #[track_caller]
    pub fn internal(msg: impl Into<String>) -> Self {
        DistError::Internal(msg.into(), Location::caller())
    }

    /// Builds a [`DistError::Cluster`] around `err`, recording the caller's location.
    #[track_caller]
    pub fn cluster(err: Box<dyn Error + Send + Sync>) -> Self {
        DistError::Cluster(err, Location::caller())
    }
}

impl Display for DistError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            DistError::Arrow(err, loc) => write!(f, "Arrow error: {err} at {loc}"),
            DistError::DataFusion(err, loc) => write!(f, "DataFusion error: {err} at {loc}"),
            DistError::Tokio(err, loc) => write!(f, "Tokio error: {err} at {loc}"),
            DistError::Network(err, loc) => write!(f, "Network error: {err} at {loc}"),
            DistError::Plan(msg, loc) => write!(f, "Plan error: {msg} at {loc}"),
            DistError::Schedule(msg, loc) => write!(f, "Schedule error: {msg} at {loc}"),
            DistError::Internal(msg, loc) => write!(f, "Internal error: {msg} at {loc}"),
            DistError::Cluster(err, _) => write!(f, "Cluster error: {}", err),
        }
    }
}

impl Error for DistError {
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        match self {
            DistError::Arrow(err, _) => Some(err),
            DistError::DataFusion(err, _) => Some(err),
            DistError::Tokio(err, _) => Some(err),
            DistError::Network(err, _) => Some(err.as_ref()),
            DistError::Plan(_, _) => None,
            DistError::Schedule(_, _) => None,
            DistError::Internal(_, _) => None,
            DistError::Cluster(err, _) => Some(err.as_ref()),
        }
    }
}

impl From<ArrowError> for DistError {
    #[track_caller]
    fn from(value: ArrowError) -> Self {
        DistError::Arrow(value, Location::caller())
    }
}

impl From<DataFusionError> for DistError {
    /// Converts a [`DataFusionError`] into a [`DistError`].
    ///
    /// A `DataFusionError::External` whose payload is itself a `DistError`
    /// (the shape produced by `From<DistError> for DataFusionError`) is
    /// unwrapped and returned as-is, keeping its original location. Any other
    /// error is wrapped in [`DistError::DataFusion`] with the caller's location.
    #[track_caller]
    fn from(err: DataFusionError) -> Self {
        match err {
            // A single fallible downcast replaces the check-then-`expect` pair:
            // on failure the original box is handed back, so there is no panic path.
            DataFusionError::External(external) => match external.downcast::<DistError>() {
                Ok(dist) => *dist,
                Err(other) => {
                    DistError::DataFusion(DataFusionError::External(other), Location::caller())
                }
            },
            other => DistError::DataFusion(other, Location::caller()),
        }
    }
}

impl From<JoinError> for DistError {
    #[track_caller]
    fn from(err: JoinError) -> Self {
        DistError::Tokio(err, Location::caller())
    }
}

impl From<DistError> for DataFusionError {
    /// Converts a [`DistError`] into a [`DataFusionError`].
    ///
    /// A `DistError::DataFusion` yields its inner error directly (its stored
    /// location is dropped); every other variant is boxed into
    /// `DataFusionError::External`.
    fn from(value: DistError) -> Self {
        match value {
            DistError::DataFusion(inner, _) => inner,
            other => Self::External(Box::new(other)),
        }
    }
}