delta_kernel 0.24.0

Core crate providing a Delta/Deltalake implementation focused on interoperability with a wide range of query engines.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
//! Definitions of errors that the delta kernel can encounter

use std::backtrace::{Backtrace, BacktraceStatus};
use std::convert::Infallible;
use std::num::ParseIntError;
use std::str::Utf8Error;

#[cfg(feature = "default-engine-base")]
use crate::arrow::error::ArrowError;
#[cfg(feature = "default-engine-base")]
use crate::object_store;
use crate::schema::{DataType, StructType};
use crate::table_properties::ParseIntervalError;
use crate::Version;

/// A [`std::result::Result`] that has the kernel [`Error`] as the error variant
pub type DeltaResult<T, E = Error> = std::result::Result<T, E>;

/// A boxed, `Send` iterator of [`DeltaResult<T>`] items.
///
/// Convenience alias for the common pattern of returning a streaming, fallible iterator from
/// kernel APIs.
pub type DeltaResultIterator<'a, T> = Box<dyn Iterator<Item = DeltaResult<T>> + Send + 'a>;

/// `'static` counterpart to [`DeltaResultIterator`] for cases where the iterator does not
/// reference borrowed data.
pub type DeltaResultIteratorStatic<T> = DeltaResultIterator<'static, T>;

/// All the types of errors that the kernel can run into
#[non_exhaustive]
#[derive(thiserror::Error, Debug)]
pub enum Error {
    /// This is an error that includes a backtrace. To have a particular type of error include such
    /// backtrace (when RUST_BACKTRACE=1), annotate the error with `#[error(transparent)]` and then
    /// add the error type and enum variant to the `from_with_backtrace!` macro invocation
    /// below. See IOError for an example.
    #[error("{source}\n{backtrace}")]
    Backtraced {
        /// The error that was wrapped
        source: Box<Self>,
        /// The backtrace captured when the error was wrapped
        backtrace: Box<Backtrace>,
    },

    /// An error performing operations on arrow data
    #[cfg(feature = "default-engine-base")]
    #[error(transparent)]
    Arrow(ArrowError),

    /// An error occurred while writing a checkpoint
    #[error("Error writing checkpoint: {0}")]
    CheckpointWrite(String),

    /// User tried to convert engine data to the wrong type
    #[error("Invalid engine data type. Could not convert to {0}")]
    EngineDataType(String),

    /// Could not extract the specified type
    #[error("Error extracting type {0}: {1}")]
    Extract(&'static str, &'static str),

    /// A generic error with a message
    #[error("Generic delta kernel error: {0}")]
    Generic(String),

    /// A generic error wrapping another error
    #[error("Generic error: {source}")]
    GenericError {
        /// Source error
        source: Box<dyn std::error::Error + Send + Sync + 'static>,
    },

    /// Some kind of [`std::io::Error`]
    #[error(transparent)]
    IOError(std::io::Error),

    /// An internal error that means kernel found an unexpected situation, which is likely a bug
    #[error("Internal error {0}. This is a kernel bug, please report.")]
    InternalError(String),

