buoyant_kernel/lib.rs
1//! # Delta Kernel
2//!
3//! Delta-kernel-rs is an experimental [Delta](https://github.com/delta-io/delta/) implementation
4//! focused on interoperability with a wide range of query engines. It supports reads and
5//! (experimental) writes (only blind appends in the write path currently). This library defines a
6//! number of traits which must be implemented to provide a working delta implementation. They are
7//! detailed below. There is a provided "default engine" that implements all these traits and can
8//! be used to ease integration work. See [`DefaultEngine`](engine/default/index.html) for more
9//! information.
10//!
11//! A full `rust` example for reading table data using the default engine can be found in the
12//! [read-table-single-threaded] example (and for a more complex multi-threaded reader see the
13//! [read-table-multi-threaded] example). An example for reading the table changes for a table
14//! using the default engine can be found in the [read-table-changes] example. The [write-table]
15//! example demonstrates how to write data to a Delta table using the default engine.
16//!
17//! [read-table-single-threaded]:
18//! https://github.com/delta-io/delta-kernel-rs/tree/main/kernel/examples/read-table-single-threaded
19//! [read-table-multi-threaded]:
20//! https://github.com/delta-io/delta-kernel-rs/tree/main/kernel/examples/read-table-multi-threaded
21//! [read-table-changes]:
22//! https://github.com/delta-io/delta-kernel-rs/tree/main/kernel/examples/read-table-changes
23//! [write-table]:
24//! https://github.com/delta-io/delta-kernel-rs/tree/main/kernel/examples/write-table
25//!
26//! # Engine trait
27//!
28//! The [`Engine`] trait allows connectors to bring their own implementation of functionality such
29//! as reading parquet files, listing files in a file system, parsing a JSON string etc. This
30//! trait exposes methods to get sub-engines which expose the core functionalities customizable by
31//! connectors.
32//!
33//! ## Expression handling
34//!
//! Expression handling is done via the [`EvaluationHandler`], which in turn allows the creation of
//! [`ExpressionEvaluator`]s and [`PredicateEvaluator`]s. Each evaluator is created for a specific
//! [`Expression`] (or [`Predicate`]) and evaluates it against a given batch of data.
38//!
39//! ## File system interactions
40//!
41//! Delta Kernel needs to perform some basic operations against file systems like listing and
42//! reading files. These interactions are encapsulated in the [`StorageHandler`] trait.
//! Implementers must uphold every behavioral contract documented on these functions (for example,
//! [`StorageHandler::list_from`] must return its results sorted by path).
45//!
46//! ## Reading log and data files
47//!
//! Delta Kernel requires the capabilities to read and write JSON files and to read Parquet files,
//! which are exposed via [`JsonHandler`] and [`ParquetHandler`] respectively. When reading files,
50//! connectors are asked to provide the context information they require to execute the actual
51//! operation. This is done by invoking methods on the [`StorageHandler`] trait.
52
53#![cfg_attr(all(doc, NIGHTLY_CHANNEL), feature(doc_cfg))]
54#![warn(
55 unreachable_pub,
56 trivial_numeric_casts,
57 unused_extern_crates,
58 rust_2018_idioms,
59 rust_2021_compatibility,
60 clippy::unwrap_used,
61 clippy::expect_used,
62 clippy::panic
63)]
64// we re-allow panics in tests
65#![cfg_attr(test, allow(clippy::unwrap_used, clippy::expect_used, clippy::panic))]
66
67/// This `extern crate` declaration allows the macro to reliably refer to
68/// `delta_kernel::schema::DataType` no matter which crate invokes it. Without that, `delta_kernel`
69/// cannot invoke the macro because `delta_kernel` is an unknown crate identifier (you have to use
70/// `crate` instead). We could make the macro use `crate::schema::DataType` instead, but then the
71/// macro is useless outside the `delta_kernel` crate.
72// TODO: when running `cargo package -p delta_kernel` this gives 'unused' warning - #1095
73#[allow(unused_extern_crates)]
74extern crate self as delta_kernel;
75
76use std::any::Any;
77use std::cmp::Ordering;
78use std::fs::DirEntry;
79use std::ops::Range;
80use std::sync::Arc;
81use std::time::SystemTime;
82
83use bytes::Bytes;
84use url::Url;
85
86use self::schema::{DataType, SchemaRef};
87
88mod action_reconciliation;
89pub mod actions;
90pub mod checkpoint;
91pub mod committer;
92// Public under test-utils so integration tests can inspect CRC state via
93// Snapshot::get_current_crc_if_loaded_for_testing.
94#[cfg(feature = "test-utils")]
95pub mod crc;
96#[cfg(not(feature = "test-utils"))]
97pub(crate) mod crc;
98pub mod engine_data;
99pub mod error;
100pub mod expressions;
101pub mod incremental_scan;
102mod log_compaction;
103mod log_path;
104mod log_reader;
105pub mod metrics;
106pub mod partition;
107pub mod scan;
108pub mod schema;
109pub mod snapshot;
110pub mod table_changes;
111pub mod table_configuration;
112pub mod table_features;
113pub mod table_properties;
114pub mod transaction;
115pub mod transforms;
116
117pub use crc::{FileSizeHistogram, FileStats};
118pub use log_path::LogPath;
119
120// Public under test-utils so integration tests can call get_high_water_mark via snapshot.
121#[cfg(feature = "test-utils")]
122pub mod row_tracking;
123#[cfg(not(feature = "test-utils"))]
124pub(crate) mod row_tracking;
125
126pub(crate) mod clustering;
127
128mod arrow_compat;
129#[cfg(any(feature = "arrow-57", feature = "arrow-58"))]
130pub use arrow_compat::*;
131
132#[cfg(feature = "internal-api")]
133pub mod column_trie;
134#[cfg(not(feature = "internal-api"))]
135pub(crate) mod column_trie;
136pub mod kernel_predicates;
137pub(crate) mod utils;
138
139#[cfg(feature = "internal-api")]
140pub use utils::try_parse_uri;
141
142// for the below modules, we cannot introduce a macro to clean this up. rustfmt doesn't follow into
143// macros, and so will not format the files associated with these modules if we get too clever. see:
144// https://github.com/rust-lang/rustfmt/issues/3253
145
146#[cfg(feature = "internal-api")]
147pub mod path;
148#[cfg(not(feature = "internal-api"))]
149pub(crate) mod path;
150
151#[cfg(feature = "internal-api")]
152pub mod log_replay;
153#[cfg(not(feature = "internal-api"))]
154pub(crate) mod log_replay;
155
156#[cfg(feature = "internal-api")]
157pub mod log_segment;
158#[cfg(not(feature = "internal-api"))]
159pub(crate) mod log_segment;
160
161#[cfg(feature = "internal-api")]
162pub mod last_checkpoint_hint;
163#[cfg(not(feature = "internal-api"))]
164pub(crate) mod last_checkpoint_hint;
165
166pub(crate) mod log_segment_files;
167
168pub mod history_manager;
169
170#[cfg(feature = "internal-api")]
171pub mod parallel;
172#[cfg(not(feature = "internal-api"))]
173pub(crate) mod parallel;
174
175pub use action_reconciliation::{ActionReconciliationIterator, ActionReconciliationIteratorState};
176pub use delta_kernel_derive;
177use delta_kernel_derive::internal_api;
178pub use engine_data::{
179 EngineData, FilteredEngineData, FilteredRowVisitor, GetData, RowIndexIterator, RowVisitor,
180};
181pub use error::{DeltaResult, Error};
182use expressions::{literal_expression_transform, Scalar};
183pub use expressions::{Expression, ExpressionRef, Predicate, PredicateRef};
184pub use log_compaction::{should_compact, LogCompactionWriter};
185use schema::{StructField, StructType};
186pub use snapshot::{Snapshot, SnapshotRef};
187
188#[cfg(any(
189 feature = "default-engine-native-tls",
190 feature = "default-engine-rustls",
191 feature = "arrow-conversion"
192))]
193pub mod engine;
194
/// A Delta table version, stored as an 8-byte unsigned integer.
pub type Version = u64;

