Skip to main content

deltalake_core/delta_datafusion/
mod.rs

1//! Datafusion integration for Delta Table
2//!
3//! Example:
4//!
5//! ```rust
6//! use std::sync::Arc;
7//! use datafusion::execution::context::SessionContext;
8//!
9//! async {
10//!   let mut ctx = SessionContext::new();
11//!   let table = deltalake_core::open_table_with_storage_options(
12//!       url::Url::parse("memory://").unwrap(),
13//!       std::collections::HashMap::new()
14//!   )
15//!       .await
16//!       .unwrap();
17//!   ctx.register_table("demo", table.table_provider().await.unwrap()).unwrap();
18//!
19//!   let batches = ctx
20//!       .sql("SELECT * FROM demo").await.unwrap()
21//!       .collect()
22//!       .await.unwrap();
23//! };
24//! ```
25
26use std::fmt::Debug;
27use std::sync::Arc;
28
29use arrow::array::types::UInt16Type;
30use arrow::array::{Array, DictionaryArray, RecordBatch, StringArray, TypedDictionaryArray};
31use arrow_cast::{CastOptions, cast_with_options};
32use arrow_schema::{
33    DataType as ArrowDataType, Schema as ArrowSchema, SchemaRef, SchemaRef as ArrowSchemaRef,
34    TimeUnit,
35};
36use datafusion::catalog::{Session, TableProviderFactory};
37use datafusion::common::scalar::ScalarValue;
38use datafusion::common::{
39    Column, DFSchema, DataFusionError, Result as DataFusionResult, TableReference, ToDFSchema,
40};
41use datafusion::datasource::TableProvider;
42use datafusion::datasource::physical_plan::wrap_partition_type_in_dict;
43use datafusion::execution::TaskContext;
44use datafusion::execution::context::SessionContext;
45use datafusion::logical_expr::logical_plan::CreateExternalTable;
46use datafusion::logical_expr::utils::conjunction;
47use datafusion::logical_expr::{Expr, Extension, LogicalPlan};
48use datafusion::physical_optimizer::pruning::PruningPredicate;
49use datafusion::physical_plan::ExecutionPlan;
50use datafusion_proto::logical_plan::LogicalExtensionCodec;
51use datafusion_proto::physical_plan::PhysicalExtensionCodec;
52use delta_kernel::engine::arrow_conversion::TryIntoArrow as _;
53use either::Either;
54
55use crate::delta_datafusion::expr::parse_predicate_expression;
56use crate::delta_datafusion::table_provider::{DeltaScan, DeltaScanWire};
57use crate::ensure_table_uri;
58use crate::errors::{DeltaResult, DeltaTableError};
59use crate::kernel::{Add, EagerSnapshot, LogDataHandler, Snapshot};
60use crate::{open_table, open_table_with_storage_options};
61
62pub(crate) use self::session::DeltaSessionExt;
63pub use self::session::{
64    DeltaParserOptions, DeltaRuntimeEnvBuilder, DeltaSessionConfig, DeltaSessionContext,
65    create_session, create_session_state_with_spill_config,
66};
67pub use self::table_provider::next::{DeletionVectorSelection, DeltaScan as DeltaScanNext};
68pub(crate) use self::utils::*;
69pub use cdf::scan::DeltaCdfTableProvider;
70pub(crate) use data_validation::{
71    DataValidationExec, constraints_to_exprs, generated_columns_to_exprs, validation_predicates,
72};
73pub(crate) use find_files::*;
74pub(crate) use table_provider::next::normalize_path_as_file_id;
75pub use table_provider::{
76    DeltaScanConfig, DeltaScanConfigBuilder, TableProviderBuilder, next::DeltaScanExec,
77};
78pub(crate) use table_provider::{
79    next::FILE_ID_COLUMN_DEFAULT, resolve_file_column_name, update_datafusion_session,
80};
81
82pub(crate) const PATH_COLUMN: &str = "__delta_rs_path";
83
84#[doc(hidden)]
85pub mod bench_support;
86pub mod cdf;
87mod data_validation;
88pub mod engine;
89pub mod expr;
90mod file_id;
91mod find_files;
92pub mod logical;
93pub mod physical;
94pub mod planner;
95mod session;
96pub use session::SessionFallbackPolicy;
97pub(crate) use session::{SessionResolveContext, resolve_session_state};
98mod table_provider;
99pub(crate) mod utils;
100
101impl From<DeltaTableError> for DataFusionError {
102    fn from(err: DeltaTableError) -> Self {
103        match err {
104            DeltaTableError::Arrow { source } => DataFusionError::from(source),
105            DeltaTableError::Io { source } => DataFusionError::IoError(source),
106            DeltaTableError::ObjectStore { source } => DataFusionError::from(source),
107            DeltaTableError::Parquet { source } => DataFusionError::from(source),
108            _ => DataFusionError::External(Box::new(err)),
109        }
110    }
111}
112
113impl From<DataFusionError> for DeltaTableError {
114    fn from(err: DataFusionError) -> Self {
115        match err {
116            DataFusionError::ArrowError(source, _) => DeltaTableError::from(*source),
117            DataFusionError::IoError(source) => DeltaTableError::Io { source },
118            DataFusionError::ObjectStore(source) => DeltaTableError::from(*source),
119            DataFusionError::ParquetError(source) => DeltaTableError::from(*source),
120            _ => DeltaTableError::Generic(err.to_string()),
121        }
122    }
123}
124
/// Convenience trait for calling common methods on snapshot hierarchies
///
/// The implementations in this module build both schemas with the partition
/// columns moved behind all other columns, in the order listed in the table
/// metadata.
pub trait DataFusionMixins {
    /// The physical datafusion schema of a table
    ///
    /// The implementations in this module dictionary-encode partition columns
    /// whose type is `Utf8`, `LargeUtf8`, `Binary` or `LargeBinary`; all other
    /// columns keep their declared type.
    fn read_schema(&self) -> ArrowSchemaRef;

    /// Get the table schema as an [`ArrowSchemaRef`]
    ///
    /// Unlike [`DataFusionMixins::read_schema`], the implementations in this
    /// module leave partition column types untouched here.
    fn input_schema(&self) -> ArrowSchemaRef;

