datafusion_common/
error.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! DataFusion error types
19#[cfg(feature = "backtrace")]
20use std::backtrace::{Backtrace, BacktraceStatus};
21
22use std::borrow::Cow;
23use std::collections::VecDeque;
24use std::error::Error;
25use std::fmt::{Display, Formatter};
26use std::io;
27use std::result;
28use std::sync::Arc;
29
30use crate::utils::datafusion_strsim::normalized_levenshtein;
31use crate::utils::quote_identifier;
32use crate::{Column, DFSchema, Diagnostic, TableReference};
33#[cfg(feature = "avro")]
34use apache_avro::Error as AvroError;
35use arrow::error::ArrowError;
36#[cfg(feature = "parquet")]
37use parquet::errors::ParquetError;
38#[cfg(feature = "sql")]
39use sqlparser::parser::ParserError;
40use tokio::task::JoinError;
41
/// Result type for operations that could result in a [`DataFusionError`]
pub type Result<T, E = DataFusionError> = result::Result<T, E>;

/// Result type for operations that could result in a [`DataFusionError`] and
/// whose error needs to be shared (wrapped into an `Arc`).
pub type SharedResult<T> = result::Result<T, Arc<DataFusionError>>;

/// Error type for generic operations that could result in [`DataFusionError::External`]
pub type GenericError = Box<dyn Error + Send + Sync>;
50
/// DataFusion error
#[derive(Debug)]
pub enum DataFusionError {
    /// Error returned by arrow.
    ///
    /// 2nd argument is for optional backtrace
    ArrowError(Box<ArrowError>, Option<String>),
    /// Error when reading / writing Parquet data.
    #[cfg(feature = "parquet")]
    ParquetError(Box<ParquetError>),
    /// Error when reading Avro data.
    #[cfg(feature = "avro")]
    AvroError(Box<AvroError>),
    /// Error when reading / writing to / from an object_store (e.g. S3 or LocalFile)
    #[cfg(feature = "object_store")]
    ObjectStore(Box<object_store::Error>),
    /// Error when an I/O operation fails
    IoError(io::Error),
    /// Error when SQL is syntactically incorrect.
    ///
    /// 2nd argument is for optional backtrace
    #[cfg(feature = "sql")]
    SQL(Box<ParserError>, Option<String>),
    /// Error when a feature is not yet implemented.
    ///
    /// These errors are sometimes returned for features that are still in
    /// development and are not entirely complete. Often, these errors are
    /// tracked in our issue tracker.
    NotImplemented(String),
    /// Error due to bugs in DataFusion
    ///
    /// This error should not happen in normal usage of DataFusion. It results
    /// from something that wasn't expected/anticipated by the implementation
    /// and that is most likely a bug (the error message even encourages users
    /// to open a bug report). A user should not be able to trigger internal
    /// errors under normal circumstances by feeding in malformed queries, bad
    /// data, etc.
    ///
    /// Note that I/O errors (or any error that happens due to external systems)
    /// do NOT fall under this category. See other variants such as
    /// [`Self::IoError`] and [`Self::External`].
    ///
    /// DataFusion has internal invariants that the compiler is not always able
    /// to check. This error is raised when one of those invariants does not
    /// hold for some reason.
    Internal(String),
    /// Error during planning of the query.
    ///
    /// This error happens when the user provides a bad query or plan, for
    /// example the user attempts to call a function that doesn't exist, or if
    /// the types of a function call are not supported.
    Plan(String),
    /// Error for invalid or unsupported configuration options.
    Configuration(String),
    /// Error when there is a problem with the query related to schema.
    ///
    /// This error can be returned in cases such as when schema inference is not
    /// possible and when column names are not unique.
    ///
    /// 2nd argument is for optional backtrace.
    /// The optional backtrace is boxed to prevent
    /// <https://rust-lang.github.io/rust-clippy/master/index.html#/result_large_err>
    SchemaError(Box<SchemaError>, Box<Option<String>>),
    /// Error during execution of the query.
    ///
    /// This error is returned when an error happens during execution due to a
    /// malformed input. For example, the user passed malformed arguments to a
    /// SQL method, opened a CSV file that is broken, or tried to divide an
    /// integer by zero.
    Execution(String),
    /// [`JoinError`] during execution of the query.
    ///
    /// This error can't occur for unjoined tasks, such as execution shutdown.
    ExecutionJoin(Box<JoinError>),
    /// Error when resources (such as memory or scratch disk space) are exhausted.
    ///
    /// This error is thrown when a consumer cannot acquire additional memory
    /// or other resources needed to execute the query from the Memory Manager.
    ResourcesExhausted(String),
    /// Errors originating from outside DataFusion's core codebase.
    ///
    /// For example, a custom S3Error from the crate datafusion-objectstore-s3
    External(GenericError),
    /// Error with additional context
    ///
    /// 1st argument is the context description, 2nd argument is the wrapped
    /// error.
    Context(String, Box<DataFusionError>),
    /// Errors from either mapping LogicalPlans to/from Substrait plans
    /// or serializing/deserializing protobytes to Substrait plans
    Substrait(String),
    /// Error wrapped together with additional contextual information intended
    /// for end users, to help them understand what went wrong by providing
    /// human-readable messages, and locations in the source query that relate
    /// to the error in some way.
    Diagnostic(Box<Diagnostic>, Box<DataFusionError>),
    /// A collection of one or more [`DataFusionError`]. Useful in cases where
    /// DataFusion can recover from an erroneous state, and produce more errors
    /// before terminating. e.g. when planning a SELECT clause, DataFusion can
    /// synchronize to the next `SelectItem` if the previous one had errors. The
    /// end result is that the user can see errors about all `SelectItem`,
    /// instead of just the first one.
    Collection(Vec<DataFusionError>),
    /// A [`DataFusionError`] which shares an underlying [`DataFusionError`].
    ///
    /// This is useful when the same underlying [`DataFusionError`] is passed
    /// to multiple receivers. For example, when the source of a repartition
    /// errors and the error is propagated to multiple consumers.
    Shared(Arc<DataFusionError>),
}
157
/// Wraps an error with a context message that records where the macro was
/// invoked.
///
/// Expands to `$error.context(..)`, passing a `String` made of the given
/// description followed by ` at <file>:<line>` of the invocation site. The
/// error expression therefore has to provide a `context` method that accepts
/// that `String`.
#[macro_export]
macro_rules! context {
    ($description:expr, $error:expr) => {
        $error.context(format!(
            "{} at {}:{}",
            $description,
            file!(),
            line!()
        ))
    };
}
164
/// Schema-related errors
#[derive(Debug)]
pub enum SchemaError {
    /// Schema contains a (possibly) qualified and unqualified field with same unqualified name
    AmbiguousReference {
        /// The column whose reference is ambiguous
        field: Box<Column>,
    },
    /// Schema contains duplicate qualified field name
    DuplicateQualifiedField {
        /// Qualifier of the duplicated field
        qualifier: Box<TableReference>,
        /// Unqualified name of the duplicated field
        name: String,
    },
    /// Schema contains duplicate unqualified field name
    DuplicateUnqualifiedField {
        /// Name of the duplicated field
        name: String,
    },
    /// No field with this name
    FieldNotFound {
        /// The column that was looked up
        field: Box<Column>,
        /// The columns that were available; used to build hints in the error message
        valid_fields: Vec<Column>,
    },
}
183
184impl Display for SchemaError {
185    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
186        match self {
187            Self::FieldNotFound {
188                field,
189                valid_fields,
190            } => {
191                write!(f, "No field named {}", field.quoted_flat_name())?;
192                let lower_valid_fields = valid_fields
193                    .iter()
194                    .map(|column| column.flat_name().to_lowercase())
195                    .collect::<Vec<String>>();
196
197                let valid_fields_names = valid_fields
198                    .iter()
199                    .map(|column| column.flat_name())
200                    .collect::<Vec<String>>();
201                if lower_valid_fields.contains(&field.flat_name().to_lowercase()) {
202                    write!(
203                        f,
204                        ". Column names are case sensitive. You can use double quotes to refer to the \"{}\" column \
205                        or set the datafusion.sql_parser.enable_ident_normalization configuration",
206                        field.quoted_flat_name()
207                    )?;
208                }
209                let field_name = field.name();
210                if let Some(matched) = valid_fields_names
211                    .iter()
212                    .filter(|str| normalized_levenshtein(str, field_name) >= 0.5)
213                    .collect::<Vec<&String>>()
214                    .first()
215                {
216                    write!(f, ". Did you mean '{matched}'?")?;
217                } else if !valid_fields.is_empty() {
218                    write!(
219                        f,
220                        ". Valid fields are {}",
221                        valid_fields
222                            .iter()
223                            .map(|field| field.quoted_flat_name())
224                            .collect::<Vec<String>>()
225                            .join(", ")
226                    )?;
227                }
228                write!(f, ".")
229            }
230            Self::DuplicateQualifiedField { qualifier, name } => {
231                write!(
232                    f,
233                    "Schema contains duplicate qualified field name {}.{}",
234                    qualifier.to_quoted_string(),
235                    quote_identifier(name)
236                )
237            }
238            Self::DuplicateUnqualifiedField { name } => {
239                write!(
240                    f,
241                    "Schema contains duplicate unqualified field name {}",
242                    quote_identifier(name)
243                )
244            }
245            Self::AmbiguousReference { field } => {
246                if field.relation.is_some() {
247                    write!(
248                        f,
249                        "Schema contains qualified field name {} and unqualified field name {} which would be ambiguous",
250                        field.quoted_flat_name(),
251                        quote_identifier(&field.name)
252                    )
253                } else {
254                    write!(
255                        f,
256                        "Ambiguous reference to unqualified field {}",
257                        field.quoted_flat_name()
258                    )
259                }
260            }
261        }
262    }
263}
264
265impl Error for SchemaError {}
266
267impl From<std::fmt::Error> for DataFusionError {
268    fn from(_e: std::fmt::Error) -> Self {
269        DataFusionError::Execution("Fail to format".to_string())
270    }
271}
272
273impl From<io::Error> for DataFusionError {
274    fn from(e: io::Error) -> Self {
275        DataFusionError::IoError(e)
276    }
277}
278
279impl From<ArrowError> for DataFusionError {
280    fn from(e: ArrowError) -> Self {
281        DataFusionError::ArrowError(Box::new(e), Some(DataFusionError::get_back_trace()))
282    }
283}
284
285impl From<DataFusionError> for ArrowError {
286    fn from(e: DataFusionError) -> Self {
287        match e {
288            DataFusionError::ArrowError(e, _) => *e,
289            DataFusionError::External(e) => ArrowError::ExternalError(e),
290            other => ArrowError::ExternalError(Box::new(other)),
291        }
292    }
293}
294
295impl From<&Arc<DataFusionError>> for DataFusionError {
296    fn from(e: &Arc<DataFusionError>) -> Self {
297        if let DataFusionError::Shared(e_inner) = e.as_ref() {
298            // don't re-wrap
299            DataFusionError::Shared(Arc::clone(e_inner))
300        } else {
301            DataFusionError::Shared(Arc::clone(e))
302        }
303    }
304}
305
/// Wraps a Parquet error as [`DataFusionError::ParquetError`].
#[cfg(feature = "parquet")]
impl From<ParquetError> for DataFusionError {
    fn from(parquet_error: ParquetError) -> Self {
        Self::ParquetError(Box::new(parquet_error))
    }
}
312
/// Wraps an Avro error as [`DataFusionError::AvroError`].
#[cfg(feature = "avro")]
impl From<AvroError> for DataFusionError {
    fn from(avro_error: AvroError) -> Self {
        Self::AvroError(Box::new(avro_error))
    }
}
319
/// Wraps an object store error as [`DataFusionError::ObjectStore`].
#[cfg(feature = "object_store")]
impl From<object_store::Error> for DataFusionError {
    fn from(store_error: object_store::Error) -> Self {
        Self::ObjectStore(Box::new(store_error))
    }
}
326
/// Converts an object store path error into an `object_store::Error` and
/// wraps it as [`DataFusionError::ObjectStore`].
#[cfg(feature = "object_store")]
impl From<object_store::path::Error> for DataFusionError {
    fn from(path_error: object_store::path::Error) -> Self {
        let store_error: object_store::Error = path_error.into();
        Self::ObjectStore(Box::new(store_error))
    }
}
333
/// Wraps a SQL parser error as [`DataFusionError::SQL`] without a backtrace.
#[cfg(feature = "sql")]
impl From<ParserError> for DataFusionError {
    fn from(parser_error: ParserError) -> Self {
        Self::SQL(Box::new(parser_error), None)
    }
}
340
341impl From<GenericError> for DataFusionError {
342    fn from(err: GenericError) -> Self {
343        // If the error is already a DataFusionError, not wrapping it.
344        if err.is::<DataFusionError>() {
345            if let Ok(e) = err.downcast::<DataFusionError>() {
346                *e
347            } else {
348                unreachable!()
349            }
350        } else {
351            DataFusionError::External(err)
352        }
353    }
354}
355
356impl Display for DataFusionError {
357    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
358        let error_prefix = self.error_prefix();
359        let message = self.message();
360        write!(f, "{error_prefix}{message}")
361    }
362}
363
364impl Error for DataFusionError {
365    fn source(&self) -> Option<&(dyn Error + 'static)> {
366        match self {
367            DataFusionError::ArrowError(e, _) => Some(e.as_ref()),
368            #[cfg(feature = "parquet")]
369            DataFusionError::ParquetError(e) => Some(e.as_ref()),
370            #[cfg(feature = "avro")]
371            DataFusionError::AvroError(e) => Some(e.as_ref()),
372            #[cfg(feature = "object_store")]
373            DataFusionError::ObjectStore(e) => Some(e.as_ref()),
374            DataFusionError::IoError(e) => Some(e),
375            #[cfg(feature = "sql")]
376            DataFusionError::SQL(e, _) => Some(e.as_ref()),
377            DataFusionError::NotImplemented(_) => None,
378            DataFusionError::Internal(_) => None,
379            DataFusionError::Configuration(_) => None,
380            DataFusionError::Plan(_) => None,
381            DataFusionError::SchemaError(e, _) => Some(e.as_ref()),
382            DataFusionError::Execution(_) => None,
383            DataFusionError::ExecutionJoin(e) => Some(e.as_ref()),
384            DataFusionError::ResourcesExhausted(_) => None,
385            DataFusionError::External(e) => Some(e.as_ref()),
386            DataFusionError::Context(_, e) => Some(e.as_ref()),
387            DataFusionError::Substrait(_) => None,
388            DataFusionError::Diagnostic(_, e) => Some(e.as_ref()),
389            // Can't really make a Collection fit into the mold of "an error has
390            // at most one source", but returning the first one is probably good
391            // idea. Especially since `DataFusionError::Collection` is mostly
392            // meant for consumption by the end user, so shouldn't interfere
393            // with programmatic usage too much. Plus, having 1 or 5 errors
394            // doesn't really change the fact that the query is invalid and
395            // can't be executed.
396            DataFusionError::Collection(errs) => errs.first().map(|e| e as &dyn Error),
397            DataFusionError::Shared(e) => Some(e.as_ref()),
398        }
399    }
400}
401
402impl From<DataFusionError> for io::Error {
403    fn from(e: DataFusionError) -> Self {
404        io::Error::other(e)
405    }
406}
407
408impl DataFusionError {
409    /// The separator between the error message and the backtrace
410    pub const BACK_TRACE_SEP: &'static str = "\n\nbacktrace: ";
411
412    /// Get deepest underlying [`DataFusionError`]
413    ///
414    /// [`DataFusionError`]s sometimes form a chain, such as `DataFusionError::ArrowError()` in order to conform
415    /// to the correct error signature. Thus sometimes there is a chain several layers deep that can obscure the
416    /// original error. This function finds the lowest level DataFusionError possible.
417    ///
418    /// For example,  `find_root` will return`DataFusionError::ResourceExhausted` given the input
419    /// ```text
420    /// DataFusionError::ArrowError
421    ///   ArrowError::External
422    ///    Box(DataFusionError::Context)
423    ///      DataFusionError::ResourceExhausted
424    /// ```
425    ///
426    /// This may be the same as `self`.
427    pub fn find_root(&self) -> &Self {
428        // Note: This is a non-recursive algorithm so we do not run
429        // out of stack space, even for long error chains.
430
431        let mut last_datafusion_error = self;
432        let mut root_error: &dyn Error = self;
433        while let Some(source) = root_error.source() {
434            // walk the next level
435            root_error = source;
436            // remember the lowest datafusion error so far
437            if let Some(e) = root_error.downcast_ref::<DataFusionError>() {
438                last_datafusion_error = e;
439            } else if let Some(e) = root_error.downcast_ref::<Arc<DataFusionError>>() {
440                // As `Arc<T>::source()` calls through to `T::source()` we need to
441                // explicitly match `Arc<DataFusionError>` to capture it
442                last_datafusion_error = e.as_ref();
443            }
444        }
445        // return last checkpoint (which may be the original error)
446        last_datafusion_error
447    }
448
449    /// wraps self in Self::Context with a description
450    pub fn context(self, description: impl Into<String>) -> Self {
451        Self::Context(description.into(), Box::new(self))
452    }
453
454    /// Strips backtrace out of the error message
455    /// If backtrace enabled then error has a format "message" [`Self::BACK_TRACE_SEP`] "backtrace"
456    /// The method strips the backtrace and outputs "message"
457    pub fn strip_backtrace(&self) -> String {
458        (*self
459            .to_string()
460            .split(Self::BACK_TRACE_SEP)
461            .collect::<Vec<&str>>()
462            .first()
463            .unwrap_or(&""))
464        .to_string()
465    }
466
467    /// To enable optional rust backtrace in DataFusion:
468    /// - [`Setup Env Variables`]<https://doc.rust-lang.org/std/backtrace/index.html#environment-variables>
469    /// - Enable `backtrace` cargo feature
470    ///
471    /// Example:
472    /// cargo build --features 'backtrace'
473    /// RUST_BACKTRACE=1 ./app
474    #[inline(always)]
475    pub fn get_back_trace() -> String {
476        #[cfg(feature = "backtrace")]
477        {
478            let back_trace = Backtrace::capture();
479            if back_trace.status() == BacktraceStatus::Captured {
480                return format!("{}{}", Self::BACK_TRACE_SEP, back_trace);
481            }
482
483            "".to_owned()
484        }
485
486        #[cfg(not(feature = "backtrace"))]
487        "".to_owned()
488    }
489
490    /// Return a [`DataFusionErrorBuilder`] to build a [`DataFusionError`]
491    pub fn builder() -> DataFusionErrorBuilder {
492        DataFusionErrorBuilder::default()
493    }
494
495    fn error_prefix(&self) -> &'static str {
496        match self {
497            DataFusionError::ArrowError(_, _) => "Arrow error: ",
498            #[cfg(feature = "parquet")]
499            DataFusionError::ParquetError(_) => "Parquet error: ",
500            #[cfg(feature = "avro")]
501            DataFusionError::AvroError(_) => "Avro error: ",
502            #[cfg(feature = "object_store")]
503            DataFusionError::ObjectStore(_) => "Object Store error: ",
504            DataFusionError::IoError(_) => "IO error: ",
505            #[cfg(feature = "sql")]
506            DataFusionError::SQL(_, _) => "SQL error: ",
507            DataFusionError::NotImplemented(_) => {
508                "This feature is not implemented: "
509            }
510            DataFusionError::Internal(_) => "Internal error: ",
511            DataFusionError::Plan(_) => "Error during planning: ",
512            DataFusionError::Configuration(_) => {
513                "Invalid or Unsupported Configuration: "
514            }
515            DataFusionError::SchemaError(_, _) => "Schema error: ",
516            DataFusionError::Execution(_) => "Execution error: ",
517            DataFusionError::ExecutionJoin(_) => "ExecutionJoin error: ",
518            DataFusionError::ResourcesExhausted(_) => {
519                "Resources exhausted: "
520            }
521            DataFusionError::External(_) => "External error: ",
522            DataFusionError::Context(_, _) => "",
523            DataFusionError::Substrait(_) => "Substrait error: ",
524            DataFusionError::Diagnostic(_, _) => "",
525            DataFusionError::Collection(errs) => {
526                errs.first().expect("cannot construct DataFusionError::Collection with 0 errors, but got one such case").error_prefix()
527            }
528            DataFusionError::Shared(_) => "",
529        }
530    }
531
532    pub fn message(&self) -> Cow<'_, str> {
533        match *self {
534            DataFusionError::ArrowError(ref desc, ref backtrace) => {
535                let backtrace = backtrace.clone().unwrap_or_else(|| "".to_owned());
536                Cow::Owned(format!("{desc}{backtrace}"))
537            }
538            #[cfg(feature = "parquet")]
539            DataFusionError::ParquetError(ref desc) => Cow::Owned(desc.to_string()),
540            #[cfg(feature = "avro")]
541            DataFusionError::AvroError(ref desc) => Cow::Owned(desc.to_string()),
542            DataFusionError::IoError(ref desc) => Cow::Owned(desc.to_string()),
543            #[cfg(feature = "sql")]
544            DataFusionError::SQL(ref desc, ref backtrace) => {
545                let backtrace: String =
546                    backtrace.clone().unwrap_or_else(|| "".to_owned());
547                Cow::Owned(format!("{desc:?}{backtrace}"))
548            }
549            DataFusionError::Configuration(ref desc) => Cow::Owned(desc.to_string()),
550            DataFusionError::NotImplemented(ref desc) => Cow::Owned(desc.to_string()),
551            DataFusionError::Internal(ref desc) => Cow::Owned(format!(
552                "{desc}.\nThis issue was likely caused by a bug in DataFusion's code. \
553                Please help us to resolve this by filing a bug report in our issue tracker: \
554                https://github.com/apache/datafusion/issues"
555            )),
556            DataFusionError::Plan(ref desc) => Cow::Owned(desc.to_string()),
557            DataFusionError::SchemaError(ref desc, ref backtrace) => {
558                let backtrace: &str =
559                    &backtrace.as_ref().clone().unwrap_or_else(|| "".to_owned());
560                Cow::Owned(format!("{desc}{backtrace}"))
561            }
562            DataFusionError::Execution(ref desc) => Cow::Owned(desc.to_string()),
563            DataFusionError::ExecutionJoin(ref desc) => Cow::Owned(desc.to_string()),
564            DataFusionError::ResourcesExhausted(ref desc) => Cow::Owned(desc.to_string()),
565            DataFusionError::External(ref desc) => Cow::Owned(desc.to_string()),
566            #[cfg(feature = "object_store")]
567            DataFusionError::ObjectStore(ref desc) => Cow::Owned(desc.to_string()),
568            DataFusionError::Context(ref desc, ref err) => {
569                Cow::Owned(format!("{desc}\ncaused by\n{}", *err))
570            }
571            DataFusionError::Substrait(ref desc) => Cow::Owned(desc.to_string()),
572            DataFusionError::Diagnostic(_, ref err) => Cow::Owned(err.to_string()),
573            // Returning the message of the first error is probably fine enough,
574            // and makes `DataFusionError::Collection` a transparent wrapped,
575            // unless the end user explicitly calls `DataFusionError::iter`.
576            DataFusionError::Collection(ref errs) => errs
577                .first()
578                .expect("cannot construct DataFusionError::Collection with 0 errors")
579                .message(),
580            DataFusionError::Shared(ref desc) => Cow::Owned(desc.to_string()),
581        }
582    }
583
584    /// Wraps the error with contextual information intended for end users
585    pub fn with_diagnostic(self, diagnostic: Diagnostic) -> Self {
586        Self::Diagnostic(Box::new(diagnostic), Box::new(self))
587    }
588
589    /// Wraps the error with contextual information intended for end users.
590    /// Takes a function that inspects the error and returns the diagnostic to
591    /// wrap it with.
592    pub fn with_diagnostic_fn<F: FnOnce(&DataFusionError) -> Diagnostic>(
593        self,
594        f: F,
595    ) -> Self {
596        let diagnostic = f(&self);
597        self.with_diagnostic(diagnostic)
598    }
599
600    /// Gets the [`Diagnostic`] associated with the error, if any. If there is
601    /// more than one, only the outermost [`Diagnostic`] is returned.
602    pub fn diagnostic(&self) -> Option<&Diagnostic> {
603        struct DiagnosticsIterator<'a> {
604            head: &'a DataFusionError,
605        }
606
607        impl<'a> Iterator for DiagnosticsIterator<'a> {
608            type Item = &'a Diagnostic;
609
610            fn next(&mut self) -> Option<Self::Item> {
611                loop {
612                    if let DataFusionError::Diagnostic(diagnostics, source) = self.head {
613                        self.head = source.as_ref();
614                        return Some(diagnostics);
615                    }
616
617                    if let Some(source) = self
618                        .head
619                        .source()
620                        .and_then(|source| source.downcast_ref::<DataFusionError>())
621                    {
622                        self.head = source;
623                    } else {
624                        return None;
625                    }
626                }
627            }
628        }
629
630        DiagnosticsIterator { head: self }.next()
631    }
632
633    /// Return an iterator over this [`DataFusionError`] and any other
634    /// [`DataFusionError`]s in a [`DataFusionError::Collection`].
635    ///
636    /// Sometimes DataFusion is able to collect multiple errors in a SQL query
637    /// before terminating, e.g. across different expressions in a SELECT
638    /// statements or different sides of a UNION. This method returns an
639    /// iterator over all the errors in the collection.
640    ///
641    /// For this to work, the top-level error must be a
642    /// `DataFusionError::Collection`, not something that contains it.
643    pub fn iter(&self) -> impl Iterator<Item = &DataFusionError> {
644        struct ErrorIterator<'a> {
645            queue: VecDeque<&'a DataFusionError>,
646        }
647
648        impl<'a> Iterator for ErrorIterator<'a> {
649            type Item = &'a DataFusionError;
650
651            fn next(&mut self) -> Option<Self::Item> {
652                loop {
653                    let popped = self.queue.pop_front()?;
654                    match popped {
655                        DataFusionError::Collection(errs) => self.queue.extend(errs),
656                        _ => return Some(popped),
657                    }
658                }
659            }
660        }
661
662        let mut queue = VecDeque::new();
663        queue.push_back(self);
664        ErrorIterator { queue }
665    }
666}
667
/// A builder for [`DataFusionError`]
///
/// This builder can be used to collect multiple errors and return them as a
/// [`DataFusionError::Collection`].
///
/// # Example: no errors
/// ```
/// # use datafusion_common::DataFusionError;
/// let mut builder = DataFusionError::builder();
/// // error_or returns the value if no errors have been added
/// assert_eq!(builder.error_or(42).unwrap(), 42);
/// ```
///
/// # Example: with errors
/// ```
/// # use datafusion_common::{assert_contains, DataFusionError};
/// let mut builder = DataFusionError::builder();
/// builder.add_error(DataFusionError::Internal("foo".to_owned()));
/// // error_or returns an error if any errors have been added
/// assert_contains!(
///     builder.error_or(42).unwrap_err().to_string(),
///     "Internal error: foo"
/// );
/// ```
#[derive(Debug, Default)]
pub struct DataFusionErrorBuilder(Vec<DataFusionError>);
694
695impl DataFusionErrorBuilder {
696    /// Create a new [`DataFusionErrorBuilder`]
697    pub fn new() -> Self {
698        Default::default()
699    }
700
701    /// Add an error to the in progress list
702    ///
703    /// # Example
704    /// ```
705    /// # use datafusion_common::{assert_contains, DataFusionError};
706    /// let mut builder = DataFusionError::builder();
707    /// builder.add_error(DataFusionError::Internal("foo".to_owned()));
708    /// assert_contains!(
709    ///     builder.error_or(42).unwrap_err().to_string(),
710    ///     "Internal error: foo"
711    /// );
712    /// ```
713    pub fn add_error(&mut self, error: DataFusionError) {
714        self.0.push(error);
715    }
716
717    /// Add an error to the in progress list, returning the builder
718    ///
719    /// # Example
720    /// ```
721    /// # use datafusion_common::{assert_contains, DataFusionError};
722    /// let builder = DataFusionError::builder()
723    ///     .with_error(DataFusionError::Internal("foo".to_owned()));
724    /// assert_contains!(
725    ///     builder.error_or(42).unwrap_err().to_string(),
726    ///     "Internal error: foo"
727    /// );
728    /// ```
729    pub fn with_error(mut self, error: DataFusionError) -> Self {
730        self.0.push(error);
731        self
732    }
733
734    /// Returns `Ok(ok)` if no errors were added to the builder,
735    /// otherwise returns a `Result::Err`
736    pub fn error_or<T>(self, ok: T) -> Result<T, DataFusionError> {
737        match self.0.len() {
738            0 => Ok(ok),
739            1 => Err(self.0.into_iter().next().expect("length matched 1")),
740            _ => Err(DataFusionError::Collection(self.0)),
741        }
742    }
743}
744
/// Unwrap an `Option` if possible. Otherwise return a `DataFusionError::Internal`
/// from the enclosing function (the expansion ends with `?`).
/// In normal usage of DataFusion the unwrap should always succeed.
///
/// The argument must be an identifier; its name is used in the error message
/// `"<name> should not be None"`.
///
/// The expansion refers to `$crate::DataFusionError`, so the macro can be
/// used without importing `DataFusionError` at the call site.
///
/// Example: `let values = unwrap_or_internal_err!(values)`
#[macro_export]
macro_rules! unwrap_or_internal_err {
    ($Value: ident) => {
        $Value.ok_or_else(|| {
            $crate::DataFusionError::Internal(::std::format!(
                "{} should not be None",
                ::std::stringify!($Value)
            ))
        })?
    };
}
760
/// Generates a pair of exported macros for concisely constructing one
/// `DataFusionError::*` variant from a `format!`-style message.
///
/// The generated macros support placeholders the same way as `format!`:
///
/// ```text
/// plan_err!("Error")
/// plan_err!("Error {}", val)
/// plan_err!("Error {:?}", val)
/// plan_err!("Error {val}")
/// plan_err!("Error {val:?}")
/// ```
///
/// An optional trailing `; diagnostic = <expr>` passes the expression to
/// `with_diagnostic` on the freshly built error.
///
/// Arguments:
/// * `NAME_ERR` - name of the generated macro that evaluates to `Err(DataFusionError::*)`
/// * `NAME_DF_ERR` - name of the generated macro that evaluates to the bare
///   `DataFusionError::*`. Needed to keep the backtrace opportunity in constructions
///   where `DataFusionError::*` is used directly, like `map_err`, `ok_or_else`, etc
/// * `ERR` - the `DataFusionError` variant to construct
macro_rules! make_error {
    // Public entry point: forwards to the `@inner` arm with a literal `$` token, so
    // that the inner arm can write `$`-prefixed metavariables of the generated macros
    ($NAME_ERR:ident, $NAME_DF_ERR: ident, $ERR:ident) => { make_error!(@inner ($), $NAME_ERR, $NAME_DF_ERR, $ERR); };
    // `$d` is bound to the literal `$` passed in by the arm above
    (@inner ($d:tt), $NAME_ERR:ident, $NAME_DF_ERR:ident, $ERR:ident) => {
        ::paste::paste!{
            /// Macro wraps `$ERR` to add backtrace feature
            #[macro_export]
            macro_rules! $NAME_DF_ERR {
                ($d($d args:expr),* $d(; diagnostic=$d DIAG:expr)?) => {{
                    // The formatted message is followed by whatever `get_back_trace()` returns
                    let err =$crate::DataFusionError::$ERR(
                        ::std::format!(
                            "{}{}",
                            ::std::format!($d($d args),*),
                            $crate::DataFusionError::get_back_trace(),
                        ).into()
                    );
                    // Only emitted when the caller supplied `; diagnostic = ...`
                    $d (
                        let err = err.with_diagnostic($d DIAG);
                    )?
                    err
                }
            }
        }

            /// Macro wraps Err(`$ERR`) to add backtrace feature
            #[macro_export]
            macro_rules! $NAME_ERR {
                ($d($d args:expr),* $d(; diagnostic = $d DIAG:expr)?) => {{
                    // Goes through the `_`-prefixed alias re-exported below rather than the
                    // macro name itself (presumably because macro-generated exported macros
                    // cannot be referred to by absolute path inside the defining crate)
                    let err = $crate::[<_ $NAME_DF_ERR>]!($d($d args),*);
                    // Only emitted when the caller supplied `; diagnostic = ...`
                    $d (
                        let err = err.with_diagnostic($d DIAG);
                    )?
                    Err(err)

                }}
            }


            // Note: Only some of the generated macros are used inside this crate.
            // A re-export is generated for every one of them in case it is needed,
            // so unused code is allowed to avoid warnings for those that are not used
            #[doc(hidden)]
            #[allow(unused)]
            pub use $NAME_ERR as [<_ $NAME_ERR>];
            #[doc(hidden)]
            #[allow(unused)]
            pub use $NAME_DF_ERR as [<_ $NAME_DF_ERR>];
        }
    };
}
822
// Exposes `plan_err!` (yields `Err(..)`) and `plan_datafusion_err!` (yields the bare error)
// to create `DataFusionError::Plan` with optional backtrace
make_error!(plan_err, plan_datafusion_err, Plan);

