buoyant_kernel/lib.rs
1//! # Delta Kernel
2//!
3//! Delta-kernel-rs is an experimental [Delta](https://github.com/delta-io/delta/) implementation
4//! focused on interoperability with a wide range of query engines. It supports reads and
5//! (experimental) writes (only blind appends in the write path currently). This library defines a
6//! number of traits which must be implemented to provide a working delta implementation. They are
7//! detailed below. There is a provided "default engine" that implements all these traits and can
8//! be used to ease integration work. See [`DefaultEngine`](engine/default/index.html) for more
9//! information.
10//!
11//! A full `rust` example for reading table data using the default engine can be found in the
12//! [read-table-single-threaded] example (and for a more complex multi-threaded reader see the
13//! [read-table-multi-threaded] example). An example for reading the table changes for a table
14//! using the default engine can be found in the [read-table-changes] example. The [write-table]
15//! example demonstrates how to write data to a Delta table using the default engine.
16//!
17//! [read-table-single-threaded]:
18//! https://github.com/delta-io/delta-kernel-rs/tree/main/kernel/examples/read-table-single-threaded
19//! [read-table-multi-threaded]:
20//! https://github.com/delta-io/delta-kernel-rs/tree/main/kernel/examples/read-table-multi-threaded
21//! [read-table-changes]:
22//! https://github.com/delta-io/delta-kernel-rs/tree/main/kernel/examples/read-table-changes
23//! [write-table]:
24//! https://github.com/delta-io/delta-kernel-rs/tree/main/kernel/examples/write-table
25//!
26//! # Engine trait
27//!
28//! The [`Engine`] trait allows connectors to bring their own implementation of functionality such
29//! as reading parquet files, listing files in a file system, parsing a JSON string etc. This
30//! trait exposes methods to get sub-engines which expose the core functionalities customizable by
31//! connectors.
32//!
33//! ## Expression handling
34//!
//! Expression handling is done via the [`EvaluationHandler`], which in turn allows the creation of
//! [`ExpressionEvaluator`]s and [`PredicateEvaluator`]s. Each evaluator is created for a specific
//! [`Expression`] or [`Predicate`] and can then be evaluated against any number of data batches.
38//!
39//! ## File system interactions
40//!
41//! Delta Kernel needs to perform some basic operations against file systems like listing and
42//! reading files. These interactions are encapsulated in the [`StorageHandler`] trait.
43//! Implementers must take care that all assumptions on the behavior of the functions - like sorted
44//! results - are respected.
45//!
46//! ## Reading log and data files
47//!
48//! Delta Kernel requires the capability to read and write json files and read parquet files, which
49//! is exposed via the [`JsonHandler`] and [`ParquetHandler`] respectively. When reading files,
50//! connectors are asked to provide the context information they require to execute the actual
51//! operation. This is done by invoking methods on the [`StorageHandler`] trait.
52
53#![cfg_attr(all(doc, NIGHTLY_CHANNEL), feature(doc_cfg))]
54#![warn(
55 unreachable_pub,
56 trivial_numeric_casts,
57 unused_extern_crates,
58 rust_2018_idioms,
59 rust_2021_compatibility,
60 clippy::unwrap_used,
61 clippy::expect_used,
62 clippy::panic
63)]
64// we re-allow panics in tests
65#![cfg_attr(test, allow(clippy::unwrap_used, clippy::expect_used, clippy::panic))]
66
67/// This `extern crate` declaration allows the macro to reliably refer to
68/// `delta_kernel::schema::DataType` no matter which crate invokes it. Without that, `delta_kernel`
69/// cannot invoke the macro because `delta_kernel` is an unknown crate identifier (you have to use
70/// `crate` instead). We could make the macro use `crate::schema::DataType` instead, but then the
71/// macro is useless outside the `delta_kernel` crate.
72// TODO: when running `cargo package -p delta_kernel` this gives 'unused' warning - #1095
73#[allow(unused_extern_crates)]
74extern crate self as delta_kernel;
75
76use std::any::Any;
77use std::cmp::Ordering;
78use std::fs::DirEntry;
79use std::ops::Range;
80use std::sync::Arc;
81use std::time::SystemTime;
82
83use bytes::Bytes;
84use url::Url;
85
86use self::schema::{DataType, SchemaRef};
87
88mod action_reconciliation;
89pub mod actions;
90pub mod checkpoint;
91pub mod committer;
92// Public under test-utils so integration tests can inspect CRC state via
93// Snapshot::get_current_crc_if_loaded_for_testing.
94#[cfg(feature = "test-utils")]
95pub mod crc;
96#[cfg(not(feature = "test-utils"))]
97pub(crate) mod crc;
98pub mod engine_data;
99pub mod error;
100pub mod expressions;
101mod log_compaction;
102mod log_path;
103mod log_reader;
104pub mod metrics;
105pub mod partition;
106pub mod scan;
107pub mod schema;
108pub mod snapshot;
109pub mod table_changes;
110pub mod table_configuration;
111pub mod table_features;
112pub mod table_properties;
113pub mod transaction;
114pub mod transforms;
115
116pub use crc::{FileSizeHistogram, FileStats};
117pub use log_path::LogPath;
118
119// Public under test-utils so integration tests can call get_high_water_mark via snapshot.
120#[cfg(feature = "test-utils")]
121pub mod row_tracking;
122#[cfg(not(feature = "test-utils"))]
123pub(crate) mod row_tracking;
124
125pub(crate) mod clustering;
126
127mod arrow_compat;
128#[cfg(any(feature = "arrow-57", feature = "arrow-58"))]
129pub use arrow_compat::*;
130
131#[cfg(feature = "internal-api")]
132pub mod column_trie;
133#[cfg(not(feature = "internal-api"))]
134pub(crate) mod column_trie;
135pub mod kernel_predicates;
136pub(crate) mod utils;
137
138#[cfg(feature = "internal-api")]
139pub use utils::try_parse_uri;
140
141// for the below modules, we cannot introduce a macro to clean this up. rustfmt doesn't follow into
142// macros, and so will not format the files associated with these modules if we get too clever. see:
143// https://github.com/rust-lang/rustfmt/issues/3253
144
145#[cfg(feature = "internal-api")]
146pub mod path;
147#[cfg(not(feature = "internal-api"))]
148pub(crate) mod path;
149
150#[cfg(feature = "internal-api")]
151pub mod log_replay;
152#[cfg(not(feature = "internal-api"))]
153pub(crate) mod log_replay;
154
155#[cfg(feature = "internal-api")]
156pub mod log_segment;
157#[cfg(not(feature = "internal-api"))]
158pub(crate) mod log_segment;
159
160#[cfg(feature = "internal-api")]
161pub mod last_checkpoint_hint;
162#[cfg(not(feature = "internal-api"))]
163pub(crate) mod last_checkpoint_hint;
164
165pub(crate) mod log_segment_files;
166
167pub mod history_manager;
168
169#[cfg(feature = "internal-api")]
170pub mod parallel;
171#[cfg(not(feature = "internal-api"))]
172pub(crate) mod parallel;
173
174pub use action_reconciliation::{ActionReconciliationIterator, ActionReconciliationIteratorState};
175pub use delta_kernel_derive;
176use delta_kernel_derive::internal_api;
177pub use engine_data::{
178 EngineData, FilteredEngineData, FilteredRowVisitor, GetData, RowIndexIterator, RowVisitor,
179};
180pub use error::{DeltaResult, Error};
181use expressions::{literal_expression_transform, Scalar};
182pub use expressions::{Expression, ExpressionRef, Predicate, PredicateRef};
183pub use log_compaction::{should_compact, LogCompactionWriter};
184use schema::{StructField, StructType};
185pub use snapshot::{Snapshot, SnapshotRef};
186
187#[cfg(any(
188 feature = "default-engine-native-tls",
189 feature = "default-engine-rustls",
190 feature = "arrow-conversion"
191))]
192pub mod engine;
193
/// Delta table version is 8 byte unsigned int
pub type Version = u64;

