1use std::fmt::Debug;
27use std::sync::Arc;
28
29use arrow::array::types::UInt16Type;
30use arrow::array::{Array, DictionaryArray, RecordBatch, StringArray, TypedDictionaryArray};
31use arrow_cast::display::array_value_to_string;
32use arrow_cast::{CastOptions, cast_with_options};
33use arrow_schema::{
34 DataType as ArrowDataType, Schema as ArrowSchema, SchemaRef, SchemaRef as ArrowSchemaRef,
35 TimeUnit,
36};
37use datafusion::catalog::{Session, TableProviderFactory};
38use datafusion::common::scalar::ScalarValue;
39use datafusion::common::{
40 Column, DFSchema, DataFusionError, Result as DataFusionResult, TableReference, ToDFSchema,
41};
42use datafusion::datasource::physical_plan::wrap_partition_type_in_dict;
43use datafusion::datasource::{MemTable, TableProvider};
44use datafusion::execution::TaskContext;
45use datafusion::execution::context::SessionContext;
46use datafusion::execution::runtime_env::RuntimeEnv;
47use datafusion::logical_expr::logical_plan::CreateExternalTable;
48use datafusion::logical_expr::utils::conjunction;
49use datafusion::logical_expr::{Expr, Extension, LogicalPlan};
50use datafusion::physical_optimizer::pruning::PruningPredicate;
51use datafusion::physical_plan::{ExecutionPlan, Statistics};
52use datafusion_proto::logical_plan::LogicalExtensionCodec;
53use datafusion_proto::physical_plan::PhysicalExtensionCodec;
54use delta_kernel::engine::arrow_conversion::TryIntoArrow as _;
55use either::Either;
56use itertools::Itertools;
57use url::Url;
58
59use crate::delta_datafusion::expr::parse_predicate_expression;
60use crate::delta_datafusion::table_provider::DeltaScanWire;
61use crate::ensure_table_uri;
62use crate::errors::{DeltaResult, DeltaTableError};
63use crate::kernel::{
64 Add, DataCheck, EagerSnapshot, Invariant, LogDataHandler, Snapshot, StructTypeExt,
65};
66use crate::logstore::{LogStore, LogStoreRef};
67use crate::table::config::TablePropertiesExt as _;
68use crate::table::state::DeltaTableState;
69use crate::table::{Constraint, GeneratedColumn};
70use crate::{DeltaTable, open_table, open_table_with_storage_options};
71
72pub(crate) use self::session::session_state_from_session;
73pub use self::session::{
74 DeltaParserOptions, DeltaRuntimeEnvBuilder, DeltaSessionConfig, DeltaSessionContext,
75 create_session,
76};
77pub use self::table_provider::next::DeltaScan as DeltaScanNext;
78pub use self::table_provider::next::SnapshotWrapper;
79pub(crate) use find_files::*;
80
/// Name of the synthetic column used to carry each row's source file path through scans.
pub(crate) const PATH_COLUMN: &str = "__delta_rs_path";
82
83pub mod cdf;
84pub mod engine;
85pub mod expr;
86mod find_files;
87pub mod logical;
88pub mod physical;
89pub mod planner;
90mod schema_adapter;
91mod session;
92mod table_provider;
93
94pub use cdf::scan::DeltaCdfTableProvider;
95pub(crate) use table_provider::DeltaScanBuilder;
96pub use table_provider::{DeltaScan, DeltaScanConfig, DeltaScanConfigBuilder, DeltaTableProvider};
97
98impl From<DeltaTableError> for DataFusionError {
99 fn from(err: DeltaTableError) -> Self {
100 match err {
101 DeltaTableError::Arrow { source } => DataFusionError::from(source),
102 DeltaTableError::Io { source } => DataFusionError::IoError(source),
103 DeltaTableError::ObjectStore { source } => DataFusionError::from(source),
104 DeltaTableError::Parquet { source } => DataFusionError::from(source),
105 _ => DataFusionError::External(Box::new(err)),
106 }
107 }
108}
109
110impl From<DataFusionError> for DeltaTableError {
111 fn from(err: DataFusionError) -> Self {
112 match err {
113 DataFusionError::ArrowError(source, _) => DeltaTableError::from(*source),
114 DataFusionError::IoError(source) => DeltaTableError::Io { source },
115 DataFusionError::ObjectStore(source) => DeltaTableError::from(*source),
116 DataFusionError::ParquetError(source) => DeltaTableError::from(*source),
117 _ => DeltaTableError::Generic(err.to_string()),
118 }
119 }
120}
121
/// Helpers for bridging delta-rs snapshot types into DataFusion.
pub trait DataFusionMixins {
    /// Schema used when reading/scanning the table: partition columns are moved
    /// to the end and string/binary partition columns are dictionary-encoded.
    fn read_schema(&self) -> ArrowSchemaRef;

    /// Schema used for incoming data: same column order as [`Self::read_schema`]
    /// but partition columns keep their declared (non-dictionary) types.
    fn input_schema(&self) -> ArrowSchemaRef;

    /// Parse a predicate string into a DataFusion [`Expr`] resolved against
    /// this table's read schema.
    fn parse_predicate_expression(
        &self,
        expr: impl AsRef<str>,
        session: &dyn Session,
    ) -> DeltaResult<Expr>;
}
137
impl DataFusionMixins for Snapshot {
    fn read_schema(&self) -> ArrowSchemaRef {
        // `true`: dictionary-wrap eligible partition columns for scanning.
        _arrow_schema(
            self.arrow_schema(),
            self.metadata().partition_columns(),
            true,
        )
    }

    fn input_schema(&self) -> ArrowSchemaRef {
        // `false`: keep partition columns at their declared types.
        _arrow_schema(
            self.arrow_schema(),
            self.metadata().partition_columns(),
            false,
        )
    }

    fn parse_predicate_expression(
        &self,
        expr: impl AsRef<str>,
        session: &dyn Session,
    ) -> DeltaResult<Expr> {
        // Predicates are resolved against the read (scan) schema.
        let schema = DFSchema::try_from(self.read_schema().as_ref().to_owned())?;
        parse_predicate_expression(&schema, expr, session)
    }
}
164
impl DataFusionMixins for LogDataHandler<'_> {
    fn read_schema(&self) -> ArrowSchemaRef {
        _arrow_schema(
            Arc::new(
                // NOTE(review): assumes the kernel schema always converts to
                // Arrow — a conversion failure panics here.
                self.table_configuration()
                    .schema()
                    .as_ref()
                    .try_into_arrow()
                    .unwrap(),
            ),
            self.table_configuration().metadata().partition_columns(),
            true,
        )
    }

    fn input_schema(&self) -> ArrowSchemaRef {
        _arrow_schema(
            Arc::new(
                // NOTE(review): same unchecked conversion as in `read_schema`.
                self.table_configuration()
                    .schema()
                    .as_ref()
                    .try_into_arrow()
                    .unwrap(),
            ),
            self.table_configuration().metadata().partition_columns(),
            false,
        )
    }

    fn parse_predicate_expression(
        &self,
        expr: impl AsRef<str>,
        session: &dyn Session,
    ) -> DeltaResult<Expr> {
        // Predicates are resolved against the read (scan) schema.
        let schema = DFSchema::try_from(self.read_schema().as_ref().to_owned())?;
        parse_predicate_expression(&schema, expr, session)
    }
}
203
// All methods delegate to the underlying `Snapshot` implementation.
impl DataFusionMixins for EagerSnapshot {
    fn read_schema(&self) -> ArrowSchemaRef {
        self.snapshot().read_schema()
    }

    fn input_schema(&self) -> ArrowSchemaRef {
        self.snapshot().input_schema()
    }

