Skip to main content

lance_core/
error.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::fmt;
5
6use arrow_schema::ArrowError;
7use snafu::{IntoError as _, Location, Snafu};
8
9type BoxedError = Box<dyn std::error::Error + Send + Sync + 'static>;
10
/// Error for when a requested field is not found in a schema.
///
/// This error computes suggestions lazily (only when displayed) to avoid
/// computing Levenshtein distance when the error is created but never shown.
#[derive(Debug)]
pub struct FieldNotFoundError {
    /// Name of the field that was requested but could not be found.
    pub field_name: String,
    /// Known field names. The `Display` impl searches these for a suggestion
    /// and lists at most the first ten of them.
    pub candidates: Vec<String>,
}
20
21impl fmt::Display for FieldNotFoundError {
22    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
23        write!(f, "Field '{}' not found.", self.field_name)?;
24        let suggestion =
25            crate::levenshtein::find_best_suggestion(&self.field_name, &self.candidates);
26        if let Some(suggestion) = suggestion {
27            write!(f, " Did you mean '{}'?", suggestion)?;
28        }
29        write!(f, "\nAvailable fields: [")?;
30        for (i, candidate) in self.candidates.iter().take(10).enumerate() {
31            if i > 0 {
32                write!(f, ", ")?;
33            }
34            write!(f, "'{}'", candidate)?;
35        }
36        if self.candidates.len() > 10 {
37            let remaining = self.candidates.len() - 10;
38            write!(f, ", ... and {} more]", remaining)?;
39        } else {
40            write!(f, "]")?;
41        }
42        Ok(())
43    }
44}
45
// Marker impl: no methods are overridden, so the trait's defaults apply.
impl std::error::Error for FieldNotFoundError {}
47
48/// Allocates error on the heap and then places `e` into it.
49#[inline]
50pub fn box_error(e: impl std::error::Error + Send + Sync + 'static) -> BoxedError {
51    Box::new(e)
52}
53
/// The error type of this crate.
///
/// `Display` output comes from the `#[snafu(display(...))]` attribute on each
/// variant. Fields marked `#[snafu(implicit)]` are not passed by the caller of
/// the generated context selectors.
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
    /// User-supplied input was invalid; `source` carries the details.
    #[snafu(display("Invalid user input: {source}, {location}"))]
    InvalidInput {
        source: BoxedError,
        #[snafu(implicit)]
        location: Location,
    },
    /// A dataset already exists at `uri`.
    #[snafu(display("Dataset already exists: {uri}, {location}"))]
    DatasetAlreadyExists {
        uri: String,
        #[snafu(implicit)]
        location: Location,
    },
    /// An append used a different schema; `difference` describes the mismatch.
    #[snafu(display("Append with different schema: {difference}, location: {location}"))]
    SchemaMismatch {
        difference: String,
        #[snafu(implicit)]
        location: Location,
    },
    /// No dataset was found at `path`; `source` carries the underlying error.
    #[snafu(display("Dataset at path {path} was not found: {source}, {location}"))]
    DatasetNotFound {
        path: String,
        source: BoxedError,
        #[snafu(implicit)]
        location: Location,
    },
    /// The file at `path` is corrupt; `source` carries the details.
    #[snafu(display("Encountered corrupt file {path}: {source}, {location}"))]
    CorruptFile {
        path: object_store::path::Path,
        source: BoxedError,
        #[snafu(implicit)]
        location: Location,
        // TODO: add backtrace?
    },
    /// The requested operation is not supported; `source` carries the details.
    #[snafu(display("Not supported: {source}, {location}"))]
    NotSupported {
        source: BoxedError,
        #[snafu(implicit)]
        location: Location,
    },
    /// A commit conflict was detected for `version`.
    #[snafu(display("Commit conflict for version {version}: {source}, {location}"))]
    CommitConflict {
        version: u64,
        source: BoxedError,
        #[snafu(implicit)]
        location: Location,
    },
    /// A transaction was incompatible; `source` carries the details.
    #[snafu(display("Incompatible transaction: {source}, {location}"))]
    IncompatibleTransaction {
        source: BoxedError,
        #[snafu(implicit)]
        location: Location,
    },
    /// A commit conflict for `version` that is reported as retryable.
    #[snafu(display("Retryable commit conflict for version {version}: {source}, {location}"))]
    RetryableCommitConflict {
        version: u64,
        source: BoxedError,
        #[snafu(implicit)]
        location: Location,
    },
    /// Too many writers were active at once.
    #[snafu(display("Too many concurrent writers. {message}, {location}"))]
    TooMuchWriteContention {
        message: String,
        #[snafu(implicit)]
        location: Location,
    },
    /// An internal error; the rendered message asks the user to file a bug report.
    #[snafu(display(
        "Encountered internal error. Please file a bug report at https://github.com/lance-format/lance/issues. {message}, {location}"
    ))]
    Internal {
        message: String,
        #[snafu(implicit)]
        location: Location,
    },
    /// A task that this operation depended on failed.
    #[snafu(display("A prerequisite task failed: {message}, {location}"))]
    PrerequisiteFailed {
        message: String,
        #[snafu(implicit)]
        location: Location,
    },
    /// The request could not be processed; see `message`.
    #[snafu(display("Unprocessable: {message}, {location}"))]
    Unprocessable {
        message: String,
        #[snafu(implicit)]
        location: Location,
    },
    /// An Arrow-related error, kept as a rendered message.
    #[snafu(display("LanceError(Arrow): {message}, {location}"))]
    Arrow {
        message: String,
        #[snafu(implicit)]
        location: Location,
    },
    /// A schema-related error, kept as a rendered message.
    #[snafu(display("LanceError(Schema): {message}, {location}"))]
    Schema {
        message: String,
        #[snafu(implicit)]
        location: Location,
    },
    /// Nothing was found at `uri`.
    #[snafu(display("Not found: {uri}, {location}"))]
    NotFound {
        uri: String,
        #[snafu(implicit)]
        location: Location,
    },
    /// An I/O error; `source` carries the underlying error.
    #[snafu(display("LanceError(IO): {source}, {location}"))]
    IO {
        source: BoxedError,
        #[snafu(implicit)]
        location: Location,
    },
    /// An index-related error, kept as a rendered message.
    #[snafu(display("LanceError(Index): {message}, {location}"))]
    Index {
        message: String,
        #[snafu(implicit)]
        location: Location,
    },
    /// The index identified by `identity` was not found.
    #[snafu(display("Lance index not found: {identity}, {location}"))]
    IndexNotFound {
        identity: String,
        #[snafu(implicit)]
        location: Location,
    },
    /// A storage location could not be inferred. This variant records no location.
    #[snafu(display("Cannot infer storage location from: {message}"))]
    InvalidTableLocation { message: String },
    // NOTE(review): this variant has no `display` attribute, so snafu presumably
    // derives its message from the doc comment below — confirm before rewording it.
    /// Stream early stop
    Stop,
    /// Wraps an arbitrary boxed error. The payload field is named `error`, not `source`.
    #[snafu(display("Wrapped error: {error}, {location}"))]
    Wrapped {
        error: BoxedError,
        #[snafu(implicit)]
        location: Location,
    },
    /// Stand-in for a cloned error; it holds only a message string.
    #[snafu(display("Cloned error: {message}, {location}"))]
    Cloned {
        message: String,
        #[snafu(implicit)]
        location: Location,
    },
    /// A query execution error, kept as a rendered message.
    #[snafu(display("Query Execution error: {message}, {location}"))]
    Execution {
        message: String,
        #[snafu(implicit)]
        location: Location,
    },
    /// A ref was invalid. This variant records no location.
    #[snafu(display("Ref is invalid: {message}"))]
    InvalidRef { message: String },
    /// A ref operation conflicted. This variant records no location.
    #[snafu(display("Ref conflict error: {message}"))]
    RefConflict { message: String },
    /// A ref was not found. This variant records no location.
    #[snafu(display("Ref not found error: {message}"))]
    RefNotFound { message: String },
    /// A cleanup operation failed. This variant records no location.
    #[snafu(display("Cleanup error: {message}"))]
    Cleanup { message: String },
    /// A version was not found. This variant records no location.
    #[snafu(display("Version not found error: {message}"))]
    VersionNotFound { message: String },
    /// A version conflict. The version numbers and location are stored, but only
    /// `message` appears in the rendered text.
    #[snafu(display("Version conflict error: {message}"))]
    VersionConflict {
        message: String,
        major_version: u16,
        minor_version: u16,
        #[snafu(implicit)]
        location: Location,
    },
    /// A namespace-related error; `source` carries the underlying error.
    #[snafu(display("Namespace error: {source}, {location}"))]
    Namespace {
        source: BoxedError,
        #[snafu(implicit)]
        location: Location,
    },
    /// External error passed through from user code.
    ///
    /// This variant preserves errors that users pass into Lance APIs (e.g., via streams
    /// with custom error types). The original error can be recovered using [`Error::into_external`]
    /// or inspected using [`Error::external_source`].
    #[snafu(transparent)]
    External { source: BoxedError },

    /// A requested field was not found in a schema.
    #[snafu(transparent)]
    FieldNotFound { source: FieldNotFoundError },
}
236
237impl Error {
238    #[track_caller]
239    pub fn corrupt_file(path: object_store::path::Path, message: impl Into<String>) -> Self {
240        CorruptFileSnafu { path }.into_error(message.into().into())
241    }
242
243    #[track_caller]
244    pub fn invalid_input(message: impl Into<String>) -> Self {
245        InvalidInputSnafu.into_error(message.into().into())
246    }
247
248    #[track_caller]
249    pub fn invalid_input_source(source: BoxedError) -> Self {
250        InvalidInputSnafu.into_error(source)
251    }
252
253    #[track_caller]
254    pub fn io(message: impl Into<String>) -> Self {
255        IOSnafu.into_error(message.into().into())
256    }
257
258    #[track_caller]
259    pub fn io_source(source: BoxedError) -> Self {
260        IOSnafu.into_error(source)
261    }
262
263    #[track_caller]
264    pub fn dataset_already_exists(uri: impl Into<String>) -> Self {
265        DatasetAlreadyExistsSnafu { uri: uri.into() }.build()
266    }
267
268    #[track_caller]
269    pub fn dataset_not_found(path: impl Into<String>, source: BoxedError) -> Self {
270        DatasetNotFoundSnafu { path: path.into() }.into_error(source)
271    }
272
273    #[track_caller]
274    pub fn version_conflict(
275        message: impl Into<String>,
276        major_version: u16,
277        minor_version: u16,
278    ) -> Self {
279        VersionConflictSnafu {
280            message: message.into(),
281            major_version,
282            minor_version,
283        }
284        .build()
285    }
286
287    #[track_caller]
288    pub fn not_found(uri: impl Into<String>) -> Self {
289        NotFoundSnafu { uri: uri.into() }.build()
290    }
291
292    #[track_caller]
293    pub fn wrapped(error: BoxedError) -> Self {
294        WrappedSnafu { error }.build()
295    }
296
297    #[track_caller]
298    pub fn schema(message: impl Into<String>) -> Self {
299        SchemaSnafu {
300            message: message.into(),
301        }
302        .build()
303    }
304
305    #[track_caller]
306    pub fn not_supported(message: impl Into<String>) -> Self {
307        NotSupportedSnafu.into_error(message.into().into())
308    }
309
310    #[track_caller]
311    pub fn not_supported_source(source: BoxedError) -> Self {
312        NotSupportedSnafu.into_error(source)
313    }
314
315    #[track_caller]
316    pub fn internal(message: impl Into<String>) -> Self {
317        InternalSnafu {
318            message: message.into(),
319        }
320        .build()
321    }
322
323    #[track_caller]
324    pub fn namespace(message: impl Into<String>) -> Self {
325        NamespaceSnafu.into_error(message.into().into())
326    }
327
328    #[track_caller]
329    pub fn namespace_source(source: Box<dyn std::error::Error + Send + Sync + 'static>) -> Self {
330        NamespaceSnafu.into_error(source)
331    }
332
333    #[track_caller]
334    pub fn arrow(message: impl Into<String>) -> Self {
335        ArrowSnafu {
336            message: message.into(),
337        }
338        .build()
339    }
340
341    #[track_caller]
342    pub fn execution(message: impl Into<String>) -> Self {
343        ExecutionSnafu {
344            message: message.into(),
345        }
346        .build()
347    }
348
349    #[track_caller]
350    pub fn cloned(message: impl Into<String>) -> Self {
351        ClonedSnafu {
352            message: message.into(),
353        }
354        .build()
355    }
356
357    #[track_caller]
358    pub fn schema_mismatch(difference: impl Into<String>) -> Self {
359        SchemaMismatchSnafu {
360            difference: difference.into(),
361        }
362        .build()
363    }
364
365    #[track_caller]
366    pub fn unprocessable(message: impl Into<String>) -> Self {
367        UnprocessableSnafu {
368            message: message.into(),
369        }
370        .build()
371    }
372
373    #[track_caller]
374    pub fn too_much_write_contention(message: impl Into<String>) -> Self {
375        TooMuchWriteContentionSnafu {
376            message: message.into(),
377        }
378        .build()
379    }
380
381    #[track_caller]
382    pub fn prerequisite_failed(message: impl Into<String>) -> Self {
383        PrerequisiteFailedSnafu {
384            message: message.into(),
385        }
386        .build()
387    }
388
389    #[track_caller]
390    pub fn index(message: impl Into<String>) -> Self {
391        IndexSnafu {
392            message: message.into(),
393        }
394        .build()
395    }
396
397    #[track_caller]
398    pub fn index_not_found(identity: impl Into<String>) -> Self {
399        IndexNotFoundSnafu {
400            identity: identity.into(),
401        }
402        .build()
403    }
404
405    #[track_caller]
406    pub fn commit_conflict_source(version: u64, source: BoxedError) -> Self {
407        CommitConflictSnafu { version }.into_error(source)
408    }
409
410    #[track_caller]
411    pub fn retryable_commit_conflict_source(version: u64, source: BoxedError) -> Self {
412        RetryableCommitConflictSnafu { version }.into_error(source)
413    }
414
415    #[track_caller]
416    pub fn incompatible_transaction_source(source: BoxedError) -> Self {
417        IncompatibleTransactionSnafu.into_error(source)
418    }
419
420    /// Create an External error from a boxed error source.
421    pub fn external(source: BoxedError) -> Self {
422        Self::External { source }
423    }
424
425    /// Create a FieldNotFound error with the given field name and available candidates.
426    pub fn field_not_found(field_name: impl Into<String>, candidates: Vec<String>) -> Self {
427        Self::FieldNotFound {
428            source: FieldNotFoundError {
429                field_name: field_name.into(),
430                candidates,
431            },
432        }
433    }
434
435    /// Returns a reference to the external error source if this is an `External` variant.
436    ///
437    /// This allows downcasting to recover the original error type.
438    pub fn external_source(&self) -> Option<&BoxedError> {
439        match self {
440            Self::External { source } => Some(source),
441            _ => None,
442        }
443    }
444
445    /// Consumes the error and returns the external source if this is an `External` variant.
446    ///
447    /// Returns `Err(self)` if this is not an `External` variant, allowing for chained handling.
448    pub fn into_external(self) -> std::result::Result<BoxedError, Self> {
449        match self {
450            Self::External { source } => Ok(source),
451            other => Err(other),
452        }
453    }
454}
455
/// Extension trait that turns a missing `Option` value into a crate [`Result`] error.
pub trait LanceOptionExt<T> {
    /// Unwraps an option, returning an internal error if the option is None.
    ///
    /// Can be used when an option is expected to have a value.
    fn expect_ok(self) -> Result<T>;
}
462
463impl<T> LanceOptionExt<T> for Option<T> {
464    #[track_caller]
465    fn expect_ok(self) -> Result<T> {
466        self.ok_or_else(|| Error::internal("Expected option to have value"))
467    }
468}
469
/// `Result` whose error type is this crate's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
/// `Result` whose error type is [`ArrowError`].
pub type ArrowResult<T> = std::result::Result<T, ArrowError>;
/// `Result` whose error type is DataFusion's error; only present with the `datafusion` feature.
#[cfg(feature = "datafusion")]
pub type DataFusionResult<T> = std::result::Result<T, datafusion_common::DataFusionError>;
474
475impl From<ArrowError> for Error {
476    #[track_caller]
477    fn from(e: ArrowError) -> Self {
478        match e {
479            ArrowError::ExternalError(source) => {
480                // Try to downcast to lance_core::Error first to recover the original
481                match source.downcast::<Self>() {
482                    Ok(lance_err) => *lance_err,
483                    Err(source) => Self::External { source },
484                }
485            }
486            other => Self::arrow(other.to_string()),
487        }
488    }
489}
490
491impl From<&ArrowError> for Error {
492    #[track_caller]
493    fn from(e: &ArrowError) -> Self {
494        Self::arrow(e.to_string())
495    }
496}
497
498impl From<std::io::Error> for Error {
499    #[track_caller]
500    fn from(e: std::io::Error) -> Self {
501        Self::io_source(box_error(e))
502    }
503}
504
505impl From<object_store::Error> for Error {
506    #[track_caller]
507    fn from(e: object_store::Error) -> Self {
508        Self::io_source(box_error(e))
509    }
510}
511
512impl From<prost::DecodeError> for Error {
513    #[track_caller]
514    fn from(e: prost::DecodeError) -> Self {
515        Self::io_source(box_error(e))
516    }
517}
518
519impl From<prost::EncodeError> for Error {
520    #[track_caller]
521    fn from(e: prost::EncodeError) -> Self {
522        Self::io_source(box_error(e))
523    }
524}
525
526impl From<prost::UnknownEnumValue> for Error {
527    #[track_caller]
528    fn from(e: prost::UnknownEnumValue) -> Self {
529        Self::io_source(box_error(e))
530    }
531}
532
533impl From<tokio::task::JoinError> for Error {
534    #[track_caller]
535    fn from(e: tokio::task::JoinError) -> Self {
536        Self::io_source(box_error(e))
537    }
538}
539
540impl From<object_store::path::Error> for Error {
541    #[track_caller]
542    fn from(e: object_store::path::Error) -> Self {
543        Self::io_source(box_error(e))
544    }
545}
546
547impl From<url::ParseError> for Error {
548    #[track_caller]
549    fn from(e: url::ParseError) -> Self {
550        Self::io_source(box_error(e))
551    }
552}
553
554impl From<serde_json::Error> for Error {
555    #[track_caller]
556    fn from(e: serde_json::Error) -> Self {
557        Self::arrow(e.to_string())
558    }
559}
560
561impl From<Error> for ArrowError {
562    fn from(value: Error) -> Self {
563        match value {
564            // Pass through external errors directly
565            Error::External { source } => Self::ExternalError(source),
566            // Preserve schema errors with their specific type
567            Error::Schema { message, .. } => Self::SchemaError(message),
568            // Wrap all other lance errors so they can be recovered
569            e => Self::ExternalError(Box::new(e)),
570        }
571    }
572}
573
/// Boxes the SQL parser error and stores it as the source of [`Error::IO`].
#[cfg(feature = "datafusion")]
impl From<datafusion_sql::sqlparser::parser::ParserError> for Error {
    #[track_caller]
    fn from(e: datafusion_sql::sqlparser::parser::ParserError) -> Self {
        Self::io_source(Box::new(e))
    }
}
581
/// Boxes the SQL tokenizer error and stores it as the source of [`Error::IO`].
#[cfg(feature = "datafusion")]
impl From<datafusion_sql::sqlparser::tokenizer::TokenizerError> for Error {
    #[track_caller]
    fn from(e: datafusion_sql::sqlparser::tokenizer::TokenizerError) -> Self {
        Self::io_source(Box::new(e))
    }
}
589
/// Boxes the whole crate error into DataFusion's `External` variant.
#[cfg(feature = "datafusion")]
impl From<Error> for datafusion_common::DataFusionError {
    #[track_caller]
    fn from(e: Error) -> Self {
        Self::External(box_error(e))
    }
}
597
/// Maps a DataFusion error onto the closest crate error variant.
///
/// - `External`: unwrapped to the original crate error when the payload is
///   one, otherwise kept as [`Error::External`].
/// - `ArrowError`: delegated to the `From<ArrowError>` conversion.
/// - `SchemaError` / `Execution`: rendered to text.
/// - `NotImplemented`: boxed as a not-supported source.
/// - `SQL` / `Plan` / `Configuration`: boxed as an invalid-input source.
/// - anything else: boxed as an I/O source.
#[cfg(feature = "datafusion")]
impl From<datafusion_common::DataFusionError> for Error {
    #[track_caller]
    fn from(e: datafusion_common::DataFusionError) -> Self {
        use datafusion_common::DataFusionError as DfError;

        match e {
            DfError::External(source) => match source.downcast::<Self>() {
                // The payload was a crate error all along: hand it back.
                Ok(original) => *original,
                Err(foreign) => Self::External { source: foreign },
            },
            DfError::ArrowError(arrow_err, _) => Self::from(*arrow_err),
            DfError::SchemaError(..) => Self::schema(e.to_string()),
            DfError::Execution(..) => Self::execution(e.to_string()),
            DfError::NotImplemented(..) => Self::not_supported_source(box_error(e)),
            DfError::SQL(..) | DfError::Plan(..) | DfError::Configuration(..) => {
                Self::invalid_input_source(box_error(e))
            }
            _ => Self::io_source(box_error(e)),
        }
    }
}
625
626// This is a bit odd but some object_store functions only accept
627// Stream<Result<T, ObjectStoreError>> and so we need to convert
628// to ObjectStoreError to call the methods.
629impl From<Error> for object_store::Error {
630    fn from(err: Error) -> Self {
631        Self::Generic {
632            store: "N/A",
633            source: Box::new(err),
634        }
635    }
636}
637
/// Returns the source location from which this function was called.
///
/// The function is `#[track_caller]`, so the reported location is that of
/// the call site rather than of this function's body.
#[track_caller]
pub fn get_caller_location() -> &'static std::panic::Location<'static> {
    use std::panic::Location as CallSite;
    CallSite::caller()
}
642
/// Wrap an error in a new error type that implements Clone
///
/// This is useful when two threads/streams share a common fallible source.
/// The base error will always have the full error. Any cloned results will
/// only have `Error::Cloned` with the `to_string` of the base error.
pub struct CloneableError(pub Error);
649
650impl Clone for CloneableError {
651    #[track_caller]
652    fn clone(&self) -> Self {
653        Self(Error::cloned(self.0.to_string()))
654    }
655}
656
/// A `Result` that can be cloned: the error side is a [`CloneableError`]
/// and the success side must itself be `Clone`.
#[derive(Clone)]
pub struct CloneableResult<T: Clone>(pub std::result::Result<T, CloneableError>);
659
660impl<T: Clone> From<Result<T>> for CloneableResult<T> {
661    fn from(result: Result<T>) -> Self {
662        Self(result.map_err(CloneableError))
663    }
664}
665
// Unit tests for error construction, caller-location capture, and
// conversions to and from Arrow / DataFusion error types.
#[cfg(test)]
mod test {
    use super::*;
    use std::fmt;

