lance_core/
error.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use arrow_schema::ArrowError;
5use snafu::{Location, Snafu};
6
/// Heap-allocated, type-erased error that is `Send + Sync + 'static`.
///
/// Used as the `source` payload of the [`Error`] variants that wrap an
/// underlying cause.
type BoxedError = Box<dyn std::error::Error + Send + Sync + 'static>;
8
9/// Allocates error on the heap and then places `e` into it.
10#[inline]
11pub fn box_error(e: impl std::error::Error + Send + Sync + 'static) -> BoxedError {
12    Box::new(e)
13}
14
15#[derive(Debug, Snafu)]
16#[snafu(visibility(pub))]
17pub enum Error {
18    #[snafu(display("Invalid user input: {source}, {location}"))]
19    InvalidInput {
20        source: BoxedError,
21        location: Location,
22    },
23    #[snafu(display("Dataset already exists: {uri}, {location}"))]
24    DatasetAlreadyExists { uri: String, location: Location },
25    #[snafu(display("Append with different schema: {difference}, location: {location}"))]
26    SchemaMismatch {
27        difference: String,
28        location: Location,
29    },
30    #[snafu(display("Dataset at path {path} was not found: {source}, {location}"))]
31    DatasetNotFound {
32        path: String,
33        source: BoxedError,
34        location: Location,
35    },
36    #[snafu(display("Encountered corrupt file {path}: {source}, {location}"))]
37    CorruptFile {
38        path: object_store::path::Path,
39        source: BoxedError,
40        location: Location,
41        // TODO: add backtrace?
42    },
43    #[snafu(display("Not supported: {source}, {location}"))]
44    NotSupported {
45        source: BoxedError,
46        location: Location,
47    },
48    #[snafu(display("Commit conflict for version {version}: {source}, {location}"))]
49    CommitConflict {
50        version: u64,
51        source: BoxedError,
52        location: Location,
53    },
54    #[snafu(display("Retryable commit conflict for version {version}: {source}, {location}"))]
55    RetryableCommitConflict {
56        version: u64,
57        source: BoxedError,
58        location: Location,
59    },
60    #[snafu(display("Too many concurrent writers. {message}, {location}"))]
61    TooMuchWriteContention { message: String, location: Location },
62    #[snafu(display("Encountered internal error. Please file a bug report at https://github.com/lance-format/lance/issues. {message}, {location}"))]
63    Internal { message: String, location: Location },
64    #[snafu(display("A prerequisite task failed: {message}, {location}"))]
65    PrerequisiteFailed { message: String, location: Location },
66    #[snafu(display("Unprocessable: {message}, {location}"))]
67    Unprocessable { message: String, location: Location },
68    #[snafu(display("LanceError(Arrow): {message}, {location}"))]
69    Arrow { message: String, location: Location },
70    #[snafu(display("LanceError(Schema): {message}, {location}"))]
71    Schema { message: String, location: Location },
72    #[snafu(display("Not found: {uri}, {location}"))]
73    NotFound { uri: String, location: Location },
74    #[snafu(display("LanceError(IO): {source}, {location}"))]
75    IO {
76        source: BoxedError,
77        location: Location,
78    },
79    #[snafu(display("LanceError(Index): {message}, {location}"))]
80    Index { message: String, location: Location },
81    #[snafu(display("Lance index not found: {identity}, {location}"))]
82    IndexNotFound {
83        identity: String,
84        location: Location,
85    },
86    #[snafu(display("Cannot infer storage location from: {message}"))]
87    InvalidTableLocation { message: String },
88    /// Stream early stop
89    Stop,
90    #[snafu(display("Wrapped error: {error}, {location}"))]
91    Wrapped {
92        error: BoxedError,
93        location: Location,
94    },
95    #[snafu(display("Cloned error: {message}, {location}"))]
96    Cloned { message: String, location: Location },
97    #[snafu(display("Query Execution error: {message}, {location}"))]
98    Execution { message: String, location: Location },
99    #[snafu(display("Ref is invalid: {message}"))]
100    InvalidRef { message: String },
101    #[snafu(display("Ref conflict error: {message}"))]
102    RefConflict { message: String },
103    #[snafu(display("Ref not found error: {message}"))]
104    RefNotFound { message: String },
105    #[snafu(display("Cleanup error: {message}"))]
106    Cleanup { message: String },
107    #[snafu(display("Version not found error: {message}"))]
108    VersionNotFound { message: String },
109    #[snafu(display("Version conflict error: {message}"))]
110    VersionConflict {
111        message: String,
112        major_version: u16,
113        minor_version: u16,
114        location: Location,
115    },
116    #[snafu(display("Namespace error: {source}, {location}"))]
117    Namespace {
118        source: BoxedError,
119        location: Location,
120    },
121}
122
123impl Error {
124    pub fn corrupt_file(
125        path: object_store::path::Path,
126        message: impl Into<String>,
127        location: Location,
128    ) -> Self {
129        let message: String = message.into();
130        Self::CorruptFile {
131            path,
132            source: message.into(),
133            location,
134        }
135    }
136
137    pub fn invalid_input(message: impl Into<String>, location: Location) -> Self {
138        let message: String = message.into();
139        Self::InvalidInput {
140            source: message.into(),
141            location,
142        }
143    }
144
145    pub fn io(message: impl Into<String>, location: Location) -> Self {
146        let message: String = message.into();
147        Self::IO {
148            source: message.into(),
149            location,
150        }
151    }
152
153    pub fn version_conflict(
154        message: impl Into<String>,
155        major_version: u16,
156        minor_version: u16,
157        location: Location,
158    ) -> Self {
159        let message: String = message.into();
160        Self::VersionConflict {
161            message,
162            major_version,
163            minor_version,
164            location,
165        }
166    }
167}
168
169pub trait LanceOptionExt<T> {
170    /// Unwraps an option, returning an internal error if the option is None.
171    ///
172    /// Can be used when an option is expected to have a value.
173    fn expect_ok(self) -> Result<T>;
174}
175
176impl<T> LanceOptionExt<T> for Option<T> {
177    #[track_caller]
178    fn expect_ok(self) -> Result<T> {
179        let location = std::panic::Location::caller().to_snafu_location();
180        self.ok_or_else(|| Error::Internal {
181            message: "Expected option to have value".to_string(),
182            location,
183        })
184    }
185}
186
187trait ToSnafuLocation {
188    fn to_snafu_location(&'static self) -> snafu::Location;
189}
190
191impl ToSnafuLocation for std::panic::Location<'static> {
192    fn to_snafu_location(&'static self) -> snafu::Location {
193        snafu::Location::new(self.file(), self.line(), self.column())
194    }
195}
196
197pub type Result<T> = std::result::Result<T, Error>;
198pub type ArrowResult<T> = std::result::Result<T, ArrowError>;
199#[cfg(feature = "datafusion")]
200pub type DataFusionResult<T> = std::result::Result<T, datafusion_common::DataFusionError>;
201
202impl From<ArrowError> for Error {
203    #[track_caller]
204    fn from(e: ArrowError) -> Self {
205        Self::Arrow {
206            message: e.to_string(),
207            location: std::panic::Location::caller().to_snafu_location(),
208        }
209    }
210}
211
212impl From<&ArrowError> for Error {
213    #[track_caller]
214    fn from(e: &ArrowError) -> Self {
215        Self::Arrow {
216            message: e.to_string(),
217            location: std::panic::Location::caller().to_snafu_location(),
218        }
219    }
220}
221
222impl From<std::io::Error> for Error {
223    #[track_caller]
224    fn from(e: std::io::Error) -> Self {
225        Self::IO {
226            source: box_error(e),
227            location: std::panic::Location::caller().to_snafu_location(),
228        }
229    }
230}
231
232impl From<object_store::Error> for Error {
233    #[track_caller]
234    fn from(e: object_store::Error) -> Self {
235        Self::IO {
236            source: box_error(e),
237            location: std::panic::Location::caller().to_snafu_location(),
238        }
239    }
240}
241
242impl From<prost::DecodeError> for Error {
243    #[track_caller]
244    fn from(e: prost::DecodeError) -> Self {
245        Self::IO {
246            source: box_error(e),
247            location: std::panic::Location::caller().to_snafu_location(),
248        }
249    }
250}
251
252impl From<prost::EncodeError> for Error {
253    #[track_caller]
254    fn from(e: prost::EncodeError) -> Self {
255        Self::IO {
256            source: box_error(e),
257            location: std::panic::Location::caller().to_snafu_location(),
258        }
259    }
260}
261
262impl From<prost::UnknownEnumValue> for Error {
263    #[track_caller]
264    fn from(e: prost::UnknownEnumValue) -> Self {
265        Self::IO {
266            source: box_error(e),
267            location: std::panic::Location::caller().to_snafu_location(),
268        }
269    }
270}
271
272impl From<tokio::task::JoinError> for Error {
273    #[track_caller]
274    fn from(e: tokio::task::JoinError) -> Self {
275        Self::IO {
276            source: box_error(e),
277            location: std::panic::Location::caller().to_snafu_location(),
278        }
279    }
280}
281
282impl From<object_store::path::Error> for Error {
283    #[track_caller]
284    fn from(e: object_store::path::Error) -> Self {
285        Self::IO {
286            source: box_error(e),
287            location: std::panic::Location::caller().to_snafu_location(),
288        }
289    }
290}
291
292impl From<url::ParseError> for Error {
293    #[track_caller]
294    fn from(e: url::ParseError) -> Self {
295        Self::IO {
296            source: box_error(e),
297            location: std::panic::Location::caller().to_snafu_location(),
298        }
299    }
300}
301
302impl From<serde_json::Error> for Error {
303    #[track_caller]
304    fn from(e: serde_json::Error) -> Self {
305        Self::Arrow {
306            message: e.to_string(),
307            location: std::panic::Location::caller().to_snafu_location(),
308        }
309    }
310}
311
312#[track_caller]
313fn arrow_io_error_from_msg(message: String) -> ArrowError {
314    ArrowError::IoError(message.clone(), std::io::Error::other(message))
315}
316
317impl From<Error> for ArrowError {
318    fn from(value: Error) -> Self {
319        match value {
320            Error::Arrow { message, .. } => arrow_io_error_from_msg(message), // we lose the error type converting to LanceError
321            Error::IO { source, .. } => arrow_io_error_from_msg(source.to_string()),
322            Error::Schema { message, .. } => Self::SchemaError(message),
323            Error::Index { message, .. } => arrow_io_error_from_msg(message),
324            Error::Stop => arrow_io_error_from_msg("early stop".to_string()),
325            e => arrow_io_error_from_msg(e.to_string()), // Find a more scalable way of doing this
326        }
327    }
328}
329
/// Wraps a sqlparser `ParserError` as the boxed source of an [`Error::IO`]
/// located at the caller.
#[cfg(feature = "datafusion")]
impl From<datafusion_sql::sqlparser::parser::ParserError> for Error {
    #[track_caller]
    fn from(e: datafusion_sql::sqlparser::parser::ParserError) -> Self {
        let location = std::panic::Location::caller().to_snafu_location();
        let source: BoxedError = Box::new(e);
        Self::IO { source, location }
    }
}
340
/// Wraps a sqlparser `TokenizerError` as the boxed source of an [`Error::IO`]
/// located at the caller.
#[cfg(feature = "datafusion")]
impl From<datafusion_sql::sqlparser::tokenizer::TokenizerError> for Error {
    #[track_caller]
    fn from(e: datafusion_sql::sqlparser::tokenizer::TokenizerError) -> Self {
        let location = std::panic::Location::caller().to_snafu_location();
        let source: BoxedError = Box::new(e);
        Self::IO { source, location }
    }
}
351
/// Converts an [`Error`] into a DataFusion `Execution` error carrying the
/// rendered message.
#[cfg(feature = "datafusion")]
impl From<Error> for datafusion_common::DataFusionError {
    #[track_caller]
    fn from(e: Error) -> Self {
        let rendered = e.to_string();
        Self::Execution(rendered)
    }
}
359
/// Converts a DataFusion error into the closest matching [`Error`] variant,
/// recording the caller's location.
///
/// SQL, plan and configuration errors map to `InvalidInput`; `NotImplemented`
/// maps to `NotSupported`; schema, Arrow and execution errors keep only their
/// rendered message; everything else becomes `IO`.
#[cfg(feature = "datafusion")]
impl From<datafusion_common::DataFusionError> for Error {
    #[track_caller]
    fn from(e: datafusion_common::DataFusionError) -> Self {
        use datafusion_common::DataFusionError as DfError;

        let location = std::panic::Location::caller().to_snafu_location();
        match e {
            // Variants that keep the original error as a boxed source.
            DfError::SQL(..) | DfError::Plan(..) | DfError::Configuration(..) => {
                let source: BoxedError = Box::new(e);
                Self::InvalidInput { source, location }
            }
            DfError::NotImplemented(..) => {
                let source: BoxedError = Box::new(e);
                Self::NotSupported { source, location }
            }
            // Variants that keep only the rendered message.
            DfError::SchemaError(..) => {
                let message = e.to_string();
                Self::Schema { message, location }
            }
            DfError::ArrowError(..) => {
                let message = e.to_string();
                Self::Arrow { message, location }
            }
            DfError::Execution(..) => {
                let message = e.to_string();
                Self::Execution { message, location }
            }
            // Anything else is reported as an I/O error.
            _ => {
                let source: BoxedError = Box::new(e);
                Self::IO { source, location }
            }
        }
    }
}
395
396// This is a bit odd but some object_store functions only accept
397// Stream<Result<T, ObjectStoreError>> and so we need to convert
398// to ObjectStoreError to call the methods.
399impl From<Error> for object_store::Error {
400    fn from(err: Error) -> Self {
401        Self::Generic {
402            store: "N/A",
403            source: Box::new(err),
404        }
405    }
406}
407
/// Returns the source location of the code that called this function.
#[track_caller]
pub fn get_caller_location() -> &'static std::panic::Location<'static> {
    use std::panic::Location;

    // `#[track_caller]` makes `caller()` report this function's call site.
    Location::caller()
}
412
413/// Wrap an error in a new error type that implements Clone
414///
415/// This is useful when two threads/streams share a common fallible source
416/// The base error will always have the full error.  Any cloned results will
417/// only have Error::Cloned with the to_string of the base error.
418pub struct CloneableError(pub Error);
419
420impl Clone for CloneableError {
421    #[track_caller]
422    fn clone(&self) -> Self {
423        Self(Error::Cloned {
424            message: self.0.to_string(),
425            location: std::panic::Location::caller().to_snafu_location(),
426        })
427    }
428}
429
430#[derive(Clone)]
431pub struct CloneableResult<T: Clone>(pub std::result::Result<T, CloneableError>);
432
433impl<T: Clone> From<Result<T>> for CloneableResult<T> {
434    fn from(result: Result<T>) -> Self {
435        Self(result.map_err(CloneableError))
436    }
437}
438
#[cfg(test)]
mod test {
    use super::*;

    /// Checks that converting a foreign error with `?` records the line of the
    /// `?` expression as the error location.
    ///
    /// The expected line is a fixed offset from the `get_caller_location()`
    /// call, so the number of lines between that call and the `Err(...)`
    /// expression must not change.
    #[test]
    fn test_caller_location_capture() {
        let anchor = get_caller_location();
        // make sure ? captures the correct location
        // .into() WILL NOT capture the correct location
        let failing: Box<dyn Fn() -> Result<()>> = Box::new(|| {
            Err(object_store::Error::Generic {
                store: "",
                source: "".into(),
            })?;
            Ok(())
        });
        let Error::IO { location, .. } = failing().unwrap_err() else {
            panic!("expected ObjectStore error");
        };
        // +4 is the line on which `Err(object_store::Error::Generic { ...` begins.
        assert_eq!(location.line, anchor.line() + 4, "{}", location);
    }
}