// Enable rustdoc's `doc_cfg` (feature-gate badges) only on nightly doc builds.
#![cfg_attr(all(doc, NIGHTLY_CHANNEL), feature(doc_cfg))]
// Crate-wide lints: keep the public surface tight, and warn on panicking
// constructs (`unwrap`/`expect`/`panic`) in library code.
#![warn(
    unreachable_pub,
    trivial_numeric_casts,
    unused_extern_crates,
    rust_2018_idioms,
    rust_2021_compatibility,
    clippy::unwrap_used,
    clippy::expect_used,
    clippy::panic
)]
// Tests are allowed to panic; re-allow the panicking lints there.
#![cfg_attr(test, allow(clippy::unwrap_used, clippy::expect_used, clippy::panic))]

// Alias the crate to its own name so `delta_kernel::...` paths resolve from
// inside the crate too (presumably needed by `delta_kernel_derive` macro
// output — confirm against the derive crate).
#[allow(unused_extern_crates)]
extern crate self as delta_kernel;
use std::any::Any;
use std::cmp::Ordering;
use std::fs::DirEntry;
use std::ops::Range;
use std::sync::Arc;
use std::time::SystemTime;
use bytes::Bytes;
use url::Url;
use self::schema::{DataType, SchemaRef};
// === Module tree ===
mod action_reconciliation;
pub mod actions;
pub mod checkpoint;
pub mod committer;
// `crc` is fully public only with the `test-utils` feature; otherwise it stays
// crate-internal (selected types are re-exported unconditionally below).
#[cfg(feature = "test-utils")]
pub mod crc;
#[cfg(not(feature = "test-utils"))]
pub(crate) mod crc;
pub mod engine_data;
pub mod error;
pub mod expressions;
mod log_compaction;
mod log_path;
mod log_reader;
pub mod metrics;
pub mod partition;
pub mod scan;
pub mod schema;
pub mod snapshot;
pub mod table_changes;
pub mod table_configuration;
pub mod table_features;
pub mod table_properties;
pub mod transaction;
pub mod transforms;
pub use crc::{FileSizeHistogram, FileStats};
pub use log_path::LogPath;
// Same public-only-for-test-utils pattern as `crc` above.
#[cfg(feature = "test-utils")]
pub mod row_tracking;
#[cfg(not(feature = "test-utils"))]
pub(crate) mod row_tracking;
pub(crate) mod clustering;
mod arrow_compat;
// Re-export arrow compatibility helpers when an arrow feature is selected.
#[cfg(any(feature = "arrow-57", feature = "arrow-58"))]
pub use arrow_compat::*;
// The modules below share one visibility pattern: public when the
// `internal-api` feature is enabled, `pub(crate)` otherwise.
#[cfg(feature = "internal-api")]
pub mod column_trie;
#[cfg(not(feature = "internal-api"))]
pub(crate) mod column_trie;
pub mod kernel_predicates;
pub(crate) mod utils;
#[cfg(feature = "internal-api")]
pub use utils::try_parse_uri;
#[cfg(feature = "internal-api")]
pub mod path;
#[cfg(not(feature = "internal-api"))]
pub(crate) mod path;
#[cfg(feature = "internal-api")]
pub mod log_replay;
#[cfg(not(feature = "internal-api"))]
pub(crate) mod log_replay;
#[cfg(feature = "internal-api")]
pub mod log_segment;
#[cfg(not(feature = "internal-api"))]
pub(crate) mod log_segment;
#[cfg(feature = "internal-api")]
pub mod last_checkpoint_hint;
#[cfg(not(feature = "internal-api"))]
pub(crate) mod last_checkpoint_hint;
pub(crate) mod log_segment_files;
#[cfg(feature = "internal-api")]
pub mod history_manager;
#[cfg(not(feature = "internal-api"))]
pub(crate) mod history_manager;
#[cfg(feature = "internal-api")]
pub mod parallel;
#[cfg(not(feature = "internal-api"))]
pub(crate) mod parallel;
// === Top-level re-exports ===
pub use action_reconciliation::{ActionReconciliationIterator, ActionReconciliationIteratorState};
// Re-export the derive crate so downstream users get a matching macro version.
pub use delta_kernel_derive;
use delta_kernel_derive::internal_api;
pub use engine_data::{
    EngineData, FilteredEngineData, FilteredRowVisitor, GetData, RowIndexIterator, RowVisitor,
};
pub use error::{DeltaResult, Error};
use expressions::{literal_expression_transform, Scalar};
pub use expressions::{Expression, ExpressionRef, Predicate, PredicateRef};
pub use log_compaction::{should_compact, LogCompactionWriter};
pub use metrics::MetricsReporter;
use schema::{StructField, StructType};
pub use snapshot::{Snapshot, SnapshotRef};
// The bundled default engine is built only when one of its feature stacks is on.
#[cfg(any(
    feature = "default-engine-native-tls",
    feature = "default-engine-rustls",
    feature = "arrow-conversion"
))]
pub mod engine;
/// A version of a Delta table.
pub type Version = u64;
/// A file size, in bytes.
pub type FileSize = u64;
/// An index/offset into a file, in bytes.
pub type FileIndex = u64;
/// A file location plus an optional byte range to read.
/// NOTE(review): a `None` range presumably means "read the whole file" —
/// confirm against `StorageHandler::read_files` implementations.
pub type FileSlice = (Url, Option<Range<FileIndex>>);
/// The result of reading data from one file: the file's metadata plus one
/// batch of [`EngineData`].
pub type FileDataReadResult = (FileMeta, Box<dyn EngineData>);
/// An iterator over [`EngineData`] batches produced by reading files.
pub type FileDataReadResultIterator =
    Box<dyn Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send>;