    fn parse_predicate_expression(
        &self,
        expr: impl AsRef<str>,
        session: &dyn Session,
    ) -> DeltaResult<Expr> {
        self.snapshot().parse_predicate_expression(expr, session)
    }
}
221
222fn _arrow_schema(
223 schema: SchemaRef,
224 partition_columns: &[String],
225 wrap_partitions: bool,
226) -> ArrowSchemaRef {
227 let fields = schema
228 .fields()
229 .into_iter()
230 .filter(|f| !partition_columns.contains(&f.name().to_string()))
231 .cloned()
232 .chain(
233 partition_columns.iter().map(|partition_col| {
236 let field = schema.field_with_name(partition_col).unwrap();
237 let corrected = if wrap_partitions {
238 match field.data_type() {
239 ArrowDataType::Utf8
242 | ArrowDataType::LargeUtf8
243 | ArrowDataType::Binary
244 | ArrowDataType::LargeBinary => {
245 wrap_partition_type_in_dict(field.data_type().clone())
246 }
247 _ => field.data_type().clone(),
248 }
249 } else {
250 field.data_type().clone()
251 };
252 Arc::new(field.clone().with_data_type(corrected))
253 }),
254 )
255 .collect::<Vec<_>>();
256 Arc::new(ArrowSchema::new(fields))
257}
258
/// Iterate over the `Add` actions whose file-level statistics do not rule out
/// the given filters.
///
/// With no filters (or a filter list that cannot be conjoined into a single
/// predicate) every file is yielded. Pruning is conservative: files kept by
/// the mask *may* contain matching rows, files dropped definitely do not.
pub(crate) fn files_matching_predicate<'a>(
    log_data: LogDataHandler<'a>,
    filters: &[Expr],
) -> DeltaResult<impl Iterator<Item = Add> + 'a> {
    if let Some(Some(predicate)) =
        (!filters.is_empty()).then_some(conjunction(filters.iter().cloned()))
    {
        let expr = SessionContext::new()
            .create_physical_expr(predicate, &log_data.read_schema().to_dfschema()?)?;
        let pruning_predicate = PruningPredicate::try_new(expr, log_data.read_schema())?;
        // One bool per file; `true` means the file may contain matching rows.
        let mask = pruning_predicate.prune(&log_data)?;

        Ok(Either::Left(log_data.into_iter().zip(mask).filter_map(
            |(file, keep_file)| {
                if keep_file {
                    Some(file.add_action())
                } else {
                    None
                }
            },
        )))
    } else {
        // No usable predicate: yield every file in the snapshot.
        Ok(Either::Right(
            log_data.into_iter().map(|file| file.add_action()),
        ))
    }
}
286
287pub(crate) fn get_path_column<'a>(
288 batch: &'a RecordBatch,
289 path_column: &str,
290) -> DeltaResult<TypedDictionaryArray<'a, UInt16Type, StringArray>> {
291 let err = || DeltaTableError::Generic("Unable to obtain Delta-rs path column".to_string());
292 batch
293 .column_by_name(path_column)
294 .unwrap()
295 .as_any()
296 .downcast_ref::<DictionaryArray<UInt16Type>>()
297 .ok_or_else(err)?
298 .downcast_dict::<StringArray>()
299 .ok_or_else(err)
300}
301
impl DeltaTableState {
    /// Provide table-level statistics to DataFusion, derived from the
    /// snapshot's log data. `None` when statistics are unavailable.
    pub fn datafusion_table_statistics(&self) -> Option<Statistics> {
        self.snapshot.log_data().statistics()
    }
}
308
/// Register the table's object store with DataFusion's [`RuntimeEnv`] so plans
/// can resolve it by the table's object-store URL.
pub(crate) fn register_store(store: LogStoreRef, env: &RuntimeEnv) {
    let object_store_url = store.object_store_url();
    // Explicit `&Url` annotation selects the intended `AsRef` impl.
    let url: &Url = object_store_url.as_ref();
    env.register_object_store(url, store.object_store(None));
}
316
/// Build a typed null [`ScalarValue`] for the given Arrow data type.
///
/// Used when file statistics carry no value for a column. Returns a generic
/// error for Arrow types delta-rs does not support.
pub(crate) fn get_null_of_arrow_type(t: &ArrowDataType) -> DeltaResult<ScalarValue> {
    match t {
        ArrowDataType::Null => Ok(ScalarValue::Null),
        ArrowDataType::Boolean => Ok(ScalarValue::Boolean(None)),
        ArrowDataType::Int8 => Ok(ScalarValue::Int8(None)),
        ArrowDataType::Int16 => Ok(ScalarValue::Int16(None)),
        ArrowDataType::Int32 => Ok(ScalarValue::Int32(None)),
        ArrowDataType::Int64 => Ok(ScalarValue::Int64(None)),
        ArrowDataType::UInt8 => Ok(ScalarValue::UInt8(None)),
        ArrowDataType::UInt16 => Ok(ScalarValue::UInt16(None)),
        ArrowDataType::UInt32 => Ok(ScalarValue::UInt32(None)),
        ArrowDataType::UInt64 => Ok(ScalarValue::UInt64(None)),
        ArrowDataType::Float32 => Ok(ScalarValue::Float32(None)),
        ArrowDataType::Float64 => Ok(ScalarValue::Float64(None)),
        ArrowDataType::Date32 => Ok(ScalarValue::Date32(None)),
        ArrowDataType::Date64 => Ok(ScalarValue::Date64(None)),
        ArrowDataType::Binary => Ok(ScalarValue::Binary(None)),
        ArrowDataType::FixedSizeBinary(size) => {
            Ok(ScalarValue::FixedSizeBinary(size.to_owned(), None))
        }
        ArrowDataType::LargeBinary => Ok(ScalarValue::LargeBinary(None)),
        ArrowDataType::Utf8 => Ok(ScalarValue::Utf8(None)),
        ArrowDataType::LargeUtf8 => Ok(ScalarValue::LargeUtf8(None)),
        ArrowDataType::Decimal128(precision, scale) => Ok(ScalarValue::Decimal128(
            None,
            precision.to_owned(),
            scale.to_owned(),
        )),
        // Null timestamps preserve the unit and time zone of the target type.
        ArrowDataType::Timestamp(unit, tz) => {
            let tz = tz.to_owned();
            Ok(match unit {
                TimeUnit::Second => ScalarValue::TimestampSecond(None, tz),
                TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(None, tz),
                TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(None, tz),
                TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(None, tz),
            })
        }
        // Recurse on the value type; the key type is carried through as-is.
        ArrowDataType::Dictionary(k, v) => Ok(ScalarValue::Dictionary(
            k.clone(),
            Box::new(get_null_of_arrow_type(v)?),
        )),
        ArrowDataType::Float16
        | ArrowDataType::Decimal32(_, _)
        | ArrowDataType::Decimal64(_, _)
        | ArrowDataType::Decimal256(_, _)
        | ArrowDataType::Union(_, _)
        | ArrowDataType::LargeList(_)
        | ArrowDataType::Struct(_)
        | ArrowDataType::List(_)
        | ArrowDataType::FixedSizeList(_, _)
        | ArrowDataType::Time32(_)
        | ArrowDataType::Time64(_)
        | ArrowDataType::Duration(_)
        | ArrowDataType::Interval(_)
        | ArrowDataType::RunEndEncoded(_, _)
        | ArrowDataType::BinaryView
        | ArrowDataType::Utf8View
        | ArrowDataType::LargeListView(_)
        | ArrowDataType::ListView(_)
        | ArrowDataType::Map(_, _) => Err(DeltaTableError::Generic(format!(
            "Unsupported data type for Delta Lake {t}"
        ))),
    }
}
382
383fn parse_date(
384 stat_val: &serde_json::Value,
385 field_dt: &ArrowDataType,
386) -> DataFusionResult<ScalarValue> {
387 let string = match stat_val {
388 serde_json::Value::String(s) => s.to_owned(),
389 _ => stat_val.to_string(),
390 };
391
392 let time_micro = ScalarValue::try_from_string(string, &ArrowDataType::Date32)?;
393 let cast_arr = cast_with_options(
394 &time_micro.to_array()?,
395 field_dt,
396 &CastOptions {
397 safe: false,
398 ..Default::default()
399 },
400 )?;
401 ScalarValue::try_from_array(&cast_arr, 0)
402}
403
404fn parse_timestamp(
405 stat_val: &serde_json::Value,
406 field_dt: &ArrowDataType,
407) -> DataFusionResult<ScalarValue> {
408 let string = match stat_val {
409 serde_json::Value::String(s) => s.to_owned(),
410 _ => stat_val.to_string(),
411 };
412
413 let time_micro = ScalarValue::try_from_string(
414 string,
415 &ArrowDataType::Timestamp(TimeUnit::Microsecond, None),
416 )?;
417 let cast_arr = cast_with_options(
418 &time_micro.to_array()?,
419 field_dt,
420 &CastOptions {
421 safe: false,
422 ..Default::default()
423 },
424 )?;
425 ScalarValue::try_from_array(&cast_arr, 0)
426}
427
428pub(crate) fn to_correct_scalar_value(
429 stat_val: &serde_json::Value,
430 field_dt: &ArrowDataType,
431) -> DataFusionResult<Option<ScalarValue>> {
432 match stat_val {
433 serde_json::Value::Array(_) => Ok(None),
434 serde_json::Value::Object(_) => Ok(None),
435 serde_json::Value::Null => Ok(Some(get_null_of_arrow_type(field_dt)?)),
436 serde_json::Value::String(string_val) => match field_dt {
437 ArrowDataType::Timestamp(_, _) => Ok(Some(parse_timestamp(stat_val, field_dt)?)),
438 ArrowDataType::Date32 => Ok(Some(parse_date(stat_val, field_dt)?)),
439 _ => Ok(Some(ScalarValue::try_from_string(
440 string_val.to_owned(),
441 field_dt,
442 )?)),
443 },
444 other => match field_dt {
445 ArrowDataType::Timestamp(_, _) => Ok(Some(parse_timestamp(stat_val, field_dt)?)),
446 ArrowDataType::Date32 => Ok(Some(parse_date(stat_val, field_dt)?)),
447 _ => Ok(Some(ScalarValue::try_from_string(
448 other.to_string(),
449 field_dt,
450 )?)),
451 },
452 }
453}
454
/// Validates in-flight [`RecordBatch`]es against a table's data-quality rules:
/// CHECK constraints, legacy column invariants, generated-column expressions,
/// and non-nullable columns.
#[derive(Clone, Default)]
pub struct DeltaDataChecker {
    /// CHECK constraints (read from table properties in [`DeltaDataChecker::new`]).
    constraints: Vec<Constraint>,
    /// Legacy column invariants from the table schema.
    invariants: Vec<Invariant>,
    /// Generated-column expressions to verify against supplied values.
    generated_columns: Vec<GeneratedColumn>,
    /// Names of columns declared non-nullable in the schema.
    non_nullable_columns: Vec<String>,
    /// Session used to evaluate check expressions via SQL.
    ctx: SessionContext,
}
464
465impl DeltaDataChecker {
466 pub fn empty() -> Self {
468 Self {
469 invariants: vec![],
470 constraints: vec![],
471 generated_columns: vec![],
472 non_nullable_columns: vec![],
473 ctx: DeltaSessionContext::default().into(),
474 }
475 }
476
477 pub fn new_with_invariants(invariants: Vec<Invariant>) -> Self {
479 Self {
480 invariants,
481 constraints: vec![],
482 generated_columns: vec![],
483 non_nullable_columns: vec![],
484 ctx: DeltaSessionContext::default().into(),
485 }
486 }
487
488 pub fn new_with_constraints(constraints: Vec<Constraint>) -> Self {
490 Self {
491 constraints,
492 invariants: vec![],
493 generated_columns: vec![],
494 non_nullable_columns: vec![],
495 ctx: DeltaSessionContext::default().into(),
496 }
497 }
498
499 pub fn new_with_generated_columns(generated_columns: Vec<GeneratedColumn>) -> Self {
501 Self {
502 constraints: vec![],
503 invariants: vec![],
504 generated_columns,
505 non_nullable_columns: vec![],
506 ctx: DeltaSessionContext::default().into(),
507 }
508 }
509
510 pub fn with_session_context(mut self, context: SessionContext) -> Self {
512 self.ctx = context;
513 self
514 }
515
516 pub fn with_extra_constraints(mut self, constraints: Vec<Constraint>) -> Self {
518 self.constraints.extend(constraints);
519 self
520 }
521
522 pub fn new(snapshot: &EagerSnapshot) -> Self {
524 let invariants = snapshot.schema().get_invariants().unwrap_or_default();
525 let generated_columns = snapshot
526 .schema()
527 .get_generated_columns()
528 .unwrap_or_default();
529 let constraints = snapshot.table_properties().get_constraints();
530 let non_nullable_columns = snapshot
531 .schema()
532 .fields()
533 .filter_map(|f| {
534 if !f.is_nullable() {
535 Some(f.name().clone())
536 } else {
537 None
538 }
539 })
540 .collect_vec();
541 Self {
542 invariants,
543 constraints,
544 generated_columns,
545 non_nullable_columns,
546 ctx: DeltaSessionContext::default().into(),
547 }
548 }
549
550 pub async fn check_batch(&self, record_batch: &RecordBatch) -> Result<(), DeltaTableError> {
555 self.check_nullability(record_batch)?;
556 self.enforce_checks(record_batch, &self.invariants).await?;
557 self.enforce_checks(record_batch, &self.constraints).await?;
558 self.enforce_checks(record_batch, &self.generated_columns)
559 .await
560 }
561
562 fn check_nullability(&self, record_batch: &RecordBatch) -> Result<bool, DeltaTableError> {
564 let mut violations = Vec::with_capacity(self.non_nullable_columns.len());
565 for col in self.non_nullable_columns.iter() {
566 if let Some(arr) = record_batch.column_by_name(col) {
567 if arr.null_count() > 0 {
568 violations.push(format!(
569 "Non-nullable column violation for {col}, found {} null values",
570 arr.null_count()
571 ));
572 }
573 } else {
574 violations.push(format!(
575 "Non-nullable column violation for {col}, not found in batch!"
576 ));
577 }
578 }
579 if !violations.is_empty() {
580 Err(DeltaTableError::InvalidData { violations })
581 } else {
582 Ok(true)
583 }
584 }
585
586 async fn enforce_checks<C: DataCheck>(
587 &self,
588 record_batch: &RecordBatch,
589 checks: &[C],
590 ) -> Result<(), DeltaTableError> {
591 if checks.is_empty() {
592 return Ok(());
593 }
594 let table = MemTable::try_new(record_batch.schema(), vec![vec![record_batch.clone()]])?;
595 table.schema();
596 let table_name: String = uuid::Uuid::new_v4().to_string();
598 self.ctx.register_table(&table_name, Arc::new(table))?;
599
600 let mut violations: Vec<String> = Vec::with_capacity(checks.len());
601
602 for check in checks {
603 if check.get_name().contains('.') {
604 return Err(DeltaTableError::Generic(
605 "delta constraints for nested columns are not supported at the moment."
606 .to_string(),
607 ));
608 }
609
610 let field_to_select = if check.as_any().is::<Constraint>() {
611 "*"
612 } else {
613 check.get_name()
614 };
615 let sql = format!(
616 "SELECT {} FROM `{table_name}` WHERE NOT ({}) LIMIT 1",
617 field_to_select,
618 check.get_expression()
619 );
620
621 let dfs: Vec<RecordBatch> = self.ctx.sql(&sql).await?.collect().await?;
622 if !dfs.is_empty() && dfs[0].num_rows() > 0 {
623 let value: String = dfs[0]
624 .columns()
625 .iter()
626 .map(|c| array_value_to_string(c, 0).unwrap_or(String::from("null")))
627 .join(", ");
628
629 let msg = format!(
630 "Check or Invariant ({}) violated by value in row: [{value}]",
631 check.get_expression(),
632 );
633 violations.push(msg);
634 }
635 }
636
637 self.ctx.deregister_table(&table_name)?;
638 if !violations.is_empty() {
639 Err(DeltaTableError::InvalidData { violations })
640 } else {
641 Ok(())
642 }
643 }
644}
645
/// Physical extension codec that (de)serializes [`DeltaScan`] nodes as JSON
/// via [`DeltaScanWire`] for distributed plan exchange.
#[derive(Debug)]
pub struct DeltaPhysicalCodec {}
649
650impl PhysicalExtensionCodec for DeltaPhysicalCodec {
651 fn try_decode(
652 &self,
653 buf: &[u8],
654 inputs: &[Arc<dyn ExecutionPlan>],
655 _registry: &TaskContext,
656 ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
657 let wire: DeltaScanWire = serde_json::from_reader(buf)
658 .map_err(|_| DataFusionError::Internal("Unable to decode DeltaScan".to_string()))?;
659 let delta_scan = DeltaScan::new(
660 &wire.table_url,
661 wire.config,
662 (*inputs)[0].clone(),
663 wire.logical_schema,
664 );
665 Ok(Arc::new(delta_scan))
666 }
667
668 fn try_encode(
669 &self,
670 node: Arc<dyn ExecutionPlan>,
671 buf: &mut Vec<u8>,
672 ) -> Result<(), DataFusionError> {
673 let delta_scan = node
674 .as_any()
675 .downcast_ref::<DeltaScan>()
676 .ok_or_else(|| DataFusionError::Internal("Not a delta scan!".to_string()))?;
677
678 let wire = DeltaScanWire::from(delta_scan);
679 serde_json::to_writer(buf, &wire)
680 .map_err(|_| DataFusionError::Internal("Unable to encode delta scan!".to_string()))?;
681 Ok(())
682 }
683}
684
/// Logical extension codec that (de)serializes [`DeltaTable`] providers as
/// JSON. Extension-node (de)coding is not implemented.
#[derive(Debug)]
pub struct DeltaLogicalCodec {}
688
689impl LogicalExtensionCodec for DeltaLogicalCodec {
690 fn try_decode(
691 &self,
692 _buf: &[u8],
693 _inputs: &[LogicalPlan],
694 _ctx: &TaskContext,
695 ) -> Result<Extension, DataFusionError> {
696 todo!("DeltaLogicalCodec")
697 }
698
699 fn try_encode(&self, _node: &Extension, _buf: &mut Vec<u8>) -> Result<(), DataFusionError> {
700 todo!("DeltaLogicalCodec")
701 }
702
703 fn try_decode_table_provider(
704 &self,
705 buf: &[u8],
706 _table_ref: &TableReference,
707 _schema: SchemaRef,
708 _ctx: &TaskContext,
709 ) -> Result<Arc<dyn TableProvider>, DataFusionError> {
710 let provider: DeltaTable = serde_json::from_slice(buf)
711 .map_err(|_| DataFusionError::Internal("Error encoding delta table".to_string()))?;
712 Ok(Arc::new(provider))
713 }
714
715 fn try_encode_table_provider(
716 &self,
717 _table_ref: &TableReference,
718 node: Arc<dyn TableProvider>,
719 buf: &mut Vec<u8>,
720 ) -> Result<(), DataFusionError> {
721 let table = node
722 .as_ref()
723 .as_any()
724 .downcast_ref::<DeltaTable>()
725 .ok_or_else(|| {
726 DataFusionError::Internal("Can't encode non-delta tables".to_string())
727 })?;
728 serde_json::to_writer(buf, table)
729 .map_err(|_| DataFusionError::Internal("Error encoding delta table".to_string()))
730 }
731}
732
/// [`TableProviderFactory`] that opens Delta tables for `CREATE EXTERNAL TABLE`
/// statements, forwarding any `OPTIONS` as storage options.
#[derive(Debug)]
pub struct DeltaTableFactory {}
736
737#[async_trait::async_trait]
738impl TableProviderFactory for DeltaTableFactory {
739 async fn create(
740 &self,
741 _ctx: &dyn Session,
742 cmd: &CreateExternalTable,
743 ) -> datafusion::error::Result<Arc<dyn TableProvider>> {
744 let provider = if cmd.options.is_empty() {
745 let table_url = ensure_table_uri(&cmd.to_owned().location)?;
746 open_table(table_url).await?
747 } else {
748 let table_url = ensure_table_uri(&cmd.to_owned().location)?;
749 open_table_with_storage_options(table_url, cmd.to_owned().options).await?
750 };
751 Ok(Arc::new(provider))
752 }
753}
754
/// Wrapper around DataFusion's [`Column`] that preserves case during string
/// conversion (uses `from_qualified_name_ignore_case` in its `From` impls).
pub struct DeltaColumn {
    // The wrapped DataFusion column.
    inner: Column,
}
759
/// Parse a possibly-qualified column name, preserving its case.
impl From<&str> for DeltaColumn {
    fn from(c: &str) -> Self {
        DeltaColumn {
            inner: Column::from_qualified_name_ignore_case(c),
        }
    }
}
767
/// Parse a possibly-qualified column name, preserving its case.
impl From<&String> for DeltaColumn {
    fn from(c: &String) -> Self {
        DeltaColumn {
            inner: Column::from_qualified_name_ignore_case(c),
        }
    }
}
776
/// Parse a possibly-qualified column name, preserving its case.
impl From<String> for DeltaColumn {
    fn from(c: String) -> Self {
        DeltaColumn {
            inner: Column::from_qualified_name_ignore_case(c),
        }
    }
}
785
/// Unwrap back into the underlying DataFusion [`Column`].
impl From<DeltaColumn> for Column {
    fn from(value: DeltaColumn) -> Self {
        value.inner
    }
}
791
/// Wrap an already-constructed DataFusion [`Column`] without re-parsing.
impl From<Column> for DeltaColumn {
    fn from(c: Column) -> Self {
        DeltaColumn { inner: c }
    }
}
798
799#[cfg(test)]
800mod tests {
801 use crate::logstore::ObjectStoreRef;
802 use crate::logstore::default_logstore::DefaultLogStore;
803 use crate::operations::write::SchemaMode;
804 use crate::writer::test_utils::get_delta_schema;
805 use arrow::array::StructArray;
806 use arrow::datatypes::{Field, Schema};
807 use arrow_array::cast::AsArray;
808 use bytes::Bytes;
809 use datafusion::assert_batches_sorted_eq;
810 use datafusion::config::TableParquetOptions;
811 use datafusion::datasource::physical_plan::{FileScanConfig, ParquetSource};
812 use datafusion::datasource::source::DataSourceExec;
813 use datafusion::logical_expr::lit;
814 use datafusion::physical_plan::empty::EmptyExec;
815 use datafusion::physical_plan::{ExecutionPlanVisitor, PhysicalExpr, visit_execution_plan};
816 use datafusion::prelude::{SessionConfig, col};
817 use datafusion_datasource::file::FileSource as _;
818 use datafusion_proto::physical_plan::AsExecutionPlan;
819 use datafusion_proto::protobuf;
820 use delta_kernel::path::{LogPathFileType, ParsedLogPath};
821 use delta_kernel::schema::ArrayType;
822 use futures::{StreamExt, stream::BoxStream};
823 use object_store::ObjectMeta;
824 use object_store::{
825 GetOptions, GetResult, ListResult, MultipartUpload, ObjectStore, PutMultipartOptions,
826 PutOptions, PutPayload, PutResult, path::Path,
827 };
828 use serde_json::json;
829 use std::fmt::{self, Debug, Display, Formatter};
830 use std::ops::Range;
831 use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
832
833 use super::*;
834
    /// `to_correct_scalar_value` should parse JSON stat values (numeric
    /// strings, raw numbers, date strings, booleans) into scalars of the
    /// requested Arrow type.
    #[test]
    fn test_to_correct_scalar_value() {
        // (raw JSON stat value, target Arrow type, expected scalar)
        let reference_pairs = &[
            (
                json!("2015"),
                ArrowDataType::Int16,
                ScalarValue::Int16(Some(2015)),
            ),
            (
                json!("2015"),
                ArrowDataType::Int32,
                ScalarValue::Int32(Some(2015)),
            ),
            (
                json!("2015"),
                ArrowDataType::Int64,
                ScalarValue::Int64(Some(2015)),
            ),
            (
                json!("2015"),
                ArrowDataType::Float32,
                ScalarValue::Float32(Some(2015_f32)),
            ),
            (
                json!("2015"),
                ArrowDataType::Float64,
                ScalarValue::Float64(Some(2015_f64)),
            ),
            (
                json!(2015),
                ArrowDataType::Float64,
                ScalarValue::Float64(Some(2015_f64)),
            ),
            (
                json!("2015-01-01"),
                ArrowDataType::Date32,
                ScalarValue::Date32(Some(16436)),
            ),
            (
                json!(true),
                ArrowDataType::Boolean,
                ScalarValue::Boolean(Some(true)),
            ),
        ];

        for (raw, data_type, ref_scalar) in reference_pairs {
            let scalar = to_correct_scalar_value(raw, data_type).unwrap().unwrap();
            assert_eq!(*ref_scalar, scalar)
        }
    }
908
    /// Invariant enforcement: passing/failing expressions, missing columns,
    /// and the unsupported nested-column case.
    #[tokio::test]
    async fn test_enforce_invariants() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", ArrowDataType::Utf8, false),
            Field::new("b", ArrowDataType::Int32, false),
        ]));
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(arrow::array::StringArray::from(vec!["a", "b", "c", "d"])),
                Arc::new(arrow::array::Int32Array::from(vec![1, 10, 10, 100])),
            ],
        )
        .unwrap();
        // No invariants is okay
        let invariants: Vec<Invariant> = vec![];
        assert!(
            DeltaDataChecker::new_with_invariants(invariants)
                .check_batch(&batch)
                .await
                .is_ok()
        );

        // Valid invariants return Ok(())
        let invariants = vec![
            Invariant::new("a", "a is not null"),
            Invariant::new("b", "b < 1000"),
        ];
        assert!(
            DeltaDataChecker::new_with_invariants(invariants)
                .check_batch(&batch)
                .await
                .is_ok()
        );

        // Violated invariants returns an error with the list of violations
        let invariants = vec![
            Invariant::new("a", "a is null"),
            Invariant::new("b", "b < 100"),
        ];
        let result = DeltaDataChecker::new_with_invariants(invariants)
            .check_batch(&batch)
            .await;
        assert!(result.is_err());
        assert!(matches!(result, Err(DeltaTableError::InvalidData { .. })));
        if let Err(DeltaTableError::InvalidData { violations }) = result {
            assert_eq!(violations.len(), 2);
        }

        // Irrelevant (unknown column) invariants also produce an error
        let invariants = vec![Invariant::new("c", "c > 2000")];
        let result = DeltaDataChecker::new_with_invariants(invariants)
            .check_batch(&batch)
            .await;
        assert!(result.is_err());

        // Nested invariants are currently unsupported and rejected
        let struct_fields = schema.fields().clone();
        let schema = Arc::new(Schema::new(vec![Field::new(
            "x",
            ArrowDataType::Struct(struct_fields),
            false,
        )]));
        let inner = Arc::new(StructArray::from(batch));
        let batch = RecordBatch::try_new(schema, vec![inner]).unwrap();

        let invariants = vec![Invariant::new("x.b", "x.b < 1000")];
        let result = DeltaDataChecker::new_with_invariants(invariants)
            .check_batch(&batch)
            .await;
        assert!(result.is_err());
        assert!(matches!(result, Err(DeltaTableError::Generic { .. })));
    }
