1use std::any::Any;
24use std::borrow::Cow;
25use std::collections::{HashMap, HashSet};
26use std::fmt::{self, Debug};
27use std::sync::Arc;
28
29use arrow_array::types::UInt16Type;
30use arrow_array::{Array, DictionaryArray, RecordBatch, StringArray, TypedDictionaryArray};
31use arrow_cast::display::array_value_to_string;
32use arrow_cast::{cast_with_options, CastOptions};
33use arrow_schema::{
34 ArrowError, DataType as ArrowDataType, Field, Schema as ArrowSchema, SchemaRef,
35 SchemaRef as ArrowSchemaRef, TimeUnit,
36};
37use arrow_select::concat::concat_batches;
38use async_trait::async_trait;
39use chrono::{DateTime, TimeZone, Utc};
40use datafusion::catalog::memory::DataSourceExec;
41use datafusion::catalog::{Session, TableProviderFactory};
42use datafusion::config::TableParquetOptions;
43use datafusion::datasource::physical_plan::{
44 wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileGroup, FileScanConfigBuilder,
45 ParquetSource,
46};
47use datafusion::datasource::{listing::PartitionedFile, MemTable, TableProvider, TableType};
48use datafusion::execution::context::{SessionConfig, SessionContext, SessionState, TaskContext};
49use datafusion::execution::runtime_env::RuntimeEnv;
50use datafusion::execution::FunctionRegistry;
51use datafusion::optimizer::simplify_expressions::ExprSimplifier;
52use datafusion::physical_optimizer::pruning::PruningPredicate;
53use datafusion_common::scalar::ScalarValue;
54use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
55use datafusion_common::{
56 config::ConfigOptions, Column, DFSchema, DataFusionError, Result as DataFusionResult,
57 TableReference, ToDFSchema,
58};
59use datafusion_expr::execution_props::ExecutionProps;
60use datafusion_expr::logical_plan::CreateExternalTable;
61use datafusion_expr::simplify::SimplifyContext;
62use datafusion_expr::utils::conjunction;
63use datafusion_expr::{col, Expr, Extension, LogicalPlan, TableProviderFilterPushDown, Volatility};
64use datafusion_physical_expr::{create_physical_expr, PhysicalExpr};
65use datafusion_physical_plan::filter::FilterExec;
66use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
67use datafusion_physical_plan::memory::{LazyBatchGenerator, LazyMemoryExec};
68use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, MetricsSet};
69use datafusion_physical_plan::projection::ProjectionExec;
70use datafusion_physical_plan::{
71 DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, SendableRecordBatchStream,
72 Statistics,
73};
74use datafusion_proto::logical_plan::LogicalExtensionCodec;
75use datafusion_proto::physical_plan::PhysicalExtensionCodec;
76use datafusion_sql::planner::ParserOptions;
77use either::Either;
78use futures::TryStreamExt;
79use itertools::Itertools;
80use object_store::ObjectMeta;
81use parking_lot::RwLock;
82use serde::{Deserialize, Serialize};
83
84use url::Url;
85
86use crate::delta_datafusion::expr::parse_predicate_expression;
87use crate::delta_datafusion::schema_adapter::DeltaSchemaAdapterFactory;
88use crate::errors::{DeltaResult, DeltaTableError};
89use crate::kernel::{Add, DataCheck, EagerSnapshot, Invariant, Snapshot, StructTypeExt};
90use crate::logstore::LogStoreRef;
91use crate::table::builder::ensure_table_uri;
92use crate::table::state::DeltaTableState;
93use crate::table::{Constraint, GeneratedColumn};
94use crate::{open_table, open_table_with_storage_options, DeltaTable};
95
/// Name of the synthetic column used to carry each record's source file path
/// through a scan (see `DeltaScanConfig.file_column_name`).
pub(crate) const PATH_COLUMN: &str = "__delta_rs_path";

pub mod cdf;
pub mod expr;
pub mod logical;
pub mod physical;
pub mod planner;

pub use cdf::scan::DeltaCdfTableProvider;

mod schema_adapter;
107
108impl From<DeltaTableError> for DataFusionError {
109 fn from(err: DeltaTableError) -> Self {
110 match err {
111 DeltaTableError::Arrow { source } => DataFusionError::ArrowError(source, None),
112 DeltaTableError::Io { source } => DataFusionError::IoError(source),
113 DeltaTableError::ObjectStore { source } => DataFusionError::ObjectStore(source),
114 DeltaTableError::Parquet { source } => DataFusionError::ParquetError(source),
115 _ => DataFusionError::External(Box::new(err)),
116 }
117 }
118}
119
120impl From<DataFusionError> for DeltaTableError {
121 fn from(err: DataFusionError) -> Self {
122 match err {
123 DataFusionError::ArrowError(source, _) => DeltaTableError::Arrow { source },
124 DataFusionError::IoError(source) => DeltaTableError::Io { source },
125 DataFusionError::ObjectStore(source) => DeltaTableError::ObjectStore { source },
126 DataFusionError::ParquetError(source) => DeltaTableError::Parquet { source },
127 _ => DeltaTableError::Generic(err.to_string()),
128 }
129 }
130}
131
/// Convenience bridge between delta snapshots and DataFusion schemas/expressions.
pub trait DataFusionMixins {
    /// The physical datafusion schema of a table; string/binary partition
    /// columns may be dictionary-wrapped (see `_arrow_schema`).
    fn arrow_schema(&self) -> DeltaResult<ArrowSchemaRef>;

    /// The table schema without partition-column dictionary wrapping.
    fn input_schema(&self) -> DeltaResult<ArrowSchemaRef>;

