// deltalake_core/delta_datafusion/mod.rs
1//! Datafusion integration for Delta Table
2//!
3//! Example:
4//!
5//! ```rust
6//! use std::sync::Arc;
7//! use datafusion::execution::context::SessionContext;
8//!
9//! async {
10//!   let mut ctx = SessionContext::new();
11//!   let table = deltalake_core::open_table_with_storage_options(
12//!       url::Url::parse("memory://").unwrap(),
13//!       std::collections::HashMap::new()
14//!   )
15//!       .await
16//!       .unwrap();
17//!   ctx.register_table("demo", table.table_provider().await.unwrap()).unwrap();
18//!
19//!   let batches = ctx
20//!       .sql("SELECT * FROM demo").await.unwrap()
21//!       .collect()
22//!       .await.unwrap();
23//! };
24//! ```
25
26use std::fmt::Debug;
27use std::sync::Arc;
28
29use arrow::array::types::UInt16Type;
30use arrow::array::{Array, DictionaryArray, RecordBatch, StringArray, TypedDictionaryArray};
31use arrow_cast::{CastOptions, cast_with_options};
32use arrow_schema::{
33    DataType as ArrowDataType, Schema as ArrowSchema, SchemaRef, SchemaRef as ArrowSchemaRef,
34    TimeUnit,
35};
36use datafusion::catalog::{Session, TableProviderFactory};
37use datafusion::common::scalar::ScalarValue;
38use datafusion::common::{
39    Column, DFSchema, DataFusionError, Result as DataFusionResult, TableReference, ToDFSchema,
40};
41use datafusion::datasource::TableProvider;
42use datafusion::datasource::physical_plan::wrap_partition_type_in_dict;
43use datafusion::execution::TaskContext;
44use datafusion::execution::context::SessionContext;
45use datafusion::logical_expr::logical_plan::CreateExternalTable;
46use datafusion::logical_expr::utils::conjunction;
47use datafusion::logical_expr::{Expr, Extension, LogicalPlan};
48use datafusion::physical_optimizer::pruning::PruningPredicate;
49use datafusion::physical_plan::ExecutionPlan;
50use datafusion_proto::logical_plan::LogicalExtensionCodec;
51use datafusion_proto::physical_plan::PhysicalExtensionCodec;
52use delta_kernel::engine::arrow_conversion::TryIntoArrow as _;
53use either::Either;
54
55use crate::delta_datafusion::expr::parse_predicate_expression;
56use crate::delta_datafusion::table_provider::DeltaScanWire;
57use crate::ensure_table_uri;
58use crate::errors::{DeltaResult, DeltaTableError};
59use crate::kernel::{Add, EagerSnapshot, LogDataHandler, Snapshot};
60use crate::{open_table, open_table_with_storage_options};
61
62pub(crate) use self::session::DeltaSessionExt;
63pub use self::session::{
64    DeltaParserOptions, DeltaRuntimeEnvBuilder, DeltaSessionConfig, DeltaSessionContext,
65    create_session,
66};
67pub use self::table_provider::next::{DeletionVectorSelection, DeltaScan as DeltaScanNext};
68pub(crate) use self::utils::*;
69pub use cdf::scan::DeltaCdfTableProvider;
70pub(crate) use data_validation::{
71    DataValidationExec, constraints_to_exprs, generated_columns_to_exprs, validation_predicates,
72};
73pub(crate) use find_files::*;
74pub use table_provider::{
75    DeltaScan, DeltaScanConfig, DeltaScanConfigBuilder, DeltaTableProvider, TableProviderBuilder,
76    next::DeltaScanExec,
77};
78pub(crate) use table_provider::{
79    DeltaScanBuilder, next::FILE_ID_COLUMN_DEFAULT, update_datafusion_session,
80};
81
/// Name of the synthetic column used internally to carry the source file path of each row.
pub(crate) const PATH_COLUMN: &str = "__delta_rs_path";
83
84pub mod cdf;
85mod data_validation;
86pub mod engine;
87pub mod expr;
88mod file_id;
89mod find_files;
90pub mod logical;
91pub mod physical;
92pub mod planner;
93mod session;
94pub use session::SessionFallbackPolicy;
95pub(crate) use session::{SessionResolveContext, resolve_session_state};
96mod table_provider;
97pub(crate) mod utils;
98
99impl From<DeltaTableError> for DataFusionError {
100    fn from(err: DeltaTableError) -> Self {
101        match err {
102            DeltaTableError::Arrow { source } => DataFusionError::from(source),
103            DeltaTableError::Io { source } => DataFusionError::IoError(source),
104            DeltaTableError::ObjectStore { source } => DataFusionError::from(source),
105            DeltaTableError::Parquet { source } => DataFusionError::from(source),
106            _ => DataFusionError::External(Box::new(err)),
107        }
108    }
109}
110
111impl From<DataFusionError> for DeltaTableError {
112    fn from(err: DataFusionError) -> Self {
113        match err {
114            DataFusionError::ArrowError(source, _) => DeltaTableError::from(*source),
115            DataFusionError::IoError(source) => DeltaTableError::Io { source },
116            DataFusionError::ObjectStore(source) => DeltaTableError::from(*source),
117            DataFusionError::ParquetError(source) => DeltaTableError::from(*source),
118            _ => DeltaTableError::Generic(err.to_string()),
119        }
120    }
121}
122
/// Convenience trait for calling common methods on snapshot hierarchies
pub trait DataFusionMixins {
    /// The physical datafusion schema of a table
    ///
    /// Partition columns are moved to the end of the schema and string/binary
    /// partition types are wrapped in dictionary encoding (see `_arrow_schema`).
    fn read_schema(&self) -> ArrowSchemaRef;

    /// Get the table schema as an [`ArrowSchemaRef`]
    ///
    /// Same column ordering as [`Self::read_schema`], but partition columns keep
    /// their plain (non-dictionary) data types.
    fn input_schema(&self) -> ArrowSchemaRef;

