Skip to main content

buoyant_kernel/
lib.rs

1//! # Delta Kernel
2//!
3//! Delta-kernel-rs is an experimental [Delta](https://github.com/delta-io/delta/) implementation
4//! focused on interoperability with a wide range of query engines. It supports reads and
5//! (experimental) writes (only blind appends in the write path currently). This library defines a
6//! number of traits which must be implemented to provide a working delta implementation. They are
7//! detailed below. There is a provided "default engine" that implements all these traits and can
8//! be used to ease integration work. See [`DefaultEngine`](engine/default/index.html) for more
9//! information.
10//!
11//! A full `rust` example for reading table data using the default engine can be found in the
12//! [read-table-single-threaded] example (and for a more complex multi-threaded reader see the
13//! [read-table-multi-threaded] example). An example for reading the table changes for a table
14//! using the default engine can be found in the [read-table-changes] example. The [write-table]
15//! example demonstrates how to write data to a Delta table using the default engine.
16//!
17//! [read-table-single-threaded]:
18//! https://github.com/delta-io/delta-kernel-rs/tree/main/kernel/examples/read-table-single-threaded
19//! [read-table-multi-threaded]:
20//! https://github.com/delta-io/delta-kernel-rs/tree/main/kernel/examples/read-table-multi-threaded
21//! [read-table-changes]:
22//! https://github.com/delta-io/delta-kernel-rs/tree/main/kernel/examples/read-table-changes
23//! [write-table]:
24//! https://github.com/delta-io/delta-kernel-rs/tree/main/kernel/examples/write-table
25//!
26//! # Engine trait
27//!
28//! The [`Engine`] trait allows connectors to bring their own implementation of functionality such
29//! as reading parquet files, listing files in a file system, parsing a JSON string etc. This
30//! trait exposes methods to get sub-engines which expose the core functionalities customizable by
31//! connectors.
32//!
33//! ## Expression handling
34//!
//! Expression handling is done via the [`EvaluationHandler`], which in turn allows the creation of
//! [`ExpressionEvaluator`]s and [`PredicateEvaluator`]s. Each evaluator is created for a specific
//! [`Expression`] or [`Predicate`] and allows evaluating it against a given batch of data.
38//!
39//! ## File system interactions
40//!
41//! Delta Kernel needs to perform some basic operations against file systems like listing and
42//! reading files. These interactions are encapsulated in the [`StorageHandler`] trait.
43//! Implementers must take care that all assumptions on the behavior of the functions - like sorted
44//! results - are respected.
45//!
46//! ## Reading log and data files
47//!
//! Delta Kernel requires the capability to read and write JSON files and read Parquet files, which
//! are exposed via the [`JsonHandler`] and [`ParquetHandler`] respectively. When reading files,
//! connectors are asked to provide the context information they require to execute the actual
//! operation. This is done by invoking methods on the [`StorageHandler`] trait.
52
53#![cfg_attr(all(doc, NIGHTLY_CHANNEL), feature(doc_cfg))]
54#![warn(
55    unreachable_pub,
56    trivial_numeric_casts,
57    unused_extern_crates,
58    rust_2018_idioms,
59    rust_2021_compatibility,
60    clippy::unwrap_used,
61    clippy::expect_used,
62    clippy::panic
63)]
64// we re-allow panics in tests
65#![cfg_attr(test, allow(clippy::unwrap_used, clippy::expect_used, clippy::panic))]
66
67/// This `extern crate` declaration allows the macro to reliably refer to
68/// `delta_kernel::schema::DataType` no matter which crate invokes it. Without that, `delta_kernel`
69/// cannot invoke the macro because `delta_kernel` is an unknown crate identifier (you have to use
70/// `crate` instead). We could make the macro use `crate::schema::DataType` instead, but then the
71/// macro is useless outside the `delta_kernel` crate.
72// TODO: when running `cargo package -p delta_kernel` this gives 'unused' warning - #1095
73#[allow(unused_extern_crates)]
74extern crate self as delta_kernel;
75
76use std::any::Any;
77use std::cmp::Ordering;
78use std::fs::DirEntry;
79use std::ops::Range;
80use std::sync::Arc;
81use std::time::SystemTime;
82
83use bytes::Bytes;
84use url::Url;
85
86use self::schema::{DataType, SchemaRef};
87
88mod action_reconciliation;
89pub mod actions;
90pub mod checkpoint;
91pub mod committer;
92// Public under test-utils so integration tests can inspect CRC state via
93// Snapshot::get_current_crc_if_loaded_for_testing.
94#[cfg(feature = "test-utils")]
95pub mod crc;
96#[cfg(not(feature = "test-utils"))]
97pub(crate) mod crc;
98pub mod engine_data;
99pub mod error;
100pub mod expressions;
101pub mod incremental_scan;
102mod log_compaction;
103mod log_path;
104mod log_reader;
105pub mod metrics;
106pub mod partition;
107pub mod scan;
108pub mod schema;
109pub mod snapshot;
110pub mod table_changes;
111pub mod table_configuration;
112pub mod table_features;
113pub mod table_properties;
114pub mod transaction;
115pub mod transforms;
116
117pub use crc::{FileSizeHistogram, FileStats};
118pub use log_path::LogPath;
119
120// Public under test-utils so integration tests can call get_high_water_mark via snapshot.
121#[cfg(feature = "test-utils")]
122pub mod row_tracking;
123#[cfg(not(feature = "test-utils"))]
124pub(crate) mod row_tracking;
125
126pub(crate) mod clustering;
127
128mod arrow_compat;
129#[cfg(any(feature = "arrow-57", feature = "arrow-58"))]
130pub use arrow_compat::*;
131
132#[cfg(feature = "internal-api")]
133pub mod column_trie;
134#[cfg(not(feature = "internal-api"))]
135pub(crate) mod column_trie;
136pub mod kernel_predicates;
137pub(crate) mod utils;
138
139#[cfg(feature = "internal-api")]
140pub use utils::try_parse_uri;
141
142// for the below modules, we cannot introduce a macro to clean this up. rustfmt doesn't follow into
143// macros, and so will not format the files associated with these modules if we get too clever. see:
144// https://github.com/rust-lang/rustfmt/issues/3253
145
146#[cfg(feature = "internal-api")]
147pub mod path;
148#[cfg(not(feature = "internal-api"))]
149pub(crate) mod path;
150
151#[cfg(feature = "internal-api")]
152pub mod log_replay;
153#[cfg(not(feature = "internal-api"))]
154pub(crate) mod log_replay;
155
156#[cfg(feature = "internal-api")]
157pub mod log_segment;
158#[cfg(not(feature = "internal-api"))]
159pub(crate) mod log_segment;
160
161#[cfg(feature = "internal-api")]
162pub mod last_checkpoint_hint;
163#[cfg(not(feature = "internal-api"))]
164pub(crate) mod last_checkpoint_hint;
165
166pub(crate) mod log_segment_files;
167
168pub mod history_manager;
169
170#[cfg(feature = "internal-api")]
171pub mod parallel;
172#[cfg(not(feature = "internal-api"))]
173pub(crate) mod parallel;
174
175pub use action_reconciliation::{ActionReconciliationIterator, ActionReconciliationIteratorState};
176pub use delta_kernel_derive;
177use delta_kernel_derive::internal_api;
178pub use engine_data::{
179    EngineData, FilteredEngineData, FilteredRowVisitor, GetData, RowIndexIterator, RowVisitor,
180};
181pub use error::{DeltaResult, Error};
182use expressions::{literal_expression_transform, Scalar};
183pub use expressions::{Expression, ExpressionRef, Predicate, PredicateRef};
184pub use log_compaction::{should_compact, LogCompactionWriter};
185use schema::{StructField, StructType};
186pub use snapshot::{Snapshot, SnapshotRef};
187
188#[cfg(any(
189    feature = "default-engine-native-tls",
190    feature = "default-engine-rustls",
191    feature = "arrow-conversion"
192))]
193pub mod engine;
194
195/// Delta table version is 8 byte unsigned int
196pub type Version = u64;
197
198pub type FileSize = u64;
199pub type FileIndex = u64;
200
201/// A specification for a range of bytes to read from a file location
202pub type FileSlice = (Url, Option<Range<FileIndex>>);
203
204/// Data read from a Delta table file and the corresponding scan file information.
205pub type FileDataReadResult = (FileMeta, Box<dyn EngineData>);
206
207/// An iterator of data read from specified files
208pub type FileDataReadResultIterator =
209    Box<dyn Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send>;
210
211/// The metadata that describes an object.
212#[derive(Debug, Clone, PartialEq, Eq)]
213pub struct FileMeta {
214    /// The fully qualified path to the object
215    pub location: Url,
216    /// The last modified time as milliseconds since unix epoch
217    pub last_modified: i64,
218    /// The size in bytes of the object
219    pub size: FileSize,
220}
221
222impl Ord for FileMeta {
223    fn cmp(&self, other: &Self) -> Ordering {
224        self.location.cmp(&other.location)
225    }
226}
227
228impl PartialOrd for FileMeta {
229    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
230        Some(self.cmp(other))
231    }
232}
233
234impl TryFrom<DirEntry> for FileMeta {
235    type Error = Error;
236
237    fn try_from(ent: DirEntry) -> DeltaResult<FileMeta> {
238        let metadata = ent.metadata()?;
239        let last_modified = metadata
240            .modified()?
241            .duration_since(SystemTime::UNIX_EPOCH)
242            .map_err(|_| Error::generic("Failed to convert file timestamp to milliseconds"))?;
243        let location = Url::from_file_path(ent.path())
244            .map_err(|_| Error::generic(format!("Invalid path: {:?}", ent.path())))?;
245        let last_modified = last_modified.as_millis().try_into().map_err(|_| {
246            Error::generic(format!(
247                "Failed to convert file modification time {:?} into i64",
248                last_modified.as_millis()
249            ))
250        })?;
251        Ok(FileMeta {
252            location,
253            last_modified,
254            size: metadata.len(),
255        })
256    }
257}
258
259impl FileMeta {
260    /// Create a new instance of `FileMeta`
261    pub fn new(location: Url, last_modified: i64, size: u64) -> Self {
262        Self {
263            location,
264            last_modified,
265            size,
266        }
267    }
268
269    /// Casts `size` to `i64`. Errors if `size` exceeds `i64::MAX`.
270    pub(crate) fn size_as_i64(&self) -> DeltaResult<i64> {
271        i64::try_from(self.size)
272            .map_err(|_| Error::generic(format!("file size {} exceeds i64::MAX", self.size)))
273    }
274}
275
/// Extension trait that makes it easier to work with trait objects that implement [`Any`],
/// implemented automatically for any type that satisfies `Any`, `Send`, and `Sync`. In particular,
/// given some `trait T: AsAny`, it allows upcasting `dyn T` to `dyn Any + Send + Sync`, which in
/// turn allows downcasting the result to a concrete type.
///
/// For example, the following code will compile:
///
/// ```
/// # use buoyant_kernel as delta_kernel;
/// # use delta_kernel::AsAny;
/// # use std::any::Any;
/// # use std::sync::Arc;
/// trait Foo : AsAny {}
/// struct Bar;
/// impl Foo for Bar {}
///
/// let f: Arc<dyn Foo> = Arc::new(Bar);
/// let a: Arc<dyn Any + Send + Sync> = f.as_any();
/// let b: Arc<Bar> = a.downcast().unwrap();
/// ```
///
/// In contrast, very similar code that relies only on `Any` would fail to compile:
///
/// ```compile_fail
/// # use std::any::Any;
/// # use std::sync::Arc;
/// trait Foo: Any + Send + Sync {}
///
/// struct Bar;
/// impl Foo for Bar {}
///
/// let f: Arc<dyn Foo> = Arc::new(Bar);
/// let b: Arc<Bar> = f.downcast().unwrap(); // `Arc::downcast` method not found
/// ```
///
/// Relying on trait upcasting coercion instead depends on the compiler version (the feature was
/// unstable before Rust 1.86). For that reason this example is marked `ignore` rather than
/// compile-tested:
///
/// ```ignore
/// # use std::any::Any;
/// # use std::sync::Arc;
/// trait Foo: Any + Send + Sync {}
///
/// struct Bar;
/// impl Foo for Bar {}
///
/// let f: Arc<dyn Foo> = Arc::new(Bar);
/// let a: Arc<dyn Any + Send + Sync> = f; // needs trait upcasting coercion
/// let f: Arc<Bar> = a.downcast().unwrap();
/// ```
///
/// NOTE: `AsAny` inherits the `Send + Sync` constraint from [`Arc::downcast`].
pub trait AsAny: Any + Send + Sync {
    /// Obtains a `dyn Any` reference to the object:
    ///
    /// ```
    /// # use buoyant_kernel as delta_kernel;
    /// # use delta_kernel::AsAny;
    /// # use std::any::Any;
    /// # use std::sync::Arc;
    /// trait Foo : AsAny {}
    /// struct Bar;
    /// impl Foo for Bar {}
    ///
    /// let f: &dyn Foo = &Bar;
    /// let a: &dyn Any = f.any_ref();
    /// let b: &Bar = a.downcast_ref().unwrap();
    /// ```
    fn any_ref(&self) -> &(dyn Any + Send + Sync);

