deltalake_core/delta_datafusion/
mod.rs

1//! Datafusion integration for Delta Table
2//!
3//! Example:
4//!
5//! ```rust
6//! use std::sync::Arc;
7//! use datafusion::execution::context::SessionContext;
8//!
9//! async {
10//!   let mut ctx = SessionContext::new();
11//!   let table = deltalake_core::open_table("./tests/data/simple_table")
12//!       .await
13//!       .unwrap();
14//!   ctx.register_table("demo", Arc::new(table)).unwrap();
15//!
16//!   let batches = ctx
17//!       .sql("SELECT * FROM demo").await.unwrap()
18//!       .collect()
19//!       .await.unwrap();
20//! };
21//! ```
22
23use std::any::Any;
24use std::borrow::Cow;
25use std::collections::{HashMap, HashSet};
26use std::fmt::{self, Debug};
27use std::sync::Arc;
28
29use arrow_array::types::UInt16Type;
30use arrow_array::{Array, DictionaryArray, RecordBatch, StringArray, TypedDictionaryArray};
31use arrow_cast::display::array_value_to_string;
32use arrow_cast::{cast_with_options, CastOptions};
33use arrow_schema::{
34    ArrowError, DataType as ArrowDataType, Field, Schema as ArrowSchema, SchemaRef,
35    SchemaRef as ArrowSchemaRef, TimeUnit,
36};
37use arrow_select::concat::concat_batches;
38use async_trait::async_trait;
39use chrono::{DateTime, TimeZone, Utc};
40use datafusion::catalog::memory::DataSourceExec;
41use datafusion::catalog::{Session, TableProviderFactory};
42use datafusion::config::TableParquetOptions;
43use datafusion::datasource::physical_plan::{
44    wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileGroup, FileScanConfigBuilder,
45    ParquetSource,
46};
47use datafusion::datasource::{listing::PartitionedFile, MemTable, TableProvider, TableType};
48use datafusion::execution::context::{SessionConfig, SessionContext, SessionState, TaskContext};
49use datafusion::execution::runtime_env::RuntimeEnv;
50use datafusion::execution::FunctionRegistry;
51use datafusion::optimizer::simplify_expressions::ExprSimplifier;
52use datafusion::physical_optimizer::pruning::PruningPredicate;
53use datafusion_common::scalar::ScalarValue;
54use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
55use datafusion_common::{
56    config::ConfigOptions, Column, DFSchema, DataFusionError, Result as DataFusionResult,
57    TableReference, ToDFSchema,
58};
59use datafusion_expr::execution_props::ExecutionProps;
60use datafusion_expr::logical_plan::CreateExternalTable;
61use datafusion_expr::simplify::SimplifyContext;
62use datafusion_expr::utils::conjunction;
63use datafusion_expr::{col, Expr, Extension, LogicalPlan, TableProviderFilterPushDown, Volatility};
64use datafusion_physical_expr::{create_physical_expr, PhysicalExpr};
65use datafusion_physical_plan::filter::FilterExec;
66use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
67use datafusion_physical_plan::memory::{LazyBatchGenerator, LazyMemoryExec};
68use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, MetricsSet};
69use datafusion_physical_plan::projection::ProjectionExec;
70use datafusion_physical_plan::{
71    DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, SendableRecordBatchStream,
72    Statistics,
73};
74use datafusion_proto::logical_plan::LogicalExtensionCodec;
75use datafusion_proto::physical_plan::PhysicalExtensionCodec;
76use datafusion_sql::planner::ParserOptions;
77use either::Either;
78use futures::TryStreamExt;
79use itertools::Itertools;
80use object_store::ObjectMeta;
81use parking_lot::RwLock;
82use serde::{Deserialize, Serialize};
83
84use url::Url;
85
86use crate::delta_datafusion::expr::parse_predicate_expression;
87use crate::delta_datafusion::schema_adapter::DeltaSchemaAdapterFactory;
88use crate::errors::{DeltaResult, DeltaTableError};
89use crate::kernel::{Add, DataCheck, EagerSnapshot, Invariant, Snapshot, StructTypeExt};
90use crate::logstore::LogStoreRef;
91use crate::table::builder::ensure_table_uri;
92use crate::table::state::DeltaTableState;
93use crate::table::{Constraint, GeneratedColumn};
94use crate::{open_table, open_table_with_storage_options, DeltaTable};
95
96pub(crate) const PATH_COLUMN: &str = "__delta_rs_path";
97
98pub mod cdf;
99pub mod expr;
100pub mod logical;
101pub mod physical;
102pub mod planner;
103
104pub use cdf::scan::DeltaCdfTableProvider;
105
106mod schema_adapter;
107
108impl From<DeltaTableError> for DataFusionError {
109    fn from(err: DeltaTableError) -> Self {
110        match err {
111            DeltaTableError::Arrow { source } => DataFusionError::ArrowError(source, None),
112            DeltaTableError::Io { source } => DataFusionError::IoError(source),
113            DeltaTableError::ObjectStore { source } => DataFusionError::ObjectStore(source),
114            DeltaTableError::Parquet { source } => DataFusionError::ParquetError(source),
115            _ => DataFusionError::External(Box::new(err)),
116        }
117    }
118}
119
120impl From<DataFusionError> for DeltaTableError {
121    fn from(err: DataFusionError) -> Self {
122        match err {
123            DataFusionError::ArrowError(source, _) => DeltaTableError::Arrow { source },
124            DataFusionError::IoError(source) => DeltaTableError::Io { source },
125            DataFusionError::ObjectStore(source) => DeltaTableError::ObjectStore { source },
126            DataFusionError::ParquetError(source) => DeltaTableError::Parquet { source },
127            _ => DeltaTableError::Generic(err.to_string()),
128        }
129    }
130}
131
/// Convenience trait for calling common methods on snapshot hierarchies
pub trait DataFusionMixins {
    /// The physical datafusion schema of a table
    ///
    /// Partition columns are moved to the end of the schema; string/binary
    /// partition columns may be dictionary-wrapped, matching how DataFusion
    /// represents partition values at the physical level.
    fn arrow_schema(&self) -> DeltaResult<ArrowSchemaRef>;

    /// Get the table schema as an [`ArrowSchemaRef`]
    ///
    /// Unlike [`Self::arrow_schema`], partition columns keep their plain
    /// (non-dictionary) data types.
    fn input_schema(&self) -> DeltaResult<ArrowSchemaRef>;

    /// Parse an expression string into a datafusion [`Expr`]
    ///
    /// The expression is resolved against this snapshot's arrow schema.
    fn parse_predicate_expression(
        &self,
        expr: impl AsRef<str>,
        df_state: &SessionState,
    ) -> DeltaResult<Expr>;
}
147
148impl DataFusionMixins for Snapshot {
149    fn arrow_schema(&self) -> DeltaResult<ArrowSchemaRef> {
150        _arrow_schema(self, true)
151    }
152
153    fn input_schema(&self) -> DeltaResult<ArrowSchemaRef> {
154        _arrow_schema(self, false)
155    }
156
157    fn parse_predicate_expression(
158        &self,
159        expr: impl AsRef<str>,
160        df_state: &SessionState,
161    ) -> DeltaResult<Expr> {
162        let schema = DFSchema::try_from(self.arrow_schema()?.as_ref().to_owned())?;
163        parse_predicate_expression(&schema, expr, df_state)
164    }
165}
166
impl DataFusionMixins for EagerSnapshot {
    // All methods delegate to the underlying lazy `Snapshot`.
    fn arrow_schema(&self) -> DeltaResult<ArrowSchemaRef> {
        self.snapshot().arrow_schema()
    }

    fn input_schema(&self) -> DeltaResult<ArrowSchemaRef> {
        self.snapshot().input_schema()
    }

    fn parse_predicate_expression(
        &self,
        expr: impl AsRef<str>,
        df_state: &SessionState,
    ) -> DeltaResult<Expr> {
        self.snapshot().parse_predicate_expression(expr, df_state)
    }
}
184
impl DataFusionMixins for DeltaTableState {
    // All methods delegate to the wrapped snapshot.
    fn arrow_schema(&self) -> DeltaResult<ArrowSchemaRef> {
        self.snapshot.arrow_schema()
    }

    fn input_schema(&self) -> DeltaResult<ArrowSchemaRef> {
        self.snapshot.input_schema()
    }

    fn parse_predicate_expression(
        &self,
        expr: impl AsRef<str>,
        df_state: &SessionState,
    ) -> DeltaResult<Expr> {
        self.snapshot.parse_predicate_expression(expr, df_state)
    }
}
202
/// Build the arrow schema for `snapshot` with partition columns moved to the
/// end. When `wrap_partitions` is true, string/binary partition columns are
/// wrapped in a dictionary type, since partition values repeat per file.
fn _arrow_schema(snapshot: &Snapshot, wrap_partitions: bool) -> DeltaResult<ArrowSchemaRef> {
    let meta = snapshot.metadata();

    let schema = meta.schema()?;
    let fields = schema
        .fields()
        // Data (non-partition) columns first, in their declared order ...
        .filter(|f| !meta.partition_columns.contains(&f.name().to_string()))
        .map(|f| f.try_into())
        .chain(
            // We need stable order between logical and physical schemas, but the order of
            // partitioning columns is not always the same in the json schema and the array
            meta.partition_columns.iter().map(|partition_col| {
                // Partition columns are expected to exist in the table schema
                // — TODO confirm the protocol guarantees this (unwrap relies on it).
                let f = schema.field(partition_col).unwrap();
                let field = Field::try_from(f)?;
                let corrected = if wrap_partitions {
                    match field.data_type() {
                        // Only dictionary-encode types that may be large
                        // https://github.com/apache/arrow-datafusion/pull/5545
                        ArrowDataType::Utf8
                        | ArrowDataType::LargeUtf8
                        | ArrowDataType::Binary
                        | ArrowDataType::LargeBinary => {
                            wrap_partition_type_in_dict(field.data_type().clone())
                        }
                        _ => field.data_type().clone(),
                    }
                } else {
                    field.data_type().clone()
                };
                Ok(field.with_data_type(corrected))
            }),
        )
        .collect::<Result<Vec<Field>, _>>()?;

    Ok(Arc::new(ArrowSchema::new(fields)))
}
239
240pub(crate) fn files_matching_predicate<'a>(
241    snapshot: &'a EagerSnapshot,
242    filters: &[Expr],
243) -> DeltaResult<impl Iterator<Item = Add> + 'a> {
244    if let Some(Some(predicate)) =
245        (!filters.is_empty()).then_some(conjunction(filters.iter().cloned()))
246    {
247        let expr = SessionContext::new()
248            .create_physical_expr(predicate, &snapshot.arrow_schema()?.to_dfschema()?)?;
249        let pruning_predicate = PruningPredicate::try_new(expr, snapshot.arrow_schema()?)?;
250        Ok(Either::Left(
251            snapshot
252                .file_actions()?
253                .zip(pruning_predicate.prune(snapshot)?)
254                .filter_map(
255                    |(action, keep_file)| {
256                        if keep_file {
257                            Some(action)
258                        } else {
259                            None
260                        }
261                    },
262                ),
263        ))
264    } else {
265        Ok(Either::Right(snapshot.file_actions()?))
266    }
267}
268
269pub(crate) fn get_path_column<'a>(
270    batch: &'a RecordBatch,
271    path_column: &str,
272) -> DeltaResult<TypedDictionaryArray<'a, UInt16Type, StringArray>> {
273    let err = || DeltaTableError::Generic("Unable to obtain Delta-rs path column".to_string());
274    batch
275        .column_by_name(path_column)
276        .unwrap()
277        .as_any()
278        .downcast_ref::<DictionaryArray<UInt16Type>>()
279        .ok_or_else(err)?
280        .downcast_dict::<StringArray>()
281        .ok_or_else(err)
282}
283
impl DeltaTableState {
    /// Provide table level statistics to Datafusion
    ///
    /// Delegates to the wrapped snapshot.
    pub fn datafusion_table_statistics(&self) -> Option<Statistics> {
        self.snapshot.datafusion_table_statistics()
    }
}
290
291// each delta table must register a specific object store, since paths are internally
292// handled relative to the table root.
293pub(crate) fn register_store(store: LogStoreRef, env: Arc<RuntimeEnv>) {
294    let object_store_url = store.object_store_url();
295    let url: &Url = object_store_url.as_ref();
296    env.register_object_store(url, store.object_store(None));
297}
298
299/// The logical schema for a Deltatable is different from the protocol level schema since partition
300/// columns must appear at the end of the schema. This is to align with how partition are handled
301/// at the physical level
302pub(crate) fn df_logical_schema(
303    snapshot: &DeltaTableState,
304    file_column_name: &Option<String>,
305    schema: Option<ArrowSchemaRef>,
306) -> DeltaResult<SchemaRef> {
307    let input_schema = match schema {
308        Some(schema) => schema,
309        None => snapshot.input_schema()?,
310    };
311    let table_partition_cols = &snapshot.metadata().partition_columns;
312
313    let mut fields: Vec<Arc<Field>> = input_schema
314        .fields()
315        .iter()
316        .filter(|f| !table_partition_cols.contains(f.name()))
317        .cloned()
318        .collect();
319
320    for partition_col in table_partition_cols.iter() {
321        fields.push(Arc::new(
322            input_schema
323                .field_with_name(partition_col)
324                .unwrap()
325                .to_owned(),
326        ));
327    }
328
329    if let Some(file_column_name) = file_column_name {
330        fields.push(Arc::new(Field::new(
331            file_column_name,
332            ArrowDataType::Utf8,
333            true,
334        )));
335    }
336
337    Ok(Arc::new(ArrowSchema::new(fields)))
338}
339
#[derive(Debug, Clone)]
/// Used to specify if additional metadata columns are exposed to the user
///
/// Build with [`DeltaScanConfigBuilder::build`], which validates the chosen
/// file-path column name against the table schema.
pub struct DeltaScanConfigBuilder {
    /// Include the source path for each record. The name of this column is determined by `file_column_name`
    include_file_column: bool,
    /// Column name that contains the source path.
    ///
    /// If include_file_column is true and the name is None then it will be auto-generated
    /// Otherwise the user provided name will be used
    file_column_name: Option<String>,
    /// Whether to wrap partition values in a dictionary encoding to potentially save space
    wrap_partition_values: Option<bool>,
    /// Whether to push down filter in end result or just prune the files
    enable_parquet_pushdown: bool,
    /// Schema to scan table with
    schema: Option<SchemaRef>,
}
357
impl Default for DeltaScanConfigBuilder {
    // Defaults: no file-path column, dictionary wrapping left undecided
    // (resolved to `true` at build time), parquet pushdown enabled, and the
    // snapshot's own schema.
    fn default() -> Self {
        DeltaScanConfigBuilder {
            include_file_column: false,
            file_column_name: None,
            wrap_partition_values: None,
            enable_parquet_pushdown: true,
            schema: None,
        }
    }
}
369
370impl DeltaScanConfigBuilder {
371    /// Construct a new instance of `DeltaScanConfigBuilder`
372    pub fn new() -> Self {
373        Self::default()
374    }
375
376    /// Indicate that a column containing a records file path is included.
377    /// Column name is generated and can be determined once this Config is built
378    pub fn with_file_column(mut self, include: bool) -> Self {
379        self.include_file_column = include;
380        self.file_column_name = None;
381        self
382    }
383
384    /// Indicate that a column containing a records file path is included and column name is user defined.
385    pub fn with_file_column_name<S: ToString>(mut self, name: &S) -> Self {
386        self.file_column_name = Some(name.to_string());
387        self.include_file_column = true;
388        self
389    }
390
391    /// Whether to wrap partition values in a dictionary encoding
392    pub fn wrap_partition_values(mut self, wrap: bool) -> Self {
393        self.wrap_partition_values = Some(wrap);
394        self
395    }
396
397    /// Allow pushdown of the scan filter
398    /// When disabled the filter will only be used for pruning files
399    pub fn with_parquet_pushdown(mut self, pushdown: bool) -> Self {
400        self.enable_parquet_pushdown = pushdown;
401        self
402    }
403
404    /// Use the provided [SchemaRef] for the [DeltaScan]
405    pub fn with_schema(mut self, schema: SchemaRef) -> Self {
406        self.schema = Some(schema);
407        self
408    }
409
410    /// Build a DeltaScanConfig and ensure no column name conflicts occur during downstream processing
411    pub fn build(&self, snapshot: &DeltaTableState) -> DeltaResult<DeltaScanConfig> {
412        let file_column_name = if self.include_file_column {
413            let input_schema = snapshot.input_schema()?;
414            let mut column_names: HashSet<&String> = HashSet::new();
415            for field in input_schema.fields.iter() {
416                column_names.insert(field.name());
417            }
418
419            match &self.file_column_name {
420                Some(name) => {
421                    if column_names.contains(name) {
422                        return Err(DeltaTableError::Generic(format!(
423                            "Unable to add file path column since column with name {name} exits"
424                        )));
425                    }
426
427                    Some(name.to_owned())
428                }
429                None => {
430                    let prefix = PATH_COLUMN;
431                    let mut idx = 0;
432                    let mut name = prefix.to_owned();
433
434                    while column_names.contains(&name) {
435                        idx += 1;
436                        name = format!("{prefix}_{idx}");
437                    }
438
439                    Some(name)
440                }
441            }
442        } else {
443            None
444        };
445
446        Ok(DeltaScanConfig {
447            file_column_name,
448            wrap_partition_values: self.wrap_partition_values.unwrap_or(true),
449            enable_parquet_pushdown: self.enable_parquet_pushdown,
450            schema: self.schema.clone(),
451        })
452    }
453}
454
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
/// Include additional metadata columns during a [`DeltaScan`]
///
/// Usually produced via [`DeltaScanConfigBuilder::build`], which validates the
/// file column name against the table schema.
pub struct DeltaScanConfig {
    /// Include the source path for each record
    pub file_column_name: Option<String>,
    /// Wrap partition values in a dictionary encoding
    pub wrap_partition_values: bool,
    /// Allow pushdown of the scan filter
    pub enable_parquet_pushdown: bool,
    /// Schema to read as
    pub schema: Option<SchemaRef>,
}
467
/// Incrementally assembles a [`DeltaScan`] physical plan over a table snapshot.
pub(crate) struct DeltaScanBuilder<'a> {
    // Table state providing schema, metadata and file listings.
    snapshot: &'a DeltaTableState,
    // Log store used to resolve the table's object store URL.
    log_store: LogStoreRef,
    // Optional filter used for file pruning and (optionally) parquet pushdown.
    filter: Option<Expr>,
    // Session supplying configuration (e.g. parquet options).
    session: &'a dyn Session,
    // Optional column projection (indices into the logical schema).
    projection: Option<&'a Vec<usize>>,
    // Optional row limit.
    limit: Option<usize>,
    // Explicit file list; when set, pruning is skipped.
    files: Option<&'a [Add]>,
    // Scan configuration; defaults are built from the snapshot when absent.
    config: Option<DeltaScanConfig>,
}
478
impl<'a> DeltaScanBuilder<'a> {
    /// Create a builder scanning `snapshot` through `log_store` within `session`.
    pub fn new(
        snapshot: &'a DeltaTableState,
        log_store: LogStoreRef,
        session: &'a dyn Session,
    ) -> Self {
        DeltaScanBuilder {
            snapshot,
            log_store,
            filter: None,
            session,
            projection: None,
            limit: None,
            files: None,
            config: None,
        }
    }

