// deltalake_core/src/delta_datafusion/mod.rs
//! Datafusion integration for Delta Table
//!
//! Example:
//!
//! ```rust
//! use std::sync::Arc;
//! use datafusion::execution::context::SessionContext;
//!
//! async {
//!   let mut ctx = SessionContext::new();
//!   let table = deltalake_core::open_table_with_storage_options(
//!       url::Url::parse("memory://").unwrap(),
//!       std::collections::HashMap::new()
//!   )
//!       .await
//!       .unwrap();
//!   ctx.register_table("demo", table.table_provider().await.unwrap()).unwrap();
//!
//!   let batches = ctx
//!       .sql("SELECT * FROM demo").await.unwrap()
//!       .collect()
//!       .await.unwrap();
//! };
//! ```

26use std::fmt::Debug;
27use std::sync::Arc;
28
29use arrow::array::types::UInt16Type;
30use arrow::array::{Array, DictionaryArray, RecordBatch, StringArray, TypedDictionaryArray};
31use arrow_cast::{CastOptions, cast_with_options};
32use arrow_schema::{
33    DataType as ArrowDataType, Schema as ArrowSchema, SchemaRef, SchemaRef as ArrowSchemaRef,
34    TimeUnit,
35};
36use datafusion::catalog::{Session, TableProviderFactory};
37use datafusion::common::scalar::ScalarValue;
38use datafusion::common::{
39    Column, DFSchema, DataFusionError, Result as DataFusionResult, TableReference, ToDFSchema,
40};
41use datafusion::datasource::TableProvider;
42use datafusion::datasource::physical_plan::wrap_partition_type_in_dict;
43use datafusion::execution::TaskContext;
44use datafusion::execution::context::SessionContext;
45use datafusion::logical_expr::logical_plan::CreateExternalTable;
46use datafusion::logical_expr::utils::conjunction;
47use datafusion::logical_expr::{Expr, Extension, LogicalPlan};
48use datafusion::physical_optimizer::pruning::PruningPredicate;
49use datafusion::physical_plan::ExecutionPlan;
50use datafusion_proto::logical_plan::LogicalExtensionCodec;
51use datafusion_proto::physical_plan::PhysicalExtensionCodec;
52use delta_kernel::engine::arrow_conversion::TryIntoArrow as _;
53use either::Either;
54
55use crate::delta_datafusion::expr::parse_predicate_expression;
56use crate::delta_datafusion::table_provider::DeltaScanWire;
57use crate::ensure_table_uri;
58use crate::errors::{DeltaResult, DeltaTableError};
59use crate::kernel::{Add, EagerSnapshot, LogDataHandler, Snapshot};
60use crate::{open_table, open_table_with_storage_options};
61
62pub(crate) use self::session::DeltaSessionExt;
63pub use self::session::{
64    DeltaParserOptions, DeltaRuntimeEnvBuilder, DeltaSessionConfig, DeltaSessionContext,
65    create_session, create_session_state_with_spill_config,
66};
67pub use self::table_provider::next::{DeletionVectorSelection, DeltaScan as DeltaScanNext};
68pub(crate) use self::utils::*;
69pub use cdf::scan::DeltaCdfTableProvider;
70pub(crate) use data_validation::{
71    DataValidationExec, constraints_to_exprs, generated_columns_to_exprs, validation_predicates,
72};
73pub(crate) use find_files::*;
74pub(crate) use table_provider::next::normalize_path_as_file_id;
75pub use table_provider::{
76    DeltaScan, DeltaScanConfig, DeltaScanConfigBuilder, DeltaTableProvider, TableProviderBuilder,
77    next::DeltaScanExec,
78};
79pub(crate) use table_provider::{
80    DeltaScanBuilder, next::FILE_ID_COLUMN_DEFAULT, resolve_file_column_name,
81    update_datafusion_session,
82};
83
/// Name of the internal pseudo-column used to carry a file path alongside row data.
pub(crate) const PATH_COLUMN: &str = "__delta_rs_path";
85
// Submodules of the DataFusion integration.
pub mod cdf;
mod data_validation;
pub mod engine;
pub mod expr;
mod file_id;
mod find_files;
pub mod logical;
pub mod physical;
pub mod planner;
mod session;
pub use session::SessionFallbackPolicy;
pub(crate) use session::{SessionResolveContext, resolve_session_state};
mod table_provider;
pub(crate) mod utils;
100
101impl From<DeltaTableError> for DataFusionError {
102    fn from(err: DeltaTableError) -> Self {
103        match err {
104            DeltaTableError::Arrow { source } => DataFusionError::from(source),
105            DeltaTableError::Io { source } => DataFusionError::IoError(source),
106            DeltaTableError::ObjectStore { source } => DataFusionError::from(source),
107            DeltaTableError::Parquet { source } => DataFusionError::from(source),
108            _ => DataFusionError::External(Box::new(err)),
109        }
110    }
111}
112
113impl From<DataFusionError> for DeltaTableError {
114    fn from(err: DataFusionError) -> Self {
115        match err {
116            DataFusionError::ArrowError(source, _) => DeltaTableError::from(*source),
117            DataFusionError::IoError(source) => DeltaTableError::Io { source },
118            DataFusionError::ObjectStore(source) => DeltaTableError::from(*source),
119            DataFusionError::ParquetError(source) => DeltaTableError::from(*source),
120            _ => DeltaTableError::Generic(err.to_string()),
121        }
122    }
123}
124
/// Convenience trait for calling common methods on snapshot hierarchies
pub trait DataFusionMixins {
    /// The physical datafusion schema of a table
    ///
    /// Partition columns are placed after the data columns, and large
    /// string/binary partition types are wrapped in a dictionary type
    /// (see `_arrow_schema`).
    fn read_schema(&self) -> ArrowSchemaRef;

    /// Get the table schema as an [`ArrowSchemaRef`]
    ///
    /// Same column ordering as [`Self::read_schema`], but without the
    /// dictionary wrapping of partition columns.
    fn input_schema(&self) -> ArrowSchemaRef;

    /// Parse an expression string into a datafusion [`Expr`]
    ///
    /// The expression is resolved against this snapshot's read schema.
    fn parse_predicate_expression(
        &self,
        expr: impl AsRef<str>,
        session: &dyn Session,
    ) -> DeltaResult<Expr>;
}
140
141impl DataFusionMixins for Snapshot {
142    fn read_schema(&self) -> ArrowSchemaRef {
143        _arrow_schema(
144            self.arrow_schema(),
145            self.metadata().partition_columns(),
146            true,
147        )
148    }
149
150    fn input_schema(&self) -> ArrowSchemaRef {
151        _arrow_schema(
152            self.arrow_schema(),
153            self.metadata().partition_columns(),
154            false,
155        )
156    }
157
158    fn parse_predicate_expression(
159        &self,
160        expr: impl AsRef<str>,
161        session: &dyn Session,
162    ) -> DeltaResult<Expr> {
163        let schema = DFSchema::try_from(self.read_schema().as_ref().to_owned())?;
164        parse_predicate_expression(&schema, expr, session)
165    }
166}
167
168impl DataFusionMixins for LogDataHandler<'_> {
169    fn read_schema(&self) -> ArrowSchemaRef {
170        _arrow_schema(
171            Arc::new(
172                self.table_configuration()
173                    .schema()
174                    .as_ref()
175                    .try_into_arrow()
176                    .unwrap(),
177            ),
178            self.table_configuration().metadata().partition_columns(),
179            true,
180        )
181    }
182
183    fn input_schema(&self) -> ArrowSchemaRef {
184        _arrow_schema(
185            Arc::new(
186                self.table_configuration()
187                    .schema()
188                    .as_ref()
189                    .try_into_arrow()
190                    .unwrap(),
191            ),
192            self.table_configuration().metadata().partition_columns(),
193            false,
194        )
195    }
196
197    fn parse_predicate_expression(
198        &self,
199        expr: impl AsRef<str>,
200        session: &dyn Session,
201    ) -> DeltaResult<Expr> {
202        let schema = DFSchema::try_from(self.read_schema().as_ref().to_owned())?;
203        parse_predicate_expression(&schema, expr, session)
204    }
205}
206
// All methods simply delegate to the underlying `Snapshot`.
impl DataFusionMixins for EagerSnapshot {
    fn read_schema(&self) -> ArrowSchemaRef {
        self.snapshot().read_schema()
    }