/// Size of a file, in bytes (see [`FileMeta::size`]).
pub type FileSize = u64;
/// Byte position within a file; used as the bounds of the range in a [`FileSlice`].
pub type FileIndex = u64;

/// A specification for a range of bytes to read from a file location
pub type FileSlice = (Url, Option<Range<FileIndex>>);

/// Data read from a Delta table file and the corresponding scan file information.
pub type FileDataReadResult = (FileMeta, Box<dyn EngineData>);

/// An iterator of data read from specified files.
///
/// Each item is a fallible batch of [`EngineData`]; the boxed iterator is required to be `Send`.
pub type FileDataReadResultIterator =
    Box<dyn Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send>;
209
/// The metadata that describes an object.
///
/// Equality (`PartialEq`/`Eq`) is derived, so two values are equal only when all three fields
/// are equal.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FileMeta {
    /// The fully qualified path to the object
    pub location: Url,
    /// The last modified time as milliseconds since unix epoch
    pub last_modified: i64,
    /// The size in bytes of the object
    pub size: FileSize,
}
220
221impl Ord for FileMeta {
222 fn cmp(&self, other: &Self) -> Ordering {
223 self.location.cmp(&other.location)
224 }
225}
226
227impl PartialOrd for FileMeta {
228 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
229 Some(self.cmp(other))
230 }
231}
232
233impl TryFrom<DirEntry> for FileMeta {
234 type Error = Error;
235
236 fn try_from(ent: DirEntry) -> DeltaResult<FileMeta> {
237 let metadata = ent.metadata()?;
238 let last_modified = metadata
239 .modified()?
240 .duration_since(SystemTime::UNIX_EPOCH)
241 .map_err(|_| Error::generic("Failed to convert file timestamp to milliseconds"))?;
242 let location = Url::from_file_path(ent.path())
243 .map_err(|_| Error::generic(format!("Invalid path: {:?}", ent.path())))?;
244 let last_modified = last_modified.as_millis().try_into().map_err(|_| {
245 Error::generic(format!(
246 "Failed to convert file modification time {:?} into i64",
247 last_modified.as_millis()
248 ))
249 })?;
250 Ok(FileMeta {
251 location,
252 last_modified,
253 size: metadata.len(),
254 })
255 }
256}
257
258impl FileMeta {
259 /// Create a new instance of `FileMeta`
260 pub fn new(location: Url, last_modified: i64, size: u64) -> Self {
261 Self {
262 location,
263 last_modified,
264 size,
265 }
266 }
267
268 /// Casts `size` to `i64`. Errors if `size` exceeds `i64::MAX`.
269 pub(crate) fn size_as_i64(&self) -> DeltaResult<i64> {
270 i64::try_from(self.size)
271 .map_err(|_| Error::generic(format!("file size {} exceeds i64::MAX", self.size)))
272 }
273}
274
/// Extension trait that makes it easier to work with trait objects that implement [`Any`],
/// implemented automatically for any type that satisfies `Any`, `Send`, and `Sync`. In particular,
/// given some `trait T: Any + Send + Sync`, it allows upcasting `T` to `dyn Any + Send + Sync`,
/// which in turn allows downcasting the result to a concrete type.
///
/// For example, the following code will compile:
///
/// ```
/// # use buoyant_kernel as delta_kernel;
/// # use delta_kernel::AsAny;
/// # use std::any::Any;
/// # use std::sync::Arc;
/// trait Foo : AsAny {}
/// struct Bar;
/// impl Foo for Bar {}
///
/// let f: Arc<dyn Foo> = Arc::new(Bar);
/// let a: Arc<dyn Any + Send + Sync> = f.as_any();
/// let b: Arc<Bar> = a.downcast().unwrap();
/// ```
///
/// In contrast, very similar code that relies only on `Any` would fail to compile:
///
/// ```compile_fail
/// # use std::any::Any;
/// # use std::sync::Arc;
/// trait Foo: Any + Send + Sync {}
///
/// struct Bar;
/// impl Foo for Bar {}
///
/// let f: Arc<dyn Foo> = Arc::new(Bar);
/// let b: Arc<Bar> = f.downcast().unwrap(); // `Arc::downcast` method not found
/// ```
///
/// As would this, on toolchains that predate stable trait upcasting coercion (stabilized in
/// Rust 1.86). The example is marked `ignore` because it compiles on newer toolchains:
///
/// ```ignore
/// # use std::any::Any;
/// # use std::sync::Arc;
/// trait Foo: Any + Send + Sync {}
///
/// struct Bar;
/// impl Foo for Bar {}
///
/// let f: Arc<dyn Foo> = Arc::new(Bar);
/// let a: Arc<dyn Any + Send + Sync> = f; // requires trait upcasting coercion
/// let f: Arc<Bar> = a.downcast().unwrap();
/// ```
///
/// NOTE: `AsAny` inherits the `Send + Sync` constraint from [`Arc::downcast`].
pub trait AsAny: Any + Send + Sync {
    /// Obtains a `dyn Any` reference to the object:
    ///
    /// ```
    /// # use buoyant_kernel as delta_kernel;
    /// # use delta_kernel::AsAny;
    /// # use std::any::Any;
    /// # use std::sync::Arc;
    /// trait Foo : AsAny {}
    /// struct Bar;
    /// impl Foo for Bar {}
    ///
    /// let f: &dyn Foo = &Bar;
    /// let a: &dyn Any = f.any_ref();
    /// let b: &Bar = a.downcast_ref().unwrap();
    /// ```
    fn any_ref(&self) -> &(dyn Any + Send + Sync);

