Skip to main content

buoyant_kernel/
lib.rs

1//! # Delta Kernel
2//!
3//! Delta-kernel-rs is an experimental [Delta](https://github.com/delta-io/delta/) implementation
4//! focused on interoperability with a wide range of query engines. It supports reads and
5//! (experimental) writes (only blind appends in the write path currently). This library defines a
6//! number of traits which must be implemented to provide a working delta implementation. They are
7//! detailed below. There is a provided "default engine" that implements all these traits and can
8//! be used to ease integration work. See [`DefaultEngine`](engine/default/index.html) for more
9//! information.
10//!
11//! A full `rust` example for reading table data using the default engine can be found in the
12//! [read-table-single-threaded] example (and for a more complex multi-threaded reader see the
13//! [read-table-multi-threaded] example). An example for reading the table changes for a table
14//! using the default engine can be found in the [read-table-changes] example. The [write-table]
15//! example demonstrates how to write data to a Delta table using the default engine.
16//!
17//! [read-table-single-threaded]:
18//! https://github.com/delta-io/delta-kernel-rs/tree/main/kernel/examples/read-table-single-threaded
19//! [read-table-multi-threaded]:
20//! https://github.com/delta-io/delta-kernel-rs/tree/main/kernel/examples/read-table-multi-threaded
21//! [read-table-changes]:
22//! https://github.com/delta-io/delta-kernel-rs/tree/main/kernel/examples/read-table-changes
23//! [write-table]:
24//! https://github.com/delta-io/delta-kernel-rs/tree/main/kernel/examples/write-table
25//!
26//! # Engine trait
27//!
28//! The [`Engine`] trait allows connectors to bring their own implementation of functionality such
29//! as reading parquet files, listing files in a file system, parsing a JSON string etc. This
30//! trait exposes methods to get sub-engines which expose the core functionalities customizable by
31//! connectors.
32//!
33//! ## Expression handling
34//!
35//! Expression handling is done via the [`EvaluationHandler`], which in turn allows the creation of
36//! [`ExpressionEvaluator`]s. These evaluators are created for a specific predicate [`Expression`]
37//! and allow evaluation of that predicate for a specific batch of data.
38//!
39//! ## File system interactions
40//!
41//! Delta Kernel needs to perform some basic operations against file systems like listing and
42//! reading files. These interactions are encapsulated in the [`StorageHandler`] trait.
43//! Implementers must take care that all assumptions on the behavior of the functions - like sorted
44//! results - are respected.
45//!
46//! ## Reading log and data files
47//!
48//! Delta Kernel requires the capability to read and write json files and read parquet files, which
49//! is exposed via the [`JsonHandler`] and [`ParquetHandler`] respectively. When reading files,
50//! connectors are asked to provide the context information they require to execute the actual
51//! operation. This is done by invoking methods on the [`StorageHandler`] trait.
52
53#![cfg_attr(all(doc, NIGHTLY_CHANNEL), feature(doc_cfg))]
54#![warn(
55    unreachable_pub,
56    trivial_numeric_casts,
57    unused_extern_crates,
58    rust_2018_idioms,
59    rust_2021_compatibility,
60    clippy::unwrap_used,
61    clippy::expect_used,
62    clippy::panic
63)]
64// we re-allow panics in tests
65#![cfg_attr(test, allow(clippy::unwrap_used, clippy::expect_used, clippy::panic))]
66
67/// This `extern crate` declaration allows the macro to reliably refer to
68/// `delta_kernel::schema::DataType` no matter which crate invokes it. Without that, `delta_kernel`
69/// cannot invoke the macro because `delta_kernel` is an unknown crate identifier (you have to use
70/// `crate` instead). We could make the macro use `crate::schema::DataType` instead, but then the
71/// macro is useless outside the `delta_kernel` crate.
72// TODO: when running `cargo package -p delta_kernel` this gives 'unused' warning - #1095
73#[allow(unused_extern_crates)]
74extern crate self as delta_kernel;
75
76use std::any::Any;
77use std::cmp::Ordering;
78use std::fs::DirEntry;
79use std::ops::Range;
80use std::sync::Arc;
81use std::time::SystemTime;
82
83use bytes::Bytes;
84use url::Url;
85
86use self::schema::{DataType, SchemaRef};
87
88mod action_reconciliation;
89pub mod actions;
90pub mod checkpoint;
91pub mod committer;
92// Public under test-utils so integration tests can inspect CRC state via
93// Snapshot::get_current_crc_if_loaded_for_testing.
94#[cfg(feature = "test-utils")]
95pub mod crc;
96#[cfg(not(feature = "test-utils"))]
97pub(crate) mod crc;
98pub mod engine_data;
99pub mod error;
100pub mod expressions;
101mod log_compaction;
102mod log_path;
103mod log_reader;
104pub mod metrics;
105pub mod partition;
106pub mod scan;
107pub mod schema;
108pub mod snapshot;
109pub mod table_changes;
110pub mod table_configuration;
111pub mod table_features;
112pub mod table_properties;
113pub mod transaction;
114pub mod transforms;
115
116pub use crc::{FileSizeHistogram, FileStats};
117pub use log_path::LogPath;
118
119// Public under test-utils so integration tests can call get_high_water_mark via snapshot.
120#[cfg(feature = "test-utils")]
121pub mod row_tracking;
122#[cfg(not(feature = "test-utils"))]
123pub(crate) mod row_tracking;
124
125pub(crate) mod clustering;
126
127mod arrow_compat;
128#[cfg(any(feature = "arrow-57", feature = "arrow-58"))]
129pub use arrow_compat::*;
130
131#[cfg(feature = "internal-api")]
132pub mod column_trie;
133#[cfg(not(feature = "internal-api"))]
134pub(crate) mod column_trie;
135pub mod kernel_predicates;
136pub(crate) mod utils;
137
138#[cfg(feature = "internal-api")]
139pub use utils::try_parse_uri;
140
141// for the below modules, we cannot introduce a macro to clean this up. rustfmt doesn't follow into
142// macros, and so will not format the files associated with these modules if we get too clever. see:
143// https://github.com/rust-lang/rustfmt/issues/3253
144
145#[cfg(feature = "internal-api")]
146pub mod path;
147#[cfg(not(feature = "internal-api"))]
148pub(crate) mod path;
149
150#[cfg(feature = "internal-api")]
151pub mod log_replay;
152#[cfg(not(feature = "internal-api"))]
153pub(crate) mod log_replay;
154
155#[cfg(feature = "internal-api")]
156pub mod log_segment;
157#[cfg(not(feature = "internal-api"))]
158pub(crate) mod log_segment;
159
160#[cfg(feature = "internal-api")]
161pub mod last_checkpoint_hint;
162#[cfg(not(feature = "internal-api"))]
163pub(crate) mod last_checkpoint_hint;
164
165pub(crate) mod log_segment_files;
166
167pub mod history_manager;
168
169#[cfg(feature = "internal-api")]
170pub mod parallel;
171#[cfg(not(feature = "internal-api"))]
172pub(crate) mod parallel;
173
174pub use action_reconciliation::{ActionReconciliationIterator, ActionReconciliationIteratorState};
175pub use delta_kernel_derive;
176use delta_kernel_derive::internal_api;
177pub use engine_data::{
178    EngineData, FilteredEngineData, FilteredRowVisitor, GetData, RowIndexIterator, RowVisitor,
179};
180pub use error::{DeltaResult, Error};
181use expressions::{literal_expression_transform, Scalar};
182pub use expressions::{Expression, ExpressionRef, Predicate, PredicateRef};
183pub use log_compaction::{should_compact, LogCompactionWriter};
184use schema::{StructField, StructType};
185pub use snapshot::{Snapshot, SnapshotRef};
186
187#[cfg(any(
188    feature = "default-engine-native-tls",
189    feature = "default-engine-rustls",
190    feature = "arrow-conversion"
191))]
192pub mod engine;
193
/// A Delta table version, stored as an 8-byte unsigned integer.
pub type Version = u64;

