1use chrono::{DateTime, Utc};
3use object_store::Error as ObjectStoreError;
4
5use crate::kernel::transaction::{CommitBuilderError, TransactionError};
6use crate::protocol::ProtocolError;
7
8pub type DeltaResult<T, E = DeltaTableError> = Result<T, E>;
10
11#[allow(missing_docs)]
13#[derive(thiserror::Error, Debug)]
14pub enum DeltaTableError {
15 #[error("Kernel error: {0}")]
16 KernelError(#[from] delta_kernel::error::Error),
17
18 #[error("Delta protocol violation: {source}")]
19 Protocol { source: ProtocolError },
20
21 #[error("Failed to read delta log object: {}", .source)]
23 ObjectStore {
24 #[from]
26 source: ObjectStoreError,
27 },
28
29 #[error("Failed to parse parquet: {}", .source)]
31 Parquet {
32 #[from]
34 source: parquet::errors::ParquetError,
35 },
36
37 #[error("Failed to convert into Arrow schema: {}", .source)]
39 Arrow {
40 #[from]
42 source: arrow::error::ArrowError,
43 },
44
45 #[error("Invalid JSON in log record, version={}, line=`{}`, err=`{}`", .version, .line, .json_err)]
47 InvalidJsonLog {
48 json_err: serde_json::error::Error,
50 line: String,
52 version: i64,
54 },
55
56 #[error("Invalid JSON in file stats: {}", .json_err)]
58 InvalidStatsJson {
59 json_err: serde_json::error::Error,
61 },
62
63 #[error("Invalid JSON in invariant expression, line=`{line}`, err=`{json_err}`")]
65 InvalidInvariantJson {
66 json_err: serde_json::error::Error,
68 line: String,
70 },
71
72 #[error("Invalid table version: {0}")]
74 InvalidVersion(i64),
75
76 #[error("Corrupted table, cannot read data file {}: {}", .path, .source)]
78 MissingDataFile {
79 source: std::io::Error,
81 path: String,
83 },
84
85 #[error("Invalid datetime string: {}", .source)]
87 InvalidDateTimeString {
88 #[from]
90 source: chrono::ParseError,
91 },
92
93 #[error("Attempted to write invalid data to the table: {:#?}", violations)]
95 InvalidData {
96 violations: Vec<String>,
98 },
99
100 #[error("Not a Delta table: {0}")]
102 NotATable(String),
103
104 #[error("No metadata found, please make sure table is loaded.")]
106 NoMetadata,
107
108 #[error("No schema found, please make sure table is loaded.")]
110 NoSchema,
111
112 #[error("No partitions found, please make sure table is partitioned.")]
114 LoadPartitions,
115
116 #[error("Data does not match the schema or partitions of the table: {}", msg)]
119 SchemaMismatch {
120 msg: String,
122 },
123
124 #[error("This partition is not formatted with key=value: {}", .partition)]
126 PartitionError {
127 partition: String,
129 },
130
131 #[error("Invalid partition filter found: {}.", .partition_filter)]
133 InvalidPartitionFilter {
134 partition_filter: String,
136 },
137
138 #[error("Tried to filter partitions on non-partitioned columns: {:#?}", .nonpartitioned_columns)]
140 ColumnsNotPartitioned {
141 nonpartitioned_columns: Vec<String>,
143 },
144
145 #[error("Failed to read line from log record")]
147 Io {
148 #[from]
150 source: std::io::Error,
151 },
152
153 #[error("Commit actions are unsound: {source}")]
155 CommitValidation {
156 source: CommitBuilderError,
158 },
159
160 #[error("Transaction failed: {source}")]
162 Transaction {
163 source: TransactionError,
165 },
166
167 #[error("Delta transaction failed, version {0} already exists.")]
169 VersionAlreadyExists(i64),
170
171 #[error("Delta transaction failed, version {0} does not follow {1}")]
173 VersionMismatch(i64, i64),
174
175 #[error("Delta-rs must be build with feature '{feature}' to support loading from: {url}.")]
177 MissingFeature {
178 feature: &'static str,
180 url: String,
182 },
183
184 #[error("Cannot infer storage location from: {0}")]
186 InvalidTableLocation(String),
187
188 #[error("Log JSON serialization error: {json_err}")]
190 SerializeLogJson {
191 json_err: serde_json::error::Error,
193 },
194
195 #[error("Schema JSON serialization error: {json_err}")]
197 SerializeSchemaJson {
198 json_err: serde_json::error::Error,
200 },
201
202 #[error("Generic DeltaTable error: {0}")]
204 Generic(String),
205
206 #[error("Generic error: {source}")]
208 GenericError {
209 source: Box<dyn std::error::Error + Send + Sync + 'static>,
211 },
212
213 #[error("Kernel: {source}")]
214 Kernel {
215 #[from]
216 source: crate::kernel::Error,
217 },
218
219 #[error("Table metadata is invalid: {0}")]
220 MetadataError(String),
221
222 #[error("Table has not yet been initialized")]
223 NotInitialized,
224
225 #[error("Table has not yet been initialized with files, therefore {0} is not supported")]
226 NotInitializedWithFiles(String),
227
228 #[error("Change Data not enabled for version: {version}, Start: {start}, End: {end}")]
229 ChangeDataNotRecorded { version: i64, start: i64, end: i64 },
230
231 #[error("Reading a table version: {version} that does not have change data enabled")]
232 ChangeDataNotEnabled { version: i64 },
233
234 #[error("Invalid version. Start version {start} is greater than end version {end}")]
235 ChangeDataInvalidVersionRange { start: i64, end: i64 },
236
237 #[error("End timestamp {ending_timestamp} is greater than latest commit timestamp")]
238 ChangeDataTimestampGreaterThanCommit { ending_timestamp: DateTime<Utc> },
239
240 #[error("No starting version or timestamp provided for CDC")]
241 NoStartingVersionOrTimestamp,
242}
243
244impl From<object_store::path::Error> for DeltaTableError {
245 fn from(err: object_store::path::Error) -> Self {
246 Self::GenericError {
247 source: Box::new(err),
248 }
249 }
250}
251
252impl From<ProtocolError> for DeltaTableError {
253 fn from(value: ProtocolError) -> Self {
254 match value {
255 ProtocolError::Arrow { source } => DeltaTableError::Arrow { source },
256 ProtocolError::IO { source } => DeltaTableError::Io { source },
257 ProtocolError::ObjectStore { source } => DeltaTableError::ObjectStore { source },
258 ProtocolError::ParquetParseError { source } => DeltaTableError::Parquet { source },
259 _ => DeltaTableError::Protocol { source: value },
260 }
261 }
262}
263
264impl From<serde_json::Error> for DeltaTableError {
265 fn from(value: serde_json::Error) -> Self {
266 DeltaTableError::InvalidStatsJson { json_err: value }
267 }
268}
269
270impl DeltaTableError {
271 pub fn not_a_table(path: impl AsRef<str>) -> Self {
273 let msg = format!(
274 "No snapshot or version 0 found, perhaps {} is an empty dir?",
275 path.as_ref()
276 );
277 Self::NotATable(msg)
278 }
279
280 pub fn generic(msg: impl ToString) -> Self {
282 Self::Generic(msg.to_string())
283 }
284}