    /// Obtains an `Arc<dyn Any>` reference to the object:
    ///
    /// ```
    /// # use buoyant_kernel as delta_kernel;
    /// # use delta_kernel::AsAny;
    /// # use std::any::Any;
    /// # use std::sync::Arc;
    /// trait Foo : AsAny {}
    /// struct Bar;
    /// impl Foo for Bar {}
    ///
    /// let f: Arc<dyn Foo> = Arc::new(Bar);
    /// let a: Arc<dyn Any + Send + Sync> = f.as_any();
    /// let b: Arc<Bar> = a.downcast().unwrap();
    /// ```
    fn as_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;

    /// Converts the object to `Box<dyn Any>`:
    ///
    /// ```
    /// # use buoyant_kernel as delta_kernel;
    /// # use delta_kernel::AsAny;
    /// # use std::any::Any;
    /// # use std::sync::Arc;
    /// trait Foo : AsAny {}
    /// struct Bar;
    /// impl Foo for Bar {}
    ///
    /// let f: Box<dyn Foo> = Box::new(Bar);
    /// let a: Box<dyn Any> = f.into_any();
    /// let b: Box<Bar> = a.downcast().unwrap();
    /// ```
    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync>;

    /// Convenient wrapper for [`std::any::type_name`], since [`Any`] does not provide it and
    /// [`Any::type_id`] is useless as a debugging aid (its `Debug` is just a mess of hex digits).
    fn type_name(&self) -> &'static str;
}
382
383// Blanket implementation for all eligible types
384impl<T: Any + Send + Sync> AsAny for T {
385 fn any_ref(&self) -> &(dyn Any + Send + Sync) {
386 self
387 }
388 fn as_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
389 self
390 }
391 fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync> {
392 self
393 }
394 fn type_name(&self) -> &'static str {
395 std::any::type_name::<Self>()
396 }
397}
398
/// Extension trait that facilitates object-safe implementations of `PartialEq`.
pub trait DynPartialEq: AsAny {
    /// Compares `self` against a type-erased `other` value, returning `true` when they are
    /// considered equal.
    fn dyn_eq(&self, other: &dyn Any) -> bool;
}
403
404// Blanket implementation for all eligible types
405impl<T: PartialEq + AsAny> DynPartialEq for T {
406 fn dyn_eq(&self, other: &dyn Any) -> bool {
407 other.downcast_ref::<T>().is_some_and(|other| self == other)
408 }
409}
410
/// Trait for implementing an Expression evaluator.
///
/// It contains one Expression which can be evaluated on multiple batches of [`EngineData`].
/// Connectors can implement this trait to optimize the evaluation using the
/// connector specific capabilities.
pub trait ExpressionEvaluator: AsAny {
    /// Evaluate the expression on a given EngineData.
    ///
    /// Produces one value for each row of the input.
    /// The data type of the output is the same as the output type of the expression this
    /// evaluator is using.
    fn evaluate(&self, batch: &dyn EngineData) -> DeltaResult<Box<dyn EngineData>>;
}
424
/// Trait for implementing a Predicate evaluator.
///
/// It contains one Predicate which can be evaluated on multiple batches of [`EngineData`].
/// Connectors can implement this trait to optimize the evaluation using the
/// connector specific capabilities.
pub trait PredicateEvaluator: AsAny {
    /// Evaluate the predicate on a given EngineData.
    ///
    /// Produces one boolean value for each row of the input.
    fn evaluate(&self, batch: &dyn EngineData) -> DeltaResult<Box<dyn EngineData>>;
}
436
/// Provides expression evaluation capability to Delta Kernel.
///
/// Delta Kernel can use this handler to evaluate a predicate on partition filters,
/// fill up partition column values, and any computation on data using Expressions.
pub trait EvaluationHandler: AsAny {
    /// Create an [`ExpressionEvaluator`] that can evaluate the given [`Expression`]
    /// on columnar batches with the given [`Schema`] to produce data of [`DataType`].
    ///
    /// If the provided output type is a struct, its fields describe the columns of output produced
    /// by the evaluator. Otherwise, the output schema is a single column named "output" of the
    /// specified `output_type`. In all cases, the output schema is only used for its names (all
    /// field names will be updated to match) and nullability (non-nullable columns can be converted
    /// to nullable). Any mismatch in types (including number of columns) will produce an error.
    ///
    /// # Parameters
    ///
    /// - `input_schema`: Schema of the input data.
    /// - `expression`: Expression to evaluate.
    /// - `output_type`: Expected result data type.
    ///
    /// [`Schema`]: crate::schema::StructType
    /// [`DataType`]: crate::schema::DataType
    fn new_expression_evaluator(
        &self,
        input_schema: SchemaRef,
        expression: ExpressionRef,
        output_type: DataType,
    ) -> DeltaResult<Arc<dyn ExpressionEvaluator>>;