    fn input_schema(&self) -> ArrowSchemaRef {
        self.snapshot().input_schema()
    }

    fn parse_predicate_expression(
        &self,
        expr: impl AsRef<str>,
        session: &dyn Session,
    ) -> DeltaResult<Expr> {
        self.snapshot().parse_predicate_expression(expr, session)
    }
}
224
225fn _arrow_schema(
226    schema: SchemaRef,
227    partition_columns: &[String],
228    wrap_partitions: bool,
229) -> ArrowSchemaRef {
230    let fields = schema
231        .fields()
232        .into_iter()
233        .filter(|f| !partition_columns.contains(&f.name().to_string()))
234        .cloned()
235        .chain(
236            // We need stable order between logical and physical schemas, but the order of
237            // partitioning columns is not always the same in the json schema and the array
238            partition_columns.iter().map(|partition_col| {
239                let field = schema.field_with_name(partition_col).unwrap();
240                let corrected = if wrap_partitions {
241                    match field.data_type() {
242                        // Only dictionary-encode types that may be large
243                        // https://github.com/apache/arrow-datafusion/pull/5545
244                        ArrowDataType::Utf8
245                        | ArrowDataType::LargeUtf8
246                        | ArrowDataType::Binary
247                        | ArrowDataType::LargeBinary => {
248                            wrap_partition_type_in_dict(field.data_type().clone())
249                        }
250                        _ => field.data_type().clone(),
251                    }
252                } else {
253                    field.data_type().clone()
254                };
255                Arc::new(field.clone().with_data_type(corrected))
256            }),
257        )
258        .collect::<Vec<_>>();
259    Arc::new(ArrowSchema::new(fields))
260}
261
262pub(crate) fn files_matching_predicate<'a>(
263    log_data: LogDataHandler<'a>,
264    filters: &[Expr],
265) -> DeltaResult<impl Iterator<Item = Add> + 'a> {
266    if let Some(Some(predicate)) =
267        (!filters.is_empty()).then_some(conjunction(filters.iter().cloned()))
268    {
269        let expr = SessionContext::new()
270            .create_physical_expr(predicate, &log_data.read_schema().to_dfschema()?)?;
271        let pruning_predicate = PruningPredicate::try_new(expr, log_data.read_schema())?;
272        let mask = pruning_predicate.prune(&log_data)?;
273
274        Ok(Either::Left(log_data.into_iter().zip(mask).filter_map(
275            |(file, keep_file)| {
276                if keep_file {
277                    Some(file.add_action())
278                } else {
279                    None
280                }
281            },
282        )))
283    } else {
284        Ok(Either::Right(
285            log_data.into_iter().map(|file| file.add_action()),
286        ))
287    }
288}
289
290pub(crate) fn get_path_column<'a>(
291    batch: &'a RecordBatch,
292    path_column: &str,
293) -> DeltaResult<TypedDictionaryArray<'a, UInt16Type, StringArray>> {
294    let err = || DeltaTableError::Generic("Unable to obtain Delta-rs path column".to_string());
295    batch
296        .column_by_name(path_column)
297        .unwrap()
298        .as_any()
299        .downcast_ref::<DictionaryArray<UInt16Type>>()
300        .ok_or_else(err)?
301        .downcast_dict::<StringArray>()
302        .ok_or_else(err)
303}
304
/// Build a typed null [`ScalarValue`] for the given Arrow data type.
///
/// Used when materializing absent statistics or partition values so the
/// resulting scalar still carries correct type information. Returns an error
/// for Arrow types the Delta integration does not support.
pub(crate) fn get_null_of_arrow_type(t: &ArrowDataType) -> DeltaResult<ScalarValue> {
    match t {
        ArrowDataType::Null => Ok(ScalarValue::Null),
        ArrowDataType::Boolean => Ok(ScalarValue::Boolean(None)),
        ArrowDataType::Int8 => Ok(ScalarValue::Int8(None)),
        ArrowDataType::Int16 => Ok(ScalarValue::Int16(None)),
        ArrowDataType::Int32 => Ok(ScalarValue::Int32(None)),
        ArrowDataType::Int64 => Ok(ScalarValue::Int64(None)),
        ArrowDataType::UInt8 => Ok(ScalarValue::UInt8(None)),
        ArrowDataType::UInt16 => Ok(ScalarValue::UInt16(None)),
        ArrowDataType::UInt32 => Ok(ScalarValue::UInt32(None)),
        ArrowDataType::UInt64 => Ok(ScalarValue::UInt64(None)),
        ArrowDataType::Float32 => Ok(ScalarValue::Float32(None)),
        ArrowDataType::Float64 => Ok(ScalarValue::Float64(None)),
        ArrowDataType::Date32 => Ok(ScalarValue::Date32(None)),
        ArrowDataType::Date64 => Ok(ScalarValue::Date64(None)),
        ArrowDataType::Binary => Ok(ScalarValue::Binary(None)),
        ArrowDataType::FixedSizeBinary(size) => {
            Ok(ScalarValue::FixedSizeBinary(size.to_owned(), None))
        }
        ArrowDataType::LargeBinary => Ok(ScalarValue::LargeBinary(None)),
        ArrowDataType::Utf8 => Ok(ScalarValue::Utf8(None)),
        ArrowDataType::LargeUtf8 => Ok(ScalarValue::LargeUtf8(None)),
        // Precision and scale are preserved on the null scalar.
        ArrowDataType::Decimal128(precision, scale) => Ok(ScalarValue::Decimal128(
            None,
            precision.to_owned(),
            scale.to_owned(),
        )),
        // Timestamp nulls keep both the time unit and the timezone.
        ArrowDataType::Timestamp(unit, tz) => {
            let tz = tz.to_owned();
            Ok(match unit {
                TimeUnit::Second => ScalarValue::TimestampSecond(None, tz),
                TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(None, tz),
                TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(None, tz),
                TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(None, tz),
            })
        }
        // A null dictionary scalar wraps a null of the value type.
        ArrowDataType::Dictionary(k, v) => Ok(ScalarValue::Dictionary(
            k.clone(),
            Box::new(get_null_of_arrow_type(v)?),
        )),
        //Unsupported types...
        ArrowDataType::Float16
        | ArrowDataType::Decimal32(_, _)
        | ArrowDataType::Decimal64(_, _)
        | ArrowDataType::Decimal256(_, _)
        | ArrowDataType::Union(_, _)
        | ArrowDataType::LargeList(_)
        | ArrowDataType::Struct(_)
        | ArrowDataType::List(_)
        | ArrowDataType::FixedSizeList(_, _)
        | ArrowDataType::Time32(_)
        | ArrowDataType::Time64(_)
        | ArrowDataType::Duration(_)
        | ArrowDataType::Interval(_)
        | ArrowDataType::RunEndEncoded(_, _)
        | ArrowDataType::BinaryView
        | ArrowDataType::Utf8View
        | ArrowDataType::LargeListView(_)
        | ArrowDataType::ListView(_)
        | ArrowDataType::Map(_, _) => Err(DeltaTableError::Generic(format!(
            "Unsupported data type for Delta Lake {t}"
        ))),
    }
}
370
371fn parse_date(
372    stat_val: &serde_json::Value,
373    field_dt: &ArrowDataType,
374) -> DataFusionResult<ScalarValue> {
375    let string = match stat_val {
376        serde_json::Value::String(s) => s.to_owned(),
377        _ => stat_val.to_string(),
378    };
379
380    let time_micro = ScalarValue::try_from_string(string, &ArrowDataType::Date32)?;
381    let cast_arr = cast_with_options(
382        &time_micro.to_array()?,
383        field_dt,
384        &CastOptions {
385            safe: false,
386            ..Default::default()
387        },
388    )?;
389    ScalarValue::try_from_array(&cast_arr, 0)
390}
391
392fn parse_timestamp(
393    stat_val: &serde_json::Value,
394    field_dt: &ArrowDataType,
395) -> DataFusionResult<ScalarValue> {
396    let string = match stat_val {
397        serde_json::Value::String(s) => s.to_owned(),
398        _ => stat_val.to_string(),
399    };
400
401    let time_micro = ScalarValue::try_from_string(
402        string,
403        &ArrowDataType::Timestamp(TimeUnit::Microsecond, None),
404    )?;
405    let cast_arr = cast_with_options(
406        &time_micro.to_array()?,
407        field_dt,
408        &CastOptions {
409            safe: false,
410            ..Default::default()
411        },
412    )?;
413    ScalarValue::try_from_array(&cast_arr, 0)
414}
415
416pub(crate) fn to_correct_scalar_value(
417    stat_val: &serde_json::Value,
418    field_dt: &ArrowDataType,
419) -> DataFusionResult<Option<ScalarValue>> {
420    match stat_val {
421        serde_json::Value::Array(_) => Ok(None),
422        serde_json::Value::Object(_) => Ok(None),
423        serde_json::Value::Null => Ok(Some(get_null_of_arrow_type(field_dt)?)),
424        serde_json::Value::String(string_val) => match field_dt {
425            ArrowDataType::Timestamp(_, _) => Ok(Some(parse_timestamp(stat_val, field_dt)?)),
426            ArrowDataType::Date32 => Ok(Some(parse_date(stat_val, field_dt)?)),
427            _ => Ok(Some(ScalarValue::try_from_string(
428                string_val.to_owned(),
429                field_dt,
430            )?)),
431        },
432        other => match field_dt {
433            ArrowDataType::Timestamp(_, _) => Ok(Some(parse_timestamp(stat_val, field_dt)?)),
434            ArrowDataType::Date32 => Ok(Some(parse_date(stat_val, field_dt)?)),
435            _ => Ok(Some(ScalarValue::try_from_string(
436                other.to_string(),
437                field_dt,
438            )?)),
439        },
440    }
441}
442
/// A codec for deltalake physical plans
///
/// Serializes and deserializes [`DeltaScan`] nodes via their JSON wire
/// representation ([`DeltaScanWire`]).
#[derive(Debug)]
pub struct DeltaPhysicalCodec {}
446
447impl PhysicalExtensionCodec for DeltaPhysicalCodec {
448    fn try_decode(
449        &self,
450        buf: &[u8],
451        inputs: &[Arc<dyn ExecutionPlan>],
452        _registry: &TaskContext,
453    ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
454        let wire: DeltaScanWire = serde_json::from_reader(buf)
455            .map_err(|_| DataFusionError::Internal("Unable to decode DeltaScan".to_string()))?;
456        let delta_scan = DeltaScan::new(
457            &wire.table_url,
458            wire.config,
459            (*inputs)[0].clone(),
460            wire.logical_schema,
461        );
462        Ok(Arc::new(delta_scan))
463    }
464
465    fn try_encode(
466        &self,
467        node: Arc<dyn ExecutionPlan>,
468        buf: &mut Vec<u8>,
469    ) -> Result<(), DataFusionError> {
470        let delta_scan = node
471            .as_any()
472            .downcast_ref::<DeltaScan>()
473            .ok_or_else(|| DataFusionError::Internal("Not a delta scan!".to_string()))?;
474
475        let wire = DeltaScanWire::from(delta_scan);
476        serde_json::to_writer(buf, &wire)
477            .map_err(|_| DataFusionError::Internal("Unable to encode delta scan!".to_string()))?;
478        Ok(())
479    }
480}
481
/// Does serde on DeltaTables
///
/// Only table-provider (de)serialization is implemented; the logical-plan
/// extension hooks are still `todo!()`.
#[derive(Debug)]
pub struct DeltaLogicalCodec {}
485
486impl LogicalExtensionCodec for DeltaLogicalCodec {
487    fn try_decode(
488        &self,
489        _buf: &[u8],
490        _inputs: &[LogicalPlan],
491        _ctx: &TaskContext,
492    ) -> Result<Extension, DataFusionError> {
493        todo!("DeltaLogicalCodec")
494    }
495
496    fn try_encode(&self, _node: &Extension, _buf: &mut Vec<u8>) -> Result<(), DataFusionError> {
497        todo!("DeltaLogicalCodec")
498    }
499
500    fn try_decode_table_provider(
501        &self,
502        buf: &[u8],
503        _table_ref: &TableReference,
504        _schema: SchemaRef,
505        _ctx: &TaskContext,
506    ) -> Result<Arc<dyn TableProvider>, DataFusionError> {
507        let provider: DeltaScanNext = serde_json::from_slice(buf)
508            .map_err(|_| DataFusionError::Internal("Error encoding delta table".to_string()))?;
509        Ok(Arc::new(provider))
510    }
511
512    fn try_encode_table_provider(
513        &self,
514        _table_ref: &TableReference,
515        node: Arc<dyn TableProvider>,
516        buf: &mut Vec<u8>,
517    ) -> Result<(), DataFusionError> {
518        let scan = node
519            .as_ref()
520            .as_any()
521            .downcast_ref::<DeltaScanNext>()
522            .ok_or_else(|| {
523                DataFusionError::Internal("Can't encode non-delta tables".to_string())
524            })?;
525        serde_json::to_writer(buf, scan)
526            .map_err(|_| DataFusionError::Internal("Error encoding delta table".to_string()))
527    }
528}
529
/// Responsible for creating deltatables
///
/// Registered with DataFusion so `CREATE EXTERNAL TABLE ... STORED AS DELTA`
/// statements resolve to a Delta table provider.
#[derive(Debug)]
pub struct DeltaTableFactory {}
533
534#[async_trait::async_trait]
535impl TableProviderFactory for DeltaTableFactory {
536    async fn create(
537        &self,
538        ctx: &dyn Session,
539        cmd: &CreateExternalTable,
540    ) -> datafusion::error::Result<Arc<dyn TableProvider>> {
541        let table = if cmd.options.is_empty() {
542            let table_url = ensure_table_uri(&cmd.to_owned().location)?;
543            open_table(table_url).await?
544        } else {
545            let table_url = ensure_table_uri(&cmd.to_owned().location)?;
546            open_table_with_storage_options(table_url, cmd.to_owned().options).await?
547        };
548        let table_uri = table.log_store().root_url().clone();
549        let (session_state, _) = resolve_session_state(
550            Some(ctx),
551            SessionFallbackPolicy::DeriveFromTrait,
552            || create_session().state(),
553            SessionResolveContext {
554                operation: "DeltaTableFactory::create",
555                table_uri: Some(&table_uri),
556                cdc: false,
557            },
558        )?;
559
560        Ok(table
561            .table_provider()
562            .with_session(Arc::new(session_state))
563            .await?)
564    }
565}
566
/// A wrapper for DataFusion's Column to preserve case-sensitivity during string conversion
pub struct DeltaColumn {
    // Wrapped datafusion column; string conversions construct it via
    // `Column::from_qualified_name_ignore_case`.
    inner: Column,
}
571
572impl From<&str> for DeltaColumn {
573    fn from(c: &str) -> Self {
574        DeltaColumn {
575            inner: Column::from_qualified_name_ignore_case(c),
576        }
577    }
578}
579
580/// Create a column, cloning the string
581impl From<&String> for DeltaColumn {
582    fn from(c: &String) -> Self {
583        DeltaColumn {
584            inner: Column::from_qualified_name_ignore_case(c),
585        }
586    }
587}
588
589/// Create a column, reusing the existing string
590impl From<String> for DeltaColumn {
591    fn from(c: String) -> Self {
592        DeltaColumn {
593            inner: Column::from_qualified_name_ignore_case(c),
594        }
595    }
596}
597
598impl From<DeltaColumn> for Column {
599    fn from(value: DeltaColumn) -> Self {
600        value.inner
601    }
602}
603
604/// Create a column, reusing the existing datafusion column
605impl From<Column> for DeltaColumn {
606    fn from(c: Column) -> Self {
607        DeltaColumn { inner: c }
608    }
609}
610
611#[cfg(test)]
612mod tests {
613    use crate::DeltaTable;
614    use crate::logstore::ObjectStoreRef;
615    use crate::logstore::default_logstore::DefaultLogStore;
616    use crate::operations::write::SchemaMode;
617    use crate::test_utils::open_fs_path;
618    use crate::writer::test_utils::get_delta_schema;
619    use arrow::array::StructArray;
620    use arrow::datatypes::{Field, Schema};
621    use arrow_array::cast::AsArray;
622    use bytes::Bytes;
623    use datafusion::assert_batches_sorted_eq;
624    use datafusion::config::TableParquetOptions;
625    use datafusion::datasource::physical_plan::{FileScanConfig, ParquetSource};
626    use datafusion::datasource::source::DataSourceExec;
627    use datafusion::logical_expr::lit;
628    use datafusion::physical_plan::empty::EmptyExec;
629    use datafusion::physical_plan::{ExecutionPlanVisitor, PhysicalExpr, visit_execution_plan};
630    use datafusion::prelude::{SessionConfig, col};
631    use datafusion_datasource::file::FileSource as _;
632    use datafusion_proto::physical_plan::AsExecutionPlan;
633    use datafusion_proto::protobuf;
634    use delta_kernel::path::{LogPathFileType, ParsedLogPath};
635    use delta_kernel::schema::ArrayType;
636    use futures::{StreamExt, TryStreamExt, stream::BoxStream};
637    use object_store::ObjectMeta;
638    use object_store::{
639        GetOptions, GetResult, ListResult, MultipartUpload, ObjectStore, PutMultipartOptions,
640        PutOptions, PutPayload, PutResult, path::Path,
641    };
642    use serde_json::json;
643    use std::fmt::{self, Debug, Display, Formatter};
644    use std::ops::Range;
645    use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
646    use url::Url;
647
648    use super::*;
649    use crate::delta_datafusion::table_provider::next::{FileSelection, MissingFilePolicy};
    // test deserialization of serialized partition values.
    // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#partition-value-serialization
    #[test]
    fn test_to_correct_scalar_value() {
        // Each tuple: (raw JSON stat value, target Arrow type, expected scalar).
        let reference_pairs = &[
            (
                json!("2015"),
                ArrowDataType::Int16,
                ScalarValue::Int16(Some(2015)),
            ),
            (
                json!("2015"),
                ArrowDataType::Int32,
                ScalarValue::Int32(Some(2015)),
            ),
            (
                json!("2015"),
                ArrowDataType::Int64,
                ScalarValue::Int64(Some(2015)),
            ),
            (
                json!("2015"),
                ArrowDataType::Float32,
                ScalarValue::Float32(Some(2015_f32)),
            ),
            (
                json!("2015"),
                ArrowDataType::Float64,
                ScalarValue::Float64(Some(2015_f64)),
            ),
            // Non-string JSON values are stringified before parsing.
            (
                json!(2015),
                ArrowDataType::Float64,
                ScalarValue::Float64(Some(2015_f64)),
            ),
            (
                json!("2015-01-01"),
                ArrowDataType::Date32,
                ScalarValue::Date32(Some(16436)),
            ),
            // (
            //     json!("2015-01-01"),
            //     ArrowDataType::Date64,
            //     ScalarValue::Date64(Some(16436)),
            // ),
            // TODO(roeap) there seem to be differences in how precisions are handled locally and in CI, need to investigate
            // (
            //     json!("2020-09-08 13:42:29"),
            //     ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
            //     ScalarValue::TimestampNanosecond(Some(1599565349000000000), None),
            // ),
            // (
            //     json!("2020-09-08 13:42:29"),
            //     ArrowDataType::Timestamp(TimeUnit::Microsecond, None),
            //     ScalarValue::TimestampMicrosecond(Some(1599565349000000), None),
            // ),
            // (
            //     json!("2020-09-08 13:42:29"),
            //     ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
            //     ScalarValue::TimestampMillisecond(Some(1599565349000), None),
            // ),
            (
                json!(true),
                ArrowDataType::Boolean,
                ScalarValue::Boolean(Some(true)),
            ),
        ];

        for (raw, data_type, ref_scalar) in reference_pairs {
            let scalar = to_correct_scalar_value(raw, data_type).unwrap().unwrap();
            assert_eq!(*ref_scalar, scalar)
        }
    }
723
    // Encode a DeltaScan to protobuf with the physical codec and decode it
    // back; the debug representations must match exactly.
    #[test]
    fn roundtrip_test_delta_exec_plan() {
        let ctx = SessionContext::new();
        let codec = DeltaPhysicalCodec {};

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", ArrowDataType::Utf8, false),
            Field::new("b", ArrowDataType::Int32, false),
        ]));
        // EmptyExec stands in for the wrapped child plan.
        let exec_plan = Arc::from(DeltaScan::new(
            &Url::parse("s3://my_bucket/this/is/some/path").unwrap(),
            DeltaScanConfig::default(),
            Arc::from(EmptyExec::new(schema.clone())),
            schema.clone(),
        ));
        let proto: protobuf::PhysicalPlanNode =
            protobuf::PhysicalPlanNode::try_from_physical_plan(exec_plan.clone(), &codec)
                .expect("to proto");

        let task_ctx = ctx.task_ctx();
        let result_exec_plan: Arc<dyn ExecutionPlan> = proto
            .try_into_physical_plan(&task_ctx, &codec)
            .expect("from proto");
        assert_eq!(format!("{exec_plan:?}"), format!("{result_exec_plan:?}"));
    }
