//! DataFusion integration for Delta tables.

use std::fmt::Debug;
use std::sync::Arc;

use arrow::array::types::UInt16Type;
use arrow::array::{Array, DictionaryArray, RecordBatch, StringArray, TypedDictionaryArray};
use arrow_cast::{CastOptions, cast_with_options};
use arrow_schema::{
    DataType as ArrowDataType, Schema as ArrowSchema, SchemaRef, SchemaRef as ArrowSchemaRef,
    TimeUnit,
};
use datafusion::catalog::{Session, TableProviderFactory};
use datafusion::common::scalar::ScalarValue;
use datafusion::common::{
    Column, DFSchema, DataFusionError, Result as DataFusionResult, TableReference, ToDFSchema,
};
use datafusion::datasource::TableProvider;
use datafusion::datasource::physical_plan::wrap_partition_type_in_dict;
use datafusion::execution::TaskContext;
use datafusion::execution::context::SessionContext;
use datafusion::logical_expr::logical_plan::CreateExternalTable;
use datafusion::logical_expr::utils::conjunction;
use datafusion::logical_expr::{Expr, Extension, LogicalPlan};
use datafusion::physical_optimizer::pruning::PruningPredicate;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_proto::logical_plan::LogicalExtensionCodec;
use datafusion_proto::physical_plan::PhysicalExtensionCodec;
use delta_kernel::engine::arrow_conversion::TryIntoArrow as _;
use either::Either;

use crate::delta_datafusion::expr::parse_predicate_expression;
use crate::delta_datafusion::table_provider::DeltaScanWire;
use crate::ensure_table_uri;
use crate::errors::{DeltaResult, DeltaTableError};
use crate::kernel::{Add, EagerSnapshot, LogDataHandler, Snapshot};
use crate::{open_table, open_table_with_storage_options};

pub(crate) use self::session::DeltaSessionExt;
pub use self::session::{
    DeltaParserOptions, DeltaRuntimeEnvBuilder, DeltaSessionConfig, DeltaSessionContext,
    create_session, create_session_state_with_spill_config,
};
pub use self::table_provider::next::{DeletionVectorSelection, DeltaScan as DeltaScanNext};
pub(crate) use self::utils::*;
pub use cdf::scan::DeltaCdfTableProvider;
pub(crate) use data_validation::{
    DataValidationExec, constraints_to_exprs, generated_columns_to_exprs, validation_predicates,
};
pub(crate) use find_files::*;
pub(crate) use table_provider::next::normalize_path_as_file_id;
pub use table_provider::{
    DeltaScan, DeltaScanConfig, DeltaScanConfigBuilder, DeltaTableProvider, TableProviderBuilder,
    next::DeltaScanExec,
};
pub(crate) use table_provider::{
    DeltaScanBuilder, next::FILE_ID_COLUMN_DEFAULT, resolve_file_column_name,
    update_datafusion_session,
};

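/// Name of the synthetic column delta-rs uses to attach the originating file path to
/// rows surfaced during scans.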
pub(crate) const PATH_COLUMN: &str = "__delta_rs_path";

pub mod cdf;
mod data_validation;
pub mod engine;
pub mod expr;
mod file_id;
mod find_files;
pub mod logical;
pub mod physical;
pub mod planner;
mod session;
pub use session::SessionFallbackPolicy;
pub(crate) use session::{SessionResolveContext, resolve_session_state};
mod table_provider;
pub(crate) mod utils;

impl From<DeltaTableError> for DataFusionError {
    fn from(err: DeltaTableError) -> Self {
        match err {
            DeltaTableError::Arrow { source } => DataFusionError::from(source),
            DeltaTableError::Io { source } => DataFusionError::IoError(source),
            DeltaTableError::ObjectStore { source } => DataFusionError::from(source),
            DeltaTableError::Parquet { source } => DataFusionError::from(source),
            _ => DataFusionError::External(Box::new(err)),
        }
    }
}

impl From<DataFusionError> for DeltaTableError {
    fn from(err: DataFusionError) -> Self {
        match err {
            DataFusionError::ArrowError(source, _) => DeltaTableError::from(*source),
            DataFusionError::IoError(source) => DeltaTableError::Io { source },
            DataFusionError::ObjectStore(source) => DeltaTableError::from(*source),
            DataFusionError::ParquetError(source) => DeltaTableError::from(*source),
            _ => DeltaTableError::Generic(err.to_string()),
        }
    }
}

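/// DataFusion-facing helpers for deriving Arrow schemas and parsing predicates from
/// snapshot-like types.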
pub trait DataFusionMixins {
    /// Arrow schema used when reading the table (partition columns dictionary-encoded).
    fn read_schema(&self) -> ArrowSchemaRef;

    /// Arrow schema expected for data written to the table (partition columns unwrapped).
    fn input_schema(&self) -> ArrowSchemaRef;

    /// Parse an SQL predicate string into a DataFusion [`Expr`] against the read schema.
    fn parse_predicate_expression(
        &self,
        expr: impl AsRef<str>,
        session: &dyn Session,
    ) -> DeltaResult<Expr>;
}

impl DataFusionMixins for Snapshot {
    fn read_schema(&self) -> ArrowSchemaRef {
        _arrow_schema(
            self.arrow_schema(),
            self.metadata().partition_columns(),
            true,
        )
    }

    fn input_schema(&self) -> ArrowSchemaRef {
        _arrow_schema(
            self.arrow_schema(),
            self.metadata().partition_columns(),
            false,
        )
    }

    fn parse_predicate_expression(
        &self,
        expr: impl AsRef<str>,
        session: &dyn Session,
    ) -> DeltaResult<Expr> {
        let schema = DFSchema::try_from(self.read_schema().as_ref().to_owned())?;
        parse_predicate_expression(&schema, expr, session)
    }
}

impl DataFusionMixins for LogDataHandler<'_> {
    fn read_schema(&self) -> ArrowSchemaRef {
        _arrow_schema(
            Arc::new(
                self.table_configuration()
                    .schema()
                    .as_ref()
                    .try_into_arrow()
                    .unwrap(),
            ),
            self.table_configuration().metadata().partition_columns(),
            true,
        )
    }

    fn input_schema(&self) -> ArrowSchemaRef {
        _arrow_schema(
            Arc::new(
                self.table_configuration()
                    .schema()
                    .as_ref()
                    .try_into_arrow()
                    .unwrap(),
            ),
            self.table_configuration().metadata().partition_columns(),
            false,
        )
    }

    fn parse_predicate_expression(
        &self,
        expr: impl AsRef<str>,
        session: &dyn Session,
    ) -> DeltaResult<Expr> {
        let schema = DFSchema::try_from(self.read_schema().as_ref().to_owned())?;
        parse_predicate_expression(&schema, expr, session)
    }
}

impl DataFusionMixins for EagerSnapshot {
    fn read_schema(&self) -> ArrowSchemaRef {
        self.snapshot().read_schema()
    }

    fn input_schema(&self) -> ArrowSchemaRef {
        self.snapshot().input_schema()
    }

    fn parse_predicate_expression(
        &self,
        expr: impl AsRef<str>,
        session: &dyn Session,
    ) -> DeltaResult<Expr> {
        self.snapshot().parse_predicate_expression(expr, session)
    }
}

fn _arrow_schema(
    schema: SchemaRef,
    partition_columns: &[String],
    wrap_partitions: bool,
) -> ArrowSchemaRef {
    let fields = schema
        .fields()
        .into_iter()
        .filter(|f| !partition_columns.contains(&f.name().to_string()))
        .cloned()
        .chain(
            // Reorder partition columns to the end of the schema.
            partition_columns.iter().map(|partition_col| {
                let field = schema.field_with_name(partition_col).unwrap();
                let corrected = if wrap_partitions {
                    match field.data_type() {
                        // Only dictionary-encode types that may be large.
                        ArrowDataType::Utf8
                        | ArrowDataType::LargeUtf8
                        | ArrowDataType::Binary
                        | ArrowDataType::LargeBinary => {
                            wrap_partition_type_in_dict(field.data_type().clone())
                        }
                        _ => field.data_type().clone(),
                    }
                } else {
                    field.data_type().clone()
                };
                Arc::new(field.clone().with_data_type(corrected))
            }),
        )
        .collect::<Vec<_>>();
    Arc::new(ArrowSchema::new(fields))
}

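/// Iterate over the [`Add`] actions whose file statistics may satisfy the given filters.
///
/// Files are pruned with DataFusion's [`PruningPredicate`]; when no filters are given,
/// all files are yielded. A minimal sketch (assuming an `EagerSnapshot` named `snapshot`
/// exposing a `log_data()` handle):
///
/// ```ignore
/// use datafusion::prelude::{col, lit};
/// let matching = files_matching_predicate(snapshot.log_data(), &[col("id").gt(lit(10))])?;
/// for add in matching {
///     println!("{}", add.path);
/// }
/// ```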
pub(crate) fn files_matching_predicate<'a>(
    log_data: LogDataHandler<'a>,
    filters: &[Expr],
) -> DeltaResult<impl Iterator<Item = Add> + 'a> {
    if let Some(Some(predicate)) =
        (!filters.is_empty()).then_some(conjunction(filters.iter().cloned()))
    {
        let expr = SessionContext::new()
            .create_physical_expr(predicate, &log_data.read_schema().to_dfschema()?)?;
        let pruning_predicate = PruningPredicate::try_new(expr, log_data.read_schema())?;
        let mask = pruning_predicate.prune(&log_data)?;

        Ok(Either::Left(log_data.into_iter().zip(mask).filter_map(
            |(file, keep_file)| {
                if keep_file {
                    Some(file.add_action())
                } else {
                    None
                }
            },
        )))
    } else {
        Ok(Either::Right(
            log_data.into_iter().map(|file| file.add_action()),
        ))
    }
}

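/// Fetch the delta-rs path column from a record batch as a typed dictionary array.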
pub(crate) fn get_path_column<'a>(
    batch: &'a RecordBatch,
    path_column: &str,
) -> DeltaResult<TypedDictionaryArray<'a, UInt16Type, StringArray>> {
    let err = || DeltaTableError::Generic("Unable to obtain Delta-rs path column".to_string());
    batch
        .column_by_name(path_column)
        .ok_or_else(err)?
        .as_any()
        .downcast_ref::<DictionaryArray<UInt16Type>>()
        .ok_or_else(err)?
        .downcast_dict::<StringArray>()
        .ok_or_else(err)
}

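/// Build a typed null [`ScalarValue`] for the given Arrow data type.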
pub(crate) fn get_null_of_arrow_type(t: &ArrowDataType) -> DeltaResult<ScalarValue> {
    match t {
        ArrowDataType::Null => Ok(ScalarValue::Null),
        ArrowDataType::Boolean => Ok(ScalarValue::Boolean(None)),
        ArrowDataType::Int8 => Ok(ScalarValue::Int8(None)),
        ArrowDataType::Int16 => Ok(ScalarValue::Int16(None)),
        ArrowDataType::Int32 => Ok(ScalarValue::Int32(None)),
        ArrowDataType::Int64 => Ok(ScalarValue::Int64(None)),
        ArrowDataType::UInt8 => Ok(ScalarValue::UInt8(None)),
        ArrowDataType::UInt16 => Ok(ScalarValue::UInt16(None)),
        ArrowDataType::UInt32 => Ok(ScalarValue::UInt32(None)),
        ArrowDataType::UInt64 => Ok(ScalarValue::UInt64(None)),
        ArrowDataType::Float32 => Ok(ScalarValue::Float32(None)),
        ArrowDataType::Float64 => Ok(ScalarValue::Float64(None)),
        ArrowDataType::Date32 => Ok(ScalarValue::Date32(None)),
        ArrowDataType::Date64 => Ok(ScalarValue::Date64(None)),
        ArrowDataType::Binary => Ok(ScalarValue::Binary(None)),
        ArrowDataType::FixedSizeBinary(size) => {
            Ok(ScalarValue::FixedSizeBinary(size.to_owned(), None))
        }
        ArrowDataType::LargeBinary => Ok(ScalarValue::LargeBinary(None)),
        ArrowDataType::Utf8 => Ok(ScalarValue::Utf8(None)),
        ArrowDataType::LargeUtf8 => Ok(ScalarValue::LargeUtf8(None)),
        ArrowDataType::Decimal128(precision, scale) => Ok(ScalarValue::Decimal128(
            None,
            precision.to_owned(),
            scale.to_owned(),
        )),
        ArrowDataType::Timestamp(unit, tz) => {
            let tz = tz.to_owned();
            Ok(match unit {
                TimeUnit::Second => ScalarValue::TimestampSecond(None, tz),
                TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(None, tz),
                TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(None, tz),
                TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(None, tz),
            })
        }
        ArrowDataType::Dictionary(k, v) => Ok(ScalarValue::Dictionary(
            k.clone(),
            Box::new(get_null_of_arrow_type(v)?),
        )),
        ArrowDataType::Float16
        | ArrowDataType::Decimal32(_, _)
        | ArrowDataType::Decimal64(_, _)
        | ArrowDataType::Decimal256(_, _)
        | ArrowDataType::Union(_, _)
        | ArrowDataType::LargeList(_)
        | ArrowDataType::Struct(_)
        | ArrowDataType::List(_)
        | ArrowDataType::FixedSizeList(_, _)
        | ArrowDataType::Time32(_)
        | ArrowDataType::Time64(_)
        | ArrowDataType::Duration(_)
        | ArrowDataType::Interval(_)
        | ArrowDataType::RunEndEncoded(_, _)
        | ArrowDataType::BinaryView
        | ArrowDataType::Utf8View
        | ArrowDataType::LargeListView(_)
        | ArrowDataType::ListView(_)
        | ArrowDataType::Map(_, _) => Err(DeltaTableError::Generic(format!(
            "Unsupported data type for Delta Lake {t}"
        ))),
    }
}

fn parse_date(
    stat_val: &serde_json::Value,
    field_dt: &ArrowDataType,
) -> DataFusionResult<ScalarValue> {
    let string = match stat_val {
        serde_json::Value::String(s) => s.to_owned(),
        _ => stat_val.to_string(),
    };

    // Parse the JSON statistic as a Date32, then cast to the field's actual type.
    let date = ScalarValue::try_from_string(string, &ArrowDataType::Date32)?;
    let cast_arr = cast_with_options(
        &date.to_array()?,
        field_dt,
        &CastOptions {
            safe: false,
            ..Default::default()
        },
    )?;
    ScalarValue::try_from_array(&cast_arr, 0)
}

fn parse_timestamp(
    stat_val: &serde_json::Value,
    field_dt: &ArrowDataType,
) -> DataFusionResult<ScalarValue> {
    let string = match stat_val {
        serde_json::Value::String(s) => s.to_owned(),
        _ => stat_val.to_string(),
    };

    // Parse the JSON statistic as a microsecond timestamp, then cast to the field's type.
    let time_micro = ScalarValue::try_from_string(
        string,
        &ArrowDataType::Timestamp(TimeUnit::Microsecond, None),
    )?;
    let cast_arr = cast_with_options(
        &time_micro.to_array()?,
        field_dt,
        &CastOptions {
            safe: false,
            ..Default::default()
        },
    )?;
    ScalarValue::try_from_array(&cast_arr, 0)
}

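/// Convert a JSON statistics value into a [`ScalarValue`] of the field's Arrow type.
///
/// Returns `Ok(None)` for JSON arrays and objects, which cannot be represented as a
/// single scalar. For example:
///
/// ```ignore
/// let scalar = to_correct_scalar_value(&serde_json::json!("2015"), &ArrowDataType::Int32)?;
/// assert_eq!(scalar, Some(ScalarValue::Int32(Some(2015))));
/// ```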
pub(crate) fn to_correct_scalar_value(
    stat_val: &serde_json::Value,
    field_dt: &ArrowDataType,
) -> DataFusionResult<Option<ScalarValue>> {
    match stat_val {
        serde_json::Value::Array(_) => Ok(None),
        serde_json::Value::Object(_) => Ok(None),
        serde_json::Value::Null => Ok(Some(get_null_of_arrow_type(field_dt)?)),
        serde_json::Value::String(string_val) => match field_dt {
            ArrowDataType::Timestamp(_, _) => Ok(Some(parse_timestamp(stat_val, field_dt)?)),
            ArrowDataType::Date32 => Ok(Some(parse_date(stat_val, field_dt)?)),
            _ => Ok(Some(ScalarValue::try_from_string(
                string_val.to_owned(),
                field_dt,
            )?)),
        },
        other => match field_dt {
            ArrowDataType::Timestamp(_, _) => Ok(Some(parse_timestamp(stat_val, field_dt)?)),
            ArrowDataType::Date32 => Ok(Some(parse_date(stat_val, field_dt)?)),
            _ => Ok(Some(ScalarValue::try_from_string(
                other.to_string(),
                field_dt,
            )?)),
        },
    }
}

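/// Physical extension codec that (de)serializes [`DeltaScan`] nodes as JSON via
/// [`DeltaScanWire`], enabling plans containing Delta scans to cross process boundaries.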
#[derive(Debug)]
pub struct DeltaPhysicalCodec {}

impl PhysicalExtensionCodec for DeltaPhysicalCodec {
    fn try_decode(
        &self,
        buf: &[u8],
        inputs: &[Arc<dyn ExecutionPlan>],
        _registry: &TaskContext,
    ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
        let wire: DeltaScanWire = serde_json::from_slice(buf)
            .map_err(|_| DataFusionError::Internal("Unable to decode DeltaScan".to_string()))?;
        let delta_scan = DeltaScan::new(
            &wire.table_url,
            wire.config,
            inputs[0].clone(),
            wire.logical_schema,
        );
        Ok(Arc::new(delta_scan))
    }

    fn try_encode(
        &self,
        node: Arc<dyn ExecutionPlan>,
        buf: &mut Vec<u8>,
    ) -> Result<(), DataFusionError> {
        let delta_scan = node
            .as_any()
            .downcast_ref::<DeltaScan>()
            .ok_or_else(|| DataFusionError::Internal("Not a delta scan!".to_string()))?;

        let wire = DeltaScanWire::from(delta_scan);
        serde_json::to_writer(buf, &wire)
            .map_err(|_| DataFusionError::Internal("Unable to encode delta scan!".to_string()))?;
        Ok(())
    }
}

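/// Logical extension codec that round-trips [`DeltaScanNext`] table providers as JSON.
/// Serialization of logical plan nodes themselves is not implemented yet.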
#[derive(Debug)]
pub struct DeltaLogicalCodec {}

impl LogicalExtensionCodec for DeltaLogicalCodec {
    fn try_decode(
        &self,
        _buf: &[u8],
        _inputs: &[LogicalPlan],
        _ctx: &TaskContext,
    ) -> Result<Extension, DataFusionError> {
        todo!("DeltaLogicalCodec")
    }

    fn try_encode(&self, _node: &Extension, _buf: &mut Vec<u8>) -> Result<(), DataFusionError> {
        todo!("DeltaLogicalCodec")
    }

    fn try_decode_table_provider(
        &self,
        buf: &[u8],
        _table_ref: &TableReference,
        _schema: SchemaRef,
        _ctx: &TaskContext,
    ) -> Result<Arc<dyn TableProvider>, DataFusionError> {
        let provider: DeltaScanNext = serde_json::from_slice(buf)
            .map_err(|_| DataFusionError::Internal("Error decoding delta table".to_string()))?;
        Ok(Arc::new(provider))
    }

    fn try_encode_table_provider(
        &self,
        _table_ref: &TableReference,
        node: Arc<dyn TableProvider>,
        buf: &mut Vec<u8>,
    ) -> Result<(), DataFusionError> {
        let scan = node
            .as_ref()
            .as_any()
            .downcast_ref::<DeltaScanNext>()
            .ok_or_else(|| {
                DataFusionError::Internal("Can't encode non-delta tables".to_string())
            })?;
        serde_json::to_writer(buf, scan)
            .map_err(|_| DataFusionError::Internal("Error encoding delta table".to_string()))
    }
}

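/// A [`TableProviderFactory`] that opens Delta tables from `CREATE EXTERNAL TABLE` statements.
///
/// A minimal registration sketch (assuming DataFusion's table-factory registry and the
/// conventional `DELTATABLE` key):
///
/// ```ignore
/// let mut state = create_session().state();
/// state
///     .table_factories_mut()
///     .insert("DELTATABLE".to_string(), Arc::new(DeltaTableFactory {}));
/// ```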
#[derive(Debug)]
pub struct DeltaTableFactory {}

#[async_trait::async_trait]
impl TableProviderFactory for DeltaTableFactory {
    async fn create(
        &self,
        ctx: &dyn Session,
        cmd: &CreateExternalTable,
    ) -> datafusion::error::Result<Arc<dyn TableProvider>> {
        let table_url = ensure_table_uri(&cmd.location)?;
        let table = if cmd.options.is_empty() {
            open_table(table_url).await?
        } else {
            open_table_with_storage_options(table_url, cmd.options.clone()).await?
        };
        let table_uri = table.log_store().root_url().clone();
        let (session_state, _) = resolve_session_state(
            Some(ctx),
            SessionFallbackPolicy::DeriveFromTrait,
            || create_session().state(),
            SessionResolveContext {
                operation: "DeltaTableFactory::create",
                table_uri: Some(&table_uri),
                cdc: false,
            },
        )?;

        Ok(table
            .table_provider()
            .with_session(Arc::new(session_state))
            .await?)
    }
}

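/// A column reference for Delta operations, wrapping a DataFusion [`Column`].
///
/// String conversions parse qualified names case-insensitively, e.g.:
///
/// ```ignore
/// let column: DeltaColumn = "ID".into();
/// ```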
pub struct DeltaColumn {
    inner: Column,
}

impl From<&str> for DeltaColumn {
    fn from(c: &str) -> Self {
        DeltaColumn {
            inner: Column::from_qualified_name_ignore_case(c),
        }
    }
}

/// Create a column, cloning the string.
impl From<&String> for DeltaColumn {
    fn from(c: &String) -> Self {
        DeltaColumn {
            inner: Column::from_qualified_name_ignore_case(c),
        }
    }
}

/// Create a column, consuming the string.
impl From<String> for DeltaColumn {
    fn from(c: String) -> Self {
        DeltaColumn {
            inner: Column::from_qualified_name_ignore_case(c),
        }
    }
}

impl From<DeltaColumn> for Column {
    fn from(value: DeltaColumn) -> Self {
        value.inner
    }
}

impl From<Column> for DeltaColumn {
    fn from(c: Column) -> Self {
        DeltaColumn { inner: c }
    }
}

#[cfg(test)]
mod tests {
    use crate::DeltaTable;
    use crate::logstore::ObjectStoreRef;
    use crate::logstore::default_logstore::DefaultLogStore;
    use crate::operations::write::SchemaMode;
    use crate::test_utils::open_fs_path;
    use crate::writer::test_utils::get_delta_schema;
    use arrow::array::StructArray;
    use arrow::datatypes::{Field, Schema};
    use arrow_array::cast::AsArray;
    use bytes::Bytes;
    use datafusion::assert_batches_sorted_eq;
    use datafusion::config::TableParquetOptions;
    use datafusion::datasource::physical_plan::{FileScanConfig, ParquetSource};
    use datafusion::datasource::source::DataSourceExec;
    use datafusion::logical_expr::lit;
    use datafusion::physical_plan::empty::EmptyExec;
    use datafusion::physical_plan::{ExecutionPlanVisitor, PhysicalExpr, visit_execution_plan};
    use datafusion::prelude::{SessionConfig, col};
    use datafusion_datasource::file::FileSource as _;
    use datafusion_proto::physical_plan::AsExecutionPlan;
    use datafusion_proto::protobuf;
    use delta_kernel::path::{LogPathFileType, ParsedLogPath};
    use delta_kernel::schema::ArrayType;
    use futures::{StreamExt, TryStreamExt, stream::BoxStream};
    use object_store::ObjectMeta;
    use object_store::{
        GetOptions, GetResult, ListResult, MultipartUpload, ObjectStore, PutMultipartOptions,
        PutOptions, PutPayload, PutResult, path::Path,
    };
    use serde_json::json;
    use std::fmt::{self, Debug, Display, Formatter};
    use std::ops::Range;
    use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
    use url::Url;

    use super::*;
    use crate::delta_datafusion::table_provider::next::{FileSelection, MissingFilePolicy};

    #[test]
    fn test_to_correct_scalar_value() {
        let reference_pairs = &[
            (
                json!("2015"),
                ArrowDataType::Int16,
                ScalarValue::Int16(Some(2015)),
            ),
            (
                json!("2015"),
                ArrowDataType::Int32,
                ScalarValue::Int32(Some(2015)),
            ),
            (
                json!("2015"),
                ArrowDataType::Int64,
                ScalarValue::Int64(Some(2015)),
            ),
            (
                json!("2015"),
                ArrowDataType::Float32,
                ScalarValue::Float32(Some(2015_f32)),
            ),
            (
                json!("2015"),
                ArrowDataType::Float64,
                ScalarValue::Float64(Some(2015_f64)),
            ),
            (
                json!(2015),
                ArrowDataType::Float64,
                ScalarValue::Float64(Some(2015_f64)),
            ),
            (
                json!("2015-01-01"),
                ArrowDataType::Date32,
                ScalarValue::Date32(Some(16436)),
            ),
            (
                json!(true),
                ArrowDataType::Boolean,
                ScalarValue::Boolean(Some(true)),
            ),
        ];

        for (raw, data_type, ref_scalar) in reference_pairs {
            let scalar = to_correct_scalar_value(raw, data_type).unwrap().unwrap();
            assert_eq!(*ref_scalar, scalar)
        }
    }

    #[test]
    fn roundtrip_test_delta_exec_plan() {
        let ctx = SessionContext::new();
        let codec = DeltaPhysicalCodec {};

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", ArrowDataType::Utf8, false),
            Field::new("b", ArrowDataType::Int32, false),
        ]));
        let exec_plan = Arc::from(DeltaScan::new(
            &Url::parse("s3://my_bucket/this/is/some/path").unwrap(),
            DeltaScanConfig::default(),
            Arc::from(EmptyExec::new(schema.clone())),
            schema.clone(),
        ));
        let proto: protobuf::PhysicalPlanNode =
            protobuf::PhysicalPlanNode::try_from_physical_plan(exec_plan.clone(), &codec)
                .expect("to proto");

        let task_ctx = ctx.task_ctx();
        let result_exec_plan: Arc<dyn ExecutionPlan> = proto
            .try_into_physical_plan(&task_ctx, &codec)
            .expect("from proto");
        assert_eq!(format!("{exec_plan:?}"), format!("{result_exec_plan:?}"));
    }

    #[tokio::test]
    async fn roundtrip_test_delta_logical_codec_preserves_file_selection() {
        let log_store = crate::test_utils::TestTables::Simple
            .table_builder()
            .unwrap()
            .build_storage()
            .unwrap();
        let snapshot = Arc::new(
            crate::kernel::Snapshot::try_new(&log_store, Default::default(), None)
                .await
                .unwrap(),
        );
        let table_root = snapshot
            .scan_builder()
            .build()
            .unwrap()
            .table_root()
            .clone();
        let selected_file_ids: Vec<String> = snapshot
            .file_views(log_store.as_ref(), None)
            .take(1)
            .map_ok(|view| table_root.join(view.path_raw()).unwrap().to_string())
            .try_collect()
            .await
            .unwrap();
        assert_eq!(selected_file_ids.len(), 1);

        let provider = DeltaScanNext::builder()
            .with_snapshot(snapshot)
            .build()
            .await
            .unwrap()
            .with_file_selection(
                FileSelection::new(selected_file_ids.clone())
                    .with_missing_file_policy(MissingFilePolicy::Ignore),
            );

        let codec = DeltaLogicalCodec {};
        let table_ref = TableReference::bare("delta_table");
        let mut encoded = Vec::new();
        codec
            .try_encode_table_provider(&table_ref, Arc::new(provider), &mut encoded)
            .unwrap();

        let ctx = SessionContext::new();
        let decoded = codec
            .try_decode_table_provider(
                &encoded,
                &table_ref,
                Arc::new(ArrowSchema::empty()),
                &ctx.task_ctx(),
            )
            .unwrap();
        let decoded_provider = decoded
            .as_ref()
            .as_any()
            .downcast_ref::<DeltaScanNext>()
            .unwrap();

        let serialized = serde_json::to_value(decoded_provider).unwrap();
        let decoded_file_ids = serialized
            .get("file_selection")
            .and_then(|value| value.get("file_ids"))
            .and_then(|value| value.as_array())
            .unwrap()
            .iter()
            .map(|value| value.as_str().unwrap().to_string())
            .collect::<Vec<_>>();
        let decoded_policy = serialized
            .get("file_selection")
            .and_then(|value| value.get("missing_file_policy"))
            .and_then(|value| value.as_str())
            .unwrap();

        assert_eq!(decoded_file_ids, selected_file_ids);
        assert_eq!(decoded_policy, "Ignore");
    }

    #[tokio::test]
    async fn delta_table_provider_with_config() {
        let table = open_fs_path("../test/tests/data/delta-2.2.0-partitioned-types");
        let provider = table
            .table_provider()
            .with_file_column("file_source")
            .await
            .unwrap();
        let ctx = SessionContext::new();
        ctx.register_table("test", provider).unwrap();

        let df = ctx
            .sql("select c3, c1, c2, right(file_source, 77) as file_source from test")
            .await
            .unwrap();
        let actual = df.collect().await.unwrap();
        let expected = vec![
            "+----+----+----+-------------------------------------------------------------------------------+",
            "| c3 | c1 | c2 | file_source                                                                   |",
            "+----+----+----+-------------------------------------------------------------------------------+",
            "| 4  | 6  | a  | c1=6/c2=a/part-00011-10619b10-b691-4fd0-acc4-2a9608499d7c.c000.snappy.parquet |",
            "| 5  | 4  | c  | c1=4/c2=c/part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet |",
            "| 6  | 5  | b  | c1=5/c2=b/part-00007-4e73fa3b-2c88-424a-8051-f8b54328ffdb.c000.snappy.parquet |",
            "+----+----+----+-------------------------------------------------------------------------------+",
        ];
        assert_batches_sorted_eq!(&expected, &actual);
    }

    #[tokio::test]
    async fn delta_scan_mixed_partition_order() {
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new("modified", ArrowDataType::Utf8, true),
            Field::new("id", ArrowDataType::Utf8, true),
            Field::new("value", ArrowDataType::Int32, true),
        ]));

        let table = DeltaTable::new_in_memory()
            .create()
            .with_columns(get_delta_schema().fields().cloned())
            .with_partition_columns(["modified", "id"])
            .await
            .unwrap();
        assert_eq!(table.version(), Some(0));

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(arrow::array::StringArray::from(vec![
                    "2021-02-01",
                    "2021-02-01",
                    "2021-02-02",
                    "2021-02-02",
                ])),
                Arc::new(arrow::array::StringArray::from(vec!["A", "B", "C", "D"])),
                Arc::new(arrow::array::Int32Array::from(vec![1, 10, 20, 100])),
            ],
        )
        .unwrap();
        let table = table
            .write(vec![batch.clone()])
            .with_save_mode(crate::protocol::SaveMode::Append)
            .await
            .unwrap();

        let provider = table.table_provider().await.unwrap();
        let logical_schema = provider.schema();
        let ctx = SessionContext::new();
        ctx.runtime_env().register_object_store(
            table.log_store().root_url(),
            table.log_store().object_store(None),
        );
        ctx.register_table("test", provider).unwrap();

        let expected_logical_order = vec!["id", "value", "modified"];
        let actual_order: Vec<String> = logical_schema
            .fields()
            .iter()
            .map(|f| f.name().to_owned())
            .collect();

        let df = ctx.sql("select * from test").await.unwrap();
        let actual = df.collect().await.unwrap();
        let expected = vec![
            "+----+-------+------------+",
            "| id | value | modified   |",
            "+----+-------+------------+",
            "| A  | 1     | 2021-02-01 |",
            "| B  | 10    | 2021-02-01 |",
            "| C  | 20    | 2021-02-02 |",
            "| D  | 100   | 2021-02-02 |",
            "+----+-------+------------+",
        ];
        assert_batches_sorted_eq!(&expected, &actual);
        assert_eq!(expected_logical_order, actual_order);
    }

    #[tokio::test]
    async fn delta_scan_case_sensitive() {
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new("moDified", ArrowDataType::Utf8, true),
            Field::new("ID", ArrowDataType::Utf8, true),
            Field::new("vaLue", ArrowDataType::Int32, true),
        ]));

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(arrow::array::StringArray::from(vec![
                    "2021-02-01",
                    "2021-02-01",
                    "2021-02-02",
                    "2021-02-02",
                ])),
                Arc::new(arrow::array::StringArray::from(vec!["A", "B", "C", "D"])),
                Arc::new(arrow::array::Int32Array::from(vec![1, 10, 20, 100])),
            ],
        )
        .unwrap();
        let table = DeltaTable::new_in_memory()
            .write(vec![batch.clone()])
            .with_save_mode(crate::protocol::SaveMode::Append)
            .await
            .unwrap();

        let config = DeltaScanConfigBuilder::new()
            .build(table.snapshot().unwrap().snapshot())
            .unwrap();
        let log = table.log_store();

        let provider =
            DeltaTableProvider::try_new(table.snapshot().unwrap().snapshot().clone(), log, config)
                .unwrap();
        let ctx: SessionContext = DeltaSessionContext::default().into();
        ctx.register_table("test", Arc::new(provider)).unwrap();

        let df = ctx
            .sql("select ID, moDified, vaLue from test")
            .await
            .unwrap();
        let actual = df.collect().await.unwrap();
        let expected = vec![
            "+----+------------+-------+",
            "| ID | moDified   | vaLue |",
            "+----+------------+-------+",
            "| A  | 2021-02-01 | 1     |",
            "| B  | 2021-02-01 | 10    |",
            "| C  | 2021-02-02 | 20    |",
            "| D  | 2021-02-02 | 100   |",
            "+----+------------+-------+",
        ];
        assert_batches_sorted_eq!(&expected, &actual);
    }

    #[tokio::test]
    async fn delta_scan_supports_missing_columns() {
        let schema1 = Arc::new(ArrowSchema::new(vec![Field::new(
            "col_1",
            ArrowDataType::Utf8,
            true,
        )]));

        let batch1 = RecordBatch::try_new(
            schema1.clone(),
            vec![Arc::new(arrow::array::StringArray::from(vec![
                Some("A"),
                Some("B"),
            ]))],
        )
        .unwrap();

        let schema2 = Arc::new(ArrowSchema::new(vec![
            Field::new("col_1", ArrowDataType::Utf8, true),
            Field::new("col_2", ArrowDataType::Utf8, true),
        ]));

        let batch2 = RecordBatch::try_new(
            schema2.clone(),
            vec![
                Arc::new(arrow::array::StringArray::from(vec![
                    Some("E"),
                    Some("F"),
                    Some("G"),
                ])),
                Arc::new(arrow::array::StringArray::from(vec![
                    Some("E2"),
                    Some("F2"),
                    Some("G2"),
                ])),
            ],
        )
        .unwrap();

        let table = DeltaTable::new_in_memory()
            .write(vec![batch2])
            .with_save_mode(crate::protocol::SaveMode::Append)
            .await
            .unwrap();

        let table = table
            .write(vec![batch1])
            .with_schema_mode(SchemaMode::Merge)
            .with_save_mode(crate::protocol::SaveMode::Append)
            .await
            .unwrap();

        let config = DeltaScanConfigBuilder::new()
            .build(table.snapshot().unwrap().snapshot())
            .unwrap();
        let log = table.log_store();

        let provider =
            DeltaTableProvider::try_new(table.snapshot().unwrap().snapshot().clone(), log, config)
                .unwrap();
        let ctx: SessionContext = DeltaSessionContext::default().into();
        ctx.register_table("test", Arc::new(provider)).unwrap();

        let df = ctx.sql("select col_1, col_2 from test").await.unwrap();
        let actual = df.collect().await.unwrap();
        let expected = vec![
            "+-------+-------+",
            "| col_1 | col_2 |",
            "+-------+-------+",
            "| A     |       |",
            "| B     |       |",
            "| E     | E2    |",
            "| F     | F2    |",
            "| G     | G2    |",
            "+-------+-------+",
        ];
        assert_batches_sorted_eq!(&expected, &actual);
    }

    #[tokio::test]
    async fn delta_scan_supports_pushdown() {
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new("col_1", ArrowDataType::Utf8, false),
            Field::new("col_2", ArrowDataType::Utf8, false),
        ]));

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(arrow::array::StringArray::from(vec![
                    Some("A"),
                    Some("B"),
                    Some("C"),
                ])),
                Arc::new(arrow::array::StringArray::from(vec![
                    Some("A2"),
                    Some("B2"),
                    Some("C2"),
                ])),
            ],
        )
        .unwrap();

        let table = DeltaTable::new_in_memory()
            .write(vec![batch])
            .with_save_mode(crate::protocol::SaveMode::Append)
            .await
            .unwrap();

        let config = DeltaScanConfigBuilder::new()
            .build(table.snapshot().unwrap().snapshot())
            .unwrap();
        let log = table.log_store();

        let provider =
            DeltaTableProvider::try_new(table.snapshot().unwrap().snapshot().clone(), log, config)
                .unwrap();

        let mut cfg = SessionConfig::default();
        cfg.options_mut().execution.parquet.pushdown_filters = true;
        let ctx = SessionContext::new_with_config(cfg);
        ctx.register_table("test", Arc::new(provider)).unwrap();

        let df = ctx
            .sql("select col_1, col_2 from test WHERE col_1 = 'A'")
            .await
            .unwrap();
        let actual = df.collect().await.unwrap();
        let expected = vec![
            "+-------+-------+",
            "| col_1 | col_2 |",
            "+-------+-------+",
            "| A     | A2    |",
            "+-------+-------+",
        ];
        assert_batches_sorted_eq!(&expected, &actual);
    }

    #[tokio::test]
    async fn delta_scan_supports_nested_missing_columns() {
        let column1_schema1: arrow::datatypes::Fields =
            vec![Field::new("col_1a", ArrowDataType::Utf8, true)].into();
        let schema1 = Arc::new(ArrowSchema::new(vec![Field::new(
            "col_1",
            ArrowDataType::Struct(column1_schema1.clone()),
            true,
        )]));

        let batch1 = RecordBatch::try_new(
            schema1.clone(),
            vec![Arc::new(StructArray::new(
                column1_schema1,
                vec![Arc::new(arrow::array::StringArray::from(vec![
                    Some("A"),
                    Some("B"),
                ]))],
                None,
            ))],
        )
        .unwrap();

        let column1_schema2: arrow_schema::Fields = vec![
            Field::new("col_1a", ArrowDataType::Utf8, true),
            Field::new("col_1b", ArrowDataType::Utf8, true),
        ]
        .into();
        let schema2 = Arc::new(ArrowSchema::new(vec![Field::new(
            "col_1",
            ArrowDataType::Struct(column1_schema2.clone()),
            true,
        )]));

        let batch2 = RecordBatch::try_new(
            schema2.clone(),
            vec![Arc::new(StructArray::new(
                column1_schema2,
                vec![
                    Arc::new(arrow::array::StringArray::from(vec![
                        Some("E"),
                        Some("F"),
                        Some("G"),
                    ])),
                    Arc::new(arrow::array::StringArray::from(vec![
                        Some("E2"),
                        Some("F2"),
                        Some("G2"),
                    ])),
                ],
                None,
            ))],
        )
        .unwrap();

        let table = DeltaTable::new_in_memory()
            .write(vec![batch1])
            .with_save_mode(crate::protocol::SaveMode::Append)
            .await
            .unwrap();

        let table = table
            .write(vec![batch2])
            .with_schema_mode(SchemaMode::Merge)
            .with_save_mode(crate::protocol::SaveMode::Append)
            .await
            .unwrap();

        let config = DeltaScanConfigBuilder::new()
            .build(table.snapshot().unwrap().snapshot())
            .unwrap();
        let log = table.log_store();

        let provider =
            DeltaTableProvider::try_new(table.snapshot().unwrap().snapshot().clone(), log, config)
                .unwrap();
        let ctx: SessionContext = DeltaSessionContext::default().into();
        ctx.register_table("test", Arc::new(provider)).unwrap();

        let df = ctx
            .sql("select col_1.col_1a, col_1.col_1b from test")
            .await
            .unwrap();
        let actual = df.collect().await.unwrap();
        let expected = vec![
            "+--------------------+--------------------+",
            "| test.col_1[col_1a] | test.col_1[col_1b] |",
            "+--------------------+--------------------+",
            "| A                  |                    |",
            "| B                  |                    |",
            "| E                  | E2                 |",
            "| F                  | F2                 |",
            "| G                  | G2                 |",
            "+--------------------+--------------------+",
        ];
        assert_batches_sorted_eq!(&expected, &actual);
    }

    #[tokio::test]
    async fn test_multiple_predicate_pushdown() {
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new("moDified", ArrowDataType::Utf8, true),
            Field::new("id", ArrowDataType::Utf8, true),
            Field::new("vaLue", ArrowDataType::Int32, true),
        ]));

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(arrow::array::StringArray::from(vec![
                    "2021-02-01",
                    "2021-02-01",
                    "2021-02-02",
                    "2021-02-02",
                ])),
                Arc::new(arrow::array::StringArray::from(vec!["A", "B", "C", "D"])),
                Arc::new(arrow::array::Int32Array::from(vec![1, 10, 20, 100])),
            ],
        )
        .unwrap();
        let table = DeltaTable::new_in_memory()
            .write(vec![batch.clone()])
            .with_save_mode(crate::protocol::SaveMode::Append)
            .await
            .unwrap();

        let datafusion = SessionContext::new();
        table
            .update_datafusion_session(&datafusion.state())
            .unwrap();

        datafusion
            .register_table("snapshot", table.table_provider().await.unwrap())
            .unwrap();

        let df = datafusion
            .sql("select * from snapshot where id > 10000 and id < 20000")
            .await
            .unwrap();

        df.collect().await.unwrap();
    }

    #[tokio::test]
    async fn test_delta_scan_builder_no_scan_config() {
        let arr: Arc<dyn Array> = Arc::new(arrow::array::StringArray::from(vec!["s"]));
        let batch = RecordBatch::try_from_iter_with_nullable(vec![("a", arr, false)]).unwrap();
        let table = DeltaTable::new_in_memory()
            .write(vec![batch])
            .with_save_mode(crate::protocol::SaveMode::Append)
            .await
            .unwrap();

        let ctx = SessionContext::new();
        let state = ctx.state();
        let scan = DeltaScanBuilder::new(
            table.snapshot().unwrap().snapshot(),
            table.log_store(),
            &state,
        )
        .with_filter(Some(col("a").eq(lit("s"))))
        .build()
        .await
        .unwrap();

        let mut visitor = ParquetVisitor::default();
        visit_execution_plan(&scan, &mut visitor).unwrap();

        assert_eq!(visitor.predicate.unwrap().to_string(), "a@0 = s");
    }

    #[tokio::test]
    async fn test_delta_scan_builder_scan_config_disable_pushdown() {
        let arr: Arc<dyn Array> = Arc::new(arrow::array::StringArray::from(vec!["s"]));
        let batch = RecordBatch::try_from_iter_with_nullable(vec![("a", arr, false)]).unwrap();
        let table = DeltaTable::new_in_memory()
            .write(vec![batch])
            .with_save_mode(crate::protocol::SaveMode::Append)
            .await
            .unwrap();

        let snapshot = table.snapshot().unwrap();
        let ctx = SessionContext::new();
        let state = ctx.state();
        let scan = DeltaScanBuilder::new(snapshot.snapshot(), table.log_store(), &state)
            .with_filter(Some(col("a").eq(lit("s"))))
            .with_scan_config(
                DeltaScanConfigBuilder::new()
                    .with_parquet_pushdown(false)
                    .build(snapshot.snapshot())
                    .unwrap(),
            )
            .build()
            .await
            .unwrap();

        let mut visitor = ParquetVisitor::default();
        visit_execution_plan(&scan, &mut visitor).unwrap();

        assert!(visitor.predicate.is_none());
    }

    #[tokio::test]
    async fn test_delta_scan_applies_parquet_options() {
        let arr: Arc<dyn Array> = Arc::new(arrow::array::StringArray::from(vec!["s"]));
        let batch = RecordBatch::try_from_iter_with_nullable(vec![("a", arr, false)]).unwrap();
        let table = DeltaTable::new_in_memory()
            .write(vec![batch])
            .with_save_mode(crate::protocol::SaveMode::Append)
            .await
            .unwrap();

        let snapshot = table.snapshot().unwrap();

        let mut config = SessionConfig::default();
        config.options_mut().execution.parquet.pushdown_filters = true;
        let ctx = SessionContext::new_with_config(config);
        let state = ctx.state();

        let scan = DeltaScanBuilder::new(snapshot.snapshot(), table.log_store(), &state)
            .build()
            .await
            .unwrap();

        let mut visitor = ParquetVisitor::default();
        visit_execution_plan(&scan, &mut visitor).unwrap();

        assert_eq!(ctx.copied_table_options().parquet, visitor.options.unwrap());
    }

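    /// Collects the parquet filter predicate and parquet options from a physical plan,
    /// if the plan contains a parquet data source.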
    #[derive(Default)]
    struct ParquetVisitor {
        predicate: Option<Arc<dyn PhysicalExpr>>,
        options: Option<TableParquetOptions>,
    }

    impl ExecutionPlanVisitor for ParquetVisitor {
        type Error = DataFusionError;

        fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
            let Some(datasource_exec) = plan.as_any().downcast_ref::<DataSourceExec>() else {
                return Ok(true);
            };

            let Some(scan_config) = datasource_exec
                .data_source()
                .as_any()
                .downcast_ref::<FileScanConfig>()
            else {
                return Ok(true);
            };

            if let Some(parquet_source) = scan_config
                .file_source
                .as_any()
                .downcast_ref::<ParquetSource>()
            {
                self.options = Some(parquet_source.table_parquet_options().clone());
                self.predicate = parquet_source.filter();
            }

            Ok(true)
        }
    }

    #[tokio::test]
    async fn passes_sanity_checker_when_all_files_filtered() {
        let table = open_fs_path("../test/tests/data/delta-2.2.0-partitioned-types");
        let ctx = create_session().into_inner();
        ctx.register_table("test", table.table_provider().await.unwrap())
            .unwrap();

        let df = ctx
            .sql("select * from test where c3 = 100 ORDER BY c1 ASC")
            .await
            .unwrap();
        let actual = df.collect().await.unwrap();

        assert_eq!(actual.len(), 0);
    }

    #[tokio::test]
    async fn test_delta_scan_uses_parquet_column_pruning() {
        let small: Arc<dyn Array> = Arc::new(arrow::array::StringArray::from(vec!["a"]));
        let large: Arc<dyn Array> = Arc::new(arrow::array::StringArray::from(vec![
            "b".repeat(1024).as_str(),
        ]));
        let batch = RecordBatch::try_from_iter(vec![("small", small), ("large", large)]).unwrap();
        let table = DeltaTable::new_in_memory()
            .write(vec![batch])
            .with_save_mode(crate::protocol::SaveMode::Append)
            .await
            .unwrap();

        let config = DeltaScanConfigBuilder::new()
            .build(table.snapshot().unwrap().snapshot())
            .unwrap();

        let (object_store, mut operations) =
            RecordingObjectStore::new(table.log_store().object_store(None));
        let both_store = Arc::new(object_store);
        let log_store = DefaultLogStore::new(
            both_store.clone(),
            both_store,
            table.log_store().config().clone(),
        );
        let provider = DeltaTableProvider::try_new(
            table.snapshot().unwrap().snapshot().clone(),
            Arc::new(log_store),
            config,
        )
        .unwrap();
        let ctx = SessionContext::new();
        ctx.register_table("test", Arc::new(provider)).unwrap();
        let state = ctx.state();

        let df = ctx.sql("select small from test").await.unwrap();
        let plan = df.create_physical_plan().await.unwrap();

        let mut stream = plan.execute(0, state.task_ctx()).unwrap();
        let Some(Ok(batch)) = stream.next().await else {
            panic!()
        };
        assert!(stream.next().await.is_none());
        assert_eq!(1, batch.num_columns());
        assert_eq!(1, batch.num_rows());
        let small = batch.column_by_name("small").unwrap().as_string::<i32>();
        assert_eq!("a", small.iter().next().unwrap().unwrap());

        // Only the parquet footer and the bytes for the selected column should be read
        // from the data file.
        let expected = vec![
            ObjectStoreOperation::Get(LocationType::Commit),
            ObjectStoreOperation::GetRange(LocationType::Data, 957..965),
            ObjectStoreOperation::GetRange(LocationType::Data, 326..957),
        ];
        let mut actual = Vec::new();
        operations.recv_many(&mut actual, 3).await;
        assert_eq!(expected, actual);
    }

    #[tokio::test]
    async fn test_push_down_filter_panic_2602() -> DeltaResult<()> {
        use crate::kernel::schema::{DataType, PrimitiveType};
        let ctx = SessionContext::new();
        let table = DeltaTable::new_in_memory()
            .create()
            .with_column("id", DataType::Primitive(PrimitiveType::Long), true, None)
            .with_column(
                "name",
                DataType::Primitive(PrimitiveType::String),
                true,
                None,
            )
            .with_column("b", DataType::Primitive(PrimitiveType::Boolean), true, None)
            .with_column(
                "ts",
                DataType::Primitive(PrimitiveType::Timestamp),
                true,
                None,
            )
            .with_column("dt", DataType::Primitive(PrimitiveType::Date), true, None)
            .with_column(
                "zap",
                DataType::Array(Box::new(ArrayType::new(
                    DataType::Primitive(PrimitiveType::Boolean),
                    true,
                ))),
                true,
                None,
            )
            .await?;
        table.update_datafusion_session(&ctx.state()).unwrap();

        ctx.register_table("snapshot", table.table_provider().await.unwrap())
            .unwrap();

        let df = ctx
            .sql("select * from snapshot where id > 10000 and id < 20000")
            .await
            .unwrap();

        let _ = df.collect().await?;
        Ok(())
    }

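    /// Wraps an object store and records read operations on a channel, so tests can
    /// assert exactly which byte ranges were fetched.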
    struct RecordingObjectStore {
        inner: ObjectStoreRef,
        operations: UnboundedSender<ObjectStoreOperation>,
    }

    impl RecordingObjectStore {
        /// Returns the wrapping store and a receiver for the recorded operations.
        fn new(inner: ObjectStoreRef) -> (Self, UnboundedReceiver<ObjectStoreOperation>) {
            let (operations, operations_receiver) = unbounded_channel();
            (Self { inner, operations }, operations_receiver)
        }
    }

    impl Display for RecordingObjectStore {
        fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
            Display::fmt(&self.inner, f)
        }
    }

    impl Debug for RecordingObjectStore {
        fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
            Debug::fmt(&self.inner, f)
        }
    }

    #[derive(Debug, PartialEq)]
    enum ObjectStoreOperation {
        GetRanges(LocationType, Vec<Range<u64>>),
        GetRange(LocationType, Range<u64>),
        GetOpts(LocationType),
        Get(LocationType),
    }

    #[derive(Debug, PartialEq)]
    enum LocationType {
        Data,
        Commit,
    }

    impl From<&Path> for LocationType {
        fn from(value: &Path) -> Self {
            let dummy_url = Url::parse("dummy:///").unwrap();
            let parsed = ParsedLogPath::try_from(dummy_url.join(value.as_ref()).unwrap()).unwrap();
            if let Some(parsed) = parsed
                && matches!(parsed.file_type, LogPathFileType::Commit)
            {
                return LocationType::Commit;
            }
            if value.to_string().starts_with("part-") {
                LocationType::Data
            } else {
                panic!("Unknown location type: {value:?}")
            }
        }
    }

    // Only read operations are recorded; writes and listings pass through untouched.
    #[async_trait::async_trait]
    impl ObjectStore for RecordingObjectStore {
        async fn put(
            &self,
            location: &Path,
            payload: PutPayload,
        ) -> object_store::Result<PutResult> {
            self.inner.put(location, payload).await
        }

        async fn put_opts(
            &self,
            location: &Path,
            payload: PutPayload,
            opts: PutOptions,
        ) -> object_store::Result<PutResult> {
            self.inner.put_opts(location, payload, opts).await
        }

        async fn put_multipart(
            &self,
            location: &Path,
        ) -> object_store::Result<Box<dyn MultipartUpload>> {
            self.inner.put_multipart(location).await
        }

        async fn put_multipart_opts(
            &self,
            location: &Path,
            opts: PutMultipartOptions,
        ) -> object_store::Result<Box<dyn MultipartUpload>> {
            self.inner.put_multipart_opts(location, opts).await
        }

        async fn get(&self, location: &Path) -> object_store::Result<GetResult> {
            self.operations
                .send(ObjectStoreOperation::Get(location.into()))
                .unwrap();
            self.inner.get(location).await
        }

        async fn get_opts(
            &self,
            location: &Path,
            options: GetOptions,
        ) -> object_store::Result<GetResult> {
            self.operations
                .send(ObjectStoreOperation::GetOpts(location.into()))
                .unwrap();
            self.inner.get_opts(location, options).await
        }

        async fn get_range(
            &self,
            location: &Path,
            range: Range<u64>,
        ) -> object_store::Result<Bytes> {
            self.operations
                .send(ObjectStoreOperation::GetRange(
                    location.into(),
                    range.clone(),
                ))
                .unwrap();
            self.inner.get_range(location, range).await
        }

        async fn get_ranges(
            &self,
            location: &Path,
            ranges: &[Range<u64>],
        ) -> object_store::Result<Vec<Bytes>> {
            self.operations
                .send(ObjectStoreOperation::GetRanges(
                    location.into(),
                    ranges.to_vec(),
                ))
                .unwrap();
            self.inner.get_ranges(location, ranges).await
        }

        async fn head(&self, location: &Path) -> object_store::Result<ObjectMeta> {
            self.inner.head(location).await
        }

        async fn delete(&self, location: &Path) -> object_store::Result<()> {
            self.inner.delete(location).await
        }

        fn delete_stream<'a>(
            &'a self,
            locations: BoxStream<'a, object_store::Result<Path>>,
        ) -> BoxStream<'a, object_store::Result<Path>> {
            self.inner.delete_stream(locations)
        }

        fn list(
            &self,
            prefix: Option<&Path>,
        ) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
            self.inner.list(prefix)
        }

        fn list_with_offset(
            &self,
            prefix: Option<&Path>,
            offset: &Path,
        ) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
            self.inner.list_with_offset(prefix, offset)
        }

        async fn list_with_delimiter(
            &self,
            prefix: Option<&Path>,
        ) -> object_store::Result<ListResult> {
            self.inner.list_with_delimiter(prefix).await
        }

        async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> {
            self.inner.copy(from, to).await
        }

        async fn rename(&self, from: &Path, to: &Path) -> object_store::Result<()> {
            self.inner.rename(from, to).await
        }

        async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> {
            self.inner.copy_if_not_exists(from, to).await
        }

        async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> {
            self.inner.rename_if_not_exists(from, to).await
        }
    }
}