/// Size of a file, in bytes.
pub type FileSize = u64;
/// Byte offset within a file; used as the bound type of the byte ranges in [`FileSlice`].
pub type FileIndex = u64;

/// A specification for a range of bytes to read from a file location.
///
/// The `Option` holds the byte range to read; presumably `None` requests the whole file
/// (NOTE(review): confirm against `StorageHandler::read_files` implementations).
pub type FileSlice = (Url, Option<Range<FileIndex>>);

/// Data read from a Delta table file, paired with the [`FileMeta`] of the file it came from.
pub type FileDataReadResult = (FileMeta, Box<dyn EngineData>);

/// An iterator of data read from specified files.
///
/// Each item is one batch of [`EngineData`] (or an error). Unlike [`FileDataReadResult`], the
/// items do not carry the originating [`FileMeta`].
pub type FileDataReadResultIterator =
    Box<dyn Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send>;
210
/// The metadata that describes an object (a file) in storage.
///
/// Equality (`PartialEq`/`Eq`) is derived and therefore compares all three fields.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FileMeta {
    /// The fully qualified path to the object
    pub location: Url,
    /// The last modified time as milliseconds since unix epoch
    pub last_modified: i64,
    /// The size in bytes of the object
    pub size: FileSize,
}
221
222impl Ord for FileMeta {
223 fn cmp(&self, other: &Self) -> Ordering {
224 self.location.cmp(&other.location)
225 }
226}
227
228impl PartialOrd for FileMeta {
229 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
230 Some(self.cmp(other))
231 }
232}
233
234impl TryFrom<DirEntry> for FileMeta {
235 type Error = Error;
236
237 fn try_from(ent: DirEntry) -> DeltaResult<FileMeta> {
238 let metadata = ent.metadata()?;
239 let last_modified = metadata
240 .modified()?
241 .duration_since(SystemTime::UNIX_EPOCH)
242 .map_err(|_| Error::generic("Failed to convert file timestamp to milliseconds"))?;
243 let location = Url::from_file_path(ent.path())
244 .map_err(|_| Error::generic(format!("Invalid path: {:?}", ent.path())))?;
245 let last_modified = last_modified.as_millis().try_into().map_err(|_| {
246 Error::generic(format!(
247 "Failed to convert file modification time {:?} into i64",
248 last_modified.as_millis()
249 ))
250 })?;
251 Ok(FileMeta {
252 location,
253 last_modified,
254 size: metadata.len(),
255 })
256 }
257}
258
259impl FileMeta {
260 /// Create a new instance of `FileMeta`
261 pub fn new(location: Url, last_modified: i64, size: u64) -> Self {
262 Self {
263 location,
264 last_modified,
265 size,
266 }
267 }
268
269 /// Casts `size` to `i64`. Errors if `size` exceeds `i64::MAX`.
270 pub(crate) fn size_as_i64(&self) -> DeltaResult<i64> {
271 i64::try_from(self.size)
272 .map_err(|_| Error::generic(format!("file size {} exceeds i64::MAX", self.size)))
273 }
274}
275
/// Extension trait that makes it easier to work with trait objects that implement [`Any`],
/// implemented automatically for any type that satisfies `Any`, `Send`, and `Sync`. In particular,
/// given some `trait T: Any + Send + Sync`, it allows upcasting `T` to `dyn Any + Send + Sync`,
/// which in turn allows downcasting the result to a concrete type.
///
/// For example, the following code will compile:
///
/// ```
/// # use buoyant_kernel as delta_kernel;
/// # use delta_kernel::AsAny;
/// # use std::any::Any;
/// # use std::sync::Arc;
/// trait Foo : AsAny {}
/// struct Bar;
/// impl Foo for Bar {}
///
/// let f: Arc<dyn Foo> = Arc::new(Bar);
/// let a: Arc<dyn Any + Send + Sync> = f.as_any();
/// let b: Arc<Bar> = a.downcast().unwrap();
/// ```
///
/// In contrast, very similar code that relies only on `Any` would fail to compile:
///
/// ```fail_compile
/// # use std::any::Any;
/// # use std::sync::Arc;
/// trait Foo: Any + Send + Sync {}
///
/// struct Bar;
/// impl Foo for Bar {}
///
/// let f: Arc<dyn Foo> = Arc::new(Bar);
/// let b: Arc<Bar> = f.downcast().unwrap(); // `Arc::downcast` method not found
/// ```
///
/// As would this, at least on toolchains older than Rust 1.86 (the release that stabilized trait
/// upcasting coercion):
///
/// ```fail_compile
/// # use std::any::Any;
/// # use std::sync::Arc;
/// trait Foo: Any + Send + Sync {}
///
/// struct Bar;
/// impl Foo for Bar {}
///
/// let f: Arc<dyn Foo> = Arc::new(Bar);
/// let a: Arc<dyn Any + Send + Sync> = f; // trait upcasting coercion; only stable since Rust 1.86
/// let f: Arc<Bar> = a.downcast().unwrap();
/// ```
///
/// NOTE(review): `fail_compile` is not a code-block attribute that rustdoc recognizes (the
/// recognized one is `compile_fail`). The two blocks above are therefore rendered as plain text
/// and never checked by doctests. Switching them to `compile_fail` would make the second one
/// fail on Rust 1.86+, where it compiles.
///
/// NOTE: `AsAny` inherits the `Send + Sync` constraint from [`Arc::downcast`].
pub trait AsAny: Any + Send + Sync {
    /// Obtains a `dyn Any` reference to the object:
    ///
    /// ```
    /// # use buoyant_kernel as delta_kernel;
    /// # use delta_kernel::AsAny;
    /// # use std::any::Any;
    /// # use std::sync::Arc;
    /// trait Foo : AsAny {}
    /// struct Bar;
    /// impl Foo for Bar {}
    ///
    /// let f: &dyn Foo = &Bar;
    /// let a: &dyn Any = f.any_ref();
    /// let b: &Bar = a.downcast_ref().unwrap();
    /// ```
    fn any_ref(&self) -> &(dyn Any + Send + Sync);