    /// Parse an expression string into a datafusion [`Expr`] against this
    /// table's arrow schema.
    fn parse_predicate_expression(
        &self,
        expr: impl AsRef<str>,
        df_state: &SessionState,
    ) -> DeltaResult<Expr>;
}
147
148impl DataFusionMixins for Snapshot {
149 fn arrow_schema(&self) -> DeltaResult<ArrowSchemaRef> {
150 _arrow_schema(self, true)
151 }
152
153 fn input_schema(&self) -> DeltaResult<ArrowSchemaRef> {
154 _arrow_schema(self, false)
155 }
156
157 fn parse_predicate_expression(
158 &self,
159 expr: impl AsRef<str>,
160 df_state: &SessionState,
161 ) -> DeltaResult<Expr> {
162 let schema = DFSchema::try_from(self.arrow_schema()?.as_ref().to_owned())?;
163 parse_predicate_expression(&schema, expr, df_state)
164 }
165}
166
167impl DataFusionMixins for EagerSnapshot {
168 fn arrow_schema(&self) -> DeltaResult<ArrowSchemaRef> {
169 self.snapshot().arrow_schema()
170 }
171
172 fn input_schema(&self) -> DeltaResult<ArrowSchemaRef> {
173 self.snapshot().input_schema()
174 }
175
176 fn parse_predicate_expression(
177 &self,
178 expr: impl AsRef<str>,
179 df_state: &SessionState,
180 ) -> DeltaResult<Expr> {
181 self.snapshot().parse_predicate_expression(expr, df_state)
182 }
183}
184
185impl DataFusionMixins for DeltaTableState {
186 fn arrow_schema(&self) -> DeltaResult<ArrowSchemaRef> {
187 self.snapshot.arrow_schema()
188 }
189
190 fn input_schema(&self) -> DeltaResult<ArrowSchemaRef> {
191 self.snapshot.input_schema()
192 }
193
194 fn parse_predicate_expression(
195 &self,
196 expr: impl AsRef<str>,
197 df_state: &SessionState,
198 ) -> DeltaResult<Expr> {
199 self.snapshot.parse_predicate_expression(expr, df_state)
200 }
201}
202
203fn _arrow_schema(snapshot: &Snapshot, wrap_partitions: bool) -> DeltaResult<ArrowSchemaRef> {
204 let meta = snapshot.metadata();
205
206 let schema = meta.schema()?;
207 let fields = schema
208 .fields()
209 .filter(|f| !meta.partition_columns.contains(&f.name().to_string()))
210 .map(|f| f.try_into())
211 .chain(
212 meta.partition_columns.iter().map(|partition_col| {
215 let f = schema.field(partition_col).unwrap();
216 let field = Field::try_from(f)?;
217 let corrected = if wrap_partitions {
218 match field.data_type() {
219 ArrowDataType::Utf8
222 | ArrowDataType::LargeUtf8
223 | ArrowDataType::Binary
224 | ArrowDataType::LargeBinary => {
225 wrap_partition_type_in_dict(field.data_type().clone())
226 }
227 _ => field.data_type().clone(),
228 }
229 } else {
230 field.data_type().clone()
231 };
232 Ok(field.with_data_type(corrected))
233 }),
234 )
235 .collect::<Result<Vec<Field>, _>>()?;
236
237 Ok(Arc::new(ArrowSchema::new(fields)))
238}
239
240pub(crate) fn files_matching_predicate<'a>(
241 snapshot: &'a EagerSnapshot,
242 filters: &[Expr],
243) -> DeltaResult<impl Iterator<Item = Add> + 'a> {
244 if let Some(Some(predicate)) =
245 (!filters.is_empty()).then_some(conjunction(filters.iter().cloned()))
246 {
247 let expr = SessionContext::new()
248 .create_physical_expr(predicate, &snapshot.arrow_schema()?.to_dfschema()?)?;
249 let pruning_predicate = PruningPredicate::try_new(expr, snapshot.arrow_schema()?)?;
250 Ok(Either::Left(
251 snapshot
252 .file_actions()?
253 .zip(pruning_predicate.prune(snapshot)?)
254 .filter_map(
255 |(action, keep_file)| {
256 if keep_file {
257 Some(action)
258 } else {
259 None
260 }
261 },
262 ),
263 ))
264 } else {
265 Ok(Either::Right(snapshot.file_actions()?))
266 }
267}
268
269pub(crate) fn get_path_column<'a>(
270 batch: &'a RecordBatch,
271 path_column: &str,
272) -> DeltaResult<TypedDictionaryArray<'a, UInt16Type, StringArray>> {
273 let err = || DeltaTableError::Generic("Unable to obtain Delta-rs path column".to_string());
274 batch
275 .column_by_name(path_column)
276 .unwrap()
277 .as_any()
278 .downcast_ref::<DictionaryArray<UInt16Type>>()
279 .ok_or_else(err)?
280 .downcast_dict::<StringArray>()
281 .ok_or_else(err)
282}
283
impl DeltaTableState {
    /// Provide table-level statistics to DataFusion, delegating to the snapshot.
    pub fn datafusion_table_statistics(&self) -> Option<Statistics> {
        self.snapshot.datafusion_table_statistics()
    }
}
290
291pub(crate) fn register_store(store: LogStoreRef, env: Arc<RuntimeEnv>) {
294 let object_store_url = store.object_store_url();
295 let url: &Url = object_store_url.as_ref();
296 env.register_object_store(url, store.object_store(None));
297}
298
299pub(crate) fn df_logical_schema(
303 snapshot: &DeltaTableState,
304 file_column_name: &Option<String>,
305 schema: Option<ArrowSchemaRef>,
306) -> DeltaResult<SchemaRef> {
307 let input_schema = match schema {
308 Some(schema) => schema,
309 None => snapshot.input_schema()?,
310 };
311 let table_partition_cols = &snapshot.metadata().partition_columns;
312
313 let mut fields: Vec<Arc<Field>> = input_schema
314 .fields()
315 .iter()
316 .filter(|f| !table_partition_cols.contains(f.name()))
317 .cloned()
318 .collect();
319
320 for partition_col in table_partition_cols.iter() {
321 fields.push(Arc::new(
322 input_schema
323 .field_with_name(partition_col)
324 .unwrap()
325 .to_owned(),
326 ));
327 }
328
329 if let Some(file_column_name) = file_column_name {
330 fields.push(Arc::new(Field::new(
331 file_column_name,
332 ArrowDataType::Utf8,
333 true,
334 )));
335 }
336
337 Ok(Arc::new(ArrowSchema::new(fields)))
338}
339
/// Builder for [`DeltaScanConfig`]: controls which metadata columns a scan
/// exposes and how it reads the table.
#[derive(Debug, Clone)]
pub struct DeltaScanConfigBuilder {
    // When true, each record includes its source file path as an extra column.
    include_file_column: bool,
    // Name of the file-path column; auto-generated from PATH_COLUMN when None
    // and `include_file_column` is set.
    file_column_name: Option<String>,
    // Whether partition values are wrapped in a dictionary encoding
    // (defaults to true when unset — see `build`).
    wrap_partition_values: Option<bool>,
    // When false, the filter is only used to prune files, not pushed into the
    // parquet reader.
    enable_parquet_pushdown: bool,
    // Optional explicit schema to scan the table with.
    schema: Option<SchemaRef>,
}
357
358impl Default for DeltaScanConfigBuilder {
359 fn default() -> Self {
360 DeltaScanConfigBuilder {
361 include_file_column: false,
362 file_column_name: None,
363 wrap_partition_values: None,
364 enable_parquet_pushdown: true,
365 schema: None,
366 }
367 }
368}
369
370impl DeltaScanConfigBuilder {
371 pub fn new() -> Self {
373 Self::default()
374 }
375
376 pub fn with_file_column(mut self, include: bool) -> Self {
379 self.include_file_column = include;
380 self.file_column_name = None;
381 self
382 }
383
384 pub fn with_file_column_name<S: ToString>(mut self, name: &S) -> Self {
386 self.file_column_name = Some(name.to_string());
387 self.include_file_column = true;
388 self
389 }
390
391 pub fn wrap_partition_values(mut self, wrap: bool) -> Self {
393 self.wrap_partition_values = Some(wrap);
394 self
395 }
396
397 pub fn with_parquet_pushdown(mut self, pushdown: bool) -> Self {
400 self.enable_parquet_pushdown = pushdown;
401 self
402 }
403
404 pub fn with_schema(mut self, schema: SchemaRef) -> Self {
406 self.schema = Some(schema);
407 self
408 }
409
410 pub fn build(&self, snapshot: &DeltaTableState) -> DeltaResult<DeltaScanConfig> {
412 let file_column_name = if self.include_file_column {
413 let input_schema = snapshot.input_schema()?;
414 let mut column_names: HashSet<&String> = HashSet::new();
415 for field in input_schema.fields.iter() {
416 column_names.insert(field.name());
417 }
418
419 match &self.file_column_name {
420 Some(name) => {
421 if column_names.contains(name) {
422 return Err(DeltaTableError::Generic(format!(
423 "Unable to add file path column since column with name {name} exits"
424 )));
425 }
426
427 Some(name.to_owned())
428 }
429 None => {
430 let prefix = PATH_COLUMN;
431 let mut idx = 0;
432 let mut name = prefix.to_owned();
433
434 while column_names.contains(&name) {
435 idx += 1;
436 name = format!("{prefix}_{idx}");
437 }
438
439 Some(name)
440 }
441 }
442 } else {
443 None
444 };
445
446 Ok(DeltaScanConfig {
447 file_column_name,
448 wrap_partition_values: self.wrap_partition_values.unwrap_or(true),
449 enable_parquet_pushdown: self.enable_parquet_pushdown,
450 schema: self.schema.clone(),
451 })
452 }
453}
454
/// Configuration applied when building a [`DeltaScan`].
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct DeltaScanConfig {
    /// Name of the column carrying each record's source file path, if enabled.
    pub file_column_name: Option<String>,
    /// Wrap partition values in a dictionary encoding.
    pub wrap_partition_values: bool,
    /// Allow pushdown of the scan filter into the parquet reader.
    pub enable_parquet_pushdown: bool,
    /// Optional explicit schema to read the table as.
    pub schema: Option<SchemaRef>,
}
467
/// Builder assembling a [`DeltaScan`] over a table snapshot.
pub(crate) struct DeltaScanBuilder<'a> {
    // Table state being scanned.
    snapshot: &'a DeltaTableState,
    // Log store used to resolve the table's object-store URL.
    log_store: LogStoreRef,
    // Optional predicate used for file pruning and (optionally) parquet pushdown.
    filter: Option<Expr>,
    // DataFusion session supplying runtime configuration.
    session: &'a dyn Session,
    // Optional projection as column indices into the logical schema.
    projection: Option<&'a Vec<usize>>,
    // Optional row-count limit.
    limit: Option<usize>,
    // Explicit file list; when set, log-based pruning is skipped.
    files: Option<&'a [Add]>,
    // Scan configuration; defaults are built when absent.
    config: Option<DeltaScanConfig>,
}
478
479impl<'a> DeltaScanBuilder<'a> {
480 pub fn new(
481 snapshot: &'a DeltaTableState,
482 log_store: LogStoreRef,
483 session: &'a dyn Session,
484 ) -> Self {
485 DeltaScanBuilder {
486 snapshot,
487 log_store,
488 filter: None,
489 session,
490 projection: None,
491 limit: None,
492 files: None,
493 config: None,
494 }
495 }
496
497 pub fn with_filter(mut self, filter: Option<Expr>) -> Self {
498 self.filter = filter;
499 self
500 }
501
502 pub fn with_files(mut self, files: &'a [Add]) -> Self {
503 self.files = Some(files);
504 self
505 }
506
507 pub fn with_projection(mut self, projection: Option<&'a Vec<usize>>) -> Self {
508 self.projection = projection;
509 self
510 }
511
512 pub fn with_limit(mut self, limit: Option<usize>) -> Self {
513 self.limit = limit;
514 self
515 }
516
517 pub fn with_scan_config(mut self, config: DeltaScanConfig) -> Self {
518 self.config = Some(config);
519 self
520 }
521
522 pub async fn build(self) -> DeltaResult<DeltaScan> {
523 let config = match self.config {
524 Some(config) => config,
525 None => DeltaScanConfigBuilder::new().build(self.snapshot)?,
526 };
527
528 let schema = match config.schema.clone() {
529 Some(value) => Ok(value),
530 None => self.snapshot.arrow_schema(),
531 }?;
532
533 let logical_schema = df_logical_schema(
534 self.snapshot,
535 &config.file_column_name,
536 Some(schema.clone()),
537 )?;
538
539 let logical_schema = if let Some(used_columns) = self.projection {
540 let mut fields = vec![];
541 for idx in used_columns {
542 fields.push(logical_schema.field(*idx).to_owned());
543 }
544 Arc::new(ArrowSchema::new(fields))
545 } else {
546 logical_schema
547 };
548
549 let context = SessionContext::new();
550 let df_schema = logical_schema.clone().to_dfschema()?;
551
552 let logical_filter = self.filter.map(|expr| {
553 let props = ExecutionProps::new();
555 let simplify_context =
556 SimplifyContext::new(&props).with_schema(df_schema.clone().into());
557 let simplifier = ExprSimplifier::new(simplify_context).with_max_cycles(10);
558 let simplified = simplifier.simplify(expr).unwrap();
559
560 context
561 .create_physical_expr(simplified, &df_schema)
562 .unwrap()
563 });
564
565 let (files, files_scanned, files_pruned) = match self.files {
567 Some(files) => {
568 let files = files.to_owned();
569 let files_scanned = files.len();
570 (files, files_scanned, 0)
571 }
572 None => {
573 if let Some(predicate) = &logical_filter {
574 let pruning_predicate =
575 PruningPredicate::try_new(predicate.clone(), logical_schema.clone())?;
576 let files_to_prune = pruning_predicate.prune(self.snapshot)?;
577 let mut files_pruned = 0usize;
578 let files = self
579 .snapshot
580 .file_actions_iter()?
581 .zip(files_to_prune.into_iter())
582 .filter_map(|(action, keep)| {
583 if keep {
584 Some(action.to_owned())
585 } else {
586 files_pruned += 1;
587 None
588 }
589 })
590 .collect::<Vec<_>>();
591
592 let files_scanned = files.len();
593 (files, files_scanned, files_pruned)
594 } else {
595 let files = self.snapshot.file_actions()?;
596 let files_scanned = files.len();
597 (files, files_scanned, 0)
598 }
599 }
600 };
601
602 let mut file_groups: HashMap<Vec<ScalarValue>, Vec<PartitionedFile>> = HashMap::new();
606
607 let table_partition_cols = &self.snapshot.metadata().partition_columns;
608
609 for action in files.iter() {
610 let mut part = partitioned_file_from_action(action, table_partition_cols, &schema);
611
612 if config.file_column_name.is_some() {
613 let partition_value = if config.wrap_partition_values {
614 wrap_partition_value_in_dict(ScalarValue::Utf8(Some(action.path.clone())))
615 } else {
616 ScalarValue::Utf8(Some(action.path.clone()))
617 };
618 part.partition_values.push(partition_value);
619 }
620
621 file_groups
622 .entry(part.partition_values.clone())
623 .or_default()
624 .push(part);
625 }
626
627 let file_schema = Arc::new(ArrowSchema::new(
628 schema
629 .fields()
630 .iter()
631 .filter(|f| !table_partition_cols.contains(f.name()))
632 .cloned()
633 .collect::<Vec<arrow::datatypes::FieldRef>>(),
634 ));
635
636 let mut table_partition_cols = table_partition_cols
637 .iter()
638 .map(|name| schema.field_with_name(name).map(|f| f.to_owned()))
639 .collect::<Result<Vec<_>, ArrowError>>()?;
640
641 if let Some(file_column_name) = &config.file_column_name {
642 let field_name_datatype = if config.wrap_partition_values {
643 wrap_partition_type_in_dict(ArrowDataType::Utf8)
644 } else {
645 ArrowDataType::Utf8
646 };
647 table_partition_cols.push(Field::new(
648 file_column_name.clone(),
649 field_name_datatype,
650 false,
651 ));
652 }
653
654 let stats = self
655 .snapshot
656 .datafusion_table_statistics()
657 .unwrap_or(Statistics::new_unknown(&schema));
658
659 let parquet_options = TableParquetOptions {
660 global: self.session.config().options().execution.parquet.clone(),
661 ..Default::default()
662 };
663
664 let mut file_source = ParquetSource::new(parquet_options)
665 .with_schema_adapter_factory(Arc::new(DeltaSchemaAdapterFactory {}));
666
667 if let Some(predicate) = logical_filter {
671 if config.enable_parquet_pushdown {
672 file_source = file_source.with_predicate(Arc::clone(&file_schema), predicate);
673 }
674 };
675
676 let file_scan_config = FileScanConfigBuilder::new(
677 self.log_store.object_store_url(),
678 file_schema,
679 Arc::new(file_source),
680 )
681 .with_file_groups(
682 if file_groups.is_empty() {
687 vec![FileGroup::from(vec![])]
688 } else {
689 file_groups.into_values().map(FileGroup::from).collect()
690 },
691 )
692 .with_statistics(stats)
693 .with_projection(self.projection.cloned())
694 .with_limit(self.limit)
695 .with_table_partition_cols(table_partition_cols)
696 .build();
697
698 let metrics = ExecutionPlanMetricsSet::new();
699 MetricBuilder::new(&metrics)
700 .global_counter("files_scanned")
701 .add(files_scanned);
702 MetricBuilder::new(&metrics)
703 .global_counter("files_pruned")
704 .add(files_pruned);
705
706 Ok(DeltaScan {
707 table_uri: ensure_table_uri(self.log_store.root_uri())?.as_str().into(),
708 parquet_scan: DataSourceExec::from_data_source(file_scan_config),
709 config,
710 logical_schema,
711 metrics,
712 })
713 }
714}
715
716#[async_trait]
718impl TableProvider for DeltaTable {
719 fn as_any(&self) -> &dyn Any {
720 self
721 }
722
723 fn schema(&self) -> Arc<ArrowSchema> {
724 self.snapshot().unwrap().arrow_schema().unwrap()
725 }
726
727 fn table_type(&self) -> TableType {
728 TableType::Base
729 }
730
731 fn get_table_definition(&self) -> Option<&str> {
732 None
733 }
734
735 fn get_logical_plan(&self) -> Option<Cow<'_, LogicalPlan>> {
736 None
737 }
738
739 async fn scan(
740 &self,
741 session: &dyn Session,
742 projection: Option<&Vec<usize>>,
743 filters: &[Expr],
744 limit: Option<usize>,
745 ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
746 register_store(self.log_store(), session.runtime_env().clone());
747 let filter_expr = conjunction(filters.iter().cloned());
748
749 let scan = DeltaScanBuilder::new(self.snapshot()?, self.log_store(), session)
750 .with_projection(projection)
751 .with_limit(limit)
752 .with_filter(filter_expr)
753 .build()
754 .await?;
755
756 Ok(Arc::new(scan))
757 }
758
759 fn supports_filters_pushdown(
760 &self,
761 filter: &[&Expr],
762 ) -> DataFusionResult<Vec<TableProviderFilterPushDown>> {
763 Ok(filter
764 .iter()
765 .map(|_| TableProviderFilterPushDown::Inexact)
766 .collect())
767 }
768
769 fn statistics(&self) -> Option<Statistics> {
770 self.snapshot().ok()?.datafusion_table_statistics()
771 }
772}
773
/// A Delta table provider that allows additional metadata columns to be
/// included during a scan.
#[derive(Debug)]
pub struct DeltaTableProvider {
    // Snapshot of table state this provider reads from.
    snapshot: DeltaTableState,
    // Log store backing the table's data files.
    log_store: LogStoreRef,
    // Scan configuration (path column, dictionary wrapping, pushdown, schema override).
    config: DeltaScanConfig,
    // Pre-computed logical schema (partition columns last, optional path column).
    schema: Arc<ArrowSchema>,
    // Optional explicit file list to scan instead of the full snapshot.
    files: Option<Vec<Add>>,
}
783
784impl DeltaTableProvider {
785 pub fn try_new(
787 snapshot: DeltaTableState,
788 log_store: LogStoreRef,
789 config: DeltaScanConfig,
790 ) -> DeltaResult<Self> {
791 Ok(DeltaTableProvider {
792 schema: df_logical_schema(&snapshot, &config.file_column_name, config.schema.clone())?,
793 snapshot,
794 log_store,
795 config,
796 files: None,
797 })
798 }
799
800 pub fn with_files(mut self, files: Vec<Add>) -> DeltaTableProvider {
802 self.files = Some(files);
803 self
804 }
805}
806
807#[async_trait]
808impl TableProvider for DeltaTableProvider {
809 fn as_any(&self) -> &dyn Any {
810 self
811 }
812
813 fn schema(&self) -> Arc<ArrowSchema> {
814 self.schema.clone()
815 }
816
817 fn table_type(&self) -> TableType {
818 TableType::Base
819 }
820
821 fn get_table_definition(&self) -> Option<&str> {
822 None
823 }
824
825 fn get_logical_plan(&self) -> Option<Cow<'_, LogicalPlan>> {
826 None
827 }
828
829 async fn scan(
830 &self,
831 session: &dyn Session,
832 projection: Option<&Vec<usize>>,
833 filters: &[Expr],
834 limit: Option<usize>,
835 ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
836 register_store(self.log_store.clone(), session.runtime_env().clone());
837 let filter_expr = conjunction(filters.iter().cloned());
838
839 let mut scan = DeltaScanBuilder::new(&self.snapshot, self.log_store.clone(), session)
840 .with_projection(projection)
841 .with_limit(limit)
842 .with_filter(filter_expr)
843 .with_scan_config(self.config.clone());
844
845 if let Some(files) = &self.files {
846 scan = scan.with_files(files);
847 }
848 Ok(Arc::new(scan.build().await?))
849 }
850
851 fn supports_filters_pushdown(
852 &self,
853 filter: &[&Expr],
854 ) -> DataFusionResult<Vec<TableProviderFilterPushDown>> {
855 Ok(filter
856 .iter()
857 .map(|_| TableProviderFilterPushDown::Inexact)
858 .collect())
859 }
860
861 fn statistics(&self) -> Option<Statistics> {
862 self.snapshot.datafusion_table_statistics()
863 }
864}
865
/// A table provider serving record batches from a set of lazy batch generators.
#[derive(Debug)]
pub struct LazyTableProvider {
    // Schema of the batches produced by the generators.
    schema: Arc<ArrowSchema>,
    // Generators polled lazily at execution time.
    batches: Vec<Arc<RwLock<dyn LazyBatchGenerator>>>,
}
871
872impl LazyTableProvider {
873 pub fn try_new(
875 schema: Arc<ArrowSchema>,
876 batches: Vec<Arc<RwLock<dyn LazyBatchGenerator>>>,
877 ) -> DeltaResult<Self> {
878 Ok(LazyTableProvider { schema, batches })
879 }
880}
881
882#[async_trait]
883impl TableProvider for LazyTableProvider {
884 fn as_any(&self) -> &dyn Any {
885 self
886 }
887
888 fn schema(&self) -> Arc<ArrowSchema> {
889 self.schema.clone()
890 }
891
892 fn table_type(&self) -> TableType {
893 TableType::Base
894 }
895
896 fn get_table_definition(&self) -> Option<&str> {
897 None
898 }
899
900 fn get_logical_plan(&self) -> Option<Cow<'_, LogicalPlan>> {
901 None
902 }
903
904 async fn scan(
905 &self,
906 _session: &dyn Session,
907 projection: Option<&Vec<usize>>,
908 filters: &[Expr],
909 limit: Option<usize>,
910 ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
911 let mut plan: Arc<dyn ExecutionPlan> = Arc::new(LazyMemoryExec::try_new(
912 self.schema(),
913 self.batches.clone(),
914 )?);
915
916 let df_schema: DFSchema = plan.schema().try_into()?;
917
918 if let Some(filter_expr) = conjunction(filters.iter().cloned()) {
919 let physical_expr =
920 create_physical_expr(&filter_expr, &df_schema, &ExecutionProps::new())?;
921 plan = Arc::new(FilterExec::try_new(physical_expr, plan)?);
922 }
923
924 if let Some(projection) = projection {
925 let current_projection = (0..plan.schema().fields().len()).collect::<Vec<usize>>();
926 if projection != ¤t_projection {
927 let execution_props = &ExecutionProps::new();
928 let fields: DeltaResult<Vec<(Arc<dyn PhysicalExpr>, String)>> = projection
929 .iter()
930 .map(|i| {
931 let (table_ref, field) = df_schema.qualified_field(*i);
932 create_physical_expr(
933 &Expr::Column(Column::from((table_ref, field))),
934 &df_schema,
935 execution_props,
936 )
937 .map(|expr| (expr, field.name().clone()))
938 .map_err(DeltaTableError::from)
939 })
940 .collect();
941 plan = Arc::new(ProjectionExec::try_new(fields?, plan)?);
942 }
943 }
944
945 if let Some(limit) = limit {
946 plan = Arc::new(GlobalLimitExec::new(plan, 0, Some(limit)))
947 };
948
949 Ok(plan)
950 }
951
952 fn supports_filters_pushdown(
953 &self,
954 filter: &[&Expr],
955 ) -> DataFusionResult<Vec<TableProviderFilterPushDown>> {
956 Ok(filter
957 .iter()
958 .map(|_| TableProviderFilterPushDown::Inexact)
959 .collect())
960 }
961
962 fn statistics(&self) -> Option<Statistics> {
963 None
964 }
965}
966
/// A wrapper around a parquet scan of a Delta table's data files.
#[derive(Debug)]
pub struct DeltaScan {
    /// The URL of the table root (built from the log store's root URI).
    pub table_uri: String,
    /// The scan configuration this plan was built with.
    pub config: DeltaScanConfig,
    /// The wrapped parquet execution plan; execution fully delegates to it.
    pub parquet_scan: Arc<dyn ExecutionPlan>,
    /// Logical schema of the scan (partition columns last, plus the optional
    /// file-path column).
    pub logical_schema: Arc<ArrowSchema>,
    // Scan metrics ("files_scanned"/"files_pruned") reported via DataFusion.
    metrics: ExecutionPlanMetricsSet,
}
982
// Serializable subset of `DeltaScan` (execution plan and metrics omitted) —
// presumably used by the datafusion-proto extension codecs imported above;
// verify against the codec implementations.
#[derive(Debug, Serialize, Deserialize)]
struct DeltaScanWire {
    pub table_uri: String,
    pub config: DeltaScanConfig,
    pub logical_schema: Arc<ArrowSchema>,
}
989
990impl DisplayAs for DeltaScan {
991 fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> std::fmt::Result {
992 write!(f, "DeltaScan")
993 }
994}
995
996impl ExecutionPlan for DeltaScan {
997 fn name(&self) -> &str {
998 Self::static_name()
999 }
1000
1001 fn as_any(&self) -> &dyn Any {
1002 self
1003 }
1004
1005 fn schema(&self) -> SchemaRef {
1006 self.parquet_scan.schema()
1007 }
1008
1009 fn properties(&self) -> &PlanProperties {
1010 self.parquet_scan.properties()
1011 }
1012
1013 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
1014 vec![&self.parquet_scan]
1015 }
1016
1017 fn with_new_children(
1018 self: Arc<Self>,
1019 children: Vec<Arc<dyn ExecutionPlan>>,
1020 ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
1021 if children.len() != 1 {
1022 return Err(DataFusionError::Plan(format!(
1023 "DeltaScan wrong number of children {}",
1024 children.len()
1025 )));
1026 }
1027 Ok(Arc::new(DeltaScan {
1028 table_uri: self.table_uri.clone(),
1029 config: self.config.clone(),
1030 parquet_scan: children[0].clone(),
1031 logical_schema: self.logical_schema.clone(),
1032 metrics: self.metrics.clone(),
1033 }))
1034 }
1035
1036 fn execute(
1037 &self,
1038 partition: usize,
1039 context: Arc<TaskContext>,
1040 ) -> DataFusionResult<SendableRecordBatchStream> {
1041 self.parquet_scan.execute(partition, context)
1042 }
1043
1044 fn metrics(&self) -> Option<MetricsSet> {
1045 Some(self.metrics.clone_inner())
1046 }
1047
1048 fn statistics(&self) -> DataFusionResult<Statistics> {
1049 self.parquet_scan.statistics()
1050 }
1051
1052 fn repartitioned(
1053 &self,
1054 target_partitions: usize,
1055 config: &ConfigOptions,
1056 ) -> DataFusionResult<Option<Arc<dyn ExecutionPlan>>> {
1057 if let Some(parquet_scan) = self.parquet_scan.repartitioned(target_partitions, config)? {
1058 Ok(Some(Arc::new(DeltaScan {
1059 table_uri: self.table_uri.clone(),
1060 config: self.config.clone(),
1061 parquet_scan,
1062 logical_schema: self.logical_schema.clone(),
1063 metrics: self.metrics.clone(),
1064 })))
1065 } else {
1066 Ok(None)
1067 }
1068 }
1069}
1070
1071pub(crate) fn get_null_of_arrow_type(t: &ArrowDataType) -> DeltaResult<ScalarValue> {
1072 match t {
1073 ArrowDataType::Null => Ok(ScalarValue::Null),
1074 ArrowDataType::Boolean => Ok(ScalarValue::Boolean(None)),
1075 ArrowDataType::Int8 => Ok(ScalarValue::Int8(None)),
1076 ArrowDataType::Int16 => Ok(ScalarValue::Int16(None)),
1077 ArrowDataType::Int32 => Ok(ScalarValue::Int32(None)),
1078 ArrowDataType::Int64 => Ok(ScalarValue::Int64(None)),
1079 ArrowDataType::UInt8 => Ok(ScalarValue::UInt8(None)),
1080 ArrowDataType::UInt16 => Ok(ScalarValue::UInt16(None)),
1081 ArrowDataType::UInt32 => Ok(ScalarValue::UInt32(None)),
1082 ArrowDataType::UInt64 => Ok(ScalarValue::UInt64(None)),
1083 ArrowDataType::Float32 => Ok(ScalarValue::Float32(None)),
1084 ArrowDataType::Float64 => Ok(ScalarValue::Float64(None)),
1085 ArrowDataType::Date32 => Ok(ScalarValue::Date32(None)),
1086 ArrowDataType::Date64 => Ok(ScalarValue::Date64(None)),
1087 ArrowDataType::Binary => Ok(ScalarValue::Binary(None)),
1088 ArrowDataType::FixedSizeBinary(size) => {
1089 Ok(ScalarValue::FixedSizeBinary(size.to_owned(), None))
1090 }
1091 ArrowDataType::LargeBinary => Ok(ScalarValue::LargeBinary(None)),
1092 ArrowDataType::Utf8 => Ok(ScalarValue::Utf8(None)),
1093 ArrowDataType::LargeUtf8 => Ok(ScalarValue::LargeUtf8(None)),
1094 ArrowDataType::Decimal128(precision, scale) => Ok(ScalarValue::Decimal128(
1095 None,
1096 precision.to_owned(),
1097 scale.to_owned(),
1098 )),
1099 ArrowDataType::Timestamp(unit, tz) => {
1100 let tz = tz.to_owned();
1101 Ok(match unit {
1102 TimeUnit::Second => ScalarValue::TimestampSecond(None, tz),
1103 TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(None, tz),
1104 TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(None, tz),
1105 TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(None, tz),
1106 })
1107 }
1108 ArrowDataType::Dictionary(k, v) => Ok(ScalarValue::Dictionary(
1109 k.clone(),
1110 Box::new(get_null_of_arrow_type(v).unwrap()),
1111 )),
1112 ArrowDataType::Float16
1114 | ArrowDataType::Decimal256(_, _)
1115 | ArrowDataType::Union(_, _)
1116 | ArrowDataType::LargeList(_)
1117 | ArrowDataType::Struct(_)
1118 | ArrowDataType::List(_)
1119 | ArrowDataType::FixedSizeList(_, _)
1120 | ArrowDataType::Time32(_)
1121 | ArrowDataType::Time64(_)
1122 | ArrowDataType::Duration(_)
1123 | ArrowDataType::Interval(_)
1124 | ArrowDataType::RunEndEncoded(_, _)
1125 | ArrowDataType::BinaryView
1126 | ArrowDataType::Utf8View
1127 | ArrowDataType::LargeListView(_)
1128 | ArrowDataType::ListView(_)
1129 | ArrowDataType::Map(_, _) => Err(DeltaTableError::Generic(format!(
1130 "Unsupported data type for Delta Lake {t}"
1131 ))),
1132 }
1133}
1134
/// Convert a Delta `Add` action into a DataFusion [`PartitionedFile`].
///
/// Partition values are re-parsed against the table schema; any value that
/// cannot be resolved or parsed degrades to `ScalarValue::Null` rather than
/// failing the whole scan.
fn partitioned_file_from_action(
    action: &Add,
    partition_columns: &[String],
    schema: &ArrowSchema,
) -> PartitionedFile {
    let partition_values = partition_columns
        .iter()
        .map(|part| {
            action
                .partition_values
                .get(part)
                .map(|val| {
                    schema
                        .field_with_name(part)
                        .map(|field| match val {
                            // Partition values are carried as strings;
                            // re-parse them into the field's arrow type.
                            Some(value) => to_correct_scalar_value(
                                &serde_json::Value::String(value.to_string()),
                                field.data_type(),
                            )
                            .unwrap_or(Some(ScalarValue::Null))
                            .unwrap_or(ScalarValue::Null),
                            // Explicit null partition value.
                            None => get_null_of_arrow_type(field.data_type())
                                .unwrap_or(ScalarValue::Null),
                        })
                        .unwrap_or(ScalarValue::Null)
                })
                .unwrap_or(ScalarValue::Null)
        })
        .collect::<Vec<_>>();

    // `modification_time` is epoch milliseconds; split into seconds plus
    // nanoseconds for chrono.
    let ts_secs = action.modification_time / 1000;
    let ts_ns = (action.modification_time % 1000) * 1_000_000;
    // NOTE(review): the `unwrap` assumes `modification_time` is a valid epoch
    // timestamp; a corrupt log entry would panic here.
    let last_modified = Utc.from_utc_datetime(
        &DateTime::from_timestamp(ts_secs, ts_ns as u32)
            .unwrap()
            .naive_utc(),
    );
    PartitionedFile {
        object_meta: ObjectMeta {
            last_modified,
            // Remaining object metadata comes from the Add action itself;
            // `try_into` is assumed infallible for well-formed actions.
            ..action.try_into().unwrap()
        },
        partition_values,
        range: None,
        extensions: None,
        statistics: None,
        metadata_size_hint: None,
    }
}
1184
1185fn parse_date(
1186 stat_val: &serde_json::Value,
1187 field_dt: &ArrowDataType,
1188) -> DataFusionResult<ScalarValue> {
1189 let string = match stat_val {
1190 serde_json::Value::String(s) => s.to_owned(),
1191 _ => stat_val.to_string(),
1192 };
1193
1194 let time_micro = ScalarValue::try_from_string(string, &ArrowDataType::Date32)?;
1195 let cast_arr = cast_with_options(
1196 &time_micro.to_array()?,
1197 field_dt,
1198 &CastOptions {
1199 safe: false,
1200 ..Default::default()
1201 },
1202 )?;
1203 ScalarValue::try_from_array(&cast_arr, 0)
1204}
1205
1206fn parse_timestamp(
1207 stat_val: &serde_json::Value,
1208 field_dt: &ArrowDataType,
1209) -> DataFusionResult<ScalarValue> {
1210 let string = match stat_val {
1211 serde_json::Value::String(s) => s.to_owned(),
1212 _ => stat_val.to_string(),
1213 };
1214
1215 let time_micro = ScalarValue::try_from_string(
1216 string,
1217 &ArrowDataType::Timestamp(TimeUnit::Microsecond, None),
1218 )?;
1219 let cast_arr = cast_with_options(
1220 &time_micro.to_array()?,
1221 field_dt,
1222 &CastOptions {
1223 safe: false,
1224 ..Default::default()
1225 },
1226 )?;
1227 ScalarValue::try_from_array(&cast_arr, 0)
1228}
1229
1230pub(crate) fn to_correct_scalar_value(
1231 stat_val: &serde_json::Value,
1232 field_dt: &ArrowDataType,
1233) -> DataFusionResult<Option<ScalarValue>> {
1234 match stat_val {
1235 serde_json::Value::Array(_) => Ok(None),
1236 serde_json::Value::Object(_) => Ok(None),
1237 serde_json::Value::Null => Ok(Some(get_null_of_arrow_type(field_dt)?)),
1238 serde_json::Value::String(string_val) => match field_dt {
1239 ArrowDataType::Timestamp(_, _) => Ok(Some(parse_timestamp(stat_val, field_dt)?)),
1240 ArrowDataType::Date32 => Ok(Some(parse_date(stat_val, field_dt)?)),
1241 _ => Ok(Some(ScalarValue::try_from_string(
1242 string_val.to_owned(),
1243 field_dt,
1244 )?)),
1245 },
1246 other => match field_dt {
1247 ArrowDataType::Timestamp(_, _) => Ok(Some(parse_timestamp(stat_val, field_dt)?)),
1248 ArrowDataType::Date32 => Ok(Some(parse_date(stat_val, field_dt)?)),
1249 _ => Ok(Some(ScalarValue::try_from_string(
1250 other.to_string(),
1251 field_dt,
1252 )?)),
1253 },
1254 }
1255}
1256
1257pub(crate) async fn execute_plan_to_batch(
1258 state: &SessionState,
1259 plan: Arc<dyn ExecutionPlan>,
1260) -> DeltaResult<arrow::record_batch::RecordBatch> {
1261 let data = futures::future::try_join_all(
1262 (0..plan.properties().output_partitioning().partition_count()).map(|p| {
1263 let plan_copy = plan.clone();
1264 let task_context = state.task_ctx().clone();
1265 async move {
1266 let batch_stream = plan_copy.execute(p, task_context)?;
1267
1268 let schema = batch_stream.schema();
1269
1270 let batches = batch_stream.try_collect::<Vec<_>>().await?;
1271
1272 DataFusionResult::<_>::Ok(concat_batches(&schema, batches.iter())?)
1273 }
1274 }),
1275 )
1276 .await?;
1277
1278 Ok(concat_batches(&plan.schema(), data.iter())?)
1279}
1280
/// Validates in-memory record batches against a table's data-quality rules
/// before they are written: CHECK constraints, column invariants,
/// generated-column expressions, and column nullability.
#[derive(Clone, Default)]
pub struct DeltaDataChecker {
    /// CHECK constraints to enforce.
    constraints: Vec<Constraint>,
    /// Column invariants declared in the table schema.
    invariants: Vec<Invariant>,
    /// Generated-column expressions to verify.
    generated_columns: Vec<GeneratedColumn>,
    /// Names of columns that must not contain nulls.
    non_nullable_columns: Vec<String>,
    /// DataFusion session used to evaluate check expressions via SQL.
    ctx: SessionContext,
}
1290
impl DeltaDataChecker {
    /// Create a checker with no rules; `check_batch` will then always pass.
    pub fn empty() -> Self {
        Self {
            invariants: vec![],
            constraints: vec![],
            generated_columns: vec![],
            non_nullable_columns: vec![],
            ctx: DeltaSessionContext::default().into(),
        }
    }

