Skip to main content

lance_core/
error.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::fmt;
5
6use arrow_schema::ArrowError;
7use snafu::{IntoError as _, Location, Snafu};
8
/// Type-erased, heap-allocated error that is `Send + Sync + 'static`.
type BoxedError = Box<dyn std::error::Error + Send + Sync + 'static>;
10
/// Error for when a requested field is not found in a schema.
///
/// This error computes suggestions lazily (only when displayed) to avoid
/// computing Levenshtein distance when the error is created but never shown.
#[derive(Debug)]
pub struct FieldNotFoundError {
    /// Name of the field that was requested but could not be found.
    pub field_name: String,
    /// Field names that were available. They feed the "Did you mean" suggestion
    /// and the "Available fields" list when the error is displayed.
    pub candidates: Vec<String>,
}
20
21impl fmt::Display for FieldNotFoundError {
22    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
23        write!(f, "Field '{}' not found.", self.field_name)?;
24        let suggestion =
25            crate::levenshtein::find_best_suggestion(&self.field_name, &self.candidates);
26        if let Some(suggestion) = suggestion {
27            write!(f, " Did you mean '{}'?", suggestion)?;
28        }
29        write!(f, "\nAvailable fields: [")?;
30        for (i, candidate) in self.candidates.iter().take(10).enumerate() {
31            if i > 0 {
32                write!(f, ", ")?;
33            }
34            write!(f, "'{}'", candidate)?;
35        }
36        if self.candidates.len() > 10 {
37            let remaining = self.candidates.len() - 10;
38            write!(f, ", ... and {} more]", remaining)?;
39        } else {
40            write!(f, "]")?;
41        }
42        Ok(())
43    }
44}
45
// Empty impl: the default trait methods are used, so this error reports no
// underlying `source()`.
impl std::error::Error for FieldNotFoundError {}
47
48/// Allocates error on the heap and then places `e` into it.
49#[inline]
50pub fn box_error(e: impl std::error::Error + Send + Sync + 'static) -> BoxedError {
51    Box::new(e)
52}
53
/// The crate-wide error type (see the `Result` alias in this module).
///
/// The `Snafu` derive generates a `<Variant>Snafu` context selector for each
/// variant; the constructors on `Error` are built on those selectors. Fields
/// marked `#[snafu(implicit)]` are not supplied by the caller: the
/// constructors below never pass a `location`, it is filled in when the error
/// is built.
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
    /// Invalid user input; `source` carries the underlying cause.
    #[snafu(display("Invalid user input: {source}, {location}"))]
    InvalidInput {
        source: BoxedError,
        #[snafu(implicit)]
        location: Location,
    },
    /// A dataset already exists at `uri`.
    #[snafu(display("Dataset already exists: {uri}, {location}"))]
    DatasetAlreadyExists {
        uri: String,
        #[snafu(implicit)]
        location: Location,
    },
    /// An append was attempted with a different schema; `difference` describes it.
    #[snafu(display("Append with different schema: {difference}, location: {location}"))]
    SchemaMismatch {
        difference: String,
        #[snafu(implicit)]
        location: Location,
    },
    /// No dataset was found at `path`; `source` carries the underlying cause.
    #[snafu(display("Dataset at path {path} was not found: {source}, {location}"))]
    DatasetNotFound {
        path: String,
        source: BoxedError,
        #[snafu(implicit)]
        location: Location,
    },
    /// The file at `path` is corrupt; `source` carries the details.
    #[snafu(display("Encountered corrupt file {path}: {source}, {location}"))]
    CorruptFile {
        path: object_store::path::Path,
        source: BoxedError,
        #[snafu(implicit)]
        location: Location,
        // TODO: add backtrace?
    },
    /// The requested operation is not supported.
    #[snafu(display("Not supported: {source}, {location}"))]
    NotSupported {
        source: BoxedError,
        #[snafu(implicit)]
        location: Location,
    },
    /// A commit conflicted at `version`.
    #[snafu(display("Commit conflict for version {version}: {source}, {location}"))]
    CommitConflict {
        version: u64,
        source: BoxedError,
        #[snafu(implicit)]
        location: Location,
    },
    /// A transaction is incompatible; `source` carries the details.
    #[snafu(display("Incompatible transaction: {source}, {location}"))]
    IncompatibleTransaction {
        source: BoxedError,
        #[snafu(implicit)]
        location: Location,
    },
    /// A commit conflicted at `version`; reported as retryable.
    #[snafu(display("Retryable commit conflict for version {version}: {source}, {location}"))]
    RetryableCommitConflict {
        version: u64,
        source: BoxedError,
        #[snafu(implicit)]
        location: Location,
    },
    /// Too many concurrent writers.
    #[snafu(display("Too many concurrent writers. {message}, {location}"))]
    TooMuchWriteContention {
        message: String,
        #[snafu(implicit)]
        location: Location,
    },
    /// An operation timed out.
    #[snafu(display("Operation timed out: {message}, {location}"))]
    Timeout {
        message: String,
        #[snafu(implicit)]
        location: Location,
    },
    /// An internal error; the message asks the user to file a bug report.
    #[snafu(display(
        "Encountered internal error. Please file a bug report at https://github.com/lance-format/lance/issues. {message}, {location}"
    ))]
    Internal {
        message: String,
        #[snafu(implicit)]
        location: Location,
    },
    /// A prerequisite task failed.
    #[snafu(display("A prerequisite task failed: {message}, {location}"))]
    PrerequisiteFailed {
        message: String,
        #[snafu(implicit)]
        location: Location,
    },
    /// The request could not be processed; `message` explains why.
    #[snafu(display("Unprocessable: {message}, {location}"))]
    Unprocessable {
        message: String,
        #[snafu(implicit)]
        location: Location,
    },
    /// An Arrow-related failure, stored as text.
    #[snafu(display("LanceError(Arrow): {message}, {location}"))]
    Arrow {
        message: String,
        #[snafu(implicit)]
        location: Location,
    },
    /// A schema-related failure.
    #[snafu(display("LanceError(Schema): {message}, {location}"))]
    Schema {
        message: String,
        #[snafu(implicit)]
        location: Location,
    },
    /// Nothing was found at `uri`.
    #[snafu(display("Not found: {uri}, {location}"))]
    NotFound {
        uri: String,
        #[snafu(implicit)]
        location: Location,
    },
    /// An I/O failure; also the target of several `From` conversions in this module.
    #[snafu(display("LanceError(IO): {source}, {location}"))]
    IO {
        source: BoxedError,
        #[snafu(implicit)]
        location: Location,
    },
    /// An index-related failure.
    #[snafu(display("LanceError(Index): {message}, {location}"))]
    Index {
        message: String,
        #[snafu(implicit)]
        location: Location,
    },
    /// The index identified by `identity` was not found.
    #[snafu(display("Lance index not found: {identity}, {location}"))]
    IndexNotFound {
        identity: String,
        #[snafu(implicit)]
        location: Location,
    },
    /// A storage location could not be inferred. Carries no `location`.
    #[snafu(display("Cannot infer storage location from: {message}"))]
    InvalidTableLocation { message: String },
    // NOTE(review): this variant has no `display` attribute, so snafu presumably
    // derives its Display text from the doc comment below. Confirm before
    // rewording that comment.
    /// Stream early stop
    Stop,
    /// Wraps an arbitrary boxed error. The payload field is named `error`
    /// rather than `source`, and the constructor passes it as a plain field.
    #[snafu(display("Wrapped error: {error}, {location}"))]
    Wrapped {
        error: BoxedError,
        #[snafu(implicit)]
        location: Location,
    },
    /// Text-only copy of another error; produced by `CloneableError::clone`.
    #[snafu(display("Cloned error: {message}, {location}"))]
    Cloned {
        message: String,
        #[snafu(implicit)]
        location: Location,
    },
    /// A query execution failure.
    #[snafu(display("Query Execution error: {message}, {location}"))]
    Execution {
        message: String,
        #[snafu(implicit)]
        location: Location,
    },
    /// A ref is invalid. Carries no `location`.
    #[snafu(display("Ref is invalid: {message}"))]
    InvalidRef { message: String },
    /// A ref conflict. Carries no `location`.
    #[snafu(display("Ref conflict error: {message}"))]
    RefConflict { message: String },
    /// A ref was not found. Carries no `location`.
    #[snafu(display("Ref not found error: {message}"))]
    RefNotFound { message: String },
    /// A cleanup failure. Carries no `location`.
    #[snafu(display("Cleanup error: {message}"))]
    Cleanup { message: String },
    /// A version was not found. Carries no `location`.
    #[snafu(display("Version not found error: {message}"))]
    VersionNotFound { message: String },
    /// A version conflict. The displayed text contains only `message`; the
    /// version numbers and `location` are stored but not printed.
    #[snafu(display("Version conflict error: {message}"))]
    VersionConflict {
        message: String,
        major_version: u16,
        minor_version: u16,
        #[snafu(implicit)]
        location: Location,
    },
    /// A namespace-related failure; `source` carries the underlying cause.
    #[snafu(display("Namespace error: {source}, {location}"))]
    Namespace {
        source: BoxedError,
        #[snafu(implicit)]
        location: Location,
    },
    /// External error passed through from user code.
    ///
    /// This variant preserves errors that users pass into Lance APIs (e.g., via streams
    /// with custom error types). The original error can be recovered using [`Error::into_external`]
    /// or inspected using [`Error::external_source`].
    #[snafu(transparent)]
    External { source: BoxedError },

    /// A requested field was not found in a schema.
    #[snafu(transparent)]
    FieldNotFound { source: FieldNotFoundError },
}
242
243impl Error {
244    #[track_caller]
245    pub fn corrupt_file(path: object_store::path::Path, message: impl Into<String>) -> Self {
246        CorruptFileSnafu { path }.into_error(message.into().into())
247    }
248
249    #[track_caller]
250    pub fn invalid_input(message: impl Into<String>) -> Self {
251        InvalidInputSnafu.into_error(message.into().into())
252    }
253
254    #[track_caller]
255    pub fn invalid_input_source(source: BoxedError) -> Self {
256        InvalidInputSnafu.into_error(source)
257    }
258
259    #[track_caller]
260    pub fn io(message: impl Into<String>) -> Self {
261        IOSnafu.into_error(message.into().into())
262    }
263
264    #[track_caller]
265    pub fn io_source(source: BoxedError) -> Self {
266        IOSnafu.into_error(source)
267    }
268
269    #[track_caller]
270    pub fn dataset_already_exists(uri: impl Into<String>) -> Self {
271        DatasetAlreadyExistsSnafu { uri: uri.into() }.build()
272    }
273
274    #[track_caller]
275    pub fn dataset_not_found(path: impl Into<String>, source: BoxedError) -> Self {
276        DatasetNotFoundSnafu { path: path.into() }.into_error(source)
277    }
278
279    #[track_caller]
280    pub fn version_conflict(
281        message: impl Into<String>,
282        major_version: u16,
283        minor_version: u16,
284    ) -> Self {
285        VersionConflictSnafu {
286            message: message.into(),
287            major_version,
288            minor_version,
289        }
290        .build()
291    }
292
293    #[track_caller]
294    pub fn not_found(uri: impl Into<String>) -> Self {
295        NotFoundSnafu { uri: uri.into() }.build()
296    }
297
298    #[track_caller]
299    pub fn wrapped(error: BoxedError) -> Self {
300        WrappedSnafu { error }.build()
301    }
302
303    #[track_caller]
304    pub fn schema(message: impl Into<String>) -> Self {
305        SchemaSnafu {
306            message: message.into(),
307        }
308        .build()
309    }
310
311    #[track_caller]
312    pub fn not_supported(message: impl Into<String>) -> Self {
313        NotSupportedSnafu.into_error(message.into().into())
314    }
315
316    #[track_caller]
317    pub fn not_supported_source(source: BoxedError) -> Self {
318        NotSupportedSnafu.into_error(source)
319    }
320
321    #[track_caller]
322    pub fn internal(message: impl Into<String>) -> Self {
323        InternalSnafu {
324            message: message.into(),
325        }
326        .build()
327    }
328
329    #[track_caller]
330    pub fn timeout(message: impl Into<String>) -> Self {
331        TimeoutSnafu {
332            message: message.into(),
333        }
334        .build()
335    }
336
337    #[track_caller]
338    pub fn namespace(message: impl Into<String>) -> Self {
339        NamespaceSnafu.into_error(message.into().into())
340    }
341
342    #[track_caller]
343    pub fn namespace_source(source: Box<dyn std::error::Error + Send + Sync + 'static>) -> Self {
344        NamespaceSnafu.into_error(source)
345    }
346
347    #[track_caller]
348    pub fn arrow(message: impl Into<String>) -> Self {
349        ArrowSnafu {
350            message: message.into(),
351        }
352        .build()
353    }
354
355    #[track_caller]
356    pub fn execution(message: impl Into<String>) -> Self {
357        ExecutionSnafu {
358            message: message.into(),
359        }
360        .build()
361    }
362
363    #[track_caller]
364    pub fn cloned(message: impl Into<String>) -> Self {
365        ClonedSnafu {
366            message: message.into(),
367        }
368        .build()
369    }
370
371    #[track_caller]
372    pub fn schema_mismatch(difference: impl Into<String>) -> Self {
373        SchemaMismatchSnafu {
374            difference: difference.into(),
375        }
376        .build()
377    }
378
379    #[track_caller]
380    pub fn unprocessable(message: impl Into<String>) -> Self {
381        UnprocessableSnafu {
382            message: message.into(),
383        }
384        .build()
385    }
386
387    #[track_caller]
388    pub fn too_much_write_contention(message: impl Into<String>) -> Self {
389        TooMuchWriteContentionSnafu {
390            message: message.into(),
391        }
392        .build()
393    }
394
395    #[track_caller]
396    pub fn prerequisite_failed(message: impl Into<String>) -> Self {
397        PrerequisiteFailedSnafu {
398            message: message.into(),
399        }
400        .build()
401    }
402
403    #[track_caller]
404    pub fn index(message: impl Into<String>) -> Self {
405        IndexSnafu {
406            message: message.into(),
407        }
408        .build()
409    }
410
411    #[track_caller]
412    pub fn index_not_found(identity: impl Into<String>) -> Self {
413        IndexNotFoundSnafu {
414            identity: identity.into(),
415        }
416        .build()
417    }
418
419    #[track_caller]
420    pub fn commit_conflict_source(version: u64, source: BoxedError) -> Self {
421        CommitConflictSnafu { version }.into_error(source)
422    }
423
424    #[track_caller]
425    pub fn retryable_commit_conflict_source(version: u64, source: BoxedError) -> Self {
426        RetryableCommitConflictSnafu { version }.into_error(source)
427    }
428
429    #[track_caller]
430    pub fn incompatible_transaction_source(source: BoxedError) -> Self {
431        IncompatibleTransactionSnafu.into_error(source)
432    }
433
434    /// Create an External error from a boxed error source.
435    pub fn external(source: BoxedError) -> Self {
436        Self::External { source }
437    }
438
439    /// Create a FieldNotFound error with the given field name and available candidates.
440    pub fn field_not_found(field_name: impl Into<String>, candidates: Vec<String>) -> Self {
441        Self::FieldNotFound {
442            source: FieldNotFoundError {
443                field_name: field_name.into(),
444                candidates,
445            },
446        }
447    }
448
449    /// Returns a reference to the external error source if this is an `External` variant.
450    ///
451    /// This allows downcasting to recover the original error type.
452    pub fn external_source(&self) -> Option<&BoxedError> {
453        match self {
454            Self::External { source } => Some(source),
455            _ => None,
456        }
457    }
458
459    /// Consumes the error and returns the external source if this is an `External` variant.
460    ///
461    /// Returns `Err(self)` if this is not an `External` variant, allowing for chained handling.
462    pub fn into_external(self) -> std::result::Result<BoxedError, Self> {
463        match self {
464            Self::External { source } => Ok(source),
465            other => Err(other),
466        }
467    }
468}
469
/// Extension trait for turning an `Option` into this module's `Result`.
pub trait LanceOptionExt<T> {
    /// Unwraps an option, returning an internal error if the option is None.
    ///
    /// Can be used when an option is expected to have a value.
    fn expect_ok(self) -> Result<T>;
}
476
477impl<T> LanceOptionExt<T> for Option<T> {
478    #[track_caller]
479    fn expect_ok(self) -> Result<T> {
480        self.ok_or_else(|| Error::internal("Expected option to have value"))
481    }
482}
483
/// `Result` whose error type is this module's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
/// `Result` whose error type is [`ArrowError`].
pub type ArrowResult<T> = std::result::Result<T, ArrowError>;
/// `Result` whose error type is DataFusion's error; only available with the
/// `datafusion` feature.
#[cfg(feature = "datafusion")]
pub type DataFusionResult<T> = std::result::Result<T, datafusion_common::DataFusionError>;
488
489impl From<ArrowError> for Error {
490    #[track_caller]
491    fn from(e: ArrowError) -> Self {
492        match e {
493            ArrowError::ExternalError(source) => {
494                // Try to downcast to lance_core::Error first to recover the original
495                match source.downcast::<Self>() {
496                    Ok(lance_err) => *lance_err,
497                    Err(source) => Self::External { source },
498                }
499            }
500            other => Self::arrow(other.to_string()),
501        }
502    }
503}
504
505impl From<&ArrowError> for Error {
506    #[track_caller]
507    fn from(e: &ArrowError) -> Self {
508        Self::arrow(e.to_string())
509    }
510}
511
512impl From<std::io::Error> for Error {
513    #[track_caller]
514    fn from(e: std::io::Error) -> Self {
515        Self::io_source(box_error(e))
516    }
517}
518
519impl From<object_store::Error> for Error {
520    #[track_caller]
521    fn from(e: object_store::Error) -> Self {
522        Self::io_source(box_error(e))
523    }
524}
525
526impl From<prost::DecodeError> for Error {
527    #[track_caller]
528    fn from(e: prost::DecodeError) -> Self {
529        Self::io_source(box_error(e))
530    }
531}
532
533impl From<prost::EncodeError> for Error {
534    #[track_caller]
535    fn from(e: prost::EncodeError) -> Self {
536        Self::io_source(box_error(e))
537    }
538}
539
540impl From<prost::UnknownEnumValue> for Error {
541    #[track_caller]
542    fn from(e: prost::UnknownEnumValue) -> Self {
543        Self::io_source(box_error(e))
544    }
545}
546
547impl From<tokio::task::JoinError> for Error {
548    #[track_caller]
549    fn from(e: tokio::task::JoinError) -> Self {
550        Self::io_source(box_error(e))
551    }
552}
553
554impl From<object_store::path::Error> for Error {
555    #[track_caller]
556    fn from(e: object_store::path::Error) -> Self {
557        Self::io_source(box_error(e))
558    }
559}
560
561impl From<url::ParseError> for Error {
562    #[track_caller]
563    fn from(e: url::ParseError) -> Self {
564        Self::io_source(box_error(e))
565    }
566}
567
568impl From<serde_json::Error> for Error {
569    #[track_caller]
570    fn from(e: serde_json::Error) -> Self {
571        Self::arrow(e.to_string())
572    }
573}
574
575impl From<Error> for ArrowError {
576    fn from(value: Error) -> Self {
577        match value {
578            // Pass through external errors directly
579            Error::External { source } => Self::ExternalError(source),
580            // Preserve schema errors with their specific type
581            Error::Schema { message, .. } => Self::SchemaError(message),
582            // Wrap all other lance errors so they can be recovered
583            e => Self::ExternalError(Box::new(e)),
584        }
585    }
586}
587
/// Wraps a SQL parser error as the source of `Error::IO`.
#[cfg(feature = "datafusion")]
impl From<datafusion_sql::sqlparser::parser::ParserError> for Error {
    #[track_caller]
    fn from(e: datafusion_sql::sqlparser::parser::ParserError) -> Self {
        let source: BoxedError = Box::new(e);
        Self::io_source(source)
    }
}
595
/// Wraps a SQL tokenizer error as the source of `Error::IO`.
#[cfg(feature = "datafusion")]
impl From<datafusion_sql::sqlparser::tokenizer::TokenizerError> for Error {
    #[track_caller]
    fn from(e: datafusion_sql::sqlparser::tokenizer::TokenizerError) -> Self {
        let source: BoxedError = Box::new(e);
        Self::io_source(source)
    }
}
603
/// Boxes an `Error` whole into `DataFusionError::External`, so the reverse
/// conversion can recover it by downcasting.
#[cfg(feature = "datafusion")]
impl From<Error> for datafusion_common::DataFusionError {
    #[track_caller]
    fn from(e: Error) -> Self {
        let boxed: BoxedError = Box::new(e);
        Self::External(boxed)
    }
}
611
/// Converts a DataFusion error into the closest matching `Error` variant.
///
/// - SQL, plan, configuration and schema errors become `Error::InvalidInput`.
/// - Arrow errors go through the `From<ArrowError>` conversion.
/// - Not-implemented errors become `Error::NotSupported`.
/// - Execution errors are rendered to text as `Error::Execution`.
/// - External errors are downcast back to `Error` when possible, otherwise
///   kept as `Error::External`.
/// - Everything else becomes `Error::IO`.
#[cfg(feature = "datafusion")]
impl From<datafusion_common::DataFusionError> for Error {
    #[track_caller]
    fn from(e: datafusion_common::DataFusionError) -> Self {
        use datafusion_common::DataFusionError as DfError;

        match e {
            DfError::External(payload) => payload
                .downcast::<Self>()
                .map(|original| *original)
                .unwrap_or_else(|foreign| Self::External { source: foreign }),
            DfError::ArrowError(inner, _) => Self::from(*inner),
            DfError::Execution(..) => Self::execution(e.to_string()),
            DfError::NotImplemented(..) => Self::not_supported_source(box_error(e)),
            DfError::SQL(..)
            | DfError::Plan(..)
            | DfError::Configuration(..)
            | DfError::SchemaError(..) => Self::invalid_input_source(box_error(e)),
            _ => Self::io_source(box_error(e)),
        }
    }
}
639
640// This is a bit odd but some object_store functions only accept
641// Stream<Result<T, ObjectStoreError>> and so we need to convert
642// to ObjectStoreError to call the methods.
643impl From<Error> for object_store::Error {
644    fn from(err: Error) -> Self {
645        Self::Generic {
646            store: "N/A",
647            source: Box::new(err),
648        }
649    }
650}
651
/// Returns the source location from which this function was called.
#[track_caller]
pub fn get_caller_location() -> &'static std::panic::Location<'static> {
    use std::panic::Location;

    Location::caller()
}
656
/// Wraps an [`Error`] in a new type that implements `Clone`.
///
/// This is useful when two threads/streams share a common fallible source.
/// The base value always holds the full error. Any cloned value only holds an
/// `Error::Cloned` carrying the `to_string` of the base error.
pub struct CloneableError(pub Error);
663
664impl Clone for CloneableError {
665    #[track_caller]
666    fn clone(&self) -> Self {
667        Self(Error::cloned(self.0.to_string()))
668    }
669}
670
/// A `Result` that can be cloned: the error side is a [`CloneableError`], and
/// the success value must itself be `Clone`.
#[derive(Clone)]
pub struct CloneableResult<T: Clone>(pub std::result::Result<T, CloneableError>);
673
674impl<T: Clone> From<Result<T>> for CloneableResult<T> {
675    fn from(result: Result<T>) -> Self {
676        Self(result.map_err(CloneableError))
677    }
678}
679
// Unit tests for caller-location capture and for the conversions between
// `Error`, `ArrowError` and (behind the `datafusion` feature) `DataFusionError`.
#[cfg(test)]
mod test {
    use super::*;
    use std::fmt;