    /// Set an optional filter used for file pruning and (optionally) parquet pushdown.
    pub fn with_filter(mut self, filter: Option<Expr>) -> Self {
        self.filter = filter;
        self
    }

    /// Restrict the scan to an explicit set of `Add` actions, skipping pruning.
    pub fn with_files(mut self, files: &'a [Add]) -> Self {
        self.files = Some(files);
        self
    }

    /// Set an optional column projection (indices into the logical schema).
    pub fn with_projection(mut self, projection: Option<&'a Vec<usize>>) -> Self {
        self.projection = projection;
        self
    }

    /// Set an optional row limit.
    pub fn with_limit(mut self, limit: Option<usize>) -> Self {
        self.limit = limit;
        self
    }

    /// Use the given scan configuration instead of building a default one.
    pub fn with_scan_config(mut self, config: DeltaScanConfig) -> Self {
        self.config = Some(config);
        self
    }

    /// Assemble the [`DeltaScan`]: prune files against the filter, group them
    /// by partition values, and wire up a parquet data source with the
    /// appropriate partition/file-path columns and statistics.
    pub async fn build(self) -> DeltaResult<DeltaScan> {
        let config = match self.config {
            Some(config) => config,
            None => DeltaScanConfigBuilder::new().build(self.snapshot)?,
        };

        // Physical schema of the table, unless the config overrides it.
        let schema = match config.schema.clone() {
            Some(value) => Ok(value),
            None => self.snapshot.arrow_schema(),
        }?;

        let logical_schema = df_logical_schema(
            self.snapshot,
            &config.file_column_name,
            Some(schema.clone()),
        )?;

        // Narrow the logical schema to the projected columns, when a projection is set.
        let logical_schema = if let Some(used_columns) = self.projection {
            let mut fields = vec![];
            for idx in used_columns {
                fields.push(logical_schema.field(*idx).to_owned());
            }
            Arc::new(ArrowSchema::new(fields))
        } else {
            logical_schema
        };

        let context = SessionContext::new();
        let df_schema = logical_schema.clone().to_dfschema()?;

        let logical_filter = self.filter.map(|expr| {
            // Simplify the expression first
            let props = ExecutionProps::new();
            let simplify_context =
                SimplifyContext::new(&props).with_schema(df_schema.clone().into());
            let simplifier = ExprSimplifier::new(simplify_context).with_max_cycles(10);
            // NOTE(review): these unwraps panic if simplification or physical
            // planning fails for the supplied filter expression.
            let simplified = simplifier.simplify(expr).unwrap();

            context
                .create_physical_expr(simplified, &df_schema)
                .unwrap()
        });

        // Perform Pruning of files to scan
        let (files, files_scanned, files_pruned) = match self.files {
            Some(files) => {
                // Caller supplied an explicit file list: scan all of them, prune none.
                let files = files.to_owned();
                let files_scanned = files.len();
                (files, files_scanned, 0)
            }
            None => {
                if let Some(predicate) = &logical_filter {
                    // Use file-level statistics to drop files that cannot match.
                    let pruning_predicate =
                        PruningPredicate::try_new(predicate.clone(), logical_schema.clone())?;
                    let files_to_prune = pruning_predicate.prune(self.snapshot)?;
                    let mut files_pruned = 0usize;
                    let files = self
                        .snapshot
                        .file_actions_iter()?
                        .zip(files_to_prune.into_iter())
                        .filter_map(|(action, keep)| {
                            if keep {
                                Some(action.to_owned())
                            } else {
                                files_pruned += 1;
                                None
                            }
                        })
                        .collect::<Vec<_>>();

                    let files_scanned = files.len();
                    (files, files_scanned, files_pruned)
                } else {
                    // No filter: every file is scanned.
                    let files = self.snapshot.file_actions()?;
                    let files_scanned = files.len();
                    (files, files_scanned, 0)
                }
            }
        };

        // TODO we group files together by their partition values. If the table is partitioned
        // and partitions are somewhat evenly distributed, probably not the worst choice ...
        // However we may want to do some additional balancing in case we are far off from the above.
        let mut file_groups: HashMap<Vec<ScalarValue>, Vec<PartitionedFile>> = HashMap::new();

        let table_partition_cols = &self.snapshot.metadata().partition_columns;

        for action in files.iter() {
            let mut part = partitioned_file_from_action(action, table_partition_cols, &schema);

            // When a file-path column is requested, append the file path as an
            // extra (possibly dictionary-wrapped) partition value.
            if config.file_column_name.is_some() {
                let partition_value = if config.wrap_partition_values {
                    wrap_partition_value_in_dict(ScalarValue::Utf8(Some(action.path.clone())))
                } else {
                    ScalarValue::Utf8(Some(action.path.clone()))
                };
                part.partition_values.push(partition_value);
            }

            file_groups
                .entry(part.partition_values.clone())
                .or_default()
                .push(part);
        }

        // Schema of the data stored inside the parquet files (partition columns excluded).
        let file_schema = Arc::new(ArrowSchema::new(
            schema
                .fields()
                .iter()
                .filter(|f| !table_partition_cols.contains(f.name()))
                .cloned()
                .collect::<Vec<arrow::datatypes::FieldRef>>(),
        ));

        let mut table_partition_cols = table_partition_cols
            .iter()
            .map(|name| schema.field_with_name(name).map(|f| f.to_owned()))
            .collect::<Result<Vec<_>, ArrowError>>()?;

        // The file-path column is modeled as an additional partition column.
        if let Some(file_column_name) = &config.file_column_name {
            let field_name_datatype = if config.wrap_partition_values {
                wrap_partition_type_in_dict(ArrowDataType::Utf8)
            } else {
                ArrowDataType::Utf8
            };
            table_partition_cols.push(Field::new(
                file_column_name.clone(),
                field_name_datatype,
                false,
            ));
        }

        let stats = self
            .snapshot
            .datafusion_table_statistics()
            .unwrap_or(Statistics::new_unknown(&schema));

        // Inherit the session's parquet execution options.
        let parquet_options = TableParquetOptions {
            global: self.session.config().options().execution.parquet.clone(),
            ..Default::default()
        };

        let mut file_source = ParquetSource::new(parquet_options)
            .with_schema_adapter_factory(Arc::new(DeltaSchemaAdapterFactory {}));

        // Sometimes (i.e Merge) we want to prune files that don't make the
        // filter and read the entire contents for files that do match the
        // filter
        if let Some(predicate) = logical_filter {
            if config.enable_parquet_pushdown {
                file_source = file_source.with_predicate(Arc::clone(&file_schema), predicate);
            }
        };

        let file_scan_config = FileScanConfigBuilder::new(
            self.log_store.object_store_url(),
            file_schema,
            Arc::new(file_source),
        )
        .with_file_groups(
            // If all files were filtered out, we still need to emit at least one partition to
            // pass datafusion sanity checks.
            //
            // See https://github.com/apache/datafusion/issues/11322
            if file_groups.is_empty() {
                vec![FileGroup::from(vec![])]
            } else {
                file_groups.into_values().map(FileGroup::from).collect()
            },
        )
        .with_statistics(stats)
        .with_projection(self.projection.cloned())
        .with_limit(self.limit)
        .with_table_partition_cols(table_partition_cols)
        .build();

        // Record pruning effectiveness so it surfaces in plan metrics.
        let metrics = ExecutionPlanMetricsSet::new();
        MetricBuilder::new(&metrics)
            .global_counter("files_scanned")
            .add(files_scanned);
        MetricBuilder::new(&metrics)
            .global_counter("files_pruned")
            .add(files_pruned);

        Ok(DeltaScan {
            table_uri: ensure_table_uri(self.log_store.root_uri())?.as_str().into(),
            parquet_scan: DataSourceExec::from_data_source(file_scan_config),
            config,
            logical_schema,
            metrics,
        })
    }
}
715
// TODO: implement this for Snapshot, not for DeltaTable
#[async_trait]
impl TableProvider for DeltaTable {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> Arc<ArrowSchema> {
        // NOTE(review): panics when no snapshot is loaded or the schema cannot
        // be converted — the TableProvider trait has no fallible variant here.
        self.snapshot().unwrap().arrow_schema().unwrap()
    }

    fn table_type(&self) -> TableType {
        TableType::Base
    }

    fn get_table_definition(&self) -> Option<&str> {
        None
    }

    fn get_logical_plan(&self) -> Option<Cow<'_, LogicalPlan>> {
        None
    }

    async fn scan(
        &self,
        session: &dyn Session,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
        // Make the table's object store available to the runtime before planning.
        register_store(self.log_store(), session.runtime_env().clone());
        // AND all pushed-down filters into a single predicate for pruning.
        let filter_expr = conjunction(filters.iter().cloned());

        let scan = DeltaScanBuilder::new(self.snapshot()?, self.log_store(), session)
            .with_projection(projection)
            .with_limit(limit)
            .with_filter(filter_expr)
            .build()
            .await?;

        Ok(Arc::new(scan))
    }

    fn supports_filters_pushdown(
        &self,
        filter: &[&Expr],
    ) -> DataFusionResult<Vec<TableProviderFilterPushDown>> {
        // All filters are Inexact: they prune files, but DataFusion still
        // re-applies the predicate on the returned batches.
        Ok(filter
            .iter()
            .map(|_| TableProviderFilterPushDown::Inexact)
            .collect())
    }

    fn statistics(&self) -> Option<Statistics> {
        self.snapshot().ok()?.datafusion_table_statistics()
    }
}
773
/// A Delta table provider that enables additional metadata columns to be included during the scan
#[derive(Debug)]
pub struct DeltaTableProvider {
    // Table state the provider scans.
    snapshot: DeltaTableState,
    // Log store used to resolve the table's object store.
    log_store: LogStoreRef,
    // Scan configuration (file-path column, dictionary wrapping, pushdown, schema override).
    config: DeltaScanConfig,
    // Pre-computed logical schema (partition columns last, optional path column).
    schema: Arc<ArrowSchema>,
    // Optional explicit file list for advanced use cases; None scans all files.
    files: Option<Vec<Add>>,
}
783
784impl DeltaTableProvider {
785    /// Build a DeltaTableProvider
786    pub fn try_new(
787        snapshot: DeltaTableState,
788        log_store: LogStoreRef,
789        config: DeltaScanConfig,
790    ) -> DeltaResult<Self> {
791        Ok(DeltaTableProvider {
792            schema: df_logical_schema(&snapshot, &config.file_column_name, config.schema.clone())?,
793            snapshot,
794            log_store,
795            config,
796            files: None,
797        })
798    }
799
800    /// Define which files to consider while building a scan, for advanced usecases
801    pub fn with_files(mut self, files: Vec<Add>) -> DeltaTableProvider {
802        self.files = Some(files);
803        self
804    }
805}
806
#[async_trait]
impl TableProvider for DeltaTableProvider {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> Arc<ArrowSchema> {
        // Logical schema pre-computed in `try_new`.
        self.schema.clone()
    }