    /// Create a checker enforcing only the given column invariants.
    pub fn new_with_invariants(invariants: Vec<Invariant>) -> Self {
        Self {
            invariants,
            constraints: vec![],
            generated_columns: vec![],
            non_nullable_columns: vec![],
            ctx: DeltaSessionContext::default().into(),
        }
    }

    /// Create a checker enforcing only the given CHECK constraints.
    pub fn new_with_constraints(constraints: Vec<Constraint>) -> Self {
        Self {
            constraints,
            invariants: vec![],
            generated_columns: vec![],
            non_nullable_columns: vec![],
            ctx: DeltaSessionContext::default().into(),
        }
    }

    /// Create a checker enforcing only the given generated-column rules.
    pub fn new_with_generated_columns(generated_columns: Vec<GeneratedColumn>) -> Self {
        Self {
            constraints: vec![],
            invariants: vec![],
            generated_columns,
            non_nullable_columns: vec![],
            ctx: DeltaSessionContext::default().into(),
        }
    }

    /// Replace the DataFusion session used to evaluate checks.
    pub fn with_session_context(mut self, context: SessionContext) -> Self {
        self.ctx = context;
        self
    }

    /// Append additional constraints to the ones already configured.
    pub fn with_extra_constraints(mut self, constraints: Vec<Constraint>) -> Self {
        self.constraints.extend(constraints);
        self
    }

