1use std::fmt::Debug;
27use std::sync::Arc;
28
29use arrow::array::types::UInt16Type;
30use arrow::array::{Array, DictionaryArray, RecordBatch, StringArray, TypedDictionaryArray};
31use arrow_cast::{CastOptions, cast_with_options};
32use arrow_schema::{
33 DataType as ArrowDataType, Schema as ArrowSchema, SchemaRef, SchemaRef as ArrowSchemaRef,
34 TimeUnit,
35};
36use datafusion::catalog::{Session, TableProviderFactory};
37use datafusion::common::scalar::ScalarValue;
38use datafusion::common::{
39 Column, DFSchema, DataFusionError, Result as DataFusionResult, TableReference, ToDFSchema,
40};
41use datafusion::datasource::TableProvider;
42use datafusion::datasource::physical_plan::wrap_partition_type_in_dict;
43use datafusion::execution::TaskContext;
44use datafusion::execution::context::SessionContext;
45use datafusion::logical_expr::logical_plan::CreateExternalTable;
46use datafusion::logical_expr::utils::conjunction;
47use datafusion::logical_expr::{Expr, Extension, LogicalPlan};
48use datafusion::physical_optimizer::pruning::PruningPredicate;
49use datafusion::physical_plan::ExecutionPlan;
50use datafusion_proto::logical_plan::LogicalExtensionCodec;
51use datafusion_proto::physical_plan::PhysicalExtensionCodec;
52use delta_kernel::engine::arrow_conversion::TryIntoArrow as _;
53use either::Either;
54
55use crate::delta_datafusion::expr::parse_predicate_expression;
56use crate::delta_datafusion::table_provider::{DeltaScan, DeltaScanWire};
57use crate::ensure_table_uri;
58use crate::errors::{DeltaResult, DeltaTableError};
59use crate::kernel::{Add, EagerSnapshot, LogDataHandler, Snapshot};
60use crate::{open_table, open_table_with_storage_options};
61
62pub(crate) use self::session::DeltaSessionExt;
63pub use self::session::{
64 DeltaParserOptions, DeltaRuntimeEnvBuilder, DeltaSessionConfig, DeltaSessionContext,
65 create_session, create_session_state_with_spill_config,
66};
67pub use self::table_provider::next::{DeletionVectorSelection, DeltaScan as DeltaScanNext};
68pub(crate) use self::utils::*;
69pub use cdf::scan::DeltaCdfTableProvider;
70pub(crate) use data_validation::{
71 DataValidationExec, constraints_to_exprs, generated_columns_to_exprs, validation_predicates,
72};
73pub(crate) use find_files::*;
74pub(crate) use table_provider::next::normalize_path_as_file_id;
75pub use table_provider::{
76 DeltaScanConfig, DeltaScanConfigBuilder, TableProviderBuilder, next::DeltaScanExec,
77};
78pub(crate) use table_provider::{
79 next::FILE_ID_COLUMN_DEFAULT, resolve_file_column_name, update_datafusion_session,
80};
81
82pub(crate) const PATH_COLUMN: &str = "__delta_rs_path";
83
84#[doc(hidden)]
85pub mod bench_support;
86pub mod cdf;
87mod data_validation;
88pub mod engine;
89pub mod expr;
90mod file_id;
91mod find_files;
92pub mod logical;
93pub mod physical;
94pub mod planner;
95mod session;
96pub use session::SessionFallbackPolicy;
97pub(crate) use session::{SessionResolveContext, resolve_session_state};
98mod table_provider;
99pub(crate) mod utils;
100
101impl From<DeltaTableError> for DataFusionError {
102 fn from(err: DeltaTableError) -> Self {
103 match err {
104 DeltaTableError::Arrow { source } => DataFusionError::from(source),
105 DeltaTableError::Io { source } => DataFusionError::IoError(source),
106 DeltaTableError::ObjectStore { source } => DataFusionError::from(source),
107 DeltaTableError::Parquet { source } => DataFusionError::from(source),
108 _ => DataFusionError::External(Box::new(err)),
109 }
110 }
111}
112
113impl From<DataFusionError> for DeltaTableError {
114 fn from(err: DataFusionError) -> Self {
115 match err {
116 DataFusionError::ArrowError(source, _) => DeltaTableError::from(*source),
117 DataFusionError::IoError(source) => DeltaTableError::Io { source },
118 DataFusionError::ObjectStore(source) => DeltaTableError::from(*source),
119 DataFusionError::ParquetError(source) => DeltaTableError::from(*source),
120 _ => DeltaTableError::Generic(err.to_string()),
121 }
122 }
123}
124
/// Convenience accessors that expose a Delta table state in DataFusion terms.
pub trait DataFusionMixins {
    /// Arrow schema used when reading the table.
    ///
    /// The implementations in this file move partition columns to the end of the
    /// schema and dictionary-wrap string/binary partition columns.
    fn read_schema(&self) -> ArrowSchemaRef;

    /// Arrow schema with partition columns moved to the end but left with their
    /// plain (non-dictionary) data types, as built by the implementations in
    /// this file.
    fn input_schema(&self) -> ArrowSchemaRef;