    /// Create a [`PredicateEvaluator`] that can evaluate the given [`Predicate`] on columnar
    /// batches with the given [`Schema`] to produce a column of boolean results.
    ///
    /// The output schema is a single nullable boolean column named "output".
    ///
    /// # Parameters
    ///
    /// - `input_schema`: Schema of the input data.
    /// - `predicate`: Predicate to evaluate.
    ///
    /// [`Schema`]: crate::schema::StructType
    fn new_predicate_evaluator(
        &self,
        input_schema: SchemaRef,
        predicate: PredicateRef,
    ) -> DeltaResult<Arc<dyn PredicateEvaluator>>;

    /// Create a single-row [`EngineData`] in which every value is null, with the schema
    /// specified by `output_schema`.
    // NOTE: we should probably allow DataType instead of SchemaRef, but can expand that in the
    // future.
    fn null_row(&self, output_schema: SchemaRef) -> DeltaResult<Box<dyn EngineData>>;

    /// Create a multi-row [`EngineData`] by applying the given schema to multiple rows of values.
    ///
    /// Each element in `rows` represents one row of data, where each row is a slice of structured
    /// scalar values (one scalar per top-level field in the schema).
    ///
    /// # Parameters
    ///
    /// - `schema`: Schema describing the structure of each row.
    /// - `rows`: Slice of rows, where each row contains one structured scalar per top-level schema
    ///   field.
    ///
    /// # Returns
    ///
    /// A multi-row `EngineData` containing all rows.
    ///
    /// # Errors
    ///
    /// Returns an error if any row has a number of scalars that does not match the number of
    /// top-level fields in `schema`, or if any scalar value cannot be appended to its corresponding
    /// field's builder (e.g. due to a type mismatch).
    ///
    /// # Example
    ///
    /// For a schema with fields `[add: Struct, remove: Struct]`, each row should contain exactly 2
    /// scalars: one for the `add` field and one for the `remove` field.
    fn create_many(
        &self,
        schema: SchemaRef,
        rows: &[&[Scalar]],
    ) -> DeltaResult<Box<dyn EngineData>>;
}
520
521/// Internal trait to allow us to have a private `create_one` API that's implemented for all
522/// EvaluationHandlers.
523// For some reason rustc doesn't detect it's usage so we allow(dead_code) here...
524#[allow(dead_code)]
525#[internal_api]
526trait EvaluationHandlerExtension: EvaluationHandler {
527 /// Create a single-row [`EngineData`] by applying the given schema to the leaf-values given in
528 /// `values`.
529 // Note: we will stick with a Schema instead of DataType (more constrained can expand in
530 // future)
531 fn create_one(&self, schema: SchemaRef, values: &[Scalar]) -> DeltaResult<Box<dyn EngineData>> {
532 // just get a single int column (arbitrary)
533 let null_row_schema = Arc::new(StructType::new_unchecked(vec![StructField::nullable(
534 "null_col",
535 DataType::INTEGER,
536 )]));
537 let null_row = self.null_row(null_row_schema.clone())?;
538
539 // Convert schema and leaf values to an expression
540 let row_expr = literal_expression_transform(schema.as_ref(), values)?;
541
542 let eval =
543 self.new_expression_evaluator(null_row_schema, row_expr.into(), schema.into())?;
544 eval.evaluate(null_row.as_ref())
545 }
546}
547
548// Auto-implement the extension trait for all EvaluationHandlers
549impl<T: EvaluationHandler + ?Sized> EvaluationHandlerExtension for T {}
550
/// A trait that allows converting a type into (single-row) EngineData
///
/// This is typically used with the `#[derive(IntoEngineData)]` macro
/// which leverages the traits `ToDataType` and `Into<Scalar>` for struct fields
/// to convert a struct into EngineData.
///
/// # Example
/// ```ignore
/// # use std::sync::Arc;
/// # use delta_kernel_derive::{Schema, IntoEngineData};
///
/// #[derive(Schema, IntoEngineData)]
/// struct MyStruct {
///   a: i32,
///   b: String,
/// }
///
/// let my_struct = MyStruct { a: 42, b: "Hello".to_string() };
/// // typically used with ToSchema
/// let schema = Arc::new(MyStruct::to_schema());
/// // single-row EngineData
/// let engine = todo!(); // create an engine
/// let engine_data = my_struct.into_engine_data(schema, engine);
/// ```
#[internal_api]
pub(crate) trait IntoEngineData {
    /// Consume this type to produce a single-row EngineData using the provided schema.
    ///
    /// # Parameters
    ///
    /// - `schema`: Schema of the row to produce.
    /// - `engine`: Engine made available to the implementation for building the row.
    fn into_engine_data(
        self,
        schema: SchemaRef,
        engine: &dyn Engine,
    ) -> DeltaResult<Box<dyn EngineData>>;
}
584
/// Provides file system related functionalities to Delta Kernel.
///
/// Delta Kernel uses this handler whenever it needs to access the underlying
/// file system where the Delta table is present. Connector implementation of
/// this trait can hide filesystem specific details from Delta Kernel.
pub trait StorageHandler: AsAny {
    /// List the paths in the same directory that are lexicographically greater than
    /// (UTF-8 sorting) the given `path`. The result should also be sorted by the file name.
    ///
    /// If the path is directory-like (ends with '/'), the result should contain
    /// all the files in the directory.
    fn list_from(&self, path: &Url)
        -> DeltaResult<Box<dyn Iterator<Item = DeltaResult<FileMeta>>>>;