    fn table_type(&self) -> TableType {
        TableType::Base
    }

    fn get_table_definition(&self) -> Option<&str> {
        None
    }

    fn get_logical_plan(&self) -> Option<Cow<'_, LogicalPlan>> {
        None
    }

    async fn scan(
        &self,
        session: &dyn Session,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
        // Make the table's object store available to the runtime before planning.
        register_store(self.log_store.clone(), session.runtime_env().clone());
        // AND all pushed-down filters into a single predicate for pruning.
        let filter_expr = conjunction(filters.iter().cloned());

        let mut scan = DeltaScanBuilder::new(&self.snapshot, self.log_store.clone(), session)
            .with_projection(projection)
            .with_limit(limit)
            .with_filter(filter_expr)
            .with_scan_config(self.config.clone());

        // Honor an explicit file list configured via `with_files`.
        if let Some(files) = &self.files {
            scan = scan.with_files(files);
        }
        Ok(Arc::new(scan.build().await?))
    }

    fn supports_filters_pushdown(
        &self,
        filter: &[&Expr],
    ) -> DataFusionResult<Vec<TableProviderFilterPushDown>> {
        // All filters are Inexact: they prune files, but DataFusion still
        // re-applies the predicate on the returned batches.
        Ok(filter
            .iter()
            .map(|_| TableProviderFilterPushDown::Inexact)
            .collect())
    }

    fn statistics(&self) -> Option<Statistics> {
        self.snapshot.datafusion_table_statistics()
    }
}
865
/// A table provider backed by lazily generated record batches rather than files.
#[derive(Debug)]
pub struct LazyTableProvider {
    // Schema shared by all generated batches.
    schema: Arc<ArrowSchema>,
    // Batch generators, each behind a lock for interior mutation.
    batches: Vec<Arc<RwLock<dyn LazyBatchGenerator>>>,
}
871
impl LazyTableProvider {
    /// Build a LazyTableProvider from a schema and a set of lazy batch generators.
    pub fn try_new(
        schema: Arc<ArrowSchema>,
        batches: Vec<Arc<RwLock<dyn LazyBatchGenerator>>>,
    ) -> DeltaResult<Self> {
        Ok(LazyTableProvider { schema, batches })
    }
}
881
#[async_trait]
impl TableProvider for LazyTableProvider {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> Arc<ArrowSchema> {
        self.schema.clone()
    }

    fn table_type(&self) -> TableType {
        TableType::Base
    }

    fn get_table_definition(&self) -> Option<&str> {
        None
    }

    fn get_logical_plan(&self) -> Option<Cow<'_, LogicalPlan>> {
        None
    }

    /// Build an execution plan over the lazy batch generators, layering any
    /// requested filter, projection and limit on top of the base scan.
    async fn scan(
        &self,
        _session: &dyn Session,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
        // Base plan: stream batches from the registered generators.
        let mut plan: Arc<dyn ExecutionPlan> = Arc::new(LazyMemoryExec::try_new(
            self.schema(),
            self.batches.clone(),
        )?);

        let df_schema: DFSchema = plan.schema().try_into()?;

        // All pushed-down filters become a single conjunctive FilterExec.
        if let Some(filter_expr) = conjunction(filters.iter().cloned()) {
            let physical_expr =
                create_physical_expr(&filter_expr, &df_schema, &ExecutionProps::new())?;
            plan = Arc::new(FilterExec::try_new(physical_expr, plan)?);
        }

        // Only add a ProjectionExec when the requested projection differs from
        // the identity projection (all columns, in order).
        if let Some(projection) = projection {
            let current_projection = (0..plan.schema().fields().len()).collect::<Vec<usize>>();
            if projection != &current_projection {
                let execution_props = &ExecutionProps::new();
                let fields: DeltaResult<Vec<(Arc<dyn PhysicalExpr>, String)>> = projection
                    .iter()
                    .map(|i| {
                        // Resolve each projected index to a (possibly qualified)
                        // column expression against the input schema.
                        let (table_ref, field) = df_schema.qualified_field(*i);
                        create_physical_expr(
                            &Expr::Column(Column::from((table_ref, field))),
                            &df_schema,
                            execution_props,
                        )
                        .map(|expr| (expr, field.name().clone()))
                        .map_err(DeltaTableError::from)
                    })
                    .collect();
                plan = Arc::new(ProjectionExec::try_new(fields?, plan)?);
            }
        }

        // Apply the limit last so it observes the filtered/projected rows.
        if let Some(limit) = limit {
            plan = Arc::new(GlobalLimitExec::new(plan, 0, Some(limit)))
        };

        Ok(plan)
    }

    /// Filters are applied in `scan` but may be re-checked by DataFusion,
    /// hence every filter is reported as `Inexact`.
    fn supports_filters_pushdown(
        &self,
        filter: &[&Expr],
    ) -> DataFusionResult<Vec<TableProviderFilterPushDown>> {
        Ok(filter
            .iter()
            .map(|_| TableProviderFilterPushDown::Inexact)
            .collect())
    }

    /// No statistics are available for lazily generated batches.
    fn statistics(&self) -> Option<Statistics> {
        None
    }
}
966
// TODO: this will likely also need to perform column mapping later when we support reader protocol v2
/// A wrapper for parquet scans
///
/// Carries Delta-specific metadata (table root, scan config, logical schema)
/// alongside the inner parquet plan so it can be serialized and re-attached
/// via [DeltaPhysicalCodec].
#[derive(Debug)]
pub struct DeltaScan {
    /// The URL of the ObjectStore root
    pub table_uri: String,
    /// Scan configuration, including the optional synthetic file-path column
    /// that maps rows back to their originating `Add` action
    pub config: DeltaScanConfig,
    /// The parquet scan to wrap
    pub parquet_scan: Arc<dyn ExecutionPlan>,
    /// The schema of the table to be used when evaluating expressions
    pub logical_schema: Arc<ArrowSchema>,
    /// Metrics for scan reported via DataFusion
    metrics: ExecutionPlanMetricsSet,
}
982
// Wire format for [DeltaScan] (de)serialization. The child parquet plan is not
// serialized here; it travels separately as a plan input (see DeltaPhysicalCodec).
#[derive(Debug, Serialize, Deserialize)]
struct DeltaScanWire {
    pub table_uri: String,
    pub config: DeltaScanConfig,
    pub logical_schema: Arc<ArrowSchema>,
}
989
990impl DisplayAs for DeltaScan {
991    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> std::fmt::Result {
992        write!(f, "DeltaScan")
993    }
994}
995
impl ExecutionPlan for DeltaScan {
    fn name(&self) -> &str {
        Self::static_name()
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    // Schema, properties, execution and statistics are all delegated to the
    // wrapped parquet scan; DeltaScan adds no runtime behavior of its own.
    fn schema(&self) -> SchemaRef {
        self.parquet_scan.schema()
    }

    fn properties(&self) -> &PlanProperties {
        self.parquet_scan.properties()
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.parquet_scan]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
        // A DeltaScan wraps exactly one child: the parquet scan.
        if children.len() != 1 {
            return Err(DataFusionError::Plan(format!(
                "DeltaScan wrong number of children {}",
                children.len()
            )));
        }
        Ok(Arc::new(DeltaScan {
            table_uri: self.table_uri.clone(),
            config: self.config.clone(),
            parquet_scan: children[0].clone(),
            logical_schema: self.logical_schema.clone(),
            metrics: self.metrics.clone(),
        }))
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> DataFusionResult<SendableRecordBatchStream> {
        self.parquet_scan.execute(partition, context)
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }

    fn statistics(&self) -> DataFusionResult<Statistics> {
        self.parquet_scan.statistics()
    }

    fn repartitioned(
        &self,
        target_partitions: usize,
        config: &ConfigOptions,
    ) -> DataFusionResult<Option<Arc<dyn ExecutionPlan>>> {
        // If the inner parquet scan can be repartitioned, re-wrap the new plan
        // so the Delta metadata travels with it.
        if let Some(parquet_scan) = self.parquet_scan.repartitioned(target_partitions, config)? {
            Ok(Some(Arc::new(DeltaScan {
                table_uri: self.table_uri.clone(),
                config: self.config.clone(),
                parquet_scan,
                logical_schema: self.logical_schema.clone(),
                metrics: self.metrics.clone(),
            })))
        } else {
            Ok(None)
        }
    }
}
1070
1071pub(crate) fn get_null_of_arrow_type(t: &ArrowDataType) -> DeltaResult<ScalarValue> {
1072    match t {
1073        ArrowDataType::Null => Ok(ScalarValue::Null),
1074        ArrowDataType::Boolean => Ok(ScalarValue::Boolean(None)),
1075        ArrowDataType::Int8 => Ok(ScalarValue::Int8(None)),
1076        ArrowDataType::Int16 => Ok(ScalarValue::Int16(None)),
1077        ArrowDataType::Int32 => Ok(ScalarValue::Int32(None)),
1078        ArrowDataType::Int64 => Ok(ScalarValue::Int64(None)),
1079        ArrowDataType::UInt8 => Ok(ScalarValue::UInt8(None)),
1080        ArrowDataType::UInt16 => Ok(ScalarValue::UInt16(None)),
1081        ArrowDataType::UInt32 => Ok(ScalarValue::UInt32(None)),
1082        ArrowDataType::UInt64 => Ok(ScalarValue::UInt64(None)),
1083        ArrowDataType::Float32 => Ok(ScalarValue::Float32(None)),
1084        ArrowDataType::Float64 => Ok(ScalarValue::Float64(None)),
1085        ArrowDataType::Date32 => Ok(ScalarValue::Date32(None)),
1086        ArrowDataType::Date64 => Ok(ScalarValue::Date64(None)),
1087        ArrowDataType::Binary => Ok(ScalarValue::Binary(None)),
1088        ArrowDataType::FixedSizeBinary(size) => {
1089            Ok(ScalarValue::FixedSizeBinary(size.to_owned(), None))
1090        }
1091        ArrowDataType::LargeBinary => Ok(ScalarValue::LargeBinary(None)),
1092        ArrowDataType::Utf8 => Ok(ScalarValue::Utf8(None)),
1093        ArrowDataType::LargeUtf8 => Ok(ScalarValue::LargeUtf8(None)),
1094        ArrowDataType::Decimal128(precision, scale) => Ok(ScalarValue::Decimal128(
1095            None,
1096            precision.to_owned(),
1097            scale.to_owned(),
1098        )),
1099        ArrowDataType::Timestamp(unit, tz) => {
1100            let tz = tz.to_owned();
1101            Ok(match unit {
1102                TimeUnit::Second => ScalarValue::TimestampSecond(None, tz),
1103                TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(None, tz),
1104                TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(None, tz),
1105                TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(None, tz),
1106            })
1107        }
1108        ArrowDataType::Dictionary(k, v) => Ok(ScalarValue::Dictionary(
1109            k.clone(),
1110            Box::new(get_null_of_arrow_type(v).unwrap()),
1111        )),
1112        //Unsupported types...
1113        ArrowDataType::Float16
1114        | ArrowDataType::Decimal256(_, _)
1115        | ArrowDataType::Union(_, _)
1116        | ArrowDataType::LargeList(_)
1117        | ArrowDataType::Struct(_)
1118        | ArrowDataType::List(_)
1119        | ArrowDataType::FixedSizeList(_, _)
1120        | ArrowDataType::Time32(_)
1121        | ArrowDataType::Time64(_)
1122        | ArrowDataType::Duration(_)
1123        | ArrowDataType::Interval(_)
1124        | ArrowDataType::RunEndEncoded(_, _)
1125        | ArrowDataType::BinaryView
1126        | ArrowDataType::Utf8View
1127        | ArrowDataType::LargeListView(_)
1128        | ArrowDataType::ListView(_)
1129        | ArrowDataType::Map(_, _) => Err(DeltaTableError::Generic(format!(
1130            "Unsupported data type for Delta Lake {t}"
1131        ))),
1132    }
1133}
1134
/// Convert a Delta `Add` action into a DataFusion [PartitionedFile].
///
/// Partition values are parsed from the action's string-encoded partition map
/// using the corresponding field types from `schema`; any value that cannot be
/// parsed degrades to `ScalarValue::Null` rather than failing the scan.
fn partitioned_file_from_action(
    action: &Add,
    partition_columns: &[String],
    schema: &ArrowSchema,
) -> PartitionedFile {
    let partition_values = partition_columns
        .iter()
        .map(|part| {
            action
                .partition_values
                .get(part)
                .map(|val| {
                    schema
                        .field_with_name(part)
                        .map(|field| match val {
                            // Present value: parse according to the field type,
                            // falling back to Null on parse failure.
                            Some(value) => to_correct_scalar_value(
                                &serde_json::Value::String(value.to_string()),
                                field.data_type(),
                            )
                            .unwrap_or(Some(ScalarValue::Null))
                            .unwrap_or(ScalarValue::Null),
                            // Explicit null partition value: use a typed null if possible.
                            None => get_null_of_arrow_type(field.data_type())
                                .unwrap_or(ScalarValue::Null),
                        })
                        .unwrap_or(ScalarValue::Null)
                })
                .unwrap_or(ScalarValue::Null)
        })
        .collect::<Vec<_>>();

    // modification_time is epoch milliseconds; split into whole seconds plus
    // the nanosecond remainder for chrono.
    let ts_secs = action.modification_time / 1000;
    let ts_ns = (action.modification_time % 1000) * 1_000_000;
    // NOTE(review): the `unwrap` assumes modification_time is a valid,
    // non-negative epoch-millis value; a negative remainder would wrap through
    // `as u32` — confirm upstream guarantees on this field.
    let last_modified = Utc.from_utc_datetime(
        &DateTime::from_timestamp(ts_secs, ts_ns as u32)
            .unwrap()
            .naive_utc(),
    );
    PartitionedFile {
        object_meta: ObjectMeta {
            last_modified,
            // NOTE(review): converting the Add action into ObjectMeta is assumed
            // infallible here (valid path and size) — confirm.
            ..action.try_into().unwrap()
        },
        partition_values,
        range: None,
        extensions: None,
        statistics: None,
        metadata_size_hint: None,
    }
}
1184
1185fn parse_date(
1186    stat_val: &serde_json::Value,
1187    field_dt: &ArrowDataType,
1188) -> DataFusionResult<ScalarValue> {
1189    let string = match stat_val {
1190        serde_json::Value::String(s) => s.to_owned(),
1191        _ => stat_val.to_string(),
1192    };
1193
1194    let time_micro = ScalarValue::try_from_string(string, &ArrowDataType::Date32)?;
1195    let cast_arr = cast_with_options(
1196        &time_micro.to_array()?,
1197        field_dt,
1198        &CastOptions {
1199            safe: false,
1200            ..Default::default()
1201        },
1202    )?;
1203    ScalarValue::try_from_array(&cast_arr, 0)
1204}
1205
1206fn parse_timestamp(
1207    stat_val: &serde_json::Value,
1208    field_dt: &ArrowDataType,
1209) -> DataFusionResult<ScalarValue> {
1210    let string = match stat_val {
1211        serde_json::Value::String(s) => s.to_owned(),
1212        _ => stat_val.to_string(),
1213    };
1214
1215    let time_micro = ScalarValue::try_from_string(
1216        string,
1217        &ArrowDataType::Timestamp(TimeUnit::Microsecond, None),
1218    )?;
1219    let cast_arr = cast_with_options(
1220        &time_micro.to_array()?,
1221        field_dt,
1222        &CastOptions {
1223            safe: false,
1224            ..Default::default()
1225        },
1226    )?;
1227    ScalarValue::try_from_array(&cast_arr, 0)
1228}
1229
1230pub(crate) fn to_correct_scalar_value(
1231    stat_val: &serde_json::Value,
1232    field_dt: &ArrowDataType,
1233) -> DataFusionResult<Option<ScalarValue>> {
1234    match stat_val {
1235        serde_json::Value::Array(_) => Ok(None),
1236        serde_json::Value::Object(_) => Ok(None),
1237        serde_json::Value::Null => Ok(Some(get_null_of_arrow_type(field_dt)?)),
1238        serde_json::Value::String(string_val) => match field_dt {
1239            ArrowDataType::Timestamp(_, _) => Ok(Some(parse_timestamp(stat_val, field_dt)?)),
1240            ArrowDataType::Date32 => Ok(Some(parse_date(stat_val, field_dt)?)),
1241            _ => Ok(Some(ScalarValue::try_from_string(
1242                string_val.to_owned(),
1243                field_dt,
1244            )?)),
1245        },
1246        other => match field_dt {
1247            ArrowDataType::Timestamp(_, _) => Ok(Some(parse_timestamp(stat_val, field_dt)?)),
1248            ArrowDataType::Date32 => Ok(Some(parse_date(stat_val, field_dt)?)),
1249            _ => Ok(Some(ScalarValue::try_from_string(
1250                other.to_string(),
1251                field_dt,
1252            )?)),
1253        },
1254    }
1255}
1256
/// Execute a physical plan across all of its output partitions and concatenate
/// the results into a single [RecordBatch].
pub(crate) async fn execute_plan_to_batch(
    state: &SessionState,
    plan: Arc<dyn ExecutionPlan>,
) -> DeltaResult<arrow::record_batch::RecordBatch> {
    // Run every partition concurrently; each task collects its stream and
    // concatenates it into one batch per partition.
    let data = futures::future::try_join_all(
        (0..plan.properties().output_partitioning().partition_count()).map(|p| {
            let plan_copy = plan.clone();
            let task_context = state.task_ctx().clone();
            async move {
                let batch_stream = plan_copy.execute(p, task_context)?;

                let schema = batch_stream.schema();

                let batches = batch_stream.try_collect::<Vec<_>>().await?;

                DataFusionResult::<_>::Ok(concat_batches(&schema, batches.iter())?)
            }
        }),
    )
    .await?;

    // Merge the per-partition batches into one final batch.
    Ok(concat_batches(&plan.schema(), data.iter())?)
}
1280
/// Responsible for checking batches of data conform to table's invariants, constraints and nullability.
#[derive(Clone, Default)]
pub struct DeltaDataChecker {
    // CHECK constraints defined on the table.
    constraints: Vec<Constraint>,
    // Column invariants to enforce.
    invariants: Vec<Invariant>,
    // Generated-column expressions to validate.
    generated_columns: Vec<GeneratedColumn>,
    // Names of top-level columns declared non-nullable in the table schema.
    non_nullable_columns: Vec<String>,
    // DataFusion context used to evaluate check expressions via SQL.
    ctx: SessionContext,
}
1290
impl DeltaDataChecker {
    /// Create a new DeltaDataChecker with no invariants or constraints
    pub fn empty() -> Self {
        Self {
            invariants: vec![],
            constraints: vec![],
            generated_columns: vec![],
            non_nullable_columns: vec![],
            ctx: DeltaSessionContext::default().into(),
        }
    }