    /// Build a checker from a table snapshot: invariants, generated columns,
    /// and constraints come from the schema/table config; non-nullable
    /// columns are derived from the schema's field nullability.
    pub fn new(snapshot: &DeltaTableState) -> Self {
        let invariants = snapshot.schema().get_invariants().unwrap_or_default();
        let generated_columns = snapshot
            .schema()
            .get_generated_columns()
            .unwrap_or_default();
        let constraints = snapshot.table_config().get_constraints();
        let non_nullable_columns = snapshot
            .schema()
            .fields()
            .filter_map(|f| {
                if !f.is_nullable() {
                    Some(f.name().clone())
                } else {
                    None
                }
            })
            .collect_vec();
        Self {
            invariants,
            constraints,
            generated_columns,
            non_nullable_columns,
            ctx: DeltaSessionContext::default().into(),
        }
    }

    /// Run every configured check against `record_batch`.
    ///
    /// Returns `Err(DeltaTableError::InvalidData)` with the collected
    /// violation messages if any check fails.
    pub async fn check_batch(&self, record_batch: &RecordBatch) -> Result<(), DeltaTableError> {
        self.check_nullability(record_batch)?;
        self.enforce_checks(record_batch, &self.invariants).await?;
        self.enforce_checks(record_batch, &self.constraints).await?;
        self.enforce_checks(record_batch, &self.generated_columns)
            .await
    }

    /// Verify that no non-nullable column contains nulls or is missing from
    /// the batch. On success always returns `Ok(true)`.
    fn check_nullability(&self, record_batch: &RecordBatch) -> Result<bool, DeltaTableError> {
        let mut violations = Vec::new();
        for col in self.non_nullable_columns.iter() {
            if let Some(arr) = record_batch.column_by_name(col) {
                if arr.null_count() > 0 {
                    violations.push(format!(
                        "Non-nullable column violation for {col}, found {} null values",
                        arr.null_count()
                    ));
                }
            } else {
                // A required column absent from the batch is also a violation.
                violations.push(format!(
                    "Non-nullable column violation for {col}, not found in batch!"
                ));
            }
        }
        if !violations.is_empty() {
            Err(DeltaTableError::InvalidData { violations })
        } else {
            Ok(true)
        }
    }

    /// Evaluate each check's SQL expression against the batch.
    ///
    /// The batch is registered as a uniquely-named in-memory table, and each
    /// check is run as `SELECT ... WHERE NOT (<expr>) LIMIT 1`: any returned
    /// row is a violation. The table is deregistered before returning.
    async fn enforce_checks<C: DataCheck>(
        &self,
        record_batch: &RecordBatch,
        checks: &[C],
    ) -> Result<(), DeltaTableError> {
        if checks.is_empty() {
            return Ok(());
        }
        let table = MemTable::try_new(record_batch.schema(), vec![vec![record_batch.clone()]])?;
        let schema = table.schema();
        // Random table name avoids collisions when checks run concurrently
        // on a shared session context.
        let table_name: String = uuid::Uuid::new_v4().to_string();
        self.ctx.register_table(&table_name, Arc::new(table))?;

        let mut violations: Vec<String> = Vec::new();

        for check in checks {
            let check_name = check.get_name();
            if check_name.contains('.') {
                return Err(DeltaTableError::Generic(
                    "Support for nested columns is not supported.".to_string(),
                ));
            }

            // Constraints select all columns so the violating row can be
            // reported; other checks select only the checked column.
            let field_to_select = if check.as_any().is::<Constraint>() {
                "*"
            } else {
                check_name
            };

            // Backtick-quote a field name appearing in the expression so
            // names with spaces survive SQL parsing.
            // NOTE(review): this is a plain substring replace and stops at
            // the first matching field; a field name that is a substring of
            // another identifier could be mangled — confirm intended.
            let mut expression = check.get_expression().to_string();
            for field in schema.fields() {
                if expression.contains(field.name()) {
                    expression =
                        expression.replace(field.name(), format!("`{}` ", field.name()).as_str());
                    break;
                }
            }
            let sql = format!(
                "SELECT {} FROM `{table_name}` WHERE NOT ({}) LIMIT 1",
                field_to_select, expression
            );

            let dfs: Vec<RecordBatch> = self.ctx.sql(&sql).await?.collect().await?;
            if !dfs.is_empty() && dfs[0].num_rows() > 0 {
                // Render the first violating row for the error message.
                let value: String = dfs[0]
                    .columns()
                    .iter()
                    .map(|c| array_value_to_string(c, 0).unwrap_or(String::from("null")))
                    .join(", ");

                let msg = format!(
                    "Check or Invariant ({}) violated by value in row: [{value}]",
                    check.get_expression(),
                );
                violations.push(msg);
            }
        }

        self.ctx.deregister_table(&table_name)?;
        if !violations.is_empty() {
            Err(DeltaTableError::InvalidData { violations })
        } else {
            Ok(())
        }
    }
}
1481
1482#[derive(Debug)]
1484pub struct DeltaPhysicalCodec {}
1485
1486impl PhysicalExtensionCodec for DeltaPhysicalCodec {
1487 fn try_decode(
1488 &self,
1489 buf: &[u8],
1490 inputs: &[Arc<dyn ExecutionPlan>],
1491 _registry: &dyn FunctionRegistry,
1492 ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
1493 let wire: DeltaScanWire = serde_json::from_reader(buf)
1494 .map_err(|_| DataFusionError::Internal("Unable to decode DeltaScan".to_string()))?;
1495 let delta_scan = DeltaScan {
1496 table_uri: wire.table_uri,
1497 parquet_scan: (*inputs)[0].clone(),
1498 config: wire.config,
1499 logical_schema: wire.logical_schema,
1500 metrics: ExecutionPlanMetricsSet::new(),
1501 };
1502 Ok(Arc::new(delta_scan))
1503 }
1504
1505 fn try_encode(
1506 &self,
1507 node: Arc<dyn ExecutionPlan>,
1508 buf: &mut Vec<u8>,
1509 ) -> Result<(), DataFusionError> {
1510 let delta_scan = node
1511 .as_any()
1512 .downcast_ref::<DeltaScan>()
1513 .ok_or_else(|| DataFusionError::Internal("Not a delta scan!".to_string()))?;
1514
1515 let wire = DeltaScanWire {
1516 table_uri: delta_scan.table_uri.to_owned(),
1517 config: delta_scan.config.clone(),
1518 logical_schema: delta_scan.logical_schema.clone(),
1519 };
1520 serde_json::to_writer(buf, &wire)
1521 .map_err(|_| DataFusionError::Internal("Unable to encode delta scan!".to_string()))?;
1522 Ok(())
1523 }
1524}
1525
1526#[derive(Debug)]
1528pub struct DeltaLogicalCodec {}
1529
1530impl LogicalExtensionCodec for DeltaLogicalCodec {
1531 fn try_decode(
1532 &self,
1533 _buf: &[u8],
1534 _inputs: &[LogicalPlan],
1535 _ctx: &SessionContext,
1536 ) -> Result<Extension, DataFusionError> {
1537 todo!("DeltaLogicalCodec")
1538 }
1539
1540 fn try_encode(&self, _node: &Extension, _buf: &mut Vec<u8>) -> Result<(), DataFusionError> {
1541 todo!("DeltaLogicalCodec")
1542 }
1543
1544 fn try_decode_table_provider(
1545 &self,
1546 buf: &[u8],
1547 _table_ref: &TableReference,
1548 _schema: SchemaRef,
1549 _ctx: &SessionContext,
1550 ) -> Result<Arc<dyn TableProvider>, DataFusionError> {
1551 let provider: DeltaTable = serde_json::from_slice(buf)
1552 .map_err(|_| DataFusionError::Internal("Error encoding delta table".to_string()))?;
1553 Ok(Arc::new(provider))
1554 }
1555
1556 fn try_encode_table_provider(
1557 &self,
1558 _table_ref: &TableReference,
1559 node: Arc<dyn TableProvider>,
1560 buf: &mut Vec<u8>,
1561 ) -> Result<(), DataFusionError> {
1562 let table = node
1563 .as_ref()
1564 .as_any()
1565 .downcast_ref::<DeltaTable>()
1566 .ok_or_else(|| {
1567 DataFusionError::Internal("Can't encode non-delta tables".to_string())
1568 })?;
1569 serde_json::to_writer(buf, table)
1570 .map_err(|_| DataFusionError::Internal("Error encoding delta table".to_string()))
1571 }
1572}
1573
1574#[derive(Debug)]
1576pub struct DeltaTableFactory {}
1577
1578#[async_trait]
1579impl TableProviderFactory for DeltaTableFactory {
1580 async fn create(
1581 &self,
1582 _ctx: &dyn Session,
1583 cmd: &CreateExternalTable,
1584 ) -> datafusion::error::Result<Arc<dyn TableProvider>> {
1585 let provider = if cmd.options.is_empty() {
1586 open_table(cmd.to_owned().location).await?
1587 } else {
1588 open_table_with_storage_options(cmd.to_owned().location, cmd.to_owned().options).await?
1589 };
1590 Ok(Arc::new(provider))
1591 }
1592}
1593
/// Collects properties of a find-files predicate while walking its
/// expression tree.
pub(crate) struct FindFilesExprProperties {
    /// The table's partition columns.
    pub partition_columns: Vec<String>,

    /// Remains true only while every referenced column is a partition column.
    pub partition_only: bool,
    /// Set to an error if the predicate contains an expression that cannot
    /// be used (nondeterministic function or unsupported variant).
    pub result: DeltaResult<()>,
}