/// Metadata about a file: where it lives, when it was last modified, and its size.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FileMeta {
    /// Fully-qualified location (URL) of the file.
    pub location: Url,
    /// Last modification time in milliseconds since the Unix epoch (see the
    /// `TryFrom<DirEntry>` impl, which fills this from filesystem metadata).
    pub last_modified: i64,
    /// Size of the file, in bytes.
    pub size: FileSize,
}
impl Ord for FileMeta {
    /// Orders [`FileMeta`] primarily by `location`, breaking ties with
    /// `last_modified` and then `size`.
    ///
    /// The tie-breakers keep this total order consistent with the derived
    /// [`PartialEq`]/[`Eq`] (which compare all three fields): `Ord` requires
    /// `a == b` iff `a.cmp(&b) == Ordering::Equal`. Comparing only `location`
    /// would report `Equal` for values that are not `==`, which breaks that
    /// contract and can misbehave in ordered collections.
    fn cmp(&self, other: &Self) -> Ordering {
        self.location
            .cmp(&other.location)
            .then_with(|| self.last_modified.cmp(&other.last_modified))
            .then_with(|| self.size.cmp(&other.size))
    }
}
impl PartialOrd for FileMeta {
    /// Delegates to the total order defined by the [`Ord`] impl.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}
impl TryFrom<DirEntry> for FileMeta {
    type Error = Error;