    /// Create a new DeltaDataChecker with a specified set of invariants
    pub fn new_with_invariants(invariants: Vec<Invariant>) -> Self {
        Self {
            invariants,
            constraints: vec![],
            generated_columns: vec![],
            non_nullable_columns: vec![],
            ctx: DeltaSessionContext::default().into(),
        }
    }

    /// Create a new DeltaDataChecker with a specified set of constraints
    pub fn new_with_constraints(constraints: Vec<Constraint>) -> Self {
        Self {
            constraints,
            invariants: vec![],
            generated_columns: vec![],
            non_nullable_columns: vec![],
            ctx: DeltaSessionContext::default().into(),
        }
    }

    /// Create a new DeltaDataChecker with a specified set of generated columns
    pub fn new_with_generated_columns(generated_columns: Vec<GeneratedColumn>) -> Self {
        Self {
            constraints: vec![],
            invariants: vec![],
            generated_columns,
            non_nullable_columns: vec![],
            ctx: DeltaSessionContext::default().into(),
        }
    }

    /// Specify the Datafusion context
    pub fn with_session_context(mut self, context: SessionContext) -> Self {
        self.ctx = context;
        self
    }

    /// Add the specified set of constraints to the current DeltaDataChecker's constraints
    pub fn with_extra_constraints(mut self, constraints: Vec<Constraint>) -> Self {
        self.constraints.extend(constraints);
        self
    }

    /// Create a new DeltaDataChecker seeded with the invariants, constraints,
    /// generated columns and non-nullable columns of the given table snapshot.
    pub fn new(snapshot: &DeltaTableState) -> Self {
        let invariants = snapshot.schema().get_invariants().unwrap_or_default();
        let generated_columns = snapshot
            .schema()
            .get_generated_columns()
            .unwrap_or_default();
        let constraints = snapshot.table_config().get_constraints();
        // Collect the names of all top-level fields declared NOT NULL.
        let non_nullable_columns = snapshot
            .schema()
            .fields()
            .filter_map(|f| {
                if !f.is_nullable() {
                    Some(f.name().clone())
                } else {
                    None
                }
            })
            .collect_vec();
        Self {
            invariants,
            constraints,
            generated_columns,
            non_nullable_columns,
            ctx: DeltaSessionContext::default().into(),
        }
    }

    /// Check that a record batch conforms to table's invariants.
    ///
    /// If it does not, it will return [DeltaTableError::InvalidData] with a list
    /// of values that violated each invariant.
    pub async fn check_batch(&self, record_batch: &RecordBatch) -> Result<(), DeltaTableError> {
        // Nullability first (cheap, no SQL); then invariants, constraints and
        // generated columns, each via SQL evaluation.
        self.check_nullability(record_batch)?;
        self.enforce_checks(record_batch, &self.invariants).await?;
        self.enforce_checks(record_batch, &self.constraints).await?;
        self.enforce_checks(record_batch, &self.generated_columns)
            .await
    }

    /// Return true if all the nullability checks are valid
    fn check_nullability(&self, record_batch: &RecordBatch) -> Result<bool, DeltaTableError> {
        let mut violations = Vec::new();
        for col in self.non_nullable_columns.iter() {
            if let Some(arr) = record_batch.column_by_name(col) {
                if arr.null_count() > 0 {
                    violations.push(format!(
                        "Non-nullable column violation for {col}, found {} null values",
                        arr.null_count()
                    ));
                }
            } else {
                // A missing column is treated as a violation, since it would be
                // written entirely as nulls.
                violations.push(format!(
                    "Non-nullable column violation for {col}, not found in batch!"
                ));
            }
        }
        if !violations.is_empty() {
            Err(DeltaTableError::InvalidData { violations })
        } else {
            Ok(true)
        }
    }

    /// Evaluate each check's SQL expression against the batch (registered as a
    /// temporary table) and collect one violation message per failing check.
    async fn enforce_checks<C: DataCheck>(
        &self,
        record_batch: &RecordBatch,
        checks: &[C],
    ) -> Result<(), DeltaTableError> {
        if checks.is_empty() {
            return Ok(());
        }
        let table = MemTable::try_new(record_batch.schema(), vec![vec![record_batch.clone()]])?;
        let schema = table.schema();
        // Use a random table name to avoid clashes when running multiple parallel tasks, e.g. when using a partitioned table
        let table_name: String = uuid::Uuid::new_v4().to_string();
        self.ctx.register_table(&table_name, Arc::new(table))?;

        let mut violations: Vec<String> = Vec::new();

        for check in checks {
            let check_name = check.get_name();
            if check_name.contains('.') {
                return Err(DeltaTableError::Generic(
                    "Support for nested columns is not supported.".to_string(),
                ));
            }

            // Constraints select the whole row for the error message; other
            // checks select only the named column.
            let field_to_select = if check.as_any().is::<Constraint>() {
                "*"
            } else {
                check_name
            };

            // Loop through schema to find the matching field. If the field has a whitespace, we
            // need to backtick it, since the expression is an unquoted string
            // NOTE(review): only the FIRST field whose name appears as a substring
            // of the expression is backticked, and plain substring replacement can
            // also hit unintended occurrences (e.g. a field name embedded in a
            // longer identifier) — confirm whether expressions referencing multiple
            // fields are expected here.
            let mut expression = check.get_expression().to_string();
            for field in schema.fields() {
                if expression.contains(field.name()) {
                    expression =
                        expression.replace(field.name(), format!("`{}` ", field.name()).as_str());
                    break;
                }
            }
            // A single surviving row is enough to prove the check is violated.
            let sql = format!(
                "SELECT {} FROM `{table_name}` WHERE NOT ({}) LIMIT 1",
                field_to_select, expression
            );

            let dfs: Vec<RecordBatch> = self.ctx.sql(&sql).await?.collect().await?;
            if !dfs.is_empty() && dfs[0].num_rows() > 0 {
                // Render the offending row's values for the error message.
                let value: String = dfs[0]
                    .columns()
                    .iter()
                    .map(|c| array_value_to_string(c, 0).unwrap_or(String::from("null")))
                    .join(", ");

                let msg = format!(
                    "Check or Invariant ({}) violated by value in row: [{value}]",
                    check.get_expression(),
                );
                violations.push(msg);
            }
        }

        self.ctx.deregister_table(&table_name)?;
        if !violations.is_empty() {
            Err(DeltaTableError::InvalidData { violations })
        } else {
            Ok(())
        }
    }
}
1481
/// A codec for deltalake physical plans
///
/// Serializes/deserializes [DeltaScan] nodes so physical plans can be shipped
/// across process boundaries.
#[derive(Debug)]
pub struct DeltaPhysicalCodec {}
1485
1486impl PhysicalExtensionCodec for DeltaPhysicalCodec {
1487    fn try_decode(
1488        &self,
1489        buf: &[u8],
1490        inputs: &[Arc<dyn ExecutionPlan>],
1491        _registry: &dyn FunctionRegistry,
1492    ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
1493        let wire: DeltaScanWire = serde_json::from_reader(buf)
1494            .map_err(|_| DataFusionError::Internal("Unable to decode DeltaScan".to_string()))?;
1495        let delta_scan = DeltaScan {
1496            table_uri: wire.table_uri,
1497            parquet_scan: (*inputs)[0].clone(),
1498            config: wire.config,
1499            logical_schema: wire.logical_schema,
1500            metrics: ExecutionPlanMetricsSet::new(),
1501        };
1502        Ok(Arc::new(delta_scan))
1503    }
1504
1505    fn try_encode(
1506        &self,
1507        node: Arc<dyn ExecutionPlan>,
1508        buf: &mut Vec<u8>,
1509    ) -> Result<(), DataFusionError> {
1510        let delta_scan = node
1511            .as_any()
1512            .downcast_ref::<DeltaScan>()
1513            .ok_or_else(|| DataFusionError::Internal("Not a delta scan!".to_string()))?;
1514
1515        let wire = DeltaScanWire {
1516            table_uri: delta_scan.table_uri.to_owned(),
1517            config: delta_scan.config.clone(),
1518            logical_schema: delta_scan.logical_schema.clone(),
1519        };
1520        serde_json::to_writer(buf, &wire)
1521            .map_err(|_| DataFusionError::Internal("Unable to encode delta scan!".to_string()))?;
1522        Ok(())
1523    }
1524}
1525
/// Does serde on DeltaTables
///
/// Logical-plan extension codec that (de)serializes [DeltaTable] providers via
/// serde_json; generic extension-node codec methods are not implemented.
#[derive(Debug)]
pub struct DeltaLogicalCodec {}
1529
1530impl LogicalExtensionCodec for DeltaLogicalCodec {
1531    fn try_decode(
1532        &self,
1533        _buf: &[u8],
1534        _inputs: &[LogicalPlan],
1535        _ctx: &SessionContext,
1536    ) -> Result<Extension, DataFusionError> {
1537        todo!("DeltaLogicalCodec")
1538    }
1539
1540    fn try_encode(&self, _node: &Extension, _buf: &mut Vec<u8>) -> Result<(), DataFusionError> {
1541        todo!("DeltaLogicalCodec")
1542    }
1543
1544    fn try_decode_table_provider(
1545        &self,
1546        buf: &[u8],
1547        _table_ref: &TableReference,
1548        _schema: SchemaRef,
1549        _ctx: &SessionContext,
1550    ) -> Result<Arc<dyn TableProvider>, DataFusionError> {
1551        let provider: DeltaTable = serde_json::from_slice(buf)
1552            .map_err(|_| DataFusionError::Internal("Error encoding delta table".to_string()))?;
1553        Ok(Arc::new(provider))
1554    }
1555
1556    fn try_encode_table_provider(
1557        &self,
1558        _table_ref: &TableReference,
1559        node: Arc<dyn TableProvider>,
1560        buf: &mut Vec<u8>,
1561    ) -> Result<(), DataFusionError> {
1562        let table = node
1563            .as_ref()
1564            .as_any()
1565            .downcast_ref::<DeltaTable>()
1566            .ok_or_else(|| {
1567                DataFusionError::Internal("Can't encode non-delta tables".to_string())
1568            })?;
1569        serde_json::to_writer(buf, table)
1570            .map_err(|_| DataFusionError::Internal("Error encoding delta table".to_string()))
1571    }
1572}
1573
/// Responsible for creating deltatables
///
/// [TableProviderFactory] used to open Delta tables from `CREATE EXTERNAL TABLE`
/// statements, forwarding any statement options as storage options.
#[derive(Debug)]
pub struct DeltaTableFactory {}
1577
1578#[async_trait]
1579impl TableProviderFactory for DeltaTableFactory {
1580    async fn create(
1581        &self,
1582        _ctx: &dyn Session,
1583        cmd: &CreateExternalTable,
1584    ) -> datafusion::error::Result<Arc<dyn TableProvider>> {
1585        let provider = if cmd.options.is_empty() {
1586            open_table(cmd.to_owned().location).await?
1587        } else {
1588            open_table_with_storage_options(cmd.to_owned().location, cmd.to_owned().options).await?
1589        };
1590        Ok(Arc::new(provider))
1591    }
1592}
1593
// Accumulates properties of a predicate expression while walking it, used to
// decide whether file candidates can be determined from partition values alone.
pub(crate) struct FindFilesExprProperties {
    // Partition columns of the table the predicate is evaluated against.
    pub partition_columns: Vec<String>,