    /// Parse an expression string into a datafusion [`Expr`]
    ///
    /// # Errors
    ///
    /// Returns a [`DeltaTableError`] when the expression cannot be turned into
    /// an [`Expr`] against the table schema.
    fn parse_predicate_expression(
        &self,
        expr: impl AsRef<str>,
        session: &dyn Session,
    ) -> DeltaResult<Expr>;
}
140
141impl DataFusionMixins for Snapshot {
142    fn read_schema(&self) -> ArrowSchemaRef {
143        _arrow_schema(
144            self.arrow_schema(),
145            self.metadata().partition_columns(),
146            true,
147        )
148    }
149
150    fn input_schema(&self) -> ArrowSchemaRef {
151        _arrow_schema(
152            self.arrow_schema(),
153            self.metadata().partition_columns(),
154            false,
155        )
156    }
157
158    fn parse_predicate_expression(
159        &self,
160        expr: impl AsRef<str>,
161        session: &dyn Session,
162    ) -> DeltaResult<Expr> {
163        let schema = DFSchema::try_from(self.read_schema().as_ref().to_owned())?;
164        parse_predicate_expression(&schema, expr, session)
165    }
166}
167
168impl DataFusionMixins for LogDataHandler<'_> {
169    fn read_schema(&self) -> ArrowSchemaRef {
170        _arrow_schema(
171            Arc::new(
172                self.table_configuration()
173                    .logical_schema()
174                    .as_ref()
175                    .try_into_arrow()
176                    .unwrap(),
177            ),
178            self.table_configuration().metadata().partition_columns(),
179            true,
180        )
181    }
182
183    fn input_schema(&self) -> ArrowSchemaRef {
184        _arrow_schema(
185            Arc::new(
186                self.table_configuration()
187                    .logical_schema()
188                    .as_ref()
189                    .try_into_arrow()
190                    .unwrap(),
191            ),
192            self.table_configuration().metadata().partition_columns(),
193            false,
194        )
195    }
196
197    fn parse_predicate_expression(
198        &self,
199        expr: impl AsRef<str>,
200        session: &dyn Session,
201    ) -> DeltaResult<Expr> {
202        let schema = DFSchema::try_from(self.read_schema().as_ref().to_owned())?;
203        parse_predicate_expression(&schema, expr, session)
204    }
205}
206
207impl DataFusionMixins for EagerSnapshot {
208    fn read_schema(&self) -> ArrowSchemaRef {
209        self.snapshot().read_schema()
210    }
211
212    fn input_schema(&self) -> ArrowSchemaRef {
213        self.snapshot().input_schema()
214    }
215
216    fn parse_predicate_expression(
217        &self,
218        expr: impl AsRef<str>,
219        session: &dyn Session,
220    ) -> DeltaResult<Expr> {
221        self.snapshot().parse_predicate_expression(expr, session)
222    }
223}
224
225fn _arrow_schema(
226    schema: SchemaRef,
227    partition_columns: &[String],
228    wrap_partitions: bool,
229) -> ArrowSchemaRef {
230    let fields = schema
231        .fields()
232        .into_iter()
233        .filter(|f| !partition_columns.contains(&f.name().to_string()))
234        .cloned()
235        .chain(
236            // We need stable order between logical and physical schemas, but the order of
237            // partitioning columns is not always the same in the json schema and the array
238            partition_columns.iter().map(|partition_col| {
239                let field = schema.field_with_name(partition_col).unwrap();
240                let corrected = if wrap_partitions {
241                    match field.data_type() {
242                        // Only dictionary-encode types that may be large
243                        // https://github.com/apache/arrow-datafusion/pull/5545
244                        ArrowDataType::Utf8
245                        | ArrowDataType::LargeUtf8
246                        | ArrowDataType::Binary
247                        | ArrowDataType::LargeBinary => {
248                            wrap_partition_type_in_dict(field.data_type().clone())
249                        }
250                        _ => field.data_type().clone(),
251                    }
252                } else {
253                    field.data_type().clone()
254                };
255                Arc::new(field.clone().with_data_type(corrected))
256            }),
257        )
258        .collect::<Vec<_>>();
259    Arc::new(ArrowSchema::new(fields))
260}
261
262pub(crate) fn files_matching_predicate<'a>(
263    log_data: LogDataHandler<'a>,
264    filters: &[Expr],
265) -> DeltaResult<impl Iterator<Item = Add> + 'a> {
266    if let Some(Some(predicate)) =
267        (!filters.is_empty()).then_some(conjunction(filters.iter().cloned()))
268    {
269        let expr = SessionContext::new()
270            .create_physical_expr(predicate, &log_data.read_schema().to_dfschema()?)?;
271        let pruning_predicate = PruningPredicate::try_new(expr, log_data.read_schema())?;
272        let mask = pruning_predicate.prune(&log_data)?;
273
274        Ok(Either::Left(log_data.into_iter().zip(mask).filter_map(
275            |(file, keep_file)| {
276                if keep_file { Some(file.to_add()) } else { None }
277            },
278        )))
279    } else {
280        Ok(Either::Right(
281            log_data.into_iter().map(|file| file.to_add()),
282        ))
283    }
284}
285
286pub(crate) fn get_path_column<'a>(
287    batch: &'a RecordBatch,
288    path_column: &str,
289) -> DeltaResult<TypedDictionaryArray<'a, UInt16Type, StringArray>> {
290    let err = || DeltaTableError::Generic("Unable to obtain Delta-rs path column".to_string());
291    batch
292        .column_by_name(path_column)
293        .ok_or_else(err)?
294        .as_any()
295        .downcast_ref::<DictionaryArray<UInt16Type>>()
296        .ok_or_else(err)?
297        .downcast_dict::<StringArray>()
298        .ok_or_else(err)
299}
300
301pub(crate) fn get_null_of_arrow_type(t: &ArrowDataType) -> DeltaResult<ScalarValue> {
302    match t {
303        ArrowDataType::Null => Ok(ScalarValue::Null),
304        ArrowDataType::Boolean => Ok(ScalarValue::Boolean(None)),
305        ArrowDataType::Int8 => Ok(ScalarValue::Int8(None)),
306        ArrowDataType::Int16 => Ok(ScalarValue::Int16(None)),
307        ArrowDataType::Int32 => Ok(ScalarValue::Int32(None)),
308        ArrowDataType::Int64 => Ok(ScalarValue::Int64(None)),
309        ArrowDataType::UInt8 => Ok(ScalarValue::UInt8(None)),
310        ArrowDataType::UInt16 => Ok(ScalarValue::UInt16(None)),
311        ArrowDataType::UInt32 => Ok(ScalarValue::UInt32(None)),
312        ArrowDataType::UInt64 => Ok(ScalarValue::UInt64(None)),
313        ArrowDataType::Float32 => Ok(ScalarValue::Float32(None)),
314        ArrowDataType::Float64 => Ok(ScalarValue::Float64(None)),
315        ArrowDataType::Date32 => Ok(ScalarValue::Date32(None)),
316        ArrowDataType::Date64 => Ok(ScalarValue::Date64(None)),
317        ArrowDataType::Binary => Ok(ScalarValue::Binary(None)),
318        ArrowDataType::FixedSizeBinary(size) => {
319            Ok(ScalarValue::FixedSizeBinary(size.to_owned(), None))
320        }
321        ArrowDataType::LargeBinary => Ok(ScalarValue::LargeBinary(None)),
322        ArrowDataType::Utf8 => Ok(ScalarValue::Utf8(None)),
323        ArrowDataType::LargeUtf8 => Ok(ScalarValue::LargeUtf8(None)),
324        ArrowDataType::Decimal128(precision, scale) => Ok(ScalarValue::Decimal128(
325            None,
326            precision.to_owned(),
327            scale.to_owned(),
328        )),
329        ArrowDataType::Timestamp(unit, tz) => {
330            let tz = tz.to_owned();
331            Ok(match unit {
332                TimeUnit::Second => ScalarValue::TimestampSecond(None, tz),
333                TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(None, tz),
334                TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(None, tz),
335                TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(None, tz),
336            })
337        }
338        ArrowDataType::Dictionary(k, v) => Ok(ScalarValue::Dictionary(
339            k.clone(),
340            Box::new(get_null_of_arrow_type(v)?),
341        )),
342        //Unsupported types...
343        ArrowDataType::Float16
344        | ArrowDataType::Decimal32(_, _)
345        | ArrowDataType::Decimal64(_, _)
346        | ArrowDataType::Decimal256(_, _)
347        | ArrowDataType::Union(_, _)
348        | ArrowDataType::LargeList(_)
349        | ArrowDataType::Struct(_)
350        | ArrowDataType::List(_)
351        | ArrowDataType::FixedSizeList(_, _)
352        | ArrowDataType::Time32(_)
353        | ArrowDataType::Time64(_)
354        | ArrowDataType::Duration(_)
355        | ArrowDataType::Interval(_)
356        | ArrowDataType::RunEndEncoded(_, _)
357        | ArrowDataType::BinaryView
358        | ArrowDataType::Utf8View
359        | ArrowDataType::LargeListView(_)
360        | ArrowDataType::ListView(_)
361        | ArrowDataType::Map(_, _) => Err(DeltaTableError::Generic(format!(
362            "Unsupported data type for Delta Lake {t}"
363        ))),
364    }
365}
366
367fn parse_date(
368    stat_val: &serde_json::Value,
369    field_dt: &ArrowDataType,
370) -> DataFusionResult<ScalarValue> {
371    let string = match stat_val {
372        serde_json::Value::String(s) => s.to_owned(),
373        _ => stat_val.to_string(),
374    };
375
376    let time_micro = ScalarValue::try_from_string(string, &ArrowDataType::Date32)?;
377    let cast_arr = cast_with_options(
378        &time_micro.to_array()?,
379        field_dt,
380        &CastOptions {
381            safe: false,
382            ..Default::default()
383        },
384    )?;
385    ScalarValue::try_from_array(&cast_arr, 0)
386}
387
388fn parse_timestamp(
389    stat_val: &serde_json::Value,
390    field_dt: &ArrowDataType,
391) -> DataFusionResult<ScalarValue> {
392    let string = match stat_val {
393        serde_json::Value::String(s) => s.to_owned(),
394        _ => stat_val.to_string(),
395    };
396
397    let time_micro = ScalarValue::try_from_string(
398        string,
399        &ArrowDataType::Timestamp(TimeUnit::Microsecond, None),
400    )?;
401    let cast_arr = cast_with_options(
402        &time_micro.to_array()?,
403        field_dt,
404        &CastOptions {
405            safe: false,
406            ..Default::default()
407        },
408    )?;
409    ScalarValue::try_from_array(&cast_arr, 0)
410}
411
412pub(crate) fn to_correct_scalar_value(
413    stat_val: &serde_json::Value,
414    field_dt: &ArrowDataType,
415) -> DataFusionResult<Option<ScalarValue>> {
416    match stat_val {
417        serde_json::Value::Array(_) => Ok(None),
418        serde_json::Value::Object(_) => Ok(None),
419        serde_json::Value::Null => Ok(Some(get_null_of_arrow_type(field_dt)?)),
420        serde_json::Value::String(string_val) => match field_dt {
421            ArrowDataType::Timestamp(_, _) => Ok(Some(parse_timestamp(stat_val, field_dt)?)),
422            ArrowDataType::Date32 => Ok(Some(parse_date(stat_val, field_dt)?)),
423            _ => Ok(Some(ScalarValue::try_from_string(
424                string_val.to_owned(),
425                field_dt,
426            )?)),
427        },
428        other => match field_dt {
429            ArrowDataType::Timestamp(_, _) => Ok(Some(parse_timestamp(stat_val, field_dt)?)),
430            ArrowDataType::Date32 => Ok(Some(parse_date(stat_val, field_dt)?)),
431            _ => Ok(Some(ScalarValue::try_from_string(
432                other.to_string(),
433                field_dt,
434            )?)),
435        },
436    }
437}
438
439/// Legacy codec for serialized plans that still contain the retired physical
440/// [`DeltaScan`] wrapper.
441#[deprecated(
442    note = "DeltaPhysicalCodec only supports the retired physical DeltaScan wrapper. \
443            Use DeltaLogicalCodec for table-provider serialization until a DeltaScanExec \
444            physical codec is available."
445)]
446#[derive(Debug)]
447pub struct DeltaPhysicalCodec {}
448
449#[allow(deprecated)]
450impl PhysicalExtensionCodec for DeltaPhysicalCodec {
451    fn try_decode(
452        &self,
453        buf: &[u8],
454        inputs: &[Arc<dyn ExecutionPlan>],
455        _registry: &TaskContext,
456    ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
457        let wire: DeltaScanWire = serde_json::from_reader(buf)
458            .map_err(|_| DataFusionError::Internal("Unable to decode DeltaScan".to_string()))?;
459        let delta_scan = wire.into_delta_scan((*inputs)[0].clone());
460        Ok(Arc::new(delta_scan))
461    }
462
463    fn try_encode(
464        &self,
465        node: Arc<dyn ExecutionPlan>,
466        buf: &mut Vec<u8>,
467    ) -> Result<(), DataFusionError> {
468        let delta_scan = node
469            .as_any()
470            .downcast_ref::<DeltaScan>()
471            .ok_or_else(|| DataFusionError::Internal("Not a legacy delta scan!".to_string()))?;
472
473        let wire = DeltaScanWire::from(delta_scan);
474        serde_json::to_writer(buf, &wire).map_err(|_| {
475            DataFusionError::Internal("Unable to encode legacy delta scan!".to_string())
476        })?;
477        Ok(())
478    }
479}
480
481/// Does serde on DeltaTables
482#[derive(Debug)]
483pub struct DeltaLogicalCodec {}
484
485impl LogicalExtensionCodec for DeltaLogicalCodec {
486    fn try_decode(
487        &self,
488        _buf: &[u8],
489        _inputs: &[LogicalPlan],
490        _ctx: &TaskContext,
491    ) -> Result<Extension, DataFusionError> {
492        todo!("DeltaLogicalCodec")
493    }
494
495    fn try_encode(&self, _node: &Extension, _buf: &mut Vec<u8>) -> Result<(), DataFusionError> {
496        todo!("DeltaLogicalCodec")
497    }
498
499    fn try_decode_table_provider(
500        &self,
501        buf: &[u8],
502        _table_ref: &TableReference,
503        _schema: SchemaRef,
504        _ctx: &TaskContext,
505    ) -> Result<Arc<dyn TableProvider>, DataFusionError> {
506        let provider: DeltaScanNext = serde_json::from_slice(buf)
507            .map_err(|_| DataFusionError::Internal("Error encoding delta table".to_string()))?;
508        Ok(Arc::new(provider))
509    }
510
511    fn try_encode_table_provider(
512        &self,
513        _table_ref: &TableReference,
514        node: Arc<dyn TableProvider>,
515        buf: &mut Vec<u8>,
516    ) -> Result<(), DataFusionError> {
517        let scan = node
518            .as_ref()
519            .as_any()
520            .downcast_ref::<DeltaScanNext>()
521            .ok_or_else(|| {
522                DataFusionError::Internal("Can't encode non-delta tables".to_string())
523            })?;
524        serde_json::to_writer(buf, scan)
525            .map_err(|_| DataFusionError::Internal("Error encoding delta table".to_string()))
526    }
527}
528
529/// Responsible for creating deltatables
530#[derive(Debug)]
531pub struct DeltaTableFactory {}
532
533#[async_trait::async_trait]
534impl TableProviderFactory for DeltaTableFactory {
535    async fn create(
536        &self,
537        ctx: &dyn Session,
538        cmd: &CreateExternalTable,
539    ) -> datafusion::error::Result<Arc<dyn TableProvider>> {
540        let table = if cmd.options.is_empty() {
541            let table_url = ensure_table_uri(&cmd.to_owned().location)?;
542            open_table(table_url).await?
543        } else {
544            let table_url = ensure_table_uri(&cmd.to_owned().location)?;
545            open_table_with_storage_options(table_url, cmd.to_owned().options).await?
546        };
547        let table_uri = table.log_store().root_url().clone();
548        let (session_state, _) = resolve_session_state(
549            Some(ctx),
550            SessionFallbackPolicy::DeriveFromTrait,
551            || create_session().state(),
552            SessionResolveContext {
553                operation: "DeltaTableFactory::create",
554                table_uri: Some(&table_uri),
555                cdc: false,
556            },
557        )?;
558
559        Ok(table
560            .table_provider()
561            .with_session(Arc::new(session_state))
562            .await?)
563    }
564}
565
566/// A wrapper for Deltafusion's Column to preserve case-sensitivity during string conversion
567pub struct DeltaColumn {
568    inner: Column,
569}
570
571impl From<&str> for DeltaColumn {
572    fn from(c: &str) -> Self {
573        DeltaColumn {
574            inner: Column::from_qualified_name_ignore_case(c),
575        }
576    }
577}
578
579/// Create a column, cloning the string
580impl From<&String> for DeltaColumn {
581    fn from(c: &String) -> Self {
582        DeltaColumn {
583            inner: Column::from_qualified_name_ignore_case(c),
584        }
585    }
586}
587
588/// Create a column, reusing the existing string
589impl From<String> for DeltaColumn {
590    fn from(c: String) -> Self {
591        DeltaColumn {
592            inner: Column::from_qualified_name_ignore_case(c),
593        }
594    }
595}
596
597impl From<DeltaColumn> for Column {
598    fn from(value: DeltaColumn) -> Self {
599        value.inner
600    }
601}
602
603/// Create a column, reusing the existing datafusion column
604impl From<Column> for DeltaColumn {
605    fn from(c: Column) -> Self {
606        DeltaColumn { inner: c }
607    }
608}
609
610#[cfg(test)]
611mod tests {
612    use crate::DeltaTable;
613    use crate::operations::write::SchemaMode;
614    use crate::test_utils::{
615        object_store::{
616            RecordedObjectStoreOperation as ObjectStoreOperation, RecordedPathKind as PathKind,
617            drain_recorded_object_store_operations as drain_recorded_ops, recording_log_store,
618        },
619        open_fs_path,
620    };
621    use crate::writer::test_utils::get_delta_schema;
622    use arrow::array::StructArray;
623    use arrow::datatypes::{Field, Schema};
624    use datafusion::assert_batches_sorted_eq;
625    use datafusion::physical_plan::empty::EmptyExec;
626    use datafusion::prelude::SessionConfig;
627    use datafusion_proto::physical_plan::AsExecutionPlan;
628    use datafusion_proto::protobuf;
629    use delta_kernel::schema::ArrayType;
630    use futures::{StreamExt, TryStreamExt};
631    use object_store::ObjectStoreExt as _;
632    use serde_json::json;
633    use std::ops::Range;
634    use url::Url;
635
636    use super::*;
637    use crate::delta_datafusion::table_provider::next::{FileSelection, MissingFilePolicy};
638    // test deserialization of serialized partition values.
639    // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#partition-value-serialization
640    #[test]
641    fn test_to_correct_scalar_value() {
642        let reference_pairs = &[
643            (
644                json!("2015"),
645                ArrowDataType::Int16,
646                ScalarValue::Int16(Some(2015)),
647            ),
648            (
649                json!("2015"),
650                ArrowDataType::Int32,
651                ScalarValue::Int32(Some(2015)),
652            ),
653            (
654                json!("2015"),
655                ArrowDataType::Int64,
656                ScalarValue::Int64(Some(2015)),
657            ),
658            (
659                json!("2015"),
660                ArrowDataType::Float32,
661                ScalarValue::Float32(Some(2015_f32)),
662            ),
663            (
664                json!("2015"),
665                ArrowDataType::Float64,
666                ScalarValue::Float64(Some(2015_f64)),
667            ),
668            (
669                json!(2015),
670                ArrowDataType::Float64,
671                ScalarValue::Float64(Some(2015_f64)),
672            ),
673            (
674                json!("2015-01-01"),
675                ArrowDataType::Date32,
676                ScalarValue::Date32(Some(16436)),
677            ),
678            // (
679            //     json!("2015-01-01"),
680            //     ArrowDataType::Date64,
681            //     ScalarValue::Date64(Some(16436)),
682            // ),
683            // TODO(roeap) there seem to be differences in how precisions are handled locally and in CI, need to investigate
684            // (
685            //     json!("2020-09-08 13:42:29"),
686            //     ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
687            //     ScalarValue::TimestampNanosecond(Some(1599565349000000000), None),
688            // ),
689            // (
690            //     json!("2020-09-08 13:42:29"),
691            //     ArrowDataType::Timestamp(TimeUnit::Microsecond, None),
692            //     ScalarValue::TimestampMicrosecond(Some(1599565349000000), None),
693            // ),
694            // (
695            //     json!("2020-09-08 13:42:29"),
696            //     ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
697            //     ScalarValue::TimestampMillisecond(Some(1599565349000), None),
698            // ),
699            (
700                json!(true),
701                ArrowDataType::Boolean,
702                ScalarValue::Boolean(Some(true)),
703            ),
704        ];
705
706        for (raw, data_type, ref_scalar) in reference_pairs {
707            let scalar = to_correct_scalar_value(raw, data_type).unwrap().unwrap();
708            assert_eq!(*ref_scalar, scalar)
709        }
710    }
711
712    #[test]
713    #[allow(deprecated)]
714    fn roundtrip_test_delta_exec_plan() {
715        let ctx = SessionContext::new();
716        let codec = DeltaPhysicalCodec {};
717
718        let schema = Arc::new(Schema::new(vec![
719            Field::new("a", ArrowDataType::Utf8, false),
720            Field::new("b", ArrowDataType::Int32, false),
721        ]));
722        let exec_plan = Arc::from(DeltaScan::new(
723            &Url::parse("s3://my_bucket/this/is/some/path").unwrap(),
724            DeltaScanConfig::default(),
725            Arc::from(EmptyExec::new(schema.clone())),
726            schema.clone(),
727        ));
728        let proto: protobuf::PhysicalPlanNode =
729            protobuf::PhysicalPlanNode::try_from_physical_plan(exec_plan.clone(), &codec)
730                .expect("to proto");
731
732        let task_ctx = ctx.task_ctx();
733        let result_exec_plan: Arc<dyn ExecutionPlan> = proto
734            .try_into_physical_plan(&task_ctx, &codec)
735            .expect("from proto");
736        assert_eq!(format!("{exec_plan:?}"), format!("{result_exec_plan:?}"));
737    }
738
739    #[tokio::test]
740    async fn roundtrip_test_delta_logical_codec_preserves_file_selection() {
741        let log_store = crate::test_utils::TestTables::Simple
742            .table_builder()
743            .unwrap()
744            .build_storage()
745            .unwrap();
746        let snapshot = Arc::new(
747            crate::kernel::Snapshot::try_new(&log_store, Default::default(), None)
748                .await
749                .unwrap(),
750        );
751        let table_root = snapshot
752            .scan_builder()
753            .build()
754            .unwrap()
755            .table_root()
756            .clone();
757        let selected_file_ids: Vec<String> = snapshot
758            .file_views(log_store.as_ref(), None)
759            .take(1)
760            .map_ok(|view| table_root.join(view.path_raw()).unwrap().to_string())
761            .try_collect()
762            .await
763            .unwrap();
764        assert_eq!(selected_file_ids.len(), 1);
765
766        let provider = DeltaScanNext::builder()
767            .with_snapshot(snapshot)
768            .build()
769            .await
770            .unwrap()
771            .with_file_selection(
772                FileSelection::new(selected_file_ids.clone())
773                    .with_missing_file_policy(MissingFilePolicy::Ignore),
774            );
775
776        let codec = DeltaLogicalCodec {};
777        let table_ref = TableReference::bare("delta_table");
778        let mut encoded = Vec::new();
779        codec
780            .try_encode_table_provider(&table_ref, Arc::new(provider), &mut encoded)
781            .unwrap();
782
783        let ctx = SessionContext::new();
784        let decoded = codec
785            .try_decode_table_provider(
786                &encoded,
787                &table_ref,
788                Arc::new(ArrowSchema::empty()),
789                &ctx.task_ctx(),
790            )
791            .unwrap();
792        let decoded_provider = decoded
793            .as_ref()
794            .as_any()
795            .downcast_ref::<DeltaScanNext>()
796            .unwrap();
797
798        let serialized = serde_json::to_value(decoded_provider).unwrap();
799        let decoded_file_ids = serialized
800            .get("file_selection")
801            .and_then(|value| value.get("file_ids"))
802            .and_then(|value| value.as_array())
803            .unwrap()
804            .iter()
805            .map(|value| value.as_str().unwrap().to_string())
806            .collect::<Vec<_>>();
807        let decoded_policy = serialized
808            .get("file_selection")
809            .and_then(|value| value.get("missing_file_policy"))
810            .and_then(|value| value.as_str())
811            .unwrap();
812
813        assert_eq!(decoded_file_ids, selected_file_ids);
814        assert_eq!(decoded_policy, "Ignore");
815    }
816
817    #[tokio::test]
818    async fn delta_table_provider_with_config() {
819        let table = open_fs_path("../test/tests/data/delta-2.2.0-partitioned-types");
820        let provider = table
821            .table_provider()
822            .with_file_column("file_source")
823            .await
824            .unwrap();
825        let ctx = SessionContext::new();
826        ctx.register_table("test", provider).unwrap();
827
828        let df = ctx
829            .sql("select c3, c1, c2, right(file_source, 77) as file_source from test")
830            .await
831            .unwrap();
832        let actual = df.collect().await.unwrap();
833        let expected = vec![
834            "+----+----+----+-------------------------------------------------------------------------------+",
835            "| c3 | c1 | c2 | file_source                                                                   |",
836            "+----+----+----+-------------------------------------------------------------------------------+",
837            "| 4  | 6  | a  | c1=6/c2=a/part-00011-10619b10-b691-4fd0-acc4-2a9608499d7c.c000.snappy.parquet |",
838            "| 5  | 4  | c  | c1=4/c2=c/part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet |",
839            "| 6  | 5  | b  | c1=5/c2=b/part-00007-4e73fa3b-2c88-424a-8051-f8b54328ffdb.c000.snappy.parquet |",
840            "+----+----+----+-------------------------------------------------------------------------------+",
841        ];
842        assert_batches_sorted_eq!(&expected, &actual);
843    }
844
845    #[tokio::test]
846    async fn delta_scan_mixed_partition_order() {
847        // Tests issue (1787) where partition columns were incorrect when they
848        // have a different order in the metadata and table schema
849        let schema = Arc::new(ArrowSchema::new(vec![
850            Field::new("modified", ArrowDataType::Utf8, true),
851            Field::new("id", ArrowDataType::Utf8, true),
852            Field::new("value", ArrowDataType::Int32, true),
853        ]));
854
855        let table = DeltaTable::new_in_memory()
856            .create()
857            .with_columns(get_delta_schema().fields().cloned())
858            .with_partition_columns(["modified", "id"])
859            .await
860            .unwrap();
861        assert_eq!(table.version(), Some(0));
862
863        let batch = RecordBatch::try_new(
864            schema.clone(),
865            vec![
866                Arc::new(arrow::array::StringArray::from(vec![
867                    "2021-02-01",
868                    "2021-02-01",
869                    "2021-02-02",
870                    "2021-02-02",
871                ])),
872                Arc::new(arrow::array::StringArray::from(vec!["A", "B", "C", "D"])),
873                Arc::new(arrow::array::Int32Array::from(vec![1, 10, 20, 100])),
874            ],
875        )
876        .unwrap();
877        // write some data
878        let table = table
879            .write(vec![batch.clone()])
880            .with_save_mode(crate::protocol::SaveMode::Append)
881            .await
882            .unwrap();
883
884        let provider = table.table_provider().await.unwrap();
885        let logical_schema = provider.schema();
886        let ctx = SessionContext::new();
887        ctx.runtime_env().register_object_store(
888            table.log_store().root_url(),
889            table.log_store().object_store(None),
890        );
891        ctx.register_table("test", provider).unwrap();
892
893        let expected_logical_order = vec!["id", "value", "modified"];
894        let actual_order: Vec<String> = logical_schema
895            .fields()
896            .iter()
897            .map(|f| f.name().to_owned())
898            .collect();
899
900        let df = ctx.sql("select * from test").await.unwrap();
901        let actual = df.collect().await.unwrap();
902        let expected = vec![
903            "+----+-------+------------+",
904            "| id | value | modified   |",
905            "+----+-------+------------+",
906            "| A  | 1     | 2021-02-01 |",
907            "| B  | 10    | 2021-02-01 |",
908            "| C  | 20    | 2021-02-02 |",
909            "| D  | 100   | 2021-02-02 |",
910            "+----+-------+------------+",
911        ];
912        assert_batches_sorted_eq!(&expected, &actual);
913        assert_eq!(expected_logical_order, actual_order);
914    }
915
916    #[tokio::test]
917    async fn delta_scan_case_sensitive() {
918        let schema = Arc::new(ArrowSchema::new(vec![
919            Field::new("moDified", ArrowDataType::Utf8, true),
920            Field::new("ID", ArrowDataType::Utf8, true),
921            Field::new("vaLue", ArrowDataType::Int32, true),
922        ]));
923
924        let batch = RecordBatch::try_new(
925            schema.clone(),
926            vec![
927                Arc::new(arrow::array::StringArray::from(vec![
928                    "2021-02-01",
929                    "2021-02-01",
930                    "2021-02-02",
931                    "2021-02-02",
932                ])),
933                Arc::new(arrow::array::StringArray::from(vec!["A", "B", "C", "D"])),
934                Arc::new(arrow::array::Int32Array::from(vec![1, 10, 20, 100])),
935            ],
936        )
937        .unwrap();
938        // write some data
939        let table = DeltaTable::new_in_memory()
940            .write(vec![batch.clone()])
941            .with_save_mode(crate::protocol::SaveMode::Append)
942            .await
943            .unwrap();
944
945        let provider = table.table_provider().await.unwrap();
946        let ctx: SessionContext = DeltaSessionContext::default().into();
947        ctx.register_table("test", provider).unwrap();
948
949        let df = ctx
950            .sql("select ID, moDified, vaLue from test")
951            .await
952            .unwrap();
953        let actual = df.collect().await.unwrap();
954        let expected = vec![
955            "+----+------------+-------+",
956            "| ID | moDified   | vaLue |",
957            "+----+------------+-------+",
958            "| A  | 2021-02-01 | 1     |",
959            "| B  | 2021-02-01 | 10    |",
960            "| C  | 2021-02-02 | 20    |",
961            "| D  | 2021-02-02 | 100   |",
962            "+----+------------+-------+",
963        ];
964        assert_batches_sorted_eq!(&expected, &actual);
965
966        /* TODO: Datafusion doesn't have any options to prevent case-sensitivity with the col func */
967        /*
968        let df = ctx
969            .table("test")
970            .await
971            .unwrap()
972            .select(vec![col("ID"), col("moDified"), col("vaLue")])
973            .unwrap();
974        let actual = df.collect().await.unwrap();
975        assert_batches_sorted_eq!(&expected, &actual);
976        */
977    }
978
979    #[tokio::test]
980    async fn delta_scan_supports_missing_columns() {
981        let schema1 = Arc::new(ArrowSchema::new(vec![Field::new(
982            "col_1",
983            ArrowDataType::Utf8,
984            true,
985        )]));
986
987        let batch1 = RecordBatch::try_new(
988            schema1.clone(),
989            vec![Arc::new(arrow::array::StringArray::from(vec![
990                Some("A"),
991                Some("B"),
992            ]))],
993        )
994        .unwrap();
995
996        let schema2 = Arc::new(ArrowSchema::new(vec![
997            Field::new("col_1", ArrowDataType::Utf8, true),
998            Field::new("col_2", ArrowDataType::Utf8, true),
999        ]));
1000
1001        let batch2 = RecordBatch::try_new(
1002            schema2.clone(),
1003            vec![
1004                Arc::new(arrow::array::StringArray::from(vec![
1005                    Some("E"),
1006                    Some("F"),
1007                    Some("G"),
1008                ])),
1009                Arc::new(arrow::array::StringArray::from(vec![
1010                    Some("E2"),
1011                    Some("F2"),
1012                    Some("G2"),
1013                ])),
1014            ],
1015        )
1016        .unwrap();
1017
1018        let table = DeltaTable::new_in_memory()
1019            .write(vec![batch2])
1020            .with_save_mode(crate::protocol::SaveMode::Append)
1021            .await
1022            .unwrap();
1023
1024        let table = table
1025            .write(vec![batch1])
1026            .with_schema_mode(SchemaMode::Merge)
1027            .with_save_mode(crate::protocol::SaveMode::Append)
1028            .await
1029            .unwrap();
1030
1031        let provider = table.table_provider().await.unwrap();
1032        let ctx: SessionContext = DeltaSessionContext::default().into();
1033        ctx.register_table("test", provider).unwrap();
1034
1035        let df = ctx.sql("select col_1, col_2 from test").await.unwrap();
1036        let actual = df.collect().await.unwrap();
1037        let expected = vec![
1038            "+-------+-------+",
1039            "| col_1 | col_2 |",
1040            "+-------+-------+",
1041            "| A     |       |",
1042            "| B     |       |",
1043            "| E     | E2    |",
1044            "| F     | F2    |",
1045            "| G     | G2    |",
1046            "+-------+-------+",
1047        ];
1048        assert_batches_sorted_eq!(&expected, &actual);
1049    }
1050
1051    #[tokio::test]
1052    async fn delta_scan_supports_pushdown() {
1053        let schema = Arc::new(ArrowSchema::new(vec![
1054            Field::new("col_1", ArrowDataType::Utf8, false),
1055            Field::new("col_2", ArrowDataType::Utf8, false),
1056        ]));
1057
1058        let batch = RecordBatch::try_new(
1059            schema.clone(),
1060            vec![
1061                Arc::new(arrow::array::StringArray::from(vec![
1062                    Some("A"),
1063                    Some("B"),
1064                    Some("C"),
1065                ])),
1066                Arc::new(arrow::array::StringArray::from(vec![
1067                    Some("A2"),
1068                    Some("B2"),
1069                    Some("C2"),
1070                ])),
1071            ],
1072        )
1073        .unwrap();
1074
1075        let table = DeltaTable::new_in_memory()
1076            .write(vec![batch])
1077            .with_save_mode(crate::protocol::SaveMode::Append)
1078            .await
1079            .unwrap();
1080
1081        let provider = table.table_provider().await.unwrap();
1082
1083        let mut cfg = SessionConfig::default();
1084        cfg.options_mut().execution.parquet.pushdown_filters = true;
1085        let ctx = SessionContext::new_with_config(cfg);
1086        ctx.register_table("test", provider).unwrap();
1087
1088        let df = ctx
1089            .sql("select col_1, col_2 from test WHERE col_1 = 'A'")
1090            .await
1091            .unwrap();
1092        let actual = df.collect().await.unwrap();
1093        let expected = vec![
1094            "+-------+-------+",
1095            "| col_1 | col_2 |",
1096            "+-------+-------+",
1097            "| A     | A2    |",
1098            "+-------+-------+",
1099        ];
1100        assert_batches_sorted_eq!(&expected, &actual);
1101    }
1102
1103    #[tokio::test]
1104    async fn delta_scan_supports_nested_missing_columns() {
1105        let column1_schema1: arrow::datatypes::Fields =
1106            vec![Field::new("col_1a", ArrowDataType::Utf8, true)].into();
1107        let schema1 = Arc::new(ArrowSchema::new(vec![Field::new(
1108            "col_1",
1109            ArrowDataType::Struct(column1_schema1.clone()),
1110            true,
1111        )]));
1112
1113        let batch1 = RecordBatch::try_new(
1114            schema1.clone(),
1115            vec![Arc::new(StructArray::new(
1116                column1_schema1,
1117                vec![Arc::new(arrow::array::StringArray::from(vec![
1118                    Some("A"),
1119                    Some("B"),
1120                ]))],
1121                None,
1122            ))],
1123        )
1124        .unwrap();
1125
1126        let column1_schema2: arrow_schema::Fields = vec![
1127            Field::new("col_1a", ArrowDataType::Utf8, true),
1128            Field::new("col_1b", ArrowDataType::Utf8, true),
1129        ]
1130        .into();
1131        let schema2 = Arc::new(ArrowSchema::new(vec![Field::new(
1132            "col_1",
1133            ArrowDataType::Struct(column1_schema2.clone()),
1134            true,
1135        )]));
1136
1137        let batch2 = RecordBatch::try_new(
1138            schema2.clone(),
1139            vec![Arc::new(StructArray::new(
1140                column1_schema2,
1141                vec![
1142                    Arc::new(arrow::array::StringArray::from(vec![
1143                        Some("E"),
1144                        Some("F"),
1145                        Some("G"),
1146                    ])),
1147                    Arc::new(arrow::array::StringArray::from(vec![
1148                        Some("E2"),
1149                        Some("F2"),
1150                        Some("G2"),
1151                    ])),
1152                ],
1153                None,
1154            ))],
1155        )
1156        .unwrap();
1157
1158        let table = DeltaTable::new_in_memory()
1159            .write(vec![batch1])
1160            .with_save_mode(crate::protocol::SaveMode::Append)
1161            .await
1162            .unwrap();
1163
1164        let table = table
1165            .write(vec![batch2])
1166            .with_schema_mode(SchemaMode::Merge)
1167            .with_save_mode(crate::protocol::SaveMode::Append)
1168            .await
1169            .unwrap();
1170
1171        let provider = table.table_provider().await.unwrap();
1172        let ctx: SessionContext = DeltaSessionContext::default().into();
1173        ctx.register_table("test", provider).unwrap();
1174
1175        let df = ctx
1176            .sql("select col_1.col_1a, col_1.col_1b from test")
1177            .await
1178            .unwrap();
1179        let actual = df.collect().await.unwrap();
1180        let expected = vec![
1181            "+--------------------+--------------------+",
1182            "| test.col_1[col_1a] | test.col_1[col_1b] |",
1183            "+--------------------+--------------------+",
1184            "| A                  |                    |",
1185            "| B                  |                    |",
1186            "| E                  | E2                 |",
1187            "| F                  | F2                 |",
1188            "| G                  | G2                 |",
1189            "+--------------------+--------------------+",
1190        ];
1191        assert_batches_sorted_eq!(&expected, &actual);
1192    }
1193
1194    #[tokio::test]
1195    async fn test_multiple_predicate_pushdown() {
1196        let schema = Arc::new(ArrowSchema::new(vec![
1197            Field::new("moDified", ArrowDataType::Utf8, true),
1198            Field::new("id", ArrowDataType::Utf8, true),
1199            Field::new("vaLue", ArrowDataType::Int32, true),
1200        ]));
1201
1202        let batch = RecordBatch::try_new(
1203            schema.clone(),
1204            vec![
1205                Arc::new(arrow::array::StringArray::from(vec![
1206                    "2021-02-01",
1207                    "2021-02-01",
1208                    "2021-02-02",
1209                    "2021-02-02",
1210                ])),
1211                Arc::new(arrow::array::StringArray::from(vec!["A", "B", "C", "D"])),
1212                Arc::new(arrow::array::Int32Array::from(vec![1, 10, 20, 100])),
1213            ],
1214        )
1215        .unwrap();
1216        // write some data
1217        let table = DeltaTable::new_in_memory()
1218            .write(vec![batch.clone()])
1219            .with_save_mode(crate::protocol::SaveMode::Append)
1220            .await
1221            .unwrap();
1222
1223        let datafusion = SessionContext::new();
1224        table
1225            .update_datafusion_session(&datafusion.state())
1226            .unwrap();
1227
1228        datafusion
1229            .register_table("snapshot", table.table_provider().await.unwrap())
1230            .unwrap();
1231
1232        let df = datafusion
1233            .sql("select * from snapshot where id > 10000 and id < 20000")
1234            .await
1235            .unwrap();
1236
1237        df.collect().await.unwrap();
1238    }
1239
1240    // Run a query that filters out all files and sorts.
1241    // Verify that it returns an empty set of rows without panicking.
1242    //
1243    // Historically, we had a bug that caused us to emit a query plan with 0 partitions, which
1244    // datafusion rejected.
1245    #[tokio::test]
1246    async fn passes_sanity_checker_when_all_files_filtered() {
1247        let table = open_fs_path("../test/tests/data/delta-2.2.0-partitioned-types");
1248        let ctx = create_session().into_inner();
1249        ctx.register_table("test", table.table_provider().await.unwrap())
1250            .unwrap();
1251
1252        let df = ctx
1253            .sql("select * from test where c3 = 100 ORDER BY c1 ASC")
1254            .await
1255            .unwrap();
1256        let actual = df.collect().await.unwrap();
1257
1258        assert_eq!(actual.len(), 0);
1259    }
1260
1261    #[tokio::test]
1262    async fn test_delta_scan_uses_parquet_column_pruning() {
1263        let small: Arc<dyn Array> = Arc::new(arrow::array::StringArray::from(vec!["a"]));
1264        let large: Arc<dyn Array> = Arc::new(arrow::array::StringArray::from(vec![
1265            "b".repeat(1024).as_str(),
1266        ]));
1267        let batch = RecordBatch::try_from_iter(vec![("small", small), ("large", large)]).unwrap();
1268        let table = DeltaTable::new_in_memory()
1269            .write(vec![batch])
1270            .with_save_mode(crate::protocol::SaveMode::Append)
1271            .await
1272            .unwrap();
1273
1274        let config = DeltaScanConfigBuilder::new()
1275            .build(table.snapshot().unwrap().snapshot())
1276            .unwrap();
1277
1278        let (log_store, mut operations) = recording_log_store(table.log_store());
1279        let provider = DeltaScanNext::new(table.snapshot().unwrap().snapshot().clone(), config)
1280            .unwrap()
1281            .with_log_store(log_store.clone());
1282        let ctx = SessionContext::new();
1283        ctx.register_table("test", Arc::new(provider)).unwrap();
1284        let state = ctx.state();
1285
1286        let df = ctx.sql("select small from test").await.unwrap();
1287        let plan = df.create_physical_plan().await.unwrap();
1288
1289        let mut stream = plan.execute(0, state.task_ctx()).unwrap();
1290        let Some(Ok(batch)) = stream.next().await else {
1291            panic!()
1292        };
1293        assert!(stream.next().await.is_none());
1294        assert_eq!(1, batch.num_columns());
1295        assert_eq!(1, batch.num_rows());
1296        let small = ScalarValue::try_from_array(batch.column_by_name("small").unwrap(), 0).unwrap();
1297        assert_eq!("a", small.to_string());
1298
1299        let files = table.get_files_by_partitions(&[]).await.unwrap();
1300        assert_eq!(1, files.len());
1301        let object_store = table.object_store();
1302        let file_meta = object_store.head(&files[0]).await.unwrap();
1303        let file_reader = parquet::arrow::async_reader::ParquetObjectReader::new(
1304            object_store,
1305            file_meta.location.clone(),
1306        )
1307        .with_file_size(file_meta.size);
1308        let parquet_metadata =
1309            parquet::arrow::async_reader::ParquetRecordBatchStreamBuilder::new(file_reader)
1310                .await
1311                .unwrap()
1312                .metadata()
1313                .as_ref()
1314                .clone();
1315        let (small_start, small_len) = parquet_metadata.row_group(0).column(0).byte_range();
1316        let small_range = small_start..small_start + small_len;
1317        let (large_start, large_len) = parquet_metadata.row_group(0).column(1).byte_range();
1318        let large_range = large_start..large_start + large_len;
1319
1320        let actual = drain_recorded_ops(&mut operations).await;
1321
1322        let data_ranges = actual
1323            .iter()
1324            .flat_map(|operation| match operation {
1325                ObjectStoreOperation::GetRange(PathKind::Data, range) => vec![range.clone()],
1326                ObjectStoreOperation::GetRanges(PathKind::Data, ranges) => ranges.clone(),
1327                _ => Vec::new(),
1328            })
1329            .collect::<Vec<_>>();
1330
1331        let overlaps = |left: &Range<u64>, right: &Range<u64>| {
1332            left.start < right.end && right.start < left.end
1333        };
1334
1335        assert!(
1336            !data_ranges.is_empty(),
1337            "expected ranged parquet data reads, saw {actual:?}"
1338        );
1339        assert!(
1340            data_ranges
1341                .iter()
1342                .any(|range| overlaps(range, &small_range)),
1343            "expected selected column chunk {small_range:?} to be read, saw {actual:?}"
1344        );
1345        assert!(
1346            data_ranges
1347                .iter()
1348                .all(|range| !overlaps(range, &large_range)),
1349            "expected unselected column chunk {large_range:?} to be pruned, saw {actual:?}"
1350        );
1351        assert!(
1352            !actual.iter().any(|operation| matches!(
1353                operation,
1354                ObjectStoreOperation::Get(PathKind::Data)
1355                    | ObjectStoreOperation::GetOpts(PathKind::Data)
1356            )),
1357            "expected no full data file reads, saw {actual:?}"
1358        );
1359    }
1360
1361    #[tokio::test]
1362    async fn test_push_down_filter_panic_2602() -> DeltaResult<()> {
1363        use crate::kernel::schema::{DataType, PrimitiveType};
1364        let ctx = SessionContext::new();
1365        let table = DeltaTable::new_in_memory()
1366            .create()
1367            .with_column("id", DataType::Primitive(PrimitiveType::Long), true, None)
1368            .with_column(
1369                "name",
1370                DataType::Primitive(PrimitiveType::String),
1371                true,
1372                None,
1373            )
1374            .with_column("b", DataType::Primitive(PrimitiveType::Boolean), true, None)
1375            .with_column(
1376                "ts",
1377                DataType::Primitive(PrimitiveType::Timestamp),
1378                true,
1379                None,
1380            )
1381            .with_column("dt", DataType::Primitive(PrimitiveType::Date), true, None)
1382            .with_column(
1383                "zap",
1384                DataType::Array(Box::new(ArrayType::new(
1385                    DataType::Primitive(PrimitiveType::Boolean),
1386                    true,
1387                ))),
1388                true,
1389                None,
1390            )
1391            .await?;
1392        table.update_datafusion_session(&ctx.state()).unwrap();
1393
1394        ctx.register_table("snapshot", table.table_provider().await.unwrap())
1395            .unwrap();
1396
1397        let df = ctx
1398            .sql("select * from snapshot where id > 10000 and id < 20000")
1399            .await
1400            .unwrap();
1401
1402        let _ = df.collect().await?;
1403        Ok(())
1404    }
1405}