1use chrono::{DateTime, Utc};
3use object_store::Error as ObjectStoreError;
4
5use crate::kernel::Version;
6use crate::kernel::transaction::{CommitBuilderError, TransactionError};
7
/// Convenience alias for [`Result`] whose error type defaults to [`DeltaTableError`].
///
/// The error parameter `E` can still be overridden by naming a second type argument.
pub type DeltaResult<T, E = DeltaTableError> = Result<T, E>;
10
11pub(crate) fn unsupported_column_mapping_write(operation: &str) -> DeltaTableError {
12 DeltaTableError::Generic(format!(
13 "column mapping writes are not supported for {operation} yet"
14 ))
15}
16
17#[allow(missing_docs)]
19#[derive(thiserror::Error, Debug)]
20pub enum DeltaTableError {
21 #[error("Kernel error: {0}")]
22 KernelError(#[from] delta_kernel::error::Error),
23
24 #[error("Failed to read delta log object: {}", .source)]
26 ObjectStore {
27 #[from]
29 source: ObjectStoreError,
30 },
31
32 #[error("Failed to parse parquet: {}", .source)]
34 Parquet {
35 #[from]
37 source: parquet::errors::ParquetError,
38 },
39
40 #[error("Failed to convert into Arrow schema: {}", .source)]
42 Arrow {
43 #[from]
45 source: arrow::error::ArrowError,
46 },
47
48 #[error("Invalid JSON in log record, version={}, line=`{}`, err=`{}`", .version, .line, .json_err)]
50 InvalidJsonLog {
51 json_err: serde_json::error::Error,
53 line: String,
55 version: Version,
57 },
58
59 #[error("Invalid JSON in file stats: {}", .json_err)]
61 InvalidStatsJson {
62 json_err: serde_json::error::Error,
64 },
65
66 #[error("Invalid table version: {0}")]
68 InvalidVersion(Version),
69
70 #[error(
72 "Cannot downgrade from version {current_version} to {requested_version}; use DeltaTable.load_version()"
73 )]
74 VersionDowngrade {
75 current_version: Version,
77 requested_version: Version,
79 },
80
81 #[error("Invalid datetime string: {}", .source)]
83 InvalidDateTimeString {
84 #[from]
86 source: chrono::ParseError,
87 },
88
89 #[error("{message}")]
91 InvalidData {
92 message: String,
94 },
95
96 #[error("Not a Delta table: {0}")]
98 NotATable(String),
99
100 #[error("No schema found, please make sure table is loaded.")]
102 NoSchema,
103
104 #[error("Data does not match the schema or partitions of the table: {}", msg)]
107 SchemaMismatch {
108 msg: String,
110 },
111
112 #[error("This partition is not formatted with key=value: {}", .partition)]
114 PartitionError {
115 partition: String,
117 },
118
119 #[error("Invalid partition filter found: {}.", .partition_filter)]
121 InvalidPartitionFilter {
122 partition_filter: String,
124 },
125
126 #[error("Failed to read line from log record")]
128 Io {
129 #[from]
131 source: std::io::Error,
132 },
133
134 #[error("Commit actions are unsound: {source}")]
136 CommitValidation {
137 source: CommitBuilderError,
139 },
140
141 #[error("Transaction failed: {source}")]
143 Transaction {
144 source: TransactionError,
146 },
147
148 #[error("Delta transaction failed, version {0} already exists.")]
150 VersionAlreadyExists(Version),
151
152 #[error("Delta transaction failed, version {0} does not follow {1}")]
154 VersionMismatch(Version, Version),
155
156 #[error("Delta-rs must be build with feature '{feature}' to support loading from: {url}.")]
158 MissingFeature {
159 feature: &'static str,
161 url: String,
163 },
164
165 #[error("Cannot infer storage location from: {0}")]
167 InvalidTableLocation(String),
168
169 #[error("Log JSON serialization error: {json_err}")]
171 SerializeLogJson {
172 json_err: serde_json::error::Error,
174 },
175
176 #[error("Generic DeltaTable error: {0}")]
178 Generic(String),
179
180 #[error("Generic error: {source}")]
182 GenericError {
183 source: Box<dyn std::error::Error + Send + Sync + 'static>,
185 },
186
187 #[error("Kernel: {source}")]
188 Kernel {
189 #[from]
190 source: crate::kernel::Error,
191 },
192
193 #[error("Table metadata is invalid: {0}")]
194 MetadataError(String),
195
196 #[error("Table has not yet been initialized")]
197 NotInitialized,
198
199 #[error("Table has not yet been initialized with files, therefore {0} is not supported")]
200 NotInitializedWithFiles(String),
201
202 #[error("Change Data not enabled for version: {version}, Start: {start}, End: {end}")]
203 ChangeDataNotRecorded {
204 version: Version,
205 start: Version,
206 end: Version,
207 },
208
209 #[error("Reading a table version: {version} that does not have change data enabled")]
210 ChangeDataNotEnabled { version: Version },
211
212 #[error("Invalid version. Start version {start} is greater than end version {end}")]
213 ChangeDataInvalidVersionRange { start: Version, end: Version },
214
215 #[error("End timestamp {ending_timestamp} is greater than latest commit timestamp")]
216 ChangeDataTimestampGreaterThanCommit { ending_timestamp: DateTime<Utc> },
217
218 #[error("No starting version or timestamp provided for CDC")]
219 NoStartingVersionOrTimestamp,
220}
221
222impl From<object_store::path::Error> for DeltaTableError {
223 fn from(err: object_store::path::Error) -> Self {
224 Self::GenericError {
225 source: Box::new(err),
226 }
227 }
228}
229
230impl From<serde_json::Error> for DeltaTableError {
231 fn from(value: serde_json::Error) -> Self {
232 DeltaTableError::InvalidStatsJson { json_err: value }
233 }
234}
235
236impl DeltaTableError {
237 pub fn not_a_table(path: impl AsRef<str>) -> Self {
239 let msg = format!(
240 "No snapshot or version 0 found, perhaps {} is an empty dir?",
241 path.as_ref()
242 );
243 Self::NotATable(msg)
244 }
245
246 pub fn generic(msg: impl ToString) -> Self {
248 Self::Generic(msg.to_string())
249 }
250}