    /// Obtains an `Arc<dyn Any>` reference to the object:
    ///
    /// ```
    /// # use buoyant_kernel as delta_kernel;
    /// # use delta_kernel::AsAny;
    /// # use std::any::Any;
    /// # use std::sync::Arc;
    /// trait Foo : AsAny {}
    /// struct Bar;
    /// impl Foo for Bar {}
    ///
    /// let f: Arc<dyn Foo> = Arc::new(Bar);
    /// let a: Arc<dyn Any + Send + Sync> = f.as_any();
    /// let b: Arc<Bar> = a.downcast().unwrap();
    /// ```
    fn as_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;

    /// Converts the object to `Box<dyn Any>`:
    ///
    /// ```
    /// # use buoyant_kernel as delta_kernel;
    /// # use delta_kernel::AsAny;
    /// # use std::any::Any;
    /// # use std::sync::Arc;
    /// trait Foo : AsAny {}
    /// struct Bar;
    /// impl Foo for Bar {}
    ///
    /// let f: Box<dyn Foo> = Box::new(Bar);
    /// let a: Box<dyn Any> = f.into_any();
    /// let b: Box<Bar> = a.downcast().unwrap();
    /// ```
    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync>;

    /// Convenient wrapper for [`std::any::type_name`], since [`Any`] does not provide it and
    /// [`Any::type_id`] is useless as a debugging aid (its `Debug` is just a mess of hex digits).
    fn type_name(&self) -> &'static str;
}
383
384// Blanket implementation for all eligible types
385impl<T: Any + Send + Sync> AsAny for T {
386 fn any_ref(&self) -> &(dyn Any + Send + Sync) {
387 self
388 }
389 fn as_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
390 self
391 }
392 fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync> {
393 self
394 }
395 fn type_name(&self) -> &'static str {
396 std::any::type_name::<Self>()
397 }
398}
399
400/// Extension trait that facilitates object-safe implementations of `PartialEq`.
401pub trait DynPartialEq: AsAny {
402 fn dyn_eq(&self, other: &dyn Any) -> bool;
403}
404
405// Blanket implementation for all eligible types
406impl<T: PartialEq + AsAny> DynPartialEq for T {
407 fn dyn_eq(&self, other: &dyn Any) -> bool {
408 other.downcast_ref::<T>().is_some_and(|other| self == other)
409 }
410}
411
/// Trait for implementing an Expression evaluator.
///
/// An evaluator is bound to a single [`Expression`] and can evaluate it against any number of
/// [`EngineData`] batches. Connectors can implement this trait to optimize the evaluation using
/// their own engine-specific capabilities.
pub trait ExpressionEvaluator: AsAny {
    /// Evaluate the expression on the given [`EngineData`] batch.
    ///
    /// Produces one value for each row of the input. The data type of the output matches the
    /// output type of the expression this evaluator was created for.
    fn evaluate(&self, batch: &dyn EngineData) -> DeltaResult<Box<dyn EngineData>>;
}
425
/// Trait for implementing a Predicate evaluator.
///
/// An evaluator is bound to a single [`Predicate`] and can evaluate it against any number of
/// [`EngineData`] batches. Connectors can implement this trait to optimize the evaluation using
/// their own engine-specific capabilities.
pub trait PredicateEvaluator: AsAny {
    /// Evaluate the predicate on the given [`EngineData`] batch.
    ///
    /// Produces one boolean value for each row of the input.
    fn evaluate(&self, batch: &dyn EngineData) -> DeltaResult<Box<dyn EngineData>>;
}
437
/// Provides expression evaluation capability to Delta Kernel.
///
/// Delta Kernel can use this handler to evaluate a predicate on partition filters, fill up
/// partition column values, and perform any other computation on data using expressions.
pub trait EvaluationHandler: AsAny {
    /// Create an [`ExpressionEvaluator`] that can evaluate the given [`Expression`]
    /// on columnar batches with the given [`Schema`] to produce data of [`DataType`].
    ///
    /// If the provided output type is a struct, its fields describe the columns of output produced
    /// by the evaluator. Otherwise, the output schema is a single column named "output" of the
    /// specified `output_type`. In all cases, the output schema is only used for its names (all
    /// field names will be updated to match) and nullability (non-nullable columns can be converted
    /// to nullable). Any mismatch in types (including number of columns) will produce an error.
    ///
    /// # Parameters
    ///
    /// - `input_schema`: Schema of the input data.
    /// - `expression`: Expression to evaluate.
    /// - `output_type`: Expected result data type.
    ///
    /// [`Schema`]: crate::schema::StructType
    /// [`DataType`]: crate::schema::DataType
    fn new_expression_evaluator(
        &self,
        input_schema: SchemaRef,
        expression: ExpressionRef,
        output_type: DataType,
    ) -> DeltaResult<Arc<dyn ExpressionEvaluator>>;

