1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
//! Error types for the deltalake crate
use chrono::{DateTime, Utc};
use object_store::Error as ObjectStoreError;
use crate::kernel::Version;
use crate::kernel::transaction::{CommitBuilderError, TransactionError};
/// Result type returned by delta-rs APIs.
///
/// The error parameter defaults to [`DeltaTableError`], so `DeltaResult<T>` is
/// shorthand for `Result<T, DeltaTableError>`; a different error type can still
/// be supplied explicitly as the second parameter.
pub type DeltaResult<T, E = DeltaTableError> = Result<T, E>;
/// Error type for Delta table operations.
///
/// `Display` messages are generated by `thiserror` from the `#[error(...)]`
/// attribute on each variant. Variants whose payload is marked `#[from]` also
/// get a `From` conversion from the wrapped error type, so those errors can be
/// propagated with `?`.
// NOTE(review): presumably still needed while inline struct-variant fields
// (e.g. those of `ChangeDataNotEnabled`) carry no doc comments — confirm.
#[allow(missing_docs)]
#[derive(thiserror::Error, Debug)]
pub enum DeltaTableError {
/// Error originating from the external `delta_kernel` crate.
#[error("Kernel error: {0}")]
KernelError(#[from] delta_kernel::error::Error),
/// Error returned when reading the delta log object failed.
#[error("Failed to read delta log object: {}", .source)]
ObjectStore {
/// Storage error details when reading the delta log object failed.
#[from]
source: ObjectStoreError,
},
/// Error returned when parsing checkpoint parquet.
#[error("Failed to parse parquet: {}", .source)]
Parquet {
/// Parquet error details returned when reading the checkpoint failed.
#[from]
source: parquet::errors::ParquetError,
},
/// Error returned when converting the schema in Arrow format failed.
#[error("Failed to convert into Arrow schema: {}", .source)]
Arrow {
/// Arrow error details returned when converting the schema in Arrow format failed.
#[from]
source: arrow::error::ArrowError,
},
/// Error returned when the log record has an invalid JSON.
#[error("Invalid JSON in log record, version={}, line=`{}`, err=`{}`", .version, .line, .json_err)]
InvalidJsonLog {
/// JSON error details returned when parsing the record JSON.
json_err: serde_json::error::Error,
/// Invalid log entry content.
line: String,
/// Corresponding table version for the log file.
version: Version,
},
/// Error returned when the log contains invalid stats JSON.
#[error("Invalid JSON in file stats: {}", .json_err)]
InvalidStatsJson {
/// JSON error details returned when parsing the stats JSON.
json_err: serde_json::error::Error,
},
/// Error returned when the DeltaTable has an invalid version.
#[error("Invalid table version: {0}")]
InvalidVersion(Version),
/// Error returned when an operation requests an older version than the currently loaded one.
#[error(
"Cannot downgrade from version {current_version} to {requested_version}; use DeltaTable.load_version()"
)]
VersionDowngrade {
/// The currently loaded version.
current_version: Version,
/// The requested older version.
requested_version: Version,
},
/// Error returned when the datetime string is invalid for a conversion.
#[error("Invalid datetime string: {}", .source)]
InvalidDateTimeString {
/// The `chrono` parse error produced for the datetime string.
#[from]
source: chrono::ParseError,
},
/// Error returned when attempting to write bad data to the table.
#[error("{message}")]
InvalidData {
/// Description of the invalid data; it is used verbatim as the whole error message.
message: String,
},
/// Error returned when it is not a DeltaTable.
#[error("Not a Delta table: {0}")]
NotATable(String),
/// Error returned when no schema was found in the DeltaTable.
#[error("No schema found, please make sure table is loaded.")]
NoSchema,
/// Error returned when writes are attempted with data that doesn't match the schema or
/// partitions of the table.
#[error("Data does not match the schema or partitions of the table: {}", msg)]
SchemaMismatch {
/// Information about the mismatch.
msg: String,
},
/// Error returned when a partition is not formatted as a Hive partition (`key=value`).
#[error("This partition is not formatted with key=value: {}", .partition)]
PartitionError {
/// The malformed partition used.
partition: String,
},
/// Error returned when an invalid partition filter was found.
#[error("Invalid partition filter found: {}.", .partition_filter)]
InvalidPartitionFilter {
/// The invalid partition filter used.
partition_filter: String,
},
/// Error returned when reading a line from a log record failed with an I/O error.
///
/// The message does not include the underlying error; it is carried in `source`.
#[error("Failed to read line from log record")]
Io {
/// Source error details returned while reading the log record.
#[from]
source: std::io::Error,
},
/// Error raised while preparing a commit.
#[error("Commit actions are unsound: {source}")]
CommitValidation {
/// The source error.
source: CommitBuilderError,
},
/// Error raised while committing a transaction.
#[error("Transaction failed: {source}")]
Transaction {
/// The source error.
source: TransactionError,
},
/// Error returned when a transaction failed to be committed because the given version
/// already exists.
#[error("Delta transaction failed, version {0} already exists.")]
VersionAlreadyExists(Version),
/// Error returned when user attempts to commit actions that don't belong to the next version.
///
/// Per the message, the first field is the version that "does not follow" the second.
#[error("Delta transaction failed, version {0} does not follow {1}")]
VersionMismatch(Version, Version),
/// Error returned when a crate feature needed to load from the given location is missing.
// NOTE(review): the message reads "must be build"; probably meant "built". Left unchanged
// here because it is runtime text that callers or tests may match on.
#[error("Delta-rs must be build with feature '{feature}' to support loading from: {url}.")]
MissingFeature {
/// Name of the missing feature.
feature: &'static str,
/// Storage location url.
url: String,
},
/// Error returned when the storage location cannot be inferred from the given string.
#[error("Cannot infer storage location from: {0}")]
InvalidTableLocation(String),
/// Error reporting a JSON serialization failure for log data.
#[error("Log JSON serialization error: {json_err}")]
SerializeLogJson {
/// JSON serialization error.
json_err: serde_json::error::Error,
},
/// Generic Delta Table error carrying only a message.
#[error("Generic DeltaTable error: {0}")]
Generic(String),
/// Generic error wrapping an arbitrary boxed source error.
#[error("Generic error: {source}")]
GenericError {
/// Source error.
source: Box<dyn std::error::Error + Send + Sync + 'static>,
},
/// Error originating from this crate's own `kernel` module (`crate::kernel::Error`).
#[error("Kernel: {source}")]
Kernel {
/// The wrapped kernel error.
#[from]
source: crate::kernel::Error,
},
/// Error returned when table metadata is invalid; the payload is appended to the message.
#[error("Table metadata is invalid: {0}")]
MetadataError(String),
/// Error returned when the table has not yet been initialized.
#[error("Table has not yet been initialized")]
NotInitialized,
/// Error returned when the table has not yet been initialized with files; the payload names
/// what is reported as unsupported.
#[error("Table has not yet been initialized with files, therefore {0} is not supported")]
NotInitializedWithFiles(String),
/// Error reporting that change data is not enabled for a version, together with a
/// start/end version pair.
#[error("Change Data not enabled for version: {version}, Start: {start}, End: {end}")]
ChangeDataNotRecorded {
/// Version reported as lacking change data.
version: Version,
/// Start version shown in the message.
start: Version,
/// End version shown in the message.
end: Version,
},
/// Error returned when reading a table version that does not have change data enabled.
#[error("Reading a table version: {version} that does not have change data enabled")]
ChangeDataNotEnabled { version: Version },
/// Error returned when the start version of a range is greater than its end version.
#[error("Invalid version. Start version {start} is greater than end version {end}")]
ChangeDataInvalidVersionRange { start: Version, end: Version },
/// Error returned when the ending timestamp is greater than the latest commit timestamp.
#[error("End timestamp {ending_timestamp} is greater than latest commit timestamp")]
ChangeDataTimestampGreaterThanCommit { ending_timestamp: DateTime<Utc> },
/// Error returned when neither a starting version nor a timestamp was provided for CDC.
#[error("No starting version or timestamp provided for CDC")]
NoStartingVersionOrTimestamp,
}
impl From<object_store::path::Error> for DeltaTableError {
fn from(err: object_store::path::Error) -> Self {
Self::GenericError {
source: Box::new(err),
}
}
}
impl From<serde_json::Error> for DeltaTableError {
fn from(value: serde_json::Error) -> Self {
DeltaTableError::InvalidStatsJson { json_err: value }
}
}
impl DeltaTableError {
/// Crate a NotATable Error with message for given path.
pub fn not_a_table(path: impl AsRef<str>) -> Self {
let msg = format!(
"No snapshot or version 0 found, perhaps {} is an empty dir?",
path.as_ref()
);
Self::NotATable(msg)
}
/// Create a [Generic](DeltaTableError::Generic) error with the given message.
pub fn generic(msg: impl ToString) -> Self {
Self::Generic(msg.to_string())
}
}