749
    // Encode a DeltaScanNext provider (with an explicit file selection) via
    // the logical codec and decode it back, verifying the selection survives.
    #[tokio::test]
    async fn roundtrip_test_delta_logical_codec_preserves_file_selection() {
        let log_store = crate::test_utils::TestTables::Simple
            .table_builder()
            .unwrap()
            .build_storage()
            .unwrap();
        let snapshot = Arc::new(
            crate::kernel::Snapshot::try_new(&log_store, Default::default(), None)
                .await
                .unwrap(),
        );
        let table_root = snapshot
            .scan_builder()
            .build()
            .unwrap()
            .table_root()
            .clone();
        // Select a single file from the snapshot, identified by its full URL.
        let selected_file_ids: Vec<String> = snapshot
            .file_views(log_store.as_ref(), None)
            .take(1)
            .map_ok(|view| table_root.join(view.path_raw()).unwrap().to_string())
            .try_collect()
            .await
            .unwrap();
        assert_eq!(selected_file_ids.len(), 1);

        let provider = DeltaScanNext::builder()
            .with_snapshot(snapshot)
            .build()
            .await
            .unwrap()
            .with_file_selection(
                FileSelection::new(selected_file_ids.clone())
                    .with_missing_file_policy(MissingFilePolicy::Ignore),
            );

        let codec = DeltaLogicalCodec {};
        let table_ref = TableReference::bare("delta_table");
        let mut encoded = Vec::new();
        codec
            .try_encode_table_provider(&table_ref, Arc::new(provider), &mut encoded)
            .unwrap();

        let ctx = SessionContext::new();
        let decoded = codec
            .try_decode_table_provider(
                &encoded,
                &table_ref,
                Arc::new(ArrowSchema::empty()),
                &ctx.task_ctx(),
            )
            .unwrap();
        let decoded_provider = decoded
            .as_ref()
            .as_any()
            .downcast_ref::<DeltaScanNext>()
            .unwrap();

        // Inspect the decoded provider through its JSON form rather than
        // private fields.
        let serialized = serde_json::to_value(decoded_provider).unwrap();
        let decoded_file_ids = serialized
            .get("file_selection")
            .and_then(|value| value.get("file_ids"))
            .and_then(|value| value.as_array())
            .unwrap()
            .iter()
            .map(|value| value.as_str().unwrap().to_string())
            .collect::<Vec<_>>();
        let decoded_policy = serialized
            .get("file_selection")
            .and_then(|value| value.get("missing_file_policy"))
            .and_then(|value| value.as_str())
            .unwrap();

        assert_eq!(decoded_file_ids, selected_file_ids);
        assert_eq!(decoded_policy, "Ignore");
    }