    /// Parse an expression string into a datafusion [`Expr`]
    ///
    /// The expression is resolved against the physical (read) schema of the table.
    fn parse_predicate_expression(
        &self,
        expr: impl AsRef<str>,
        session: &dyn Session,
    ) -> DeltaResult<Expr>;
}
138
139impl DataFusionMixins for Snapshot {
140    fn read_schema(&self) -> ArrowSchemaRef {
141        _arrow_schema(
142            self.arrow_schema(),
143            self.metadata().partition_columns(),
144            true,
145        )
146    }
147
148    fn input_schema(&self) -> ArrowSchemaRef {
149        _arrow_schema(
150            self.arrow_schema(),
151            self.metadata().partition_columns(),
152            false,
153        )
154    }
155
156    fn parse_predicate_expression(
157        &self,
158        expr: impl AsRef<str>,
159        session: &dyn Session,
160    ) -> DeltaResult<Expr> {
161        let schema = DFSchema::try_from(self.read_schema().as_ref().to_owned())?;
162        parse_predicate_expression(&schema, expr, session)
163    }
164}
165
166impl DataFusionMixins for LogDataHandler<'_> {
167    fn read_schema(&self) -> ArrowSchemaRef {
168        _arrow_schema(
169            Arc::new(
170                self.table_configuration()
171                    .schema()
172                    .as_ref()
173                    .try_into_arrow()
174                    .unwrap(),
175            ),
176            self.table_configuration().metadata().partition_columns(),
177            true,
178        )
179    }
180
181    fn input_schema(&self) -> ArrowSchemaRef {
182        _arrow_schema(
183            Arc::new(
184                self.table_configuration()
185                    .schema()
186                    .as_ref()
187                    .try_into_arrow()
188                    .unwrap(),
189            ),
190            self.table_configuration().metadata().partition_columns(),
191            false,
192        )
193    }
194
195    fn parse_predicate_expression(
196        &self,
197        expr: impl AsRef<str>,
198        session: &dyn Session,
199    ) -> DeltaResult<Expr> {
200        let schema = DFSchema::try_from(self.read_schema().as_ref().to_owned())?;
201        parse_predicate_expression(&schema, expr, session)
202    }
203}
204
205impl DataFusionMixins for EagerSnapshot {
206    fn read_schema(&self) -> ArrowSchemaRef {
207        self.snapshot().read_schema()
208    }
209
210    fn input_schema(&self) -> ArrowSchemaRef {
211        self.snapshot().input_schema()
212    }
213
214    fn parse_predicate_expression(
215        &self,
216        expr: impl AsRef<str>,
217        session: &dyn Session,
218    ) -> DeltaResult<Expr> {
219        self.snapshot().parse_predicate_expression(expr, session)
220    }
221}
222
223fn _arrow_schema(
224    schema: SchemaRef,
225    partition_columns: &[String],
226    wrap_partitions: bool,
227) -> ArrowSchemaRef {
228    let fields = schema
229        .fields()
230        .into_iter()
231        .filter(|f| !partition_columns.contains(&f.name().to_string()))
232        .cloned()
233        .chain(
234            // We need stable order between logical and physical schemas, but the order of
235            // partitioning columns is not always the same in the json schema and the array
236            partition_columns.iter().map(|partition_col| {
237                let field = schema.field_with_name(partition_col).unwrap();
238                let corrected = if wrap_partitions {
239                    match field.data_type() {
240                        // Only dictionary-encode types that may be large
241                        // https://github.com/apache/arrow-datafusion/pull/5545
242                        ArrowDataType::Utf8
243                        | ArrowDataType::LargeUtf8
244                        | ArrowDataType::Binary
245                        | ArrowDataType::LargeBinary => {
246                            wrap_partition_type_in_dict(field.data_type().clone())
247                        }
248                        _ => field.data_type().clone(),
249                    }
250                } else {
251                    field.data_type().clone()
252                };
253                Arc::new(field.clone().with_data_type(corrected))
254            }),
255        )
256        .collect::<Vec<_>>();
257    Arc::new(ArrowSchema::new(fields))
258}
259
260pub(crate) fn files_matching_predicate<'a>(
261    log_data: LogDataHandler<'a>,
262    filters: &[Expr],
263) -> DeltaResult<impl Iterator<Item = Add> + 'a> {
264    if let Some(Some(predicate)) =
265        (!filters.is_empty()).then_some(conjunction(filters.iter().cloned()))
266    {
267        let expr = SessionContext::new()
268            .create_physical_expr(predicate, &log_data.read_schema().to_dfschema()?)?;
269        let pruning_predicate = PruningPredicate::try_new(expr, log_data.read_schema())?;
270        let mask = pruning_predicate.prune(&log_data)?;
271
272        Ok(Either::Left(log_data.into_iter().zip(mask).filter_map(
273            |(file, keep_file)| {
274                if keep_file {
275                    Some(file.add_action())
276                } else {
277                    None
278                }
279            },
280        )))
281    } else {
282        Ok(Either::Right(
283            log_data.into_iter().map(|file| file.add_action()),
284        ))
285    }
286}
287
288pub(crate) fn get_path_column<'a>(
289    batch: &'a RecordBatch,
290    path_column: &str,
291) -> DeltaResult<TypedDictionaryArray<'a, UInt16Type, StringArray>> {
292    let err = || DeltaTableError::Generic("Unable to obtain Delta-rs path column".to_string());
293    batch
294        .column_by_name(path_column)
295        .unwrap()
296        .as_any()
297        .downcast_ref::<DictionaryArray<UInt16Type>>()
298        .ok_or_else(err)?
299        .downcast_dict::<StringArray>()
300        .ok_or_else(err)
301}
302
/// Build a typed null [`ScalarValue`] for the given arrow data type.
///
/// Used to materialize missing statistics/partition values with the correct type so
/// they can participate in DataFusion expressions. Nested and otherwise unsupported
/// arrow types yield a generic error.
pub(crate) fn get_null_of_arrow_type(t: &ArrowDataType) -> DeltaResult<ScalarValue> {
    match t {
        ArrowDataType::Null => Ok(ScalarValue::Null),
        ArrowDataType::Boolean => Ok(ScalarValue::Boolean(None)),
        ArrowDataType::Int8 => Ok(ScalarValue::Int8(None)),
        ArrowDataType::Int16 => Ok(ScalarValue::Int16(None)),
        ArrowDataType::Int32 => Ok(ScalarValue::Int32(None)),
        ArrowDataType::Int64 => Ok(ScalarValue::Int64(None)),
        ArrowDataType::UInt8 => Ok(ScalarValue::UInt8(None)),
        ArrowDataType::UInt16 => Ok(ScalarValue::UInt16(None)),
        ArrowDataType::UInt32 => Ok(ScalarValue::UInt32(None)),
        ArrowDataType::UInt64 => Ok(ScalarValue::UInt64(None)),
        ArrowDataType::Float32 => Ok(ScalarValue::Float32(None)),
        ArrowDataType::Float64 => Ok(ScalarValue::Float64(None)),
        ArrowDataType::Date32 => Ok(ScalarValue::Date32(None)),
        ArrowDataType::Date64 => Ok(ScalarValue::Date64(None)),
        ArrowDataType::Binary => Ok(ScalarValue::Binary(None)),
        ArrowDataType::FixedSizeBinary(size) => {
            Ok(ScalarValue::FixedSizeBinary(size.to_owned(), None))
        }
        ArrowDataType::LargeBinary => Ok(ScalarValue::LargeBinary(None)),
        ArrowDataType::Utf8 => Ok(ScalarValue::Utf8(None)),
        ArrowDataType::LargeUtf8 => Ok(ScalarValue::LargeUtf8(None)),
        // Decimal nulls must carry precision and scale to stay comparable.
        ArrowDataType::Decimal128(precision, scale) => Ok(ScalarValue::Decimal128(
            None,
            precision.to_owned(),
            scale.to_owned(),
        )),
        // Timestamp nulls must preserve both the time unit and the timezone.
        ArrowDataType::Timestamp(unit, tz) => {
            let tz = tz.to_owned();
            Ok(match unit {
                TimeUnit::Second => ScalarValue::TimestampSecond(None, tz),
                TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(None, tz),
                TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(None, tz),
                TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(None, tz),
            })
        }
        // Dictionary nulls wrap a null of the value type, recursively.
        ArrowDataType::Dictionary(k, v) => Ok(ScalarValue::Dictionary(
            k.clone(),
            Box::new(get_null_of_arrow_type(v)?),
        )),
        // Unsupported types: nested, view, interval, and other exotic arrow types.
        ArrowDataType::Float16
        | ArrowDataType::Decimal32(_, _)
        | ArrowDataType::Decimal64(_, _)
        | ArrowDataType::Decimal256(_, _)
        | ArrowDataType::Union(_, _)
        | ArrowDataType::LargeList(_)
        | ArrowDataType::Struct(_)
        | ArrowDataType::List(_)
        | ArrowDataType::FixedSizeList(_, _)
        | ArrowDataType::Time32(_)
        | ArrowDataType::Time64(_)
        | ArrowDataType::Duration(_)
        | ArrowDataType::Interval(_)
        | ArrowDataType::RunEndEncoded(_, _)
        | ArrowDataType::BinaryView
        | ArrowDataType::Utf8View
        | ArrowDataType::LargeListView(_)
        | ArrowDataType::ListView(_)
        | ArrowDataType::Map(_, _) => Err(DeltaTableError::Generic(format!(
            "Unsupported data type for Delta Lake {t}"
        ))),
    }
}
368
369fn parse_date(
370    stat_val: &serde_json::Value,
371    field_dt: &ArrowDataType,
372) -> DataFusionResult<ScalarValue> {
373    let string = match stat_val {
374        serde_json::Value::String(s) => s.to_owned(),
375        _ => stat_val.to_string(),
376    };
377
378    let time_micro = ScalarValue::try_from_string(string, &ArrowDataType::Date32)?;
379    let cast_arr = cast_with_options(
380        &time_micro.to_array()?,
381        field_dt,
382        &CastOptions {
383            safe: false,
384            ..Default::default()
385        },
386    )?;
387    ScalarValue::try_from_array(&cast_arr, 0)
388}
389
390fn parse_timestamp(
391    stat_val: &serde_json::Value,
392    field_dt: &ArrowDataType,
393) -> DataFusionResult<ScalarValue> {
394    let string = match stat_val {
395        serde_json::Value::String(s) => s.to_owned(),
396        _ => stat_val.to_string(),
397    };
398
399    let time_micro = ScalarValue::try_from_string(
400        string,
401        &ArrowDataType::Timestamp(TimeUnit::Microsecond, None),
402    )?;
403    let cast_arr = cast_with_options(
404        &time_micro.to_array()?,
405        field_dt,
406        &CastOptions {
407            safe: false,
408            ..Default::default()
409        },
410    )?;
411    ScalarValue::try_from_array(&cast_arr, 0)
412}
413
414pub(crate) fn to_correct_scalar_value(
415    stat_val: &serde_json::Value,
416    field_dt: &ArrowDataType,
417) -> DataFusionResult<Option<ScalarValue>> {
418    match stat_val {
419        serde_json::Value::Array(_) => Ok(None),
420        serde_json::Value::Object(_) => Ok(None),
421        serde_json::Value::Null => Ok(Some(get_null_of_arrow_type(field_dt)?)),
422        serde_json::Value::String(string_val) => match field_dt {
423            ArrowDataType::Timestamp(_, _) => Ok(Some(parse_timestamp(stat_val, field_dt)?)),
424            ArrowDataType::Date32 => Ok(Some(parse_date(stat_val, field_dt)?)),
425            _ => Ok(Some(ScalarValue::try_from_string(
426                string_val.to_owned(),
427                field_dt,
428            )?)),
429        },
430        other => match field_dt {
431            ArrowDataType::Timestamp(_, _) => Ok(Some(parse_timestamp(stat_val, field_dt)?)),
432            ArrowDataType::Date32 => Ok(Some(parse_date(stat_val, field_dt)?)),
433            _ => Ok(Some(ScalarValue::try_from_string(
434                other.to_string(),
435                field_dt,
436            )?)),
437        },
438    }
439}
440
441/// A codec for deltalake physical plans
442#[derive(Debug)]
443pub struct DeltaPhysicalCodec {}
444
445impl PhysicalExtensionCodec for DeltaPhysicalCodec {
446    fn try_decode(
447        &self,
448        buf: &[u8],
449        inputs: &[Arc<dyn ExecutionPlan>],
450        _registry: &TaskContext,
451    ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
452        let wire: DeltaScanWire = serde_json::from_reader(buf)
453            .map_err(|_| DataFusionError::Internal("Unable to decode DeltaScan".to_string()))?;
454        let delta_scan = DeltaScan::new(
455            &wire.table_url,
456            wire.config,
457            (*inputs)[0].clone(),
458            wire.logical_schema,
459        );
460        Ok(Arc::new(delta_scan))
461    }
462
463    fn try_encode(
464        &self,
465        node: Arc<dyn ExecutionPlan>,
466        buf: &mut Vec<u8>,
467    ) -> Result<(), DataFusionError> {
468        let delta_scan = node
469            .as_any()
470            .downcast_ref::<DeltaScan>()
471            .ok_or_else(|| DataFusionError::Internal("Not a delta scan!".to_string()))?;
472
473        let wire = DeltaScanWire::from(delta_scan);
474        serde_json::to_writer(buf, &wire)
475            .map_err(|_| DataFusionError::Internal("Unable to encode delta scan!".to_string()))?;
476        Ok(())
477    }
478}
479
480/// Does serde on DeltaTables
481#[derive(Debug)]
482pub struct DeltaLogicalCodec {}
483
484impl LogicalExtensionCodec for DeltaLogicalCodec {
485    fn try_decode(
486        &self,
487        _buf: &[u8],
488        _inputs: &[LogicalPlan],
489        _ctx: &TaskContext,
490    ) -> Result<Extension, DataFusionError> {
491        todo!("DeltaLogicalCodec")
492    }
493
494    fn try_encode(&self, _node: &Extension, _buf: &mut Vec<u8>) -> Result<(), DataFusionError> {
495        todo!("DeltaLogicalCodec")
496    }
497
498    fn try_decode_table_provider(
499        &self,
500        buf: &[u8],
501        _table_ref: &TableReference,
502        _schema: SchemaRef,
503        _ctx: &TaskContext,
504    ) -> Result<Arc<dyn TableProvider>, DataFusionError> {
505        let provider: DeltaScanNext = serde_json::from_slice(buf)
506            .map_err(|_| DataFusionError::Internal("Error encoding delta table".to_string()))?;
507        Ok(Arc::new(provider))
508    }
509
510    fn try_encode_table_provider(
511        &self,
512        _table_ref: &TableReference,
513        node: Arc<dyn TableProvider>,
514        buf: &mut Vec<u8>,
515    ) -> Result<(), DataFusionError> {
516        let scan = node
517            .as_ref()
518            .as_any()
519            .downcast_ref::<DeltaScanNext>()
520            .ok_or_else(|| {
521                DataFusionError::Internal("Can't encode non-delta tables".to_string())
522            })?;
523        serde_json::to_writer(buf, scan)
524            .map_err(|_| DataFusionError::Internal("Error encoding delta table".to_string()))
525    }
526}
527
528/// Responsible for creating deltatables
529#[derive(Debug)]
530pub struct DeltaTableFactory {}
531
532#[async_trait::async_trait]
533impl TableProviderFactory for DeltaTableFactory {
534    async fn create(
535        &self,
536        ctx: &dyn Session,
537        cmd: &CreateExternalTable,
538    ) -> datafusion::error::Result<Arc<dyn TableProvider>> {
539        let table = if cmd.options.is_empty() {
540            let table_url = ensure_table_uri(&cmd.to_owned().location)?;
541            open_table(table_url).await?
542        } else {
543            let table_url = ensure_table_uri(&cmd.to_owned().location)?;
544            open_table_with_storage_options(table_url, cmd.to_owned().options).await?
545        };
546        let table_uri = table.log_store().root_url().clone();
547        let (session_state, _) = resolve_session_state(
548            Some(ctx),
549            SessionFallbackPolicy::DeriveFromTrait,
550            || create_session().state(),
551            SessionResolveContext {
552                operation: "DeltaTableFactory::create",
553                table_uri: Some(&table_uri),
554                cdc: false,
555            },
556        )?;
557
558        Ok(table
559            .table_provider()
560            .with_session(Arc::new(session_state))
561            .await?)
562    }
563}
564
/// A wrapper for DataFusion's Column to preserve case-sensitivity during string conversion
pub struct DeltaColumn {
    // Wrapped datafusion column, always built via `from_qualified_name_ignore_case`
    // in the string conversions below so quoted identifiers keep their casing.
    inner: Column,
}