    // NOTE: this test asserts a fixed line offset between the
    // `get_caller_location()` call and the `Err(...)` expression inside the
    // closure. Do not insert or remove lines between them.
    #[test]
    fn test_caller_location_capture() {
        let current_fn = get_caller_location();
        // make sure ? captures the correct location
        // .into() WILL NOT capture the correct location
        let f: Box<dyn Fn() -> Result<()>> = Box::new(|| {
            Err(object_store::Error::Generic {
                store: "",
                source: "".into(),
            })?;
            Ok(())
        });
        match f().unwrap_err() {
            Error::IO { location, .. } => {
                // +4 is the beginning of object_store::Error::Generic...
                assert_eq!(location.line(), current_fn.line() + 4, "{}", location)
            }
            #[allow(unreachable_patterns)]
            _ => panic!("expected ObjectStore error"),
        }
    }

    /// Minimal user-defined error used to check that external errors survive
    /// conversions and can be downcast back to their concrete type.
    #[derive(Debug)]
    struct MyCustomError {
        code: i32,
        message: String,
    }

    impl fmt::Display for MyCustomError {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            write!(f, "MyCustomError({}): {}", self.code, self.message)
        }
    }

    impl std::error::Error for MyCustomError {}

    /// `Error::external` stores the boxed error in the `External` variant unchanged.
    #[test]
    fn test_external_error_creation() {
        let custom_err = MyCustomError {
            code: 42,
            message: "test error".to_string(),
        };
        let err = Error::external(Box::new(custom_err));

        match &err {
            Error::External { source } => {
                let recovered = source.downcast_ref::<MyCustomError>().unwrap();
                assert_eq!(recovered.code, 42);
                assert_eq!(recovered.message, "test error");
            }
            _ => panic!("Expected External variant"),
        }
    }

    /// `external_source` returns the source for `External` and `None` otherwise.
    #[test]
    fn test_external_source_method() {
        let custom_err = MyCustomError {
            code: 123,
            message: "source test".to_string(),
        };
        let err = Error::external(Box::new(custom_err));

        let source = err.external_source().expect("should have external source");
        let recovered = source.downcast_ref::<MyCustomError>().unwrap();
        assert_eq!(recovered.code, 123);

        // Test that non-External variants return None
        let io_err = Error::io("test");
        assert!(io_err.external_source().is_none());
    }

    /// `into_external` yields the source for `External` and `Err(self)` otherwise.
    #[test]
    fn test_into_external_method() {
        let custom_err = MyCustomError {
            code: 456,
            message: "into test".to_string(),
        };
        let err = Error::external(Box::new(custom_err));

        match err.into_external() {
            Ok(source) => {
                let recovered = source.downcast::<MyCustomError>().unwrap();
                assert_eq!(recovered.code, 456);
            }
            Err(_) => panic!("Expected Ok"),
        }

        // Test that non-External variants return Err(self)
        let io_err = Error::io("test");
        match io_err.into_external() {
            Err(Error::IO { .. }) => {}
            _ => panic!("Expected Err with IO variant"),
        }
    }

    /// A foreign error inside `ArrowError::ExternalError` becomes `Error::External`.
    #[test]
    fn test_arrow_external_error_conversion() {
        let custom_err = MyCustomError {
            code: 789,
            message: "arrow test".to_string(),
        };
        let arrow_err = ArrowError::ExternalError(Box::new(custom_err));
        let lance_err: Error = arrow_err.into();

        match lance_err {
            Error::External { source } => {
                let recovered = source.downcast_ref::<MyCustomError>().unwrap();
                assert_eq!(recovered.code, 789);
            }
            _ => panic!("Expected External variant, got {:?}", lance_err),
        }
    }

    /// `Error::External` converts to `ArrowError::ExternalError` with the same source.
    #[test]
    fn test_external_to_arrow_roundtrip() {
        let custom_err = MyCustomError {
            code: 999,
            message: "roundtrip".to_string(),
        };
        let lance_err = Error::external(Box::new(custom_err));
        let arrow_err: ArrowError = lance_err.into();

        match arrow_err {
            ArrowError::ExternalError(source) => {
                let recovered = source.downcast_ref::<MyCustomError>().unwrap();
                assert_eq!(recovered.code, 999);
            }
            _ => panic!("Expected ExternalError variant"),
        }
    }

    /// A foreign error inside `DataFusionError::External` becomes `Error::External`.
    #[cfg(feature = "datafusion")]
    #[test]
    fn test_datafusion_external_error_conversion() {
        let custom_err = MyCustomError {
            code: 111,
            message: "datafusion test".to_string(),
        };
        let df_err = datafusion_common::DataFusionError::External(Box::new(custom_err));
        let lance_err: Error = df_err.into();

        match lance_err {
            Error::External { source } => {
                let recovered = source.downcast_ref::<MyCustomError>().unwrap();
                assert_eq!(recovered.code, 111);
            }
            _ => panic!("Expected External variant"),
        }
    }

    #[cfg(feature = "datafusion")]
    #[test]
    fn test_datafusion_arrow_external_error_conversion() {
        // Test the nested case: ArrowError::ExternalError inside DataFusionError::ArrowError
        let custom_err = MyCustomError {
            code: 222,
            message: "nested test".to_string(),
        };
        let arrow_err = ArrowError::ExternalError(Box::new(custom_err));
        let df_err = datafusion_common::DataFusionError::ArrowError(Box::new(arrow_err), None);
        let lance_err: Error = df_err.into();

        match lance_err {
            Error::External { source } => {
                let recovered = source.downcast_ref::<MyCustomError>().unwrap();
                assert_eq!(recovered.code, 222);
            }
            _ => panic!("Expected External variant, got {:?}", lance_err),
        }
    }

    /// Test that lance_core::Error round-trips through ArrowError.
    ///
    /// This simulates the case where a user defines an iterator in terms of
    /// lance_core::Error, and the error goes through Arrow's error type
    /// (e.g., via RecordBatchIterator) before being converted back.
    #[test]
    fn test_lance_error_roundtrip_through_arrow() {
        let original = Error::invalid_input("test validation error");

        // Simulate what happens when using ? in an Arrow context
        let arrow_err: ArrowError = original.into();

        // Convert back to lance error (as happens when Lance consumes the stream)
        let recovered: Error = arrow_err.into();

        // Should get back the original lance error directly (not wrapped in External)
        match recovered {
            Error::InvalidInput { .. } => {
                assert!(recovered.to_string().contains("test validation error"));
            }
            _ => panic!("Expected InvalidInput variant, got {:?}", recovered),
        }
    }

    /// Test that lance_core::Error round-trips through DataFusionError.
    ///
    /// This simulates the case where a user defines a stream in terms of
    /// lance_core::Error, and the error goes through DataFusion's error type
    /// (e.g., via SendableRecordBatchStream) before being converted back.
    #[cfg(feature = "datafusion")]
    #[test]
    fn test_lance_error_roundtrip_through_datafusion() {
        let original = Error::invalid_input("test validation error");

        // Simulate what happens when using ? in a DataFusion context
        let df_err: datafusion_common::DataFusionError = original.into();

        // Convert back to lance error (as happens when Lance consumes the stream)
        let recovered: Error = df_err.into();

        // Should get back the original lance error directly (not wrapped in External)
        match recovered {
            Error::InvalidInput { .. } => {
                assert!(recovered.to_string().contains("test validation error"));
            }
            _ => panic!("Expected InvalidInput variant, got {:?}", recovered),
        }
    }
}