    // True while every column referenced so far is a partition column.
    pub partition_only: bool,
    // Set to an error when an unsupported or non-deterministic expression is found.
    pub result: DeltaResult<()>,
}
1600
/// Ensure only expressions that make sense are accepted, check for
/// non-deterministic functions, and determine if the expression only contains
/// partition columns
impl TreeNodeVisitor<'_> for FindFilesExprProperties {
    type Node = Expr;

    fn f_down(&mut self, expr: &Self::Node) -> datafusion_common::Result<TreeNodeRecursion> {
        // TODO: We can likely relax the volatility to STABLE. Would require further
        // research to confirm the same value is generated during the scan and
        // rewrite phases.

        match expr {
            // Referencing any non-partition column means the predicate cannot
            // be answered from partition values alone.
            Expr::Column(c) => {
                if !self.partition_columns.contains(&c.name) {
                    self.partition_only = false;
                }
            }
            // These expression kinds are accepted as-is; their children are
            // visited by the tree walk.
            Expr::ScalarVariable(_, _)
            | Expr::Literal(_)
            | Expr::Alias(_)
            | Expr::BinaryExpr(_)
            | Expr::Like(_)
            | Expr::SimilarTo(_)
            | Expr::Not(_)
            | Expr::IsNotNull(_)
            | Expr::IsNull(_)
            | Expr::IsTrue(_)
            | Expr::IsFalse(_)
            | Expr::IsUnknown(_)
            | Expr::IsNotTrue(_)
            | Expr::IsNotFalse(_)
            | Expr::IsNotUnknown(_)
            | Expr::Negative(_)
            | Expr::InList { .. }
            | Expr::Between(_)
            | Expr::Case(_)
            | Expr::Cast(_)
            | Expr::TryCast(_) => (),
            // Scalar functions are allowed only when immutable (deterministic).
            Expr::ScalarFunction(scalar_function) => {
                match scalar_function.func.signature().volatility {
                    Volatility::Immutable => (),
                    _ => {
                        self.result = Err(DeltaTableError::Generic(format!(
                            "Find files predicate contains nondeterministic function {}",
                            scalar_function.func.name()
                        )));
                        return Ok(TreeNodeRecursion::Stop);
                    }
                }
            }
            // Anything else (subqueries, aggregates, window functions, ...) is rejected.
            _ => {
                self.result = Err(DeltaTableError::Generic(format!(
                    "Find files predicate contains unsupported expression {expr}"
                )));
                return Ok(TreeNodeRecursion::Stop);
            }
        }

        Ok(TreeNodeRecursion::Continue)
    }
}
1662
#[derive(Debug, Hash, Eq, PartialEq)]
/// Representing the result of the [find_files] function.
pub struct FindFiles {
    /// A list of `Add` objects that match the given predicate
    pub candidates: Vec<Add>,
    /// Was a physical read to the datastore required to determine the candidates
    /// NOTE(review): the field name suggests `true` means candidates were resolved
    /// from partition metadata alone (no data files scanned) — confirm against the
    /// `find_files` implementation.
    pub partition_scan: bool,
}
1671
1672fn join_batches_with_add_actions(
1673    batches: Vec<RecordBatch>,
1674    mut actions: HashMap<String, Add>,
1675    path_column: &str,
1676    dict_array: bool,
1677) -> DeltaResult<Vec<Add>> {
1678    // Given RecordBatches that contains `__delta_rs_path` perform a hash join
1679    // with actions to obtain original add actions
1680
1681    let mut files = Vec::with_capacity(batches.iter().map(|batch| batch.num_rows()).sum());
1682    for batch in batches {
1683        let err = || DeltaTableError::Generic("Unable to obtain Delta-rs path column".to_string());
1684
1685        let iter: Box<dyn Iterator<Item = Option<&str>>> = if dict_array {
1686            let array = get_path_column(&batch, path_column)?;
1687            Box::new(array.into_iter())
1688        } else {
1689            let array = batch
1690                .column_by_name(path_column)
1691                .ok_or_else(err)?
1692                .as_any()
1693                .downcast_ref::<StringArray>()
1694                .ok_or_else(err)?;
1695            Box::new(array.into_iter())
1696        };
1697
1698        for path in iter {
1699            let path = path.ok_or(DeltaTableError::Generic(format!(
1700                "{path_column} cannot be null"
1701            )))?;
1702
1703            match actions.remove(path) {
1704                Some(action) => files.push(action),
1705                None => {
1706                    return Err(DeltaTableError::Generic(
1707                        "Unable to map __delta_rs_path to action.".to_owned(),
1708                    ))
1709                }
1710            }
1711        }
1712    }
1713    Ok(files)
1714}
1715
/// Determine which files contain a record that satisfies the predicate
///
/// Builds a Delta scan over the snapshot — projected down to the columns
/// referenced by `expression` plus the synthetic file-path column — filters it
/// with the predicate, and joins the surviving paths back to their [Add]
/// actions.
pub(crate) async fn find_files_scan(
    snapshot: &DeltaTableState,
    log_store: LogStoreRef,
    state: &SessionState,
    expression: Expr,
) -> DeltaResult<Vec<Add>> {
    // Index every add action by its path so that matched paths can be joined
    // back to their actions after the scan completes.
    let candidate_map: HashMap<String, Add> = snapshot
        .file_actions_iter()?
        .map(|add| (add.path.clone(), add.to_owned()))
        .collect();

    // `include_file_column` injects a column carrying each row's source file
    // path into the scan output.
    let scan_config = DeltaScanConfigBuilder {
        include_file_column: true,
        ..Default::default()
    }
    .build(snapshot)?;

    let logical_schema = df_logical_schema(snapshot, &scan_config.file_column_name, None)?;

    // Identify which columns we need to project
    let mut used_columns = expression
        .column_refs()
        .into_iter()
        .map(|column| logical_schema.index_of(&column.name))
        .collect::<Result<Vec<usize>, ArrowError>>()?;
    // Add path column
    used_columns.push(logical_schema.index_of(scan_config.file_column_name.as_ref().unwrap())?);

    let scan = DeltaScanBuilder::new(snapshot, log_store, state)
        .with_filter(Some(expression.clone()))
        .with_projection(Some(&used_columns))
        .with_scan_config(scan_config)
        .build()
        .await?;
    let scan = Arc::new(scan);

    let config = &scan.config;
    let input_schema = scan.logical_schema.as_ref().to_owned();
    let input_dfschema = input_schema.clone().try_into()?;

    // Wrap the predicate in IS TRUE so a NULL evaluation counts as a
    // non-match rather than propagating.
    let predicate_expr =
        state.create_physical_expr(Expr::IsTrue(Box::new(expression.clone())), &input_dfschema)?;

    // NOTE(review): `LocalLimitExec::new(filter, 1)` keeps at most one
    // matching row *per partition* — presumably one row suffices to flag a
    // file as a candidate; confirm the scan's partitioning upholds this.
    let filter: Arc<dyn ExecutionPlan> =
        Arc::new(FilterExec::try_new(predicate_expr, scan.clone())?);
    let limit: Arc<dyn ExecutionPlan> = Arc::new(LocalLimitExec::new(filter, 1));

    let task_ctx = Arc::new(TaskContext::from(state));
    let path_batches = datafusion::physical_plan::collect(limit, task_ctx).await?;

    // Map the collected file paths back to their add actions.
    join_batches_with_add_actions(
        path_batches,
        candidate_map,
        config.file_column_name.as_ref().unwrap(),
        true,
    )
}
1774
1775pub(crate) async fn scan_memory_table(
1776    snapshot: &DeltaTableState,
1777    predicate: &Expr,
1778) -> DeltaResult<Vec<Add>> {
1779    let actions = snapshot.file_actions()?;
1780
1781    let batch = snapshot.add_actions_table(true)?;
1782    let mut arrays = Vec::new();
1783    let mut fields = Vec::new();
1784
1785    let schema = batch.schema();
1786
1787    arrays.push(
1788        batch
1789            .column_by_name("path")
1790            .ok_or(DeltaTableError::Generic(
1791                "Column with name `path` does not exist".to_owned(),
1792            ))?
1793            .to_owned(),
1794    );
1795    fields.push(Field::new(PATH_COLUMN, ArrowDataType::Utf8, false));
1796
1797    for field in schema.fields() {
1798        if field.name().starts_with("partition.") {
1799            let name = field.name().strip_prefix("partition.").unwrap();
1800
1801            arrays.push(batch.column_by_name(field.name()).unwrap().to_owned());
1802            fields.push(Field::new(
1803                name,
1804                field.data_type().to_owned(),
1805                field.is_nullable(),
1806            ));
1807        }
1808    }
1809
1810    let schema = Arc::new(ArrowSchema::new(fields));
1811    let batch = RecordBatch::try_new(schema, arrays)?;
1812    let mem_table = MemTable::try_new(batch.schema(), vec![vec![batch]])?;
1813
1814    let ctx = SessionContext::new();
1815    let mut df = ctx.read_table(Arc::new(mem_table))?;
1816    df = df
1817        .filter(predicate.to_owned())?
1818        .select(vec![col(PATH_COLUMN)])?;
1819    let batches = df.collect().await?;
1820
1821    let map = actions
1822        .into_iter()
1823        .map(|action| (action.path.clone(), action))
1824        .collect::<HashMap<String, Add>>();
1825
1826    join_batches_with_add_actions(batches, map, PATH_COLUMN, false)
1827}
1828
1829/// Finds files in a snapshot that match the provided predicate.
1830pub async fn find_files(
1831    snapshot: &DeltaTableState,
1832    log_store: LogStoreRef,
1833    state: &SessionState,
1834    predicate: Option<Expr>,
1835) -> DeltaResult<FindFiles> {
1836    let current_metadata = snapshot.metadata();
1837
1838    match &predicate {
1839        Some(predicate) => {
1840            // Validate the Predicate and determine if it only contains partition columns
1841            let mut expr_properties = FindFilesExprProperties {
1842                partition_only: true,
1843                partition_columns: current_metadata.partition_columns.clone(),
1844                result: Ok(()),
1845            };
1846
1847            TreeNode::visit(predicate, &mut expr_properties)?;
1848            expr_properties.result?;
1849
1850            if expr_properties.partition_only {
1851                let candidates = scan_memory_table(snapshot, predicate).await?;
1852                Ok(FindFiles {
1853                    candidates,
1854                    partition_scan: true,
1855                })
1856            } else {
1857                let candidates =
1858                    find_files_scan(snapshot, log_store, state, predicate.to_owned()).await?;
1859
1860                Ok(FindFiles {
1861                    candidates,
1862                    partition_scan: false,
1863                })
1864            }
1865        }
1866        None => Ok(FindFiles {
1867            candidates: snapshot.file_actions()?,
1868            partition_scan: true,
1869        }),
1870    }
1871}
1872
1873/// A wrapper for sql_parser's ParserOptions to capture sane default table defaults
1874pub struct DeltaParserOptions {
1875    inner: ParserOptions,
1876}
1877
1878impl Default for DeltaParserOptions {
1879    fn default() -> Self {
1880        DeltaParserOptions {
1881            inner: ParserOptions {
1882                enable_ident_normalization: false,
1883                ..ParserOptions::default()
1884            },
1885        }
1886    }
1887}
1888
1889impl From<DeltaParserOptions> for ParserOptions {
1890    fn from(value: DeltaParserOptions) -> Self {
1891        value.inner
1892    }
1893}
1894
1895/// A wrapper for Deltafusion's SessionConfig to capture sane default table defaults
1896pub struct DeltaSessionConfig {
1897    inner: SessionConfig,
1898}
1899
1900impl Default for DeltaSessionConfig {
1901    fn default() -> Self {
1902        DeltaSessionConfig {
1903            inner: SessionConfig::default()
1904                .set_bool("datafusion.sql_parser.enable_ident_normalization", false),
1905        }
1906    }
1907}
1908
1909impl From<DeltaSessionConfig> for SessionConfig {
1910    fn from(value: DeltaSessionConfig) -> Self {
1911        value.inner
1912    }
1913}
1914
1915/// A wrapper for Deltafusion's SessionContext to capture sane default table defaults
1916pub struct DeltaSessionContext {
1917    inner: SessionContext,
1918}
1919
1920impl Default for DeltaSessionContext {
1921    fn default() -> Self {
1922        DeltaSessionContext {
1923            inner: SessionContext::new_with_config(DeltaSessionConfig::default().into()),
1924        }
1925    }
1926}
1927
1928impl From<DeltaSessionContext> for SessionContext {
1929    fn from(value: DeltaSessionContext) -> Self {
1930        value.inner
1931    }
1932}
1933
/// A wrapper for DataFusion's Column to preserve case-sensitivity during string conversion
pub struct DeltaColumn {
    inner: Column,
}

/// Create a column from a string slice, preserving case
impl From<&str> for DeltaColumn {
    fn from(c: &str) -> Self {
        DeltaColumn {
            inner: Column::from_qualified_name_ignore_case(c),
        }
    }
}

/// Create a column, cloning the string
impl From<&String> for DeltaColumn {
    fn from(c: &String) -> Self {
        DeltaColumn {
            inner: Column::from_qualified_name_ignore_case(c),
        }
    }
}

/// Create a column, reusing the existing string
impl From<String> for DeltaColumn {
    fn from(c: String) -> Self {
        DeltaColumn {
            inner: Column::from_qualified_name_ignore_case(c),
        }
    }
}

/// Unwrap into the underlying datafusion column
impl From<DeltaColumn> for Column {
    fn from(value: DeltaColumn) -> Self {
        value.inner
    }
}