982
    /// Constraint enforcement: passing/failing expressions and unknown columns.
    #[tokio::test]
    async fn test_enforce_constraints() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", ArrowDataType::Utf8, false),
            Field::new("b", ArrowDataType::Int32, false),
        ]));
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(arrow::array::StringArray::from(vec!["a", "b", "c", "d"])),
                Arc::new(arrow::array::Int32Array::from(vec![1, 10, 10, 100])),
            ],
        )
        .unwrap();
        // No constraints is okay
        let constraints: Vec<Constraint> = vec![];
        assert!(
            DeltaDataChecker::new_with_constraints(constraints)
                .check_batch(&batch)
                .await
                .is_ok()
        );

        // Valid constraints return Ok(())
        let constraints = vec![
            Constraint::new("custom_a", "a is not null"),
            Constraint::new("custom_b", "b < 1000"),
        ];
        assert!(
            DeltaDataChecker::new_with_constraints(constraints)
                .check_batch(&batch)
                .await
                .is_ok()
        );

        // Violated constraints return an error with the list of violations
        let constraints = vec![
            Constraint::new("custom_a", "a is null"),
            Constraint::new("custom_B", "b < 100"),
        ];
        let result = DeltaDataChecker::new_with_constraints(constraints)
            .check_batch(&batch)
            .await;
        assert!(result.is_err());
        assert!(matches!(result, Err(DeltaTableError::InvalidData { .. })));
        if let Err(DeltaTableError::InvalidData { violations }) = result {
            assert_eq!(violations.len(), 2);
        }

        // Constraints referencing unknown columns also produce an error
        let constraints = vec![Constraint::new("custom_c", "c > 2000")];
        let result = DeltaDataChecker::new_with_constraints(constraints)
            .check_batch(&batch)
            .await;
        assert!(result.is_err());
    }
