Skip to main content

deltalake_core/
errors.rs

1//! Exceptions for the deltalake crate
2use chrono::{DateTime, Utc};
3use object_store::Error as ObjectStoreError;
4
5use crate::kernel::Version;
6use crate::kernel::transaction::{CommitBuilderError, TransactionError};
7
8/// A result returned by delta-rs
9pub type DeltaResult<T, E = DeltaTableError> = Result<T, E>;
10
11pub(crate) fn unsupported_column_mapping_write(operation: &str) -> DeltaTableError {
12    DeltaTableError::Generic(format!(
13        "column mapping writes are not supported for {operation} yet"
14    ))
15}
16
17/// Delta Table specific error
18#[allow(missing_docs)]
19#[derive(thiserror::Error, Debug)]
20pub enum DeltaTableError {
21    #[error("Kernel error: {0}")]
22    KernelError(#[from] delta_kernel::error::Error),
23
24    /// Error returned when reading the delta log object failed.
25    #[error("Failed to read delta log object: {}", .source)]
26    ObjectStore {
27        /// Storage error details when reading the delta log object failed.
28        #[from]
29        source: ObjectStoreError,
30    },
31
32    /// Error returned when parsing checkpoint parquet.
33    #[error("Failed to parse parquet: {}", .source)]
34    Parquet {
35        /// Parquet error details returned when reading the checkpoint failed.
36        #[from]
37        source: parquet::errors::ParquetError,
38    },
39
40    /// Error returned when converting the schema in Arrow format failed.
41    #[error("Failed to convert into Arrow schema: {}", .source)]
42    Arrow {
43        /// Arrow error details returned when converting the schema in Arrow format failed
44        #[from]
45        source: arrow::error::ArrowError,
46    },
47
48    /// Error returned when the log record has an invalid JSON.
49    #[error("Invalid JSON in log record, version={}, line=`{}`, err=`{}`", .version, .line, .json_err)]
50    InvalidJsonLog {
51        /// JSON error details returned when parsing the record JSON.
52        json_err: serde_json::error::Error,
53        /// invalid log entry content.
54        line: String,
55        /// corresponding table version for the log file.
56        version: Version,
57    },
58
59    /// Error returned when the log contains invalid stats JSON.
60    #[error("Invalid JSON in file stats: {}", .json_err)]
61    InvalidStatsJson {
62        /// JSON error details returned when parsing the stats JSON.
63        json_err: serde_json::error::Error,
64    },
65
66    /// Error returned when the DeltaTable has an invalid version.
67    #[error("Invalid table version: {0}")]
68    InvalidVersion(Version),
69
70    /// Error returned when an operation requests an older version than the currently loaded one.
71    #[error(
72        "Cannot downgrade from version {current_version} to {requested_version}; use DeltaTable.load_version()"
73    )]
74    VersionDowngrade {
75        /// The currently loaded version.
76        current_version: Version,
77        /// The requested older version.
78        requested_version: Version,
79    },
80
81    /// Error returned when the datetime string is invalid for a conversion.
82    #[error("Invalid datetime string: {}", .source)]
83    InvalidDateTimeString {
84        /// Parse error details returned of the datetime string parse error.
85        #[from]
86        source: chrono::ParseError,
87    },
88
89    /// Error returned when attempting to write bad data to the table
90    #[error("{message}")]
91    InvalidData {
92        /// Action error details returned of the invalid action.
93        message: String,
94    },
95
96    /// Error returned when it is not a DeltaTable.
97    #[error("Not a Delta table: {0}")]
98    NotATable(String),
99
100    /// Error returned when no schema was found in the DeltaTable.
101    #[error("No schema found, please make sure table is loaded.")]
102    NoSchema,
103
104    /// Error returned when writes are attempted with data that doesn't match the schema of the
105    /// table
106    #[error("Data does not match the schema or partitions of the table: {}", msg)]
107    SchemaMismatch {
108        /// Information about the mismatch
109        msg: String,
110    },
111
112    /// Error returned when a partition is not formatted as a Hive Partition.
113    #[error("This partition is not formatted with key=value: {}", .partition)]
114    PartitionError {
115        /// The malformed partition used.
116        partition: String,
117    },
118
119    /// Error returned when a invalid partition filter was found.
120    #[error("Invalid partition filter found: {}.", .partition_filter)]
121    InvalidPartitionFilter {
122        /// The invalid partition filter used.
123        partition_filter: String,
124    },
125
126    /// Error returned when a line from log record is invalid.
127    #[error("Failed to read line from log record")]
128    Io {
129        /// Source error details returned while reading the log record.
130        #[from]
131        source: std::io::Error,
132    },
133
134    /// Error raised while preparing a commit
135    #[error("Commit actions are unsound: {source}")]
136    CommitValidation {
137        /// The source error
138        source: CommitBuilderError,
139    },
140
141    /// Error raised while commititng transaction
142    #[error("Transaction failed: {source}")]
143    Transaction {
144        /// The source error
145        source: TransactionError,
146    },
147
148    /// Error returned when transaction is failed to be committed because given version already exists.
149    #[error("Delta transaction failed, version {0} already exists.")]
150    VersionAlreadyExists(Version),
151
152    /// Error returned when user attempts to commit actions that don't belong to the next version.
153    #[error("Delta transaction failed, version {0} does not follow {1}")]
154    VersionMismatch(Version, Version),
155
156    /// A Feature is missing to perform operation
157    #[error("Delta-rs must be build with feature '{feature}' to support loading from: {url}.")]
158    MissingFeature {
159        /// Name of the missing feature
160        feature: &'static str,
161        /// Storage location url
162        url: String,
163    },
164
165    /// A Feature is missing to perform operation
166    #[error("Cannot infer storage location from: {0}")]
167    InvalidTableLocation(String),
168
169    /// Generic Delta Table error
170    #[error("Log JSON serialization error: {json_err}")]
171    SerializeLogJson {
172        /// JSON serialization error
173        json_err: serde_json::error::Error,
174    },
175
176    /// Generic Delta Table error
177    #[error("Generic DeltaTable error: {0}")]
178    Generic(String),
179
180    /// Generic Delta Table error
181    #[error("Generic error: {source}")]
182    GenericError {
183        /// Source error
184        source: Box<dyn std::error::Error + Send + Sync + 'static>,
185    },
186
187    #[error("Kernel: {source}")]
188    Kernel {
189        #[from]
190        source: crate::kernel::Error,
191    },
192
193    #[error("Table metadata is invalid: {0}")]
194    MetadataError(String),
195
196    #[error("Table has not yet been initialized")]
197    NotInitialized,
198
199    #[error("Table has not yet been initialized with files, therefore {0} is not supported")]
200    NotInitializedWithFiles(String),
201
202    #[error("Change Data not enabled for version: {version}, Start: {start}, End: {end}")]
203    ChangeDataNotRecorded {
204        version: Version,
205        start: Version,
206        end: Version,
207    },
208
209    #[error("Reading a table version: {version} that does not have change data enabled")]
210    ChangeDataNotEnabled { version: Version },
211
212    #[error("Invalid version. Start version {start} is greater than end version {end}")]
213    ChangeDataInvalidVersionRange { start: Version, end: Version },
214
215    #[error("End timestamp {ending_timestamp} is greater than latest commit timestamp")]
216    ChangeDataTimestampGreaterThanCommit { ending_timestamp: DateTime<Utc> },
217
218    #[error("No starting version or timestamp provided for CDC")]
219    NoStartingVersionOrTimestamp,
220}
221
222impl From<object_store::path::Error> for DeltaTableError {
223    fn from(err: object_store::path::Error) -> Self {
224        Self::GenericError {
225            source: Box::new(err),
226        }
227    }
228}
229
230impl From<serde_json::Error> for DeltaTableError {
231    fn from(value: serde_json::Error) -> Self {
232        DeltaTableError::InvalidStatsJson { json_err: value }
233    }
234}
235
236impl DeltaTableError {
237    /// Crate a NotATable Error with message for given path.
238    pub fn not_a_table(path: impl AsRef<str>) -> Self {
239        let msg = format!(
240            "No snapshot or version 0 found, perhaps {} is an empty dir?",
241            path.as_ref()
242        );
243        Self::NotATable(msg)
244    }
245
246    /// Create a [Generic](DeltaTableError::Generic) error with the given message.
247    pub fn generic(msg: impl ToString) -> Self {
248        Self::Generic(msg.to_string())
249    }
250}