impl TreeNodeVisitor<'_> for FindFilesExprProperties {
    type Node = Expr;

    /// Pre-order visit of each expression node. Sets `result` and stops the
    /// traversal on the first unsupported expression; otherwise records
    /// whether non-partition columns are referenced.
    fn f_down(&mut self, expr: &Self::Node) -> datafusion_common::Result<TreeNodeRecursion> {
        match expr {
            Expr::Column(c) => {
                if !self.partition_columns.contains(&c.name) {
                    self.partition_only = false;
                }
            }
            // These expression forms are always acceptable in a predicate.
            Expr::ScalarVariable(_, _)
            | Expr::Literal(_)
            | Expr::Alias(_)
            | Expr::BinaryExpr(_)
            | Expr::Like(_)
            | Expr::SimilarTo(_)
            | Expr::Not(_)
            | Expr::IsNotNull(_)
            | Expr::IsNull(_)
            | Expr::IsTrue(_)
            | Expr::IsFalse(_)
            | Expr::IsUnknown(_)
            | Expr::IsNotTrue(_)
            | Expr::IsNotFalse(_)
            | Expr::IsNotUnknown(_)
            | Expr::Negative(_)
            | Expr::InList { .. }
            | Expr::Between(_)
            | Expr::Case(_)
            | Expr::Cast(_)
            | Expr::TryCast(_) => (),
            Expr::ScalarFunction(scalar_function) => {
                // Only immutable (deterministic) functions are allowed.
                match scalar_function.func.signature().volatility {
                    Volatility::Immutable => (),
                    _ => {
                        self.result = Err(DeltaTableError::Generic(format!(
                            "Find files predicate contains nondeterministic function {}",
                            scalar_function.func.name()
                        )));
                        return Ok(TreeNodeRecursion::Stop);
                    }
                }
            }
            _ => {
                self.result = Err(DeltaTableError::Generic(format!(
                    "Find files predicate contains unsupported expression {expr}"
                )));
                return Ok(TreeNodeRecursion::Stop);
            }
        }

        Ok(TreeNodeRecursion::Continue)
    }
}
1662
/// Result of [`find_files`]: which files may contain rows matching a
/// predicate.
#[derive(Debug, Hash, Eq, PartialEq)]
pub struct FindFiles {
    /// The `Add` actions of the candidate files.
    pub candidates: Vec<Add>,
    /// True when candidates were selected purely from partition metadata
    /// (no data files were read).
    pub partition_scan: bool,
}
1671
1672fn join_batches_with_add_actions(
1673 batches: Vec<RecordBatch>,
1674 mut actions: HashMap<String, Add>,
1675 path_column: &str,
1676 dict_array: bool,
1677) -> DeltaResult<Vec<Add>> {
1678 let mut files = Vec::with_capacity(batches.iter().map(|batch| batch.num_rows()).sum());
1682 for batch in batches {
1683 let err = || DeltaTableError::Generic("Unable to obtain Delta-rs path column".to_string());
1684
1685 let iter: Box<dyn Iterator<Item = Option<&str>>> = if dict_array {
1686 let array = get_path_column(&batch, path_column)?;
1687 Box::new(array.into_iter())
1688 } else {
1689 let array = batch
1690 .column_by_name(path_column)
1691 .ok_or_else(err)?
1692 .as_any()
1693 .downcast_ref::<StringArray>()
1694 .ok_or_else(err)?;
1695 Box::new(array.into_iter())
1696 };
1697
1698 for path in iter {
1699 let path = path.ok_or(DeltaTableError::Generic(format!(
1700 "{path_column} cannot be null"
1701 )))?;
1702
1703 match actions.remove(path) {
1704 Some(action) => files.push(action),
1705 None => {
1706 return Err(DeltaTableError::Generic(
1707 "Unable to map __delta_rs_path to action.".to_owned(),
1708 ))
1709 }
1710 }
1711 }
1712 }
1713 Ok(files)
1714}
1715
/// Scan data files to find those containing rows that match `expression`.
///
/// Builds a delta scan that includes a synthetic file-path column, filters
/// it with `IS TRUE(expression)`, limits each output partition to one row
/// (one match suffices per partition), and maps the surviving file paths
/// back to their `Add` actions.
pub(crate) async fn find_files_scan(
    snapshot: &DeltaTableState,
    log_store: LogStoreRef,
    state: &SessionState,
    expression: Expr,
) -> DeltaResult<Vec<Add>> {
    // Index every file in the snapshot by path, for the final join.
    let candidate_map: HashMap<String, Add> = snapshot
        .file_actions_iter()?
        .map(|add| (add.path.clone(), add.to_owned()))
        .collect();

    // Request the synthetic file-path column in the scan output.
    let scan_config = DeltaScanConfigBuilder {
        include_file_column: true,
        ..Default::default()
    }
    .build(snapshot)?;

    let logical_schema = df_logical_schema(snapshot, &scan_config.file_column_name, None)?;

    // Project only the columns the predicate references, plus the file-path
    // column (file_column_name is Some because include_file_column is set).
    let mut used_columns = expression
        .column_refs()
        .into_iter()
        .map(|column| logical_schema.index_of(&column.name))
        .collect::<Result<Vec<usize>, ArrowError>>()?;
    used_columns.push(logical_schema.index_of(scan_config.file_column_name.as_ref().unwrap())?);

    let scan = DeltaScanBuilder::new(snapshot, log_store, state)
        .with_filter(Some(expression.clone()))
        .with_projection(Some(&used_columns))
        .with_scan_config(scan_config)
        .build()
        .await?;
    let scan = Arc::new(scan);

    let config = &scan.config;
    let input_schema = scan.logical_schema.as_ref().to_owned();
    let input_dfschema = input_schema.clone().try_into()?;

    // IS TRUE makes NULL predicate results count as non-matching.
    let predicate_expr =
        state.create_physical_expr(Expr::IsTrue(Box::new(expression.clone())), &input_dfschema)?;

    let filter: Arc<dyn ExecutionPlan> =
        Arc::new(FilterExec::try_new(predicate_expr, scan.clone())?);
    // Stop each partition after its first matching row.
    let limit: Arc<dyn ExecutionPlan> = Arc::new(LocalLimitExec::new(filter, 1));

    let task_ctx = Arc::new(TaskContext::from(state));
    let path_batches = datafusion::physical_plan::collect(limit, task_ctx).await?;

    join_batches_with_add_actions(
        path_batches,
        candidate_map,
        config.file_column_name.as_ref().unwrap(),
        true,
    )
}
1774
/// Find files whose partition values satisfy `predicate`, without reading
/// any data files.
///
/// Builds an in-memory table from the snapshot's add-actions table holding
/// the file path plus one column per partition value (with the `partition.`
/// prefix stripped), filters it with `predicate`, and maps the surviving
/// paths back to `Add` actions.
pub(crate) async fn scan_memory_table(
    snapshot: &DeltaTableState,
    predicate: &Expr,
) -> DeltaResult<Vec<Add>> {
    let actions = snapshot.file_actions()?;

    let batch = snapshot.add_actions_table(true)?;
    let mut arrays = Vec::new();
    let mut fields = Vec::new();

    let schema = batch.schema();

    // The path column is re-labelled PATH_COLUMN in the projected schema.
    arrays.push(
        batch
            .column_by_name("path")
            .ok_or(DeltaTableError::Generic(
                "Column with name `path` does not exist".to_owned(),
            ))?
            .to_owned(),
    );
    fields.push(Field::new(PATH_COLUMN, ArrowDataType::Utf8, false));

    // Expose each `partition.<name>` column under its bare `<name>` so the
    // predicate can reference partition columns directly.
    for field in schema.fields() {
        if field.name().starts_with("partition.") {
            let name = field.name().strip_prefix("partition.").unwrap();

            arrays.push(batch.column_by_name(field.name()).unwrap().to_owned());
            fields.push(Field::new(
                name,
                field.data_type().to_owned(),
                field.is_nullable(),
            ));
        }
    }

    let schema = Arc::new(ArrowSchema::new(fields));
    let batch = RecordBatch::try_new(schema, arrays)?;
    let mem_table = MemTable::try_new(batch.schema(), vec![vec![batch]])?;

    // Filter on the predicate and keep only the path column.
    let ctx = SessionContext::new();
    let mut df = ctx.read_table(Arc::new(mem_table))?;
    df = df
        .filter(predicate.to_owned())?
        .select(vec![col(PATH_COLUMN)])?;
    let batches = df.collect().await?;

    let map = actions
        .into_iter()
        .map(|action| (action.path.clone(), action))
        .collect::<HashMap<String, Add>>();

    join_batches_with_add_actions(batches, map, PATH_COLUMN, false)
}
1828
1829pub async fn find_files(
1831 snapshot: &DeltaTableState,
1832 log_store: LogStoreRef,
1833 state: &SessionState,
1834 predicate: Option<Expr>,
1835) -> DeltaResult<FindFiles> {
1836 let current_metadata = snapshot.metadata();
1837
1838 match &predicate {
1839 Some(predicate) => {
1840 let mut expr_properties = FindFilesExprProperties {
1842 partition_only: true,
1843 partition_columns: current_metadata.partition_columns.clone(),
1844 result: Ok(()),
1845 };
1846
1847 TreeNode::visit(predicate, &mut expr_properties)?;
1848 expr_properties.result?;
1849
1850 if expr_properties.partition_only {
1851 let candidates = scan_memory_table(snapshot, predicate).await?;
1852 Ok(FindFiles {
1853 candidates,
1854 partition_scan: true,
1855 })
1856 } else {
1857 let candidates =
1858 find_files_scan(snapshot, log_store, state, predicate.to_owned()).await?;
1859
1860 Ok(FindFiles {
1861 candidates,
1862 partition_scan: false,
1863 })
1864 }
1865 }
1866 None => Ok(FindFiles {
1867 candidates: snapshot.file_actions()?,
1868 partition_scan: true,
1869 }),
1870 }
1871}
1872
1873pub struct DeltaParserOptions {
1875 inner: ParserOptions,
1876}
1877
1878impl Default for DeltaParserOptions {
1879 fn default() -> Self {
1880 DeltaParserOptions {
1881 inner: ParserOptions {
1882 enable_ident_normalization: false,
1883 ..ParserOptions::default()
1884 },
1885 }
1886 }
1887}
1888
1889impl From<DeltaParserOptions> for ParserOptions {
1890 fn from(value: DeltaParserOptions) -> Self {
1891 value.inner
1892 }
1893}
1894
1895pub struct DeltaSessionConfig {
1897 inner: SessionConfig,
1898}
1899
1900impl Default for DeltaSessionConfig {
1901 fn default() -> Self {
1902 DeltaSessionConfig {
1903 inner: SessionConfig::default()
1904 .set_bool("datafusion.sql_parser.enable_ident_normalization", false),
1905 }
1906 }
1907}
1908
1909impl From<DeltaSessionConfig> for SessionConfig {
1910 fn from(value: DeltaSessionConfig) -> Self {
1911 value.inner
1912 }
1913}
1914
1915pub struct DeltaSessionContext {
1917 inner: SessionContext,
1918}
1919
1920impl Default for DeltaSessionContext {
1921 fn default() -> Self {
1922 DeltaSessionContext {
1923 inner: SessionContext::new_with_config(DeltaSessionConfig::default().into()),
1924 }
1925 }
1926}
1927
1928impl From<DeltaSessionContext> for SessionContext {
1929 fn from(value: DeltaSessionContext) -> Self {
1930 value.inner
1931 }
1932}
1933
1934pub struct DeltaColumn {
1936 inner: Column,
1937}
1938
1939impl From<&str> for DeltaColumn {
1940 fn from(c: &str) -> Self {
1941 DeltaColumn {
1942 inner: Column::from_qualified_name_ignore_case(c),
1943 }
1944 }
1945}
1946
1947impl From<&String> for DeltaColumn {
1949 fn from(c: &String) -> Self {
1950 DeltaColumn {
1951 inner: Column::from_qualified_name_ignore_case(c),
1952 }
1953 }
1954}
1955
1956impl From<String> for DeltaColumn {
1958 fn from(c: String) -> Self {
1959 DeltaColumn {
1960 inner: Column::from_qualified_name_ignore_case(c),
1961 }
1962 }
1963}
1964
1965impl From<DeltaColumn> for Column {
1966 fn from(value: DeltaColumn) -> Self {
1967 value.inner
1968 }
1969}
1970
1971impl From<Column> for DeltaColumn {
1973 fn from(c: Column) -> Self {
1974 DeltaColumn { inner: c }
1975 }
1976}
1977
1978#[cfg(test)]
1979mod tests {
1980 use crate::kernel::log_segment::PathExt;
1981 use crate::logstore::default_logstore::DefaultLogStore;
1982 use crate::logstore::ObjectStoreRef;
1983 use crate::operations::write::SchemaMode;
1984 use crate::writer::test_utils::get_delta_schema;
1985 use arrow::array::StructArray;
1986 use arrow::datatypes::{Field, Schema};
1987 use arrow_array::cast::AsArray;
1988 use bytes::Bytes;
1989 use chrono::{TimeZone, Utc};
1990 use datafusion::assert_batches_sorted_eq;
1991 use datafusion::datasource::physical_plan::FileScanConfig;
1992 use datafusion::datasource::source::DataSourceExec;
1993 use datafusion::physical_plan::empty::EmptyExec;
1994 use datafusion::physical_plan::{visit_execution_plan, ExecutionPlanVisitor, PhysicalExpr};
1995 use datafusion_expr::lit;
1996 use datafusion_proto::physical_plan::AsExecutionPlan;
1997 use datafusion_proto::protobuf;
1998 use futures::{stream::BoxStream, StreamExt};
1999 use object_store::{
2000 path::Path, GetOptions, GetResult, ListResult, MultipartUpload, ObjectStore,
2001 PutMultipartOpts, PutOptions, PutPayload, PutResult,
2002 };
2003 use serde_json::json;
2004 use std::fmt::{Debug, Display, Formatter};
2005 use std::ops::{Deref, Range};
2006 use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
2007
2008 use super::*;
2009
2010 #[test]
2013 fn test_to_correct_scalar_value() {
2014 let reference_pairs = &[
2015 (
2016 json!("2015"),
2017 ArrowDataType::Int16,
2018 ScalarValue::Int16(Some(2015)),
2019 ),
2020 (
2021 json!("2015"),
2022 ArrowDataType::Int32,
2023 ScalarValue::Int32(Some(2015)),
2024 ),
2025 (
2026 json!("2015"),
2027 ArrowDataType::Int64,
2028 ScalarValue::Int64(Some(2015)),
2029 ),
2030 (
2031 json!("2015"),
2032 ArrowDataType::Float32,
2033 ScalarValue::Float32(Some(2015_f32)),
2034 ),
2035 (
2036 json!("2015"),
2037 ArrowDataType::Float64,
2038 ScalarValue::Float64(Some(2015_f64)),
2039 ),
2040 (
2041 json!(2015),
2042 ArrowDataType::Float64,
2043 ScalarValue::Float64(Some(2015_f64)),
2044 ),
2045 (
2046 json!("2015-01-01"),
2047 ArrowDataType::Date32,
2048 ScalarValue::Date32(Some(16436)),
2049 ),
2050 (
2072 json!(true),
2073 ArrowDataType::Boolean,
2074 ScalarValue::Boolean(Some(true)),
2075 ),
2076 ];
2077
2078 for (raw, data_type, ref_scalar) in reference_pairs {
2079 let scalar = to_correct_scalar_value(raw, data_type).unwrap().unwrap();
2080 assert_eq!(*ref_scalar, scalar)
2081 }
2082 }
2083
2084 #[test]
2085 fn test_partitioned_file_from_action() {
2086 let mut partition_values = std::collections::HashMap::new();
2087 partition_values.insert("month".to_string(), Some("1".to_string()));
2088 partition_values.insert("year".to_string(), Some("2015".to_string()));
2089 let action = Add {
2090 path: "year=2015/month=1/part-00000-4dcb50d3-d017-450c-9df7-a7257dbd3c5d-c000.snappy.parquet".to_string(),
2091 size: 10644,
2092 partition_values,
2093 modification_time: 1660497727833,
2094 data_change: true,
2095 stats: None,
2096 deletion_vector: None,
2097 stats_parsed: None,
2098 tags: None,
2099 base_row_id: None,
2100 default_row_commit_version: None,
2101 clustering_provider: None,
2102 };
2103 let schema = ArrowSchema::new(vec![
2104 Field::new("year", ArrowDataType::Int64, true),
2105 Field::new("month", ArrowDataType::Int64, true),
2106 ]);
2107
2108 let part_columns = vec!["year".to_string(), "month".to_string()];
2109 let file = partitioned_file_from_action(&action, &part_columns, &schema);
2110 let ref_file = PartitionedFile {
2111 object_meta: object_store::ObjectMeta {
2112 location: Path::from("year=2015/month=1/part-00000-4dcb50d3-d017-450c-9df7-a7257dbd3c5d-c000.snappy.parquet".to_string()),
2113 last_modified: Utc.timestamp_millis_opt(1660497727833).unwrap(),
2114 size: 10644,
2115 e_tag: None,
2116 version: None,
2117 },
2118 partition_values: [ScalarValue::Int64(Some(2015)), ScalarValue::Int64(Some(1))].to_vec(),
2119 range: None,
2120 extensions: None,
2121 statistics: None,
2122 metadata_size_hint: None,
2123 };
2124 assert_eq!(file.partition_values, ref_file.partition_values)
2125 }
2126
2127 #[tokio::test]
2128 async fn test_enforce_invariants() {
2129 let schema = Arc::new(Schema::new(vec![
2130 Field::new("a", ArrowDataType::Utf8, false),
2131 Field::new("b", ArrowDataType::Int32, false),
2132 ]));
2133 let batch = RecordBatch::try_new(
2134 Arc::clone(&schema),
2135 vec![
2136 Arc::new(arrow::array::StringArray::from(vec!["a", "b", "c", "d"])),
2137 Arc::new(arrow::array::Int32Array::from(vec![1, 10, 10, 100])),
2138 ],
2139 )
2140 .unwrap();
2141 let invariants: Vec<Invariant> = vec![];
2143 assert!(DeltaDataChecker::new_with_invariants(invariants)
2144 .check_batch(&batch)
2145 .await
2146 .is_ok());
2147
2148 let invariants = vec![
2150 Invariant::new("a", "a is not null"),
2151 Invariant::new("b", "b < 1000"),
2152 ];
2153 assert!(DeltaDataChecker::new_with_invariants(invariants)
2154 .check_batch(&batch)
2155 .await
2156 .is_ok());
2157
2158 let invariants = vec![
2160 Invariant::new("a", "a is null"),
2161 Invariant::new("b", "b < 100"),
2162 ];
2163 let result = DeltaDataChecker::new_with_invariants(invariants)
2164 .check_batch(&batch)
2165 .await;
2166 assert!(result.is_err());
2167 assert!(matches!(result, Err(DeltaTableError::InvalidData { .. })));
2168 if let Err(DeltaTableError::InvalidData { violations }) = result {
2169 assert_eq!(violations.len(), 2);
2170 }
2171
2172 let invariants = vec![Invariant::new("c", "c > 2000")];
2174 let result = DeltaDataChecker::new_with_invariants(invariants)
2175 .check_batch(&batch)
2176 .await;
2177 assert!(result.is_err());
2178
2179 let struct_fields = schema.fields().clone();
2181 let schema = Arc::new(Schema::new(vec![Field::new(
2182 "x",
2183 ArrowDataType::Struct(struct_fields),
2184 false,
2185 )]));
2186 let inner = Arc::new(StructArray::from(batch));
2187 let batch = RecordBatch::try_new(schema, vec![inner]).unwrap();
2188
2189 let invariants = vec![Invariant::new("x.b", "x.b < 1000")];
2190 let result = DeltaDataChecker::new_with_invariants(invariants)
2191 .check_batch(&batch)
2192 .await;
2193 assert!(result.is_err());
2194 assert!(matches!(result, Err(DeltaTableError::Generic { .. })));
2195 }
2196
2197 #[tokio::test]
2198 async fn test_enforce_constraints() {
2199 let schema = Arc::new(Schema::new(vec![
2200 Field::new("a", ArrowDataType::Utf8, false),
2201 Field::new("b", ArrowDataType::Int32, false),
2202 ]));
2203 let batch = RecordBatch::try_new(
2204 Arc::clone(&schema),
2205 vec![
2206 Arc::new(arrow::array::StringArray::from(vec!["a", "b", "c", "d"])),
2207 Arc::new(arrow::array::Int32Array::from(vec![1, 10, 10, 100])),
2208 ],
2209 )
2210 .unwrap();
2211 let constraints: Vec<Constraint> = vec![];
2213 assert!(DeltaDataChecker::new_with_constraints(constraints)
2214 .check_batch(&batch)
2215 .await
2216 .is_ok());
2217
2218 let constraints = vec![
2220 Constraint::new("custom_a", "a is not null"),
2221 Constraint::new("custom_b", "b < 1000"),
2222 ];
2223 assert!(DeltaDataChecker::new_with_constraints(constraints)
2224 .check_batch(&batch)
2225 .await
2226 .is_ok());
2227
2228 let constraints = vec![
2230 Constraint::new("custom_a", "a is null"),
2231 Constraint::new("custom_B", "b < 100"),
2232 ];
2233 let result = DeltaDataChecker::new_with_constraints(constraints)
2234 .check_batch(&batch)
2235 .await;
2236 assert!(result.is_err());
2237 assert!(matches!(result, Err(DeltaTableError::InvalidData { .. })));
2238 if let Err(DeltaTableError::InvalidData { violations }) = result {
2239 assert_eq!(violations.len(), 2);
2240 }
2241
2242 let constraints = vec![Constraint::new("custom_c", "c > 2000")];
2244 let result = DeltaDataChecker::new_with_constraints(constraints)
2245 .check_batch(&batch)
2246 .await;
2247 assert!(result.is_err());
2248 }
2249
2250 #[tokio::test]
2254 async fn test_constraints_with_spacey_fields() -> DeltaResult<()> {
2255 let schema = Arc::new(Schema::new(vec![
2256 Field::new("a", ArrowDataType::Utf8, false),
2257 Field::new("b bop", ArrowDataType::Int32, false),
2258 ]));
2259 let batch = RecordBatch::try_new(
2260 Arc::clone(&schema),
2261 vec![
2262 Arc::new(arrow::array::StringArray::from(vec![
2263 "a", "b bop", "c", "d",
2264 ])),
2265 Arc::new(arrow::array::Int32Array::from(vec![1, 10, 10, 100])),
2266 ],
2267 )?;
2268
2269 let constraints = vec![
2271 Constraint::new("custom a", "a is not null"),
2272 Constraint::new("custom_b", "b bop < 1000"),
2273 ];
2274 assert!(DeltaDataChecker::new_with_constraints(constraints)
2275 .check_batch(&batch)
2276 .await
2277 .is_ok());
2278
2279 let constraints = vec![
2281 Constraint::new("custom_a", "a is null"),
2282 Constraint::new("custom_B", "b bop < 100"),
2283 ];
2284 let result = DeltaDataChecker::new_with_constraints(constraints)
2285 .check_batch(&batch)
2286 .await;
2287 assert!(result.is_err());
2288 assert!(matches!(result, Err(DeltaTableError::InvalidData { .. })));
2289 if let Err(DeltaTableError::InvalidData { violations }) = result {
2290 assert_eq!(violations.len(), 2);
2291 }
2292
2293 let constraints = vec![Constraint::new("custom_c", "c > 2000")];
2295 let result = DeltaDataChecker::new_with_constraints(constraints)
2296 .check_batch(&batch)
2297 .await;
2298 assert!(result.is_err());
2299 Ok(())
2300 }
2301
2302 #[test]
2303 fn roundtrip_test_delta_exec_plan() {
2304 let ctx = SessionContext::new();
2305 let codec = DeltaPhysicalCodec {};
2306
2307 let schema = Arc::new(Schema::new(vec![
2308 Field::new("a", ArrowDataType::Utf8, false),
2309 Field::new("b", ArrowDataType::Int32, false),
2310 ]));
2311 let exec_plan = Arc::from(DeltaScan {
2312 table_uri: "s3://my_bucket/this/is/some/path".to_string(),
2313 parquet_scan: Arc::from(EmptyExec::new(schema.clone())),
2314 config: DeltaScanConfig::default(),
2315 logical_schema: schema.clone(),
2316 metrics: ExecutionPlanMetricsSet::new(),
2317 });
2318 let proto: protobuf::PhysicalPlanNode =
2319 protobuf::PhysicalPlanNode::try_from_physical_plan(exec_plan.clone(), &codec)
2320 .expect("to proto");
2321
2322 let runtime = ctx.runtime_env();
2323 let result_exec_plan: Arc<dyn ExecutionPlan> = proto
2324 .try_into_physical_plan(&ctx, runtime.deref(), &codec)
2325 .expect("from proto");
2326 assert_eq!(format!("{exec_plan:?}"), format!("{result_exec_plan:?}"));
2327 }
2328
2329 #[tokio::test]
2330 async fn delta_table_provider_with_config() {
2331 let table = crate::open_table("../test/tests/data/delta-2.2.0-partitioned-types")
2332 .await
2333 .unwrap();
2334 let config = DeltaScanConfigBuilder::new()
2335 .with_file_column_name(&"file_source")
2336 .build(table.snapshot().unwrap())
2337 .unwrap();
2338
2339 let log_store = table.log_store();
2340 let provider =
2341 DeltaTableProvider::try_new(table.snapshot().unwrap().clone(), log_store, config)
2342 .unwrap();
2343 let ctx = SessionContext::new();
2344 ctx.register_table("test", Arc::new(provider)).unwrap();
2345
2346 let df = ctx.sql("select * from test").await.unwrap();
2347 let actual = df.collect().await.unwrap();
2348 let expected = vec! [
2349 "+----+----+----+-------------------------------------------------------------------------------+",
2350 "| c3 | c1 | c2 | file_source |",
2351 "+----+----+----+-------------------------------------------------------------------------------+",
2352 "| 4 | 6 | a | c1=6/c2=a/part-00011-10619b10-b691-4fd0-acc4-2a9608499d7c.c000.snappy.parquet |",
2353 "| 5 | 4 | c | c1=4/c2=c/part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet |",
2354 "| 6 | 5 | b | c1=5/c2=b/part-00007-4e73fa3b-2c88-424a-8051-f8b54328ffdb.c000.snappy.parquet |",
2355 "+----+----+----+-------------------------------------------------------------------------------+",
2356 ];
2357 assert_batches_sorted_eq!(&expected, &actual);
2358 }
2359
    /// Partition columns ("modified", "id") are declared in a different order
    /// than they appear in the write schema; the provider's logical schema
    /// must still list data columns first, then partition columns.
    #[tokio::test]
    async fn delta_scan_mixed_partition_order() {
        // Write schema lists the partition columns before the value column.
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new("modified", ArrowDataType::Utf8, true),
            Field::new("id", ArrowDataType::Utf8, true),
            Field::new("value", ArrowDataType::Int32, true),
        ]));

        // Create an empty in-memory table partitioned by ("modified", "id").
        let table = crate::DeltaOps::new_in_memory()
            .create()
            .with_columns(get_delta_schema().fields().cloned())
            .with_partition_columns(["modified", "id"])
            .await
            .unwrap();
        assert_eq!(table.version(), 0);

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(arrow::array::StringArray::from(vec![
                    "2021-02-01",
                    "2021-02-01",
                    "2021-02-02",
                    "2021-02-02",
                ])),
                Arc::new(arrow::array::StringArray::from(vec!["A", "B", "C", "D"])),
                Arc::new(arrow::array::Int32Array::from(vec![1, 10, 20, 100])),
            ],
        )
        .unwrap();
        let table = crate::DeltaOps(table)
            .write(vec![batch.clone()])
            .with_save_mode(crate::protocol::SaveMode::Append)
            .await
            .unwrap();

        let config = DeltaScanConfigBuilder::new()
            .build(table.snapshot().unwrap())
            .unwrap();

        let log_store = table.log_store();
        let provider =
            DeltaTableProvider::try_new(table.snapshot().unwrap().clone(), log_store, config)
                .unwrap();
        let logical_schema = provider.schema();
        let ctx = SessionContext::new();
        ctx.register_table("test", Arc::new(provider)).unwrap();

        // Data column first, partition columns after — regardless of the
        // order they were written in.
        let expected_logical_order = vec!["value", "modified", "id"];
        let actual_order: Vec<String> = logical_schema
            .fields()
            .iter()
            .map(|f| f.name().to_owned())
            .collect();

        let df = ctx.sql("select * from test").await.unwrap();
        let actual = df.collect().await.unwrap();
        let expected = vec![
            "+-------+------------+----+",
            "| value | modified | id |",
            "+-------+------------+----+",
            "| 1 | 2021-02-01 | A |",
            "| 10 | 2021-02-01 | B |",
            "| 100 | 2021-02-02 | D |",
            "| 20 | 2021-02-02 | C |",
            "+-------+------------+----+",
        ];
        assert_batches_sorted_eq!(&expected, &actual);
        assert_eq!(expected_logical_order, actual_order);
    }
