1use chrono::{DateTime, Utc};
3use object_store::Error as ObjectStoreError;
4
5use crate::kernel::Version;
6use crate::kernel::transaction::{CommitBuilderError, TransactionError};
7
8pub type DeltaResult<T, E = DeltaTableError> = Result<T, E>;
10
11#[allow(missing_docs)]
13#[derive(thiserror::Error, Debug)]
14pub enum DeltaTableError {
15 #[error("Kernel error: {0}")]
16 KernelError(#[from] delta_kernel::error::Error),
17
18 #[error("Failed to read delta log object: {}", .source)]
20 ObjectStore {
21 #[from]
23 source: ObjectStoreError,
24 },
25
26 #[error("Failed to parse parquet: {}", .source)]
28 Parquet {
29 #[from]
31 source: parquet::errors::ParquetError,
32 },
33
34 #[error("Failed to convert into Arrow schema: {}", .source)]
36 Arrow {
37 #[from]
39 source: arrow::error::ArrowError,
40 },
41
42 #[error("Invalid JSON in log record, version={}, line=`{}`, err=`{}`", .version, .line, .json_err)]
44 InvalidJsonLog {
45 json_err: serde_json::error::Error,
47 line: String,
49 version: Version,
51 },
52
53 #[error("Invalid JSON in file stats: {}", .json_err)]
55 InvalidStatsJson {
56 json_err: serde_json::error::Error,
58 },
59
60 #[error("Invalid table version: {0}")]
62 InvalidVersion(Version),
63
64 #[error(
66 "Cannot downgrade from version {current_version} to {requested_version}; use DeltaTable.load_version()"
67 )]
68 VersionDowngrade {
69 current_version: Version,
71 requested_version: Version,
73 },
74
75 #[error("Invalid datetime string: {}", .source)]
77 InvalidDateTimeString {
78 #[from]
80 source: chrono::ParseError,
81 },
82
83 #[error("{message}")]
85 InvalidData {
86 message: String,
88 },
89
90 #[error("Not a Delta table: {0}")]
92 NotATable(String),
93
94 #[error("No schema found, please make sure table is loaded.")]
96 NoSchema,
97
98 #[error("Data does not match the schema or partitions of the table: {}", msg)]
101 SchemaMismatch {
102 msg: String,
104 },
105
106 #[error("This partition is not formatted with key=value: {}", .partition)]
108 PartitionError {
109 partition: String,
111 },
112
113 #[error("Invalid partition filter found: {}.", .partition_filter)]
115 InvalidPartitionFilter {
116 partition_filter: String,
118 },
119
120 #[error("Failed to read line from log record")]
122 Io {
123 #[from]
125 source: std::io::Error,
126 },
127
128 #[error("Commit actions are unsound: {source}")]
130 CommitValidation {
131 source: CommitBuilderError,
133 },
134
135 #[error("Transaction failed: {source}")]
137 Transaction {
138 source: TransactionError,
140 },
141
142 #[error("Delta transaction failed, version {0} already exists.")]
144 VersionAlreadyExists(Version),
145
146 #[error("Delta transaction failed, version {0} does not follow {1}")]
148 VersionMismatch(Version, Version),
149
150 #[error("Delta-rs must be build with feature '{feature}' to support loading from: {url}.")]
152 MissingFeature {
153 feature: &'static str,
155 url: String,
157 },
158
159 #[error("Cannot infer storage location from: {0}")]
161 InvalidTableLocation(String),
162
163 #[error("Log JSON serialization error: {json_err}")]
165 SerializeLogJson {
166 json_err: serde_json::error::Error,
168 },
169
170 #[error("Generic DeltaTable error: {0}")]
172 Generic(String),
173
174 #[error("Generic error: {source}")]
176 GenericError {
177 source: Box<dyn std::error::Error + Send + Sync + 'static>,
179 },
180
181 #[error("Kernel: {source}")]
182 Kernel {
183 #[from]
184 source: crate::kernel::Error,
185 },
186
187 #[error("Table metadata is invalid: {0}")]
188 MetadataError(String),
189
190 #[error("Table has not yet been initialized")]
191 NotInitialized,
192
193 #[error("Table has not yet been initialized with files, therefore {0} is not supported")]
194 NotInitializedWithFiles(String),
195
196 #[error("Change Data not enabled for version: {version}, Start: {start}, End: {end}")]
197 ChangeDataNotRecorded {
198 version: Version,
199 start: Version,
200 end: Version,
201 },
202
203 #[error("Reading a table version: {version} that does not have change data enabled")]
204 ChangeDataNotEnabled { version: Version },
205
206 #[error("Invalid version. Start version {start} is greater than end version {end}")]
207 ChangeDataInvalidVersionRange { start: Version, end: Version },
208
209 #[error("End timestamp {ending_timestamp} is greater than latest commit timestamp")]
210 ChangeDataTimestampGreaterThanCommit { ending_timestamp: DateTime<Utc> },
211
212 #[error("No starting version or timestamp provided for CDC")]
213 NoStartingVersionOrTimestamp,
214}
215
216impl From<object_store::path::Error> for DeltaTableError {
217 fn from(err: object_store::path::Error) -> Self {
218 Self::GenericError {
219 source: Box::new(err),
220 }
221 }
222}
223
224impl From<serde_json::Error> for DeltaTableError {
225 fn from(value: serde_json::Error) -> Self {
226 DeltaTableError::InvalidStatsJson { json_err: value }
227 }
228}
229
230impl DeltaTableError {
231 pub fn not_a_table(path: impl AsRef<str>) -> Self {
233 let msg = format!(
234 "No snapshot or version 0 found, perhaps {} is an empty dir?",
235 path.as_ref()
236 );
237 Self::NotATable(msg)
238 }
239
240 pub fn generic(msg: impl ToString) -> Self {
242 Self::Generic(msg.to_string())
243 }
244}