    /// Obtains an `Arc<dyn Any + Send + Sync>` reference to the object:
    ///
    /// ```
    /// # use buoyant_kernel as delta_kernel;
    /// # use delta_kernel::AsAny;
    /// # use std::any::Any;
    /// # use std::sync::Arc;
    /// trait Foo : AsAny {}
    /// struct Bar;
    /// impl Foo for Bar {}
    ///
    /// let f: Arc<dyn Foo> = Arc::new(Bar);
    /// let a: Arc<dyn Any + Send + Sync> = f.as_any();
    /// let b: Arc<Bar> = a.downcast().unwrap();
    /// ```
    fn as_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;

    /// Converts the object to `Box<dyn Any + Send + Sync>`:
    ///
    /// ```
    /// # use buoyant_kernel as delta_kernel;
    /// # use delta_kernel::AsAny;
    /// # use std::any::Any;
    /// # use std::sync::Arc;
    /// trait Foo : AsAny {}
    /// struct Bar;
    /// impl Foo for Bar {}
    ///
    /// let f: Box<dyn Foo> = Box::new(Bar);
    /// let a: Box<dyn Any> = f.into_any();
    /// let b: Box<Bar> = a.downcast().unwrap();
    /// ```
    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync>;

    /// Convenient wrapper for [`std::any::type_name`], since [`Any`] does not provide it and
    /// [`Any::type_id`] is useless as a debugging aid (its `Debug` is just a mess of hex digits).
    fn type_name(&self) -> &'static str;
}

