lance_core/
error.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use arrow_schema::ArrowError;
5use snafu::{Location, Snafu};
6
/// Type-erased, heap-allocated error that is `Send + Sync` and owns its data.
type BoxedError = Box<dyn std::error::Error + Send + Sync + 'static>;

/// Moves `e` onto the heap and erases its concrete type.
///
/// The returned box can still be downcast back to the original error type.
#[inline]
pub fn box_error(e: impl std::error::Error + Send + Sync + 'static) -> BoxedError {
    BoxedError::from(e)
}
14
15#[derive(Debug, Snafu)]
16#[snafu(visibility(pub))]
17pub enum Error {
18    #[snafu(display("Invalid user input: {source}, {location}"))]
19    InvalidInput {
20        source: BoxedError,
21        location: Location,
22    },
23    #[snafu(display("Dataset already exists: {uri}, {location}"))]
24    DatasetAlreadyExists { uri: String, location: Location },
25    #[snafu(display("Append with different schema: {difference}, location: {location}"))]
26    SchemaMismatch {
27        difference: String,
28        location: Location,
29    },
30    #[snafu(display("Dataset at path {path} was not found: {source}, {location}"))]
31    DatasetNotFound {
32        path: String,
33        source: BoxedError,
34        location: Location,
35    },
36    #[snafu(display("Encountered corrupt file {path}: {source}, {location}"))]
37    CorruptFile {
38        path: object_store::path::Path,
39        source: BoxedError,
40        location: Location,
41        // TODO: add backtrace?
42    },
43    #[snafu(display("Not supported: {source}, {location}"))]
44    NotSupported {
45        source: BoxedError,
46        location: Location,
47    },
48    #[snafu(display("Commit conflict for version {version}: {source}, {location}"))]
49    CommitConflict {
50        version: u64,
51        source: BoxedError,
52        location: Location,
53    },
54    #[snafu(display("Retryable commit conflict for version {version}: {source}, {location}"))]
55    RetryableCommitConflict {
56        version: u64,
57        source: BoxedError,
58        location: Location,
59    },
60    #[snafu(display("Too many concurrent writers. {message}, {location}"))]
61    TooMuchWriteContention { message: String, location: Location },
62    #[snafu(display("Encountered internal error. Please file a bug report at https://github.com/lancedb/lance/issues. {message}, {location}"))]
63    Internal { message: String, location: Location },
64    #[snafu(display("A prerequisite task failed: {message}, {location}"))]
65    PrerequisiteFailed { message: String, location: Location },
66    #[snafu(display("LanceError(Arrow): {message}, {location}"))]
67    Arrow { message: String, location: Location },
68    #[snafu(display("LanceError(Schema): {message}, {location}"))]
69    Schema { message: String, location: Location },
70    #[snafu(display("Not found: {uri}, {location}"))]
71    NotFound { uri: String, location: Location },
72    #[snafu(display("LanceError(IO): {source}, {location}"))]
73    IO {
74        source: BoxedError,
75        location: Location,
76    },
77    #[snafu(display("LanceError(Index): {message}, {location}"))]
78    Index { message: String, location: Location },
79    #[snafu(display("Lance index not found: {identity}, {location}"))]
80    IndexNotFound {
81        identity: String,
82        location: Location,
83    },
84    #[snafu(display("Cannot infer storage location from: {message}"))]
85    InvalidTableLocation { message: String },
86    /// Stream early stop
87    Stop,
88    #[snafu(display("Wrapped error: {error}, {location}"))]
89    Wrapped {
90        error: BoxedError,
91        location: Location,
92    },
93    #[snafu(display("Cloned error: {message}, {location}"))]
94    Cloned { message: String, location: Location },
95    #[snafu(display("Query Execution error: {message}, {location}"))]
96    Execution { message: String, location: Location },
97    #[snafu(display("Ref is invalid: {message}"))]
98    InvalidRef { message: String },
99    #[snafu(display("Ref conflict error: {message}"))]
100    RefConflict { message: String },
101    #[snafu(display("Ref not found error: {message}"))]
102    RefNotFound { message: String },
103    #[snafu(display("Cleanup error: {message}"))]
104    Cleanup { message: String },
105    #[snafu(display("Version not found error: {message}"))]
106    VersionNotFound { message: String },
107    #[snafu(display("Version conflict error: {message}"))]
108    VersionConflict {
109        message: String,
110        major_version: u16,
111        minor_version: u16,
112        location: Location,
113    },
114    #[snafu(display("Namespace error: {source}, {location}"))]
115    Namespace {
116        source: BoxedError,
117        location: Location,
118    },
119}
120
121impl Error {
122    pub fn corrupt_file(
123        path: object_store::path::Path,
124        message: impl Into<String>,
125        location: Location,
126    ) -> Self {
127        let message: String = message.into();
128        Self::CorruptFile {
129            path,
130            source: message.into(),
131            location,
132        }
133    }
134
135    pub fn invalid_input(message: impl Into<String>, location: Location) -> Self {
136        let message: String = message.into();
137        Self::InvalidInput {
138            source: message.into(),
139            location,
140        }
141    }
142
143    pub fn io(message: impl Into<String>, location: Location) -> Self {
144        let message: String = message.into();
145        Self::IO {
146            source: message.into(),
147            location,
148        }
149    }
150
151    pub fn version_conflict(
152        message: impl Into<String>,
153        major_version: u16,
154        minor_version: u16,
155        location: Location,
156    ) -> Self {
157        let message: String = message.into();
158        Self::VersionConflict {
159            message,
160            major_version,
161            minor_version,
162            location,
163        }
164    }
165}
166
167pub trait LanceOptionExt<T> {
168    /// Unwraps an option, returning an internal error if the option is None.
169    ///
170    /// Can be used when an option is expected to have a value.
171    fn expect_ok(self) -> Result<T>;
172}
173
174impl<T> LanceOptionExt<T> for Option<T> {
175    #[track_caller]
176    fn expect_ok(self) -> Result<T> {
177        let location = std::panic::Location::caller().to_snafu_location();
178        self.ok_or_else(|| Error::Internal {
179            message: "Expected option to have value".to_string(),
180            location,
181        })
182    }
183}
184
185trait ToSnafuLocation {
186    fn to_snafu_location(&'static self) -> snafu::Location;
187}
188
189impl ToSnafuLocation for std::panic::Location<'static> {
190    fn to_snafu_location(&'static self) -> snafu::Location {
191        snafu::Location::new(self.file(), self.line(), self.column())
192    }
193}
194
195pub type Result<T> = std::result::Result<T, Error>;
196pub type ArrowResult<T> = std::result::Result<T, ArrowError>;
197#[cfg(feature = "datafusion")]
198pub type DataFusionResult<T> = std::result::Result<T, datafusion_common::DataFusionError>;
199
200impl From<ArrowError> for Error {
201    #[track_caller]
202    fn from(e: ArrowError) -> Self {
203        Self::Arrow {
204            message: e.to_string(),
205            location: std::panic::Location::caller().to_snafu_location(),
206        }
207    }
208}
209
210impl From<&ArrowError> for Error {
211    #[track_caller]
212    fn from(e: &ArrowError) -> Self {
213        Self::Arrow {
214            message: e.to_string(),
215            location: std::panic::Location::caller().to_snafu_location(),
216        }
217    }
218}
219
220impl From<std::io::Error> for Error {
221    #[track_caller]
222    fn from(e: std::io::Error) -> Self {
223        Self::IO {
224            source: box_error(e),
225            location: std::panic::Location::caller().to_snafu_location(),
226        }
227    }
228}
229
230impl From<object_store::Error> for Error {
231    #[track_caller]
232    fn from(e: object_store::Error) -> Self {
233        Self::IO {
234            source: box_error(e),
235            location: std::panic::Location::caller().to_snafu_location(),
236        }
237    }
238}
239
240impl From<prost::DecodeError> for Error {
241    #[track_caller]
242    fn from(e: prost::DecodeError) -> Self {
243        Self::IO {
244            source: box_error(e),
245            location: std::panic::Location::caller().to_snafu_location(),
246        }
247    }
248}
249
250impl From<prost::EncodeError> for Error {
251    #[track_caller]
252    fn from(e: prost::EncodeError) -> Self {
253        Self::IO {
254            source: box_error(e),
255            location: std::panic::Location::caller().to_snafu_location(),
256        }
257    }
258}
259
260impl From<prost::UnknownEnumValue> for Error {
261    #[track_caller]
262    fn from(e: prost::UnknownEnumValue) -> Self {
263        Self::IO {
264            source: box_error(e),
265            location: std::panic::Location::caller().to_snafu_location(),
266        }
267    }
268}
269
270impl From<tokio::task::JoinError> for Error {
271    #[track_caller]
272    fn from(e: tokio::task::JoinError) -> Self {
273        Self::IO {
274            source: box_error(e),
275            location: std::panic::Location::caller().to_snafu_location(),
276        }
277    }
278}
279
280impl From<object_store::path::Error> for Error {
281    #[track_caller]
282    fn from(e: object_store::path::Error) -> Self {
283        Self::IO {
284            source: box_error(e),
285            location: std::panic::Location::caller().to_snafu_location(),
286        }
287    }
288}
289
290impl From<url::ParseError> for Error {
291    #[track_caller]
292    fn from(e: url::ParseError) -> Self {
293        Self::IO {
294            source: box_error(e),
295            location: std::panic::Location::caller().to_snafu_location(),
296        }
297    }
298}
299
300impl From<serde_json::Error> for Error {
301    #[track_caller]
302    fn from(e: serde_json::Error) -> Self {
303        Self::Arrow {
304            message: e.to_string(),
305            location: std::panic::Location::caller().to_snafu_location(),
306        }
307    }
308}
309
310#[track_caller]
311fn arrow_io_error_from_msg(message: String) -> ArrowError {
312    ArrowError::IoError(message.clone(), std::io::Error::other(message))
313}
314
315impl From<Error> for ArrowError {
316    fn from(value: Error) -> Self {
317        match value {
318            Error::Arrow { message, .. } => arrow_io_error_from_msg(message), // we lose the error type converting to LanceError
319            Error::IO { source, .. } => arrow_io_error_from_msg(source.to_string()),
320            Error::Schema { message, .. } => Self::SchemaError(message),
321            Error::Index { message, .. } => arrow_io_error_from_msg(message),
322            Error::Stop => arrow_io_error_from_msg("early stop".to_string()),
323            e => arrow_io_error_from_msg(e.to_string()), // Find a more scalable way of doing this
324        }
325    }
326}
327
/// Wraps a SQL parser error as the source of [`Error::IO`], tagged with the
/// caller's location. Only available with the `datafusion` feature.
#[cfg(feature = "datafusion")]
impl From<datafusion_sql::sqlparser::parser::ParserError> for Error {
    #[track_caller]
    fn from(err: datafusion_sql::sqlparser::parser::ParserError) -> Self {
        let location = std::panic::Location::caller().to_snafu_location();
        let source = box_error(err);
        Self::IO { source, location }
    }
}
338
/// Wraps a SQL tokenizer error as the source of [`Error::IO`], tagged with
/// the caller's location. Only available with the `datafusion` feature.
#[cfg(feature = "datafusion")]
impl From<datafusion_sql::sqlparser::tokenizer::TokenizerError> for Error {
    #[track_caller]
    fn from(err: datafusion_sql::sqlparser::tokenizer::TokenizerError) -> Self {
        let location = std::panic::Location::caller().to_snafu_location();
        let source = box_error(err);
        Self::IO { source, location }
    }
}
349
/// Converts this crate's [`Error`] into `DataFusionError::Execution`,
/// carrying only the rendered message.
#[cfg(feature = "datafusion")]
impl From<Error> for datafusion_common::DataFusionError {
    #[track_caller]
    fn from(e: Error) -> Self {
        let rendered = e.to_string();
        Self::Execution(rendered)
    }
}
357
/// Maps a `DataFusionError` onto the closest [`Error`] variant, tagged with
/// the caller's location.
///
/// * `SQL`, `Plan`, `Configuration` -> [`Error::InvalidInput`]
/// * `SchemaError` -> [`Error::Schema`]
/// * `ArrowError` -> [`Error::Arrow`]
/// * `NotImplemented` -> [`Error::NotSupported`]
/// * `Execution` -> [`Error::Execution`]
/// * anything else -> [`Error::IO`]
#[cfg(feature = "datafusion")]
impl From<datafusion_common::DataFusionError> for Error {
    #[track_caller]
    fn from(e: datafusion_common::DataFusionError) -> Self {
        use datafusion_common::DataFusionError as DfError;

        let location = std::panic::Location::caller().to_snafu_location();
        // The patterns bind nothing, so `e` is still whole inside each arm.
        match e {
            DfError::SQL(..) | DfError::Plan(..) | DfError::Configuration(..) => {
                let source = box_error(e);
                Self::InvalidInput { source, location }
            }
            DfError::SchemaError(..) => {
                let message = e.to_string();
                Self::Schema { message, location }
            }
            DfError::ArrowError(..) => {
                let message = e.to_string();
                Self::Arrow { message, location }
            }
            DfError::NotImplemented(..) => {
                let source = box_error(e);
                Self::NotSupported { source, location }
            }
            DfError::Execution(..) => {
                let message = e.to_string();
                Self::Execution { message, location }
            }
            _ => {
                let source = box_error(e);
                Self::IO { source, location }
            }
        }
    }
}
393
394// This is a bit odd but some object_store functions only accept
395// Stream<Result<T, ObjectStoreError>> and so we need to convert
396// to ObjectStoreError to call the methods.
397impl From<Error> for object_store::Error {
398    fn from(err: Error) -> Self {
399        Self::Generic {
400            store: "N/A",
401            source: Box::new(err),
402        }
403    }
404}
405
/// Returns the source location of the code that called this function.
#[track_caller]
pub fn get_caller_location() -> &'static std::panic::Location<'static> {
    let caller: &'static std::panic::Location<'static> = std::panic::Location::caller();
    caller
}
410
411/// Wrap an error in a new error type that implements Clone
412///
413/// This is useful when two threads/streams share a common fallible source
414/// The base error will always have the full error.  Any cloned results will
415/// only have Error::Cloned with the to_string of the base error.
416pub struct CloneableError(pub Error);
417
418impl Clone for CloneableError {
419    #[track_caller]
420    fn clone(&self) -> Self {
421        Self(Error::Cloned {
422            message: self.0.to_string(),
423            location: std::panic::Location::caller().to_snafu_location(),
424        })
425    }
426}
427
428#[derive(Clone)]
429pub struct CloneableResult<T: Clone>(pub std::result::Result<T, CloneableError>);
430
431impl<T: Clone> From<Result<T>> for CloneableResult<T> {
432    fn from(result: Result<T>) -> Self {
433        Self(result.map_err(CloneableError))
434    }
435}
436
#[cfg(test)]
mod test {
    use super::*;

    /// Checks that converting an error through `?` records the line on which
    /// the failing expression begins.
    ///
    /// The assertion relies on a fixed line offset, so do not add or remove
    /// lines between the `get_caller_location()` call and the `Err(` line.
    #[test]
    fn test_caller_location_capture() {
        let current_fn = get_caller_location();
        // make sure ? captures the correct location
        // .into() WILL NOT capture the correct location
        let f: Box<dyn Fn() -> Result<()>> = Box::new(|| {
            Err(object_store::Error::Generic {
                store: "",
                source: "".into(),
            })?;
            Ok(())
        });
        let Error::IO { location, .. } = f().unwrap_err() else {
            panic!("expected ObjectStore error");
        };
        // +4 is the beginning of object_store::Error::Generic...
        assert_eq!(location.line, current_fn.line() + 4, "{}", location);
    }
}