datafusion_table_providers/util/
retriable_error.rs

1use std::error::Error;
2
3use datafusion::error::DataFusionError;
4use snafu::Snafu;
5
6#[derive(Debug, Snafu)]
7pub enum RetriableError {
8    #[snafu(display("{source}"))]
9    DataRetrievalError {
10        source: datafusion::error::DataFusionError,
11    },
12
13    #[snafu(display("{source}"))]
14    DataWriteError {
15        source: Box<dyn Error + Send + Sync>,
16    },
17}
18
19#[must_use]
20pub fn is_retriable_error(err: &DataFusionError) -> bool {
21    match err {
22        DataFusionError::External(err) => err.downcast_ref::<RetriableError>().is_some(),
23        DataFusionError::Context(_, err) => is_retriable_error(err.as_ref()),
24        _ => false,
25    }
26}
27
28/// Checks if the data retrieval error is NOT related to invalid input (e.g., SQL, plan creation, schema issues).
29/// In this case, the error is wrapped as `RetriableError::DataRetrievalError`
30/// so we can detect this error and retry later at a higher level
31#[must_use]
32pub fn check_and_mark_retriable_error(err: DataFusionError) -> DataFusionError {
33    // don't wrap as retriable errors related to invalid SQL, schema, query plan, etc.
34    if is_invalid_query_error(&err) {
35        return err;
36    }
37
38    // already wrapped RetriableError
39    if is_retriable_error(&err) {
40        return err;
41    }
42
43    DataFusionError::External(Box::new(RetriableError::DataRetrievalError { source: err }))
44}
45
46// Wraps error as `RetriableError::DataWriteError` so we can detect this error and retry later at a higher level
47#[must_use]
48pub fn to_retriable_data_write_error<E>(error: E) -> DataFusionError
49where
50    E: Error + Send + Sync + 'static,
51{
52    DataFusionError::External(Box::new(RetriableError::DataWriteError {
53        source: error.into(),
54    }))
55}
56
57fn is_invalid_query_error(error: &DataFusionError) -> bool {
58    match error {
59        DataFusionError::Context(_, err) => is_invalid_query_error(err.as_ref()),
60        DataFusionError::SQL(..) | DataFusionError::Plan(..) | DataFusionError::SchemaError(..) => {
61            true
62        }
63        _ => false,
64    }
65}