1use chrono::{DateTime, Utc};
3use object_store::Error as ObjectStoreError;
4
5use crate::kernel::transaction::{CommitBuilderError, TransactionError};
6
7pub type DeltaResult<T, E = DeltaTableError> = Result<T, E>;
9
10#[allow(missing_docs)]
12#[derive(thiserror::Error, Debug)]
13pub enum DeltaTableError {
14 #[error("Kernel error: {0}")]
15 KernelError(#[from] delta_kernel::error::Error),
16
17 #[error("Failed to read delta log object: {}", .source)]
19 ObjectStore {
20 #[from]
22 source: ObjectStoreError,
23 },
24
25 #[error("Failed to parse parquet: {}", .source)]
27 Parquet {
28 #[from]
30 source: parquet::errors::ParquetError,
31 },
32
33 #[error("Failed to convert into Arrow schema: {}", .source)]
35 Arrow {
36 #[from]
38 source: arrow::error::ArrowError,
39 },
40
41 #[error("Invalid JSON in log record, version={}, line=`{}`, err=`{}`", .version, .line, .json_err)]
43 InvalidJsonLog {
44 json_err: serde_json::error::Error,
46 line: String,
48 version: i64,
50 },
51
52 #[error("Invalid JSON in file stats: {}", .json_err)]
54 InvalidStatsJson {
55 json_err: serde_json::error::Error,
57 },
58
59 #[error("Invalid JSON in invariant expression, line=`{line}`, err=`{json_err}`")]
61 InvalidInvariantJson {
62 json_err: serde_json::error::Error,
64 line: String,
66 },
67
68 #[error("Invalid table version: {0}")]
70 InvalidVersion(i64),
71
72 #[error("Corrupted table, cannot read data file {}: {}", .path, .source)]
74 MissingDataFile {
75 source: std::io::Error,
77 path: String,
79 },
80
81 #[error("Invalid datetime string: {}", .source)]
83 InvalidDateTimeString {
84 #[from]
86 source: chrono::ParseError,
87 },
88
89 #[error("Attempted to write invalid data to the table: {:#?}", violations)]
91 InvalidData {
92 violations: Vec<String>,
94 },
95
96 #[error("Not a Delta table: {0}")]
98 NotATable(String),
99
100 #[error("No schema found, please make sure table is loaded.")]
102 NoSchema,
103
104 #[error("No partitions found, please make sure table is partitioned.")]
106 LoadPartitions,
107
108 #[error("Data does not match the schema or partitions of the table: {}", msg)]
111 SchemaMismatch {
112 msg: String,
114 },
115
116 #[error("This partition is not formatted with key=value: {}", .partition)]
118 PartitionError {
119 partition: String,
121 },
122
123 #[error("Invalid partition filter found: {}.", .partition_filter)]
125 InvalidPartitionFilter {
126 partition_filter: String,
128 },
129
130 #[error("Tried to filter partitions on non-partitioned columns: {:#?}", .nonpartitioned_columns)]
132 ColumnsNotPartitioned {
133 nonpartitioned_columns: Vec<String>,
135 },
136
137 #[error("Failed to read line from log record")]
139 Io {
140 #[from]
142 source: std::io::Error,
143 },
144
145 #[error("Commit actions are unsound: {source}")]
147 CommitValidation {
148 source: CommitBuilderError,
150 },
151
152 #[error("Transaction failed: {source}")]
154 Transaction {
155 source: TransactionError,
157 },
158
159 #[error("Delta transaction failed, version {0} already exists.")]
161 VersionAlreadyExists(i64),
162
163 #[error("Delta transaction failed, version {0} does not follow {1}")]
165 VersionMismatch(i64, i64),
166
167 #[error("Delta-rs must be build with feature '{feature}' to support loading from: {url}.")]
169 MissingFeature {
170 feature: &'static str,
172 url: String,
174 },
175
176 #[error("Cannot infer storage location from: {0}")]
178 InvalidTableLocation(String),
179
180 #[error("Log JSON serialization error: {json_err}")]
182 SerializeLogJson {
183 json_err: serde_json::error::Error,
185 },
186
187 #[error("Schema JSON serialization error: {json_err}")]
189 SerializeSchemaJson {
190 json_err: serde_json::error::Error,
192 },
193
194 #[error("Generic DeltaTable error: {0}")]
196 Generic(String),
197
198 #[error("Generic error: {source}")]
200 GenericError {
201 source: Box<dyn std::error::Error + Send + Sync + 'static>,
203 },
204
205 #[error("Kernel: {source}")]
206 Kernel {
207 #[from]
208 source: crate::kernel::Error,
209 },
210
211 #[error("Table metadata is invalid: {0}")]
212 MetadataError(String),
213
214 #[error("Table has not yet been initialized")]
215 NotInitialized,
216
217 #[error("Table has not yet been initialized with files, therefore {0} is not supported")]
218 NotInitializedWithFiles(String),
219
220 #[error("Change Data not enabled for version: {version}, Start: {start}, End: {end}")]
221 ChangeDataNotRecorded { version: i64, start: i64, end: i64 },
222
223 #[error("Reading a table version: {version} that does not have change data enabled")]
224 ChangeDataNotEnabled { version: i64 },
225
226 #[error("Invalid version. Start version {start} is greater than end version {end}")]
227 ChangeDataInvalidVersionRange { start: i64, end: i64 },
228
229 #[error("End timestamp {ending_timestamp} is greater than latest commit timestamp")]
230 ChangeDataTimestampGreaterThanCommit { ending_timestamp: DateTime<Utc> },
231
232 #[error("No starting version or timestamp provided for CDC")]
233 NoStartingVersionOrTimestamp,
234}
235
236impl From<object_store::path::Error> for DeltaTableError {
237 fn from(err: object_store::path::Error) -> Self {
238 Self::GenericError {
239 source: Box::new(err),
240 }
241 }
242}
243
244impl From<serde_json::Error> for DeltaTableError {
245 fn from(value: serde_json::Error) -> Self {
246 DeltaTableError::InvalidStatsJson { json_err: value }
247 }
248}
249
250impl DeltaTableError {
251 pub fn not_a_table(path: impl AsRef<str>) -> Self {
253 let msg = format!(
254 "No snapshot or version 0 found, perhaps {} is an empty dir?",
255 path.as_ref()
256 );
257 Self::NotATable(msg)
258 }
259
260 pub fn generic(msg: impl ToString) -> Self {
262 Self::Generic(msg.to_string())
263 }
264}