/// Create a column from a string slice
impl From<&str> for DeltaColumn {
    fn from(c: &str) -> Self {
        DeltaColumn {
            inner: Column::from_qualified_name_ignore_case(c),
        }
    }
}

/// Create a column, cloning the string
impl From<&String> for DeltaColumn {
    fn from(c: &String) -> Self {
        DeltaColumn {
            inner: Column::from_qualified_name_ignore_case(c),
        }
    }
}

/// Create a column, reusing the existing string
impl From<String> for DeltaColumn {
    fn from(c: String) -> Self {
        DeltaColumn {
            inner: Column::from_qualified_name_ignore_case(c),
        }
    }
}

/// Unwrap back into the underlying datafusion column
impl From<DeltaColumn> for Column {
    fn from(value: DeltaColumn) -> Self {
        value.inner
    }
}

/// Create a column, reusing the existing datafusion column
impl From<Column> for DeltaColumn {
    fn from(c: Column) -> Self {
        DeltaColumn { inner: c }
    }
}
608
609#[cfg(test)]
610mod tests {
611    use crate::DeltaTable;
612    use crate::logstore::ObjectStoreRef;
613    use crate::logstore::default_logstore::DefaultLogStore;
614    use crate::operations::write::SchemaMode;
615    use crate::test_utils::open_fs_path;
616    use crate::writer::test_utils::get_delta_schema;
617    use arrow::array::StructArray;
618    use arrow::datatypes::{Field, Schema};
619    use arrow_array::cast::AsArray;
620    use bytes::Bytes;
621    use datafusion::assert_batches_sorted_eq;
622    use datafusion::config::TableParquetOptions;
623    use datafusion::datasource::physical_plan::{FileScanConfig, ParquetSource};
624    use datafusion::datasource::source::DataSourceExec;
625    use datafusion::logical_expr::lit;
626    use datafusion::physical_plan::empty::EmptyExec;
627    use datafusion::physical_plan::{ExecutionPlanVisitor, PhysicalExpr, visit_execution_plan};
628    use datafusion::prelude::{SessionConfig, col};
629    use datafusion_datasource::file::FileSource as _;
630    use datafusion_proto::physical_plan::AsExecutionPlan;
631    use datafusion_proto::protobuf;
632    use delta_kernel::path::{LogPathFileType, ParsedLogPath};
633    use delta_kernel::schema::ArrayType;
634    use futures::{StreamExt, TryStreamExt, stream::BoxStream};
635    use object_store::ObjectMeta;
636    use object_store::{
637        GetOptions, GetResult, ListResult, MultipartUpload, ObjectStore, PutMultipartOptions,
638        PutOptions, PutPayload, PutResult, path::Path,
639    };
640    use serde_json::json;
641    use std::fmt::{self, Debug, Display, Formatter};
642    use std::ops::Range;
643    use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
644    use url::Url;
645
646    use super::*;
647    use crate::delta_datafusion::table_provider::next::{FileSelection, MissingFilePolicy};
648    // test deserialization of serialized partition values.
649    // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#partition-value-serialization
    /// Each reference tuple is (raw JSON stat value, target arrow type, expected scalar).
    #[test]
    fn test_to_correct_scalar_value() {
        let reference_pairs = &[
            (
                json!("2015"),
                ArrowDataType::Int16,
                ScalarValue::Int16(Some(2015)),
            ),
            (
                json!("2015"),
                ArrowDataType::Int32,
                ScalarValue::Int32(Some(2015)),
            ),
            (
                json!("2015"),
                ArrowDataType::Int64,
                ScalarValue::Int64(Some(2015)),
            ),
            (
                json!("2015"),
                ArrowDataType::Float32,
                ScalarValue::Float32(Some(2015_f32)),
            ),
            (
                json!("2015"),
                ArrowDataType::Float64,
                ScalarValue::Float64(Some(2015_f64)),
            ),
            // Non-string JSON numbers go through their text representation.
            (
                json!(2015),
                ArrowDataType::Float64,
                ScalarValue::Float64(Some(2015_f64)),
            ),
            (
                json!("2015-01-01"),
                ArrowDataType::Date32,
                ScalarValue::Date32(Some(16436)),
            ),
            // (
            //     json!("2015-01-01"),
            //     ArrowDataType::Date64,
            //     ScalarValue::Date64(Some(16436)),
            // ),
            // TODO(roeap) there seem to be differences in how precisions are handled locally and in CI, need to investigate
            // (
            //     json!("2020-09-08 13:42:29"),
            //     ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
            //     ScalarValue::TimestampNanosecond(Some(1599565349000000000), None),
            // ),
            // (
            //     json!("2020-09-08 13:42:29"),
            //     ArrowDataType::Timestamp(TimeUnit::Microsecond, None),
            //     ScalarValue::TimestampMicrosecond(Some(1599565349000000), None),
            // ),
            // (
            //     json!("2020-09-08 13:42:29"),
            //     ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
            //     ScalarValue::TimestampMillisecond(Some(1599565349000), None),
            // ),
            (
                json!(true),
                ArrowDataType::Boolean,
                ScalarValue::Boolean(Some(true)),
            ),
        ];

        for (raw, data_type, ref_scalar) in reference_pairs {
            let scalar = to_correct_scalar_value(raw, data_type).unwrap().unwrap();
            assert_eq!(*ref_scalar, scalar)
        }
    }
721
    /// Round-trips a `DeltaScan` through the protobuf physical-plan representation
    /// via `DeltaPhysicalCodec` and checks the debug rendering is unchanged.
    #[test]
    fn roundtrip_test_delta_exec_plan() {
        let ctx = SessionContext::new();
        let codec = DeltaPhysicalCodec {};

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", ArrowDataType::Utf8, false),
            Field::new("b", ArrowDataType::Int32, false),
        ]));
        // An `EmptyExec` stands in for the child plan; only the wrapper is under test.
        let exec_plan = Arc::from(DeltaScan::new(
            &Url::parse("s3://my_bucket/this/is/some/path").unwrap(),
            DeltaScanConfig::default(),
            Arc::from(EmptyExec::new(schema.clone())),
            schema.clone(),
        ));
        let proto: protobuf::PhysicalPlanNode =
            protobuf::PhysicalPlanNode::try_from_physical_plan(exec_plan.clone(), &codec)
                .expect("to proto");

        let task_ctx = ctx.task_ctx();
        let result_exec_plan: Arc<dyn ExecutionPlan> = proto
            .try_into_physical_plan(&task_ctx, &codec)
            .expect("from proto");
        assert_eq!(format!("{exec_plan:?}"), format!("{result_exec_plan:?}"));
    }