    /// Builds a [`FileMeta`] from a local filesystem directory entry.
    ///
    /// Fails if the entry's metadata cannot be read, if the modification time
    /// predates the Unix epoch, if the path is not convertible to a `file://`
    /// URL, or if the millisecond timestamp overflows `i64`.
    fn try_from(ent: DirEntry) -> DeltaResult<FileMeta> {
        let meta = ent.metadata()?;
        // Modification time as a duration since the Unix epoch; pre-epoch
        // timestamps surface as an error here.
        let since_epoch = meta
            .modified()?
            .duration_since(SystemTime::UNIX_EPOCH)
            .map_err(|_| Error::generic("Failed to convert file timestamp to milliseconds"))?;
        let location = Url::from_file_path(ent.path())
            .map_err(|_| Error::generic(format!("Invalid path: {:?}", ent.path())))?;
        // `as_millis` yields u128; narrow to the i64 the struct stores.
        let millis = since_epoch.as_millis();
        let last_modified: i64 = millis.try_into().map_err(|_| {
            Error::generic(format!(
                "Failed to convert file modification time {:?} into i64",
                millis
            ))
        })?;
        Ok(FileMeta {
            location,
            last_modified,
            size: meta.len(),
        })
    }
}
impl FileMeta {
pub fn new(location: Url, last_modified: i64, size: u64) -> Self {
Self {
location,
last_modified,
size,
}
}
}
/// Type-erasure support: borrowing and owned conversions into `dyn Any`, plus
/// access to the concrete type's name. Blanket-implemented for every
/// `Any + Send + Sync` type, so engine-provided trait objects can be downcast
/// back to their concrete types.
pub trait AsAny: Any + Send + Sync {
    /// Borrow `self` as `&dyn Any` (e.g. for `downcast_ref`).
    fn any_ref(&self) -> &(dyn Any + Send + Sync);
    /// Convert a shared `Arc<Self>` into `Arc<dyn Any>` (e.g. for `Arc::downcast`).
    fn as_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync>;
    /// Convert an owned `Box<Self>` into `Box<dyn Any>` (e.g. for `Box::downcast`).
    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync>;
    /// The implementing type's name, as reported by `std::any::type_name`.
    fn type_name(&self) -> &'static str;
}
/// Blanket implementation: every `Any + Send + Sync` type is [`AsAny`].
/// The conversions are unsized coercions, so each body is just `self`.
impl<T: Any + Send + Sync> AsAny for T {
    fn type_name(&self) -> &'static str {
        std::any::type_name::<T>()
    }

    fn any_ref(&self) -> &(dyn Any + Send + Sync) {
        self
    }

    fn into_any(self: Box<Self>) -> Box<dyn Any + Send + Sync> {
        self
    }