1039
    /// Constraint expressions must handle field names containing spaces via
    /// backtick or double-quote quoting.
    #[tokio::test]
    async fn test_constraints_with_spacey_fields() -> DeltaResult<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", ArrowDataType::Utf8, false),
            Field::new("b bop", ArrowDataType::Int32, false),
        ]));
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(arrow::array::StringArray::from(vec![
                    "a", "b bop", "c", "d",
                ])),
                Arc::new(arrow::array::Int32Array::from(vec![1, 10, 10, 100])),
            ],
        )?;

        // Valid constraints with backtick quoting return Ok(())
        let constraints = vec![
            Constraint::new("custom a", "a is not null"),
            Constraint::new("custom_b", "`b bop` < 1000"),
        ];
        assert!(
            DeltaDataChecker::new_with_constraints(constraints)
                .check_batch(&batch)
                .await
                .is_ok()
        );

        // Violated constraints with double-quote quoting report violations
        let constraints = vec![
            Constraint::new("custom_a", "a is null"),
            Constraint::new("custom_B", "\"b bop\" < 100"),
        ];
        let result = DeltaDataChecker::new_with_constraints(constraints)
            .check_batch(&batch)
            .await;
        assert!(result.is_err());
        assert!(matches!(result, Err(DeltaTableError::InvalidData { .. })));
        if let Err(DeltaTableError::InvalidData { violations }) = result {
            assert_eq!(violations.len(), 2);
        }

        // Constraints referencing unknown columns also produce an error
        let constraints = vec![Constraint::new("custom_c", "c > 2000")];
        let result = DeltaDataChecker::new_with_constraints(constraints)
            .check_batch(&batch)
            .await;
        assert!(result.is_err());
        Ok(())
    }