    /// Read data specified by the start and end offset from the file.
    ///
    /// Each [`FileSlice`] pairs a file location with an optional byte range.
    // NOTE(review): a `None` range presumably means "read the whole file" - confirm against
    // implementations.
    fn read_files(
        &self,
        files: Vec<FileSlice>,
    ) -> DeltaResult<Box<dyn Iterator<Item = DeltaResult<Bytes>>>>;

    /// Copy a file atomically from source to destination. If the destination file already exists,
    /// it must return Err(Error::FileAlreadyExists).
    fn copy_atomic(&self, src: &Url, dest: &Url) -> DeltaResult<()>;

    /// Write data to the specified path.
    ///
    /// If `overwrite` is false and the file already exists, this must return
    /// `Err(Error::FileAlreadyExists)`.
    fn put(&self, path: &Url, data: Bytes, overwrite: bool) -> DeltaResult<()>;

    /// Perform a HEAD request for the given file at a Url, returning the file metadata.
    ///
    /// If the file does not exist, this must return an `Err` with [`Error::FileNotFound`].
    fn head(&self, path: &Url) -> DeltaResult<FileMeta>;
}
620
/// Provides JSON handling functionality to Delta Kernel.
///
/// Delta Kernel can use this handler to parse JSON strings into Row or read content from JSON
/// files. Connectors can leverage this trait to provide their best implementation of the JSON
/// parsing capability to Delta Kernel.
pub trait JsonHandler: AsAny {
    /// Parse the given json strings and return the fields requested by output schema as columns in
    /// [`EngineData`]. json_strings MUST be a single column batch of engine data, and the
    /// column type must be string
    fn parse_json(
        &self,
        json_strings: Box<dyn EngineData>,
        output_schema: SchemaRef,
    ) -> DeltaResult<Box<dyn EngineData>>;

    /// Read and parse the JSON format file at given locations and return the data as EngineData
    /// with the columns requested by physical schema. Note: The [`FileDataReadResultIterator`]
    /// must emit data from files in the order that `files` is given. For example if files ["a",
    /// "b"] is provided, then the engine data iterator must first return all the engine data
    /// from file "a", _then_ all the engine data from file "b". Moreover, for a given file, all
    /// of its [`EngineData`] and constituent rows must be in order that they occur in the file.
    /// Consider a file with rows (1, 2, 3). The following are legal iterator batches:
    ///
    /// ```text
    /// iter: [EngineData(1, 2), EngineData(3)]
    /// iter: [EngineData(1), EngineData(2, 3)]
    /// iter: [EngineData(1, 2, 3)]
    /// ```
    ///
    /// The following are illegal batches:
    ///
    /// ```text
    /// iter: [EngineData(3), EngineData(1, 2)]
    /// iter: [EngineData(1), EngineData(3, 2)]
    /// iter: [EngineData(2, 1, 3)]
    /// ```
    ///
    /// Additionally, engines may not merge engine data across file boundaries.
    ///
    /// # Parameters
    ///
    /// - `files` - File metadata for files to be read.
    /// - `physical_schema` - Select list of columns to read from the JSON file.
    /// - `predicate` - Optional push-down predicate hint (engine is free to ignore it).
    fn read_json_files(
        &self,
        files: &[FileMeta],
        physical_schema: SchemaRef,
        predicate: Option<PredicateRef>,
    ) -> DeltaResult<FileDataReadResultIterator>;