/// Size of a file in bytes (the unit used by [`FileMeta::size`]).
pub type FileSize = u64;
/// A byte offset within a file; used as the bounds of a [`FileSlice`] range.
pub type FileIndex = u64;

/// A specification for a range of bytes to read from a file location.
// NOTE(review): a `None` range presumably means "read the whole file" — confirm against the
// `StorageHandler::read_files` implementations.
pub type FileSlice = (Url, Option<Range<FileIndex>>);

/// Data read from a Delta table file, paired with the [`FileMeta`] of the file it came from.
pub type FileDataReadResult = (FileMeta, Box<dyn EngineData>);

/// A `Send` iterator over the batches of data read from the requested files.
///
/// Each item is a fallible batch of [`EngineData`]. Despite the name, items do not carry the
/// [`FileMeta`] of their source file (unlike [`FileDataReadResult`]).
pub type FileDataReadResultIterator =
    Box<dyn Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send>;
209
/// The metadata that describes an object (file) in storage.
///
/// Equality compares all three fields.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FileMeta {
    /// The fully qualified path to the object
    pub location: Url,
    /// The last modified time as milliseconds since unix epoch
    pub last_modified: i64,
    /// The size in bytes of the object
    pub size: FileSize,
}
220
221impl Ord for FileMeta {
222    fn cmp(&self, other: &Self) -> Ordering {
223        self.location.cmp(&other.location)
224    }
225}
226
227impl PartialOrd for FileMeta {
228    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
229        Some(self.cmp(other))
230    }
231}
232
233impl TryFrom<DirEntry> for FileMeta {
234    type Error = Error;
235
236    fn try_from(ent: DirEntry) -> DeltaResult<FileMeta> {
237        let metadata = ent.metadata()?;
238        let last_modified = metadata
239            .modified()?
240            .duration_since(SystemTime::UNIX_EPOCH)
241            .map_err(|_| Error::generic("Failed to convert file timestamp to milliseconds"))?;
242        let location = Url::from_file_path(ent.path())
243            .map_err(|_| Error::generic(format!("Invalid path: {:?}", ent.path())))?;
244        let last_modified = last_modified.as_millis().try_into().map_err(|_| {
245            Error::generic(format!(
246                "Failed to convert file modification time {:?} into i64",
247                last_modified.as_millis()
248            ))
249        })?;
250        Ok(FileMeta {
251            location,
252            last_modified,
253            size: metadata.len(),
254        })
255    }
256}
257
258impl FileMeta {
259    /// Create a new instance of `FileMeta`
260    pub fn new(location: Url, last_modified: i64, size: u64) -> Self {
261        Self {
262            location,
263            last_modified,
264            size,
265        }
266    }
267
268    /// Casts `size` to `i64`. Errors if `size` exceeds `i64::MAX`.
269    pub(crate) fn size_as_i64(&self) -> DeltaResult<i64> {
270        i64::try_from(self.size)
271            .map_err(|_| Error::generic(format!("file size {} exceeds i64::MAX", self.size)))
272    }
273}
274
/// Extension trait that makes it easier to work with trait objects that implement [`Any`],
/// implemented automatically for any type that satisfies `Any`, `Send`, and `Sync`. In particular,
/// given some `trait T: Any + Send + Sync`, it allows upcasting `T` to `dyn Any + Send + Sync`,
/// which in turn allows downcasting the result to a concrete type.
///
/// For example, the following code will compile:
///
/// ```
/// # use buoyant_kernel as delta_kernel;
/// # use delta_kernel::AsAny;
/// # use std::any::Any;
/// # use std::sync::Arc;
/// trait Foo : AsAny {}
/// struct Bar;
/// impl Foo for Bar {}
///
/// let f: Arc<dyn Foo> = Arc::new(Bar);
/// let a: Arc<dyn Any + Send + Sync> = f.as_any();
/// let b: Arc<Bar> = a.downcast().unwrap();
/// ```
///
/// In contrast, very similar code that relies only on `Any` fails to compile, because
/// [`Arc::downcast`] is only defined for `Arc<dyn Any + Send + Sync>`, not for `Arc<dyn Foo>`:
///
/// ```compile_fail
/// # use std::any::Any;
/// # use std::sync::Arc;
/// trait Foo: Any + Send + Sync {}
///
/// struct Bar;
/// impl Foo for Bar {}
///
/// let f: Arc<dyn Foo> = Arc::new(Bar);
/// let b: Arc<Bar> = f.downcast().unwrap(); // `Arc::downcast` method not found
/// ```
///
/// Relying on an implicit upcast instead requires Rust 1.86 or newer, the release that
/// stabilized trait upcasting coercion. On older toolchains the following does not compile:
///
/// ```ignore
/// # use std::any::Any;
/// # use std::sync::Arc;
/// trait Foo: Any + Send + Sync {}
///
/// struct Bar;
/// impl Foo for Bar {}
///
/// let f: Arc<dyn Foo> = Arc::new(Bar);
/// let a: Arc<dyn Any + Send + Sync> = f; // trait upcasting coercion (stable since Rust 1.86)
/// let f: Arc<Bar> = a.downcast().unwrap();
/// ```
///
/// NOTE: `AsAny` inherits the `Send + Sync` constraint from [`Arc::downcast`].
pub trait AsAny: Any + Send + Sync {
    /// Obtains a `dyn Any + Send + Sync` reference to the object:
    ///
    /// ```
    /// # use buoyant_kernel as delta_kernel;
    /// # use delta_kernel::AsAny;
    /// # use std::any::Any;
    /// # use std::sync::Arc;
    /// trait Foo : AsAny {}
    /// struct Bar;
    /// impl Foo for Bar {}
    ///
    /// let f: &dyn Foo = &Bar;
    /// let a: &dyn Any = f.any_ref();
    /// let b: &Bar = a.downcast_ref().unwrap();
    /// ```
    fn any_ref(&self) -> &(dyn Any + Send + Sync);