747
    /// Encodes a `DeltaScanNext` provider with an explicit file selection via
    /// `DeltaLogicalCodec`, decodes it back, and verifies the selected file ids and
    /// missing-file policy survive the round trip (checked via the serde JSON form).
    #[tokio::test]
    async fn roundtrip_test_delta_logical_codec_preserves_file_selection() {
        let log_store = crate::test_utils::TestTables::Simple
            .table_builder()
            .unwrap()
            .build_storage()
            .unwrap();
        let snapshot = Arc::new(
            crate::kernel::Snapshot::try_new(&log_store, Default::default(), None)
                .await
                .unwrap(),
        );
        let table_root = snapshot
            .scan_builder()
            .build()
            .unwrap()
            .table_root()
            .clone();
        // Select exactly one file from the table to make the assertion unambiguous.
        let selected_file_ids: Vec<String> = snapshot
            .file_views(log_store.as_ref(), None)
            .take(1)
            .map_ok(|view| table_root.join(view.path_raw()).unwrap().to_string())
            .try_collect()
            .await
            .unwrap();
        assert_eq!(selected_file_ids.len(), 1);

        let provider = DeltaScanNext::builder()
            .with_snapshot(snapshot)
            .build()
            .await
            .unwrap()
            .with_file_selection(
                FileSelection::new(selected_file_ids.clone())
                    .with_missing_file_policy(MissingFilePolicy::Ignore),
            );

        let codec = DeltaLogicalCodec {};
        let table_ref = TableReference::bare("delta_table");
        let mut encoded = Vec::new();
        codec
            .try_encode_table_provider(&table_ref, Arc::new(provider), &mut encoded)
            .unwrap();

        let ctx = SessionContext::new();
        let decoded = codec
            .try_decode_table_provider(
                &encoded,
                &table_ref,
                Arc::new(ArrowSchema::empty()),
                &ctx.task_ctx(),
            )
            .unwrap();
        let decoded_provider = decoded
            .as_ref()
            .as_any()
            .downcast_ref::<DeltaScanNext>()
            .unwrap();

        // Inspect the decoded provider through its serde representation rather than
        // private fields.
        let serialized = serde_json::to_value(decoded_provider).unwrap();
        let decoded_file_ids = serialized
            .get("file_selection")
            .and_then(|value| value.get("file_ids"))
            .and_then(|value| value.as_array())
            .unwrap()
            .iter()
            .map(|value| value.as_str().unwrap().to_string())
            .collect::<Vec<_>>();
        let decoded_policy = serialized
            .get("file_selection")
            .and_then(|value| value.get("missing_file_policy"))
            .and_then(|value| value.as_str())
            .unwrap();

        assert_eq!(decoded_file_ids, selected_file_ids);
        assert_eq!(decoded_policy, "Ignore");
    }