    /// Create a [`PredicateEvaluator`] that can evaluate the given [`Predicate`] on columnar
    /// batches with the given [`Schema`] to produce a column of boolean results.
    ///
    /// The output schema is a single nullable boolean column named "output".
    ///
    /// # Parameters
    ///
    /// - `input_schema`: Schema of the input data.
    /// - `predicate`: Predicate to evaluate.
    ///
    /// [`Schema`]: crate::schema::StructType
    fn new_predicate_evaluator(
        &self,
        input_schema: SchemaRef,
        predicate: PredicateRef,
    ) -> DeltaResult<Arc<dyn PredicateEvaluator>>;

    /// Create a single-row [`EngineData`] whose every value is null, using the schema given by
    /// `output_schema`.
    // NOTE: we should probably allow DataType instead of SchemaRef, but can expand that in the
    // future.
    fn null_row(&self, output_schema: SchemaRef) -> DeltaResult<Box<dyn EngineData>>;

    /// Create a multi-row [`EngineData`] by applying the given schema to multiple rows of values.
    ///
    /// Each element in `rows` represents one row of data. Each row is a slice of structured scalar
    /// values, one scalar per top-level field in the schema.
    ///
    /// # Parameters
    ///
    /// - `schema`: Schema describing the structure of each row.
    /// - `rows`: Slice of rows, where each row contains one structured scalar per top-level schema
    ///   field.
    ///
    /// # Returns
    ///
    /// A multi-row `EngineData` containing all rows, in the order given.
    ///
    /// # Errors
    ///
    /// Returns an error if any row has a number of scalars that does not match the number of
    /// top-level fields in `schema`. Also returns an error if any scalar value cannot be appended
    /// to its corresponding field's builder (e.g. due to a type mismatch).
    ///
    /// # Example
    ///
    /// For a schema with fields `[add: Struct, remove: Struct]`, each row should contain exactly 2
    /// scalars: one for the `add` field and one for the `remove` field.
    fn create_many(
        &self,
        schema: SchemaRef,
        rows: &[&[Scalar]],
    ) -> DeltaResult<Box<dyn EngineData>>;
}
521
522/// Internal trait to allow us to have a private `create_one` API that's implemented for all
523/// EvaluationHandlers.
524// For some reason rustc doesn't detect it's usage so we allow(dead_code) here...
525#[allow(dead_code)]
526#[internal_api]
527trait EvaluationHandlerExtension: EvaluationHandler {
528 /// Create a single-row [`EngineData`] by applying the given schema to the leaf-values given in
529 /// `values`.
530 // Note: we will stick with a Schema instead of DataType (more constrained can expand in
531 // future)
532 fn create_one(&self, schema: SchemaRef, values: &[Scalar]) -> DeltaResult<Box<dyn EngineData>> {
533 // just get a single int column (arbitrary)
534 let null_row_schema = Arc::new(StructType::new_unchecked(vec![StructField::nullable(
535 "null_col",
536 DataType::INTEGER,
537 )]));
538 let null_row = self.null_row(null_row_schema.clone())?;
539
540 // Convert schema and leaf values to an expression
541 let row_expr = literal_expression_transform(schema.as_ref(), values)?;
542
543 let eval =
544 self.new_expression_evaluator(null_row_schema, row_expr.into(), schema.into())?;
545 eval.evaluate(null_row.as_ref())
546 }
547}
548
549// Auto-implement the extension trait for all EvaluationHandlers
550impl<T: EvaluationHandler + ?Sized> EvaluationHandlerExtension for T {}
551
/// A trait that allows converting a type into (single-row) EngineData.
///
/// This is typically used with the `#[derive(IntoEngineData)]` macro, which leverages the traits
/// `ToDataType` and `Into<Scalar>` on the struct's fields to convert a struct into EngineData.
///
/// # Example
/// ```ignore
/// # use std::sync::Arc;
/// # use delta_kernel_derive::{Schema, IntoEngineData};
///
/// #[derive(Schema, IntoEngineData)]
/// struct MyStruct {
///     a: i32,
///     b: String,
/// }
///
/// let my_struct = MyStruct { a: 42, b: "Hello".to_string() };
/// // typically used with ToSchema
/// let schema = Arc::new(MyStruct::to_schema());
/// // single-row EngineData
/// let engine = todo!(); // create an engine
/// let engine_data = my_struct.into_engine_data(schema, engine);
/// ```
#[internal_api]
pub(crate) trait IntoEngineData {
    /// Consume this value to produce a single-row EngineData using the provided schema and
    /// engine.
    fn into_engine_data(
        self,
        schema: SchemaRef,
        engine: &dyn Engine,
    ) -> DeltaResult<Box<dyn EngineData>>;
}
585
/// Provides file system related functionalities to Delta Kernel.
///
/// Delta Kernel uses this handler whenever it needs to access the underlying file system where
/// the Delta table is present. A connector's implementation of this trait can hide
/// filesystem-specific details from Delta Kernel.
pub trait StorageHandler: AsAny {
    /// Recursively list files whose full path is lexicographically greater than (UTF-8 sorting)
    /// the given `path`, restricted to descendants of `path`'s parent directory. The result must
    /// be sorted by the full path (UTF-8 byte order).
    ///
    /// The listing is **recursive**: files in nested subdirectories are included, not just files
    /// directly under the parent. For example, listing from `dir/0001.json` may return
    /// `dir/0002.json`, `dir/sub/0003.json`, and `dir/sub/nested/0004.json`, all interleaved
    /// in lexicographic order.
    ///
    /// The parent directory is derived from `path`:
    /// - If `path` is directory-like (ends with `/`), the parent is `path` itself and the result
    ///   contains all files at or below that directory.
    /// - Otherwise, the parent is the directory containing `path`, and only files (at any depth
    ///   under that parent) whose full path sorts strictly greater than `path` are returned.
    fn list_from(&self, path: &Url)
        -> DeltaResult<Box<dyn Iterator<Item = DeltaResult<FileMeta>>>>;

