1use std::fmt::Debug;
27use std::sync::Arc;
28
29use arrow::array::types::UInt16Type;
30use arrow::array::{Array, DictionaryArray, RecordBatch, StringArray, TypedDictionaryArray};
31use arrow_cast::{CastOptions, cast_with_options};
32use arrow_schema::{
33 DataType as ArrowDataType, Schema as ArrowSchema, SchemaRef, SchemaRef as ArrowSchemaRef,
34 TimeUnit,
35};
36use datafusion::catalog::{Session, TableProviderFactory};
37use datafusion::common::scalar::ScalarValue;
38use datafusion::common::{
39 Column, DFSchema, DataFusionError, Result as DataFusionResult, TableReference, ToDFSchema,
40};
41use datafusion::datasource::TableProvider;
42use datafusion::datasource::physical_plan::wrap_partition_type_in_dict;
43use datafusion::execution::TaskContext;
44use datafusion::execution::context::SessionContext;
45use datafusion::logical_expr::logical_plan::CreateExternalTable;
46use datafusion::logical_expr::utils::conjunction;
47use datafusion::logical_expr::{Expr, Extension, LogicalPlan};
48use datafusion::physical_optimizer::pruning::PruningPredicate;
49use datafusion::physical_plan::ExecutionPlan;
50use datafusion_proto::logical_plan::LogicalExtensionCodec;
51use datafusion_proto::physical_plan::PhysicalExtensionCodec;
52use delta_kernel::engine::arrow_conversion::TryIntoArrow as _;
53use either::Either;
54
55use crate::delta_datafusion::expr::parse_predicate_expression;
56use crate::delta_datafusion::table_provider::DeltaScanWire;
57use crate::ensure_table_uri;
58use crate::errors::{DeltaResult, DeltaTableError};
59use crate::kernel::{Add, EagerSnapshot, LogDataHandler, Snapshot};
60use crate::{open_table, open_table_with_storage_options};
61
62pub(crate) use self::session::DeltaSessionExt;
63pub use self::session::{
64 DeltaParserOptions, DeltaRuntimeEnvBuilder, DeltaSessionConfig, DeltaSessionContext,
65 create_session, create_session_state_with_spill_config,
66};
67pub use self::table_provider::next::{DeletionVectorSelection, DeltaScan as DeltaScanNext};
68pub(crate) use self::utils::*;
69pub use cdf::scan::DeltaCdfTableProvider;
70pub(crate) use data_validation::{
71 DataValidationExec, constraints_to_exprs, generated_columns_to_exprs, validation_predicates,
72};
73pub(crate) use find_files::*;
74pub(crate) use table_provider::next::normalize_path_as_file_id;
75pub use table_provider::{
76 DeltaScan, DeltaScanConfig, DeltaScanConfigBuilder, DeltaTableProvider, TableProviderBuilder,
77 next::DeltaScanExec,
78};
79pub(crate) use table_provider::{
80 next::FILE_ID_COLUMN_DEFAULT, resolve_file_column_name, update_datafusion_session,
81};
82
/// Column name literal `__delta_rs_path` shared across the crate.
// NOTE(review): presumably the name of the synthetic column carrying each row's
// source file path during scans — confirm against the callers.
pub(crate) const PATH_COLUMN: &str = "__delta_rs_path";
84
85#[doc(hidden)]
86pub mod bench_support;
87pub mod cdf;
88mod data_validation;
89pub mod engine;
90pub mod expr;
91mod file_id;
92mod find_files;
93pub mod logical;
94pub mod physical;
95pub mod planner;
96mod session;
97pub use session::SessionFallbackPolicy;
98pub(crate) use session::{SessionResolveContext, resolve_session_state};
99mod table_provider;
100pub(crate) mod utils;
101
102impl From<DeltaTableError> for DataFusionError {
103 fn from(err: DeltaTableError) -> Self {
104 match err {
105 DeltaTableError::Arrow { source } => DataFusionError::from(source),
106 DeltaTableError::Io { source } => DataFusionError::IoError(source),
107 DeltaTableError::ObjectStore { source } => DataFusionError::from(source),
108 DeltaTableError::Parquet { source } => DataFusionError::from(source),
109 _ => DataFusionError::External(Box::new(err)),
110 }
111 }
112}
113
114impl From<DataFusionError> for DeltaTableError {
115 fn from(err: DataFusionError) -> Self {
116 match err {
117 DataFusionError::ArrowError(source, _) => DeltaTableError::from(*source),
118 DataFusionError::IoError(source) => DeltaTableError::Io { source },
119 DataFusionError::ObjectStore(source) => DeltaTableError::from(*source),
120 DataFusionError::ParquetError(source) => DeltaTableError::from(*source),
121 _ => DeltaTableError::Generic(err.to_string()),
122 }
123 }
124}
125
126pub trait DataFusionMixins {
128 fn read_schema(&self) -> ArrowSchemaRef;
130
131 fn input_schema(&self) -> ArrowSchemaRef;
133
134 fn parse_predicate_expression(
136 &self,
137 expr: impl AsRef<str>,
138 session: &dyn Session,
139 ) -> DeltaResult<Expr>;
140}
141
142impl DataFusionMixins for Snapshot {
143 fn read_schema(&self) -> ArrowSchemaRef {
144 _arrow_schema(
145 self.arrow_schema(),
146 self.metadata().partition_columns(),
147 true,
148 )
149 }
150
151 fn input_schema(&self) -> ArrowSchemaRef {
152 _arrow_schema(
153 self.arrow_schema(),
154 self.metadata().partition_columns(),
155 false,
156 )
157 }
158
159 fn parse_predicate_expression(
160 &self,
161 expr: impl AsRef<str>,
162 session: &dyn Session,
163 ) -> DeltaResult<Expr> {
164 let schema = DFSchema::try_from(self.read_schema().as_ref().to_owned())?;
165 parse_predicate_expression(&schema, expr, session)
166 }
167}
168
169impl DataFusionMixins for LogDataHandler<'_> {
170 fn read_schema(&self) -> ArrowSchemaRef {
171 _arrow_schema(
172 Arc::new(
173 self.table_configuration()
174 .logical_schema()
175 .as_ref()
176 .try_into_arrow()
177 .unwrap(),
178 ),
179 self.table_configuration().metadata().partition_columns(),
180 true,
181 )
182 }
183
184 fn input_schema(&self) -> ArrowSchemaRef {
185 _arrow_schema(
186 Arc::new(
187 self.table_configuration()
188 .logical_schema()
189 .as_ref()
190 .try_into_arrow()
191 .unwrap(),
192 ),
193 self.table_configuration().metadata().partition_columns(),
194 false,
195 )
196 }
197
198 fn parse_predicate_expression(
199 &self,
200 expr: impl AsRef<str>,
201 session: &dyn Session,
202 ) -> DeltaResult<Expr> {
203 let schema = DFSchema::try_from(self.read_schema().as_ref().to_owned())?;
204 parse_predicate_expression(&schema, expr, session)
205 }
206}
207
208impl DataFusionMixins for EagerSnapshot {
209 fn read_schema(&self) -> ArrowSchemaRef {
210 self.snapshot().read_schema()
211 }
212
213 fn input_schema(&self) -> ArrowSchemaRef {
214 self.snapshot().input_schema()
215 }
216
217 fn parse_predicate_expression(
218 &self,
219 expr: impl AsRef<str>,
220 session: &dyn Session,
221 ) -> DeltaResult<Expr> {
222 self.snapshot().parse_predicate_expression(expr, session)
223 }
224}
225
226fn _arrow_schema(
227 schema: SchemaRef,
228 partition_columns: &[String],
229 wrap_partitions: bool,
230) -> ArrowSchemaRef {
231 let fields = schema
232 .fields()
233 .into_iter()
234 .filter(|f| !partition_columns.contains(&f.name().to_string()))
235 .cloned()
236 .chain(
237 partition_columns.iter().map(|partition_col| {
240 let field = schema.field_with_name(partition_col).unwrap();
241 let corrected = if wrap_partitions {
242 match field.data_type() {
243 ArrowDataType::Utf8
246 | ArrowDataType::LargeUtf8
247 | ArrowDataType::Binary
248 | ArrowDataType::LargeBinary => {
249 wrap_partition_type_in_dict(field.data_type().clone())
250 }
251 _ => field.data_type().clone(),
252 }
253 } else {
254 field.data_type().clone()
255 };
256 Arc::new(field.clone().with_data_type(corrected))
257 }),
258 )
259 .collect::<Vec<_>>();
260 Arc::new(ArrowSchema::new(fields))
261}
262
263pub(crate) fn files_matching_predicate<'a>(
264 log_data: LogDataHandler<'a>,
265 filters: &[Expr],
266) -> DeltaResult<impl Iterator<Item = Add> + 'a> {
267 if let Some(Some(predicate)) =
268 (!filters.is_empty()).then_some(conjunction(filters.iter().cloned()))
269 {
270 let expr = SessionContext::new()
271 .create_physical_expr(predicate, &log_data.read_schema().to_dfschema()?)?;
272 let pruning_predicate = PruningPredicate::try_new(expr, log_data.read_schema())?;
273 let mask = pruning_predicate.prune(&log_data)?;
274
275 Ok(Either::Left(log_data.into_iter().zip(mask).filter_map(
276 |(file, keep_file)| {
277 if keep_file { Some(file.to_add()) } else { None }
278 },
279 )))
280 } else {
281 Ok(Either::Right(
282 log_data.into_iter().map(|file| file.to_add()),
283 ))
284 }
285}
286
287pub(crate) fn get_path_column<'a>(
288 batch: &'a RecordBatch,
289 path_column: &str,
290) -> DeltaResult<TypedDictionaryArray<'a, UInt16Type, StringArray>> {
291 let err = || DeltaTableError::Generic("Unable to obtain Delta-rs path column".to_string());
292 batch
293 .column_by_name(path_column)
294 .ok_or_else(err)?
295 .as_any()
296 .downcast_ref::<DictionaryArray<UInt16Type>>()
297 .ok_or_else(err)?
298 .downcast_dict::<StringArray>()
299 .ok_or_else(err)
300}
301
302pub(crate) fn get_null_of_arrow_type(t: &ArrowDataType) -> DeltaResult<ScalarValue> {
303 match t {
304 ArrowDataType::Null => Ok(ScalarValue::Null),
305 ArrowDataType::Boolean => Ok(ScalarValue::Boolean(None)),
306 ArrowDataType::Int8 => Ok(ScalarValue::Int8(None)),
307 ArrowDataType::Int16 => Ok(ScalarValue::Int16(None)),
308 ArrowDataType::Int32 => Ok(ScalarValue::Int32(None)),
309 ArrowDataType::Int64 => Ok(ScalarValue::Int64(None)),
310 ArrowDataType::UInt8 => Ok(ScalarValue::UInt8(None)),
311 ArrowDataType::UInt16 => Ok(ScalarValue::UInt16(None)),
312 ArrowDataType::UInt32 => Ok(ScalarValue::UInt32(None)),
313 ArrowDataType::UInt64 => Ok(ScalarValue::UInt64(None)),
314 ArrowDataType::Float32 => Ok(ScalarValue::Float32(None)),
315 ArrowDataType::Float64 => Ok(ScalarValue::Float64(None)),
316 ArrowDataType::Date32 => Ok(ScalarValue::Date32(None)),
317 ArrowDataType::Date64 => Ok(ScalarValue::Date64(None)),
318 ArrowDataType::Binary => Ok(ScalarValue::Binary(None)),
319 ArrowDataType::FixedSizeBinary(size) => {
320 Ok(ScalarValue::FixedSizeBinary(size.to_owned(), None))
321 }
322 ArrowDataType::LargeBinary => Ok(ScalarValue::LargeBinary(None)),
323 ArrowDataType::Utf8 => Ok(ScalarValue::Utf8(None)),
324 ArrowDataType::LargeUtf8 => Ok(ScalarValue::LargeUtf8(None)),
325 ArrowDataType::Decimal128(precision, scale) => Ok(ScalarValue::Decimal128(
326 None,
327 precision.to_owned(),
328 scale.to_owned(),
329 )),
330 ArrowDataType::Timestamp(unit, tz) => {
331 let tz = tz.to_owned();
332 Ok(match unit {
333 TimeUnit::Second => ScalarValue::TimestampSecond(None, tz),
334 TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(None, tz),
335 TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(None, tz),
336 TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(None, tz),
337 })
338 }
339 ArrowDataType::Dictionary(k, v) => Ok(ScalarValue::Dictionary(
340 k.clone(),
341 Box::new(get_null_of_arrow_type(v)?),
342 )),
343 ArrowDataType::Float16
345 | ArrowDataType::Decimal32(_, _)
346 | ArrowDataType::Decimal64(_, _)
347 | ArrowDataType::Decimal256(_, _)
348 | ArrowDataType::Union(_, _)
349 | ArrowDataType::LargeList(_)
350 | ArrowDataType::Struct(_)
351 | ArrowDataType::List(_)
352 | ArrowDataType::FixedSizeList(_, _)
353 | ArrowDataType::Time32(_)
354 | ArrowDataType::Time64(_)
355 | ArrowDataType::Duration(_)
356 | ArrowDataType::Interval(_)
357 | ArrowDataType::RunEndEncoded(_, _)
358 | ArrowDataType::BinaryView
359 | ArrowDataType::Utf8View
360 | ArrowDataType::LargeListView(_)
361 | ArrowDataType::ListView(_)
362 | ArrowDataType::Map(_, _) => Err(DeltaTableError::Generic(format!(
363 "Unsupported data type for Delta Lake {t}"
364 ))),
365 }
366}
367
368fn parse_date(
369 stat_val: &serde_json::Value,
370 field_dt: &ArrowDataType,
371) -> DataFusionResult<ScalarValue> {
372 let string = match stat_val {
373 serde_json::Value::String(s) => s.to_owned(),
374 _ => stat_val.to_string(),
375 };
376
377 let time_micro = ScalarValue::try_from_string(string, &ArrowDataType::Date32)?;
378 let cast_arr = cast_with_options(
379 &time_micro.to_array()?,
380 field_dt,
381 &CastOptions {
382 safe: false,
383 ..Default::default()
384 },
385 )?;
386 ScalarValue::try_from_array(&cast_arr, 0)
387}
388
389fn parse_timestamp(
390 stat_val: &serde_json::Value,
391 field_dt: &ArrowDataType,
392) -> DataFusionResult<ScalarValue> {
393 let string = match stat_val {
394 serde_json::Value::String(s) => s.to_owned(),
395 _ => stat_val.to_string(),
396 };
397
398 let time_micro = ScalarValue::try_from_string(
399 string,
400 &ArrowDataType::Timestamp(TimeUnit::Microsecond, None),
401 )?;
402 let cast_arr = cast_with_options(
403 &time_micro.to_array()?,
404 field_dt,
405 &CastOptions {
406 safe: false,
407 ..Default::default()
408 },
409 )?;
410 ScalarValue::try_from_array(&cast_arr, 0)
411}
412
413pub(crate) fn to_correct_scalar_value(
414 stat_val: &serde_json::Value,
415 field_dt: &ArrowDataType,
416) -> DataFusionResult<Option<ScalarValue>> {
417 match stat_val {
418 serde_json::Value::Array(_) => Ok(None),
419 serde_json::Value::Object(_) => Ok(None),
420 serde_json::Value::Null => Ok(Some(get_null_of_arrow_type(field_dt)?)),
421 serde_json::Value::String(string_val) => match field_dt {
422 ArrowDataType::Timestamp(_, _) => Ok(Some(parse_timestamp(stat_val, field_dt)?)),
423 ArrowDataType::Date32 => Ok(Some(parse_date(stat_val, field_dt)?)),
424 _ => Ok(Some(ScalarValue::try_from_string(
425 string_val.to_owned(),
426 field_dt,
427 )?)),
428 },
429 other => match field_dt {
430 ArrowDataType::Timestamp(_, _) => Ok(Some(parse_timestamp(stat_val, field_dt)?)),
431 ArrowDataType::Date32 => Ok(Some(parse_date(stat_val, field_dt)?)),
432 _ => Ok(Some(ScalarValue::try_from_string(
433 other.to_string(),
434 field_dt,
435 )?)),
436 },
437 }
438}
439
440#[derive(Debug)]
442pub struct DeltaPhysicalCodec {}
443
444impl PhysicalExtensionCodec for DeltaPhysicalCodec {
445 fn try_decode(
446 &self,
447 buf: &[u8],
448 inputs: &[Arc<dyn ExecutionPlan>],
449 _registry: &TaskContext,
450 ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
451 let wire: DeltaScanWire = serde_json::from_reader(buf)
452 .map_err(|_| DataFusionError::Internal("Unable to decode DeltaScan".to_string()))?;
453 let delta_scan = DeltaScan::new(
454 &wire.table_url,
455 wire.config,
456 (*inputs)[0].clone(),
457 wire.logical_schema,
458 );
459 Ok(Arc::new(delta_scan))
460 }
461
462 fn try_encode(
463 &self,
464 node: Arc<dyn ExecutionPlan>,
465 buf: &mut Vec<u8>,
466 ) -> Result<(), DataFusionError> {
467 let delta_scan = node
468 .as_any()
469 .downcast_ref::<DeltaScan>()
470 .ok_or_else(|| DataFusionError::Internal("Not a delta scan!".to_string()))?;
471
472 let wire = DeltaScanWire::from(delta_scan);
473 serde_json::to_writer(buf, &wire)
474 .map_err(|_| DataFusionError::Internal("Unable to encode delta scan!".to_string()))?;
475 Ok(())
476 }
477}
478
479#[derive(Debug)]
481pub struct DeltaLogicalCodec {}
482
483impl LogicalExtensionCodec for DeltaLogicalCodec {
484 fn try_decode(
485 &self,
486 _buf: &[u8],
487 _inputs: &[LogicalPlan],
488 _ctx: &TaskContext,
489 ) -> Result<Extension, DataFusionError> {
490 todo!("DeltaLogicalCodec")
491 }
492
493 fn try_encode(&self, _node: &Extension, _buf: &mut Vec<u8>) -> Result<(), DataFusionError> {
494 todo!("DeltaLogicalCodec")
495 }
496
497 fn try_decode_table_provider(
498 &self,
499 buf: &[u8],
500 _table_ref: &TableReference,
501 _schema: SchemaRef,
502 _ctx: &TaskContext,
503 ) -> Result<Arc<dyn TableProvider>, DataFusionError> {
504 let provider: DeltaScanNext = serde_json::from_slice(buf)
505 .map_err(|_| DataFusionError::Internal("Error encoding delta table".to_string()))?;
506 Ok(Arc::new(provider))
507 }
508
509 fn try_encode_table_provider(
510 &self,
511 _table_ref: &TableReference,
512 node: Arc<dyn TableProvider>,
513 buf: &mut Vec<u8>,
514 ) -> Result<(), DataFusionError> {
515 let scan = node
516 .as_ref()
517 .as_any()
518 .downcast_ref::<DeltaScanNext>()
519 .ok_or_else(|| {
520 DataFusionError::Internal("Can't encode non-delta tables".to_string())
521 })?;
522 serde_json::to_writer(buf, scan)
523 .map_err(|_| DataFusionError::Internal("Error encoding delta table".to_string()))
524 }
525}
526
527#[derive(Debug)]
529pub struct DeltaTableFactory {}
530
531#[async_trait::async_trait]
532impl TableProviderFactory for DeltaTableFactory {
533 async fn create(
534 &self,
535 ctx: &dyn Session,
536 cmd: &CreateExternalTable,
537 ) -> datafusion::error::Result<Arc<dyn TableProvider>> {
538 let table = if cmd.options.is_empty() {
539 let table_url = ensure_table_uri(&cmd.to_owned().location)?;
540 open_table(table_url).await?
541 } else {
542 let table_url = ensure_table_uri(&cmd.to_owned().location)?;
543 open_table_with_storage_options(table_url, cmd.to_owned().options).await?
544 };
545 let table_uri = table.log_store().root_url().clone();
546 let (session_state, _) = resolve_session_state(
547 Some(ctx),
548 SessionFallbackPolicy::DeriveFromTrait,
549 || create_session().state(),
550 SessionResolveContext {
551 operation: "DeltaTableFactory::create",
552 table_uri: Some(&table_uri),
553 cdc: false,
554 },
555 )?;
556
557 Ok(table
558 .table_provider()
559 .with_session(Arc::new(session_state))
560 .await?)
561 }
562}
563
564pub struct DeltaColumn {
566 inner: Column,
567}
568
569impl From<&str> for DeltaColumn {
570 fn from(c: &str) -> Self {
571 DeltaColumn {
572 inner: Column::from_qualified_name_ignore_case(c),
573 }
574 }
575}
576
577impl From<&String> for DeltaColumn {
579 fn from(c: &String) -> Self {
580 DeltaColumn {
581 inner: Column::from_qualified_name_ignore_case(c),
582 }
583 }
584}
585
586impl From<String> for DeltaColumn {
588 fn from(c: String) -> Self {
589 DeltaColumn {
590 inner: Column::from_qualified_name_ignore_case(c),
591 }
592 }
593}
594
595impl From<DeltaColumn> for Column {
596 fn from(value: DeltaColumn) -> Self {
597 value.inner
598 }
599}
600
601impl From<Column> for DeltaColumn {
603 fn from(c: Column) -> Self {
604 DeltaColumn { inner: c }
605 }
606}
607
608#[cfg(test)]
609mod tests {
610 use crate::DeltaTable;
611 use crate::operations::write::SchemaMode;
612 use crate::test_utils::{
613 object_store::{
614 RecordedObjectStoreOperation as ObjectStoreOperation, RecordedPathKind as PathKind,
615 drain_recorded_object_store_operations as drain_recorded_ops, recording_log_store,
616 },
617 open_fs_path,
618 };
619 use crate::writer::test_utils::get_delta_schema;
620 use arrow::array::StructArray;
621 use arrow::datatypes::{Field, Schema};
622 use arrow_array::cast::AsArray;
623 use datafusion::assert_batches_sorted_eq;
624 use datafusion::config::TableParquetOptions;
625 use datafusion::datasource::physical_plan::{FileScanConfig, ParquetSource};
626 use datafusion::datasource::source::DataSourceExec;
627 use datafusion::logical_expr::lit;
628 use datafusion::physical_plan::empty::EmptyExec;
629 use datafusion::physical_plan::{ExecutionPlanVisitor, PhysicalExpr, visit_execution_plan};
630 use datafusion::prelude::{SessionConfig, col};
631 use datafusion_datasource::file::FileSource as _;
632 use datafusion_proto::physical_plan::AsExecutionPlan;
633 use datafusion_proto::protobuf;
634 use delta_kernel::schema::ArrayType;
635 use futures::{StreamExt, TryStreamExt};
636 use object_store::ObjectStoreExt as _;
637 use serde_json::json;
638 use std::ops::Range;
639 use url::Url;
640
641 use super::*;
642 use crate::delta_datafusion::table_provider::next::{FileSelection, MissingFilePolicy};
// Checks that JSON statistics values are converted to the expected typed scalars.
#[test]
fn test_to_correct_scalar_value() {
    // Each case: (raw JSON value, target Arrow type, expected scalar).
    let cases = [
        (
            json!("2015"),
            ArrowDataType::Int16,
            ScalarValue::Int16(Some(2015)),
        ),
        (
            json!("2015"),
            ArrowDataType::Int32,
            ScalarValue::Int32(Some(2015)),
        ),
        (
            json!("2015"),
            ArrowDataType::Int64,
            ScalarValue::Int64(Some(2015)),
        ),
        (
            json!("2015"),
            ArrowDataType::Float32,
            ScalarValue::Float32(Some(2015_f32)),
        ),
        (
            json!("2015"),
            ArrowDataType::Float64,
            ScalarValue::Float64(Some(2015_f64)),
        ),
        (
            json!(2015),
            ArrowDataType::Float64,
            ScalarValue::Float64(Some(2015_f64)),
        ),
        (
            json!("2015-01-01"),
            ArrowDataType::Date32,
            ScalarValue::Date32(Some(16436)),
        ),
        (
            json!(true),
            ArrowDataType::Boolean,
            ScalarValue::Boolean(Some(true)),
        ),
    ];

    for (raw, data_type, expected) in &cases {
        let converted = to_correct_scalar_value(raw, data_type)
            .unwrap()
            .unwrap();
        assert_eq!(*expected, converted);
    }
}
716
// Encodes a `DeltaScan` plan to protobuf with `DeltaPhysicalCodec`, decodes
// it again, and checks that both plans have the same debug rendering.
#[test]
fn roundtrip_test_delta_exec_plan() {
    let codec = DeltaPhysicalCodec {};
    let ctx = SessionContext::new();

    let fields = vec![
        Field::new("a", ArrowDataType::Utf8, false),
        Field::new("b", ArrowDataType::Int32, false),
    ];
    let schema = Arc::new(Schema::new(fields));
    let table_url = Url::parse("s3://my_bucket/this/is/some/path").unwrap();
    let exec_plan = Arc::from(DeltaScan::new(
        &table_url,
        DeltaScanConfig::default(),
        Arc::from(EmptyExec::new(schema.clone())),
        schema.clone(),
    ));

    let proto: protobuf::PhysicalPlanNode =
        protobuf::PhysicalPlanNode::try_from_physical_plan(exec_plan.clone(), &codec)
            .expect("to proto");

    let task_ctx = ctx.task_ctx();
    let result_exec_plan: Arc<dyn ExecutionPlan> = proto
        .try_into_physical_plan(&task_ctx, &codec)
        .expect("from proto");

    let before = format!("{exec_plan:?}");
    let after = format!("{result_exec_plan:?}");
    assert_eq!(before, after);
}
742
743 #[tokio::test]
744 async fn roundtrip_test_delta_logical_codec_preserves_file_selection() {
745 let log_store = crate::test_utils::TestTables::Simple
746 .table_builder()
747 .unwrap()
748 .build_storage()
749 .unwrap();
750 let snapshot = Arc::new(
751 crate::kernel::Snapshot::try_new(&log_store, Default::default(), None)
752 .await
753 .unwrap(),
754 );
755 let table_root = snapshot
756 .scan_builder()
757 .build()
758 .unwrap()
759 .table_root()
760 .clone();
761 let selected_file_ids: Vec<String> = snapshot
762 .file_views(log_store.as_ref(), None)
763 .take(1)
764 .map_ok(|view| table_root.join(view.path_raw()).unwrap().to_string())
765 .try_collect()
766 .await
767 .unwrap();
768 assert_eq!(selected_file_ids.len(), 1);
769
770 let provider = DeltaScanNext::builder()
771 .with_snapshot(snapshot)
772 .build()
773 .await
774 .unwrap()
775 .with_file_selection(
776 FileSelection::new(selected_file_ids.clone())
777 .with_missing_file_policy(MissingFilePolicy::Ignore),
778 );
779
780 let codec = DeltaLogicalCodec {};
781 let table_ref = TableReference::bare("delta_table");
782 let mut encoded = Vec::new();
783 codec
784 .try_encode_table_provider(&table_ref, Arc::new(provider), &mut encoded)
785 .unwrap();
786
787 let ctx = SessionContext::new();
788 let decoded = codec
789 .try_decode_table_provider(
790 &encoded,
791 &table_ref,
792 Arc::new(ArrowSchema::empty()),
793 &ctx.task_ctx(),
794 )
795 .unwrap();
796 let decoded_provider = decoded
797 .as_ref()
798 .as_any()
799 .downcast_ref::<DeltaScanNext>()
800 .unwrap();
801
802 let serialized = serde_json::to_value(decoded_provider).unwrap();
803 let decoded_file_ids = serialized
804 .get("file_selection")
805 .and_then(|value| value.get("file_ids"))
806 .and_then(|value| value.as_array())
807 .unwrap()
808 .iter()
809 .map(|value| value.as_str().unwrap().to_string())
810 .collect::<Vec<_>>();
811 let decoded_policy = serialized
812 .get("file_selection")
813 .and_then(|value| value.get("missing_file_policy"))
814 .and_then(|value| value.as_str())
815 .unwrap();
816
817 assert_eq!(decoded_file_ids, selected_file_ids);
818 assert_eq!(decoded_policy, "Ignore");
819 }
820
821 #[tokio::test]
822 async fn delta_table_provider_with_config() {
823 let table = open_fs_path("../test/tests/data/delta-2.2.0-partitioned-types");
824 let provider = table
825 .table_provider()
826 .with_file_column("file_source")
827 .await
828 .unwrap();
829 let ctx = SessionContext::new();
830 ctx.register_table("test", provider).unwrap();
831
832 let df = ctx
833 .sql("select c3, c1, c2, right(file_source, 77) as file_source from test")
834 .await
835 .unwrap();
836 let actual = df.collect().await.unwrap();
837 let expected = vec![
838 "+----+----+----+-------------------------------------------------------------------------------+",
839 "| c3 | c1 | c2 | file_source |",
840 "+----+----+----+-------------------------------------------------------------------------------+",
841 "| 4 | 6 | a | c1=6/c2=a/part-00011-10619b10-b691-4fd0-acc4-2a9608499d7c.c000.snappy.parquet |",
842 "| 5 | 4 | c | c1=4/c2=c/part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet |",
843 "| 6 | 5 | b | c1=5/c2=b/part-00007-4e73fa3b-2c88-424a-8051-f8b54328ffdb.c000.snappy.parquet |",
844 "+----+----+----+-------------------------------------------------------------------------------+",
845 ];
846 assert_batches_sorted_eq!(&expected, &actual);
847 }
848
849 #[tokio::test]
850 async fn delta_scan_mixed_partition_order() {
851 let schema = Arc::new(ArrowSchema::new(vec![
854 Field::new("modified", ArrowDataType::Utf8, true),
855 Field::new("id", ArrowDataType::Utf8, true),
856 Field::new("value", ArrowDataType::Int32, true),
857 ]));
858
859 let table = DeltaTable::new_in_memory()
860 .create()
861 .with_columns(get_delta_schema().fields().cloned())
862 .with_partition_columns(["modified", "id"])
863 .await
864 .unwrap();
865 assert_eq!(table.version(), Some(0));
866
867 let batch = RecordBatch::try_new(
868 schema.clone(),
869 vec![
870 Arc::new(arrow::array::StringArray::from(vec![
871 "2021-02-01",
872 "2021-02-01",
873 "2021-02-02",
874 "2021-02-02",
875 ])),
876 Arc::new(arrow::array::StringArray::from(vec!["A", "B", "C", "D"])),
877 Arc::new(arrow::array::Int32Array::from(vec![1, 10, 20, 100])),
878 ],
879 )
880 .unwrap();
881 let table = table
883 .write(vec![batch.clone()])
884 .with_save_mode(crate::protocol::SaveMode::Append)
885 .await
886 .unwrap();
887
888 let provider = table.table_provider().await.unwrap();
889 let logical_schema = provider.schema();
890 let ctx = SessionContext::new();
891 ctx.runtime_env().register_object_store(
892 table.log_store().root_url(),
893 table.log_store().object_store(None),
894 );
895 ctx.register_table("test", provider).unwrap();
896
897 let expected_logical_order = vec!["id", "value", "modified"];
898 let actual_order: Vec<String> = logical_schema
899 .fields()
900 .iter()
901 .map(|f| f.name().to_owned())
902 .collect();
903
904 let df = ctx.sql("select * from test").await.unwrap();
905 let actual = df.collect().await.unwrap();
906 let expected = vec![
907 "+----+-------+------------+",
908 "| id | value | modified |",
909 "+----+-------+------------+",
910 "| A | 1 | 2021-02-01 |",
911 "| B | 10 | 2021-02-01 |",
912 "| C | 20 | 2021-02-02 |",
913 "| D | 100 | 2021-02-02 |",
914 "+----+-------+------------+",
915 ];
916 assert_batches_sorted_eq!(&expected, &actual);
917 assert_eq!(expected_logical_order, actual_order);
918 }
919
920 #[tokio::test]
921 async fn delta_scan_case_sensitive() {
922 let schema = Arc::new(ArrowSchema::new(vec![
923 Field::new("moDified", ArrowDataType::Utf8, true),
924 Field::new("ID", ArrowDataType::Utf8, true),
925 Field::new("vaLue", ArrowDataType::Int32, true),
926 ]));
927
928 let batch = RecordBatch::try_new(
929 schema.clone(),
930 vec![
931 Arc::new(arrow::array::StringArray::from(vec![
932 "2021-02-01",
933 "2021-02-01",
934 "2021-02-02",
935 "2021-02-02",
936 ])),
937 Arc::new(arrow::array::StringArray::from(vec!["A", "B", "C", "D"])),
938 Arc::new(arrow::array::Int32Array::from(vec![1, 10, 20, 100])),
939 ],
940 )
941 .unwrap();
942 let table = DeltaTable::new_in_memory()
944 .write(vec![batch.clone()])
945 .with_save_mode(crate::protocol::SaveMode::Append)
946 .await
947 .unwrap();
948
949 let config = DeltaScanConfigBuilder::new()
950 .build(table.snapshot().unwrap().snapshot())
951 .unwrap();
952 let log = table.log_store();
953
954 let provider =
955 DeltaTableProvider::try_new(table.snapshot().unwrap().snapshot().clone(), log, config)
956 .unwrap();
957 let ctx: SessionContext = DeltaSessionContext::default().into();
958 ctx.register_table("test", Arc::new(provider)).unwrap();
959
960 let df = ctx
961 .sql("select ID, moDified, vaLue from test")
962 .await
963 .unwrap();
964 let actual = df.collect().await.unwrap();
965 let expected = vec![
966 "+----+------------+-------+",
967 "| ID | moDified | vaLue |",
968 "+----+------------+-------+",
969 "| A | 2021-02-01 | 1 |",
970 "| B | 2021-02-01 | 10 |",
971 "| C | 2021-02-02 | 20 |",
972 "| D | 2021-02-02 | 100 |",
973 "+----+------------+-------+",
974 ];
975 assert_batches_sorted_eq!(&expected, &actual);
976
977 }
989
990 #[tokio::test]
991 async fn delta_scan_supports_missing_columns() {
992 let schema1 = Arc::new(ArrowSchema::new(vec![Field::new(
993 "col_1",
994 ArrowDataType::Utf8,
995 true,
996 )]));
997
998 let batch1 = RecordBatch::try_new(
999 schema1.clone(),
1000 vec![Arc::new(arrow::array::StringArray::from(vec![
1001 Some("A"),
1002 Some("B"),
1003 ]))],
1004 )
1005 .unwrap();
1006
1007 let schema2 = Arc::new(ArrowSchema::new(vec![
1008 Field::new("col_1", ArrowDataType::Utf8, true),
1009 Field::new("col_2", ArrowDataType::Utf8, true),
1010 ]));
1011
1012 let batch2 = RecordBatch::try_new(
1013 schema2.clone(),
1014 vec![
1015 Arc::new(arrow::array::StringArray::from(vec![
1016 Some("E"),
1017 Some("F"),
1018 Some("G"),
1019 ])),
1020 Arc::new(arrow::array::StringArray::from(vec![
1021 Some("E2"),
1022 Some("F2"),
1023 Some("G2"),
1024 ])),
1025 ],
1026 )
1027 .unwrap();
1028
1029 let table = DeltaTable::new_in_memory()
1030 .write(vec![batch2])
1031 .with_save_mode(crate::protocol::SaveMode::Append)
1032 .await
1033 .unwrap();
1034
1035 let table = table
1036 .write(vec![batch1])
1037 .with_schema_mode(SchemaMode::Merge)
1038 .with_save_mode(crate::protocol::SaveMode::Append)
1039 .await
1040 .unwrap();
1041
1042 let config = DeltaScanConfigBuilder::new()
1043 .build(table.snapshot().unwrap().snapshot())
1044 .unwrap();
1045 let log = table.log_store();
1046
1047 let provider =
1048 DeltaTableProvider::try_new(table.snapshot().unwrap().snapshot().clone(), log, config)
1049 .unwrap();
1050 let ctx: SessionContext = DeltaSessionContext::default().into();
1051 ctx.register_table("test", Arc::new(provider)).unwrap();
1052
1053 let df = ctx.sql("select col_1, col_2 from test").await.unwrap();
1054 let actual = df.collect().await.unwrap();
1055 let expected = vec![
1056 "+-------+-------+",
1057 "| col_1 | col_2 |",
1058 "+-------+-------+",
1059 "| A | |",
1060 "| B | |",
1061 "| E | E2 |",
1062 "| F | F2 |",
1063 "| G | G2 |",
1064 "+-------+-------+",
1065 ];
1066 assert_batches_sorted_eq!(&expected, &actual);
1067 }
1068
1069 #[tokio::test]
1070 async fn delta_scan_supports_pushdown() {
1071 let schema = Arc::new(ArrowSchema::new(vec![
1072 Field::new("col_1", ArrowDataType::Utf8, false),
1073 Field::new("col_2", ArrowDataType::Utf8, false),
1074 ]));
1075
1076 let batch = RecordBatch::try_new(
1077 schema.clone(),
1078 vec![
1079 Arc::new(arrow::array::StringArray::from(vec![
1080 Some("A"),
1081 Some("B"),
1082 Some("C"),
1083 ])),
1084 Arc::new(arrow::array::StringArray::from(vec![
1085 Some("A2"),
1086 Some("B2"),
1087 Some("C2"),
1088 ])),
1089 ],
1090 )
1091 .unwrap();
1092
1093 let table = DeltaTable::new_in_memory()
1094 .write(vec![batch])
1095 .with_save_mode(crate::protocol::SaveMode::Append)
1096 .await
1097 .unwrap();
1098
1099 let config = DeltaScanConfigBuilder::new()
1100 .build(table.snapshot().unwrap().snapshot())
1101 .unwrap();
1102 let log = table.log_store();
1103
1104 let provider =
1105 DeltaTableProvider::try_new(table.snapshot().unwrap().snapshot().clone(), log, config)
1106 .unwrap();
1107
1108 let mut cfg = SessionConfig::default();
1109 cfg.options_mut().execution.parquet.pushdown_filters = true;
1110 let ctx = SessionContext::new_with_config(cfg);
1111 ctx.register_table("test", Arc::new(provider)).unwrap();
1112
1113 let df = ctx
1114 .sql("select col_1, col_2 from test WHERE col_1 = 'A'")
1115 .await
1116 .unwrap();
1117 let actual = df.collect().await.unwrap();
1118 let expected = vec![
1119 "+-------+-------+",
1120 "| col_1 | col_2 |",
1121 "+-------+-------+",
1122 "| A | A2 |",
1123 "+-------+-------+",
1124 ];
1125 assert_batches_sorted_eq!(&expected, &actual);
1126 }
1127
1128 #[tokio::test]
1129 async fn delta_scan_supports_nested_missing_columns() {
1130 let column1_schema1: arrow::datatypes::Fields =
1131 vec![Field::new("col_1a", ArrowDataType::Utf8, true)].into();
1132 let schema1 = Arc::new(ArrowSchema::new(vec![Field::new(
1133 "col_1",
1134 ArrowDataType::Struct(column1_schema1.clone()),
1135 true,
1136 )]));
1137
1138 let batch1 = RecordBatch::try_new(
1139 schema1.clone(),
1140 vec![Arc::new(StructArray::new(
1141 column1_schema1,
1142 vec![Arc::new(arrow::array::StringArray::from(vec![
1143 Some("A"),
1144 Some("B"),
1145 ]))],
1146 None,
1147 ))],
1148 )
1149 .unwrap();
1150
1151 let column1_schema2: arrow_schema::Fields = vec![
1152 Field::new("col_1a", ArrowDataType::Utf8, true),
1153 Field::new("col_1b", ArrowDataType::Utf8, true),
1154 ]
1155 .into();
1156 let schema2 = Arc::new(ArrowSchema::new(vec![Field::new(
1157 "col_1",
1158 ArrowDataType::Struct(column1_schema2.clone()),
1159 true,
1160 )]));
1161
1162 let batch2 = RecordBatch::try_new(
1163 schema2.clone(),
1164 vec![Arc::new(StructArray::new(
1165 column1_schema2,
1166 vec![
1167 Arc::new(arrow::array::StringArray::from(vec![
1168 Some("E"),
1169 Some("F"),
1170 Some("G"),
1171 ])),
1172 Arc::new(arrow::array::StringArray::from(vec![
1173 Some("E2"),
1174 Some("F2"),
1175 Some("G2"),
1176 ])),
1177 ],
1178 None,
1179 ))],
1180 )
1181 .unwrap();
1182
1183 let table = DeltaTable::new_in_memory()
1184 .write(vec![batch1])
1185 .with_save_mode(crate::protocol::SaveMode::Append)
1186 .await
1187 .unwrap();
1188
1189 let table = table
1190 .write(vec![batch2])
1191 .with_schema_mode(SchemaMode::Merge)
1192 .with_save_mode(crate::protocol::SaveMode::Append)
1193 .await
1194 .unwrap();
1195
1196 let config = DeltaScanConfigBuilder::new()
1197 .build(table.snapshot().unwrap().snapshot())
1198 .unwrap();
1199 let log = table.log_store();
1200
1201 let provider =
1202 DeltaTableProvider::try_new(table.snapshot().unwrap().snapshot().clone(), log, config)
1203 .unwrap();
1204 let ctx: SessionContext = DeltaSessionContext::default().into();
1205 ctx.register_table("test", Arc::new(provider)).unwrap();
1206
1207 let df = ctx
1208 .sql("select col_1.col_1a, col_1.col_1b from test")
1209 .await
1210 .unwrap();
1211 let actual = df.collect().await.unwrap();
1212 let expected = vec![
1213 "+--------------------+--------------------+",
1214 "| test.col_1[col_1a] | test.col_1[col_1b] |",
1215 "+--------------------+--------------------+",
1216 "| A | |",
1217 "| B | |",
1218 "| E | E2 |",
1219 "| F | F2 |",
1220 "| G | G2 |",
1221 "+--------------------+--------------------+",
1222 ];
1223 assert_batches_sorted_eq!(&expected, &actual);
1224 }
1225
1226 #[tokio::test]
1227 async fn test_multiple_predicate_pushdown() {
1228 let schema = Arc::new(ArrowSchema::new(vec![
1229 Field::new("moDified", ArrowDataType::Utf8, true),
1230 Field::new("id", ArrowDataType::Utf8, true),
1231 Field::new("vaLue", ArrowDataType::Int32, true),
1232 ]));
1233
1234 let batch = RecordBatch::try_new(
1235 schema.clone(),
1236 vec![
1237 Arc::new(arrow::array::StringArray::from(vec![
1238 "2021-02-01",
1239 "2021-02-01",
1240 "2021-02-02",
1241 "2021-02-02",
1242 ])),
1243 Arc::new(arrow::array::StringArray::from(vec!["A", "B", "C", "D"])),
1244 Arc::new(arrow::array::Int32Array::from(vec![1, 10, 20, 100])),
1245 ],
1246 )
1247 .unwrap();
1248 let table = DeltaTable::new_in_memory()
1250 .write(vec![batch.clone()])
1251 .with_save_mode(crate::protocol::SaveMode::Append)
1252 .await
1253 .unwrap();
1254
1255 let datafusion = SessionContext::new();
1256 table
1257 .update_datafusion_session(&datafusion.state())
1258 .unwrap();
1259
1260 datafusion
1261 .register_table("snapshot", table.table_provider().await.unwrap())
1262 .unwrap();
1263
1264 let df = datafusion
1265 .sql("select * from snapshot where id > 10000 and id < 20000")
1266 .await
1267 .unwrap();
1268
1269 df.collect().await.unwrap();
1270 }
1271
1272 #[tokio::test]
1273 async fn test_delta_scan_builder_no_scan_config() {
1274 let arr: Arc<dyn Array> = Arc::new(arrow::array::StringArray::from(vec!["s"]));
1275 let batch = RecordBatch::try_from_iter_with_nullable(vec![("a", arr, false)]).unwrap();
1276 let table = DeltaTable::new_in_memory()
1277 .write(vec![batch])
1278 .with_save_mode(crate::protocol::SaveMode::Append)
1279 .await
1280 .unwrap();
1281
1282 let ctx = SessionContext::new();
1283 let state = ctx.state();
1284 let scan = table_provider::DeltaScanBuilder::new(
1285 table.snapshot().unwrap().snapshot(),
1286 table.log_store(),
1287 &state,
1288 )
1289 .with_filter(Some(col("a").eq(lit("s"))))
1290 .build()
1291 .await
1292 .unwrap();
1293
1294 let mut visitor = ParquetVisitor::default();
1295 visit_execution_plan(&scan, &mut visitor).unwrap();
1296
1297 assert_eq!(visitor.predicate.unwrap().to_string(), "a@0 = s");
1298 }
1299
1300 #[tokio::test]
1301 async fn test_delta_scan_builder_scan_config_disable_pushdown() {
1302 let arr: Arc<dyn Array> = Arc::new(arrow::array::StringArray::from(vec!["s"]));
1303 let batch = RecordBatch::try_from_iter_with_nullable(vec![("a", arr, false)]).unwrap();
1304 let table = DeltaTable::new_in_memory()
1305 .write(vec![batch])
1306 .with_save_mode(crate::protocol::SaveMode::Append)
1307 .await
1308 .unwrap();
1309
1310 let snapshot = table.snapshot().unwrap();
1311 let ctx = SessionContext::new();
1312 let state = ctx.state();
1313 let scan =
1314 table_provider::DeltaScanBuilder::new(snapshot.snapshot(), table.log_store(), &state)
1315 .with_filter(Some(col("a").eq(lit("s"))))
1316 .with_scan_config(
1317 DeltaScanConfigBuilder::new()
1318 .with_parquet_pushdown(false)
1319 .build(snapshot.snapshot())
1320 .unwrap(),
1321 )
1322 .build()
1323 .await
1324 .unwrap();
1325
1326 let mut visitor = ParquetVisitor::default();
1327 visit_execution_plan(&scan, &mut visitor).unwrap();
1328
1329 assert!(visitor.predicate.is_none());
1330 }
1331
1332 #[tokio::test]
1333 async fn test_delta_scan_applies_parquet_options() {
1334 let arr: Arc<dyn Array> = Arc::new(arrow::array::StringArray::from(vec!["s"]));
1335 let batch = RecordBatch::try_from_iter_with_nullable(vec![("a", arr, false)]).unwrap();
1336 let table = DeltaTable::new_in_memory()
1337 .write(vec![batch])
1338 .with_save_mode(crate::protocol::SaveMode::Append)
1339 .await
1340 .unwrap();
1341
1342 let snapshot = table.snapshot().unwrap();
1343
1344 let mut config = SessionConfig::default();
1345 config.options_mut().execution.parquet.pushdown_filters = true;
1346 let ctx = SessionContext::new_with_config(config);
1347 let state = ctx.state();
1348
1349 let scan =
1350 table_provider::DeltaScanBuilder::new(snapshot.snapshot(), table.log_store(), &state)
1351 .build()
1352 .await
1353 .unwrap();
1354
1355 let mut visitor = ParquetVisitor::default();
1356 visit_execution_plan(&scan, &mut visitor).unwrap();
1357
1358 assert_eq!(ctx.copied_table_options().parquet, visitor.options.unwrap());
1359 }
1360
1361 #[derive(Default)]
1363 struct ParquetVisitor {
1364 predicate: Option<Arc<dyn PhysicalExpr>>,
1365 options: Option<TableParquetOptions>,
1366 }
1367
1368 impl ExecutionPlanVisitor for ParquetVisitor {
1369 type Error = DataFusionError;
1370
1371 fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
1372 let Some(datasource_exec) = plan.as_any().downcast_ref::<DataSourceExec>() else {
1373 return Ok(true);
1374 };
1375
1376 let Some(scan_config) = datasource_exec
1377 .data_source()
1378 .as_any()
1379 .downcast_ref::<FileScanConfig>()
1380 else {
1381 return Ok(true);
1382 };
1383
1384 if let Some(parquet_source) = scan_config
1385 .file_source
1386 .as_any()
1387 .downcast_ref::<ParquetSource>()
1388 {
1389 self.options = Some(parquet_source.table_parquet_options().clone());
1390 self.predicate = parquet_source.filter();
1391 }
1392
1393 Ok(true)
1394 }
1395 }
1396
1397 #[tokio::test]
1403 async fn passes_sanity_checker_when_all_files_filtered() {
1404 let table = open_fs_path("../test/tests/data/delta-2.2.0-partitioned-types");
1405 let ctx = create_session().into_inner();
1406 ctx.register_table("test", table.table_provider().await.unwrap())
1407 .unwrap();
1408
1409 let df = ctx
1410 .sql("select * from test where c3 = 100 ORDER BY c1 ASC")
1411 .await
1412 .unwrap();
1413 let actual = df.collect().await.unwrap();
1414
1415 assert_eq!(actual.len(), 0);
1416 }
1417
1418 #[tokio::test]
1419 async fn test_delta_scan_uses_parquet_column_pruning() {
1420 let small: Arc<dyn Array> = Arc::new(arrow::array::StringArray::from(vec!["a"]));
1421 let large: Arc<dyn Array> = Arc::new(arrow::array::StringArray::from(vec![
1422 "b".repeat(1024).as_str(),
1423 ]));
1424 let batch = RecordBatch::try_from_iter(vec![("small", small), ("large", large)]).unwrap();
1425 let table = DeltaTable::new_in_memory()
1426 .write(vec![batch])
1427 .with_save_mode(crate::protocol::SaveMode::Append)
1428 .await
1429 .unwrap();
1430
1431 let config = DeltaScanConfigBuilder::new()
1432 .build(table.snapshot().unwrap().snapshot())
1433 .unwrap();
1434
1435 let (log_store, mut operations) = recording_log_store(table.log_store());
1436 let provider = DeltaTableProvider::try_new(
1437 table.snapshot().unwrap().snapshot().clone(),
1438 log_store.clone(),
1439 config,
1440 )
1441 .unwrap();
1442 let ctx = SessionContext::new();
1443 ctx.register_table("test", Arc::new(provider)).unwrap();
1444 let state = ctx.state();
1445
1446 let df = ctx.sql("select small from test").await.unwrap();
1447 let plan = df.create_physical_plan().await.unwrap();
1448
1449 let mut stream = plan.execute(0, state.task_ctx()).unwrap();
1450 let Some(Ok(batch)) = stream.next().await else {
1451 panic!()
1452 };
1453 assert!(stream.next().await.is_none());
1454 assert_eq!(1, batch.num_columns());
1455 assert_eq!(1, batch.num_rows());
1456 let small = batch.column_by_name("small").unwrap().as_string::<i32>();
1457 assert_eq!("a", small.iter().next().unwrap().unwrap());
1458
1459 let files = table.get_files_by_partitions(&[]).await.unwrap();
1460 assert_eq!(1, files.len());
1461 let object_store = table.object_store();
1462 let file_meta = object_store.head(&files[0]).await.unwrap();
1463 let file_reader = parquet::arrow::async_reader::ParquetObjectReader::new(
1464 object_store,
1465 file_meta.location.clone(),
1466 )
1467 .with_file_size(file_meta.size);
1468 let parquet_metadata =
1469 parquet::arrow::async_reader::ParquetRecordBatchStreamBuilder::new(file_reader)
1470 .await
1471 .unwrap()
1472 .metadata()
1473 .as_ref()
1474 .clone();
1475 let (small_start, small_len) = parquet_metadata.row_group(0).column(0).byte_range();
1476 let small_range = small_start..small_start + small_len;
1477 let (large_start, large_len) = parquet_metadata.row_group(0).column(1).byte_range();
1478 let large_range = large_start..large_start + large_len;
1479
1480 let actual = drain_recorded_ops(&mut operations).await;
1481
1482 let data_ranges = actual
1483 .iter()
1484 .flat_map(|operation| match operation {
1485 ObjectStoreOperation::GetRange(PathKind::Data, range) => vec![range.clone()],
1486 ObjectStoreOperation::GetRanges(PathKind::Data, ranges) => ranges.clone(),
1487 _ => Vec::new(),
1488 })
1489 .collect::<Vec<_>>();
1490
1491 let overlaps = |left: &Range<u64>, right: &Range<u64>| {
1492 left.start < right.end && right.start < left.end
1493 };
1494
1495 assert!(
1496 !data_ranges.is_empty(),
1497 "expected ranged parquet data reads, saw {actual:?}"
1498 );
1499 assert!(
1500 data_ranges
1501 .iter()
1502 .any(|range| overlaps(range, &small_range)),
1503 "expected selected column chunk {small_range:?} to be read, saw {actual:?}"
1504 );
1505 assert!(
1506 data_ranges
1507 .iter()
1508 .all(|range| !overlaps(range, &large_range)),
1509 "expected unselected column chunk {large_range:?} to be pruned, saw {actual:?}"
1510 );
1511 assert!(
1512 !actual.iter().any(|operation| matches!(
1513 operation,
1514 ObjectStoreOperation::Get(PathKind::Data)
1515 | ObjectStoreOperation::GetOpts(PathKind::Data)
1516 )),
1517 "expected no full data file reads, saw {actual:?}"
1518 );
1519 }
1520
1521 #[tokio::test]
1522 async fn test_push_down_filter_panic_2602() -> DeltaResult<()> {
1523 use crate::kernel::schema::{DataType, PrimitiveType};
1524 let ctx = SessionContext::new();
1525 let table = DeltaTable::new_in_memory()
1526 .create()
1527 .with_column("id", DataType::Primitive(PrimitiveType::Long), true, None)
1528 .with_column(
1529 "name",
1530 DataType::Primitive(PrimitiveType::String),
1531 true,
1532 None,
1533 )
1534 .with_column("b", DataType::Primitive(PrimitiveType::Boolean), true, None)
1535 .with_column(
1536 "ts",
1537 DataType::Primitive(PrimitiveType::Timestamp),
1538 true,
1539 None,
1540 )
1541 .with_column("dt", DataType::Primitive(PrimitiveType::Date), true, None)
1542 .with_column(
1543 "zap",
1544 DataType::Array(Box::new(ArrayType::new(
1545 DataType::Primitive(PrimitiveType::Boolean),
1546 true,
1547 ))),
1548 true,
1549 None,
1550 )
1551 .await?;
1552 table.update_datafusion_session(&ctx.state()).unwrap();
1553
1554 ctx.register_table("snapshot", table.table_provider().await.unwrap())
1555 .unwrap();
1556
1557 let df = ctx
1558 .sql("select * from snapshot where id > 10000 and id < 20000")
1559 .await
1560 .unwrap();
1561
1562 let _ = df.collect().await?;
1563 Ok(())
1564 }
1565}