1093
    /// Round-trip a [`DeltaScan`] through the physical proto codec and verify
    /// the decoded plan's debug form matches the original.
    #[test]
    fn roundtrip_test_delta_exec_plan() {
        let ctx = SessionContext::new();
        let codec = DeltaPhysicalCodec {};

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", ArrowDataType::Utf8, false),
            Field::new("b", ArrowDataType::Int32, false),
        ]));
        let exec_plan = Arc::from(DeltaScan::new(
            &Url::parse("s3://my_bucket/this/is/some/path").unwrap(),
            DeltaScanConfig::default(),
            Arc::from(EmptyExec::new(schema.clone())),
            schema.clone(),
        ));
        // Encode to protobuf ...
        let proto: protobuf::PhysicalPlanNode =
            protobuf::PhysicalPlanNode::try_from_physical_plan(exec_plan.clone(), &codec)
                .expect("to proto");

        // ... and decode back into an execution plan.
        let task_ctx = ctx.task_ctx();
        let result_exec_plan: Arc<dyn ExecutionPlan> = proto
            .try_into_physical_plan(&task_ctx, &codec)
            .expect("from proto");
        assert_eq!(format!("{exec_plan:?}"), format!("{result_exec_plan:?}"));
    }
1119
1120 #[tokio::test]
1121 async fn delta_table_provider_with_config() {
1122 let table_path = std::path::Path::new("../test/tests/data/delta-2.2.0-partitioned-types")
1123 .canonicalize()
1124 .unwrap();
1125 let table_url = url::Url::from_directory_path(table_path).unwrap();
1126 let table = crate::open_table(table_url).await.unwrap();
1127 let config = DeltaScanConfigBuilder::new()
1128 .with_file_column_name(&"file_source")
1129 .build(table.snapshot().unwrap().snapshot())
1130 .unwrap();
1131
1132 let log_store = table.log_store();
1133 let provider = DeltaTableProvider::try_new(
1134 table.snapshot().unwrap().snapshot().clone(),
1135 log_store,
1136 config,
1137 )
1138 .unwrap();
1139 let ctx = SessionContext::new();
1140 ctx.register_table("test", Arc::new(provider)).unwrap();
1141
1142 let df = ctx.sql("select * from test").await.unwrap();
1143 let actual = df.collect().await.unwrap();
1144 let expected = vec![
1145 "+----+----+----+-------------------------------------------------------------------------------+",
1146 "| c3 | c1 | c2 | file_source |",
1147 "+----+----+----+-------------------------------------------------------------------------------+",
1148 "| 4 | 6 | a | c1=6/c2=a/part-00011-10619b10-b691-4fd0-acc4-2a9608499d7c.c000.snappy.parquet |",
1149 "| 5 | 4 | c | c1=4/c2=c/part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet |",
1150 "| 6 | 5 | b | c1=5/c2=b/part-00007-4e73fa3b-2c88-424a-8051-f8b54328ffdb.c000.snappy.parquet |",
1151 "+----+----+----+-------------------------------------------------------------------------------+",
1152 ];
1153 assert_batches_sorted_eq!(&expected, &actual);
1154 }
1155
1156 #[tokio::test]
1157 async fn delta_scan_mixed_partition_order() {
1158 let schema = Arc::new(ArrowSchema::new(vec![
1161 Field::new("modified", ArrowDataType::Utf8, true),
1162 Field::new("id", ArrowDataType::Utf8, true),
1163 Field::new("value", ArrowDataType::Int32, true),
1164 ]));
1165
1166 let table = DeltaTable::new_in_memory()
1167 .create()
1168 .with_columns(get_delta_schema().fields().cloned())
1169 .with_partition_columns(["modified", "id"])
1170 .await
1171 .unwrap();
1172 assert_eq!(table.version(), Some(0));
1173
1174 let batch = RecordBatch::try_new(
1175 schema.clone(),
1176 vec![
1177 Arc::new(arrow::array::StringArray::from(vec![
1178 "2021-02-01",
1179 "2021-02-01",
1180 "2021-02-02",
1181 "2021-02-02",
1182 ])),
1183 Arc::new(arrow::array::StringArray::from(vec!["A", "B", "C", "D"])),
1184 Arc::new(arrow::array::Int32Array::from(vec![1, 10, 20, 100])),
1185 ],
1186 )
1187 .unwrap();
1188 let table = table
1190 .write(vec![batch.clone()])
1191 .with_save_mode(crate::protocol::SaveMode::Append)
1192 .await
1193 .unwrap();
1194
1195 let config = DeltaScanConfigBuilder::new()
1196 .build(table.snapshot().unwrap().snapshot())
1197 .unwrap();
1198
1199 let log_store = table.log_store();
1200 let provider = DeltaTableProvider::try_new(
1201 table.snapshot().unwrap().snapshot().clone(),
1202 log_store,
1203 config,
1204 )
1205 .unwrap();
1206 let logical_schema = provider.schema();
1207 let ctx = SessionContext::new();
1208 ctx.register_table("test", Arc::new(provider)).unwrap();
1209
1210 let expected_logical_order = vec!["value", "modified", "id"];
1211 let actual_order: Vec<String> = logical_schema
1212 .fields()
1213 .iter()
1214 .map(|f| f.name().to_owned())
1215 .collect();
1216
1217 let df = ctx.sql("select * from test").await.unwrap();
1218 let actual = df.collect().await.unwrap();
1219 let expected = vec![
1220 "+-------+------------+----+",
1221 "| value | modified | id |",
1222 "+-------+------------+----+",
1223 "| 1 | 2021-02-01 | A |",
1224 "| 10 | 2021-02-01 | B |",
1225 "| 100 | 2021-02-02 | D |",
1226 "| 20 | 2021-02-02 | C |",
1227 "+-------+------------+----+",
1228 ];
1229 assert_batches_sorted_eq!(&expected, &actual);
1230 assert_eq!(expected_logical_order, actual_order);
1231 }
1232
1233 #[tokio::test]
1234 async fn delta_scan_case_sensitive() {
1235 let schema = Arc::new(ArrowSchema::new(vec![
1236 Field::new("moDified", ArrowDataType::Utf8, true),
1237 Field::new("ID", ArrowDataType::Utf8, true),
1238 Field::new("vaLue", ArrowDataType::Int32, true),
1239 ]));
1240
1241 let batch = RecordBatch::try_new(
1242 schema.clone(),
1243 vec![
1244 Arc::new(arrow::array::StringArray::from(vec![
1245 "2021-02-01",
1246 "2021-02-01",
1247 "2021-02-02",
1248 "2021-02-02",
1249 ])),
1250 Arc::new(arrow::array::StringArray::from(vec!["A", "B", "C", "D"])),
1251 Arc::new(arrow::array::Int32Array::from(vec![1, 10, 20, 100])),
1252 ],
1253 )
1254 .unwrap();
1255 let table = DeltaTable::new_in_memory()
1257 .write(vec![batch.clone()])
1258 .with_save_mode(crate::protocol::SaveMode::Append)
1259 .await
1260 .unwrap();
1261
1262 let config = DeltaScanConfigBuilder::new()
1263 .build(table.snapshot().unwrap().snapshot())
1264 .unwrap();
1265 let log = table.log_store();
1266
1267 let provider =
1268 DeltaTableProvider::try_new(table.snapshot().unwrap().snapshot().clone(), log, config)
1269 .unwrap();
1270 let ctx: SessionContext = DeltaSessionContext::default().into();
1271 ctx.register_table("test", Arc::new(provider)).unwrap();
1272
1273 let df = ctx
1274 .sql("select ID, moDified, vaLue from test")
1275 .await
1276 .unwrap();
1277 let actual = df.collect().await.unwrap();
1278 let expected = vec![
1279 "+----+------------+-------+",
1280 "| ID | moDified | vaLue |",
1281 "+----+------------+-------+",
1282 "| A | 2021-02-01 | 1 |",
1283 "| B | 2021-02-01 | 10 |",
1284 "| C | 2021-02-02 | 20 |",
1285 "| D | 2021-02-02 | 100 |",
1286 "+----+------------+-------+",
1287 ];
1288 assert_batches_sorted_eq!(&expected, &actual);
1289
1290 }
1302
1303 #[tokio::test]
1304 async fn delta_scan_supports_missing_columns() {
1305 let schema1 = Arc::new(ArrowSchema::new(vec![Field::new(
1306 "col_1",
1307 ArrowDataType::Utf8,
1308 true,
1309 )]));
1310
1311 let batch1 = RecordBatch::try_new(
1312 schema1.clone(),
1313 vec![Arc::new(arrow::array::StringArray::from(vec![
1314 Some("A"),
1315 Some("B"),
1316 ]))],
1317 )
1318 .unwrap();
1319
1320 let schema2 = Arc::new(ArrowSchema::new(vec![
1321 Field::new("col_1", ArrowDataType::Utf8, true),
1322 Field::new("col_2", ArrowDataType::Utf8, true),
1323 ]));
1324
1325 let batch2 = RecordBatch::try_new(
1326 schema2.clone(),
1327 vec![
1328 Arc::new(arrow::array::StringArray::from(vec![
1329 Some("E"),
1330 Some("F"),
1331 Some("G"),
1332 ])),
1333 Arc::new(arrow::array::StringArray::from(vec![
1334 Some("E2"),
1335 Some("F2"),
1336 Some("G2"),
1337 ])),
1338 ],
1339 )
1340 .unwrap();
1341
1342 let table = DeltaTable::new_in_memory()
1343 .write(vec![batch2])
1344 .with_save_mode(crate::protocol::SaveMode::Append)
1345 .await
1346 .unwrap();
1347
1348 let table = table
1349 .write(vec![batch1])
1350 .with_schema_mode(SchemaMode::Merge)
1351 .with_save_mode(crate::protocol::SaveMode::Append)
1352 .await
1353 .unwrap();
1354
1355 let config = DeltaScanConfigBuilder::new()
1356 .build(table.snapshot().unwrap().snapshot())
1357 .unwrap();
1358 let log = table.log_store();
1359
1360 let provider =
1361 DeltaTableProvider::try_new(table.snapshot().unwrap().snapshot().clone(), log, config)
1362 .unwrap();
1363 let ctx: SessionContext = DeltaSessionContext::default().into();
1364 ctx.register_table("test", Arc::new(provider)).unwrap();
1365
1366 let df = ctx.sql("select col_1, col_2 from test").await.unwrap();
1367 let actual = df.collect().await.unwrap();
1368 let expected = vec![
1369 "+-------+-------+",
1370 "| col_1 | col_2 |",
1371 "+-------+-------+",
1372 "| A | |",
1373 "| B | |",
1374 "| E | E2 |",
1375 "| F | F2 |",
1376 "| G | G2 |",
1377 "+-------+-------+",
1378 ];
1379 assert_batches_sorted_eq!(&expected, &actual);
1380 }
1381
1382 #[tokio::test]
1383 async fn delta_scan_supports_pushdown() {
1384 let schema = Arc::new(ArrowSchema::new(vec![
1385 Field::new("col_1", ArrowDataType::Utf8, false),
1386 Field::new("col_2", ArrowDataType::Utf8, false),
1387 ]));
1388
1389 let batch = RecordBatch::try_new(
1390 schema.clone(),
1391 vec![
1392 Arc::new(arrow::array::StringArray::from(vec![
1393 Some("A"),
1394 Some("B"),
1395 Some("C"),
1396 ])),
1397 Arc::new(arrow::array::StringArray::from(vec![
1398 Some("A2"),
1399 Some("B2"),
1400 Some("C2"),
1401 ])),
1402 ],
1403 )
1404 .unwrap();
1405
1406 let table = DeltaTable::new_in_memory()
1407 .write(vec![batch])
1408 .with_save_mode(crate::protocol::SaveMode::Append)
1409 .await
1410 .unwrap();
1411
1412 let config = DeltaScanConfigBuilder::new()
1413 .build(table.snapshot().unwrap().snapshot())
1414 .unwrap();
1415 let log = table.log_store();
1416
1417 let provider =
1418 DeltaTableProvider::try_new(table.snapshot().unwrap().snapshot().clone(), log, config)
1419 .unwrap();
1420
1421 let mut cfg = SessionConfig::default();
1422 cfg.options_mut().execution.parquet.pushdown_filters = true;
1423 let ctx = SessionContext::new_with_config(cfg);
1424 ctx.register_table("test", Arc::new(provider)).unwrap();
1425
1426 let df = ctx
1427 .sql("select col_1, col_2 from test WHERE col_1 = 'A'")
1428 .await
1429 .unwrap();
1430 let actual = df.collect().await.unwrap();
1431 let expected = vec![
1432 "+-------+-------+",
1433 "| col_1 | col_2 |",
1434 "+-------+-------+",
1435 "| A | A2 |",
1436 "+-------+-------+",
1437 ];
1438 assert_batches_sorted_eq!(&expected, &actual);
1439 }
1440
    #[tokio::test]
    async fn delta_scan_supports_nested_missing_columns() {
        // First commit writes struct column `col_1` with a single nested field
        // `col_1a`; a second, schema-merged commit widens the struct with
        // `col_1b`. Reading both files must null-fill the missing nested field.
        let column1_schema1: arrow::datatypes::Fields =
            vec![Field::new("col_1a", ArrowDataType::Utf8, true)].into();
        let schema1 = Arc::new(ArrowSchema::new(vec![Field::new(
            "col_1",
            ArrowDataType::Struct(column1_schema1.clone()),
            true,
        )]));

        let batch1 = RecordBatch::try_new(
            schema1.clone(),
            vec![Arc::new(StructArray::new(
                column1_schema1,
                vec![Arc::new(arrow::array::StringArray::from(vec![
                    Some("A"),
                    Some("B"),
                ]))],
                None,
            ))],
        )
        .unwrap();

        // Widened schema: same struct column plus the extra `col_1b` field.
        let column1_schema2: arrow_schema::Fields = vec![
            Field::new("col_1a", ArrowDataType::Utf8, true),
            Field::new("col_1b", ArrowDataType::Utf8, true),
        ]
        .into();
        let schema2 = Arc::new(ArrowSchema::new(vec![Field::new(
            "col_1",
            ArrowDataType::Struct(column1_schema2.clone()),
            true,
        )]));

        let batch2 = RecordBatch::try_new(
            schema2.clone(),
            vec![Arc::new(StructArray::new(
                column1_schema2,
                vec![
                    Arc::new(arrow::array::StringArray::from(vec![
                        Some("E"),
                        Some("F"),
                        Some("G"),
                    ])),
                    Arc::new(arrow::array::StringArray::from(vec![
                        Some("E2"),
                        Some("F2"),
                        Some("G2"),
                    ])),
                ],
                None,
            ))],
        )
        .unwrap();

        let table = DeltaTable::new_in_memory()
            .write(vec![batch1])
            .with_save_mode(crate::protocol::SaveMode::Append)
            .await
            .unwrap();

        // Second append merges the widened schema into the table.
        let table = table
            .write(vec![batch2])
            .with_schema_mode(SchemaMode::Merge)
            .with_save_mode(crate::protocol::SaveMode::Append)
            .await
            .unwrap();

        let config = DeltaScanConfigBuilder::new()
            .build(table.snapshot().unwrap().snapshot())
            .unwrap();
        let log = table.log_store();

        let provider =
            DeltaTableProvider::try_new(table.snapshot().unwrap().snapshot().clone(), log, config)
                .unwrap();
        let ctx: SessionContext = DeltaSessionContext::default().into();
        ctx.register_table("test", Arc::new(provider)).unwrap();

        // Project both nested fields; rows from the first file have no col_1b
        // values, so those cells come back empty in the rendered batches.
        let df = ctx
            .sql("select col_1.col_1a, col_1.col_1b from test")
            .await
            .unwrap();
        let actual = df.collect().await.unwrap();
        let expected = vec![
            "+--------------------+--------------------+",
            "| test.col_1[col_1a] | test.col_1[col_1b] |",
            "+--------------------+--------------------+",
            "| A | |",
            "| B | |",
            "| E | E2 |",
            "| F | F2 |",
            "| G | G2 |",
            "+--------------------+--------------------+",
        ];
        assert_batches_sorted_eq!(&expected, &actual);
    }