    /// Converts an `Arc` of the object into an `Arc<dyn Any + Send + Sync>`:
    ///
    /// ```
    /// # use buoyant_kernel as delta_kernel;
    /// # use delta_kernel::AsAny;
    /// # use std::any::Any;
    /// # use std::sync::Arc;
    /// trait Foo : AsAny {}
    /// struct Bar;
    /// impl Foo for Bar {}
    ///
    /// let f: Arc<dyn Foo> = Arc::new(Bar);
    /// let a: Arc<dyn Any + Send + Sync> = f.as_any();
    /// let b: Arc<Bar> = a.downcast().unwrap();
    /// ```
    fn as_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;

    /// Converts the object to `Box<dyn Any + Send + Sync>`:
    ///
    /// ```
    /// # use buoyant_kernel as delta_kernel;
    /// # use delta_kernel::AsAny;
    /// # use std::any::Any;
    /// # use std::sync::Arc;
    /// trait Foo : AsAny {}
    /// struct Bar;
    /// impl Foo for Bar {}
    ///
    /// let f: Box<dyn Foo> = Box::new(Bar);
    /// let a: Box<dyn Any> = f.into_any();
    /// let b: Box<Bar> = a.downcast().unwrap();
    /// ```
    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync>;

    /// Convenient wrapper for [`std::any::type_name`], since [`Any`] does not provide it and
    /// [`Any::type_id`] is useless as a debugging aid (its `Debug` is just a mess of hex digits).
    fn type_name(&self) -> &'static str;
}

