Skip to main content

deltalake_core/delta_datafusion/
mod.rs

1//! Datafusion integration for Delta Table
2//!
3//! Example:
4//!
5//! ```rust
6//! use std::sync::Arc;
7//! use datafusion::execution::context::SessionContext;
8//!
9//! async {
10//!   let mut ctx = SessionContext::new();
11//!   let table = deltalake_core::open_table_with_storage_options(
12//!       url::Url::parse("memory://").unwrap(),
13//!       std::collections::HashMap::new()
14//!   )
15//!       .await
16//!       .unwrap();
17//!   ctx.register_table("demo", table.table_provider().await.unwrap()).unwrap();
18//!
19//!   let batches = ctx
20//!       .sql("SELECT * FROM demo").await.unwrap()
21//!       .collect()
22//!       .await.unwrap();
23//! };
24//! ```
25
26use std::fmt::Debug;
27use std::sync::Arc;
28
29use arrow::array::types::UInt16Type;
30use arrow::array::{Array, DictionaryArray, RecordBatch, StringArray, TypedDictionaryArray};
31use arrow_cast::{CastOptions, cast_with_options};
32use arrow_schema::{
33    DataType as ArrowDataType, Schema as ArrowSchema, SchemaRef, SchemaRef as ArrowSchemaRef,
34    TimeUnit,
35};
36use datafusion::catalog::{Session, TableProviderFactory};
37use datafusion::common::scalar::ScalarValue;
38use datafusion::common::{
39    Column, DFSchema, DataFusionError, Result as DataFusionResult, TableReference, ToDFSchema,
40};
41use datafusion::datasource::TableProvider;
42use datafusion::datasource::physical_plan::wrap_partition_type_in_dict;
43use datafusion::execution::TaskContext;
44use datafusion::execution::context::SessionContext;
45use datafusion::logical_expr::logical_plan::CreateExternalTable;
46use datafusion::logical_expr::utils::conjunction;
47use datafusion::logical_expr::{Expr, Extension, LogicalPlan};
48use datafusion::physical_optimizer::pruning::PruningPredicate;
49use datafusion::physical_plan::ExecutionPlan;
50use datafusion_proto::logical_plan::LogicalExtensionCodec;
51use datafusion_proto::physical_plan::PhysicalExtensionCodec;
52use delta_kernel::engine::arrow_conversion::TryIntoArrow as _;
53use either::Either;
54
55use crate::delta_datafusion::expr::parse_predicate_expression;
56use crate::delta_datafusion::table_provider::DeltaScanWire;
57use crate::ensure_table_uri;
58use crate::errors::{DeltaResult, DeltaTableError};
59use crate::kernel::{Add, EagerSnapshot, LogDataHandler, Snapshot};
60use crate::{open_table, open_table_with_storage_options};
61
62pub(crate) use self::session::DeltaSessionExt;
63pub use self::session::{
64    DeltaParserOptions, DeltaRuntimeEnvBuilder, DeltaSessionConfig, DeltaSessionContext,
65    create_session, create_session_state_with_spill_config,
66};
67pub use self::table_provider::next::{DeletionVectorSelection, DeltaScan as DeltaScanNext};
68pub(crate) use self::utils::*;
69pub use cdf::scan::DeltaCdfTableProvider;
70pub(crate) use data_validation::{
71    DataValidationExec, constraints_to_exprs, generated_columns_to_exprs, validation_predicates,
72};
73pub(crate) use find_files::*;
74pub(crate) use table_provider::next::normalize_path_as_file_id;
75pub use table_provider::{
76    DeltaScan, DeltaScanConfig, DeltaScanConfigBuilder, DeltaTableProvider, TableProviderBuilder,
77    next::DeltaScanExec,
78};
79pub(crate) use table_provider::{
80    next::FILE_ID_COLUMN_DEFAULT, resolve_file_column_name, update_datafusion_session,
81};
82
/// Reserved column name `__delta_rs_path`.
///
/// NOTE(review): presumably carries each row's source file path during scans
/// (see `get_path_column`); confirm against the users of this constant.
pub(crate) const PATH_COLUMN: &str = "__delta_rs_path";
84
85#[doc(hidden)]
86pub mod bench_support;
87pub mod cdf;
88mod data_validation;
89pub mod engine;
90pub mod expr;
91mod file_id;
92mod find_files;
93pub mod logical;
94pub mod physical;
95pub mod planner;
96mod session;
97pub use session::SessionFallbackPolicy;
98pub(crate) use session::{SessionResolveContext, resolve_session_state};
99mod table_provider;
100pub(crate) mod utils;
101
102impl From<DeltaTableError> for DataFusionError {
103    fn from(err: DeltaTableError) -> Self {
104        match err {
105            DeltaTableError::Arrow { source } => DataFusionError::from(source),
106            DeltaTableError::Io { source } => DataFusionError::IoError(source),
107            DeltaTableError::ObjectStore { source } => DataFusionError::from(source),
108            DeltaTableError::Parquet { source } => DataFusionError::from(source),
109            _ => DataFusionError::External(Box::new(err)),
110        }
111    }
112}
113
114impl From<DataFusionError> for DeltaTableError {
115    fn from(err: DataFusionError) -> Self {
116        match err {
117            DataFusionError::ArrowError(source, _) => DeltaTableError::from(*source),
118            DataFusionError::IoError(source) => DeltaTableError::Io { source },
119            DataFusionError::ObjectStore(source) => DeltaTableError::from(*source),
120            DataFusionError::ParquetError(source) => DeltaTableError::from(*source),
121            _ => DeltaTableError::Generic(err.to_string()),
122        }
123    }
124}
125
126/// Convenience trait for calling common methods on snapshot hierarchies
127pub trait DataFusionMixins {
128    /// The physical datafusion schema of a table
129    fn read_schema(&self) -> ArrowSchemaRef;
130
131    /// Get the table schema as an [`ArrowSchemaRef`]
132    fn input_schema(&self) -> ArrowSchemaRef;
133
134    /// Parse an expression string into a datafusion [`Expr`]
135    fn parse_predicate_expression(
136        &self,
137        expr: impl AsRef<str>,
138        session: &dyn Session,
139    ) -> DeltaResult<Expr>;
140}
141
142impl DataFusionMixins for Snapshot {
143    fn read_schema(&self) -> ArrowSchemaRef {
144        _arrow_schema(
145            self.arrow_schema(),
146            self.metadata().partition_columns(),
147            true,
148        )
149    }
150
151    fn input_schema(&self) -> ArrowSchemaRef {
152        _arrow_schema(
153            self.arrow_schema(),
154            self.metadata().partition_columns(),
155            false,
156        )
157    }
158
159    fn parse_predicate_expression(
160        &self,
161        expr: impl AsRef<str>,
162        session: &dyn Session,
163    ) -> DeltaResult<Expr> {
164        let schema = DFSchema::try_from(self.read_schema().as_ref().to_owned())?;
165        parse_predicate_expression(&schema, expr, session)
166    }
167}
168
169impl DataFusionMixins for LogDataHandler<'_> {
170    fn read_schema(&self) -> ArrowSchemaRef {
171        _arrow_schema(
172            Arc::new(
173                self.table_configuration()
174                    .logical_schema()
175                    .as_ref()
176                    .try_into_arrow()
177                    .unwrap(),
178            ),
179            self.table_configuration().metadata().partition_columns(),
180            true,
181        )
182    }
183
184    fn input_schema(&self) -> ArrowSchemaRef {
185        _arrow_schema(
186            Arc::new(
187                self.table_configuration()
188                    .logical_schema()
189                    .as_ref()
190                    .try_into_arrow()
191                    .unwrap(),
192            ),
193            self.table_configuration().metadata().partition_columns(),
194            false,
195        )
196    }
197
198    fn parse_predicate_expression(
199        &self,
200        expr: impl AsRef<str>,
201        session: &dyn Session,
202    ) -> DeltaResult<Expr> {
203        let schema = DFSchema::try_from(self.read_schema().as_ref().to_owned())?;
204        parse_predicate_expression(&schema, expr, session)
205    }
206}
207
208impl DataFusionMixins for EagerSnapshot {
209    fn read_schema(&self) -> ArrowSchemaRef {
210        self.snapshot().read_schema()
211    }
212
213    fn input_schema(&self) -> ArrowSchemaRef {
214        self.snapshot().input_schema()
215    }
216
217    fn parse_predicate_expression(
218        &self,
219        expr: impl AsRef<str>,
220        session: &dyn Session,
221    ) -> DeltaResult<Expr> {
222        self.snapshot().parse_predicate_expression(expr, session)
223    }
224}
225
226fn _arrow_schema(
227    schema: SchemaRef,
228    partition_columns: &[String],
229    wrap_partitions: bool,
230) -> ArrowSchemaRef {
231    let fields = schema
232        .fields()
233        .into_iter()
234        .filter(|f| !partition_columns.contains(&f.name().to_string()))
235        .cloned()
236        .chain(
237            // We need stable order between logical and physical schemas, but the order of
238            // partitioning columns is not always the same in the json schema and the array
239            partition_columns.iter().map(|partition_col| {
240                let field = schema.field_with_name(partition_col).unwrap();
241                let corrected = if wrap_partitions {
242                    match field.data_type() {
243                        // Only dictionary-encode types that may be large
244                        // https://github.com/apache/arrow-datafusion/pull/5545
245                        ArrowDataType::Utf8
246                        | ArrowDataType::LargeUtf8
247                        | ArrowDataType::Binary
248                        | ArrowDataType::LargeBinary => {
249                            wrap_partition_type_in_dict(field.data_type().clone())
250                        }
251                        _ => field.data_type().clone(),
252                    }
253                } else {
254                    field.data_type().clone()
255                };
256                Arc::new(field.clone().with_data_type(corrected))
257            }),
258        )
259        .collect::<Vec<_>>();
260    Arc::new(ArrowSchema::new(fields))
261}
262
263pub(crate) fn files_matching_predicate<'a>(
264    log_data: LogDataHandler<'a>,
265    filters: &[Expr],
266) -> DeltaResult<impl Iterator<Item = Add> + 'a> {
267    if let Some(Some(predicate)) =
268        (!filters.is_empty()).then_some(conjunction(filters.iter().cloned()))
269    {
270        let expr = SessionContext::new()
271            .create_physical_expr(predicate, &log_data.read_schema().to_dfschema()?)?;
272        let pruning_predicate = PruningPredicate::try_new(expr, log_data.read_schema())?;
273        let mask = pruning_predicate.prune(&log_data)?;
274
275        Ok(Either::Left(log_data.into_iter().zip(mask).filter_map(
276            |(file, keep_file)| {
277                if keep_file { Some(file.to_add()) } else { None }
278            },
279        )))
280    } else {
281        Ok(Either::Right(
282            log_data.into_iter().map(|file| file.to_add()),
283        ))
284    }
285}
286
287pub(crate) fn get_path_column<'a>(
288    batch: &'a RecordBatch,
289    path_column: &str,
290) -> DeltaResult<TypedDictionaryArray<'a, UInt16Type, StringArray>> {
291    let err = || DeltaTableError::Generic("Unable to obtain Delta-rs path column".to_string());
292    batch
293        .column_by_name(path_column)
294        .ok_or_else(err)?
295        .as_any()
296        .downcast_ref::<DictionaryArray<UInt16Type>>()
297        .ok_or_else(err)?
298        .downcast_dict::<StringArray>()
299        .ok_or_else(err)
300}
301
302pub(crate) fn get_null_of_arrow_type(t: &ArrowDataType) -> DeltaResult<ScalarValue> {
303    match t {
304        ArrowDataType::Null => Ok(ScalarValue::Null),
305        ArrowDataType::Boolean => Ok(ScalarValue::Boolean(None)),
306        ArrowDataType::Int8 => Ok(ScalarValue::Int8(None)),
307        ArrowDataType::Int16 => Ok(ScalarValue::Int16(None)),
308        ArrowDataType::Int32 => Ok(ScalarValue::Int32(None)),
309        ArrowDataType::Int64 => Ok(ScalarValue::Int64(None)),
310        ArrowDataType::UInt8 => Ok(ScalarValue::UInt8(None)),
311        ArrowDataType::UInt16 => Ok(ScalarValue::UInt16(None)),
312        ArrowDataType::UInt32 => Ok(ScalarValue::UInt32(None)),
313        ArrowDataType::UInt64 => Ok(ScalarValue::UInt64(None)),
314        ArrowDataType::Float32 => Ok(ScalarValue::Float32(None)),
315        ArrowDataType::Float64 => Ok(ScalarValue::Float64(None)),
316        ArrowDataType::Date32 => Ok(ScalarValue::Date32(None)),
317        ArrowDataType::Date64 => Ok(ScalarValue::Date64(None)),
318        ArrowDataType::Binary => Ok(ScalarValue::Binary(None)),
319        ArrowDataType::FixedSizeBinary(size) => {
320            Ok(ScalarValue::FixedSizeBinary(size.to_owned(), None))
321        }
322        ArrowDataType::LargeBinary => Ok(ScalarValue::LargeBinary(None)),
323        ArrowDataType::Utf8 => Ok(ScalarValue::Utf8(None)),
324        ArrowDataType::LargeUtf8 => Ok(ScalarValue::LargeUtf8(None)),
325        ArrowDataType::Decimal128(precision, scale) => Ok(ScalarValue::Decimal128(
326            None,
327            precision.to_owned(),
328            scale.to_owned(),
329        )),
330        ArrowDataType::Timestamp(unit, tz) => {
331            let tz = tz.to_owned();
332            Ok(match unit {
333                TimeUnit::Second => ScalarValue::TimestampSecond(None, tz),
334                TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(None, tz),
335                TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(None, tz),
336                TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(None, tz),
337            })
338        }
339        ArrowDataType::Dictionary(k, v) => Ok(ScalarValue::Dictionary(
340            k.clone(),
341            Box::new(get_null_of_arrow_type(v)?),
342        )),
343        //Unsupported types...
344        ArrowDataType::Float16
345        | ArrowDataType::Decimal32(_, _)
346        | ArrowDataType::Decimal64(_, _)
347        | ArrowDataType::Decimal256(_, _)
348        | ArrowDataType::Union(_, _)
349        | ArrowDataType::LargeList(_)
350        | ArrowDataType::Struct(_)
351        | ArrowDataType::List(_)
352        | ArrowDataType::FixedSizeList(_, _)
353        | ArrowDataType::Time32(_)
354        | ArrowDataType::Time64(_)
355        | ArrowDataType::Duration(_)
356        | ArrowDataType::Interval(_)
357        | ArrowDataType::RunEndEncoded(_, _)
358        | ArrowDataType::BinaryView
359        | ArrowDataType::Utf8View
360        | ArrowDataType::LargeListView(_)
361        | ArrowDataType::ListView(_)
362        | ArrowDataType::Map(_, _) => Err(DeltaTableError::Generic(format!(
363            "Unsupported data type for Delta Lake {t}"
364        ))),
365    }
366}
367
368fn parse_date(
369    stat_val: &serde_json::Value,
370    field_dt: &ArrowDataType,
371) -> DataFusionResult<ScalarValue> {
372    let string = match stat_val {
373        serde_json::Value::String(s) => s.to_owned(),
374        _ => stat_val.to_string(),
375    };
376
377    let time_micro = ScalarValue::try_from_string(string, &ArrowDataType::Date32)?;
378    let cast_arr = cast_with_options(
379        &time_micro.to_array()?,
380        field_dt,
381        &CastOptions {
382            safe: false,
383            ..Default::default()
384        },
385    )?;
386    ScalarValue::try_from_array(&cast_arr, 0)
387}
388
389fn parse_timestamp(
390    stat_val: &serde_json::Value,
391    field_dt: &ArrowDataType,
392) -> DataFusionResult<ScalarValue> {
393    let string = match stat_val {
394        serde_json::Value::String(s) => s.to_owned(),
395        _ => stat_val.to_string(),
396    };
397
398    let time_micro = ScalarValue::try_from_string(
399        string,
400        &ArrowDataType::Timestamp(TimeUnit::Microsecond, None),
401    )?;
402    let cast_arr = cast_with_options(
403        &time_micro.to_array()?,
404        field_dt,
405        &CastOptions {
406            safe: false,
407            ..Default::default()
408        },
409    )?;
410    ScalarValue::try_from_array(&cast_arr, 0)
411}
412
413pub(crate) fn to_correct_scalar_value(
414    stat_val: &serde_json::Value,
415    field_dt: &ArrowDataType,
416) -> DataFusionResult<Option<ScalarValue>> {
417    match stat_val {
418        serde_json::Value::Array(_) => Ok(None),
419        serde_json::Value::Object(_) => Ok(None),
420        serde_json::Value::Null => Ok(Some(get_null_of_arrow_type(field_dt)?)),
421        serde_json::Value::String(string_val) => match field_dt {
422            ArrowDataType::Timestamp(_, _) => Ok(Some(parse_timestamp(stat_val, field_dt)?)),
423            ArrowDataType::Date32 => Ok(Some(parse_date(stat_val, field_dt)?)),
424            _ => Ok(Some(ScalarValue::try_from_string(
425                string_val.to_owned(),
426                field_dt,
427            )?)),
428        },
429        other => match field_dt {
430            ArrowDataType::Timestamp(_, _) => Ok(Some(parse_timestamp(stat_val, field_dt)?)),
431            ArrowDataType::Date32 => Ok(Some(parse_date(stat_val, field_dt)?)),
432            _ => Ok(Some(ScalarValue::try_from_string(
433                other.to_string(),
434                field_dt,
435            )?)),
436        },
437    }
438}
439
440/// A codec for deltalake physical plans
441#[derive(Debug)]
442pub struct DeltaPhysicalCodec {}
443
444impl PhysicalExtensionCodec for DeltaPhysicalCodec {
445    fn try_decode(
446        &self,
447        buf: &[u8],
448        inputs: &[Arc<dyn ExecutionPlan>],
449        _registry: &TaskContext,
450    ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
451        let wire: DeltaScanWire = serde_json::from_reader(buf)
452            .map_err(|_| DataFusionError::Internal("Unable to decode DeltaScan".to_string()))?;
453        let delta_scan = DeltaScan::new(
454            &wire.table_url,
455            wire.config,
456            (*inputs)[0].clone(),
457            wire.logical_schema,
458        );
459        Ok(Arc::new(delta_scan))
460    }
461
462    fn try_encode(
463        &self,
464        node: Arc<dyn ExecutionPlan>,
465        buf: &mut Vec<u8>,
466    ) -> Result<(), DataFusionError> {
467        let delta_scan = node
468            .as_any()
469            .downcast_ref::<DeltaScan>()
470            .ok_or_else(|| DataFusionError::Internal("Not a delta scan!".to_string()))?;
471
472        let wire = DeltaScanWire::from(delta_scan);
473        serde_json::to_writer(buf, &wire)
474            .map_err(|_| DataFusionError::Internal("Unable to encode delta scan!".to_string()))?;
475        Ok(())
476    }
477}
478
479/// Does serde on DeltaTables
480#[derive(Debug)]
481pub struct DeltaLogicalCodec {}
482
483impl LogicalExtensionCodec for DeltaLogicalCodec {
484    fn try_decode(
485        &self,
486        _buf: &[u8],
487        _inputs: &[LogicalPlan],
488        _ctx: &TaskContext,
489    ) -> Result<Extension, DataFusionError> {
490        todo!("DeltaLogicalCodec")
491    }
492
493    fn try_encode(&self, _node: &Extension, _buf: &mut Vec<u8>) -> Result<(), DataFusionError> {
494        todo!("DeltaLogicalCodec")
495    }
496
497    fn try_decode_table_provider(
498        &self,
499        buf: &[u8],
500        _table_ref: &TableReference,
501        _schema: SchemaRef,
502        _ctx: &TaskContext,
503    ) -> Result<Arc<dyn TableProvider>, DataFusionError> {
504        let provider: DeltaScanNext = serde_json::from_slice(buf)
505            .map_err(|_| DataFusionError::Internal("Error encoding delta table".to_string()))?;
506        Ok(Arc::new(provider))
507    }
508
509    fn try_encode_table_provider(
510        &self,
511        _table_ref: &TableReference,
512        node: Arc<dyn TableProvider>,
513        buf: &mut Vec<u8>,
514    ) -> Result<(), DataFusionError> {
515        let scan = node
516            .as_ref()
517            .as_any()
518            .downcast_ref::<DeltaScanNext>()
519            .ok_or_else(|| {
520                DataFusionError::Internal("Can't encode non-delta tables".to_string())
521            })?;
522        serde_json::to_writer(buf, scan)
523            .map_err(|_| DataFusionError::Internal("Error encoding delta table".to_string()))
524    }
525}
526
527/// Responsible for creating deltatables
528#[derive(Debug)]
529pub struct DeltaTableFactory {}
530
531#[async_trait::async_trait]
532impl TableProviderFactory for DeltaTableFactory {
533    async fn create(
534        &self,
535        ctx: &dyn Session,
536        cmd: &CreateExternalTable,
537    ) -> datafusion::error::Result<Arc<dyn TableProvider>> {
538        let table = if cmd.options.is_empty() {
539            let table_url = ensure_table_uri(&cmd.to_owned().location)?;
540            open_table(table_url).await?
541        } else {
542            let table_url = ensure_table_uri(&cmd.to_owned().location)?;
543            open_table_with_storage_options(table_url, cmd.to_owned().options).await?
544        };
545        let table_uri = table.log_store().root_url().clone();
546        let (session_state, _) = resolve_session_state(
547            Some(ctx),
548            SessionFallbackPolicy::DeriveFromTrait,
549            || create_session().state(),
550            SessionResolveContext {
551                operation: "DeltaTableFactory::create",
552                table_uri: Some(&table_uri),
553                cdc: false,
554            },
555        )?;
556
557        Ok(table
558            .table_provider()
559            .with_session(Arc::new(session_state))
560            .await?)
561    }
562}
563
564/// A wrapper for Deltafusion's Column to preserve case-sensitivity during string conversion
565pub struct DeltaColumn {
566    inner: Column,
567}
568
569impl From<&str> for DeltaColumn {
570    fn from(c: &str) -> Self {
571        DeltaColumn {
572            inner: Column::from_qualified_name_ignore_case(c),
573        }
574    }
575}
576
577/// Create a column, cloning the string
578impl From<&String> for DeltaColumn {
579    fn from(c: &String) -> Self {
580        DeltaColumn {
581            inner: Column::from_qualified_name_ignore_case(c),
582        }
583    }
584}
585
586/// Create a column, reusing the existing string
587impl From<String> for DeltaColumn {
588    fn from(c: String) -> Self {
589        DeltaColumn {
590            inner: Column::from_qualified_name_ignore_case(c),
591        }
592    }
593}
594
595impl From<DeltaColumn> for Column {
596    fn from(value: DeltaColumn) -> Self {
597        value.inner
598    }
599}
600
601/// Create a column, reusing the existing datafusion column
602impl From<Column> for DeltaColumn {
603    fn from(c: Column) -> Self {
604        DeltaColumn { inner: c }
605    }
606}
607
608#[cfg(test)]
609mod tests {
610    use crate::DeltaTable;
611    use crate::operations::write::SchemaMode;
612    use crate::test_utils::{
613        object_store::{
614            RecordedObjectStoreOperation as ObjectStoreOperation, RecordedPathKind as PathKind,
615            drain_recorded_object_store_operations as drain_recorded_ops, recording_log_store,
616        },
617        open_fs_path,
618    };
619    use crate::writer::test_utils::get_delta_schema;
620    use arrow::array::StructArray;
621    use arrow::datatypes::{Field, Schema};
622    use arrow_array::cast::AsArray;
623    use datafusion::assert_batches_sorted_eq;
624    use datafusion::config::TableParquetOptions;
625    use datafusion::datasource::physical_plan::{FileScanConfig, ParquetSource};
626    use datafusion::datasource::source::DataSourceExec;
627    use datafusion::logical_expr::lit;
628    use datafusion::physical_plan::empty::EmptyExec;
629    use datafusion::physical_plan::{ExecutionPlanVisitor, PhysicalExpr, visit_execution_plan};
630    use datafusion::prelude::{SessionConfig, col};
631    use datafusion_datasource::file::FileSource as _;
632    use datafusion_proto::physical_plan::AsExecutionPlan;
633    use datafusion_proto::protobuf;
634    use delta_kernel::schema::ArrayType;
635    use futures::{StreamExt, TryStreamExt};
636    use object_store::ObjectStoreExt as _;
637    use serde_json::json;
638    use std::ops::Range;
639    use url::Url;
640
641    use super::*;
642    use crate::delta_datafusion::table_provider::next::{FileSelection, MissingFilePolicy};
643    // test deserialization of serialized partition values.
644    // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#partition-value-serialization
645    #[test]
646    fn test_to_correct_scalar_value() {
647        let reference_pairs = &[
648            (
649                json!("2015"),
650                ArrowDataType::Int16,
651                ScalarValue::Int16(Some(2015)),
652            ),
653            (
654                json!("2015"),
655                ArrowDataType::Int32,
656                ScalarValue::Int32(Some(2015)),
657            ),
658            (
659                json!("2015"),
660                ArrowDataType::Int64,
661                ScalarValue::Int64(Some(2015)),
662            ),
663            (
664                json!("2015"),
665                ArrowDataType::Float32,
666                ScalarValue::Float32(Some(2015_f32)),
667            ),
668            (
669                json!("2015"),
670                ArrowDataType::Float64,
671                ScalarValue::Float64(Some(2015_f64)),
672            ),
673            (
674                json!(2015),
675                ArrowDataType::Float64,
676                ScalarValue::Float64(Some(2015_f64)),
677            ),
678            (
679                json!("2015-01-01"),
680                ArrowDataType::Date32,
681                ScalarValue::Date32(Some(16436)),
682            ),
683            // (
684            //     json!("2015-01-01"),
685            //     ArrowDataType::Date64,
686            //     ScalarValue::Date64(Some(16436)),
687            // ),
688            // TODO(roeap) there seem to be differences in how precisions are handled locally and in CI, need to investigate
689            // (
690            //     json!("2020-09-08 13:42:29"),
691            //     ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
692            //     ScalarValue::TimestampNanosecond(Some(1599565349000000000), None),
693            // ),
694            // (
695            //     json!("2020-09-08 13:42:29"),
696            //     ArrowDataType::Timestamp(TimeUnit::Microsecond, None),
697            //     ScalarValue::TimestampMicrosecond(Some(1599565349000000), None),
698            // ),
699            // (
700            //     json!("2020-09-08 13:42:29"),
701            //     ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
702            //     ScalarValue::TimestampMillisecond(Some(1599565349000), None),
703            // ),
704            (
705                json!(true),
706                ArrowDataType::Boolean,
707                ScalarValue::Boolean(Some(true)),
708            ),
709        ];
710
711        for (raw, data_type, ref_scalar) in reference_pairs {
712            let scalar = to_correct_scalar_value(raw, data_type).unwrap().unwrap();
713            assert_eq!(*ref_scalar, scalar)
714        }
715    }
716
717    #[test]
718    fn roundtrip_test_delta_exec_plan() {
719        let ctx = SessionContext::new();
720        let codec = DeltaPhysicalCodec {};
721
722        let schema = Arc::new(Schema::new(vec![
723            Field::new("a", ArrowDataType::Utf8, false),
724            Field::new("b", ArrowDataType::Int32, false),
725        ]));
726        let exec_plan = Arc::from(DeltaScan::new(
727            &Url::parse("s3://my_bucket/this/is/some/path").unwrap(),
728            DeltaScanConfig::default(),
729            Arc::from(EmptyExec::new(schema.clone())),
730            schema.clone(),
731        ));
732        let proto: protobuf::PhysicalPlanNode =
733            protobuf::PhysicalPlanNode::try_from_physical_plan(exec_plan.clone(), &codec)
734                .expect("to proto");
735
736        let task_ctx = ctx.task_ctx();
737        let result_exec_plan: Arc<dyn ExecutionPlan> = proto
738            .try_into_physical_plan(&task_ctx, &codec)
739            .expect("from proto");
740        assert_eq!(format!("{exec_plan:?}"), format!("{result_exec_plan:?}"));
741    }
742
743    #[tokio::test]
744    async fn roundtrip_test_delta_logical_codec_preserves_file_selection() {
745        let log_store = crate::test_utils::TestTables::Simple
746            .table_builder()
747            .unwrap()
748            .build_storage()
749            .unwrap();
750        let snapshot = Arc::new(
751            crate::kernel::Snapshot::try_new(&log_store, Default::default(), None)
752                .await
753                .unwrap(),
754        );
755        let table_root = snapshot
756            .scan_builder()
757            .build()
758            .unwrap()
759            .table_root()
760            .clone();
761        let selected_file_ids: Vec<String> = snapshot
762            .file_views(log_store.as_ref(), None)
763            .take(1)
764            .map_ok(|view| table_root.join(view.path_raw()).unwrap().to_string())
765            .try_collect()
766            .await
767            .unwrap();
768        assert_eq!(selected_file_ids.len(), 1);
769
770        let provider = DeltaScanNext::builder()
771            .with_snapshot(snapshot)
772            .build()
773            .await
774            .unwrap()
775            .with_file_selection(
776                FileSelection::new(selected_file_ids.clone())
777                    .with_missing_file_policy(MissingFilePolicy::Ignore),
778            );
779
780        let codec = DeltaLogicalCodec {};
781        let table_ref = TableReference::bare("delta_table");
782        let mut encoded = Vec::new();
783        codec
784            .try_encode_table_provider(&table_ref, Arc::new(provider), &mut encoded)
785            .unwrap();
786
787        let ctx = SessionContext::new();
788        let decoded = codec
789            .try_decode_table_provider(
790                &encoded,
791                &table_ref,
792                Arc::new(ArrowSchema::empty()),
793                &ctx.task_ctx(),
794            )
795            .unwrap();
796        let decoded_provider = decoded
797            .as_ref()
798            .as_any()
799            .downcast_ref::<DeltaScanNext>()
800            .unwrap();
801
802        let serialized = serde_json::to_value(decoded_provider).unwrap();
803        let decoded_file_ids = serialized
804            .get("file_selection")
805            .and_then(|value| value.get("file_ids"))
806            .and_then(|value| value.as_array())
807            .unwrap()
808            .iter()
809            .map(|value| value.as_str().unwrap().to_string())
810            .collect::<Vec<_>>();
811        let decoded_policy = serialized
812            .get("file_selection")
813            .and_then(|value| value.get("missing_file_policy"))
814            .and_then(|value| value.as_str())
815            .unwrap();
816
817        assert_eq!(decoded_file_ids, selected_file_ids);
818        assert_eq!(decoded_policy, "Ignore");
819    }
820
821    #[tokio::test]
822    async fn delta_table_provider_with_config() {
823        let table = open_fs_path("../test/tests/data/delta-2.2.0-partitioned-types");
824        let provider = table
825            .table_provider()
826            .with_file_column("file_source")
827            .await
828            .unwrap();
829        let ctx = SessionContext::new();
830        ctx.register_table("test", provider).unwrap();
831
832        let df = ctx
833            .sql("select c3, c1, c2, right(file_source, 77) as file_source from test")
834            .await
835            .unwrap();
836        let actual = df.collect().await.unwrap();
837        let expected = vec![
838            "+----+----+----+-------------------------------------------------------------------------------+",
839            "| c3 | c1 | c2 | file_source                                                                   |",
840            "+----+----+----+-------------------------------------------------------------------------------+",
841            "| 4  | 6  | a  | c1=6/c2=a/part-00011-10619b10-b691-4fd0-acc4-2a9608499d7c.c000.snappy.parquet |",
842            "| 5  | 4  | c  | c1=4/c2=c/part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet |",
843            "| 6  | 5  | b  | c1=5/c2=b/part-00007-4e73fa3b-2c88-424a-8051-f8b54328ffdb.c000.snappy.parquet |",
844            "+----+----+----+-------------------------------------------------------------------------------+",
845        ];
846        assert_batches_sorted_eq!(&expected, &actual);
847    }
848
849    #[tokio::test]
850    async fn delta_scan_mixed_partition_order() {
851        // Tests issue (1787) where partition columns were incorrect when they
852        // have a different order in the metadata and table schema
853        let schema = Arc::new(ArrowSchema::new(vec![
854            Field::new("modified", ArrowDataType::Utf8, true),
855            Field::new("id", ArrowDataType::Utf8, true),
856            Field::new("value", ArrowDataType::Int32, true),
857        ]));
858
859        let table = DeltaTable::new_in_memory()
860            .create()
861            .with_columns(get_delta_schema().fields().cloned())
862            .with_partition_columns(["modified", "id"])
863            .await
864            .unwrap();
865        assert_eq!(table.version(), Some(0));
866
867        let batch = RecordBatch::try_new(
868            schema.clone(),
869            vec![
870                Arc::new(arrow::array::StringArray::from(vec![
871                    "2021-02-01",
872                    "2021-02-01",
873                    "2021-02-02",
874                    "2021-02-02",
875                ])),
876                Arc::new(arrow::array::StringArray::from(vec!["A", "B", "C", "D"])),
877                Arc::new(arrow::array::Int32Array::from(vec![1, 10, 20, 100])),
878            ],
879        )
880        .unwrap();
881        // write some data
882        let table = table
883            .write(vec![batch.clone()])
884            .with_save_mode(crate::protocol::SaveMode::Append)
885            .await
886            .unwrap();
887
888        let provider = table.table_provider().await.unwrap();
889        let logical_schema = provider.schema();
890        let ctx = SessionContext::new();
891        ctx.runtime_env().register_object_store(
892            table.log_store().root_url(),
893            table.log_store().object_store(None),
894        );
895        ctx.register_table("test", provider).unwrap();
896
897        let expected_logical_order = vec!["id", "value", "modified"];
898        let actual_order: Vec<String> = logical_schema
899            .fields()
900            .iter()
901            .map(|f| f.name().to_owned())
902            .collect();
903
904        let df = ctx.sql("select * from test").await.unwrap();
905        let actual = df.collect().await.unwrap();
906        let expected = vec![
907            "+----+-------+------------+",
908            "| id | value | modified   |",
909            "+----+-------+------------+",
910            "| A  | 1     | 2021-02-01 |",
911            "| B  | 10    | 2021-02-01 |",
912            "| C  | 20    | 2021-02-02 |",
913            "| D  | 100   | 2021-02-02 |",
914            "+----+-------+------------+",
915        ];
916        assert_batches_sorted_eq!(&expected, &actual);
917        assert_eq!(expected_logical_order, actual_order);
918    }
919
920    #[tokio::test]
921    async fn delta_scan_case_sensitive() {
922        let schema = Arc::new(ArrowSchema::new(vec![
923            Field::new("moDified", ArrowDataType::Utf8, true),
924            Field::new("ID", ArrowDataType::Utf8, true),
925            Field::new("vaLue", ArrowDataType::Int32, true),
926        ]));
927
928        let batch = RecordBatch::try_new(
929            schema.clone(),
930            vec![
931                Arc::new(arrow::array::StringArray::from(vec![
932                    "2021-02-01",
933                    "2021-02-01",
934                    "2021-02-02",
935                    "2021-02-02",
936                ])),
937                Arc::new(arrow::array::StringArray::from(vec!["A", "B", "C", "D"])),
938                Arc::new(arrow::array::Int32Array::from(vec![1, 10, 20, 100])),
939            ],
940        )
941        .unwrap();
942        // write some data
943        let table = DeltaTable::new_in_memory()
944            .write(vec![batch.clone()])
945            .with_save_mode(crate::protocol::SaveMode::Append)
946            .await
947            .unwrap();
948
949        let config = DeltaScanConfigBuilder::new()
950            .build(table.snapshot().unwrap().snapshot())
951            .unwrap();
952        let log = table.log_store();
953
954        let provider =
955            DeltaTableProvider::try_new(table.snapshot().unwrap().snapshot().clone(), log, config)
956                .unwrap();
957        let ctx: SessionContext = DeltaSessionContext::default().into();
958        ctx.register_table("test", Arc::new(provider)).unwrap();
959
960        let df = ctx
961            .sql("select ID, moDified, vaLue from test")
962            .await
963            .unwrap();
964        let actual = df.collect().await.unwrap();
965        let expected = vec![
966            "+----+------------+-------+",
967            "| ID | moDified   | vaLue |",
968            "+----+------------+-------+",
969            "| A  | 2021-02-01 | 1     |",
970            "| B  | 2021-02-01 | 10    |",
971            "| C  | 2021-02-02 | 20    |",
972            "| D  | 2021-02-02 | 100   |",
973            "+----+------------+-------+",
974        ];
975        assert_batches_sorted_eq!(&expected, &actual);
976
977        /* TODO: Datafusion doesn't have any options to prevent case-sensitivity with the col func */
978        /*
979        let df = ctx
980            .table("test")
981            .await
982            .unwrap()
983            .select(vec![col("ID"), col("moDified"), col("vaLue")])
984            .unwrap();
985        let actual = df.collect().await.unwrap();
986        assert_batches_sorted_eq!(&expected, &actual);
987        */
988    }
989
990    #[tokio::test]
991    async fn delta_scan_supports_missing_columns() {
992        let schema1 = Arc::new(ArrowSchema::new(vec![Field::new(
993            "col_1",
994            ArrowDataType::Utf8,
995            true,
996        )]));
997
998        let batch1 = RecordBatch::try_new(
999            schema1.clone(),
1000            vec![Arc::new(arrow::array::StringArray::from(vec![
1001                Some("A"),
1002                Some("B"),
1003            ]))],
1004        )
1005        .unwrap();
1006
1007        let schema2 = Arc::new(ArrowSchema::new(vec![
1008            Field::new("col_1", ArrowDataType::Utf8, true),
1009            Field::new("col_2", ArrowDataType::Utf8, true),
1010        ]));
1011
1012        let batch2 = RecordBatch::try_new(
1013            schema2.clone(),
1014            vec![
1015                Arc::new(arrow::array::StringArray::from(vec![
1016                    Some("E"),
1017                    Some("F"),
1018                    Some("G"),
1019                ])),
1020                Arc::new(arrow::array::StringArray::from(vec![
1021                    Some("E2"),
1022                    Some("F2"),
1023                    Some("G2"),
1024                ])),
1025            ],
1026        )
1027        .unwrap();
1028
1029        let table = DeltaTable::new_in_memory()
1030            .write(vec![batch2])
1031            .with_save_mode(crate::protocol::SaveMode::Append)
1032            .await
1033            .unwrap();
1034
1035        let table = table
1036            .write(vec![batch1])
1037            .with_schema_mode(SchemaMode::Merge)
1038            .with_save_mode(crate::protocol::SaveMode::Append)
1039            .await
1040            .unwrap();
1041
1042        let config = DeltaScanConfigBuilder::new()
1043            .build(table.snapshot().unwrap().snapshot())
1044            .unwrap();
1045        let log = table.log_store();
1046
1047        let provider =
1048            DeltaTableProvider::try_new(table.snapshot().unwrap().snapshot().clone(), log, config)
1049                .unwrap();
1050        let ctx: SessionContext = DeltaSessionContext::default().into();
1051        ctx.register_table("test", Arc::new(provider)).unwrap();
1052
1053        let df = ctx.sql("select col_1, col_2 from test").await.unwrap();
1054        let actual = df.collect().await.unwrap();
1055        let expected = vec![
1056            "+-------+-------+",
1057            "| col_1 | col_2 |",
1058            "+-------+-------+",
1059            "| A     |       |",
1060            "| B     |       |",
1061            "| E     | E2    |",
1062            "| F     | F2    |",
1063            "| G     | G2    |",
1064            "+-------+-------+",
1065        ];
1066        assert_batches_sorted_eq!(&expected, &actual);
1067    }
1068
1069    #[tokio::test]
1070    async fn delta_scan_supports_pushdown() {
1071        let schema = Arc::new(ArrowSchema::new(vec![
1072            Field::new("col_1", ArrowDataType::Utf8, false),
1073            Field::new("col_2", ArrowDataType::Utf8, false),
1074        ]));
1075
1076        let batch = RecordBatch::try_new(
1077            schema.clone(),
1078            vec![
1079                Arc::new(arrow::array::StringArray::from(vec![
1080                    Some("A"),
1081                    Some("B"),
1082                    Some("C"),
1083                ])),
1084                Arc::new(arrow::array::StringArray::from(vec![
1085                    Some("A2"),
1086                    Some("B2"),
1087                    Some("C2"),
1088                ])),
1089            ],
1090        )
1091        .unwrap();
1092
1093        let table = DeltaTable::new_in_memory()
1094            .write(vec![batch])
1095            .with_save_mode(crate::protocol::SaveMode::Append)
1096            .await
1097            .unwrap();
1098
1099        let config = DeltaScanConfigBuilder::new()
1100            .build(table.snapshot().unwrap().snapshot())
1101            .unwrap();
1102        let log = table.log_store();
1103
1104        let provider =
1105            DeltaTableProvider::try_new(table.snapshot().unwrap().snapshot().clone(), log, config)
1106                .unwrap();
1107
1108        let mut cfg = SessionConfig::default();
1109        cfg.options_mut().execution.parquet.pushdown_filters = true;
1110        let ctx = SessionContext::new_with_config(cfg);
1111        ctx.register_table("test", Arc::new(provider)).unwrap();
1112
1113        let df = ctx
1114            .sql("select col_1, col_2 from test WHERE col_1 = 'A'")
1115            .await
1116            .unwrap();
1117        let actual = df.collect().await.unwrap();
1118        let expected = vec![
1119            "+-------+-------+",
1120            "| col_1 | col_2 |",
1121            "+-------+-------+",
1122            "| A     | A2    |",
1123            "+-------+-------+",
1124        ];
1125        assert_batches_sorted_eq!(&expected, &actual);
1126    }
1127
1128    #[tokio::test]
1129    async fn delta_scan_supports_nested_missing_columns() {
1130        let column1_schema1: arrow::datatypes::Fields =
1131            vec![Field::new("col_1a", ArrowDataType::Utf8, true)].into();
1132        let schema1 = Arc::new(ArrowSchema::new(vec![Field::new(
1133            "col_1",
1134            ArrowDataType::Struct(column1_schema1.clone()),
1135            true,
1136        )]));
1137
1138        let batch1 = RecordBatch::try_new(
1139            schema1.clone(),
1140            vec![Arc::new(StructArray::new(
1141                column1_schema1,
1142                vec![Arc::new(arrow::array::StringArray::from(vec![
1143                    Some("A"),
1144                    Some("B"),
1145                ]))],
1146                None,
1147            ))],
1148        )
1149        .unwrap();
1150
1151        let column1_schema2: arrow_schema::Fields = vec![
1152            Field::new("col_1a", ArrowDataType::Utf8, true),
1153            Field::new("col_1b", ArrowDataType::Utf8, true),
1154        ]
1155        .into();
1156        let schema2 = Arc::new(ArrowSchema::new(vec![Field::new(
1157            "col_1",
1158            ArrowDataType::Struct(column1_schema2.clone()),
1159            true,
1160        )]));
1161
1162        let batch2 = RecordBatch::try_new(
1163            schema2.clone(),
1164            vec![Arc::new(StructArray::new(
1165                column1_schema2,
1166                vec![
1167                    Arc::new(arrow::array::StringArray::from(vec![
1168                        Some("E"),
1169                        Some("F"),
1170                        Some("G"),
1171                    ])),
1172                    Arc::new(arrow::array::StringArray::from(vec![
1173                        Some("E2"),
1174                        Some("F2"),
1175                        Some("G2"),
1176                    ])),
1177                ],
1178                None,
1179            ))],
1180        )
1181        .unwrap();
1182
1183        let table = DeltaTable::new_in_memory()
1184            .write(vec![batch1])
1185            .with_save_mode(crate::protocol::SaveMode::Append)
1186            .await
1187            .unwrap();
1188
1189        let table = table
1190            .write(vec![batch2])
1191            .with_schema_mode(SchemaMode::Merge)
1192            .with_save_mode(crate::protocol::SaveMode::Append)
1193            .await
1194            .unwrap();
1195
1196        let config = DeltaScanConfigBuilder::new()
1197            .build(table.snapshot().unwrap().snapshot())
1198            .unwrap();
1199        let log = table.log_store();
1200
1201        let provider =
1202            DeltaTableProvider::try_new(table.snapshot().unwrap().snapshot().clone(), log, config)
1203                .unwrap();
1204        let ctx: SessionContext = DeltaSessionContext::default().into();
1205        ctx.register_table("test", Arc::new(provider)).unwrap();
1206
1207        let df = ctx
1208            .sql("select col_1.col_1a, col_1.col_1b from test")
1209            .await
1210            .unwrap();
1211        let actual = df.collect().await.unwrap();
1212        let expected = vec![
1213            "+--------------------+--------------------+",
1214            "| test.col_1[col_1a] | test.col_1[col_1b] |",
1215            "+--------------------+--------------------+",
1216            "| A                  |                    |",
1217            "| B                  |                    |",
1218            "| E                  | E2                 |",
1219            "| F                  | F2                 |",
1220            "| G                  | G2                 |",
1221            "+--------------------+--------------------+",
1222        ];
1223        assert_batches_sorted_eq!(&expected, &actual);
1224    }
1225
1226    #[tokio::test]
1227    async fn test_multiple_predicate_pushdown() {
1228        let schema = Arc::new(ArrowSchema::new(vec![
1229            Field::new("moDified", ArrowDataType::Utf8, true),
1230            Field::new("id", ArrowDataType::Utf8, true),
1231            Field::new("vaLue", ArrowDataType::Int32, true),
1232        ]));
1233
1234        let batch = RecordBatch::try_new(
1235            schema.clone(),
1236            vec![
1237                Arc::new(arrow::array::StringArray::from(vec![
1238                    "2021-02-01",
1239                    "2021-02-01",
1240                    "2021-02-02",
1241                    "2021-02-02",
1242                ])),
1243                Arc::new(arrow::array::StringArray::from(vec!["A", "B", "C", "D"])),
1244                Arc::new(arrow::array::Int32Array::from(vec![1, 10, 20, 100])),
1245            ],
1246        )
1247        .unwrap();
1248        // write some data
1249        let table = DeltaTable::new_in_memory()
1250            .write(vec![batch.clone()])
1251            .with_save_mode(crate::protocol::SaveMode::Append)
1252            .await
1253            .unwrap();
1254
1255        let datafusion = SessionContext::new();
1256        table
1257            .update_datafusion_session(&datafusion.state())
1258            .unwrap();
1259
1260        datafusion
1261            .register_table("snapshot", table.table_provider().await.unwrap())
1262            .unwrap();
1263
1264        let df = datafusion
1265            .sql("select * from snapshot where id > 10000 and id < 20000")
1266            .await
1267            .unwrap();
1268
1269        df.collect().await.unwrap();
1270    }
1271
1272    #[tokio::test]
1273    async fn test_delta_scan_builder_no_scan_config() {
1274        let arr: Arc<dyn Array> = Arc::new(arrow::array::StringArray::from(vec!["s"]));
1275        let batch = RecordBatch::try_from_iter_with_nullable(vec![("a", arr, false)]).unwrap();
1276        let table = DeltaTable::new_in_memory()
1277            .write(vec![batch])
1278            .with_save_mode(crate::protocol::SaveMode::Append)
1279            .await
1280            .unwrap();
1281
1282        let ctx = SessionContext::new();
1283        let state = ctx.state();
1284        let scan = table_provider::DeltaScanBuilder::new(
1285            table.snapshot().unwrap().snapshot(),
1286            table.log_store(),
1287            &state,
1288        )
1289        .with_filter(Some(col("a").eq(lit("s"))))
1290        .build()
1291        .await
1292        .unwrap();
1293
1294        let mut visitor = ParquetVisitor::default();
1295        visit_execution_plan(&scan, &mut visitor).unwrap();
1296
1297        assert_eq!(visitor.predicate.unwrap().to_string(), "a@0 = s");
1298    }
1299
1300    #[tokio::test]
1301    async fn test_delta_scan_builder_scan_config_disable_pushdown() {
1302        let arr: Arc<dyn Array> = Arc::new(arrow::array::StringArray::from(vec!["s"]));
1303        let batch = RecordBatch::try_from_iter_with_nullable(vec![("a", arr, false)]).unwrap();
1304        let table = DeltaTable::new_in_memory()
1305            .write(vec![batch])
1306            .with_save_mode(crate::protocol::SaveMode::Append)
1307            .await
1308            .unwrap();
1309
1310        let snapshot = table.snapshot().unwrap();
1311        let ctx = SessionContext::new();
1312        let state = ctx.state();
1313        let scan =
1314            table_provider::DeltaScanBuilder::new(snapshot.snapshot(), table.log_store(), &state)
1315                .with_filter(Some(col("a").eq(lit("s"))))
1316                .with_scan_config(
1317                    DeltaScanConfigBuilder::new()
1318                        .with_parquet_pushdown(false)
1319                        .build(snapshot.snapshot())
1320                        .unwrap(),
1321                )
1322                .build()
1323                .await
1324                .unwrap();
1325
1326        let mut visitor = ParquetVisitor::default();
1327        visit_execution_plan(&scan, &mut visitor).unwrap();
1328
1329        assert!(visitor.predicate.is_none());
1330    }
1331
1332    #[tokio::test]
1333    async fn test_delta_scan_applies_parquet_options() {
1334        let arr: Arc<dyn Array> = Arc::new(arrow::array::StringArray::from(vec!["s"]));
1335        let batch = RecordBatch::try_from_iter_with_nullable(vec![("a", arr, false)]).unwrap();
1336        let table = DeltaTable::new_in_memory()
1337            .write(vec![batch])
1338            .with_save_mode(crate::protocol::SaveMode::Append)
1339            .await
1340            .unwrap();
1341
1342        let snapshot = table.snapshot().unwrap();
1343
1344        let mut config = SessionConfig::default();
1345        config.options_mut().execution.parquet.pushdown_filters = true;
1346        let ctx = SessionContext::new_with_config(config);
1347        let state = ctx.state();
1348
1349        let scan =
1350            table_provider::DeltaScanBuilder::new(snapshot.snapshot(), table.log_store(), &state)
1351                .build()
1352                .await
1353                .unwrap();
1354
1355        let mut visitor = ParquetVisitor::default();
1356        visit_execution_plan(&scan, &mut visitor).unwrap();
1357
1358        assert_eq!(ctx.copied_table_options().parquet, visitor.options.unwrap());
1359    }
1360
    /// Test helper that records details of the parquet scan found while walking a physical
    /// plan with `visit_execution_plan`.
    ///
    /// Both fields stay `None` until a `ParquetSource` is visited. If the plan contains several
    /// parquet sources, each visit overwrites the previous values, so the last one wins.
    #[derive(Default)]
    struct ParquetVisitor {
        /// Filter reported by the visited `ParquetSource::filter()`, if any.
        predicate: Option<Arc<dyn PhysicalExpr>>,
        /// Clone of the visited source's `table_parquet_options()`.
        options: Option<TableParquetOptions>,
    }
1367
1368    impl ExecutionPlanVisitor for ParquetVisitor {
1369        type Error = DataFusionError;
1370
1371        fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
1372            let Some(datasource_exec) = plan.as_any().downcast_ref::<DataSourceExec>() else {
1373                return Ok(true);
1374            };
1375
1376            let Some(scan_config) = datasource_exec
1377                .data_source()
1378                .as_any()
1379                .downcast_ref::<FileScanConfig>()
1380            else {
1381                return Ok(true);
1382            };
1383
1384            if let Some(parquet_source) = scan_config
1385                .file_source
1386                .as_any()
1387                .downcast_ref::<ParquetSource>()
1388            {
1389                self.options = Some(parquet_source.table_parquet_options().clone());
1390                self.predicate = parquet_source.filter();
1391            }
1392
1393            Ok(true)
1394        }
1395    }
1396
1397    // Run a query that filters out all files and sorts.
1398    // Verify that it returns an empty set of rows without panicking.
1399    //
1400    // Historically, we had a bug that caused us to emit a query plan with 0 partitions, which
1401    // datafusion rejected.
1402    #[tokio::test]
1403    async fn passes_sanity_checker_when_all_files_filtered() {
1404        let table = open_fs_path("../test/tests/data/delta-2.2.0-partitioned-types");
1405        let ctx = create_session().into_inner();
1406        ctx.register_table("test", table.table_provider().await.unwrap())
1407            .unwrap();
1408
1409        let df = ctx
1410            .sql("select * from test where c3 = 100 ORDER BY c1 ASC")
1411            .await
1412            .unwrap();
1413        let actual = df.collect().await.unwrap();
1414
1415        assert_eq!(actual.len(), 0);
1416    }
1417
    // Checks that selecting a single column makes the scan fetch only that column's chunk from
    // object storage. The byte range of the unselected `large` column must never be requested,
    // and no whole-file GET may be issued for a data file.
    #[tokio::test]
    async fn test_delta_scan_uses_parquet_column_pruning() {
        // One row holding a tiny `small` value and a 1024-byte `large` value, written as two
        // separate column chunks (column 0 = `small`, column 1 = `large`).
        let small: Arc<dyn Array> = Arc::new(arrow::array::StringArray::from(vec!["a"]));
        let large: Arc<dyn Array> = Arc::new(arrow::array::StringArray::from(vec![
            "b".repeat(1024).as_str(),
        ]));
        let batch = RecordBatch::try_from_iter(vec![("small", small), ("large", large)]).unwrap();
        let table = DeltaTable::new_in_memory()
            .write(vec![batch])
            .with_save_mode(crate::protocol::SaveMode::Append)
            .await
            .unwrap();

        let config = DeltaScanConfigBuilder::new()
            .build(table.snapshot().unwrap().snapshot())
            .unwrap();

        // Wrap the table's log store in a recording log store. The provider scans through the
        // wrapper, and the operations it captured are drained and inspected further down.
        let (log_store, mut operations) = recording_log_store(table.log_store());
        let provider = DeltaTableProvider::try_new(
            table.snapshot().unwrap().snapshot().clone(),
            log_store.clone(),
            config,
        )
        .unwrap();
        let ctx = SessionContext::new();
        ctx.register_table("test", Arc::new(provider)).unwrap();
        let state = ctx.state();

        let df = ctx.sql("select small from test").await.unwrap();
        let plan = df.create_physical_plan().await.unwrap();

        // Execute partition 0 directly. The table holds a single data file (asserted below), so
        // this is expected to be the only partition with data.
        let mut stream = plan.execute(0, state.task_ctx()).unwrap();
        let Some(Ok(batch)) = stream.next().await else {
            panic!()
        };
        assert!(stream.next().await.is_none());
        // Only the projected column comes back, with the single written value.
        assert_eq!(1, batch.num_columns());
        assert_eq!(1, batch.num_rows());
        let small = batch.column_by_name("small").unwrap().as_string::<i32>();
        assert_eq!("a", small.iter().next().unwrap().unwrap());

        // Read the parquet footer of the single data file to learn the byte ranges of both
        // column chunks in row group 0. This goes through `table.object_store()` rather than
        // the recording `log_store`.
        // NOTE(review): presumably these reads are not captured by the recorder — confirm.
        let files = table.get_files_by_partitions(&[]).await.unwrap();
        assert_eq!(1, files.len());
        let object_store = table.object_store();
        let file_meta = object_store.head(&files[0]).await.unwrap();
        let file_reader = parquet::arrow::async_reader::ParquetObjectReader::new(
            object_store,
            file_meta.location.clone(),
        )
        .with_file_size(file_meta.size);
        let parquet_metadata =
            parquet::arrow::async_reader::ParquetRecordBatchStreamBuilder::new(file_reader)
                .await
                .unwrap()
                .metadata()
                .as_ref()
                .clone();
        let (small_start, small_len) = parquet_metadata.row_group(0).column(0).byte_range();
        let small_range = small_start..small_start + small_len;
        let (large_start, large_len) = parquet_metadata.row_group(0).column(1).byte_range();
        let large_range = large_start..large_start + large_len;

        let actual = drain_recorded_ops(&mut operations).await;

        // Every byte range requested from data files, whether fetched singly or in a batch.
        let data_ranges = actual
            .iter()
            .flat_map(|operation| match operation {
                ObjectStoreOperation::GetRange(PathKind::Data, range) => vec![range.clone()],
                ObjectStoreOperation::GetRanges(PathKind::Data, ranges) => ranges.clone(),
                _ => Vec::new(),
            })
            .collect::<Vec<_>>();

        // Intersection test for half-open `start..end` ranges.
        let overlaps = |left: &Range<u64>, right: &Range<u64>| {
            left.start < right.end && right.start < left.end
        };

        // Ranged reads happened, they touched the selected chunk, they never touched the
        // unselected chunk, and no data file was fetched whole.
        assert!(
            !data_ranges.is_empty(),
            "expected ranged parquet data reads, saw {actual:?}"
        );
        assert!(
            data_ranges
                .iter()
                .any(|range| overlaps(range, &small_range)),
            "expected selected column chunk {small_range:?} to be read, saw {actual:?}"
        );
        assert!(
            data_ranges
                .iter()
                .all(|range| !overlaps(range, &large_range)),
            "expected unselected column chunk {large_range:?} to be pruned, saw {actual:?}"
        );
        assert!(
            !actual.iter().any(|operation| matches!(
                operation,
                ObjectStoreOperation::Get(PathKind::Data)
                    | ObjectStoreOperation::GetOpts(PathKind::Data)
            )),
            "expected no full data file reads, saw {actual:?}"
        );
    }
