// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

4use arrow_schema::ArrowError;
5use snafu::{Location, Snafu};
6
/// A type-erased, thread-safe error; used as the `source` payload of many [`Error`] variants.
type BoxedError = Box<dyn std::error::Error + Send + Sync + 'static>;

/// Moves `e` onto the heap and erases its concrete type as a [`BoxedError`].
///
/// The concrete type can still be recovered afterwards with `downcast_ref` / `downcast`.
#[inline]
pub fn box_error(e: impl std::error::Error + Send + Sync + 'static) -> BoxedError {
    BoxedError::from(e)
}
14
/// Error type shared across the Lance crates.
///
/// The `Display` text of each variant comes from its `#[snafu(display(...))]` attribute
/// (see the note on `Stop` for the one variant without it), and most variants record a
/// snafu [`Location`] identifying where the error was raised. `#[snafu(visibility(pub))]`
/// makes the generated context selectors public.
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
    /// Invalid input supplied by the caller; `source` explains what was wrong.
    #[snafu(display("Invalid user input: {source}, {location}"))]
    InvalidInput {
        source: BoxedError,
        location: Location,
    },
    /// A dataset already exists at `uri`.
    #[snafu(display("Dataset already exists: {uri}, {location}"))]
    DatasetAlreadyExists { uri: String, location: Location },
    /// An append used a different schema; `difference` describes the mismatch.
    #[snafu(display("Append with different schema: {difference}, location: {location}"))]
    SchemaMismatch {
        difference: String,
        location: Location,
    },
    /// No dataset was found at `path`; `source` holds the underlying cause.
    #[snafu(display("Dataset at path {path} was not found: {source}, {location}"))]
    DatasetNotFound {
        path: String,
        source: BoxedError,
        location: Location,
    },
    /// The file at `path` is corrupt; `source` describes the problem.
    #[snafu(display("Encountered corrupt file {path}: {source}, {location}"))]
    CorruptFile {
        path: object_store::path::Path,
        source: BoxedError,
        location: Location,
        // TODO: add backtrace?
    },
    /// The requested operation is not supported; `source` gives details.
    #[snafu(display("Not supported: {source}, {location}"))]
    NotSupported {
        source: BoxedError,
        location: Location,
    },
    /// A commit at `version` failed because of a conflict.
    #[snafu(display("Commit conflict for version {version}: {source}, {location}"))]
    CommitConflict {
        version: u64,
        source: BoxedError,
        location: Location,
    },
    /// A commit conflict at `version` that the error text marks as retryable.
    #[snafu(display("Retryable commit conflict for version {version}: {source}, {location}"))]
    RetryableCommitConflict {
        version: u64,
        source: BoxedError,
        location: Location,
    },
    /// Too many concurrent writers; `message` gives details.
    #[snafu(display("Too many concurrent writers. {message}, {location}"))]
    TooMuchWriteContention { message: String, location: Location },
    /// Internal error (likely a bug); the rendered text asks users to file a report.
    #[snafu(display("Encountered internal error. Please file a bug report at https://github.com/lance-format/lance/issues. {message}, {location}"))]
    Internal { message: String, location: Location },
    /// A prerequisite task failed; `message` gives details.
    #[snafu(display("A prerequisite task failed: {message}, {location}"))]
    PrerequisiteFailed { message: String, location: Location },
    /// Something could not be processed; `message` gives details.
    #[snafu(display("Unprocessable: {message}, {location}"))]
    Unprocessable { message: String, location: Location },
    /// Arrow-related failure stored as a rendered message (the `From<ArrowError>`
    /// and `From<serde_json::Error>` conversions in this file produce it).
    #[snafu(display("LanceError(Arrow): {message}, {location}"))]
    Arrow { message: String, location: Location },
    /// Schema-related failure stored as a rendered message.
    #[snafu(display("LanceError(Schema): {message}, {location}"))]
    Schema { message: String, location: Location },
    /// The resource identified by `uri` was not found.
    #[snafu(display("Not found: {uri}, {location}"))]
    NotFound { uri: String, location: Location },
    /// I/O-level failure; many foreign error types are converted into this variant,
    /// with the original error kept as `source`.
    #[snafu(display("LanceError(IO): {source}, {location}"))]
    IO {
        source: BoxedError,
        location: Location,
    },
    /// Index-related failure; `message` gives details.
    #[snafu(display("LanceError(Index): {message}, {location}"))]
    Index { message: String, location: Location },
    /// No index matching `identity` was found.
    #[snafu(display("Lance index not found: {identity}, {location}"))]
    IndexNotFound {
        identity: String,
        location: Location,
    },
    /// A storage location could not be inferred from `message` (no `location` field).
    #[snafu(display("Cannot infer storage location from: {message}"))]
    InvalidTableLocation { message: String },
    // NOTE: this variant has no `display` attribute. snafu (0.7+) falls back to the
    // variant's doc comment for `Display`, so the `///` line below is presumably also
    // the rendered message — confirm before editing it.
    /// Stream early stop
    Stop,
    /// Wraps an arbitrary boxed error in the `error` field.
    // NOTE(review): the field is not named `source`, so snafu presumably does not
    // expose it through `std::error::Error::source` — confirm if that matters.
    #[snafu(display("Wrapped error: {error}, {location}"))]
    Wrapped {
        error: BoxedError,
        location: Location,
    },
    /// Produced by cloning a [`CloneableError`]; holds the rendered text of the
    /// original error.
    #[snafu(display("Cloned error: {message}, {location}"))]
    Cloned { message: String, location: Location },
    /// Query execution failure; `message` gives details.
    #[snafu(display("Query Execution error: {message}, {location}"))]
    Execution { message: String, location: Location },
    /// An invalid ref (no `location` field).
    #[snafu(display("Ref is invalid: {message}"))]
    InvalidRef { message: String },
    /// A ref conflict (no `location` field).
    #[snafu(display("Ref conflict error: {message}"))]
    RefConflict { message: String },
    /// A ref that could not be found (no `location` field).
    #[snafu(display("Ref not found error: {message}"))]
    RefNotFound { message: String },
    /// Failure during cleanup (no `location` field).
    #[snafu(display("Cleanup error: {message}"))]
    Cleanup { message: String },
    /// A version that could not be found (no `location` field).
    #[snafu(display("Version not found error: {message}"))]
    VersionNotFound { message: String },
    /// A version conflict. The `major_version` / `minor_version` fields are carried
    /// for programmatic inspection; the rendered text shows only `message`.
    #[snafu(display("Version conflict error: {message}"))]
    VersionConflict {
        message: String,
        major_version: u16,
        minor_version: u16,
        location: Location,
    },
    /// Namespace-related failure; `source` holds the underlying error.
    #[snafu(display("Namespace error: {source}, {location}"))]
    Namespace {
        source: BoxedError,
        location: Location,
    },
    /// External error passed through from user code.
    ///
    /// This variant preserves errors that users pass into Lance APIs (e.g., via streams
    /// with custom error types). The original error can be recovered using [`Error::into_external`]
    /// or inspected using [`Error::external_source`].
    #[snafu(transparent)]
    External { source: BoxedError },
}
129
130impl Error {
131    pub fn corrupt_file(
132        path: object_store::path::Path,
133        message: impl Into<String>,
134        location: Location,
135    ) -> Self {
136        let message: String = message.into();
137        Self::CorruptFile {
138            path,
139            source: message.into(),
140            location,
141        }
142    }
143
144    pub fn invalid_input(message: impl Into<String>, location: Location) -> Self {
145        let message: String = message.into();
146        Self::InvalidInput {
147            source: message.into(),
148            location,
149        }
150    }
151
152    pub fn io(message: impl Into<String>, location: Location) -> Self {
153        let message: String = message.into();
154        Self::IO {
155            source: message.into(),
156            location,
157        }
158    }
159
160    pub fn version_conflict(
161        message: impl Into<String>,
162        major_version: u16,
163        minor_version: u16,
164        location: Location,
165    ) -> Self {
166        let message: String = message.into();
167        Self::VersionConflict {
168            message,
169            major_version,
170            minor_version,
171            location,
172        }
173    }
174
175    pub fn not_found(uri: impl Into<String>) -> Self {
176        Self::NotFound {
177            uri: uri.into(),
178            location: std::panic::Location::caller().to_snafu_location(),
179        }
180    }
181
182    pub fn schema(message: impl Into<String>, location: Location) -> Self {
183        let message: String = message.into();
184        Self::Schema { message, location }
185    }
186
187    pub fn not_supported(message: impl Into<String>, location: Location) -> Self {
188        let message: String = message.into();
189        Self::NotSupported {
190            source: message.into(),
191            location,
192        }
193    }
194
195    /// Create an External error from a boxed error source.
196    pub fn external(source: BoxedError) -> Self {
197        Self::External { source }
198    }
199
200    /// Returns a reference to the external error source if this is an `External` variant.
201    ///
202    /// This allows downcasting to recover the original error type.
203    pub fn external_source(&self) -> Option<&BoxedError> {
204        match self {
205            Self::External { source } => Some(source),
206            _ => None,
207        }
208    }
209
210    /// Consumes the error and returns the external source if this is an `External` variant.
211    ///
212    /// Returns `Err(self)` if this is not an `External` variant, allowing for chained handling.
213    pub fn into_external(self) -> std::result::Result<BoxedError, Self> {
214        match self {
215            Self::External { source } => Ok(source),
216            other => Err(other),
217        }
218    }
219}
220
221pub trait LanceOptionExt<T> {
222    /// Unwraps an option, returning an internal error if the option is None.
223    ///
224    /// Can be used when an option is expected to have a value.
225    fn expect_ok(self) -> Result<T>;
226}
227
228impl<T> LanceOptionExt<T> for Option<T> {
229    #[track_caller]
230    fn expect_ok(self) -> Result<T> {
231        let location = std::panic::Location::caller().to_snafu_location();
232        self.ok_or_else(|| Error::Internal {
233            message: "Expected option to have value".to_string(),
234            location,
235        })
236    }
237}
238
239pub trait ToSnafuLocation {
240    fn to_snafu_location(&'static self) -> snafu::Location;
241}
242
243impl ToSnafuLocation for std::panic::Location<'static> {
244    fn to_snafu_location(&'static self) -> snafu::Location {
245        snafu::Location::new(self.file(), self.line(), self.column())
246    }
247}
248
249pub type Result<T> = std::result::Result<T, Error>;
250pub type ArrowResult<T> = std::result::Result<T, ArrowError>;
251#[cfg(feature = "datafusion")]
252pub type DataFusionResult<T> = std::result::Result<T, datafusion_common::DataFusionError>;
253
254impl From<ArrowError> for Error {
255    #[track_caller]
256    fn from(e: ArrowError) -> Self {
257        match e {
258            ArrowError::ExternalError(source) => {
259                // Try to downcast to lance_core::Error first to recover the original
260                match source.downcast::<Self>() {
261                    Ok(lance_err) => *lance_err,
262                    Err(source) => Self::External { source },
263                }
264            }
265            other => Self::Arrow {
266                message: other.to_string(),
267                location: std::panic::Location::caller().to_snafu_location(),
268            },
269        }
270    }
271}
272
273impl From<&ArrowError> for Error {
274    #[track_caller]
275    fn from(e: &ArrowError) -> Self {
276        Self::Arrow {
277            message: e.to_string(),
278            location: std::panic::Location::caller().to_snafu_location(),
279        }
280    }
281}
282
283impl From<std::io::Error> for Error {
284    #[track_caller]
285    fn from(e: std::io::Error) -> Self {
286        Self::IO {
287            source: box_error(e),
288            location: std::panic::Location::caller().to_snafu_location(),
289        }
290    }
291}
292
293impl From<object_store::Error> for Error {
294    #[track_caller]
295    fn from(e: object_store::Error) -> Self {
296        Self::IO {
297            source: box_error(e),
298            location: std::panic::Location::caller().to_snafu_location(),
299        }
300    }
301}
302
303impl From<prost::DecodeError> for Error {
304    #[track_caller]
305    fn from(e: prost::DecodeError) -> Self {
306        Self::IO {
307            source: box_error(e),
308            location: std::panic::Location::caller().to_snafu_location(),
309        }
310    }
311}
312
313impl From<prost::EncodeError> for Error {
314    #[track_caller]
315    fn from(e: prost::EncodeError) -> Self {
316        Self::IO {
317            source: box_error(e),
318            location: std::panic::Location::caller().to_snafu_location(),
319        }
320    }
321}
322
323impl From<prost::UnknownEnumValue> for Error {
324    #[track_caller]
325    fn from(e: prost::UnknownEnumValue) -> Self {
326        Self::IO {
327            source: box_error(e),
328            location: std::panic::Location::caller().to_snafu_location(),
329        }
330    }
331}
332
333impl From<tokio::task::JoinError> for Error {
334    #[track_caller]
335    fn from(e: tokio::task::JoinError) -> Self {
336        Self::IO {
337            source: box_error(e),
338            location: std::panic::Location::caller().to_snafu_location(),
339        }
340    }
341}
342
343impl From<object_store::path::Error> for Error {
344    #[track_caller]
345    fn from(e: object_store::path::Error) -> Self {
346        Self::IO {
347            source: box_error(e),
348            location: std::panic::Location::caller().to_snafu_location(),
349        }
350    }
351}
352
353impl From<url::ParseError> for Error {
354    #[track_caller]
355    fn from(e: url::ParseError) -> Self {
356        Self::IO {
357            source: box_error(e),
358            location: std::panic::Location::caller().to_snafu_location(),
359        }
360    }
361}
362
363impl From<serde_json::Error> for Error {
364    #[track_caller]
365    fn from(e: serde_json::Error) -> Self {
366        Self::Arrow {
367            message: e.to_string(),
368            location: std::panic::Location::caller().to_snafu_location(),
369        }
370    }
371}
372
373impl From<Error> for ArrowError {
374    fn from(value: Error) -> Self {
375        match value {
376            // Pass through external errors directly
377            Error::External { source } => Self::ExternalError(source),
378            // Preserve schema errors with their specific type
379            Error::Schema { message, .. } => Self::SchemaError(message),
380            // Wrap all other lance errors so they can be recovered
381            e => Self::ExternalError(Box::new(e)),
382        }
383    }
384}
385
#[cfg(feature = "datafusion")]
impl From<datafusion_sql::sqlparser::parser::ParserError> for Error {
    /// Reports a SQL parser failure as [`Error::IO`], keeping the parser error as the source.
    #[track_caller]
    fn from(e: datafusion_sql::sqlparser::parser::ParserError) -> Self {
        let location = std::panic::Location::caller().to_snafu_location();
        let source = box_error(e);
        Self::IO { source, location }
    }
}