825
    /// Registers a provider configured with a file-path column and checks the column
    /// surfaces the originating parquet file for each row.
    #[tokio::test]
    async fn delta_table_provider_with_config() {
        let table = open_fs_path("../test/tests/data/delta-2.2.0-partitioned-types");
        let provider = table
            .table_provider()
            .with_file_column("file_source")
            .await
            .unwrap();
        let ctx = SessionContext::new();
        ctx.register_table("test", provider).unwrap();

        // `right(.., 77)` trims the machine-specific table-root prefix off the path.
        let df = ctx
            .sql("select c3, c1, c2, right(file_source, 77) as file_source from test")
            .await
            .unwrap();
        let actual = df.collect().await.unwrap();
        let expected = vec![
            "+----+----+----+-------------------------------------------------------------------------------+",
            "| c3 | c1 | c2 | file_source                                                                   |",
            "+----+----+----+-------------------------------------------------------------------------------+",
            "| 4  | 6  | a  | c1=6/c2=a/part-00011-10619b10-b691-4fd0-acc4-2a9608499d7c.c000.snappy.parquet |",
            "| 5  | 4  | c  | c1=4/c2=c/part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet |",
            "| 6  | 5  | b  | c1=5/c2=b/part-00007-4e73fa3b-2c88-424a-8051-f8b54328ffdb.c000.snappy.parquet |",
            "+----+----+----+-------------------------------------------------------------------------------+",
        ];
        assert_batches_sorted_eq!(&expected, &actual);
    }
853
    #[tokio::test]
    async fn delta_scan_mixed_partition_order() {
        // Tests issue (1787) where partition columns were incorrect when they
        // have a different order in the metadata and table schema
        //
        // The arrow schema used to build the write batch lists the partition
        // columns first, while the delta table schema (from get_delta_schema)
        // yields the logical order asserted at the bottom of the test.
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new("modified", ArrowDataType::Utf8, true),
            Field::new("id", ArrowDataType::Utf8, true),
            Field::new("value", ArrowDataType::Int32, true),
        ]));

        // Create the table first (version 0) with explicit partition columns,
        // then append data in a second commit.
        let table = DeltaTable::new_in_memory()
            .create()
            .with_columns(get_delta_schema().fields().cloned())
            .with_partition_columns(["modified", "id"])
            .await
            .unwrap();
        assert_eq!(table.version(), Some(0));

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(arrow::array::StringArray::from(vec![
                    "2021-02-01",
                    "2021-02-01",
                    "2021-02-02",
                    "2021-02-02",
                ])),
                Arc::new(arrow::array::StringArray::from(vec!["A", "B", "C", "D"])),
                Arc::new(arrow::array::Int32Array::from(vec![1, 10, 20, 100])),
            ],
        )
        .unwrap();
        // write some data
        let table = table
            .write(vec![batch.clone()])
            .with_save_mode(crate::protocol::SaveMode::Append)
            .await
            .unwrap();

        let provider = table.table_provider().await.unwrap();
        let logical_schema = provider.schema();
        let ctx = SessionContext::new();
        ctx.runtime_env().register_object_store(
            table.log_store().root_url(),
            table.log_store().object_store(None),
        );
        ctx.register_table("test", provider).unwrap();

        // The provider's logical schema must follow the table schema order,
        // not the partition-column order used when writing.
        let expected_logical_order = vec!["id", "value", "modified"];
        let actual_order: Vec<String> = logical_schema
            .fields()
            .iter()
            .map(|f| f.name().to_owned())
            .collect();

        let df = ctx.sql("select * from test").await.unwrap();
        let actual = df.collect().await.unwrap();
        let expected = vec![
            "+----+-------+------------+",
            "| id | value | modified   |",
            "+----+-------+------------+",
            "| A  | 1     | 2021-02-01 |",
            "| B  | 10    | 2021-02-01 |",
            "| C  | 20    | 2021-02-02 |",
            "| D  | 100   | 2021-02-02 |",
            "+----+-------+------------+",
        ];
        assert_batches_sorted_eq!(&expected, &actual);
        assert_eq!(expected_logical_order, actual_order);
    }
924
    #[tokio::test]
    async fn delta_scan_case_sensitive() {
        // Columns with mixed-case names must stay addressable exactly as
        // written ("moDified", "ID", "vaLue") when queried through SQL.
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new("moDified", ArrowDataType::Utf8, true),
            Field::new("ID", ArrowDataType::Utf8, true),
            Field::new("vaLue", ArrowDataType::Int32, true),
        ]));

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(arrow::array::StringArray::from(vec![
                    "2021-02-01",
                    "2021-02-01",
                    "2021-02-02",
                    "2021-02-02",
                ])),
                Arc::new(arrow::array::StringArray::from(vec!["A", "B", "C", "D"])),
                Arc::new(arrow::array::Int32Array::from(vec![1, 10, 20, 100])),
            ],
        )
        .unwrap();
        // write some data
        let table = DeltaTable::new_in_memory()
            .write(vec![batch.clone()])
            .with_save_mode(crate::protocol::SaveMode::Append)
            .await
            .unwrap();

        let config = DeltaScanConfigBuilder::new()
            .build(table.snapshot().unwrap().snapshot())
            .unwrap();
        let log = table.log_store();

        let provider =
            DeltaTableProvider::try_new(table.snapshot().unwrap().snapshot().clone(), log, config)
                .unwrap();
        // DeltaSessionContext configures datafusion for delta semantics
        // (including identifier case handling).
        let ctx: SessionContext = DeltaSessionContext::default().into();
        ctx.register_table("test", Arc::new(provider)).unwrap();

        let df = ctx
            .sql("select ID, moDified, vaLue from test")
            .await
            .unwrap();
        let actual = df.collect().await.unwrap();
        let expected = vec![
            "+----+------------+-------+",
            "| ID | moDified   | vaLue |",
            "+----+------------+-------+",
            "| A  | 2021-02-01 | 1     |",
            "| B  | 2021-02-01 | 10    |",
            "| C  | 2021-02-02 | 20    |",
            "| D  | 2021-02-02 | 100   |",
            "+----+------------+-------+",
        ];
        assert_batches_sorted_eq!(&expected, &actual);

        /* TODO: Datafusion doesn't have any options to prevent case-sensitivity with the col func */
        /*
        let df = ctx
            .table("test")
            .await
            .unwrap()
            .select(vec![col("ID"), col("moDified"), col("vaLue")])
            .unwrap();
        let actual = df.collect().await.unwrap();
        assert_batches_sorted_eq!(&expected, &actual);
        */
    }
994
    #[tokio::test]
    async fn delta_scan_supports_missing_columns() {
        // After a schema-merge write adds `col_2`, scanning older files that
        // lack the column must yield nulls for it rather than failing.
        let schema1 = Arc::new(ArrowSchema::new(vec![Field::new(
            "col_1",
            ArrowDataType::Utf8,
            true,
        )]));

        // Batch written with only col_1 (will be the second, merged write).
        let batch1 = RecordBatch::try_new(
            schema1.clone(),
            vec![Arc::new(arrow::array::StringArray::from(vec![
                Some("A"),
                Some("B"),
            ]))],
        )
        .unwrap();

        let schema2 = Arc::new(ArrowSchema::new(vec![
            Field::new("col_1", ArrowDataType::Utf8, true),
            Field::new("col_2", ArrowDataType::Utf8, true),
        ]));

        // Batch with both columns (first write, establishes the full schema).
        let batch2 = RecordBatch::try_new(
            schema2.clone(),
            vec![
                Arc::new(arrow::array::StringArray::from(vec![
                    Some("E"),
                    Some("F"),
                    Some("G"),
                ])),
                Arc::new(arrow::array::StringArray::from(vec![
                    Some("E2"),
                    Some("F2"),
                    Some("G2"),
                ])),
            ],
        )
        .unwrap();

        let table = DeltaTable::new_in_memory()
            .write(vec![batch2])
            .with_save_mode(crate::protocol::SaveMode::Append)
            .await
            .unwrap();

        // Append the narrower batch with SchemaMode::Merge so the table
        // schema keeps col_2 while the new file omits it.
        let table = table
            .write(vec![batch1])
            .with_schema_mode(SchemaMode::Merge)
            .with_save_mode(crate::protocol::SaveMode::Append)
            .await
            .unwrap();

        let config = DeltaScanConfigBuilder::new()
            .build(table.snapshot().unwrap().snapshot())
            .unwrap();
        let log = table.log_store();

        let provider =
            DeltaTableProvider::try_new(table.snapshot().unwrap().snapshot().clone(), log, config)
                .unwrap();
        let ctx: SessionContext = DeltaSessionContext::default().into();
        ctx.register_table("test", Arc::new(provider)).unwrap();

        let df = ctx.sql("select col_1, col_2 from test").await.unwrap();
        let actual = df.collect().await.unwrap();
        // Rows A/B come from the file without col_2 and show as null.
        let expected = vec![
            "+-------+-------+",
            "| col_1 | col_2 |",
            "+-------+-------+",
            "| A     |       |",
            "| B     |       |",
            "| E     | E2    |",
            "| F     | F2    |",
            "| G     | G2    |",
            "+-------+-------+",
        ];
        assert_batches_sorted_eq!(&expected, &actual);
    }