827
    // A provider built with `with_file_column` should expose each row's
    // source file path as an extra queryable column.
    #[tokio::test]
    async fn delta_table_provider_with_config() {
        let table = open_fs_path("../test/tests/data/delta-2.2.0-partitioned-types");
        let provider = table
            .table_provider()
            .with_file_column("file_source")
            .await
            .unwrap();
        let ctx = SessionContext::new();
        ctx.register_table("test", provider).unwrap();

        // `right(file_source, 77)` trims the table-root prefix so expected
        // paths are stable regardless of where the test data lives on disk.
        let df = ctx
            .sql("select c3, c1, c2, right(file_source, 77) as file_source from test")
            .await
            .unwrap();
        let actual = df.collect().await.unwrap();
        let expected = vec![
            "+----+----+----+-------------------------------------------------------------------------------+",
            "| c3 | c1 | c2 | file_source                                                                   |",
            "+----+----+----+-------------------------------------------------------------------------------+",
            "| 4  | 6  | a  | c1=6/c2=a/part-00011-10619b10-b691-4fd0-acc4-2a9608499d7c.c000.snappy.parquet |",
            "| 5  | 4  | c  | c1=4/c2=c/part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet |",
            "| 6  | 5  | b  | c1=5/c2=b/part-00007-4e73fa3b-2c88-424a-8051-f8b54328ffdb.c000.snappy.parquet |",
            "+----+----+----+-------------------------------------------------------------------------------+",
        ];
        assert_batches_sorted_eq!(&expected, &actual);
    }