1538
1539 #[tokio::test]
1540 async fn test_multiple_predicate_pushdown() {
1541 use crate::datafusion::prelude::SessionContext;
1542 let schema = Arc::new(ArrowSchema::new(vec![
1543 Field::new("moDified", ArrowDataType::Utf8, true),
1544 Field::new("id", ArrowDataType::Utf8, true),
1545 Field::new("vaLue", ArrowDataType::Int32, true),
1546 ]));
1547
1548 let batch = RecordBatch::try_new(
1549 schema.clone(),
1550 vec![
1551 Arc::new(arrow::array::StringArray::from(vec![
1552 "2021-02-01",
1553 "2021-02-01",
1554 "2021-02-02",
1555 "2021-02-02",
1556 ])),
1557 Arc::new(arrow::array::StringArray::from(vec!["A", "B", "C", "D"])),
1558 Arc::new(arrow::array::Int32Array::from(vec![1, 10, 20, 100])),
1559 ],
1560 )
1561 .unwrap();
1562 let table = DeltaTable::new_in_memory()
1564 .write(vec![batch.clone()])
1565 .with_save_mode(crate::protocol::SaveMode::Append)
1566 .await
1567 .unwrap();
1568
1569 let datafusion = SessionContext::new();
1570 let table = Arc::new(table);
1571
1572 datafusion.register_table("snapshot", table).unwrap();
1573
1574 let df = datafusion
1575 .sql("select * from snapshot where id > 10000 and id < 20000")
1576 .await
1577 .unwrap();
1578
1579 df.collect().await.unwrap();
1580 }
1581
1582 #[tokio::test]
1583 async fn test_delta_scan_builder_no_scan_config() {
1584 let arr: Arc<dyn Array> = Arc::new(arrow::array::StringArray::from(vec!["s"]));
1585 let batch = RecordBatch::try_from_iter_with_nullable(vec![("a", arr, false)]).unwrap();
1586 let table = DeltaTable::new_in_memory()
1587 .write(vec![batch])
1588 .with_save_mode(crate::protocol::SaveMode::Append)
1589 .await
1590 .unwrap();
1591
1592 let ctx = SessionContext::new();
1593 let state = ctx.state();
1594 let scan = DeltaScanBuilder::new(
1595 table.snapshot().unwrap().snapshot(),
1596 table.log_store(),
1597 &state,
1598 )
1599 .with_filter(Some(col("a").eq(lit("s"))))
1600 .build()
1601 .await
1602 .unwrap();
1603
1604 let mut visitor = ParquetVisitor::default();
1605 visit_execution_plan(&scan, &mut visitor).unwrap();
1606
1607 assert_eq!(visitor.predicate.unwrap().to_string(), "a@0 = s");
1608 }
1609
1610 #[tokio::test]
1611 async fn test_delta_scan_builder_scan_config_disable_pushdown() {
1612 let arr: Arc<dyn Array> = Arc::new(arrow::array::StringArray::from(vec!["s"]));
1613 let batch = RecordBatch::try_from_iter_with_nullable(vec![("a", arr, false)]).unwrap();
1614 let table = DeltaTable::new_in_memory()
1615 .write(vec![batch])
1616 .with_save_mode(crate::protocol::SaveMode::Append)
1617 .await
1618 .unwrap();
1619
1620 let snapshot = table.snapshot().unwrap();
1621 let ctx = SessionContext::new();
1622 let state = ctx.state();
1623 let scan = DeltaScanBuilder::new(snapshot.snapshot(), table.log_store(), &state)
1624 .with_filter(Some(col("a").eq(lit("s"))))
1625 .with_scan_config(
1626 DeltaScanConfigBuilder::new()
1627 .with_parquet_pushdown(false)
1628 .build(snapshot.snapshot())
1629 .unwrap(),
1630 )
1631 .build()
1632 .await
1633 .unwrap();
1634
1635 let mut visitor = ParquetVisitor::default();
1636 visit_execution_plan(&scan, &mut visitor).unwrap();
1637
1638 assert!(visitor.predicate.is_none());
1639 }
1640
1641 #[tokio::test]
1642 async fn test_delta_scan_applies_parquet_options() {
1643 let arr: Arc<dyn Array> = Arc::new(arrow::array::StringArray::from(vec!["s"]));
1644 let batch = RecordBatch::try_from_iter_with_nullable(vec![("a", arr, false)]).unwrap();
1645 let table = DeltaTable::new_in_memory()
1646 .write(vec![batch])
1647 .with_save_mode(crate::protocol::SaveMode::Append)
1648 .await
1649 .unwrap();
1650
1651 let snapshot = table.snapshot().unwrap();
1652
1653 let mut config = SessionConfig::default();
1654 config.options_mut().execution.parquet.pushdown_filters = true;
1655 let ctx = SessionContext::new_with_config(config);
1656 let state = ctx.state();
1657
1658 let scan = DeltaScanBuilder::new(snapshot.snapshot(), table.log_store(), &state)
1659 .build()
1660 .await
1661 .unwrap();
1662
1663 let mut visitor = ParquetVisitor::default();
1664 visit_execution_plan(&scan, &mut visitor).unwrap();
1665
1666 assert_eq!(ctx.copied_table_options().parquet, visitor.options.unwrap());
1667 }
1668
    /// Collects what the parquet layer of an execution plan was configured
    /// with, so tests can assert on predicate pushdown and reader options.
    #[derive(Default)]
    struct ParquetVisitor {
        /// Filter expression found on the `ParquetSource`, if any was pushed down.
        predicate: Option<Arc<dyn PhysicalExpr>>,
        /// Parquet reader options captured from the scanned `ParquetSource`.
        options: Option<TableParquetOptions>,
    }
