1use std::fmt::Debug;
27use std::sync::Arc;
28
29use arrow::array::types::UInt16Type;
30use arrow::array::{Array, DictionaryArray, RecordBatch, StringArray, TypedDictionaryArray};
31use arrow_cast::{CastOptions, cast_with_options};
32use arrow_schema::{
33 DataType as ArrowDataType, Schema as ArrowSchema, SchemaRef, SchemaRef as ArrowSchemaRef,
34 TimeUnit,
35};
36use datafusion::catalog::{Session, TableProviderFactory};
37use datafusion::common::scalar::ScalarValue;
38use datafusion::common::{
39 Column, DFSchema, DataFusionError, Result as DataFusionResult, TableReference, ToDFSchema,
40};
41use datafusion::datasource::TableProvider;
42use datafusion::datasource::physical_plan::wrap_partition_type_in_dict;
43use datafusion::execution::TaskContext;
44use datafusion::execution::context::SessionContext;
45use datafusion::logical_expr::logical_plan::CreateExternalTable;
46use datafusion::logical_expr::utils::conjunction;
47use datafusion::logical_expr::{Expr, Extension, LogicalPlan};
48use datafusion::physical_optimizer::pruning::PruningPredicate;
49use datafusion::physical_plan::ExecutionPlan;
50use datafusion_proto::logical_plan::LogicalExtensionCodec;
51use datafusion_proto::physical_plan::PhysicalExtensionCodec;
52use delta_kernel::engine::arrow_conversion::TryIntoArrow as _;
53use either::Either;
54
55use crate::delta_datafusion::expr::parse_predicate_expression;
56use crate::delta_datafusion::table_provider::DeltaScanWire;
57use crate::ensure_table_uri;
58use crate::errors::{DeltaResult, DeltaTableError};
59use crate::kernel::{Add, EagerSnapshot, LogDataHandler, Snapshot};
60use crate::{open_table, open_table_with_storage_options};
61
62pub(crate) use self::session::DeltaSessionExt;
63pub use self::session::{
64 DeltaParserOptions, DeltaRuntimeEnvBuilder, DeltaSessionConfig, DeltaSessionContext,
65 create_session,
66};
67pub use self::table_provider::next::{DeletionVectorSelection, DeltaScan as DeltaScanNext};
68pub(crate) use self::utils::*;
69pub use cdf::scan::DeltaCdfTableProvider;
70pub(crate) use data_validation::{
71 DataValidationExec, constraints_to_exprs, generated_columns_to_exprs, validation_predicates,
72};
73pub(crate) use find_files::*;
74pub use table_provider::{
75 DeltaScan, DeltaScanConfig, DeltaScanConfigBuilder, DeltaTableProvider, TableProviderBuilder,
76 next::DeltaScanExec,
77};
78pub(crate) use table_provider::{
79 DeltaScanBuilder, next::FILE_ID_COLUMN_DEFAULT, update_datafusion_session,
80};
81
/// Name of the synthetic column used to carry each row's source file path
/// through intermediate record batches (see [`get_path_column`]).
pub(crate) const PATH_COLUMN: &str = "__delta_rs_path";
83
84pub mod cdf;
85mod data_validation;
86pub mod engine;
87pub mod expr;
88mod file_id;
89mod find_files;
90pub mod logical;
91pub mod physical;
92pub mod planner;
93mod session;
94pub use session::SessionFallbackPolicy;
95pub(crate) use session::{SessionResolveContext, resolve_session_state};
96mod table_provider;
97pub(crate) mod utils;
98
99impl From<DeltaTableError> for DataFusionError {
100 fn from(err: DeltaTableError) -> Self {
101 match err {
102 DeltaTableError::Arrow { source } => DataFusionError::from(source),
103 DeltaTableError::Io { source } => DataFusionError::IoError(source),
104 DeltaTableError::ObjectStore { source } => DataFusionError::from(source),
105 DeltaTableError::Parquet { source } => DataFusionError::from(source),
106 _ => DataFusionError::External(Box::new(err)),
107 }
108 }
109}
110
111impl From<DataFusionError> for DeltaTableError {
112 fn from(err: DataFusionError) -> Self {
113 match err {
114 DataFusionError::ArrowError(source, _) => DeltaTableError::from(*source),
115 DataFusionError::IoError(source) => DeltaTableError::Io { source },
116 DataFusionError::ObjectStore(source) => DeltaTableError::from(*source),
117 DataFusionError::ParquetError(source) => DeltaTableError::from(*source),
118 _ => DeltaTableError::Generic(err.to_string()),
119 }
120 }
121}
122
/// Conversions from Delta table metadata into DataFusion-facing schemas and
/// expressions.
pub trait DataFusionMixins {
    /// Arrow schema used when reading the table: partition columns are placed
    /// last and (for string/binary types) wrapped in dictionary encoding.
    fn read_schema(&self) -> ArrowSchemaRef;

    /// Arrow schema used when writing to the table: partition columns are
    /// placed last but keep their plain (unwrapped) data types.
    fn input_schema(&self) -> ArrowSchemaRef;