// Exposes `internal_err!` and `internal_datafusion_err!`
// to create `DataFusionError::Internal` with optional backtrace
make_error!(internal_err, internal_datafusion_err, Internal);

// Exposes `not_impl_err!` and `not_impl_datafusion_err!`
// to create `DataFusionError::NotImplemented` with optional backtrace
make_error!(not_impl_err, not_impl_datafusion_err, NotImplemented);

// Exposes `exec_err!` and `exec_datafusion_err!`
// to create `DataFusionError::Execution` with optional backtrace
make_error!(exec_err, exec_datafusion_err, Execution);

// Exposes `config_err!` and `config_datafusion_err!`
// to create `DataFusionError::Configuration` with optional backtrace
make_error!(config_err, config_datafusion_err, Configuration);

// Exposes `substrait_err!` and `substrait_datafusion_err!`
// to create `DataFusionError::Substrait` with optional backtrace
make_error!(substrait_err, substrait_datafusion_err, Substrait);

// Exposes `resources_err!` and `resources_datafusion_err!`
// to create `DataFusionError::ResourcesExhausted` with optional backtrace
make_error!(resources_err, resources_datafusion_err, ResourcesExhausted);
843
/// Creates a `DataFusionError::SQL` wrapping the given error expression, with
/// optional backtrace.
///
/// The error expression is boxed and stored together with the value returned by
/// `DataFusionError::get_back_trace()`. An optional trailing
/// `; diagnostic = <expr>` passes the expression to `with_diagnostic` on the
/// resulting error.
///
/// The error type is referenced through `$crate`, so callers do not need to
/// import `DataFusionError` themselves.
#[macro_export]
macro_rules! sql_datafusion_err {
    ($ERR:expr $(; diagnostic = $DIAG:expr)?) => {{
        let err = $crate::DataFusionError::SQL(
            Box::new($ERR),
            Some($crate::DataFusionError::get_back_trace()),
        );
        // Only emitted when the caller supplied `; diagnostic = ...`
        $(
            let err = err.with_diagnostic($DIAG);
        )?
        err
    }};
}
855
/// Creates an `Err(DataFusionError::SQL)` wrapping the given error expression,
/// with optional backtrace.
///
/// The error itself is built by `sql_datafusion_err!`. An optional trailing
/// `; diagnostic = <expr>` passes the expression to `with_diagnostic` on the
/// error before it is wrapped in `Err`.
#[macro_export]
macro_rules! sql_err {
    ($ERR:expr $(; diagnostic = $DIAG:expr)?) => {{
        // Resolved through `$crate` instead of a hard-coded crate name, so the macro
        // keeps working when the defining crate is renamed by the user or is only
        // reachable through a re-export
        let err = $crate::sql_datafusion_err!($ERR);
        // Only emitted when the caller supplied `; diagnostic = ...`
        $(
            let err = err.with_diagnostic($DIAG);
        )?
        Err(err)
    }};
}
867
/// Creates a `DataFusionError::ArrowError` wrapping the given error expression,
/// with optional backtrace.
///
/// The error expression is boxed and stored together with the value returned by
/// `DataFusionError::get_back_trace()`. An optional trailing
/// `; diagnostic = <expr>` passes the expression to `with_diagnostic` on the
/// resulting error.
///
/// The error type is referenced through `$crate`, so callers do not need to
/// import `DataFusionError` themselves.
#[macro_export]
macro_rules! arrow_datafusion_err {
    ($ERR:expr $(; diagnostic = $DIAG:expr)?) => {{
        let err = $crate::DataFusionError::ArrowError(
            Box::new($ERR),
            Some($crate::DataFusionError::get_back_trace()),
        );
        // Only emitted when the caller supplied `; diagnostic = ...`
        $(
            let err = err.with_diagnostic($DIAG);
        )?
        err
    }};
}
879
/// Creates an `Err(DataFusionError::ArrowError)` wrapping the given error
/// expression, with optional backtrace.
///
/// The error itself is built by `arrow_datafusion_err!`. An optional trailing
/// `; diagnostic = <expr>` passes the expression to `with_diagnostic` on the
/// error before it is wrapped in `Err`.
#[macro_export]
macro_rules! arrow_err {
    ($ERR:expr $(; diagnostic = $DIAG:expr)?) => {{
        // Resolved through `$crate` instead of a hard-coded crate name, so the macro
        // keeps working when the defining crate is renamed by the user or is only
        // reachable through a re-export
        let err = $crate::arrow_datafusion_err!($ERR);
        // Only emitted when the caller supplied `; diagnostic = ...`
        $(
            let err = err.with_diagnostic($DIAG);
        )?
        Err(err)
    }};
}
892
/// Creates a `DataFusionError::SchemaError` from the given error expression,
/// with optional backtrace.
///
/// Both the error expression and the optional backtrace are boxed. An optional
/// trailing `; diagnostic = <expr>` passes the expression to `with_diagnostic`
/// on the resulting error.
#[macro_export]
macro_rules! schema_datafusion_err {
    ($ERR:expr $(; diagnostic = $DIAG:expr)?) => {{
        let cause = Box::new($ERR);
        let backtrace = Box::new(Some($crate::error::DataFusionError::get_back_trace()));
        let schema_error = $crate::error::DataFusionError::SchemaError(cause, backtrace);
        // Only emitted when the caller supplied `; diagnostic = ...`
        $(
            let schema_error = schema_error.with_diagnostic($DIAG);
        )?
        schema_error
    }};
}
907
/// Creates an `Err(DataFusionError::SchemaError)` from the given error
/// expression, with optional backtrace.
///
/// Both the error expression and the optional backtrace are boxed. An optional
/// trailing `; diagnostic = <expr>` passes the expression to `with_diagnostic`
/// on the error before it is wrapped in `Err`.
#[macro_export]
macro_rules! schema_err {
    ($ERR:expr $(; diagnostic = $DIAG:expr)?) => {{
        let cause = Box::new($ERR);
        let backtrace = Box::new(Some($crate::error::DataFusionError::get_back_trace()));
        let schema_error = $crate::error::DataFusionError::SchemaError(cause, backtrace);
        // Only emitted when the caller supplied `; diagnostic = ...`
        $(
            let schema_error = schema_error.with_diagnostic($DIAG);
        )?
        Err(schema_error)
    }};
}
923
924// To avoid compiler error when using macro in the same crate:
925// macros from the current crate cannot be referred to by absolute paths
926pub use schema_err as _schema_err;
927
928/// Create a "field not found" DataFusion::SchemaError
929pub fn field_not_found<R: Into<TableReference>>(
930    qualifier: Option<R>,
931    name: &str,
932    schema: &DFSchema,
933) -> DataFusionError {
934    schema_datafusion_err!(SchemaError::FieldNotFound {
935        field: Box::new(Column::new(qualifier, name)),
936        valid_fields: schema.columns().to_vec(),
937    })
938}
939
940/// Convenience wrapper over [`field_not_found`] for when there is no qualifier
941pub fn unqualified_field_not_found(name: &str, schema: &DFSchema) -> DataFusionError {
942    schema_datafusion_err!(SchemaError::FieldNotFound {
943        field: Box::new(Column::new_unqualified(name)),
944        valid_fields: schema.columns().to_vec(),
945    })
946}
947
948pub fn add_possible_columns_to_diag(
949    diagnostic: &mut Diagnostic,
950    field: &Column,
951    valid_fields: &[Column],
952) {
953    let field_names: Vec<String> = valid_fields
954        .iter()
955        .filter_map(|f| {
956            if normalized_levenshtein(f.name(), field.name()) >= 0.5 {
957                Some(f.flat_name())
958            } else {
959                None
960            }
961        })
962        .collect();
963
964    for name in field_names {
965        diagnostic.add_note(format!("possible column {name}"), None);
966    }
967}
968
969#[cfg(test)]
970mod test {
971    use super::*;
972
973    use std::mem::size_of;
974    use std::sync::Arc;
975
976    use arrow::error::ArrowError;
977
978    #[test]
979    fn test_error_size() {
980        // Since Errors influence the size of Result which influence the size of the stack
981        // please don't allow this to grow larger
982        assert_eq!(size_of::<SchemaError>(), 40);
983        assert_eq!(size_of::<DataFusionError>(), 40);
984    }
985
986    #[test]
987    fn datafusion_error_to_arrow() {
988        let res = return_arrow_error().unwrap_err();
989        assert!(res
990            .to_string()
991            .starts_with("External error: Error during planning: foo"));
992    }
993
994    #[test]
995    fn arrow_error_to_datafusion() {
996        let res = return_datafusion_error().unwrap_err();
997        assert_eq!(res.strip_backtrace(), "Arrow error: Schema error: bar");
998    }
999
1000    // To pass the test the environment variable RUST_BACKTRACE should be set to 1 to enforce backtrace
1001    #[cfg(feature = "backtrace")]
1002    #[test]
1003    #[allow(clippy::unnecessary_literal_unwrap)]
1004    fn test_enabled_backtrace() {
1005        match std::env::var("RUST_BACKTRACE") {
1006            Ok(val) if val == "1" => {}
1007            _ => panic!("Environment variable RUST_BACKTRACE must be set to 1"),
1008        };
1009
1010        let res: Result<(), DataFusionError> = plan_err!("Err");
1011        let err = res.unwrap_err().to_string();
1012        assert!(err.contains(DataFusionError::BACK_TRACE_SEP));
1013        assert_eq!(
1014            err.split(DataFusionError::BACK_TRACE_SEP)
1015                .collect::<Vec<&str>>()
1016                .first()
1017                .unwrap(),
1018            &"Error during planning: Err"
1019        );
1020        assert!(!err
1021            .split(DataFusionError::BACK_TRACE_SEP)
1022            .collect::<Vec<&str>>()
1023            .get(1)
1024            .unwrap()
1025            .is_empty());
1026    }
1027
1028    #[cfg(not(feature = "backtrace"))]
1029    #[test]
1030    #[allow(clippy::unnecessary_literal_unwrap)]
1031    fn test_disabled_backtrace() {
1032        let res: Result<(), DataFusionError> = plan_err!("Err");
1033        let res = res.unwrap_err().to_string();
1034        assert!(!res.contains(DataFusionError::BACK_TRACE_SEP));
1035        assert_eq!(res, "Error during planning: Err");
1036    }
1037
1038    #[test]
1039    fn test_find_root_error() {
1040        do_root_test(
1041            DataFusionError::Context(
1042                "it happened!".to_string(),
1043                Box::new(DataFusionError::ResourcesExhausted("foo".to_string())),
1044            ),
1045            DataFusionError::ResourcesExhausted("foo".to_string()),
1046        );
1047
1048        do_root_test(
1049            DataFusionError::ArrowError(
1050                Box::new(ArrowError::ExternalError(Box::new(
1051                    DataFusionError::ResourcesExhausted("foo".to_string()),
1052                ))),
1053                None,
1054            ),
1055            DataFusionError::ResourcesExhausted("foo".to_string()),
1056        );
1057
1058        do_root_test(
1059            DataFusionError::External(Box::new(DataFusionError::ResourcesExhausted(
1060                "foo".to_string(),
1061            ))),
1062            DataFusionError::ResourcesExhausted("foo".to_string()),
1063        );
1064
1065        do_root_test(
1066            DataFusionError::External(Box::new(ArrowError::ExternalError(Box::new(
1067                DataFusionError::ResourcesExhausted("foo".to_string()),
1068            )))),
1069            DataFusionError::ResourcesExhausted("foo".to_string()),
1070        );
1071
1072        do_root_test(
1073            DataFusionError::ArrowError(
1074                Box::new(ArrowError::ExternalError(Box::new(
1075                    ArrowError::ExternalError(Box::new(
1076                        DataFusionError::ResourcesExhausted("foo".to_string()),
1077                    )),
1078                ))),
1079                None,
1080            ),
1081            DataFusionError::ResourcesExhausted("foo".to_string()),
1082        );
1083
1084        do_root_test(
1085            DataFusionError::External(Box::new(Arc::new(
1086                DataFusionError::ResourcesExhausted("foo".to_string()),
1087            ))),
1088            DataFusionError::ResourcesExhausted("foo".to_string()),
1089        );
1090
1091        do_root_test(
1092            DataFusionError::External(Box::new(Arc::new(ArrowError::ExternalError(
1093                Box::new(DataFusionError::ResourcesExhausted("foo".to_string())),
1094            )))),
1095            DataFusionError::ResourcesExhausted("foo".to_string()),
1096        );
1097    }
1098
1099    #[test]
1100    #[allow(clippy::unnecessary_literal_unwrap)]
1101    fn test_make_error_parse_input() {
1102        let res: Result<(), DataFusionError> = plan_err!("Err");
1103        let res = res.unwrap_err();
1104        assert_eq!(res.strip_backtrace(), "Error during planning: Err");
1105
1106        let extra1 = "extra1";
1107        let extra2 = "extra2";
1108
1109        let res: Result<(), DataFusionError> = plan_err!("Err {} {}", extra1, extra2);
1110        let res = res.unwrap_err();
1111        assert_eq!(
1112            res.strip_backtrace(),
1113            "Error during planning: Err extra1 extra2"
1114        );
1115
1116        let res: Result<(), DataFusionError> =
1117            plan_err!("Err {:?} {:#?}", extra1, extra2);
1118        let res = res.unwrap_err();
1119        assert_eq!(
1120            res.strip_backtrace(),
1121            "Error during planning: Err \"extra1\" \"extra2\""
1122        );
1123
1124        let res: Result<(), DataFusionError> = plan_err!("Err {extra1} {extra2}");
1125        let res = res.unwrap_err();
1126        assert_eq!(
1127            res.strip_backtrace(),
1128            "Error during planning: Err extra1 extra2"
1129        );
1130
1131        let res: Result<(), DataFusionError> = plan_err!("Err {extra1:?} {extra2:#?}");
1132        let res = res.unwrap_err();
1133        assert_eq!(
1134            res.strip_backtrace(),
1135            "Error during planning: Err \"extra1\" \"extra2\""
1136        );
1137    }
1138
1139    #[test]
1140    fn external_error() {
1141        // assert not wrapping DataFusionError
1142        let generic_error: GenericError =
1143            Box::new(DataFusionError::Plan("test".to_string()));
1144        let datafusion_error: DataFusionError = generic_error.into();
1145        println!("{}", datafusion_error.strip_backtrace());
1146        assert_eq!(
1147            datafusion_error.strip_backtrace(),
1148            "Error during planning: test"
1149        );
1150
1151        // assert wrapping other Error
1152        let generic_error: GenericError = Box::new(io::Error::other("io error"));
1153        let datafusion_error: DataFusionError = generic_error.into();
1154        println!("{}", datafusion_error.strip_backtrace());
1155        assert_eq!(
1156            datafusion_error.strip_backtrace(),
1157            "External error: io error"
1158        );
1159    }
1160
1161    #[test]
1162    fn external_error_no_recursive() {
1163        let generic_error_1: GenericError = Box::new(io::Error::other("io error"));
1164        let external_error_1: DataFusionError = generic_error_1.into();
1165        let generic_error_2: GenericError = Box::new(external_error_1);
1166        let external_error_2: DataFusionError = generic_error_2.into();
1167
1168        println!("{external_error_2}");
1169        assert!(external_error_2
1170            .to_string()
1171            .starts_with("External error: io error"));
1172    }
1173
1174    /// Model what happens when implementing SendableRecordBatchStream:
1175    /// DataFusion code needs to return an ArrowError
1176    fn return_arrow_error() -> arrow::error::Result<()> {
1177        // Expect the '?' to work
1178        Err(DataFusionError::Plan("foo".to_string()).into())
1179    }
1180
1181    /// Model what happens when using arrow kernels in DataFusion
1182    /// code: need to turn an ArrowError into a DataFusionError
1183    fn return_datafusion_error() -> Result<()> {
1184        // Expect the '?' to work
1185        Err(ArrowError::SchemaError("bar".to_string()).into())
1186    }
1187
1188    fn do_root_test(e: DataFusionError, exp: DataFusionError) {
1189        let e = e.find_root();
1190
1191        // DataFusionError does not implement Eq, so we use a string comparison + some cheap "same variant" test instead
1192        assert_eq!(e.strip_backtrace(), exp.strip_backtrace());
1193        assert_eq!(std::mem::discriminant(e), std::mem::discriminant(&exp),)
1194    }
1195
1196    #[test]
1197    fn test_iter() {
1198        let err = DataFusionError::Collection(vec![
1199            DataFusionError::Plan("a".to_string()),
1200            DataFusionError::Collection(vec![
1201                DataFusionError::Plan("b".to_string()),
1202                DataFusionError::Plan("c".to_string()),
1203            ]),
1204        ]);
1205        let errs = err.iter().collect::<Vec<_>>();
1206        assert_eq!(errs.len(), 3);
1207        assert_eq!(errs[0].strip_backtrace(), "Error during planning: a");
1208        assert_eq!(errs[1].strip_backtrace(), "Error during planning: b");
1209        assert_eq!(errs[2].strip_backtrace(), "Error during planning: c");
1210    }
1211}