    /// Read the data described by each [`FileSlice`] in `files`.
    ///
    /// Each slice names a file and an optional byte range within it. NOTE(review): presumably a
    /// `None` range means the whole file. Confirm the intended semantics and whether results must
    /// follow the order of `files`.
    fn read_files(
        &self,
        files: Vec<FileSlice>,
    ) -> DeltaResult<Box<dyn Iterator<Item = DeltaResult<Bytes>>>>;

    /// Copy a file atomically from `src` to `dest`.
    ///
    /// If the destination file already exists, this must return `Err(Error::FileAlreadyExists)`.
    fn copy_atomic(&self, src: &Url, dest: &Url) -> DeltaResult<()>;

    /// Write `data` to the specified path.
    ///
    /// If `overwrite` is false and the file already exists, this must return
    /// `Err(Error::FileAlreadyExists)`.
    fn put(&self, path: &Url, data: Bytes, overwrite: bool) -> DeltaResult<()>;

    /// Perform a HEAD request for the file at `path`, returning its metadata without reading its
    /// contents.
    ///
    /// If the file does not exist, this must return an `Err` with [`Error::FileNotFound`].
    fn head(&self, path: &Url) -> DeltaResult<FileMeta>;
}
630
/// Provides JSON handling functionality to Delta Kernel.
///
/// Delta Kernel can use this handler to parse JSON strings into rows or to read content from JSON
/// files. Connectors can leverage this trait to provide their best implementation of the JSON
/// parsing capability to Delta Kernel.
pub trait JsonHandler: AsAny {
    /// Parse the given JSON strings and return the fields requested by `output_schema` as columns
    /// in [`EngineData`].
    ///
    /// `json_strings` MUST be a single-column batch of engine data, and that column's type must
    /// be string.
    fn parse_json(
        &self,
        json_strings: Box<dyn EngineData>,
        output_schema: SchemaRef,
    ) -> DeltaResult<Box<dyn EngineData>>;

    /// Read and parse the JSON files at the given locations and return the data as EngineData
    /// with the columns requested by `physical_schema`.
    ///
    /// The returned [`FileDataReadResultIterator`] must emit data from files in the order that
    /// `files` is given. For example, if files `["a", "b"]` are provided, the iterator must first
    /// return all the engine data from file "a", _then_ all the engine data from file "b".
    /// Moreover, for a given file, all of its [`EngineData`] batches and their constituent rows
    /// must be in the order in which they occur in the file. For a file with rows (1, 2, 3):
    ///
    /// ```text
    /// legal:   [EngineData(1, 2), EngineData(3)]
    /// legal:   [EngineData(1), EngineData(2, 3)]
    /// legal:   [EngineData(1, 2, 3)]
    /// illegal: [EngineData(3), EngineData(1, 2)]
    /// illegal: [EngineData(1), EngineData(3, 2)]
    /// illegal: [EngineData(2, 1, 3)]
    /// ```
    ///
    /// Additionally, engines must not merge engine data across file boundaries: a single batch
    /// never contains rows from more than one file.
    ///
    /// # Parameters
    ///
    /// - `files` - File metadata for files to be read.
    /// - `physical_schema` - Select list of columns to read from the JSON file.
    /// - `predicate` - Optional push-down predicate hint (engine is free to ignore it).
    fn read_json_files(
        &self,
        files: &[FileMeta],
        physical_schema: SchemaRef,
        predicate: Option<PredicateRef>,
    ) -> DeltaResult<FileDataReadResultIterator>;