1073
1074    #[tokio::test]
1075    async fn delta_scan_supports_pushdown() {
1076        let schema = Arc::new(ArrowSchema::new(vec![
1077            Field::new("col_1", ArrowDataType::Utf8, false),
1078            Field::new("col_2", ArrowDataType::Utf8, false),
1079        ]));
1080
1081        let batch = RecordBatch::try_new(
1082            schema.clone(),
1083            vec![
1084                Arc::new(arrow::array::StringArray::from(vec![
1085                    Some("A"),
1086                    Some("B"),
1087                    Some("C"),
1088                ])),
1089                Arc::new(arrow::array::StringArray::from(vec![
1090                    Some("A2"),
1091                    Some("B2"),
1092                    Some("C2"),
1093                ])),
1094            ],
1095        )
1096        .unwrap();
1097
1098        let table = DeltaTable::new_in_memory()
1099            .write(vec![batch])
1100            .with_save_mode(crate::protocol::SaveMode::Append)
1101            .await
1102            .unwrap();
1103
1104        let config = DeltaScanConfigBuilder::new()
1105            .build(table.snapshot().unwrap().snapshot())
1106            .unwrap();
1107        let log = table.log_store();
1108
1109        let provider =
1110            DeltaTableProvider::try_new(table.snapshot().unwrap().snapshot().clone(), log, config)
1111                .unwrap();
1112
1113        let mut cfg = SessionConfig::default();
1114        cfg.options_mut().execution.parquet.pushdown_filters = true;
1115        let ctx = SessionContext::new_with_config(cfg);
1116        ctx.register_table("test", Arc::new(provider)).unwrap();
1117
1118        let df = ctx
1119            .sql("select col_1, col_2 from test WHERE col_1 = 'A'")
1120            .await
1121            .unwrap();
1122        let actual = df.collect().await.unwrap();
1123        let expected = vec![
1124            "+-------+-------+",
1125            "| col_1 | col_2 |",
1126            "+-------+-------+",
1127            "| A     | A2    |",
1128            "+-------+-------+",
1129        ];
1130        assert_batches_sorted_eq!(&expected, &actual);
1131    }
1132
    #[tokio::test]
    async fn delta_scan_supports_nested_missing_columns() {
        // Same as delta_scan_supports_missing_columns, but the added column
        // (col_1b) is nested inside a struct: files written before the merge
        // must surface null for the missing nested field.
        let column1_schema1: arrow::datatypes::Fields =
            vec![Field::new("col_1a", ArrowDataType::Utf8, true)].into();
        let schema1 = Arc::new(ArrowSchema::new(vec![Field::new(
            "col_1",
            ArrowDataType::Struct(column1_schema1.clone()),
            true,
        )]));

        // First write: struct with only col_1a.
        let batch1 = RecordBatch::try_new(
            schema1.clone(),
            vec![Arc::new(StructArray::new(
                column1_schema1,
                vec![Arc::new(arrow::array::StringArray::from(vec![
                    Some("A"),
                    Some("B"),
                ]))],
                None,
            ))],
        )
        .unwrap();

        let column1_schema2: arrow_schema::Fields = vec![
            Field::new("col_1a", ArrowDataType::Utf8, true),
            Field::new("col_1b", ArrowDataType::Utf8, true),
        ]
        .into();
        let schema2 = Arc::new(ArrowSchema::new(vec![Field::new(
            "col_1",
            ArrowDataType::Struct(column1_schema2.clone()),
            true,
        )]));

        // Second write: struct widened with col_1b.
        let batch2 = RecordBatch::try_new(
            schema2.clone(),
            vec![Arc::new(StructArray::new(
                column1_schema2,
                vec![
                    Arc::new(arrow::array::StringArray::from(vec![
                        Some("E"),
                        Some("F"),
                        Some("G"),
                    ])),
                    Arc::new(arrow::array::StringArray::from(vec![
                        Some("E2"),
                        Some("F2"),
                        Some("G2"),
                    ])),
                ],
                None,
            ))],
        )
        .unwrap();

        let table = DeltaTable::new_in_memory()
            .write(vec![batch1])
            .with_save_mode(crate::protocol::SaveMode::Append)
            .await
            .unwrap();

        // Merge the widened struct schema into the table.
        let table = table
            .write(vec![batch2])
            .with_schema_mode(SchemaMode::Merge)
            .with_save_mode(crate::protocol::SaveMode::Append)
            .await
            .unwrap();

        let config = DeltaScanConfigBuilder::new()
            .build(table.snapshot().unwrap().snapshot())
            .unwrap();
        let log = table.log_store();

        let provider =
            DeltaTableProvider::try_new(table.snapshot().unwrap().snapshot().clone(), log, config)
                .unwrap();
        let ctx: SessionContext = DeltaSessionContext::default().into();
        ctx.register_table("test", Arc::new(provider)).unwrap();

        let df = ctx
            .sql("select col_1.col_1a, col_1.col_1b from test")
            .await
            .unwrap();
        let actual = df.collect().await.unwrap();
        // Rows A/B predate col_1b, so its projection is null for them.
        let expected = vec![
            "+--------------------+--------------------+",
            "| test.col_1[col_1a] | test.col_1[col_1b] |",
            "+--------------------+--------------------+",
            "| A                  |                    |",
            "| B                  |                    |",
            "| E                  | E2                 |",
            "| F                  | F2                 |",
            "| G                  | G2                 |",
            "+--------------------+--------------------+",
        ];
        assert_batches_sorted_eq!(&expected, &actual);
    }