1520
1521    #[tokio::test]
1522    async fn test_push_down_filter_panic_2602() -> DeltaResult<()> {
1523        use crate::kernel::schema::{DataType, PrimitiveType};
1524        let ctx = SessionContext::new();
1525        let table = DeltaTable::new_in_memory()
1526            .create()
1527            .with_column("id", DataType::Primitive(PrimitiveType::Long), true, None)
1528            .with_column(
1529                "name",
1530                DataType::Primitive(PrimitiveType::String),
1531                true,
1532                None,
1533            )
1534            .with_column("b", DataType::Primitive(PrimitiveType::Boolean), true, None)
1535            .with_column(
1536                "ts",
1537                DataType::Primitive(PrimitiveType::Timestamp),
1538                true,
1539                None,
1540            )
1541            .with_column("dt", DataType::Primitive(PrimitiveType::Date), true, None)
1542            .with_column(
1543                "zap",
1544                DataType::Array(Box::new(ArrayType::new(
1545                    DataType::Primitive(PrimitiveType::Boolean),
1546                    true,
1547                ))),
1548                true,
1549                None,
1550            )
1551            .await?;
1552        table.update_datafusion_session(&ctx.state()).unwrap();
1553
1554        ctx.register_table("snapshot", table.table_provider().await.unwrap())
1555            .unwrap();
1556
1557        let df = ctx
1558            .sql("select * from snapshot where id > 10000 and id < 20000")
1559            .await
1560            .unwrap();
1561
1562        let _ = df.collect().await?;
1563        Ok(())
1564    }
1565}