/// Create a column, reusing the existing datafusion column
impl From<Column> for DeltaColumn {
    fn from(c: Column) -> Self {
        DeltaColumn { inner: c }
    }
}
1977
1978#[cfg(test)]
1979mod tests {
1980    use crate::kernel::log_segment::PathExt;
1981    use crate::logstore::default_logstore::DefaultLogStore;
1982    use crate::logstore::ObjectStoreRef;
1983    use crate::operations::write::SchemaMode;
1984    use crate::writer::test_utils::get_delta_schema;
1985    use arrow::array::StructArray;
1986    use arrow::datatypes::{Field, Schema};
1987    use arrow_array::cast::AsArray;
1988    use bytes::Bytes;
1989    use chrono::{TimeZone, Utc};
1990    use datafusion::assert_batches_sorted_eq;
1991    use datafusion::datasource::physical_plan::FileScanConfig;
1992    use datafusion::datasource::source::DataSourceExec;
1993    use datafusion::physical_plan::empty::EmptyExec;
1994    use datafusion::physical_plan::{visit_execution_plan, ExecutionPlanVisitor, PhysicalExpr};
1995    use datafusion_expr::lit;
1996    use datafusion_proto::physical_plan::AsExecutionPlan;
1997    use datafusion_proto::protobuf;
1998    use futures::{stream::BoxStream, StreamExt};
1999    use object_store::{
2000        path::Path, GetOptions, GetResult, ListResult, MultipartUpload, ObjectStore,
2001        PutMultipartOpts, PutOptions, PutPayload, PutResult,
2002    };
2003    use serde_json::json;
2004    use std::fmt::{Debug, Display, Formatter};
2005    use std::ops::{Deref, Range};
2006    use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
2007
2008    use super::*;
2009
    // test deserialization of serialized partition values.
    // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#partition-value-serialization
    #[test]
    fn test_to_correct_scalar_value() {
        // Each entry: (raw JSON partition value, target arrow type, expected scalar).
        let reference_pairs = &[
            (
                json!("2015"),
                ArrowDataType::Int16,
                ScalarValue::Int16(Some(2015)),
            ),
            (
                json!("2015"),
                ArrowDataType::Int32,
                ScalarValue::Int32(Some(2015)),
            ),
            (
                json!("2015"),
                ArrowDataType::Int64,
                ScalarValue::Int64(Some(2015)),
            ),
            (
                json!("2015"),
                ArrowDataType::Float32,
                ScalarValue::Float32(Some(2015_f32)),
            ),
            (
                json!("2015"),
                ArrowDataType::Float64,
                ScalarValue::Float64(Some(2015_f64)),
            ),
            // Numeric (non-string) JSON values must convert as well.
            (
                json!(2015),
                ArrowDataType::Float64,
                ScalarValue::Float64(Some(2015_f64)),
            ),
            (
                json!("2015-01-01"),
                ArrowDataType::Date32,
                ScalarValue::Date32(Some(16436)),
            ),
            // (
            //     json!("2015-01-01"),
            //     ArrowDataType::Date64,
            //     ScalarValue::Date64(Some(16436)),
            // ),
            // TODO(roeap) there seem to be differences in how precisions are handled locally and in CI, need to investigate
            // (
            //     json!("2020-09-08 13:42:29"),
            //     ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
            //     ScalarValue::TimestampNanosecond(Some(1599565349000000000), None),
            // ),
            // (
            //     json!("2020-09-08 13:42:29"),
            //     ArrowDataType::Timestamp(TimeUnit::Microsecond, None),
            //     ScalarValue::TimestampMicrosecond(Some(1599565349000000), None),
            // ),
            // (
            //     json!("2020-09-08 13:42:29"),
            //     ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
            //     ScalarValue::TimestampMillisecond(Some(1599565349000), None),
            // ),
            (
                json!(true),
                ArrowDataType::Boolean,
                ScalarValue::Boolean(Some(true)),
            ),
        ];

        // Double unwrap: the conversion must both succeed and produce Some.
        for (raw, data_type, ref_scalar) in reference_pairs {
            let scalar = to_correct_scalar_value(raw, data_type).unwrap().unwrap();
            assert_eq!(*ref_scalar, scalar)
        }
    }
2083
    #[test]
    fn test_partitioned_file_from_action() {
        // Partition values live in a map; the resulting PartitionedFile must
        // order them by `part_columns`, not by map iteration order.
        let mut partition_values = std::collections::HashMap::new();
        partition_values.insert("month".to_string(), Some("1".to_string()));
        partition_values.insert("year".to_string(), Some("2015".to_string()));
        let action = Add {
            path: "year=2015/month=1/part-00000-4dcb50d3-d017-450c-9df7-a7257dbd3c5d-c000.snappy.parquet".to_string(),
            size: 10644,
            partition_values,
            modification_time: 1660497727833,
            data_change: true,
            stats: None,
            deletion_vector: None,
            stats_parsed: None,
            tags: None,
            base_row_id: None,
            default_row_commit_version: None,
            clustering_provider: None,
        };
        let schema = ArrowSchema::new(vec![
            Field::new("year", ArrowDataType::Int64, true),
            Field::new("month", ArrowDataType::Int64, true),
        ]);

        let part_columns = vec!["year".to_string(), "month".to_string()];
        let file = partitioned_file_from_action(&action, &part_columns, &schema);
        let ref_file = PartitionedFile {
            object_meta: object_store::ObjectMeta {
                location: Path::from("year=2015/month=1/part-00000-4dcb50d3-d017-450c-9df7-a7257dbd3c5d-c000.snappy.parquet".to_string()),
                last_modified: Utc.timestamp_millis_opt(1660497727833).unwrap(),
                size: 10644,
                e_tag: None,
                version: None,
            },
            // String partition values parsed into the schema's Int64 type.
            partition_values: [ScalarValue::Int64(Some(2015)), ScalarValue::Int64(Some(1))].to_vec(),
            range: None,
            extensions: None,
            statistics: None,
            metadata_size_hint: None,
        };
        assert_eq!(file.partition_values, ref_file.partition_values)
    }
2126
    #[tokio::test]
    async fn test_enforce_invariants() {
        // Batch used for all checks: a = ["a","b","c","d"], b = [1,10,10,100].
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", ArrowDataType::Utf8, false),
            Field::new("b", ArrowDataType::Int32, false),
        ]));
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(arrow::array::StringArray::from(vec!["a", "b", "c", "d"])),
                Arc::new(arrow::array::Int32Array::from(vec![1, 10, 10, 100])),
            ],
        )
        .unwrap();
        // Empty invariants is okay
        let invariants: Vec<Invariant> = vec![];
        assert!(DeltaDataChecker::new_with_invariants(invariants)
            .check_batch(&batch)
            .await
            .is_ok());

        // Valid invariants return Ok(())
        let invariants = vec![
            Invariant::new("a", "a is not null"),
            Invariant::new("b", "b < 1000"),
        ];
        assert!(DeltaDataChecker::new_with_invariants(invariants)
            .check_batch(&batch)
            .await
            .is_ok());

        // Violated invariants returns an error with list of violations
        let invariants = vec![
            Invariant::new("a", "a is null"),
            Invariant::new("b", "b < 100"),
        ];
        let result = DeltaDataChecker::new_with_invariants(invariants)
            .check_batch(&batch)
            .await;
        assert!(result.is_err());
        assert!(matches!(result, Err(DeltaTableError::InvalidData { .. })));
        if let Err(DeltaTableError::InvalidData { violations }) = result {
            // Both invariants fail, yielding two violation entries.
            assert_eq!(violations.len(), 2);
        }

        // Irrelevant invariants return a different error
        let invariants = vec![Invariant::new("c", "c > 2000")];
        let result = DeltaDataChecker::new_with_invariants(invariants)
            .check_batch(&batch)
            .await;
        assert!(result.is_err());

        // Nested invariants are unsupported
        let struct_fields = schema.fields().clone();
        let schema = Arc::new(Schema::new(vec![Field::new(
            "x",
            ArrowDataType::Struct(struct_fields),
            false,
        )]));
        let inner = Arc::new(StructArray::from(batch));
        let batch = RecordBatch::try_new(schema, vec![inner]).unwrap();

        let invariants = vec![Invariant::new("x.b", "x.b < 1000")];
        let result = DeltaDataChecker::new_with_invariants(invariants)
            .check_batch(&batch)
            .await;
        assert!(result.is_err());
        assert!(matches!(result, Err(DeltaTableError::Generic { .. })));
    }
2196
    #[tokio::test]
    async fn test_enforce_constraints() {
        // Batch used for all checks: a = ["a","b","c","d"], b = [1,10,10,100].
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", ArrowDataType::Utf8, false),
            Field::new("b", ArrowDataType::Int32, false),
        ]));
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(arrow::array::StringArray::from(vec!["a", "b", "c", "d"])),
                Arc::new(arrow::array::Int32Array::from(vec![1, 10, 10, 100])),
            ],
        )
        .unwrap();
        // Empty constraints is okay
        let constraints: Vec<Constraint> = vec![];
        assert!(DeltaDataChecker::new_with_constraints(constraints)
            .check_batch(&batch)
            .await
            .is_ok());

        // Valid constraints return Ok(())
        let constraints = vec![
            Constraint::new("custom_a", "a is not null"),
            Constraint::new("custom_b", "b < 1000"),
        ];
        assert!(DeltaDataChecker::new_with_constraints(constraints)
            .check_batch(&batch)
            .await
            .is_ok());

        // Violated constraints return an error with the list of violations
        let constraints = vec![
            Constraint::new("custom_a", "a is null"),
            Constraint::new("custom_B", "b < 100"),
        ];
        let result = DeltaDataChecker::new_with_constraints(constraints)
            .check_batch(&batch)
            .await;
        assert!(result.is_err());
        assert!(matches!(result, Err(DeltaTableError::InvalidData { .. })));
        if let Err(DeltaTableError::InvalidData { violations }) = result {
            // Both constraints fail, yielding two violation entries.
            assert_eq!(violations.len(), 2);
        }

        // Irrelevant constraints (referencing a non-existent column) return a different error
        let constraints = vec![Constraint::new("custom_c", "c > 2000")];
        let result = DeltaDataChecker::new_with_constraints(constraints)
            .check_batch(&batch)
            .await;
        assert!(result.is_err());
    }
2249
    /// Ensure that constraints when there are spaces in the field name still work
    ///
    /// See <https://github.com/delta-io/delta-rs/pull/3374>
    #[tokio::test]
    async fn test_constraints_with_spacey_fields() -> DeltaResult<()> {
        // Note the `b bop` column name containing a space.
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", ArrowDataType::Utf8, false),
            Field::new("b bop", ArrowDataType::Int32, false),
        ]));
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(arrow::array::StringArray::from(vec![
                    "a", "b bop", "c", "d",
                ])),
                Arc::new(arrow::array::Int32Array::from(vec![1, 10, 10, 100])),
            ],
        )?;

        // Valid constraints return Ok(())
        let constraints = vec![
            Constraint::new("custom a", "a is not null"),
            Constraint::new("custom_b", "b bop < 1000"),
        ];
        assert!(DeltaDataChecker::new_with_constraints(constraints)
            .check_batch(&batch)
            .await
            .is_ok());

        // Violated constraints return an error with the list of violations
        let constraints = vec![
            Constraint::new("custom_a", "a is null"),
            Constraint::new("custom_B", "b bop < 100"),
        ];
        let result = DeltaDataChecker::new_with_constraints(constraints)
            .check_batch(&batch)
            .await;
        assert!(result.is_err());
        assert!(matches!(result, Err(DeltaTableError::InvalidData { .. })));
        if let Err(DeltaTableError::InvalidData { violations }) = result {
            // Both constraints fail, yielding two violation entries.
            assert_eq!(violations.len(), 2);
        }

        // Irrelevant constraints return a different error
        let constraints = vec![Constraint::new("custom_c", "c > 2000")];
        let result = DeltaDataChecker::new_with_constraints(constraints)
            .check_batch(&batch)
            .await;
        assert!(result.is_err());
        Ok(())
    }
2301
    #[test]
    fn roundtrip_test_delta_exec_plan() {
        // Serialize a DeltaScan to protobuf via DeltaPhysicalCodec and back,
        // then compare the Debug representations of both plans.
        let ctx = SessionContext::new();
        let codec = DeltaPhysicalCodec {};

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", ArrowDataType::Utf8, false),
            Field::new("b", ArrowDataType::Int32, false),
        ]));
        // EmptyExec stands in for the inner parquet scan; only the DeltaScan
        // wrapper itself is being round-tripped here.
        let exec_plan = Arc::from(DeltaScan {
            table_uri: "s3://my_bucket/this/is/some/path".to_string(),
            parquet_scan: Arc::from(EmptyExec::new(schema.clone())),
            config: DeltaScanConfig::default(),
            logical_schema: schema.clone(),
            metrics: ExecutionPlanMetricsSet::new(),
        });
        let proto: protobuf::PhysicalPlanNode =
            protobuf::PhysicalPlanNode::try_from_physical_plan(exec_plan.clone(), &codec)
                .expect("to proto");

        let runtime = ctx.runtime_env();
        let result_exec_plan: Arc<dyn ExecutionPlan> = proto
            .try_into_physical_plan(&ctx, runtime.deref(), &codec)
            .expect("from proto");
        assert_eq!(format!("{exec_plan:?}"), format!("{result_exec_plan:?}"));
    }