// Blanket implementation for all eligible types
impl<T: Any + Send + Sync> AsAny for T {
    fn any_ref(&self) -> &(dyn Any + Send + Sync) {
        self
    }
    fn as_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
        self
    }
    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync> {
        self
    }
    fn type_name(&self) -> &'static str {
        std::any::type_name::<Self>()
    }
}
399
400/// Extension trait that facilitates object-safe implementations of `PartialEq`.
401pub trait DynPartialEq: AsAny {
402    fn dyn_eq(&self, other: &dyn Any) -> bool;
403}
404
405// Blanket implementation for all eligible types
406impl<T: PartialEq + AsAny> DynPartialEq for T {
407    fn dyn_eq(&self, other: &dyn Any) -> bool {
408        other.downcast_ref::<T>().is_some_and(|other| self == other)
409    }
410}
411
412/// Trait for implementing an Expression evaluator.
413///
414/// It contains one Expression which can be evaluated on multiple ColumnarBatches.
415/// Connectors can implement this trait to optimize the evaluation using the
416/// connector specific capabilities.
417pub trait ExpressionEvaluator: AsAny {
418    /// Evaluate the expression on a given EngineData.
419    ///
420    /// Produces one value for each row of the input.
421    /// The data type of the output is same as the type output of the expression this evaluator is
422    /// using.
423    fn evaluate(&self, batch: &dyn EngineData) -> DeltaResult<Box<dyn EngineData>>;
424}
425
426/// Trait for implementing a Predicate evaluator.
427///
428/// It contains one Predicate which can be evaluated on multiple ColumnarBatches.
429/// Connectors can implement this trait to optimize the evaluation using the
430/// connector specific capabilities.
431pub trait PredicateEvaluator: AsAny {
432    /// Evaluate the predicate on a given EngineData.
433    ///
434    /// Produces one boolean value for each row of the input.
435    fn evaluate(&self, batch: &dyn EngineData) -> DeltaResult<Box<dyn EngineData>>;
436}
437
438/// Provides expression evaluation capability to Delta Kernel.
439///
440/// Delta Kernel can use this handler to evaluate a predicate on partition filters,
441/// fill up partition column values, and any computation on data using Expressions.
442pub trait EvaluationHandler: AsAny {
443    /// Create an [`ExpressionEvaluator`] that can evaluate the given [`Expression`]
444    /// on columnar batches with the given [`Schema`] to produce data of [`DataType`].
445    ///
446    /// If the provided output type is a struct, its fields describe the columns of output produced
447    /// by the evaluator. Otherwise, the output schema is a single column named "output" of the
448    /// specified `output_type`. In all cases, the output schema is only used for its names (all
449    /// field names will be updated to match) and nullability (non-nullable columns can be converted
450    /// to nullable). Any mismatch in types (including number of columns) will produce an error.
451    ///
452    /// # Parameters
453    ///
454    /// - `input_schema`: Schema of the input data.
455    /// - `expression`: Expression to evaluate.
456    /// - `output_type`: Expected result data type.
457    ///
458    /// [`Schema`]: crate::schema::StructType
459    /// [`DataType`]: crate::schema::DataType
460    fn new_expression_evaluator(
461        &self,
462        input_schema: SchemaRef,
463        expression: ExpressionRef,
464        output_type: DataType,
465    ) -> DeltaResult<Arc<dyn ExpressionEvaluator>>;
466
467    /// Create a [`PredicateEvaluator`] that can evaluate the given [`Predicate`] on columnar
468    /// batches with the given [`Schema`] to produce a column of boolean results.
469    ///
470    /// The output schema is a single nullable boolean column named "output".
471    ///
472    /// # Parameters
473    ///
474    /// - `input_schema`: Schema of the input data.
475    /// - `predicate`: Predicate to evaluate.
476    ///
477    /// [`Schema`]: crate::schema::StructType
478    fn new_predicate_evaluator(
479        &self,
480        input_schema: SchemaRef,
481        predicate: PredicateRef,
482    ) -> DeltaResult<Arc<dyn PredicateEvaluator>>;
483
484    /// Create a single-row all-null-value [`EngineData`] with the schema specified by
485    /// `output_schema`.
486    // NOTE: we should probably allow DataType instead of SchemaRef, but can expand that in the
487    // future.
488    fn null_row(&self, output_schema: SchemaRef) -> DeltaResult<Box<dyn EngineData>>;
489
490    /// Create a multi-row [`EngineData`] by applying the given schema to multiple rows of values.
491    ///
492    /// Each element in `rows` represents one row of data, where each row is a slice of structured
493    /// scalar values (one scalar per top-level field in the schema).
494    ///
495    /// # Parameters
496    ///
497    /// - `schema`: Schema describing the structure of each row.
498    /// - `rows`: Slice of rows, where each row contains one structured scalar per top-level schema
499    ///   field.
500    ///
501    /// # Returns
502    ///
503    /// A multi-row `EngineData` containing all rows.
504    ///
505    /// # Errors
506    ///
507    /// Returns an error if any row has a number of scalars that does not match the number of
508    /// top-level fields in `schema`, or if any scalar value cannot be appended to its corresponding
509    /// field's builder (e.g. due to a type mismatch).
510    ///
511    /// # Example
512    ///
513    /// For a schema with fields `[add: Struct, remove: Struct]`, each row should contain exactly 2
514    /// scalars: one for the `add` field and one for the `remove` field.
515    fn create_many(
516        &self,
517        schema: SchemaRef,
518        rows: &[&[Scalar]],
519    ) -> DeltaResult<Box<dyn EngineData>>;
520}
521
522/// Internal trait to allow us to have a private `create_one` API that's implemented for all
523/// EvaluationHandlers.
524// For some reason rustc doesn't detect it's usage so we allow(dead_code) here...
525#[allow(dead_code)]
526#[internal_api]
527trait EvaluationHandlerExtension: EvaluationHandler {
528    /// Create a single-row [`EngineData`] by applying the given schema to the leaf-values given in
529    /// `values`.
530    // Note: we will stick with a Schema instead of DataType (more constrained can expand in
531    // future)
532    fn create_one(&self, schema: SchemaRef, values: &[Scalar]) -> DeltaResult<Box<dyn EngineData>> {
533        // just get a single int column (arbitrary)
534        let null_row_schema = Arc::new(StructType::new_unchecked(vec![StructField::nullable(
535            "null_col",
536            DataType::INTEGER,
537        )]));
538        let null_row = self.null_row(null_row_schema.clone())?;
539
540        // Convert schema and leaf values to an expression
541        let row_expr = literal_expression_transform(schema.as_ref(), values)?;
542
543        let eval =
544            self.new_expression_evaluator(null_row_schema, row_expr.into(), schema.into())?;
545        eval.evaluate(null_row.as_ref())
546    }
547}
548
549// Auto-implement the extension trait for all EvaluationHandlers
550impl<T: EvaluationHandler + ?Sized> EvaluationHandlerExtension for T {}
551
552/// A trait that allows converting a type into (single-row) EngineData
553///
554/// This is typically used with the `#[derive(IntoEngineData)]` macro
555/// which leverages the traits `ToDataType` and `Into<Scalar>` for struct fields
556/// to convert a struct into EngineData.
557///
558/// # Example
559/// ```ignore
560/// # use std::sync::Arc;
561/// # use delta_kernel_derive::{Schema, IntoEngineData};
562///
563/// #[derive(Schema, IntoEngineData)]
564/// struct MyStruct {
565///    a: i32,
566///    b: String,
567/// }
568///
569/// let my_struct = MyStruct { a: 42, b: "Hello".to_string() };
570/// // typically used with ToSchema
571/// let schema = Arc::new(MyStruct::to_schema());
572/// // single-row EngineData
573/// let engine = todo!(); // create an engine
574/// let engine_data = my_struct.into_engine_data(schema, engine);
575/// ```
576#[internal_api]
577pub(crate) trait IntoEngineData {
578    /// Consume this type to produce a single-row EngineData using the provided schema.
579    fn into_engine_data(
580        self,
581        schema: SchemaRef,
582        engine: &dyn Engine,
583    ) -> DeltaResult<Box<dyn EngineData>>;
584}
585
586/// Provides file system related functionalities to Delta Kernel.
587///
588/// Delta Kernel uses this handler whenever it needs to access the underlying
589/// file system where the Delta table is present. Connector implementation of
590/// this trait can hide filesystem specific details from Delta Kernel.
591pub trait StorageHandler: AsAny {
592    /// Recursively list files whose full path is lexicographically greater than (UTF-8 sorting)
593    /// the given `path`, restricted to descendants of `path`'s parent directory. The result must
594    /// be sorted by the full path (UTF-8 byte order).
595    ///
596    /// The listing is **recursive**: files in nested subdirectories are included, not just files
597    /// directly under the parent. For example, listing from `dir/0001.json` may return
598    /// `dir/0002.json`, `dir/sub/0003.json`, and `dir/sub/nested/0004.json`, all interleaved
599    /// in lexicographic order.
600    ///
601    /// The parent directory is derived from `path`:
602    /// - If `path` is directory-like (ends with `/`), the parent is `path` itself and the result
603    ///   contains all files at or below that directory.
604    /// - Otherwise, the parent is the directory containing `path`, and only files (at any depth
605    ///   under that parent) whose full path sorts strictly greater than `path` are returned.
606    fn list_from(&self, path: &Url)
607        -> DeltaResult<Box<dyn Iterator<Item = DeltaResult<FileMeta>>>>;
608
609    /// Read data specified by the start and end offset from the file.
610    fn read_files(
611        &self,
612        files: Vec<FileSlice>,
613    ) -> DeltaResult<Box<dyn Iterator<Item = DeltaResult<Bytes>>>>;
614
615    /// Copy a file atomically from source to destination. If the destination file already exists,
616    /// it must return Err(Error::FileAlreadyExists).
617    fn copy_atomic(&self, src: &Url, dest: &Url) -> DeltaResult<()>;
618
619    /// Write data to the specified path.
620    ///
621    /// If `overwrite` is false and the file already exists, this must return
622    /// `Err(Error::FileAlreadyExists)`.
623    fn put(&self, path: &Url, data: Bytes, overwrite: bool) -> DeltaResult<()>;
624
625    /// Perform a HEAD request for the given file at a Url, returning the file metadata.
626    ///
627    /// If the file does not exist, this must return an `Err` with [`Error::FileNotFound`].
628    fn head(&self, path: &Url) -> DeltaResult<FileMeta>;
629}
630
631/// Provides JSON handling functionality to Delta Kernel.
632///
633/// Delta Kernel can use this handler to parse JSON strings into Row or read content from JSON
634/// files. Connectors can leverage this trait to provide their best implementation of the JSON
635/// parsing capability to Delta Kernel.
636pub trait JsonHandler: AsAny {
637    /// Parse the given json strings and return the fields requested by output schema as columns in
638    /// [`EngineData`]. json_strings MUST be a single column batch of engine data, and the
639    /// column type must be string
640    fn parse_json(
641        &self,
642        json_strings: Box<dyn EngineData>,
643        output_schema: SchemaRef,
644    ) -> DeltaResult<Box<dyn EngineData>>;
645
646    /// Read and parse the JSON format file at given locations and return the data as EngineData
647    /// with the columns requested by physical schema. Note: The [`FileDataReadResultIterator`]
648    /// must emit data from files in the order that `files` is given. For example if files ["a",
649    /// "b"] is provided, then the engine data iterator must first return all the engine data
650    /// from file "a", _then_ all the engine data from file "b". Moreover, for a given file, all
651    /// of its [`EngineData`] and constituent rows must be in order that they occur in the file.
652    /// Consider a file with rows (1, 2, 3). The following are legal iterator batches:
653    ///    iter: [EngineData(1, 2), EngineData(3)]
654    ///    iter: [EngineData(1), EngineData(2, 3)]
655    ///    iter: [EngineData(1, 2, 3)]
656    /// The following are illegal batches:
657    ///    iter: [EngineData(3), EngineData(1, 2)]
658    ///    iter: [EngineData(1), EngineData(3, 2)]
659    ///    iter: [EngineData(2, 1, 3)]
660    ///
661    /// Additionally, engines may not merge engine data across file boundaries.
662    ///
663    /// # Parameters
664    ///
665    /// - `files` - File metadata for files to be read.
666    /// - `physical_schema` - Select list of columns to read from the JSON file.
667    /// - `predicate` - Optional push-down predicate hint (engine is free to ignore it).
668    fn read_json_files(
669        &self,
670        files: &[FileMeta],
671        physical_schema: SchemaRef,
672        predicate: Option<PredicateRef>,
673    ) -> DeltaResult<FileDataReadResultIterator>;
674
675    /// Atomically (!) write a single JSON file. Each row of the input data should be written as a
676    /// new JSON object appended to the file. this write must:
677    /// (1) serialize the data to newline-delimited json (each row is a json object literal)
678    /// (2) write the data to storage atomically (i.e. if the file already exists, fail unless the
679    ///     overwrite flag is set)
680    ///
681    /// For example, the JSON data should be written as { "column1": "val1", "column2": "val2", .. }
682    /// with each row on a new line.
683    ///
684    /// NOTE: Null columns should not be written to the JSON file. For example, if a row has columns
685    /// ["a", "b"] and the value of "b" is null, the JSON object should be written as
686    /// { "a": "..." }. Note that including nulls is technically valid JSON, but would bloat the
687    /// log, therefore we recommend omitting them.
688    ///
689    /// # Parameters
690    ///
691    /// - `path` - URL specifying the location to write the JSON file
692    /// - `data` - Iterator of EngineData to write to the JSON file. Each row should be written as a
693    ///   new JSON object appended to the file. (that is, the file is newline-delimited JSON, and
694    ///   each row is a JSON object on a single line)
695    /// - `overwrite` - If true, overwrite the file if it exists. If false, the call must fail if
696    ///   the file exists.
697    fn write_json_file(
698        &self,
699        path: &Url,
700        data: Box<dyn Iterator<Item = DeltaResult<FilteredEngineData>> + Send + '_>,
701        overwrite: bool,
702    ) -> DeltaResult<()>;
703}
704
/// Field IDs reserved for metadata columns in Delta tables.
///
/// Regular table columns should not use these IDs. They identify virtual columns that expose
/// file-level metadata during reads.
pub mod reserved_field_ids {
    /// Field ID reserved for the file name metadata column (`_file`), which carries the name of
    /// the Parquet file each row was read from.
    ///
    /// The value is `2147483646`, i.e. one less than `i32::MAX`.
    pub const FILE_NAME: i64 = (i32::MAX - 1) as i64;
}
714
715/// Metadata from a Parquet file footer.
716///
717/// This struct contains metadata extracted from a Parquet file's footer, including the schema.
718/// It is designed to be extensible for future additions such as row group statistics.
719#[derive(Debug, Clone)]
720pub struct ParquetFooter {
721    /// The schema of the Parquet file, converted to Delta Kernel's schema format.
722    pub schema: SchemaRef,
723}
724
725/// Provides Parquet file related functionalities to Delta Kernel.
726///
727/// Connectors can leverage this trait to provide their own custom
728/// implementation of Parquet data file functionalities to Delta Kernel.
729pub trait ParquetHandler: AsAny {
730    /// Read and parse the Parquet file at given locations and return the data as EngineData with
731    /// the columns requested by physical schema. The ParquetHandler _must_ return exactly the
732    /// columns specified in `physical_schema`, and they _must_ be in schema order.
733    ///
734    /// # Resolving Parquet schema to the physical schema
735    ///
736    /// When reading the Parquet file, the columns are resolved from the Parquet schema to the
737    /// kernel's `physical_schema`. To do so, the parquet reader must match each Parquet column
738    /// to a [`StructField`] in the `physical_schema`. All columns in the returned `EngineData`
739    /// must be in the same order as specified in `physical_schema`.
740    ///
741    /// Parquet columns are matched to `physical_schema` [`StructField`]s using the following rules:
742    /// 1. **Field ID**: If a [`StructField`] in `physical_schema` contains a field ID (specified in
743    ///    [`ColumnMetadataKey::ParquetFieldId`] metadata), use the ID to match the Parquet column's
744    ///    field id
745    /// 2. **Field Name**: If no field ID is present in the `physical_schema`'s [`StructField`] or
746    ///    no matching parquet field ID is found, fall back to matching by column name
747    ///
748    /// # Metadata Columns
749    ///
750    /// The ParquetHandler must support virtual metadata columns that provide additional information
751    /// about each row. These columns are not stored in the Parquet file but are generated at read
752    /// time.
753    ///
754    /// ## Row Index Column
755    ///
756    /// When a column in `physical_schema` is marked as a row index metadata column (via
757    /// [`StructField::create_metadata_column`] with [`schema::MetadataColumnSpec::RowIndex`]), the
758    /// ParquetHandler must populate it with the 0-based row position within the Parquet file:
759    ///
760    /// - **Column name**: User-specified (commonly `"row_index"` or `"_metadata.row_index"`)
761    /// - **Type**: `LONG` (non-nullable)
762    /// - **Values**: Sequential integers starting at 0 for each file
763    /// - **Use case**: Track row positions for downstream processing, or internally used to compute
764    ///   Row IDs
765    ///
766    /// Example: A file with 5 rows would have row_index values `[0, 1, 2, 3, 4]`.
767    ///
768    /// ## File Name Column (Reserved Field ID)
769    ///
770    /// When a column in `physical_schema` has the reserved field ID
771    /// [`reserved_field_ids::FILE_NAME`] (2147483646), the ParquetHandler must populate it
772    /// with the file path/name:
773    ///
774    /// - **Column name**: `"_file"`
775    /// - **Type**: `STRING` (non-nullable)
776    /// - **Field ID**: 2147483646 (reserved)
777    /// - **Values**: The file path/URL (e.g., `"s3://bucket/path/file.parquet"`)
778    /// - **Use case**: Track which file each row came from in multi-file reads
779    ///
780    /// Example: All rows from the same file would have the same `_file` value.
781    ///
782    /// ## Metadata Column Examples
783    ///
784    /// ```rust,ignore
785    /// use delta_kernel::schema::{StructType, StructField, DataType, MetadataColumnSpec};
786    ///
787    /// // Example 1: Schema with row_index metadata column
788    /// let schema_with_row_index = StructType::try_new([
789    ///     StructField::nullable("id", DataType::INTEGER),
790    ///     StructField::create_metadata_column("row_index", MetadataColumnSpec::RowIndex),
791    ///     StructField::nullable("value", DataType::STRING),
792    /// ])?;
793    ///
794    /// // Example 2: Schema with _file metadata column (using reserved field ID)
795    /// let schema_with_file_path = StructType::try_new([
796    ///     StructField::nullable("id", DataType::INTEGER),
797    ///     StructField::create_metadata_column("_file", MetadataColumnSpec::FilePath),
798    ///     StructField::nullable("value", DataType::STRING),
799    /// ])?;
800    /// ```
801    ///
802    /// ---
803    ///
804    ///  If no matching Parquet column is found, `NULL` values are returned
805    ///  for nullable columns in `physical_schema`. For non-nullable columns, an error is returned.
806    ///
807    ///
808    /// ## Column Matching Examples
809    ///
810    /// Consider a `physical_schema` with the following fields:
811    /// - Column 0:  `"i_logical"` (integer, non-null) with field ID 1 (via
812    ///   [`ColumnMetadataKey::ParquetFieldId`])
813    /// - Column 1: `"s"` (string, nullable) with no field ID metadata
814    /// - Column 2: `"i2"` (integer, nullable) with no field ID metadata
815    ///
816    /// [`ColumnMetadataKey::ParquetFieldId`]: crate::schema::ColumnMetadataKey::ParquetFieldId
817    ///
818    /// And a Parquet file containing these columns:
819    /// - Column 0: `"i2"` (integer, nullable) with field ID 3
820    /// - Column 1: `"i"` (integer, non-null) with field ID 1
821    /// - No `"s"` column present
822    ///
823    /// The column matching would work as follows:
824    /// - `"i_logical"` matches `"i"` by field ID (both have ID 1)
825    /// - `"i2"` matches `"i2"` by column name (no field ID to match on)
826    /// - `"s"` has no matching Parquet column, so NULL values are returned
827    ///
828    /// The returned data will contain exactly 3 columns in physical schema order:
829    /// `{i_logical: parquet[1], s: NULL.., i2: parquet[0]}`
830    ///
831    /// # Parameters
832    ///
833    /// - `files` - File metadata for files to be read.
834    /// - `physical_schema` - Select list and order of columns to read from the Parquet file.
835    /// - `predicate` - Optional push-down predicate hint (engine is free to ignore it).
836    ///
837    /// # Returns
838    /// A [`DeltaResult`] containing a [`FileDataReadResultIterator`].
839    /// Each element of the iterator is a [`DeltaResult`] of [`EngineData`]. The [`EngineData`]
840    /// has the contents of `files` and must match the provided `physical_schema`.
841    ///
842    /// Note: The [`FileDataReadResultIterator`] must emit data from files in the order that `files`
843    /// is given. For example if files ["a", "b"] is provided, then the engine data iterator must
844    /// first return all the engine data from file "a", _then_ all the engine data from file "b".
845    /// Moreover, for a given file, all of its [`EngineData`] and constituent rows must be in order
846    /// that they occur in the file. Consider a file with rows
847    /// (1, 2, 3). The following are legal iterator batches:
848    ///    iter: [EngineData(1, 2), EngineData(3)]
849    ///    iter: [EngineData(1), EngineData(2, 3)]
850    ///    iter: [EngineData(1, 2, 3)]
851    /// The following are illegal batches:
852    ///    iter: [EngineData(3), EngineData(1, 2)]
853    ///    iter: [EngineData(1), EngineData(3, 2)]
854    ///    iter: [EngineData(2, 1, 3)]
855    ///
856    /// Additionally, engines must not merge engine data across file boundaries.
857    ///
858    /// [`ColumnMetadataKey::ParquetFieldId`]: crate::schema::ColumnMetadataKey
859    fn read_parquet_files(
860        &self,
861        files: &[FileMeta],
862        physical_schema: SchemaRef,
863        predicate: Option<PredicateRef>,
864    ) -> DeltaResult<FileDataReadResultIterator>;
865
866    /// Write data to a Parquet file at the specified URL.
867    ///
868    /// This method writes the provided `data` to a Parquet file at the given `url`.
869    ///
870    /// This will overwrite the file if it already exists. For filesystem-backed
871    /// implementations, the parent directories must be created if they do not exist.
872    ///
873    /// # Parquet field IDs
874    ///
875    /// The engine must write a Parquet `field_id` correctly when the kernel
876    /// [`StructField`] carries a field-id related annotation, including:
877    /// - [`ColumnMetadataKey::ColumnMappingId`] / [`ColumnMetadataKey::ParquetFieldId`]
878    /// - [`ColumnMetadataKey::ColumnMappingNestedIds`]
879    ///
880    /// For how to use these keys, refer to the Delta protocol's [Column Mapping] and
881    /// [IcebergCompatV2] sections.
882    ///
883    /// **Non-compliance produces files with incorrect `field_id`s**, which may lead to
884    /// read failures when column mapping mode is `id` and to failures when converting
885    /// the table to Iceberg.   
886    ///
887    /// # Parameters
888    ///
889    /// - `url` - The full URL path where the Parquet file should be written (e.g.,
890    ///   `s3://bucket/path/file.parquet`).
891    /// - `data` - An iterator of engine data to be written to the Parquet file.
892    ///
893    /// # Returns
894    ///
895    /// A [`DeltaResult`] indicating success or failure.
896    ///
897    /// [`StructField`]: crate::schema::StructField
898    /// [`ColumnMetadataKey::ColumnMappingId`]: crate::schema::ColumnMetadataKey::ColumnMappingId
899    /// [`ColumnMetadataKey::ParquetFieldId`]: crate::schema::ColumnMetadataKey::ParquetFieldId
900    /// [`ColumnMetadataKey::ColumnMappingNestedIds`]: crate::schema::ColumnMetadataKey::ColumnMappingNestedIds
901    /// [Column Mapping]: https://github.com/delta-io/delta/blob/master/PROTOCOL.md#column-mapping
902    /// [IcebergCompatV2]: https://github.com/delta-io/delta/blob/master/PROTOCOL.md#iceberg-compatibility-v2
903    fn write_parquet_file(
904        &self,
905        location: url::Url,
906        data: Box<dyn Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send>,
907    ) -> DeltaResult<()>;
908
909    /// Read the footer metadata from a Parquet file without reading the data.
910    ///
911    /// This method reads only the Parquet file footer (metadata section), which is useful for
912    /// schema inspection, compatibility checking, and determining whether parsed statistics
913    /// columns are present and compatible with the current table schema.
914    ///
915    /// # Parameters
916    ///
917    /// - `file` - File metadata for the Parquet file whose footer should be read. The `size` field
918    ///   should contain the actual file size to enable efficient footer reads without additional
919    ///   I/O operations.
920    ///
921    /// # Returns
922    ///
923    /// A [`DeltaResult`] containing a [`ParquetFooter`] with the Parquet file's metadata, including
924    /// the schema converted to Delta Kernel's format.
925    ///
926    /// # Field IDs
927    ///
928    /// If the Parquet file contains field IDs (written when column mapping is enabled), they are
929    /// preserved in each [`StructField`]'s metadata. Callers can access field IDs via
930    /// [`StructField::get_config_value`] with [`ColumnMetadataKey::ParquetFieldId`].
931    ///
932    /// # Errors
933    ///
934    /// Returns an error if:
935    /// - The file cannot be accessed or does not exist
936    /// - The file is not a valid Parquet file
937    /// - The footer cannot be read or parsed
938    /// - The schema cannot be converted to Delta Kernel's format
939    ///
940    /// [`StructField`]: crate::schema::StructField
941    /// [`StructField::get_config_value`]: crate::schema::StructField::get_config_value
942    /// [`ColumnMetadataKey::ParquetFieldId`]: crate::schema::ColumnMetadataKey::ParquetFieldId
943    fn read_parquet_footer(&self, file: &FileMeta) -> DeltaResult<ParquetFooter>;
944}
945
946/// The `Engine` trait encapsulates all the functionality an engine or connector needs to provide
947/// to the Delta Kernel in order to read the Delta table.
948///
949/// Engines/Connectors are expected to pass an implementation of this trait when reading a Delta
950/// table.
951pub trait Engine: AsAny {
952    /// Get the connector provided [`EvaluationHandler`].
953    fn evaluation_handler(&self) -> Arc<dyn EvaluationHandler>;
954
955    /// Get the connector provided [`StorageHandler`]
956    fn storage_handler(&self) -> Arc<dyn StorageHandler>;
957
958    /// Get the connector provided [`JsonHandler`].
959    fn json_handler(&self) -> Arc<dyn JsonHandler>;
960
961    /// Get the connector provided [`ParquetHandler`].
962    fn parquet_handler(&self) -> Arc<dyn ParquetHandler>;
963}
964
// `default-engine-base` is an internal feature flag: it only groups the pieces that
// `default-engine-native-tls` and `default-engine-rustls` have in common, and the crate cannot
// compile when it is enabled on its own. Fail early with an explanatory message in that case.
#[cfg(all(
    feature = "default-engine-base",
    not(feature = "default-engine-native-tls"),
    not(feature = "default-engine-rustls"),
))]
compile_error!(
    "The default-engine-base feature flag is not meant to be used directly. \
    Please use either default-engine-native-tls or default-engine-rustls."
);
979
// Rustdoc's documentation tests can do some things that regular unit tests can't. Here we use
// doctests to test macros: specifically, macro invocations that fail because of invalid input.
// The output of successful macro invocations can (and should) be checked in regular unit tests
// instead. Other doctests may be added to this module as well; it is not reserved for macro tests.
// The module is gated on `cfg(doctest)`, so it is only compiled while rustdoc collects doctests:
// https://doc.rust-lang.org/rustdoc/write-documentation/documentation-tests.html#include-items-only-when-collecting-doctests
#[cfg(doctest)]
mod doctests;
987
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;

    /// Checks `FileMeta::size_as_i64` on both sides of the `i64::MAX` boundary: sizes that fit in
    /// an `i64` convert to the same value, while larger sizes yield an error whose message
    /// mentions `exceeds i64::MAX`.
    #[rstest]
    #[case::zero(0, Some(0))]
    #[case::one(1, Some(1))]
    #[case::i64_max(i64::MAX as u64, Some(i64::MAX))]
    #[case::just_over_i64_max(i64::MAX as u64 + 1, None)]
    #[case::u64_max(u64::MAX, None)]
    fn test_file_meta_size_as_i64(#[case] size: u64, #[case] expected: Option<i64>) {
        let location = Url::parse("file:///x").unwrap();
        let meta = FileMeta::new(location, 0, size);
        let outcome = meta.size_as_i64();
        if let Some(want) = expected {
            assert_eq!(outcome.unwrap(), want);
        } else {
            let message = outcome.unwrap_err().to_string();
            assert!(message.contains("exceeds i64::MAX"));
        }
    }
}