    /// Parse an SQL predicate string into a DataFusion [`Expr`] resolved
    /// against this table's read schema.
    fn parse_predicate_expression(
        &self,
        expr: impl AsRef<str>,
        session: &dyn Session,
    ) -> DeltaResult<Expr>;
}
138
139impl DataFusionMixins for Snapshot {
140 fn read_schema(&self) -> ArrowSchemaRef {
141 _arrow_schema(
142 self.arrow_schema(),
143 self.metadata().partition_columns(),
144 true,
145 )
146 }
147
148 fn input_schema(&self) -> ArrowSchemaRef {
149 _arrow_schema(
150 self.arrow_schema(),
151 self.metadata().partition_columns(),
152 false,
153 )
154 }
155
156 fn parse_predicate_expression(
157 &self,
158 expr: impl AsRef<str>,
159 session: &dyn Session,
160 ) -> DeltaResult<Expr> {
161 let schema = DFSchema::try_from(self.read_schema().as_ref().to_owned())?;
162 parse_predicate_expression(&schema, expr, session)
163 }
164}
165
166impl DataFusionMixins for LogDataHandler<'_> {
167 fn read_schema(&self) -> ArrowSchemaRef {
168 _arrow_schema(
169 Arc::new(
170 self.table_configuration()
171 .schema()
172 .as_ref()
173 .try_into_arrow()
174 .unwrap(),
175 ),
176 self.table_configuration().metadata().partition_columns(),
177 true,
178 )
179 }
180
181 fn input_schema(&self) -> ArrowSchemaRef {
182 _arrow_schema(
183 Arc::new(
184 self.table_configuration()
185 .schema()
186 .as_ref()
187 .try_into_arrow()
188 .unwrap(),
189 ),
190 self.table_configuration().metadata().partition_columns(),
191 false,
192 )
193 }
194
195 fn parse_predicate_expression(
196 &self,
197 expr: impl AsRef<str>,
198 session: &dyn Session,
199 ) -> DeltaResult<Expr> {
200 let schema = DFSchema::try_from(self.read_schema().as_ref().to_owned())?;
201 parse_predicate_expression(&schema, expr, session)
202 }
203}
204
205impl DataFusionMixins for EagerSnapshot {
206 fn read_schema(&self) -> ArrowSchemaRef {
207 self.snapshot().read_schema()
208 }
209
210 fn input_schema(&self) -> ArrowSchemaRef {
211 self.snapshot().input_schema()
212 }
213
214 fn parse_predicate_expression(
215 &self,
216 expr: impl AsRef<str>,
217 session: &dyn Session,
218 ) -> DeltaResult<Expr> {
219 self.snapshot().parse_predicate_expression(expr, session)
220 }
221}
222
223fn _arrow_schema(
224 schema: SchemaRef,
225 partition_columns: &[String],
226 wrap_partitions: bool,
227) -> ArrowSchemaRef {
228 let fields = schema
229 .fields()
230 .into_iter()
231 .filter(|f| !partition_columns.contains(&f.name().to_string()))
232 .cloned()
233 .chain(
234 partition_columns.iter().map(|partition_col| {
237 let field = schema.field_with_name(partition_col).unwrap();
238 let corrected = if wrap_partitions {
239 match field.data_type() {
240 ArrowDataType::Utf8
243 | ArrowDataType::LargeUtf8
244 | ArrowDataType::Binary
245 | ArrowDataType::LargeBinary => {
246 wrap_partition_type_in_dict(field.data_type().clone())
247 }
248 _ => field.data_type().clone(),
249 }
250 } else {
251 field.data_type().clone()
252 };
253 Arc::new(field.clone().with_data_type(corrected))
254 }),
255 )
256 .collect::<Vec<_>>();
257 Arc::new(ArrowSchema::new(fields))
258}
259
260pub(crate) fn files_matching_predicate<'a>(
261 log_data: LogDataHandler<'a>,
262 filters: &[Expr],
263) -> DeltaResult<impl Iterator<Item = Add> + 'a> {
264 if let Some(Some(predicate)) =
265 (!filters.is_empty()).then_some(conjunction(filters.iter().cloned()))
266 {
267 let expr = SessionContext::new()
268 .create_physical_expr(predicate, &log_data.read_schema().to_dfschema()?)?;
269 let pruning_predicate = PruningPredicate::try_new(expr, log_data.read_schema())?;
270 let mask = pruning_predicate.prune(&log_data)?;
271
272 Ok(Either::Left(log_data.into_iter().zip(mask).filter_map(
273 |(file, keep_file)| {
274 if keep_file {
275 Some(file.add_action())
276 } else {
277 None
278 }
279 },
280 )))
281 } else {
282 Ok(Either::Right(
283 log_data.into_iter().map(|file| file.add_action()),
284 ))
285 }
286}
287
288pub(crate) fn get_path_column<'a>(
289 batch: &'a RecordBatch,
290 path_column: &str,
291) -> DeltaResult<TypedDictionaryArray<'a, UInt16Type, StringArray>> {
292 let err = || DeltaTableError::Generic("Unable to obtain Delta-rs path column".to_string());
293 batch
294 .column_by_name(path_column)
295 .unwrap()
296 .as_any()
297 .downcast_ref::<DictionaryArray<UInt16Type>>()
298 .ok_or_else(err)?
299 .downcast_dict::<StringArray>()
300 .ok_or_else(err)
301}
302
/// Build a typed null [`ScalarValue`] for the given arrow data type.
///
/// Used when a statistics value is missing so comparisons still carry the
/// correct type. Nested/unsupported types yield a generic error.
pub(crate) fn get_null_of_arrow_type(t: &ArrowDataType) -> DeltaResult<ScalarValue> {
    match t {
        ArrowDataType::Null => Ok(ScalarValue::Null),
        ArrowDataType::Boolean => Ok(ScalarValue::Boolean(None)),
        ArrowDataType::Int8 => Ok(ScalarValue::Int8(None)),
        ArrowDataType::Int16 => Ok(ScalarValue::Int16(None)),
        ArrowDataType::Int32 => Ok(ScalarValue::Int32(None)),
        ArrowDataType::Int64 => Ok(ScalarValue::Int64(None)),
        ArrowDataType::UInt8 => Ok(ScalarValue::UInt8(None)),
        ArrowDataType::UInt16 => Ok(ScalarValue::UInt16(None)),
        ArrowDataType::UInt32 => Ok(ScalarValue::UInt32(None)),
        ArrowDataType::UInt64 => Ok(ScalarValue::UInt64(None)),
        ArrowDataType::Float32 => Ok(ScalarValue::Float32(None)),
        ArrowDataType::Float64 => Ok(ScalarValue::Float64(None)),
        ArrowDataType::Date32 => Ok(ScalarValue::Date32(None)),
        ArrowDataType::Date64 => Ok(ScalarValue::Date64(None)),
        ArrowDataType::Binary => Ok(ScalarValue::Binary(None)),
        ArrowDataType::FixedSizeBinary(size) => {
            Ok(ScalarValue::FixedSizeBinary(size.to_owned(), None))
        }
        ArrowDataType::LargeBinary => Ok(ScalarValue::LargeBinary(None)),
        ArrowDataType::Utf8 => Ok(ScalarValue::Utf8(None)),
        ArrowDataType::LargeUtf8 => Ok(ScalarValue::LargeUtf8(None)),
        ArrowDataType::Decimal128(precision, scale) => Ok(ScalarValue::Decimal128(
            None,
            precision.to_owned(),
            scale.to_owned(),
        )),
        // Timestamps preserve both the time unit and the time zone.
        ArrowDataType::Timestamp(unit, tz) => {
            let tz = tz.to_owned();
            Ok(match unit {
                TimeUnit::Second => ScalarValue::TimestampSecond(None, tz),
                TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(None, tz),
                TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(None, tz),
                TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(None, tz),
            })
        }
        // A null dictionary scalar wraps a typed null of its value type.
        ArrowDataType::Dictionary(k, v) => Ok(ScalarValue::Dictionary(
            k.clone(),
            Box::new(get_null_of_arrow_type(v)?),
        )),
        ArrowDataType::Float16
        | ArrowDataType::Decimal32(_, _)
        | ArrowDataType::Decimal64(_, _)
        | ArrowDataType::Decimal256(_, _)
        | ArrowDataType::Union(_, _)
        | ArrowDataType::LargeList(_)
        | ArrowDataType::Struct(_)
        | ArrowDataType::List(_)
        | ArrowDataType::FixedSizeList(_, _)
        | ArrowDataType::Time32(_)
        | ArrowDataType::Time64(_)
        | ArrowDataType::Duration(_)
        | ArrowDataType::Interval(_)
        | ArrowDataType::RunEndEncoded(_, _)
        | ArrowDataType::BinaryView
        | ArrowDataType::Utf8View
        | ArrowDataType::LargeListView(_)
        | ArrowDataType::ListView(_)
        | ArrowDataType::Map(_, _) => Err(DeltaTableError::Generic(format!(
            "Unsupported data type for Delta Lake {t}"
        ))),
    }
}
368
369fn parse_date(
370 stat_val: &serde_json::Value,
371 field_dt: &ArrowDataType,
372) -> DataFusionResult<ScalarValue> {
373 let string = match stat_val {
374 serde_json::Value::String(s) => s.to_owned(),
375 _ => stat_val.to_string(),
376 };
377
378 let time_micro = ScalarValue::try_from_string(string, &ArrowDataType::Date32)?;
379 let cast_arr = cast_with_options(
380 &time_micro.to_array()?,
381 field_dt,
382 &CastOptions {
383 safe: false,
384 ..Default::default()
385 },
386 )?;
387 ScalarValue::try_from_array(&cast_arr, 0)
388}
389
390fn parse_timestamp(
391 stat_val: &serde_json::Value,
392 field_dt: &ArrowDataType,
393) -> DataFusionResult<ScalarValue> {
394 let string = match stat_val {
395 serde_json::Value::String(s) => s.to_owned(),
396 _ => stat_val.to_string(),
397 };
398
399 let time_micro = ScalarValue::try_from_string(
400 string,
401 &ArrowDataType::Timestamp(TimeUnit::Microsecond, None),
402 )?;
403 let cast_arr = cast_with_options(
404 &time_micro.to_array()?,
405 field_dt,
406 &CastOptions {
407 safe: false,
408 ..Default::default()
409 },
410 )?;
411 ScalarValue::try_from_array(&cast_arr, 0)
412}
413
414pub(crate) fn to_correct_scalar_value(
415 stat_val: &serde_json::Value,
416 field_dt: &ArrowDataType,
417) -> DataFusionResult<Option<ScalarValue>> {
418 match stat_val {
419 serde_json::Value::Array(_) => Ok(None),
420 serde_json::Value::Object(_) => Ok(None),
421 serde_json::Value::Null => Ok(Some(get_null_of_arrow_type(field_dt)?)),
422 serde_json::Value::String(string_val) => match field_dt {
423 ArrowDataType::Timestamp(_, _) => Ok(Some(parse_timestamp(stat_val, field_dt)?)),
424 ArrowDataType::Date32 => Ok(Some(parse_date(stat_val, field_dt)?)),
425 _ => Ok(Some(ScalarValue::try_from_string(
426 string_val.to_owned(),
427 field_dt,
428 )?)),
429 },
430 other => match field_dt {
431 ArrowDataType::Timestamp(_, _) => Ok(Some(parse_timestamp(stat_val, field_dt)?)),
432 ArrowDataType::Date32 => Ok(Some(parse_date(stat_val, field_dt)?)),
433 _ => Ok(Some(ScalarValue::try_from_string(
434 other.to_string(),
435 field_dt,
436 )?)),
437 },
438 }
439}
440
/// Physical extension codec that (de)serializes [`DeltaScan`] nodes as JSON so
/// physical plans can be shipped via `datafusion-proto`.
#[derive(Debug)]
pub struct DeltaPhysicalCodec {}
444
445impl PhysicalExtensionCodec for DeltaPhysicalCodec {
446 fn try_decode(
447 &self,
448 buf: &[u8],
449 inputs: &[Arc<dyn ExecutionPlan>],
450 _registry: &TaskContext,
451 ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
452 let wire: DeltaScanWire = serde_json::from_reader(buf)
453 .map_err(|_| DataFusionError::Internal("Unable to decode DeltaScan".to_string()))?;
454 let delta_scan = DeltaScan::new(
455 &wire.table_url,
456 wire.config,
457 (*inputs)[0].clone(),
458 wire.logical_schema,
459 );
460 Ok(Arc::new(delta_scan))
461 }
462
463 fn try_encode(
464 &self,
465 node: Arc<dyn ExecutionPlan>,
466 buf: &mut Vec<u8>,
467 ) -> Result<(), DataFusionError> {
468 let delta_scan = node
469 .as_any()
470 .downcast_ref::<DeltaScan>()
471 .ok_or_else(|| DataFusionError::Internal("Not a delta scan!".to_string()))?;
472
473 let wire = DeltaScanWire::from(delta_scan);
474 serde_json::to_writer(buf, &wire)
475 .map_err(|_| DataFusionError::Internal("Unable to encode delta scan!".to_string()))?;
476 Ok(())
477 }
478}
479
/// Logical extension codec for delta table providers. Plan-node encoding is
/// currently unimplemented; only table providers ([`DeltaScanNext`]) are
/// (de)serialized, as JSON.
#[derive(Debug)]
pub struct DeltaLogicalCodec {}
483
484impl LogicalExtensionCodec for DeltaLogicalCodec {
485 fn try_decode(
486 &self,
487 _buf: &[u8],
488 _inputs: &[LogicalPlan],
489 _ctx: &TaskContext,
490 ) -> Result<Extension, DataFusionError> {
491 todo!("DeltaLogicalCodec")
492 }
493
494 fn try_encode(&self, _node: &Extension, _buf: &mut Vec<u8>) -> Result<(), DataFusionError> {
495 todo!("DeltaLogicalCodec")
496 }
497
498 fn try_decode_table_provider(
499 &self,
500 buf: &[u8],
501 _table_ref: &TableReference,
502 _schema: SchemaRef,
503 _ctx: &TaskContext,
504 ) -> Result<Arc<dyn TableProvider>, DataFusionError> {
505 let provider: DeltaScanNext = serde_json::from_slice(buf)
506 .map_err(|_| DataFusionError::Internal("Error encoding delta table".to_string()))?;
507 Ok(Arc::new(provider))
508 }
509
510 fn try_encode_table_provider(
511 &self,
512 _table_ref: &TableReference,
513 node: Arc<dyn TableProvider>,
514 buf: &mut Vec<u8>,
515 ) -> Result<(), DataFusionError> {
516 let scan = node
517 .as_ref()
518 .as_any()
519 .downcast_ref::<DeltaScanNext>()
520 .ok_or_else(|| {
521 DataFusionError::Internal("Can't encode non-delta tables".to_string())
522 })?;
523 serde_json::to_writer(buf, scan)
524 .map_err(|_| DataFusionError::Internal("Error encoding delta table".to_string()))
525 }
526}
527
/// [`TableProviderFactory`] that opens delta tables for
/// `CREATE EXTERNAL TABLE` statements.
#[derive(Debug)]
pub struct DeltaTableFactory {}
531
532#[async_trait::async_trait]
533impl TableProviderFactory for DeltaTableFactory {
534 async fn create(
535 &self,
536 ctx: &dyn Session,
537 cmd: &CreateExternalTable,
538 ) -> datafusion::error::Result<Arc<dyn TableProvider>> {
539 let table = if cmd.options.is_empty() {
540 let table_url = ensure_table_uri(&cmd.to_owned().location)?;
541 open_table(table_url).await?
542 } else {
543 let table_url = ensure_table_uri(&cmd.to_owned().location)?;
544 open_table_with_storage_options(table_url, cmd.to_owned().options).await?
545 };
546 let table_uri = table.log_store().root_url().clone();
547 let (session_state, _) = resolve_session_state(
548 Some(ctx),
549 SessionFallbackPolicy::DeriveFromTrait,
550 || create_session().state(),
551 SessionResolveContext {
552 operation: "DeltaTableFactory::create",
553 table_uri: Some(&table_uri),
554 cdc: false,
555 },
556 )?;
557
558 Ok(table
559 .table_provider()
560 .with_session(Arc::new(session_state))
561 .await?)
562 }
563}
564
/// Wrapper around a DataFusion [`Column`] whose `From` conversions build the
/// column reference with case-insensitive qualified-name parsing.
pub struct DeltaColumn {
    // The wrapped DataFusion column reference.
    inner: Column,
}
569
570impl From<&str> for DeltaColumn {
571 fn from(c: &str) -> Self {
572 DeltaColumn {
573 inner: Column::from_qualified_name_ignore_case(c),
574 }
575 }
576}
577
578impl From<&String> for DeltaColumn {
580 fn from(c: &String) -> Self {
581 DeltaColumn {
582 inner: Column::from_qualified_name_ignore_case(c),
583 }
584 }
585}
586
587impl From<String> for DeltaColumn {
589 fn from(c: String) -> Self {
590 DeltaColumn {
591 inner: Column::from_qualified_name_ignore_case(c),
592 }
593 }
594}
595
596impl From<DeltaColumn> for Column {
597 fn from(value: DeltaColumn) -> Self {
598 value.inner
599 }
600}
601
602impl From<Column> for DeltaColumn {
604 fn from(c: Column) -> Self {
605 DeltaColumn { inner: c }
606 }
607}
608
609#[cfg(test)]
610mod tests {
611 use crate::DeltaTable;
612 use crate::logstore::ObjectStoreRef;
613 use crate::logstore::default_logstore::DefaultLogStore;
614 use crate::operations::write::SchemaMode;
615 use crate::test_utils::open_fs_path;
616 use crate::writer::test_utils::get_delta_schema;
617 use arrow::array::StructArray;
618 use arrow::datatypes::{Field, Schema};
619 use arrow_array::cast::AsArray;
620 use bytes::Bytes;
621 use datafusion::assert_batches_sorted_eq;
622 use datafusion::config::TableParquetOptions;
623 use datafusion::datasource::physical_plan::{FileScanConfig, ParquetSource};
624 use datafusion::datasource::source::DataSourceExec;
625 use datafusion::logical_expr::lit;
626 use datafusion::physical_plan::empty::EmptyExec;
627 use datafusion::physical_plan::{ExecutionPlanVisitor, PhysicalExpr, visit_execution_plan};
628 use datafusion::prelude::{SessionConfig, col};
629 use datafusion_datasource::file::FileSource as _;
630 use datafusion_proto::physical_plan::AsExecutionPlan;
631 use datafusion_proto::protobuf;
632 use delta_kernel::path::{LogPathFileType, ParsedLogPath};
633 use delta_kernel::schema::ArrayType;
634 use futures::{StreamExt, TryStreamExt, stream::BoxStream};
635 use object_store::ObjectMeta;
636 use object_store::{
637 GetOptions, GetResult, ListResult, MultipartUpload, ObjectStore, PutMultipartOptions,
638 PutOptions, PutPayload, PutResult, path::Path,
639 };
640 use serde_json::json;
641 use std::fmt::{self, Debug, Display, Formatter};
642 use std::ops::Range;
643 use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
644 use url::Url;
645
646 use super::*;
647 use crate::delta_datafusion::table_provider::next::{FileSelection, MissingFilePolicy};
    #[test]
    fn test_to_correct_scalar_value() {
        // Each tuple: (raw JSON stats value, target arrow type, expected scalar).
        let reference_pairs = &[
            (
                json!("2015"),
                ArrowDataType::Int16,
                ScalarValue::Int16(Some(2015)),
            ),
            (
                json!("2015"),
                ArrowDataType::Int32,
                ScalarValue::Int32(Some(2015)),
            ),
            (
                json!("2015"),
                ArrowDataType::Int64,
                ScalarValue::Int64(Some(2015)),
            ),
            (
                json!("2015"),
                ArrowDataType::Float32,
                ScalarValue::Float32(Some(2015_f32)),
            ),
            (
                json!("2015"),
                ArrowDataType::Float64,
                ScalarValue::Float64(Some(2015_f64)),
            ),
            (
                json!(2015),
                ArrowDataType::Float64,
                ScalarValue::Float64(Some(2015_f64)),
            ),
            (
                json!("2015-01-01"),
                ArrowDataType::Date32,
                ScalarValue::Date32(Some(16436)),
            ),
            (
                json!(true),
                ArrowDataType::Boolean,
                ScalarValue::Boolean(Some(true)),
            ),
        ];

        for (raw, data_type, ref_scalar) in reference_pairs {
            let scalar = to_correct_scalar_value(raw, data_type).unwrap().unwrap();
            assert_eq!(*ref_scalar, scalar)
        }
    }
721
    #[test]
    fn roundtrip_test_delta_exec_plan() {
        // Encode a DeltaScan through the physical codec to protobuf and back;
        // the decoded plan must match the original (compared via Debug output).
        let ctx = SessionContext::new();
        let codec = DeltaPhysicalCodec {};

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", ArrowDataType::Utf8, false),
            Field::new("b", ArrowDataType::Int32, false),
        ]));
        let exec_plan = Arc::from(DeltaScan::new(
            &Url::parse("s3://my_bucket/this/is/some/path").unwrap(),
            DeltaScanConfig::default(),
            Arc::from(EmptyExec::new(schema.clone())),
            schema.clone(),
        ));
        let proto: protobuf::PhysicalPlanNode =
            protobuf::PhysicalPlanNode::try_from_physical_plan(exec_plan.clone(), &codec)
                .expect("to proto");

        let task_ctx = ctx.task_ctx();
        let result_exec_plan: Arc<dyn ExecutionPlan> = proto
            .try_into_physical_plan(&task_ctx, &codec)
            .expect("from proto");
        assert_eq!(format!("{exec_plan:?}"), format!("{result_exec_plan:?}"));
    }