    /// Atomically (!) write a single JSON file. Each row of the input data should be written as a
    /// new JSON object appended to the file. This write must:
    /// 1. serialize the data to newline-delimited JSON (each row is a JSON object literal), and
    /// 2. write the data to storage atomically (i.e. if the file already exists, fail unless the
    ///    `overwrite` flag is set).
    ///
    /// For example, the JSON data should be written as `{ "column1": "val1", "column2": "val2", .. }`
    /// with each row on a new line.
    ///
    /// NOTE: Null columns should not be written to the JSON file. For example, if a row has columns
    /// `["a", "b"]` and the value of "b" is null, the JSON object should be written as
    /// `{ "a": "..." }`. Including nulls is technically valid JSON, but it would bloat the log, so
    /// we recommend omitting them.
    ///
    /// # Parameters
    ///
    /// - `path` - URL specifying the location to write the JSON file
    /// - `data` - Iterator of EngineData to write to the JSON file. Each row should be written as a
    ///   new JSON object appended to the file. (That is, the file is newline-delimited JSON, and
    ///   each row is a JSON object on a single line.)
    /// - `overwrite` - If true, overwrite the file if it exists. If false, the call must fail if
    ///   the file exists.
    fn write_json_file(
        &self,
        path: &Url,
        data: Box<dyn Iterator<Item = DeltaResult<FilteredEngineData>> + Send + '_>,
        overwrite: bool,
    ) -> DeltaResult<()>;
}
704
/// Field IDs reserved for metadata columns in Delta tables.
///
/// Regular table columns should not use these IDs. They identify virtual columns that expose
/// file-level metadata during reads.
pub mod reserved_field_ids {
    /// Field ID reserved for the `_file` metadata column, which holds the name of the Parquet
    /// file each row was read from. The value `2_147_483_646` is `i32::MAX - 1`.
    pub const FILE_NAME: i64 = 2_147_483_646;
}
714
/// Metadata from a Parquet file footer.
///
/// Currently this holds only the file's schema, already converted to Delta Kernel's schema
/// format. It is designed to be extensible for future additions such as row group statistics.
#[derive(Debug, Clone)]
pub struct ParquetFooter {
    /// The schema of the Parquet file, converted to Delta Kernel's schema format.
    pub schema: SchemaRef,
}
724
725/// Provides Parquet file related functionalities to Delta Kernel.
726///
727/// Connectors can leverage this trait to provide their own custom
728/// implementation of Parquet data file functionalities to Delta Kernel.
729pub trait ParquetHandler: AsAny {
730 /// Read and parse the Parquet file at given locations and return the data as EngineData with
731 /// the columns requested by physical schema. The ParquetHandler _must_ return exactly the
732 /// columns specified in `physical_schema`, and they _must_ be in schema order.
733 ///
734 /// # Resolving Parquet schema to the physical schema
735 ///
736 /// When reading the Parquet file, the columns are resolved from the Parquet schema to the
737 /// kernel's `physical_schema`. To do so, the parquet reader must match each Parquet column
738 /// to a [`StructField`] in the `physical_schema`. All columns in the returned `EngineData`
739 /// must be in the same order as specified in `physical_schema`.
740 ///
741 /// Parquet columns are matched to `physical_schema` [`StructField`]s using the following rules:
742 /// 1. **Field ID**: If a [`StructField`] in `physical_schema` contains a field ID (specified in
743 /// [`ColumnMetadataKey::ParquetFieldId`] metadata), use the ID to match the Parquet column's
744 /// field id
745 /// 2. **Field Name**: If no field ID is present in the `physical_schema`'s [`StructField`] or
746 /// no matching parquet field ID is found, fall back to matching by column name
747 ///
748 /// # Metadata Columns
749 ///
750 /// The ParquetHandler must support virtual metadata columns that provide additional information
751 /// about each row. These columns are not stored in the Parquet file but are generated at read
752 /// time.
753 ///
754 /// ## Row Index Column
755 ///
756 /// When a column in `physical_schema` is marked as a row index metadata column (via
757 /// [`StructField::create_metadata_column`] with [`schema::MetadataColumnSpec::RowIndex`]), the
758 /// ParquetHandler must populate it with the 0-based row position within the Parquet file:
759 ///
760 /// - **Column name**: User-specified (commonly `"row_index"` or `"_metadata.row_index"`)
761 /// - **Type**: `LONG` (non-nullable)
762 /// - **Values**: Sequential integers starting at 0 for each file
763 /// - **Use case**: Track row positions for downstream processing, or internally used to compute
764 /// Row IDs
765 ///
766 /// Example: A file with 5 rows would have row_index values `[0, 1, 2, 3, 4]`.
767 ///
768 /// ## File Name Column (Reserved Field ID)
769 ///
770 /// When a column in `physical_schema` has the reserved field ID
771 /// [`reserved_field_ids::FILE_NAME`] (2147483646), the ParquetHandler must populate it
772 /// with the file path/name:
773 ///
774 /// - **Column name**: `"_file"`
775 /// - **Type**: `STRING` (non-nullable)
776 /// - **Field ID**: 2147483646 (reserved)
777 /// - **Values**: The file path/URL (e.g., `"s3://bucket/path/file.parquet"`)
778 /// - **Use case**: Track which file each row came from in multi-file reads
779 ///
780 /// Example: All rows from the same file would have the same `_file` value.
781 ///
782 /// ## Metadata Column Examples
783 ///
784 /// ```rust,ignore
785 /// use delta_kernel::schema::{StructType, StructField, DataType, MetadataColumnSpec};
786 ///
787 /// // Example 1: Schema with row_index metadata column
788 /// let schema_with_row_index = StructType::try_new([
789 /// StructField::nullable("id", DataType::INTEGER),
790 /// StructField::create_metadata_column("row_index", MetadataColumnSpec::RowIndex),
791 /// StructField::nullable("value", DataType::STRING),
792 /// ])?;
793 ///
794 /// // Example 2: Schema with _file metadata column (using reserved field ID)
795 /// let schema_with_file_path = StructType::try_new([
796 /// StructField::nullable("id", DataType::INTEGER),
797 /// StructField::create_metadata_column("_file", MetadataColumnSpec::FilePath),
798 /// StructField::nullable("value", DataType::STRING),
799 /// ])?;
800 /// ```
801 ///
802 /// ---
803 ///
804 /// If no matching Parquet column is found, `NULL` values are returned
805 /// for nullable columns in `physical_schema`. For non-nullable columns, an error is returned.
806 ///
807 ///
808 /// ## Column Matching Examples
809 ///
810 /// Consider a `physical_schema` with the following fields:
811 /// - Column 0: `"i_logical"` (integer, non-null) with field ID 1 (via
812 /// [`ColumnMetadataKey::ParquetFieldId`])
813 /// - Column 1: `"s"` (string, nullable) with no field ID metadata
814 /// - Column 2: `"i2"` (integer, nullable) with no field ID metadata
815 ///
816 /// [`ColumnMetadataKey::ParquetFieldId`]: crate::schema::ColumnMetadataKey::ParquetFieldId
817 ///
818 /// And a Parquet file containing these columns:
819 /// - Column 0: `"i2"` (integer, nullable) with field ID 3
820 /// - Column 1: `"i"` (integer, non-null) with field ID 1
821 /// - No `"s"` column present
822 ///
823 /// The column matching would work as follows:
824 /// - `"i_logical"` matches `"i"` by field ID (both have ID 1)
825 /// - `"i2"` matches `"i2"` by column name (no field ID to match on)
826 /// - `"s"` has no matching Parquet column, so NULL values are returned
827 ///
828 /// The returned data will contain exactly 3 columns in physical schema order:
829 /// `{i_logical: parquet[1], s: NULL.., i2: parquet[0]}`
830 ///
831 /// # Parameters
832 ///
833 /// - `files` - File metadata for files to be read.
834 /// - `physical_schema` - Select list and order of columns to read from the Parquet file.
835 /// - `predicate` - Optional push-down predicate hint (engine is free to ignore it).
836 ///
837 /// # Returns
838 /// A [`DeltaResult`] containing a [`FileDataReadResultIterator`].
839 /// Each element of the iterator is a [`DeltaResult`] of [`EngineData`]. The [`EngineData`]
840 /// has the contents of `files` and must match the provided `physical_schema`.
841 ///
842 /// Note: The [`FileDataReadResultIterator`] must emit data from files in the order that `files`
843 /// is given. For example if files ["a", "b"] is provided, then the engine data iterator must
844 /// first return all the engine data from file "a", _then_ all the engine data from file "b".
845 /// Moreover, for a given file, all of its [`EngineData`] and constituent rows must be in order
846 /// that they occur in the file. Consider a file with rows
847 /// (1, 2, 3). The following are legal iterator batches:
848 /// iter: [EngineData(1, 2), EngineData(3)]
849 /// iter: [EngineData(1), EngineData(2, 3)]
850 /// iter: [EngineData(1, 2, 3)]
851 /// The following are illegal batches:
852 /// iter: [EngineData(3), EngineData(1, 2)]
853 /// iter: [EngineData(1), EngineData(3, 2)]
854 /// iter: [EngineData(2, 1, 3)]
855 ///
856 /// Additionally, engines must not merge engine data across file boundaries.
857 ///
858 /// [`ColumnMetadataKey::ParquetFieldId`]: crate::schema::ColumnMetadataKey
859 fn read_parquet_files(
860 &self,
861 files: &[FileMeta],
862 physical_schema: SchemaRef,
863 predicate: Option<PredicateRef>,
864 ) -> DeltaResult<FileDataReadResultIterator>;
865
866 /// Write data to a Parquet file at the specified URL.
867 ///
868 /// This method writes the provided `data` to a Parquet file at the given `url`.
869 ///
870 /// This will overwrite the file if it already exists. For filesystem-backed
871 /// implementations, the parent directories must be created if they do not exist.
872 ///
873 /// # Parquet field IDs
874 ///
875 /// The engine must write a Parquet `field_id` correctly when the kernel
876 /// [`StructField`] carries a field-id related annotation, including:
877 /// - [`ColumnMetadataKey::ColumnMappingId`] / [`ColumnMetadataKey::ParquetFieldId`]
878 /// - [`ColumnMetadataKey::ColumnMappingNestedIds`]
879 ///
880 /// For how to use these keys, refer to the Delta protocol's [Column Mapping] and
881 /// [IcebergCompatV2] sections.
882 ///
883 /// **Non-compliance produces files with incorrect `field_id`s**, which may lead to
884 /// read failures when column mapping mode is `id` and to failures when converting
885 /// the table to Iceberg.
886 ///
887 /// # Parameters
888 ///
889 /// - `url` - The full URL path where the Parquet file should be written (e.g.,
890 /// `s3://bucket/path/file.parquet`).
891 /// - `data` - An iterator of engine data to be written to the Parquet file.
892 ///
893 /// # Returns
894 ///
895 /// A [`DeltaResult`] indicating success or failure.
896 ///
897 /// [`StructField`]: crate::schema::StructField
898 /// [`ColumnMetadataKey::ColumnMappingId`]: crate::schema::ColumnMetadataKey::ColumnMappingId
899 /// [`ColumnMetadataKey::ParquetFieldId`]: crate::schema::ColumnMetadataKey::ParquetFieldId
900 /// [`ColumnMetadataKey::ColumnMappingNestedIds`]: crate::schema::ColumnMetadataKey::ColumnMappingNestedIds
901 /// [Column Mapping]: https://github.com/delta-io/delta/blob/master/PROTOCOL.md#column-mapping
902 /// [IcebergCompatV2]: https://github.com/delta-io/delta/blob/master/PROTOCOL.md#iceberg-compatibility-v2
903 fn write_parquet_file(
904 &self,
905 location: url::Url,
906 data: Box<dyn Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send>,
907 ) -> DeltaResult<()>;
908
909 /// Read the footer metadata from a Parquet file without reading the data.
910 ///
911 /// This method reads only the Parquet file footer (metadata section), which is useful for
912 /// schema inspection, compatibility checking, and determining whether parsed statistics
913 /// columns are present and compatible with the current table schema.
914 ///
915 /// # Parameters
916 ///
917 /// - `file` - File metadata for the Parquet file whose footer should be read. The `size` field
918 /// should contain the actual file size to enable efficient footer reads without additional
919 /// I/O operations.
920 ///
921 /// # Returns
922 ///
923 /// A [`DeltaResult`] containing a [`ParquetFooter`] with the Parquet file's metadata, including
924 /// the schema converted to Delta Kernel's format.
925 ///
926 /// # Field IDs
927 ///
928 /// If the Parquet file contains field IDs (written when column mapping is enabled), they are
929 /// preserved in each [`StructField`]'s metadata. Callers can access field IDs via
930 /// [`StructField::get_config_value`] with [`ColumnMetadataKey::ParquetFieldId`].
931 ///
932 /// # Errors
933 ///
934 /// Returns an error if:
935 /// - The file cannot be accessed or does not exist
936 /// - The file is not a valid Parquet file
937 /// - The footer cannot be read or parsed
938 /// - The schema cannot be converted to Delta Kernel's format
939 ///
940 /// [`StructField`]: crate::schema::StructField
941 /// [`StructField::get_config_value`]: crate::schema::StructField::get_config_value
942 /// [`ColumnMetadataKey::ParquetFieldId`]: crate::schema::ColumnMetadataKey::ParquetFieldId
943 fn read_parquet_footer(&self, file: &FileMeta) -> DeltaResult<ParquetFooter>;
944}
945
946/// The `Engine` trait encapsulates all the functionality an engine or connector needs to provide
947/// to the Delta Kernel in order to read the Delta table.
948///
949/// Engines/Connectors are expected to pass an implementation of this trait when reading a Delta
950/// table.
951pub trait Engine: AsAny {
952 /// Get the connector provided [`EvaluationHandler`].
953 fn evaluation_handler(&self) -> Arc<dyn EvaluationHandler>;
954
955 /// Get the connector provided [`StorageHandler`]
956 fn storage_handler(&self) -> Arc<dyn StorageHandler>;
957
958 /// Get the connector provided [`JsonHandler`].
959 fn json_handler(&self) -> Arc<dyn JsonHandler>;
960
961 /// Get the connector provided [`ParquetHandler`].
962 fn parquet_handler(&self) -> Arc<dyn ParquetHandler>;
963}
964
// `default-engine-base` is an internal feature flag: it only groups the pieces shared by
// `default-engine-native-tls` and `default-engine-rustls`. The crate does not compile when it is
// enabled on its own (with neither TLS flavor), so emit a friendly error for that configuration.
#[cfg(all(
    feature = "default-engine-base",
    not(feature = "default-engine-native-tls"),
    not(feature = "default-engine-rustls"),
))]
compile_error!(
    "The default-engine-base feature flag is not meant to be used directly. \
     Please use either default-engine-native-tls or default-engine-rustls."
);
979
// Doctests can exercise things that ordinary unit tests cannot, so we use them here to test
// macros. Specifically, they cover macro invocations that fail because of invalid input; checking
// the output of successful invocations can (and should) be done in unit tests instead. This module
// is not limited to macro tests, so other doctests may be added to it as well. See
// https://doc.rust-lang.org/rustdoc/write-documentation/documentation-tests.html#include-items-only-when-collecting-doctests
#[cfg(doctest)]
mod doctests;
987
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;

    #[rstest]
    #[case::zero(0, Some(0))]
    #[case::one(1, Some(1))]
    #[case::i64_max(i64::MAX as u64, Some(i64::MAX))]
    #[case::just_over_i64_max(i64::MAX as u64 + 1, None)]
    #[case::u64_max(u64::MAX, None)]
    /// Exercises `FileMeta::size_as_i64` on both sides of the `i64::MAX` boundary: sizes that fit
    /// must convert unchanged, larger ones must fail with an "exceeds i64::MAX" error.
    fn test_file_meta_size_as_i64(#[case] size: u64, #[case] expected: Option<i64>) {
        let file_meta = FileMeta::new(Url::parse("file:///x").unwrap(), 0, size);
        let converted = file_meta.size_as_i64();
        if let Some(want) = expected {
            assert_eq!(converted.unwrap(), want);
        } else {
            let message = converted.unwrap_err().to_string();
            assert!(message.contains("exceeds i64::MAX"));
        }
    }
}