#[cfg(feature = "datafusion")]
impl From<datafusion_sql::sqlparser::tokenizer::TokenizerError> for Error {
    /// Reports a SQL tokenizer failure as [`Error::IO`], keeping the tokenizer error as
    /// the source.
    #[track_caller]
    fn from(e: datafusion_sql::sqlparser::tokenizer::TokenizerError) -> Self {
        let location = std::panic::Location::caller().to_snafu_location();
        let source = box_error(e);
        Self::IO { source, location }
    }
}
407
#[cfg(feature = "datafusion")]
impl From<Error> for datafusion_common::DataFusionError {
    /// Boxes the whole Lance error into `DataFusionError::External`.
    ///
    /// Keeping the concrete type lets `From<DataFusionError> for Error` downcast it back.
    #[track_caller]
    fn from(e: Error) -> Self {
        let boxed: Box<dyn std::error::Error + Send + Sync> = Box::new(e);
        Self::External(boxed)
    }
}
415
#[cfg(feature = "datafusion")]
impl From<datafusion_common::DataFusionError> for Error {
    /// Maps a DataFusion error onto the closest Lance variant.
    ///
    /// - SQL / plan / configuration errors → [`Error::InvalidInput`].
    /// - Schema errors → [`Error::Schema`].
    /// - Arrow errors → delegated to `From<ArrowError>`.
    /// - Not-implemented → [`Error::NotSupported`].
    /// - Execution errors → [`Error::Execution`].
    /// - External errors → unwrapped back to the original Lance error when possible,
    ///   otherwise [`Error::External`].
    /// - Anything else → [`Error::IO`].
    #[track_caller]
    fn from(e: datafusion_common::DataFusionError) -> Self {
        use datafusion_common::DataFusionError as DfError;

        let location = std::panic::Location::caller().to_snafu_location();
        match e {
            // A Lance error that was boxed into DataFusion comes back as itself.
            DfError::External(payload) => payload
                .downcast::<Self>()
                .map_or_else(|source| Self::External { source }, |lance_err| *lance_err),
            // Delegating keeps nested `ArrowError::ExternalError` payloads recoverable;
            // the caller location propagates because both fns are `#[track_caller]`.
            DfError::ArrowError(arrow_err, _) => Self::from(*arrow_err),
            DfError::SQL(..) | DfError::Plan(..) | DfError::Configuration(..) => {
                Self::InvalidInput {
                    source: box_error(e),
                    location,
                }
            }
            DfError::SchemaError(..) => Self::Schema {
                message: e.to_string(),
                location,
            },
            DfError::NotImplemented(..) => Self::NotSupported {
                source: box_error(e),
                location,
            },
            DfError::Execution(..) => Self::Execution {
                message: e.to_string(),
                location,
            },
            _ => Self::IO {
                source: box_error(e),
                location,
            },
        }
    }
}
455
456// This is a bit odd but some object_store functions only accept
457// Stream<Result<T, ObjectStoreError>> and so we need to convert
458// to ObjectStoreError to call the methods.
459impl From<Error> for object_store::Error {
460    fn from(err: Error) -> Self {
461        Self::Generic {
462            store: "N/A",
463            source: Box::new(err),
464        }
465    }
466}
467
/// Returns the source location of the code that called this function.
///
/// Thanks to `#[track_caller]`, this is the call site (or the outermost
/// `#[track_caller]` frame above it), not a line inside this function.
#[track_caller]
pub fn get_caller_location() -> &'static std::panic::Location<'static> {
    core::panic::Location::caller()
}
472
473/// Wrap an error in a new error type that implements Clone
474///
475/// This is useful when two threads/streams share a common fallible source
476/// The base error will always have the full error.  Any cloned results will
477/// only have Error::Cloned with the to_string of the base error.
478pub struct CloneableError(pub Error);
479
480impl Clone for CloneableError {
481    #[track_caller]
482    fn clone(&self) -> Self {
483        Self(Error::Cloned {
484            message: self.0.to_string(),
485            location: std::panic::Location::caller().to_snafu_location(),
486        })
487    }
488}
489
490#[derive(Clone)]
491pub struct CloneableResult<T: Clone>(pub std::result::Result<T, CloneableError>);
492
493impl<T: Clone> From<Result<T>> for CloneableResult<T> {
494    fn from(result: Result<T>) -> Self {
495        Self(result.map_err(CloneableError))
496    }
497}
498
// Unit tests for caller-location capture and for the error conversions between
// Lance, Arrow, and (behind the `datafusion` feature) DataFusion error types.
#[cfg(test)]
mod test {
    use super::*;
    use std::fmt;