747
    #[tokio::test]
    async fn roundtrip_test_delta_logical_codec_preserves_file_selection() {
        // Build a provider restricted to a single file, serialize it through
        // the logical codec, and verify the file selection (ids + missing-file
        // policy) survives the roundtrip.
        let log_store = crate::test_utils::TestTables::Simple
            .table_builder()
            .unwrap()
            .build_storage()
            .unwrap();
        let snapshot = Arc::new(
            crate::kernel::Snapshot::try_new(&log_store, Default::default(), None)
                .await
                .unwrap(),
        );
        let table_root = snapshot
            .scan_builder()
            .build()
            .unwrap()
            .table_root()
            .clone();
        // Pick exactly one file from the table to use as the selection.
        let selected_file_ids: Vec<String> = snapshot
            .file_views(log_store.as_ref(), None)
            .take(1)
            .map_ok(|view| table_root.join(view.path_raw()).unwrap().to_string())
            .try_collect()
            .await
            .unwrap();
        assert_eq!(selected_file_ids.len(), 1);

        let provider = DeltaScanNext::builder()
            .with_snapshot(snapshot)
            .build()
            .await
            .unwrap()
            .with_file_selection(
                FileSelection::new(selected_file_ids.clone())
                    .with_missing_file_policy(MissingFilePolicy::Ignore),
            );

        let codec = DeltaLogicalCodec {};
        let table_ref = TableReference::bare("delta_table");
        let mut encoded = Vec::new();
        codec
            .try_encode_table_provider(&table_ref, Arc::new(provider), &mut encoded)
            .unwrap();

        let ctx = SessionContext::new();
        let decoded = codec
            .try_decode_table_provider(
                &encoded,
                &table_ref,
                Arc::new(ArrowSchema::empty()),
                &ctx.task_ctx(),
            )
            .unwrap();
        let decoded_provider = decoded
            .as_ref()
            .as_any()
            .downcast_ref::<DeltaScanNext>()
            .unwrap();

        // Inspect the decoded provider via its serde representation to check
        // that the selection round-tripped.
        let serialized = serde_json::to_value(decoded_provider).unwrap();
        let decoded_file_ids = serialized
            .get("file_selection")
            .and_then(|value| value.get("file_ids"))
            .and_then(|value| value.as_array())
            .unwrap()
            .iter()
            .map(|value| value.as_str().unwrap().to_string())
            .collect::<Vec<_>>();
        let decoded_policy = serialized
            .get("file_selection")
            .and_then(|value| value.get("missing_file_policy"))
            .and_then(|value| value.as_str())
            .unwrap();

        assert_eq!(decoded_file_ids, selected_file_ids);
        assert_eq!(decoded_policy, "Ignore");
    }