    fn as_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
        self
    }
}
/// Object-safe equality: compare `self` against a type-erased `&dyn Any` value.
pub trait DynPartialEq: AsAny {
    /// Returns `true` iff `other` downcasts to `Self`'s concrete type and
    /// compares equal (see the blanket impl).
    fn dyn_eq(&self, other: &dyn Any) -> bool;
}
/// Blanket impl: any `PartialEq` type behind [`AsAny`] supports `dyn_eq`.
impl<T: PartialEq + AsAny> DynPartialEq for T {
    fn dyn_eq(&self, other: &dyn Any) -> bool {
        // Values of different concrete types are never equal; a failed
        // downcast therefore yields `false`.
        match other.downcast_ref::<T>() {
            Some(concrete) => self == concrete,
            None => false,
        }
    }
}
/// An evaluator for a pre-compiled [`Expression`]; obtained from
/// [`EvaluationHandler::new_expression_evaluator`].
pub trait ExpressionEvaluator: AsAny {
    /// Evaluate the expression against `batch`, producing a new output batch.
    fn evaluate(&self, batch: &dyn EngineData) -> DeltaResult<Box<dyn EngineData>>;
}
/// An evaluator for a pre-compiled [`Predicate`]; obtained from
/// [`EvaluationHandler::new_predicate_evaluator`].
pub trait PredicateEvaluator: AsAny {
    /// Evaluate the predicate against `batch`, producing a new output batch.
    /// NOTE(review): the output's shape (e.g. a boolean column) is determined
    /// by engine implementations — confirm there.
    fn evaluate(&self, batch: &dyn EngineData) -> DeltaResult<Box<dyn EngineData>>;
}
/// Engine-provided factory for expression and predicate evaluation over
/// [`EngineData`] batches, plus helpers for materializing literal data.
pub trait EvaluationHandler: AsAny {
    /// Build an evaluator that applies `expression` to batches shaped by
    /// `input_schema`, producing output of `output_type`.
    fn new_expression_evaluator(
        &self,
        input_schema: SchemaRef,
        expression: ExpressionRef,
        output_type: DataType,
    ) -> DeltaResult<Arc<dyn ExpressionEvaluator>>;
    /// Build an evaluator that applies `predicate` to batches shaped by
    /// `input_schema`.
    fn new_predicate_evaluator(
        &self,
        input_schema: SchemaRef,
        predicate: PredicateRef,
    ) -> DeltaResult<Arc<dyn PredicateEvaluator>>;
    /// Create a batch with `output_schema` containing null data.
    /// NOTE(review): assumed to be a single all-null row, based on the name and
    /// its use in `EvaluationHandlerExtension::create_one` — confirm with
    /// engine implementations.
    fn null_row(&self, output_schema: SchemaRef) -> DeltaResult<Box<dyn EngineData>>;
    /// Create a batch with `schema` from literal `rows`, where each row is a
    /// slice of [`Scalar`] values.
    fn create_many(
        &self,
        schema: SchemaRef,
        rows: &[&[Scalar]],
    ) -> DeltaResult<Box<dyn EngineData>>;
}
/// Extension methods layered on top of any [`EvaluationHandler`]
/// (blanket-implemented below).
#[allow(dead_code)]
#[internal_api]
trait EvaluationHandlerExtension: EvaluationHandler {
    /// Create a batch holding a single row of literal `values`, shaped by `schema`.
    fn create_one(&self, schema: SchemaRef, values: &[Scalar]) -> DeltaResult<Box<dyn EngineData>> {
        // An expression needs some input to evaluate against, so fabricate a
        // placeholder batch via `null_row` with one nullable INTEGER column.
        let placeholder_schema = Arc::new(StructType::new_unchecked(vec![
            StructField::nullable("null_col", DataType::INTEGER),
        ]));
        let placeholder_batch = self.null_row(placeholder_schema.clone())?;
        // Turn the literal values into an expression producing the target row...
        let literals = literal_expression_transform(schema.as_ref(), values)?;
        // ...and evaluate it against the placeholder, yielding `schema`-shaped output.
        let evaluator =
            self.new_expression_evaluator(placeholder_schema, literals.into(), schema.into())?;
        evaluator.evaluate(placeholder_batch.as_ref())
    }
}
// Blanket impl: every evaluation handler (including trait objects, hence
// `?Sized`) gets the extension methods.
impl<T: EvaluationHandler + ?Sized> EvaluationHandlerExtension for T {}
/// Conversion of a value into an [`EngineData`] batch using the engine's
/// handlers.
#[internal_api]
pub(crate) trait IntoEngineData {
    /// Convert `self` into a batch shaped by `schema`, using `engine`.
    fn into_engine_data(
        self,
        schema: SchemaRef,
        engine: &dyn Engine,
    ) -> DeltaResult<Box<dyn EngineData>>;
}
/// Engine-provided filesystem / object-store operations.
pub trait StorageHandler: AsAny {
    /// List files starting from `path`.
    /// NOTE(review): the exact semantics (directory listing vs. lexicographic
    /// "start after path") are defined by engine implementations — confirm there.
    fn list_from(&self, path: &Url)
        -> DeltaResult<Box<dyn Iterator<Item = DeltaResult<FileMeta>>>>;
    /// Read the given files — optionally restricted to a byte range per
    /// [`FileSlice`] — yielding one [`Bytes`] buffer per requested slice.
    fn read_files(
        &self,
        files: Vec<FileSlice>,
    ) -> DeltaResult<Box<dyn Iterator<Item = DeltaResult<Bytes>>>>;
    /// Atomically copy `src` to `dest`.
    /// NOTE(review): presumably fails if `dest` already exists — confirm with impls.
    fn copy_atomic(&self, src: &Url, dest: &Url) -> DeltaResult<()>;
    /// Write `data` to `path`. NOTE(review): with `overwrite == false` this is
    /// presumably required to fail if `path` already exists — confirm with impls.
    fn put(&self, path: &Url, data: Bytes, overwrite: bool) -> DeltaResult<()>;
    /// Fetch metadata ([`FileMeta`]) for a single file at `path`.
    fn head(&self, path: &Url) -> DeltaResult<FileMeta>;
}
/// Engine-provided JSON parsing, reading, and writing.
pub trait JsonHandler: AsAny {
    /// Parse a batch of JSON strings into [`EngineData`] shaped by
    /// `output_schema`.
    fn parse_json(
        &self,
        json_strings: Box<dyn EngineData>,
        output_schema: SchemaRef,
    ) -> DeltaResult<Box<dyn EngineData>>;
    /// Read the given JSON files, projecting to `physical_schema`.
    /// NOTE(review): `predicate` is presumably an advisory filter the engine
    /// may use to skip data — confirm with engine implementations.
    fn read_json_files(
        &self,
        files: &[FileMeta],
        physical_schema: SchemaRef,
        predicate: Option<PredicateRef>,
    ) -> DeltaResult<FileDataReadResultIterator>;
    /// Write the batches yielded by `data` as a JSON file at `path`.
    /// NOTE(review): with `overwrite == false` this presumably must fail if
    /// `path` already exists (needed for atomic log commits) — confirm with impls.
    fn write_json_file(
        &self,
        path: &Url,
        data: Box<dyn Iterator<Item = DeltaResult<FilteredEngineData>> + Send + '_>,
        overwrite: bool,
    ) -> DeltaResult<()>;
}
/// Field ids reserved for kernel/engine-generated metadata columns.
pub mod reserved_field_ids {
    /// Reserved field id for the file-name metadata column.
    ///
    /// Equal to `i32::MAX - 1` (2147483646): kept at the very top of the
    /// 32-bit range so it cannot collide with ordinary, sequentially assigned
    /// field ids.
    pub const FILE_NAME: i64 = (i32::MAX - 1) as i64;
}
/// Information extracted from a Parquet file footer. Currently carries only
/// the file's schema, converted to a kernel [`SchemaRef`].
#[derive(Debug, Clone)]
pub struct ParquetFooter {
    /// Schema recorded in the Parquet footer, as a kernel schema.
    pub schema: SchemaRef,
}
/// Engine-provided Parquet reading and writing.
pub trait ParquetHandler: AsAny {
    /// Read the given Parquet files, projecting to `physical_schema`.
    /// NOTE(review): `predicate` is presumably an advisory filter the engine
    /// may use to skip data — confirm with engine implementations.
    fn read_parquet_files(
        &self,
        files: &[FileMeta],
        physical_schema: SchemaRef,
        predicate: Option<PredicateRef>,
    ) -> DeltaResult<FileDataReadResultIterator>;
    /// Write the batches yielded by `data` to a Parquet file at `location`.
    fn write_parquet_file(
        &self,
        location: url::Url,
        data: Box<dyn Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send>,
    ) -> DeltaResult<()>;
    /// Read only the footer information of `file`; see [`ParquetFooter`].
    fn read_parquet_footer(&self, file: &FileMeta) -> DeltaResult<ParquetFooter>;
}
/// The root trait an engine implements to supply the kernel with everything it
/// needs: expression evaluation, storage access, and JSON/Parquet handling.
pub trait Engine: AsAny {
    /// Handler for expression and predicate evaluation.
    fn evaluation_handler(&self) -> Arc<dyn EvaluationHandler>;
    /// Handler for filesystem / object-store operations.
    fn storage_handler(&self) -> Arc<dyn StorageHandler>;
    /// Handler for JSON parsing and I/O.
    fn json_handler(&self) -> Arc<dyn JsonHandler>;
    /// Handler for Parquet I/O.
    fn parquet_handler(&self) -> Arc<dyn ParquetHandler>;
    /// Optional metrics sink; the default implementation reports no metrics.
    fn get_metrics_reporter(&self) -> Option<Arc<dyn MetricsReporter>> {
        None
    }
}
// Compile-time guard: `default-engine-base` is an implementation-detail
// feature; selecting it without one of the concrete TLS stacks is a
// configuration error, rejected here rather than failing obscurely later.
#[cfg(all(
    feature = "default-engine-base",
    not(any(
        feature = "default-engine-native-tls",
        feature = "default-engine-rustls",
    ))
))]
compile_error!(
    "The default-engine-base feature flag is not meant to be used directly. \
    Please use either default-engine-native-tls or default-engine-rustls."
);
// Compiled only under `cargo test`'s doctest pass; hosts doc examples that
// should be type-checked but not shipped.
#[cfg(doctest)]
mod doctests;