2328
2329    #[tokio::test]
2330    async fn delta_table_provider_with_config() {
2331        let table = crate::open_table("../test/tests/data/delta-2.2.0-partitioned-types")
2332            .await
2333            .unwrap();
2334        let config = DeltaScanConfigBuilder::new()
2335            .with_file_column_name(&"file_source")
2336            .build(table.snapshot().unwrap())
2337            .unwrap();
2338
2339        let log_store = table.log_store();
2340        let provider =
2341            DeltaTableProvider::try_new(table.snapshot().unwrap().clone(), log_store, config)
2342                .unwrap();
2343        let ctx = SessionContext::new();
2344        ctx.register_table("test", Arc::new(provider)).unwrap();
2345
2346        let df = ctx.sql("select * from test").await.unwrap();
2347        let actual = df.collect().await.unwrap();
2348        let expected = vec! [
2349                "+----+----+----+-------------------------------------------------------------------------------+",
2350                "| c3 | c1 | c2 | file_source                                                                   |",
2351                "+----+----+----+-------------------------------------------------------------------------------+",
2352                "| 4  | 6  | a  | c1=6/c2=a/part-00011-10619b10-b691-4fd0-acc4-2a9608499d7c.c000.snappy.parquet |",
2353                "| 5  | 4  | c  | c1=4/c2=c/part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet |",
2354                "| 6  | 5  | b  | c1=5/c2=b/part-00007-4e73fa3b-2c88-424a-8051-f8b54328ffdb.c000.snappy.parquet |",
2355                "+----+----+----+-------------------------------------------------------------------------------+",
2356            ];
2357        assert_batches_sorted_eq!(&expected, &actual);
2358    }
2359
    #[tokio::test]
    async fn delta_scan_mixed_partition_order() {
        // Tests issue (1787) where partition columns were incorrect when they
        // have a different order in the metadata and table schema
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new("modified", ArrowDataType::Utf8, true),
            Field::new("id", ArrowDataType::Utf8, true),
            Field::new("value", ArrowDataType::Int32, true),
        ]));

        // Create the table partitioned by ["modified", "id"] — a different
        // order than the batch schema above.
        let table = crate::DeltaOps::new_in_memory()
            .create()
            .with_columns(get_delta_schema().fields().cloned())
            .with_partition_columns(["modified", "id"])
            .await
            .unwrap();
        assert_eq!(table.version(), 0);

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(arrow::array::StringArray::from(vec![
                    "2021-02-01",
                    "2021-02-01",
                    "2021-02-02",
                    "2021-02-02",
                ])),
                Arc::new(arrow::array::StringArray::from(vec!["A", "B", "C", "D"])),
                Arc::new(arrow::array::Int32Array::from(vec![1, 10, 20, 100])),
            ],
        )
        .unwrap();
        // write some data
        let table = crate::DeltaOps(table)
            .write(vec![batch.clone()])
            .with_save_mode(crate::protocol::SaveMode::Append)
            .await
            .unwrap();

        let config = DeltaScanConfigBuilder::new()
            .build(table.snapshot().unwrap())
            .unwrap();

        let log_store = table.log_store();
        let provider =
            DeltaTableProvider::try_new(table.snapshot().unwrap().clone(), log_store, config)
                .unwrap();
        let logical_schema = provider.schema();
        let ctx = SessionContext::new();
        ctx.register_table("test", Arc::new(provider)).unwrap();

        // Non-partition columns come first, then partition columns in
        // metadata order.
        let expected_logical_order = vec!["value", "modified", "id"];
        let actual_order: Vec<String> = logical_schema
            .fields()
            .iter()
            .map(|f| f.name().to_owned())
            .collect();

        let df = ctx.sql("select * from test").await.unwrap();
        let actual = df.collect().await.unwrap();
        let expected = vec![
            "+-------+------------+----+",
            "| value | modified   | id |",
            "+-------+------------+----+",
            "| 1     | 2021-02-01 | A  |",
            "| 10    | 2021-02-01 | B  |",
            "| 100   | 2021-02-02 | D  |",
            "| 20    | 2021-02-02 | C  |",
            "+-------+------------+----+",
        ];
        assert_batches_sorted_eq!(&expected, &actual);
        assert_eq!(expected_logical_order, actual_order);
    }
2433
    #[tokio::test]
    async fn delta_scan_case_sensitive() {
        // Mixed-case column names must survive SQL parsing; identifier
        // normalization is disabled via DeltaSessionContext below.
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new("moDified", ArrowDataType::Utf8, true),
            Field::new("ID", ArrowDataType::Utf8, true),
            Field::new("vaLue", ArrowDataType::Int32, true),
        ]));

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(arrow::array::StringArray::from(vec![
                    "2021-02-01",
                    "2021-02-01",
                    "2021-02-02",
                    "2021-02-02",
                ])),
                Arc::new(arrow::array::StringArray::from(vec!["A", "B", "C", "D"])),
                Arc::new(arrow::array::Int32Array::from(vec![1, 10, 20, 100])),
            ],
        )
        .unwrap();
        // write some data
        let table = crate::DeltaOps::new_in_memory()
            .write(vec![batch.clone()])
            .with_save_mode(crate::protocol::SaveMode::Append)
            .await
            .unwrap();

        let config = DeltaScanConfigBuilder::new()
            .build(table.snapshot().unwrap())
            .unwrap();
        let log = table.log_store();

        let provider =
            DeltaTableProvider::try_new(table.snapshot().unwrap().clone(), log, config).unwrap();
        let ctx: SessionContext = DeltaSessionContext::default().into();
        ctx.register_table("test", Arc::new(provider)).unwrap();

        // The query references the columns with their original casing.
        let df = ctx
            .sql("select ID, moDified, vaLue from test")
            .await
            .unwrap();
        let actual = df.collect().await.unwrap();
        let expected = vec![
            "+----+------------+-------+",
            "| ID | moDified   | vaLue |",
            "+----+------------+-------+",
            "| A  | 2021-02-01 | 1     |",
            "| B  | 2021-02-01 | 10    |",
            "| C  | 2021-02-02 | 20    |",
            "| D  | 2021-02-02 | 100   |",
            "+----+------------+-------+",
        ];
        assert_batches_sorted_eq!(&expected, &actual);

        /* TODO: Datafusion doesn't have any options to prevent case-sensitivity with the col func */
        /*
        let df = ctx
            .table("test")
            .await
            .unwrap()
            .select(vec![col("ID"), col("moDified"), col("vaLue")])
            .unwrap();
        let actual = df.collect().await.unwrap();
        assert_batches_sorted_eq!(&expected, &actual);
        */
    }
2502
2503    #[tokio::test]
2504    async fn delta_scan_supports_missing_columns() {
2505        let schema1 = Arc::new(ArrowSchema::new(vec![Field::new(
2506            "col_1",
2507            ArrowDataType::Utf8,
2508            true,
2509        )]));
2510
2511        let batch1 = RecordBatch::try_new(
2512            schema1.clone(),
2513            vec![Arc::new(arrow::array::StringArray::from(vec![
2514                Some("A"),
2515                Some("B"),
2516            ]))],
2517        )
2518        .unwrap();
2519
2520        let schema2 = Arc::new(ArrowSchema::new(vec![
2521            Field::new("col_1", ArrowDataType::Utf8, true),
2522            Field::new("col_2", ArrowDataType::Utf8, true),
2523        ]));
2524
2525        let batch2 = RecordBatch::try_new(
2526            schema2.clone(),
2527            vec![
2528                Arc::new(arrow::array::StringArray::from(vec![
2529                    Some("E"),
2530                    Some("F"),
2531                    Some("G"),
2532                ])),
2533                Arc::new(arrow::array::StringArray::from(vec![
2534                    Some("E2"),
2535                    Some("F2"),
2536                    Some("G2"),
2537                ])),
2538            ],
2539        )
2540        .unwrap();
2541
2542        let table = crate::DeltaOps::new_in_memory()
2543            .write(vec![batch2])
2544            .with_save_mode(crate::protocol::SaveMode::Append)
2545            .await
2546            .unwrap();
2547
2548        let table = crate::DeltaOps(table)
2549            .write(vec![batch1])
2550            .with_schema_mode(SchemaMode::Merge)
2551            .with_save_mode(crate::protocol::SaveMode::Append)
2552            .await
2553            .unwrap();
2554
2555        let config = DeltaScanConfigBuilder::new()
2556            .build(table.snapshot().unwrap())
2557            .unwrap();
2558        let log = table.log_store();
2559
2560        let provider =
2561            DeltaTableProvider::try_new(table.snapshot().unwrap().clone(), log, config).unwrap();
2562        let ctx: SessionContext = DeltaSessionContext::default().into();
2563        ctx.register_table("test", Arc::new(provider)).unwrap();
2564
2565        let df = ctx.sql("select col_1, col_2 from test").await.unwrap();
2566        let actual = df.collect().await.unwrap();
2567        let expected = vec![
2568            "+-------+-------+",
2569            "| col_1 | col_2 |",
2570            "+-------+-------+",
2571            "| A     |       |",
2572            "| B     |       |",
2573            "| E     | E2    |",
2574            "| F     | F2    |",
2575            "| G     | G2    |",
2576            "+-------+-------+",
2577        ];
2578        assert_batches_sorted_eq!(&expected, &actual);
2579    }
2580
2581    #[tokio::test]
2582    async fn delta_scan_supports_pushdown() {
2583        let schema = Arc::new(ArrowSchema::new(vec![
2584            Field::new("col_1", ArrowDataType::Utf8, false),
2585            Field::new("col_2", ArrowDataType::Utf8, false),
2586        ]));
2587
2588        let batch = RecordBatch::try_new(
2589            schema.clone(),
2590            vec![
2591                Arc::new(arrow::array::StringArray::from(vec![
2592                    Some("A"),
2593                    Some("B"),
2594                    Some("C"),
2595                ])),
2596                Arc::new(arrow::array::StringArray::from(vec![
2597                    Some("A2"),
2598                    Some("B2"),
2599                    Some("C2"),
2600                ])),
2601            ],
2602        )
2603        .unwrap();
2604
2605        let table = crate::DeltaOps::new_in_memory()
2606            .write(vec![batch])
2607            .with_save_mode(crate::protocol::SaveMode::Append)
2608            .await
2609            .unwrap();
2610
2611        let config = DeltaScanConfigBuilder::new()
2612            .build(table.snapshot().unwrap())
2613            .unwrap();
2614        let log = table.log_store();
2615
2616        let provider =
2617            DeltaTableProvider::try_new(table.snapshot().unwrap().clone(), log, config).unwrap();
2618
2619        let mut cfg = SessionConfig::default();
2620        cfg.options_mut().execution.parquet.pushdown_filters = true;
2621        let ctx = SessionContext::new_with_config(cfg);
2622        ctx.register_table("test", Arc::new(provider)).unwrap();
2623
2624        let df = ctx
2625            .sql("select col_1, col_2 from test WHERE col_1 = 'A'")
2626            .await
2627            .unwrap();
2628        let actual = df.collect().await.unwrap();
2629        let expected = vec![
2630            "+-------+-------+",
2631            "| col_1 | col_2 |",
2632            "+-------+-------+",
2633            "| A     | A2    |",
2634            "+-------+-------+",
2635        ];
2636        assert_batches_sorted_eq!(&expected, &actual);
2637    }
2638
    // A nested struct field added via schema merge must be readable across
    // all files: rows from files written before the field existed render as
    // null for that field.
    #[tokio::test]
    async fn delta_scan_supports_nested_missing_columns() {
        // First file: `col_1` is a struct carrying only `col_1a`.
        let column1_schema1: arrow::datatypes::Fields =
            vec![Field::new("col_1a", ArrowDataType::Utf8, true)].into();
        let schema1 = Arc::new(ArrowSchema::new(vec![Field::new(
            "col_1",
            ArrowDataType::Struct(column1_schema1.clone()),
            true,
        )]));

        let batch1 = RecordBatch::try_new(
            schema1.clone(),
            vec![Arc::new(StructArray::new(
                column1_schema1,
                vec![Arc::new(arrow::array::StringArray::from(vec![
                    Some("A"),
                    Some("B"),
                ]))],
                None,
            ))],
        )
        .unwrap();

        // Second file: the struct now has an additional `col_1b` field.
        let column1_schema2: arrow_schema::Fields = vec![
            Field::new("col_1a", ArrowDataType::Utf8, true),
            Field::new("col_1b", ArrowDataType::Utf8, true),
        ]
        .into();
        let schema2 = Arc::new(ArrowSchema::new(vec![Field::new(
            "col_1",
            ArrowDataType::Struct(column1_schema2.clone()),
            true,
        )]));

        let batch2 = RecordBatch::try_new(
            schema2.clone(),
            vec![Arc::new(StructArray::new(
                column1_schema2,
                vec![
                    Arc::new(arrow::array::StringArray::from(vec![
                        Some("E"),
                        Some("F"),
                        Some("G"),
                    ])),
                    Arc::new(arrow::array::StringArray::from(vec![
                        Some("E2"),
                        Some("F2"),
                        Some("G2"),
                    ])),
                ],
                None,
            ))],
        )
        .unwrap();

        // Write the narrow-struct batch first ...
        let table = crate::DeltaOps::new_in_memory()
            .write(vec![batch1])
            .with_save_mode(crate::protocol::SaveMode::Append)
            .await
            .unwrap();

        // ... then append the wider struct, merging it into the table schema.
        let table = crate::DeltaOps(table)
            .write(vec![batch2])
            .with_schema_mode(SchemaMode::Merge)
            .with_save_mode(crate::protocol::SaveMode::Append)
            .await
            .unwrap();

        let config = DeltaScanConfigBuilder::new()
            .build(table.snapshot().unwrap())
            .unwrap();
        let log = table.log_store();

        let provider =
            DeltaTableProvider::try_new(table.snapshot().unwrap().clone(), log, config).unwrap();
        let ctx: SessionContext = DeltaSessionContext::default().into();
        ctx.register_table("test", Arc::new(provider)).unwrap();

        // Project both nested fields; `col_1b` does not exist in file 1.
        let df = ctx
            .sql("select col_1.col_1a, col_1.col_1b from test")
            .await
            .unwrap();
        let actual = df.collect().await.unwrap();
        // Rows A and B come from the first file and have no `col_1b` value.
        let expected = vec![
            "+--------------------+--------------------+",
            "| test.col_1[col_1a] | test.col_1[col_1b] |",
            "+--------------------+--------------------+",
            "| A                  |                    |",
            "| B                  |                    |",
            "| E                  | E2                 |",
            "| F                  | F2                 |",
            "| G                  | G2                 |",
            "+--------------------+--------------------+",
        ];
        assert_batches_sorted_eq!(&expected, &actual);
    }