825
    #[tokio::test]
    async fn delta_table_provider_with_config() {
        // `with_file_column` should surface the originating parquet file path
        // as an extra queryable column.
        let table = open_fs_path("../test/tests/data/delta-2.2.0-partitioned-types");
        let provider = table
            .table_provider()
            .with_file_column("file_source")
            .await
            .unwrap();
        let ctx = SessionContext::new();
        ctx.register_table("test", provider).unwrap();

        // `right(..., 77)` trims the table-root prefix so results are stable
        // regardless of where the test data lives on disk.
        let df = ctx
            .sql("select c3, c1, c2, right(file_source, 77) as file_source from test")
            .await
            .unwrap();
        let actual = df.collect().await.unwrap();
        let expected = vec![
            "+----+----+----+-------------------------------------------------------------------------------+",
            "| c3 | c1 | c2 | file_source |",
            "+----+----+----+-------------------------------------------------------------------------------+",
            "| 4 | 6 | a | c1=6/c2=a/part-00011-10619b10-b691-4fd0-acc4-2a9608499d7c.c000.snappy.parquet |",
            "| 5 | 4 | c | c1=4/c2=c/part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet |",
            "| 6 | 5 | b | c1=5/c2=b/part-00007-4e73fa3b-2c88-424a-8051-f8b54328ffdb.c000.snappy.parquet |",
            "+----+----+----+-------------------------------------------------------------------------------+",
        ];
        assert_batches_sorted_eq!(&expected, &actual);
    }
