lance_core/
error.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use arrow_schema::ArrowError;
5use snafu::{Location, Snafu};
6
7type BoxedError = Box<dyn std::error::Error + Send + Sync + 'static>;
8
9/// Allocates error on the heap and then places `e` into it.
10#[inline]
11pub fn box_error(e: impl std::error::Error + Send + Sync + 'static) -> BoxedError {
12    Box::new(e)
13}
14
15#[derive(Debug, Snafu)]
16#[snafu(visibility(pub))]
17pub enum Error {
18    #[snafu(display("Invalid user input: {source}, {location}"))]
19    InvalidInput {
20        source: BoxedError,
21        location: Location,
22    },
23    #[snafu(display("Dataset already exists: {uri}, {location}"))]
24    DatasetAlreadyExists { uri: String, location: Location },
25    #[snafu(display("Append with different schema: {difference}, location: {location}"))]
26    SchemaMismatch {
27        difference: String,
28        location: Location,
29    },
30    #[snafu(display("Dataset at path {path} was not found: {source}, {location}"))]
31    DatasetNotFound {
32        path: String,
33        source: BoxedError,
34        location: Location,
35    },
36    #[snafu(display("Encountered corrupt file {path}: {source}, {location}"))]
37    CorruptFile {
38        path: object_store::path::Path,
39        source: BoxedError,
40        location: Location,
41        // TODO: add backtrace?
42    },
43    #[snafu(display("Not supported: {source}, {location}"))]
44    NotSupported {
45        source: BoxedError,
46        location: Location,
47    },
48    #[snafu(display("Commit conflict for version {version}: {source}, {location}"))]
49    CommitConflict {
50        version: u64,
51        source: BoxedError,
52        location: Location,
53    },
54    #[snafu(display("Encountered internal error. Please file a bug report at https://github.com/lancedb/lance/issues. {message}, {location}"))]
55    Internal { message: String, location: Location },
56    #[snafu(display("A prerequisite task failed: {message}, {location}"))]
57    PrerequisiteFailed { message: String, location: Location },
58    #[snafu(display("LanceError(Arrow): {message}, {location}"))]
59    Arrow { message: String, location: Location },
60    #[snafu(display("LanceError(Schema): {message}, {location}"))]
61    Schema { message: String, location: Location },
62    #[snafu(display("Not found: {uri}, {location}"))]
63    NotFound { uri: String, location: Location },
64    #[snafu(display("LanceError(IO): {source}, {location}"))]
65    IO {
66        source: BoxedError,
67        location: Location,
68    },
69    #[snafu(display("LanceError(Index): {message}, {location}"))]
70    Index { message: String, location: Location },
71    #[snafu(display("Lance index not found: {identity}, {location}"))]
72    IndexNotFound {
73        identity: String,
74        location: Location,
75    },
76    #[snafu(display("Cannot infer storage location from: {message}"))]
77    InvalidTableLocation { message: String },
78    /// Stream early stop
79    Stop,
80    #[snafu(display("Wrapped error: {error}, {location}"))]
81    Wrapped {
82        error: BoxedError,
83        location: Location,
84    },
85    #[snafu(display("Cloned error: {message}, {location}"))]
86    Cloned { message: String, location: Location },
87    #[snafu(display("Query Execution error: {message}, {location}"))]
88    Execution { message: String, location: Location },
89    #[snafu(display("Ref is invalid: {message}"))]
90    InvalidRef { message: String },
91    #[snafu(display("Ref conflict error: {message}"))]
92    RefConflict { message: String },
93    #[snafu(display("Ref not found error: {message}"))]
94    RefNotFound { message: String },
95    #[snafu(display("Cleanup error: {message}"))]
96    Cleanup { message: String },
97    #[snafu(display("Version not found error: {message}"))]
98    VersionNotFound { message: String },
99    #[snafu(display("Version conflict error: {message}"))]
100    VersionConflict {
101        message: String,
102        major_version: u16,
103        minor_version: u16,
104        location: Location,
105    },
106}
107
108impl Error {
109    pub fn corrupt_file(
110        path: object_store::path::Path,
111        message: impl Into<String>,
112        location: Location,
113    ) -> Self {
114        let message: String = message.into();
115        Self::CorruptFile {
116            path,
117            source: message.into(),
118            location,
119        }
120    }
121
122    pub fn invalid_input(message: impl Into<String>, location: Location) -> Self {
123        let message: String = message.into();
124        Self::InvalidInput {
125            source: message.into(),
126            location,
127        }
128    }
129
130    pub fn io(message: impl Into<String>, location: Location) -> Self {
131        let message: String = message.into();
132        Self::IO {
133            source: message.into(),
134            location,
135        }
136    }
137
138    pub fn version_conflict(
139        message: impl Into<String>,
140        major_version: u16,
141        minor_version: u16,
142        location: Location,
143    ) -> Self {
144        let message: String = message.into();
145        Self::VersionConflict {
146            message,
147            major_version,
148            minor_version,
149            location,
150        }
151    }
152}
153
154pub trait LanceOptionExt<T> {
155    /// Unwraps an option, returning an internal error if the option is None.
156    ///
157    /// Can be used when an option is expected to have a value.
158    fn expect_ok(self) -> Result<T>;
159}
160
161impl<T> LanceOptionExt<T> for Option<T> {
162    #[track_caller]
163    fn expect_ok(self) -> Result<T> {
164        let location = std::panic::Location::caller().to_snafu_location();
165        self.ok_or_else(|| Error::Internal {
166            message: "Expected option to have value".to_string(),
167            location,
168        })
169    }
170}
171
172trait ToSnafuLocation {
173    fn to_snafu_location(&'static self) -> snafu::Location;
174}
175
176impl ToSnafuLocation for std::panic::Location<'static> {
177    fn to_snafu_location(&'static self) -> snafu::Location {
178        snafu::Location::new(self.file(), self.line(), self.column())
179    }
180}
181
182pub type Result<T> = std::result::Result<T, Error>;
183pub type ArrowResult<T> = std::result::Result<T, ArrowError>;
184#[cfg(feature = "datafusion")]
185pub type DataFusionResult<T> = std::result::Result<T, datafusion_common::DataFusionError>;
186
187impl From<ArrowError> for Error {
188    #[track_caller]
189    fn from(e: ArrowError) -> Self {
190        Self::Arrow {
191            message: e.to_string(),
192            location: std::panic::Location::caller().to_snafu_location(),
193        }
194    }
195}
196
197impl From<&ArrowError> for Error {
198    #[track_caller]
199    fn from(e: &ArrowError) -> Self {
200        Self::Arrow {
201            message: e.to_string(),
202            location: std::panic::Location::caller().to_snafu_location(),
203        }
204    }
205}
206
207impl From<std::io::Error> for Error {
208    #[track_caller]
209    fn from(e: std::io::Error) -> Self {
210        Self::IO {
211            source: box_error(e),
212            location: std::panic::Location::caller().to_snafu_location(),
213        }
214    }
215}
216
217impl From<object_store::Error> for Error {
218    #[track_caller]
219    fn from(e: object_store::Error) -> Self {
220        Self::IO {
221            source: box_error(e),
222            location: std::panic::Location::caller().to_snafu_location(),
223        }
224    }
225}
226
227impl From<prost::DecodeError> for Error {
228    #[track_caller]
229    fn from(e: prost::DecodeError) -> Self {
230        Self::IO {
231            source: box_error(e),
232            location: std::panic::Location::caller().to_snafu_location(),
233        }
234    }
235}
236
237impl From<prost::EncodeError> for Error {
238    #[track_caller]
239    fn from(e: prost::EncodeError) -> Self {
240        Self::IO {
241            source: box_error(e),
242            location: std::panic::Location::caller().to_snafu_location(),
243        }
244    }
245}
246
247impl From<prost::UnknownEnumValue> for Error {
248    #[track_caller]
249    fn from(e: prost::UnknownEnumValue) -> Self {
250        Self::IO {
251            source: box_error(e),
252            location: std::panic::Location::caller().to_snafu_location(),
253        }
254    }
255}
256
257impl From<tokio::task::JoinError> for Error {
258    #[track_caller]
259    fn from(e: tokio::task::JoinError) -> Self {
260        Self::IO {
261            source: box_error(e),
262            location: std::panic::Location::caller().to_snafu_location(),
263        }
264    }
265}
266
267impl From<object_store::path::Error> for Error {
268    #[track_caller]
269    fn from(e: object_store::path::Error) -> Self {
270        Self::IO {
271            source: box_error(e),
272            location: std::panic::Location::caller().to_snafu_location(),
273        }
274    }
275}
276
277impl From<url::ParseError> for Error {
278    #[track_caller]
279    fn from(e: url::ParseError) -> Self {
280        Self::IO {
281            source: box_error(e),
282            location: std::panic::Location::caller().to_snafu_location(),
283        }
284    }
285}
286
287impl From<serde_json::Error> for Error {
288    #[track_caller]
289    fn from(e: serde_json::Error) -> Self {
290        Self::Arrow {
291            message: e.to_string(),
292            location: std::panic::Location::caller().to_snafu_location(),
293        }
294    }
295}
296
297#[track_caller]
298fn arrow_io_error_from_msg(message: String) -> ArrowError {
299    ArrowError::IoError(
300        message.clone(),
301        std::io::Error::new(std::io::ErrorKind::Other, message),
302    )
303}
304
305impl From<Error> for ArrowError {
306    fn from(value: Error) -> Self {
307        match value {
308            Error::Arrow { message, .. } => arrow_io_error_from_msg(message), // we lose the error type converting to LanceError
309            Error::IO { source, .. } => arrow_io_error_from_msg(source.to_string()),
310            Error::Schema { message, .. } => Self::SchemaError(message),
311            Error::Index { message, .. } => arrow_io_error_from_msg(message),
312            Error::Stop => arrow_io_error_from_msg("early stop".to_string()),
313            e => arrow_io_error_from_msg(e.to_string()), // Find a more scalable way of doing this
314        }
315    }
316}
317
318#[cfg(feature = "datafusion")]
319impl From<datafusion_sql::sqlparser::parser::ParserError> for Error {
320    #[track_caller]
321    fn from(e: datafusion_sql::sqlparser::parser::ParserError) -> Self {
322        Self::IO {
323            source: box_error(e),
324            location: std::panic::Location::caller().to_snafu_location(),
325        }
326    }
327}
328
329#[cfg(feature = "datafusion")]
330impl From<datafusion_sql::sqlparser::tokenizer::TokenizerError> for Error {
331    #[track_caller]
332    fn from(e: datafusion_sql::sqlparser::tokenizer::TokenizerError) -> Self {
333        Self::IO {
334            source: box_error(e),
335            location: std::panic::Location::caller().to_snafu_location(),
336        }
337    }
338}
339
340#[cfg(feature = "datafusion")]
341impl From<Error> for datafusion_common::DataFusionError {
342    #[track_caller]
343    fn from(e: Error) -> Self {
344        Self::Execution(e.to_string())
345    }
346}
347
348#[cfg(feature = "datafusion")]
349impl From<datafusion_common::DataFusionError> for Error {
350    #[track_caller]
351    fn from(e: datafusion_common::DataFusionError) -> Self {
352        let location = std::panic::Location::caller().to_snafu_location();
353        match e {
354            datafusion_common::DataFusionError::SQL(..)
355            | datafusion_common::DataFusionError::Plan(..)
356            | datafusion_common::DataFusionError::Configuration(..) => Self::InvalidInput {
357                source: box_error(e),
358                location,
359            },
360            datafusion_common::DataFusionError::SchemaError(..) => Self::Schema {
361                message: e.to_string(),
362                location,
363            },
364            datafusion_common::DataFusionError::ArrowError(..) => Self::Arrow {
365                message: e.to_string(),
366                location,
367            },
368            datafusion_common::DataFusionError::NotImplemented(..) => Self::NotSupported {
369                source: box_error(e),
370                location,
371            },
372            datafusion_common::DataFusionError::Execution(..) => Self::Execution {
373                message: e.to_string(),
374                location,
375            },
376            _ => Self::IO {
377                source: box_error(e),
378                location,
379            },
380        }
381    }
382}
383
384// This is a bit odd but some object_store functions only accept
385// Stream<Result<T, ObjectStoreError>> and so we need to convert
386// to ObjectStoreError to call the methods.
387impl From<Error> for object_store::Error {
388    fn from(err: Error) -> Self {
389        Self::Generic {
390            store: "N/A",
391            source: Box::new(err),
392        }
393    }
394}
395
396#[track_caller]
397pub fn get_caller_location() -> &'static std::panic::Location<'static> {
398    std::panic::Location::caller()
399}
400
401/// Wrap an error in a new error type that implements Clone
402///
403/// This is useful when two threads/streams share a common fallible source
404/// The base error will always have the full error.  Any cloned results will
405/// only have Error::Cloned with the to_string of the base error.
406pub struct CloneableError(pub Error);
407
408impl Clone for CloneableError {
409    #[track_caller]
410    fn clone(&self) -> Self {
411        Self(Error::Cloned {
412            message: self.0.to_string(),
413            location: std::panic::Location::caller().to_snafu_location(),
414        })
415    }
416}
417
418#[derive(Clone)]
419pub struct CloneableResult<T: Clone>(pub std::result::Result<T, CloneableError>);
420
421impl<T: Clone> From<Result<T>> for CloneableResult<T> {
422    fn from(result: Result<T>) -> Self {
423        Self(result.map_err(CloneableError))
424    }
425}
426
427#[cfg(test)]
428mod test {
429    use super::*;
430
431    #[test]
432    fn test_caller_location_capture() {
433        let current_fn = get_caller_location();
434        // make sure ? captures the correct location
435        // .into() WILL NOT capture the correct location
436        let f: Box<dyn Fn() -> Result<()>> = Box::new(|| {
437            Err(object_store::Error::Generic {
438                store: "",
439                source: "".into(),
440            })?;
441            Ok(())
442        });
443        match f().unwrap_err() {
444            Error::IO { location, .. } => {
445                // +4 is the beginning of object_store::Error::Generic...
446                assert_eq!(location.line, current_fn.line() + 4, "{}", location)
447            }
448            #[allow(unreachable_patterns)]
449            _ => panic!("expected ObjectStore error"),
450        }
451    }
452}