// Blanket implementation for all eligible types. Each method is an unsizing coercion of `self`
// to the `dyn Any + Send + Sync` form of the same pointer type.
impl<T: Any + Send + Sync> AsAny for T {
    fn any_ref(&self) -> &(dyn Any + Send + Sync) {
        self
    }
    fn as_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
        self
    }
    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync> {
        self
    }
    fn type_name(&self) -> &'static str {
        std::any::type_name::<Self>()
    }
}
398
399/// Extension trait that facilitates object-safe implementations of `PartialEq`.
400pub trait DynPartialEq: AsAny {
401    fn dyn_eq(&self, other: &dyn Any) -> bool;
402}
403
404// Blanket implementation for all eligible types
405impl<T: PartialEq + AsAny> DynPartialEq for T {
406    fn dyn_eq(&self, other: &dyn Any) -> bool {
407        other.downcast_ref::<T>().is_some_and(|other| self == other)
408    }
409}
410
/// Trait for implementing an Expression evaluator.
///
/// An evaluator is bound to a single [`Expression`] and can be applied to any number of
/// [`EngineData`] batches. Connectors can implement this trait to optimize the evaluation using
/// connector-specific capabilities.
pub trait ExpressionEvaluator: AsAny {
    /// Evaluate the expression on a given [`EngineData`] batch.
    ///
    /// Produces one value for each row of the input. The output has the data type produced by
    /// this evaluator's expression. That is the `output_type` requested when the evaluator was
    /// created via [`EvaluationHandler::new_expression_evaluator`].
    fn evaluate(&self, batch: &dyn EngineData) -> DeltaResult<Box<dyn EngineData>>;
}
424
/// Trait for implementing a Predicate evaluator.
///
/// An evaluator is bound to a single [`Predicate`] and can be applied to any number of
/// [`EngineData`] batches. Connectors can implement this trait to optimize the evaluation using
/// connector-specific capabilities.
pub trait PredicateEvaluator: AsAny {
    /// Evaluate the predicate on a given [`EngineData`] batch.
    ///
    /// Produces one boolean value for each row of the input. When created through
    /// [`EvaluationHandler::new_predicate_evaluator`], the output is a single nullable boolean
    /// column named "output".
    fn evaluate(&self, batch: &dyn EngineData) -> DeltaResult<Box<dyn EngineData>>;
}
436
/// Provides expression evaluation capability to Delta Kernel.
///
/// Delta Kernel can use this handler to:
/// - evaluate a predicate on partition filters;
/// - fill in partition column values;
/// - perform any other computation on data that is expressed as Expressions.
pub trait EvaluationHandler: AsAny {
    /// Create an [`ExpressionEvaluator`] that can evaluate the given [`Expression`]
    /// on columnar batches with the given [`Schema`] to produce data of [`DataType`].
    ///
    /// If the provided output type is a struct, its fields describe the columns of output produced
    /// by the evaluator. Otherwise, the output schema is a single column named "output" of the
    /// specified `output_type`.
    ///
    /// In all cases, the output schema is used only for:
    /// - names: all field names are updated to match;
    /// - nullability: non-nullable columns can be converted to nullable.
    ///
    /// Any mismatch in types (including the number of columns) produces an error.
    ///
    /// # Parameters
    ///
    /// - `input_schema`: Schema of the input data.
    /// - `expression`: Expression to evaluate.
    /// - `output_type`: Expected result data type.
    ///
    /// [`Schema`]: crate::schema::StructType
    /// [`DataType`]: crate::schema::DataType
    fn new_expression_evaluator(
        &self,
        input_schema: SchemaRef,
        expression: ExpressionRef,
        output_type: DataType,
    ) -> DeltaResult<Arc<dyn ExpressionEvaluator>>;

    /// Create a [`PredicateEvaluator`] that can evaluate the given [`Predicate`] on columnar
    /// batches with the given [`Schema`] to produce a column of boolean results.
    ///
    /// The output schema is a single nullable boolean column named "output".
    ///
    /// # Parameters
    ///
    /// - `input_schema`: Schema of the input data.
    /// - `predicate`: Predicate to evaluate.
    ///
    /// [`Schema`]: crate::schema::StructType
    fn new_predicate_evaluator(
        &self,
        input_schema: SchemaRef,
        predicate: PredicateRef,
    ) -> DeltaResult<Arc<dyn PredicateEvaluator>>;

    /// Create a single-row [`EngineData`] whose values are all null, with the schema specified by
    /// `output_schema`.
    // NOTE: we should probably allow DataType instead of SchemaRef, but can expand that in the
    // future.
    fn null_row(&self, output_schema: SchemaRef) -> DeltaResult<Box<dyn EngineData>>;

    /// Create a multi-row [`EngineData`] by applying the given schema to multiple rows of values.
    ///
    /// Each element in `rows` represents one row of data, where each row is a slice of structured
    /// scalar values (one scalar per top-level field in the schema).
    ///
    /// # Parameters
    ///
    /// - `schema`: Schema describing the structure of each row.
    /// - `rows`: Slice of rows, where each row contains one structured scalar per top-level schema
    ///   field.
    ///
    /// # Returns
    ///
    /// A multi-row `EngineData` containing all rows.
    ///
    /// # Errors
    ///
    /// Returns an error if either of the following holds:
    /// - any row has a number of scalars that does not match the number of top-level fields in
    ///   `schema`;
    /// - any scalar value cannot be appended to its corresponding field's builder (e.g. due to a
    ///   type mismatch).
    ///
    /// # Example
    ///
    /// For a schema with fields `[add: Struct, remove: Struct]`, each row should contain exactly 2
    /// scalars: one for the `add` field and one for the `remove` field.
    fn create_many(
        &self,
        schema: SchemaRef,
        rows: &[&[Scalar]],
    ) -> DeltaResult<Box<dyn EngineData>>;
}
520
521/// Internal trait to allow us to have a private `create_one` API that's implemented for all
522/// EvaluationHandlers.
523// For some reason rustc doesn't detect it's usage so we allow(dead_code) here...
524#[allow(dead_code)]
525#[internal_api]
526trait EvaluationHandlerExtension: EvaluationHandler {
527    /// Create a single-row [`EngineData`] by applying the given schema to the leaf-values given in
528    /// `values`.
529    // Note: we will stick with a Schema instead of DataType (more constrained can expand in
530    // future)
531    fn create_one(&self, schema: SchemaRef, values: &[Scalar]) -> DeltaResult<Box<dyn EngineData>> {
532        // just get a single int column (arbitrary)
533        let null_row_schema = Arc::new(StructType::new_unchecked(vec![StructField::nullable(
534            "null_col",
535            DataType::INTEGER,
536        )]));
537        let null_row = self.null_row(null_row_schema.clone())?;
538
539        // Convert schema and leaf values to an expression
540        let row_expr = literal_expression_transform(schema.as_ref(), values)?;
541
542        let eval =
543            self.new_expression_evaluator(null_row_schema, row_expr.into(), schema.into())?;
544        eval.evaluate(null_row.as_ref())
545    }
546}
547
548// Auto-implement the extension trait for all EvaluationHandlers
549impl<T: EvaluationHandler + ?Sized> EvaluationHandlerExtension for T {}
550
/// A trait that allows converting a type into (single-row) EngineData.
///
/// This is typically used with the `#[derive(IntoEngineData)]` macro. The macro relies on the
/// `ToDataType` and `Into<Scalar>` traits of the struct's fields to convert the struct into
/// EngineData.
///
/// # Example
/// ```ignore
/// # use std::sync::Arc;
/// # use delta_kernel_derive::{Schema, IntoEngineData};
///
/// #[derive(Schema, IntoEngineData)]
/// struct MyStruct {
///    a: i32,
///    b: String,
/// }
///
/// let my_struct = MyStruct { a: 42, b: "Hello".to_string() };
/// // typically used with ToSchema
/// let schema = Arc::new(MyStruct::to_schema());
/// // single-row EngineData
/// let engine = todo!(); // create an engine
/// let engine_data = my_struct.into_engine_data(schema, engine);
/// ```
#[internal_api]
pub(crate) trait IntoEngineData {
    /// Consume this value to produce a single-row EngineData using the provided schema, with
    /// `engine` supplying the data construction capability.
    fn into_engine_data(
        self,
        schema: SchemaRef,
        engine: &dyn Engine,
    ) -> DeltaResult<Box<dyn EngineData>>;
}
584
/// Provides file system related functionality to Delta Kernel.
///
/// Delta Kernel uses this handler whenever it needs to access the underlying file system where the
/// Delta table is present. A connector's implementation of this trait can hide file-system
/// specific details from Delta Kernel.
pub trait StorageHandler: AsAny {
    /// List the files in the same directory as `path` whose paths are lexicographically greater
    /// than `path` (compared as UTF-8). The result should be sorted by file name.
    ///
    /// If `path` is directory-like (ends with `/`), the result should contain all the files in
    /// that directory.
    fn list_from(&self, path: &Url)
        -> DeltaResult<Box<dyn Iterator<Item = DeltaResult<FileMeta>>>>;