853
    #[tokio::test]
    async fn delta_scan_mixed_partition_order() {
        // The write schema interleaves partition and data columns; the
        // provider's logical schema should still put partition columns last.
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new("modified", ArrowDataType::Utf8, true),
            Field::new("id", ArrowDataType::Utf8, true),
            Field::new("value", ArrowDataType::Int32, true),
        ]));

        let table = DeltaTable::new_in_memory()
            .create()
            .with_columns(get_delta_schema().fields().cloned())
            .with_partition_columns(["modified", "id"])
            .await
            .unwrap();
        assert_eq!(table.version(), Some(0));

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(arrow::array::StringArray::from(vec![
                    "2021-02-01",
                    "2021-02-01",
                    "2021-02-02",
                    "2021-02-02",
                ])),
                Arc::new(arrow::array::StringArray::from(vec!["A", "B", "C", "D"])),
                Arc::new(arrow::array::Int32Array::from(vec![1, 10, 20, 100])),
            ],
        )
        .unwrap();
        let table = table
            .write(vec![batch.clone()])
            .with_save_mode(crate::protocol::SaveMode::Append)
            .await
            .unwrap();

        let provider = table.table_provider().await.unwrap();
        let logical_schema = provider.schema();
        let ctx = SessionContext::new();
        ctx.runtime_env().register_object_store(
            table.log_store().root_url(),
            table.log_store().object_store(None),
        );
        ctx.register_table("test", provider).unwrap();

        // Data columns first, partition columns ("modified", "id") reordered last.
        let expected_logical_order = vec!["id", "value", "modified"];
        let actual_order: Vec<String> = logical_schema
            .fields()
            .iter()
            .map(|f| f.name().to_owned())
            .collect();

        let df = ctx.sql("select * from test").await.unwrap();
        let actual = df.collect().await.unwrap();
        let expected = vec![
            "+----+-------+------------+",
            "| id | value | modified |",
            "+----+-------+------------+",
            "| A | 1 | 2021-02-01 |",
            "| B | 10 | 2021-02-01 |",
            "| C | 20 | 2021-02-02 |",
            "| D | 100 | 2021-02-02 |",
            "+----+-------+------------+",
        ];
        assert_batches_sorted_eq!(&expected, &actual);
        assert_eq!(expected_logical_order, actual_order);
    }
924
    #[tokio::test]
    async fn delta_scan_case_sensitive() {
        // Mixed-case column names must resolve when querying through a
        // DeltaSessionContext (presumably configured for case-sensitive
        // identifiers — see `session` module).
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new("moDified", ArrowDataType::Utf8, true),
            Field::new("ID", ArrowDataType::Utf8, true),
            Field::new("vaLue", ArrowDataType::Int32, true),
        ]));

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(arrow::array::StringArray::from(vec![
                    "2021-02-01",
                    "2021-02-01",
                    "2021-02-02",
                    "2021-02-02",
                ])),
                Arc::new(arrow::array::StringArray::from(vec!["A", "B", "C", "D"])),
                Arc::new(arrow::array::Int32Array::from(vec![1, 10, 20, 100])),
            ],
        )
        .unwrap();
        let table = DeltaTable::new_in_memory()
            .write(vec![batch.clone()])
            .with_save_mode(crate::protocol::SaveMode::Append)
            .await
            .unwrap();

        let config = DeltaScanConfigBuilder::new()
            .build(table.snapshot().unwrap().snapshot())
            .unwrap();
        let log = table.log_store();

        let provider =
            DeltaTableProvider::try_new(table.snapshot().unwrap().snapshot().clone(), log, config)
                .unwrap();
        let ctx: SessionContext = DeltaSessionContext::default().into();
        ctx.register_table("test", Arc::new(provider)).unwrap();

        let df = ctx
            .sql("select ID, moDified, vaLue from test")
            .await
            .unwrap();
        let actual = df.collect().await.unwrap();
        let expected = vec![
            "+----+------------+-------+",
            "| ID | moDified | vaLue |",
            "+----+------------+-------+",
            "| A | 2021-02-01 | 1 |",
            "| B | 2021-02-01 | 10 |",
            "| C | 2021-02-02 | 20 |",
            "| D | 2021-02-02 | 100 |",
            "+----+------------+-------+",
        ];
        assert_batches_sorted_eq!(&expected, &actual);

    }
994
    #[tokio::test]
    async fn delta_scan_supports_missing_columns() {
        // Files written before a schema-merge lack `col_2`; scanning the table
        // must fill the missing column with nulls for those rows.
        let schema1 = Arc::new(ArrowSchema::new(vec![Field::new(
            "col_1",
            ArrowDataType::Utf8,
            true,
        )]));

        let batch1 = RecordBatch::try_new(
            schema1.clone(),
            vec![Arc::new(arrow::array::StringArray::from(vec![
                Some("A"),
                Some("B"),
            ]))],
        )
        .unwrap();

        let schema2 = Arc::new(ArrowSchema::new(vec![
            Field::new("col_1", ArrowDataType::Utf8, true),
            Field::new("col_2", ArrowDataType::Utf8, true),
        ]));

        let batch2 = RecordBatch::try_new(
            schema2.clone(),
            vec![
                Arc::new(arrow::array::StringArray::from(vec![
                    Some("E"),
                    Some("F"),
                    Some("G"),
                ])),
                Arc::new(arrow::array::StringArray::from(vec![
                    Some("E2"),
                    Some("F2"),
                    Some("G2"),
                ])),
            ],
        )
        .unwrap();

        // First write establishes the two-column schema ...
        let table = DeltaTable::new_in_memory()
            .write(vec![batch2])
            .with_save_mode(crate::protocol::SaveMode::Append)
            .await
            .unwrap();

        // ... second write appends single-column data via schema merge.
        let table = table
            .write(vec![batch1])
            .with_schema_mode(SchemaMode::Merge)
            .with_save_mode(crate::protocol::SaveMode::Append)
            .await
            .unwrap();

        let config = DeltaScanConfigBuilder::new()
            .build(table.snapshot().unwrap().snapshot())
            .unwrap();
        let log = table.log_store();

        let provider =
            DeltaTableProvider::try_new(table.snapshot().unwrap().snapshot().clone(), log, config)
                .unwrap();
        let ctx: SessionContext = DeltaSessionContext::default().into();
        ctx.register_table("test", Arc::new(provider)).unwrap();

        let df = ctx.sql("select col_1, col_2 from test").await.unwrap();
        let actual = df.collect().await.unwrap();
        let expected = vec![
            "+-------+-------+",
            "| col_1 | col_2 |",
            "+-------+-------+",
            "| A | |",
            "| B | |",
            "| E | E2 |",
            "| F | F2 |",
            "| G | G2 |",
            "+-------+-------+",
        ];
        assert_batches_sorted_eq!(&expected, &actual);
    }
