datafusion_table_providers/util/
retriable_error.rs1use std::error::Error;
2
3use datafusion::error::DataFusionError;
4use snafu::Snafu;
5
6#[derive(Debug, Snafu)]
7pub enum RetriableError {
8 #[snafu(display("{source}"))]
9 DataRetrievalError {
10 source: datafusion::error::DataFusionError,
11 },
12
13 #[snafu(display("{source}"))]
14 DataWriteError {
15 source: Box<dyn Error + Send + Sync>,
16 },
17}
18
19#[must_use]
20pub fn is_retriable_error(err: &DataFusionError) -> bool {
21 match err {
22 DataFusionError::External(err) => err.downcast_ref::<RetriableError>().is_some(),
23 DataFusionError::Context(_, err) => is_retriable_error(err.as_ref()),
24 _ => false,
25 }
26}
27
28#[must_use]
32pub fn check_and_mark_retriable_error(err: DataFusionError) -> DataFusionError {
33 if is_invalid_query_error(&err) {
35 return err;
36 }
37
38 if is_retriable_error(&err) {
40 return err;
41 }
42
43 DataFusionError::External(Box::new(RetriableError::DataRetrievalError { source: err }))
44}
45
46#[must_use]
48pub fn to_retriable_data_write_error<E>(error: E) -> DataFusionError
49where
50 E: Error + Send + Sync + 'static,
51{
52 DataFusionError::External(Box::new(RetriableError::DataWriteError {
53 source: error.into(),
54 }))
55}
56
57fn is_invalid_query_error(error: &DataFusionError) -> bool {
58 match error {
59 DataFusionError::Context(_, err) => is_invalid_query_error(err.as_ref()),
60 DataFusionError::SQL(..) | DataFusionError::Plan(..) | DataFusionError::SchemaError(..) => {
61 true
62 }
63 _ => false,
64 }
65}