1675
1676 impl ExecutionPlanVisitor for ParquetVisitor {
1677 type Error = DataFusionError;
1678
1679 fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
1680 let Some(datasource_exec) = plan.as_any().downcast_ref::<DataSourceExec>() else {
1681 return Ok(true);
1682 };
1683
1684 let Some(scan_config) = datasource_exec
1685 .data_source()
1686 .as_any()
1687 .downcast_ref::<FileScanConfig>()
1688 else {
1689 return Ok(true);
1690 };
1691
1692 if let Some(parquet_source) = scan_config
1693 .file_source
1694 .as_any()
1695 .downcast_ref::<ParquetSource>()
1696 {
1697 self.options = Some(parquet_source.table_parquet_options().clone());
1698 self.predicate = parquet_source.filter();
1699 }
1700
1701 Ok(true)
1702 }
1703 }
1704
1705 #[tokio::test]
1706 async fn passes_sanity_checker_when_all_files_filtered() {
1707 let table_path = std::path::Path::new("../test/tests/data/delta-2.2.0-partitioned-types")
1713 .canonicalize()
1714 .unwrap();
1715 let table_url = url::Url::from_directory_path(table_path).unwrap();
1716 let table = crate::open_table(table_url).await.unwrap();
1717 let ctx = SessionContext::new();
1718 ctx.register_table("test", Arc::new(table)).unwrap();
1719
1720 let df = ctx
1721 .sql("select * from test where c3 = 100 ORDER BY c1 ASC")
1722 .await
1723 .unwrap();
1724 let actual = df.collect().await.unwrap();
1725
1726 assert_eq!(actual.len(), 0);
1727 }
1728
1729 #[tokio::test]
1730 async fn test_check_nullability() -> DeltaResult<()> {
1731 use arrow::array::StringArray;
1732
1733 let data_checker = DeltaDataChecker {
1734 non_nullable_columns: vec!["zed".to_string(), "yap".to_string()],
1735 ..Default::default()
1736 };
1737
1738 let arr: Arc<dyn Array> = Arc::new(StringArray::from(vec!["s"]));
1739 let nulls: Arc<dyn Array> = Arc::new(StringArray::new_null(1));
1740 let batch = RecordBatch::try_from_iter(vec![("a", arr), ("zed", nulls)]).unwrap();
1741
1742 let result = data_checker.check_nullability(&batch);
1743 assert!(
1744 result.is_err(),
1745 "The result should have errored! {result:?}"
1746 );
1747
1748 let arr: Arc<dyn Array> = Arc::new(StringArray::from(vec!["s"]));
1749 let batch = RecordBatch::try_from_iter(vec![("zed", arr)]).unwrap();
1750 let result = data_checker.check_nullability(&batch);
1751 assert!(
1752 result.is_err(),
1753 "The result should have errored! {result:?}"
1754 );
1755
1756 let arr: Arc<dyn Array> = Arc::new(StringArray::from(vec!["s"]));
1757 let batch = RecordBatch::try_from_iter(vec![("zed", arr.clone()), ("yap", arr)]).unwrap();
1758 let _ = data_checker.check_nullability(&batch)?;
1759
1760 Ok(())
1761 }
1762
    #[tokio::test]
    async fn test_delta_scan_uses_parquet_column_pruning() {
        // One tiny column and one deliberately large (1 KiB string) column: if
        // column pruning works, the large column's bytes are never fetched.
        let small: Arc<dyn Array> = Arc::new(arrow::array::StringArray::from(vec!["a"]));
        let large: Arc<dyn Array> = Arc::new(arrow::array::StringArray::from(vec![
            "b".repeat(1024).as_str(),
        ]));
        let batch = RecordBatch::try_from_iter(vec![("small", small), ("large", large)]).unwrap();
        let table = DeltaTable::new_in_memory()
            .write(vec![batch])
            .with_save_mode(crate::protocol::SaveMode::Append)
            .await
            .unwrap();

        let config = DeltaScanConfigBuilder::new()
            .build(table.snapshot().unwrap().snapshot())
            .unwrap();

        // Wrap the table's object store so every read is reported on `operations`;
        // the same recording store serves both the log and data sides.
        let (object_store, mut operations) =
            RecordingObjectStore::new(table.log_store().object_store(None));
        let both_store = Arc::new(object_store);
        let log_store = DefaultLogStore::new(
            both_store.clone(),
            both_store,
            table.log_store().config().clone(),
        );
        let provider = DeltaTableProvider::try_new(
            table.snapshot().unwrap().snapshot().clone(),
            Arc::new(log_store),
            config,
        )
        .unwrap();
        let ctx = SessionContext::new();
        ctx.register_table("test", Arc::new(provider)).unwrap();
        let state = ctx.state();

        // Project only the small column.
        let df = ctx.sql("select small from test").await.unwrap();
        let plan = df.create_physical_plan().await.unwrap();

        let mut stream = plan.execute(0, state.task_ctx()).unwrap();
        let Some(Ok(batch)) = stream.next().await else {
            panic!()
        };
        assert!(stream.next().await.is_none());
        assert_eq!(1, batch.num_columns());
        assert_eq!(1, batch.num_rows());
        let small = batch.column_by_name("small").unwrap().as_string::<i32>();
        assert_eq!("a", small.iter().next().unwrap().unwrap());

        // One commit read plus exactly two ranged data reads. NOTE(review): the
        // byte offsets are tied to the parquet layout this writer produces
        // (presumably footer metadata first, then only the `small` column's
        // bytes) — they must be updated if the writer's layout changes.
        let expected = vec![
            ObjectStoreOperation::Get(LocationType::Commit),
            ObjectStoreOperation::GetRange(LocationType::Data, 957..965),
            ObjectStoreOperation::GetRange(LocationType::Data, 326..957),
        ];
        let mut actual = Vec::new();
        operations.recv_many(&mut actual, 3).await;
        assert_eq!(expected, actual);
    }