2433
2434 #[tokio::test]
2435 async fn delta_scan_case_sensitive() {
2436 let schema = Arc::new(ArrowSchema::new(vec![
2437 Field::new("moDified", ArrowDataType::Utf8, true),
2438 Field::new("ID", ArrowDataType::Utf8, true),
2439 Field::new("vaLue", ArrowDataType::Int32, true),
2440 ]));
2441
2442 let batch = RecordBatch::try_new(
2443 schema.clone(),
2444 vec![
2445 Arc::new(arrow::array::StringArray::from(vec![
2446 "2021-02-01",
2447 "2021-02-01",
2448 "2021-02-02",
2449 "2021-02-02",
2450 ])),
2451 Arc::new(arrow::array::StringArray::from(vec!["A", "B", "C", "D"])),
2452 Arc::new(arrow::array::Int32Array::from(vec![1, 10, 20, 100])),
2453 ],
2454 )
2455 .unwrap();
2456 let table = crate::DeltaOps::new_in_memory()
2458 .write(vec![batch.clone()])
2459 .with_save_mode(crate::protocol::SaveMode::Append)
2460 .await
2461 .unwrap();
2462
2463 let config = DeltaScanConfigBuilder::new()
2464 .build(table.snapshot().unwrap())
2465 .unwrap();
2466 let log = table.log_store();
2467
2468 let provider =
2469 DeltaTableProvider::try_new(table.snapshot().unwrap().clone(), log, config).unwrap();
2470 let ctx: SessionContext = DeltaSessionContext::default().into();
2471 ctx.register_table("test", Arc::new(provider)).unwrap();
2472
2473 let df = ctx
2474 .sql("select ID, moDified, vaLue from test")
2475 .await
2476 .unwrap();
2477 let actual = df.collect().await.unwrap();
2478 let expected = vec![
2479 "+----+------------+-------+",
2480 "| ID | moDified | vaLue |",
2481 "+----+------------+-------+",
2482 "| A | 2021-02-01 | 1 |",
2483 "| B | 2021-02-01 | 10 |",
2484 "| C | 2021-02-02 | 20 |",
2485 "| D | 2021-02-02 | 100 |",
2486 "+----+------------+-------+",
2487 ];
2488 assert_batches_sorted_eq!(&expected, &actual);
2489
2490 }
2502
    /// After a schema-merge write, rows from the older file (lacking `col_2`)
    /// should scan as nulls in the merged column rather than failing.
    #[tokio::test]
    async fn delta_scan_supports_missing_columns() {
        // First schema: only `col_1`.
        let schema1 = Arc::new(ArrowSchema::new(vec![Field::new(
            "col_1",
            ArrowDataType::Utf8,
            true,
        )]));

        let batch1 = RecordBatch::try_new(
            schema1.clone(),
            vec![Arc::new(arrow::array::StringArray::from(vec![
                Some("A"),
                Some("B"),
            ]))],
        )
        .unwrap();

        // Second schema adds `col_2`.
        let schema2 = Arc::new(ArrowSchema::new(vec![
            Field::new("col_1", ArrowDataType::Utf8, true),
            Field::new("col_2", ArrowDataType::Utf8, true),
        ]));

        let batch2 = RecordBatch::try_new(
            schema2.clone(),
            vec![
                Arc::new(arrow::array::StringArray::from(vec![
                    Some("E"),
                    Some("F"),
                    Some("G"),
                ])),
                Arc::new(arrow::array::StringArray::from(vec![
                    Some("E2"),
                    Some("F2"),
                    Some("G2"),
                ])),
            ],
        )
        .unwrap();

        // Write the wider batch first...
        let table = crate::DeltaOps::new_in_memory()
            .write(vec![batch2])
            .with_save_mode(crate::protocol::SaveMode::Append)
            .await
            .unwrap();

        // ...then merge in the narrower one, leaving `col_2` absent from
        // the second file.
        let table = crate::DeltaOps(table)
            .write(vec![batch1])
            .with_schema_mode(SchemaMode::Merge)
            .with_save_mode(crate::protocol::SaveMode::Append)
            .await
            .unwrap();

        let config = DeltaScanConfigBuilder::new()
            .build(table.snapshot().unwrap())
            .unwrap();
        let log = table.log_store();

        let provider =
            DeltaTableProvider::try_new(table.snapshot().unwrap().clone(), log, config).unwrap();
        let ctx: SessionContext = DeltaSessionContext::default().into();
        ctx.register_table("test", Arc::new(provider)).unwrap();

        let df = ctx.sql("select col_1, col_2 from test").await.unwrap();
        let actual = df.collect().await.unwrap();
        // Rows A and B come from the file without `col_2`, so it reads null.
        let expected = vec![
            "+-------+-------+",
            "| col_1 | col_2 |",
            "+-------+-------+",
            "| A | |",
            "| B | |",
            "| E | E2 |",
            "| F | F2 |",
            "| G | G2 |",
            "+-------+-------+",
        ];
        assert_batches_sorted_eq!(&expected, &actual);
    }