855
    #[tokio::test]
    async fn delta_scan_mixed_partition_order() {
        // Tests issue (1787) where partition columns were incorrect when they
        // have a different order in the metadata and table schema

        // Write batches with Arrow column order (modified, id, value) —
        // deliberately different from the table's logical order, which the
        // assertion at the bottom pins as (id, value, modified).
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new("modified", ArrowDataType::Utf8, true),
            Field::new("id", ArrowDataType::Utf8, true),
            Field::new("value", ArrowDataType::Int32, true),
        ]));

        // Partition on both string columns, in yet another order.
        let table = DeltaTable::new_in_memory()
            .create()
            .with_columns(get_delta_schema().fields().cloned())
            .with_partition_columns(["modified", "id"])
            .await
            .unwrap();
        assert_eq!(table.version(), Some(0));

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(arrow::array::StringArray::from(vec![
                    "2021-02-01",
                    "2021-02-01",
                    "2021-02-02",
                    "2021-02-02",
                ])),
                Arc::new(arrow::array::StringArray::from(vec!["A", "B", "C", "D"])),
                Arc::new(arrow::array::Int32Array::from(vec![1, 10, 20, 100])),
            ],
        )
        .unwrap();
        // write some data
        let table = table
            .write(vec![batch.clone()])
            .with_save_mode(crate::protocol::SaveMode::Append)
            .await
            .unwrap();

        let provider = table.table_provider().await.unwrap();
        let logical_schema = provider.schema();
        let ctx = SessionContext::new();
        // The table lives in an in-memory store, so the session needs that
        // store registered explicitly before queries can resolve file paths.
        ctx.runtime_env().register_object_store(
            table.log_store().root_url(),
            table.log_store().object_store(None),
        );
        ctx.register_table("test", provider).unwrap();

        let expected_logical_order = vec!["id", "value", "modified"];
        let actual_order: Vec<String> = logical_schema
            .fields()
            .iter()
            .map(|f| f.name().to_owned())
            .collect();

        let df = ctx.sql("select * from test").await.unwrap();
        let actual = df.collect().await.unwrap();
        // Both the values and the column order must follow the logical
        // (metadata) schema, not the order the data was written with.
        let expected = vec![
            "+----+-------+------------+",
            "| id | value | modified   |",
            "+----+-------+------------+",
            "| A  | 1     | 2021-02-01 |",
            "| B  | 10    | 2021-02-01 |",
            "| C  | 20    | 2021-02-02 |",
            "| D  | 100   | 2021-02-02 |",
            "+----+-------+------------+",
        ];
        assert_batches_sorted_eq!(&expected, &actual);
        assert_eq!(expected_logical_order, actual_order);
    }
926
    #[tokio::test]
    async fn delta_scan_case_sensitive() {
        // Mixed-case column names: the scan must preserve casing and SQL
        // queries against these exact names must resolve case-sensitively.
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new("moDified", ArrowDataType::Utf8, true),
            Field::new("ID", ArrowDataType::Utf8, true),
            Field::new("vaLue", ArrowDataType::Int32, true),
        ]));

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(arrow::array::StringArray::from(vec![
                    "2021-02-01",
                    "2021-02-01",
                    "2021-02-02",
                    "2021-02-02",
                ])),
                Arc::new(arrow::array::StringArray::from(vec!["A", "B", "C", "D"])),
                Arc::new(arrow::array::Int32Array::from(vec![1, 10, 20, 100])),
            ],
        )
        .unwrap();
        // write some data
        let table = DeltaTable::new_in_memory()
            .write(vec![batch.clone()])
            .with_save_mode(crate::protocol::SaveMode::Append)
            .await
            .unwrap();

        let config = DeltaScanConfigBuilder::new()
            .build(table.snapshot().unwrap().snapshot())
            .unwrap();
        let log = table.log_store();

        let provider =
            DeltaTableProvider::try_new(table.snapshot().unwrap().snapshot().clone(), log, config)
                .unwrap();
        let ctx: SessionContext = DeltaSessionContext::default().into();
        ctx.register_table("test", Arc::new(provider)).unwrap();

        // Project the columns using their original (mixed-case) spelling.
        let df = ctx
            .sql("select ID, moDified, vaLue from test")
            .await
            .unwrap();
        let actual = df.collect().await.unwrap();
        let expected = vec![
            "+----+------------+-------+",
            "| ID | moDified   | vaLue |",
            "+----+------------+-------+",
            "| A  | 2021-02-01 | 1     |",
            "| B  | 2021-02-01 | 10    |",
            "| C  | 2021-02-02 | 20    |",
            "| D  | 2021-02-02 | 100   |",
            "+----+------------+-------+",
        ];
        assert_batches_sorted_eq!(&expected, &actual);

        /* TODO: Datafusion doesn't have any options to prevent case-sensitivity with the col func */
        /*
        let df = ctx
            .table("test")
            .await
            .unwrap()
            .select(vec![col("ID"), col("moDified"), col("vaLue")])
            .unwrap();
        let actual = df.collect().await.unwrap();
        assert_batches_sorted_eq!(&expected, &actual);
        */
    }