1821
1822 #[tokio::test]
1823 async fn test_push_down_filter_panic_2602() -> DeltaResult<()> {
1824 use crate::kernel::schema::{DataType, PrimitiveType};
1825 let ctx = SessionContext::new();
1826 let table = DeltaTable::new_in_memory()
1827 .create()
1828 .with_column("id", DataType::Primitive(PrimitiveType::Long), true, None)
1829 .with_column(
1830 "name",
1831 DataType::Primitive(PrimitiveType::String),
1832 true,
1833 None,
1834 )
1835 .with_column("b", DataType::Primitive(PrimitiveType::Boolean), true, None)
1836 .with_column(
1837 "ts",
1838 DataType::Primitive(PrimitiveType::Timestamp),
1839 true,
1840 None,
1841 )
1842 .with_column("dt", DataType::Primitive(PrimitiveType::Date), true, None)
1843 .with_column(
1844 "zap",
1845 DataType::Array(Box::new(ArrayType::new(
1846 DataType::Primitive(PrimitiveType::Boolean),
1847 true,
1848 ))),
1849 true,
1850 None,
1851 )
1852 .await?;
1853
1854 ctx.register_table("snapshot", Arc::new(table)).unwrap();
1855
1856 let df = ctx
1857 .sql("select * from snapshot where id > 10000 and id < 20000")
1858 .await
1859 .unwrap();
1860
1861 let _ = df.collect().await?;
1862 Ok(())
1863 }
1864
    /// An `ObjectStore` decorator that forwards every call to `inner` and
    /// reports read operations on a channel, so tests can assert on exactly
    /// which objects (and byte ranges) were requested.
    struct RecordingObjectStore {
        inner: ObjectStoreRef,
        /// Sender half of the recording channel; the receiver is handed back
        /// to the caller when the store is constructed.
        operations: UnboundedSender<ObjectStoreOperation>,
    }
1870
    impl RecordingObjectStore {
        /// Wraps `inner`, returning the recording store together with the
        /// receiving end of the channel on which operations are reported.
        fn new(inner: ObjectStoreRef) -> (Self, UnboundedReceiver<ObjectStoreOperation>) {
            let (operations, operations_receiver) = unbounded_channel();
            (Self { inner, operations }, operations_receiver)
        }
    }
1878
1879 impl Display for RecordingObjectStore {
1880 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1881 Display::fmt(&self.inner, f)
1882 }
1883 }
1884
1885 impl Debug for RecordingObjectStore {
1886 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1887 Debug::fmt(&self.inner, f)
1888 }
1889 }
1890
    /// A read operation observed by `RecordingObjectStore`, tagged with the
    /// kind of file it touched and, for ranged reads, the byte range(s).
    #[derive(Debug, PartialEq)]
    enum ObjectStoreOperation {
        GetRanges(LocationType, Vec<Range<u64>>),
        GetRange(LocationType, Range<u64>),
        GetOpts(LocationType),
        Get(LocationType),
    }
1898
    /// Coarse classification of an object-store path used in recorded operations.
    #[derive(Debug, PartialEq)]
    enum LocationType {
        /// A data (parquet) file, identified by a `part-` filename prefix.
        Data,
        /// A Delta log commit file, identified by the kernel's log-path parser.
        Commit,
    }
1904
1905 impl From<&Path> for LocationType {
1906 fn from(value: &Path) -> Self {
1907 let dummy_url = Url::parse("dummy:///").unwrap();
1908 let parsed = ParsedLogPath::try_from(dummy_url.join(value.as_ref()).unwrap()).unwrap();
1909 if let Some(parsed) = parsed
1910 && matches!(parsed.file_type, LogPathFileType::Commit)
1911 {
1912 return LocationType::Commit;
1913 }
1914 if value.to_string().starts_with("part-") {
1915 LocationType::Data
1916 } else {
1917 panic!("Unknown location type: {value:?}")
1918 }
1919 }
1920 }
1921
    // Every method delegates to `self.inner`. Only the read-path methods that
    // the pruning test cares about (`get`, `get_opts`, `get_range`,
    // `get_ranges`) additionally report the operation — tagged with the
    // location type — on the channel before delegating.
    #[async_trait::async_trait]
    impl ObjectStore for RecordingObjectStore {
        async fn put(
            &self,
            location: &Path,
            payload: PutPayload,
        ) -> object_store::Result<PutResult> {
            self.inner.put(location, payload).await
        }

        async fn put_opts(
            &self,
            location: &Path,
            payload: PutPayload,
            opts: PutOptions,
        ) -> object_store::Result<PutResult> {
            self.inner.put_opts(location, payload, opts).await
        }

        async fn put_multipart(
            &self,
            location: &Path,
        ) -> object_store::Result<Box<dyn MultipartUpload>> {
            self.inner.put_multipart(location).await
        }

        async fn put_multipart_opts(
            &self,
            location: &Path,
            opts: PutMultipartOptions,
        ) -> object_store::Result<Box<dyn MultipartUpload>> {
            self.inner.put_multipart_opts(location, opts).await
        }

        // Recorded: whole-object read.
        async fn get(&self, location: &Path) -> object_store::Result<GetResult> {
            self.operations
                .send(ObjectStoreOperation::Get(location.into()))
                .unwrap();
            self.inner.get(location).await
        }

        // Recorded: read with options (range/conditions not captured, only the path).
        async fn get_opts(
            &self,
            location: &Path,
            options: GetOptions,
        ) -> object_store::Result<GetResult> {
            self.operations
                .send(ObjectStoreOperation::GetOpts(location.into()))
                .unwrap();
            self.inner.get_opts(location, options).await
        }

        // Recorded: single ranged read, including the byte range.
        async fn get_range(
            &self,
            location: &Path,
            range: Range<u64>,
        ) -> object_store::Result<Bytes> {
            self.operations
                .send(ObjectStoreOperation::GetRange(
                    location.into(),
                    range.clone(),
                ))
                .unwrap();
            self.inner.get_range(location, range).await
        }

        // Recorded: vectored ranged read, including all byte ranges.
        async fn get_ranges(
            &self,
            location: &Path,
            ranges: &[Range<u64>],
        ) -> object_store::Result<Vec<Bytes>> {
            self.operations
                .send(ObjectStoreOperation::GetRanges(
                    location.into(),
                    ranges.to_vec(),
                ))
                .unwrap();
            self.inner.get_ranges(location, ranges).await
        }

        async fn head(&self, location: &Path) -> object_store::Result<ObjectMeta> {
            self.inner.head(location).await
        }

        async fn delete(&self, location: &Path) -> object_store::Result<()> {
            self.inner.delete(location).await
        }

        fn delete_stream<'a>(
            &'a self,
            locations: BoxStream<'a, object_store::Result<Path>>,
        ) -> BoxStream<'a, object_store::Result<Path>> {
            self.inner.delete_stream(locations)
        }

        fn list(
            &self,
            prefix: Option<&Path>,
        ) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
            self.inner.list(prefix)
        }

        fn list_with_offset(
            &self,
            prefix: Option<&Path>,
            offset: &Path,
        ) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
            self.inner.list_with_offset(prefix, offset)
        }

        async fn list_with_delimiter(
            &self,
            prefix: Option<&Path>,
        ) -> object_store::Result<ListResult> {
            self.inner.list_with_delimiter(prefix).await
        }

        async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> {
            self.inner.copy(from, to).await
        }

        async fn rename(&self, from: &Path, to: &Path) -> object_store::Result<()> {
            self.inner.rename(from, to).await
        }

        async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> {
            self.inner.copy_if_not_exists(from, to).await
        }

        async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> {
            self.inner.rename_if_not_exists(from, to).await
        }
    }
2056}