2580
2581 #[tokio::test]
2582 async fn delta_scan_supports_pushdown() {
2583 let schema = Arc::new(ArrowSchema::new(vec![
2584 Field::new("col_1", ArrowDataType::Utf8, false),
2585 Field::new("col_2", ArrowDataType::Utf8, false),
2586 ]));
2587
2588 let batch = RecordBatch::try_new(
2589 schema.clone(),
2590 vec![
2591 Arc::new(arrow::array::StringArray::from(vec![
2592 Some("A"),
2593 Some("B"),
2594 Some("C"),
2595 ])),
2596 Arc::new(arrow::array::StringArray::from(vec![
2597 Some("A2"),
2598 Some("B2"),
2599 Some("C2"),
2600 ])),
2601 ],
2602 )
2603 .unwrap();
2604
2605 let table = crate::DeltaOps::new_in_memory()
2606 .write(vec![batch])
2607 .with_save_mode(crate::protocol::SaveMode::Append)
2608 .await
2609 .unwrap();
2610
2611 let config = DeltaScanConfigBuilder::new()
2612 .build(table.snapshot().unwrap())
2613 .unwrap();
2614 let log = table.log_store();
2615
2616 let provider =
2617 DeltaTableProvider::try_new(table.snapshot().unwrap().clone(), log, config).unwrap();
2618
2619 let mut cfg = SessionConfig::default();
2620 cfg.options_mut().execution.parquet.pushdown_filters = true;
2621 let ctx = SessionContext::new_with_config(cfg);
2622 ctx.register_table("test", Arc::new(provider)).unwrap();
2623
2624 let df = ctx
2625 .sql("select col_1, col_2 from test WHERE col_1 = 'A'")
2626 .await
2627 .unwrap();
2628 let actual = df.collect().await.unwrap();
2629 let expected = vec![
2630 "+-------+-------+",
2631 "| col_1 | col_2 |",
2632 "+-------+-------+",
2633 "| A | A2 |",
2634 "+-------+-------+",
2635 ];
2636 assert_batches_sorted_eq!(&expected, &actual);
2637 }
2638
    /// Like `delta_scan_supports_missing_columns`, but the added/missing
    /// column is nested inside a struct (`col_1.col_1b`); older rows should
    /// read the nested field as null.
    #[tokio::test]
    async fn delta_scan_supports_nested_missing_columns() {
        // First struct schema: col_1 contains only col_1a.
        let column1_schema1: arrow::datatypes::Fields =
            vec![Field::new("col_1a", ArrowDataType::Utf8, true)].into();
        let schema1 = Arc::new(ArrowSchema::new(vec![Field::new(
            "col_1",
            ArrowDataType::Struct(column1_schema1.clone()),
            true,
        )]));

        let batch1 = RecordBatch::try_new(
            schema1.clone(),
            vec![Arc::new(StructArray::new(
                column1_schema1,
                vec![Arc::new(arrow::array::StringArray::from(vec![
                    Some("A"),
                    Some("B"),
                ]))],
                None,
            ))],
        )
        .unwrap();

        // Second struct schema adds col_1b inside col_1.
        let column1_schema2: arrow_schema::Fields = vec![
            Field::new("col_1a", ArrowDataType::Utf8, true),
            Field::new("col_1b", ArrowDataType::Utf8, true),
        ]
        .into();
        let schema2 = Arc::new(ArrowSchema::new(vec![Field::new(
            "col_1",
            ArrowDataType::Struct(column1_schema2.clone()),
            true,
        )]));

        let batch2 = RecordBatch::try_new(
            schema2.clone(),
            vec![Arc::new(StructArray::new(
                column1_schema2,
                vec![
                    Arc::new(arrow::array::StringArray::from(vec![
                        Some("E"),
                        Some("F"),
                        Some("G"),
                    ])),
                    Arc::new(arrow::array::StringArray::from(vec![
                        Some("E2"),
                        Some("F2"),
                        Some("G2"),
                    ])),
                ],
                None,
            ))],
        )
        .unwrap();

        // Write the narrow struct first, then merge in the wider one.
        let table = crate::DeltaOps::new_in_memory()
            .write(vec![batch1])
            .with_save_mode(crate::protocol::SaveMode::Append)
            .await
            .unwrap();

        let table = crate::DeltaOps(table)
            .write(vec![batch2])
            .with_schema_mode(SchemaMode::Merge)
            .with_save_mode(crate::protocol::SaveMode::Append)
            .await
            .unwrap();

        let config = DeltaScanConfigBuilder::new()
            .build(table.snapshot().unwrap())
            .unwrap();
        let log = table.log_store();

        let provider =
            DeltaTableProvider::try_new(table.snapshot().unwrap().clone(), log, config).unwrap();
        let ctx: SessionContext = DeltaSessionContext::default().into();
        ctx.register_table("test", Arc::new(provider)).unwrap();

        // Project both nested fields; col_1b is null for rows A and B.
        let df = ctx
            .sql("select col_1.col_1a, col_1.col_1b from test")
            .await
            .unwrap();
        let actual = df.collect().await.unwrap();
        let expected = vec![
            "+--------------------+--------------------+",
            "| test.col_1[col_1a] | test.col_1[col_1b] |",
            "+--------------------+--------------------+",
            "| A | |",
            "| B | |",
            "| E | E2 |",
            "| F | F2 |",
            "| G | G2 |",
            "+--------------------+--------------------+",
        ];
        assert_batches_sorted_eq!(&expected, &actual);
    }
2735
2736 #[tokio::test]
2737 async fn test_multiple_predicate_pushdown() {
2738 use crate::datafusion::prelude::SessionContext;
2739 let schema = Arc::new(ArrowSchema::new(vec![
2740 Field::new("moDified", ArrowDataType::Utf8, true),
2741 Field::new("id", ArrowDataType::Utf8, true),
2742 Field::new("vaLue", ArrowDataType::Int32, true),
2743 ]));
2744
2745 let batch = RecordBatch::try_new(
2746 schema.clone(),
2747 vec![
2748 Arc::new(arrow::array::StringArray::from(vec![
2749 "2021-02-01",
2750 "2021-02-01",
2751 "2021-02-02",
2752 "2021-02-02",
2753 ])),
2754 Arc::new(arrow::array::StringArray::from(vec!["A", "B", "C", "D"])),
2755 Arc::new(arrow::array::Int32Array::from(vec![1, 10, 20, 100])),
2756 ],
2757 )
2758 .unwrap();
2759 let table = crate::DeltaOps::new_in_memory()
2761 .write(vec![batch.clone()])
2762 .with_save_mode(crate::protocol::SaveMode::Append)
2763 .await
2764 .unwrap();
2765
2766 let datafusion = SessionContext::new();
2767 let table = Arc::new(table);
2768
2769 datafusion.register_table("snapshot", table).unwrap();
2770
2771 let df = datafusion
2772 .sql("select * from snapshot where id > 10000 and id < 20000")
2773 .await
2774 .unwrap();
2775
2776 df.collect().await.unwrap();
2777 }
2778
2779 #[tokio::test]
2780 async fn test_delta_scan_builder_no_scan_config() {
2781 let arr: Arc<dyn Array> = Arc::new(arrow::array::StringArray::from(vec!["s"]));
2782 let batch = RecordBatch::try_from_iter_with_nullable(vec![("a", arr, false)]).unwrap();
2783 let table = crate::DeltaOps::new_in_memory()
2784 .write(vec![batch])
2785 .with_save_mode(crate::protocol::SaveMode::Append)
2786 .await
2787 .unwrap();
2788
2789 let ctx = SessionContext::new();
2790 let state = ctx.state();
2791 let scan = DeltaScanBuilder::new(table.snapshot().unwrap(), table.log_store(), &state)
2792 .with_filter(Some(col("a").eq(lit("s"))))
2793 .build()
2794 .await
2795 .unwrap();
2796
2797 let mut visitor = ParquetVisitor::default();
2798 visit_execution_plan(&scan, &mut visitor).unwrap();
2799
2800 assert_eq!(visitor.predicate.unwrap().to_string(), "a@0 = s");
2801 }
2802
    /// Explicitly disabling parquet pushdown in the scan config must keep the
    /// filter out of the parquet source even when one is supplied.
    #[tokio::test]
    async fn test_delta_scan_builder_scan_config_disable_pushdown() {
        let arr: Arc<dyn Array> = Arc::new(arrow::array::StringArray::from(vec!["s"]));
        let batch = RecordBatch::try_from_iter_with_nullable(vec![("a", arr, false)]).unwrap();
        let table = crate::DeltaOps::new_in_memory()
            .write(vec![batch])
            .with_save_mode(crate::protocol::SaveMode::Append)
            .await
            .unwrap();

        let snapshot = table.snapshot().unwrap();
        let ctx = SessionContext::new();
        let state = ctx.state();
        // A filter is provided, but the scan config forbids pushing it down.
        let scan = DeltaScanBuilder::new(snapshot, table.log_store(), &state)
            .with_filter(Some(col("a").eq(lit("s"))))
            .with_scan_config(
                DeltaScanConfigBuilder::new()
                    .with_parquet_pushdown(false)
                    .build(snapshot)
                    .unwrap(),
            )
            .build()
            .await
            .unwrap();

        let mut visitor = ParquetVisitor::default();
        visit_execution_plan(&scan, &mut visitor).unwrap();

        // No predicate should have reached the parquet source.
        assert!(visitor.predicate.is_none());
    }