    // The assertion below uses a fixed line offset from `current_fn`, so do not
    // insert or remove lines between `get_caller_location()` and the
    // `Err(object_store::Error::Generic { .. })` expression.
    #[test]
    fn test_caller_location_capture() {
        let current_fn = get_caller_location();
        // make sure ? captures the correct location
        // .into() WILL NOT capture the correct location
        let f: Box<dyn Fn() -> Result<()>> = Box::new(|| {
            Err(object_store::Error::Generic {
                store: "",
                source: "".into(),
            })?;
            Ok(())
        });
        match f().unwrap_err() {
            Error::IO { location, .. } => {
                // +4 is the beginning of object_store::Error::Generic...
                assert_eq!(location.line(), current_fn.line() + 4, "{}", location)
            }
            #[allow(unreachable_patterns)]
            _ => panic!("expected ObjectStore error"),
        }
    }

    /// Stand-in for a user-defined error type, used to check that external
    /// errors survive conversions and can be downcast again.
    #[derive(Debug)]
    struct MyCustomError {
        code: i32,
        message: String,
    }

    impl fmt::Display for MyCustomError {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            write!(f, "MyCustomError({}): {}", self.code, self.message)
        }
    }

    impl std::error::Error for MyCustomError {}

    /// `Error::external` stores the boxed error so it can be downcast by reference.
    #[test]
    fn test_external_error_creation() {
        let custom_err = MyCustomError {
            code: 42,
            message: "test error".to_string(),
        };
        let err = Error::external(Box::new(custom_err));

        match &err {
            Error::External { source } => {
                let recovered = source.downcast_ref::<MyCustomError>().unwrap();
                assert_eq!(recovered.code, 42);
                assert_eq!(recovered.message, "test error");
            }
            _ => panic!("Expected External variant"),
        }
    }

    /// `external_source` returns the source for `External` and `None` otherwise.
    #[test]
    fn test_external_source_method() {
        let custom_err = MyCustomError {
            code: 123,
            message: "source test".to_string(),
        };
        let err = Error::external(Box::new(custom_err));

        let source = err.external_source().expect("should have external source");
        let recovered = source.downcast_ref::<MyCustomError>().unwrap();
        assert_eq!(recovered.code, 123);

        // Test that non-External variants return None
        let io_err = Error::io("test");
        assert!(io_err.external_source().is_none());
    }

    /// `into_external` yields the source for `External` and hands the error
    /// back unchanged for any other variant.
    #[test]
    fn test_into_external_method() {
        let custom_err = MyCustomError {
            code: 456,
            message: "into test".to_string(),
        };
        let err = Error::external(Box::new(custom_err));

        match err.into_external() {
            Ok(source) => {
                let recovered = source.downcast::<MyCustomError>().unwrap();
                assert_eq!(recovered.code, 456);
            }
            Err(_) => panic!("Expected Ok"),
        }

        // Test that non-External variants return Err(self)
        let io_err = Error::io("test");
        match io_err.into_external() {
            Err(Error::IO { .. }) => {}
            _ => panic!("Expected Err with IO variant"),
        }
    }

    /// A foreign error inside `ArrowError::ExternalError` becomes `Error::External`.
    #[test]
    fn test_arrow_external_error_conversion() {
        let custom_err = MyCustomError {
            code: 789,
            message: "arrow test".to_string(),
        };
        let arrow_err = ArrowError::ExternalError(Box::new(custom_err));
        let lance_err: Error = arrow_err.into();

        match lance_err {
            Error::External { source } => {
                let recovered = source.downcast_ref::<MyCustomError>().unwrap();
                assert_eq!(recovered.code, 789);
            }
            _ => panic!("Expected External variant, got {:?}", lance_err),
        }
    }

    /// `Error::External` converts to `ArrowError::ExternalError` with the same payload.
    #[test]
    fn test_external_to_arrow_roundtrip() {
        let custom_err = MyCustomError {
            code: 999,
            message: "roundtrip".to_string(),
        };
        let lance_err = Error::external(Box::new(custom_err));
        let arrow_err: ArrowError = lance_err.into();

        match arrow_err {
            ArrowError::ExternalError(source) => {
                let recovered = source.downcast_ref::<MyCustomError>().unwrap();
                assert_eq!(recovered.code, 999);
            }
            _ => panic!("Expected ExternalError variant"),
        }
    }

    #[cfg(feature = "datafusion")]
    #[test]
    fn test_datafusion_schema_error_is_invalid_input() {
        // Schema errors from DataFusion (e.g., a filter referencing an unknown
        // column) are user-input failures, not internal lance failures. They
        // must surface as `Error::InvalidInput` so downstream FFI/Python
        // bindings can map them to the right user-facing error code.
        use datafusion_common::Column;

        let schema_err = datafusion_common::SchemaError::FieldNotFound {
            field: Box::new(Column::from_name("missing_col")),
            valid_fields: vec![],
        };
        let df_err =
            datafusion_common::DataFusionError::SchemaError(Box::new(schema_err), Box::new(None));
        let lance_err: Error = df_err.into();

        match lance_err {
            Error::InvalidInput { .. } => {
                assert!(
                    lance_err.to_string().contains("missing_col"),
                    "expected the column name to survive in the error message, got: {lance_err}"
                );
            }
            _ => panic!("Expected InvalidInput variant, got {:?}", lance_err),
        }
    }

    /// A foreign error inside `DataFusionError::External` becomes `Error::External`.
    #[cfg(feature = "datafusion")]
    #[test]
    fn test_datafusion_external_error_conversion() {
        let custom_err = MyCustomError {
            code: 111,
            message: "datafusion test".to_string(),
        };
        let df_err = datafusion_common::DataFusionError::External(Box::new(custom_err));
        let lance_err: Error = df_err.into();

        match lance_err {
            Error::External { source } => {
                let recovered = source.downcast_ref::<MyCustomError>().unwrap();
                assert_eq!(recovered.code, 111);
            }
            _ => panic!("Expected External variant"),
        }
    }

    #[cfg(feature = "datafusion")]
    #[test]
    fn test_datafusion_arrow_external_error_conversion() {
        // Test the nested case: ArrowError::ExternalError inside DataFusionError::ArrowError
        let custom_err = MyCustomError {
            code: 222,
            message: "nested test".to_string(),
        };
        let arrow_err = ArrowError::ExternalError(Box::new(custom_err));
        let df_err = datafusion_common::DataFusionError::ArrowError(Box::new(arrow_err), None);
        let lance_err: Error = df_err.into();

        match lance_err {
            Error::External { source } => {
                let recovered = source.downcast_ref::<MyCustomError>().unwrap();
                assert_eq!(recovered.code, 222);
            }
            _ => panic!("Expected External variant, got {:?}", lance_err),
        }
    }

    /// Test that lance_core::Error round-trips through ArrowError.
    ///
    /// This simulates the case where a user defines an iterator in terms of
    /// lance_core::Error, and the error goes through Arrow's error type
    /// (e.g., via RecordBatchIterator) before being converted back.
    #[test]
    fn test_lance_error_roundtrip_through_arrow() {
        let original = Error::invalid_input("test validation error");

        // Simulate what happens when using ? in an Arrow context
        let arrow_err: ArrowError = original.into();

        // Convert back to lance error (as happens when Lance consumes the stream)
        let recovered: Error = arrow_err.into();

        // Should get back the original lance error directly (not wrapped in External)
        match recovered {
            Error::InvalidInput { .. } => {
                assert!(recovered.to_string().contains("test validation error"));
            }
            _ => panic!("Expected InvalidInput variant, got {:?}", recovered),
        }
    }

    /// Test that lance_core::Error round-trips through DataFusionError.
    ///
    /// This simulates the case where a user defines a stream in terms of
    /// lance_core::Error, and the error goes through DataFusion's error type
    /// (e.g., via SendableRecordBatchStream) before being converted back.
    #[cfg(feature = "datafusion")]
    #[test]
    fn test_lance_error_roundtrip_through_datafusion() {
        let original = Error::invalid_input("test validation error");

        // Simulate what happens when using ? in a DataFusion context
        let df_err: datafusion_common::DataFusionError = original.into();

        // Convert back to lance error (as happens when Lance consumes the stream)
        let recovered: Error = df_err.into();

        // Should get back the original lance error directly (not wrapped in External)
        match recovered {
            Error::InvalidInput { .. } => {
                assert!(recovered.to_string().contains("test validation error"));
            }
            _ => panic!("Expected InvalidInput variant, got {:?}", recovered),
        }
    }
}