996
997    #[tokio::test]
998    async fn delta_scan_supports_missing_columns() {
999        let schema1 = Arc::new(ArrowSchema::new(vec![Field::new(
1000            "col_1",
1001            ArrowDataType::Utf8,
1002            true,
1003        )]));
1004
1005        let batch1 = RecordBatch::try_new(
1006            schema1.clone(),
1007            vec![Arc::new(arrow::array::StringArray::from(vec![
1008                Some("A"),
1009                Some("B"),
1010            ]))],
1011        )
1012        .unwrap();
1013
1014        let schema2 = Arc::new(ArrowSchema::new(vec![
1015            Field::new("col_1", ArrowDataType::Utf8, true),
1016            Field::new("col_2", ArrowDataType::Utf8, true),
1017        ]));
1018
1019        let batch2 = RecordBatch::try_new(
1020            schema2.clone(),
1021            vec![
1022                Arc::new(arrow::array::StringArray::from(vec![
1023                    Some("E"),
1024                    Some("F"),
1025                    Some("G"),
1026                ])),
1027                Arc::new(arrow::array::StringArray::from(vec![
1028                    Some("E2"),
1029                    Some("F2"),
1030                    Some("G2"),
1031                ])),
1032            ],
1033        )
1034        .unwrap();
1035
1036        let table = DeltaTable::new_in_memory()
1037            .write(vec![batch2])
1038            .with_save_mode(crate::protocol::SaveMode::Append)
1039            .await
1040            .unwrap();
1041
1042        let table = table
1043            .write(vec![batch1])
1044            .with_schema_mode(SchemaMode::Merge)
1045            .with_save_mode(crate::protocol::SaveMode::Append)
1046            .await
1047            .unwrap();
1048
1049        let config = DeltaScanConfigBuilder::new()
1050            .build(table.snapshot().unwrap().snapshot())
1051            .unwrap();
1052        let log = table.log_store();
1053
1054        let provider =
1055            DeltaTableProvider::try_new(table.snapshot().unwrap().snapshot().clone(), log, config)
1056                .unwrap();
1057        let ctx: SessionContext = DeltaSessionContext::default().into();
1058        ctx.register_table("test", Arc::new(provider)).unwrap();
1059
1060        let df = ctx.sql("select col_1, col_2 from test").await.unwrap();
1061        let actual = df.collect().await.unwrap();
1062        let expected = vec![
1063            "+-------+-------+",
1064            "| col_1 | col_2 |",
1065            "+-------+-------+",
1066            "| A     |       |",
1067            "| B     |       |",
1068            "| E     | E2    |",
1069            "| F     | F2    |",
1070            "| G     | G2    |",
1071            "+-------+-------+",
1072        ];
1073        assert_batches_sorted_eq!(&expected, &actual);
1074    }
1075
1076    #[tokio::test]
1077    async fn delta_scan_supports_pushdown() {
1078        let schema = Arc::new(ArrowSchema::new(vec![
1079            Field::new("col_1", ArrowDataType::Utf8, false),
1080            Field::new("col_2", ArrowDataType::Utf8, false),
1081        ]));
1082
1083        let batch = RecordBatch::try_new(
1084            schema.clone(),
1085            vec![
1086                Arc::new(arrow::array::StringArray::from(vec![
1087                    Some("A"),
1088                    Some("B"),
1089                    Some("C"),
1090                ])),
1091                Arc::new(arrow::array::StringArray::from(vec![
1092                    Some("A2"),
1093                    Some("B2"),
1094                    Some("C2"),
1095                ])),
1096            ],
1097        )
1098        .unwrap();
1099
1100        let table = DeltaTable::new_in_memory()
1101            .write(vec![batch])
1102            .with_save_mode(crate::protocol::SaveMode::Append)
1103            .await
1104            .unwrap();
1105
1106        let config = DeltaScanConfigBuilder::new()
1107            .build(table.snapshot().unwrap().snapshot())
1108            .unwrap();
1109        let log = table.log_store();
1110
1111        let provider =
1112            DeltaTableProvider::try_new(table.snapshot().unwrap().snapshot().clone(), log, config)
1113                .unwrap();
1114
1115        let mut cfg = SessionConfig::default();
1116        cfg.options_mut().execution.parquet.pushdown_filters = true;
1117        let ctx = SessionContext::new_with_config(cfg);
1118        ctx.register_table("test", Arc::new(provider)).unwrap();
1119
1120        let df = ctx
1121            .sql("select col_1, col_2 from test WHERE col_1 = 'A'")
1122            .await
1123            .unwrap();
1124        let actual = df.collect().await.unwrap();
1125        let expected = vec![
1126            "+-------+-------+",
1127            "| col_1 | col_2 |",
1128            "+-------+-------+",
1129            "| A     | A2    |",
1130            "+-------+-------+",
1131        ];
1132        assert_batches_sorted_eq!(&expected, &actual);
1133    }
1134
    #[tokio::test]
    async fn delta_scan_supports_nested_missing_columns() {
        // First writer: struct column `col_1` containing only nested field
        // `col_1a`.
        let column1_schema1: arrow::datatypes::Fields =
            vec![Field::new("col_1a", ArrowDataType::Utf8, true)].into();
        let schema1 = Arc::new(ArrowSchema::new(vec![Field::new(
            "col_1",
            ArrowDataType::Struct(column1_schema1.clone()),
            true,
        )]));

        let batch1 = RecordBatch::try_new(
            schema1.clone(),
            vec![Arc::new(StructArray::new(
                column1_schema1,
                vec![Arc::new(arrow::array::StringArray::from(vec![
                    Some("A"),
                    Some("B"),
                ]))],
                None,
            ))],
        )
        .unwrap();

        // Second writer: same struct column, now with an additional nested
        // field `col_1b` that the first batch never had.
        let column1_schema2: arrow_schema::Fields = vec![
            Field::new("col_1a", ArrowDataType::Utf8, true),
            Field::new("col_1b", ArrowDataType::Utf8, true),
        ]
        .into();
        let schema2 = Arc::new(ArrowSchema::new(vec![Field::new(
            "col_1",
            ArrowDataType::Struct(column1_schema2.clone()),
            true,
        )]));

        let batch2 = RecordBatch::try_new(
            schema2.clone(),
            vec![Arc::new(StructArray::new(
                column1_schema2,
                vec![
                    Arc::new(arrow::array::StringArray::from(vec![
                        Some("E"),
                        Some("F"),
                        Some("G"),
                    ])),
                    Arc::new(arrow::array::StringArray::from(vec![
                        Some("E2"),
                        Some("F2"),
                        Some("G2"),
                    ])),
                ],
                None,
            ))],
        )
        .unwrap();

        let table = DeltaTable::new_in_memory()
            .write(vec![batch1])
            .with_save_mode(crate::protocol::SaveMode::Append)
            .await
            .unwrap();

        // Merge schemas on the second append so the table gains `col_1b`.
        let table = table
            .write(vec![batch2])
            .with_schema_mode(SchemaMode::Merge)
            .with_save_mode(crate::protocol::SaveMode::Append)
            .await
            .unwrap();

        let config = DeltaScanConfigBuilder::new()
            .build(table.snapshot().unwrap().snapshot())
            .unwrap();
        let log = table.log_store();

        let provider =
            DeltaTableProvider::try_new(table.snapshot().unwrap().snapshot().clone(), log, config)
                .unwrap();
        let ctx: SessionContext = DeltaSessionContext::default().into();
        ctx.register_table("test", Arc::new(provider)).unwrap();

        let df = ctx
            .sql("select col_1.col_1a, col_1.col_1b from test")
            .await
            .unwrap();
        let actual = df.collect().await.unwrap();
        // Rows from batch1 must show NULL for the nested field they lack.
        let expected = vec![
            "+--------------------+--------------------+",
            "| test.col_1[col_1a] | test.col_1[col_1b] |",
            "+--------------------+--------------------+",
            "| A                  |                    |",
            "| B                  |                    |",
            "| E                  | E2                 |",
            "| F                  | F2                 |",
            "| G                  | G2                 |",
            "+--------------------+--------------------+",
        ];
        assert_batches_sorted_eq!(&expected, &actual);
    }