1073
    #[tokio::test]
    async fn delta_scan_supports_pushdown() {
        // With parquet filter pushdown enabled, a filtered query through the
        // provider must still return correct results.
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new("col_1", ArrowDataType::Utf8, false),
            Field::new("col_2", ArrowDataType::Utf8, false),
        ]));

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(arrow::array::StringArray::from(vec![
                    Some("A"),
                    Some("B"),
                    Some("C"),
                ])),
                Arc::new(arrow::array::StringArray::from(vec![
                    Some("A2"),
                    Some("B2"),
                    Some("C2"),
                ])),
            ],
        )
        .unwrap();

        let table = DeltaTable::new_in_memory()
            .write(vec![batch])
            .with_save_mode(crate::protocol::SaveMode::Append)
            .await
            .unwrap();

        let config = DeltaScanConfigBuilder::new()
            .build(table.snapshot().unwrap().snapshot())
            .unwrap();
        let log = table.log_store();

        let provider =
            DeltaTableProvider::try_new(table.snapshot().unwrap().snapshot().clone(), log, config)
                .unwrap();

        // Enable parquet predicate pushdown on the session.
        let mut cfg = SessionConfig::default();
        cfg.options_mut().execution.parquet.pushdown_filters = true;
        let ctx = SessionContext::new_with_config(cfg);
        ctx.register_table("test", Arc::new(provider)).unwrap();

        let df = ctx
            .sql("select col_1, col_2 from test WHERE col_1 = 'A'")
            .await
            .unwrap();
        let actual = df.collect().await.unwrap();
        let expected = vec![
            "+-------+-------+",
            "| col_1 | col_2 |",
            "+-------+-------+",
            "| A | A2 |",
            "+-------+-------+",
        ];
        assert_batches_sorted_eq!(&expected, &actual);
    }
1132
#[tokio::test]
async fn delta_scan_supports_nested_missing_columns() {
    // First commit: `col_1` is a struct with a single nested field `col_1a`.
    let column1_schema1: arrow::datatypes::Fields =
        vec![Field::new("col_1a", ArrowDataType::Utf8, true)].into();
    let schema1 = Arc::new(ArrowSchema::new(vec![Field::new(
        "col_1",
        ArrowDataType::Struct(column1_schema1.clone()),
        true,
    )]));

    let batch1 = RecordBatch::try_new(
        schema1.clone(),
        vec![Arc::new(StructArray::new(
            column1_schema1,
            vec![Arc::new(arrow::array::StringArray::from(vec![
                Some("A"),
                Some("B"),
            ]))],
            None,
        ))],
    )
    .unwrap();

    // Second commit widens the struct with an extra nested field `col_1b`,
    // exercising schema evolution of nested columns.
    let column1_schema2: arrow_schema::Fields = vec![
        Field::new("col_1a", ArrowDataType::Utf8, true),
        Field::new("col_1b", ArrowDataType::Utf8, true),
    ]
    .into();
    let schema2 = Arc::new(ArrowSchema::new(vec![Field::new(
        "col_1",
        ArrowDataType::Struct(column1_schema2.clone()),
        true,
    )]));

    let batch2 = RecordBatch::try_new(
        schema2.clone(),
        vec![Arc::new(StructArray::new(
            column1_schema2,
            vec![
                Arc::new(arrow::array::StringArray::from(vec![
                    Some("E"),
                    Some("F"),
                    Some("G"),
                ])),
                Arc::new(arrow::array::StringArray::from(vec![
                    Some("E2"),
                    Some("F2"),
                    Some("G2"),
                ])),
            ],
            None,
        ))],
    )
    .unwrap();

    let table = DeltaTable::new_in_memory()
        .write(vec![batch1])
        .with_save_mode(crate::protocol::SaveMode::Append)
        .await
        .unwrap();

    // Merge the widened schema into the table on the second append.
    let table = table
        .write(vec![batch2])
        .with_schema_mode(SchemaMode::Merge)
        .with_save_mode(crate::protocol::SaveMode::Append)
        .await
        .unwrap();

    let config = DeltaScanConfigBuilder::new()
        .build(table.snapshot().unwrap().snapshot())
        .unwrap();
    let log = table.log_store();

    let provider =
        DeltaTableProvider::try_new(table.snapshot().unwrap().snapshot().clone(), log, config)
            .unwrap();
    let ctx: SessionContext = DeltaSessionContext::default().into();
    ctx.register_table("test", Arc::new(provider)).unwrap();

    // Selecting the nested field that is absent from the first file must
    // yield nulls for those rows rather than failing the scan.
    let df = ctx
        .sql("select col_1.col_1a, col_1.col_1b from test")
        .await
        .unwrap();
    let actual = df.collect().await.unwrap();
    let expected = vec![
        "+--------------------+--------------------+",
        "| test.col_1[col_1a] | test.col_1[col_1b] |",
        "+--------------------+--------------------+",
        "| A | |",
        "| B | |",
        "| E | E2 |",
        "| F | F2 |",
        "| G | G2 |",
        "+--------------------+--------------------+",
    ];
    assert_batches_sorted_eq!(&expected, &actual);
}
1230
1231 #[tokio::test]
1232 async fn test_multiple_predicate_pushdown() {
1233 let schema = Arc::new(ArrowSchema::new(vec![
1234 Field::new("moDified", ArrowDataType::Utf8, true),
1235 Field::new("id", ArrowDataType::Utf8, true),
1236 Field::new("vaLue", ArrowDataType::Int32, true),
1237 ]));
1238
1239 let batch = RecordBatch::try_new(
1240 schema.clone(),
1241 vec![
1242 Arc::new(arrow::array::StringArray::from(vec![
1243 "2021-02-01",
1244 "2021-02-01",
1245 "2021-02-02",
1246 "2021-02-02",
1247 ])),
1248 Arc::new(arrow::array::StringArray::from(vec!["A", "B", "C", "D"])),
1249 Arc::new(arrow::array::Int32Array::from(vec![1, 10, 20, 100])),
1250 ],
1251 )
1252 .unwrap();
1253 let table = DeltaTable::new_in_memory()
1255 .write(vec![batch.clone()])
1256 .with_save_mode(crate::protocol::SaveMode::Append)
1257 .await
1258 .unwrap();
1259
1260 let datafusion = SessionContext::new();
1261 table
1262 .update_datafusion_session(&datafusion.state())
1263 .unwrap();
1264
1265 datafusion
1266 .register_table("snapshot", table.table_provider().await.unwrap())
1267 .unwrap();
1268
1269 let df = datafusion
1270 .sql("select * from snapshot where id > 10000 and id < 20000")
1271 .await
1272 .unwrap();
1273
1274 df.collect().await.unwrap();
1275 }
1276
1277 #[tokio::test]
1278 async fn test_delta_scan_builder_no_scan_config() {
1279 let arr: Arc<dyn Array> = Arc::new(arrow::array::StringArray::from(vec!["s"]));
1280 let batch = RecordBatch::try_from_iter_with_nullable(vec![("a", arr, false)]).unwrap();
1281 let table = DeltaTable::new_in_memory()
1282 .write(vec![batch])
1283 .with_save_mode(crate::protocol::SaveMode::Append)
1284 .await
1285 .unwrap();
1286
1287 let ctx = SessionContext::new();
1288 let state = ctx.state();
1289 let scan = DeltaScanBuilder::new(
1290 table.snapshot().unwrap().snapshot(),
1291 table.log_store(),
1292 &state,
1293 )
1294 .with_filter(Some(col("a").eq(lit("s"))))
1295 .build()
1296 .await
1297 .unwrap();
1298
1299 let mut visitor = ParquetVisitor::default();
1300 visit_execution_plan(&scan, &mut visitor).unwrap();
1301
1302 assert_eq!(visitor.predicate.unwrap().to_string(), "a@0 = s");
1303 }
1304
1305 #[tokio::test]
1306 async fn test_delta_scan_builder_scan_config_disable_pushdown() {
1307 let arr: Arc<dyn Array> = Arc::new(arrow::array::StringArray::from(vec!["s"]));
1308 let batch = RecordBatch::try_from_iter_with_nullable(vec![("a", arr, false)]).unwrap();
1309 let table = DeltaTable::new_in_memory()
1310 .write(vec![batch])
1311 .with_save_mode(crate::protocol::SaveMode::Append)
1312 .await
1313 .unwrap();
1314
1315 let snapshot = table.snapshot().unwrap();
1316 let ctx = SessionContext::new();
1317 let state = ctx.state();
1318 let scan = DeltaScanBuilder::new(snapshot.snapshot(), table.log_store(), &state)
1319 .with_filter(Some(col("a").eq(lit("s"))))
1320 .with_scan_config(
1321 DeltaScanConfigBuilder::new()
1322 .with_parquet_pushdown(false)
1323 .build(snapshot.snapshot())
1324 .unwrap(),
1325 )
1326 .build()
1327 .await
1328 .unwrap();
1329
1330 let mut visitor = ParquetVisitor::default();
1331 visit_execution_plan(&scan, &mut visitor).unwrap();
1332
1333 assert!(visitor.predicate.is_none());
1334 }
1335
1336 #[tokio::test]
1337 async fn test_delta_scan_applies_parquet_options() {
1338 let arr: Arc<dyn Array> = Arc::new(arrow::array::StringArray::from(vec!["s"]));
1339 let batch = RecordBatch::try_from_iter_with_nullable(vec![("a", arr, false)]).unwrap();
1340 let table = DeltaTable::new_in_memory()
1341 .write(vec![batch])
1342 .with_save_mode(crate::protocol::SaveMode::Append)
1343 .await
1344 .unwrap();
1345
1346 let snapshot = table.snapshot().unwrap();
1347
1348 let mut config = SessionConfig::default();
1349 config.options_mut().execution.parquet.pushdown_filters = true;
1350 let ctx = SessionContext::new_with_config(config);
1351 let state = ctx.state();
1352
1353 let scan = DeltaScanBuilder::new(snapshot.snapshot(), table.log_store(), &state)
1354 .build()
1355 .await
1356 .unwrap();
1357
1358 let mut visitor = ParquetVisitor::default();
1359 visit_execution_plan(&scan, &mut visitor).unwrap();
1360
1361 assert_eq!(ctx.copied_table_options().parquet, visitor.options.unwrap());
1362 }
1363
/// Collects details from the parquet leaf of an execution plan so tests can
/// assert on the pushed-down predicate and the applied parquet options.
#[derive(Default)]
struct ParquetVisitor {
    /// Physical predicate found on the `ParquetSource`, if any.
    predicate: Option<Arc<dyn PhysicalExpr>>,
    /// Parquet table options found on the `ParquetSource`, if any.
    options: Option<TableParquetOptions>,
}
1370
impl ExecutionPlanVisitor for ParquetVisitor {
    type Error = DataFusionError;