    /// Atomically (!) write a single JSON file. Each row of the input data should be written as a
    /// new JSON object appended to the file. This write must:
    /// (1) serialize the data to newline-delimited json (each row is a json object literal)
    /// (2) write the data to storage atomically (i.e. if the file already exists, fail unless the
    ///     overwrite flag is set)
    ///
    /// For example, the JSON data should be written as { "column1": "val1", "column2": "val2", .. }
    /// with each row on a new line.
    ///
    /// NOTE: Null columns should not be written to the JSON file. For example, if a row has columns
    /// ["a", "b"] and the value of "b" is null, the JSON object should be written as
    /// { "a": "..." }. Note that including nulls is technically valid JSON, but would bloat the
    /// log, therefore we recommend omitting them.
    ///
    /// # Parameters
    ///
    /// - `path` - URL specifying the location to write the JSON file
    /// - `data` - Iterator of EngineData to write to the JSON file. Each row should be written as a
    ///   new JSON object appended to the file. (that is, the file is newline-delimited JSON, and
    ///   each row is a JSON object on a single line)
    /// - `overwrite` - If true, overwrite the file if it exists. If false, the call must fail if
    ///   the file exists.
    fn write_json_file(
        &self,
        path: &Url,
        data: Box<dyn Iterator<Item = DeltaResult<FilteredEngineData>> + Send + '_>,
        overwrite: bool,
    ) -> DeltaResult<()>;
}
694
/// Field IDs set aside for metadata columns in Delta tables.
///
/// Regular table columns should not use these IDs: they identify virtual columns that carry
/// file-level metadata during reads.
pub mod reserved_field_ids {
    /// Field ID reserved for the file name metadata column (`_file`), which provides the name of
    /// the Parquet file that contains each row.
    ///
    /// The value is one less than `i32::MAX`.
    pub const FILE_NAME: i64 = 2_147_483_646;
}
704
/// Metadata from a Parquet file footer.
///
/// This struct contains metadata extracted from a Parquet file's footer. At present the schema
/// is its only field; it is designed to be extensible for future additions such as row group
/// statistics.
#[derive(Debug, Clone)]
pub struct ParquetFooter {
    /// The schema of the Parquet file, converted to Delta Kernel's schema format.
    pub schema: SchemaRef,
}
714
715/// Provides Parquet file related functionalities to Delta Kernel.
716///
717/// Connectors can leverage this trait to provide their own custom
718/// implementation of Parquet data file functionalities to Delta Kernel.
719pub trait ParquetHandler: AsAny {
720 /// Read and parse the Parquet file at given locations and return the data as EngineData with
721 /// the columns requested by physical schema. The ParquetHandler _must_ return exactly the
722 /// columns specified in `physical_schema`, and they _must_ be in schema order.
723 ///
724 /// # Resolving Parquet schema to the physical schema
725 ///
726 /// When reading the Parquet file, the columns are resolved from the Parquet schema to the
727 /// kernel's `physical_schema`. To do so, the parquet reader must match each Parquet column
728 /// to a [`StructField`] in the `physical_schema`. All columns in the returned `EngineData`
729 /// must be in the same order as specified in `physical_schema`.
730 ///
731 /// Parquet columns are matched to `physical_schema` [`StructField`]s using the following rules:
732 /// 1. **Field ID**: If a [`StructField`] in `physical_schema` contains a field ID (specified in
733 /// [`ColumnMetadataKey::ParquetFieldId`] metadata), use the ID to match the Parquet column's
734 /// field id
735 /// 2. **Field Name**: If no field ID is present in the `physical_schema`'s [`StructField`] or
736 /// no matching parquet field ID is found, fall back to matching by column name
737 ///
738 /// # Metadata Columns
739 ///
740 /// The ParquetHandler must support virtual metadata columns that provide additional information
741 /// about each row. These columns are not stored in the Parquet file but are generated at read
742 /// time.
743 ///
744 /// ## Row Index Column
745 ///
746 /// When a column in `physical_schema` is marked as a row index metadata column (via
747 /// [`StructField::create_metadata_column`] with [`schema::MetadataColumnSpec::RowIndex`]), the
748 /// ParquetHandler must populate it with the 0-based row position within the Parquet file:
749 ///
750 /// - **Column name**: User-specified (commonly `"row_index"` or `"_metadata.row_index"`)
751 /// - **Type**: `LONG` (non-nullable)
752 /// - **Values**: Sequential integers starting at 0 for each file
753 /// - **Use case**: Track row positions for downstream processing, or internally used to compute
754 /// Row IDs
755 ///
756 /// Example: A file with 5 rows would have row_index values `[0, 1, 2, 3, 4]`.
757 ///
758 /// ## File Name Column (Reserved Field ID)
759 ///
760 /// When a column in `physical_schema` has the reserved field ID
761 /// [`reserved_field_ids::FILE_NAME`] (2147483646), the ParquetHandler must populate it
762 /// with the file path/name:
763 ///
764 /// - **Column name**: `"_file"`
765 /// - **Type**: `STRING` (non-nullable)
766 /// - **Field ID**: 2147483646 (reserved)
767 /// - **Values**: The file path/URL (e.g., `"s3://bucket/path/file.parquet"`)
768 /// - **Use case**: Track which file each row came from in multi-file reads
769 ///
770 /// Example: All rows from the same file would have the same `_file` value.
771 ///
772 /// ## Metadata Column Examples
773 ///
774 /// ```rust,ignore
775 /// use delta_kernel::schema::{StructType, StructField, DataType, MetadataColumnSpec};
776 ///
777 /// // Example 1: Schema with row_index metadata column
778 /// let schema_with_row_index = StructType::try_new([
779 /// StructField::nullable("id", DataType::INTEGER),
780 /// StructField::create_metadata_column("row_index", MetadataColumnSpec::RowIndex),
781 /// StructField::nullable("value", DataType::STRING),
782 /// ])?;
783 ///
784 /// // Example 2: Schema with _file metadata column (using reserved field ID)
785 /// let schema_with_file_path = StructType::try_new([
786 /// StructField::nullable("id", DataType::INTEGER),
787 /// StructField::create_metadata_column("_file", MetadataColumnSpec::FilePath),
788 /// StructField::nullable("value", DataType::STRING),
789 /// ])?;
790 /// ```
791 ///
792 /// ---
793 ///
794 /// If no matching Parquet column is found, `NULL` values are returned
795 /// for nullable columns in `physical_schema`. For non-nullable columns, an error is returned.
796 ///
797 ///
798 /// ## Column Matching Examples
799 ///
800 /// Consider a `physical_schema` with the following fields:
801 /// - Column 0: `"i_logical"` (integer, non-null) with field ID 1 (via
802 /// [`ColumnMetadataKey::ParquetFieldId`])
803 /// - Column 1: `"s"` (string, nullable) with no field ID metadata
804 /// - Column 2: `"i2"` (integer, nullable) with no field ID metadata
805 ///
806 /// [`ColumnMetadataKey::ParquetFieldId`]: crate::schema::ColumnMetadataKey::ParquetFieldId
807 ///
808 /// And a Parquet file containing these columns:
809 /// - Column 0: `"i2"` (integer, nullable) with field ID 3
810 /// - Column 1: `"i"` (integer, non-null) with field ID 1
811 /// - No `"s"` column present
812 ///
813 /// The column matching would work as follows:
814 /// - `"i_logical"` matches `"i"` by field ID (both have ID 1)
815 /// - `"i2"` matches `"i2"` by column name (no field ID to match on)
816 /// - `"s"` has no matching Parquet column, so NULL values are returned
817 ///
818 /// The returned data will contain exactly 3 columns in physical schema order:
819 /// `{i_logical: parquet[1], s: NULL.., i2: parquet[0]}`
820 ///
821 /// # Parameters
822 ///
823 /// - `files` - File metadata for files to be read.
824 /// - `physical_schema` - Select list and order of columns to read from the Parquet file.
825 /// - `predicate` - Optional push-down predicate hint (engine is free to ignore it).
826 ///
827 /// # Returns
828 /// A [`DeltaResult`] containing a [`FileDataReadResultIterator`].
829 /// Each element of the iterator is a [`DeltaResult`] of [`EngineData`]. The [`EngineData`]
830 /// has the contents of `files` and must match the provided `physical_schema`.
831 ///
832 /// Note: The [`FileDataReadResultIterator`] must emit data from files in the order that `files`
833 /// is given. For example if files ["a", "b"] is provided, then the engine data iterator must
834 /// first return all the engine data from file "a", _then_ all the engine data from file "b".
    /// Moreover, for a given file, all of its [`EngineData`] and constituent rows must be in the
    /// order in which they occur in the file. Consider a file with rows (1, 2, 3). The following
    /// are legal iterator batches:
838 /// iter: [EngineData(1, 2), EngineData(3)]
839 /// iter: [EngineData(1), EngineData(2, 3)]
840 /// iter: [EngineData(1, 2, 3)]
841 /// The following are illegal batches:
842 /// iter: [EngineData(3), EngineData(1, 2)]
843 /// iter: [EngineData(1), EngineData(3, 2)]
844 /// iter: [EngineData(2, 1, 3)]
845 ///
846 /// Additionally, engines must not merge engine data across file boundaries.
847 ///
    /// [`ColumnMetadataKey::ParquetFieldId`]: crate::schema::ColumnMetadataKey::ParquetFieldId
849 fn read_parquet_files(
850 &self,
851 files: &[FileMeta],
852 physical_schema: SchemaRef,
853 predicate: Option<PredicateRef>,
854 ) -> DeltaResult<FileDataReadResultIterator>;
855
856 /// Write data to a Parquet file at the specified URL.
857 ///
858 /// This method writes the provided `data` to a Parquet file at the given `url`.
859 ///
860 /// This will overwrite the file if it already exists. For filesystem-backed
861 /// implementations, the parent directories must be created if they do not exist.
862 ///
863 /// # Parquet field IDs
864 ///
865 /// The engine must write a Parquet `field_id` correctly when the kernel
866 /// [`StructField`] carries a field-id related annotation, including:
867 /// - [`ColumnMetadataKey::ColumnMappingId`] / [`ColumnMetadataKey::ParquetFieldId`]
868 /// - [`ColumnMetadataKey::ColumnMappingNestedIds`]
869 ///
870 /// For how to use these keys, refer to the Delta protocol's [Column Mapping] and
871 /// [IcebergCompatV2] sections.
872 ///
873 /// **Non-compliance produces files with incorrect `field_id`s**, which may lead to
874 /// read failures when column mapping mode is `id` and to failures when converting
875 /// the table to Iceberg.
876 ///
877 /// # Parameters
878 ///
879 /// - `url` - The full URL path where the Parquet file should be written (e.g.,
880 /// `s3://bucket/path/file.parquet`).
881 /// - `data` - An iterator of engine data to be written to the Parquet file.
882 ///
883 /// # Returns
884 ///
885 /// A [`DeltaResult`] indicating success or failure.
886 ///
887 /// [`StructField`]: crate::schema::StructField
888 /// [`ColumnMetadataKey::ColumnMappingId`]: crate::schema::ColumnMetadataKey::ColumnMappingId
889 /// [`ColumnMetadataKey::ParquetFieldId`]: crate::schema::ColumnMetadataKey::ParquetFieldId
890 /// [`ColumnMetadataKey::ColumnMappingNestedIds`]: crate::schema::ColumnMetadataKey::ColumnMappingNestedIds
891 /// [Column Mapping]: https://github.com/delta-io/delta/blob/master/PROTOCOL.md#column-mapping
892 /// [IcebergCompatV2]: https://github.com/delta-io/delta/blob/master/PROTOCOL.md#iceberg-compatibility-v2
893 fn write_parquet_file(
894 &self,
895 location: url::Url,
896 data: Box<dyn Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send>,
897 ) -> DeltaResult<()>;
898
899 /// Read the footer metadata from a Parquet file without reading the data.
900 ///
901 /// This method reads only the Parquet file footer (metadata section), which is useful for
902 /// schema inspection, compatibility checking, and determining whether parsed statistics
903 /// columns are present and compatible with the current table schema.
904 ///
905 /// # Parameters
906 ///
907 /// - `file` - File metadata for the Parquet file whose footer should be read. The `size` field
908 /// should contain the actual file size to enable efficient footer reads without additional
909 /// I/O operations.
910 ///
911 /// # Returns
912 ///
913 /// A [`DeltaResult`] containing a [`ParquetFooter`] with the Parquet file's metadata, including
914 /// the schema converted to Delta Kernel's format.
915 ///
916 /// # Field IDs
917 ///
918 /// If the Parquet file contains field IDs (written when column mapping is enabled), they are
919 /// preserved in each [`StructField`]'s metadata. Callers can access field IDs via
920 /// [`StructField::get_config_value`] with [`ColumnMetadataKey::ParquetFieldId`].
921 ///
922 /// # Errors
923 ///
924 /// Returns an error if:
925 /// - The file cannot be accessed or does not exist
926 /// - The file is not a valid Parquet file
927 /// - The footer cannot be read or parsed
928 /// - The schema cannot be converted to Delta Kernel's format
929 ///
930 /// [`StructField`]: crate::schema::StructField
931 /// [`StructField::get_config_value`]: crate::schema::StructField::get_config_value
932 /// [`ColumnMetadataKey::ParquetFieldId`]: crate::schema::ColumnMetadataKey::ParquetFieldId
933 fn read_parquet_footer(&self, file: &FileMeta) -> DeltaResult<ParquetFooter>;
934}
935
936/// The `Engine` trait encapsulates all the functionality an engine or connector needs to provide
937/// to the Delta Kernel in order to read the Delta table.
938///
939/// Engines/Connectors are expected to pass an implementation of this trait when reading a Delta
940/// table.
941pub trait Engine: AsAny {
942 /// Get the connector provided [`EvaluationHandler`].
943 fn evaluation_handler(&self) -> Arc<dyn EvaluationHandler>;
944
945 /// Get the connector provided [`StorageHandler`]
946 fn storage_handler(&self) -> Arc<dyn StorageHandler>;
947
948 /// Get the connector provided [`JsonHandler`].
949 fn json_handler(&self) -> Arc<dyn JsonHandler>;
950
951 /// Get the connector provided [`ParquetHandler`].
952 fn parquet_handler(&self) -> Arc<dyn ParquetHandler>;
953}
954
// `default-engine-base` is an 'internal' feature flag that only holds the pieces shared by
// default-engine-native-tls and default-engine-rustls. The crate can't compile with _only_
// default-engine-base enabled, so fail early with a friendly error instead.
#[cfg(all(
    feature = "default-engine-base",
    not(feature = "default-engine-native-tls"),
    not(feature = "default-engine-rustls"),
))]
compile_error!(
    "The default-engine-base feature flag is not meant to be used directly. \
     Please use either default-engine-native-tls or default-engine-rustls."
);
969
// Rustdoc's documentation tests can do some things that regular unit tests can't. Here we are
// using doctests to test macros. Specifically, we are testing for failed macro invocations due
// to invalid input, not the macro output when the macro invocation is successful (which can/should
// be done in unit tests). This module is not exclusively for macro tests, so other doctests can
// also be added. See:
// https://doc.rust-lang.org/rustdoc/write-documentation/documentation-tests.html#include-items-only-when-collecting-doctests
#[cfg(doctest)]
mod doctests;
977
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;

    #[rstest]
    #[case::zero(0, Some(0))]
    #[case::one(1, Some(1))]
    #[case::i64_max(i64::MAX as u64, Some(i64::MAX))]
    #[case::just_over_i64_max(i64::MAX as u64 + 1, None)]
    #[case::u64_max(u64::MAX, None)]
    /// Exercises `FileMeta::size_as_i64` on both paths: sizes that fit in an `i64` must convert
    /// to the expected value, and sizes above `i64::MAX` must yield an error whose message
    /// mentions the overflow.
    fn test_file_meta_size_as_i64(#[case] size: u64, #[case] expected: Option<i64>) {
        let location = Url::parse("file:///x").unwrap();
        let meta = FileMeta::new(location, 0, size);
        let outcome = meta.size_as_i64();

        if let Some(want) = expected {
            // Success path: the conversion must yield exactly the expected value.
            assert_eq!(outcome.unwrap(), want);
        } else {
            // Failure path: the error text must name the overflow condition.
            let message = outcome.unwrap_err().to_string();
            assert!(message.contains("exceeds i64::MAX"));
        }
    }
}
1003}