1230
1231    #[tokio::test]
1232    async fn test_multiple_predicate_pushdown() {
1233        let schema = Arc::new(ArrowSchema::new(vec![
1234            Field::new("moDified", ArrowDataType::Utf8, true),
1235            Field::new("id", ArrowDataType::Utf8, true),
1236            Field::new("vaLue", ArrowDataType::Int32, true),
1237        ]));
1238
1239        let batch = RecordBatch::try_new(
1240            schema.clone(),
1241            vec![
1242                Arc::new(arrow::array::StringArray::from(vec![
1243                    "2021-02-01",
1244                    "2021-02-01",
1245                    "2021-02-02",
1246                    "2021-02-02",
1247                ])),
1248                Arc::new(arrow::array::StringArray::from(vec!["A", "B", "C", "D"])),
1249                Arc::new(arrow::array::Int32Array::from(vec![1, 10, 20, 100])),
1250            ],
1251        )
1252        .unwrap();
1253        // write some data
1254        let table = DeltaTable::new_in_memory()
1255            .write(vec![batch.clone()])
1256            .with_save_mode(crate::protocol::SaveMode::Append)
1257            .await
1258            .unwrap();
1259
1260        let datafusion = SessionContext::new();
1261        table
1262            .update_datafusion_session(&datafusion.state())
1263            .unwrap();
1264
1265        datafusion
1266            .register_table("snapshot", table.table_provider().await.unwrap())
1267            .unwrap();
1268
1269        let df = datafusion
1270            .sql("select * from snapshot where id > 10000 and id < 20000")
1271            .await
1272            .unwrap();
1273
1274        df.collect().await.unwrap();
1275    }
1276
1277    #[tokio::test]
1278    async fn test_delta_scan_builder_no_scan_config() {
1279        let arr: Arc<dyn Array> = Arc::new(arrow::array::StringArray::from(vec!["s"]));
1280        let batch = RecordBatch::try_from_iter_with_nullable(vec![("a", arr, false)]).unwrap();
1281        let table = DeltaTable::new_in_memory()
1282            .write(vec![batch])
1283            .with_save_mode(crate::protocol::SaveMode::Append)
1284            .await
1285            .unwrap();
1286
1287        let ctx = SessionContext::new();
1288        let state = ctx.state();
1289        let scan = DeltaScanBuilder::new(
1290            table.snapshot().unwrap().snapshot(),
1291            table.log_store(),
1292            &state,
1293        )
1294        .with_filter(Some(col("a").eq(lit("s"))))
1295        .build()
1296        .await
1297        .unwrap();
1298
1299        let mut visitor = ParquetVisitor::default();
1300        visit_execution_plan(&scan, &mut visitor).unwrap();
1301
1302        assert_eq!(visitor.predicate.unwrap().to_string(), "a@0 = s");
1303    }
1304
1305    #[tokio::test]
1306    async fn test_delta_scan_builder_scan_config_disable_pushdown() {
1307        let arr: Arc<dyn Array> = Arc::new(arrow::array::StringArray::from(vec!["s"]));
1308        let batch = RecordBatch::try_from_iter_with_nullable(vec![("a", arr, false)]).unwrap();
1309        let table = DeltaTable::new_in_memory()
1310            .write(vec![batch])
1311            .with_save_mode(crate::protocol::SaveMode::Append)
1312            .await
1313            .unwrap();
1314
1315        let snapshot = table.snapshot().unwrap();
1316        let ctx = SessionContext::new();
1317        let state = ctx.state();
1318        let scan = DeltaScanBuilder::new(snapshot.snapshot(), table.log_store(), &state)
1319            .with_filter(Some(col("a").eq(lit("s"))))
1320            .with_scan_config(
1321                DeltaScanConfigBuilder::new()
1322                    .with_parquet_pushdown(false)
1323                    .build(snapshot.snapshot())
1324                    .unwrap(),
1325            )
1326            .build()
1327            .await
1328            .unwrap();
1329
1330        let mut visitor = ParquetVisitor::default();
1331        visit_execution_plan(&scan, &mut visitor).unwrap();
1332
1333        assert!(visitor.predicate.is_none());
1334    }
1335
1336    #[tokio::test]
1337    async fn test_delta_scan_applies_parquet_options() {
1338        let arr: Arc<dyn Array> = Arc::new(arrow::array::StringArray::from(vec!["s"]));
1339        let batch = RecordBatch::try_from_iter_with_nullable(vec![("a", arr, false)]).unwrap();
1340        let table = DeltaTable::new_in_memory()
1341            .write(vec![batch])
1342            .with_save_mode(crate::protocol::SaveMode::Append)
1343            .await
1344            .unwrap();
1345
1346        let snapshot = table.snapshot().unwrap();
1347
1348        let mut config = SessionConfig::default();
1349        config.options_mut().execution.parquet.pushdown_filters = true;
1350        let ctx = SessionContext::new_with_config(config);
1351        let state = ctx.state();
1352
1353        let scan = DeltaScanBuilder::new(snapshot.snapshot(), table.log_store(), &state)
1354            .build()
1355            .await
1356            .unwrap();
1357
1358        let mut visitor = ParquetVisitor::default();
1359        visit_execution_plan(&scan, &mut visitor).unwrap();
1360
1361        assert_eq!(ctx.copied_table_options().parquet, visitor.options.unwrap());
1362    }
1363
    /// Extracts fields from the parquet scan
    ///
    /// Populated by walking an [`ExecutionPlan`] via `visit_execution_plan`;
    /// see the `ExecutionPlanVisitor` impl below.
    #[derive(Default)]
    struct ParquetVisitor {
        // Physical filter predicate attached to the parquet source, if any.
        predicate: Option<Arc<dyn PhysicalExpr>>,
        // Parquet table options the source was configured with.
        options: Option<TableParquetOptions>,
    }
1370
    impl ExecutionPlanVisitor for ParquetVisitor {
        type Error = DataFusionError;

        /// Captures predicate and options from the first `ParquetSource` found
        /// while walking the plan; returns Ok(true) to keep visiting.
        fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
            // Only DataSourceExec nodes can carry a parquet file source.
            let Some(datasource_exec) = plan.as_any().downcast_ref::<DataSourceExec>() else {
                return Ok(true);
            };

            let Some(scan_config) = datasource_exec
                .data_source()
                .as_any()
                .downcast_ref::<FileScanConfig>()
            else {
                return Ok(true);
            };

            if let Some(parquet_source) = scan_config
                .file_source
                .as_any()
                .downcast_ref::<ParquetSource>()
            {
                self.options = Some(parquet_source.table_parquet_options().clone());
                self.predicate = parquet_source.filter();
            }

            Ok(true)
        }
    }
1399
1400    // Run a query that filters out all files and sorts.
1401    // Verify that it returns an empty set of rows without panicking.
1402    //
1403    // Historically, we had a bug that caused us to emit a query plan with 0 partitions, which
1404    // datafusion rejected.
1405    #[tokio::test]
1406    async fn passes_sanity_checker_when_all_files_filtered() {
1407        let table = open_fs_path("../test/tests/data/delta-2.2.0-partitioned-types");
1408        let ctx = create_session().into_inner();
1409        ctx.register_table("test", table.table_provider().await.unwrap())
1410            .unwrap();
1411
1412        let df = ctx
1413            .sql("select * from test where c3 = 100 ORDER BY c1 ASC")
1414            .await
1415            .unwrap();
1416        let actual = df.collect().await.unwrap();
1417
1418        assert_eq!(actual.len(), 0);
1419    }
1420
    #[tokio::test]
    async fn test_delta_scan_uses_parquet_column_pruning() {
        // Projecting only the small column must avoid reading the large
        // column's bytes; verified by recording the object store reads.
        let small: Arc<dyn Array> = Arc::new(arrow::array::StringArray::from(vec!["a"]));
        let large: Arc<dyn Array> = Arc::new(arrow::array::StringArray::from(vec![
            "b".repeat(1024).as_str(),
        ]));
        let batch = RecordBatch::try_from_iter(vec![("small", small), ("large", large)]).unwrap();
        let table = DeltaTable::new_in_memory()
            .write(vec![batch])
            .with_save_mode(crate::protocol::SaveMode::Append)
            .await
            .unwrap();

        let config = DeltaScanConfigBuilder::new()
            .build(table.snapshot().unwrap().snapshot())
            .unwrap();

        // Wrap the table's store so every read is captured on a channel.
        let (object_store, mut operations) =
            RecordingObjectStore::new(table.log_store().object_store(None));
        // this uses an in memory store pointing at root...
        let both_store = Arc::new(object_store);
        let log_store = DefaultLogStore::new(
            both_store.clone(),
            both_store,
            table.log_store().config().clone(),
        );
        let provider = DeltaTableProvider::try_new(
            table.snapshot().unwrap().snapshot().clone(),
            Arc::new(log_store),
            config,
        )
        .unwrap();
        let ctx = SessionContext::new();
        ctx.register_table("test", Arc::new(provider)).unwrap();
        let state = ctx.state();

        let df = ctx.sql("select small from test").await.unwrap();
        let plan = df.create_physical_plan().await.unwrap();

        // Drive the plan manually so reads happen on the recording store.
        let mut stream = plan.execute(0, state.task_ctx()).unwrap();
        let Some(Ok(batch)) = stream.next().await else {
            panic!()
        };
        assert!(stream.next().await.is_none());
        assert_eq!(1, batch.num_columns());
        assert_eq!(1, batch.num_rows());
        let small = batch.column_by_name("small").unwrap().as_string::<i32>();
        assert_eq!("a", small.iter().next().unwrap().unwrap());

        // NOTE(review): the exact byte offsets are tied to the writer's output
        // layout — presumably footer then metadata/projected column chunk; the
        // key point is that only these small ranges (not the whole file
        // including the "large" column data) are fetched.
        let expected = vec![
            ObjectStoreOperation::Get(LocationType::Commit),
            ObjectStoreOperation::GetRange(LocationType::Data, 957..965),
            ObjectStoreOperation::GetRange(LocationType::Data, 326..957),
        ];
        let mut actual = Vec::new();
        operations.recv_many(&mut actual, 3).await;
        assert_eq!(expected, actual);
    }