    /// Parses `expr` into a DataFusion [`Expr`].
    ///
    /// The implementations in this file resolve the expression against
    /// [`DataFusionMixins::read_schema`] using the supplied `session`.
    fn parse_predicate_expression(
        &self,
        expr: impl AsRef<str>,
        session: &dyn Session,
    ) -> DeltaResult<Expr>;
}
140
141impl DataFusionMixins for Snapshot {
142 fn read_schema(&self) -> ArrowSchemaRef {
143 _arrow_schema(
144 self.arrow_schema(),
145 self.metadata().partition_columns(),
146 true,
147 )
148 }
149
150 fn input_schema(&self) -> ArrowSchemaRef {
151 _arrow_schema(
152 self.arrow_schema(),
153 self.metadata().partition_columns(),
154 false,
155 )
156 }
157
158 fn parse_predicate_expression(
159 &self,
160 expr: impl AsRef<str>,
161 session: &dyn Session,
162 ) -> DeltaResult<Expr> {
163 let schema = DFSchema::try_from(self.read_schema().as_ref().to_owned())?;
164 parse_predicate_expression(&schema, expr, session)
165 }
166}
167
168impl DataFusionMixins for LogDataHandler<'_> {
169 fn read_schema(&self) -> ArrowSchemaRef {
170 _arrow_schema(
171 Arc::new(
172 self.table_configuration()
173 .logical_schema()
174 .as_ref()
175 .try_into_arrow()
176 .unwrap(),
177 ),
178 self.table_configuration().metadata().partition_columns(),
179 true,
180 )
181 }
182
183 fn input_schema(&self) -> ArrowSchemaRef {
184 _arrow_schema(
185 Arc::new(
186 self.table_configuration()
187 .logical_schema()
188 .as_ref()
189 .try_into_arrow()
190 .unwrap(),
191 ),
192 self.table_configuration().metadata().partition_columns(),
193 false,
194 )
195 }
196
197 fn parse_predicate_expression(
198 &self,
199 expr: impl AsRef<str>,
200 session: &dyn Session,
201 ) -> DeltaResult<Expr> {
202 let schema = DFSchema::try_from(self.read_schema().as_ref().to_owned())?;
203 parse_predicate_expression(&schema, expr, session)
204 }
205}
206
207impl DataFusionMixins for EagerSnapshot {
208 fn read_schema(&self) -> ArrowSchemaRef {
209 self.snapshot().read_schema()
210 }
211
212 fn input_schema(&self) -> ArrowSchemaRef {
213 self.snapshot().input_schema()
214 }
215
216 fn parse_predicate_expression(
217 &self,
218 expr: impl AsRef<str>,
219 session: &dyn Session,
220 ) -> DeltaResult<Expr> {
221 self.snapshot().parse_predicate_expression(expr, session)
222 }
223}
224
225fn _arrow_schema(
226 schema: SchemaRef,
227 partition_columns: &[String],
228 wrap_partitions: bool,
229) -> ArrowSchemaRef {
230 let fields = schema
231 .fields()
232 .into_iter()
233 .filter(|f| !partition_columns.contains(&f.name().to_string()))
234 .cloned()
235 .chain(
236 partition_columns.iter().map(|partition_col| {
239 let field = schema.field_with_name(partition_col).unwrap();
240 let corrected = if wrap_partitions {
241 match field.data_type() {
242 ArrowDataType::Utf8
245 | ArrowDataType::LargeUtf8
246 | ArrowDataType::Binary
247 | ArrowDataType::LargeBinary => {
248 wrap_partition_type_in_dict(field.data_type().clone())
249 }
250 _ => field.data_type().clone(),
251 }
252 } else {
253 field.data_type().clone()
254 };
255 Arc::new(field.clone().with_data_type(corrected))
256 }),
257 )
258 .collect::<Vec<_>>();
259 Arc::new(ArrowSchema::new(fields))
260}
261
262pub(crate) fn files_matching_predicate<'a>(
263 log_data: LogDataHandler<'a>,
264 filters: &[Expr],
265) -> DeltaResult<impl Iterator<Item = Add> + 'a> {
266 if let Some(Some(predicate)) =
267 (!filters.is_empty()).then_some(conjunction(filters.iter().cloned()))
268 {
269 let expr = SessionContext::new()
270 .create_physical_expr(predicate, &log_data.read_schema().to_dfschema()?)?;
271 let pruning_predicate = PruningPredicate::try_new(expr, log_data.read_schema())?;
272 let mask = pruning_predicate.prune(&log_data)?;
273
274 Ok(Either::Left(log_data.into_iter().zip(mask).filter_map(
275 |(file, keep_file)| {
276 if keep_file { Some(file.to_add()) } else { None }
277 },
278 )))
279 } else {
280 Ok(Either::Right(
281 log_data.into_iter().map(|file| file.to_add()),
282 ))
283 }
284}
285
286pub(crate) fn get_path_column<'a>(
287 batch: &'a RecordBatch,
288 path_column: &str,
289) -> DeltaResult<TypedDictionaryArray<'a, UInt16Type, StringArray>> {
290 let err = || DeltaTableError::Generic("Unable to obtain Delta-rs path column".to_string());
291 batch
292 .column_by_name(path_column)
293 .ok_or_else(err)?
294 .as_any()
295 .downcast_ref::<DictionaryArray<UInt16Type>>()
296 .ok_or_else(err)?
297 .downcast_dict::<StringArray>()
298 .ok_or_else(err)
299}
300
301pub(crate) fn get_null_of_arrow_type(t: &ArrowDataType) -> DeltaResult<ScalarValue> {
302 match t {
303 ArrowDataType::Null => Ok(ScalarValue::Null),
304 ArrowDataType::Boolean => Ok(ScalarValue::Boolean(None)),
305 ArrowDataType::Int8 => Ok(ScalarValue::Int8(None)),
306 ArrowDataType::Int16 => Ok(ScalarValue::Int16(None)),
307 ArrowDataType::Int32 => Ok(ScalarValue::Int32(None)),
308 ArrowDataType::Int64 => Ok(ScalarValue::Int64(None)),
309 ArrowDataType::UInt8 => Ok(ScalarValue::UInt8(None)),
310 ArrowDataType::UInt16 => Ok(ScalarValue::UInt16(None)),
311 ArrowDataType::UInt32 => Ok(ScalarValue::UInt32(None)),
312 ArrowDataType::UInt64 => Ok(ScalarValue::UInt64(None)),
313 ArrowDataType::Float32 => Ok(ScalarValue::Float32(None)),
314 ArrowDataType::Float64 => Ok(ScalarValue::Float64(None)),
315 ArrowDataType::Date32 => Ok(ScalarValue::Date32(None)),
316 ArrowDataType::Date64 => Ok(ScalarValue::Date64(None)),
317 ArrowDataType::Binary => Ok(ScalarValue::Binary(None)),
318 ArrowDataType::FixedSizeBinary(size) => {
319 Ok(ScalarValue::FixedSizeBinary(size.to_owned(), None))
320 }
321 ArrowDataType::LargeBinary => Ok(ScalarValue::LargeBinary(None)),
322 ArrowDataType::Utf8 => Ok(ScalarValue::Utf8(None)),
323 ArrowDataType::LargeUtf8 => Ok(ScalarValue::LargeUtf8(None)),
324 ArrowDataType::Decimal128(precision, scale) => Ok(ScalarValue::Decimal128(
325 None,
326 precision.to_owned(),
327 scale.to_owned(),
328 )),
329 ArrowDataType::Timestamp(unit, tz) => {
330 let tz = tz.to_owned();
331 Ok(match unit {
332 TimeUnit::Second => ScalarValue::TimestampSecond(None, tz),
333 TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(None, tz),
334 TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(None, tz),
335 TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(None, tz),
336 })
337 }
338 ArrowDataType::Dictionary(k, v) => Ok(ScalarValue::Dictionary(
339 k.clone(),
340 Box::new(get_null_of_arrow_type(v)?),
341 )),
342 ArrowDataType::Float16
344 | ArrowDataType::Decimal32(_, _)
345 | ArrowDataType::Decimal64(_, _)
346 | ArrowDataType::Decimal256(_, _)
347 | ArrowDataType::Union(_, _)
348 | ArrowDataType::LargeList(_)
349 | ArrowDataType::Struct(_)
350 | ArrowDataType::List(_)
351 | ArrowDataType::FixedSizeList(_, _)
352 | ArrowDataType::Time32(_)
353 | ArrowDataType::Time64(_)
354 | ArrowDataType::Duration(_)
355 | ArrowDataType::Interval(_)
356 | ArrowDataType::RunEndEncoded(_, _)
357 | ArrowDataType::BinaryView
358 | ArrowDataType::Utf8View
359 | ArrowDataType::LargeListView(_)
360 | ArrowDataType::ListView(_)
361 | ArrowDataType::Map(_, _) => Err(DeltaTableError::Generic(format!(
362 "Unsupported data type for Delta Lake {t}"
363 ))),
364 }
365}
366
367fn parse_date(
368 stat_val: &serde_json::Value,
369 field_dt: &ArrowDataType,
370) -> DataFusionResult<ScalarValue> {
371 let string = match stat_val {
372 serde_json::Value::String(s) => s.to_owned(),
373 _ => stat_val.to_string(),
374 };
375
376 let time_micro = ScalarValue::try_from_string(string, &ArrowDataType::Date32)?;
377 let cast_arr = cast_with_options(
378 &time_micro.to_array()?,
379 field_dt,
380 &CastOptions {
381 safe: false,
382 ..Default::default()
383 },
384 )?;
385 ScalarValue::try_from_array(&cast_arr, 0)
386}
387
388fn parse_timestamp(
389 stat_val: &serde_json::Value,
390 field_dt: &ArrowDataType,
391) -> DataFusionResult<ScalarValue> {
392 let string = match stat_val {
393 serde_json::Value::String(s) => s.to_owned(),
394 _ => stat_val.to_string(),
395 };
396
397 let time_micro = ScalarValue::try_from_string(
398 string,
399 &ArrowDataType::Timestamp(TimeUnit::Microsecond, None),
400 )?;
401 let cast_arr = cast_with_options(
402 &time_micro.to_array()?,
403 field_dt,
404 &CastOptions {
405 safe: false,
406 ..Default::default()
407 },
408 )?;
409 ScalarValue::try_from_array(&cast_arr, 0)
410}
411
412pub(crate) fn to_correct_scalar_value(
413 stat_val: &serde_json::Value,
414 field_dt: &ArrowDataType,
415) -> DataFusionResult<Option<ScalarValue>> {
416 match stat_val {
417 serde_json::Value::Array(_) => Ok(None),
418 serde_json::Value::Object(_) => Ok(None),
419 serde_json::Value::Null => Ok(Some(get_null_of_arrow_type(field_dt)?)),
420 serde_json::Value::String(string_val) => match field_dt {
421 ArrowDataType::Timestamp(_, _) => Ok(Some(parse_timestamp(stat_val, field_dt)?)),
422 ArrowDataType::Date32 => Ok(Some(parse_date(stat_val, field_dt)?)),
423 _ => Ok(Some(ScalarValue::try_from_string(
424 string_val.to_owned(),
425 field_dt,
426 )?)),
427 },
428 other => match field_dt {
429 ArrowDataType::Timestamp(_, _) => Ok(Some(parse_timestamp(stat_val, field_dt)?)),
430 ArrowDataType::Date32 => Ok(Some(parse_date(stat_val, field_dt)?)),
431 _ => Ok(Some(ScalarValue::try_from_string(
432 other.to_string(),
433 field_dt,
434 )?)),
435 },
436 }
437}
438
439#[deprecated(
442 note = "DeltaPhysicalCodec only supports the retired physical DeltaScan wrapper. \
443 Use DeltaLogicalCodec for table-provider serialization until a DeltaScanExec \
444 physical codec is available."
445)]
446#[derive(Debug)]
447pub struct DeltaPhysicalCodec {}
448
449#[allow(deprecated)]
450impl PhysicalExtensionCodec for DeltaPhysicalCodec {
451 fn try_decode(
452 &self,
453 buf: &[u8],
454 inputs: &[Arc<dyn ExecutionPlan>],
455 _registry: &TaskContext,
456 ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
457 let wire: DeltaScanWire = serde_json::from_reader(buf)
458 .map_err(|_| DataFusionError::Internal("Unable to decode DeltaScan".to_string()))?;
459 let delta_scan = wire.into_delta_scan((*inputs)[0].clone());
460 Ok(Arc::new(delta_scan))
461 }
462
463 fn try_encode(
464 &self,
465 node: Arc<dyn ExecutionPlan>,
466 buf: &mut Vec<u8>,
467 ) -> Result<(), DataFusionError> {
468 let delta_scan = node
469 .as_any()
470 .downcast_ref::<DeltaScan>()
471 .ok_or_else(|| DataFusionError::Internal("Not a legacy delta scan!".to_string()))?;
472
473 let wire = DeltaScanWire::from(delta_scan);
474 serde_json::to_writer(buf, &wire).map_err(|_| {
475 DataFusionError::Internal("Unable to encode legacy delta scan!".to_string())
476 })?;
477 Ok(())
478 }
479}
480
481#[derive(Debug)]
483pub struct DeltaLogicalCodec {}
484
485impl LogicalExtensionCodec for DeltaLogicalCodec {
486 fn try_decode(
487 &self,
488 _buf: &[u8],
489 _inputs: &[LogicalPlan],
490 _ctx: &TaskContext,
491 ) -> Result<Extension, DataFusionError> {
492 todo!("DeltaLogicalCodec")
493 }
494
495 fn try_encode(&self, _node: &Extension, _buf: &mut Vec<u8>) -> Result<(), DataFusionError> {
496 todo!("DeltaLogicalCodec")
497 }
498
499 fn try_decode_table_provider(
500 &self,
501 buf: &[u8],
502 _table_ref: &TableReference,
503 _schema: SchemaRef,
504 _ctx: &TaskContext,
505 ) -> Result<Arc<dyn TableProvider>, DataFusionError> {
506 let provider: DeltaScanNext = serde_json::from_slice(buf)
507 .map_err(|_| DataFusionError::Internal("Error encoding delta table".to_string()))?;
508 Ok(Arc::new(provider))
509 }
510
511 fn try_encode_table_provider(
512 &self,
513 _table_ref: &TableReference,
514 node: Arc<dyn TableProvider>,
515 buf: &mut Vec<u8>,
516 ) -> Result<(), DataFusionError> {
517 let scan = node
518 .as_ref()
519 .as_any()
520 .downcast_ref::<DeltaScanNext>()
521 .ok_or_else(|| {
522 DataFusionError::Internal("Can't encode non-delta tables".to_string())
523 })?;
524 serde_json::to_writer(buf, scan)
525 .map_err(|_| DataFusionError::Internal("Error encoding delta table".to_string()))
526 }
527}
528
529#[derive(Debug)]
531pub struct DeltaTableFactory {}
532
533#[async_trait::async_trait]
534impl TableProviderFactory for DeltaTableFactory {
535 async fn create(
536 &self,
537 ctx: &dyn Session,
538 cmd: &CreateExternalTable,
539 ) -> datafusion::error::Result<Arc<dyn TableProvider>> {
540 let table = if cmd.options.is_empty() {
541 let table_url = ensure_table_uri(&cmd.to_owned().location)?;
542 open_table(table_url).await?
543 } else {
544 let table_url = ensure_table_uri(&cmd.to_owned().location)?;
545 open_table_with_storage_options(table_url, cmd.to_owned().options).await?
546 };
547 let table_uri = table.log_store().root_url().clone();
548 let (session_state, _) = resolve_session_state(
549 Some(ctx),
550 SessionFallbackPolicy::DeriveFromTrait,
551 || create_session().state(),
552 SessionResolveContext {
553 operation: "DeltaTableFactory::create",
554 table_uri: Some(&table_uri),
555 cdc: false,
556 },
557 )?;
558
559 Ok(table
560 .table_provider()
561 .with_session(Arc::new(session_state))
562 .await?)
563 }
564}
565
566pub struct DeltaColumn {
568 inner: Column,
569}
570
571impl From<&str> for DeltaColumn {
572 fn from(c: &str) -> Self {
573 DeltaColumn {
574 inner: Column::from_qualified_name_ignore_case(c),
575 }
576 }
577}
578
579impl From<&String> for DeltaColumn {
581 fn from(c: &String) -> Self {
582 DeltaColumn {
583 inner: Column::from_qualified_name_ignore_case(c),
584 }
585 }
586}
587
588impl From<String> for DeltaColumn {
590 fn from(c: String) -> Self {
591 DeltaColumn {
592 inner: Column::from_qualified_name_ignore_case(c),
593 }
594 }
595}
596
597impl From<DeltaColumn> for Column {
598 fn from(value: DeltaColumn) -> Self {
599 value.inner
600 }
601}
602
603impl From<Column> for DeltaColumn {
605 fn from(c: Column) -> Self {
606 DeltaColumn { inner: c }
607 }
608}
609
610#[cfg(test)]
611mod tests {
612 use crate::DeltaTable;
613 use crate::operations::write::SchemaMode;
614 use crate::test_utils::{
615 object_store::{
616 RecordedObjectStoreOperation as ObjectStoreOperation, RecordedPathKind as PathKind,
617 drain_recorded_object_store_operations as drain_recorded_ops, recording_log_store,
618 },
619 open_fs_path,
620 };
621 use crate::writer::test_utils::get_delta_schema;
622 use arrow::array::StructArray;
623 use arrow::datatypes::{Field, Schema};
624 use datafusion::assert_batches_sorted_eq;
625 use datafusion::physical_plan::empty::EmptyExec;
626 use datafusion::prelude::SessionConfig;
627 use datafusion_proto::physical_plan::AsExecutionPlan;
628 use datafusion_proto::protobuf;
629 use delta_kernel::schema::ArrayType;
630 use futures::{StreamExt, TryStreamExt};
631 use object_store::ObjectStoreExt as _;
632 use serde_json::json;
633 use std::ops::Range;
634 use url::Url;
635
636 use super::*;
637 use crate::delta_datafusion::table_provider::next::{FileSelection, MissingFilePolicy};
638 #[test]
641 fn test_to_correct_scalar_value() {
642 let reference_pairs = &[
643 (
644 json!("2015"),
645 ArrowDataType::Int16,
646 ScalarValue::Int16(Some(2015)),
647 ),
648 (
649 json!("2015"),
650 ArrowDataType::Int32,
651 ScalarValue::Int32(Some(2015)),
652 ),
653 (
654 json!("2015"),
655 ArrowDataType::Int64,
656 ScalarValue::Int64(Some(2015)),
657 ),
658 (
659 json!("2015"),
660 ArrowDataType::Float32,
661 ScalarValue::Float32(Some(2015_f32)),
662 ),
663 (
664 json!("2015"),
665 ArrowDataType::Float64,
666 ScalarValue::Float64(Some(2015_f64)),
667 ),
668 (
669 json!(2015),
670 ArrowDataType::Float64,
671 ScalarValue::Float64(Some(2015_f64)),
672 ),
673 (
674 json!("2015-01-01"),
675 ArrowDataType::Date32,
676 ScalarValue::Date32(Some(16436)),
677 ),
678 (
700 json!(true),
701 ArrowDataType::Boolean,
702 ScalarValue::Boolean(Some(true)),
703 ),
704 ];
705
706 for (raw, data_type, ref_scalar) in reference_pairs {
707 let scalar = to_correct_scalar_value(raw, data_type).unwrap().unwrap();
708 assert_eq!(*ref_scalar, scalar)
709 }
710 }
711
712 #[test]
713 #[allow(deprecated)]
714 fn roundtrip_test_delta_exec_plan() {
715 let ctx = SessionContext::new();
716 let codec = DeltaPhysicalCodec {};
717
718 let schema = Arc::new(Schema::new(vec![
719 Field::new("a", ArrowDataType::Utf8, false),
720 Field::new("b", ArrowDataType::Int32, false),
721 ]));
722 let exec_plan = Arc::from(DeltaScan::new(
723 &Url::parse("s3://my_bucket/this/is/some/path").unwrap(),
724 DeltaScanConfig::default(),
725 Arc::from(EmptyExec::new(schema.clone())),
726 schema.clone(),
727 ));
728 let proto: protobuf::PhysicalPlanNode =
729 protobuf::PhysicalPlanNode::try_from_physical_plan(exec_plan.clone(), &codec)
730 .expect("to proto");
731
732 let task_ctx = ctx.task_ctx();
733 let result_exec_plan: Arc<dyn ExecutionPlan> = proto
734 .try_into_physical_plan(&task_ctx, &codec)
735 .expect("from proto");
736 assert_eq!(format!("{exec_plan:?}"), format!("{result_exec_plan:?}"));
737 }
738
739 #[tokio::test]
740 async fn roundtrip_test_delta_logical_codec_preserves_file_selection() {
741 let log_store = crate::test_utils::TestTables::Simple
742 .table_builder()
743 .unwrap()
744 .build_storage()
745 .unwrap();
746 let snapshot = Arc::new(
747 crate::kernel::Snapshot::try_new(&log_store, Default::default(), None)
748 .await
749 .unwrap(),
750 );
751 let table_root = snapshot
752 .scan_builder()
753 .build()
754 .unwrap()
755 .table_root()
756 .clone();
757 let selected_file_ids: Vec<String> = snapshot
758 .file_views(log_store.as_ref(), None)
759 .take(1)
760 .map_ok(|view| table_root.join(view.path_raw()).unwrap().to_string())
761 .try_collect()
762 .await
763 .unwrap();
764 assert_eq!(selected_file_ids.len(), 1);
765
766 let provider = DeltaScanNext::builder()
767 .with_snapshot(snapshot)
768 .build()
769 .await
770 .unwrap()
771 .with_file_selection(
772 FileSelection::new(selected_file_ids.clone())
773 .with_missing_file_policy(MissingFilePolicy::Ignore),
774 );
775
776 let codec = DeltaLogicalCodec {};
777 let table_ref = TableReference::bare("delta_table");
778 let mut encoded = Vec::new();
779 codec
780 .try_encode_table_provider(&table_ref, Arc::new(provider), &mut encoded)
781 .unwrap();
782
783 let ctx = SessionContext::new();
784 let decoded = codec
785 .try_decode_table_provider(
786 &encoded,
787 &table_ref,
788 Arc::new(ArrowSchema::empty()),
789 &ctx.task_ctx(),
790 )
791 .unwrap();
792 let decoded_provider = decoded
793 .as_ref()
794 .as_any()
795 .downcast_ref::<DeltaScanNext>()
796 .unwrap();
797
798 let serialized = serde_json::to_value(decoded_provider).unwrap();
799 let decoded_file_ids = serialized
800 .get("file_selection")
801 .and_then(|value| value.get("file_ids"))
802 .and_then(|value| value.as_array())
803 .unwrap()
804 .iter()
805 .map(|value| value.as_str().unwrap().to_string())
806 .collect::<Vec<_>>();
807 let decoded_policy = serialized
808 .get("file_selection")
809 .and_then(|value| value.get("missing_file_policy"))
810 .and_then(|value| value.as_str())
811 .unwrap();
812
813 assert_eq!(decoded_file_ids, selected_file_ids);
814 assert_eq!(decoded_policy, "Ignore");
815 }
816
817 #[tokio::test]
818 async fn delta_table_provider_with_config() {
819 let table = open_fs_path("../test/tests/data/delta-2.2.0-partitioned-types");
820 let provider = table
821 .table_provider()
822 .with_file_column("file_source")
823 .await
824 .unwrap();
825 let ctx = SessionContext::new();
826 ctx.register_table("test", provider).unwrap();
827
828 let df = ctx
829 .sql("select c3, c1, c2, right(file_source, 77) as file_source from test")
830 .await
831 .unwrap();
832 let actual = df.collect().await.unwrap();
833 let expected = vec![
834 "+----+----+----+-------------------------------------------------------------------------------+",
835 "| c3 | c1 | c2 | file_source |",
836 "+----+----+----+-------------------------------------------------------------------------------+",
837 "| 4 | 6 | a | c1=6/c2=a/part-00011-10619b10-b691-4fd0-acc4-2a9608499d7c.c000.snappy.parquet |",
838 "| 5 | 4 | c | c1=4/c2=c/part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet |",
839 "| 6 | 5 | b | c1=5/c2=b/part-00007-4e73fa3b-2c88-424a-8051-f8b54328ffdb.c000.snappy.parquet |",
840 "+----+----+----+-------------------------------------------------------------------------------+",
841 ];
842 assert_batches_sorted_eq!(&expected, &actual);
843 }
844
845 #[tokio::test]
846 async fn delta_scan_mixed_partition_order() {
847 let schema = Arc::new(ArrowSchema::new(vec![
850 Field::new("modified", ArrowDataType::Utf8, true),
851 Field::new("id", ArrowDataType::Utf8, true),
852 Field::new("value", ArrowDataType::Int32, true),
853 ]));
854
855 let table = DeltaTable::new_in_memory()
856 .create()
857 .with_columns(get_delta_schema().fields().cloned())
858 .with_partition_columns(["modified", "id"])
859 .await
860 .unwrap();
861 assert_eq!(table.version(), Some(0));
862
863 let batch = RecordBatch::try_new(
864 schema.clone(),
865 vec![
866 Arc::new(arrow::array::StringArray::from(vec![
867 "2021-02-01",
868 "2021-02-01",
869 "2021-02-02",
870 "2021-02-02",
871 ])),
872 Arc::new(arrow::array::StringArray::from(vec!["A", "B", "C", "D"])),
873 Arc::new(arrow::array::Int32Array::from(vec![1, 10, 20, 100])),
874 ],
875 )
876 .unwrap();
877 let table = table
879 .write(vec![batch.clone()])
880 .with_save_mode(crate::protocol::SaveMode::Append)
881 .await
882 .unwrap();
883
884 let provider = table.table_provider().await.unwrap();
885 let logical_schema = provider.schema();
886 let ctx = SessionContext::new();
887 ctx.runtime_env().register_object_store(
888 table.log_store().root_url(),
889 table.log_store().object_store(None),
890 );
891 ctx.register_table("test", provider).unwrap();
892
893 let expected_logical_order = vec!["id", "value", "modified"];
894 let actual_order: Vec<String> = logical_schema
895 .fields()
896 .iter()
897 .map(|f| f.name().to_owned())
898 .collect();
899
900 let df = ctx.sql("select * from test").await.unwrap();
901 let actual = df.collect().await.unwrap();
902 let expected = vec![
903 "+----+-------+------------+",
904 "| id | value | modified |",
905 "+----+-------+------------+",
906 "| A | 1 | 2021-02-01 |",
907 "| B | 10 | 2021-02-01 |",
908 "| C | 20 | 2021-02-02 |",
909 "| D | 100 | 2021-02-02 |",
910 "+----+-------+------------+",
911 ];
912 assert_batches_sorted_eq!(&expected, &actual);
913 assert_eq!(expected_logical_order, actual_order);
914 }
915
916 #[tokio::test]
917 async fn delta_scan_case_sensitive() {
918 let schema = Arc::new(ArrowSchema::new(vec![
919 Field::new("moDified", ArrowDataType::Utf8, true),
920 Field::new("ID", ArrowDataType::Utf8, true),
921 Field::new("vaLue", ArrowDataType::Int32, true),
922 ]));
923
924 let batch = RecordBatch::try_new(
925 schema.clone(),
926 vec![
927 Arc::new(arrow::array::StringArray::from(vec![
928 "2021-02-01",
929 "2021-02-01",
930 "2021-02-02",
931 "2021-02-02",
932 ])),
933 Arc::new(arrow::array::StringArray::from(vec!["A", "B", "C", "D"])),
934 Arc::new(arrow::array::Int32Array::from(vec![1, 10, 20, 100])),
935 ],
936 )
937 .unwrap();
938 let table = DeltaTable::new_in_memory()
940 .write(vec![batch.clone()])
941 .with_save_mode(crate::protocol::SaveMode::Append)
942 .await
943 .unwrap();
944
945 let provider = table.table_provider().await.unwrap();
946 let ctx: SessionContext = DeltaSessionContext::default().into();
947 ctx.register_table("test", provider).unwrap();
948
949 let df = ctx
950 .sql("select ID, moDified, vaLue from test")
951 .await
952 .unwrap();
953 let actual = df.collect().await.unwrap();
954 let expected = vec![
955 "+----+------------+-------+",
956 "| ID | moDified | vaLue |",
957 "+----+------------+-------+",
958 "| A | 2021-02-01 | 1 |",
959 "| B | 2021-02-01 | 10 |",
960 "| C | 2021-02-02 | 20 |",
961 "| D | 2021-02-02 | 100 |",
962 "+----+------------+-------+",
963 ];
964 assert_batches_sorted_eq!(&expected, &actual);
965
966 }
978
979 #[tokio::test]
980 async fn delta_scan_supports_missing_columns() {
981 let schema1 = Arc::new(ArrowSchema::new(vec![Field::new(
982 "col_1",
983 ArrowDataType::Utf8,
984 true,
985 )]));
986
987 let batch1 = RecordBatch::try_new(
988 schema1.clone(),
989 vec![Arc::new(arrow::array::StringArray::from(vec![
990 Some("A"),
991 Some("B"),
992 ]))],
993 )
994 .unwrap();
995
996 let schema2 = Arc::new(ArrowSchema::new(vec![
997 Field::new("col_1", ArrowDataType::Utf8, true),
998 Field::new("col_2", ArrowDataType::Utf8, true),
999 ]));
1000
1001 let batch2 = RecordBatch::try_new(
1002 schema2.clone(),
1003 vec![
1004 Arc::new(arrow::array::StringArray::from(vec![
1005 Some("E"),
1006 Some("F"),
1007 Some("G"),
1008 ])),
1009 Arc::new(arrow::array::StringArray::from(vec![
1010 Some("E2"),
1011 Some("F2"),
1012 Some("G2"),
1013 ])),
1014 ],
1015 )
1016 .unwrap();
1017
1018 let table = DeltaTable::new_in_memory()
1019 .write(vec![batch2])
1020 .with_save_mode(crate::protocol::SaveMode::Append)
1021 .await
1022 .unwrap();
1023
1024 let table = table
1025 .write(vec![batch1])
1026 .with_schema_mode(SchemaMode::Merge)
1027 .with_save_mode(crate::protocol::SaveMode::Append)
1028 .await
1029 .unwrap();
1030
1031 let provider = table.table_provider().await.unwrap();
1032 let ctx: SessionContext = DeltaSessionContext::default().into();
1033 ctx.register_table("test", provider).unwrap();
1034
1035 let df = ctx.sql("select col_1, col_2 from test").await.unwrap();
1036 let actual = df.collect().await.unwrap();
1037 let expected = vec![
1038 "+-------+-------+",
1039 "| col_1 | col_2 |",
1040 "+-------+-------+",
1041 "| A | |",
1042 "| B | |",
1043 "| E | E2 |",
1044 "| F | F2 |",
1045 "| G | G2 |",
1046 "+-------+-------+",
1047 ];
1048 assert_batches_sorted_eq!(&expected, &actual);
1049 }
1050
1051 #[tokio::test]
1052 async fn delta_scan_supports_pushdown() {
1053 let schema = Arc::new(ArrowSchema::new(vec![
1054 Field::new("col_1", ArrowDataType::Utf8, false),
1055 Field::new("col_2", ArrowDataType::Utf8, false),
1056 ]));
1057
1058 let batch = RecordBatch::try_new(
1059 schema.clone(),
1060 vec![
1061 Arc::new(arrow::array::StringArray::from(vec![
1062 Some("A"),
1063 Some("B"),
1064 Some("C"),
1065 ])),
1066 Arc::new(arrow::array::StringArray::from(vec![
1067 Some("A2"),
1068 Some("B2"),
1069 Some("C2"),
1070 ])),
1071 ],
1072 )
1073 .unwrap();
1074
1075 let table = DeltaTable::new_in_memory()
1076 .write(vec![batch])
1077 .with_save_mode(crate::protocol::SaveMode::Append)
1078 .await
1079 .unwrap();
1080
1081 let provider = table.table_provider().await.unwrap();
1082
1083 let mut cfg = SessionConfig::default();
1084 cfg.options_mut().execution.parquet.pushdown_filters = true;
1085 let ctx = SessionContext::new_with_config(cfg);
1086 ctx.register_table("test", provider).unwrap();
1087
1088 let df = ctx
1089 .sql("select col_1, col_2 from test WHERE col_1 = 'A'")
1090 .await
1091 .unwrap();
1092 let actual = df.collect().await.unwrap();
1093 let expected = vec![
1094 "+-------+-------+",
1095 "| col_1 | col_2 |",
1096 "+-------+-------+",
1097 "| A | A2 |",
1098 "+-------+-------+",
1099 ];
1100 assert_batches_sorted_eq!(&expected, &actual);
1101 }
1102
1103 #[tokio::test]
1104 async fn delta_scan_supports_nested_missing_columns() {
1105 let column1_schema1: arrow::datatypes::Fields =
1106 vec![Field::new("col_1a", ArrowDataType::Utf8, true)].into();
1107 let schema1 = Arc::new(ArrowSchema::new(vec![Field::new(
1108 "col_1",
1109 ArrowDataType::Struct(column1_schema1.clone()),
1110 true,
1111 )]));
1112
1113 let batch1 = RecordBatch::try_new(
1114 schema1.clone(),
1115 vec![Arc::new(StructArray::new(
1116 column1_schema1,
1117 vec![Arc::new(arrow::array::StringArray::from(vec![
1118 Some("A"),
1119 Some("B"),
1120 ]))],
1121 None,
1122 ))],
1123 )
1124 .unwrap();
1125
1126 let column1_schema2: arrow_schema::Fields = vec![
1127 Field::new("col_1a", ArrowDataType::Utf8, true),
1128 Field::new("col_1b", ArrowDataType::Utf8, true),
1129 ]
1130 .into();
1131 let schema2 = Arc::new(ArrowSchema::new(vec![Field::new(
1132 "col_1",
1133 ArrowDataType::Struct(column1_schema2.clone()),
1134 true,
1135 )]));
1136
1137 let batch2 = RecordBatch::try_new(
1138 schema2.clone(),
1139 vec![Arc::new(StructArray::new(
1140 column1_schema2,
1141 vec![
1142 Arc::new(arrow::array::StringArray::from(vec![
1143 Some("E"),
1144 Some("F"),
1145 Some("G"),
1146 ])),
1147 Arc::new(arrow::array::StringArray::from(vec![
1148 Some("E2"),
1149 Some("F2"),
1150 Some("G2"),
1151 ])),
1152 ],
1153 None,
1154 ))],
1155 )
1156 .unwrap();
1157
1158 let table = DeltaTable::new_in_memory()
1159 .write(vec![batch1])
1160 .with_save_mode(crate::protocol::SaveMode::Append)
1161 .await
1162 .unwrap();
1163
1164 let table = table
1165 .write(vec![batch2])
1166 .with_schema_mode(SchemaMode::Merge)
1167 .with_save_mode(crate::protocol::SaveMode::Append)
1168 .await
1169 .unwrap();
1170
1171 let provider = table.table_provider().await.unwrap();
1172 let ctx: SessionContext = DeltaSessionContext::default().into();
1173 ctx.register_table("test", provider).unwrap();
1174
1175 let df = ctx
1176 .sql("select col_1.col_1a, col_1.col_1b from test")
1177 .await
1178 .unwrap();
1179 let actual = df.collect().await.unwrap();
1180 let expected = vec![
1181 "+--------------------+--------------------+",
1182 "| test.col_1[col_1a] | test.col_1[col_1b] |",
1183 "+--------------------+--------------------+",
1184 "| A | |",
1185 "| B | |",
1186 "| E | E2 |",
1187 "| F | F2 |",
1188 "| G | G2 |",
1189 "+--------------------+--------------------+",
1190 ];
1191 assert_batches_sorted_eq!(&expected, &actual);
1192 }
1193
1194 #[tokio::test]
1195 async fn test_multiple_predicate_pushdown() {
1196 let schema = Arc::new(ArrowSchema::new(vec![
1197 Field::new("moDified", ArrowDataType::Utf8, true),
1198 Field::new("id", ArrowDataType::Utf8, true),
1199 Field::new("vaLue", ArrowDataType::Int32, true),
1200 ]));
1201
1202 let batch = RecordBatch::try_new(
1203 schema.clone(),
1204 vec![
1205 Arc::new(arrow::array::StringArray::from(vec![
1206 "2021-02-01",
1207 "2021-02-01",
1208 "2021-02-02",
1209 "2021-02-02",
1210 ])),
1211 Arc::new(arrow::array::StringArray::from(vec!["A", "B", "C", "D"])),
1212 Arc::new(arrow::array::Int32Array::from(vec![1, 10, 20, 100])),
1213 ],
1214 )
1215 .unwrap();
1216 let table = DeltaTable::new_in_memory()
1218 .write(vec![batch.clone()])
1219 .with_save_mode(crate::protocol::SaveMode::Append)
1220 .await
1221 .unwrap();
1222
1223 let datafusion = SessionContext::new();
1224 table
1225 .update_datafusion_session(&datafusion.state())
1226 .unwrap();
1227
1228 datafusion
1229 .register_table("snapshot", table.table_provider().await.unwrap())
1230 .unwrap();
1231
1232 let df = datafusion
1233 .sql("select * from snapshot where id > 10000 and id < 20000")
1234 .await
1235 .unwrap();
1236
1237 df.collect().await.unwrap();
1238 }
1239
1240 #[tokio::test]
1246 async fn passes_sanity_checker_when_all_files_filtered() {
1247 let table = open_fs_path("../test/tests/data/delta-2.2.0-partitioned-types");
1248 let ctx = create_session().into_inner();
1249 ctx.register_table("test", table.table_provider().await.unwrap())
1250 .unwrap();
1251
1252 let df = ctx
1253 .sql("select * from test where c3 = 100 ORDER BY c1 ASC")
1254 .await
1255 .unwrap();
1256 let actual = df.collect().await.unwrap();
1257
1258 assert_eq!(actual.len(), 0);
1259 }
1260
1261 #[tokio::test]
1262 async fn test_delta_scan_uses_parquet_column_pruning() {
1263 let small: Arc<dyn Array> = Arc::new(arrow::array::StringArray::from(vec!["a"]));
1264 let large: Arc<dyn Array> = Arc::new(arrow::array::StringArray::from(vec![
1265 "b".repeat(1024).as_str(),
1266 ]));
1267 let batch = RecordBatch::try_from_iter(vec![("small", small), ("large", large)]).unwrap();
1268 let table = DeltaTable::new_in_memory()
1269 .write(vec![batch])
1270 .with_save_mode(crate::protocol::SaveMode::Append)
1271 .await
1272 .unwrap();
1273
1274 let config = DeltaScanConfigBuilder::new()
1275 .build(table.snapshot().unwrap().snapshot())
1276 .unwrap();
1277
1278 let (log_store, mut operations) = recording_log_store(table.log_store());
1279 let provider = DeltaScanNext::new(table.snapshot().unwrap().snapshot().clone(), config)
1280 .unwrap()
1281 .with_log_store(log_store.clone());
1282 let ctx = SessionContext::new();
1283 ctx.register_table("test", Arc::new(provider)).unwrap();
1284 let state = ctx.state();
1285
1286 let df = ctx.sql("select small from test").await.unwrap();
1287 let plan = df.create_physical_plan().await.unwrap();
1288
1289 let mut stream = plan.execute(0, state.task_ctx()).unwrap();
1290 let Some(Ok(batch)) = stream.next().await else {
1291 panic!()
1292 };
1293 assert!(stream.next().await.is_none());
1294 assert_eq!(1, batch.num_columns());
1295 assert_eq!(1, batch.num_rows());
1296 let small = ScalarValue::try_from_array(batch.column_by_name("small").unwrap(), 0).unwrap();
1297 assert_eq!("a", small.to_string());
1298
1299 let files = table.get_files_by_partitions(&[]).await.unwrap();
1300 assert_eq!(1, files.len());
1301 let object_store = table.object_store();
1302 let file_meta = object_store.head(&files[0]).await.unwrap();
1303 let file_reader = parquet::arrow::async_reader::ParquetObjectReader::new(
1304 object_store,
1305 file_meta.location.clone(),
1306 )
1307 .with_file_size(file_meta.size);
1308 let parquet_metadata =
1309 parquet::arrow::async_reader::ParquetRecordBatchStreamBuilder::new(file_reader)
1310 .await
1311 .unwrap()
1312 .metadata()
1313 .as_ref()
1314 .clone();
1315 let (small_start, small_len) = parquet_metadata.row_group(0).column(0).byte_range();
1316 let small_range = small_start..small_start + small_len;
1317 let (large_start, large_len) = parquet_metadata.row_group(0).column(1).byte_range();
1318 let large_range = large_start..large_start + large_len;
1319
1320 let actual = drain_recorded_ops(&mut operations).await;
1321
1322 let data_ranges = actual
1323 .iter()
1324 .flat_map(|operation| match operation {
1325 ObjectStoreOperation::GetRange(PathKind::Data, range) => vec![range.clone()],
1326 ObjectStoreOperation::GetRanges(PathKind::Data, ranges) => ranges.clone(),
1327 _ => Vec::new(),
1328 })
1329 .collect::<Vec<_>>();
1330
1331 let overlaps = |left: &Range<u64>, right: &Range<u64>| {
1332 left.start < right.end && right.start < left.end
1333 };
1334
1335 assert!(
1336 !data_ranges.is_empty(),
1337 "expected ranged parquet data reads, saw {actual:?}"
1338 );
1339 assert!(
1340 data_ranges
1341 .iter()
1342 .any(|range| overlaps(range, &small_range)),
1343 "expected selected column chunk {small_range:?} to be read, saw {actual:?}"
1344 );
1345 assert!(
1346 data_ranges
1347 .iter()
1348 .all(|range| !overlaps(range, &large_range)),
1349 "expected unselected column chunk {large_range:?} to be pruned, saw {actual:?}"
1350 );
1351 assert!(
1352 !actual.iter().any(|operation| matches!(
1353 operation,
1354 ObjectStoreOperation::Get(PathKind::Data)
1355 | ObjectStoreOperation::GetOpts(PathKind::Data)
1356 )),
1357 "expected no full data file reads, saw {actual:?}"
1358 );
1359 }
1360
1361 #[tokio::test]
1362 async fn test_push_down_filter_panic_2602() -> DeltaResult<()> {
1363 use crate::kernel::schema::{DataType, PrimitiveType};
1364 let ctx = SessionContext::new();
1365 let table = DeltaTable::new_in_memory()
1366 .create()
1367 .with_column("id", DataType::Primitive(PrimitiveType::Long), true, None)
1368 .with_column(
1369 "name",
1370 DataType::Primitive(PrimitiveType::String),
1371 true,
1372 None,
1373 )
1374 .with_column("b", DataType::Primitive(PrimitiveType::Boolean), true, None)
1375 .with_column(
1376 "ts",
1377 DataType::Primitive(PrimitiveType::Timestamp),
1378 true,
1379 None,
1380 )
1381 .with_column("dt", DataType::Primitive(PrimitiveType::Date), true, None)
1382 .with_column(
1383 "zap",
1384 DataType::Array(Box::new(ArrayType::new(
1385 DataType::Primitive(PrimitiveType::Boolean),
1386 true,
1387 ))),
1388 true,
1389 None,
1390 )
1391 .await?;
1392 table.update_datafusion_session(&ctx.state()).unwrap();
1393
1394 ctx.register_table("snapshot", table.table_provider().await.unwrap())
1395 .unwrap();
1396
1397 let df = ctx
1398 .sql("select * from snapshot where id > 10000 and id < 20000")
1399 .await
1400 .unwrap();
1401
1402 let _ = df.collect().await?;
1403 Ok(())
1404 }
1405}