lance_core/
error.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use arrow_schema::ArrowError;
5use snafu::{Location, Snafu};
6
/// Heap-allocated, type-erased error that can be sent and shared across threads.
type BoxedError = Box<dyn std::error::Error + Send + Sync + 'static>;

/// Moves `e` onto the heap and returns it as a type-erased [`BoxedError`].
#[inline]
pub fn box_error(e: impl std::error::Error + Send + Sync + 'static) -> BoxedError {
    let boxed: BoxedError = Box::new(e);
    boxed
}
14
15#[derive(Debug, Snafu)]
16#[snafu(visibility(pub))]
17pub enum Error {
18    #[snafu(display("Invalid user input: {source}, {location}"))]
19    InvalidInput {
20        source: BoxedError,
21        location: Location,
22    },
23    #[snafu(display("Dataset already exists: {uri}, {location}"))]
24    DatasetAlreadyExists { uri: String, location: Location },
25    #[snafu(display("Append with different schema: {difference}, location: {location}"))]
26    SchemaMismatch {
27        difference: String,
28        location: Location,
29    },
30    #[snafu(display("Dataset at path {path} was not found: {source}, {location}"))]
31    DatasetNotFound {
32        path: String,
33        source: BoxedError,
34        location: Location,
35    },
36    #[snafu(display("Encountered corrupt file {path}: {source}, {location}"))]
37    CorruptFile {
38        path: object_store::path::Path,
39        source: BoxedError,
40        location: Location,
41        // TODO: add backtrace?
42    },
43    #[snafu(display("Not supported: {source}, {location}"))]
44    NotSupported {
45        source: BoxedError,
46        location: Location,
47    },
48    #[snafu(display("Commit conflict for version {version}: {source}, {location}"))]
49    CommitConflict {
50        version: u64,
51        source: BoxedError,
52        location: Location,
53    },
54    #[snafu(display("Retryable commit conflict for version {version}: {source}, {location}"))]
55    RetryableCommitConflict {
56        version: u64,
57        source: BoxedError,
58        location: Location,
59    },
60    #[snafu(display("Too many concurrent writers. {message}, {location}"))]
61    TooMuchWriteContention { message: String, location: Location },
62    #[snafu(display("Encountered internal error. Please file a bug report at https://github.com/lancedb/lance/issues. {message}, {location}"))]
63    Internal { message: String, location: Location },
64    #[snafu(display("A prerequisite task failed: {message}, {location}"))]
65    PrerequisiteFailed { message: String, location: Location },
66    #[snafu(display("LanceError(Arrow): {message}, {location}"))]
67    Arrow { message: String, location: Location },
68    #[snafu(display("LanceError(Schema): {message}, {location}"))]
69    Schema { message: String, location: Location },
70    #[snafu(display("Not found: {uri}, {location}"))]
71    NotFound { uri: String, location: Location },
72    #[snafu(display("LanceError(IO): {source}, {location}"))]
73    IO {
74        source: BoxedError,
75        location: Location,
76    },
77    #[snafu(display("LanceError(Index): {message}, {location}"))]
78    Index { message: String, location: Location },
79    #[snafu(display("Lance index not found: {identity}, {location}"))]
80    IndexNotFound {
81        identity: String,
82        location: Location,
83    },
84    #[snafu(display("Cannot infer storage location from: {message}"))]
85    InvalidTableLocation { message: String },
86    /// Stream early stop
87    Stop,
88    #[snafu(display("Wrapped error: {error}, {location}"))]
89    Wrapped {
90        error: BoxedError,
91        location: Location,
92    },
93    #[snafu(display("Cloned error: {message}, {location}"))]
94    Cloned { message: String, location: Location },
95    #[snafu(display("Query Execution error: {message}, {location}"))]
96    Execution { message: String, location: Location },
97    #[snafu(display("Ref is invalid: {message}"))]
98    InvalidRef { message: String },
99    #[snafu(display("Ref conflict error: {message}"))]
100    RefConflict { message: String },
101    #[snafu(display("Ref not found error: {message}"))]
102    RefNotFound { message: String },
103    #[snafu(display("Cleanup error: {message}"))]
104    Cleanup { message: String },
105    #[snafu(display("Version not found error: {message}"))]
106    VersionNotFound { message: String },
107    #[snafu(display("Version conflict error: {message}"))]
108    VersionConflict {
109        message: String,
110        major_version: u16,
111        minor_version: u16,
112        location: Location,
113    },
114}
115
116impl Error {
117    pub fn corrupt_file(
118        path: object_store::path::Path,
119        message: impl Into<String>,
120        location: Location,
121    ) -> Self {
122        let message: String = message.into();
123        Self::CorruptFile {
124            path,
125            source: message.into(),
126            location,
127        }
128    }
129
130    pub fn invalid_input(message: impl Into<String>, location: Location) -> Self {
131        let message: String = message.into();
132        Self::InvalidInput {
133            source: message.into(),
134            location,
135        }
136    }
137
138    pub fn io(message: impl Into<String>, location: Location) -> Self {
139        let message: String = message.into();
140        Self::IO {
141            source: message.into(),
142            location,
143        }
144    }
145
146    pub fn version_conflict(
147        message: impl Into<String>,
148        major_version: u16,
149        minor_version: u16,
150        location: Location,
151    ) -> Self {
152        let message: String = message.into();
153        Self::VersionConflict {
154            message,
155            major_version,
156            minor_version,
157            location,
158        }
159    }
160}
161
162pub trait LanceOptionExt<T> {
163    /// Unwraps an option, returning an internal error if the option is None.
164    ///
165    /// Can be used when an option is expected to have a value.
166    fn expect_ok(self) -> Result<T>;
167}
168
169impl<T> LanceOptionExt<T> for Option<T> {
170    #[track_caller]
171    fn expect_ok(self) -> Result<T> {
172        let location = std::panic::Location::caller().to_snafu_location();
173        self.ok_or_else(|| Error::Internal {
174            message: "Expected option to have value".to_string(),
175            location,
176        })
177    }
178}
179
180trait ToSnafuLocation {
181    fn to_snafu_location(&'static self) -> snafu::Location;
182}
183
184impl ToSnafuLocation for std::panic::Location<'static> {
185    fn to_snafu_location(&'static self) -> snafu::Location {
186        snafu::Location::new(self.file(), self.line(), self.column())
187    }
188}
189
/// Result type whose error is the Lance [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
/// Result type whose error is an [`ArrowError`].
pub type ArrowResult<T> = std::result::Result<T, ArrowError>;
/// Result type whose error is a DataFusion error; only compiled with the
/// `datafusion` feature.
#[cfg(feature = "datafusion")]
pub type DataFusionResult<T> = std::result::Result<T, datafusion_common::DataFusionError>;
194
195impl From<ArrowError> for Error {
196    #[track_caller]
197    fn from(e: ArrowError) -> Self {
198        Self::Arrow {
199            message: e.to_string(),
200            location: std::panic::Location::caller().to_snafu_location(),
201        }
202    }
203}
204
205impl From<&ArrowError> for Error {
206    #[track_caller]
207    fn from(e: &ArrowError) -> Self {
208        Self::Arrow {
209            message: e.to_string(),
210            location: std::panic::Location::caller().to_snafu_location(),
211        }
212    }
213}
214
215impl From<std::io::Error> for Error {
216    #[track_caller]
217    fn from(e: std::io::Error) -> Self {
218        Self::IO {
219            source: box_error(e),
220            location: std::panic::Location::caller().to_snafu_location(),
221        }
222    }
223}
224
225impl From<object_store::Error> for Error {
226    #[track_caller]
227    fn from(e: object_store::Error) -> Self {
228        Self::IO {
229            source: box_error(e),
230            location: std::panic::Location::caller().to_snafu_location(),
231        }
232    }
233}
234
235impl From<prost::DecodeError> for Error {
236    #[track_caller]
237    fn from(e: prost::DecodeError) -> Self {
238        Self::IO {
239            source: box_error(e),
240            location: std::panic::Location::caller().to_snafu_location(),
241        }
242    }
243}
244
245impl From<prost::EncodeError> for Error {
246    #[track_caller]
247    fn from(e: prost::EncodeError) -> Self {
248        Self::IO {
249            source: box_error(e),
250            location: std::panic::Location::caller().to_snafu_location(),
251        }
252    }
253}
254
255impl From<prost::UnknownEnumValue> for Error {
256    #[track_caller]
257    fn from(e: prost::UnknownEnumValue) -> Self {
258        Self::IO {
259            source: box_error(e),
260            location: std::panic::Location::caller().to_snafu_location(),
261        }
262    }
263}
264
265impl From<tokio::task::JoinError> for Error {
266    #[track_caller]
267    fn from(e: tokio::task::JoinError) -> Self {
268        Self::IO {
269            source: box_error(e),
270            location: std::panic::Location::caller().to_snafu_location(),
271        }
272    }
273}
274
275impl From<object_store::path::Error> for Error {
276    #[track_caller]
277    fn from(e: object_store::path::Error) -> Self {
278        Self::IO {
279            source: box_error(e),
280            location: std::panic::Location::caller().to_snafu_location(),
281        }
282    }
283}
284
285impl From<url::ParseError> for Error {
286    #[track_caller]
287    fn from(e: url::ParseError) -> Self {
288        Self::IO {
289            source: box_error(e),
290            location: std::panic::Location::caller().to_snafu_location(),
291        }
292    }
293}
294
295impl From<serde_json::Error> for Error {
296    #[track_caller]
297    fn from(e: serde_json::Error) -> Self {
298        Self::Arrow {
299            message: e.to_string(),
300            location: std::panic::Location::caller().to_snafu_location(),
301        }
302    }
303}
304
305#[track_caller]
306fn arrow_io_error_from_msg(message: String) -> ArrowError {
307    ArrowError::IoError(message.clone(), std::io::Error::other(message))
308}
309
310impl From<Error> for ArrowError {
311    fn from(value: Error) -> Self {
312        match value {
313            Error::Arrow { message, .. } => arrow_io_error_from_msg(message), // we lose the error type converting to LanceError
314            Error::IO { source, .. } => arrow_io_error_from_msg(source.to_string()),
315            Error::Schema { message, .. } => Self::SchemaError(message),
316            Error::Index { message, .. } => arrow_io_error_from_msg(message),
317            Error::Stop => arrow_io_error_from_msg("early stop".to_string()),
318            e => arrow_io_error_from_msg(e.to_string()), // Find a more scalable way of doing this
319        }
320    }
321}
322
#[cfg(feature = "datafusion")]
impl From<datafusion_sql::sqlparser::parser::ParserError> for Error {
    /// Wraps a SQL parser error as the source of [`Error::IO`], tagged with the
    /// caller's location.
    #[track_caller]
    fn from(e: datafusion_sql::sqlparser::parser::ParserError) -> Self {
        let location = std::panic::Location::caller().to_snafu_location();
        let source = box_error(e);
        Self::IO { source, location }
    }
}
333
#[cfg(feature = "datafusion")]
impl From<datafusion_sql::sqlparser::tokenizer::TokenizerError> for Error {
    /// Wraps a SQL tokenizer error as the source of [`Error::IO`], tagged with
    /// the caller's location.
    #[track_caller]
    fn from(e: datafusion_sql::sqlparser::tokenizer::TokenizerError) -> Self {
        let location = std::panic::Location::caller().to_snafu_location();
        let source = box_error(e);
        Self::IO { source, location }
    }
}
344
#[cfg(feature = "datafusion")]
impl From<Error> for datafusion_common::DataFusionError {
    /// Converts a Lance error into a DataFusion `Execution` error holding the
    /// Lance error's display text.
    #[track_caller]
    fn from(e: Error) -> Self {
        let rendered = e.to_string();
        Self::Execution(rendered)
    }
}
352
#[cfg(feature = "datafusion")]
impl From<datafusion_common::DataFusionError> for Error {
    /// Maps a DataFusion error onto the closest Lance variant, tagged with the
    /// caller's location.
    ///
    /// - `SQL`, `Plan`, `Configuration` become [`Error::InvalidInput`]
    /// - `SchemaError` becomes [`Error::Schema`]
    /// - `ArrowError` becomes [`Error::Arrow`]
    /// - `NotImplemented` becomes [`Error::NotSupported`]
    /// - `Execution` becomes [`Error::Execution`]
    /// - anything else becomes [`Error::IO`]
    #[track_caller]
    fn from(e: datafusion_common::DataFusionError) -> Self {
        use datafusion_common::DataFusionError as DfError;

        let location = std::panic::Location::caller().to_snafu_location();
        // The patterns bind nothing, so `e` is still whole inside each arm.
        match e {
            DfError::SQL(..) | DfError::Plan(..) | DfError::Configuration(..) => {
                let source = box_error(e);
                Self::InvalidInput { source, location }
            }
            DfError::SchemaError(..) => {
                let message = e.to_string();
                Self::Schema { message, location }
            }
            DfError::ArrowError(..) => {
                let message = e.to_string();
                Self::Arrow { message, location }
            }
            DfError::NotImplemented(..) => {
                let source = box_error(e);
                Self::NotSupported { source, location }
            }
            DfError::Execution(..) => {
                let message = e.to_string();
                Self::Execution { message, location }
            }
            _ => {
                let source = box_error(e);
                Self::IO { source, location }
            }
        }
    }
}
388
389// This is a bit odd but some object_store functions only accept
390// Stream<Result<T, ObjectStoreError>> and so we need to convert
391// to ObjectStoreError to call the methods.
392impl From<Error> for object_store::Error {
393    fn from(err: Error) -> Self {
394        Self::Generic {
395            store: "N/A",
396            source: Box::new(err),
397        }
398    }
399}
400
/// Returns the source location of the code that called this function.
#[track_caller]
pub fn get_caller_location() -> &'static std::panic::Location<'static> {
    let caller = std::panic::Location::caller();
    caller
}
405
406/// Wrap an error in a new error type that implements Clone
407///
408/// This is useful when two threads/streams share a common fallible source
409/// The base error will always have the full error.  Any cloned results will
410/// only have Error::Cloned with the to_string of the base error.
411pub struct CloneableError(pub Error);
412
413impl Clone for CloneableError {
414    #[track_caller]
415    fn clone(&self) -> Self {
416        Self(Error::Cloned {
417            message: self.0.to_string(),
418            location: std::panic::Location::caller().to_snafu_location(),
419        })
420    }
421}
422
423#[derive(Clone)]
424pub struct CloneableResult<T: Clone>(pub std::result::Result<T, CloneableError>);
425
426impl<T: Clone> From<Result<T>> for CloneableResult<T> {
427    fn from(result: Result<T>) -> Self {
428        Self(result.map_err(CloneableError))
429    }
430}
431
#[cfg(test)]
mod test {
    use super::*;

    /// Checks that converting an `object_store::Error` through `?` records the
    /// line of the failing expression as the error location.
    ///
    /// The assertion relies on a fixed line offset, so no lines may be added or
    /// removed between the `get_caller_location()` call and the `Err(...)`
    /// expression below.
    #[test]
    fn test_caller_location_capture() {
        let current_fn = get_caller_location();
        // make sure ? captures the correct location
        // .into() WILL NOT capture the correct location
        let f: Box<dyn Fn() -> Result<()>> = Box::new(|| {
            Err(object_store::Error::Generic {
                store: "",
                source: "".into(),
            })?;
            Ok(())
        });
        match f().unwrap_err() {
            Error::IO { location, .. } => {
                // +4 is the beginning of object_store::Error::Generic...
                assert_eq!(location.line, current_fn.line() + 4, "{}", location)
            }
            // Any other variant means the `From<object_store::Error>` mapping changed.
            #[allow(unreachable_patterns)]
            _ => panic!("expected ObjectStore error"),
        }
    }
}
457}