1232
1233    #[tokio::test]
1234    async fn test_multiple_predicate_pushdown() {
1235        let schema = Arc::new(ArrowSchema::new(vec![
1236            Field::new("moDified", ArrowDataType::Utf8, true),
1237            Field::new("id", ArrowDataType::Utf8, true),
1238            Field::new("vaLue", ArrowDataType::Int32, true),
1239        ]));
1240
1241        let batch = RecordBatch::try_new(
1242            schema.clone(),
1243            vec![
1244                Arc::new(arrow::array::StringArray::from(vec![
1245                    "2021-02-01",
1246                    "2021-02-01",
1247                    "2021-02-02",
1248                    "2021-02-02",
1249                ])),
1250                Arc::new(arrow::array::StringArray::from(vec!["A", "B", "C", "D"])),
1251                Arc::new(arrow::array::Int32Array::from(vec![1, 10, 20, 100])),
1252            ],
1253        )
1254        .unwrap();
1255        // write some data
1256        let table = DeltaTable::new_in_memory()
1257            .write(vec![batch.clone()])
1258            .with_save_mode(crate::protocol::SaveMode::Append)
1259            .await
1260            .unwrap();
1261
1262        let datafusion = SessionContext::new();
1263        table
1264            .update_datafusion_session(&datafusion.state())
1265            .unwrap();
1266
1267        datafusion
1268            .register_table("snapshot", table.table_provider().await.unwrap())
1269            .unwrap();
1270
1271        let df = datafusion
1272            .sql("select * from snapshot where id > 10000 and id < 20000")
1273            .await
1274            .unwrap();
1275
1276        df.collect().await.unwrap();
1277    }
1278
1279    #[tokio::test]
1280    async fn test_delta_scan_builder_no_scan_config() {
1281        let arr: Arc<dyn Array> = Arc::new(arrow::array::StringArray::from(vec!["s"]));
1282        let batch = RecordBatch::try_from_iter_with_nullable(vec![("a", arr, false)]).unwrap();
1283        let table = DeltaTable::new_in_memory()
1284            .write(vec![batch])
1285            .with_save_mode(crate::protocol::SaveMode::Append)
1286            .await
1287            .unwrap();
1288
1289        let ctx = SessionContext::new();
1290        let state = ctx.state();
1291        let scan = DeltaScanBuilder::new(
1292            table.snapshot().unwrap().snapshot(),
1293            table.log_store(),
1294            &state,
1295        )
1296        .with_filter(Some(col("a").eq(lit("s"))))
1297        .build()
1298        .await
1299        .unwrap();
1300
1301        let mut visitor = ParquetVisitor::default();
1302        visit_execution_plan(&scan, &mut visitor).unwrap();
1303
1304        assert_eq!(visitor.predicate.unwrap().to_string(), "a@0 = s");
1305    }
1306
1307    #[tokio::test]
1308    async fn test_delta_scan_builder_scan_config_disable_pushdown() {
1309        let arr: Arc<dyn Array> = Arc::new(arrow::array::StringArray::from(vec!["s"]));
1310        let batch = RecordBatch::try_from_iter_with_nullable(vec![("a", arr, false)]).unwrap();
1311        let table = DeltaTable::new_in_memory()
1312            .write(vec![batch])
1313            .with_save_mode(crate::protocol::SaveMode::Append)
1314            .await
1315            .unwrap();
1316
1317        let snapshot = table.snapshot().unwrap();
1318        let ctx = SessionContext::new();
1319        let state = ctx.state();
1320        let scan = DeltaScanBuilder::new(snapshot.snapshot(), table.log_store(), &state)
1321            .with_filter(Some(col("a").eq(lit("s"))))
1322            .with_scan_config(
1323                DeltaScanConfigBuilder::new()
1324                    .with_parquet_pushdown(false)
1325                    .build(snapshot.snapshot())
1326                    .unwrap(),
1327            )
1328            .build()
1329            .await
1330            .unwrap();
1331
1332        let mut visitor = ParquetVisitor::default();
1333        visit_execution_plan(&scan, &mut visitor).unwrap();
1334
1335        assert!(visitor.predicate.is_none());
1336    }
1337
1338    #[tokio::test]
1339    async fn test_delta_scan_applies_parquet_options() {
1340        let arr: Arc<dyn Array> = Arc::new(arrow::array::StringArray::from(vec!["s"]));
1341        let batch = RecordBatch::try_from_iter_with_nullable(vec![("a", arr, false)]).unwrap();
1342        let table = DeltaTable::new_in_memory()
1343            .write(vec![batch])
1344            .with_save_mode(crate::protocol::SaveMode::Append)
1345            .await
1346            .unwrap();
1347
1348        let snapshot = table.snapshot().unwrap();
1349
1350        let mut config = SessionConfig::default();
1351        config.options_mut().execution.parquet.pushdown_filters = true;
1352        let ctx = SessionContext::new_with_config(config);
1353        let state = ctx.state();
1354
1355        let scan = DeltaScanBuilder::new(snapshot.snapshot(), table.log_store(), &state)
1356            .build()
1357            .await
1358            .unwrap();
1359
1360        let mut visitor = ParquetVisitor::default();
1361        visit_execution_plan(&scan, &mut visitor).unwrap();
1362
1363        assert_eq!(ctx.copied_table_options().parquet, visitor.options.unwrap());
1364    }
1365
    /// Extracts fields from the parquet scan
    #[derive(Default)]
    struct ParquetVisitor {
        /// Physical predicate found on the `ParquetSource`, if one was pushed down.
        predicate: Option<Arc<dyn PhysicalExpr>>,
        /// Parquet reader options captured from the scanned `ParquetSource`.
        options: Option<TableParquetOptions>,
    }
1372
1373    impl ExecutionPlanVisitor for ParquetVisitor {
1374        type Error = DataFusionError;
1375
1376        fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
1377            let Some(datasource_exec) = plan.as_any().downcast_ref::<DataSourceExec>() else {
1378                return Ok(true);
1379            };
1380
1381            let Some(scan_config) = datasource_exec
1382                .data_source()
1383                .as_any()
1384                .downcast_ref::<FileScanConfig>()
1385            else {
1386                return Ok(true);
1387            };
1388
1389            if let Some(parquet_source) = scan_config
1390                .file_source
1391                .as_any()
1392                .downcast_ref::<ParquetSource>()
1393            {
1394                self.options = Some(parquet_source.table_parquet_options().clone());
1395                self.predicate = parquet_source.filter();
1396            }
1397
1398            Ok(true)
1399        }
1400    }
1401
1402    // Run a query that filters out all files and sorts.
1403    // Verify that it returns an empty set of rows without panicking.
1404    //
1405    // Historically, we had a bug that caused us to emit a query plan with 0 partitions, which
1406    // datafusion rejected.
1407    #[tokio::test]
1408    async fn passes_sanity_checker_when_all_files_filtered() {
1409        let table = open_fs_path("../test/tests/data/delta-2.2.0-partitioned-types");
1410        let ctx = create_session().into_inner();
1411        ctx.register_table("test", table.table_provider().await.unwrap())
1412            .unwrap();
1413
1414        let df = ctx
1415            .sql("select * from test where c3 = 100 ORDER BY c1 ASC")
1416            .await
1417            .unwrap();
1418        let actual = df.collect().await.unwrap();
1419
1420        assert_eq!(actual.len(), 0);
1421    }
1422
    #[tokio::test]
    async fn test_delta_scan_uses_parquet_column_pruning() {
        // One tiny column and one ~1KiB column: if column pruning works, the
        // bytes of "large" are never fetched from the store.
        let small: Arc<dyn Array> = Arc::new(arrow::array::StringArray::from(vec!["a"]));
        let large: Arc<dyn Array> = Arc::new(arrow::array::StringArray::from(vec![
            "b".repeat(1024).as_str(),
        ]));
        let batch = RecordBatch::try_from_iter(vec![("small", small), ("large", large)]).unwrap();
        let table = DeltaTable::new_in_memory()
            .write(vec![batch])
            .with_save_mode(crate::protocol::SaveMode::Append)
            .await
            .unwrap();

        let config = DeltaScanConfigBuilder::new()
            .build(table.snapshot().unwrap().snapshot())
            .unwrap();

        // Wrap the table's store so every read is echoed onto a channel.
        let (object_store, mut operations) =
            RecordingObjectStore::new(table.log_store().object_store(None));
        // this uses an in memory store pointing at root...
        let both_store = Arc::new(object_store);
        let log_store = DefaultLogStore::new(
            both_store.clone(),
            both_store,
            table.log_store().config().clone(),
        );
        let provider = DeltaTableProvider::try_new(
            table.snapshot().unwrap().snapshot().clone(),
            Arc::new(log_store),
            config,
        )
        .unwrap();
        let ctx = SessionContext::new();
        ctx.register_table("test", Arc::new(provider)).unwrap();
        let state = ctx.state();

        // Only project "small"; "large" should be pruned at the parquet level.
        let df = ctx.sql("select small from test").await.unwrap();
        let plan = df.create_physical_plan().await.unwrap();

        let mut stream = plan.execute(0, state.task_ctx()).unwrap();
        let Some(Ok(batch)) = stream.next().await else {
            panic!()
        };
        assert!(stream.next().await.is_none());
        assert_eq!(1, batch.num_columns());
        assert_eq!(1, batch.num_rows());
        let small = batch.column_by_name("small").unwrap().as_string::<i32>();
        assert_eq!("a", small.iter().next().unwrap().unwrap());

        // Exactly three reads: the commit, then two ranged data reads —
        // presumably the parquet footer tail (8 bytes at 957..965) and the
        // metadata plus the "small" column chunk (326..957); note the bytes
        // of "large" are never requested. Byte offsets are pinned to this
        // exact file layout and will shift if the writer changes.
        let expected = vec![
            ObjectStoreOperation::Get(LocationType::Commit),
            ObjectStoreOperation::GetRange(LocationType::Data, 957..965),
            ObjectStoreOperation::GetRange(LocationType::Data, 326..957),
        ];
        let mut actual = Vec::new();
        operations.recv_many(&mut actual, 3).await;
        assert_eq!(expected, actual);
    }