2833
    /// Parquet options set on the session (here `pushdown_filters`) must be
    /// propagated into the parquet source built by the scan builder.
    #[tokio::test]
    async fn test_delta_scan_applies_parquet_options() {
        let arr: Arc<dyn Array> = Arc::new(arrow::array::StringArray::from(vec!["s"]));
        let batch = RecordBatch::try_from_iter_with_nullable(vec![("a", arr, false)]).unwrap();
        let table = crate::DeltaOps::new_in_memory()
            .write(vec![batch])
            .with_save_mode(crate::protocol::SaveMode::Append)
            .await
            .unwrap();

        let snapshot = table.snapshot().unwrap();

        // Session configured with a non-default parquet option.
        let mut config = SessionConfig::default();
        config.options_mut().execution.parquet.pushdown_filters = true;
        let ctx = SessionContext::new_with_config(config);
        let state = ctx.state();

        let scan = DeltaScanBuilder::new(snapshot, table.log_store(), &state)
            .build()
            .await
            .unwrap();

        let mut visitor = ParquetVisitor::default();
        visit_execution_plan(&scan, &mut visitor).unwrap();

        // Options captured from the plan must equal the session's options.
        assert_eq!(ctx.copied_table_options().parquet, visitor.options.unwrap());
    }
2861
    /// Captures the predicate and parquet options from the first
    /// `ParquetSource` encountered while walking an execution plan.
    #[derive(Default)]
    struct ParquetVisitor {
        // Filter expression pushed down to the parquet source, if any.
        predicate: Option<Arc<dyn PhysicalExpr>>,
        // Table-level parquet options found on the source, if any.
        options: Option<TableParquetOptions>,
    }
2868
    impl ExecutionPlanVisitor for ParquetVisitor {
        type Error = DataFusionError;

        /// Inspect each plan node; when a `DataSourceExec` wrapping a parquet
        /// `FileScanConfig` is found, record its options and predicate.
        /// Always returns `Ok(true)` so the whole tree is traversed.
        fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
            // Not a DataSourceExec: keep descending.
            let Some(datasource_exec) = plan.as_any().downcast_ref::<DataSourceExec>() else {
                return Ok(true);
            };

            // DataSourceExec that is not file-based: keep descending.
            let Some(scan_config) = datasource_exec
                .data_source()
                .as_any()
                .downcast_ref::<FileScanConfig>()
            else {
                return Ok(true);
            };

            if let Some(parquet_source) = scan_config
                .file_source
                .as_any()
                .downcast_ref::<ParquetSource>()
            {
                self.options = Some(parquet_source.table_parquet_options().clone());
                self.predicate = parquet_source.predicate().cloned();
            }

            Ok(true)
        }
    }
2897
2898 #[tokio::test]
2899 async fn passes_sanity_checker_when_all_files_filtered() {
2900 let table = crate::open_table("../test/tests/data/delta-2.2.0-partitioned-types")
2906 .await
2907 .unwrap();
2908 let ctx = SessionContext::new();
2909 ctx.register_table("test", Arc::new(table)).unwrap();
2910
2911 let df = ctx
2912 .sql("select * from test where c3 = 100 ORDER BY c1 ASC")
2913 .await
2914 .unwrap();
2915 let actual = df.collect().await.unwrap();
2916
2917 assert_eq!(actual.len(), 0);
2918 }
2919
    /// `check_nullability` must reject batches where a declared non-nullable
    /// column is either all-null or missing, and accept batches that populate
    /// every non-nullable column.
    #[tokio::test]
    async fn test_check_nullability() -> DeltaResult<()> {
        use arrow::array::StringArray;

        let data_checker = DeltaDataChecker {
            non_nullable_columns: vec!["zed".to_string(), "yap".to_string()],
            ..Default::default()
        };

        // "zed" present but null (and "yap" missing) -> error.
        let arr: Arc<dyn Array> = Arc::new(StringArray::from(vec!["s"]));
        let nulls: Arc<dyn Array> = Arc::new(StringArray::new_null(1));
        let batch = RecordBatch::try_from_iter(vec![("a", arr), ("zed", nulls)]).unwrap();

        let result = data_checker.check_nullability(&batch);
        assert!(
            result.is_err(),
            "The result should have errored! {result:?}"
        );

        // "zed" populated but "yap" absent -> error.
        let arr: Arc<dyn Array> = Arc::new(StringArray::from(vec!["s"]));
        let batch = RecordBatch::try_from_iter(vec![("zed", arr)]).unwrap();
        let result = data_checker.check_nullability(&batch);
        assert!(
            result.is_err(),
            "The result should have errored! {result:?}"
        );

        // Both non-nullable columns populated -> ok.
        let arr: Arc<dyn Array> = Arc::new(StringArray::from(vec!["s"]));
        let batch = RecordBatch::try_from_iter(vec![("zed", arr.clone()), ("yap", arr)]).unwrap();
        let _ = data_checker.check_nullability(&batch)?;

        Ok(())
    }
2953
    /// Projecting only the small column should trigger parquet column
    /// pruning: the recorded reads never touch the large column's data pages.
    #[tokio::test]
    async fn test_delta_scan_uses_parquet_column_pruning() {
        // One tiny column and one ~1 KiB column, single row.
        let small: Arc<dyn Array> = Arc::new(arrow::array::StringArray::from(vec!["a"]));
        let large: Arc<dyn Array> = Arc::new(arrow::array::StringArray::from(vec!["b"
            .repeat(1024)
            .as_str()]));
        let batch = RecordBatch::try_from_iter(vec![("small", small), ("large", large)]).unwrap();
        let table = crate::DeltaOps::new_in_memory()
            .write(vec![batch])
            .with_save_mode(crate::protocol::SaveMode::Append)
            .await
            .unwrap();

        let config = DeltaScanConfigBuilder::new()
            .build(table.snapshot().unwrap())
            .unwrap();

        // Wrap the object store so every read issued by the scan is recorded.
        let (object_store, mut operations) =
            RecordingObjectStore::new(table.log_store().object_store(None));
        let log_store =
            DefaultLogStore::new(Arc::new(object_store), table.log_store().config().clone());
        let provider = DeltaTableProvider::try_new(
            table.snapshot().unwrap().clone(),
            Arc::new(log_store),
            config,
        )
        .unwrap();
        let ctx = SessionContext::new();
        ctx.register_table("test", Arc::new(provider)).unwrap();
        let state = ctx.state();

        let df = ctx.sql("select small from test").await.unwrap();
        let plan = df.create_physical_plan().await.unwrap();

        // Exactly one batch with a single row/column is expected.
        let mut stream = plan.execute(0, state.task_ctx()).unwrap();
        let Some(Ok(batch)) = stream.next().await else {
            panic!()
        };
        assert!(stream.next().await.is_none());
        assert_eq!(1, batch.num_columns());
        assert_eq!(1, batch.num_rows());
        let small = batch.column_by_name("small").unwrap().as_string::<i32>();
        assert_eq!("a", small.iter().next().unwrap().unwrap());

        // NOTE(review): the exact offsets are tied to the writer's output
        // layout — presumably the 8-byte parquet footer, the file metadata,
        // and one small range for the projected column; confirm on writer
        // upgrades.
        let expected = vec![
            ObjectStoreOperation::GetRange(LocationType::Data, 4920..4928),
            ObjectStoreOperation::GetRange(LocationType::Data, 2399..4920),
            #[expect(clippy::single_range_in_vec_init)]
            ObjectStoreOperation::GetRanges(LocationType::Data, vec![4..58]),
        ];
        let mut actual = Vec::new();
        operations.recv_many(&mut actual, 3).await;
        assert_eq!(expected, actual);
    }
3008
    /// Object store wrapper that reports read operations on a channel so
    /// tests can assert exactly which byte ranges were fetched.
    struct RecordingObjectStore {
        // Store all calls are delegated to.
        inner: ObjectStoreRef,
        // Read operations are sent here as they occur.
        operations: UnboundedSender<ObjectStoreOperation>,
    }
3014
    impl RecordingObjectStore {
        /// Wrap `inner`, returning the recording store together with the
        /// receiving end of the operation log.
        fn new(inner: ObjectStoreRef) -> (Self, UnboundedReceiver<ObjectStoreOperation>) {
            let (operations, operations_receiver) = unbounded_channel();
            (Self { inner, operations }, operations_receiver)
        }
    }
3022
    // Delegate `Display` to the wrapped store.
    impl Display for RecordingObjectStore {
        fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
            Display::fmt(&self.inner, f)
        }
    }
3028
    // Delegate `Debug` to the wrapped store.
    impl Debug for RecordingObjectStore {
        fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
            Debug::fmt(&self.inner, f)
        }
    }
3034
    /// A single read issued against the recording store, tagged with the kind
    /// of file it touched and (where applicable) the byte range(s) requested.
    #[derive(Debug, PartialEq)]
    enum ObjectStoreOperation {
        GetRanges(LocationType, Vec<Range<u64>>),
        GetRange(LocationType, Range<u64>),
        GetOpts(LocationType),
        Get(LocationType),
    }
3042
    /// Coarse classification of object-store paths touched during a scan.
    #[derive(Debug, PartialEq)]
    enum LocationType {
        // A parquet data file (name starts with "part-").
        Data,
        // A delta log commit file.
        Commit,
    }
3048
3049 impl From<&Path> for LocationType {
3050 fn from(value: &Path) -> Self {
3051 if value.is_commit_file() {
3052 LocationType::Commit
3053 } else if value.to_string().starts_with("part-") {
3054 LocationType::Data
3055 } else {
3056 panic!("Unknown location type: {value:?}")
3057 }
3058 }
3059 }
3060
    /// Delegates every call to the wrapped store. The read entry points
    /// (`get`, `get_opts`, `get_range`, `get_ranges`) additionally send an
    /// `ObjectStoreOperation` on the channel *before* delegating, so tests
    /// can observe the exact access pattern; all other methods pass through
    /// untouched.
    #[async_trait]
    impl ObjectStore for RecordingObjectStore {
        async fn put(
            &self,
            location: &Path,
            payload: PutPayload,
        ) -> object_store::Result<PutResult> {
            self.inner.put(location, payload).await
        }

        async fn put_opts(
            &self,
            location: &Path,
            payload: PutPayload,
            opts: PutOptions,
        ) -> object_store::Result<PutResult> {
            self.inner.put_opts(location, payload, opts).await
        }

        async fn put_multipart(
            &self,
            location: &Path,
        ) -> object_store::Result<Box<dyn MultipartUpload>> {
            self.inner.put_multipart(location).await
        }

        async fn put_multipart_opts(
            &self,
            location: &Path,
            opts: PutMultipartOpts,
        ) -> object_store::Result<Box<dyn MultipartUpload>> {
            self.inner.put_multipart_opts(location, opts).await
        }

        // Record, then delegate.
        async fn get(&self, location: &Path) -> object_store::Result<GetResult> {
            self.operations
                .send(ObjectStoreOperation::Get(location.into()))
                .unwrap();
            self.inner.get(location).await
        }

        // Record, then delegate.
        async fn get_opts(
            &self,
            location: &Path,
            options: GetOptions,
        ) -> object_store::Result<GetResult> {
            self.operations
                .send(ObjectStoreOperation::GetOpts(location.into()))
                .unwrap();
            self.inner.get_opts(location, options).await
        }

        // Record the requested byte range, then delegate.
        async fn get_range(
            &self,
            location: &Path,
            range: Range<u64>,
        ) -> object_store::Result<Bytes> {
            self.operations
                .send(ObjectStoreOperation::GetRange(
                    location.into(),
                    range.clone(),
                ))
                .unwrap();
            self.inner.get_range(location, range).await
        }

        // Record the requested byte ranges, then delegate.
        async fn get_ranges(
            &self,
            location: &Path,
            ranges: &[Range<u64>],
        ) -> object_store::Result<Vec<Bytes>> {
            self.operations
                .send(ObjectStoreOperation::GetRanges(
                    location.into(),
                    ranges.to_vec(),
                ))
                .unwrap();
            self.inner.get_ranges(location, ranges).await
        }

        async fn head(&self, location: &Path) -> object_store::Result<ObjectMeta> {
            self.inner.head(location).await
        }

        async fn delete(&self, location: &Path) -> object_store::Result<()> {
            self.inner.delete(location).await
        }

        fn delete_stream<'a>(
            &'a self,
            locations: BoxStream<'a, object_store::Result<Path>>,
        ) -> BoxStream<'a, object_store::Result<Path>> {
            self.inner.delete_stream(locations)
        }

        fn list(
            &self,
            prefix: Option<&Path>,
        ) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
            self.inner.list(prefix)
        }

        fn list_with_offset(
            &self,
            prefix: Option<&Path>,
            offset: &Path,
        ) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
            self.inner.list_with_offset(prefix, offset)
        }

        async fn list_with_delimiter(
            &self,
            prefix: Option<&Path>,
        ) -> object_store::Result<ListResult> {
            self.inner.list_with_delimiter(prefix).await
        }

        async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> {
            self.inner.copy(from, to).await
        }

        async fn rename(&self, from: &Path, to: &Path) -> object_store::Result<()> {
            self.inner.rename(from, to).await
        }

        async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> {
            self.inner.copy_if_not_exists(from, to).await
        }

        async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> {
            self.inner.rename_if_not_exists(from, to).await
        }
    }
3195}