    /// Read the bytes described by each [`FileSlice`] in `files`.
    ///
    /// Each slice names a file and an optional byte range (start and end offsets) within it.
    // NOTE(review): presumably a `None` range reads the whole file and one `Bytes` item is yielded
    // per slice, in the order of `files` — confirm against the engine implementations.
    fn read_files(
        &self,
        files: Vec<FileSlice>,
    ) -> DeltaResult<Box<dyn Iterator<Item = DeltaResult<Bytes>>>>;

    /// Copy a file atomically from `src` to `dest`.
    ///
    /// If the destination file already exists, this must return `Err(Error::FileAlreadyExists)`.
    fn copy_atomic(&self, src: &Url, dest: &Url) -> DeltaResult<()>;

    /// Write `data` to the specified path.
    ///
    /// If `overwrite` is false and the file already exists, this must return
    /// `Err(Error::FileAlreadyExists)`.
    fn put(&self, path: &Url, data: Bytes, overwrite: bool) -> DeltaResult<()>;

    /// Perform a HEAD request for the file at `path`, returning its metadata.
    ///
    /// If the file does not exist, this must return an `Err` with [`Error::FileNotFound`].
    fn head(&self, path: &Url) -> DeltaResult<FileMeta>;
}
620
/// Provides JSON handling functionality to Delta Kernel.
///
/// Delta Kernel can use this handler to parse JSON strings into rows or to read content from JSON
/// files. Connectors can leverage this trait to provide their best implementation of JSON parsing
/// to Delta Kernel.
pub trait JsonHandler: AsAny {
    /// Parse the given JSON strings and return the fields requested by `output_schema` as columns
    /// in [`EngineData`].
    ///
    /// `json_strings` MUST be a single-column batch of engine data, and that column's type must be
    /// string.
    fn parse_json(
        &self,
        json_strings: Box<dyn EngineData>,
        output_schema: SchemaRef,
    ) -> DeltaResult<Box<dyn EngineData>>;

    /// Read and parse the JSON files at the given locations and return the data as EngineData
    /// with the columns requested by `physical_schema`.
    ///
    /// Note: the [`FileDataReadResultIterator`] must emit data from files in the order in which
    /// `files` is given. For example, if files `["a", "b"]` are provided, the iterator must first
    /// return all the engine data from file "a", _then_ all the engine data from file "b".
    /// Moreover, for a given file, all of its [`EngineData`] and constituent rows must be in the
    /// order in which they occur in the file. Consider a file with rows (1, 2, 3).
    ///
    /// The following are legal iterator batches:
    ///
    /// ```text
    /// iter: [EngineData(1, 2), EngineData(3)]
    /// iter: [EngineData(1), EngineData(2, 3)]
    /// iter: [EngineData(1, 2, 3)]
    /// ```
    ///
    /// The following are illegal batches:
    ///
    /// ```text
    /// iter: [EngineData(3), EngineData(1, 2)]
    /// iter: [EngineData(1), EngineData(3, 2)]
    /// iter: [EngineData(2, 1, 3)]
    /// ```
    ///
    /// Additionally, engines may not merge engine data across file boundaries.
    ///
    /// # Parameters
    ///
    /// - `files` - File metadata for files to be read.
    /// - `physical_schema` - Select list of columns to read from the JSON file.
    /// - `predicate` - Optional push-down predicate hint (engine is free to ignore it).
    fn read_json_files(
        &self,
        files: &[FileMeta],
        physical_schema: SchemaRef,
        predicate: Option<PredicateRef>,
    ) -> DeltaResult<FileDataReadResultIterator>;