    /// Checks that `?` through a `#[track_caller]` `From` impl records the line of the
    /// failing expression. The expected line is an offset from `current_fn`, so the
    /// lines between `current_fn` and the `Err(` expression must not change.
    #[test]
    fn test_caller_location_capture() {
        let current_fn = get_caller_location();
        // make sure ? captures the correct location
        // .into() WILL NOT capture the correct location
        let f: Box<dyn Fn() -> Result<()>> = Box::new(|| {
            Err(object_store::Error::Generic {
                store: "",
                source: "".into(),
            })?;
            Ok(())
        });
        match f().unwrap_err() {
            Error::IO { location, .. } => {
                // +4 is the beginning of object_store::Error::Generic...
                // (i.e. the `Err(` line, four lines below `current_fn`).
                assert_eq!(location.line, current_fn.line() + 4, "{}", location)
            }
            #[allow(unreachable_patterns)]
            _ => panic!("expected ObjectStore error"),
        }
    }

    /// Minimal user-defined error used to verify `External` pass-through and downcasting.
    #[derive(Debug)]
    struct MyCustomError {
        code: i32,
        message: String,
    }

    impl fmt::Display for MyCustomError {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            write!(f, "MyCustomError({}): {}", self.code, self.message)
        }
    }

    impl std::error::Error for MyCustomError {}

    /// `Error::external` stores the boxed error so it can be downcast back.
    #[test]
    fn test_external_error_creation() {
        let custom_err = MyCustomError {
            code: 42,
            message: "test error".to_string(),
        };
        let err = Error::external(Box::new(custom_err));

        match &err {
            Error::External { source } => {
                let recovered = source.downcast_ref::<MyCustomError>().unwrap();
                assert_eq!(recovered.code, 42);
                assert_eq!(recovered.message, "test error");
            }
            _ => panic!("Expected External variant"),
        }
    }

    /// `external_source` returns the source for `External` and `None` otherwise.
    #[test]
    fn test_external_source_method() {
        let custom_err = MyCustomError {
            code: 123,
            message: "source test".to_string(),
        };
        let err = Error::external(Box::new(custom_err));

        let source = err.external_source().expect("should have external source");
        let recovered = source.downcast_ref::<MyCustomError>().unwrap();
        assert_eq!(recovered.code, 123);

        // Test that non-External variants return None
        let io_err = Error::io("test", snafu::Location::new("test", 1, 1));
        assert!(io_err.external_source().is_none());
    }

    /// `into_external` yields `Ok(source)` for `External` and `Err(self)` otherwise.
    #[test]
    fn test_into_external_method() {
        let custom_err = MyCustomError {
            code: 456,
            message: "into test".to_string(),
        };
        let err = Error::external(Box::new(custom_err));

        match err.into_external() {
            Ok(source) => {
                let recovered = source.downcast::<MyCustomError>().unwrap();
                assert_eq!(recovered.code, 456);
            }
            Err(_) => panic!("Expected Ok"),
        }

        // Test that non-External variants return Err(self)
        let io_err = Error::io("test", snafu::Location::new("test", 1, 1));
        match io_err.into_external() {
            Err(Error::IO { .. }) => {}
            _ => panic!("Expected Err with IO variant"),
        }
    }

    /// A non-Lance payload in `ArrowError::ExternalError` becomes `Error::External`.
    #[test]
    fn test_arrow_external_error_conversion() {
        let custom_err = MyCustomError {
            code: 789,
            message: "arrow test".to_string(),
        };
        let arrow_err = ArrowError::ExternalError(Box::new(custom_err));
        let lance_err: Error = arrow_err.into();

        match lance_err {
            Error::External { source } => {
                let recovered = source.downcast_ref::<MyCustomError>().unwrap();
                assert_eq!(recovered.code, 789);
            }
            _ => panic!("Expected External variant, got {:?}", lance_err),
        }
    }

    /// `Error::External` converts to `ArrowError::ExternalError` without extra wrapping.
    #[test]
    fn test_external_to_arrow_roundtrip() {
        let custom_err = MyCustomError {
            code: 999,
            message: "roundtrip".to_string(),
        };
        let lance_err = Error::external(Box::new(custom_err));
        let arrow_err: ArrowError = lance_err.into();

        match arrow_err {
            ArrowError::ExternalError(source) => {
                let recovered = source.downcast_ref::<MyCustomError>().unwrap();
                assert_eq!(recovered.code, 999);
            }
            _ => panic!("Expected ExternalError variant"),
        }
    }

    /// A non-Lance payload in `DataFusionError::External` becomes `Error::External`.
    #[cfg(feature = "datafusion")]
    #[test]
    fn test_datafusion_external_error_conversion() {
        let custom_err = MyCustomError {
            code: 111,
            message: "datafusion test".to_string(),
        };
        let df_err = datafusion_common::DataFusionError::External(Box::new(custom_err));
        let lance_err: Error = df_err.into();

        match lance_err {
            Error::External { source } => {
                let recovered = source.downcast_ref::<MyCustomError>().unwrap();
                assert_eq!(recovered.code, 111);
            }
            _ => panic!("Expected External variant"),
        }
    }

    /// A user error nested as DataFusion → Arrow → External is still recovered.
    #[cfg(feature = "datafusion")]
    #[test]
    fn test_datafusion_arrow_external_error_conversion() {
        // Test the nested case: ArrowError::ExternalError inside DataFusionError::ArrowError
        let custom_err = MyCustomError {
            code: 222,
            message: "nested test".to_string(),
        };
        let arrow_err = ArrowError::ExternalError(Box::new(custom_err));
        let df_err = datafusion_common::DataFusionError::ArrowError(Box::new(arrow_err), None);
        let lance_err: Error = df_err.into();

        match lance_err {
            Error::External { source } => {
                let recovered = source.downcast_ref::<MyCustomError>().unwrap();
                assert_eq!(recovered.code, 222);
            }
            _ => panic!("Expected External variant, got {:?}", lance_err),
        }
    }

    /// Test that lance_core::Error round-trips through ArrowError.
    ///
    /// This simulates the case where a user defines an iterator in terms of
    /// lance_core::Error, and the error goes through Arrow's error type
    /// (e.g., via RecordBatchIterator) before being converted back.
    #[test]
    fn test_lance_error_roundtrip_through_arrow() {
        let original = Error::invalid_input(
            "test validation error",
            snafu::Location::new("test.rs", 10, 1),
        );

        // Simulate what happens when using ? in an Arrow context
        let arrow_err: ArrowError = original.into();

        // Convert back to lance error (as happens when Lance consumes the stream)
        let recovered: Error = arrow_err.into();

        // Should get back the original lance error directly (not wrapped in External)
        match recovered {
            Error::InvalidInput { .. } => {
                assert!(recovered.to_string().contains("test validation error"));
            }
            _ => panic!("Expected InvalidInput variant, got {:?}", recovered),
        }
    }

    /// Test that lance_core::Error round-trips through DataFusionError.
    ///
    /// This simulates the case where a user defines a stream in terms of
    /// lance_core::Error, and the error goes through DataFusion's error type
    /// (e.g., via SendableRecordBatchStream) before being converted back.
    #[cfg(feature = "datafusion")]
    #[test]
    fn test_lance_error_roundtrip_through_datafusion() {
        let original = Error::invalid_input(
            "test validation error",
            snafu::Location::new("test.rs", 10, 1),
        );

        // Simulate what happens when using ? in a DataFusion context
        let df_err: datafusion_common::DataFusionError = original.into();

        // Convert back to lance error (as happens when Lance consumes the stream)
        let recovered: Error = df_err.into();

        // Should get back the original lance error directly (not wrapped in External)
        match recovered {
            Error::InvalidInput { .. } => {
                assert!(recovered.to_string().contains("test validation error"));
            }
            _ => panic!("Expected InvalidInput variant, got {:?}", recovered),
        }
    }
}
729}