    /// Inspects each plan node: when it is a `DataSourceExec` whose file
    /// source is a `ParquetSource`, records that source's options and filter.
    /// Always returns `Ok(true)` so traversal continues over the whole plan.
    fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
        let Some(datasource_exec) = plan.as_any().downcast_ref::<DataSourceExec>() else {
            return Ok(true);
        };

        let Some(scan_config) = datasource_exec
            .data_source()
            .as_any()
            .downcast_ref::<FileScanConfig>()
        else {
            return Ok(true);
        };

        if let Some(parquet_source) = scan_config
            .file_source
            .as_any()
            .downcast_ref::<ParquetSource>()
        {
            // Later parquet nodes (if any) overwrite earlier captures.
            self.options = Some(parquet_source.table_parquet_options().clone());
            self.predicate = parquet_source.filter();
        }

        Ok(true)
    }
}
1399
1400 #[tokio::test]
1406 async fn passes_sanity_checker_when_all_files_filtered() {
1407 let table = open_fs_path("../test/tests/data/delta-2.2.0-partitioned-types");
1408 let ctx = create_session().into_inner();
1409 ctx.register_table("test", table.table_provider().await.unwrap())
1410 .unwrap();
1411
1412 let df = ctx
1413 .sql("select * from test where c3 = 100 ORDER BY c1 ASC")
1414 .await
1415 .unwrap();
1416 let actual = df.collect().await.unwrap();
1417
1418 assert_eq!(actual.len(), 0);
1419 }
1420
#[tokio::test]
async fn test_delta_scan_uses_parquet_column_pruning() {
    // One tiny column and one ~1 KiB column: if column pruning works, a
    // `select small` should never fetch the bytes backing `large`.
    let small: Arc<dyn Array> = Arc::new(arrow::array::StringArray::from(vec!["a"]));
    let large: Arc<dyn Array> = Arc::new(arrow::array::StringArray::from(vec![
        "b".repeat(1024).as_str(),
    ]));
    let batch = RecordBatch::try_from_iter(vec![("small", small), ("large", large)]).unwrap();
    let table = DeltaTable::new_in_memory()
        .write(vec![batch])
        .with_save_mode(crate::protocol::SaveMode::Append)
        .await
        .unwrap();

    let config = DeltaScanConfigBuilder::new()
        .build(table.snapshot().unwrap().snapshot())
        .unwrap();

    // Wrap the table's object store so every read request gets recorded.
    let (object_store, mut operations) =
        RecordingObjectStore::new(table.log_store().object_store(None));
    let both_store = Arc::new(object_store);
    // Use the recording store for both the log and data sides.
    let log_store = DefaultLogStore::new(
        both_store.clone(),
        both_store,
        table.log_store().config().clone(),
    );
    let provider = DeltaTableProvider::try_new(
        table.snapshot().unwrap().snapshot().clone(),
        Arc::new(log_store),
        config,
    )
    .unwrap();
    let ctx = SessionContext::new();
    ctx.register_table("test", Arc::new(provider)).unwrap();
    let state = ctx.state();

    let df = ctx.sql("select small from test").await.unwrap();
    let plan = df.create_physical_plan().await.unwrap();

    // Exactly one batch with the single projected column and row.
    let mut stream = plan.execute(0, state.task_ctx()).unwrap();
    let Some(Ok(batch)) = stream.next().await else {
        panic!()
    };
    assert!(stream.next().await.is_none());
    assert_eq!(1, batch.num_columns());
    assert_eq!(1, batch.num_rows());
    let small = batch.column_by_name("small").unwrap().as_string::<i32>();
    assert_eq!("a", small.iter().next().unwrap().unwrap());

    // Expected requests: the commit file, then two ranged reads of the data
    // file — presumably the parquet footer/metadata followed by only the
    // `small` column's bytes. NOTE(review): the exact offsets depend on the
    // parquet writer's output and will shift if the writer changes.
    let expected = vec![
        ObjectStoreOperation::Get(LocationType::Commit),
        ObjectStoreOperation::GetRange(LocationType::Data, 957..965),
        ObjectStoreOperation::GetRange(LocationType::Data, 326..957),
    ];
    let mut actual = Vec::new();
    operations.recv_many(&mut actual, 3).await;
    assert_eq!(expected, actual);
}
1479
#[tokio::test]
async fn test_push_down_filter_panic_2602() -> DeltaResult<()> {
    // Regression test for delta-rs issue #2602: pushing range filters down
    // to a table containing timestamp/date/array columns used to panic.
    use crate::kernel::schema::{DataType, PrimitiveType};
    let ctx = SessionContext::new();
    // Build a table whose schema covers the column kinds involved in the
    // original report (long, string, boolean, timestamp, date, array).
    let table = DeltaTable::new_in_memory()
        .create()
        .with_column("id", DataType::Primitive(PrimitiveType::Long), true, None)
        .with_column(
            "name",
            DataType::Primitive(PrimitiveType::String),
            true,
            None,
        )
        .with_column("b", DataType::Primitive(PrimitiveType::Boolean), true, None)
        .with_column(
            "ts",
            DataType::Primitive(PrimitiveType::Timestamp),
            true,
            None,
        )
        .with_column("dt", DataType::Primitive(PrimitiveType::Date), true, None)
        .with_column(
            "zap",
            DataType::Array(Box::new(ArrayType::new(
                DataType::Primitive(PrimitiveType::Boolean),
                true,
            ))),
            true,
            None,
        )
        .await?;
    table.update_datafusion_session(&ctx.state()).unwrap();

    ctx.register_table("snapshot", table.table_provider().await.unwrap())
        .unwrap();

    // The query result is irrelevant; it only must not panic.
    let df = ctx
        .sql("select * from snapshot where id > 10000 and id < 20000")
        .await
        .unwrap();

    let _ = df.collect().await?;
    Ok(())
}
1524
/// An [`ObjectStore`] decorator that forwards every call to `inner` and
/// reports read operations (`get`, `get_opts`, `get_range`, `get_ranges`)
/// on a channel, so tests can assert exactly which requests a scan issued.
struct RecordingObjectStore {
    // The wrapped store that actually serves the requests.
    inner: ObjectStoreRef,
    // Sender half; the matching receiver is handed out by `new`.
    operations: UnboundedSender<ObjectStoreOperation>,
}
1530
1531 impl RecordingObjectStore {
1532 fn new(inner: ObjectStoreRef) -> (Self, UnboundedReceiver<ObjectStoreOperation>) {
1534 let (operations, operations_receiver) = unbounded_channel();
1535 (Self { inner, operations }, operations_receiver)
1536 }
1537 }
1538
1539 impl Display for RecordingObjectStore {
1540 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1541 Display::fmt(&self.inner, f)
1542 }
1543 }
1544
1545 impl Debug for RecordingObjectStore {
1546 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1547 Debug::fmt(&self.inner, f)
1548 }
1549 }
1550
/// A read operation observed by [`RecordingObjectStore`], tagged with the
/// kind of file it targeted and, for ranged reads, the byte range(s).
#[derive(Debug, PartialEq)]
enum ObjectStoreOperation {
    /// Vectored ranged read (`get_ranges`).
    GetRanges(LocationType, Vec<Range<u64>>),
    /// Single ranged read (`get_range`).
    GetRange(LocationType, Range<u64>),
    /// Get with explicit options (`get_opts`); the options are not recorded.
    GetOpts(LocationType),
    /// Plain full-object read (`get`).
    Get(LocationType),
}
1558
/// Coarse classification of the object-store path a request touched.
#[derive(Debug, PartialEq)]
enum LocationType {
    /// A data (parquet `part-*`) file.
    Data,
    /// A Delta log commit file.
    Commit,
}
1564
1565 impl From<&Path> for LocationType {
1566 fn from(value: &Path) -> Self {
1567 let dummy_url = Url::parse("dummy:///").unwrap();
1568 let parsed = ParsedLogPath::try_from(dummy_url.join(value.as_ref()).unwrap()).unwrap();
1569 if let Some(parsed) = parsed
1570 && matches!(parsed.file_type, LogPathFileType::Commit)
1571 {
1572 return LocationType::Commit;
1573 }
1574 if value.to_string().starts_with("part-") {
1575 LocationType::Data
1576 } else {
1577 panic!("Unknown location type: {value:?}")
1578 }
1579 }
1580 }
1581
// Forwarding implementation: every method delegates to `self.inner`. Only
// the read paths (`get`, `get_opts`, `get_range`, `get_ranges`) additionally
// report an `ObjectStoreOperation` on the channel before delegating; writes,
// listings, and copies are passed through unrecorded.
#[async_trait::async_trait]
impl ObjectStore for RecordingObjectStore {
    async fn put(
        &self,
        location: &Path,
        payload: PutPayload,
    ) -> object_store::Result<PutResult> {
        self.inner.put(location, payload).await
    }