1479
    /// Regression test for issue #2602: pushing range filters down on a table
    /// containing timestamp/date/array columns must not panic.
    #[tokio::test]
    async fn test_push_down_filter_panic_2602() -> DeltaResult<()> {
        use crate::kernel::schema::{DataType, PrimitiveType};
        let ctx = SessionContext::new();
        // Create an empty table covering a variety of primitive and nested
        // column types (the mix that originally triggered the panic).
        let table = DeltaTable::new_in_memory()
            .create()
            .with_column("id", DataType::Primitive(PrimitiveType::Long), true, None)
            .with_column(
                "name",
                DataType::Primitive(PrimitiveType::String),
                true,
                None,
            )
            .with_column("b", DataType::Primitive(PrimitiveType::Boolean), true, None)
            .with_column(
                "ts",
                DataType::Primitive(PrimitiveType::Timestamp),
                true,
                None,
            )
            .with_column("dt", DataType::Primitive(PrimitiveType::Date), true, None)
            .with_column(
                "zap",
                DataType::Array(Box::new(ArrayType::new(
                    DataType::Primitive(PrimitiveType::Boolean),
                    true,
                ))),
                true,
                None,
            )
            .await?;
        table.update_datafusion_session(&ctx.state()).unwrap();

        ctx.register_table("snapshot", table.table_provider().await.unwrap())
            .unwrap();

        let df = ctx
            .sql("select * from snapshot where id > 10000 and id < 20000")
            .await
            .unwrap();

        // Executing the filtered plan is the assertion: it must not panic.
        let _ = df.collect().await?;
        Ok(())
    }
1524
    /// Records operations made by the inner object store on a channel obtained at construction
    struct RecordingObjectStore {
        // The real store every call is delegated to.
        inner: ObjectStoreRef,
        // Sender half; the receiver is handed out by `RecordingObjectStore::new`.
        operations: UnboundedSender<ObjectStoreOperation>,
    }
1530
1531    impl RecordingObjectStore {
1532        /// Returns an object store and a channel recording all operations made by the inner object store
1533        fn new(inner: ObjectStoreRef) -> (Self, UnboundedReceiver<ObjectStoreOperation>) {
1534            let (operations, operations_receiver) = unbounded_channel();
1535            (Self { inner, operations }, operations_receiver)
1536        }
1537    }
1538
1539    impl Display for RecordingObjectStore {
1540        fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1541            Display::fmt(&self.inner, f)
1542        }
1543    }
1544
1545    impl Debug for RecordingObjectStore {
1546        fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1547            Debug::fmt(&self.inner, f)
1548        }
1549    }
1550
    // Read operations the recording store can report; each carries the
    // classified location plus (for range reads) the requested byte range(s).
    #[derive(Debug, PartialEq)]
    enum ObjectStoreOperation {
        GetRanges(LocationType, Vec<Range<u64>>),
        GetRange(LocationType, Range<u64>),
        GetOpts(LocationType),
        Get(LocationType),
    }
1558
    // Coarse classification of a path: a parquet data file or a delta log
    // commit file. See `From<&Path>` below for how paths are classified.
    #[derive(Debug, PartialEq)]
    enum LocationType {
        Data,
        Commit,
    }
1564
1565    impl From<&Path> for LocationType {
1566        fn from(value: &Path) -> Self {
1567            let dummy_url = Url::parse("dummy:///").unwrap();
1568            let parsed = ParsedLogPath::try_from(dummy_url.join(value.as_ref()).unwrap()).unwrap();
1569            if let Some(parsed) = parsed
1570                && matches!(parsed.file_type, LogPathFileType::Commit)
1571            {
1572                return LocationType::Commit;
1573            }
1574            if value.to_string().starts_with("part-") {
1575                LocationType::Data
1576            } else {
1577                panic!("Unknown location type: {value:?}")
1578            }
1579        }
1580    }
1581
    // Currently only read operations are recorded. Extend as necessary.
    //
    // Every method delegates to `self.inner`; the four read methods
    // (`get`, `get_opts`, `get_range`, `get_ranges`) additionally send an
    // `ObjectStoreOperation` on the channel BEFORE delegating, so tests can
    // assert on the exact sequence of reads.
    #[async_trait::async_trait]
    impl ObjectStore for RecordingObjectStore {
        async fn put(
            &self,
            location: &Path,
            payload: PutPayload,
        ) -> object_store::Result<PutResult> {
            self.inner.put(location, payload).await
        }

        async fn put_opts(
            &self,
            location: &Path,
            payload: PutPayload,
            opts: PutOptions,
        ) -> object_store::Result<PutResult> {
            self.inner.put_opts(location, payload, opts).await
        }

        async fn put_multipart(
            &self,
            location: &Path,
        ) -> object_store::Result<Box<dyn MultipartUpload>> {
            self.inner.put_multipart(location).await
        }

        async fn put_multipart_opts(
            &self,
            location: &Path,
            opts: PutMultipartOptions,
        ) -> object_store::Result<Box<dyn MultipartUpload>> {
            self.inner.put_multipart_opts(location, opts).await
        }

        // Recorded: full-object read.
        async fn get(&self, location: &Path) -> object_store::Result<GetResult> {
            self.operations
                .send(ObjectStoreOperation::Get(location.into()))
                .unwrap();
            self.inner.get(location).await
        }

        // Recorded: read with options.
        async fn get_opts(
            &self,
            location: &Path,
            options: GetOptions,
        ) -> object_store::Result<GetResult> {
            self.operations
                .send(ObjectStoreOperation::GetOpts(location.into()))
                .unwrap();
            self.inner.get_opts(location, options).await
        }

        // Recorded: single byte-range read (includes the range).
        async fn get_range(
            &self,
            location: &Path,
            range: Range<u64>,
        ) -> object_store::Result<Bytes> {
            self.operations
                .send(ObjectStoreOperation::GetRange(
                    location.into(),
                    range.clone(),
                ))
                .unwrap();
            self.inner.get_range(location, range).await
        }

        // Recorded: multi-range read (includes all ranges).
        async fn get_ranges(
            &self,
            location: &Path,
            ranges: &[Range<u64>],
        ) -> object_store::Result<Vec<Bytes>> {
            self.operations
                .send(ObjectStoreOperation::GetRanges(
                    location.into(),
                    ranges.to_vec(),
                ))
                .unwrap();
            self.inner.get_ranges(location, ranges).await
        }

        async fn head(&self, location: &Path) -> object_store::Result<ObjectMeta> {
            self.inner.head(location).await
        }

        async fn delete(&self, location: &Path) -> object_store::Result<()> {
            self.inner.delete(location).await
        }

        fn delete_stream<'a>(
            &'a self,
            locations: BoxStream<'a, object_store::Result<Path>>,
        ) -> BoxStream<'a, object_store::Result<Path>> {
            self.inner.delete_stream(locations)
        }

        fn list(
            &self,
            prefix: Option<&Path>,
        ) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
            self.inner.list(prefix)
        }

        fn list_with_offset(
            &self,
            prefix: Option<&Path>,
            offset: &Path,
        ) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
            self.inner.list_with_offset(prefix, offset)
        }

        async fn list_with_delimiter(
            &self,
            prefix: Option<&Path>,
        ) -> object_store::Result<ListResult> {
            self.inner.list_with_delimiter(prefix).await
        }

        async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> {
            self.inner.copy(from, to).await
        }

        async fn rename(&self, from: &Path, to: &Path) -> object_store::Result<()> {
            self.inner.rename(from, to).await
        }

        async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> {
            self.inner.copy_if_not_exists(from, to).await
        }

        async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> {
            self.inner.rename_if_not_exists(from, to).await
        }
    }
1716}