    /// Atomically (!) write a single JSON file. Each row of the input data should be written as a
    /// new JSON object appended to the file. This write must:
    /// 1. serialize the data to newline-delimited JSON (each row is a JSON object literal);
    /// 2. write the data to storage atomically (i.e. if the file already exists, fail unless the
    ///    `overwrite` flag is set).
    ///
    /// For example, the JSON data should be written as `{ "column1": "val1", "column2": "val2", .. }`
    /// with each row on a new line.
    ///
    /// NOTE: Null columns should not be written to the JSON file. For example, if a row has columns
    /// `["a", "b"]` and the value of "b" is null, the JSON object should be written as
    /// `{ "a": "..." }`. Including nulls is technically valid JSON, but it would bloat the log, so
    /// we recommend omitting them.
    ///
    /// # Parameters
    ///
    /// - `path` - URL specifying the location to write the JSON file
    /// - `data` - Iterator of EngineData to write to the JSON file. Each row should be written as a
    ///   new JSON object appended to the file. (that is, the file is newline-delimited JSON, and
    ///   each row is a JSON object on a single line)
    /// - `overwrite` - If true, overwrite the file if it exists. If false, the call must fail if
    ///   the file exists.
    fn write_json_file(
        &self,
        path: &Url,
        data: Box<dyn Iterator<Item = DeltaResult<FilteredEngineData>> + Send + '_>,
        overwrite: bool,
    ) -> DeltaResult<()>;
}
694
/// Field IDs that Delta Kernel reserves for metadata columns.
///
/// Regular table columns must not use these IDs. They identify virtual columns that expose
/// file-level metadata at read time.
pub mod reserved_field_ids {
    /// Field ID of the `_file` metadata column.
    ///
    /// This column holds the name of the Parquet file each row was read from. The value equals
    /// `i32::MAX - 1`.
    pub const FILE_NAME: i64 = 2_147_483_646;
}
704
/// Metadata read from a Parquet file footer.
///
/// Currently this carries only the file's schema, converted to Delta Kernel's schema format. The
/// struct is designed to be extended with further footer information, such as row group
/// statistics.
#[derive(Debug, Clone)]
pub struct ParquetFooter {
    /// The schema of the Parquet file, converted to Delta Kernel's schema format.
    pub schema: SchemaRef,
}
714
715/// Provides Parquet file related functionalities to Delta Kernel.
716///
717/// Connectors can leverage this trait to provide their own custom
718/// implementation of Parquet data file functionalities to Delta Kernel.
719pub trait ParquetHandler: AsAny {
720    /// Read and parse the Parquet file at given locations and return the data as EngineData with
721    /// the columns requested by physical schema. The ParquetHandler _must_ return exactly the
722    /// columns specified in `physical_schema`, and they _must_ be in schema order.
723    ///
724    /// # Resolving Parquet schema to the physical schema
725    ///
726    /// When reading the Parquet file, the columns are resolved from the Parquet schema to the
727    /// kernel's `physical_schema`. To do so, the parquet reader must match each Parquet column
728    /// to a [`StructField`] in the `physical_schema`. All columns in the returned `EngineData`
729    /// must be in the same order as specified in `physical_schema`.
730    ///
731    /// Parquet columns are matched to `physical_schema` [`StructField`]s using the following rules:
732    /// 1. **Field ID**: If a [`StructField`] in `physical_schema` contains a field ID (specified in
733    ///    [`ColumnMetadataKey::ParquetFieldId`] metadata), use the ID to match the Parquet column's
734    ///    field id
735    /// 2. **Field Name**: If no field ID is present in the `physical_schema`'s [`StructField`] or
736    ///    no matching parquet field ID is found, fall back to matching by column name
737    ///
738    /// # Metadata Columns
739    ///
740    /// The ParquetHandler must support virtual metadata columns that provide additional information
741    /// about each row. These columns are not stored in the Parquet file but are generated at read
742    /// time.
743    ///
744    /// ## Row Index Column
745    ///
746    /// When a column in `physical_schema` is marked as a row index metadata column (via
747    /// [`StructField::create_metadata_column`] with [`schema::MetadataColumnSpec::RowIndex`]), the
748    /// ParquetHandler must populate it with the 0-based row position within the Parquet file:
749    ///
750    /// - **Column name**: User-specified (commonly `"row_index"` or `"_metadata.row_index"`)
751    /// - **Type**: `LONG` (non-nullable)
752    /// - **Values**: Sequential integers starting at 0 for each file
753    /// - **Use case**: Track row positions for downstream processing, or internally used to compute
754    ///   Row IDs
755    ///
756    /// Example: A file with 5 rows would have row_index values `[0, 1, 2, 3, 4]`.
757    ///
758    /// ## File Name Column (Reserved Field ID)
759    ///
760    /// When a column in `physical_schema` has the reserved field ID
761    /// [`reserved_field_ids::FILE_NAME`] (2147483646), the ParquetHandler must populate it
762    /// with the file path/name:
763    ///
764    /// - **Column name**: `"_file"`
765    /// - **Type**: `STRING` (non-nullable)
766    /// - **Field ID**: 2147483646 (reserved)
767    /// - **Values**: The file path/URL (e.g., `"s3://bucket/path/file.parquet"`)
768    /// - **Use case**: Track which file each row came from in multi-file reads
769    ///
770    /// Example: All rows from the same file would have the same `_file` value.
771    ///
772    /// ## Metadata Column Examples
773    ///
774    /// ```rust,ignore
775    /// use delta_kernel::schema::{StructType, StructField, DataType, MetadataColumnSpec};
776    ///
777    /// // Example 1: Schema with row_index metadata column
778    /// let schema_with_row_index = StructType::try_new([
779    ///     StructField::nullable("id", DataType::INTEGER),
780    ///     StructField::create_metadata_column("row_index", MetadataColumnSpec::RowIndex),
781    ///     StructField::nullable("value", DataType::STRING),
782    /// ])?;
783    ///
784    /// // Example 2: Schema with _file metadata column (using reserved field ID)
785    /// let schema_with_file_path = StructType::try_new([
786    ///     StructField::nullable("id", DataType::INTEGER),
787    ///     StructField::create_metadata_column("_file", MetadataColumnSpec::FilePath),
788    ///     StructField::nullable("value", DataType::STRING),
789    /// ])?;
790    /// ```
791    ///
792    /// ---
793    ///
    /// If no matching Parquet column is found, `NULL` values are returned
    /// for nullable columns in `physical_schema`. For non-nullable columns, an error is returned.
    ///
798    /// ## Column Matching Examples
799    ///
800    /// Consider a `physical_schema` with the following fields:
    /// - Column 0: `"i_logical"` (integer, non-null) with field ID 1 (via
802    ///   [`ColumnMetadataKey::ParquetFieldId`])
803    /// - Column 1: `"s"` (string, nullable) with no field ID metadata
804    /// - Column 2: `"i2"` (integer, nullable) with no field ID metadata
805    ///
806    /// [`ColumnMetadataKey::ParquetFieldId`]: crate::schema::ColumnMetadataKey::ParquetFieldId
807    ///
808    /// And a Parquet file containing these columns:
809    /// - Column 0: `"i2"` (integer, nullable) with field ID 3
810    /// - Column 1: `"i"` (integer, non-null) with field ID 1
811    /// - No `"s"` column present
812    ///
813    /// The column matching would work as follows:
814    /// - `"i_logical"` matches `"i"` by field ID (both have ID 1)
815    /// - `"i2"` matches `"i2"` by column name (no field ID to match on)
816    /// - `"s"` has no matching Parquet column, so NULL values are returned
817    ///
818    /// The returned data will contain exactly 3 columns in physical schema order:
819    /// `{i_logical: parquet[1], s: NULL.., i2: parquet[0]}`
820    ///
821    /// # Parameters
822    ///
823    /// - `files` - File metadata for files to be read.
824    /// - `physical_schema` - Select list and order of columns to read from the Parquet file.
825    /// - `predicate` - Optional push-down predicate hint (engine is free to ignore it).
826    ///
827    /// # Returns
828    /// A [`DeltaResult`] containing a [`FileDataReadResultIterator`].
829    /// Each element of the iterator is a [`DeltaResult`] of [`EngineData`]. The [`EngineData`]
830    /// has the contents of `files` and must match the provided `physical_schema`.
831    ///
832    /// Note: The [`FileDataReadResultIterator`] must emit data from files in the order that `files`
833    /// is given. For example if files ["a", "b"] is provided, then the engine data iterator must
834    /// first return all the engine data from file "a", _then_ all the engine data from file "b".
835    /// Moreover, for a given file, all of its [`EngineData`] and constituent rows must be in order
836    /// that they occur in the file. Consider a file with rows
837    /// (1, 2, 3). The following are legal iterator batches:
838    ///    iter: [EngineData(1, 2), EngineData(3)]
839    ///    iter: [EngineData(1), EngineData(2, 3)]
840    ///    iter: [EngineData(1, 2, 3)]
841    /// The following are illegal batches:
842    ///    iter: [EngineData(3), EngineData(1, 2)]
843    ///    iter: [EngineData(1), EngineData(3, 2)]
844    ///    iter: [EngineData(2, 1, 3)]
845    ///
    /// Additionally, engines must not merge engine data across file boundaries.
849    fn read_parquet_files(
850        &self,
851        files: &[FileMeta],
852        physical_schema: SchemaRef,
853        predicate: Option<PredicateRef>,
854    ) -> DeltaResult<FileDataReadResultIterator>;
855
856    /// Write data to a Parquet file at the specified URL.
857    ///
858    /// This method writes the provided `data` to a Parquet file at the given `url`.
859    ///
860    /// This will overwrite the file if it already exists. For filesystem-backed
861    /// implementations, the parent directories must be created if they do not exist.
862    ///
863    /// # Parquet field IDs
864    ///
865    /// The engine must write a Parquet `field_id` correctly when the kernel
866    /// [`StructField`] carries a field-id related annotation, including:
867    /// - [`ColumnMetadataKey::ColumnMappingId`] / [`ColumnMetadataKey::ParquetFieldId`]
868    /// - [`ColumnMetadataKey::ColumnMappingNestedIds`]
869    ///
870    /// For how to use these keys, refer to the Delta protocol's [Column Mapping] and
871    /// [IcebergCompatV2] sections.
872    ///
873    /// **Non-compliance produces files with incorrect `field_id`s**, which may lead to
874    /// read failures when column mapping mode is `id` and to failures when converting
875    /// the table to Iceberg.   
876    ///
877    /// # Parameters
878    ///
879    /// - `url` - The full URL path where the Parquet file should be written (e.g.,
880    ///   `s3://bucket/path/file.parquet`).
881    /// - `data` - An iterator of engine data to be written to the Parquet file.
882    ///
883    /// # Returns
884    ///
885    /// A [`DeltaResult`] indicating success or failure.
886    ///
887    /// [`StructField`]: crate::schema::StructField
888    /// [`ColumnMetadataKey::ColumnMappingId`]: crate::schema::ColumnMetadataKey::ColumnMappingId
889    /// [`ColumnMetadataKey::ParquetFieldId`]: crate::schema::ColumnMetadataKey::ParquetFieldId
890    /// [`ColumnMetadataKey::ColumnMappingNestedIds`]: crate::schema::ColumnMetadataKey::ColumnMappingNestedIds
891    /// [Column Mapping]: https://github.com/delta-io/delta/blob/master/PROTOCOL.md#column-mapping
892    /// [IcebergCompatV2]: https://github.com/delta-io/delta/blob/master/PROTOCOL.md#iceberg-compatibility-v2
893    fn write_parquet_file(
894        &self,
895        location: url::Url,
896        data: Box<dyn Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send>,
897    ) -> DeltaResult<()>;
898
899    /// Read the footer metadata from a Parquet file without reading the data.
900    ///
901    /// This method reads only the Parquet file footer (metadata section), which is useful for
902    /// schema inspection, compatibility checking, and determining whether parsed statistics
903    /// columns are present and compatible with the current table schema.
904    ///
905    /// # Parameters
906    ///
907    /// - `file` - File metadata for the Parquet file whose footer should be read. The `size` field
908    ///   should contain the actual file size to enable efficient footer reads without additional
909    ///   I/O operations.
910    ///
911    /// # Returns
912    ///
913    /// A [`DeltaResult`] containing a [`ParquetFooter`] with the Parquet file's metadata, including
914    /// the schema converted to Delta Kernel's format.
915    ///
916    /// # Field IDs
917    ///
918    /// If the Parquet file contains field IDs (written when column mapping is enabled), they are
919    /// preserved in each [`StructField`]'s metadata. Callers can access field IDs via
920    /// [`StructField::get_config_value`] with [`ColumnMetadataKey::ParquetFieldId`].
921    ///
922    /// # Errors
923    ///
924    /// Returns an error if:
925    /// - The file cannot be accessed or does not exist
926    /// - The file is not a valid Parquet file
927    /// - The footer cannot be read or parsed
928    /// - The schema cannot be converted to Delta Kernel's format
929    ///
930    /// [`StructField`]: crate::schema::StructField
931    /// [`StructField::get_config_value`]: crate::schema::StructField::get_config_value
932    /// [`ColumnMetadataKey::ParquetFieldId`]: crate::schema::ColumnMetadataKey::ParquetFieldId
933    fn read_parquet_footer(&self, file: &FileMeta) -> DeltaResult<ParquetFooter>;
934}
935
936/// The `Engine` trait encapsulates all the functionality an engine or connector needs to provide
937/// to the Delta Kernel in order to read the Delta table.
938///
939/// Engines/Connectors are expected to pass an implementation of this trait when reading a Delta
940/// table.
941pub trait Engine: AsAny {
942    /// Get the connector provided [`EvaluationHandler`].
943    fn evaluation_handler(&self) -> Arc<dyn EvaluationHandler>;
944
945    /// Get the connector provided [`StorageHandler`]
946    fn storage_handler(&self) -> Arc<dyn StorageHandler>;
947
948    /// Get the connector provided [`JsonHandler`].
949    fn json_handler(&self) -> Arc<dyn JsonHandler>;
950
951    /// Get the connector provided [`ParquetHandler`].
952    fn parquet_handler(&self) -> Arc<dyn ParquetHandler>;
953}
954
955// we have an 'internal' feature flag: default-engine-base, which is actually just the shared
956// pieces of default-engine-native-tls and default-engine-rustls. the crate can't compile with
957// _only_ default-engine-base, so we give a friendly error here.
958#[cfg(all(
959    feature = "default-engine-base",
960    not(any(
961        feature = "default-engine-native-tls",
962        feature = "default-engine-rustls",
963    ))
964))]
965compile_error!(
966    "The default-engine-base feature flag is not meant to be used directly. \
967    Please use either default-engine-native-tls or default-engine-rustls."
968);
969
970// Rustdoc's documentation tests can do some things that regular unit tests can't. Here we are
971// using doctests to test macros. Specifically, we are testing for failed macro invocations due
972// to invalid input, not the macro output when the macro invocation is successful (which can/should
973// be done in unit tests). This module is not exclusively for macro tests only so other doctests can
974// also be added. https://doc.rust-lang.org/rustdoc/write-documentation/documentation-tests.html#include-items-only-when-collecting-doctests
975#[cfg(doctest)]
976mod doctests;
977
978#[cfg(test)]
979mod tests {
980    use rstest::rstest;
981
982    use super::*;
983
984    #[rstest]
985    #[case::zero(0, Some(0))]
986    #[case::one(1, Some(1))]
987    #[case::i64_max(i64::MAX as u64, Some(i64::MAX))]
988    #[case::just_over_i64_max(i64::MAX as u64 + 1, None)]
989    #[case::u64_max(u64::MAX, None)]
990    /// Tests `FileMeta::size_as_i64` for both success (size fits in i64) and error (size exceeds
991    /// i64::MAX) paths.
992    fn test_file_meta_size_as_i64(#[case] size: u64, #[case] expected: Option<i64>) {
993        let meta = FileMeta::new(Url::parse("file:///x").unwrap(), 0, size);
994        match expected {
995            Some(v) => assert_eq!(meta.size_as_i64().unwrap(), v),
996            None => assert!(meta
997                .size_as_i64()
998                .unwrap_err()
999                .to_string()
1000                .contains("exceeds i64::MAX")),
1001        }
1002    }
1003}