    async fn put_opts(
        &self,
        location: &Path,
        payload: PutPayload,
        opts: PutOptions,
    ) -> object_store::Result<PutResult> {
        self.inner.put_opts(location, payload, opts).await
    }

    async fn put_multipart(
        &self,
        location: &Path,
    ) -> object_store::Result<Box<dyn MultipartUpload>> {
        self.inner.put_multipart(location).await
    }

    async fn put_multipart_opts(
        &self,
        location: &Path,
        opts: PutMultipartOptions,
    ) -> object_store::Result<Box<dyn MultipartUpload>> {
        self.inner.put_multipart_opts(location, opts).await
    }

    /// Records a full-object read, then delegates.
    async fn get(&self, location: &Path) -> object_store::Result<GetResult> {
        // `unwrap`: the receiver lives for the whole test, so send can't fail.
        self.operations
            .send(ObjectStoreOperation::Get(location.into()))
            .unwrap();
        self.inner.get(location).await
    }

    /// Records an options-based read (options themselves are not captured),
    /// then delegates.
    async fn get_opts(
        &self,
        location: &Path,
        options: GetOptions,
    ) -> object_store::Result<GetResult> {
        self.operations
            .send(ObjectStoreOperation::GetOpts(location.into()))
            .unwrap();
        self.inner.get_opts(location, options).await
    }

    /// Records a single ranged read including its byte range, then delegates.
    async fn get_range(
        &self,
        location: &Path,
        range: Range<u64>,
    ) -> object_store::Result<Bytes> {
        self.operations
            .send(ObjectStoreOperation::GetRange(
                location.into(),
                range.clone(),
            ))
            .unwrap();
        self.inner.get_range(location, range).await
    }

    /// Records a vectored ranged read including all byte ranges, then
    /// delegates.
    async fn get_ranges(
        &self,
        location: &Path,
        ranges: &[Range<u64>],
    ) -> object_store::Result<Vec<Bytes>> {
        self.operations
            .send(ObjectStoreOperation::GetRanges(
                location.into(),
                ranges.to_vec(),
            ))
            .unwrap();
        self.inner.get_ranges(location, ranges).await
    }

    async fn head(&self, location: &Path) -> object_store::Result<ObjectMeta> {
        self.inner.head(location).await
    }

    async fn delete(&self, location: &Path) -> object_store::Result<()> {
        self.inner.delete(location).await
    }

    fn delete_stream<'a>(
        &'a self,
        locations: BoxStream<'a, object_store::Result<Path>>,
    ) -> BoxStream<'a, object_store::Result<Path>> {
        self.inner.delete_stream(locations)
    }

    fn list(
        &self,
        prefix: Option<&Path>,
    ) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
        self.inner.list(prefix)
    }

    fn list_with_offset(
        &self,
        prefix: Option<&Path>,
        offset: &Path,
    ) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
        self.inner.list_with_offset(prefix, offset)
    }

    async fn list_with_delimiter(
        &self,
        prefix: Option<&Path>,
    ) -> object_store::Result<ListResult> {
        self.inner.list_with_delimiter(prefix).await
    }

    async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> {
        self.inner.copy(from, to).await
    }

    async fn rename(&self, from: &Path, to: &Path) -> object_store::Result<()> {
        self.inner.rename(from, to).await
    }

    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> {
        self.inner.copy_if_not_exists(from, to).await
    }

    async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> {
        self.inner.rename_if_not_exists(from, to).await
    }
}
1716}