2735
2736    #[tokio::test]
2737    async fn test_multiple_predicate_pushdown() {
2738        use crate::datafusion::prelude::SessionContext;
2739        let schema = Arc::new(ArrowSchema::new(vec![
2740            Field::new("moDified", ArrowDataType::Utf8, true),
2741            Field::new("id", ArrowDataType::Utf8, true),
2742            Field::new("vaLue", ArrowDataType::Int32, true),
2743        ]));
2744
2745        let batch = RecordBatch::try_new(
2746            schema.clone(),
2747            vec![
2748                Arc::new(arrow::array::StringArray::from(vec![
2749                    "2021-02-01",
2750                    "2021-02-01",
2751                    "2021-02-02",
2752                    "2021-02-02",
2753                ])),
2754                Arc::new(arrow::array::StringArray::from(vec!["A", "B", "C", "D"])),
2755                Arc::new(arrow::array::Int32Array::from(vec![1, 10, 20, 100])),
2756            ],
2757        )
2758        .unwrap();
2759        // write some data
2760        let table = crate::DeltaOps::new_in_memory()
2761            .write(vec![batch.clone()])
2762            .with_save_mode(crate::protocol::SaveMode::Append)
2763            .await
2764            .unwrap();
2765
2766        let datafusion = SessionContext::new();
2767        let table = Arc::new(table);
2768
2769        datafusion.register_table("snapshot", table).unwrap();
2770
2771        let df = datafusion
2772            .sql("select * from snapshot where id > 10000 and id < 20000")
2773            .await
2774            .unwrap();
2775
2776        df.collect().await.unwrap();
2777    }
2778
2779    #[tokio::test]
2780    async fn test_delta_scan_builder_no_scan_config() {
2781        let arr: Arc<dyn Array> = Arc::new(arrow::array::StringArray::from(vec!["s"]));
2782        let batch = RecordBatch::try_from_iter_with_nullable(vec![("a", arr, false)]).unwrap();
2783        let table = crate::DeltaOps::new_in_memory()
2784            .write(vec![batch])
2785            .with_save_mode(crate::protocol::SaveMode::Append)
2786            .await
2787            .unwrap();
2788
2789        let ctx = SessionContext::new();
2790        let state = ctx.state();
2791        let scan = DeltaScanBuilder::new(table.snapshot().unwrap(), table.log_store(), &state)
2792            .with_filter(Some(col("a").eq(lit("s"))))
2793            .build()
2794            .await
2795            .unwrap();
2796
2797        let mut visitor = ParquetVisitor::default();
2798        visit_execution_plan(&scan, &mut visitor).unwrap();
2799
2800        assert_eq!(visitor.predicate.unwrap().to_string(), "a@0 = s");
2801    }
2802
2803    #[tokio::test]
2804    async fn test_delta_scan_builder_scan_config_disable_pushdown() {
2805        let arr: Arc<dyn Array> = Arc::new(arrow::array::StringArray::from(vec!["s"]));
2806        let batch = RecordBatch::try_from_iter_with_nullable(vec![("a", arr, false)]).unwrap();
2807        let table = crate::DeltaOps::new_in_memory()
2808            .write(vec![batch])
2809            .with_save_mode(crate::protocol::SaveMode::Append)
2810            .await
2811            .unwrap();
2812
2813        let snapshot = table.snapshot().unwrap();
2814        let ctx = SessionContext::new();
2815        let state = ctx.state();
2816        let scan = DeltaScanBuilder::new(snapshot, table.log_store(), &state)
2817            .with_filter(Some(col("a").eq(lit("s"))))
2818            .with_scan_config(
2819                DeltaScanConfigBuilder::new()
2820                    .with_parquet_pushdown(false)
2821                    .build(snapshot)
2822                    .unwrap(),
2823            )
2824            .build()
2825            .await
2826            .unwrap();
2827
2828        let mut visitor = ParquetVisitor::default();
2829        visit_execution_plan(&scan, &mut visitor).unwrap();
2830
2831        assert!(visitor.predicate.is_none());
2832    }
2833
2834    #[tokio::test]
2835    async fn test_delta_scan_applies_parquet_options() {
2836        let arr: Arc<dyn Array> = Arc::new(arrow::array::StringArray::from(vec!["s"]));
2837        let batch = RecordBatch::try_from_iter_with_nullable(vec![("a", arr, false)]).unwrap();
2838        let table = crate::DeltaOps::new_in_memory()
2839            .write(vec![batch])
2840            .with_save_mode(crate::protocol::SaveMode::Append)
2841            .await
2842            .unwrap();
2843
2844        let snapshot = table.snapshot().unwrap();
2845
2846        let mut config = SessionConfig::default();
2847        config.options_mut().execution.parquet.pushdown_filters = true;
2848        let ctx = SessionContext::new_with_config(config);
2849        let state = ctx.state();
2850
2851        let scan = DeltaScanBuilder::new(snapshot, table.log_store(), &state)
2852            .build()
2853            .await
2854            .unwrap();
2855
2856        let mut visitor = ParquetVisitor::default();
2857        visit_execution_plan(&scan, &mut visitor).unwrap();
2858
2859        assert_eq!(ctx.copied_table_options().parquet, visitor.options.unwrap());
2860    }
2861
    /// Extracts fields from the parquet scan
    #[derive(Default)]
    struct ParquetVisitor {
        // Physical predicate found on the parquet source, if any.
        predicate: Option<Arc<dyn PhysicalExpr>>,
        // Parquet options the source was configured with, if a source was found.
        options: Option<TableParquetOptions>,
    }
2868
2869    impl ExecutionPlanVisitor for ParquetVisitor {
2870        type Error = DataFusionError;
2871
2872        fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
2873            let Some(datasource_exec) = plan.as_any().downcast_ref::<DataSourceExec>() else {
2874                return Ok(true);
2875            };
2876
2877            let Some(scan_config) = datasource_exec
2878                .data_source()
2879                .as_any()
2880                .downcast_ref::<FileScanConfig>()
2881            else {
2882                return Ok(true);
2883            };
2884
2885            if let Some(parquet_source) = scan_config
2886                .file_source
2887                .as_any()
2888                .downcast_ref::<ParquetSource>()
2889            {
2890                self.options = Some(parquet_source.table_parquet_options().clone());
2891                self.predicate = parquet_source.predicate().cloned();
2892            }
2893
2894            Ok(true)
2895        }
2896    }
2897
2898    #[tokio::test]
2899    async fn passes_sanity_checker_when_all_files_filtered() {
2900        // Run a query that filters out all files and sorts.
2901        // Verify that it returns an empty set of rows without panicking.
2902        //
2903        // Historically, we had a bug that caused us to emit a query plan with 0 partitions, which
2904        // datafusion rejected.
2905        let table = crate::open_table("../test/tests/data/delta-2.2.0-partitioned-types")
2906            .await
2907            .unwrap();
2908        let ctx = SessionContext::new();
2909        ctx.register_table("test", Arc::new(table)).unwrap();
2910
2911        let df = ctx
2912            .sql("select * from test where c3 = 100 ORDER BY c1 ASC")
2913            .await
2914            .unwrap();
2915        let actual = df.collect().await.unwrap();
2916
2917        assert_eq!(actual.len(), 0);
2918    }
2919
2920    #[tokio::test]
2921    async fn test_check_nullability() -> DeltaResult<()> {
2922        use arrow::array::StringArray;
2923
2924        let data_checker = DeltaDataChecker {
2925            non_nullable_columns: vec!["zed".to_string(), "yap".to_string()],
2926            ..Default::default()
2927        };
2928
2929        let arr: Arc<dyn Array> = Arc::new(StringArray::from(vec!["s"]));
2930        let nulls: Arc<dyn Array> = Arc::new(StringArray::new_null(1));
2931        let batch = RecordBatch::try_from_iter(vec![("a", arr), ("zed", nulls)]).unwrap();
2932
2933        let result = data_checker.check_nullability(&batch);
2934        assert!(
2935            result.is_err(),
2936            "The result should have errored! {result:?}"
2937        );
2938
2939        let arr: Arc<dyn Array> = Arc::new(StringArray::from(vec!["s"]));
2940        let batch = RecordBatch::try_from_iter(vec![("zed", arr)]).unwrap();
2941        let result = data_checker.check_nullability(&batch);
2942        assert!(
2943            result.is_err(),
2944            "The result should have errored! {result:?}"
2945        );
2946
2947        let arr: Arc<dyn Array> = Arc::new(StringArray::from(vec!["s"]));
2948        let batch = RecordBatch::try_from_iter(vec![("zed", arr.clone()), ("yap", arr)]).unwrap();
2949        let _ = data_checker.check_nullability(&batch)?;
2950
2951        Ok(())
2952    }
2953
    // Verifies that projecting a single column reads only that column's byte
    // ranges from the parquet file, observed via a recording object store.
    #[tokio::test]
    async fn test_delta_scan_uses_parquet_column_pruning() {
        // One tiny column and one ~1 KiB column: pruning is observable because
        // reading `large` would require a much larger byte range.
        let small: Arc<dyn Array> = Arc::new(arrow::array::StringArray::from(vec!["a"]));
        let large: Arc<dyn Array> = Arc::new(arrow::array::StringArray::from(vec!["b"
            .repeat(1024)
            .as_str()]));
        let batch = RecordBatch::try_from_iter(vec![("small", small), ("large", large)]).unwrap();
        let table = crate::DeltaOps::new_in_memory()
            .write(vec![batch])
            .with_save_mode(crate::protocol::SaveMode::Append)
            .await
            .unwrap();

        let config = DeltaScanConfigBuilder::new()
            .build(table.snapshot().unwrap())
            .unwrap();

        // Wrap the table's object store so every read is reported on the
        // `operations` channel.
        let (object_store, mut operations) =
            RecordingObjectStore::new(table.log_store().object_store(None));
        let log_store =
            DefaultLogStore::new(Arc::new(object_store), table.log_store().config().clone());
        let provider = DeltaTableProvider::try_new(
            table.snapshot().unwrap().clone(),
            Arc::new(log_store),
            config,
        )
        .unwrap();
        let ctx = SessionContext::new();
        ctx.register_table("test", Arc::new(provider)).unwrap();
        let state = ctx.state();

        let df = ctx.sql("select small from test").await.unwrap();
        let plan = df.create_physical_plan().await.unwrap();

        // Execute partition 0 directly and sanity-check the single-row result.
        let mut stream = plan.execute(0, state.task_ctx()).unwrap();
        let Some(Ok(batch)) = stream.next().await else {
            panic!()
        };
        assert!(stream.next().await.is_none());
        assert_eq!(1, batch.num_columns());
        assert_eq!(1, batch.num_rows());
        let small = batch.column_by_name("small").unwrap().as_string::<i32>();
        assert_eq!("a", small.iter().next().unwrap().unwrap());

        // Expected reads against the data file: an 8-byte tail (parquet
        // footer), the metadata region, and one small range for the projected
        // column only. NOTE(review): these byte offsets are coupled to the
        // parquet writer's output and must be updated if the writer or its
        // defaults change.
        let expected = vec![
            ObjectStoreOperation::GetRange(LocationType::Data, 4920..4928),
            ObjectStoreOperation::GetRange(LocationType::Data, 2399..4920),
            #[expect(clippy::single_range_in_vec_init)]
            ObjectStoreOperation::GetRanges(LocationType::Data, vec![4..58]),
        ];
        let mut actual = Vec::new();
        operations.recv_many(&mut actual, 3).await;
        assert_eq!(expected, actual);
    }
3008
    /// Records operations made by the inner object store on a channel obtained at construction
    struct RecordingObjectStore {
        // The store that actually performs every operation.
        inner: ObjectStoreRef,
        // Sender half of the channel on which recorded operations are reported.
        operations: UnboundedSender<ObjectStoreOperation>,
    }
3014
3015    impl RecordingObjectStore {
3016        /// Returns an object store and a channel recording all operations made by the inner object store
3017        fn new(inner: ObjectStoreRef) -> (Self, UnboundedReceiver<ObjectStoreOperation>) {
3018            let (operations, operations_receiver) = unbounded_channel();
3019            (Self { inner, operations }, operations_receiver)
3020        }
3021    }
3022
3023    impl Display for RecordingObjectStore {
3024        fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
3025            Display::fmt(&self.inner, f)
3026        }
3027    }
3028
3029    impl Debug for RecordingObjectStore {
3030        fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
3031            Debug::fmt(&self.inner, f)
3032        }
3033    }
3034
    // A single recorded object-store read, tagged with the kind of file it hit.
    #[derive(Debug, PartialEq)]
    enum ObjectStoreOperation {
        // `get_ranges` call with the requested byte ranges.
        GetRanges(LocationType, Vec<Range<u64>>),
        // `get_range` call with the requested byte range.
        GetRange(LocationType, Range<u64>),
        // `get_opts` call (the options themselves are not recorded).
        GetOpts(LocationType),
        // Plain `get` call.
        Get(LocationType),
    }
3042
    // Coarse classification of an object-store path, used in test assertions.
    #[derive(Debug, PartialEq)]
    enum LocationType {
        // A data file (path starts with "part-").
        Data,
        // A delta commit log file.
        Commit,
    }
3048
3049    impl From<&Path> for LocationType {
3050        fn from(value: &Path) -> Self {
3051            if value.is_commit_file() {
3052                LocationType::Commit
3053            } else if value.to_string().starts_with("part-") {
3054                LocationType::Data
3055            } else {
3056                panic!("Unknown location type: {value:?}")
3057            }
3058        }
3059    }
3060
    // Currently only read operations are recorded. Extend as necessary.
    #[async_trait]
    impl ObjectStore for RecordingObjectStore {
        // --- Write operations: delegated without being recorded. ---
        async fn put(
            &self,
            location: &Path,
            payload: PutPayload,
        ) -> object_store::Result<PutResult> {
            self.inner.put(location, payload).await
        }

        async fn put_opts(
            &self,
            location: &Path,
            payload: PutPayload,
            opts: PutOptions,
        ) -> object_store::Result<PutResult> {
            self.inner.put_opts(location, payload, opts).await
        }

        async fn put_multipart(
            &self,
            location: &Path,
        ) -> object_store::Result<Box<dyn MultipartUpload>> {
            self.inner.put_multipart(location).await
        }

        async fn put_multipart_opts(
            &self,
            location: &Path,
            opts: PutMultipartOpts,
        ) -> object_store::Result<Box<dyn MultipartUpload>> {
            self.inner.put_multipart_opts(location, opts).await
        }

        // --- Read operations: each sends an `ObjectStoreOperation` on the
        // channel (location classified via `LocationType::from`) before
        // delegating to the inner store. ---
        async fn get(&self, location: &Path) -> object_store::Result<GetResult> {
            self.operations
                .send(ObjectStoreOperation::Get(location.into()))
                .unwrap();
            self.inner.get(location).await
        }

        async fn get_opts(
            &self,
            location: &Path,
            options: GetOptions,
        ) -> object_store::Result<GetResult> {
            self.operations
                .send(ObjectStoreOperation::GetOpts(location.into()))
                .unwrap();
            self.inner.get_opts(location, options).await
        }

        async fn get_range(
            &self,
            location: &Path,
            range: Range<u64>,
        ) -> object_store::Result<Bytes> {
            self.operations
                .send(ObjectStoreOperation::GetRange(
                    location.into(),
                    range.clone(),
                ))
                .unwrap();
            self.inner.get_range(location, range).await
        }

        async fn get_ranges(
            &self,
            location: &Path,
            ranges: &[Range<u64>],
        ) -> object_store::Result<Vec<Bytes>> {
            self.operations
                .send(ObjectStoreOperation::GetRanges(
                    location.into(),
                    ranges.to_vec(),
                ))
                .unwrap();
            self.inner.get_ranges(location, ranges).await
        }

        // --- Metadata, listing, and management: delegated unrecorded. ---
        async fn head(&self, location: &Path) -> object_store::Result<ObjectMeta> {
            self.inner.head(location).await
        }

        async fn delete(&self, location: &Path) -> object_store::Result<()> {
            self.inner.delete(location).await
        }

        fn delete_stream<'a>(
            &'a self,
            locations: BoxStream<'a, object_store::Result<Path>>,
        ) -> BoxStream<'a, object_store::Result<Path>> {
            self.inner.delete_stream(locations)
        }

        fn list(
            &self,
            prefix: Option<&Path>,
        ) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
            self.inner.list(prefix)
        }

        fn list_with_offset(
            &self,
            prefix: Option<&Path>,
            offset: &Path,
        ) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
            self.inner.list_with_offset(prefix, offset)
        }

        async fn list_with_delimiter(
            &self,
            prefix: Option<&Path>,
        ) -> object_store::Result<ListResult> {
            self.inner.list_with_delimiter(prefix).await
        }

        async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> {
            self.inner.copy(from, to).await
        }

        async fn rename(&self, from: &Path, to: &Path) -> object_store::Result<()> {
            self.inner.rename(from, to).await
        }

        async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> {
            self.inner.copy_if_not_exists(from, to).await
        }

        async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> {
            self.inner.rename_if_not_exists(from, to).await
        }
    }
3195}