    /// An error encountered while working with parquet data
    // The message previously said "Arrow error", which mislabelled parquet failures.
    #[cfg(feature = "default-engine-base")]
    #[error("Parquet error: {0}")]
    Parquet(#[from] crate::parquet::errors::ParquetError),

    /// An error interacting with the object_store crate
    // We don't use #[from] object_store::Error here as our From impl transforms
    // object_store::Error::NotFound into Self::FileNotFound
    #[cfg(feature = "default-engine-base")]
    #[error("Error interacting with object store: {0}")]
    ObjectStore(object_store::Error),

    /// An error working with paths from the object_store crate
    #[cfg(feature = "default-engine-base")]
    #[error("Object store path error: {0}")]
    ObjectStorePath(#[from] object_store::path::Error),

    /// An error reported by the reqwest HTTP client
    #[cfg(feature = "default-engine-base")]
    #[error("Reqwest Error: {0}")]
    Reqwest(#[from] reqwest::Error),

    /// A specified file could not be found
    #[error("File not found: {0}")]
    FileNotFound(String),

    /// A column was requested, but not found
    #[error("{0}")]
    MissingColumn(String),

    /// The connector-provided partition values are invalid (missing/extra/duplicate keys,
    /// or a value type does not match the schema column type).
    #[error("Invalid partition values: {0}")]
    InvalidPartitionValues(String),

    /// A column was specified with a specific type, but it is not of that type
    #[error("Expected column type: {0}")]
    UnexpectedColumnType(String),

    /// Data was expected, but not found
    #[error("Expected is missing: {0}")]
    MissingData(String),

    /// A version for the delta table could not be found in the log
    #[error("No table version found.")]
    MissingVersion,

    /// An error occurred while working with deletion vectors
    #[error("Deletion Vector error: {0}")]
    DeletionVector(String),

    /// A selection vector is larger than data length
    #[error("Selection vector is larger than data length: {0}")]
    InvalidSelectionVector(String),

    /// Transaction state is invalid for the requested operation
    #[error("Invalid transaction state: {0}")]
    InvalidTransactionState(String),

    /// A specified URL was invalid
    #[error("Invalid url: {0}")]
    InvalidUrl(#[from] url::ParseError),

    /// serde encountered malformed json
    #[error(transparent)]
    MalformedJson(serde_json::Error),

    /// There was no metadata action in the delta log
    #[error("No table metadata found in delta log.")]
    MissingMetadata,

    /// There was no protocol action in the delta log
    #[error("No protocol found in delta log.")]
    MissingProtocol,

    /// Invalid protocol action was read from the log
    #[error("Invalid protocol action in the delta log: {0}")]
    InvalidProtocol(String),

    /// Neither metadata nor protocol could be found in the delta log
    #[error("No table metadata or protocol found in delta log.")]
    MissingMetadataAndProtocol,

    /// A string failed to parse as the specified data type
    #[error("Failed to parse value '{0}' as '{1}'")]
    ParseError(String, DataType),

    /// A tokio executor failed to join a task
    #[error("Join failure: {0}")]
    JoinFailure(String),

    /// Could not convert to string from utf-8
    #[error("Could not convert to string from utf-8: {0}")]
    Utf8Error(#[from] Utf8Error),

    /// Could not parse an integer
    #[error("Could not parse int: {0}")]
    ParseIntError(#[from] ParseIntError),

    /// An invalid column mapping mode was specified
    #[error("Invalid column mapping mode: {0}")]
    InvalidColumnMappingMode(String),

    /// Asked for a table at an invalid location
    #[error("Invalid table location: {0}.")]
    InvalidTableLocation(String),

    /// Precision or scale not compliant with delta specification
    #[error("Invalid decimal: {0}")]
    InvalidDecimal(String),

    /// Inconsistent data passed to struct scalar
    #[error("Invalid struct data: {0}")]
    InvalidStructData(String),

    /// Expressions did not parse or evaluate correctly
    #[error("Invalid expression evaluation: {0}")]
    InvalidExpressionEvaluation(String),

    /// Unable to parse the name of a log path
    #[error("Invalid log path: {0}")]
    InvalidLogPath(String),

    /// The file already exists at the path, prohibiting a non-overwrite write
    #[error("File already exists: {0}")]
    FileAlreadyExists(String),

    /// Some functionality is currently unsupported
    #[error("Unsupported: {0}")]
    Unsupported(String),

    /// Cannot write a version checksum (CRC) file for this snapshot
    #[error("Checksum write unsupported: {0}")]
    ChecksumWriteUnsupported(String),

    /// Parsing error when attempting to deserialize an interval
    #[error(transparent)]
    ParseIntervalError(#[from] ParseIntervalError),

    /// Change data feed is not supported for the table at the given version
    #[error("Change data feed is unsupported for the table at version {0}")]
    ChangeDataFeedUnsupported(Version),

    /// Change data feed found a schema that differs from the expected one; the two fields hold
    /// the expected and the actual schema, respectively, in rendered form
    #[error("Change data feed encountered incompatible schema. Expected {0}, got {1}")]
    ChangeDataFeedIncompatibleSchema(String, String),

    /// Invalid checkpoint files
    #[error("Invalid Checkpoint: {0}")]
    InvalidCheckpoint(String),

    /// Error while transforming a schema + leaves into an Expression of literals
    #[error(transparent)]
    LiteralExpressionTransformError(
        #[from] crate::expressions::literal_expression_transform::Error,
    ),

    /// Schema mismatch has occurred or invalid schema used somewhere
    #[error("Schema error: {0}")]
    Schema(String),

    /// Validation error for file statistics (e.g., missing required clustering column stats)
    #[error("Stats validation error: {0}")]
    StatsValidation(String),

    /// Error during log history operations (timestamp queries, version lookups)
    #[error(transparent)]
    LogHistory(#[from] Box<crate::history_manager::error::LogHistoryError>),

    /// Declarative plan execution produced a `PlanResult` variant other than the one expected;
    /// both fields name the `PlanResult` variant (expected first, then actual)
    #[cfg(feature = "declarative-plans")]
    #[error("Declarative plan execution yielded the incorrect type: expected PlanResult::{expected}, got PlanResult::{actual}")]
    PlanResultTypeMismatch {
        /// Name of the expected `PlanResult` variant
        expected: &'static str,
        /// Name of the `PlanResult` variant that was actually produced
        actual: &'static str,
    },
}

// Convenience constructors for [`Error`] variants (most accept anything implementing `ToString`),
// plus the `with_backtrace` helper used to attach a captured backtrace.
impl Error {
    /// Builds an [`Error::CheckpointWrite`] from `msg`.
    pub(crate) fn checkpoint_write(msg: impl ToString) -> Self {
        Self::CheckpointWrite(msg.to_string())
    }

    /// Builds an [`Error::GenericError`] wrapping `source`, which may be any value convertible
    /// into a boxed `std::error::Error + Send + Sync` (for example an existing error, or a
    /// `&str`/`String` message).
    pub fn generic_err(source: impl Into<Box<dyn std::error::Error + Send + Sync>>) -> Self {
        Self::GenericError {
            source: source.into(),
        }
    }

    /// Builds an [`Error::Generic`] carrying `msg`.
    pub fn generic(msg: impl ToString) -> Self {
        Self::Generic(msg.to_string())
    }

    /// Builds an [`Error::FileNotFound`] for `path`.
    pub fn file_not_found(path: impl ToString) -> Self {
        Self::FileNotFound(path.to_string())
    }

    /// Builds an [`Error::MissingColumn`] for `name`, attaching a backtrace when backtraces
    /// are enabled (see [`Error::with_backtrace`]).
    pub fn missing_column(name: impl ToString) -> Self {
        Self::MissingColumn(name.to_string()).with_backtrace()
    }

    /// Builds an [`Error::UnexpectedColumnType`] from `name`.
    pub fn unexpected_column_type(name: impl ToString) -> Self {
        Self::UnexpectedColumnType(name.to_string())
    }

    /// Builds an [`Error::InvalidPartitionValues`] from `msg`.
    pub fn invalid_partition_values(msg: impl ToString) -> Self {
        Self::InvalidPartitionValues(msg.to_string())
    }

    /// Builds an [`Error::MissingData`] from `name`.
    pub fn missing_data(name: impl ToString) -> Self {
        Self::MissingData(name.to_string())
    }

    /// Builds an [`Error::DeletionVector`] from `msg`.
    pub fn deletion_vector(msg: impl ToString) -> Self {
        Self::DeletionVector(msg.to_string())
    }

    /// Builds an [`Error::EngineDataType`] from `msg`.
    pub fn engine_data_type(msg: impl ToString) -> Self {
        Self::EngineDataType(msg.to_string())
    }

    /// Builds an [`Error::JoinFailure`] from `msg`.
    pub fn join_failure(msg: impl ToString) -> Self {
        Self::JoinFailure(msg.to_string())
    }

    /// Builds an [`Error::InvalidTableLocation`] for `location`.
    pub fn invalid_table_location(location: impl ToString) -> Self {
        Self::InvalidTableLocation(location.to_string())
    }

    /// Builds an [`Error::InvalidColumnMappingMode`] for `mode`.
    pub fn invalid_column_mapping_mode(mode: impl ToString) -> Self {
        Self::InvalidColumnMappingMode(mode.to_string())
    }

    /// Builds an [`Error::InvalidDecimal`] from `msg`.
    pub fn invalid_decimal(msg: impl ToString) -> Self {
        Self::InvalidDecimal(msg.to_string())
    }

    /// Builds an [`Error::InvalidStructData`] from `msg`.
    pub fn invalid_struct_data(msg: impl ToString) -> Self {
        Self::InvalidStructData(msg.to_string())
    }

    /// Builds an [`Error::InvalidExpressionEvaluation`] from `msg`.
    pub fn invalid_expression(msg: impl ToString) -> Self {
        Self::InvalidExpressionEvaluation(msg.to_string())
    }

    /// Builds an [`Error::InvalidLogPath`] from `msg`.
    pub(crate) fn invalid_log_path(msg: impl ToString) -> Self {
        Self::InvalidLogPath(msg.to_string())
    }

    /// Builds an [`Error::InternalError`] from `msg`, attaching a backtrace when backtraces
    /// are enabled (see [`Error::with_backtrace`]).
    pub fn internal_error(msg: impl ToString) -> Self {
        Self::InternalError(msg.to_string()).with_backtrace()
    }

    /// Builds an [`Error::InvalidProtocol`] from `msg`.
    pub fn invalid_protocol(msg: impl ToString) -> Self {
        Self::InvalidProtocol(msg.to_string())
    }

    /// Builds an [`Error::InvalidTransactionState`] from `msg`.
    pub fn invalid_transaction_state(msg: impl ToString) -> Self {
        Self::InvalidTransactionState(msg.to_string())
    }

    /// Builds an [`Error::Unsupported`] from `msg`.
    pub fn unsupported(msg: impl ToString) -> Self {
        Self::Unsupported(msg.to_string())
    }

    /// Builds an [`Error::ChangeDataFeedUnsupported`] for table `version`.
    pub fn change_data_feed_unsupported(version: impl Into<Version>) -> Self {
        Self::ChangeDataFeedUnsupported(version.into())
    }

    /// Builds an [`Error::ChangeDataFeedIncompatibleSchema`], storing the `Debug` rendering of
    /// the `expected` and `actual` schemas.
    pub(crate) fn change_data_feed_incompatible_schema(
        expected: &StructType,
        actual: &StructType,
    ) -> Self {
        Self::ChangeDataFeedIncompatibleSchema(format!("{expected:?}"), format!("{actual:?}"))
    }

    /// Builds an [`Error::InvalidCheckpoint`] from `msg`.
    pub fn invalid_checkpoint(msg: impl ToString) -> Self {
        Self::InvalidCheckpoint(msg.to_string())
    }

    /// Builds an [`Error::Schema`] from `msg`.
    pub fn schema(msg: impl ToString) -> Self {
        Self::Schema(msg.to_string())
    }

    /// Builds an [`Error::StatsValidation`] from `msg`.
    pub fn stats_validation(msg: impl ToString) -> Self {
        Self::StatsValidation(msg.to_string())
    }

    /// Builds an `Error::PlanResultTypeMismatch` naming the `expected` and `actual`
    /// `PlanResult` variants.
    #[cfg(feature = "declarative-plans")]
    pub fn plan_result_type_mismatch(expected: &'static str, actual: &'static str) -> Self {
        Self::PlanResultTypeMismatch { expected, actual }
    }

    /// Wraps this error in [`Error::Backtraced`] together with a backtrace captured at the
    /// call site.
    ///
    /// Capturing goes through [`Backtrace::capture`]. When that does not yield a captured
    /// backtrace (backtraces disabled), the error is returned unchanged. An error that is
    /// already [`Error::Backtraced`] is also returned unchanged. Repeated calls therefore never
    /// nest backtraces and never pay for a second capture.
    #[must_use]
    pub fn with_backtrace(self) -> Self {
        // Idempotent: wrapping again would print two backtraces for a single error.
        if matches!(self, Self::Backtraced { .. }) {
            return self;
        }
        let backtrace = Backtrace::capture();
        match backtrace.status() {
            BacktraceStatus::Captured => Self::Backtraced {
                source: Box::new(self),
                backtrace: Box::new(backtrace),
            },
            _ => self,
        }
    }
}

// Generates `impl From<$error_type> for Error` for each `($error_type, $error_variant)` pair.
// Each conversion wraps the value in `Error::$error_variant` and then calls
// `Error::with_backtrace`, so a backtrace is attached when backtraces are enabled.
// A trailing comma after the last pair is accepted.
macro_rules! from_with_backtrace(
    ( $(($error_type: ty, $error_variant: ident)),* $(,)? ) => {
        $(
            impl From<$error_type> for Error {
                fn from(value: $error_type) -> Self {
                    Self::$error_variant(value).with_backtrace()
                }
            }
        )*
    };
);

// I/O and JSON errors record a backtrace (when enabled) as they are converted into `Error`.
from_with_backtrace! {
    (std::io::Error, IOError),
    (serde_json::Error, MalformedJson)
}

/// Turns an arrow error into [`Error::Arrow`], attaching a backtrace when backtraces are enabled.
#[cfg(feature = "default-engine-base")]
impl From<ArrowError> for Error {
    fn from(arrow_err: ArrowError) -> Self {
        let wrapped = Error::Arrow(arrow_err);
        wrapped.with_backtrace()
    }
}

/// Translates an object_store error. A `NotFound` error becomes [`Error::FileNotFound`] for
/// its path; every other error is kept intact inside [`Error::ObjectStore`].
#[cfg(feature = "default-engine-base")]
impl From<object_store::Error> for Error {
    fn from(store_err: object_store::Error) -> Self {
        if let object_store::Error::NotFound { path, .. } = store_err {
            return Self::file_not_found(path);
        }
        Self::ObjectStore(store_err)
    }
}

/// Lets the `?` operator lift a `Result<T, Infallible>` into a [`DeltaResult<T>`]. An example
/// source of such a result is a `TryFrom` impl whose conversion cannot fail.
///
/// `Infallible` has no values, so this conversion can never actually execute; the empty `match`
/// proves that to the compiler.
impl From<Infallible> for Error {
    fn from(never: Infallible) -> Self {
        match never {}
    }
}