1481
1482    #[tokio::test]
1483    async fn test_push_down_filter_panic_2602() -> DeltaResult<()> {
1484        use crate::kernel::schema::{DataType, PrimitiveType};
1485        let ctx = SessionContext::new();
1486        let table = DeltaTable::new_in_memory()
1487            .create()
1488            .with_column("id", DataType::Primitive(PrimitiveType::Long), true, None)
1489            .with_column(
1490                "name",
1491                DataType::Primitive(PrimitiveType::String),
1492                true,
1493                None,
1494            )
1495            .with_column("b", DataType::Primitive(PrimitiveType::Boolean), true, None)
1496            .with_column(
1497                "ts",
1498                DataType::Primitive(PrimitiveType::Timestamp),
1499                true,
1500                None,
1501            )
1502            .with_column("dt", DataType::Primitive(PrimitiveType::Date), true, None)
1503            .with_column(
1504                "zap",
1505                DataType::Array(Box::new(ArrayType::new(
1506                    DataType::Primitive(PrimitiveType::Boolean),
1507                    true,
1508                ))),
1509                true,
1510                None,
1511            )
1512            .await?;
1513        table.update_datafusion_session(&ctx.state()).unwrap();
1514
1515        ctx.register_table("snapshot", table.table_provider().await.unwrap())
1516            .unwrap();
1517
1518        let df = ctx
1519            .sql("select * from snapshot where id > 10000 and id < 20000")
1520            .await
1521            .unwrap();
1522
1523        let _ = df.collect().await?;
1524        Ok(())
1525    }
1526
    /// Records operations made by the inner object store on a channel obtained at construction
    struct RecordingObjectStore {
        /// The real object store every call is forwarded to.
        inner: ObjectStoreRef,
        /// Sender half of the recording channel; read operations push an
        /// `ObjectStoreOperation` here before delegating to `inner`.
        operations: UnboundedSender<ObjectStoreOperation>,
    }
1532
1533    impl RecordingObjectStore {
1534        /// Returns an object store and a channel recording all operations made by the inner object store
1535        fn new(inner: ObjectStoreRef) -> (Self, UnboundedReceiver<ObjectStoreOperation>) {
1536            let (operations, operations_receiver) = unbounded_channel();
1537            (Self { inner, operations }, operations_receiver)
1538        }
1539    }
1540
1541    impl Display for RecordingObjectStore {
1542        fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1543            Display::fmt(&self.inner, f)
1544        }
1545    }
1546
1547    impl Debug for RecordingObjectStore {
1548        fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1549            Debug::fmt(&self.inner, f)
1550        }
1551    }
1552
    /// A read operation observed by `RecordingObjectStore`, capturing enough
    /// of the arguments (location kind, requested byte ranges) for tests to
    /// assert on exact access patterns.
    #[derive(Debug, PartialEq)]
    enum ObjectStoreOperation {
        GetRanges(LocationType, Vec<Range<u64>>),
        GetRange(LocationType, Range<u64>),
        GetOpts(LocationType),
        Get(LocationType),
    }
1560
    /// Coarse classification of an object-store path used in recordings.
    #[derive(Debug, PartialEq)]
    enum LocationType {
        /// A data file whose name starts with "part-".
        Data,
        /// A delta log commit file.
        Commit,
    }
1566
1567    impl From<&Path> for LocationType {
1568        fn from(value: &Path) -> Self {
1569            let dummy_url = Url::parse("dummy:///").unwrap();
1570            let parsed = ParsedLogPath::try_from(dummy_url.join(value.as_ref()).unwrap()).unwrap();
1571            if let Some(parsed) = parsed
1572                && matches!(parsed.file_type, LogPathFileType::Commit)
1573            {
1574                return LocationType::Commit;
1575            }
1576            if value.to_string().starts_with("part-") {
1577                LocationType::Data
1578            } else {
1579                panic!("Unknown location type: {value:?}")
1580            }
1581        }
1582    }
1583
    // Currently only read operations are recorded. Extend as necessary.
    // Every method delegates to `self.inner`; the four read paths (get,
    // get_opts, get_range, get_ranges) additionally send an
    // `ObjectStoreOperation` on the channel *before* delegating, so the
    // recording order matches the call order.
    #[async_trait::async_trait]
    impl ObjectStore for RecordingObjectStore {
        // --- write paths: forwarded without recording ---

        async fn put(
            &self,
            location: &Path,
            payload: PutPayload,
        ) -> object_store::Result<PutResult> {
            self.inner.put(location, payload).await
        }

        async fn put_opts(
            &self,
            location: &Path,
            payload: PutPayload,
            opts: PutOptions,
        ) -> object_store::Result<PutResult> {
            self.inner.put_opts(location, payload, opts).await
        }

        async fn put_multipart(
            &self,
            location: &Path,
        ) -> object_store::Result<Box<dyn MultipartUpload>> {
            self.inner.put_multipart(location).await
        }

        async fn put_multipart_opts(
            &self,
            location: &Path,
            opts: PutMultipartOptions,
        ) -> object_store::Result<Box<dyn MultipartUpload>> {
            self.inner.put_multipart_opts(location, opts).await
        }

        // --- read paths: recorded, then forwarded ---

        async fn get(&self, location: &Path) -> object_store::Result<GetResult> {
            // Record before delegating; unwrap is fine while the test holds
            // the receiver open.
            self.operations
                .send(ObjectStoreOperation::Get(location.into()))
                .unwrap();
            self.inner.get(location).await
        }

        async fn get_opts(
            &self,
            location: &Path,
            options: GetOptions,
        ) -> object_store::Result<GetResult> {
            self.operations
                .send(ObjectStoreOperation::GetOpts(location.into()))
                .unwrap();
            self.inner.get_opts(location, options).await
        }

        async fn get_range(
            &self,
            location: &Path,
            range: Range<u64>,
        ) -> object_store::Result<Bytes> {
            // The byte range is captured so tests can pin exact reads.
            self.operations
                .send(ObjectStoreOperation::GetRange(
                    location.into(),
                    range.clone(),
                ))
                .unwrap();
            self.inner.get_range(location, range).await
        }

        async fn get_ranges(
            &self,
            location: &Path,
            ranges: &[Range<u64>],
        ) -> object_store::Result<Vec<Bytes>> {
            self.operations
                .send(ObjectStoreOperation::GetRanges(
                    location.into(),
                    ranges.to_vec(),
                ))
                .unwrap();
            self.inner.get_ranges(location, ranges).await
        }

        // --- remaining operations: forwarded without recording ---

        async fn head(&self, location: &Path) -> object_store::Result<ObjectMeta> {
            self.inner.head(location).await
        }

        async fn delete(&self, location: &Path) -> object_store::Result<()> {
            self.inner.delete(location).await
        }

        fn delete_stream<'a>(
            &'a self,
            locations: BoxStream<'a, object_store::Result<Path>>,
        ) -> BoxStream<'a, object_store::Result<Path>> {
            self.inner.delete_stream(locations)
        }

        fn list(
            &self,
            prefix: Option<&Path>,
        ) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
            self.inner.list(prefix)
        }

        fn list_with_offset(
            &self,
            prefix: Option<&Path>,
            offset: &Path,
        ) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
            self.inner.list_with_offset(prefix, offset)
        }

        async fn list_with_delimiter(
            &self,
            prefix: Option<&Path>,
        ) -> object_store::Result<ListResult> {
            self.inner.list_with_delimiter(prefix).await
        }

        async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> {
            self.inner.copy(from, to).await
        }

        async fn rename(&self, from: &Path, to: &Path) -> object_store::Result<()> {
            self.inner.rename(from, to).await
        }

        async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> {
            self.inner.copy_if_not_exists(from, to).await
        }

        async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> {
            self.inner.rename_if_not_exists(from, to).await
        }
    }
1718}